fuse: allow batching of FORGET requests

Terje Malmedal reports that a fuse filesystem with 32 million inodes
on a machine with lots of memory can take up to 30 minutes to process
FORGET requests when all those inodes are evicted from the icache.

To solve this, create a BATCH_FORGET request that allows up to about
8000 FORGET requests to be sent in a single message.

This request is only sent if userspace supports interface version 7.16
or later; otherwise the kernel falls back to sending individual FORGET
messages.

Reported-by: Terje Malmedal <terje.malmedal@usit.uio.no>
Signed-off-by: Miklos Szeredi <mszeredi@suse.cz>
diff --git a/fs/fuse/dev.c b/fs/fuse/dev.c
index fed6530..cf8d28d 100644
--- a/fs/fuse/dev.c
+++ b/fs/fuse/dev.c
@@ -254,8 +254,8 @@
 void fuse_queue_forget(struct fuse_conn *fc, struct fuse_forget_link *forget,
 		       u64 nodeid, u64 nlookup)
 {
-	forget->nodeid = nodeid;
-	forget->nlookup = nlookup;
+	forget->forget_one.nodeid = nodeid;
+	forget->forget_one.nlookup = nlookup;
 
 	spin_lock(&fc->lock);
 	fc->forget_list_tail->next = forget;
@@ -974,15 +974,26 @@
 	return err ? err : reqsize;
 }
 
-static struct fuse_forget_link *dequeue_forget(struct fuse_conn *fc)
+static struct fuse_forget_link *dequeue_forget(struct fuse_conn *fc,
+					       unsigned max,
+					       unsigned *countp)
 {
-	struct fuse_forget_link *forget = fc->forget_list_head.next;
+	struct fuse_forget_link *head = fc->forget_list_head.next;
+	struct fuse_forget_link **newhead = &head; /* where the list is cut */
+	unsigned count;		/* entries walked, i.e. detached */
 
-	fc->forget_list_head.next = forget->next;
+	for (count = 0; *newhead != NULL && count < max; count++)
+		newhead = &(*newhead)->next;
+	/* Cut after the walked prefix; the rest stays on fc's list. */
+	fc->forget_list_head.next = *newhead;
+	*newhead = NULL;
 	if (fc->forget_list_head.next == NULL)
 		fc->forget_list_tail = &fc->forget_list_head;
 
-	return forget;
+	if (countp != NULL)
+		*countp = count;
+	/* head is NULL when nothing was detached (empty list or max == 0) */
+	return head;
 }
 
 static int fuse_read_single_forget(struct fuse_conn *fc,
@@ -991,13 +1002,13 @@
 __releases(fc->lock)
 {
 	int err;
-	struct fuse_forget_link *forget = dequeue_forget(fc);
+	struct fuse_forget_link *forget = dequeue_forget(fc, 1, NULL);
 	struct fuse_forget_in arg = {
-		.nlookup = forget->nlookup,
+		.nlookup = forget->forget_one.nlookup,
 	};
 	struct fuse_in_header ih = {
 		.opcode = FUSE_FORGET,
-		.nodeid = forget->nodeid,
+		.nodeid = forget->forget_one.nodeid,
 		.unique = fuse_get_unique(fc),
 		.len = sizeof(ih) + sizeof(arg),
 	};
@@ -1018,6 +1029,65 @@
 	return ih.len;
 }
 
+static int fuse_read_batch_forget(struct fuse_conn *fc,
+				   struct fuse_copy_state *cs, size_t nbytes)
+__releases(fc->lock)
+{
+	int err;
+	unsigned max_forgets;
+	unsigned count;
+	struct fuse_forget_link *head;
+	struct fuse_batch_forget_in arg = { .count = 0 };
+	struct fuse_in_header ih = {
+		.opcode = FUSE_BATCH_FORGET,
+		.unique = fuse_get_unique(fc),
+		.len = sizeof(ih) + sizeof(arg),
+	};
+	/* The buffer must hold at least the header and the batch count. */
+	if (nbytes < ih.len) {
+		spin_unlock(&fc->lock);
+		return -EINVAL;
+	}
+	/* Take as many queued entries as fit in the remaining space. */
+	max_forgets = (nbytes - ih.len) / sizeof(struct fuse_forget_one);
+	head = dequeue_forget(fc, max_forgets, &count);
+	spin_unlock(&fc->lock);
+	/* Emit header and count first; ih.len covers the whole message. */
+	arg.count = count;
+	ih.len += count * sizeof(struct fuse_forget_one);
+	err = fuse_copy_one(cs, &ih, sizeof(ih));
+	if (!err)
+		err = fuse_copy_one(cs, &arg, sizeof(arg));
+	/* Copy entries until an error occurs; free every link regardless. */
+	while (head) {
+		struct fuse_forget_link *forget = head;
+
+		if (!err) {
+			err = fuse_copy_one(cs, &forget->forget_one,
+					    sizeof(forget->forget_one));
+		}
+		head = forget->next;
+		kfree(forget);
+	}
+
+	fuse_copy_finish(cs);
+
+	if (err)
+		return err;
+	/* On success return the message length, header included. */
+	return ih.len;
+}
+
+static int fuse_read_forget(struct fuse_conn *fc, struct fuse_copy_state *cs,
+			    size_t nbytes)
+__releases(fc->lock)
+{
+	if (fc->minor < 16 || fc->forget_list_head.next->next == NULL)
+		return fuse_read_single_forget(fc, cs, nbytes);
+	else	/* fc->minor >= 16 and a second entry is queued */
+		return fuse_read_batch_forget(fc, cs, nbytes);
+}
+
 /*
  * Read a single request into the userspace filesystem's buffer.  This
  * function waits until a request is available, then removes it from
@@ -1058,7 +1128,7 @@
 
 	if (forget_pending(fc)) {
 		if (list_empty(&fc->pending) || fc->forget_batch-- > 0)
-			return fuse_read_single_forget(fc, cs, nbytes);
+			return fuse_read_forget(fc, cs, nbytes);
 
 		if (fc->forget_batch <= -8)
 			fc->forget_batch = 16;
@@ -1837,7 +1907,7 @@
 	end_requests(fc, &fc->pending);
 	end_requests(fc, &fc->processing);
 	while (forget_pending(fc))
-		kfree(dequeue_forget(fc));
+		kfree(dequeue_forget(fc, 1, NULL));
 }
 
 /*