btrfs: incremental send, improve rmdir performance for large directory

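Currently, when checking whether a directory can be deleted, we always
check if all its children have been processed, restarting the scan from
the first directory index on every call. For a directory with many
entries this means children already found to be processed are checked
again and again.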

Example: run time when a directory with 2,000,000 files was deleted:

original: 1994m57.071s
patch:       1m38.554s

[FIX]
Instead of checking all children on all calls to can_rmdir(), we keep
track of the directory index offset of the child last checked in the
last call to can_rmdir(), and then use it as the starting point for
future calls to can_rmdir().

Signed-off-by: Robbie Ko <robbieko@synology.com>
Reviewed-by: Filipe Manana <fdmanana@suse.com>
[ update changelog ]
Signed-off-by: David Sterba <dsterba@suse.com>
diff --git a/fs/btrfs/send.c b/fs/btrfs/send.c
index 29cfc0d..c47f62b 100644
--- a/fs/btrfs/send.c
+++ b/fs/btrfs/send.c
@@ -235,6 +235,7 @@
 	struct rb_node node;
 	u64 ino;
 	u64 gen;
+	u64 last_dir_index_offset;
 };
 
 struct name_cache_entry {
@@ -2861,6 +2862,7 @@
 		return ERR_PTR(-ENOMEM);
 	odi->ino = dir_ino;
 	odi->gen = 0;
+	odi->last_dir_index_offset = 0;
 
 	rb_link_node(&odi->node, parent, p);
 	rb_insert_color(&odi->node, &sctx->orphan_dirs);
@@ -2916,6 +2918,7 @@
 	struct btrfs_key found_key;
 	struct btrfs_key loc;
 	struct btrfs_dir_item *di;
+	struct orphan_dir_info *odi = NULL;
 
 	/*
 	 * Don't try to rmdir the top/root subvolume dir.
@@ -2930,6 +2933,11 @@
 	key.objectid = dir;
 	key.type = BTRFS_DIR_INDEX_KEY;
 	key.offset = 0;
+
+	odi = get_orphan_dir_info(sctx, dir);
+	if (odi)
+		key.offset = odi->last_dir_index_offset;
+
 	ret = btrfs_search_slot(NULL, root, &key, path, 0, 0);
 	if (ret < 0)
 		goto out;
@@ -2957,30 +2965,33 @@
 
 		dm = get_waiting_dir_move(sctx, loc.objectid);
 		if (dm) {
-			struct orphan_dir_info *odi;
-
 			odi = add_orphan_dir_info(sctx, dir);
 			if (IS_ERR(odi)) {
 				ret = PTR_ERR(odi);
 				goto out;
 			}
 			odi->gen = dir_gen;
+			odi->last_dir_index_offset = found_key.offset;
 			dm->rmdir_ino = dir;
 			ret = 0;
 			goto out;
 		}
 
 		if (loc.objectid > send_progress) {
-			struct orphan_dir_info *odi;
-
-			odi = get_orphan_dir_info(sctx, dir);
-			free_orphan_dir_info(sctx, odi);
+			odi = add_orphan_dir_info(sctx, dir);
+			if (IS_ERR(odi)) {
+				ret = PTR_ERR(odi);
+				goto out;
+			}
+			odi->gen = dir_gen;
+			odi->last_dir_index_offset = found_key.offset;
 			ret = 0;
 			goto out;
 		}
 
 		path->slots[0]++;
 	}
+	free_orphan_dir_info(sctx, odi);
 
 	ret = 1;
 
@@ -3258,13 +3269,16 @@
 
 	if (rmdir_ino) {
 		struct orphan_dir_info *odi;
+		u64 gen;
 
 		odi = get_orphan_dir_info(sctx, rmdir_ino);
 		if (!odi) {
 			/* already deleted */
 			goto finish;
 		}
-		ret = can_rmdir(sctx, rmdir_ino, odi->gen, sctx->cur_ino);
+		gen = odi->gen;
+
+		ret = can_rmdir(sctx, rmdir_ino, gen, sctx->cur_ino);
 		if (ret < 0)
 			goto out;
 		if (!ret)
@@ -3275,13 +3289,12 @@
 			ret = -ENOMEM;
 			goto out;
 		}
-		ret = get_cur_path(sctx, rmdir_ino, odi->gen, name);
+		ret = get_cur_path(sctx, rmdir_ino, gen, name);
 		if (ret < 0)
 			goto out;
 		ret = send_rmdir(sctx, name);
 		if (ret < 0)
 			goto out;
-		free_orphan_dir_info(sctx, odi);
 	}
 
 finish: