Change IO engine queuing

Instead of always pretending to be async, let the IO engines
return FIO_Q_COMPLETED or FIO_Q_QUEUED to signal sync or
async completions, according to their nature. This cleans up
the queuing model quite a bit.

Also fixed a verification error spotted while doing this
transformation.

The main intent of this is to allow queuing more than 1 piece
of IO at a time; that will come in a later changeset.

Signed-off-by: Jens Axboe <jens.axboe@oracle.com>
diff --git a/engines/libaio.c b/engines/libaio.c
index c2f47d8..ba8c49d 100644
--- a/engines/libaio.c
+++ b/engines/libaio.c
@@ -18,7 +18,6 @@
 struct libaio_data {
 	io_context_t aio_ctx;
 	struct io_event *aio_events;
-	struct io_u *sync_io_u;
 };
 
 static int fio_libaio_prep(struct thread_data fio_unused *td, struct io_u *io_u)
@@ -41,13 +40,6 @@
 {
 	struct libaio_data *ld = td->io_ops->data;
 
-	if (ld->sync_io_u) {
-		struct io_u *ret = ld->sync_io_u;
-
-		ld->sync_io_u = NULL;
-		return ret;
-	}
-
 	return ev_to_iou(ld->aio_events + event);
 }
 
@@ -57,9 +49,6 @@
 	struct libaio_data *ld = td->io_ops->data;
 	long r;
 
-	if (ld->sync_io_u)
-		return 1;
-
 	do {
 		r = io_getevents(ld->aio_ctx, min, max, ld->aio_events, t);
 		if (r >= min)
@@ -88,7 +77,7 @@
 	do {
 		ret = io_submit(ld->aio_ctx, 1, &iocb);
 		if (ret == 1)
-			break;
+			return FIO_Q_QUEUED;
 		else if (ret == -EAGAIN || !ret)
 			usleep(100);
 		else if (ret == -EINTR)
@@ -103,10 +92,8 @@
 			 */
 			if (fsync(io_u->file->fd) < 0)
 				ret = errno;
-			else {
-				ret = 1;
-				ld->sync_io_u = io_u;
-			}
+			else
+				ret = FIO_Q_COMPLETED;
 			break;
 		} else
 			break;
@@ -116,10 +103,10 @@
 		io_u->resid = io_u->xfer_buflen;
 		io_u->error = -ret;
 		td_verror(td, io_u->error);
-		return 1;
+		return FIO_Q_COMPLETED;
 	}
 
-	return 0;
+	return ret;
 }
 
 static int fio_libaio_cancel(struct thread_data *td, struct io_u *io_u)
diff --git a/engines/mmap.c b/engines/mmap.c
index b214319..d32fe96 100644
--- a/engines/mmap.c
+++ b/engines/mmap.c
@@ -12,40 +12,10 @@
 #include "../fio.h"
 #include "../os.h"
 
-struct mmapio_data {
-	struct io_u *last_io_u;
-};
-
-static int fio_mmapio_getevents(struct thread_data *td, int fio_unused min,
-				int max, struct timespec fio_unused *t)
-{
-	assert(max <= 1);
-
-	/*
-	 * we can only have one finished io_u for sync io, since the depth
-	 * is always 1
-	 */
-	if (list_empty(&td->io_u_busylist))
-		return 0;
-
-	return 1;
-}
-
-static struct io_u *fio_mmapio_event(struct thread_data *td, int event)
-{
-	struct mmapio_data *sd = td->io_ops->data;
-
-	assert(event == 0);
-
-	return sd->last_io_u;
-}
-
-
 static int fio_mmapio_queue(struct thread_data *td, struct io_u *io_u)
 {
 	struct fio_file *f = io_u->file;
 	unsigned long long real_off = io_u->offset - f->file_offset;
-	struct mmapio_data *sd = td->io_ops->data;
 
 	if (io_u->ddir == DDIR_READ)
 		memcpy(io_u->xfer_buf, f->mmap + real_off, io_u->xfer_buflen);
@@ -66,39 +36,16 @@
 			io_u->error = errno;
 	}
 
-	if (!io_u->error)
-		sd->last_io_u = io_u;
-	else
+	if (io_u->error)
 		td_verror(td, io_u->error);
 
-	return io_u->error;
-}
-
-static void fio_mmapio_cleanup(struct thread_data *td)
-{
-	if (td->io_ops->data) {
-		free(td->io_ops->data);
-		td->io_ops->data = NULL;
-	}
-}
-
-static int fio_mmapio_init(struct thread_data *td)
-{
-	struct mmapio_data *sd = malloc(sizeof(*sd));
-
-	sd->last_io_u = NULL;
-	td->io_ops->data = sd;
-	return 0;
+	return FIO_Q_COMPLETED;
 }
 
 static struct ioengine_ops ioengine = {
 	.name		= "mmap",
 	.version	= FIO_IOOPS_VERSION,
-	.init		= fio_mmapio_init,
 	.queue		= fio_mmapio_queue,
-	.getevents	= fio_mmapio_getevents,
-	.event		= fio_mmapio_event,
-	.cleanup	= fio_mmapio_cleanup,
 	.flags		= FIO_SYNCIO | FIO_MMAPIO,
 };
 
diff --git a/engines/net.c b/engines/net.c
index 0231451..4f070f9 100644
--- a/engines/net.c
+++ b/engines/net.c
@@ -14,46 +14,17 @@
 #include "../fio.h"
 #include "../os.h"
 
-struct net_data {
-	int send_to_net;
-	struct io_u *last_io_u;
-};
-
-static int fio_netio_getevents(struct thread_data *td, int fio_unused min,
-				int max, struct timespec fio_unused *t)
-{
-	assert(max <= 1);
-
-	/*
-	 * we can only have one finished io_u for sync io, since the depth
-	 * is always 1
-	 */
-	if (list_empty(&td->io_u_busylist))
-		return 0;
-
-	return 1;
-}
-
-static struct io_u *fio_netio_event(struct thread_data *td, int event)
-{
-	struct net_data *nd = td->io_ops->data;
-
-	assert(event == 0);
-
-	return nd->last_io_u;
-}
+#define send_to_net(td)	((td)->io_ops->priv)
 
 static int fio_netio_prep(struct thread_data *td, struct io_u *io_u)
 {
-	struct net_data *nd = td->io_ops->data;
 	struct fio_file *f = io_u->file;
 
 	/*
 	 * Make sure we don't see spurious reads to a receiver, and vice versa
 	 */
-	if ((nd->send_to_net && io_u->ddir == DDIR_READ) ||
-	    (!nd->send_to_net && io_u->ddir == DDIR_WRITE)) {
-		printf("boo!\n");
+	if ((send_to_net(td) && io_u->ddir == DDIR_READ) ||
+	    (!send_to_net(td) && io_u->ddir == DDIR_WRITE)) {
 		td_verror(td, EINVAL);
 		return 1;
 	}
@@ -73,7 +44,6 @@
 
 static int fio_netio_queue(struct thread_data *td, struct io_u *io_u)
 {
-	struct net_data *nd = td->io_ops->data;
 	struct fio_file *f = io_u->file;
 	int ret, flags = 0;
 
@@ -96,17 +66,15 @@
 		if (ret > 0) {
 			io_u->resid = io_u->xfer_buflen - ret;
 			io_u->error = 0;
-			return ret;
+			return FIO_Q_COMPLETED;
 		} else
 			io_u->error = errno;
 	}
 
-	if (!io_u->error)
-		nd->last_io_u = io_u;
-	else
+	if (io_u->error)
 		td_verror(td, io_u->error);
 
-	return io_u->error;
+	return FIO_Q_COMPLETED;
 }
 
 static int fio_netio_setup_connect(struct thread_data *td, const char *host,
@@ -245,7 +213,6 @@
 static int fio_netio_setup(struct thread_data *td)
 {
 	char host[64], buf[128];
-	struct net_data *nd;
 	unsigned short port;
 	struct fio_file *f;
 	char *sep;
@@ -256,14 +223,6 @@
 		return 1;
 	}
 
-	/*
-	 * work around for late init call
-	 */
-	if (td->io_ops->init(td))
-		return 1;
-
-	nd = td->io_ops->data;
-
 	if (td->iomix) {
 		log_err("fio: network connections must be read OR write\n");
 		return 1;
@@ -283,10 +242,10 @@
 	port = atoi(sep);
 
 	if (td->ddir == DDIR_READ) {
-		nd->send_to_net = 0;
+		send_to_net(td) = 0;
 		ret = fio_netio_setup_listen(td, port);
 	} else {
-		nd->send_to_net = 1;
+		send_to_net(td) = 1;
 		ret = fio_netio_setup_connect(td, host, port);
 	}
 
@@ -304,40 +263,11 @@
 	return 0;
 }
 
-static void fio_netio_cleanup(struct thread_data *td)
-{
-	if (td->io_ops->data) {
-		free(td->io_ops->data);
-		td->io_ops->data = NULL;
-	}
-}
-
-static int fio_netio_init(struct thread_data *td)
-{
-	struct net_data *nd;
-
-	/*
-	 * Hack to work-around the ->setup() function calling init on its
-	 * own, since it needs ->io_ops->data to be set up.
-	 */
-	if (td->io_ops->data)
-		return 0;
-
-	nd  = malloc(sizeof(*nd));
-	nd->last_io_u = NULL;
-	td->io_ops->data = nd;
-	return 0;
-}
-
 static struct ioengine_ops ioengine = {
 	.name		= "net",
 	.version	= FIO_IOOPS_VERSION,
-	.init		= fio_netio_init,
 	.prep		= fio_netio_prep,
 	.queue		= fio_netio_queue,
-	.getevents	= fio_netio_getevents,
-	.event		= fio_netio_event,
-	.cleanup	= fio_netio_cleanup,
 	.setup		= fio_netio_setup,
 	.flags		= FIO_SYNCIO | FIO_NETIO,
 };
diff --git a/engines/null.c b/engines/null.c
index 7b4b217..6bd51ba 100644
--- a/engines/null.c
+++ b/engines/null.c
@@ -11,65 +11,17 @@
 #include "../fio.h"
 #include "../os.h"
 
-struct null_data {
-	struct io_u *last_io_u;
-};
-
-static int fio_null_getevents(struct thread_data *td, int fio_unused min,
-			      int max, struct timespec fio_unused *t)
+static int fio_null_queue(struct thread_data fio_unused *td, struct io_u *io_u)
 {
-	assert(max <= 1);
-
-	if (list_empty(&td->io_u_busylist))
-		return 0;
-
-	return 1;
-}
-
-static struct io_u *fio_null_event(struct thread_data *td, int event)
-{
-	struct null_data *nd = td->io_ops->data;
-
-	assert(event == 0);
-
-	return nd->last_io_u;
-}
-
-static int fio_null_queue(struct thread_data *td, struct io_u *io_u)
-{
-	struct null_data *nd = td->io_ops->data;
-
 	io_u->resid = 0;
 	io_u->error = 0;
-	nd->last_io_u = io_u;
-	return 0;
-}
-
-static void fio_null_cleanup(struct thread_data *td)
-{
-	if (td->io_ops->data) {
-		free(td->io_ops->data);
-		td->io_ops->data = NULL;
-	}
-}
-
-static int fio_null_init(struct thread_data *td)
-{
-	struct null_data *nd = malloc(sizeof(*nd));
-
-	nd->last_io_u = NULL;
-	td->io_ops->data = nd;
-	return 0;
+	return FIO_Q_COMPLETED;
 }
 
 static struct ioengine_ops ioengine = {
 	.name		= "null",
 	.version	= FIO_IOOPS_VERSION,
-	.init		= fio_null_init,
 	.queue		= fio_null_queue,
-	.getevents	= fio_null_getevents,
-	.event		= fio_null_event,
-	.cleanup	= fio_null_cleanup,
 	.flags		= FIO_SYNCIO | FIO_NULLIO,
 };
 
diff --git a/engines/posixaio.c b/engines/posixaio.c
index 2fc56cd..a56ab3a 100644
--- a/engines/posixaio.c
+++ b/engines/posixaio.c
@@ -154,9 +154,10 @@
 	if (ret) {
 		io_u->error = errno;
 		td_verror(td, io_u->error);
+		return FIO_Q_COMPLETED;
 	}
-		
-	return io_u->error;
+
+	return FIO_Q_QUEUED;
 }
 
 static void fio_posixaio_cleanup(struct thread_data *td)
diff --git a/engines/sg.c b/engines/sg.c
index 8d086bf..2713976 100644
--- a/engines/sg.c
+++ b/engines/sg.c
@@ -48,21 +48,6 @@
 	}
 }
 
-static int fio_sgio_ioctl_getevents(struct thread_data *td, int fio_unused min,
-				    int max, struct timespec fio_unused *t)
-{
-	assert(max <= 1);
-
-	/*
-	 * we can only have one finished io_u for sync io, since the depth
-	 * is always 1
-	 */
-	if (list_empty(&td->io_u_busylist))
-		return 0;
-
-	return 1;
-}
-
 static int pollin_events(struct pollfd *pfds, int fds)
 {
 	int i;
@@ -171,10 +156,15 @@
 {
 	struct sgio_data *sd = td->io_ops->data;
 	struct sg_io_hdr *hdr = &io_u->hdr;
+	int ret;
 
 	sd->events[0] = io_u;
 
-	return ioctl(f->fd, SG_IO, hdr);
+	ret = ioctl(f->fd, SG_IO, hdr);
+	if (ret < 0)
+		return ret;
+
+	return FIO_Q_COMPLETED;
 }
 
 static int fio_sgio_rw_doio(struct fio_file *f, struct io_u *io_u, int sync)
@@ -190,9 +180,10 @@
 		ret = read(f->fd, hdr, sizeof(*hdr));
 		if (ret < 0)
 			return errno;
+		return FIO_Q_COMPLETED;
 	}
 
-	return 0;
+	return FIO_Q_QUEUED;
 }
 
 static int fio_sgio_doio(struct thread_data *td, struct io_u *io_u, int sync)
@@ -263,10 +254,10 @@
 
 	if (io_u->error) {
 		td_verror(td, io_u->error);
-		return io_u->error;
+		return FIO_Q_COMPLETED;
 	}
 
-	return 0;
+	return ret;
 }
 
 static struct io_u *fio_sgio_event(struct thread_data *td, int event)
@@ -369,10 +360,10 @@
 
 	sd->bs = bs;
 
-	if (td->filetype == FIO_TYPE_BD)
-		td->io_ops->getevents = fio_sgio_ioctl_getevents;
-	else
-		td->io_ops->getevents = fio_sgio_getevents;
+	if (td->filetype == FIO_TYPE_BD) {
+		td->io_ops->getevents = NULL;
+		td->io_ops->event = NULL;
+	}
 
 	/*
 	 * we want to do it, regardless of whether odirect is set or not
diff --git a/engines/skeleton_external.c b/engines/skeleton_external.c
index 785b9a6..0937d68 100644
--- a/engines/skeleton_external.c
+++ b/engines/skeleton_external.c
@@ -61,12 +61,17 @@
  * The io engine must transfer in the direction noted by io_u->ddir
  * to the buffer pointed to by io_u->xfer_buf for as many bytes as
  * io_u->xfer_buflen. Residual data count may be set in io_u->residual
- * for a short read/write. Should return 0 for io_u complete, < 0 for
- * an error, and > 0 for the number of bytes transferred.
+ * for a short read/write.
  */
 static int fio_skeleton_queue(struct thread_data *td, struct io_u *io_u)
 {
-	return 0;
+	/*
+	 * Could return FIO_Q_QUEUED for a queued request,
+	 * FIO_Q_COMPLETED for a completed request, and FIO_Q_BUSY
+	 * if we could queue no more at this point (you'd have to
+	 * define ->commit() to handle that).
+	 */
+	return FIO_Q_COMPLETED;
 }
 
 /*
diff --git a/engines/splice.c b/engines/splice.c
index 432ba79..f55e5c0 100644
--- a/engines/splice.c
+++ b/engines/splice.c
@@ -15,34 +15,9 @@
 #ifdef FIO_HAVE_SPLICE
 
 struct spliceio_data {
-	struct io_u *last_io_u;
 	int pipe[2];
 };
 
-static int fio_spliceio_getevents(struct thread_data *td, int fio_unused min,
-				  int max, struct timespec fio_unused *t)
-{
-	assert(max <= 1);
-
-	/*
-	 * we can only have one finished io_u for sync io, since the depth
-	 * is always 1
-	 */
-	if (list_empty(&td->io_u_busylist))
-		return 0;
-
-	return 1;
-}
-
-static struct io_u *fio_spliceio_event(struct thread_data *td, int event)
-{
-	struct spliceio_data *sd = td->io_ops->data;
-
-	assert(event == 0);
-
-	return sd->last_io_u;
-}
-
 /*
  * For splice reading, we unfortunately cannot (yet) vmsplice the other way.
  * So just splice the data from the file into the pipe, and use regular
@@ -131,7 +106,6 @@
 
 static int fio_spliceio_queue(struct thread_data *td, struct io_u *io_u)
 {
-	struct spliceio_data *sd = td->io_ops->data;
 	int ret;
 
 	if (io_u->ddir == DDIR_READ)
@@ -145,17 +119,15 @@
 		if (ret > 0) {
 			io_u->resid = io_u->xfer_buflen - ret;
 			io_u->error = 0;
-			return ret;
+			return FIO_Q_COMPLETED;
 		} else
 			io_u->error = errno;
 	}
 
-	if (!io_u->error)
-		sd->last_io_u = io_u;
-	else
+	if (io_u->error)
 		td_verror(td, io_u->error);
 
-	return io_u->error;
+	return FIO_Q_COMPLETED;
 }
 
 static void fio_spliceio_cleanup(struct thread_data *td)
@@ -174,7 +146,6 @@
 {
 	struct spliceio_data *sd = malloc(sizeof(*sd));
 
-	sd->last_io_u = NULL;
 	if (pipe(sd->pipe) < 0) {
 		td_verror(td, errno);
 		free(sd);
@@ -190,8 +161,6 @@
 	.version	= FIO_IOOPS_VERSION,
 	.init		= fio_spliceio_init,
 	.queue		= fio_spliceio_queue,
-	.getevents	= fio_spliceio_getevents,
-	.event		= fio_spliceio_event,
 	.cleanup	= fio_spliceio_cleanup,
 	.flags		= FIO_SYNCIO,
 };
diff --git a/engines/sync.c b/engines/sync.c
index f689cbe..6a5b7d3 100644
--- a/engines/sync.c
+++ b/engines/sync.c
@@ -11,34 +11,6 @@
 #include "../fio.h"
 #include "../os.h"
 
-struct syncio_data {
-	struct io_u *last_io_u;
-};
-
-static int fio_syncio_getevents(struct thread_data *td, int fio_unused min,
-				int max, struct timespec fio_unused *t)
-{
-	assert(max <= 1);
-
-	/*
-	 * we can only have one finished io_u for sync io, since the depth
-	 * is always 1
-	 */
-	if (list_empty(&td->io_u_busylist))
-		return 0;
-
-	return 1;
-}
-
-static struct io_u *fio_syncio_event(struct thread_data *td, int event)
-{
-	struct syncio_data *sd = td->io_ops->data;
-
-	assert(event == 0);
-
-	return sd->last_io_u;
-}
-
 static int fio_syncio_prep(struct thread_data *td, struct io_u *io_u)
 {
 	struct fio_file *f = io_u->file;
@@ -58,7 +30,6 @@
 
 static int fio_syncio_queue(struct thread_data *td, struct io_u *io_u)
 {
-	struct syncio_data *sd = td->io_ops->data;
 	struct fio_file *f = io_u->file;
 	int ret;
 
@@ -73,45 +44,22 @@
 		if (ret > 0) {
 			io_u->resid = io_u->xfer_buflen - ret;
 			io_u->error = 0;
-			return ret;
+			return FIO_Q_COMPLETED;
 		} else
 			io_u->error = errno;
 	}
 
-	if (!io_u->error)
-		sd->last_io_u = io_u;
-	else
+	if (io_u->error)
 		td_verror(td, io_u->error);
 
-	return io_u->error;
-}
-
-static void fio_syncio_cleanup(struct thread_data *td)
-{
-	if (td->io_ops->data) {
-		free(td->io_ops->data);
-		td->io_ops->data = NULL;
-	}
-}
-
-static int fio_syncio_init(struct thread_data *td)
-{
-	struct syncio_data *sd = malloc(sizeof(*sd));
-
-	sd->last_io_u = NULL;
-	td->io_ops->data = sd;
-	return 0;
+	return FIO_Q_COMPLETED;
 }
 
 static struct ioengine_ops ioengine = {
 	.name		= "sync",
 	.version	= FIO_IOOPS_VERSION,
-	.init		= fio_syncio_init,
 	.prep		= fio_syncio_prep,
 	.queue		= fio_syncio_queue,
-	.getevents	= fio_syncio_getevents,
-	.event		= fio_syncio_event,
-	.cleanup	= fio_syncio_cleanup,
 	.flags		= FIO_SYNCIO,
 };
 
diff --git a/engines/syslet-rw.c b/engines/syslet-rw.c
index 4b65b2d..bfb6021 100644
--- a/engines/syslet-rw.c
+++ b/engines/syslet-rw.c
@@ -154,7 +154,7 @@
 	 * it's queued asynchronously.
 	 */
 	if (!async_exec(&io_u->req.atom))
-		return 0;
+		return FIO_Q_QUEUED;
 
 	/*
 	 * completed sync
@@ -164,7 +164,7 @@
 		if (ret > 0) {
 			io_u->resid = io_u->xfer_buflen - ret;
 			io_u->error = 0;
-			return ret;
+			return FIO_Q_COMPLETED;
 		} else
 			io_u->error = errno;
 	}
@@ -174,7 +174,7 @@
 	else
 		td_verror(td, io_u->error);
 
-	return io_u->error;
+	return FIO_Q_COMPLETED;
 }
 
 static int async_head_init(struct syslet_data *sd, unsigned int depth)
diff --git a/fio.c b/fio.c
index ff169f8..5358af2 100644
--- a/fio.c
+++ b/fio.c
@@ -27,6 +27,7 @@
 #include <signal.h>
 #include <time.h>
 #include <locale.h>
+#include <assert.h>
 #include <sys/stat.h>
 #include <sys/wait.h>
 #include <sys/ipc.h>
@@ -221,23 +222,32 @@
 	}
 
 	ret = td_io_queue(td, io_u);
-	if (ret) {
+	if (ret < 0) {
 		td_verror(td, io_u->error);
 		put_io_u(td, io_u);
 		return 1;
-	}
+	} else if (ret == FIO_Q_QUEUED) {
+		ret = td_io_getevents(td, 1, td->cur_depth, NULL);
+		if (ret < 0) {
+			td_verror(td, ret);
+			return 1;
+		}
 
-	ret = td_io_getevents(td, 1, td->cur_depth, NULL);
-	if (ret < 0) {
-		td_verror(td, ret);
-		return 1;
-	}
+		icd.nr = ret;
+		ios_completed(td, &icd);
+		if (icd.error) {
+			td_verror(td, icd.error);
+			return 1;
+		}
+	} else if (ret == FIO_Q_COMPLETED) {
+		if (io_u->error) {
+			td_verror(td, io_u->error);
+			return 1;
+		}
 
-	icd.nr = ret;
-	ios_completed(td, &icd);
-	if (icd.error) {
-		td_verror(td, icd.error);
-		return 1;
+		init_icd(&icd);
+		io_completed(td, io_u, &icd);
+		put_io_u(td, io_u);
 	}
 
 	return 0;
@@ -249,9 +259,8 @@
  */
 static void do_verify(struct thread_data *td)
 {
-	struct io_u *io_u, *v_io_u = NULL;
-	struct io_completion_data icd;
 	struct fio_file *f;
+	struct io_u *io_u;
 	int ret, i;
 
 	/*
@@ -265,78 +274,66 @@
 
 	td_set_runstate(td, TD_VERIFYING);
 
-	do {
-		if (td->terminate)
-			break;
-
+	io_u = NULL;
+	while (!td->terminate) {
 		io_u = __get_io_u(td);
 		if (!io_u)
 			break;
 
-		if (runtime_exceeded(td, &io_u->start_time)) {
-			put_io_u(td, io_u);
-			break;
-		}
-
-		if (get_next_verify(td, io_u)) {
-			put_io_u(td, io_u);
-			break;
-		}
-
-		f = get_next_file(td);
-		if (!f)
+		if (runtime_exceeded(td, &io_u->start_time))
 			break;
 
-		io_u->file = f;
-
-		if (td_io_prep(td, io_u)) {
-			put_io_u(td, io_u);
+		if (get_next_verify(td, io_u))
 			break;
-		}
 
+		if (td_io_prep(td, io_u))
+			break;
+
+requeue:
 		ret = td_io_queue(td, io_u);
-		if (ret) {
-			td_verror(td, io_u->error);
-			put_io_u(td, io_u);
+
+		switch (ret) {
+		case FIO_Q_COMPLETED:
+			if (io_u->error)
+				ret = io_u->error;
+			if (io_u->xfer_buflen != io_u->resid && io_u->resid) {
+				int bytes = io_u->xfer_buflen - io_u->resid;
+
+				io_u->xfer_buflen = io_u->resid;
+				io_u->xfer_buf += bytes;
+				goto requeue;
+			}
+			if (do_io_u_verify(td, &io_u)) {
+				ret = -EIO;
+				break;
+			}
+			continue;
+		case FIO_Q_QUEUED:
+			break;
+		default:
+			assert(ret < 0);
+			td_verror(td, ret);
 			break;
 		}
 
 		/*
-		 * we have one pending to verify, do that while
-		 * we are doing io on the next one
+		 * We get here for a queued request, in the future we
+		 * want to later make this take full advantage of
+		 * keeping IO in flight while verifying others.
 		 */
-		if (do_io_u_verify(td, &v_io_u))
-			break;
-
 		ret = td_io_getevents(td, 1, 1, NULL);
-		if (ret != 1) {
-			if (ret < 0)
-				td_verror(td, ret);
-			break;
-		}
-
-		v_io_u = td->io_ops->event(td, 0);
-		icd.nr = 1;
-		icd.error = 0;
-		fio_gettime(&icd.time, NULL);
-		io_completed(td, v_io_u, &icd);
-
-		if (icd.error) {
-			td_verror(td, icd.error);
-			put_io_u(td, v_io_u);
-			v_io_u = NULL;
-			break;
-		}
-
-		/*
-		 * if we can't submit more io, we need to verify now
-		 */
-		if (queue_full(td) && do_io_u_verify(td, &v_io_u))
+		if (ret < 0)
 			break;
 
-	} while (1);
+		assert(ret == 1);
+		io_u = td->io_ops->event(td, 0);
 
-	do_io_u_verify(td, &v_io_u);
+		if (do_io_u_verify(td, &io_u))
+			break;
+	}
+
+	if (io_u)
+		put_io_u(td, io_u);
 
 	if (td->cur_depth)
 		cleanup_pending_aio(td);
@@ -402,45 +399,63 @@
 		memcpy(&s, &io_u->start_time, sizeof(s));
 requeue:
 		ret = td_io_queue(td, io_u);
-		if (ret) {
-			if (ret > 0 && (io_u->xfer_buflen != io_u->resid) &&
-			    io_u->resid) {
-				/*
-				 * short read/write. requeue.
-				 */
-				io_u->xfer_buflen = io_u->resid;
-				io_u->xfer_buf += ret;
-				goto requeue;
-			} else {
-				put_io_u(td, io_u);
+
+		switch (ret) {
+		case FIO_Q_COMPLETED:
+			if (io_u->error) {
+				ret = io_u->error;
 				break;
 			}
+			if (io_u->xfer_buflen != io_u->resid && io_u->resid) {
+				int bytes = io_u->xfer_buflen - io_u->resid;
+
+				io_u->xfer_buflen = io_u->resid;
+				io_u->xfer_buf += bytes;
+				goto requeue;
+			}
+			init_icd(&icd);
+			io_completed(td, io_u, &icd);
+			put_io_u(td, io_u);
+			break;
+		case FIO_Q_QUEUED:
+			break;
+		default:
+			assert(ret < 0);
+			put_io_u(td, io_u);
+			break;
 		}
 
+		if (ret < 0)
+			break;
+
 		add_slat_sample(td, io_u->ddir, mtime_since(&io_u->start_time, &io_u->issue_time));
 
-		if (td->cur_depth < td->iodepth) {
-			struct timespec ts = { .tv_sec = 0, .tv_nsec = 0};
+		if (ret == FIO_Q_QUEUED) {
+			if (td->cur_depth < td->iodepth) {
+				struct timespec ts;
 
-			timeout = &ts;
-			min_evts = 0;
-		} else {
-			timeout = NULL;
-			min_evts = 1;
-		}
+				ts.tv_sec = 0;
+				ts.tv_nsec = 0;
+				timeout = &ts;
+				min_evts = 0;
+			} else {
+				timeout = NULL;
+				min_evts = 1;
+			}
 
-		ret = td_io_getevents(td, min_evts, td->cur_depth, timeout);
-		if (ret < 0) {
-			td_verror(td, ret);
-			break;
-		} else if (!ret)
-			continue;
+			ret = td_io_getevents(td, min_evts, td->cur_depth, timeout);
+			if (ret < 0) {
+				td_verror(td, ret);
+				break;
+			} else if (!ret)
+				continue;
 
-		icd.nr = ret;
-		ios_completed(td, &icd);
-		if (icd.error) {
-			td_verror(td, icd.error);
-			break;
+			icd.nr = ret;
+			ios_completed(td, &icd);
+			if (icd.error) {
+				td_verror(td, icd.error);
+				break;
+			}
 		}
 
 		/*
diff --git a/fio.h b/fio.h
index 934d897..b087f97 100644
--- a/fio.h
+++ b/fio.h
@@ -129,6 +129,14 @@
 	struct list_head list;
 };
 
+/*
+ * io_ops->queue() return values
+ */
+enum {
+	FIO_Q_COMPLETED	= 0,		/* completed sync */
+	FIO_Q_QUEUED	= 1,		/* queued, will complete async */
+};
+
 #define FIO_HDR_MAGIC	0xf00baaef
 
 enum {
@@ -608,6 +616,7 @@
 extern void put_io_u(struct thread_data *, struct io_u *);
 extern void ios_completed(struct thread_data *, struct io_completion_data *);
 extern void io_completed(struct thread_data *, struct io_u *, struct io_completion_data *);
+extern void init_icd(struct io_completion_data *);
 
 /*
  * io engine entry points
@@ -666,9 +675,10 @@
 	void (*cleanup)(struct thread_data *);
 	void *data;
 	void *dlhandle;
+	unsigned long priv;
 };
 
-#define FIO_IOOPS_VERSION	3
+#define FIO_IOOPS_VERSION	4
 
 extern struct ioengine_ops *load_ioengine(struct thread_data *, const char *);
 extern int register_ioengine(struct ioengine_ops *);
diff --git a/io_u.c b/io_u.c
index 132d897..6439979 100644
--- a/io_u.c
+++ b/io_u.c
@@ -353,23 +353,23 @@
 			return NULL;
 		}
 
-		f->last_pos += io_u->buflen;
+		f->last_pos = io_u->offset + io_u->buflen;
 
 		if (td->verify != VERIFY_NONE)
 			populate_verify_io_u(td, io_u);
 	}
 
-	if (td_io_prep(td, io_u)) {
-		put_io_u(td, io_u);
-		return NULL;
-	}
-
 	/*
 	 * Set io data pointers.
 	 */
 	io_u->xfer_buf = io_u->buf;
 	io_u->xfer_buflen = io_u->buflen;
 
+	if (td_io_prep(td, io_u)) {
+		put_io_u(td, io_u);
+		return NULL;
+	}
+
 	fio_gettime(&io_u->start_time, NULL);
 	return io_u;
 }
@@ -411,15 +411,20 @@
 		icd->error = io_u->error;
 }
 
+void init_icd(struct io_completion_data *icd)
+{
+	fio_gettime(&icd->time, NULL);
+
+	icd->error = 0;
+	icd->bytes_done[0] = icd->bytes_done[1] = 0;
+}
+
 void ios_completed(struct thread_data *td, struct io_completion_data *icd)
 {
 	struct io_u *io_u;
 	int i;
 
-	fio_gettime(&icd->time, NULL);
-
-	icd->error = 0;
-	icd->bytes_done[0] = icd->bytes_done[1] = 0;
+	init_icd(icd);
 
 	for (i = 0; i < icd->nr; i++) {
 		io_u = td->io_ops->event(td, i);
diff --git a/ioengines.c b/ioengines.c
index 2ed2749..1b510df 100644
--- a/ioengines.c
+++ b/ioengines.c
@@ -33,16 +33,27 @@
 	if (ops->flags & FIO_CPUIO)
 		return 0;
 
+	if (!ops->queue) {
+		log_err("%s: no queue handler\n", ops->name);
+		return 1;
+	}
+
+	/*
+	 * sync engines only need a ->queue()
+	 */
+	if (ops->flags & FIO_SYNCIO)
+		return 0;
+	
 	if (!ops->event) {
-		log_err("%s: no event handler)\n", ops->name);
+		log_err("%s: no event handler\n", ops->name);
 		return 1;
 	}
 	if (!ops->getevents) {
-		log_err("%s: no getevents handler)\n", ops->name);
+		log_err("%s: no getevents handler\n", ops->name);
 		return 1;
 	}
 	if (!ops->queue) {
-		log_err("%s: no queue handler)\n", ops->name);
+		log_err("%s: no queue handler\n", ops->name);
 		return 1;
 	}
 		
@@ -159,8 +170,8 @@
 
 int td_io_prep(struct thread_data *td, struct io_u *io_u)
 {
-	if (td->io_ops->prep && td->io_ops->prep(td, io_u))
-		return 1;
+	if (td->io_ops->prep)
+		return td->io_ops->prep(td, io_u);
 
 	return 0;
 }
@@ -168,7 +179,10 @@
 int td_io_getevents(struct thread_data *td, int min, int max,
 		    struct timespec *t)
 {
-	return td->io_ops->getevents(td, min, max, t);
+	if (td->io_ops->getevents)
+		return td->io_ops->getevents(td, min, max, t);
+
+	return 0;
 }
 
 int td_io_queue(struct thread_data *td, struct io_u *io_u)
diff --git a/verify.c b/verify.c
index 4440d44..32cfdd7 100644
--- a/verify.c
+++ b/verify.c
@@ -83,8 +83,10 @@
 	struct verify_header *hdr = (struct verify_header *) io_u->buf;
 	int ret;
 
-	if (hdr->fio_magic != FIO_HDR_MAGIC)
+	if (hdr->fio_magic != FIO_HDR_MAGIC) {
+		log_err("Bad verify header %x\n", hdr->fio_magic);
 		return 1;
+	}
 
 	if (hdr->verify_type == VERIFY_MD5)
 		ret = verify_io_u_md5(hdr, io_u);
@@ -148,7 +150,10 @@
 
 		io_u->offset = ipo->offset;
 		io_u->buflen = ipo->len;
+		io_u->file = ipo->file;
 		io_u->ddir = DDIR_READ;
+		io_u->xfer_buf = io_u->buf;
+		io_u->xfer_buflen = io_u->buflen;
 		free(ipo);
 		return 0;
 	}
@@ -162,7 +167,11 @@
 	int ret = 0;
 
 	if (v_io_u) {
+		struct io_completion_data icd;
+
 		ret = verify_io_u(v_io_u);
+		init_icd(&icd);
+		io_completed(td, v_io_u, &icd);
 		put_io_u(td, v_io_u);
 		*io_u = NULL;
 	}