fuse: separate pqueue for clones

Make each fuse device clone refer to a separate processing queue.  The only
constraint on userspace code is that the answer to a request must be
written to the same device clone that the request was read from.

Signed-off-by: Miklos Szeredi <mszeredi@suse.cz>
diff --git a/fs/fuse/dev.c b/fs/fuse/dev.c
index 99e94584..80cc1b3 100644
--- a/fs/fuse/dev.c
+++ b/fs/fuse/dev.c
@@ -1232,12 +1232,13 @@
  * request_end().  Otherwise add it to the processing list, and set
  * the 'sent' flag.
  */
-static ssize_t fuse_dev_do_read(struct fuse_conn *fc, struct file *file,
+static ssize_t fuse_dev_do_read(struct fuse_dev *fud, struct file *file,
 				struct fuse_copy_state *cs, size_t nbytes)
 {
 	ssize_t err;
+	struct fuse_conn *fc = fud->fc;
 	struct fuse_iqueue *fiq = &fc->iq;
-	struct fuse_pqueue *fpq = &fc->pq;
+	struct fuse_pqueue *fpq = &fud->pq;
 	struct fuse_req *req;
 	struct fuse_in *in;
 	unsigned reqsize;
@@ -1358,7 +1359,7 @@
 
 	fuse_copy_init(&cs, 1, to);
 
-	return fuse_dev_do_read(fud->fc, file, &cs, iov_iter_count(to));
+	return fuse_dev_do_read(fud, file, &cs, iov_iter_count(to));
 }
 
 static ssize_t fuse_dev_splice_read(struct file *in, loff_t *ppos,
@@ -1382,7 +1383,7 @@
 	fuse_copy_init(&cs, 1, NULL);
 	cs.pipebufs = bufs;
 	cs.pipe = pipe;
-	ret = fuse_dev_do_read(fud->fc, in, &cs, len);
+	ret = fuse_dev_do_read(fud, in, &cs, len);
 	if (ret < 0)
 		goto out;
 
@@ -1862,11 +1863,12 @@
  * it from the list and copy the rest of the buffer to the request.
  * The request is finished by calling request_end()
  */
-static ssize_t fuse_dev_do_write(struct fuse_conn *fc,
+static ssize_t fuse_dev_do_write(struct fuse_dev *fud,
 				 struct fuse_copy_state *cs, size_t nbytes)
 {
 	int err;
-	struct fuse_pqueue *fpq = &fc->pq;
+	struct fuse_conn *fc = fud->fc;
+	struct fuse_pqueue *fpq = &fud->pq;
 	struct fuse_req *req;
 	struct fuse_out_header oh;
 
@@ -1966,7 +1968,7 @@
 
 	fuse_copy_init(&cs, 0, from);
 
-	return fuse_dev_do_write(fud->fc, &cs, iov_iter_count(from));
+	return fuse_dev_do_write(fud, &cs, iov_iter_count(from));
 }
 
 static ssize_t fuse_dev_splice_write(struct pipe_inode_info *pipe,
@@ -2037,7 +2039,7 @@
 	if (flags & SPLICE_F_MOVE)
 		cs.move_pages = 1;
 
-	ret = fuse_dev_do_write(fud->fc, &cs, len);
+	ret = fuse_dev_do_write(fud, &cs, len);
 
 	for (idx = 0; idx < nbuf; idx++) {
 		struct pipe_buffer *buf = &bufs[idx];
@@ -2124,10 +2126,10 @@
 void fuse_abort_conn(struct fuse_conn *fc)
 {
 	struct fuse_iqueue *fiq = &fc->iq;
-	struct fuse_pqueue *fpq = &fc->pq;
 
 	spin_lock(&fc->lock);
 	if (fc->connected) {
+		struct fuse_dev *fud;
 		struct fuse_req *req, *next;
 		LIST_HEAD(to_end1);
 		LIST_HEAD(to_end2);
@@ -2135,20 +2137,24 @@
 		fc->connected = 0;
 		fc->blocked = 0;
 		fuse_set_initialized(fc);
-		spin_lock(&fpq->lock);
-		fpq->connected = 0;
-		list_for_each_entry_safe(req, next, &fpq->io, list) {
-			req->out.h.error = -ECONNABORTED;
-			spin_lock(&req->waitq.lock);
-			set_bit(FR_ABORTED, &req->flags);
-			if (!test_bit(FR_LOCKED, &req->flags)) {
-				set_bit(FR_PRIVATE, &req->flags);
-				list_move(&req->list, &to_end1);
+		list_for_each_entry(fud, &fc->devices, entry) {
+			struct fuse_pqueue *fpq = &fud->pq;
+
+			spin_lock(&fpq->lock);
+			fpq->connected = 0;
+			list_for_each_entry_safe(req, next, &fpq->io, list) {
+				req->out.h.error = -ECONNABORTED;
+				spin_lock(&req->waitq.lock);
+				set_bit(FR_ABORTED, &req->flags);
+				if (!test_bit(FR_LOCKED, &req->flags)) {
+					set_bit(FR_PRIVATE, &req->flags);
+					list_move(&req->list, &to_end1);
+				}
+				spin_unlock(&req->waitq.lock);
 			}
-			spin_unlock(&req->waitq.lock);
+			list_splice_init(&fpq->processing, &to_end2);
+			spin_unlock(&fpq->lock);
 		}
-		list_splice_init(&fpq->processing, &to_end2);
-		spin_unlock(&fpq->lock);
 		fc->max_background = UINT_MAX;
 		flush_bg_queue(fc);
 
@@ -2183,13 +2189,17 @@
 
 	if (fud) {
 		struct fuse_conn *fc = fud->fc;
+		struct fuse_pqueue *fpq = &fud->pq;
 
-		WARN_ON(!list_empty(&fc->pq.io));
-		WARN_ON(fc->iq.fasync != NULL);
-		fuse_abort_conn(fc);
+		WARN_ON(!list_empty(&fpq->io));
+		end_requests(fc, &fpq->processing);
+		/* Are we the last open device? */
+		if (atomic_dec_and_test(&fc->dev_count)) {
+			WARN_ON(fc->iq.fasync != NULL);
+			fuse_abort_conn(fc);
+		}
 		fuse_dev_free(fud);
 	}
-
 	return 0;
 }
 EXPORT_SYMBOL_GPL(fuse_dev_release);
@@ -2217,6 +2227,7 @@
 		return -ENOMEM;
 
 	new->private_data = fud;
+	atomic_inc(&fc->dev_count);
 
 	return 0;
 }