dm thin: use per thin device deferred bio lists

The thin-pool previously only had a single deferred_bios list that would
collect bios for all thin devices in the pool.  Split this per-pool
deferred_bios list out to a per-thin deferred_bio_list -- doing so
enables increased parallelism when processing deferred bios.  And now
that each thin device has its own deferred_bio_list we can sort all
bios in the list using logical sector.  The requeue code in the error
handling path is also cleaner as a side-effect.

Signed-off-by: Mike Snitzer <snitzer@redhat.com>
Acked-by: Joe Thornber <ejt@redhat.com>
diff --git a/drivers/md/dm-thin.c b/drivers/md/dm-thin.c
index af871fd..08e62ae 100644
--- a/drivers/md/dm-thin.c
+++ b/drivers/md/dm-thin.c
@@ -12,6 +12,7 @@
 #include <linux/dm-io.h>
 #include <linux/dm-kcopyd.h>
 #include <linux/list.h>
+#include <linux/rculist.h>
 #include <linux/init.h>
 #include <linux/module.h>
 #include <linux/slab.h>
@@ -178,12 +179,10 @@
 	unsigned ref_count;
 
 	spinlock_t lock;
-	struct bio_list deferred_bios;
 	struct bio_list deferred_flush_bios;
 	struct list_head prepared_mappings;
 	struct list_head prepared_discards;
-
-	struct bio_list retry_on_resume_list;
+	struct list_head active_thins;
 
 	struct dm_deferred_set *shared_read_ds;
 	struct dm_deferred_set *all_io_ds;
@@ -220,6 +219,7 @@
  * Target context for a thin.
  */
 struct thin_c {
+	struct list_head list;
 	struct dm_dev *pool_dev;
 	struct dm_dev *origin_dev;
 	dm_thin_id dev_id;
@@ -227,6 +227,9 @@
 	struct pool *pool;
 	struct dm_thin_device *td;
 	bool requeue_mode:1;
+	spinlock_t lock;
+	struct bio_list deferred_bio_list;
+	struct bio_list retry_on_resume_list;
 };
 
 /*----------------------------------------------------------------*/
@@ -287,9 +290,9 @@
 	struct pool *pool = tc->pool;
 	unsigned long flags;
 
-	spin_lock_irqsave(&pool->lock, flags);
-	dm_cell_release_no_holder(pool->prison, cell, &pool->deferred_bios);
-	spin_unlock_irqrestore(&pool->lock, flags);
+	spin_lock_irqsave(&tc->lock, flags);
+	dm_cell_release_no_holder(pool->prison, cell, &tc->deferred_bio_list);
+	spin_unlock_irqrestore(&tc->lock, flags);
 
 	wake_worker(pool);
 }
@@ -378,30 +381,22 @@
 
 	bio_list_init(&bios);
 
-	spin_lock_irqsave(&tc->pool->lock, flags);
+	spin_lock_irqsave(&tc->lock, flags);
 	bio_list_merge(&bios, master);
 	bio_list_init(master);
-	spin_unlock_irqrestore(&tc->pool->lock, flags);
+	spin_unlock_irqrestore(&tc->lock, flags);
 
-	while ((bio = bio_list_pop(&bios))) {
-		struct dm_thin_endio_hook *h = dm_per_bio_data(bio, sizeof(struct dm_thin_endio_hook));
-
-		if (h->tc == tc)
-			bio_endio(bio, DM_ENDIO_REQUEUE);
-		else
-			bio_list_add(master, bio);
-	}
+	while ((bio = bio_list_pop(&bios)))
+		bio_endio(bio, DM_ENDIO_REQUEUE);
 }
 
 static void requeue_io(struct thin_c *tc)
 {
-	struct pool *pool = tc->pool;
-
-	requeue_bio_list(tc, &pool->deferred_bios);
-	requeue_bio_list(tc, &pool->retry_on_resume_list);
+	requeue_bio_list(tc, &tc->deferred_bio_list);
+	requeue_bio_list(tc, &tc->retry_on_resume_list);
 }
 
-static void error_retry_list(struct pool *pool)
+static void error_thin_retry_list(struct thin_c *tc)
 {
 	struct bio *bio;
 	unsigned long flags;
@@ -409,15 +404,25 @@
 
 	bio_list_init(&bios);
 
-	spin_lock_irqsave(&pool->lock, flags);
-	bio_list_merge(&bios, &pool->retry_on_resume_list);
-	bio_list_init(&pool->retry_on_resume_list);
-	spin_unlock_irqrestore(&pool->lock, flags);
+	spin_lock_irqsave(&tc->lock, flags);
+	bio_list_merge(&bios, &tc->retry_on_resume_list);
+	bio_list_init(&tc->retry_on_resume_list);
+	spin_unlock_irqrestore(&tc->lock, flags);
 
 	while ((bio = bio_list_pop(&bios)))
 		bio_io_error(bio);
 }
 
+static void error_retry_list(struct pool *pool)
+{
+	struct thin_c *tc;
+
+	rcu_read_lock();
+	list_for_each_entry_rcu(tc, &pool->active_thins, list)
+		error_thin_retry_list(tc);
+	rcu_read_unlock();
+}
+
 /*
  * This section of code contains the logic for processing a thin device's IO.
  * Much of the code depends on pool object resources (lists, workqueues, etc)
@@ -608,9 +613,9 @@
 	struct pool *pool = tc->pool;
 	unsigned long flags;
 
-	spin_lock_irqsave(&pool->lock, flags);
-	cell_release(pool, cell, &pool->deferred_bios);
-	spin_unlock_irqrestore(&tc->pool->lock, flags);
+	spin_lock_irqsave(&tc->lock, flags);
+	cell_release(pool, cell, &tc->deferred_bio_list);
+	spin_unlock_irqrestore(&tc->lock, flags);
 
 	wake_worker(pool);
 }
@@ -623,9 +628,9 @@
 	struct pool *pool = tc->pool;
 	unsigned long flags;
 
-	spin_lock_irqsave(&pool->lock, flags);
-	cell_release_no_holder(pool, cell, &pool->deferred_bios);
-	spin_unlock_irqrestore(&pool->lock, flags);
+	spin_lock_irqsave(&tc->lock, flags);
+	cell_release_no_holder(pool, cell, &tc->deferred_bio_list);
+	spin_unlock_irqrestore(&tc->lock, flags);
 
 	wake_worker(pool);
 }
@@ -1001,12 +1006,11 @@
 {
 	struct dm_thin_endio_hook *h = dm_per_bio_data(bio, sizeof(struct dm_thin_endio_hook));
 	struct thin_c *tc = h->tc;
-	struct pool *pool = tc->pool;
 	unsigned long flags;
 
-	spin_lock_irqsave(&pool->lock, flags);
-	bio_list_add(&pool->retry_on_resume_list, bio);
-	spin_unlock_irqrestore(&pool->lock, flags);
+	spin_lock_irqsave(&tc->lock, flags);
+	bio_list_add(&tc->retry_on_resume_list, bio);
+	spin_unlock_irqrestore(&tc->lock, flags);
 }
 
 static bool should_error_unserviceable_bio(struct pool *pool)
@@ -1363,38 +1367,36 @@
 	       jiffies > pool->last_commit_jiffies + COMMIT_PERIOD;
 }
 
-static void process_deferred_bios(struct pool *pool)
+static void process_thin_deferred_bios(struct thin_c *tc)
 {
+	struct pool *pool = tc->pool;
 	unsigned long flags;
 	struct bio *bio;
 	struct bio_list bios;
 
+	if (tc->requeue_mode) {
+		requeue_bio_list(tc, &tc->deferred_bio_list);
+		return;
+	}
+
 	bio_list_init(&bios);
 
-	spin_lock_irqsave(&pool->lock, flags);
-	bio_list_merge(&bios, &pool->deferred_bios);
-	bio_list_init(&pool->deferred_bios);
-	spin_unlock_irqrestore(&pool->lock, flags);
+	spin_lock_irqsave(&tc->lock, flags);
+	bio_list_merge(&bios, &tc->deferred_bio_list);
+	bio_list_init(&tc->deferred_bio_list);
+	spin_unlock_irqrestore(&tc->lock, flags);
 
 	while ((bio = bio_list_pop(&bios))) {
-		struct dm_thin_endio_hook *h = dm_per_bio_data(bio, sizeof(struct dm_thin_endio_hook));
-		struct thin_c *tc = h->tc;
-
-		if (tc->requeue_mode) {
-			bio_endio(bio, DM_ENDIO_REQUEUE);
-			continue;
-		}
-
 		/*
 		 * If we've got no free new_mapping structs, and processing
 		 * this bio might require one, we pause until there are some
 		 * prepared mappings to process.
 		 */
 		if (ensure_next_mapping(pool)) {
-			spin_lock_irqsave(&pool->lock, flags);
-			bio_list_add(&pool->deferred_bios, bio);
-			bio_list_merge(&pool->deferred_bios, &bios);
-			spin_unlock_irqrestore(&pool->lock, flags);
+			spin_lock_irqsave(&tc->lock, flags);
+			bio_list_add(&tc->deferred_bio_list, bio);
+			bio_list_merge(&tc->deferred_bio_list, &bios);
+			spin_unlock_irqrestore(&tc->lock, flags);
 			break;
 		}
 
@@ -1403,6 +1405,19 @@
 		else
 			pool->process_bio(tc, bio);
 	}
+}
+
+static void process_deferred_bios(struct pool *pool)
+{
+	unsigned long flags;
+	struct bio *bio;
+	struct bio_list bios;
+	struct thin_c *tc;
+
+	rcu_read_lock();
+	list_for_each_entry_rcu(tc, &pool->active_thins, list)
+		process_thin_deferred_bios(tc);
+	rcu_read_unlock();
 
 	/*
 	 * If there are any deferred flush bios, we must commit
@@ -1634,9 +1649,9 @@
 	unsigned long flags;
 	struct pool *pool = tc->pool;
 
-	spin_lock_irqsave(&pool->lock, flags);
-	bio_list_add(&pool->deferred_bios, bio);
-	spin_unlock_irqrestore(&pool->lock, flags);
+	spin_lock_irqsave(&tc->lock, flags);
+	bio_list_add(&tc->deferred_bio_list, bio);
+	spin_unlock_irqrestore(&tc->lock, flags);
 
 	wake_worker(pool);
 }
@@ -1767,10 +1782,19 @@
 	return bdi_congested(&q->backing_dev_info, bdi_bits);
 }
 
-static void __requeue_bios(struct pool *pool)
+static void requeue_bios(struct pool *pool)
 {
-	bio_list_merge(&pool->deferred_bios, &pool->retry_on_resume_list);
-	bio_list_init(&pool->retry_on_resume_list);
+	unsigned long flags;
+	struct thin_c *tc;
+
+	rcu_read_lock();
+	list_for_each_entry_rcu(tc, &pool->active_thins, list) {
+		spin_lock_irqsave(&tc->lock, flags);
+		bio_list_merge(&tc->deferred_bio_list, &tc->retry_on_resume_list);
+		bio_list_init(&tc->retry_on_resume_list);
+		spin_unlock_irqrestore(&tc->lock, flags);
+	}
+	rcu_read_unlock();
 }
 
 /*----------------------------------------------------------------
@@ -1951,12 +1975,11 @@
 	INIT_WORK(&pool->worker, do_worker);
 	INIT_DELAYED_WORK(&pool->waker, do_waker);
 	spin_lock_init(&pool->lock);
-	bio_list_init(&pool->deferred_bios);
 	bio_list_init(&pool->deferred_flush_bios);
 	INIT_LIST_HEAD(&pool->prepared_mappings);
 	INIT_LIST_HEAD(&pool->prepared_discards);
+	INIT_LIST_HEAD(&pool->active_thins);
 	pool->low_water_triggered = false;
-	bio_list_init(&pool->retry_on_resume_list);
 
 	pool->shared_read_ds = dm_deferred_set_create();
 	if (!pool->shared_read_ds) {
@@ -2501,8 +2524,8 @@
 
 	spin_lock_irqsave(&pool->lock, flags);
 	pool->low_water_triggered = false;
-	__requeue_bios(pool);
 	spin_unlock_irqrestore(&pool->lock, flags);
+	requeue_bios(pool);
 
 	do_waker(&pool->waker.work);
 }
@@ -2962,6 +2985,12 @@
 static void thin_dtr(struct dm_target *ti)
 {
 	struct thin_c *tc = ti->private;
+	unsigned long flags;
+
+	spin_lock_irqsave(&tc->pool->lock, flags);
+	list_del_rcu(&tc->list);
+	spin_unlock_irqrestore(&tc->pool->lock, flags);
+	synchronize_rcu();
 
 	mutex_lock(&dm_thin_pool_table.mutex);
 
@@ -3008,6 +3037,9 @@
 		r = -ENOMEM;
 		goto out_unlock;
 	}
+	spin_lock_init(&tc->lock);
+	bio_list_init(&tc->deferred_bio_list);
+	bio_list_init(&tc->retry_on_resume_list);
 
 	if (argc == 3) {
 		r = dm_get_device(ti, argv[2], FMODE_READ, &origin_dev);
@@ -3079,6 +3111,17 @@
 
 	mutex_unlock(&dm_thin_pool_table.mutex);
 
+	spin_lock(&tc->pool->lock);
+	list_add_tail_rcu(&tc->list, &tc->pool->active_thins);
+	spin_unlock(&tc->pool->lock);
+	/*
+	 * This synchronize_rcu() call is needed here otherwise we risk a
+	 * wake_worker() call finding no bios to process (because the newly
+	 * added tc isn't yet visible).  So this reduces latency since we
+	 * aren't then dependent on the periodic commit to wake_worker().
+	 */
+	synchronize_rcu();
+
 	return 0;
 
 bad_target_max_io_len: