nbd: convert to blkmq

This moves NBD over to using blkmq, which allows us to get rid of the NBD
wide queue lock and the async submit kthread.  We will start with 1 hw
queue for now, but I plan to add multiple tcp connection support in the
future and we'll fix how we set the hwqueue's.

Signed-off-by: Josef Bacik <jbacik@fb.com>
Signed-off-by: Jens Axboe <axboe@fb.com>
diff --git a/drivers/block/nbd.c b/drivers/block/nbd.c
index a9e3980..15e7c67 100644
--- a/drivers/block/nbd.c
+++ b/drivers/block/nbd.c
@@ -34,6 +34,7 @@
 #include <linux/kthread.h>
 #include <linux/types.h>
 #include <linux/debugfs.h>
+#include <linux/blk-mq.h>
 
 #include <asm/uaccess.h>
 #include <asm/types.h>
@@ -45,12 +46,8 @@
 	struct socket * sock;	/* If == NULL, device is not ready, yet	*/
 	int magic;
 
-	spinlock_t queue_lock;
-	struct list_head queue_head;	/* Requests waiting result */
-	struct request *active_req;
-	wait_queue_head_t active_wq;
-	struct list_head waiting_queue;	/* Requests to be sent */
-	wait_queue_head_t waiting_wq;
+	atomic_t outstanding_cmds;
+	struct blk_mq_tag_set tag_set;
 
 	struct mutex tx_lock;
 	struct gendisk *disk;
@@ -71,6 +68,11 @@
 #endif
 };
 
+struct nbd_cmd {
+	struct nbd_device *nbd;
+	struct list_head list;
+};
+
 #if IS_ENABLED(CONFIG_DEBUG_FS)
 static struct dentry *nbd_dbg_dir;
 #endif
@@ -83,18 +85,6 @@
 static struct nbd_device *nbd_dev;
 static int max_part;
 
-/*
- * Use just one lock (or at most 1 per NIC). Two arguments for this:
- * 1. Each NIC is essentially a synchronization point for all servers
- *    accessed through that NIC so there's no need to have more locks
- *    than NICs anyway.
- * 2. More locks lead to more "Dirty cache line bouncing" which will slow
- *    down each lock to the point where they're actually slower than just
- *    a single lock.
- * Thanks go to Jens Axboe and Al Viro for their LKML emails explaining this!
- */
-static DEFINE_SPINLOCK(nbd_lock);
-
 static inline struct device *nbd_to_dev(struct nbd_device *nbd)
 {
 	return disk_to_dev(nbd->disk);
@@ -153,18 +143,17 @@
 	return 0;
 }
 
-static void nbd_end_request(struct nbd_device *nbd, struct request *req)
+static void nbd_end_request(struct nbd_cmd *cmd)
 {
+	struct nbd_device *nbd = cmd->nbd;
+	struct request *req = blk_mq_rq_from_pdu(cmd);
 	int error = req->errors ? -EIO : 0;
-	struct request_queue *q = req->q;
-	unsigned long flags;
 
-	dev_dbg(nbd_to_dev(nbd), "request %p: %s\n", req,
+	dev_dbg(nbd_to_dev(nbd), "request %p: %s\n", cmd,
 		error ? "failed" : "done");
 
-	spin_lock_irqsave(q->queue_lock, flags);
-	__blk_end_request_all(req, error);
-	spin_unlock_irqrestore(q->queue_lock, flags);
+	atomic_dec(&nbd->outstanding_cmds);
+	blk_mq_complete_request(req, error);
 }
 
 /*
@@ -193,7 +182,7 @@
 	struct nbd_device *nbd = (struct nbd_device *)arg;
 	unsigned long flags;
 
-	if (list_empty(&nbd->queue_head))
+	if (!atomic_read(&nbd->outstanding_cmds))
 		return;
 
 	spin_lock_irqsave(&nbd->sock_lock, flags);
@@ -273,8 +262,9 @@
 }
 
 /* always call with the tx_lock held */
-static int nbd_send_req(struct nbd_device *nbd, struct request *req)
+static int nbd_send_cmd(struct nbd_device *nbd, struct nbd_cmd *cmd)
 {
+	struct request *req = blk_mq_rq_from_pdu(cmd);
 	int result, flags;
 	struct nbd_request request;
 	unsigned long size = blk_rq_bytes(req);
@@ -298,10 +288,10 @@
 		request.from = cpu_to_be64((u64)blk_rq_pos(req) << 9);
 		request.len = htonl(size);
 	}
-	memcpy(request.handle, &req, sizeof(req));
+	memcpy(request.handle, &req->tag, sizeof(req->tag));
 
 	dev_dbg(nbd_to_dev(nbd), "request %p: sending control (%s@%llu,%uB)\n",
-		req, nbdcmd_to_ascii(type),
+		cmd, nbdcmd_to_ascii(type),
 		(unsigned long long)blk_rq_pos(req) << 9, blk_rq_bytes(req));
 	result = sock_xmit(nbd, 1, &request, sizeof(request),
 			(type == NBD_CMD_WRITE) ? MSG_MORE : 0);
@@ -323,7 +313,7 @@
 			if (!rq_iter_last(bvec, iter))
 				flags = MSG_MORE;
 			dev_dbg(nbd_to_dev(nbd), "request %p: sending %d bytes data\n",
-				req, bvec.bv_len);
+				cmd, bvec.bv_len);
 			result = sock_send_bvec(nbd, &bvec, flags);
 			if (result <= 0) {
 				dev_err(disk_to_dev(nbd->disk),
@@ -336,29 +326,6 @@
 	return 0;
 }
 
-static struct request *nbd_find_request(struct nbd_device *nbd,
-					struct request *xreq)
-{
-	struct request *req, *tmp;
-	int err;
-
-	err = wait_event_interruptible(nbd->active_wq, nbd->active_req != xreq);
-	if (unlikely(err))
-		return ERR_PTR(err);
-
-	spin_lock(&nbd->queue_lock);
-	list_for_each_entry_safe(req, tmp, &nbd->queue_head, queuelist) {
-		if (req != xreq)
-			continue;
-		list_del_init(&req->queuelist);
-		spin_unlock(&nbd->queue_lock);
-		return req;
-	}
-	spin_unlock(&nbd->queue_lock);
-
-	return ERR_PTR(-ENOENT);
-}
-
 static inline int sock_recv_bvec(struct nbd_device *nbd, struct bio_vec *bvec)
 {
 	int result;
@@ -370,11 +337,14 @@
 }
 
 /* NULL returned = something went wrong, inform userspace */
-static struct request *nbd_read_stat(struct nbd_device *nbd)
+static struct nbd_cmd *nbd_read_stat(struct nbd_device *nbd)
 {
 	int result;
 	struct nbd_reply reply;
-	struct request *req;
+	struct nbd_cmd *cmd;
+	struct request *req = NULL;
+	u16 hwq;
+	int tag;
 
 	reply.magic = 0;
 	result = sock_xmit(nbd, 0, &reply, sizeof(reply), MSG_WAITALL);
@@ -390,25 +360,27 @@
 		return ERR_PTR(-EPROTO);
 	}
 
-	req = nbd_find_request(nbd, *(struct request **)reply.handle);
-	if (IS_ERR(req)) {
-		result = PTR_ERR(req);
-		if (result != -ENOENT)
-			return ERR_PTR(result);
+	memcpy(&tag, reply.handle, sizeof(int));
 
-		dev_err(disk_to_dev(nbd->disk), "Unexpected reply (%p)\n",
-			reply.handle);
-		return ERR_PTR(-EBADR);
+	hwq = blk_mq_unique_tag_to_hwq(tag);
+	if (hwq < nbd->tag_set.nr_hw_queues)
+		req = blk_mq_tag_to_rq(nbd->tag_set.tags[hwq],
+				       blk_mq_unique_tag_to_tag(tag));
+	if (!req || !blk_mq_request_started(req)) {
+		dev_err(disk_to_dev(nbd->disk), "Unexpected reply (%d) %p\n",
+			tag, req);
+		return ERR_PTR(-ENOENT);
 	}
+	cmd = blk_mq_rq_to_pdu(req);
 
 	if (ntohl(reply.error)) {
 		dev_err(disk_to_dev(nbd->disk), "Other side returned error (%d)\n",
 			ntohl(reply.error));
 		req->errors++;
-		return req;
+		return cmd;
 	}
 
-	dev_dbg(nbd_to_dev(nbd), "request %p: got reply\n", req);
+	dev_dbg(nbd_to_dev(nbd), "request %p: got reply\n", cmd);
 	if (rq_data_dir(req) != WRITE) {
 		struct req_iterator iter;
 		struct bio_vec bvec;
@@ -419,13 +391,13 @@
 				dev_err(disk_to_dev(nbd->disk), "Receive data failed (result %d)\n",
 					result);
 				req->errors++;
-				return req;
+				return cmd;
 			}
 			dev_dbg(nbd_to_dev(nbd), "request %p: got %d bytes data\n",
-				req, bvec.bv_len);
+				cmd, bvec.bv_len);
 		}
 	}
-	return req;
+	return cmd;
 }
 
 static ssize_t pid_show(struct device *dev,
@@ -444,7 +416,7 @@
 
 static int nbd_thread_recv(struct nbd_device *nbd, struct block_device *bdev)
 {
-	struct request *req;
+	struct nbd_cmd *cmd;
 	int ret;
 
 	BUG_ON(nbd->magic != NBD_MAGIC);
@@ -460,13 +432,13 @@
 	nbd_size_update(nbd, bdev);
 
 	while (1) {
-		req = nbd_read_stat(nbd);
-		if (IS_ERR(req)) {
-			ret = PTR_ERR(req);
+		cmd = nbd_read_stat(nbd);
+		if (IS_ERR(cmd)) {
+			ret = PTR_ERR(cmd);
 			break;
 		}
 
-		nbd_end_request(nbd, req);
+		nbd_end_request(cmd);
 	}
 
 	nbd_size_clear(nbd, bdev);
@@ -475,44 +447,37 @@
 	return ret;
 }
 
+static void nbd_clear_req(struct request *req, void *data, bool reserved)
+{
+	struct nbd_cmd *cmd;
+
+	if (!blk_mq_request_started(req))
+		return;
+	cmd = blk_mq_rq_to_pdu(req);
+	req->errors++;
+	nbd_end_request(cmd);
+}
+
 static void nbd_clear_que(struct nbd_device *nbd)
 {
-	struct request *req;
-
 	BUG_ON(nbd->magic != NBD_MAGIC);
 
 	/*
 	 * Because we have set nbd->sock to NULL under the tx_lock, all
-	 * modifications to the list must have completed by now.  For
-	 * the same reason, the active_req must be NULL.
-	 *
-	 * As a consequence, we don't need to take the spin lock while
-	 * purging the list here.
+	 * modifications to the list must have completed by now.
 	 */
 	BUG_ON(nbd->sock);
-	BUG_ON(nbd->active_req);
 
-	while (!list_empty(&nbd->queue_head)) {
-		req = list_entry(nbd->queue_head.next, struct request,
-				 queuelist);
-		list_del_init(&req->queuelist);
-		req->errors++;
-		nbd_end_request(nbd, req);
-	}
-
-	while (!list_empty(&nbd->waiting_queue)) {
-		req = list_entry(nbd->waiting_queue.next, struct request,
-				 queuelist);
-		list_del_init(&req->queuelist);
-		req->errors++;
-		nbd_end_request(nbd, req);
-	}
+	blk_mq_tagset_busy_iter(&nbd->tag_set, nbd_clear_req, NULL);
 	dev_dbg(disk_to_dev(nbd->disk), "queue cleared\n");
 }
 
 
-static void nbd_handle_req(struct nbd_device *nbd, struct request *req)
+static void nbd_handle_cmd(struct nbd_cmd *cmd)
 {
+	struct request *req = blk_mq_rq_from_pdu(cmd);
+	struct nbd_device *nbd = cmd->nbd;
+
 	if (req->cmd_type != REQ_TYPE_FS)
 		goto error_out;
 
@@ -526,6 +491,7 @@
 	req->errors = 0;
 
 	mutex_lock(&nbd->tx_lock);
+	nbd->task_send = current;
 	if (unlikely(!nbd->sock)) {
 		mutex_unlock(&nbd->tx_lock);
 		dev_err(disk_to_dev(nbd->disk),
@@ -533,106 +499,34 @@
 		goto error_out;
 	}
 
-	nbd->active_req = req;
-
-	if (nbd->xmit_timeout && list_empty_careful(&nbd->queue_head))
+	if (nbd->xmit_timeout && !atomic_read(&nbd->outstanding_cmds))
 		mod_timer(&nbd->timeout_timer, jiffies + nbd->xmit_timeout);
 
-	if (nbd_send_req(nbd, req) != 0) {
+	atomic_inc(&nbd->outstanding_cmds);
+	if (nbd_send_cmd(nbd, cmd) != 0) {
 		dev_err(disk_to_dev(nbd->disk), "Request send failed\n");
 		req->errors++;
-		nbd_end_request(nbd, req);
-	} else {
-		spin_lock(&nbd->queue_lock);
-		list_add_tail(&req->queuelist, &nbd->queue_head);
-		spin_unlock(&nbd->queue_lock);
+		nbd_end_request(cmd);
 	}
 
-	nbd->active_req = NULL;
+	nbd->task_send = NULL;
 	mutex_unlock(&nbd->tx_lock);
-	wake_up_all(&nbd->active_wq);
 
 	return;
 
 error_out:
 	req->errors++;
-	nbd_end_request(nbd, req);
+	nbd_end_request(cmd);
 }
 
-static int nbd_thread_send(void *data)
+static int nbd_queue_rq(struct blk_mq_hw_ctx *hctx,
+			const struct blk_mq_queue_data *bd)
 {
-	struct nbd_device *nbd = data;
-	struct request *req;
+	struct nbd_cmd *cmd = blk_mq_rq_to_pdu(bd->rq);
 
-	nbd->task_send = current;
-
-	set_user_nice(current, MIN_NICE);
-	while (!kthread_should_stop() || !list_empty(&nbd->waiting_queue)) {
-		/* wait for something to do */
-		wait_event_interruptible(nbd->waiting_wq,
-					 kthread_should_stop() ||
-					 !list_empty(&nbd->waiting_queue));
-
-		/* extract request */
-		if (list_empty(&nbd->waiting_queue))
-			continue;
-
-		spin_lock_irq(&nbd->queue_lock);
-		req = list_entry(nbd->waiting_queue.next, struct request,
-				 queuelist);
-		list_del_init(&req->queuelist);
-		spin_unlock_irq(&nbd->queue_lock);
-
-		/* handle request */
-		nbd_handle_req(nbd, req);
-	}
-
-	nbd->task_send = NULL;
-
-	return 0;
-}
-
-/*
- * We always wait for result of write, for now. It would be nice to make it optional
- * in future
- * if ((rq_data_dir(req) == WRITE) && (nbd->flags & NBD_WRITE_NOCHK))
- *   { printk( "Warning: Ignoring result!\n"); nbd_end_request( req ); }
- */
-
-static void nbd_request_handler(struct request_queue *q)
-		__releases(q->queue_lock) __acquires(q->queue_lock)
-{
-	struct request *req;
-	
-	while ((req = blk_fetch_request(q)) != NULL) {
-		struct nbd_device *nbd;
-
-		spin_unlock_irq(q->queue_lock);
-
-		nbd = req->rq_disk->private_data;
-
-		BUG_ON(nbd->magic != NBD_MAGIC);
-
-		dev_dbg(nbd_to_dev(nbd), "request %p: dequeued (flags=%x)\n",
-			req, req->cmd_type);
-
-		if (unlikely(!nbd->sock)) {
-			dev_err_ratelimited(disk_to_dev(nbd->disk),
-					    "Attempted send on closed socket\n");
-			req->errors++;
-			nbd_end_request(nbd, req);
-			spin_lock_irq(q->queue_lock);
-			continue;
-		}
-
-		spin_lock_irq(&nbd->queue_lock);
-		list_add_tail(&req->queuelist, &nbd->waiting_queue);
-		spin_unlock_irq(&nbd->queue_lock);
-
-		wake_up(&nbd->waiting_wq);
-
-		spin_lock_irq(q->queue_lock);
-	}
+	blk_mq_start_request(bd->rq);
+	nbd_handle_cmd(cmd);
+	return BLK_MQ_RQ_QUEUE_OK;
 }
 
 static int nbd_set_socket(struct nbd_device *nbd, struct socket *sock)
@@ -700,33 +594,37 @@
 {
 	switch (cmd) {
 	case NBD_DISCONNECT: {
-		struct request sreq;
+		struct request *sreq;
 
 		dev_info(disk_to_dev(nbd->disk), "NBD_DISCONNECT\n");
 		if (!nbd->sock)
 			return -EINVAL;
 
+		sreq = blk_mq_alloc_request(bdev_get_queue(bdev), WRITE, 0);
+		if (!sreq)
+			return -ENOMEM;
+
 		mutex_unlock(&nbd->tx_lock);
 		fsync_bdev(bdev);
 		mutex_lock(&nbd->tx_lock);
-		blk_rq_init(NULL, &sreq);
-		sreq.cmd_type = REQ_TYPE_DRV_PRIV;
+		sreq->cmd_type = REQ_TYPE_DRV_PRIV;
 
 		/* Check again after getting mutex back.  */
-		if (!nbd->sock)
+		if (!nbd->sock) {
+			blk_mq_free_request(sreq);
 			return -EINVAL;
+		}
 
 		nbd->disconnect = true;
 
-		nbd_send_req(nbd, &sreq);
+		nbd_send_cmd(nbd, blk_mq_rq_to_pdu(sreq));
+		blk_mq_free_request(sreq);
 		return 0;
 	}
  
 	case NBD_CLEAR_SOCK:
 		sock_shutdown(nbd);
 		nbd_clear_que(nbd);
-		BUG_ON(!list_empty(&nbd->queue_head));
-		BUG_ON(!list_empty(&nbd->waiting_queue));
 		kill_bdev(bdev);
 		return 0;
 
@@ -772,7 +670,6 @@
 		return 0;
 
 	case NBD_DO_IT: {
-		struct task_struct *thread;
 		int error;
 
 		if (nbd->task_recv)
@@ -786,18 +683,9 @@
 
 		nbd_parse_flags(nbd, bdev);
 
-		thread = kthread_run(nbd_thread_send, nbd, "%s",
-				     nbd_name(nbd));
-		if (IS_ERR(thread)) {
-			mutex_lock(&nbd->tx_lock);
-			nbd->task_recv = NULL;
-			return PTR_ERR(thread);
-		}
-
 		nbd_dev_dbg_init(nbd);
 		error = nbd_thread_recv(nbd, bdev);
 		nbd_dev_dbg_close(nbd);
-		kthread_stop(thread);
 
 		mutex_lock(&nbd->tx_lock);
 		nbd->task_recv = NULL;
@@ -825,10 +713,10 @@
 		return 0;
 
 	case NBD_PRINT_DEBUG:
-		dev_info(disk_to_dev(nbd->disk),
-			"next = %p, prev = %p, head = %p\n",
-			nbd->queue_head.next, nbd->queue_head.prev,
-			&nbd->queue_head);
+		/*
+		 * For compatibility only, we no longer keep a list of
+		 * outstanding requests.
+		 */
 		return 0;
 	}
 	return -ENOTTY;
@@ -987,6 +875,23 @@
 
 #endif
 
+static int nbd_init_request(void *data, struct request *rq,
+			    unsigned int hctx_idx, unsigned int request_idx,
+			    unsigned int numa_node)
+{
+	struct nbd_cmd *cmd = blk_mq_rq_to_pdu(rq);
+
+	cmd->nbd = data;
+	INIT_LIST_HEAD(&cmd->list);
+	return 0;
+}
+
+static struct blk_mq_ops nbd_mq_ops = {
+	.queue_rq	= nbd_queue_rq,
+	.map_queue	= blk_mq_map_queue,
+	.init_request	= nbd_init_request,
+};
+
 /*
  * And here should be modules and kernel interface 
  *  (Just smiley confuses emacs :-)
@@ -1035,16 +940,34 @@
 		if (!disk)
 			goto out;
 		nbd_dev[i].disk = disk;
+
+		nbd_dev[i].tag_set.ops = &nbd_mq_ops;
+		nbd_dev[i].tag_set.nr_hw_queues = 1;
+		nbd_dev[i].tag_set.queue_depth = 128;
+		nbd_dev[i].tag_set.numa_node = NUMA_NO_NODE;
+		nbd_dev[i].tag_set.cmd_size = sizeof(struct nbd_cmd);
+		nbd_dev[i].tag_set.flags = BLK_MQ_F_SHOULD_MERGE |
+			BLK_MQ_F_SG_MERGE;
+		nbd_dev[i].tag_set.driver_data = &nbd_dev[i];
+
+		err = blk_mq_alloc_tag_set(&nbd_dev[i].tag_set);
+		if (err) {
+			put_disk(disk);
+			goto out;
+		}
+
 		/*
 		 * The new linux 2.5 block layer implementation requires
 		 * every gendisk to have its very own request_queue struct.
 		 * These structs are big so we dynamically allocate them.
 		 */
-		disk->queue = blk_init_queue(nbd_request_handler, &nbd_lock);
+		disk->queue = blk_mq_init_queue(&nbd_dev[i].tag_set);
 		if (!disk->queue) {
+			blk_mq_free_tag_set(&nbd_dev[i].tag_set);
 			put_disk(disk);
 			goto out;
 		}
+
 		/*
 		 * Tell the block layer that we are not a rotational device
 		 */
@@ -1069,16 +992,12 @@
 	for (i = 0; i < nbds_max; i++) {
 		struct gendisk *disk = nbd_dev[i].disk;
 		nbd_dev[i].magic = NBD_MAGIC;
-		INIT_LIST_HEAD(&nbd_dev[i].waiting_queue);
-		spin_lock_init(&nbd_dev[i].queue_lock);
 		spin_lock_init(&nbd_dev[i].sock_lock);
-		INIT_LIST_HEAD(&nbd_dev[i].queue_head);
 		mutex_init(&nbd_dev[i].tx_lock);
 		init_timer(&nbd_dev[i].timeout_timer);
 		nbd_dev[i].timeout_timer.function = nbd_xmit_timeout;
 		nbd_dev[i].timeout_timer.data = (unsigned long)&nbd_dev[i];
-		init_waitqueue_head(&nbd_dev[i].active_wq);
-		init_waitqueue_head(&nbd_dev[i].waiting_wq);
+		atomic_set(&nbd_dev[i].outstanding_cmds, 0);
 		disk->major = NBD_MAJOR;
 		disk->first_minor = i << part_shift;
 		disk->fops = &nbd_fops;
@@ -1091,6 +1010,7 @@
 	return 0;
 out:
 	while (i--) {
+		blk_mq_free_tag_set(&nbd_dev[i].tag_set);
 		blk_cleanup_queue(nbd_dev[i].disk->queue);
 		put_disk(nbd_dev[i].disk);
 	}
@@ -1110,6 +1030,7 @@
 		if (disk) {
 			del_gendisk(disk);
 			blk_cleanup_queue(disk->queue);
+			blk_mq_free_tag_set(&nbd_dev[i].tag_set);
 			put_disk(disk);
 		}
 	}