Fully parallelize io_u verification

Keep the queue as full as possible, and verify completed io_u's
while other IO is still in flight, instead of reaping and verifying
one request at a time.
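
In sketch form, the loop's reap decision now works like the toy
below. This is a standalone sketch with assumed names, not fio code:
queue_full(), submit_one() and reap_events() stand in for fio's
queue_full(td), td_io_queue() and td_io_getevents().

  /*
   * Standalone sketch of the reap strategy: keep submitting while
   * there is room, poll for completions with a zero timeout, and
   * only block once the queue is full.
   */
  #include <stdio.h>
  #include <time.h>

  #define MAX_DEPTH 4

  static int in_flight;    /* io_u's currently queued */

  static int queue_full(void)
  {
      return in_flight >= MAX_DEPTH;
  }

  static void submit_one(void)
  {
      in_flight++;
  }

  /*
   * Toy event source: completes nothing on a zero-timeout poll,
   * and everything in flight when asked to block (timeout == NULL).
   */
  static int reap_events(int min_events, int max_events,
                         const struct timespec *timeout)
  {
      int done;

      if (timeout && !min_events)
          return 0;
      done = in_flight < max_events ? in_flight : max_events;
      in_flight -= done;
      return done;
  }

  int main(void)
  {
      int i;

      for (i = 0; i < 16; i++) {
          struct timespec ts = { 0, 0 };
          const struct timespec *timeout;
          int min_events, ret;

          submit_one();

          if (queue_full()) {
              /* No room left: wait for at least one completion. */
              timeout = NULL;
              min_events = 1;
          } else {
              /* Room left: just poll, keep the queue filling up. */
              timeout = &ts;
              min_events = 0;
          }

          ret = reap_events(min_events, in_flight, timeout);
          if (ret > 0)
              printf("verified %d io_u(s), %d in flight\n",
                     ret, in_flight);
      }

      /* Drain and verify whatever is still pending, as on exit. */
      if (in_flight)
          printf("verified %d io_u(s) at exit\n",
                 reap_events(1, in_flight, NULL));
      return 0;
  }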

Signed-off-by: Jens Axboe <jens.axboe@oracle.com>
diff --git a/fio.c b/fio.c
index 5358af2..eb857af 100644
--- a/fio.c
+++ b/fio.c
@@ -174,7 +174,7 @@
 	 */
 	r = td_io_getevents(td, 0, td->cur_depth, &ts);
 	if (r > 0) {
-		icd.nr = r;
+		init_icd(&icd, NULL, r);
 		ios_completed(td, &icd);
 	}
 
@@ -194,7 +194,7 @@
 	if (td->cur_depth) {
 		r = td_io_getevents(td, td->cur_depth, td->cur_depth, NULL);
 		if (r > 0) {
-			icd.nr = r;
+			init_icd(&icd, NULL, r);
 			ios_completed(td, &icd);
 		}
 	}
@@ -233,7 +233,7 @@
 			return 1;
 		}
 
-		icd.nr = ret;
+		init_icd(&icd, NULL, ret);
 		ios_completed(td, &icd);
 		if (icd.error) {
 			td_verror(td, icd.error);
@@ -245,7 +245,7 @@
 			return 1;
 		}
 
-		init_icd(&icd);
+		init_icd(&icd, NULL, 1);
 		io_completed(td, io_u, &icd);
 		put_io_u(td, io_u);
 	}
@@ -261,7 +261,7 @@
 {
 	struct fio_file *f;
 	struct io_u *io_u;
-	int ret, i;
+	int ret, i, min_events;
 
 	/*
 	 * sync io first and invalidate cache, to make sure we really
@@ -276,6 +276,9 @@
 
 	io_u = NULL;
 	while (!td->terminate) {
+		struct io_completion_data icd;
+		struct timespec *timeout;
+
 		io_u = __get_io_u(td);
 		if (!io_u)
 			break;
@@ -303,10 +306,13 @@
 				io_u->xfer_buf += bytes;
 				goto requeue;
 			}
-			if (do_io_u_verify(td, &io_u)) {
-				ret = -EIO;
+			init_icd(&icd, verify_io_u, 1);
+			io_completed(td, io_u, &icd);
+			if (icd.error) {
+				ret = icd.error;
 				break;
 			}
+			put_io_u(td, io_u);
 			continue;
 		case FIO_Q_QUEUED:
 			break;
@@ -316,20 +322,42 @@
 			break;
 		}
 
-		/*
-		 * We get here for a queued request, in the future we
-		 * want to later make this take full advantage of
-		 * keeping IO in flight while verifying others.
-		 */
-		ret = td_io_getevents(td, 1, 1, NULL);
 		if (ret < 0)
 			break;
 
-		assert(ret == 1);
-		io_u = td->io_ops->event(td, 0);
+		/*
+		 * If we can queue more, do so, but first check whether
+		 * any io_u's have completed.
+		 */
+		if (queue_full(td)) {
+			timeout = NULL;
+			min_events = 1;
+		} else {
+			struct timespec ts;
 
-		if (do_io_u_verify(td, &io_u))
+			ts.tv_sec = 0;
+			ts.tv_nsec = 0;
+			timeout = &ts;
+			min_events = 0;
+		}
+
+		/*
+		 * Reap the required number of io units, if any, and run
+		 * verification on them through the callback handler.
+		 */
+		ret = td_io_getevents(td, min_events, td->cur_depth, timeout);
+		if (ret < 0)
 			break;
+		else if (!ret)
+			continue;
+
+		init_icd(&icd, verify_io_u, ret);
+		ios_completed(td, &icd);
+
+		if (icd.error) {
+			td_verror(td, icd.error);
+			break;
+		}
 	}
 
 	if (io_u)
@@ -413,7 +441,7 @@
 				io_u->xfer_buf += bytes;
 				goto requeue;
 			}
-			init_icd(&icd);
+			init_icd(&icd, NULL, 1);
 			io_completed(td, io_u, &icd);
 			put_io_u(td, io_u);
 			break;
@@ -450,7 +478,7 @@
 			} else if (!ret)
 				continue;
 
-			icd.nr = ret;
+			init_icd(&icd, NULL, ret);
 			ios_completed(td, &icd);
 			if (icd.error) {
 				td_verror(td, icd.error);
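
The widened init_icd() itself is outside these hunks; inferred purely
from the call sites above, its shape is presumably something like the
sketch below. The handler typedef and field names are guesses, not
fio's actual declarations.

  /*
   * Inferred from call sites such as init_icd(&icd, verify_io_u, ret);
   * not taken from the patch itself.
   */
  struct io_u;
  typedef int (endio_handler)(struct io_u *io_u);

  struct io_completion_data {
      int nr;                  /* number of completed io_u's to process */
      endio_handler *handler;  /* per-io_u callback, NULL for plain IO */
      int error;               /* first error seen; checked by callers */
      /* byte and latency accounting elided */
  };

  static void init_icd(struct io_completion_data *icd,
                       endio_handler *handler, int nr)
  {
      icd->nr = nr;
      icd->handler = handler;
      icd->error = 0;
  }

io_completed() would then presumably invoke icd->handler on each
reaped io_u and latch any non-zero return into icd->error, which is
what lets verify_io_u run from the normal completion path instead of
the old one-at-a-time td_io_getevents(td, 1, 1, NULL) loop.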