Add support for async IO verification offload

This adds support for setting up a number of IO verification offload
threads, instead of doing the offload inline. An option for controlling
the CPU affinity of those threads are always added.

Signed-off-by: Jens Axboe <jens.axboe@oracle.com>
diff --git a/io_u.c b/io_u.c
index 2e9dac0..6549804 100644
--- a/io_u.c
+++ b/io_u.c
@@ -401,7 +401,7 @@
 	return td->rwmix_ddir;
 }
 
-static void put_file_log(struct thread_data *td, struct fio_file *f)
+void put_file_log(struct thread_data *td, struct fio_file *f)
 {
 	int ret = put_file(td, f);
 
@@ -411,16 +411,21 @@
 
 void put_io_u(struct thread_data *td, struct io_u *io_u)
 {
+	td_io_u_lock(td);
+
 	assert((io_u->flags & IO_U_F_FREE) == 0);
 	io_u->flags |= IO_U_F_FREE;
+	io_u->flags &= ~IO_U_F_FREE_DEF;
 
 	if (io_u->file)
 		put_file_log(td, io_u->file);
 
 	io_u->file = NULL;
-	flist_del(&io_u->list);
+	flist_del_init(&io_u->list);
 	flist_add(&io_u->list, &td->io_u_freelist);
 	td->cur_depth--;
+	td_io_u_unlock(td);
+	td_io_u_free_notify(td);
 }
 
 void clear_io_u(struct thread_data *td, struct io_u *io_u)
@@ -435,6 +440,8 @@
 
 	dprint(FD_IO, "requeue %p\n", __io_u);
 
+	td_io_u_lock(td);
+
 	__io_u->flags |= IO_U_F_FREE;
 	if ((__io_u->flags & IO_U_F_FLIGHT) && !ddir_sync(__io_u->ddir))
 		td->io_issues[__io_u->ddir]--;
@@ -444,6 +451,7 @@
 	flist_del(&__io_u->list);
 	flist_add_tail(&__io_u->list, &td->io_u_requeues);
 	td->cur_depth--;
+	td_io_u_unlock(td);
 	*io_u = NULL;
 }
 
@@ -826,6 +834,9 @@
 {
 	struct io_u *io_u = NULL;
 
+	td_io_u_lock(td);
+
+again:
 	if (!flist_empty(&td->io_u_requeues))
 		io_u = flist_entry(td->io_u_requeues.next, struct io_u, list);
 	else if (!queue_full(td)) {
@@ -837,9 +848,18 @@
 		io_u->end_io = NULL;
 	}
 
+	/*
+	 * We ran out, wait for async verify threads to finish and return one
+	 */
+	if (!io_u && td->o.verify_async) {
+		pthread_cond_wait(&td->free_cond, &td->io_u_lock);
+		goto again;
+	}
+
 	if (io_u) {
 		assert(io_u->flags & IO_U_F_FREE);
 		io_u->flags &= ~IO_U_F_FREE;
+		io_u->flags &= ~IO_U_F_FREE_DEF;
 
 		io_u->error = 0;
 		flist_del(&io_u->list);
@@ -847,6 +867,7 @@
 		td->cur_depth++;
 	}
 
+	td_io_u_unlock(td);
 	return io_u;
 }
 
@@ -1042,7 +1063,9 @@
 		io_u = td->io_ops->event(td, i);
 
 		io_completed(td, io_u, icd);
-		put_io_u(td, io_u);
+
+		if (!(io_u->flags & IO_U_F_FREE_DEF))
+			put_io_u(td, io_u);
 	}
 }
 
@@ -1056,7 +1079,9 @@
 
 	init_icd(td, &icd, 1);
 	io_completed(td, io_u, &icd);
-	put_io_u(td, io_u);
+
+	if (!(io_u->flags & IO_U_F_FREE_DEF))
+		put_io_u(td, io_u);
 
 	if (icd.error) {
 		td_verror(td, icd.error, "io_u_sync_complete");