fuse: support splice() reading from fuse device

Allow a userspace filesystem implementation to use splice() to read
from the fuse device.

The userspace filesystem can now transfer data coming from a WRITE
request to an arbitrary file descriptor (regular file, block device or
socket) without having to go through a userspace buffer.

The semantics of using splice() to read messages are:

 1)  with a single splice() call, move the whole message from the fuse
     device to a temporary pipe
 2)  read the header from the pipe and determine the message type
 3a) if the message is a WRITE, splice the data from the pipe to the
     destination
 3b) otherwise, read the rest of the message into a userspace buffer

Signed-off-by: Miklos Szeredi <mszeredi@suse.cz>
diff --git a/fs/fuse/dev.c b/fs/fuse/dev.c
index b070d3a..4413f5e 100644
--- a/fs/fuse/dev.c
+++ b/fs/fuse/dev.c
@@ -515,13 +515,12 @@
 };
 
 static void fuse_copy_init(struct fuse_copy_state *cs, struct fuse_conn *fc,
-			   int write, struct fuse_req *req,
+			   int write,
 			   const struct iovec *iov, unsigned long nr_segs)
 {
 	memset(cs, 0, sizeof(*cs));
 	cs->fc = fc;
 	cs->write = write;
-	cs->req = req;
 	cs->iov = iov;
 	cs->nr_segs = nr_segs;
 }
@@ -532,8 +531,12 @@
 	if (cs->currbuf) {
 		struct pipe_buffer *buf = cs->currbuf;
 
-		buf->ops->unmap(cs->pipe, buf, cs->mapaddr);
-
+		if (!cs->write) {
+			buf->ops->unmap(cs->pipe, buf, cs->mapaddr);
+		} else {
+			kunmap_atomic(cs->mapaddr, KM_USER0);
+			buf->len = PAGE_SIZE - cs->len;
+		}
 		cs->currbuf = NULL;
 		cs->mapaddr = NULL;
 	} else if (cs->mapaddr) {
@@ -561,17 +564,39 @@
 	if (cs->pipebufs) {
 		struct pipe_buffer *buf = cs->pipebufs;
 
-		err = buf->ops->confirm(cs->pipe, buf);
-		if (err)
-			return err;
+		if (!cs->write) {
+			err = buf->ops->confirm(cs->pipe, buf);
+			if (err)
+				return err;
 
-		BUG_ON(!cs->nr_segs);
-		cs->currbuf = buf;
-		cs->mapaddr = buf->ops->map(cs->pipe, buf, 1);
-		cs->len = buf->len;
-		cs->buf = cs->mapaddr + buf->offset;
-		cs->pipebufs++;
-		cs->nr_segs--;
+			BUG_ON(!cs->nr_segs);
+			cs->currbuf = buf;
+			cs->mapaddr = buf->ops->map(cs->pipe, buf, 1);
+			cs->len = buf->len;
+			cs->buf = cs->mapaddr + buf->offset;
+			cs->pipebufs++;
+			cs->nr_segs--;
+		} else {
+			struct page *page;
+
+			if (cs->nr_segs == cs->pipe->buffers)
+				return -EIO;
+
+			page = alloc_page(GFP_HIGHUSER);
+			if (!page)
+				return -ENOMEM;
+
+			buf->page = page;
+			buf->offset = 0;
+			buf->len = 0;
+
+			cs->currbuf = buf;
+			cs->mapaddr = kmap_atomic(page, KM_USER0);
+			cs->buf = cs->mapaddr;
+			cs->len = PAGE_SIZE;
+			cs->pipebufs++;
+			cs->nr_segs++;
+		}
 	} else {
 		if (!cs->seglen) {
 			BUG_ON(!cs->nr_segs);
@@ -731,6 +756,30 @@
 	return 1;
 }
 
+static int fuse_ref_page(struct fuse_copy_state *cs, struct page *page,
+			 unsigned offset, unsigned count)
+{
+	struct pipe_buffer *buf;
+
+	if (cs->nr_segs == cs->pipe->buffers)
+		return -EIO;
+
+	unlock_request(cs->fc, cs->req);
+	fuse_copy_finish(cs);
+
+	buf = cs->pipebufs;
+	page_cache_get(page);
+	buf->page = page;
+	buf->offset = offset;
+	buf->len = count;
+
+	cs->pipebufs++;
+	cs->nr_segs++;
+	cs->len = 0;
+
+	return 0;
+}
+
 /*
  * Copy a page in the request to/from the userspace buffer.  Must be
  * done atomically
@@ -747,7 +796,9 @@
 		kunmap_atomic(mapaddr, KM_USER1);
 	}
 	while (count) {
-		if (!cs->len) {
+		if (cs->write && cs->pipebufs && page) {
+			return fuse_ref_page(cs, page, offset, count);
+		} else if (!cs->len) {
 			if (cs->move_pages && page &&
 			    offset == 0 && count == PAGE_SIZE) {
 				err = fuse_try_move_page(cs, pagep);
@@ -862,11 +913,10 @@
  *
  * Called with fc->lock held, releases it
  */
-static int fuse_read_interrupt(struct fuse_conn *fc, struct fuse_req *req,
-			       const struct iovec *iov, unsigned long nr_segs)
+static int fuse_read_interrupt(struct fuse_conn *fc, struct fuse_copy_state *cs,
+			       size_t nbytes, struct fuse_req *req)
 __releases(&fc->lock)
 {
-	struct fuse_copy_state cs;
 	struct fuse_in_header ih;
 	struct fuse_interrupt_in arg;
 	unsigned reqsize = sizeof(ih) + sizeof(arg);
@@ -882,14 +932,13 @@
 	arg.unique = req->in.h.unique;
 
 	spin_unlock(&fc->lock);
-	if (iov_length(iov, nr_segs) < reqsize)
+	if (nbytes < reqsize)
 		return -EINVAL;
 
-	fuse_copy_init(&cs, fc, 1, NULL, iov, nr_segs);
-	err = fuse_copy_one(&cs, &ih, sizeof(ih));
+	err = fuse_copy_one(cs, &ih, sizeof(ih));
 	if (!err)
-		err = fuse_copy_one(&cs, &arg, sizeof(arg));
-	fuse_copy_finish(&cs);
+		err = fuse_copy_one(cs, &arg, sizeof(arg));
+	fuse_copy_finish(cs);
 
 	return err ? err : reqsize;
 }
@@ -903,18 +952,13 @@
  * request_end().  Otherwise add it to the processing list, and set
  * the 'sent' flag.
  */
-static ssize_t fuse_dev_read(struct kiocb *iocb, const struct iovec *iov,
-			      unsigned long nr_segs, loff_t pos)
+static ssize_t fuse_dev_do_read(struct fuse_conn *fc, struct file *file,
+				struct fuse_copy_state *cs, size_t nbytes)
 {
 	int err;
 	struct fuse_req *req;
 	struct fuse_in *in;
-	struct fuse_copy_state cs;
 	unsigned reqsize;
-	struct file *file = iocb->ki_filp;
-	struct fuse_conn *fc = fuse_get_conn(file);
-	if (!fc)
-		return -EPERM;
 
  restart:
 	spin_lock(&fc->lock);
@@ -934,7 +978,7 @@
 	if (!list_empty(&fc->interrupts)) {
 		req = list_entry(fc->interrupts.next, struct fuse_req,
 				 intr_entry);
-		return fuse_read_interrupt(fc, req, iov, nr_segs);
+		return fuse_read_interrupt(fc, cs, nbytes, req);
 	}
 
 	req = list_entry(fc->pending.next, struct fuse_req, list);
@@ -944,7 +988,7 @@
 	in = &req->in;
 	reqsize = in->h.len;
 	/* If request is too large, reply with an error and restart the read */
-	if (iov_length(iov, nr_segs) < reqsize) {
+	if (nbytes < reqsize) {
 		req->out.h.error = -EIO;
 		/* SETXATTR is special, since it may contain too large data */
 		if (in->h.opcode == FUSE_SETXATTR)
@@ -953,12 +997,12 @@
 		goto restart;
 	}
 	spin_unlock(&fc->lock);
-	fuse_copy_init(&cs, fc, 1, req, iov, nr_segs);
-	err = fuse_copy_one(&cs, &in->h, sizeof(in->h));
+	cs->req = req;
+	err = fuse_copy_one(cs, &in->h, sizeof(in->h));
 	if (!err)
-		err = fuse_copy_args(&cs, in->numargs, in->argpages,
+		err = fuse_copy_args(cs, in->numargs, in->argpages,
 				     (struct fuse_arg *) in->args, 0);
-	fuse_copy_finish(&cs);
+	fuse_copy_finish(cs);
 	spin_lock(&fc->lock);
 	req->locked = 0;
 	if (req->aborted) {
@@ -986,6 +1030,110 @@
 	return err;
 }
 
+static ssize_t fuse_dev_read(struct kiocb *iocb, const struct iovec *iov,
+			      unsigned long nr_segs, loff_t pos)
+{
+	struct fuse_copy_state cs;
+	struct file *file = iocb->ki_filp;
+	struct fuse_conn *fc = fuse_get_conn(file);
+	if (!fc)
+		return -EPERM;
+
+	fuse_copy_init(&cs, fc, 1, iov, nr_segs);
+
+	return fuse_dev_do_read(fc, file, &cs, iov_length(iov, nr_segs));
+}
+
+static int fuse_dev_pipe_buf_steal(struct pipe_inode_info *pipe,
+				   struct pipe_buffer *buf)
+{
+	return 1;
+}
+
+static const struct pipe_buf_operations fuse_dev_pipe_buf_ops = {
+	.can_merge = 0,
+	.map = generic_pipe_buf_map,
+	.unmap = generic_pipe_buf_unmap,
+	.confirm = generic_pipe_buf_confirm,
+	.release = generic_pipe_buf_release,
+	.steal = fuse_dev_pipe_buf_steal,
+	.get = generic_pipe_buf_get,
+};
+
+static ssize_t fuse_dev_splice_read(struct file *in, loff_t *ppos,
+				    struct pipe_inode_info *pipe,
+				    size_t len, unsigned int flags)
+{
+	int ret;
+	int page_nr = 0;
+	int do_wakeup = 0;
+	struct pipe_buffer *bufs;
+	struct fuse_copy_state cs;
+	struct fuse_conn *fc = fuse_get_conn(in);
+	if (!fc)
+		return -EPERM;
+
+	bufs = kmalloc(pipe->buffers * sizeof (struct pipe_buffer), GFP_KERNEL);
+	if (!bufs)
+		return -ENOMEM;
+
+	fuse_copy_init(&cs, fc, 1, NULL, 0);
+	cs.pipebufs = bufs;
+	cs.pipe = pipe;
+	ret = fuse_dev_do_read(fc, in, &cs, len);
+	if (ret < 0)
+		goto out;
+
+	ret = 0;
+	pipe_lock(pipe);
+
+	if (!pipe->readers) {
+		send_sig(SIGPIPE, current, 0);
+		if (!ret)
+			ret = -EPIPE;
+		goto out_unlock;
+	}
+
+	if (pipe->nrbufs + cs.nr_segs > pipe->buffers) {
+		ret = -EIO;
+		goto out_unlock;
+	}
+
+	while (page_nr < cs.nr_segs) {
+		int newbuf = (pipe->curbuf + pipe->nrbufs) & (pipe->buffers - 1);
+		struct pipe_buffer *buf = pipe->bufs + newbuf;
+
+		buf->page = bufs[page_nr].page;
+		buf->offset = bufs[page_nr].offset;
+		buf->len = bufs[page_nr].len;
+		buf->ops = &fuse_dev_pipe_buf_ops;
+
+		pipe->nrbufs++;
+		page_nr++;
+		ret += buf->len;
+
+		if (pipe->inode)
+			do_wakeup = 1;
+	}
+
+out_unlock:
+	pipe_unlock(pipe);
+
+	if (do_wakeup) {
+		smp_mb();
+		if (waitqueue_active(&pipe->wait))
+			wake_up_interruptible(&pipe->wait);
+		kill_fasync(&pipe->fasync_readers, SIGIO, POLL_IN);
+	}
+
+out:
+	for (; page_nr < cs.nr_segs; page_nr++)
+		page_cache_release(bufs[page_nr].page);
+
+	kfree(bufs);
+	return ret;
+}
+
 static int fuse_notify_poll(struct fuse_conn *fc, unsigned int size,
 			    struct fuse_copy_state *cs)
 {
@@ -1246,7 +1394,7 @@
 	if (!fc)
 		return -EPERM;
 
-	fuse_copy_init(&cs, fc, 0, NULL, iov, nr_segs);
+	fuse_copy_init(&cs, fc, 0, iov, nr_segs);
 
 	return fuse_dev_do_write(fc, &cs, iov_length(iov, nr_segs));
 }
@@ -1311,11 +1459,8 @@
 	}
 	pipe_unlock(pipe);
 
-	memset(&cs, 0, sizeof(struct fuse_copy_state));
-	cs.fc = fc;
-	cs.write = 0;
+	fuse_copy_init(&cs, fc, 0, NULL, nbuf);
 	cs.pipebufs = bufs;
-	cs.nr_segs = nbuf;
 	cs.pipe = pipe;
 
 	if (flags & SPLICE_F_MOVE)
@@ -1473,6 +1618,7 @@
 	.llseek		= no_llseek,
 	.read		= do_sync_read,
 	.aio_read	= fuse_dev_read,
+	.splice_read	= fuse_dev_splice_read,
 	.write		= do_sync_write,
 	.aio_write	= fuse_dev_write,
 	.splice_write	= fuse_dev_splice_write,