splice: implement pipe to pipe splicing

Allow splice(2) to work when both the input and the output are pipes.

Based on the implementation of the tee(2) syscall, but instead of
duplicating the buffer references move the buffers from the input pipe
to the output pipe.

Moving the whole buffer only succeeds if the full length of the buffer
is spliced.  Otherwise duplicate the buffer, just like tee(2), set the
length of the output buffer and advance the offset on the input
buffer.

Since splice is operating on two pipes, special care needs to be taken
with locking to prevent an ABBA deadlock.  Again this is done
similarly to the tee(2) syscall, first preparing the input and output
pipes so there's data to consume and space for that data, and then
doing the move operation while holding both locks.

If other processes are doing I/O on the same pipes in parallel with the
splice, then by the time both inodes are locked there might be no
buffers left to move, or no space to move them to.  In this case retry
the whole operation, including the preparation phase.  This could lead
to starvation, but I'm not sure if that's serious enough to worry
about.

Signed-off-by: Miklos Szeredi <mszeredi@suse.cz>
Signed-off-by: Jens Axboe <jens.axboe@oracle.com>
diff --git a/fs/splice.c b/fs/splice.c
index 666953d5..e405cf5 100644
--- a/fs/splice.c
+++ b/fs/splice.c
@@ -1112,6 +1112,9 @@
 	return ret;
 }
 
+static int splice_pipe_to_pipe(struct pipe_inode_info *ipipe,
+			       struct pipe_inode_info *opipe,
+			       size_t len, unsigned int flags);
 /*
  * After the inode slimming patch, i_pipe/i_bdev/i_cdev share the same
  * location, so checking ->i_pipe is not enough to verify that this is a
@@ -1132,12 +1135,32 @@
 		      struct file *out, loff_t __user *off_out,
 		      size_t len, unsigned int flags)
 {
-	struct pipe_inode_info *pipe;
+	struct pipe_inode_info *ipipe;
+	struct pipe_inode_info *opipe;
 	loff_t offset, *off;
 	long ret;
 
-	pipe = pipe_info(in->f_path.dentry->d_inode);
-	if (pipe) {
+	ipipe = pipe_info(in->f_path.dentry->d_inode);
+	opipe = pipe_info(out->f_path.dentry->d_inode);
+
+	if (ipipe && opipe) {
+		if (off_in || off_out)
+			return -ESPIPE;
+
+		if (!(in->f_mode & FMODE_READ))
+			return -EBADF;
+
+		if (!(out->f_mode & FMODE_WRITE))
+			return -EBADF;
+
+		/* Splicing to self would be fun, but... */
+		if (ipipe == opipe)
+			return -EINVAL;
+
+		return splice_pipe_to_pipe(ipipe, opipe, len, flags);
+	}
+
+	if (ipipe) {
 		if (off_in)
 			return -ESPIPE;
 		if (off_out) {
@@ -1149,7 +1172,7 @@
 		} else
 			off = &out->f_pos;
 
-		ret = do_splice_from(pipe, out, off, len, flags);
+		ret = do_splice_from(ipipe, out, off, len, flags);
 
 		if (off_out && copy_to_user(off_out, off, sizeof(loff_t)))
 			ret = -EFAULT;
@@ -1157,8 +1180,7 @@
 		return ret;
 	}
 
-	pipe = pipe_info(out->f_path.dentry->d_inode);
-	if (pipe) {
+	if (opipe) {
 		if (off_out)
 			return -ESPIPE;
 		if (off_in) {
@@ -1170,7 +1192,7 @@
 		} else
 			off = &in->f_pos;
 
-		ret = do_splice_to(in, off, pipe, len, flags);
+		ret = do_splice_to(in, off, opipe, len, flags);
 
 		if (off_in && copy_to_user(off_in, off, sizeof(loff_t)))
 			ret = -EFAULT;
@@ -1511,7 +1533,7 @@
  * Make sure there's data to read. Wait for input if we can, otherwise
  * return an appropriate error.
  */
-static int link_ipipe_prep(struct pipe_inode_info *pipe, unsigned int flags)
+static int ipipe_prep(struct pipe_inode_info *pipe, unsigned int flags)
 {
 	int ret;
 
@@ -1549,7 +1571,7 @@
  * Make sure there's writeable room. Wait for room if we can, otherwise
  * return an appropriate error.
  */
-static int link_opipe_prep(struct pipe_inode_info *pipe, unsigned int flags)
+static int opipe_prep(struct pipe_inode_info *pipe, unsigned int flags)
 {
 	int ret;
 
@@ -1587,6 +1609,124 @@
 }
 
 /*
+ * Splice contents of ipipe to opipe.
+ */
+static int splice_pipe_to_pipe(struct pipe_inode_info *ipipe,
+			       struct pipe_inode_info *opipe,
+			       size_t len, unsigned int flags)
+{
+	struct pipe_buffer *ibuf, *obuf;
+	int ret = 0, nbuf;
+	bool input_wakeup = false;	/* a whole buffer left ipipe */
+
+
+retry:
+	ret = ipipe_prep(ipipe, flags);
+	if (ret)
+		return ret;
+
+	ret = opipe_prep(opipe, flags);
+	if (ret)
+		return ret;
+
+	/*
+	 * Potential ABBA deadlock, work around it by ordering lock
+	 * grabbing by pipe info address. Otherwise two different processes
+	 * could deadlock (one splicing from A -> B, the other from B -> A).
+	 */
+	pipe_double_lock(ipipe, opipe);
+
+	do {
+		if (!opipe->readers) {
+			send_sig(SIGPIPE, current, 0);
+			if (!ret)
+				ret = -EPIPE;
+			break;
+		}
+
+		if (!ipipe->nrbufs && !ipipe->writers)
+			break;	/* input is empty and has no writers */
+
+		/*
+		 * Cannot make any progress, because either the input
+		 * pipe is empty or the output pipe is full.
+		 */
+		if (!ipipe->nrbufs || opipe->nrbufs >= PIPE_BUFFERS) {
+			/* Already processed some buffers, break */
+			if (ret)
+				break;
+
+			if (flags & SPLICE_F_NONBLOCK) {
+				ret = -EAGAIN;
+				break;
+			}
+
+			/*
+			 * We raced with another reader/writer and haven't
+			 * managed to process any buffers.  A zero return
+			 * value means EOF, so retry instead.
+			 */
+			pipe_unlock(ipipe);
+			pipe_unlock(opipe);
+			goto retry;
+		}
+
+		ibuf = ipipe->bufs + ipipe->curbuf;
+		nbuf = (opipe->curbuf + opipe->nrbufs) % PIPE_BUFFERS;
+		obuf = opipe->bufs + nbuf;
+
+		if (len >= ibuf->len) {
+			/*
+			 * Simply move the whole buffer from ipipe to opipe
+			 */
+			*obuf = *ibuf;
+			ibuf->ops = NULL;
+			opipe->nrbufs++;
+			ipipe->curbuf = (ipipe->curbuf + 1) % PIPE_BUFFERS;
+			ipipe->nrbufs--;
+			input_wakeup = true;
+		} else {
+			/*
+			 * Get a reference to this pipe buffer,
+			 * so we can copy the contents over.
+			 */
+			ibuf->ops->get(ipipe, ibuf);
+			*obuf = *ibuf;
+
+			/*
+			 * Don't inherit the gift flag, we need to
+			 * prevent multiple steals of this page.
+			 */
+			obuf->flags &= ~PIPE_BUF_FLAG_GIFT;
+
+			obuf->len = len;
+			opipe->nrbufs++;
+			ibuf->offset += obuf->len;
+			ibuf->len -= obuf->len;
+		}
+		ret += obuf->len;
+		len -= obuf->len;
+	} while (len);
+
+	pipe_unlock(ipipe);
+	pipe_unlock(opipe);
+
+	/*
+	 * If we put data in the output pipe, wake up any potential readers.
+	 */
+	if (ret > 0) {
+		smp_mb();
+		if (waitqueue_active(&opipe->wait))
+			wake_up_interruptible(&opipe->wait);
+		kill_fasync(&opipe->fasync_readers, SIGIO, POLL_IN);
+	}
+	if (input_wakeup)
+		wakeup_pipe_writers(ipipe);
+
+	return ret;
+}
+
+/*
  * Link contents of ipipe to opipe.
  */
 static int link_pipe(struct pipe_inode_info *ipipe,
@@ -1690,9 +1830,9 @@
 		 * Keep going, unless we encounter an error. The ipipe/opipe
 		 * ordering doesn't really matter.
 		 */
-		ret = link_ipipe_prep(ipipe, flags);
+		ret = ipipe_prep(ipipe, flags);
 		if (!ret) {
-			ret = link_opipe_prep(opipe, flags);
+			ret = opipe_prep(opipe, flags);
 			if (!ret)
 				ret = link_pipe(ipipe, opipe, len, flags);
 		}