Add support for async IO verification offload

This adds support for setting up a number of IO verification offload
threads, instead of doing the verification inline from the submitting
thread. An option for controlling the CPU affinity of those threads is
also added.

Signed-off-by: Jens Axboe <jens.axboe@oracle.com>
diff --git a/HOWTO b/HOWTO
index 708eca0..55662d3 100644
--- a/HOWTO
+++ b/HOWTO
@@ -827,6 +827,16 @@
 		before quitting on a block verification failure. If this
 		option is set, fio will exit the job on the first observed
 		failure.
+
+verify_async=int	Fio will normally verify IO inline from the submitting
+		thread. This option takes an integer describing how many
+		async offload threads to create for IO verification instead,
+		causing fio to offload the duty of verifying IO contents
+		to one or more separate threads.
+
+verify_async_cpus=str	Tell fio to set the given CPU affinity on the
+		async IO verification threads. See cpus_allowed for the
+		format used.
 		
 stonewall	Wait for preceeding jobs in the job file to exit, before
 		starting this one. Can be used to insert serialization
diff --git a/file.h b/file.h
index 6aa5b50..dc22d4e 100644
--- a/file.h
+++ b/file.h
@@ -133,6 +133,7 @@
 extern int add_file(struct thread_data *, const char *);
 extern void get_file(struct fio_file *);
 extern int __must_check put_file(struct thread_data *, struct fio_file *);
+extern void put_file_log(struct thread_data *, struct fio_file *);
 extern void lock_file(struct thread_data *, struct fio_file *, enum fio_ddir);
 extern void unlock_file(struct thread_data *, struct fio_file *);
 extern void unlock_file_all(struct thread_data *, struct fio_file *);
diff --git a/fio.1 b/fio.1
index 32993b6..7b1fc80 100644
--- a/fio.1
+++ b/fio.1
@@ -598,6 +598,16 @@
 If true, exit the job on the first observed verification failure.  Default:
 false.
 .TP
+.BI verify_async \fR=\fPint
+Fio will normally verify IO inline from the submitting thread. This option
+takes an integer describing how many async offload threads to create for IO
+verification instead, causing fio to offload the duty of verifying IO contents
+to one or more separate threads.
+.TP
+.BI verify_async_cpus \fR=\fPstr
+Tell fio to set the given CPU affinity on the async IO verification threads.
+See \fBcpus_allowed\fP for the format used.
+.TP
 .B stonewall
 Wait for preceeding jobs in the job file to exit before starting this one.
 \fBstonewall\fR implies \fBnew_group\fR.
diff --git a/fio.c b/fio.c
index aabfee6..fc6dd8a 100644
--- a/fio.c
+++ b/fio.c
@@ -472,7 +472,10 @@
 			break;
 		}
 
-		io_u->end_io = verify_io_u;
+		if (td->o.verify_async)
+			io_u->end_io = verify_io_u_async;
+		else
+			io_u->end_io = verify_io_u;
 
 		ret = td_io_queue(td, io_u);
 		switch (ret) {
@@ -604,7 +607,10 @@
 		 * a previously written file.
 		 */
 		if (td->o.verify != VERIFY_NONE && io_u->ddir == DDIR_READ) {
-			io_u->end_io = verify_io_u;
+			if (td->o.verify_async)
+				io_u->end_io = verify_io_u_async;
+			else
+				io_u->end_io = verify_io_u;
 			td_set_runstate(td, TD_VERIFYING);
 		} else if (in_ramp_time(td))
 			td_set_runstate(td, TD_RAMP);
@@ -992,6 +998,7 @@
 {
 	unsigned long long runtime[2], elapsed;
 	struct thread_data *td = data;
+	pthread_condattr_t attr;
 	int clear_state;
 
 	if (!td->o.use_thread)
@@ -1006,8 +1013,14 @@
 	INIT_FLIST_HEAD(&td->io_u_requeues);
 	INIT_FLIST_HEAD(&td->io_log_list);
 	INIT_FLIST_HEAD(&td->io_hist_list);
+	INIT_FLIST_HEAD(&td->verify_list);
+	pthread_mutex_init(&td->io_u_lock, NULL);
 	td->io_hist_tree = RB_ROOT;
 
+	pthread_condattr_init(&attr);
+	pthread_cond_init(&td->verify_cond, &attr);
+	pthread_cond_init(&td->free_cond, &attr);
+
 	td_set_runstate(td, TD_INITIALIZED);
 	dprint(FD_MUTEX, "up startup_mutex\n");
 	fio_mutex_up(startup_mutex);
@@ -1031,7 +1044,10 @@
 	if (init_io_u(td))
 		goto err;
 
-	if (td->o.cpumask_set && fio_setaffinity(td) == -1) {
+	if (td->o.verify_async && verify_async_init(td))
+		goto err;
+
+	if (td->o.cpumask_set && fio_setaffinity(td->pid, td->o.cpumask) == -1) {
 		td_verror(td, errno, "cpu_set_affinity");
 		goto err;
 	}
@@ -1042,7 +1058,7 @@
 	 */
 	if (td->o.gtod_cpu) {
 		fio_cpu_clear(&td->o.cpumask, td->o.gtod_cpu);
-		if (fio_setaffinity(td) == -1) {
+		if (fio_setaffinity(td->pid, td->o.cpumask) == -1) {
 			td_verror(td, errno, "cpu_set_affinity");
 			goto err;
 		}
@@ -1184,6 +1200,9 @@
 		td_verror(td, ret, "fio_cpuset_exit");
 	}
 
+	if (td->o.verify_async)
+		verify_async_exit(td);
+
 	/*
 	 * do this very late, it will log file closing as well
 	 */
diff --git a/fio.h b/fio.h
index b19101c..fb70b46 100644
--- a/fio.h
+++ b/fio.h
@@ -174,6 +174,7 @@
 	unsigned int verify_pattern;
 	unsigned int verify_pattern_bytes;
 	unsigned int verify_fatal;
+	unsigned int verify_async;
 	unsigned int use_thread;
 	unsigned int unlink;
 	unsigned int do_disk_util;
@@ -209,6 +210,8 @@
 	unsigned int numjobs;
 	os_cpu_mask_t cpumask;
 	unsigned int cpumask_set;
+	os_cpu_mask_t verify_cpumask;
+	unsigned int verify_cpumask_set;
 	unsigned int iolog;
 	unsigned int rwmixcycle;
 	unsigned int rwmix[2];
@@ -319,6 +322,17 @@
 	struct flist_head io_u_freelist;
 	struct flist_head io_u_busylist;
 	struct flist_head io_u_requeues;
+	pthread_mutex_t io_u_lock;
+	pthread_cond_t free_cond;
+
+	/*
+	 * async verify offload
+	 */
+	struct flist_head verify_list;
+	pthread_t *verify_threads;
+	unsigned int nr_verify_threads;
+	pthread_cond_t verify_cond;
+	int verify_thread_exit;
 
 	/*
 	 * Rate state
@@ -661,4 +675,26 @@
 	return (val != 0 && ((val & (val - 1)) == 0));
 }
 
+/*
+ * We currently only need to do locking if we have verifier threads
+ * accessing our internal structures too
+ */
+static inline void td_io_u_lock(struct thread_data *td)
+{
+	if (td->o.verify_async)
+		pthread_mutex_lock(&td->io_u_lock);
+}
+
+static inline void td_io_u_unlock(struct thread_data *td)
+{
+	if (td->o.verify_async)
+		pthread_mutex_unlock(&td->io_u_lock);
+}
+
+static inline void td_io_u_free_notify(struct thread_data *td)
+{
+	if (td->o.verify_async)
+		pthread_cond_signal(&td->free_cond);
+}
+
 #endif
diff --git a/io_u.c b/io_u.c
index 2e9dac0..6549804 100644
--- a/io_u.c
+++ b/io_u.c
@@ -401,7 +401,7 @@
 	return td->rwmix_ddir;
 }
 
-static void put_file_log(struct thread_data *td, struct fio_file *f)
+void put_file_log(struct thread_data *td, struct fio_file *f)
 {
 	int ret = put_file(td, f);
 
@@ -411,16 +411,21 @@
 
 void put_io_u(struct thread_data *td, struct io_u *io_u)
 {
+	td_io_u_lock(td);
+
 	assert((io_u->flags & IO_U_F_FREE) == 0);
 	io_u->flags |= IO_U_F_FREE;
+	io_u->flags &= ~IO_U_F_FREE_DEF;
 
 	if (io_u->file)
 		put_file_log(td, io_u->file);
 
 	io_u->file = NULL;
-	flist_del(&io_u->list);
+	flist_del_init(&io_u->list);
 	flist_add(&io_u->list, &td->io_u_freelist);
 	td->cur_depth--;
+	td_io_u_unlock(td);
+	td_io_u_free_notify(td);
 }
 
 void clear_io_u(struct thread_data *td, struct io_u *io_u)
@@ -435,6 +440,8 @@
 
 	dprint(FD_IO, "requeue %p\n", __io_u);
 
+	td_io_u_lock(td);
+
 	__io_u->flags |= IO_U_F_FREE;
 	if ((__io_u->flags & IO_U_F_FLIGHT) && !ddir_sync(__io_u->ddir))
 		td->io_issues[__io_u->ddir]--;
@@ -444,6 +451,7 @@
 	flist_del(&__io_u->list);
 	flist_add_tail(&__io_u->list, &td->io_u_requeues);
 	td->cur_depth--;
+	td_io_u_unlock(td);
 	*io_u = NULL;
 }
 
@@ -826,6 +834,9 @@
 {
 	struct io_u *io_u = NULL;
 
+	td_io_u_lock(td);
+
+again:
 	if (!flist_empty(&td->io_u_requeues))
 		io_u = flist_entry(td->io_u_requeues.next, struct io_u, list);
 	else if (!queue_full(td)) {
@@ -837,9 +848,18 @@
 		io_u->end_io = NULL;
 	}
 
+	/*
+	 * We ran out, wait for async verify threads to finish and return one
+	 */
+	if (!io_u && td->o.verify_async) {
+		pthread_cond_wait(&td->free_cond, &td->io_u_lock);
+		goto again;
+	}
+
 	if (io_u) {
 		assert(io_u->flags & IO_U_F_FREE);
 		io_u->flags &= ~IO_U_F_FREE;
+		io_u->flags &= ~IO_U_F_FREE_DEF;
 
 		io_u->error = 0;
 		flist_del(&io_u->list);
@@ -847,6 +867,7 @@
 		td->cur_depth++;
 	}
 
+	td_io_u_unlock(td);
 	return io_u;
 }
 
@@ -1042,7 +1063,9 @@
 		io_u = td->io_ops->event(td, i);
 
 		io_completed(td, io_u, icd);
-		put_io_u(td, io_u);
+
+		if (!(io_u->flags & IO_U_F_FREE_DEF))
+			put_io_u(td, io_u);
 	}
 }
 
@@ -1056,7 +1079,9 @@
 
 	init_icd(td, &icd, 1);
 	io_completed(td, io_u, &icd);
-	put_io_u(td, io_u);
+
+	if (!(io_u->flags & IO_U_F_FREE_DEF))
+		put_io_u(td, io_u);
 
 	if (icd.error) {
 		td_verror(td, icd.error, "io_u_sync_complete");
diff --git a/ioengine.h b/ioengine.h
index f977799..3df0944 100644
--- a/ioengine.h
+++ b/ioengine.h
@@ -6,6 +6,7 @@
 enum {
 	IO_U_F_FREE	= 1 << 0,
 	IO_U_F_FLIGHT	= 1 << 1,
+	IO_U_F_FREE_DEF	= 1 << 2,
 };
 
 /*
diff --git a/options.c b/options.c
index 87515ef..0954ccd 100644
--- a/options.c
+++ b/options.c
@@ -303,14 +303,14 @@
 	return 0;
 }
 
-static int str_cpus_allowed_cb(void *data, const char *input)
+static int set_cpus_allowed(struct thread_data *td, os_cpu_mask_t *mask,
+			    const char *input)
 {
-	struct thread_data *td = data;
 	char *cpu, *str, *p;
 	long max_cpu;
 	int ret = 0;
 
-	ret = fio_cpuset_init(&td->o.cpumask);
+	ret = fio_cpuset_init(mask);
 	if (ret < 0) {
 		log_err("fio: cpuset_init failed\n");
 		td_verror(td, ret, "fio_cpuset_init");
@@ -358,7 +358,7 @@
 			}
 	
 			dprint(FD_PARSE, "set cpu allowed %d\n", icpu);
-			fio_cpu_set(&td->o.cpumask, icpu);
+			fio_cpu_set(mask, icpu);
 			icpu++;
 		}
 		if (ret)
@@ -370,6 +370,30 @@
 		td->o.cpumask_set = 1;
 	return ret;
 }
+
+static int str_cpus_allowed_cb(void *data, const char *input)
+{
+	struct thread_data *td = data;
+	int ret;
+
+	ret = set_cpus_allowed(td, &td->o.cpumask, input);
+	if (!ret)
+		td->o.cpumask_set = 1;
+
+	return ret;
+}
+
+static int str_verify_cpus_allowed_cb(void *data, const char *input)
+{
+	struct thread_data *td = data;
+	int ret;
+
+	ret = set_cpus_allowed(td, &td->o.verify_cpumask, input);
+	if (!ret)
+		td->o.verify_cpumask_set = 1;
+
+	return ret;
+}
 #endif
 
 static int str_fst_cb(void *data, const char *str)
@@ -1181,6 +1205,23 @@
 		.parent = "verify",
 	},
 	{
+		.name	= "verify_async",
+		.type	= FIO_OPT_INT,
+		.off1	= td_var_offset(verify_async),
+		.def	= "0",
+		.help	= "Number of async verifier threads to use",
+		.parent	= "verify",
+	},
+#ifdef FIO_HAVE_CPU_AFFINITY
+	{
+		.name	= "verify_async_cpus",
+		.type	= FIO_OPT_STR,
+		.cb	= str_verify_cpus_allowed_cb,
+		.help	= "Set CPUs allowed for async verify threads",
+		.parent	= "verify_async",
+	},
+#endif
+	{
 		.name	= "write_iolog",
 		.type	= FIO_OPT_STR_STORE,
 		.off1	= td_var_offset(write_iolog_file),
diff --git a/os/os-linux.h b/os/os-linux.h
index b766cbf..dd9c5aa 100644
--- a/os/os-linux.h
+++ b/os/os-linux.h
@@ -55,13 +55,13 @@
  * the affinity helpers to work.
  */
 #ifndef GLIBC_2_3_2
-#define fio_setaffinity(td)		\
-	sched_setaffinity((td)->pid, sizeof((td)->o.cpumask), &(td)->o.cpumask)
+#define fio_setaffinity(pid, cpumask)		\
+	sched_setaffinity((pid), sizeof(cpumask), &(cpumask))
 #define fio_getaffinity(pid, ptr)	\
 	sched_getaffinity((pid), sizeof(cpu_set_t), (ptr))
 #else
-#define fio_setaffinity(td)		\
-	sched_setaffinity((td)->pid, &(td)->o.cpumask)
+#define fio_setaffinity(pid, cpumask)	\
+	sched_setaffinity((pid), &(cpumask))
 #define fio_getaffinity(pid, ptr)	\
 	sched_getaffinity((pid), (ptr))
 #endif
diff --git a/os/os-solaris.h b/os/os-solaris.h
index b58d130..5672956 100644
--- a/os/os-solaris.h
+++ b/os/os-solaris.h
@@ -69,8 +69,8 @@
 /*
  * pset binding hooks for fio
  */
-#define fio_setaffinity(td)		\
-	pset_bind((td)->o.cpumask, P_PID, (td)->pid, NULL)
+#define fio_setaffinity(pid, cpumask)		\
+	pset_bind(&(cpumask), P_PID, (pid), NULL)
 #define fio_getaffinity(pid, ptr)	({ 0; })
 
 #define fio_cpu_clear(mask, cpu)	pset_assign(PS_NONE, (cpu), NULL)
diff --git a/os/os.h b/os/os.h
index dbf0957..10e796f 100644
--- a/os/os.h
+++ b/os/os.h
@@ -39,7 +39,7 @@
 #endif /* FIO_HAVE_FADVISE */
 
 #ifndef FIO_HAVE_CPU_AFFINITY
-#define fio_setaffinity(td)		(0)
+#define fio_setaffinity(pid, mask)	(0)
 #define fio_getaffinity(pid, mask)	do { } while (0)
 #define fio_cpu_clear(mask, cpu)	do { } while (0)
 #define fio_cpuset_exit(mask)		(-1)
diff --git a/verify.c b/verify.c
index 2ae74b9..5dd9ee3 100644
--- a/verify.c
+++ b/verify.c
@@ -5,9 +5,11 @@
 #include <fcntl.h>
 #include <string.h>
 #include <assert.h>
+#include <pthread.h>
 
 #include "fio.h"
 #include "verify.h"
+#include "smalloc.h"
 
 #include "crc/md5.h"
 #include "crc/crc64.h"
@@ -417,6 +419,26 @@
 	return 0;
 }
 
+/*
+ * Push IO verification to a separate thread
+ */
+int verify_io_u_async(struct thread_data *td, struct io_u *io_u)
+{
+	if (io_u->file)
+		put_file_log(td, io_u->file);
+
+	io_u->file = NULL;
+
+	pthread_mutex_lock(&td->io_u_lock);
+	flist_del(&io_u->list);
+	flist_add_tail(&io_u->list, &td->verify_list);
+	pthread_mutex_unlock(&td->io_u_lock);
+
+	pthread_cond_signal(&td->verify_cond);
+	io_u->flags |= IO_U_F_FREE_DEF;
+	return 0;
+}
+
 int verify_io_u(struct thread_data *td, struct io_u *io_u)
 {
 	struct verify_header *hdr;
@@ -720,3 +742,103 @@
 	dprint(FD_VERIFY, "get_next_verify: empty\n");
 	return 1;
 }
+
+static void *verify_async_thread(void *data)
+{
+	struct thread_data *td = data;
+	struct io_u *io_u;
+	int ret = 0;
+
+	if (td->o.verify_cpumask_set &&
+	    fio_setaffinity(td->pid, td->o.verify_cpumask)) {
+		log_err("fio: failed setting verify thread affinity\n");
+		goto done;
+	}
+
+	do {
+		read_barrier();
+		if (td->verify_thread_exit)
+			break;
+
+		pthread_mutex_lock(&td->io_u_lock);
+
+		while (flist_empty(&td->verify_list) &&
+		       !td->verify_thread_exit) {
+			ret = pthread_cond_wait(&td->verify_cond, &td->io_u_lock);
+			if (ret) {
+				pthread_mutex_unlock(&td->io_u_lock);
+				break;
+			}
+		}
+
+		if (flist_empty(&td->verify_list)) {
+			pthread_mutex_unlock(&td->io_u_lock);
+			continue;
+		}
+
+		io_u = flist_entry(td->verify_list.next, struct io_u, list);
+		flist_del_init(&io_u->list);
+		pthread_mutex_unlock(&td->io_u_lock);
+
+		ret = verify_io_u(td, io_u);
+		put_io_u(td, io_u);
+	} while (!ret);
+
+done:
+	pthread_mutex_lock(&td->io_u_lock);
+	td->nr_verify_threads--;
+	pthread_mutex_unlock(&td->io_u_lock);
+
+	pthread_cond_signal(&td->free_cond);
+	return NULL;
+}
+
+int verify_async_init(struct thread_data *td)
+{
+	int i, ret;
+
+	td->verify_thread_exit = 0;
+
+	td->verify_threads = malloc(sizeof(pthread_t) * td->o.verify_async);
+	for (i = 0; i < td->o.verify_async; i++) {
+		ret = pthread_create(&td->verify_threads[i], NULL,
+					verify_async_thread, td);
+		if (ret) {
+			log_err("fio: async verify creation failed: %s\n",
+					strerror(ret));
+			break;
+		}
+		ret = pthread_detach(td->verify_threads[i]);
+		if (ret) {
+			log_err("fio: async verify thread detach failed: %s\n",
+					strerror(ret));
+			break;
+		}
+		td->nr_verify_threads++;
+	}
+
+	if (i != td->o.verify_async) {
+		td->verify_thread_exit = 1;
+		write_barrier();
+		pthread_cond_broadcast(&td->verify_cond);
+		return 1;
+	}
+
+	return 0;
+}
+
+void verify_async_exit(struct thread_data *td)
+{
+	td->verify_thread_exit = 1;
+	write_barrier();
+	pthread_cond_broadcast(&td->verify_cond);
+
+	pthread_mutex_lock(&td->io_u_lock);
+
+	while (td->nr_verify_threads)
+		pthread_cond_wait(&td->free_cond, &td->io_u_lock);
+
+	pthread_mutex_unlock(&td->io_u_lock);
+	free(td->verify_threads);
+	td->verify_threads = NULL;
+}
diff --git a/verify.h b/verify.h
index 76d256d..50c8e43 100644
--- a/verify.h
+++ b/verify.h
@@ -64,5 +64,12 @@
 extern void populate_verify_io_u(struct thread_data *, struct io_u *);
 extern int __must_check get_next_verify(struct thread_data *td, struct io_u *);
 extern int __must_check verify_io_u(struct thread_data *, struct io_u *);
+extern int verify_io_u_async(struct thread_data *, struct io_u *);
+
+/*
+ * Async verify offload
+ */
+extern int verify_async_init(struct thread_data *);
+extern void verify_async_exit(struct thread_data *);
 
 #endif