vhost: lockless enqueuing

We use spinlock to synchronize the work list now which may cause
unnecessary contentions. So this patch switch to use llist to remove
this contention. Pktgen tests shows about 5% improvement:

Before:
~1300000 pps
After:
~1370000 pps

Signed-off-by: Jason Wang <jasowang@redhat.com>
Reviewed-by: Michael S. Tsirkin <mst@redhat.com>
Signed-off-by: Michael S. Tsirkin <mst@redhat.com>
diff --git a/drivers/vhost/vhost.c b/drivers/vhost/vhost.c
index 73dd16d..0061a7b 100644
--- a/drivers/vhost/vhost.c
+++ b/drivers/vhost/vhost.c
@@ -168,7 +168,7 @@
 
 void vhost_work_init(struct vhost_work *work, vhost_work_fn_t fn)
 {
-	INIT_LIST_HEAD(&work->node);
+	clear_bit(VHOST_WORK_QUEUED, &work->flags);
 	work->fn = fn;
 	init_waitqueue_head(&work->done);
 }
@@ -246,15 +246,16 @@
 
 void vhost_work_queue(struct vhost_dev *dev, struct vhost_work *work)
 {
-	unsigned long flags;
+	if (!dev->worker)
+		return;
 
-	spin_lock_irqsave(&dev->work_lock, flags);
-	if (list_empty(&work->node)) {
-		list_add_tail(&work->node, &dev->work_list);
-		spin_unlock_irqrestore(&dev->work_lock, flags);
+	if (!test_and_set_bit(VHOST_WORK_QUEUED, &work->flags)) {
+		/* We can only add the work to the list after we're
+		 * sure it was not in the list.
+		 */
+		smp_mb();
+		llist_add(&work->node, &dev->work_list);
 		wake_up_process(dev->worker);
-	} else {
-		spin_unlock_irqrestore(&dev->work_lock, flags);
 	}
 }
 EXPORT_SYMBOL_GPL(vhost_work_queue);
@@ -262,7 +263,7 @@
 /* A lockless hint for busy polling code to exit the loop */
 bool vhost_has_work(struct vhost_dev *dev)
 {
-	return !list_empty(&dev->work_list);
+	return !llist_empty(&dev->work_list);
 }
 EXPORT_SYMBOL_GPL(vhost_has_work);
 
@@ -305,7 +306,8 @@
 static int vhost_worker(void *data)
 {
 	struct vhost_dev *dev = data;
-	struct vhost_work *work = NULL;
+	struct vhost_work *work, *work_next;
+	struct llist_node *node;
 	mm_segment_t oldfs = get_fs();
 
 	set_fs(USER_DS);
@@ -315,29 +317,25 @@
 		/* mb paired w/ kthread_stop */
 		set_current_state(TASK_INTERRUPTIBLE);
 
-		spin_lock_irq(&dev->work_lock);
-
 		if (kthread_should_stop()) {
-			spin_unlock_irq(&dev->work_lock);
 			__set_current_state(TASK_RUNNING);
 			break;
 		}
-		if (!list_empty(&dev->work_list)) {
-			work = list_first_entry(&dev->work_list,
-						struct vhost_work, node);
-			list_del_init(&work->node);
-		} else
-			work = NULL;
-		spin_unlock_irq(&dev->work_lock);
 
-		if (work) {
+		node = llist_del_all(&dev->work_list);
+		if (!node)
+			schedule();
+
+		node = llist_reverse_order(node);
+		/* make sure flag is seen after deletion */
+		smp_wmb();
+		llist_for_each_entry_safe(work, work_next, node, node) {
+			clear_bit(VHOST_WORK_QUEUED, &work->flags);
 			__set_current_state(TASK_RUNNING);
 			work->fn(work);
 			if (need_resched())
 				schedule();
-		} else
-			schedule();
-
+		}
 	}
 	unuse_mm(dev->mm);
 	set_fs(oldfs);
@@ -398,9 +396,9 @@
 	dev->log_file = NULL;
 	dev->memory = NULL;
 	dev->mm = NULL;
-	spin_lock_init(&dev->work_lock);
-	INIT_LIST_HEAD(&dev->work_list);
 	dev->worker = NULL;
+	init_llist_head(&dev->work_list);
+
 
 	for (i = 0; i < dev->nvqs; ++i) {
 		vq = dev->vqs[i];
@@ -566,7 +564,7 @@
 	/* No one will access memory at this point */
 	kvfree(dev->memory);
 	dev->memory = NULL;
-	WARN_ON(!list_empty(&dev->work_list));
+	WARN_ON(!llist_empty(&dev->work_list));
 	if (dev->worker) {
 		kthread_stop(dev->worker);
 		dev->worker = NULL;
diff --git a/drivers/vhost/vhost.h b/drivers/vhost/vhost.h
index d36d8be..6690e64 100644
--- a/drivers/vhost/vhost.h
+++ b/drivers/vhost/vhost.h
@@ -15,13 +15,15 @@
 struct vhost_work;
 typedef void (*vhost_work_fn_t)(struct vhost_work *work);
 
+#define VHOST_WORK_QUEUED 1
 struct vhost_work {
-	struct list_head	  node;
+	struct llist_node	  node;
 	vhost_work_fn_t		  fn;
 	wait_queue_head_t	  done;
 	int			  flushing;
 	unsigned		  queue_seq;
 	unsigned		  done_seq;
+	unsigned long		  flags;
 };
 
 /* Poll a file (eventfd or socket) */
@@ -126,8 +128,7 @@
 	int nvqs;
 	struct file *log_file;
 	struct eventfd_ctx *log_ctx;
-	spinlock_t work_lock;
-	struct list_head work_list;
+	struct llist_head work_list;
 	struct task_struct *worker;
 };