dm thin: fix rcu_read_lock being held in code that can sleep

Commit c140e1c4e23 ("dm thin: use per thin device deferred bio lists")
introduced the use of an rculist for all active thin devices.  The use
of rcu_read_lock() in process_deferred_bios() can result in a BUG if a
dm_bio_prison_cell must be allocated as a side-effect of bio_detain():

 BUG: sleeping function called from invalid context at mm/mempool.c:203
 in_atomic(): 1, irqs_disabled(): 0, pid: 6, name: kworker/u8:0
 3 locks held by kworker/u8:0/6:
   #0:  ("dm-" "thin"){.+.+..}, at: [<ffffffff8106be42>] process_one_work+0x192/0x550
   #1:  ((&pool->worker)){+.+...}, at: [<ffffffff8106be42>] process_one_work+0x192/0x550
   #2:  (rcu_read_lock){.+.+..}, at: [<ffffffff816360b5>] do_worker+0x5/0x4d0

We can't process deferred bios with the rcu lock held, since
dm_bio_prison_cell allocation may block if the bio-prison's cell mempool
is exhausted.

To fix:

- Introduce a refcount and completion field to each thin_c

- Add thin_get/put methods for adjusting the refcount.  If the refcount
  hits zero then the completion is triggered.

- Initialise refcount to 1 when creating thin_c

- When iterating the active_thins list we thin_get() whilst the rcu
  lock is held.

- After the rcu lock is dropped we process the deferred bios for that
  thin.

- When destroying a thin_c we thin_put() and then wait for the
  completion -- to avoid a race between the worker thread iterating
  from that thin_c and destroying the thin_c.

Signed-off-by: Joe Thornber <ejt@redhat.com>
Signed-off-by: Mike Snitzer <snitzer@redhat.com>
diff --git a/drivers/md/dm-thin.c b/drivers/md/dm-thin.c
index ae5fd0b..28fc282 100644
--- a/drivers/md/dm-thin.c
+++ b/drivers/md/dm-thin.c
@@ -232,6 +232,13 @@
 	struct bio_list deferred_bio_list;
 	struct bio_list retry_on_resume_list;
 	struct rb_root sort_bio_list; /* sorted list of deferred bios */
+
+	/*
+	 * Ensures the thin is not destroyed until the worker has finished
+	 * iterating the active_thins list.
+	 */
+	atomic_t refcount;
+	struct completion can_destroy;
 };
 
 /*----------------------------------------------------------------*/
@@ -1486,6 +1493,45 @@
 	blk_finish_plug(&plug);
 }
 
+static void thin_get(struct thin_c *tc);
+static void thin_put(struct thin_c *tc);
+
+/*
+ * We can't hold rcu_read_lock() around code that can block.  So we
+ * find a thin with the rcu lock held; bump a refcount; then drop
+ * the lock.
+ */
+static struct thin_c *get_first_thin(struct pool *pool)
+{
+	struct thin_c *tc = NULL;
+
+	rcu_read_lock();
+	if (!list_empty(&pool->active_thins)) {
+		tc = list_entry_rcu(pool->active_thins.next, struct thin_c, list);
+		thin_get(tc);
+	}
+	rcu_read_unlock();
+
+	return tc;
+}
+
+static struct thin_c *get_next_thin(struct pool *pool, struct thin_c *tc)
+{
+	struct thin_c *old_tc = tc;
+
+	rcu_read_lock();
+	list_for_each_entry_continue_rcu(tc, &pool->active_thins, list) {
+		thin_get(tc);
+		thin_put(old_tc);
+		rcu_read_unlock();
+		return tc;
+	}
+	thin_put(old_tc);
+	rcu_read_unlock();
+
+	return NULL;
+}
+
 static void process_deferred_bios(struct pool *pool)
 {
 	unsigned long flags;
@@ -1493,10 +1539,11 @@
 	struct bio_list bios;
 	struct thin_c *tc;
 
-	rcu_read_lock();
-	list_for_each_entry_rcu(tc, &pool->active_thins, list)
+	tc = get_first_thin(pool);
+	while (tc) {
 		process_thin_deferred_bios(tc);
-	rcu_read_unlock();
+		tc = get_next_thin(pool, tc);
+	}
 
 	/*
 	 * If there are any deferred flush bios, we must commit
@@ -3061,11 +3108,25 @@
 /*----------------------------------------------------------------
  * Thin target methods
  *--------------------------------------------------------------*/
+static void thin_get(struct thin_c *tc)
+{
+	atomic_inc(&tc->refcount);
+}
+
+static void thin_put(struct thin_c *tc)
+{
+	if (atomic_dec_and_test(&tc->refcount))
+		complete(&tc->can_destroy);
+}
+
 static void thin_dtr(struct dm_target *ti)
 {
 	struct thin_c *tc = ti->private;
 	unsigned long flags;
 
+	thin_put(tc);
+	wait_for_completion(&tc->can_destroy);
+
 	spin_lock_irqsave(&tc->pool->lock, flags);
 	list_del_rcu(&tc->list);
 	spin_unlock_irqrestore(&tc->pool->lock, flags);
@@ -3192,6 +3253,9 @@
 
 	mutex_unlock(&dm_thin_pool_table.mutex);
 
+	atomic_set(&tc->refcount, 1);
+	init_completion(&tc->can_destroy);
+
 	spin_lock_irqsave(&tc->pool->lock, flags);
 	list_add_tail_rcu(&tc->list, &tc->pool->active_thins);
 	spin_unlock_irqrestore(&tc->pool->lock, flags);