Fully parallellize io_u verification
Keep a full queue whenever possible, do verifications while
io is in progress.
Signed-off-by: Jens Axboe <jens.axboe@oracle.com>
diff --git a/fio.c b/fio.c
index 5358af2..eb857af 100644
--- a/fio.c
+++ b/fio.c
@@ -174,7 +174,7 @@
*/
r = td_io_getevents(td, 0, td->cur_depth, &ts);
if (r > 0) {
- icd.nr = r;
+ init_icd(&icd, NULL, r);
ios_completed(td, &icd);
}
@@ -194,7 +194,7 @@
if (td->cur_depth) {
r = td_io_getevents(td, td->cur_depth, td->cur_depth, NULL);
if (r > 0) {
- icd.nr = r;
+ init_icd(&icd, NULL, r);
ios_completed(td, &icd);
}
}
@@ -233,7 +233,7 @@
return 1;
}
- icd.nr = ret;
+ init_icd(&icd, NULL, ret);
ios_completed(td, &icd);
if (icd.error) {
td_verror(td, icd.error);
@@ -245,7 +245,7 @@
return 1;
}
- init_icd(&icd);
+ init_icd(&icd, NULL, 1);
io_completed(td, io_u, &icd);
put_io_u(td, io_u);
}
@@ -261,7 +261,7 @@
{
struct fio_file *f;
struct io_u *io_u;
- int ret, i;
+ int ret, i, min_events;
/*
* sync io first and invalidate cache, to make sure we really
@@ -276,6 +276,9 @@
io_u = NULL;
while (!td->terminate) {
+ struct io_completion_data icd;
+ struct timespec *timeout;
+
io_u = __get_io_u(td);
if (!io_u)
break;
@@ -303,10 +306,13 @@
io_u->xfer_buf += bytes;
goto requeue;
}
- if (do_io_u_verify(td, &io_u)) {
- ret = -EIO;
+ init_icd(&icd, verify_io_u, 1);
+ io_completed(td, io_u, &icd);
+ if (icd.error) {
+ ret = icd.error;
break;
}
+ put_io_u(td, io_u);
continue;
case FIO_Q_QUEUED:
break;
@@ -316,20 +322,42 @@
break;
}
- /*
- * We get here for a queued request, in the future we
- * want to later make this take full advantage of
- * keeping IO in flight while verifying others.
- */
- ret = td_io_getevents(td, 1, 1, NULL);
if (ret < 0)
break;
- assert(ret == 1);
- io_u = td->io_ops->event(td, 0);
+ /*
+ * if we can queue more, do so. but check if there are
+ * completed io_u's first.
+ */
+ if (queue_full(td)) {
+ timeout = NULL;
+ min_events = 1;
+ } else {
+ struct timespec ts;
- if (do_io_u_verify(td, &io_u))
+ ts.tv_sec = 0;
+ ts.tv_nsec = 0;
+ timeout = &ts;
+ min_events = 0;
+ }
+
+ /*
+ * Reap required number of io units, if any, and do the
+ * verification on them through the callback handler
+ */
+ ret = td_io_getevents(td, min_events, td->cur_depth, timeout);
+ if (ret < 0)
break;
+ else if (!ret)
+ continue;
+
+ init_icd(&icd, verify_io_u, ret);
+ ios_completed(td, &icd);
+
+ if (icd.error) {
+ td_verror(td, icd.error);
+ break;
+ }
}
if (io_u)
@@ -413,7 +441,7 @@
io_u->xfer_buf += bytes;
goto requeue;
}
- init_icd(&icd);
+ init_icd(&icd, NULL, 1);
io_completed(td, io_u, &icd);
put_io_u(td, io_u);
break;
@@ -450,7 +478,7 @@
} else if (!ret)
continue;
- icd.nr = ret;
+ init_icd(&icd, NULL, ret);
ios_completed(td, &icd);
if (icd.error) {
td_verror(td, icd.error);
diff --git a/fio.h b/fio.h
index b087f97..e5e754a 100644
--- a/fio.h
+++ b/fio.h
@@ -494,8 +494,10 @@
/*
* Used for passing io_u completion data
*/
+typedef int (icd_handler)(struct io_u *);
struct io_completion_data {
int nr; /* input */
+ icd_handler *handler; /* input */
int error; /* output */
unsigned long bytes_done[2]; /* output */
@@ -597,7 +599,7 @@
*/
extern void populate_verify_io_u(struct thread_data *, struct io_u *);
extern int get_next_verify(struct thread_data *td, struct io_u *);
-extern int do_io_u_verify(struct thread_data *, struct io_u **);
+extern int verify_io_u(struct io_u *);
/*
* Memory helpers
@@ -616,7 +618,7 @@
extern void put_io_u(struct thread_data *, struct io_u *);
extern void ios_completed(struct thread_data *, struct io_completion_data *);
extern void io_completed(struct thread_data *, struct io_u *, struct io_completion_data *);
-extern void init_icd(struct io_completion_data *);
+extern void init_icd(struct io_completion_data *, icd_handler *, int);
/*
* io engine entry points
diff --git a/io_u.c b/io_u.c
index 6439979..ebcc8bb 100644
--- a/io_u.c
+++ b/io_u.c
@@ -389,6 +389,7 @@
if (!io_u->error) {
unsigned int bytes = io_u->buflen - io_u->resid;
const enum fio_ddir idx = io_u->ddir;
+ int ret;
td->io_blocks[idx]++;
td->io_bytes[idx] += bytes;
@@ -407,14 +408,23 @@
log_io_piece(td, io_u);
icd->bytes_done[idx] += bytes;
+
+ if (icd->handler) {
+ ret = icd->handler(io_u);
+ if (ret && !icd->error)
+ icd->error = ret;
+ }
} else
icd->error = io_u->error;
}
-void init_icd(struct io_completion_data *icd)
+void init_icd(struct io_completion_data *icd, icd_handler *handler, int nr)
{
fio_gettime(&icd->time, NULL);
+ icd->handler = handler;
+ icd->nr = nr;
+
icd->error = 0;
icd->bytes_done[0] = icd->bytes_done[1] = 0;
}
@@ -424,8 +434,6 @@
struct io_u *io_u;
int i;
- init_icd(icd);
-
for (i = 0; i < icd->nr; i++) {
io_u = td->io_ops->event(td, i);
diff --git a/verify.c b/verify.c
index 32cfdd7..f2ef107 100644
--- a/verify.c
+++ b/verify.c
@@ -78,7 +78,7 @@
return 0;
}
-static int verify_io_u(struct io_u *io_u)
+int verify_io_u(struct io_u *io_u)
{
struct verify_header *hdr = (struct verify_header *) io_u->buf;
int ret;
@@ -160,21 +160,3 @@
return 1;
}
-
-int do_io_u_verify(struct thread_data *td, struct io_u **io_u)
-{
- struct io_u *v_io_u = *io_u;
- int ret = 0;
-
- if (v_io_u) {
- struct io_completion_data icd;
-
- ret = verify_io_u(v_io_u);
- init_icd(&icd);
- io_completed(td, v_io_u, &icd);
- put_io_u(td, v_io_u);
- *io_u = NULL;
- }
-
- return ret;
-}