Btrfs: Worker thread optimizations

This changes the worker thread pool to maintain a list of idle threads,
avoiding a complex search for a good thread to wake up.

Threads have two states:

idle - we try to reuse the last thread used in hopes of improving the batching
ratios

busy - each time a new work item is added to a busy task, the task is
rotated to the end of the line.

Signed-off-by: Chris Mason <chris.mason@oracle.com>
diff --git a/fs/btrfs/async-thread.c b/fs/btrfs/async-thread.c
index 8b9e2cf..8d4cc46 100644
--- a/fs/btrfs/async-thread.c
+++ b/fs/btrfs/async-thread.c
@@ -34,6 +34,9 @@
  * One of these is allocated per thread.
  */
 struct btrfs_worker_thread {
+	/* pool we belong to */
+	struct btrfs_workers *workers;
+
 	/* list of struct btrfs_work that are waiting for service */
 	struct list_head pending;
 
@@ -51,9 +54,45 @@
 
 	/* set to non-zero when this thread is already awake and kicking */
 	int working;
+
+	/* are we currently idle */
+	int idle;
 };
 
 /*
+ * helper function to move a thread onto the idle list after it
+ * has finished some requests.
+ */
+static void check_idle_worker(struct btrfs_worker_thread *worker)
+{
+	if (!worker->idle && atomic_read(&worker->num_pending) <
+	    worker->workers->idle_thresh / 2) {
+		unsigned long flags;
+		spin_lock_irqsave(&worker->workers->lock, flags);
+		worker->idle = 1;
+		list_move(&worker->worker_list, &worker->workers->idle_list);
+		spin_unlock_irqrestore(&worker->workers->lock, flags);
+	}
+}
+
+/*
+ * helper function to move a thread off the idle list after new
+ * pending work is added.
+ */
+static void check_busy_worker(struct btrfs_worker_thread *worker)
+{
+	if (worker->idle && atomic_read(&worker->num_pending) >=
+	    worker->workers->idle_thresh) {
+		unsigned long flags;
+		spin_lock_irqsave(&worker->workers->lock, flags);
+		worker->idle = 0;
+		list_move_tail(&worker->worker_list,
+			       &worker->workers->worker_list);
+		spin_unlock_irqrestore(&worker->workers->lock, flags);
+	}
+}
+
+/*
  * main loop for servicing work items
  */
 static int worker_loop(void *arg)
@@ -76,6 +115,7 @@
 
 			atomic_dec(&worker->num_pending);
 			spin_lock_irq(&worker->lock);
+			check_idle_worker(worker);
 		}
 		worker->working = 0;
 		if (freezing(current)) {
@@ -98,6 +138,7 @@
 	struct list_head *cur;
 	struct btrfs_worker_thread *worker;
 
+	list_splice_init(&workers->idle_list, &workers->worker_list);
 	while(!list_empty(&workers->worker_list)) {
 		cur = workers->worker_list.next;
 		worker = list_entry(cur, struct btrfs_worker_thread,
@@ -116,9 +157,10 @@
 {
 	workers->num_workers = 0;
 	INIT_LIST_HEAD(&workers->worker_list);
-	workers->last = NULL;
+	INIT_LIST_HEAD(&workers->idle_list);
 	spin_lock_init(&workers->lock);
 	workers->max_workers = max;
+	workers->idle_thresh = 64;
 }
 
 /*
@@ -143,14 +185,14 @@
 		spin_lock_init(&worker->lock);
 		atomic_set(&worker->num_pending, 0);
 		worker->task = kthread_run(worker_loop, worker, "btrfs");
+		worker->workers = workers;
 		if (IS_ERR(worker->task)) {
 			ret = PTR_ERR(worker->task);
 			goto fail;
 		}
 
 		spin_lock_irq(&workers->lock);
-		list_add_tail(&worker->worker_list, &workers->worker_list);
-		workers->last = worker;
+		list_add_tail(&worker->worker_list, &workers->idle_list);
 		workers->num_workers++;
 		spin_unlock_irq(&workers->lock);
 	}
@@ -169,42 +211,30 @@
 {
 	struct btrfs_worker_thread *worker;
 	struct list_head *next;
-	struct list_head *start;
 	int enforce_min = workers->num_workers < workers->max_workers;
 
-	/* start with the last thread if it isn't busy */
-	worker = workers->last;
-	if (atomic_read(&worker->num_pending) < 64)
-		goto done;
-
-	next = worker->worker_list.next;
-	start = &worker->worker_list;
-
 	/*
-	 * check all the workers for someone that is bored.  FIXME, do
-	 * something smart here
+	 * if we find an idle thread, don't move it to the end of the
+	 * idle list.  This improves the chance that the next submission
+	 * will reuse the same thread, and maybe catch it while it is still
+	 * working
 	 */
-	while(next != start) {
-		if (next == &workers->worker_list) {
-			next = workers->worker_list.next;
-			continue;
-		}
+	if (!list_empty(&workers->idle_list)) {
+		next = workers->idle_list.next;
 		worker = list_entry(next, struct btrfs_worker_thread,
 				    worker_list);
-		if (atomic_read(&worker->num_pending) < 64 || !enforce_min)
-			goto done;
-		next = next->next;
+		return worker;
 	}
+	if (enforce_min || list_empty(&workers->worker_list))
+		return NULL;
+
 	/*
-	 * nobody was bored, if we're already at the max thread count,
-	 * use the last thread
+	 * if we pick a busy task, move the task to the end of the list.
+	 * hopefully this will keep things somewhat evenly balanced
 	 */
-	if (!enforce_min || atomic_read(&workers->last->num_pending) < 64) {
-		return workers->last;
-	}
-	return NULL;
-done:
-	workers->last = worker;
+	next = workers->worker_list.next;
+	worker = list_entry(next, struct btrfs_worker_thread, worker_list);
+	list_move_tail(next, &workers->worker_list);
 	return worker;
 }
 
@@ -221,11 +251,17 @@
 	if (!worker) {
 		spin_lock_irqsave(&workers->lock, flags);
 		if (workers->num_workers >= workers->max_workers) {
+			struct list_head *fallback = NULL;
 			/*
 			 * we have failed to find any workers, just
 			 * return the force one
 			 */
-			worker = list_entry(workers->worker_list.next,
+			if (!list_empty(&workers->worker_list))
+				fallback = workers->worker_list.next;
+			if (!list_empty(&workers->idle_list))
+				fallback = workers->idle_list.next;
+			BUG_ON(!fallback);
+			worker = list_entry(fallback,
 				  struct btrfs_worker_thread, worker_list);
 			spin_unlock_irqrestore(&workers->lock, flags);
 		} else {
@@ -254,6 +290,7 @@
 	spin_lock_irqsave(&worker->lock, flags);
 	atomic_inc(&worker->num_pending);
 	list_add_tail(&work->list, &worker->pending);
+	check_busy_worker(worker);
 	spin_unlock_irqrestore(&worker->lock, flags);
 out:
 	return 0;
@@ -276,6 +313,7 @@
 
 	spin_lock_irqsave(&worker->lock, flags);
 	atomic_inc(&worker->num_pending);
+	check_busy_worker(worker);
 	list_add_tail(&work->list, &worker->pending);
 
 	/*