blk-cgroup: Allow sleeping while dynamically allocating a group

Currently, all cfq_group and throtl_grp allocations happen while we are
holding ->queue_lock, where sleeping is not allowed.

Soon we will move to per cpu stats and will also need to allocate the
per group stats. Since alloc_percpu() can sleep, it cannot be called
from atomic context; we therefore need to drop ->queue_lock, allocate
the group, retake the lock and continue processing.

In the throttling code, I check the queue DEAD flag again after the
allocation to make sure the driver did not call blk_cleanup_queue() in
the meantime.
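
Condensed from the diff, that recheck after the allocation is:

	if (unlikely(test_bit(QUEUE_FLAG_DEAD, &q->queue_flags))) {
		blk_put_queue(q);
		kfree(tg);
		/* error path: queue lock is not held here */
		return ERR_PTR(-ENODEV);
	}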

Signed-off-by: Vivek Goyal <vgoyal@redhat.com>
Signed-off-by: Jens Axboe <jaxboe@fusionio.com>
diff --git a/block/blk-throttle.c b/block/blk-throttle.c
index fa9a900..c201967 100644
--- a/block/blk-throttle.c
+++ b/block/blk-throttle.c
@@ -188,8 +188,40 @@
 	td->nr_undestroyed_grps++;
 }
 
-static struct throtl_grp * throtl_find_alloc_tg(struct throtl_data *td,
-			struct blkio_cgroup *blkcg)
+static void throtl_init_add_tg_lists(struct throtl_data *td,
+			struct throtl_grp *tg, struct blkio_cgroup *blkcg)
+{
+	struct backing_dev_info *bdi = &td->queue->backing_dev_info;
+	unsigned int major, minor;
+
+	/* Add group onto cgroup list */
+	sscanf(dev_name(bdi->dev), "%u:%u", &major, &minor);
+	blkiocg_add_blkio_group(blkcg, &tg->blkg, (void *)td,
+				MKDEV(major, minor), BLKIO_POLICY_THROTL);
+
+	tg->bps[READ] = blkcg_get_read_bps(blkcg, tg->blkg.dev);
+	tg->bps[WRITE] = blkcg_get_write_bps(blkcg, tg->blkg.dev);
+	tg->iops[READ] = blkcg_get_read_iops(blkcg, tg->blkg.dev);
+	tg->iops[WRITE] = blkcg_get_write_iops(blkcg, tg->blkg.dev);
+
+	throtl_add_group_to_td_list(td, tg);
+}
+
+/* Should be called without the queue lock and outside of an rcu read-side critical section */
+static struct throtl_grp *throtl_alloc_tg(struct throtl_data *td)
+{
+	struct throtl_grp *tg;
+
+	tg = kzalloc_node(sizeof(*tg), GFP_KERNEL, td->queue->node);
+	if (!tg)
+		return NULL;
+
+	throtl_init_group(tg);
+	return tg;
+}
+
+static struct throtl_grp *
+throtl_find_tg(struct throtl_data *td, struct blkio_cgroup *blkcg)
 {
 	struct throtl_grp *tg = NULL;
 	void *key = td;
@@ -197,12 +229,6 @@
 	unsigned int major, minor;
 
 	/*
-	 * TODO: Speed up blkiocg_lookup_group() by maintaining a radix
-	 * tree of blkg (instead of traversing through hash list all
-	 * the time.
-	 */
-
-	/*
 	 * This is the common case when there are no blkio cgroups.
  	 * Avoid lookup in this case
  	 */
@@ -215,43 +241,83 @@
 	if (tg && !tg->blkg.dev && bdi->dev && dev_name(bdi->dev)) {
 		sscanf(dev_name(bdi->dev), "%u:%u", &major, &minor);
 		tg->blkg.dev = MKDEV(major, minor);
-		goto done;
 	}
 
-	if (tg)
-		goto done;
-
-	tg = kzalloc_node(sizeof(*tg), GFP_ATOMIC, td->queue->node);
-	if (!tg)
-		goto done;
-
-	throtl_init_group(tg);
-
-	/* Add group onto cgroup list */
-	sscanf(dev_name(bdi->dev), "%u:%u", &major, &minor);
-	blkiocg_add_blkio_group(blkcg, &tg->blkg, (void *)td,
-				MKDEV(major, minor), BLKIO_POLICY_THROTL);
-
-	tg->bps[READ] = blkcg_get_read_bps(blkcg, tg->blkg.dev);
-	tg->bps[WRITE] = blkcg_get_write_bps(blkcg, tg->blkg.dev);
-	tg->iops[READ] = blkcg_get_read_iops(blkcg, tg->blkg.dev);
-	tg->iops[WRITE] = blkcg_get_write_iops(blkcg, tg->blkg.dev);
-
-	throtl_add_group_to_td_list(td, tg);
-done:
 	return tg;
 }
 
+/*
+ * This function returns with the queue lock released in case of an error,
+ * e.g. when the request queue is gone.
+ */
 static struct throtl_grp * throtl_get_tg(struct throtl_data *td)
 {
-	struct throtl_grp *tg = NULL;
+	struct throtl_grp *tg = NULL, *__tg = NULL;
 	struct blkio_cgroup *blkcg;
+	struct request_queue *q = td->queue;
 
 	rcu_read_lock();
 	blkcg = task_blkio_cgroup(current);
-	tg = throtl_find_alloc_tg(td, blkcg);
-	if (!tg)
+	tg = throtl_find_tg(td, blkcg);
+	if (tg) {
+		rcu_read_unlock();
+		return tg;
+	}
+
+	/*
+	 * Need to allocate a group. Allocating a group also requires
+	 * allocating per cpu stats, which in turn takes a mutex and can
+	 * block. Hence drop the rcu lock and queue_lock before calling
+	 * the allocation function.
+	 *
+	 * Take a request queue reference to make sure the queue does not
+	 * go away once we return from the allocation.
+	 */
+	blk_get_queue(q);
+	rcu_read_unlock();
+	spin_unlock_irq(q->queue_lock);
+
+	tg = throtl_alloc_tg(td);
+	/*
+	 * We might have slept during the group allocation. Make sure the
+	 * queue is not dead.
+	 */
+	if (unlikely(test_bit(QUEUE_FLAG_DEAD, &q->queue_flags))) {
+		blk_put_queue(q);
+		kfree(tg);
+
+		return ERR_PTR(-ENODEV);
+	}
+	blk_put_queue(q);
+
+	/* Group allocated and queue is still alive. Take the lock. */
+	spin_lock_irq(q->queue_lock);
+
+	/*
+	 * Initialize the new group. After sleeping, read the blkcg again.
+	 */
+	rcu_read_lock();
+	blkcg = task_blkio_cgroup(current);
+
+	/*
+	 * If some other thread already allocated the group while we were
+	 * not holding the queue lock, free up the group we just allocated
+	 */
+	__tg = throtl_find_tg(td, blkcg);
+
+	if (__tg) {
+		kfree(tg);
+		rcu_read_unlock();
+		return __tg;
+	}
+
+	/* Group allocation failed. Account the IO to the root group. */
+	if (!tg) {
 		tg = &td->root_tg;
+		rcu_read_unlock();
+		return tg;
+	}
+
+	throtl_init_add_tg_lists(td, tg, blkcg);
 	rcu_read_unlock();
 	return tg;
 }
@@ -1014,6 +1080,15 @@
 	spin_lock_irq(q->queue_lock);
 	tg = throtl_get_tg(td);
 
+	if (IS_ERR(tg)) {
+		/*
+		 * Queue is gone and no queue lock is held here. Propagate
+		 * the error instead of dereferencing the ERR_PTR below.
+		 */
+		return PTR_ERR(tg);
+	}
+
 	if (tg->nr_queued[rw]) {
 		/*
 		 * There is already another bio queued in same dir. No