client/server: request ETA instead of having the server send it automatically
Also changes the 'serial' of the command to a tag, that's passed
back and forth for commands that need to use it.
Signed-off-by: Jens Axboe <axboe@kernel.dk>
diff --git a/client.c b/client.c
index dbc8522..76935ca 100644
--- a/client.c
+++ b/client.c
@@ -32,15 +32,16 @@
char *name;
int state;
+
int skip_newline;
int is_sock;
+ int waiting_eta;
uint16_t argc;
char **argv;
};
-static struct jobs_eta client_etas;
-static int received_etas;
+static struct timeval eta_tv;
enum {
Client_created = 0,
@@ -50,6 +51,11 @@
Client_exited = 4,
};
+struct client_eta {
+ struct jobs_eta eta;
+ unsigned int pending;
+};
+
static FLIST_HEAD(client_list);
#define FIO_CLIENT_HASH_BITS 7
@@ -104,6 +110,8 @@
fio_client_remove_hash(client);
+ /* FIXME: check ->waiting_eta and handle it */
+
free(client->hostname);
if (client->argv)
free(client->argv);
@@ -310,7 +318,7 @@
}
clp->lines = cpu_to_le16(client->argc);
- ret = fio_net_send_cmd(client->fd, FIO_NET_CMD_JOBLINE, pdu, mem);
+ ret = fio_net_send_cmd(client->fd, FIO_NET_CMD_JOBLINE, pdu, mem, 0);
free(pdu);
return ret;
}
@@ -393,7 +401,7 @@
return 1;
}
- ret = fio_net_send_cmd(client->fd, FIO_NET_CMD_JOB, buf, sb.st_size);
+ ret = fio_net_send_cmd(client->fd, FIO_NET_CMD_JOB, buf, sb.st_size, 0);
free(buf);
close(fd);
return ret;
@@ -549,9 +557,8 @@
je->eta_sec = le64_to_cpu(je->eta_sec);
}
-static void sum_jobs_eta(struct jobs_eta *je)
+static void sum_jobs_eta(struct jobs_eta *dst, struct jobs_eta *je)
{
- struct jobs_eta *dst = &client_etas;
int i;
dst->nr_running += je->nr_running;
@@ -577,19 +584,17 @@
static void handle_eta(struct fio_net_cmd *cmd)
{
struct jobs_eta *je = (struct jobs_eta *) cmd->payload;
+ struct client_eta *eta = (struct client_eta *) cmd->tag;
+
+ dprint(FD_NET, "client: got eta tag %p, %d\n", eta, eta->pending);
convert_jobs_eta(je);
+ sum_jobs_eta(&eta->eta, je);
- if (nr_clients > 1) {
- sum_jobs_eta(je);
- received_etas++;
- if (received_etas == nr_clients) {
- received_etas = 0;
- display_thread_status(&client_etas);
- memset(&client_etas, 0, sizeof(client_etas));
- }
- } else
- display_thread_status(je);
+ if (!--eta->pending) {
+ display_thread_status(&eta->eta);
+ free(eta);
+ }
}
static void handle_probe(struct fio_client *client, struct fio_net_cmd *cmd)
@@ -679,6 +684,35 @@
return 1;
}
+static void request_client_etas(void)
+{
+ struct fio_client *client;
+ struct flist_head *entry;
+ struct client_eta *eta;
+
+ dprint(FD_NET, "client: request eta (%d)\n", nr_clients);
+
+ /*
+ * We need to do something more clever about checking status
+ * of command being send, client haven't sent previous ETA
+ * already, etc.
+ */
+
+ eta = malloc(sizeof(*eta));
+ memset(&eta->eta, 0, sizeof(eta->eta));
+ eta->pending = nr_clients;
+
+ flist_for_each(entry, &client_list) {
+ client = flist_entry(entry, struct fio_client, list);
+
+ client->waiting_eta = 1;
+ fio_net_send_simple_cmd(client->fd, FIO_NET_CMD_SEND_ETA,
+ (uint64_t) eta);
+ }
+
+ dprint(FD_NET, "client: requested eta tag %p\n", eta);
+}
+
int fio_handle_clients(void)
{
struct fio_client *client;
@@ -686,6 +720,8 @@
struct pollfd *pfds;
int i, ret = 0;
+ gettimeofday(&eta_tv, NULL);
+
pfds = malloc(nr_clients * sizeof(struct pollfd));
while (!exit_backend && nr_clients) {
@@ -701,6 +737,14 @@
assert(i == nr_clients);
do {
+ struct timeval tv;
+
+ gettimeofday(&tv, NULL);
+ if (mtime_since(&eta_tv, &tv) >= 900) {
+ request_client_etas();
+ memcpy(&eta_tv, &tv, sizeof(tv));
+ }
+
ret = poll(pfds, nr_clients, 100);
if (ret < 0) {
if (errno == EINTR)
diff --git a/eta.c b/eta.c
index 5b912eb..b7f1fd6 100644
--- a/eta.c
+++ b/eta.c
@@ -230,7 +230,7 @@
* Print status of the jobs we know about. This includes rate estimates,
* ETA, thread state, etc.
*/
-int calc_thread_status(struct jobs_eta *je)
+int calc_thread_status(struct jobs_eta *je, int force)
{
struct thread_data *td;
int i;
@@ -245,11 +245,13 @@
static struct timeval rate_prev_time, disp_prev_time;
int i2p = 0;
- if (temp_stall_ts || terse_output || eta_print == FIO_ETA_NEVER)
- return 0;
+ if (!force) {
+ if (temp_stall_ts || terse_output || eta_print == FIO_ETA_NEVER)
+ return 0;
- if (!isatty(STDOUT_FILENO) && (eta_print != FIO_ETA_ALWAYS))
- return 0;
+ if (!isatty(STDOUT_FILENO) && (eta_print != FIO_ETA_ALWAYS))
+ return 0;
+ }
if (!rate_io_bytes[0] && !rate_io_bytes[1])
fill_start_time(&rate_prev_time);
@@ -332,7 +334,7 @@
/*
* Allow a little slack, the target is to print it every 1000 msecs
*/
- if (disp_time < 900)
+ if (!force && disp_time < 900)
return 0;
calc_rate(disp_time, io_bytes, disp_io_bytes, je->rate);
@@ -340,7 +342,7 @@
memcpy(&disp_prev_time, &now, sizeof(now));
- if (!je->nr_running && !je->nr_pending)
+ if (!force && !je->nr_running && !je->nr_pending)
return 0;
je->nr_threads = thread_number;
@@ -421,7 +423,7 @@
memset(je, 0, sizeof(*je) + thread_number * sizeof(char));
- if (calc_thread_status(je))
+ if (calc_thread_status(je, 0))
display_thread_status(je);
free(je);
diff --git a/fio.c b/fio.c
index 212b72e..ece897e 100644
--- a/fio.c
+++ b/fio.c
@@ -183,9 +183,7 @@
break;
update_io_ticks();
- if (is_backend)
- fio_server_send_status();
- else
+ if (!is_backend)
print_thread_status();
}
diff --git a/server.c b/server.c
index a54db7b..5afd8d4 100644
--- a/server.c
+++ b/server.c
@@ -98,7 +98,7 @@
cmd->version = le16_to_cpu(cmd->version);
cmd->opcode = le16_to_cpu(cmd->opcode);
cmd->flags = le32_to_cpu(cmd->flags);
- cmd->serial = le64_to_cpu(cmd->serial);
+ cmd->tag = le64_to_cpu(cmd->tag);
cmd->pdu_len = le32_to_cpu(cmd->pdu_len);
switch (cmd->version) {
@@ -205,7 +205,8 @@
cmd->pdu_crc16 = __cpu_to_le16(crc16(cmd->payload, pdu_len));
}
-int fio_net_send_cmd(int fd, uint16_t opcode, const void *buf, off_t size)
+int fio_net_send_cmd(int fd, uint16_t opcode, const void *buf, off_t size,
+ uint64_t tag)
{
struct fio_net_cmd *cmd;
size_t this_len;
@@ -218,7 +219,7 @@
cmd = malloc(sizeof(*cmd) + this_len);
- fio_init_net_cmd(cmd, opcode, buf, this_len);
+ fio_init_net_cmd(cmd, opcode, buf, this_len, tag);
if (this_len < size)
cmd->flags = __cpu_to_le32(FIO_NET_CMD_F_MORE);
@@ -234,11 +235,11 @@
return ret;
}
-int fio_net_send_simple_cmd(int sk, uint16_t opcode, uint64_t serial)
+int fio_net_send_simple_cmd(int sk, uint16_t opcode, uint64_t tag)
{
struct fio_net_cmd cmd;
- fio_init_net_cmd(&cmd, opcode, NULL, 0);
+ fio_init_net_cmd(&cmd, opcode, NULL, 0, tag);
fio_net_cmd_crc(&cmd);
return fio_send_data(sk, &cmd, sizeof(cmd));
@@ -324,7 +325,48 @@
probe.os = FIO_OS;
probe.arch = FIO_ARCH;
- return fio_net_send_cmd(server_fd, FIO_NET_CMD_PROBE, &probe, sizeof(probe));
+ return fio_net_send_cmd(server_fd, FIO_NET_CMD_PROBE, &probe, sizeof(probe), 0);
+}
+
+static int handle_send_eta_cmd(struct fio_net_cmd *cmd)
+{
+ struct jobs_eta *je;
+ size_t size;
+ void *buf;
+ int i;
+
+ size = sizeof(*je) + thread_number * sizeof(char);
+ buf = malloc(size);
+ memset(buf, 0, size);
+ je = buf;
+
+ if (!calc_thread_status(je, 1)) {
+ free(je);
+ return 0;
+ }
+
+ dprint(FD_NET, "server sending status\n");
+
+ je->nr_running = cpu_to_le32(je->nr_running);
+ je->nr_ramp = cpu_to_le32(je->nr_ramp);
+ je->nr_pending = cpu_to_le32(je->nr_pending);
+ je->files_open = cpu_to_le32(je->files_open);
+ je->m_rate = cpu_to_le32(je->m_rate);
+ je->t_rate = cpu_to_le32(je->t_rate);
+ je->m_iops = cpu_to_le32(je->m_iops);
+ je->t_iops = cpu_to_le32(je->t_iops);
+
+ for (i = 0; i < 2; i++) {
+ je->rate[i] = cpu_to_le32(je->rate[i]);
+ je->iops[i] = cpu_to_le32(je->iops[i]);
+ }
+
+ je->elapsed_sec = cpu_to_le32(je->nr_running);
+ je->eta_sec = cpu_to_le64(je->eta_sec);
+
+ fio_net_send_cmd(server_fd, FIO_NET_CMD_ETA, buf, size, cmd->tag);
+ free(je);
+ return 0;
}
static int handle_command(struct fio_net_cmd *cmd)
@@ -349,6 +391,9 @@
case FIO_NET_CMD_PROBE:
ret = handle_probe_cmd(cmd);
break;
+ case FIO_NET_CMD_SEND_ETA:
+ ret = handle_send_eta_cmd(cmd);
+ break;
default:
log_err("fio: unknown opcode: %d\n", cmd->opcode);
ret = 1;
@@ -477,7 +522,7 @@
int fio_server_text_output(const char *buf, unsigned int len)
{
if (server_fd != -1)
- return fio_net_send_cmd(server_fd, FIO_NET_CMD_TEXT, buf, len);
+ return fio_net_send_cmd(server_fd, FIO_NET_CMD_TEXT, buf, len, 0);
return fwrite(buf, len, 1, f_err);
}
@@ -590,7 +635,7 @@
convert_gs(&p.rs, rs);
- fio_net_send_cmd(server_fd, FIO_NET_CMD_TS, &p, sizeof(p));
+ fio_net_send_cmd(server_fd, FIO_NET_CMD_TS, &p, sizeof(p), 0);
}
void fio_server_send_gs(struct group_run_stats *rs)
@@ -600,47 +645,7 @@
dprint(FD_NET, "server sending group run stats\n");
convert_gs(&gs, rs);
- fio_net_send_cmd(server_fd, FIO_NET_CMD_GS, &gs, sizeof(gs));
-}
-
-void fio_server_send_status(void)
-{
- struct jobs_eta *je;
- size_t size;
- void *buf;
- int i;
-
- size = sizeof(*je) + thread_number * sizeof(char);
- buf = malloc(size);
- memset(buf, 0, size);
- je = buf;
-
- if (!calc_thread_status(je)) {
- free(je);
- return;
- }
-
- dprint(FD_NET, "server sending status\n");
-
- je->nr_running = cpu_to_le32(je->nr_running);
- je->nr_ramp = cpu_to_le32(je->nr_ramp);
- je->nr_pending = cpu_to_le32(je->nr_pending);
- je->files_open = cpu_to_le32(je->files_open);
- je->m_rate = cpu_to_le32(je->m_rate);
- je->t_rate = cpu_to_le32(je->t_rate);
- je->m_iops = cpu_to_le32(je->m_iops);
- je->t_iops = cpu_to_le32(je->t_iops);
-
- for (i = 0; i < 2; i++) {
- je->rate[i] = cpu_to_le32(je->rate[i]);
- je->iops[i] = cpu_to_le32(je->iops[i]);
- }
-
- je->elapsed_sec = cpu_to_le32(je->nr_running);
- je->eta_sec = cpu_to_le64(je->eta_sec);
-
- fio_net_send_cmd(server_fd, FIO_NET_CMD_ETA, buf, size);
- free(je);
+ fio_net_send_cmd(server_fd, FIO_NET_CMD_GS, &gs, sizeof(gs), 0);
}
int fio_server_log(const char *format, ...)
diff --git a/server.h b/server.h
index d68cbf5..9f7eeae 100644
--- a/server.h
+++ b/server.h
@@ -14,7 +14,7 @@
uint16_t version; /* protocol version */
uint16_t opcode; /* command opcode */
uint32_t flags; /* modifier flags */
- uint64_t serial; /* serial number */
+ uint64_t tag; /* passed back on reply */
uint32_t pdu_len; /* length of post-cmd layload */
/*
* These must be immediately before the payload, anything before
@@ -26,7 +26,7 @@
};
enum {
- FIO_SERVER_VER = 4,
+ FIO_SERVER_VER = 5,
FIO_SERVER_MAX_PDU = 1024,
@@ -37,10 +37,11 @@
FIO_NET_CMD_TEXT = 5,
FIO_NET_CMD_TS = 6,
FIO_NET_CMD_GS = 7,
- FIO_NET_CMD_ETA = 8,
- FIO_NET_CMD_PROBE = 9,
- FIO_NET_CMD_START = 10,
- FIO_NET_CMD_STOP = 11,
+ FIO_NET_CMD_SEND_ETA = 8,
+ FIO_NET_CMD_ETA = 9,
+ FIO_NET_CMD_PROBE = 10,
+ FIO_NET_CMD_START = 11,
+ FIO_NET_CMD_STOP = 12,
FIO_NET_CMD_F_MORE = 1UL << 0,
@@ -77,8 +78,8 @@
extern int fio_start_server(int);
extern int fio_server_text_output(const char *, unsigned int len);
extern int fio_server_log(const char *format, ...);
-extern int fio_net_send_cmd(int, uint16_t, const void *, off_t);
-extern int fio_net_send_simple_cmd(int sk, uint16_t opcode, uint64_t serial);
+extern int fio_net_send_cmd(int, uint16_t, const void *, off_t, uint64_t);
+extern int fio_net_send_simple_cmd(int sk, uint16_t opcode, uint64_t tag);
extern void fio_server_set_arg(const char *);
extern int fio_server_parse_string(const char *, char **, int *, int *, struct in_addr *);
@@ -86,7 +87,6 @@
struct group_run_stats;
extern void fio_server_send_ts(struct thread_stat *, struct group_run_stats *);
extern void fio_server_send_gs(struct group_run_stats *);
-extern void fio_server_send_status(void);
extern void fio_server_idle_loop(void);
extern int fio_clients_connect(void);
@@ -104,12 +104,14 @@
extern int fio_net_port;
static inline void fio_init_net_cmd(struct fio_net_cmd *cmd, uint16_t opcode,
- const void *pdu, uint32_t pdu_len)
+ const void *pdu, uint32_t pdu_len,
+ uint64_t tag)
{
memset(cmd, 0, sizeof(*cmd));
cmd->version = __cpu_to_le16(FIO_SERVER_VER);
cmd->opcode = cpu_to_le16(opcode);
+ cmd->tag = cpu_to_le64(tag);
if (pdu) {
cmd->pdu_len = cpu_to_le32(pdu_len);
diff --git a/stat.h b/stat.h
index 68bc970..b564ebf 100644
--- a/stat.h
+++ b/stat.h
@@ -190,7 +190,7 @@
extern void show_thread_status(struct thread_stat *ts, struct group_run_stats *rs);
extern void show_group_stats(struct group_run_stats *rs);
-extern int calc_thread_status(struct jobs_eta *je);
+extern int calc_thread_status(struct jobs_eta *je, int force);
extern void display_thread_status(struct jobs_eta *je);
#endif