gfio: start of being able to update options

Not quite done yet, but it's a start.

Signed-off-by: Jens Axboe <axboe@kernel.dk>
diff --git a/client.c b/client.c
index d1357cb..fe1f32b 100644
--- a/client.c
+++ b/client.c
@@ -459,7 +459,7 @@
 	free(lens);
 	clp->lines = cpu_to_le16(client->argc);
 	clp->client_type = __cpu_to_le16(client->type);
-	ret = fio_net_send_cmd(client->fd, FIO_NET_CMD_JOBLINE, pdu, mem, 0);
+	ret = fio_net_send_cmd(client->fd, FIO_NET_CMD_JOBLINE, pdu, mem, NULL, NULL);
 	free(pdu);
 	return ret;
 }
@@ -585,7 +585,7 @@
 	pdu->client_type = cpu_to_le32(client->type);
 
 	client->sent_job = 1;
-	ret = fio_net_send_cmd(client->fd, FIO_NET_CMD_JOB, pdu, p_size, 0);
+	ret = fio_net_send_cmd(client->fd, FIO_NET_CMD_JOB, pdu, p_size, NULL, NULL);
 	free(pdu);
 	close(fd);
 	return ret;
@@ -617,6 +617,18 @@
 	return !nr_clients;
 }
 
+int fio_client_update_options(struct fio_client *client,
+			      struct thread_options *o, uint64_t *tag)
+{
+	struct cmd_add_job_pdu pdu;
+
+	pdu.thread_number = cpu_to_le32(client->thread_number);
+	pdu.groupid = cpu_to_le32(client->groupid);
+	convert_thread_options_to_net(&pdu.top, o);
+	
+	return fio_net_send_cmd(client->fd, FIO_NET_CMD_UPDATE_JOB, &pdu, sizeof(pdu), tag, &client->cmd_list);
+}
+
 static void convert_io_stat(struct io_stat *dst, struct io_stat *src)
 {
 	dst->max_val	= le64_to_cpu(src->max_val);
@@ -863,26 +875,50 @@
 
 static void remove_reply_cmd(struct fio_client *client, struct fio_net_cmd *cmd)
 {
-	struct fio_net_int_cmd *icmd = NULL;
+	struct fio_net_cmd_reply *reply = NULL;
 	struct flist_head *entry;
 
 	flist_for_each(entry, &client->cmd_list) {
-		icmd = flist_entry(entry, struct fio_net_int_cmd, list);
+		reply = flist_entry(entry, struct fio_net_cmd_reply, list);
 
-		if (cmd->tag == (uintptr_t) icmd)
+		if (cmd->tag == (uintptr_t) reply)
 			break;
 
-		icmd = NULL;
+		reply = NULL;
 	}
 
-	if (!icmd) {
-		log_err("fio: client: unable to find matching tag\n");
+	if (!reply) {
+		log_err("fio: client: unable to find matching tag (%lx)\n", cmd->tag);
 		return;
 	}
 
-	flist_del(&icmd->list);
-	cmd->tag = icmd->saved_tag;
-	free(icmd);
+	flist_del(&reply->list);
+	cmd->tag = reply->saved_tag;
+	free(reply);
+}
+
+int fio_client_wait_for_reply(struct fio_client *client, uint64_t tag)
+{
+	do {
+		struct fio_net_cmd_reply *reply = NULL;
+		struct flist_head *entry;
+
+		flist_for_each(entry, &client->cmd_list) {
+			reply = flist_entry(entry, struct fio_net_cmd_reply, list);
+
+			if (tag == (uintptr_t) reply)
+				break;
+
+			reply = NULL;
+		}
+
+		if (!reply)
+			break;
+
+		usleep(1000);
+	} while (1);
+
+	return 0;
 }
 
 static void handle_eta(struct fio_client *client, struct fio_net_cmd *cmd)
@@ -1130,11 +1166,17 @@
 		free(cmd);
 		break;
 		}
-	case FIO_NET_CMD_ADD_JOB:
+	case FIO_NET_CMD_ADD_JOB: {
+		struct cmd_add_job_pdu *pdu = (struct cmd_add_job_pdu *) cmd->payload;
+
+		client->thread_number = le32_to_cpu(pdu->thread_number);
+		client->groupid = le32_to_cpu(pdu->groupid);
+
 		if (ops->add_job)
 			ops->add_job(client, cmd);
 		free(cmd);
 		break;
+		}
 	case FIO_NET_CMD_IOLOG:
 		if (ops->iolog) {
 			struct cmd_iolog_pdu *pdu;
@@ -1144,6 +1186,11 @@
 		}
 		free(cmd);
 		break;
+	case FIO_NET_CMD_UPDATE_JOB:
+		remove_reply_cmd(client, cmd);
+		ops->update_job(client, cmd);
+		free(cmd);
+		break;
 	default:
 		log_err("fio: unknown client op: %s\n", fio_server_op(cmd->opcode));
 		free(cmd);
@@ -1192,20 +1239,20 @@
 static int client_check_cmd_timeout(struct fio_client *client,
 				    struct timeval *now)
 {
-	struct fio_net_int_cmd *cmd;
+	struct fio_net_cmd_reply *reply;
 	struct flist_head *entry, *tmp;
 	int ret = 0;
 
 	flist_for_each_safe(entry, tmp, &client->cmd_list) {
-		cmd = flist_entry(entry, struct fio_net_int_cmd, list);
+		reply = flist_entry(entry, struct fio_net_cmd_reply, list);
 
-		if (mtime_since(&cmd->tv, now) < FIO_NET_CLIENT_TIMEOUT)
+		if (mtime_since(&reply->tv, now) < FIO_NET_CLIENT_TIMEOUT)
 			continue;
 
 		log_err("fio: client %s, timeout on cmd %s\n", client->hostname,
-						fio_server_op(cmd->cmd.opcode));
-		flist_del(&cmd->list);
-		free(cmd);
+						fio_server_op(reply->opcode));
+		flist_del(&reply->list);
+		free(reply);
 		ret = 1;
 	}
 
diff --git a/client.h b/client.h
index a89afd8..acb5a88 100644
--- a/client.h
+++ b/client.h
@@ -48,6 +48,9 @@
 	int sent_job;
 	uint32_t type;
 
+	uint32_t thread_number;
+	uint32_t groupid;
+
 	struct flist_head eta_list;
 	struct client_eta *eta_in_flight;
 
@@ -77,6 +80,7 @@
 	client_cmd_op		*probe;
 	client_cmd_op		*quit;
 	client_cmd_op		*add_job;
+	client_cmd_op		*update_job;
 	client_timed_out_op	*timed_out;
 	client_cmd_op		*stop;
 	client_cmd_op		*start;
@@ -120,6 +124,8 @@
 extern void fio_clients_terminate(void);
 extern struct fio_client *fio_get_client(struct fio_client *);
 extern void fio_put_client(struct fio_client *);
+extern int fio_client_update_options(struct fio_client *, struct thread_options *, uint64_t *);
+extern int fio_client_wait_for_reply(struct fio_client *, uint64_t);
 
 #define FIO_CLIENT_DEF_ETA_MSEC		900
 
diff --git a/gclient.c b/gclient.c
index d551351..4955ad4 100644
--- a/gclient.c
+++ b/gclient.c
@@ -620,6 +620,15 @@
 	gdk_threads_leave();
 }
 
+static void gfio_update_job_op(struct fio_client *client,
+			       struct fio_net_cmd *cmd)
+{
+	uint32_t *pdu_error = (uint32_t *) cmd->payload;
+	struct gfio_client *gc = client->client_data;
+
+	*pdu_error = le32_to_cpu(*pdu_error);
+}
+
 static void gfio_client_timed_out(struct fio_client *client)
 {
 	struct gfio_client *gc = client->client_data;
@@ -1363,6 +1372,7 @@
 	.probe			= gfio_probe_op,
 	.quit			= gfio_quit_op,
 	.add_job		= gfio_add_job_op,
+	.update_job		= gfio_update_job_op,
 	.timed_out		= gfio_client_timed_out,
 	.stop			= gfio_client_stop,
 	.start			= gfio_client_start,
diff --git a/goptions.c b/goptions.c
index 87d92e6..d17653f 100644
--- a/goptions.c
+++ b/goptions.c
@@ -81,6 +81,8 @@
 	struct gopt_frame_widget g_widgets[__FIO_OPT_G_NR];
 	GtkWidget *widgets[FIO_MAX_OPTS];
 	GtkWidget *vboxes[__FIO_OPT_C_NR];
+	GtkWidget *dialog;
+	struct gfio_client *client;
 	struct flist_head changed_list;
 	struct thread_options *o;
 };
@@ -165,6 +167,15 @@
 	gjv->widgets[idx] = gopt->box;
 }
 
+static void gopt_dialog_update_apply(struct gopt_job_view *gjv)
+{
+	GtkDialog *dialog = GTK_DIALOG(gjv->dialog);
+	gboolean set;
+
+	set = !flist_empty(&gjv->changed_list);
+	gtk_dialog_set_response_sensitive(dialog, GTK_RESPONSE_APPLY, set);
+}
+
 static void gopt_changed(struct gopt *gopt)
 {
 	struct gopt_job_view *gjv = gopt->gjv;
@@ -173,8 +184,10 @@
 	 * Add to changed list. This also prevents the option from being
 	 * freed when the widget is destroyed.
 	 */
-	if (flist_empty(&gopt->changed_list))
+	if (flist_empty(&gopt->changed_list)) {
 		flist_add_tail(&gopt->changed_list, &gjv->changed_list);
+		gopt_dialog_update_apply(gjv);
+	}
 }
 
 static void gopt_str_changed(GtkEntry *entry, gpointer data)
@@ -1139,15 +1152,87 @@
 	g_object_unref(G_OBJECT(gopt->box));
 }
 
-static void gopt_handle_changed_options(struct gopt_job_view *gjv)
+static int gopt_handle_changed_options(struct gopt_job_view *gjv)
 {
+	struct gfio_client *gc = gjv->client;
+	uint64_t waitid = 0;
 	struct gopt *gopt;
+	int ret;
 
 	while (!flist_empty(&gjv->changed_list)) {
 		gopt = flist_entry(gjv->changed_list.next, struct gopt, changed_list);
 		flist_del(&gopt->changed_list);
 		gopt_handle_changed(gopt);
 	}
+
+	gopt_dialog_update_apply(gjv);
+
+	ret = fio_client_update_options(gc->client, gjv->o, &waitid);
+	if (ret)
+		return ret;
+
+	return fio_client_wait_for_reply(gc->client, waitid);
+}
+
+static gint gopt_dialog_cancel(gint response)
+{
+	switch (response) {
+	case GTK_RESPONSE_NONE:
+	case GTK_RESPONSE_REJECT:
+	case GTK_RESPONSE_DELETE_EVENT:
+	case GTK_RESPONSE_CANCEL:
+	case GTK_RESPONSE_NO:
+		return 1;
+	default:
+		return 0;
+	}
+}
+
+static gint gopt_dialog_done(gint response)
+{
+	switch (response) {
+	case GTK_RESPONSE_ACCEPT:
+	case GTK_RESPONSE_OK:
+	case GTK_RESPONSE_YES:
+		return 1;
+	default:
+		return 0;
+	}
+}
+
+
+static void gopt_handle_option_dialog(GtkWidget *dialog,
+				      struct flist_head *gjv_list)
+{
+	struct flist_head *entry;
+	struct gopt_job_view *gjv;
+	gint response;
+
+	do {
+		response = gtk_dialog_run(GTK_DIALOG(dialog));
+		if (gopt_dialog_cancel(response))
+			break;
+		else if (gopt_dialog_done(response))
+			break;
+
+		flist_for_each(entry, gjv_list) {
+			gjv = flist_entry(gjv_list->next, struct gopt_job_view, list);
+
+			gopt_handle_changed_options(gjv);
+		}
+	} while (1);
+
+	if (gopt_dialog_cancel(response))
+		return;
+
+	while (!flist_empty(gjv_list)) {
+		gjv = flist_entry(gjv_list->next, struct gopt_job_view, list);
+
+		gopt_handle_changed_options(gjv);
+
+		flist_del(&gjv->list);
+		free(gjv);
+	}
 }
 
 void gopt_get_options_window(GtkWidget *window, struct gfio_client *gc)
@@ -1169,6 +1254,7 @@
 	dialog = gtk_dialog_new_with_buttons("Fio options",
 			GTK_WINDOW(window), GTK_DIALOG_DESTROY_WITH_PARENT,
 			GTK_STOCK_OK, GTK_RESPONSE_ACCEPT,
+			GTK_STOCK_APPLY, GTK_RESPONSE_APPLY,
 			GTK_STOCK_CANCEL, GTK_RESPONSE_REJECT, NULL);
 
 	gtk_widget_set_size_request(GTK_WIDGET(dialog), 1024, 768);
@@ -1199,23 +1285,17 @@
 		INIT_FLIST_HEAD(&gjv->list);
 		INIT_FLIST_HEAD(&gjv->changed_list);
 		gjv->o = o;
+		gjv->dialog = dialog;
+		gjv->client = gc;
 		flist_add_tail(&gjv->list, &gjv_list);
 		gopt_add_group_tabs(notebook, gjv);
 		gopt_add_options(gjv, o);
+		gopt_dialog_update_apply(gjv);
 	}
 
 	gtk_widget_show_all(dialog);
 
-	gtk_dialog_run(GTK_DIALOG(dialog));
-
-	while (!flist_empty(&gjv_list)) {
-		gjv = flist_entry(gjv_list.next, struct gopt_job_view, list);
-
-		gopt_handle_changed_options(gjv);
-
-		flist_del(&gjv->list);
-		free(gjv);
-	}
+	gopt_handle_option_dialog(dialog, &gjv_list);
 
 	gtk_widget_destroy(dialog);
 }
diff --git a/server.c b/server.c
index 6e736bb..5f00550 100644
--- a/server.c
+++ b/server.c
@@ -293,6 +293,33 @@
 	return cmdret;
 }
 
+static void add_reply(uint64_t tag, struct flist_head *list)
+{
+	struct fio_net_cmd_reply *reply = (struct fio_net_cmd_reply *) tag;
+
+	flist_add_tail(&reply->list, list);
+}
+
+static uint64_t alloc_reply(uint64_t tag, uint16_t opcode)
+{
+	struct fio_net_cmd_reply *reply;
+
+	reply = calloc(1, sizeof(*reply));
+	INIT_FLIST_HEAD(&reply->list);
+	gettimeofday(&reply->tv, NULL);
+	reply->saved_tag = tag;
+	reply->opcode = opcode;
+
+	return (uintptr_t) reply;
+}
+
+static void free_reply(uint64_t tag)
+{
+	struct fio_net_cmd_reply *reply = (struct fio_net_cmd_reply *) tag;
+
+	free(reply);
+}
+
 void fio_net_cmd_crc_pdu(struct fio_net_cmd *cmd, const void *pdu)
 {
 	uint32_t pdu_len;
@@ -309,12 +336,19 @@
 }
 
 int fio_net_send_cmd(int fd, uint16_t opcode, const void *buf, off_t size,
-		     uint64_t tag)
+		     uint64_t *tagptr, struct flist_head *list)
 {
 	struct fio_net_cmd *cmd = NULL;
 	size_t this_len, cur_len = 0;
+	uint64_t tag;
 	int ret;
 
+	if (list) {
+		assert(tagptr);
+		tag = *tagptr = alloc_reply(*tagptr, opcode);
+	} else
+		tag = tagptr ? *tagptr : 0;
+
 	do {
 		this_len = size;
 		if (this_len > FIO_SERVER_MAX_FRAGMENT_PDU)
@@ -340,6 +374,13 @@
 		buf += this_len;
 	} while (!ret && size);
 
+	if (list) {
+		if (ret)
+			free_reply(tag);
+		else
+			add_reply(tag, list);
+	}
+
 	if (cmd)
 		free(cmd);
 
@@ -363,28 +404,22 @@
 int fio_net_send_simple_cmd(int sk, uint16_t opcode, uint64_t tag,
 			    struct flist_head *list)
 {
-	struct fio_net_int_cmd *cmd;
 	int ret;
 
-	if (!list)
-		return fio_net_send_simple_stack_cmd(sk, opcode, tag);
+	if (list)
+		tag = alloc_reply(tag, opcode);
 
-	cmd = malloc(sizeof(*cmd));
-
-	fio_init_net_cmd(&cmd->cmd, opcode, NULL, 0, (uintptr_t) cmd);
-	fio_net_cmd_crc(&cmd->cmd);
-
-	INIT_FLIST_HEAD(&cmd->list);
-	gettimeofday(&cmd->tv, NULL);
-	cmd->saved_tag = tag;
-
-	ret = fio_send_data(sk, &cmd->cmd, sizeof(cmd->cmd));
+	ret = fio_net_send_simple_stack_cmd(sk, opcode, tag);
 	if (ret) {
-		free(cmd);
+		if (list)
+			free_reply(tag);
+
 		return ret;
 	}
 
-	flist_add_tail(&cmd->list, list);
+	if (list)
+		add_reply(tag, list);
+
 	return 0;
 }
 
@@ -406,7 +441,7 @@
 
 	epdu.error = __cpu_to_le32(error);
 	epdu.signal = __cpu_to_le32(signal);
-	return fio_net_send_cmd(sk, FIO_NET_CMD_STOP, &epdu, sizeof(epdu), tag);
+	return fio_net_send_cmd(sk, FIO_NET_CMD_STOP, &epdu, sizeof(epdu), &tag, NULL);
 }
 
 int fio_net_send_stop(int sk, int error, int signal)
@@ -533,7 +568,7 @@
 	}
 
 	spdu.jobs = cpu_to_le32(thread_number);
-	fio_net_send_cmd(server_fd, FIO_NET_CMD_START, &spdu, sizeof(spdu), 0);
+	fio_net_send_cmd(server_fd, FIO_NET_CMD_START, &spdu, sizeof(spdu), NULL, NULL);
 	return 0;
 }
 
@@ -572,13 +607,14 @@
 	free(argv);
 
 	spdu.jobs = cpu_to_le32(thread_number);
-	fio_net_send_cmd(server_fd, FIO_NET_CMD_START, &spdu, sizeof(spdu), 0);
+	fio_net_send_cmd(server_fd, FIO_NET_CMD_START, &spdu, sizeof(spdu), NULL, NULL);
 	return 0;
 }
 
 static int handle_probe_cmd(struct fio_net_cmd *cmd)
 {
 	struct cmd_probe_pdu probe;
+	uint64_t tag = cmd->tag;
 
 	dprint(FD_NET, "server: sending probe reply\n");
 
@@ -596,13 +632,14 @@
 
 	probe.bpp	= sizeof(void *);
 
-	return fio_net_send_cmd(server_fd, FIO_NET_CMD_PROBE, &probe, sizeof(probe), cmd->tag);
+	return fio_net_send_cmd(server_fd, FIO_NET_CMD_PROBE, &probe, sizeof(probe), &tag, NULL);
 }
 
 static int handle_send_eta_cmd(struct fio_net_cmd *cmd)
 {
 	struct jobs_eta *je;
 	size_t size;
+	uint64_t tag = cmd->tag;
 	int i;
 
 	if (!thread_number)
@@ -637,11 +674,20 @@
 	je->eta_sec		= cpu_to_le64(je->eta_sec);
 	je->nr_threads		= cpu_to_le32(je->nr_threads);
 
-	fio_net_send_cmd(server_fd, FIO_NET_CMD_ETA, je, size, cmd->tag);
+	fio_net_send_cmd(server_fd, FIO_NET_CMD_ETA, je, size, &tag, NULL);
 	free(je);
 	return 0;
 }
 
+static int send_update_job_reply(int fd, uint64_t __tag, int error)
+{
+	uint64_t tag = __tag;
+	uint32_t pdu_error;
+
+	pdu_error = __cpu_to_le32(error);
+	return fio_net_send_cmd(fd, FIO_NET_CMD_UPDATE_JOB, &pdu_error, sizeof(pdu_error), &tag, NULL);
+}
+
 static int handle_update_job_cmd(struct fio_net_cmd *cmd)
 {
 	struct cmd_add_job_pdu *pdu = (struct cmd_add_job_pdu *) cmd->payload;
@@ -653,13 +699,13 @@
 	dprint(FD_NET, "server: updating options for job %u\n", tnumber);
 
 	if (tnumber >= thread_number) {
-		fio_net_send_ack(server_fd, cmd, ENODEV, 0);
+		send_update_job_reply(server_fd, cmd->tag, ENODEV);
 		return 0;
 	}
 
 	td = &threads[tnumber];
 	convert_thread_options_to_cpu(&td->o, &pdu->top);
-	fio_net_send_ack(server_fd, cmd, 0, 0);
+	send_update_job_reply(server_fd, cmd->tag, 0);
 	return 0;
 }
 
@@ -858,7 +904,7 @@
 
 	memcpy(pdu->buf, buf, len);
 
-	fio_net_send_cmd(server_fd, FIO_NET_CMD_TEXT, pdu, tlen, 0);
+	fio_net_send_cmd(server_fd, FIO_NET_CMD_TEXT, pdu, tlen, NULL, NULL);
 	free(pdu);
 	return len;
 }
@@ -973,7 +1019,7 @@
 
 	convert_gs(&p.rs, rs);
 
-	fio_net_send_cmd(server_fd, FIO_NET_CMD_TS, &p, sizeof(p), 0);
+	fio_net_send_cmd(server_fd, FIO_NET_CMD_TS, &p, sizeof(p), NULL, NULL);
 }
 
 void fio_server_send_gs(struct group_run_stats *rs)
@@ -983,7 +1029,7 @@
 	dprint(FD_NET, "server sending group run stats\n");
 
 	convert_gs(&gs, rs);
-	fio_net_send_cmd(server_fd, FIO_NET_CMD_GS, &gs, sizeof(gs), 0);
+	fio_net_send_cmd(server_fd, FIO_NET_CMD_GS, &gs, sizeof(gs), NULL, NULL);
 }
 
 static void convert_agg(struct disk_util_agg *dst, struct disk_util_agg *src)
@@ -1037,7 +1083,7 @@
 		convert_dus(&pdu.dus, &du->dus);
 		convert_agg(&pdu.agg, &du->agg);
 
-		fio_net_send_cmd(server_fd, FIO_NET_CMD_DU, &pdu, sizeof(pdu), 0);
+		fio_net_send_cmd(server_fd, FIO_NET_CMD_DU, &pdu, sizeof(pdu), NULL, NULL);
 	}
 }
 
@@ -1148,7 +1194,7 @@
 	pdu.groupid = cpu_to_le32(td->groupid);
 	convert_thread_options_to_net(&pdu.top, &td->o);
 
-	fio_net_send_cmd(server_fd, FIO_NET_CMD_ADD_JOB, &pdu, sizeof(pdu), 0);
+	fio_net_send_cmd(server_fd, FIO_NET_CMD_ADD_JOB, &pdu, sizeof(pdu), NULL, NULL);
 }
 
 void fio_server_send_start(struct thread_data *td)
diff --git a/server.h b/server.h
index dbe7cd4..d56d498 100644
--- a/server.h
+++ b/server.h
@@ -30,11 +30,11 @@
 	uint8_t payload[0];	/* payload */
 };
 
-struct fio_net_int_cmd {
-	struct fio_net_cmd cmd;
+struct fio_net_cmd_reply {
 	struct flist_head list;
 	struct timeval tv;
 	uint64_t saved_tag;
+	uint16_t opcode;
 };
 
 enum {
@@ -144,7 +144,7 @@
 
 extern int fio_start_server(char *);
 extern int fio_server_text_output(int, const char *, size_t);
-extern int fio_net_send_cmd(int, uint16_t, const void *, off_t, uint64_t);
+extern int fio_net_send_cmd(int, uint16_t, const void *, off_t, uint64_t *, struct flist_head *);
 extern int fio_net_send_simple_cmd(int, uint16_t, uint64_t, struct flist_head *);
 extern void fio_server_set_arg(const char *);
 extern int fio_server_parse_string(const char *, char **, int *, int *, struct in_addr *, struct in6_addr *, int *);