bcache: Refactor journalling flow control

Make things less asynchronous where they don't need to be: bch_journal()
only has to block when the journal or the current journal entry is full,
which is emphatically not a fast path. So make it a normal function that
simply returns when it finishes, to make the code and control flow
easier to follow.
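
This also fixes continue_at_nobarrier() to queue the closure it was
passed (_cl, not cl) - it's now used from the make_request functions,
where there is no local variable named cl.

The new calling convention, roughly (a sketch of the shape of
bch_data_insert_keys() below, not verbatim code; error handling
elided):

	/*
	 * Journal the keys synchronously; returns a ref on the
	 * journal entry's pin, or NULL if the cache set isn't
	 * running in synchronous mode.
	 */
	op->journal = bch_journal(op->c, &op->keys,
				  op->flush_journal ? &s->cl : NULL);

	bch_btree_insert(op, op->c, &op->keys);

	/* Keys are in the btree; drop the journal pin. */
	if (op->journal)
		atomic_dec_bug(op->journal);
	op->journal = NULL;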

Signed-off-by: Kent Overstreet <kmo@daterainc.com>
diff --git a/drivers/md/bcache/btree.c b/drivers/md/bcache/btree.c
index f960607..777c01d 100644
--- a/drivers/md/bcache/btree.c
+++ b/drivers/md/bcache/btree.c
@@ -2164,9 +2164,6 @@
 		}
 	}
 
-	if (op->journal)
-		atomic_dec_bug(op->journal);
-	op->journal = NULL;
 	return ret;
 }
 
diff --git a/drivers/md/bcache/closure.h b/drivers/md/bcache/closure.h
index 0003992..ab011f0 100644
--- a/drivers/md/bcache/closure.h
+++ b/drivers/md/bcache/closure.h
@@ -642,7 +642,7 @@
 #define continue_at_nobarrier(_cl, _fn, _wq)				\
 do {									\
 	set_closure_fn(_cl, _fn, _wq);					\
-	closure_queue(cl);						\
+	closure_queue(_cl);						\
 	return;								\
 } while (0)
 
diff --git a/drivers/md/bcache/journal.c b/drivers/md/bcache/journal.c
index 1bdefdb..940e89e 100644
--- a/drivers/md/bcache/journal.c
+++ b/drivers/md/bcache/journal.c
@@ -318,7 +318,6 @@
 			bch_keylist_push(&op->keys);
 
 			op->journal = i->pin;
-			atomic_inc(op->journal);
 
 			ret = bch_btree_insert(op, s, &op->keys);
 			if (ret)
@@ -357,48 +356,35 @@
-	 * Try to find the btree node with that references the oldest journal
-	 * entry, best is our current candidate and is locked if non NULL:
+	 * Try to find the btree node that references the oldest journal
+	 * entry; best is our current candidate:
 	 */
-	struct btree *b, *best = NULL;
-	unsigned iter;
+	struct btree *b, *best;
+	unsigned i;
+retry:
+	best = NULL;
 
-	for_each_cached_btree(b, c, iter) {
-		if (!down_write_trylock(&b->lock))
-			continue;
-
-		if (!btree_node_dirty(b) ||
-		    !btree_current_write(b)->journal) {
-			rw_unlock(true, b);
-			continue;
+	for_each_cached_btree(b, c, i)
+		if (btree_current_write(b)->journal) {
+			if (!best)
+				best = b;
+			else if (journal_pin_cmp(c,
+						 btree_current_write(best),
+						 btree_current_write(b)))
+				best = b;
 		}
 
-		if (!best)
-			best = b;
-		else if (journal_pin_cmp(c,
-					 btree_current_write(best),
-					 btree_current_write(b))) {
-			rw_unlock(true, best);
-			best = b;
-		} else
+	b = best;
+	if (b) {
+		rw_lock(true, b, b->level);
+
+		if (!btree_current_write(b)->journal) {
 			rw_unlock(true, b);
+			/* We raced */
+			goto retry;
+		}
+
+		bch_btree_node_write(b, NULL);
+		rw_unlock(true, b);
 	}
-
-	if (best)
-		goto out;
-
-	/* We can't find the best btree node, just pick the first */
-	list_for_each_entry(b, &c->btree_cache, list)
-		if (!b->level && btree_node_dirty(b)) {
-			best = b;
-			rw_lock(true, best, best->level);
-			goto found;
-		}
-
-out:
-	if (!best)
-		return;
-found:
-	if (btree_node_dirty(best))
-		bch_btree_node_write(best, NULL);
-	rw_unlock(true, best);
 }
 
 #define last_seq(j)	((j)->seq - fifo_used(&(j)->pin) + 1)
@@ -494,7 +480,7 @@
 		do_journal_discard(ca);
 
 	if (c->journal.blocks_free)
-		return;
+		goto out;
 
 	/*
 	 * Allocate:
@@ -520,7 +506,7 @@
 
 	if (n)
 		c->journal.blocks_free = c->sb.bucket_size >> c->block_bits;
-
+out:
 	if (!journal_full(&c->journal))
 		__closure_wake_up(&c->journal.wait);
 }
@@ -659,7 +645,7 @@
 	journal_write_unlocked(cl);
 }
 
-static void __journal_try_write(struct cache_set *c, bool noflush)
+static void journal_try_write(struct cache_set *c)
 	__releases(c->journal.lock)
 {
 	struct closure *cl = &c->journal.io;
@@ -667,29 +653,59 @@
 
 	w->need_write = true;
 
-	if (!closure_trylock(cl, &c->cl))
-		spin_unlock(&c->journal.lock);
-	else if (noflush && journal_full(&c->journal)) {
-		spin_unlock(&c->journal.lock);
-		continue_at(cl, journal_write, system_wq);
-	} else
+	if (closure_trylock(cl, &c->cl))
 		journal_write_unlocked(cl);
+	else
+		spin_unlock(&c->journal.lock);
 }
 
-#define journal_try_write(c)	__journal_try_write(c, false)
-
-void bch_journal_meta(struct cache_set *c, struct closure *cl)
+static struct journal_write *journal_wait_for_write(struct cache_set *c,
+						    unsigned nkeys)
 {
-	struct journal_write *w;
+	size_t sectors;
+	struct closure cl;
 
-	if (CACHE_SYNC(&c->sb)) {
+	closure_init_stack(&cl);
+
+	spin_lock(&c->journal.lock);
+
+	while (1) {
+		struct journal_write *w = c->journal.cur;
+
+		sectors = __set_blocks(w->data, w->data->keys + nkeys,
+				       c) * c->sb.block_size;
+
+		if (sectors <= min_t(size_t,
+				     c->journal.blocks_free * c->sb.block_size,
+				     PAGE_SECTORS << JSET_BITS))
+			return w;
+
+		/* XXX: tracepoint */
+		if (!journal_full(&c->journal)) {
+			trace_bcache_journal_entry_full(c);
+
+			/*
+			 * XXX: If we were inserting so many keys that they
+			 * won't fit in an _empty_ journal write, we'll
+			 * deadlock. For now, handle this in
+			 * bch_keylist_realloc() - but something to think about.
+			 */
+			BUG_ON(!w->data->keys);
+
+			closure_wait(&w->wait, &cl);
+			journal_try_write(c); /* unlocks */
+		} else {
+			trace_bcache_journal_full(c);
+
+			closure_wait(&c->journal.wait, &cl);
+			journal_reclaim(c);
+			spin_unlock(&c->journal.lock);
+
+			btree_flush_write(c);
+		}
+
+		closure_sync(&cl);
 		spin_lock(&c->journal.lock);
-		w = c->journal.cur;
-
-		if (cl)
-			BUG_ON(!closure_wait(&w->wait, cl));
-
-		__journal_try_write(c, true);
 	}
 }
 
@@ -708,68 +724,26 @@
- * bch_journal() hands those same keys off to btree_insert_async()
+ * bch_data_insert_keys() hands those same keys off to bch_btree_insert()
  */
 
-void bch_journal(struct closure *cl)
+atomic_t *bch_journal(struct cache_set *c,
+		      struct keylist *keys,
+		      struct closure *parent)
 {
-	struct btree_op *op = container_of(cl, struct btree_op, cl);
-	struct cache_set *c = op->c;
 	struct journal_write *w;
-	size_t sectors, nkeys;
+	atomic_t *ret;
 
-	if (op->type != BTREE_INSERT ||
-	    !CACHE_SYNC(&c->sb))
-		goto out;
+	if (!CACHE_SYNC(&c->sb))
+		return NULL;
 
-	/*
-	 * If we're looping because we errored, might already be waiting on
-	 * another journal write:
-	 */
-	while (atomic_read(&cl->parent->remaining) & CLOSURE_WAITING)
-		closure_sync(cl->parent);
+	w = journal_wait_for_write(c, bch_keylist_nkeys(keys));
 
-	spin_lock(&c->journal.lock);
+	memcpy(end(w->data), keys->keys, bch_keylist_bytes(keys));
+	w->data->keys += bch_keylist_nkeys(keys);
 
-	if (journal_full(&c->journal)) {
-		trace_bcache_journal_full(c);
+	ret = &fifo_back(&c->journal.pin);
+	atomic_inc(ret);
 
-		closure_wait(&c->journal.wait, cl);
-
-		journal_reclaim(c);
-		spin_unlock(&c->journal.lock);
-
-		btree_flush_write(c);
-		continue_at(cl, bch_journal, bcache_wq);
-	}
-
-	w = c->journal.cur;
-	nkeys = w->data->keys + bch_keylist_nkeys(&op->keys);
-	sectors = __set_blocks(w->data, nkeys, c) * c->sb.block_size;
-
-	if (sectors > min_t(size_t,
-			    c->journal.blocks_free * c->sb.block_size,
-			    PAGE_SECTORS << JSET_BITS)) {
-		trace_bcache_journal_entry_full(c);
-
-		/*
-		 * XXX: If we were inserting so many keys that they won't fit in
-		 * an _empty_ journal write, we'll deadlock. For now, handle
-		 * this in bch_keylist_realloc() - but something to think about.
-		 */
-		BUG_ON(!w->data->keys);
-
-		BUG_ON(!closure_wait(&w->wait, cl));
-
-		journal_try_write(c);
-		continue_at(cl, bch_journal, bcache_wq);
-	}
-
-	memcpy(end(w->data), op->keys.keys, bch_keylist_bytes(&op->keys));
-	w->data->keys += bch_keylist_nkeys(&op->keys);
-
-	op->journal = &fifo_back(&c->journal.pin);
-	atomic_inc(op->journal);
-
-	if (op->flush_journal) {
-		closure_wait(&w->wait, cl->parent);
+	if (parent) {
+		closure_wait(&w->wait, parent);
 		journal_try_write(c);
 	} else if (!w->need_write) {
 		schedule_delayed_work(&c->journal.work,
@@ -778,8 +752,21 @@
 	} else {
 		spin_unlock(&c->journal.lock);
 	}
-out:
-	bch_btree_insert_async(cl);
+
+
+}
+
+void bch_journal_meta(struct cache_set *c, struct closure *cl)
+{
+	struct keylist keys;
+	atomic_t *ref;
+
+	bch_keylist_init(&keys);
+
+	ref = bch_journal(c, &keys, cl);
+	if (ref)
+		atomic_dec_bug(ref);
 }
 
 void bch_journal_free(struct cache_set *c)
diff --git a/drivers/md/bcache/journal.h b/drivers/md/bcache/journal.h
index 3ca93d3..7045e6f 100644
--- a/drivers/md/bcache/journal.h
+++ b/drivers/md/bcache/journal.h
@@ -200,8 +200,9 @@
 struct closure;
 struct cache_set;
 struct btree_op;
+struct keylist;
 
-void bch_journal(struct closure *);
+atomic_t *bch_journal(struct cache_set *, struct keylist *, struct closure *);
 void bch_journal_next(struct journal *);
 void bch_journal_mark(struct cache_set *, struct list_head *);
 void bch_journal_meta(struct cache_set *, struct closure *);
diff --git a/drivers/md/bcache/movinggc.c b/drivers/md/bcache/movinggc.c
index dd8a035..2c42377 100644
--- a/drivers/md/bcache/movinggc.c
+++ b/drivers/md/bcache/movinggc.c
@@ -110,7 +110,7 @@
 		bkey_copy(&s->op.replace, &io->w->key);
 
 		closure_init(&s->op.cl, cl);
-		bch_insert_data(&s->op.cl);
+		bch_data_insert(&s->op.cl);
 	}
 
 	continue_at(cl, write_moving_finish, bch_gc_wq);
diff --git a/drivers/md/bcache/request.c b/drivers/md/bcache/request.c
index 3b85f33..1c3af44 100644
--- a/drivers/md/bcache/request.c
+++ b/drivers/md/bcache/request.c
@@ -25,6 +25,8 @@
 
 struct kmem_cache *bch_search_cache;
 
+static void bch_data_insert_start(struct closure *);
+
 /* Cgroup interface */
 
 #ifdef CONFIG_CGROUP_BCACHE
@@ -211,31 +213,42 @@
 
 /* Insert data into cache */
 
-static void bio_invalidate(struct closure *cl)
+static void bch_data_insert_keys(struct closure *cl)
 {
 	struct btree_op *op = container_of(cl, struct btree_op, cl);
-	struct bio *bio = op->cache_bio;
+	struct search *s = container_of(op, struct search, op);
 
-	pr_debug("invalidating %i sectors from %llu",
-		 bio_sectors(bio), (uint64_t) bio->bi_sector);
+	/*
+	 * If we're looping, we might already be waiting on another journal
+	 * write - we can't wait on more than one journal write at a time
+	 *
+	 * XXX: this looks wrong
+	 */
+#if 0
+	while (atomic_read(&s->cl.remaining) & CLOSURE_WAITING)
+		closure_sync(&s->cl);
+#endif
 
-	while (bio_sectors(bio)) {
-		unsigned len = min(bio_sectors(bio), 1U << 14);
+	if (s->write)
+		op->journal = bch_journal(op->c, &op->keys,
+					  op->flush_journal
+					  ? &s->cl : NULL);
 
-		if (bch_keylist_realloc(&op->keys, 0, op->c))
-			goto out;
-
-		bio->bi_sector	+= len;
-		bio->bi_size	-= len << 9;
-
-		bch_keylist_add(&op->keys,
-				&KEY(op->inode, bio->bi_sector, len));
+	if (bch_btree_insert(op, op->c, &op->keys)) {
+		s->error		= -ENOMEM;
+		op->insert_data_done	= true;
 	}
 
-	op->insert_data_done = true;
-	bio_put(bio);
-out:
-	continue_at(cl, bch_journal, bcache_wq);
+	if (op->journal)
+		atomic_dec_bug(op->journal);
+	op->journal = NULL;
+
+	if (!op->insert_data_done)
+		continue_at(cl, bch_data_insert_start, bcache_wq);
+
+	bch_keylist_free(&op->keys);
+	closure_return(cl);
 }
 
 struct open_bucket {
@@ -423,7 +436,34 @@
 	return true;
 }
 
-static void bch_insert_data_error(struct closure *cl)
+static void bch_data_invalidate(struct closure *cl)
+{
+	struct btree_op *op = container_of(cl, struct btree_op, cl);
+	struct bio *bio = op->cache_bio;
+
+	pr_debug("invalidating %i sectors from %llu",
+		 bio_sectors(bio), (uint64_t) bio->bi_sector);
+
+	while (bio_sectors(bio)) {
+		unsigned len = min(bio_sectors(bio), 1U << 14);
+
+		if (bch_keylist_realloc(&op->keys, 0, op->c))
+			goto out;
+
+		bio->bi_sector	+= len;
+		bio->bi_size	-= len << 9;
+
+		bch_keylist_add(&op->keys, &KEY(op->inode,
+						bio->bi_sector, len));
+	}
+
+	op->insert_data_done = true;
+	bio_put(bio);
+out:
+	continue_at(cl, bch_data_insert_keys, bcache_wq);
+}
+
+static void bch_data_insert_error(struct closure *cl)
 {
 	struct btree_op *op = container_of(cl, struct btree_op, cl);
 
@@ -450,10 +490,10 @@
 
 	op->keys.top = dst;
 
-	bch_journal(cl);
+	bch_data_insert_keys(cl);
 }
 
-static void bch_insert_data_endio(struct bio *bio, int error)
+static void bch_data_insert_endio(struct bio *bio, int error)
 {
 	struct closure *cl = bio->bi_private;
 	struct btree_op *op = container_of(cl, struct btree_op, cl);
@@ -464,7 +504,7 @@
 		if (s->writeback)
 			s->error = error;
 		else if (s->write)
-			set_closure_fn(cl, bch_insert_data_error, bcache_wq);
+			set_closure_fn(cl, bch_data_insert_error, bcache_wq);
 		else
 			set_closure_fn(cl, NULL, NULL);
 	}
@@ -472,14 +512,14 @@
 	bch_bbio_endio(op->c, bio, error, "writing data to cache");
 }
 
-static void bch_insert_data_loop(struct closure *cl)
+static void bch_data_insert_start(struct closure *cl)
 {
 	struct btree_op *op = container_of(cl, struct btree_op, cl);
 	struct search *s = container_of(op, struct search, op);
 	struct bio *bio = op->cache_bio, *n;
 
 	if (op->bypass)
-		return bio_invalidate(cl);
+		return bch_data_invalidate(cl);
 
 	if (atomic_sub_return(bio_sectors(bio), &op->c->sectors_to_gc) < 0) {
 		set_gc_sectors(op->c);
@@ -502,7 +542,7 @@
 		if (bch_keylist_realloc(&op->keys,
 					1 + (op->csum ? 1 : 0),
 					op->c))
-			continue_at(cl, bch_journal, bcache_wq);
+			continue_at(cl, bch_data_insert_keys, bcache_wq);
 
 		k = op->keys.top;
 		bkey_init(k);
@@ -514,7 +554,7 @@
 
 		n = bch_bio_split(bio, KEY_SIZE(k), GFP_NOIO, split);
 
-		n->bi_end_io	= bch_insert_data_endio;
+		n->bi_end_io	= bch_data_insert_endio;
 		n->bi_private	= cl;
 
 		if (s->writeback) {
@@ -537,7 +577,7 @@
 	} while (n != bio);
 
 	op->insert_data_done = true;
-	continue_at(cl, bch_journal, bcache_wq);
+	continue_at(cl, bch_data_insert_keys, bcache_wq);
 err:
 	/* bch_alloc_sectors() blocks if s->writeback = true */
 	BUG_ON(s->writeback);
@@ -556,7 +596,7 @@
 		 * rest of the write.
 		 */
 		op->bypass = true;
-		return bio_invalidate(cl);
+		return bch_data_invalidate(cl);
 	} else {
 		/*
 		 * From a cache miss, we can just insert the keys for the data
@@ -566,14 +606,14 @@
 		bio_put(bio);
 
 		if (!bch_keylist_empty(&op->keys))
-			continue_at(cl, bch_journal, bcache_wq);
+			continue_at(cl, bch_data_insert_keys, bcache_wq);
 		else
 			closure_return(cl);
 	}
 }
 
 /**
- * bch_insert_data - stick some data in the cache
+ * bch_data_insert - stick some data in the cache
  *
  * This is the starting point for any data to end up in a cache device; it could
  * be from a normal write, or a writeback write, or a write to a flash only
@@ -591,30 +631,13 @@
  * If op->bypass is true, instead of inserting the data it invalidates the
  * region of the cache represented by op->cache_bio and op->inode.
  */
-void bch_insert_data(struct closure *cl)
+void bch_data_insert(struct closure *cl)
 {
 	struct btree_op *op = container_of(cl, struct btree_op, cl);
 
 	bch_keylist_init(&op->keys);
 	bio_get(op->cache_bio);
-	bch_insert_data_loop(cl);
-}
-
-void bch_btree_insert_async(struct closure *cl)
-{
-	struct btree_op *op = container_of(cl, struct btree_op, cl);
-	struct search *s = container_of(op, struct search, op);
-
-	if (bch_btree_insert(op, op->c, &op->keys)) {
-		s->error		= -ENOMEM;
-		op->insert_data_done	= true;
-	}
-
-	if (op->insert_data_done) {
-		bch_keylist_free(&op->keys);
-		closure_return(cl);
-	} else
-		continue_at(cl, bch_insert_data_loop, bcache_wq);
+	bch_data_insert_start(cl);
 }
 
 /* Common code for the make_request functions */
@@ -969,7 +992,7 @@
 	if (s->op.cache_bio &&
 	    !test_bit(CACHE_SET_STOPPING, &s->op.c->flags)) {
 		s->op.type = BTREE_REPLACE;
-		closure_call(&s->op.cl, bch_insert_data, NULL, cl);
+		closure_call(&s->op.cl, bch_data_insert, NULL, cl);
 	}
 
 	continue_at(cl, cached_dev_cache_miss_done, NULL);
@@ -1147,13 +1170,13 @@
 		closure_bio_submit(bio, cl, s->d);
 	}
 
-	closure_call(&s->op.cl, bch_insert_data, NULL, cl);
+	closure_call(&s->op.cl, bch_data_insert, NULL, cl);
 	continue_at(cl, cached_dev_write_complete, NULL);
 }
 
-static void cached_dev_nodata(struct cached_dev *dc, struct search *s)
+static void cached_dev_nodata(struct closure *cl)
 {
-	struct closure *cl = &s->cl;
+	struct search *s = container_of(cl, struct search, cl);
 	struct bio *bio = &s->bio.bio;
 
 	if (s->op.flush_journal)
@@ -1186,9 +1209,15 @@
 		s = search_alloc(bio, d);
 		trace_bcache_request_start(s, bio);
 
-		if (!bio->bi_size)
-			cached_dev_nodata(dc, s);
-		else {
+		if (!bio->bi_size) {
+			/*
+			 * can't call bch_journal_meta from under
+			 * generic_make_request
+			 */
+			continue_at_nobarrier(&s->cl,
+					      cached_dev_nodata,
+					      bcache_wq);
+		} else {
 			s->op.bypass = check_should_bypass(dc, s);
 
 			if (rw)
@@ -1275,6 +1304,16 @@
 	return 0;
 }
 
+static void flash_dev_nodata(struct closure *cl)
+{
+	struct search *s = container_of(cl, struct search, cl);
+
+	if (s->op.flush_journal)
+		bch_journal_meta(s->op.c, cl);
+
+	continue_at(cl, search_free, NULL);
+}
+
 static void flash_dev_make_request(struct request_queue *q, struct bio *bio)
 {
 	struct search *s;
@@ -1294,8 +1333,13 @@
 	trace_bcache_request_start(s, bio);
 
 	if (!bio->bi_size) {
-		if (s->op.flush_journal)
-			bch_journal_meta(s->op.c, cl);
+		/*
+		 * can't call bch_journal_meta from under
+		 * generic_make_request
+		 */
+		continue_at_nobarrier(&s->cl,
+				      flash_dev_nodata,
+				      bcache_wq);
 	} else if (rw) {
 		bch_keybuf_check_overlapping(&s->op.c->moving_gc_keys,
 					&KEY(d->id, bio->bi_sector, 0),
@@ -1305,7 +1349,7 @@
 		s->writeback	= true;
 		s->op.cache_bio	= bio;
 
-		closure_call(&s->op.cl, bch_insert_data, NULL, cl);
+		closure_call(&s->op.cl, bch_data_insert, NULL, cl);
 	} else {
 		closure_call(&s->op.cl, btree_read_async, NULL, cl);
 	}
diff --git a/drivers/md/bcache/request.h b/drivers/md/bcache/request.h
index 57dc478..1f1b59d 100644
--- a/drivers/md/bcache/request.h
+++ b/drivers/md/bcache/request.h
@@ -31,8 +31,7 @@
 
 void bch_cache_read_endio(struct bio *, int);
 unsigned bch_get_congested(struct cache_set *);
-void bch_insert_data(struct closure *cl);
-void bch_btree_insert_async(struct closure *);
+void bch_data_insert(struct closure *cl);
 void bch_cache_read_endio(struct bio *, int);
 
 void bch_open_buckets_free(struct cache_set *);