client/server: track and handle command timeouts

Signed-off-by: Jens Axboe <axboe@kernel.dk>
diff --git a/client.c b/client.c
index 358903b..130aeaf 100644
--- a/client.c
+++ b/client.c
@@ -45,6 +45,8 @@
 	struct flist_head eta_list;
 	struct client_eta *eta_in_flight;
 
+	struct flist_head cmd_list;
+
 	uint16_t argc;
 	char **argv;
 };
@@ -188,6 +190,7 @@
 	INIT_FLIST_HEAD(&client->hash_list);
 	INIT_FLIST_HEAD(&client->arg_list);
 	INIT_FLIST_HEAD(&client->eta_list);
+	INIT_FLIST_HEAD(&client->cmd_list);
 
 	if (fio_server_parse_string(hostname, &client->hostname,
 					&client->is_sock, &client->port,
@@ -266,6 +269,8 @@
 	else
 		fd = fio_client_connect_ip(client);
 
+	dprint(FD_NET, "client: %s connected %d\n", client->hostname, fd);
+
 	if (fd < 0)
 		return 1;
 
@@ -285,7 +290,7 @@
 	flist_for_each(entry, &client_list) {
 		client = flist_entry(entry, struct fio_client, list);
 
-		fio_net_send_simple_cmd(client->fd, FIO_NET_CMD_QUIT, 0);
+		fio_net_send_simple_cmd(client->fd, FIO_NET_CMD_QUIT, 0, NULL);
 	}
 }
 
@@ -314,8 +319,7 @@
 {
 	dprint(FD_NET, "client: send probe\n");
 
-	fio_net_send_simple_cmd(client->fd, FIO_NET_CMD_PROBE, 0);
-	handle_client(client);
+	fio_net_send_simple_cmd(client->fd, FIO_NET_CMD_PROBE, 0, &client->cmd_list);
 }
 
 static int send_client_cmd_line(struct fio_client *client)
@@ -631,6 +635,30 @@
 	}
 }
 
+static void remove_reply_cmd(struct fio_client *client, struct fio_net_cmd *cmd)
+{
+	struct fio_net_int_cmd *icmd = NULL;
+	struct flist_head *entry;
+
+	flist_for_each(entry, &client->cmd_list) {
+		icmd = flist_entry(entry, struct fio_net_int_cmd, list);
+
+		if (cmd->tag == (uint64_t) icmd)
+			break;
+
+		icmd = NULL;
+	}
+
+	if (!icmd) {
+		log_err("fio: client: unable to find matching tag\n");
+		return;
+	}
+
+	flist_del(&icmd->list);
+	cmd->tag = icmd->saved_tag;
+	free(icmd);
+}
+
 static void handle_eta(struct fio_client *client, struct fio_net_cmd *cmd)
 {
 	struct jobs_eta *je = (struct jobs_eta *) cmd->payload;
@@ -679,8 +707,8 @@
 	if (!cmd)
 		return 0;
 
-	dprint(FD_NET, "client: got cmd op %d from %s\n",
-					cmd->opcode, client->hostname);
+	dprint(FD_NET, "client: got cmd op %s from %s\n",
+				fio_server_op(cmd->opcode), client->hostname);
 
 	switch (cmd->opcode) {
 	case FIO_NET_CMD_QUIT:
@@ -711,10 +739,12 @@
 		free(cmd);
 		break;
 	case FIO_NET_CMD_ETA:
+		remove_reply_cmd(client, cmd);
 		handle_eta(client, cmd);
 		free(cmd);
 		break;
 	case FIO_NET_CMD_PROBE:
+		remove_reply_cmd(client, cmd);
 		handle_probe(client, cmd);
 		free(cmd);
 		break;
@@ -727,7 +757,7 @@
 		free(cmd);
 		break;
 	default:
-		log_err("fio: unknown client op: %d\n", cmd->opcode);
+		log_err("fio: unknown client op: %s\n", fio_server_op(cmd->opcode));
 		free(cmd);
 		break;
 	}
@@ -760,7 +790,7 @@
 		flist_add_tail(&client->eta_list, &eta_list);
 		client->eta_in_flight = eta;
 		fio_net_send_simple_cmd(client->fd, FIO_NET_CMD_SEND_ETA,
-						(uint64_t) eta);
+					(uint64_t) eta, &client->cmd_list);
 	}
 
 	while (skipped--)
@@ -769,6 +799,55 @@
 	dprint(FD_NET, "client: requested eta tag %p\n", eta);
 }
 
+static int client_check_cmd_timeout(struct fio_client *client,
+				    struct timeval *now)
+{
+	struct fio_net_int_cmd *cmd;
+	struct flist_head *entry, *tmp;
+	int ret = 0;
+
+	flist_for_each_safe(entry, tmp, &client->cmd_list) {
+		cmd = flist_entry(entry, struct fio_net_int_cmd, list);
+
+		if (mtime_since(&cmd->tv, now) < FIO_NET_CLIENT_TIMEOUT)
+			continue;
+
+		log_err("fio: client %s, timeout on cmd %s\n", client->hostname,
+						fio_server_op(cmd->cmd.opcode));
+		flist_del(&cmd->list);
+		free(cmd);
+		ret = 1;
+	}
+
+	return flist_empty(&client->cmd_list) && ret;
+}
+
+static int fio_client_timed_out(void)
+{
+	struct fio_client *client;
+	struct flist_head *entry, *tmp;
+	struct timeval tv;
+	int ret = 0;
+
+	gettimeofday(&tv, NULL);
+
+	flist_for_each_safe(entry, tmp, &client_list) {
+		client = flist_entry(entry, struct fio_client, list);
+
+		if (flist_empty(&client->cmd_list))
+			continue;
+
+		if (!client_check_cmd_timeout(client, &tv))
+			continue;
+
+		log_err("fio: client %s timed out\n", client->hostname);
+		remove_client(client);
+		ret = 1;
+	}
+
+	return ret;
+}
+
 int fio_handle_clients(void)
 {
 	struct fio_client *client;
@@ -799,6 +878,9 @@
 			if (mtime_since(&eta_tv, &tv) >= 900) {
 				request_client_etas();
 				memcpy(&eta_tv, &tv, sizeof(tv));
+
+				if (fio_client_timed_out())
+					break;
 			}
 
 			ret = poll(pfds, nr_clients, 100);
diff --git a/server.c b/server.c
index 5506ca9..339bb66 100644
--- a/server.c
+++ b/server.c
@@ -33,6 +33,33 @@
 static char *bind_sock;
 static struct sockaddr_in saddr_in;
 
+static const char *fio_server_ops[FIO_NET_CMD_NR] = {
+	"",
+	"QUIT",
+	"EXIT",
+	"JOB",
+	"JOBLINE",
+	"TEXT",
+	"TS",
+	"GS",
+	"SEND_ETA",
+	"ETA",
+	"PROBE",
+	"START",
+	"STOP"
+};
+
+const char *fio_server_op(unsigned int op)
+{
+	static char buf[32];
+
+	if (op < FIO_NET_CMD_NR)
+		return fio_server_ops[op];
+
+	sprintf(buf, "UNKNOWN/%d", op);
+	return buf;
+}
+
 int fio_send_data(int sk, const void *p, unsigned int len)
 {
 	assert(len <= sizeof(struct fio_net_cmd) + FIO_SERVER_MAX_PDU);
@@ -243,7 +270,7 @@
 	return ret;
 }
 
-int fio_net_send_simple_cmd(int sk, uint16_t opcode, uint64_t tag)
+static int fio_net_send_simple_stack_cmd(int sk, uint16_t opcode, uint64_t tag)
 {
 	struct fio_net_cmd cmd;
 
@@ -253,10 +280,42 @@
 	return fio_send_data(sk, &cmd, sizeof(cmd));
 }
 
+/*
+ * If 'list' is non-NULL, then allocate and store the sent command for
+ * later verification.
+ */
+int fio_net_send_simple_cmd(int sk, uint16_t opcode, uint64_t tag,
+			    struct flist_head *list)
+{
+	struct fio_net_int_cmd *cmd;
+	int ret;
+
+	if (!list)
+		return fio_net_send_simple_stack_cmd(sk, opcode, tag);
+
+	cmd = malloc(sizeof(*cmd));
+
+	fio_init_net_cmd(&cmd->cmd, opcode, NULL, 0, (uint64_t) cmd);
+	fio_net_cmd_crc(&cmd->cmd);
+
+	INIT_FLIST_HEAD(&cmd->list);
+	gettimeofday(&cmd->tv, NULL);
+	cmd->saved_tag = tag;
+
+	ret = fio_send_data(sk, &cmd->cmd, sizeof(cmd->cmd));
+	if (ret) {
+		free(cmd);
+		return ret;
+	}
+
+	flist_add_tail(&cmd->list, list);
+	return 0;
+}
+
 static int fio_server_send_quit_cmd(void)
 {
 	dprint(FD_NET, "server: sending quit\n");
-	return fio_net_send_simple_cmd(server_fd, FIO_NET_CMD_QUIT, 0);
+	return fio_net_send_simple_cmd(server_fd, FIO_NET_CMD_QUIT, 0, NULL);
 }
 
 static int handle_job_cmd(struct fio_net_cmd *cmd)
@@ -269,7 +328,7 @@
 		return -1;
 	}
 
-	fio_net_send_simple_cmd(server_fd, FIO_NET_CMD_START, 0);
+	fio_net_send_simple_cmd(server_fd, FIO_NET_CMD_START, 0, NULL);
 
 	ret = exec_run();
 	fio_server_send_quit_cmd();
@@ -309,7 +368,7 @@
 
 	free(argv);
 
-	fio_net_send_simple_cmd(server_fd, FIO_NET_CMD_START, 0);
+	fio_net_send_simple_cmd(server_fd, FIO_NET_CMD_START, 0, NULL);
 
 	ret = exec_run();
 	fio_server_send_quit_cmd();
@@ -321,6 +380,8 @@
 {
 	struct cmd_probe_pdu probe;
 
+	dprint(FD_NET, "server: sending probe reply\n");
+
 	memset(&probe, 0, sizeof(probe));
 	gethostname((char *) probe.hostname, sizeof(probe.hostname));
 #ifdef FIO_BIG_ENDIAN
@@ -333,7 +394,7 @@
 	probe.os	= FIO_OS;
 	probe.arch	= FIO_ARCH;
 
-	return fio_net_send_cmd(server_fd, FIO_NET_CMD_PROBE, &probe, sizeof(probe), 0);
+	return fio_net_send_cmd(server_fd, FIO_NET_CMD_PROBE, &probe, sizeof(probe), cmd->tag);
 }
 
 static int handle_send_eta_cmd(struct fio_net_cmd *cmd)
@@ -379,7 +440,8 @@
 {
 	int ret;
 
-	dprint(FD_NET, "server: got opcode %d, pdu=%u\n", cmd->opcode, cmd->pdu_len);
+	dprint(FD_NET, "server: got op [%s], pdu=%u, tag=%lx\n",
+			fio_server_op(cmd->opcode), cmd->pdu_len, cmd->tag);
 
 	switch (cmd->opcode) {
 	case FIO_NET_CMD_QUIT:
@@ -401,7 +463,7 @@
 		ret = handle_send_eta_cmd(cmd);
 		break;
 	default:
-		log_err("fio: unknown opcode: %d\n", cmd->opcode);
+		log_err("fio: unknown opcode: %s\n",fio_server_op(cmd->opcode));
 		ret = 1;
 	}
 
@@ -758,7 +820,7 @@
 
 	log_info("fio: server listening on %s\n", bind_str);
 
-	if (listen(sk, 1) < 0) {
+	if (listen(sk, 0) < 0) {
 		log_err("fio: listen: %s\n", strerror(errno));
 		return -1;
 	}
diff --git a/server.h b/server.h
index ea888af..cd07a85 100644
--- a/server.h
+++ b/server.h
@@ -3,6 +3,7 @@
 
 #include <inttypes.h>
 #include <string.h>
+#include <sys/time.h>
 
 #include "stat.h"
 #include "os/os.h"
@@ -25,6 +26,13 @@
 	uint8_t payload[0];	/* payload */
 };
 
+struct fio_net_int_cmd {
+	struct fio_net_cmd cmd;
+	struct flist_head list;
+	struct timeval tv;
+	uint64_t saved_tag;
+};
+
 enum {
 	FIO_SERVER_VER		= 5,
 
@@ -42,12 +50,15 @@
 	FIO_NET_CMD_PROBE	= 10,
 	FIO_NET_CMD_START	= 11,
 	FIO_NET_CMD_STOP	= 12,
+	FIO_NET_CMD_NR		= 13,
 
 	FIO_NET_CMD_F_MORE	= 1UL << 0,
 
 	/* crc does not include the crc fields */
 	FIO_NET_CMD_CRC_SZ	= sizeof(struct fio_net_cmd) -
 					2 * sizeof(uint16_t),
+
+	FIO_NET_CLIENT_TIMEOUT	= 5000,
 };
 
 struct cmd_ts_pdu {
@@ -79,9 +90,10 @@
 extern int fio_server_text_output(const char *, size_t);
 extern int fio_server_log(const char *format, ...);
 extern int fio_net_send_cmd(int, uint16_t, const void *, off_t, uint64_t);
-extern int fio_net_send_simple_cmd(int sk, uint16_t opcode, uint64_t tag);
+extern int fio_net_send_simple_cmd(int, uint16_t, uint64_t, struct flist_head *);
 extern void fio_server_set_arg(const char *);
 extern int fio_server_parse_string(const char *, char **, int *, int *, struct in_addr *);
+extern const char *fio_server_op(unsigned int);
 
 struct thread_stat;
 struct group_run_stats;