Cleanup end IO handling

Abstract out the end IO handling, so that callers don't have to
keep track of completion details. Then we can make the
io_completion_data structure private to io_u, and just provide two
functions to end IO - one for sync completions, one for queued completions.

Signed-off-by: Jens Axboe <jens.axboe@oracle.com>
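
For reference, a minimal sketch of the shape the two new helpers take,
pieced together from the call sites in this patch and from the
init_icd()/ios_completed() sequences it removes from fio.c. The real
implementations live in io_u.c; the endio_handler typedef name, the
bytes_done[] summation, and the exact error return are assumptions here,
not copied from that file.

	/* sketch only -- the handler typedef name is an assumption */
	typedef int (endio_handler)(struct io_u *);

	/*
	 * Complete a single io_u that the engine finished synchronously.
	 * Returns bytes completed, or -1 on error (after td_verror()).
	 */
	long io_u_sync_complete(struct thread_data *td, struct io_u *io_u,
				endio_handler *handler)
	{
		struct io_completion_data icd;

		init_icd(&icd, handler, 1);
		io_completed(td, io_u, &icd);
		put_io_u(td, io_u);

		if (icd.error) {
			td_verror(td, icd.error);
			return -1;
		}

		/* assumes bytes_done[] is indexed by data direction */
		return icd.bytes_done[0] + icd.bytes_done[1];
	}

	/*
	 * Reap queued completions: wait for at least min_events of them
	 * (poll without blocking when min_events is 0), then account them.
	 */
	long io_u_queued_complete(struct thread_data *td, int min_events,
				  endio_handler *handler)
	{
		struct io_completion_data icd;
		struct timespec ts = { .tv_sec = 0, .tv_nsec = 0 };
		struct timespec *tsp = NULL;
		int ret;

		if (!min_events)
			tsp = &ts;

		ret = td_io_getevents(td, min_events, td->cur_depth, tsp);
		if (ret < 0) {
			td_verror(td, -ret);
			return ret;
		} else if (!ret)
			return 0;

		init_icd(&icd, handler, ret);
		ios_completed(td, &icd);
		if (icd.error) {
			td_verror(td, icd.error);
			return -1;
		}

		return icd.bytes_done[0] + icd.bytes_done[1];
	}

With this split, the callers below only need the long return (bytes
completed, negative on error) for rate throttling and error checks, and
io_completion_data no longer has to be visible outside the io_u code.
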
diff --git a/fio.c b/fio.c
index 3e55f92..78deaf6 100644
--- a/fio.c
+++ b/fio.c
@@ -163,20 +163,14 @@
  */
 static void cleanup_pending_aio(struct thread_data *td)
 {
-	struct timespec ts = { .tv_sec = 0, .tv_nsec = 0};
 	struct list_head *entry, *n;
-	struct io_completion_data icd;
 	struct io_u *io_u;
 	int r;
 
 	/*
 	 * get immediately available events, if any
 	 */
-	r = td_io_getevents(td, 0, td->cur_depth, &ts);
-	if (r > 0) {
-		init_icd(&icd, NULL, r);
-		ios_completed(td, &icd);
-	}
+	io_u_queued_complete(td, 0, NULL);
 
 	/*
 	 * now cancel remaining active events
@@ -191,13 +185,8 @@
 		}
 	}
 
-	if (td->cur_depth) {
-		r = td_io_getevents(td, td->cur_depth, td->cur_depth, NULL);
-		if (r > 0) {
-			init_icd(&icd, NULL, r);
-			ios_completed(td, &icd);
-		}
-	}
+	if (td->cur_depth)
+		io_u_queued_complete(td, td->cur_depth, NULL);
 }
 
 /*
@@ -207,7 +196,6 @@
 static int fio_io_sync(struct thread_data *td, struct fio_file *f)
 {
 	struct io_u *io_u = __get_io_u(td);
-	struct io_completion_data icd;
 	int ret;
 
 	if (!io_u)
@@ -227,27 +215,15 @@
 		put_io_u(td, io_u);
 		return 1;
 	} else if (ret == FIO_Q_QUEUED) {
-		ret = td_io_getevents(td, 1, td->cur_depth, NULL);
-		if (ret < 0) {
-			td_verror(td, -ret);
+		if (io_u_queued_complete(td, 1, NULL))
 			return 1;
-		}
-
-		init_icd(&icd, NULL, ret);
-		ios_completed(td, &icd);
-		if (icd.error) {
-			td_verror(td, icd.error);
-			return 1;
-		}
 	} else if (ret == FIO_Q_COMPLETED) {
 		if (io_u->error) {
 			td_verror(td, io_u->error);
 			return 1;
 		}
 
-		init_icd(&icd, NULL, 1);
-		io_completed(td, io_u, &icd);
-		put_io_u(td, io_u);
+		io_u_sync_complete(td, io_u, NULL);
 	}
 
 	return 0;
@@ -276,9 +252,6 @@
 
 	io_u = NULL;
 	while (!td->terminate) {
-		struct io_completion_data icd;
-		struct timespec *timeout;
-
 		io_u = __get_io_u(td);
 		if (!io_u)
 			break;
@@ -306,13 +279,8 @@
 				io_u->xfer_buf += bytes;
 				goto requeue;
 			}
-			init_icd(&icd, verify_io_u, 1);
-			io_completed(td, io_u, &icd);
-			if (icd.error) {
-				ret = icd.error;
+			if (io_u_sync_complete(td, io_u, verify_io_u))
 				break;
-			}
-			put_io_u(td, io_u);
 			continue;
 		case FIO_Q_QUEUED:
 			break;
@@ -329,36 +297,16 @@
 		 * if we can queue more, do so. but check if there are
 		 * completed io_u's first.
 		 */
-		if (queue_full(td)) {
-			timeout = NULL;
+		min_events = 0;
+		if (queue_full(td))
 			min_events = 1;
-		} else {
-			struct timespec ts;
-
-			ts.tv_sec = 0;
-			ts.tv_nsec = 0;
-			timeout = &ts;
-			min_events = 0;
-		}
 
 		/*
 		 * Reap required number of io units, if any, and do the
 		 * verification on them through the callback handler
 		 */
-		ret = td_io_getevents(td, min_events, td->cur_depth, timeout);
-		if (ret < 0) {
-			td_verror(td, -ret);
+		if (io_u_queued_complete(td, min_events, verify_io_u))
 			break;
-		} else if (!ret)
-			continue;
-
-		init_icd(&icd, verify_io_u, ret);
-		ios_completed(td, &icd);
-
-		if (icd.error) {
-			td_verror(td, icd.error);
-			break;
-		}
 	}
 
 	if (io_u)
@@ -401,7 +349,6 @@
  */
 static void do_io(struct thread_data *td)
 {
-	struct io_completion_data icd;
 	struct timeval s;
 	unsigned long usec;
 	struct fio_file *f;
@@ -410,7 +357,8 @@
 	td_set_runstate(td, TD_RUNNING);
 
 	while ((td->this_io_bytes[0] + td->this_io_bytes[1]) < td->io_size) {
-		struct timespec *timeout;
+		struct timeval comp_time;
+		long bytes_done = 0;
 		int min_evts = 0;
 		struct io_u *io_u;
 
@@ -426,6 +374,11 @@
 			break;
 
 		memcpy(&s, &io_u->start_time, sizeof(s));
+
+		if (runtime_exceeded(td, &s)) {
+			put_io_u(td, io_u);
+			break;
+		}
 requeue:
 		ret = td_io_queue(td, io_u);
 
@@ -442,9 +395,8 @@
 				io_u->xfer_buf += bytes;
 				goto requeue;
 			}
-			init_icd(&icd, NULL, 1);
-			io_completed(td, io_u, &icd);
-			put_io_u(td, io_u);
+			fio_gettime(&comp_time, NULL);
+			bytes_done = io_u_sync_complete(td, io_u, NULL);
 			break;
 		case FIO_Q_QUEUED:
 			break;
@@ -459,53 +411,39 @@
 
 		add_slat_sample(td, io_u->ddir, mtime_since(&io_u->start_time, &io_u->issue_time));
 
+		/*
+		 * See if we need to complete some commands
+		 */
 		if (ret == FIO_Q_QUEUED) {
-			if (td->cur_depth < td->iodepth) {
-				struct timespec ts;
-
-				ts.tv_sec = 0;
-				ts.tv_nsec = 0;
-				timeout = &ts;
-				min_evts = 0;
-			} else {
-				timeout = NULL;
+			min_evts = 0;
+			if (queue_full(td))
 				min_evts = 1;
-			}
 
-			ret = td_io_getevents(td, min_evts, td->cur_depth, timeout);
-			if (ret < 0) {
-				td_verror(td, -ret);
+			fio_gettime(&comp_time, NULL);
+			bytes_done = io_u_queued_complete(td, min_evts, NULL);
+			if (bytes_done < 0)
 				break;
-			} else if (!ret)
-				continue;
-
-			init_icd(&icd, NULL, ret);
-			ios_completed(td, &icd);
-			if (icd.error) {
-				td_verror(td, icd.error);
-				break;
-			}
 		}
 
+		if (!bytes_done)
+			continue;
+
 		/*
 		 * the rate is batched for now, it should work for batches
 		 * of completions except the very first one which may look
 		 * a little bursty
 		 */
-		usec = utime_since(&s, &icd.time);
+		usec = utime_since(&s, &comp_time);
 
-		rate_throttle(td, usec, icd.bytes_done[td->ddir], td->ddir);
+		rate_throttle(td, usec, bytes_done, td->ddir);
 
-		if (check_min_rate(td, &icd.time)) {
+		if (check_min_rate(td, &comp_time)) {
 			if (exitall_on_terminate)
 				terminate_threads(td->groupid, 0);
 			td_verror(td, ENODATA);
 			break;
 		}
 
-		if (runtime_exceeded(td, &icd.time))
-			break;
-
 		if (td->thinktime) {
 			unsigned long long b;