Btrfs: allow block group cache writeout outside critical section in commit

We loop through all of the dirty block groups during commit and write
the free space cache.  In order to make sure the cache is correct, we do
this while no other writers are allowed in the commit.

If a large number of block groups are dirty, this can introduce long
stalls during the final stages of the commit, which can block new procs
trying to change the filesystem.

This commit changes the block group cache writeout to take appropriate
locks and allow it to run earlier in the commit.  We'll still have to
redo some of the block groups, but it means we can get most of the work
out of the way without blocking the entire FS.

Signed-off-by: Chris Mason <clm@fb.com>
diff --git a/fs/btrfs/ctree.h b/fs/btrfs/ctree.h
index 1df0d9d..83051fa 100644
--- a/fs/btrfs/ctree.h
+++ b/fs/btrfs/ctree.h
@@ -1491,6 +1491,12 @@
 	struct mutex chunk_mutex;
 	struct mutex volume_mutex;
 
+	/*
+	 * this is taken to make sure we don't set block groups ro after
+	 * the free space cache has been allocated on them
+	 */
+	struct mutex ro_block_group_mutex;
+
 	/* this is used during read/modify/write to make sure
 	 * no two ios are trying to mod the same stripe at the same
 	 * time
@@ -3407,6 +3413,8 @@
 			 u64 bytenr, u64 num_bytes, u64 parent,
 			 u64 root_objectid, u64 owner, u64 offset, int no_quota);
 
+int btrfs_start_dirty_block_groups(struct btrfs_trans_handle *trans,
+				   struct btrfs_root *root);
 int btrfs_write_dirty_block_groups(struct btrfs_trans_handle *trans,
 				    struct btrfs_root *root);
 int btrfs_setup_space_cache(struct btrfs_trans_handle *trans,
diff --git a/fs/btrfs/disk-io.c b/fs/btrfs/disk-io.c
index 568cc4e..b5e3d5f 100644
--- a/fs/btrfs/disk-io.c
+++ b/fs/btrfs/disk-io.c
@@ -2572,6 +2572,7 @@
 	mutex_init(&fs_info->transaction_kthread_mutex);
 	mutex_init(&fs_info->cleaner_mutex);
 	mutex_init(&fs_info->volume_mutex);
+	mutex_init(&fs_info->ro_block_group_mutex);
 	init_rwsem(&fs_info->commit_root_sem);
 	init_rwsem(&fs_info->cleanup_work_sem);
 	init_rwsem(&fs_info->subvol_sem);
diff --git a/fs/btrfs/extent-tree.c b/fs/btrfs/extent-tree.c
index 40c9513..02c2b29 100644
--- a/fs/btrfs/extent-tree.c
+++ b/fs/btrfs/extent-tree.c
@@ -3298,7 +3298,7 @@
 		if (ret)
 			goto out_put;
 
-		ret = btrfs_truncate_free_space_cache(root, trans, inode);
+		ret = btrfs_truncate_free_space_cache(root, trans, NULL, inode);
 		if (ret)
 			goto out_put;
 	}
@@ -3382,6 +3382,146 @@
 	return 0;
 }
 
+/*
+ * transaction commit does final block group cache writeback during a
+ * critical section where nothing is allowed to change the FS.  This is
+ * required in order for the cache to actually match the block group,
+ * but can introduce a lot of latency into the commit.
+ *
+ * So, btrfs_start_dirty_block_groups is here to kick off block group
+ * cache IO.  There's a chance we'll have to redo some of it if the
+ * block group changes again during the commit, but it greatly reduces
+ * the commit latency by getting rid of the easy block groups while
+ * we're still allowing others to join the commit.
+ *
+ * Returns 0 on success, -ENOMEM if the path can't be allocated, or the
+ * first error from write_one_cache_group or btrfs_run_delayed_refs.
+ */
+int btrfs_start_dirty_block_groups(struct btrfs_trans_handle *trans,
+				   struct btrfs_root *root)
+{
+	struct btrfs_block_group_cache *cache;
+	struct btrfs_transaction *cur_trans = trans->transaction;
+	int ret = 0;
+	int should_put;
+	struct btrfs_path *path = NULL;
+	LIST_HEAD(dirty);
+	struct list_head *io = &cur_trans->io_bgs;
+	int loops = 0;
+
+	spin_lock(&cur_trans->dirty_bgs_lock);
+	list_splice_init(&cur_trans->dirty_bgs, &dirty);
+	spin_unlock(&cur_trans->dirty_bgs_lock);
+
+again:
+	if (list_empty(&dirty)) {
+		btrfs_free_path(path);
+		return 0;
+	}
+
+	/*
+	 * make sure all the block groups on our dirty list actually
+	 * exist
+	 */
+	btrfs_create_pending_block_groups(trans, root);
+
+	if (!path) {
+		path = btrfs_alloc_path();
+		if (!path)
+			return -ENOMEM;
+	}
+
+	while (!list_empty(&dirty)) {
+		cache = list_first_entry(&dirty,
+					 struct btrfs_block_group_cache,
+					 dirty_list);
+
+		/*
+		 * cache_write_mutex is here only to save us from balance
+		 * deleting this block group while we are writing out the
+		 * cache
+		 */
+		mutex_lock(&trans->transaction->cache_write_mutex);
+
+		/*
+		 * this can happen if something re-dirties a block
+		 * group that is already under IO.  Just wait for it to
+		 * finish and then do it all again
+		 */
+		if (!list_empty(&cache->io_list)) {
+			list_del_init(&cache->io_list);
+			btrfs_wait_cache_io(root, trans, cache,
+					    &cache->io_ctl, path,
+					    cache->key.objectid);
+			btrfs_put_block_group(cache);
+		}
+
+		/*
+		 * btrfs_wait_cache_io uses the cache->dirty_list to decide
+		 * if it should update the cache_state.  Don't delete
+		 * until after we wait.
+		 *
+		 * Since we're not running in the commit critical section
+		 * we need the dirty_bgs_lock to protect from update_block_group
+		 */
+		spin_lock(&cur_trans->dirty_bgs_lock);
+		list_del_init(&cache->dirty_list);
+		spin_unlock(&cur_trans->dirty_bgs_lock);
+
+		should_put = 1;
+
+		cache_save_setup(cache, trans, path);
+
+		if (cache->disk_cache_state == BTRFS_DC_SETUP) {
+			cache->io_ctl.inode = NULL;
+			ret = btrfs_write_out_cache(root, trans, cache, path);
+			if (ret == 0 && cache->io_ctl.inode) {
+				should_put = 0;
+
+				/*
+				 * the cache_write_mutex is protecting
+				 * the io_list
+				 */
+				list_add_tail(&cache->io_list, io);
+			} else {
+				/*
+				 * if we failed to write the cache, the
+				 * generation will be bad and life goes on
+				 */
+				ret = 0;
+			}
+		}
+		if (!ret)
+			ret = write_one_cache_group(trans, root, path, cache);
+		mutex_unlock(&trans->transaction->cache_write_mutex);
+
+		/* if it's not on the io list, we need to put the block group */
+		if (should_put)
+			btrfs_put_block_group(cache);
+
+		if (ret)
+			break;
+	}
+
+	/*
+	 * go through delayed refs for all the stuff we've just kicked off
+	 * and then loop back (just once).  Skip this if the loop above
+	 * failed so its error isn't overwritten
+	 */
+	if (!ret)
+		ret = btrfs_run_delayed_refs(trans, root, 0);
+	if (!ret && loops == 0) {
+		loops++;
+		spin_lock(&cur_trans->dirty_bgs_lock);
+		list_splice_init(&cur_trans->dirty_bgs, &dirty);
+		spin_unlock(&cur_trans->dirty_bgs_lock);
+		goto again;
+	}
+
+	btrfs_free_path(path);
+	return ret;
+}
+
 int btrfs_write_dirty_block_groups(struct btrfs_trans_handle *trans,
 				   struct btrfs_root *root)
 {
@@ -3390,12 +3530,8 @@
 	int ret = 0;
 	int should_put;
 	struct btrfs_path *path;
-	LIST_HEAD(io);
+	struct list_head *io = &cur_trans->io_bgs;
 	int num_started = 0;
-	int num_waited = 0;
-
-	if (list_empty(&cur_trans->dirty_bgs))
-		return 0;
 
 	path = btrfs_alloc_path();
 	if (!path)
@@ -3423,14 +3559,16 @@
 					    &cache->io_ctl, path,
 					    cache->key.objectid);
 			btrfs_put_block_group(cache);
-			num_waited++;
 		}
 
+		/*
+		 * don't remove from the dirty list until after we've waited
+		 * on any pending IO
+		 */
 		list_del_init(&cache->dirty_list);
 		should_put = 1;
 
-		if (cache->disk_cache_state == BTRFS_DC_CLEAR)
-			cache_save_setup(cache, trans, path);
+		cache_save_setup(cache, trans, path);
 
 		if (!ret)
 			ret = btrfs_run_delayed_refs(trans, root, (unsigned long) -1);
@@ -3441,7 +3579,7 @@
 			if (ret == 0 && cache->io_ctl.inode) {
 				num_started++;
 				should_put = 0;
-				list_add_tail(&cache->io_list, &io);
+				list_add_tail(&cache->io_list, io);
 			} else {
 				/*
 				 * if we failed to write the cache, the
@@ -3458,11 +3596,10 @@
 			btrfs_put_block_group(cache);
 	}
 
-	while (!list_empty(&io)) {
-		cache = list_first_entry(&io, struct btrfs_block_group_cache,
+	while (!list_empty(io)) {
+		cache = list_first_entry(io, struct btrfs_block_group_cache,
 					 io_list);
 		list_del_init(&cache->io_list);
-		num_waited++;
 		btrfs_wait_cache_io(root, trans, cache,
 				    &cache->io_ctl, path, cache->key.objectid);
 		btrfs_put_block_group(cache);
@@ -5459,15 +5596,6 @@
 		if (!alloc && cache->cached == BTRFS_CACHE_NO)
 			cache_block_group(cache, 1);
 
-		spin_lock(&trans->transaction->dirty_bgs_lock);
-		if (list_empty(&cache->dirty_list)) {
-			list_add_tail(&cache->dirty_list,
-				      &trans->transaction->dirty_bgs);
-				trans->transaction->num_dirty_bgs++;
-			btrfs_get_block_group(cache);
-		}
-		spin_unlock(&trans->transaction->dirty_bgs_lock);
-
 		byte_in_group = bytenr - cache->key.objectid;
 		WARN_ON(byte_in_group > cache->key.offset);
 
@@ -5516,6 +5644,16 @@
 				spin_unlock(&info->unused_bgs_lock);
 			}
 		}
+
+		spin_lock(&trans->transaction->dirty_bgs_lock);
+		if (list_empty(&cache->dirty_list)) {
+			list_add_tail(&cache->dirty_list,
+				      &trans->transaction->dirty_bgs);
+				trans->transaction->num_dirty_bgs++;
+			btrfs_get_block_group(cache);
+		}
+		spin_unlock(&trans->transaction->dirty_bgs_lock);
+
 		btrfs_put_block_group(cache);
 		total -= num_bytes;
 		bytenr += num_bytes;
@@ -8602,10 +8740,30 @@
 
 	BUG_ON(cache->ro);
 
+again:
 	trans = btrfs_join_transaction(root);
 	if (IS_ERR(trans))
 		return PTR_ERR(trans);
 
+	/*
+	 * we're not allowed to set block groups readonly after the dirty
+	 * block groups cache has started writing.  If it already started,
+	 * back off and let this transaction commit
+	 */
+	mutex_lock(&root->fs_info->ro_block_group_mutex);
+	if (trans->transaction->dirty_bg_run) {
+		u64 transid = trans->transid;
+
+		mutex_unlock(&root->fs_info->ro_block_group_mutex);
+		btrfs_end_transaction(trans, root);
+
+		ret = btrfs_wait_for_commit(root, transid);
+		if (ret)
+			return ret;
+		goto again;
+	}
+
+
 	ret = set_block_group_ro(cache, 0);
 	if (!ret)
 		goto out;
@@ -8620,6 +8778,7 @@
 		alloc_flags = update_block_group_flags(root, cache->flags);
 		check_system_chunk(trans, root, alloc_flags);
 	}
+	mutex_unlock(&root->fs_info->ro_block_group_mutex);
 
 	btrfs_end_transaction(trans, root);
 	return ret;
@@ -9425,7 +9584,38 @@
 		goto out;
 	}
 
+	/*
+	 * get the inode first so any iput calls done for the io_list
+	 * aren't the final iput (no unlinks allowed now)
+	 */
 	inode = lookup_free_space_inode(tree_root, block_group, path);
+
+	mutex_lock(&trans->transaction->cache_write_mutex);
+	/*
+	 * make sure our free space cache IO is done before removing the
+	 * free space inode
+	 */
+	spin_lock(&trans->transaction->dirty_bgs_lock);
+	if (!list_empty(&block_group->io_list)) {
+		list_del_init(&block_group->io_list);
+
+		WARN_ON(!IS_ERR(inode) && inode != block_group->io_ctl.inode);
+
+		spin_unlock(&trans->transaction->dirty_bgs_lock);
+		btrfs_wait_cache_io(root, trans, block_group,
+				    &block_group->io_ctl, path,
+				    block_group->key.objectid);
+		btrfs_put_block_group(block_group);
+		spin_lock(&trans->transaction->dirty_bgs_lock);
+	}
+
+	if (!list_empty(&block_group->dirty_list)) {
+		list_del_init(&block_group->dirty_list);
+		btrfs_put_block_group(block_group);
+	}
+	spin_unlock(&trans->transaction->dirty_bgs_lock);
+	mutex_unlock(&trans->transaction->cache_write_mutex);
+
 	if (!IS_ERR(inode)) {
 		ret = btrfs_orphan_add(trans, inode);
 		if (ret) {
@@ -9518,11 +9708,12 @@
 
 	spin_lock(&trans->transaction->dirty_bgs_lock);
 	if (!list_empty(&block_group->dirty_list)) {
-		list_del_init(&block_group->dirty_list);
-		btrfs_put_block_group(block_group);
+		WARN_ON(1);
+	}
+	if (!list_empty(&block_group->io_list)) {
+		WARN_ON(1);
 	}
 	spin_unlock(&trans->transaction->dirty_bgs_lock);
-
 	btrfs_remove_free_space_cache(block_group);
 
 	spin_lock(&block_group->space_info->lock);
diff --git a/fs/btrfs/free-space-cache.c b/fs/btrfs/free-space-cache.c
index 83532a2..253cb74 100644
--- a/fs/btrfs/free-space-cache.c
+++ b/fs/btrfs/free-space-cache.c
@@ -226,9 +226,37 @@
 
 int btrfs_truncate_free_space_cache(struct btrfs_root *root,
 				    struct btrfs_trans_handle *trans,
+				    struct btrfs_block_group_cache *block_group,
 				    struct inode *inode)
 {
 	int ret = 0;
+	struct btrfs_path *path = btrfs_alloc_path();
+
+	if (!path) {
+		ret = -ENOMEM;
+		goto fail;
+	}
+
+	if (block_group) {
+		mutex_lock(&trans->transaction->cache_write_mutex);
+		if (!list_empty(&block_group->io_list)) {
+			list_del_init(&block_group->io_list);
+
+			btrfs_wait_cache_io(root, trans, block_group,
+					    &block_group->io_ctl, path,
+					    block_group->key.objectid);
+			btrfs_put_block_group(block_group);
+		}
+
+		/*
+		 * now that we've truncated the cache away, it's no longer
+		 * setup or written
+		 */
+		spin_lock(&block_group->lock);
+		block_group->disk_cache_state = BTRFS_DC_CLEAR;
+		spin_unlock(&block_group->lock);
+	}
+	btrfs_free_path(path);
 
 	btrfs_i_size_write(inode, 0);
 	truncate_pagecache(inode, 0);
@@ -242,11 +270,17 @@
 	ret = btrfs_truncate_inode_items(trans, root, inode,
 					 0, BTRFS_EXTENT_DATA_KEY);
 	if (ret) {
+		/* cache_write_mutex is only held if block_group was passed */
+		if (block_group)
+			mutex_unlock(&trans->transaction->cache_write_mutex);
 		btrfs_abort_transaction(trans, root, ret);
 		return ret;
 	}
 
 	ret = btrfs_update_inode(trans, root, inode);
+	if (block_group)
+		mutex_unlock(&trans->transaction->cache_write_mutex);
+fail:
 	if (ret)
 		btrfs_abort_transaction(trans, root, ret);
 
@@ -876,6 +910,7 @@
 {
 	int ret;
 	struct btrfs_free_cluster *cluster = NULL;
+	struct btrfs_free_cluster *cluster_locked = NULL;
 	struct rb_node *node = rb_first(&ctl->free_space_offset);
 	struct btrfs_trim_range *trim_entry;
 
@@ -887,6 +922,8 @@
 	}
 
 	if (!node && cluster) {
+		cluster_locked = cluster;
+		spin_lock(&cluster_locked->lock);
 		node = rb_first(&cluster->root);
 		cluster = NULL;
 	}
@@ -910,9 +947,15 @@
 		node = rb_next(node);
 		if (!node && cluster) {
 			node = rb_first(&cluster->root);
+			cluster_locked = cluster;
+			spin_lock(&cluster_locked->lock);
 			cluster = NULL;
 		}
 	}
+	if (cluster_locked) {
+		spin_unlock(&cluster_locked->lock);
+		cluster_locked = NULL;
+	}
 
 	/*
 	 * Make sure we don't miss any range that was removed from our rbtree
@@ -930,6 +973,8 @@
 
 	return 0;
 fail:
+	if (cluster_locked)
+		spin_unlock(&cluster_locked->lock);
 	return -ENOSPC;
 }
 
@@ -1101,6 +1146,9 @@
 	int ret;
 	struct inode *inode = io_ctl->inode;
 
+	if (!inode)
+		return 0;
+
 	root = root->fs_info->tree_root;
 
 	/* Flush the dirty pages in the cache file. */
@@ -1127,11 +1175,16 @@
 	btrfs_update_inode(trans, root, inode);
 
 	if (block_group) {
+		/* the dirty list is protected by the dirty_bgs_lock */
+		spin_lock(&trans->transaction->dirty_bgs_lock);
+
+		/* the disk_cache_state is protected by the block group lock */
 		spin_lock(&block_group->lock);
 
 		/*
 		 * only mark this as written if we didn't get put back on
-		 * the dirty list while waiting for IO.
+		 * the dirty list while waiting for IO.   Otherwise our
+		 * cache state won't be right, and we won't get written again
 		 */
 		if (!ret && list_empty(&block_group->dirty_list))
 			block_group->disk_cache_state = BTRFS_DC_WRITTEN;
@@ -1139,6 +1192,7 @@
 			block_group->disk_cache_state = BTRFS_DC_ERROR;
 
 		spin_unlock(&block_group->lock);
+		spin_unlock(&trans->transaction->dirty_bgs_lock);
 		io_ctl->inode = NULL;
 		iput(inode);
 	}
@@ -1207,9 +1261,11 @@
 
 	mutex_lock(&ctl->cache_writeout_mutex);
 	/* Write out the extent entries in the free space cache */
+	spin_lock(&ctl->tree_lock);
 	ret = write_cache_extent_entries(io_ctl, ctl,
 					 block_group, &entries, &bitmaps,
 					 &bitmap_list);
+	spin_unlock(&ctl->tree_lock);
 	if (ret) {
 		mutex_unlock(&ctl->cache_writeout_mutex);
 		goto out_nospc;
@@ -1219,6 +1275,9 @@
 	 * Some spaces that are freed in the current transaction are pinned,
 	 * they will be added into free space cache after the transaction is
 	 * committed, we shouldn't lose them.
+	 *
+	 * If this changes while we are working we'll get added back to
+	 * the dirty list and redo it.  No locking needed
 	 */
 	ret = write_pinned_extent_entries(root, block_group, io_ctl, &entries);
 	if (ret) {
@@ -1231,7 +1290,9 @@
 	 * locked while doing it because a concurrent trim can be manipulating
 	 * or freeing the bitmap.
 	 */
+	spin_lock(&ctl->tree_lock);
 	ret = write_bitmap_entries(io_ctl, &bitmap_list);
+	spin_unlock(&ctl->tree_lock);
 	mutex_unlock(&ctl->cache_writeout_mutex);
 	if (ret)
 		goto out_nospc;
@@ -1307,12 +1368,6 @@
 		spin_unlock(&block_group->lock);
 		return 0;
 	}
-
-	if (block_group->delalloc_bytes) {
-		block_group->disk_cache_state = BTRFS_DC_WRITTEN;
-		spin_unlock(&block_group->lock);
-		return 0;
-	}
 	spin_unlock(&block_group->lock);
 
 	inode = lookup_free_space_inode(root, block_group, path);
diff --git a/fs/btrfs/free-space-cache.h b/fs/btrfs/free-space-cache.h
index c433986..a16a029 100644
--- a/fs/btrfs/free-space-cache.h
+++ b/fs/btrfs/free-space-cache.h
@@ -62,6 +62,7 @@
 				       struct btrfs_block_rsv *rsv);
 int btrfs_truncate_free_space_cache(struct btrfs_root *root,
 				    struct btrfs_trans_handle *trans,
+				    struct btrfs_block_group_cache *block_group,
 				    struct inode *inode);
 int load_free_space_cache(struct btrfs_fs_info *fs_info,
 			  struct btrfs_block_group_cache *block_group);
diff --git a/fs/btrfs/inode-map.c b/fs/btrfs/inode-map.c
index 74faea3a..f6a596d 100644
--- a/fs/btrfs/inode-map.c
+++ b/fs/btrfs/inode-map.c
@@ -456,7 +456,7 @@
 	}
 
 	if (i_size_read(inode) > 0) {
-		ret = btrfs_truncate_free_space_cache(root, trans, inode);
+		ret = btrfs_truncate_free_space_cache(root, trans, NULL, inode);
 		if (ret) {
 			if (ret != -ENOSPC)
 				btrfs_abort_transaction(trans, root, ret);
diff --git a/fs/btrfs/relocation.c b/fs/btrfs/relocation.c
index d830853..840a4eb 100644
--- a/fs/btrfs/relocation.c
+++ b/fs/btrfs/relocation.c
@@ -3430,7 +3430,9 @@
 }
 
 static int delete_block_group_cache(struct btrfs_fs_info *fs_info,
-				    struct inode *inode, u64 ino)
+				    struct btrfs_block_group_cache *block_group,
+				    struct inode *inode,
+				    u64 ino)
 {
 	struct btrfs_key key;
 	struct btrfs_root *root = fs_info->tree_root;
@@ -3463,7 +3465,7 @@
 		goto out;
 	}
 
-	ret = btrfs_truncate_free_space_cache(root, trans, inode);
+	ret = btrfs_truncate_free_space_cache(root, trans, block_group, inode);
 
 	btrfs_end_transaction(trans, root);
 	btrfs_btree_balance_dirty(root);
@@ -3509,6 +3511,7 @@
 	 */
 	if (ref_root == BTRFS_ROOT_TREE_OBJECTID) {
 		ret = delete_block_group_cache(rc->extent_root->fs_info,
+					       rc->block_group,
 					       NULL, ref_objectid);
 		if (ret != -ENOENT)
 			return ret;
@@ -4223,7 +4226,7 @@
 	btrfs_free_path(path);
 
 	if (!IS_ERR(inode))
-		ret = delete_block_group_cache(fs_info, inode, 0);
+		ret = delete_block_group_cache(fs_info, rc->block_group, inode, 0);
 	else
 		ret = PTR_ERR(inode);
 
diff --git a/fs/btrfs/transaction.c b/fs/btrfs/transaction.c
index 234d606..5628e25 100644
--- a/fs/btrfs/transaction.c
+++ b/fs/btrfs/transaction.c
@@ -222,6 +222,7 @@
 	atomic_set(&cur_trans->use_count, 2);
 	cur_trans->have_free_bgs = 0;
 	cur_trans->start_time = get_seconds();
+	cur_trans->dirty_bg_run = 0;
 
 	cur_trans->delayed_refs.href_root = RB_ROOT;
 	atomic_set(&cur_trans->delayed_refs.num_entries, 0);
@@ -251,6 +252,8 @@
 	INIT_LIST_HEAD(&cur_trans->switch_commits);
 	INIT_LIST_HEAD(&cur_trans->pending_ordered);
 	INIT_LIST_HEAD(&cur_trans->dirty_bgs);
+	INIT_LIST_HEAD(&cur_trans->io_bgs);
+	mutex_init(&cur_trans->cache_write_mutex);
 	cur_trans->num_dirty_bgs = 0;
 	spin_lock_init(&cur_trans->dirty_bgs_lock);
 	list_add_tail(&cur_trans->list, &fs_info->trans_list);
@@ -1059,6 +1062,7 @@
 {
 	struct btrfs_fs_info *fs_info = root->fs_info;
 	struct list_head *dirty_bgs = &trans->transaction->dirty_bgs;
+	struct list_head *io_bgs = &trans->transaction->io_bgs;
 	struct list_head *next;
 	struct extent_buffer *eb;
 	int ret;
@@ -1112,7 +1116,7 @@
 			return ret;
 	}
 
-	while (!list_empty(dirty_bgs)) {
+	while (!list_empty(dirty_bgs) || !list_empty(io_bgs)) {
 		ret = btrfs_write_dirty_block_groups(trans, root);
 		if (ret)
 			return ret;
@@ -1812,6 +1816,37 @@
 		return ret;
 	}
 
+	if (!cur_trans->dirty_bg_run) {
+		int run_it = 0;
+
+		/* this mutex is also taken before trying to set
+		 * block groups readonly.  We need to make sure
+		 * that nobody has set a block group readonly
+		 * after extents from that block group have been
+		 * allocated for cache files.  btrfs_set_block_group_ro
+		 * will wait for the transaction to commit if it
+		 * finds dirty_bg_run = 1
+		 *
+		 * The dirty_bg_run flag is also used to make sure only
+		 * one process starts all the block group IO.  It wouldn't
+		 * hurt to have more than one go through, but there's no
+		 * real advantage to it either.
+		 */
+		mutex_lock(&root->fs_info->ro_block_group_mutex);
+		if (!cur_trans->dirty_bg_run) {
+			run_it = 1;
+			cur_trans->dirty_bg_run = 1;
+		}
+		mutex_unlock(&root->fs_info->ro_block_group_mutex);
+
+		if (run_it)
+			ret = btrfs_start_dirty_block_groups(trans, root);
+	}
+	if (ret) {
+		btrfs_end_transaction(trans, root);
+		return ret;
+	}
+
 	spin_lock(&root->fs_info->trans_lock);
 	list_splice(&trans->ordered, &cur_trans->pending_ordered);
 	if (cur_trans->state >= TRANS_STATE_COMMIT_START) {
@@ -2005,6 +2040,7 @@
 
 	assert_qgroups_uptodate(trans);
 	ASSERT(list_empty(&cur_trans->dirty_bgs));
+	ASSERT(list_empty(&cur_trans->io_bgs));
 	update_super_roots(root);
 
 	btrfs_set_super_log_root(root->fs_info->super_copy, 0);
diff --git a/fs/btrfs/transaction.h b/fs/btrfs/transaction.h
index 4cb0ae2..0b24755 100644
--- a/fs/btrfs/transaction.h
+++ b/fs/btrfs/transaction.h
@@ -64,10 +64,19 @@
 	struct list_head pending_ordered;
 	struct list_head switch_commits;
 	struct list_head dirty_bgs;
+	struct list_head io_bgs;
 	u64 num_dirty_bgs;
+
+	/*
+	 * we need to make sure block group deletion doesn't race with
+	 * free space cache writeout.  This mutex keeps them from stomping
+	 * on each other
+	 */
+	struct mutex cache_write_mutex;
 	spinlock_t dirty_bgs_lock;
 	struct btrfs_delayed_ref_root delayed_refs;
 	int aborted;
+	int dirty_bg_run;
 };
 
 #define __TRANS_FREEZABLE	(1U << 0)