Fully parallellize io_u verification

Keep a full queue whenever possible, do verifications while
io is in progress.

Signed-off-by: Jens Axboe <jens.axboe@oracle.com>
diff --git a/fio.c b/fio.c
index 5358af2..eb857af 100644
--- a/fio.c
+++ b/fio.c
@@ -174,7 +174,7 @@
 	 */
 	r = td_io_getevents(td, 0, td->cur_depth, &ts);
 	if (r > 0) {
-		icd.nr = r;
+		init_icd(&icd, NULL, r);
 		ios_completed(td, &icd);
 	}
 
@@ -194,7 +194,7 @@
 	if (td->cur_depth) {
 		r = td_io_getevents(td, td->cur_depth, td->cur_depth, NULL);
 		if (r > 0) {
-			icd.nr = r;
+			init_icd(&icd, NULL, r);
 			ios_completed(td, &icd);
 		}
 	}
@@ -233,7 +233,7 @@
 			return 1;
 		}
 
-		icd.nr = ret;
+		init_icd(&icd, NULL, ret);
 		ios_completed(td, &icd);
 		if (icd.error) {
 			td_verror(td, icd.error);
@@ -245,7 +245,7 @@
 			return 1;
 		}
 
-		init_icd(&icd);
+		init_icd(&icd, NULL, 1);
 		io_completed(td, io_u, &icd);
 		put_io_u(td, io_u);
 	}
@@ -261,7 +261,7 @@
 {
 	struct fio_file *f;
 	struct io_u *io_u;
-	int ret, i;
+	int ret, i, min_events;
 
 	/*
 	 * sync io first and invalidate cache, to make sure we really
@@ -276,6 +276,9 @@
 
 	io_u = NULL;
 	while (!td->terminate) {
+		struct io_completion_data icd;
+		struct timespec *timeout;
+
 		io_u = __get_io_u(td);
 		if (!io_u)
 			break;
@@ -303,10 +306,13 @@
 				io_u->xfer_buf += bytes;
 				goto requeue;
 			}
-			if (do_io_u_verify(td, &io_u)) {
-				ret = -EIO;
+			init_icd(&icd, verify_io_u, 1);
+			io_completed(td, io_u, &icd);
+			if (icd.error) {
+				ret = icd.error;
 				break;
 			}
+			put_io_u(td, io_u);
 			continue;
 		case FIO_Q_QUEUED:
 			break;
@@ -316,20 +322,42 @@
 			break;
 		}
 
-		/*
-		 * We get here for a queued request, in the future we
-		 * want to later make this take full advantage of
-		 * keeping IO in flight while verifying others.
-		 */
-		ret = td_io_getevents(td, 1, 1, NULL);
 		if (ret < 0)
 			break;
 
-		assert(ret == 1);
-		io_u = td->io_ops->event(td, 0);
+		/*
+		 * if we can queue more, do so. but check if there are
+		 * completed io_u's first.
+		 */
+		if (queue_full(td)) {
+			timeout = NULL;
+			min_events = 1;
+		} else {
+			struct timespec ts;
 
-		if (do_io_u_verify(td, &io_u))
+			ts.tv_sec = 0;
+			ts.tv_nsec = 0;
+			timeout = &ts;
+			min_events = 0;
+		}
+
+		/*
+		 * Reap required number of io units, if any, and do the
+		 * verification on them through the callback handler
+		 */
+		ret = td_io_getevents(td, min_events, td->cur_depth, timeout);
+		if (ret < 0)
 			break;
+		else if (!ret)
+			continue;
+
+		init_icd(&icd, verify_io_u, ret);
+		ios_completed(td, &icd);
+
+		if (icd.error) {
+			td_verror(td, icd.error);
+			break;
+		}
 	}
 
 	if (io_u)
@@ -413,7 +441,7 @@
 				io_u->xfer_buf += bytes;
 				goto requeue;
 			}
-			init_icd(&icd);
+			init_icd(&icd, NULL, 1);
 			io_completed(td, io_u, &icd);
 			put_io_u(td, io_u);
 			break;
@@ -450,7 +478,7 @@
 			} else if (!ret)
 				continue;
 
-			icd.nr = ret;
+			init_icd(&icd, NULL, ret);
 			ios_completed(td, &icd);
 			if (icd.error) {
 				td_verror(td, icd.error);
diff --git a/fio.h b/fio.h
index b087f97..e5e754a 100644
--- a/fio.h
+++ b/fio.h
@@ -494,8 +494,10 @@
 /*
  * Used for passing io_u completion data
  */
+typedef int (icd_handler)(struct io_u *);
 struct io_completion_data {
 	int nr;				/* input */
+	icd_handler *handler;		/* input */
 
 	int error;			/* output */
 	unsigned long bytes_done[2];	/* output */
@@ -597,7 +599,7 @@
  */
 extern void populate_verify_io_u(struct thread_data *, struct io_u *);
 extern int get_next_verify(struct thread_data *td, struct io_u *);
-extern int do_io_u_verify(struct thread_data *, struct io_u **);
+extern int verify_io_u(struct io_u *);
 
 /*
  * Memory helpers
@@ -616,7 +618,7 @@
 extern void put_io_u(struct thread_data *, struct io_u *);
 extern void ios_completed(struct thread_data *, struct io_completion_data *);
 extern void io_completed(struct thread_data *, struct io_u *, struct io_completion_data *);
-extern void init_icd(struct io_completion_data *);
+extern void init_icd(struct io_completion_data *, icd_handler *, int);
 
 /*
  * io engine entry points
diff --git a/io_u.c b/io_u.c
index 6439979..ebcc8bb 100644
--- a/io_u.c
+++ b/io_u.c
@@ -389,6 +389,7 @@
 	if (!io_u->error) {
 		unsigned int bytes = io_u->buflen - io_u->resid;
 		const enum fio_ddir idx = io_u->ddir;
+		int ret;
 
 		td->io_blocks[idx]++;
 		td->io_bytes[idx] += bytes;
@@ -407,14 +408,23 @@
 			log_io_piece(td, io_u);
 
 		icd->bytes_done[idx] += bytes;
+
+		if (icd->handler) {
+			ret = icd->handler(io_u);
+			if (ret && !icd->error)
+				icd->error = ret;
+		}
 	} else
 		icd->error = io_u->error;
 }
 
-void init_icd(struct io_completion_data *icd)
+void init_icd(struct io_completion_data *icd, icd_handler *handler, int nr)
 {
 	fio_gettime(&icd->time, NULL);
 
+	icd->handler = handler;
+	icd->nr = nr;
+
 	icd->error = 0;
 	icd->bytes_done[0] = icd->bytes_done[1] = 0;
 }
@@ -424,8 +434,6 @@
 	struct io_u *io_u;
 	int i;
 
-	init_icd(icd);
-
 	for (i = 0; i < icd->nr; i++) {
 		io_u = td->io_ops->event(td, i);
 
diff --git a/verify.c b/verify.c
index 32cfdd7..f2ef107 100644
--- a/verify.c
+++ b/verify.c
@@ -78,7 +78,7 @@
 	return 0;
 }
 
-static int verify_io_u(struct io_u *io_u)
+int verify_io_u(struct io_u *io_u)
 {
 	struct verify_header *hdr = (struct verify_header *) io_u->buf;
 	int ret;
@@ -160,21 +160,3 @@
 
 	return 1;
 }
-
-int do_io_u_verify(struct thread_data *td, struct io_u **io_u)
-{
-	struct io_u *v_io_u = *io_u;
-	int ret = 0;
-
-	if (v_io_u) {
-		struct io_completion_data icd;
-
-		ret = verify_io_u(v_io_u);
-		init_icd(&icd);
-		io_completed(td, v_io_u, &icd);
-		put_io_u(td, v_io_u);
-		*io_u = NULL;
-	}
-
-	return ret;
-}