Btrfs: add waitqueue instead of doing busy waiting for more delayed refs

Now that we may be holding back delayed refs for a limited period, we
might end up having no runnable delayed refs. Without this commit, we'd
do busy waiting in that thread until another (runnable) ref arrives.
Instead, we detect this situation and use a waitqueue (sketched below),
such that we only try to run more refs after
	a) another runnable ref was added, or
	b) delayed refs are no longer held back
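
The pairing is the standard wait_event()/wake_up() pattern. Here is a
minimal sketch with btrfs_delayed_ref_root reduced to the fields
involved; the struct and helper names are illustrative stand-ins, not
the patched code:

#include <linux/wait.h>
#include <linux/spinlock.h>
#include <linux/list.h>

struct ref_root {
	spinlock_t lock;
	unsigned long num_entries;	/* queued delayed refs */
	struct list_head seq_head;	/* seq entries holding refs back */
	wait_queue_head_t seq_wait;	/* woken up on either change */
};

/* waiter side: entered with root->lock held, as in the patch */
static void wait_sketch(struct ref_root *root, unsigned long seen)
{
	struct list_head *first = root->seq_head.next;

	spin_unlock(&root->lock);
	/* sleep until a ref is added/removed or a seq entry goes away */
	wait_event(root->seq_wait,
		   root->num_entries != seen ||
		   root->seq_head.next != first);
	spin_lock(&root->lock);
}

/* waker side: called after num_entries or the seq list changed */
static void wake_sketch(struct ref_root *root)
{
	if (waitqueue_active(&root->seq_wait))
		wake_up(&root->seq_wait);
}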

Signed-off-by: Jan Schmidt <list.btrfs@jan-o-sch.net>
diff --git a/fs/btrfs/extent-tree.c b/fs/btrfs/extent-tree.c
index bbcca12..0a435e2 100644
--- a/fs/btrfs/extent-tree.c
+++ b/fs/btrfs/extent-tree.c
@@ -2300,7 +2300,12 @@
 		ref->in_tree = 0;
 		rb_erase(&ref->rb_node, &delayed_refs->root);
 		delayed_refs->num_entries--;
-
+		/*
+		 * we modified num_entries, but as we're currently running
+		 * delayed refs, skip
+		 *     wake_up(&delayed_refs->seq_wait);
+		 * here.
+		 */
 		spin_unlock(&delayed_refs->lock);
 
 		ret = run_one_delayed_ref(trans, root, ref, extent_op,
@@ -2317,6 +2322,23 @@
 	return count;
 }
 
+
+static void wait_for_more_refs(struct btrfs_delayed_ref_root *delayed_refs,
+			unsigned long num_refs)
+{
+	struct list_head *first_seq = delayed_refs->seq_head.next;
+
+	spin_unlock(&delayed_refs->lock);
+	pr_debug("waiting for more refs (num %lu, first %p)\n",
+		 num_refs, first_seq);
+	wait_event(delayed_refs->seq_wait,
+		   num_refs != delayed_refs->num_entries ||
+		   delayed_refs->seq_head.next != first_seq);
+	pr_debug("done waiting for more refs (num %lu, first %p)\n",
+		 delayed_refs->num_entries, delayed_refs->seq_head.next);
+	spin_lock(&delayed_refs->lock);
+}
+
 /*
  * this starts processing the delayed reference count updates and
  * extent insertions we have queued up so far.  count can be
@@ -2332,8 +2354,11 @@
 	struct btrfs_delayed_ref_node *ref;
 	struct list_head cluster;
 	int ret;
+	u64 delayed_start;
 	int run_all = count == (unsigned long)-1;
 	int run_most = 0;
+	unsigned long num_refs = 0;
+	int consider_waiting;
 
 	if (root == root->fs_info->extent_root)
 		root = root->fs_info->tree_root;
@@ -2341,6 +2366,7 @@
 	delayed_refs = &trans->transaction->delayed_refs;
 	INIT_LIST_HEAD(&cluster);
 again:
+	consider_waiting = 0;
 	spin_lock(&delayed_refs->lock);
 	if (count == 0) {
 		count = delayed_refs->num_entries * 2;
@@ -2357,11 +2383,35 @@
 		 * of refs to process starting at the first one we are able to
 		 * lock
 		 */
+		delayed_start = delayed_refs->run_delayed_start;
 		ret = btrfs_find_ref_cluster(trans, &cluster,
 					     delayed_refs->run_delayed_start);
 		if (ret)
 			break;
 
+		if (delayed_start >= delayed_refs->run_delayed_start) {
+			if (consider_waiting == 0) {
+				/*
+				 * btrfs_find_ref_cluster looped. do one more
+				 * cycle. if no delayed ref runs during that
+				 * cycle (because all of them are blocked) and
+				 * the number of refs doesn't change, block on
+				 * the waitqueue instead of busy waiting.
+				 */
+				consider_waiting = 1;
+				num_refs = delayed_refs->num_entries;
+			} else {
+				wait_for_more_refs(delayed_refs, num_refs);
+				/*
+				 * after waiting, things have changed. we
+				 * dropped the lock and someone else might have
+				 * run some refs, built new clusters and so on.
+				 * therefore, we restart staleness detection.
+				 */
+				consider_waiting = 0;
+			}
+		}
+
 		ret = run_clustered_refs(trans, root, &cluster);
 		BUG_ON(ret < 0);
 
@@ -2369,6 +2419,11 @@
 
 		if (count == 0)
 			break;
+
+		if (ret || delayed_refs->run_delayed_start == 0) {
+			/* refs were run, let's reset staleness detection */
+			consider_waiting = 0;
+		}
 	}
 
 	if (run_all) {
@@ -4933,6 +4988,8 @@
 	rb_erase(&head->node.rb_node, &delayed_refs->root);
 
 	delayed_refs->num_entries--;
+	if (waitqueue_active(&delayed_refs->seq_wait))
+		wake_up(&delayed_refs->seq_wait);
 
 	/*
 	 * we don't take a ref on the node because we're removing it from the
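
For completeness, the staleness detection added to the main loop in
btrfs_run_delayed_refs() boils down to the two-strike pattern below.
This builds on the sketch from the commit message (plus an assumed
run_start cursor on the struct); find_cluster() and run_cluster() are
hypothetical stand-ins for btrfs_find_ref_cluster() and
run_clustered_refs():

static int find_cluster(struct ref_root *root);	/* nonzero: nothing found */
static int run_cluster(struct ref_root *root);	/* number of refs run */

static void run_refs_sketch(struct ref_root *root)
{
	int consider_waiting = 0;
	unsigned long num_refs = 0;
	u64 start;

	spin_lock(&root->lock);
	while (1) {
		start = root->run_start;	/* where the search begins */
		if (find_cluster(root))
			break;			/* no delayed refs left */

		if (start >= root->run_start) {
			/* the cluster search wrapped around */
			if (!consider_waiting) {
				/* first strike: just snapshot the count */
				consider_waiting = 1;
				num_refs = root->num_entries;
			} else {
				/* second strike: block instead of spinning */
				wait_sketch(root, num_refs);
				consider_waiting = 0;
			}
		}

		if (run_cluster(root) || root->run_start == 0)
			consider_waiting = 0;	/* progress: reset detection */
	}
	spin_unlock(&root->lock);
}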