Cleanup end IO handling
Abstract out the end IO handling, so that callers don't have to
keep track of completion details. Then we can make the
io_completion_data structure private to io_u, and just provide two
functions to end io - one for sync completes, one for queued completes.
Signed-off-by: Jens Axboe <jens.axboe@oracle.com>
diff --git a/fio.c b/fio.c
index 3e55f92..78deaf6 100644
--- a/fio.c
+++ b/fio.c
@@ -163,20 +163,14 @@
*/
static void cleanup_pending_aio(struct thread_data *td)
{
- struct timespec ts = { .tv_sec = 0, .tv_nsec = 0};
struct list_head *entry, *n;
- struct io_completion_data icd;
struct io_u *io_u;
int r;
/*
* get immediately available events, if any
*/
- r = td_io_getevents(td, 0, td->cur_depth, &ts);
- if (r > 0) {
- init_icd(&icd, NULL, r);
- ios_completed(td, &icd);
- }
+ io_u_queued_complete(td, 0, NULL);
/*
* now cancel remaining active events
@@ -191,13 +185,8 @@
}
}
- if (td->cur_depth) {
- r = td_io_getevents(td, td->cur_depth, td->cur_depth, NULL);
- if (r > 0) {
- init_icd(&icd, NULL, r);
- ios_completed(td, &icd);
- }
- }
+ if (td->cur_depth)
+ io_u_queued_complete(td, td->cur_depth, NULL);
}
/*
@@ -207,7 +196,6 @@
static int fio_io_sync(struct thread_data *td, struct fio_file *f)
{
struct io_u *io_u = __get_io_u(td);
- struct io_completion_data icd;
int ret;
if (!io_u)
@@ -227,27 +215,15 @@
put_io_u(td, io_u);
return 1;
} else if (ret == FIO_Q_QUEUED) {
- ret = td_io_getevents(td, 1, td->cur_depth, NULL);
- if (ret < 0) {
- td_verror(td, -ret);
+ if (io_u_queued_complete(td, 1, NULL))
return 1;
- }
-
- init_icd(&icd, NULL, ret);
- ios_completed(td, &icd);
- if (icd.error) {
- td_verror(td, icd.error);
- return 1;
- }
} else if (ret == FIO_Q_COMPLETED) {
if (io_u->error) {
td_verror(td, io_u->error);
return 1;
}
- init_icd(&icd, NULL, 1);
- io_completed(td, io_u, &icd);
- put_io_u(td, io_u);
+ io_u_sync_complete(td, io_u, NULL);
}
return 0;
@@ -276,9 +252,6 @@
io_u = NULL;
while (!td->terminate) {
- struct io_completion_data icd;
- struct timespec *timeout;
-
io_u = __get_io_u(td);
if (!io_u)
break;
@@ -306,13 +279,8 @@
io_u->xfer_buf += bytes;
goto requeue;
}
- init_icd(&icd, verify_io_u, 1);
- io_completed(td, io_u, &icd);
- if (icd.error) {
- ret = icd.error;
+ if (io_u_sync_complete(td, io_u, verify_io_u))
break;
- }
- put_io_u(td, io_u);
continue;
case FIO_Q_QUEUED:
break;
@@ -329,36 +297,16 @@
* if we can queue more, do so. but check if there are
* completed io_u's first.
*/
- if (queue_full(td)) {
- timeout = NULL;
+ min_events = 0;
+ if (queue_full(td))
min_events = 1;
- } else {
- struct timespec ts;
-
- ts.tv_sec = 0;
- ts.tv_nsec = 0;
- timeout = &ts;
- min_events = 0;
- }
/*
* Reap required number of io units, if any, and do the
* verification on them through the callback handler
*/
- ret = td_io_getevents(td, min_events, td->cur_depth, timeout);
- if (ret < 0) {
- td_verror(td, -ret);
+ if (io_u_queued_complete(td, min_events, verify_io_u))
break;
- } else if (!ret)
- continue;
-
- init_icd(&icd, verify_io_u, ret);
- ios_completed(td, &icd);
-
- if (icd.error) {
- td_verror(td, icd.error);
- break;
- }
}
if (io_u)
@@ -401,7 +349,6 @@
*/
static void do_io(struct thread_data *td)
{
- struct io_completion_data icd;
struct timeval s;
unsigned long usec;
struct fio_file *f;
@@ -410,7 +357,8 @@
td_set_runstate(td, TD_RUNNING);
while ((td->this_io_bytes[0] + td->this_io_bytes[1]) < td->io_size) {
- struct timespec *timeout;
+ struct timeval comp_time;
+ long bytes_done = 0;
int min_evts = 0;
struct io_u *io_u;
@@ -426,6 +374,11 @@
break;
memcpy(&s, &io_u->start_time, sizeof(s));
+
+ if (runtime_exceeded(td, &s)) {
+ put_io_u(td, io_u);
+ break;
+ }
requeue:
ret = td_io_queue(td, io_u);
@@ -442,9 +395,8 @@
io_u->xfer_buf += bytes;
goto requeue;
}
- init_icd(&icd, NULL, 1);
- io_completed(td, io_u, &icd);
- put_io_u(td, io_u);
+ fio_gettime(&comp_time, NULL);
+ bytes_done = io_u_sync_complete(td, io_u, NULL);
break;
case FIO_Q_QUEUED:
break;
@@ -459,53 +411,39 @@
add_slat_sample(td, io_u->ddir, mtime_since(&io_u->start_time, &io_u->issue_time));
+ /*
+ * See if we need to complete some commands
+ */
if (ret == FIO_Q_QUEUED) {
- if (td->cur_depth < td->iodepth) {
- struct timespec ts;
-
- ts.tv_sec = 0;
- ts.tv_nsec = 0;
- timeout = &ts;
- min_evts = 0;
- } else {
- timeout = NULL;
+ min_evts = 0;
+ if (queue_full(td))
min_evts = 1;
- }
- ret = td_io_getevents(td, min_evts, td->cur_depth, timeout);
- if (ret < 0) {
- td_verror(td, -ret);
+ fio_gettime(&comp_time, NULL);
+ bytes_done = io_u_queued_complete(td, min_evts, NULL);
+ if (bytes_done < 0)
break;
- } else if (!ret)
- continue;
-
- init_icd(&icd, NULL, ret);
- ios_completed(td, &icd);
- if (icd.error) {
- td_verror(td, icd.error);
- break;
- }
}
+ if (!bytes_done)
+ continue;
+
/*
* the rate is batched for now, it should work for batches
* of completions except the very first one which may look
* a little bursty
*/
- usec = utime_since(&s, &icd.time);
+ usec = utime_since(&s, &comp_time);
- rate_throttle(td, usec, icd.bytes_done[td->ddir], td->ddir);
+ rate_throttle(td, usec, bytes_done, td->ddir);
- if (check_min_rate(td, &icd.time)) {
+ if (check_min_rate(td, &comp_time)) {
if (exitall_on_terminate)
terminate_threads(td->groupid, 0);
td_verror(td, ENODATA);
break;
}
- if (runtime_exceeded(td, &icd.time))
- break;
-
if (td->thinktime) {
unsigned long long b;
diff --git a/fio.h b/fio.h
index e5e754a..aa66ecd 100644
--- a/fio.h
+++ b/fio.h
@@ -492,17 +492,9 @@
};
/*
- * Used for passing io_u completion data
+ * Callback for io completion
*/
-typedef int (icd_handler)(struct io_u *);
-struct io_completion_data {
- int nr; /* input */
- icd_handler *handler; /* input */
-
- int error; /* output */
- unsigned long bytes_done[2]; /* output */
- struct timeval time; /* output */
-};
+typedef int (endio_handler)(struct io_u *);
#define DISK_UTIL_MSEC (250)
@@ -616,9 +608,8 @@
extern struct io_u *__get_io_u(struct thread_data *);
extern struct io_u *get_io_u(struct thread_data *, struct fio_file *);
extern void put_io_u(struct thread_data *, struct io_u *);
-extern void ios_completed(struct thread_data *, struct io_completion_data *);
-extern void io_completed(struct thread_data *, struct io_u *, struct io_completion_data *);
-extern void init_icd(struct io_completion_data *, icd_handler *, int);
+extern long io_u_sync_complete(struct thread_data *, struct io_u *, endio_handler *);
+extern long io_u_queued_complete(struct thread_data *, int, endio_handler *);
/*
* io engine entry points
diff --git a/io_u.c b/io_u.c
index ebcc8bb..b0e91e7 100644
--- a/io_u.c
+++ b/io_u.c
@@ -7,6 +7,15 @@
#include "fio.h"
#include "os.h"
+struct io_completion_data {
+ int nr; /* input */
+ endio_handler *handler; /* input */
+
+ int error; /* output */
+ unsigned long bytes_done[2]; /* output */
+ struct timeval time; /* output */
+};
+
/*
* The ->file_map[] contains a map of blocks we have or have not done io
* to yet. Used to make sure we cover the entire range in a fair fashion.
@@ -374,8 +383,8 @@
return io_u;
}
-void io_completed(struct thread_data *td, struct io_u *io_u,
- struct io_completion_data *icd)
+static void io_completed(struct thread_data *td, struct io_u *io_u,
+ struct io_completion_data *icd)
{
unsigned long msec;
@@ -418,7 +427,8 @@
icd->error = io_u->error;
}
-void init_icd(struct io_completion_data *icd, icd_handler *handler, int nr)
+static void init_icd(struct io_completion_data *icd, endio_handler *handler,
+ int nr)
{
fio_gettime(&icd->time, NULL);
@@ -429,7 +439,8 @@
icd->bytes_done[0] = icd->bytes_done[1] = 0;
}
-void ios_completed(struct thread_data *td, struct io_completion_data *icd)
+static void ios_completed(struct thread_data *td,
+ struct io_completion_data *icd)
{
struct io_u *io_u;
int i;
@@ -441,3 +452,47 @@
put_io_u(td, io_u);
}
}
+
+long io_u_sync_complete(struct thread_data *td, struct io_u *io_u,
+ endio_handler *handler)
+{
+ struct io_completion_data icd;
+
+ init_icd(&icd, handler, 1);
+ io_completed(td, io_u, &icd);
+ put_io_u(td, io_u);
+
+ if (!icd.error)
+ return icd.bytes_done[0] + icd.bytes_done[1];
+
+ td_verror(td, icd.error);
+ return -1;
+}
+
+long io_u_queued_complete(struct thread_data *td, int min_events,
+ endio_handler *handler)
+
+{
+ struct timespec ts = { .tv_sec = 0, .tv_nsec = 0, };
+ struct timespec *tsp = NULL;
+ struct io_completion_data icd;
+ int ret;
+
+ if (min_events > 0)
+ tsp = &ts;
+
+ ret = td_io_getevents(td, min_events, td->cur_depth, tsp);
+ if (ret < 0) {
+ td_verror(td, -ret);
+ return ret;
+ } else if (!ret)
+ return ret;
+
+ init_icd(&icd, handler, ret);
+ ios_completed(td, &icd);
+ if (!icd.error)
+ return icd.bytes_done[0] + icd.bytes_done[1];
+
+ td_verror(td, icd.error);
+ return -1;
+}