dm thin: remap the bios in a cell immediately

This use of direct submission in process_prepared_mapping() reduces
latency for submitting bios in a cell by avoiding adding those bios to
the deferred list and waiting for the next iteration of the worker.

But this direct submission exposes the potential for a race between
releasing a cell and incrementing deferred set.  Fix this by introducing
dm_cell_visit_release() and refactoring inc_remap_and_issue_cell()
accordingly.

Signed-off-by: Joe Thornber <ejt@redhat.com>
Signed-off-by: Mike Snitzer <snitzer@redhat.com>
diff --git a/drivers/md/dm-thin.c b/drivers/md/dm-thin.c
index 912d7f4..5036d4b 100644
--- a/drivers/md/dm-thin.c
+++ b/drivers/md/dm-thin.c
@@ -343,6 +343,15 @@
 	dm_bio_prison_free_cell(pool->prison, cell);
 }
 
+static void cell_visit_release(struct pool *pool,
+			       void (*fn)(void *, struct dm_bio_prison_cell *),
+			       void *context,
+			       struct dm_bio_prison_cell *cell)
+{
+	dm_cell_visit_release(pool->prison, fn, context, cell);
+	dm_bio_prison_free_cell(pool->prison, cell);
+}
+
 static void cell_release_no_holder(struct pool *pool,
 				   struct dm_bio_prison_cell *cell,
 				   struct bio_list *bios)
@@ -697,22 +706,8 @@
  */
 
 /*
- * This sends the bios in the cell back to the deferred_bios list.
- */
-static void cell_defer(struct thin_c *tc, struct dm_bio_prison_cell *cell)
-{
-	struct pool *pool = tc->pool;
-	unsigned long flags;
-
-	spin_lock_irqsave(&tc->lock, flags);
-	cell_release(pool, cell, &tc->deferred_bio_list);
-	spin_unlock_irqrestore(&tc->lock, flags);
-
-	wake_worker(pool);
-}
-
-/*
- * Same as cell_defer above, except it omits the original holder of the cell.
+ * This sends the bios in the cell, except the original holder, back
+ * to the deferred_bios list.
  */
 static void cell_defer_no_holder(struct thin_c *tc, struct dm_bio_prison_cell *cell)
 {
@@ -728,24 +723,58 @@
 
 static void thin_defer_bio(struct thin_c *tc, struct bio *bio);
 
+struct remap_info {
+	struct thin_c *tc;
+	struct bio_list defer_bios;
+	struct bio_list issue_bios;
+};
+
+static void __inc_remap_and_issue_cell(void *context,
+				       struct dm_bio_prison_cell *cell)
+{
+	struct remap_info *info = context;
+	struct bio *bio;
+
+	while ((bio = bio_list_pop(&cell->bios))) {
+		if (bio->bi_rw & (REQ_DISCARD | REQ_FLUSH | REQ_FUA))
+			bio_list_add(&info->defer_bios, bio);
+		else {
+			inc_all_io_entry(info->tc->pool, bio);
+
+			/*
+			 * We can't issue the bios with the bio prison lock
+			 * held, so we add them to a list to issue on
+			 * return from this function.
+			 */
+			bio_list_add(&info->issue_bios, bio);
+		}
+	}
+}
+
 static void inc_remap_and_issue_cell(struct thin_c *tc,
 				     struct dm_bio_prison_cell *cell,
 				     dm_block_t block)
 {
 	struct bio *bio;
-	struct bio_list bios;
+	struct remap_info info;
 
-	bio_list_init(&bios);
-	cell_release_no_holder(tc->pool, cell, &bios);
+	info.tc = tc;
+	bio_list_init(&info.defer_bios);
+	bio_list_init(&info.issue_bios);
 
-	while ((bio = bio_list_pop(&bios))) {
-		if (bio->bi_rw & (REQ_DISCARD | REQ_FLUSH | REQ_FUA))
-			thin_defer_bio(tc, bio);
-		else {
-			inc_all_io_entry(tc->pool, bio);
-			remap_and_issue(tc, bio, block);
-		}
-	}
+	/*
+	 * We have to be careful to inc any bios we're about to issue
+	 * before the cell is released, and avoid a race with new bios
+	 * being added to the cell.
+	 */
+	cell_visit_release(tc->pool, __inc_remap_and_issue_cell,
+			   &info, cell);
+
+	while ((bio = bio_list_pop(&info.defer_bios)))
+		thin_defer_bio(tc, bio);
+
+	while ((bio = bio_list_pop(&info.issue_bios)))
+		remap_and_issue(info.tc, bio, block);
 }
 
 static void process_prepared_mapping_fail(struct dm_thin_new_mapping *m)
@@ -796,10 +825,13 @@
 	 * the bios in the cell.
 	 */
 	if (bio) {
-		cell_defer_no_holder(tc, m->cell);
+		inc_remap_and_issue_cell(tc, m->cell, m->data_block);
 		bio_endio(bio, 0);
-	} else
-		cell_defer(tc, m->cell);
+	} else {
+		inc_all_io_entry(tc->pool, m->cell->holder);
+		remap_and_issue(tc, m->cell->holder, m->data_block);
+		inc_remap_and_issue_cell(tc, m->cell, m->data_block);
+	}
 
 out:
 	list_del(&m->list);