Orangefs: implement .write_iter

Until now, orangefs_devreq_write_iter has just been a wrapper around
the old-fashioned orangefs_devreq_writev: Linux would call
.write_iter with "struct kiocb *iocb" and "struct iov_iter *iter",
and .write_iter would simply do:

        return orangefs_devreq_writev(iocb->ki_filp,
                                      iter->iov,
                                      iter->nr_segs,
                                      &iocb->ki_pos);
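
With this patch, .write_iter consumes the iov_iter directly: it peels
a fixed-size head (version, magic, tag) off the iterator with
copy_from_iter(), looks up the matching op by tag, copies the downcall
into the op, and copies in the trailer buffer for READDIR operations.
In outline (validation, trailer allocation and the I/O completion wait
left out):

        struct {
                __u32 version;
                __u32 magic;
                __u64 tag;
        } head;

        copy_from_iter(&head, sizeof(head), iter);
        op = orangefs_devreq_remove_op(head.tag);
        copy_from_iter(&op->downcall, sizeof(op->downcall), iter);
        if (op->downcall.trailer_size)
                copy_from_iter(op->downcall.trailer_buf,
                               op->downcall.trailer_size, iter);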

Signed-off-by: Mike Marshall <hubcap@omnibond.com>
diff --git a/fs/orangefs/devorangefs-req.c b/fs/orangefs/devorangefs-req.c
index e3bb15e..0f01d3e 100644
--- a/fs/orangefs/devorangefs-req.c
+++ b/fs/orangefs/devorangefs-req.c
@@ -245,304 +245,240 @@
 }
 
 /*
- * Function for writev() callers into the device. Readdir related
- * operations have an extra iovec containing info about objects
- * contained in directories.
+ * Function for writev() callers into the device.
+ *
+ * Userspace should have written:
+ *  - __u32 version
+ *  - __u32 magic
+ *  - __u64 tag
+ *  - struct orangefs_downcall_s
+ *  - trailer buffer (in the case of READDIR operations)
  */
-static ssize_t orangefs_devreq_writev(struct file *file,
-				   const struct iovec *iov,
-				   size_t count,
-				   loff_t *offset)
-{
-	struct orangefs_kernel_op_s *op = NULL;
-	void *buffer = NULL;
-	void *ptr = NULL;
-	unsigned long i = 0;
-	int num_remaining = MAX_DEV_REQ_DOWNSIZE;
-	int ret = 0;
-	/* num elements in iovec without trailer */
-	int notrailer_count = 4;
-	/*
-	 * If there's a trailer, its iov index will be equal to
-	 * notrailer_count.
-	 */
-	int trailer_index = notrailer_count;
-	int payload_size = 0;
-	int returned_downcall_size = 0;
-	__s32 magic = 0;
-	__s32 proto_ver = 0;
-	__u64 tag = 0;
-	ssize_t total_returned_size = 0;
-
-	/*
-	 * There will always be at least notrailer_count iovecs, and
-	 * when there's a trailer, one more than notrailer_count. Check
-	 * count's sanity.
-	 */
-	if (count != notrailer_count && count != (notrailer_count + 1)) {
-		gossip_err("%s: count:%zu: notrailer_count :%d:\n",
-			__func__,
-			count,
-			notrailer_count);
-		return -EPROTO;
-	}
-
-
-	/* Copy the non-trailer iovec data into a device request buffer. */
-	buffer = dev_req_alloc();
-	if (!buffer) {
-		gossip_err("%s: dev_req_alloc failed.\n", __func__);
-		return -ENOMEM;
-	}
-	ptr = buffer;
-	for (i = 0; i < notrailer_count; i++) {
-		if (iov[i].iov_len > num_remaining) {
-			gossip_err
-			    ("writev error: Freeing buffer and returning\n");
-			dev_req_release(buffer);
-			return -EMSGSIZE;
-		}
-		ret = copy_from_user(ptr, iov[i].iov_base, iov[i].iov_len);
-		if (ret) {
-			gossip_err("Failed to copy data from user space\n");
-			dev_req_release(buffer);
-			return -EIO;
-		}
-		num_remaining -= iov[i].iov_len;
-		ptr += iov[i].iov_len;
-		payload_size += iov[i].iov_len;
-	}
-	total_returned_size = payload_size;
-
-	/* these elements are currently 8 byte aligned (8 bytes for (version +
-	 * magic) 8 bytes for tag).  If you add another element, either
-	 * make it 8 bytes big, or use get_unaligned when asigning.
-	 */
-	ptr = buffer;
-	proto_ver = *((__s32 *) ptr); /* unused */
-	ptr += sizeof(__s32);
-
-	magic = *((__s32 *) ptr);
-	ptr += sizeof(__s32);
-
-	tag = *((__u64 *) ptr);
-	ptr += sizeof(__u64);
-
-	if (magic != ORANGEFS_DEVREQ_MAGIC) {
-		gossip_err("Error: Device magic number does not match.\n");
-		dev_req_release(buffer);
-		return -EPROTO;
-	}
-
-	op = orangefs_devreq_remove_op(tag);
-	if (op) {
-		/* Increase ref count! */
-		get_op(op);
-
-		/* calculate the size of the returned downcall. */
-		returned_downcall_size =
-			payload_size - (2 * sizeof(__s32) + sizeof(__u64));
-
-		/* copy the passed in downcall into the op */
-		if (returned_downcall_size ==
-			sizeof(struct orangefs_downcall_s)) {
-			memcpy(&op->downcall,
-			       ptr,
-			       sizeof(struct orangefs_downcall_s));
-		} else {
-			gossip_err("%s: returned downcall size:%d: \n",
-				   __func__,
-				   returned_downcall_size);
-			dev_req_release(buffer);
-			put_op(op);
-			return -EMSGSIZE;
-		}
-
-		/* Don't tolerate an unexpected trailer iovec. */
-		if ((op->downcall.trailer_size == 0) &&
-		    (count != notrailer_count)) {
-			gossip_err("%s: unexpected trailer iovec.\n",
-				   __func__);
-			dev_req_release(buffer);
-			put_op(op);
-			return -EPROTO;
-		}
-
-		/* Don't consider the trailer if there's a bad status. */
-		if (op->downcall.status != 0)
-			goto no_trailer;
-
-		/* get the trailer if there is one. */
-		if (op->downcall.trailer_size == 0)
-			goto no_trailer;
-
-		gossip_debug(GOSSIP_DEV_DEBUG,
-			     "%s: op->downcall.trailer_size %lld\n",
-			     __func__,
-			     op->downcall.trailer_size);
-
-		/*
-		 * Bail if we think think there should be a trailer, but
-		 * there's no iovec for it.
-		 */
-		if (count != (notrailer_count + 1)) {
-			gossip_err("%s: trailer_size:%lld: count:%zu:\n",
-				   __func__,
-				   op->downcall.trailer_size,
-				   count);
-			dev_req_release(buffer);
-			put_op(op);
-			return -EPROTO;
-		}
-
-		/* Verify that trailer_size is accurate. */
-		if (op->downcall.trailer_size != iov[trailer_index].iov_len) {
-			gossip_err("%s: trailer_size:%lld: != iov_len:%zd:\n",
-				   __func__,
-				   op->downcall.trailer_size,
-				   iov[trailer_index].iov_len);
-			dev_req_release(buffer);
-			put_op(op);
-			return -EMSGSIZE;
-		}
-
-		total_returned_size += iov[trailer_index].iov_len;
-
-		/*
-		 * Allocate a buffer, copy the trailer bytes into it and
-		 * attach it to the downcall.
-		 */
-		op->downcall.trailer_buf = vmalloc(iov[trailer_index].iov_len);
-		if (op->downcall.trailer_buf != NULL) {
-			gossip_debug(GOSSIP_DEV_DEBUG, "vmalloc: %p\n",
-				     op->downcall.trailer_buf);
-			ret = copy_from_user(op->downcall.trailer_buf,
-					     iov[trailer_index].iov_base,
-					     iov[trailer_index].iov_len);
-			if (ret) {
-				gossip_err("%s: Failed to copy trailer.\n",
-					   __func__);
-				dev_req_release(buffer);
-				gossip_debug(GOSSIP_DEV_DEBUG,
-					     "vfree: %p\n",
-					     op->downcall.trailer_buf);
-				vfree(op->downcall.trailer_buf);
-				op->downcall.trailer_buf = NULL;
-				put_op(op);
-				return -EIO;
-			}
-		} else {
-			gossip_err("writev: could not vmalloc for trailer!\n");
-			dev_req_release(buffer);
-			put_op(op);
-			return -ENOMEM;
-		}
-
-no_trailer:
-
-		/* if this operation is an I/O operation we need to wait
-		 * for all data to be copied before we can return to avoid
-		 * buffer corruption and races that can pull the buffers
-		 * out from under us.
-		 *
-		 * Essentially we're synchronizing with other parts of the
-		 * vfs implicitly by not allowing the user space
-		 * application reading/writing this device to return until
-		 * the buffers are done being used.
-		 */
-		if (op->upcall.type == ORANGEFS_VFS_OP_FILE_IO) {
-			int timed_out = 0;
-			DEFINE_WAIT(wait_entry);
-
-			/*
-			 * tell the vfs op waiting on a waitqueue
-			 * that this op is done
-			 */
-			spin_lock(&op->lock);
-			set_op_state_serviced(op);
-			spin_unlock(&op->lock);
-
-			wake_up_interruptible(&op->waitq);
-
-			while (1) {
-				spin_lock(&op->lock);
-				prepare_to_wait_exclusive(
-					&op->io_completion_waitq,
-					&wait_entry,
-					TASK_INTERRUPTIBLE);
-				if (op->io_completed) {
-					spin_unlock(&op->lock);
-					break;
-				}
-				spin_unlock(&op->lock);
-
-				if (!signal_pending(current)) {
-					int timeout =
-					    MSECS_TO_JIFFIES(1000 *
-							     op_timeout_secs);
-					if (!schedule_timeout(timeout)) {
-						gossip_debug(GOSSIP_DEV_DEBUG,
-							"%s: timed out.\n",
-							__func__);
-						timed_out = 1;
-						break;
-					}
-					continue;
-				}
-
-				gossip_debug(GOSSIP_DEV_DEBUG,
-					"%s: signal on I/O wait, aborting\n",
-					__func__);
-				break;
-			}
-
-			spin_lock(&op->lock);
-			finish_wait(&op->io_completion_waitq, &wait_entry);
-			spin_unlock(&op->lock);
-
-			/* NOTE: for I/O operations we handle releasing the op
-			 * object except in the case of timeout.  the reason we
-			 * can't free the op in timeout cases is that the op
-			 * service logic in the vfs retries operations using
-			 * the same op ptr, thus it can't be freed.
-			 */
-			if (!timed_out)
-				op_release(op);
-		} else {
-
-			/*
-			 * tell the vfs op waiting on a waitqueue that
-			 * this op is done
-			 */
-			spin_lock(&op->lock);
-			set_op_state_serviced(op);
-			spin_unlock(&op->lock);
-			/*
-			 * for every other operation (i.e. non-I/O), we need to
-			 * wake up the callers for downcall completion
-			 * notification
-			 */
-			wake_up_interruptible(&op->waitq);
-		}
-	} else {
-		/* ignore downcalls that we're not interested in */
-		gossip_debug(GOSSIP_DEV_DEBUG,
-			     "WARNING: No one's waiting for tag %llu\n",
-			     llu(tag));
-	}
-	/* put_op? */
-	dev_req_release(buffer);
-
-	return total_returned_size;
-}
-
 static ssize_t orangefs_devreq_write_iter(struct kiocb *iocb,
 				      struct iov_iter *iter)
 {
-	return orangefs_devreq_writev(iocb->ki_filp,
-				   iter->iov,
-				   iter->nr_segs,
-				   &iocb->ki_pos);
+	ssize_t ret;
+	struct orangefs_kernel_op_s *op = NULL;
+	struct {
+		__u32 version;
+		__u32 magic;
+		__u64 tag;
+	} head;
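+	/* Total bytes userspace wrote to the device in this call. */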
+	int total = ret = iov_iter_count(iter);
+	int n;
+	int downcall_size = sizeof(struct orangefs_downcall_s);
+	int head_size = sizeof(head);
+
+	gossip_debug(GOSSIP_DEV_DEBUG, "%s: total:%d: ret:%zd:\n",
+		     __func__,
+		     total,
+		     ret);
+
+	if (total < MAX_DEV_REQ_DOWNSIZE) {
+		gossip_err("%s: total:%d: must be at least:%lu:\n",
+			   __func__,
+			   total,
+			   MAX_DEV_REQ_DOWNSIZE);
+		ret = -EFAULT;
+		goto out;
+	}
+
+	n = copy_from_iter(&head, head_size, iter);
+	if (n < head_size) {
+		gossip_err("%s: failed to copy head.\n", __func__);
+		ret = -EFAULT;
+		goto out;
+	}
+
+	if (head.version < ORANGEFS_MINIMUM_USERSPACE_VERSION) {
+		gossip_err("%s: userspace claims version "
+			   "%d, minimum version required: %d.\n",
+			   __func__,
+			   head.version,
+			   ORANGEFS_MINIMUM_USERSPACE_VERSION);
+		ret = -EPROTO;
+		goto out;
+	}
+
+	if (head.magic != ORANGEFS_DEVREQ_MAGIC) {
+		gossip_err("Error: Device magic number does not match.\n");
+		ret = -EPROTO;
+		goto out;
+	}
+
+	op = orangefs_devreq_remove_op(head.tag);
+	if (!op) {
+		gossip_err("WARNING: No one's waiting for tag %llu\n",
+			   llu(head.tag));
+		goto out;
+	}
+
+	get_op(op); /* increase ref count. */
+
+	n = copy_from_iter(&op->downcall, downcall_size, iter);
+	if (n != downcall_size) {
+		gossip_err("%s: failed to copy downcall.\n", __func__);
+		put_op(op);
+		ret = -EFAULT;
+		goto out;
+	}
+
+	if (op->downcall.status)
+		goto wakeup;
+
+	/*
+	 * We've successfully peeled off the head and the downcall.
+	 * Something has gone awry if total doesn't equal the
+	 * sum of head_size, downcall_size and trailer_size.
+	 */
+	if ((head_size + downcall_size + op->downcall.trailer_size) != total) {
+		gossip_err("%s: funky write, head_size:%d"
+			   ": downcall_size:%d: trailer_size:%lld"
+			   ": total size:%d:\n",
+			   __func__,
+			   head_size,
+			   downcall_size,
+			   op->downcall.trailer_size,
+			   total);
+		put_op(op);
+		ret = -EFAULT;
+		goto out;
+	}
+
+	/* Only READDIR operations should have trailers. */
+	if ((op->downcall.type != ORANGEFS_VFS_OP_READDIR) &&
+	    (op->downcall.trailer_size != 0)) {
+		gossip_err("%s: %x operation with trailer.\n",
+			   __func__,
+			   op->downcall.type);
+		put_op(op);
+		ret = -EFAULT;
+		goto out;
+	}
+
+	/* READDIR operations should always have trailers. */
+	if ((op->downcall.type == ORANGEFS_VFS_OP_READDIR) &&
+	    (op->downcall.trailer_size == 0)) {
+		gossip_err("%s: %x operation with no trailer.\n",
+			   __func__,
+			   op->downcall.type);
+		put_op(op);
+		ret = -EFAULT;
+		goto out;
+	}
+
+	if (op->downcall.type != ORANGEFS_VFS_OP_READDIR)
+		goto wakeup;
+
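+	/*
+	 * Allocate a buffer, copy the trailer bytes into it and
+	 * attach it to the downcall.
+	 */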
+	op->downcall.trailer_buf =
+		vmalloc(op->downcall.trailer_size);
+	if (op->downcall.trailer_buf == NULL) {
+		gossip_err("%s: failed trailer vmalloc.\n",
+			   __func__);
+		put_op(op);
+		ret = -ENOMEM;
+		goto out;
+	}
+	memset(op->downcall.trailer_buf, 0, op->downcall.trailer_size);
+	n = copy_from_iter(op->downcall.trailer_buf,
+			   op->downcall.trailer_size,
+			   iter);
+	if (n != op->downcall.trailer_size) {
+		gossip_err("%s: failed to copy trailer.\n", __func__);
+		vfree(op->downcall.trailer_buf);
+		op->downcall.trailer_buf = NULL;
+		put_op(op);
+		ret = -EFAULT;
+		goto out;
+	}
+
+wakeup:
+
+	/*
+	 * If this operation is an I/O operation we need to wait
+	 * for all data to be copied before we can return to avoid
+	 * buffer corruption and races that can pull the buffers
+	 * out from under us.
+	 *
+	 * Essentially we're synchronizing with other parts of the
+	 * vfs implicitly by not allowing the user space
+	 * application reading/writing this device to return until
+	 * the buffers are done being used.
+	 */
+	if (op->downcall.type == ORANGEFS_VFS_OP_FILE_IO) {
+		int timed_out = 0;
+		DEFINE_WAIT(wait_entry);
+
+		/*
+		 * tell the vfs op waiting on a waitqueue
+		 * that this op is done
+		 */
+		spin_lock(&op->lock);
+		set_op_state_serviced(op);
+		spin_unlock(&op->lock);
+
+		wake_up_interruptible(&op->waitq);
+
+		while (1) {
+			spin_lock(&op->lock);
+			prepare_to_wait_exclusive(
+				&op->io_completion_waitq,
+				&wait_entry,
+				TASK_INTERRUPTIBLE);
+			if (op->io_completed) {
+				spin_unlock(&op->lock);
+				break;
+			}
+			spin_unlock(&op->lock);
+
+			if (!signal_pending(current)) {
+				int timeout =
+				    MSECS_TO_JIFFIES(1000 *
+						     op_timeout_secs);
+				if (!schedule_timeout(timeout)) {
+					gossip_debug(GOSSIP_DEV_DEBUG,
+						"%s: timed out.\n",
+						__func__);
+					timed_out = 1;
+					break;
+				}
+				continue;
+			}
+
+			gossip_debug(GOSSIP_DEV_DEBUG,
+				"%s: signal on I/O wait, aborting\n",
+				__func__);
+			break;
+		}
+
+		spin_lock(&op->lock);
+		finish_wait(&op->io_completion_waitq, &wait_entry);
+		spin_unlock(&op->lock);
+
+		/* NOTE: for I/O operations we handle releasing the op
+		 * object except in the case of timeout.  the reason we
+		 * can't free the op in timeout cases is that the op
+		 * service logic in the vfs retries operations using
+		 * the same op ptr, thus it can't be freed.
+		 */
+		if (!timed_out)
+			op_release(op);
+	} else {
+		/*
+		 * tell the vfs op waiting on a waitqueue that
+		 * this op is done
+		 */
+		spin_lock(&op->lock);
+		set_op_state_serviced(op);
+		spin_unlock(&op->lock);
+		/*
+		 * for every other operation (i.e. non-I/O), we need to
+		 * wake up the callers for downcall completion
+		 * notification
+		 */
+		wake_up_interruptible(&op->waitq);
+	}
+out:
+	return ret;
 }
 
 /* Returns whether any FS are still pending remounted */