Add support for async IO verification offload
This adds support for setting up a number of IO verification offload
threads, instead of doing the offload inline. An option for controlling
the CPU affinity of those threads is also added.
Signed-off-by: Jens Axboe <jens.axboe@oracle.com>
diff --git a/HOWTO b/HOWTO
index 708eca0..55662d3 100644
--- a/HOWTO
+++ b/HOWTO
@@ -827,6 +827,16 @@
before quitting on a block verification failure. If this
option is set, fio will exit the job on the first observed
failure.
+
+verify_async=int Fio will normally verify IO inline from the submitting
+ thread. This option takes an integer describing how many
+ async offload threads to create for IO verification instead,
+ causing fio to offload the duty of verifying IO contents
+ to one or more separate threads.
+
+verify_async_cpus=str Tell fio to set the given CPU affinity on the
+ async IO verification threads. See cpus_allowed for the
+ format used.
stonewall Wait for preceeding jobs in the job file to exit, before
starting this one. Can be used to insert serialization
diff --git a/file.h b/file.h
index 6aa5b50..dc22d4e 100644
--- a/file.h
+++ b/file.h
@@ -133,6 +133,7 @@
extern int add_file(struct thread_data *, const char *);
extern void get_file(struct fio_file *);
extern int __must_check put_file(struct thread_data *, struct fio_file *);
+extern void put_file_log(struct thread_data *, struct fio_file *);
extern void lock_file(struct thread_data *, struct fio_file *, enum fio_ddir);
extern void unlock_file(struct thread_data *, struct fio_file *);
extern void unlock_file_all(struct thread_data *, struct fio_file *);
diff --git a/fio.1 b/fio.1
index 32993b6..7b1fc80 100644
--- a/fio.1
+++ b/fio.1
@@ -598,6 +598,16 @@
If true, exit the job on the first observed verification failure. Default:
false.
.TP
+.BI verify_async \fR=\fPint
+Fio will normally verify IO inline from the submitting thread. This option
+takes an integer describing how many async offload threads to create for IO
+verification instead, causing fio to offload the duty of verifying IO contents
+to one or more separate threads.
+.TP
+.BI verify_async_cpus \fR=\fPstr
+Tell fio to set the given CPU affinity on the async IO verification threads.
+See \fBcpus_allowed\fP for the format used.
+.TP
.B stonewall
Wait for preceeding jobs in the job file to exit before starting this one.
\fBstonewall\fR implies \fBnew_group\fR.
diff --git a/fio.c b/fio.c
index aabfee6..fc6dd8a 100644
--- a/fio.c
+++ b/fio.c
@@ -472,7 +472,10 @@
break;
}
- io_u->end_io = verify_io_u;
+ if (td->o.verify_async)
+ io_u->end_io = verify_io_u_async;
+ else
+ io_u->end_io = verify_io_u;
ret = td_io_queue(td, io_u);
switch (ret) {
@@ -604,7 +607,10 @@
* a previously written file.
*/
if (td->o.verify != VERIFY_NONE && io_u->ddir == DDIR_READ) {
- io_u->end_io = verify_io_u;
+ if (td->o.verify_async)
+ io_u->end_io = verify_io_u_async;
+ else
+ io_u->end_io = verify_io_u;
td_set_runstate(td, TD_VERIFYING);
} else if (in_ramp_time(td))
td_set_runstate(td, TD_RAMP);
@@ -992,6 +998,7 @@
{
unsigned long long runtime[2], elapsed;
struct thread_data *td = data;
+ pthread_condattr_t attr;
int clear_state;
if (!td->o.use_thread)
@@ -1006,8 +1013,14 @@
INIT_FLIST_HEAD(&td->io_u_requeues);
INIT_FLIST_HEAD(&td->io_log_list);
INIT_FLIST_HEAD(&td->io_hist_list);
+ INIT_FLIST_HEAD(&td->verify_list);
+ pthread_mutex_init(&td->io_u_lock, NULL);
td->io_hist_tree = RB_ROOT;
+ pthread_condattr_init(&attr);
+ pthread_cond_init(&td->verify_cond, &attr);
+ pthread_cond_init(&td->free_cond, &attr);
+
td_set_runstate(td, TD_INITIALIZED);
dprint(FD_MUTEX, "up startup_mutex\n");
fio_mutex_up(startup_mutex);
@@ -1031,7 +1044,10 @@
if (init_io_u(td))
goto err;
- if (td->o.cpumask_set && fio_setaffinity(td) == -1) {
+ if (td->o.verify_async && verify_async_init(td))
+ goto err;
+
+ if (td->o.cpumask_set && fio_setaffinity(td->pid, td->o.cpumask) == -1) {
td_verror(td, errno, "cpu_set_affinity");
goto err;
}
@@ -1042,7 +1058,7 @@
*/
if (td->o.gtod_cpu) {
fio_cpu_clear(&td->o.cpumask, td->o.gtod_cpu);
- if (fio_setaffinity(td) == -1) {
+ if (fio_setaffinity(td->pid, td->o.cpumask) == -1) {
td_verror(td, errno, "cpu_set_affinity");
goto err;
}
@@ -1184,6 +1200,9 @@
td_verror(td, ret, "fio_cpuset_exit");
}
+ if (td->o.verify_async)
+ verify_async_exit(td);
+
/*
* do this very late, it will log file closing as well
*/
diff --git a/fio.h b/fio.h
index b19101c..fb70b46 100644
--- a/fio.h
+++ b/fio.h
@@ -174,6 +174,7 @@
unsigned int verify_pattern;
unsigned int verify_pattern_bytes;
unsigned int verify_fatal;
+ unsigned int verify_async;
unsigned int use_thread;
unsigned int unlink;
unsigned int do_disk_util;
@@ -209,6 +210,8 @@
unsigned int numjobs;
os_cpu_mask_t cpumask;
unsigned int cpumask_set;
+ os_cpu_mask_t verify_cpumask;
+ unsigned int verify_cpumask_set;
unsigned int iolog;
unsigned int rwmixcycle;
unsigned int rwmix[2];
@@ -319,6 +322,17 @@
struct flist_head io_u_freelist;
struct flist_head io_u_busylist;
struct flist_head io_u_requeues;
+ pthread_mutex_t io_u_lock;
+ pthread_cond_t free_cond;
+
+ /*
+ * async verify offload
+ */
+ struct flist_head verify_list;
+ pthread_t *verify_threads;
+ unsigned int nr_verify_threads;
+ pthread_cond_t verify_cond;
+ int verify_thread_exit;
/*
* Rate state
@@ -661,4 +675,26 @@
return (val != 0 && ((val & (val - 1)) == 0));
}
+/*
+ * We currently only need to do locking if we have verifier threads
+ * accessing our internal structures too
+ */
+static inline void td_io_u_lock(struct thread_data *td)
+{
+ if (td->o.verify_async)
+ pthread_mutex_lock(&td->io_u_lock);
+}
+
+static inline void td_io_u_unlock(struct thread_data *td)
+{
+ if (td->o.verify_async)
+ pthread_mutex_unlock(&td->io_u_lock);
+}
+
+static inline void td_io_u_free_notify(struct thread_data *td)
+{
+ if (td->o.verify_async)
+ pthread_cond_signal(&td->free_cond);
+}
+
#endif
diff --git a/io_u.c b/io_u.c
index 2e9dac0..6549804 100644
--- a/io_u.c
+++ b/io_u.c
@@ -401,7 +401,7 @@
return td->rwmix_ddir;
}
-static void put_file_log(struct thread_data *td, struct fio_file *f)
+void put_file_log(struct thread_data *td, struct fio_file *f)
{
int ret = put_file(td, f);
@@ -411,16 +411,21 @@
void put_io_u(struct thread_data *td, struct io_u *io_u)
{
+ td_io_u_lock(td);
+
assert((io_u->flags & IO_U_F_FREE) == 0);
io_u->flags |= IO_U_F_FREE;
+ io_u->flags &= ~IO_U_F_FREE_DEF;
if (io_u->file)
put_file_log(td, io_u->file);
io_u->file = NULL;
- flist_del(&io_u->list);
+ flist_del_init(&io_u->list);
flist_add(&io_u->list, &td->io_u_freelist);
td->cur_depth--;
+ td_io_u_unlock(td);
+ td_io_u_free_notify(td);
}
void clear_io_u(struct thread_data *td, struct io_u *io_u)
@@ -435,6 +440,8 @@
dprint(FD_IO, "requeue %p\n", __io_u);
+ td_io_u_lock(td);
+
__io_u->flags |= IO_U_F_FREE;
if ((__io_u->flags & IO_U_F_FLIGHT) && !ddir_sync(__io_u->ddir))
td->io_issues[__io_u->ddir]--;
@@ -444,6 +451,7 @@
flist_del(&__io_u->list);
flist_add_tail(&__io_u->list, &td->io_u_requeues);
td->cur_depth--;
+ td_io_u_unlock(td);
*io_u = NULL;
}
@@ -826,6 +834,9 @@
{
struct io_u *io_u = NULL;
+ td_io_u_lock(td);
+
+again:
if (!flist_empty(&td->io_u_requeues))
io_u = flist_entry(td->io_u_requeues.next, struct io_u, list);
else if (!queue_full(td)) {
@@ -837,9 +848,18 @@
io_u->end_io = NULL;
}
+ /*
+ * We ran out, wait for async verify threads to finish and return one
+ */
+ if (!io_u && td->o.verify_async) {
+ pthread_cond_wait(&td->free_cond, &td->io_u_lock);
+ goto again;
+ }
+
if (io_u) {
assert(io_u->flags & IO_U_F_FREE);
io_u->flags &= ~IO_U_F_FREE;
+ io_u->flags &= ~IO_U_F_FREE_DEF;
io_u->error = 0;
flist_del(&io_u->list);
@@ -847,6 +867,7 @@
td->cur_depth++;
}
+ td_io_u_unlock(td);
return io_u;
}
@@ -1042,7 +1063,9 @@
io_u = td->io_ops->event(td, i);
io_completed(td, io_u, icd);
- put_io_u(td, io_u);
+
+ if (!(io_u->flags & IO_U_F_FREE_DEF))
+ put_io_u(td, io_u);
}
}
@@ -1056,7 +1079,9 @@
init_icd(td, &icd, 1);
io_completed(td, io_u, &icd);
- put_io_u(td, io_u);
+
+ if (!(io_u->flags & IO_U_F_FREE_DEF))
+ put_io_u(td, io_u);
if (icd.error) {
td_verror(td, icd.error, "io_u_sync_complete");
diff --git a/ioengine.h b/ioengine.h
index f977799..3df0944 100644
--- a/ioengine.h
+++ b/ioengine.h
@@ -6,6 +6,7 @@
enum {
IO_U_F_FREE = 1 << 0,
IO_U_F_FLIGHT = 1 << 1,
+ IO_U_F_FREE_DEF = 1 << 2,
};
/*
diff --git a/options.c b/options.c
index 87515ef..0954ccd 100644
--- a/options.c
+++ b/options.c
@@ -303,14 +303,14 @@
return 0;
}
-static int str_cpus_allowed_cb(void *data, const char *input)
+static int set_cpus_allowed(struct thread_data *td, os_cpu_mask_t *mask,
+ const char *input)
{
- struct thread_data *td = data;
char *cpu, *str, *p;
long max_cpu;
int ret = 0;
- ret = fio_cpuset_init(&td->o.cpumask);
+ ret = fio_cpuset_init(mask);
if (ret < 0) {
log_err("fio: cpuset_init failed\n");
td_verror(td, ret, "fio_cpuset_init");
@@ -358,7 +358,7 @@
}
dprint(FD_PARSE, "set cpu allowed %d\n", icpu);
- fio_cpu_set(&td->o.cpumask, icpu);
+ fio_cpu_set(mask, icpu);
icpu++;
}
if (ret)
@@ -370,6 +370,30 @@
td->o.cpumask_set = 1;
return ret;
}
+
+static int str_cpus_allowed_cb(void *data, const char *input)
+{
+ struct thread_data *td = data;
+ int ret;
+
+ ret = set_cpus_allowed(td, &td->o.cpumask, input);
+ if (!ret)
+ td->o.cpumask_set = 1;
+
+ return ret;
+}
+
+static int str_verify_cpus_allowed_cb(void *data, const char *input)
+{
+ struct thread_data *td = data;
+ int ret;
+
+ ret = set_cpus_allowed(td, &td->o.verify_cpumask, input);
+ if (!ret)
+ td->o.verify_cpumask_set = 1;
+
+ return ret;
+}
#endif
static int str_fst_cb(void *data, const char *str)
@@ -1181,6 +1205,23 @@
.parent = "verify",
},
{
+ .name = "verify_async",
+ .type = FIO_OPT_INT,
+ .off1 = td_var_offset(verify_async),
+ .def = "0",
+ .help = "Number of async verifier threads to use",
+ .parent = "verify",
+ },
+#ifdef FIO_HAVE_CPU_AFFINITY
+ {
+ .name = "verify_async_cpus",
+ .type = FIO_OPT_STR,
+ .cb = str_verify_cpus_allowed_cb,
+ .help = "Set CPUs allowed for async verify threads",
+ .parent = "verify_async",
+ },
+#endif
+ {
.name = "write_iolog",
.type = FIO_OPT_STR_STORE,
.off1 = td_var_offset(write_iolog_file),
diff --git a/os/os-linux.h b/os/os-linux.h
index b766cbf..dd9c5aa 100644
--- a/os/os-linux.h
+++ b/os/os-linux.h
@@ -55,13 +55,13 @@
* the affinity helpers to work.
*/
#ifndef GLIBC_2_3_2
-#define fio_setaffinity(td) \
- sched_setaffinity((td)->pid, sizeof((td)->o.cpumask), &(td)->o.cpumask)
+#define fio_setaffinity(pid, cpumask) \
+ sched_setaffinity((pid), sizeof(cpumask), &(cpumask))
#define fio_getaffinity(pid, ptr) \
sched_getaffinity((pid), sizeof(cpu_set_t), (ptr))
#else
-#define fio_setaffinity(td) \
- sched_setaffinity((td)->pid, &(td)->o.cpumask)
+#define fio_setaffinity(pid, cpumask) \
+ sched_setaffinity((pid), &(cpumask))
#define fio_getaffinity(pid, ptr) \
sched_getaffinity((pid), (ptr))
#endif
diff --git a/os/os-solaris.h b/os/os-solaris.h
index b58d130..5672956 100644
--- a/os/os-solaris.h
+++ b/os/os-solaris.h
@@ -69,8 +69,8 @@
/*
* pset binding hooks for fio
*/
-#define fio_setaffinity(td) \
- pset_bind((td)->o.cpumask, P_PID, (td)->pid, NULL)
+#define fio_setaffinity(pid, cpumask) \
+ pset_bind(&(cpumask), P_PID, (pid), NULL)
#define fio_getaffinity(pid, ptr) ({ 0; })
#define fio_cpu_clear(mask, cpu) pset_assign(PS_NONE, (cpu), NULL)
diff --git a/os/os.h b/os/os.h
index dbf0957..10e796f 100644
--- a/os/os.h
+++ b/os/os.h
@@ -39,7 +39,7 @@
#endif /* FIO_HAVE_FADVISE */
#ifndef FIO_HAVE_CPU_AFFINITY
-#define fio_setaffinity(td) (0)
+#define fio_setaffinity(pid, mask) (0)
#define fio_getaffinity(pid, mask) do { } while (0)
#define fio_cpu_clear(mask, cpu) do { } while (0)
#define fio_cpuset_exit(mask) (-1)
diff --git a/verify.c b/verify.c
index 2ae74b9..5dd9ee3 100644
--- a/verify.c
+++ b/verify.c
@@ -5,9 +5,11 @@
#include <fcntl.h>
#include <string.h>
#include <assert.h>
+#include <pthread.h>
#include "fio.h"
#include "verify.h"
+#include "smalloc.h"
#include "crc/md5.h"
#include "crc/crc64.h"
@@ -417,6 +419,26 @@
return 0;
}
+/*
+ * Push IO verification to a separate thread
+ */
+int verify_io_u_async(struct thread_data *td, struct io_u *io_u)
+{
+ if (io_u->file)
+ put_file_log(td, io_u->file);
+
+ io_u->file = NULL;
+
+ pthread_mutex_lock(&td->io_u_lock);
+ flist_del(&io_u->list);
+ flist_add_tail(&io_u->list, &td->verify_list);
+ pthread_mutex_unlock(&td->io_u_lock);
+
+ pthread_cond_signal(&td->verify_cond);
+ io_u->flags |= IO_U_F_FREE_DEF;
+ return 0;
+}
+
int verify_io_u(struct thread_data *td, struct io_u *io_u)
{
struct verify_header *hdr;
@@ -720,3 +742,103 @@
dprint(FD_VERIFY, "get_next_verify: empty\n");
return 1;
}
+
+static void *verify_async_thread(void *data)
+{
+ struct thread_data *td = data;
+ struct io_u *io_u;
+ int ret = 0;
+
+ if (td->o.verify_cpumask_set &&
+ fio_setaffinity(td->pid, td->o.verify_cpumask)) {
+ log_err("fio: failed setting verify thread affinity\n");
+ goto done;
+ }
+
+ do {
+ read_barrier();
+ if (td->verify_thread_exit)
+ break;
+
+ pthread_mutex_lock(&td->io_u_lock);
+
+ while (flist_empty(&td->verify_list) &&
+ !td->verify_thread_exit) {
+ ret = pthread_cond_wait(&td->verify_cond, &td->io_u_lock);
+ if (ret) {
+ pthread_mutex_unlock(&td->io_u_lock);
+ break;
+ }
+ }
+
+ if (flist_empty(&td->verify_list)) {
+ pthread_mutex_unlock(&td->io_u_lock);
+ continue;
+ }
+
+ io_u = flist_entry(td->verify_list.next, struct io_u, list);
+ flist_del_init(&io_u->list);
+ pthread_mutex_unlock(&td->io_u_lock);
+
+ ret = verify_io_u(td, io_u);
+ put_io_u(td, io_u);
+ } while (!ret);
+
+done:
+ pthread_mutex_lock(&td->io_u_lock);
+ td->nr_verify_threads--;
+ pthread_mutex_unlock(&td->io_u_lock);
+
+ pthread_cond_signal(&td->free_cond);
+ return NULL;
+}
+
+int verify_async_init(struct thread_data *td)
+{
+ int i, ret;
+
+ td->verify_thread_exit = 0;
+
+ td->verify_threads = malloc(sizeof(pthread_t) * td->o.verify_async);
+ for (i = 0; i < td->o.verify_async; i++) {
+ ret = pthread_create(&td->verify_threads[i], NULL,
+ verify_async_thread, td);
+ if (ret) {
+ log_err("fio: async verify creation failed: %s\n",
+ strerror(ret));
+ break;
+ }
+ ret = pthread_detach(td->verify_threads[i]);
+ if (ret) {
+ log_err("fio: async verify thread detach failed: %s\n",
+ strerror(ret));
+ break;
+ }
+ td->nr_verify_threads++;
+ }
+
+ if (i != td->o.verify_async) {
+ td->verify_thread_exit = 1;
+ write_barrier();
+ pthread_cond_broadcast(&td->verify_cond);
+ return 1;
+ }
+
+ return 0;
+}
+
+void verify_async_exit(struct thread_data *td)
+{
+ td->verify_thread_exit = 1;
+ write_barrier();
+ pthread_cond_broadcast(&td->verify_cond);
+
+ pthread_mutex_lock(&td->io_u_lock);
+
+ while (td->nr_verify_threads)
+ pthread_cond_wait(&td->free_cond, &td->io_u_lock);
+
+ pthread_mutex_unlock(&td->io_u_lock);
+ free(td->verify_threads);
+ td->verify_threads = NULL;
+}
diff --git a/verify.h b/verify.h
index 76d256d..50c8e43 100644
--- a/verify.h
+++ b/verify.h
@@ -64,5 +64,12 @@
extern void populate_verify_io_u(struct thread_data *, struct io_u *);
extern int __must_check get_next_verify(struct thread_data *td, struct io_u *);
extern int __must_check verify_io_u(struct thread_data *, struct io_u *);
+extern int verify_io_u_async(struct thread_data *, struct io_u *);
+
+/*
+ * Async verify offload
+ */
+extern int verify_async_init(struct thread_data *);
+extern void verify_async_exit(struct thread_data *);
#endif