bcache: Refactor journalling flow control

Make things less asynchronous where they don't need to be: bch_journal()
only has to block when the journal or the current journal entry is full,
and that is emphatically not a fast path. So turn it into a normal
function that simply returns when it finishes, which makes the code and
control flow easier to follow.
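
For illustration only (not part of the patch itself), a caller of the new
interface looks roughly like the sketch below; example_insert() and the
btree-insert step are placeholders rather than code from this series.
bch_journal() may block internally until the keys fit in a journal entry,
and returns the journal pin (already bumped) that the caller drops once it
is done with the keys, or NULL when the cache set is not in sync mode:

	static void example_insert(struct cache_set *c, struct keylist *keys,
				   struct closure *parent)
	{
		atomic_t *journal_ref;

		/* May block until the keys fit in a journal entry: */
		journal_ref = bch_journal(c, keys, parent);

		/* ... insert the keys into the btree here ... */

		if (journal_ref)
			atomic_dec_bug(journal_ref);
	}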

Signed-off-by: Kent Overstreet <kmo@daterainc.com>
diff --git a/drivers/md/bcache/journal.c b/drivers/md/bcache/journal.c
index 1bdefdb..940e89e 100644
--- a/drivers/md/bcache/journal.c
+++ b/drivers/md/bcache/journal.c
@@ -318,7 +318,6 @@
 			bch_keylist_push(&op->keys);
 
 			op->journal = i->pin;
-			atomic_inc(op->journal);
 
 			ret = bch_btree_insert(op, s, &op->keys);
 			if (ret)
@@ -357,48 +356,35 @@
 	 * Try to find the btree node that references the oldest journal
 	 * entry; best is our current candidate and is locked if non-NULL:
 	 */
-	struct btree *b, *best = NULL;
-	unsigned iter;
+	struct btree *b, *best;
+	unsigned i;
+retry:
+	best = NULL;
 
-	for_each_cached_btree(b, c, iter) {
-		if (!down_write_trylock(&b->lock))
-			continue;
-
-		if (!btree_node_dirty(b) ||
-		    !btree_current_write(b)->journal) {
-			rw_unlock(true, b);
-			continue;
+	for_each_cached_btree(b, c, i)
+		if (btree_current_write(b)->journal) {
+			if (!best)
+				best = b;
+			else if (journal_pin_cmp(c,
+						 btree_current_write(best),
+						 btree_current_write(b))) {
+				best = b;
+			}
 		}
 
-		if (!best)
-			best = b;
-		else if (journal_pin_cmp(c,
-					 btree_current_write(best),
-					 btree_current_write(b))) {
-			rw_unlock(true, best);
-			best = b;
-		} else
+	b = best;
+	if (b) {
+		rw_lock(true, b, b->level);
+
+		if (!btree_current_write(b)->journal) {
 			rw_unlock(true, b);
+			/* We raced */
+			goto retry;
+		}
+
+		bch_btree_node_write(b, NULL);
+		rw_unlock(true, b);
 	}
-
-	if (best)
-		goto out;
-
-	/* We can't find the best btree node, just pick the first */
-	list_for_each_entry(b, &c->btree_cache, list)
-		if (!b->level && btree_node_dirty(b)) {
-			best = b;
-			rw_lock(true, best, best->level);
-			goto found;
-		}
-
-out:
-	if (!best)
-		return;
-found:
-	if (btree_node_dirty(best))
-		bch_btree_node_write(best, NULL);
-	rw_unlock(true, best);
 }
 
 #define last_seq(j)	((j)->seq - fifo_used(&(j)->pin) + 1)
@@ -494,7 +480,7 @@
 		do_journal_discard(ca);
 
 	if (c->journal.blocks_free)
-		return;
+		goto out;
 
 	/*
 	 * Allocate:
@@ -520,7 +506,7 @@
 
 	if (n)
 		c->journal.blocks_free = c->sb.bucket_size >> c->block_bits;
-
+out:
 	if (!journal_full(&c->journal))
 		__closure_wake_up(&c->journal.wait);
 }
@@ -659,7 +645,7 @@
 	journal_write_unlocked(cl);
 }
 
-static void __journal_try_write(struct cache_set *c, bool noflush)
+static void journal_try_write(struct cache_set *c)
 	__releases(c->journal.lock)
 {
 	struct closure *cl = &c->journal.io;
@@ -667,29 +653,59 @@
 
 	w->need_write = true;
 
-	if (!closure_trylock(cl, &c->cl))
-		spin_unlock(&c->journal.lock);
-	else if (noflush && journal_full(&c->journal)) {
-		spin_unlock(&c->journal.lock);
-		continue_at(cl, journal_write, system_wq);
-	} else
+	if (closure_trylock(cl, &c->cl))
 		journal_write_unlocked(cl);
+	else
+		spin_unlock(&c->journal.lock);
 }
 
-#define journal_try_write(c)	__journal_try_write(c, false)
-
-void bch_journal_meta(struct cache_set *c, struct closure *cl)
+static struct journal_write *journal_wait_for_write(struct cache_set *c,
+						    unsigned nkeys)
 {
-	struct journal_write *w;
+	size_t sectors;
+	struct closure cl;
 
-	if (CACHE_SYNC(&c->sb)) {
+	closure_init_stack(&cl);
+
+	spin_lock(&c->journal.lock);
+
+	while (1) {
+		struct journal_write *w = c->journal.cur;
+
+		sectors = __set_blocks(w->data, w->data->keys + nkeys,
+				       c) * c->sb.block_size;
+
+		if (sectors <= min_t(size_t,
+				     c->journal.blocks_free * c->sb.block_size,
+				     PAGE_SECTORS << JSET_BITS))
+			return w;
+
+		/* XXX: tracepoint */
+		if (!journal_full(&c->journal)) {
+			trace_bcache_journal_entry_full(c);
+
+			/*
+			 * XXX: If we were inserting so many keys that they
+			 * won't fit in an _empty_ journal write, we'll
+			 * deadlock. For now, handle this in
+			 * bch_keylist_realloc() - but something to think about.
+			 */
+			BUG_ON(!w->data->keys);
+
+			closure_wait(&w->wait, &cl);
+			journal_try_write(c); /* unlocks */
+		} else {
+			trace_bcache_journal_full(c);
+
+			closure_wait(&c->journal.wait, &cl);
+			journal_reclaim(c);
+			spin_unlock(&c->journal.lock);
+
+			btree_flush_write(c);
+		}
+
+		closure_sync(&cl);
 		spin_lock(&c->journal.lock);
-		w = c->journal.cur;
-
-		if (cl)
-			BUG_ON(!closure_wait(&w->wait, cl));
-
-		__journal_try_write(c, true);
 	}
 }
 
@@ -708,68 +724,26 @@
  * bch_journal() hands those same keys off to btree_insert_async()
  */
 
-void bch_journal(struct closure *cl)
+atomic_t *bch_journal(struct cache_set *c,
+		      struct keylist *keys,
+		      struct closure *parent)
 {
-	struct btree_op *op = container_of(cl, struct btree_op, cl);
-	struct cache_set *c = op->c;
 	struct journal_write *w;
-	size_t sectors, nkeys;
+	atomic_t *ret;
 
-	if (op->type != BTREE_INSERT ||
-	    !CACHE_SYNC(&c->sb))
-		goto out;
+	if (!CACHE_SYNC(&c->sb))
+		return NULL;
 
-	/*
-	 * If we're looping because we errored, might already be waiting on
-	 * another journal write:
-	 */
-	while (atomic_read(&cl->parent->remaining) & CLOSURE_WAITING)
-		closure_sync(cl->parent);
+	w = journal_wait_for_write(c, bch_keylist_nkeys(keys));
 
-	spin_lock(&c->journal.lock);
+	memcpy(end(w->data), keys->keys, bch_keylist_bytes(keys));
+	w->data->keys += bch_keylist_nkeys(keys);
 
-	if (journal_full(&c->journal)) {
-		trace_bcache_journal_full(c);
+	ret = &fifo_back(&c->journal.pin);
+	atomic_inc(ret);
 
-		closure_wait(&c->journal.wait, cl);
-
-		journal_reclaim(c);
-		spin_unlock(&c->journal.lock);
-
-		btree_flush_write(c);
-		continue_at(cl, bch_journal, bcache_wq);
-	}
-
-	w = c->journal.cur;
-	nkeys = w->data->keys + bch_keylist_nkeys(&op->keys);
-	sectors = __set_blocks(w->data, nkeys, c) * c->sb.block_size;
-
-	if (sectors > min_t(size_t,
-			    c->journal.blocks_free * c->sb.block_size,
-			    PAGE_SECTORS << JSET_BITS)) {
-		trace_bcache_journal_entry_full(c);
-
-		/*
-		 * XXX: If we were inserting so many keys that they won't fit in
-		 * an _empty_ journal write, we'll deadlock. For now, handle
-		 * this in bch_keylist_realloc() - but something to think about.
-		 */
-		BUG_ON(!w->data->keys);
-
-		BUG_ON(!closure_wait(&w->wait, cl));
-
-		journal_try_write(c);
-		continue_at(cl, bch_journal, bcache_wq);
-	}
-
-	memcpy(end(w->data), op->keys.keys, bch_keylist_bytes(&op->keys));
-	w->data->keys += bch_keylist_nkeys(&op->keys);
-
-	op->journal = &fifo_back(&c->journal.pin);
-	atomic_inc(op->journal);
-
-	if (op->flush_journal) {
-		closure_wait(&w->wait, cl->parent);
+	if (parent) {
+		closure_wait(&w->wait, parent);
 		journal_try_write(c);
 	} else if (!w->need_write) {
 		schedule_delayed_work(&c->journal.work,
@@ -778,8 +752,20 @@
 	} else {
 		spin_unlock(&c->journal.lock);
 	}
-out:
-	bch_btree_insert_async(cl);
+
+	return ret;
+}
+
+void bch_journal_meta(struct cache_set *c, struct closure *cl)
+{
+	struct keylist keys;
+	atomic_t *ref;
+
+	bch_keylist_init(&keys);
+
+	ref = bch_journal(c, &keys, cl);
+	if (ref)
+		atomic_dec_bug(ref);
 }
 
 void bch_journal_free(struct cache_set *c)