[PATCH] relay: add sendfile() support

Signed-off-by: Jens Axboe <axboe@suse.de>
diff --git a/kernel/relay.c b/kernel/relay.c
index 9358e8e..fefe2b2 100644
--- a/kernel/relay.c
+++ b/kernel/relay.c
@@ -95,15 +95,16 @@
  *	@buf: the buffer struct
  *	@size: total size of the buffer
  *
- *	Returns a pointer to the resulting buffer, NULL if unsuccessful
+ *	Returns a pointer to the resulting buffer, NULL if unsuccessful. The
+ *	passed in size will get page aligned, if it isn't already.
  */
-static void *relay_alloc_buf(struct rchan_buf *buf, unsigned long size)
+static void *relay_alloc_buf(struct rchan_buf *buf, size_t *size)
 {
 	void *mem;
 	unsigned int i, j, n_pages;
 
-	size = PAGE_ALIGN(size);
-	n_pages = size >> PAGE_SHIFT;
+	*size = PAGE_ALIGN(*size);
+	n_pages = *size >> PAGE_SHIFT;
 
 	buf->page_array = kcalloc(n_pages, sizeof(struct page *), GFP_KERNEL);
 	if (!buf->page_array)
@@ -118,7 +119,7 @@
 	if (!mem)
 		goto depopulate;
 
-	memset(mem, 0, size);
+	memset(mem, 0, *size);
 	buf->page_count = n_pages;
 	return mem;
 
@@ -146,7 +147,7 @@
 	if (!buf->padding)
 		goto free_buf;
 
-	buf->start = relay_alloc_buf(buf, chan->alloc_size);
+	buf->start = relay_alloc_buf(buf, &chan->alloc_size);
 	if (!buf->start)
 		goto free_buf;
 
@@ -543,6 +544,9 @@
 		old_subbuf = buf->subbufs_produced % buf->chan->n_subbufs;
 		buf->padding[old_subbuf] = buf->prev_padding;
 		buf->subbufs_produced++;
+		buf->dentry->d_inode->i_size += buf->chan->subbuf_size -
+			buf->padding[old_subbuf];
+		smp_mb();
 		if (waitqueue_active(&buf->read_wait)) {
 			PREPARE_WORK(&buf->wake_readers, wakeup_readers, buf);
 			schedule_delayed_work(&buf->wake_readers, 1);
@@ -757,37 +761,33 @@
  */
 static int relay_file_read_avail(struct rchan_buf *buf, size_t read_pos)
 {
-	size_t bytes_produced, bytes_consumed, write_offset;
 	size_t subbuf_size = buf->chan->subbuf_size;
 	size_t n_subbufs = buf->chan->n_subbufs;
-	size_t produced = buf->subbufs_produced % n_subbufs;
-	size_t consumed = buf->subbufs_consumed % n_subbufs;
-
-	write_offset = buf->offset > subbuf_size ? subbuf_size : buf->offset;
-
-	if (consumed > produced) {
-		if ((produced > n_subbufs) &&
-		    (produced + n_subbufs - consumed <= n_subbufs))
-			produced += n_subbufs;
-	} else if (consumed == produced) {
-		if (buf->offset > subbuf_size) {
-			produced += n_subbufs;
-			if (buf->subbufs_produced == buf->subbufs_consumed)
-				consumed += n_subbufs;
-		}
-	}
-
-	if (buf->offset > subbuf_size)
-		bytes_produced = (produced - 1) * subbuf_size + write_offset;
-	else
-		bytes_produced = produced * subbuf_size + write_offset;
-	bytes_consumed = consumed * subbuf_size + buf->bytes_consumed;
-
-	if (bytes_produced == bytes_consumed)
-		return 0;
+	size_t produced = buf->subbufs_produced;
+	size_t consumed = buf->subbufs_consumed;
 
 	relay_file_read_consume(buf, read_pos, 0);
 
+	if (unlikely(buf->offset > subbuf_size)) {
+		if (produced == consumed)
+			return 0;
+		return 1;
+	}
+
+	if (unlikely(produced - consumed >= n_subbufs)) {
+		consumed = (produced / n_subbufs) * n_subbufs;
+		buf->subbufs_consumed = consumed;
+	}
+	
+	produced = (produced % n_subbufs) * subbuf_size + buf->offset;
+	consumed = (consumed % n_subbufs) * subbuf_size + buf->bytes_consumed;
+
+	if (consumed > produced)
+		produced += n_subbufs * subbuf_size;
+	
+	if (consumed == produced)
+		return 0;
+
 	return 1;
 }
 
@@ -908,6 +908,91 @@
 	return ret;
 }
 
+static ssize_t relay_file_sendsubbuf(struct file *filp, loff_t *ppos,
+				     size_t count, read_actor_t actor,
+				     void *target)
+{
+	struct rchan_buf *buf = filp->private_data;
+	read_descriptor_t desc;
+	size_t read_start, avail;
+	unsigned long pidx, poff;
+	unsigned int subbuf_pages;
+	ssize_t ret = 0;
+
+	if (!relay_file_read_avail(buf, *ppos))
+		return 0;
+
+	read_start = relay_file_read_start_pos(*ppos, buf);
+	avail = relay_file_read_subbuf_avail(read_start, buf);
+	if (!avail)
+		return 0;
+
+	count = min(count, avail);
+
+	desc.written = 0;
+	desc.count = count;
+	desc.arg.data = target;
+	desc.error = 0;
+
+	subbuf_pages = buf->chan->alloc_size >> PAGE_SHIFT;
+	pidx = (read_start / PAGE_SIZE) % subbuf_pages;
+	poff = read_start & ~PAGE_MASK;
+	while (count) {
+		struct page *p = buf->page_array[pidx];
+		unsigned int len;
+
+		len = PAGE_SIZE - poff;
+		if (len > count)
+			len = count;
+
+		len = actor(&desc, p, poff, len);
+
+		if (desc.error) {
+			if (!ret)
+				ret = desc.error;
+			break;
+		}
+
+		count -= len;
+		ret += len;
+		poff = 0;
+		pidx = (pidx + 1) % subbuf_pages;
+	}
+
+	if (ret > 0) {
+		relay_file_read_consume(buf, read_start, ret);
+		*ppos = relay_file_read_end_pos(buf, read_start, ret);
+	}
+
+	return ret;
+}
+
+static ssize_t relay_file_sendfile(struct file *filp, loff_t *ppos,
+				   size_t count, read_actor_t actor,
+				   void *target)
+{
+	ssize_t sent = 0, ret = 0;
+
+	if (!count)
+		return 0;
+
+	mutex_lock(&filp->f_dentry->d_inode->i_mutex);
+
+	do {
+		ret = relay_file_sendsubbuf(filp, ppos, count, actor, target);
+		if (ret < 0) {
+			if (!sent)
+				sent = ret;
+			break;
+		}
+		count -= ret;
+		sent += ret;
+	} while (count && ret);
+
+	mutex_unlock(&filp->f_dentry->d_inode->i_mutex);
+	return sent;
+}
+
 struct file_operations relay_file_operations = {
 	.open		= relay_file_open,
 	.poll		= relay_file_poll,
@@ -915,5 +1000,6 @@
 	.read		= relay_file_read,
 	.llseek		= no_llseek,
 	.release	= relay_file_release,
+	.sendfile       = relay_file_sendfile,
 };
 EXPORT_SYMBOL_GPL(relay_file_operations);