bcache: btree locking rework

Add a new lock, b->write_lock, which is required to actually modify - or write -
a btree node; this lock is only held for short durations.
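
In struct btree (see the btree.h hunk below) this is just a mutex next to the
existing rw-semaphore; in the paths touched by this patch it always nests
inside b->lock when both are held. Illustrative sketch only - field order and
comments are approximate:

	struct btree {
		...
		struct rw_semaphore	lock;		/* long duration */
		struct mutex		write_lock;	/* short duration; guards
							 * modifying/writing the node */
		...
	};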

This means we can write out a btree node without taking b->lock, which _is_ held
for long durations - solving a deadlock that occurs when btree_flush_write()
(from the journalling code) is called with a btree node already locked.
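
Concretely, btree_flush_write() no longer has to take the node's rw-sem at
all; it goes from

	rw_lock(true, b, b->level);
	bch_btree_node_write(b, NULL);
	rw_unlock(true, b);

to (see the journal.c hunk below)

	mutex_lock(&b->write_lock);
	__bch_btree_node_write(b, NULL);
	mutex_unlock(&b->write_lock);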

Right now this only happens in bch_btree_set_root(), but with an upcoming
journalling rework it's going to happen a lot more.

This also means b->lock is now more of a read/intent lock than a read/write
lock - though not completely, since it still blocks readers. We may turn it
into a real intent lock at some point in the future.
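
The resulting pattern for modifying a node - excerpted from
bch_btree_insert_node() below - is to hold b->lock for intent and take
b->write_lock just around the actual modification:

	mutex_lock(&b->write_lock);
	if (bch_btree_insert_keys(b, op, insert_keys, replace_key)) {
		if (!b->level)
			bch_btree_leaf_dirty(b, journal_ref);
		else
			bch_btree_node_write(b, &cl);
	}
	mutex_unlock(&b->write_lock);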

Signed-off-by: Kent Overstreet <kmo@daterainc.com>
diff --git a/drivers/md/bcache/btree.c b/drivers/md/bcache/btree.c
index e83732e..01b1b7e 100644
--- a/drivers/md/bcache/btree.c
+++ b/drivers/md/bcache/btree.c
@@ -167,6 +167,20 @@
 	return ((void *) btree_bset_first(b)) + b->written * block_bytes(b->c);
 }
 
+static void bch_btree_init_next(struct btree *b)
+{
+	/* If not a leaf node, always sort */
+	if (b->level && b->keys.nsets)
+		bch_btree_sort(&b->keys, &b->c->sort);
+	else
+		bch_btree_sort_lazy(&b->keys, &b->c->sort);
+
+	if (b->written < btree_blocks(b))
+		bch_bset_init_next(&b->keys, write_block(b),
+				   bset_magic(&b->c->sb));
+
+}
+
 /* Btree key manipulation */
 
 void bkey_put(struct cache_set *c, struct bkey *k)
@@ -438,10 +452,12 @@
 	}
 }
 
-void bch_btree_node_write(struct btree *b, struct closure *parent)
+void __bch_btree_node_write(struct btree *b, struct closure *parent)
 {
 	struct bset *i = btree_bset_last(b);
 
+	lockdep_assert_held(&b->write_lock);
+
 	trace_bcache_btree_write(b);
 
 	BUG_ON(current->bio_list);
@@ -465,23 +481,24 @@
 			&PTR_CACHE(b->c, &b->key, 0)->btree_sectors_written);
 
 	b->written += set_blocks(i, block_bytes(b->c));
+}
 
-	/* If not a leaf node, always sort */
-	if (b->level && b->keys.nsets)
-		bch_btree_sort(&b->keys, &b->c->sort);
-	else
-		bch_btree_sort_lazy(&b->keys, &b->c->sort);
+void bch_btree_node_write(struct btree *b, struct closure *parent)
+{
+	unsigned nsets = b->keys.nsets;
+
+	lockdep_assert_held(&b->lock);
+
+	__bch_btree_node_write(b, parent);
 
 	/*
 	 * do verify if there was more than one set initially (i.e. we did a
 	 * sort) and we sorted down to a single set:
 	 */
-	if (i != b->keys.set->data && !b->keys.nsets)
+	if (nsets && !b->keys.nsets)
 		bch_btree_verify(b);
 
-	if (b->written < btree_blocks(b))
-		bch_bset_init_next(&b->keys, write_block(b),
-				   bset_magic(&b->c->sb));
+	bch_btree_init_next(b);
 }
 
 static void bch_btree_node_write_sync(struct btree *b)
@@ -489,7 +506,11 @@
 	struct closure cl;
 
 	closure_init_stack(&cl);
+
+	mutex_lock(&b->write_lock);
 	bch_btree_node_write(b, &cl);
+	mutex_unlock(&b->write_lock);
+
 	closure_sync(&cl);
 }
 
@@ -497,11 +518,10 @@
 {
 	struct btree *b = container_of(to_delayed_work(w), struct btree, work);
 
-	rw_lock(true, b, b->level);
-
+	mutex_lock(&b->write_lock);
 	if (btree_node_dirty(b))
-		bch_btree_node_write(b, NULL);
-	rw_unlock(true, b);
+		__bch_btree_node_write(b, NULL);
+	mutex_unlock(&b->write_lock);
 }
 
 static void bch_btree_leaf_dirty(struct btree *b, atomic_t *journal_ref)
@@ -509,6 +529,8 @@
 	struct bset *i = btree_bset_last(b);
 	struct btree_write *w = btree_current_write(b);
 
+	lockdep_assert_held(&b->write_lock);
+
 	BUG_ON(!b->written);
 	BUG_ON(!i->keys);
 
@@ -593,6 +615,8 @@
 
 	init_rwsem(&b->lock);
 	lockdep_set_novalidate_class(&b->lock);
+	mutex_init(&b->write_lock);
+	lockdep_set_novalidate_class(&b->write_lock);
 	INIT_LIST_HEAD(&b->list);
 	INIT_DELAYED_WORK(&b->work, btree_node_write_work);
 	b->c = c;
@@ -626,8 +650,12 @@
 		up(&b->io_mutex);
 	}
 
+	mutex_lock(&b->write_lock);
 	if (btree_node_dirty(b))
-		bch_btree_node_write_sync(b);
+		__bch_btree_node_write(b, &cl);
+	mutex_unlock(&b->write_lock);
+
+	closure_sync(&cl);
 
 	/* wait for any in flight btree write */
 	down(&b->io_mutex);
@@ -1010,10 +1038,14 @@
 
 	BUG_ON(b == b->c->root);
 
+	mutex_lock(&b->write_lock);
+
 	if (btree_node_dirty(b))
 		btree_complete_write(b, btree_current_write(b));
 	clear_bit(BTREE_NODE_dirty, &b->flags);
 
+	mutex_unlock(&b->write_lock);
+
 	cancel_delayed_work(&b->work);
 
 	mutex_lock(&b->c->bucket_lock);
@@ -1065,8 +1097,10 @@
 {
 	struct btree *n = bch_btree_node_alloc(b->c, b->level, wait);
 	if (!IS_ERR_OR_NULL(n)) {
+		mutex_lock(&n->write_lock);
 		bch_btree_sort_into(&b->keys, &n->keys, &b->c->sort);
 		bkey_copy_key(&n->key, &b->key);
+		mutex_unlock(&n->write_lock);
 	}
 
 	return n;
@@ -1269,6 +1303,9 @@
 			goto out_nocoalesce;
 	}
 
+	for (i = 0; i < nodes; i++)
+		mutex_lock(&new_nodes[i]->write_lock);
+
 	for (i = nodes - 1; i > 0; --i) {
 		struct bset *n1 = btree_bset_first(new_nodes[i]);
 		struct bset *n2 = btree_bset_first(new_nodes[i - 1]);
@@ -1335,6 +1372,9 @@
 		bch_keylist_add(keylist, &new_nodes[i]->key);
 	}
 
+	for (i = 0; i < nodes; i++)
+		mutex_unlock(&new_nodes[i]->write_lock);
+
 	closure_sync(&cl);
 
 	/* We emptied out this node */
@@ -1399,7 +1439,6 @@
 static int btree_gc_recurse(struct btree *b, struct btree_op *op,
 			    struct closure *writes, struct gc_stat *gc)
 {
-	unsigned i;
 	int ret = 0;
 	bool should_rewrite;
 	struct btree *n;
@@ -1407,13 +1446,13 @@
 	struct keylist keys;
 	struct btree_iter iter;
 	struct gc_merge_info r[GC_MERGE_NODES];
-	struct gc_merge_info *last = r + GC_MERGE_NODES - 1;
+	struct gc_merge_info *i, *last = r + ARRAY_SIZE(r) - 1;
 
 	bch_keylist_init(&keys);
 	bch_btree_iter_init(&b->keys, &iter, &b->c->gc_done);
 
-	for (i = 0; i < GC_MERGE_NODES; i++)
-		r[i].b = ERR_PTR(-EINTR);
+	for (i = r; i < r + ARRAY_SIZE(r); i++)
+		i->b = ERR_PTR(-EINTR);
 
 	while (1) {
 		k = bch_btree_iter_next_filter(&iter, &b->keys, bch_ptr_bad);
@@ -1443,6 +1482,7 @@
 
 				if (!IS_ERR_OR_NULL(n)) {
 					bch_btree_node_write_sync(n);
+
 					bch_keylist_add(&keys, &n->key);
 
 					make_btree_freeing_key(last->b,
@@ -1475,8 +1515,10 @@
 			 * Must flush leaf nodes before gc ends, since replace
 			 * operations aren't journalled
 			 */
+			mutex_lock(&last->b->write_lock);
 			if (btree_node_dirty(last->b))
 				bch_btree_node_write(last->b, writes);
+			mutex_unlock(&last->b->write_lock);
 			rw_unlock(true, last->b);
 		}
 
@@ -1489,11 +1531,13 @@
 		}
 	}
 
-	for (i = 0; i < GC_MERGE_NODES; i++)
-		if (!IS_ERR_OR_NULL(r[i].b)) {
-			if (btree_node_dirty(r[i].b))
-				bch_btree_node_write(r[i].b, writes);
-			rw_unlock(true, r[i].b);
+	for (i = r; i < r + ARRAY_SIZE(r); i++)
+		if (!IS_ERR_OR_NULL(i->b)) {
+			mutex_lock(&i->b->write_lock);
+			if (btree_node_dirty(i->b))
+				bch_btree_node_write(i->b, writes);
+			mutex_unlock(&i->b->write_lock);
+			rw_unlock(true, i->b);
 		}
 
 	bch_keylist_free(&keys);
@@ -1514,6 +1558,7 @@
 
 		if (!IS_ERR_OR_NULL(n)) {
 			bch_btree_node_write_sync(n);
+
 			bch_btree_set_root(n);
 			btree_node_free(b);
 			rw_unlock(true, n);
@@ -1871,6 +1916,9 @@
 				goto err_free2;
 		}
 
+		mutex_lock(&n1->write_lock);
+		mutex_lock(&n2->write_lock);
+
 		bch_btree_insert_keys(n1, op, insert_keys, replace_key);
 
 		/*
@@ -1897,21 +1945,26 @@
 
 		bch_keylist_add(&parent_keys, &n2->key);
 		bch_btree_node_write(n2, &cl);
+		mutex_unlock(&n2->write_lock);
 		rw_unlock(true, n2);
 	} else {
 		trace_bcache_btree_node_compact(b, btree_bset_first(n1)->keys);
 
+		mutex_lock(&n1->write_lock);
 		bch_btree_insert_keys(n1, op, insert_keys, replace_key);
 	}
 
 	bch_keylist_add(&parent_keys, &n1->key);
 	bch_btree_node_write(n1, &cl);
+	mutex_unlock(&n1->write_lock);
 
 	if (n3) {
 		/* Depth increases, make a new root */
+		mutex_lock(&n3->write_lock);
 		bkey_copy_key(&n3->key, &MAX_KEY);
 		bch_btree_insert_keys(n3, op, &parent_keys, NULL);
 		bch_btree_node_write(n3, &cl);
+		mutex_unlock(&n3->write_lock);
 
 		closure_sync(&cl);
 		bch_btree_set_root(n3);
@@ -1960,33 +2013,54 @@
 				 atomic_t *journal_ref,
 				 struct bkey *replace_key)
 {
+	struct closure cl;
+
 	BUG_ON(b->level && replace_key);
 
+	closure_init_stack(&cl);
+
+	mutex_lock(&b->write_lock);
+
+	if (write_block(b) != btree_bset_last(b) &&
+	    b->keys.last_set_unwritten)
+		bch_btree_init_next(b); /* just wrote a set */
+
 	if (bch_keylist_nkeys(insert_keys) > insert_u64s_remaining(b)) {
-		if (current->bio_list) {
-			op->lock = b->c->root->level + 1;
-			return -EAGAIN;
-		} else if (op->lock <= b->c->root->level) {
-			op->lock = b->c->root->level + 1;
-			return -EINTR;
-		} else {
-			/* Invalidated all iterators */
-			int ret = btree_split(b, op, insert_keys, replace_key);
+		mutex_unlock(&b->write_lock);
+		goto split;
+	}
 
-			return bch_keylist_empty(insert_keys) ?
-				0 : ret ?: -EINTR;
-		}
+	BUG_ON(write_block(b) != btree_bset_last(b));
+
+	if (bch_btree_insert_keys(b, op, insert_keys, replace_key)) {
+		if (!b->level)
+			bch_btree_leaf_dirty(b, journal_ref);
+		else
+			bch_btree_node_write(b, &cl);
+	}
+
+	mutex_unlock(&b->write_lock);
+
+	/* wait for btree node write if necessary, after unlock */
+	closure_sync(&cl);
+
+	return 0;
+split:
+	if (current->bio_list) {
+		op->lock = b->c->root->level + 1;
+		return -EAGAIN;
+	} else if (op->lock <= b->c->root->level) {
+		op->lock = b->c->root->level + 1;
+		return -EINTR;
 	} else {
-		BUG_ON(write_block(b) != btree_bset_last(b));
+		/* Invalidated all iterators */
+		int ret = btree_split(b, op, insert_keys, replace_key);
 
-		if (bch_btree_insert_keys(b, op, insert_keys, replace_key)) {
-			if (!b->level)
-				bch_btree_leaf_dirty(b, journal_ref);
-			else
-				bch_btree_node_write_sync(b);
-		}
-
-		return 0;
+		if (bch_keylist_empty(insert_keys))
+			return 0;
+		else if (!ret)
+			return -EINTR;
+		return ret;
 	}
 }
 
diff --git a/drivers/md/bcache/btree.h b/drivers/md/bcache/btree.h
index def9dc4..acebf26 100644
--- a/drivers/md/bcache/btree.h
+++ b/drivers/md/bcache/btree.h
@@ -127,6 +127,8 @@
 	struct cache_set	*c;
 	struct btree		*parent;
 
+	struct mutex		write_lock;
+
 	unsigned long		flags;
 	uint16_t		written;	/* would be nice to kill */
 	uint8_t			level;
@@ -236,6 +238,7 @@
 }
 
 void bch_btree_node_read_done(struct btree *);
+void __bch_btree_node_write(struct btree *, struct closure *);
 void bch_btree_node_write(struct btree *, struct closure *);
 
 void bch_btree_set_root(struct btree *);
diff --git a/drivers/md/bcache/journal.c b/drivers/md/bcache/journal.c
index c8bfc28..59e8202 100644
--- a/drivers/md/bcache/journal.c
+++ b/drivers/md/bcache/journal.c
@@ -381,16 +381,15 @@
 
 	b = best;
 	if (b) {
-		rw_lock(true, b, b->level);
-
+		mutex_lock(&b->write_lock);
 		if (!btree_current_write(b)->journal) {
-			rw_unlock(true, b);
+			mutex_unlock(&b->write_lock);
 			/* We raced */
 			goto retry;
 		}
 
-		bch_btree_node_write(b, NULL);
-		rw_unlock(true, b);
+		__bch_btree_node_write(b, NULL);
+		mutex_unlock(&b->write_lock);
 	}
 }
 
diff --git a/drivers/md/bcache/super.c b/drivers/md/bcache/super.c
index ddfde38..9ded064 100644
--- a/drivers/md/bcache/super.c
+++ b/drivers/md/bcache/super.c
@@ -1398,9 +1398,12 @@
 		list_add(&c->root->list, &c->btree_cache);
 
 	/* Should skip this if we're unregistering because of an error */
-	list_for_each_entry(b, &c->btree_cache, list)
+	list_for_each_entry(b, &c->btree_cache, list) {
+		mutex_lock(&b->write_lock);
 		if (btree_node_dirty(b))
-			bch_btree_node_write(b, NULL);
+			__bch_btree_node_write(b, NULL);
+		mutex_unlock(&b->write_lock);
+	}
 
 	for_each_cache(ca, c, i)
 		if (ca->alloc_thread)
@@ -1667,8 +1670,10 @@
 		if (IS_ERR_OR_NULL(c->root))
 			goto err;
 
+		mutex_lock(&c->root->write_lock);
 		bkey_copy_key(&c->root->key, &MAX_KEY);
 		bch_btree_node_write(c->root, &cl);
+		mutex_unlock(&c->root->write_lock);
 
 		bch_btree_set_root(c->root);
 		rw_unlock(true, c->root);