Orangefs: implement .write_iter
Until now, orangefs_devreq_write_iter has just been a wrapper for
the old-fashioned orangefs_devreq_writev... linux would call
.write_iter with "struct kiocb *iocb" and "struct iov_iter *iter"
and .write_iter would just:
return pvfs2_devreq_writev(iocb->ki_filp,
iter->iov,
iter->nr_segs,
&iocb->ki_pos);
Signed-off-by: Mike Marshall <hubcap@omnibond.com>
diff --git a/fs/orangefs/devorangefs-req.c b/fs/orangefs/devorangefs-req.c
index e3bb15e..0f01d3e 100644
--- a/fs/orangefs/devorangefs-req.c
+++ b/fs/orangefs/devorangefs-req.c
@@ -245,304 +245,240 @@
}
/*
- * Function for writev() callers into the device. Readdir related
- * operations have an extra iovec containing info about objects
- * contained in directories.
+ * Function for writev() callers into the device.
+ *
+ * Userspace should have written:
+ * - __u32 version
+ * - __u32 magic
+ * - __u64 tag
+ * - struct orangefs_downcall_s
+ * - trailer buffer (in the case of READDIR operations)
*/
-static ssize_t orangefs_devreq_writev(struct file *file,
- const struct iovec *iov,
- size_t count,
- loff_t *offset)
-{
- struct orangefs_kernel_op_s *op = NULL;
- void *buffer = NULL;
- void *ptr = NULL;
- unsigned long i = 0;
- int num_remaining = MAX_DEV_REQ_DOWNSIZE;
- int ret = 0;
- /* num elements in iovec without trailer */
- int notrailer_count = 4;
- /*
- * If there's a trailer, its iov index will be equal to
- * notrailer_count.
- */
- int trailer_index = notrailer_count;
- int payload_size = 0;
- int returned_downcall_size = 0;
- __s32 magic = 0;
- __s32 proto_ver = 0;
- __u64 tag = 0;
- ssize_t total_returned_size = 0;
-
- /*
- * There will always be at least notrailer_count iovecs, and
- * when there's a trailer, one more than notrailer_count. Check
- * count's sanity.
- */
- if (count != notrailer_count && count != (notrailer_count + 1)) {
- gossip_err("%s: count:%zu: notrailer_count :%d:\n",
- __func__,
- count,
- notrailer_count);
- return -EPROTO;
- }
-
-
- /* Copy the non-trailer iovec data into a device request buffer. */
- buffer = dev_req_alloc();
- if (!buffer) {
- gossip_err("%s: dev_req_alloc failed.\n", __func__);
- return -ENOMEM;
- }
- ptr = buffer;
- for (i = 0; i < notrailer_count; i++) {
- if (iov[i].iov_len > num_remaining) {
- gossip_err
- ("writev error: Freeing buffer and returning\n");
- dev_req_release(buffer);
- return -EMSGSIZE;
- }
- ret = copy_from_user(ptr, iov[i].iov_base, iov[i].iov_len);
- if (ret) {
- gossip_err("Failed to copy data from user space\n");
- dev_req_release(buffer);
- return -EIO;
- }
- num_remaining -= iov[i].iov_len;
- ptr += iov[i].iov_len;
- payload_size += iov[i].iov_len;
- }
- total_returned_size = payload_size;
-
- /* these elements are currently 8 byte aligned (8 bytes for (version +
- * magic) 8 bytes for tag). If you add another element, either
- * make it 8 bytes big, or use get_unaligned when asigning.
- */
- ptr = buffer;
- proto_ver = *((__s32 *) ptr); /* unused */
- ptr += sizeof(__s32);
-
- magic = *((__s32 *) ptr);
- ptr += sizeof(__s32);
-
- tag = *((__u64 *) ptr);
- ptr += sizeof(__u64);
-
- if (magic != ORANGEFS_DEVREQ_MAGIC) {
- gossip_err("Error: Device magic number does not match.\n");
- dev_req_release(buffer);
- return -EPROTO;
- }
-
- op = orangefs_devreq_remove_op(tag);
- if (op) {
- /* Increase ref count! */
- get_op(op);
-
- /* calculate the size of the returned downcall. */
- returned_downcall_size =
- payload_size - (2 * sizeof(__s32) + sizeof(__u64));
-
- /* copy the passed in downcall into the op */
- if (returned_downcall_size ==
- sizeof(struct orangefs_downcall_s)) {
- memcpy(&op->downcall,
- ptr,
- sizeof(struct orangefs_downcall_s));
- } else {
- gossip_err("%s: returned downcall size:%d: \n",
- __func__,
- returned_downcall_size);
- dev_req_release(buffer);
- put_op(op);
- return -EMSGSIZE;
- }
-
- /* Don't tolerate an unexpected trailer iovec. */
- if ((op->downcall.trailer_size == 0) &&
- (count != notrailer_count)) {
- gossip_err("%s: unexpected trailer iovec.\n",
- __func__);
- dev_req_release(buffer);
- put_op(op);
- return -EPROTO;
- }
-
- /* Don't consider the trailer if there's a bad status. */
- if (op->downcall.status != 0)
- goto no_trailer;
-
- /* get the trailer if there is one. */
- if (op->downcall.trailer_size == 0)
- goto no_trailer;
-
- gossip_debug(GOSSIP_DEV_DEBUG,
- "%s: op->downcall.trailer_size %lld\n",
- __func__,
- op->downcall.trailer_size);
-
- /*
- * Bail if we think think there should be a trailer, but
- * there's no iovec for it.
- */
- if (count != (notrailer_count + 1)) {
- gossip_err("%s: trailer_size:%lld: count:%zu:\n",
- __func__,
- op->downcall.trailer_size,
- count);
- dev_req_release(buffer);
- put_op(op);
- return -EPROTO;
- }
-
- /* Verify that trailer_size is accurate. */
- if (op->downcall.trailer_size != iov[trailer_index].iov_len) {
- gossip_err("%s: trailer_size:%lld: != iov_len:%zd:\n",
- __func__,
- op->downcall.trailer_size,
- iov[trailer_index].iov_len);
- dev_req_release(buffer);
- put_op(op);
- return -EMSGSIZE;
- }
-
- total_returned_size += iov[trailer_index].iov_len;
-
- /*
- * Allocate a buffer, copy the trailer bytes into it and
- * attach it to the downcall.
- */
- op->downcall.trailer_buf = vmalloc(iov[trailer_index].iov_len);
- if (op->downcall.trailer_buf != NULL) {
- gossip_debug(GOSSIP_DEV_DEBUG, "vmalloc: %p\n",
- op->downcall.trailer_buf);
- ret = copy_from_user(op->downcall.trailer_buf,
- iov[trailer_index].iov_base,
- iov[trailer_index].iov_len);
- if (ret) {
- gossip_err("%s: Failed to copy trailer.\n",
- __func__);
- dev_req_release(buffer);
- gossip_debug(GOSSIP_DEV_DEBUG,
- "vfree: %p\n",
- op->downcall.trailer_buf);
- vfree(op->downcall.trailer_buf);
- op->downcall.trailer_buf = NULL;
- put_op(op);
- return -EIO;
- }
- } else {
- gossip_err("writev: could not vmalloc for trailer!\n");
- dev_req_release(buffer);
- put_op(op);
- return -ENOMEM;
- }
-
-no_trailer:
-
- /* if this operation is an I/O operation we need to wait
- * for all data to be copied before we can return to avoid
- * buffer corruption and races that can pull the buffers
- * out from under us.
- *
- * Essentially we're synchronizing with other parts of the
- * vfs implicitly by not allowing the user space
- * application reading/writing this device to return until
- * the buffers are done being used.
- */
- if (op->upcall.type == ORANGEFS_VFS_OP_FILE_IO) {
- int timed_out = 0;
- DEFINE_WAIT(wait_entry);
-
- /*
- * tell the vfs op waiting on a waitqueue
- * that this op is done
- */
- spin_lock(&op->lock);
- set_op_state_serviced(op);
- spin_unlock(&op->lock);
-
- wake_up_interruptible(&op->waitq);
-
- while (1) {
- spin_lock(&op->lock);
- prepare_to_wait_exclusive(
- &op->io_completion_waitq,
- &wait_entry,
- TASK_INTERRUPTIBLE);
- if (op->io_completed) {
- spin_unlock(&op->lock);
- break;
- }
- spin_unlock(&op->lock);
-
- if (!signal_pending(current)) {
- int timeout =
- MSECS_TO_JIFFIES(1000 *
- op_timeout_secs);
- if (!schedule_timeout(timeout)) {
- gossip_debug(GOSSIP_DEV_DEBUG,
- "%s: timed out.\n",
- __func__);
- timed_out = 1;
- break;
- }
- continue;
- }
-
- gossip_debug(GOSSIP_DEV_DEBUG,
- "%s: signal on I/O wait, aborting\n",
- __func__);
- break;
- }
-
- spin_lock(&op->lock);
- finish_wait(&op->io_completion_waitq, &wait_entry);
- spin_unlock(&op->lock);
-
- /* NOTE: for I/O operations we handle releasing the op
- * object except in the case of timeout. the reason we
- * can't free the op in timeout cases is that the op
- * service logic in the vfs retries operations using
- * the same op ptr, thus it can't be freed.
- */
- if (!timed_out)
- op_release(op);
- } else {
-
- /*
- * tell the vfs op waiting on a waitqueue that
- * this op is done
- */
- spin_lock(&op->lock);
- set_op_state_serviced(op);
- spin_unlock(&op->lock);
- /*
- * for every other operation (i.e. non-I/O), we need to
- * wake up the callers for downcall completion
- * notification
- */
- wake_up_interruptible(&op->waitq);
- }
- } else {
- /* ignore downcalls that we're not interested in */
- gossip_debug(GOSSIP_DEV_DEBUG,
- "WARNING: No one's waiting for tag %llu\n",
- llu(tag));
- }
- /* put_op? */
- dev_req_release(buffer);
-
- return total_returned_size;
-}
-
static ssize_t orangefs_devreq_write_iter(struct kiocb *iocb,
struct iov_iter *iter)
{
- return orangefs_devreq_writev(iocb->ki_filp,
- iter->iov,
- iter->nr_segs,
- &iocb->ki_pos);
+ ssize_t ret;
+ struct orangefs_kernel_op_s *op = NULL;
+ struct {
+ __u32 version;
+ __u32 magic;
+ __u64 tag;
+ } head;
+ int total = ret = iov_iter_count(iter);
+ int n;
+ int downcall_size = sizeof(struct orangefs_downcall_s);
+ int head_size = sizeof(head);
+
+ gossip_debug(GOSSIP_DEV_DEBUG, "%s: total:%d: ret:%zd:\n",
+ __func__,
+ total,
+ ret);
+
+ if (total < MAX_DEV_REQ_DOWNSIZE) {
+ gossip_err("%s: total:%d: must be at least:%lu:\n",
+ __func__,
+ total,
+ MAX_DEV_REQ_DOWNSIZE);
+ ret = -EFAULT;
+ goto out;
+ }
+
+ n = copy_from_iter(&head, head_size, iter);
+ if (n < head_size) {
+ gossip_err("%s: failed to copy head.\n", __func__);
+ ret = -EFAULT;
+ goto out;
+ }
+
+ if (head.version < ORANGEFS_MINIMUM_USERSPACE_VERSION) {
+ gossip_err("%s: userspace claims version"
+ "%d, minimum version required: %d.\n",
+ __func__,
+ head.version,
+ ORANGEFS_MINIMUM_USERSPACE_VERSION);
+ ret = -EPROTO;
+ goto out;
+ }
+
+ if (head.magic != ORANGEFS_DEVREQ_MAGIC) {
+ gossip_err("Error: Device magic number does not match.\n");
+ ret = -EPROTO;
+ goto out;
+ }
+
+ op = orangefs_devreq_remove_op(head.tag);
+ if (!op) {
+ gossip_err("WARNING: No one's waiting for tag %llu\n",
+ llu(head.tag));
+ goto out;
+ }
+
+ get_op(op); /* increase ref count. */
+
+ n = copy_from_iter(&op->downcall, downcall_size, iter);
+ if (n != downcall_size) {
+ gossip_err("%s: failed to copy downcall.\n", __func__);
+ put_op(op);
+ ret = -EFAULT;
+ goto out;
+ }
+
+ if (op->downcall.status)
+ goto wakeup;
+
+ /*
+ * We've successfully peeled off the head and the downcall.
+ * Something has gone awry if total doesn't equal the
+ * sum of head_size, downcall_size and trailer_size.
+ */
+ if ((head_size + downcall_size + op->downcall.trailer_size) != total) {
+ gossip_err("%s: funky write, head_size:%d"
+ ": downcall_size:%d: trailer_size:%lld"
+ ": total size:%d:\n",
+ __func__,
+ head_size,
+ downcall_size,
+ op->downcall.trailer_size,
+ total);
+ put_op(op);
+ ret = -EFAULT;
+ goto out;
+ }
+
+ /* Only READDIR operations should have trailers. */
+ if ((op->downcall.type != ORANGEFS_VFS_OP_READDIR) &&
+ (op->downcall.trailer_size != 0)) {
+ gossip_err("%s: %x operation with trailer.",
+ __func__,
+ op->downcall.type);
+ put_op(op);
+ ret = -EFAULT;
+ goto out;
+ }
+
+ /* READDIR operations should always have trailers. */
+ if ((op->downcall.type == ORANGEFS_VFS_OP_READDIR) &&
+ (op->downcall.trailer_size == 0)) {
+ gossip_err("%s: %x operation with no trailer.",
+ __func__,
+ op->downcall.type);
+ put_op(op);
+ ret = -EFAULT;
+ goto out;
+ }
+
+ if (op->downcall.type != ORANGEFS_VFS_OP_READDIR)
+ goto wakeup;
+
+ op->downcall.trailer_buf =
+ vmalloc(op->downcall.trailer_size);
+ if (op->downcall.trailer_buf == NULL) {
+ gossip_err("%s: failed trailer vmalloc.\n",
+ __func__);
+ put_op(op);
+ ret = -ENOMEM;
+ goto out;
+ }
+ memset(op->downcall.trailer_buf, 0, op->downcall.trailer_size);
+ n = copy_from_iter(op->downcall.trailer_buf,
+ op->downcall.trailer_size,
+ iter);
+ if (n != op->downcall.trailer_size) {
+ gossip_err("%s: failed to copy trailer.\n", __func__);
+ vfree(op->downcall.trailer_buf);
+ put_op(op);
+ ret = -EFAULT;
+ goto out;
+ }
+
+wakeup:
+
+ /*
+ * If this operation is an I/O operation we need to wait
+ * for all data to be copied before we can return to avoid
+ * buffer corruption and races that can pull the buffers
+ * out from under us.
+ *
+ * Essentially we're synchronizing with other parts of the
+ * vfs implicitly by not allowing the user space
+ * application reading/writing this device to return until
+ * the buffers are done being used.
+ */
+ if (op->downcall.type == ORANGEFS_VFS_OP_FILE_IO) {
+ int timed_out = 0;
+ DEFINE_WAIT(wait_entry);
+
+ /*
+ * tell the vfs op waiting on a waitqueue
+ * that this op is done
+ */
+ spin_lock(&op->lock);
+ set_op_state_serviced(op);
+ spin_unlock(&op->lock);
+
+ wake_up_interruptible(&op->waitq);
+
+ while (1) {
+ spin_lock(&op->lock);
+ prepare_to_wait_exclusive(
+ &op->io_completion_waitq,
+ &wait_entry,
+ TASK_INTERRUPTIBLE);
+ if (op->io_completed) {
+ spin_unlock(&op->lock);
+ break;
+ }
+ spin_unlock(&op->lock);
+
+ if (!signal_pending(current)) {
+ int timeout =
+ MSECS_TO_JIFFIES(1000 *
+ op_timeout_secs);
+ if (!schedule_timeout(timeout)) {
+ gossip_debug(GOSSIP_DEV_DEBUG,
+ "%s: timed out.\n",
+ __func__);
+ timed_out = 1;
+ break;
+ }
+ continue;
+ }
+
+ gossip_debug(GOSSIP_DEV_DEBUG,
+ "%s: signal on I/O wait, aborting\n",
+ __func__);
+ break;
+ }
+
+ spin_lock(&op->lock);
+ finish_wait(&op->io_completion_waitq, &wait_entry);
+ spin_unlock(&op->lock);
+
+ /* NOTE: for I/O operations we handle releasing the op
+ * object except in the case of timeout. the reason we
+ * can't free the op in timeout cases is that the op
+ * service logic in the vfs retries operations using
+ * the same op ptr, thus it can't be freed.
+ */
+ if (!timed_out)
+ op_release(op);
+ } else {
+ /*
+ * tell the vfs op waiting on a waitqueue that
+ * this op is done
+ */
+ spin_lock(&op->lock);
+ set_op_state_serviced(op);
+ spin_unlock(&op->lock);
+ /*
+ * for every other operation (i.e. non-I/O), we need to
+ * wake up the callers for downcall completion
+ * notification
+ */
+ wake_up_interruptible(&op->waitq);
+ }
+out:
+ return ret;
}
/* Returns whether any FS are still pending remounted */