Btrfs: attach delayed ref updates to delayed ref heads

Currently we have two rb-trees, one for the delayed ref heads and one for all
of the delayed refs, including the delayed ref heads.  When we process the
delayed refs we have to hold the delayed ref lock for all of the selecting and
merging and such, which results in quite a bit of lock contention.  This was
solved by having a waitqueue and allowing only one flusher at a time; however,
that hurts when we get a lot of delayed refs queued up.

So instead just keep an rb tree for the delayed ref heads, and attach the
delayed ref updates to an rb tree that is per delayed ref head.  Then we only
need to take the delayed ref lock when adding new delayed refs and when
selecting a delayed ref head to process; the rest of the time we deal with a
per-delayed-ref-head lock, which is much less contentious.
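
A rough sketch of the new layout, using only the field names that show up in
this patch (href_root, href_node, ref_root, the per-head lock and mutex); the
real structs in delayed-ref.h carry more members than shown here:

	/* Sketch only; see delayed-ref.h for the real definitions. */
	struct btrfs_delayed_ref_root {
		struct rb_root href_root;	/* delayed ref heads only */
		spinlock_t lock;		/* protects href_root */
		atomic_t num_entries;		/* total heads + updates */
		unsigned long num_heads;
		unsigned long num_heads_ready;
	};

	struct btrfs_delayed_ref_head {
		struct btrfs_delayed_ref_node node;
		struct mutex mutex;
		spinlock_t lock;		/* protects ref_root below */
		struct rb_root ref_root;	/* updates for this head */
		struct rb_node href_node;	/* links into href_root */
		struct btrfs_delayed_extent_op *extent_op;
		unsigned int processing;	/* nonzero while being run */
		bool must_insert_reserved;
	};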

The locking rules for this get a little more complicated since we have to take
up to three locks to properly process delayed refs, but I will address that
problem later.  For now this passes all of xfstests and my overnight stress
tests.
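
As a sketch of the ordering, the teardown path in the disk-io.c hunk below
takes the locks like this (back-off handling and the actual freeing of the
refs omitted):

	spin_lock(&delayed_refs->lock);		/* 1: pick a head off href_root */
	head = rb_entry(rb_first(&delayed_refs->href_root),
			struct btrfs_delayed_ref_head, href_node);
	if (!mutex_trylock(&head->mutex)) {
		/* 2: someone is running this head; drop delayed_refs->lock,
		 *    wait on head->mutex, then retry from the top */
	}
	spin_lock(&head->lock);			/* 3: walk and erase head->ref_root */
	/* ... erase the per-head updates, then the head itself ... */
	spin_unlock(&head->lock);
	spin_unlock(&delayed_refs->lock);
	mutex_unlock(&head->mutex);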
Thanks,

Signed-off-by: Josef Bacik <jbacik@fb.com>
Signed-off-by: Chris Mason <clm@fb.com>
diff --git a/fs/btrfs/disk-io.c b/fs/btrfs/disk-io.c
index 9850a51..ed23127 100644
--- a/fs/btrfs/disk-io.c
+++ b/fs/btrfs/disk-io.c
@@ -3801,58 +3801,55 @@
 	delayed_refs = &trans->delayed_refs;
 
 	spin_lock(&delayed_refs->lock);
-	if (delayed_refs->num_entries == 0) {
+	if (atomic_read(&delayed_refs->num_entries) == 0) {
 		spin_unlock(&delayed_refs->lock);
 		btrfs_info(root->fs_info, "delayed_refs has NO entry");
 		return ret;
 	}
 
-	while ((node = rb_first(&delayed_refs->root)) != NULL) {
-		struct btrfs_delayed_ref_head *head = NULL;
+	while ((node = rb_first(&delayed_refs->href_root)) != NULL) {
+		struct btrfs_delayed_ref_head *head;
 		bool pin_bytes = false;
 
-		ref = rb_entry(node, struct btrfs_delayed_ref_node, rb_node);
-		atomic_set(&ref->refs, 1);
-		if (btrfs_delayed_ref_is_head(ref)) {
+		head = rb_entry(node, struct btrfs_delayed_ref_head,
+				href_node);
+		if (!mutex_trylock(&head->mutex)) {
+			atomic_inc(&head->node.refs);
+			spin_unlock(&delayed_refs->lock);
 
-			head = btrfs_delayed_node_to_head(ref);
-			if (!mutex_trylock(&head->mutex)) {
-				atomic_inc(&ref->refs);
-				spin_unlock(&delayed_refs->lock);
-
-				/* Need to wait for the delayed ref to run */
-				mutex_lock(&head->mutex);
-				mutex_unlock(&head->mutex);
-				btrfs_put_delayed_ref(ref);
-
-				spin_lock(&delayed_refs->lock);
-				continue;
-			}
-
-			if (head->must_insert_reserved)
-				pin_bytes = true;
-			btrfs_free_delayed_extent_op(head->extent_op);
-			delayed_refs->num_heads--;
-			if (list_empty(&head->cluster))
-				delayed_refs->num_heads_ready--;
-			list_del_init(&head->cluster);
-		}
-
-		ref->in_tree = 0;
-		rb_erase(&ref->rb_node, &delayed_refs->root);
-		if (head)
-			rb_erase(&head->href_node, &delayed_refs->href_root);
-
-		delayed_refs->num_entries--;
-		spin_unlock(&delayed_refs->lock);
-		if (head) {
-			if (pin_bytes)
-				btrfs_pin_extent(root, ref->bytenr,
-						 ref->num_bytes, 1);
+			mutex_lock(&head->mutex);
 			mutex_unlock(&head->mutex);
+			btrfs_put_delayed_ref(&head->node);
+			spin_lock(&delayed_refs->lock);
+			continue;
 		}
-		btrfs_put_delayed_ref(ref);
+		spin_lock(&head->lock);
+		while ((node = rb_first(&head->ref_root)) != NULL) {
+			ref = rb_entry(node, struct btrfs_delayed_ref_node,
+				       rb_node);
+			ref->in_tree = 0;
+			rb_erase(&ref->rb_node, &head->ref_root);
+			atomic_dec(&delayed_refs->num_entries);
+			btrfs_put_delayed_ref(ref);
+			cond_resched_lock(&head->lock);
+		}
+		if (head->must_insert_reserved)
+			pin_bytes = true;
+		btrfs_free_delayed_extent_op(head->extent_op);
+		delayed_refs->num_heads--;
+		if (head->processing == 0)
+			delayed_refs->num_heads_ready--;
+		atomic_dec(&delayed_refs->num_entries);
+		head->node.in_tree = 0;
+		rb_erase(&head->href_node, &delayed_refs->href_root);
+		spin_unlock(&head->lock);
+		spin_unlock(&delayed_refs->lock);
+		mutex_unlock(&head->mutex);
 
+		if (pin_bytes)
+			btrfs_pin_extent(root, head->node.bytenr,
+					 head->node.num_bytes, 1);
+		btrfs_put_delayed_ref(&head->node);
 		cond_resched();
 		spin_lock(&delayed_refs->lock);
 	}