Token-based flow control

This patch allows two fio jobs to be kept to a certain
proportion of each other using token-based flow control.
There are three new parameters: flow, flow_watermark, and
flow_sleep, documented in the fio options. An example of an fio
job using these parameters is below:

[global]
norandommap
thread
time_based
runtime=30
direct=1
ioengine=libaio
iodepth=256
size=100g
bs=8k
filename=/tmp/testfile
flow_watermark=100
flow_sleep=1000

[job2]
numjobs=1
rw=write
flow=-8

[job1]
numjobs=1
rw=randread
flow=1

The motivating application of this patch was to run random reads and
sequential writes at a fixed proportion to each other.

This initial version is only correct when run with 'thread', as shared
state is represented with a global variable. It also only allows two
jobs to be synchronized properly. A future version might do more, but
no more functionality was needed for my application.

Tested: Ran a few fio jobs with this flow control and observed that
the proportion of IOPS matched what the job file intended.
Varied the flow_watermark and flow_sleep parameters and observed
the effect on throughput.

Signed-off-by: Dan Ehrenberg <dehrenberg@google.com>

Modified by me to support flow_id, so an arbitrary number of flows can
be used. This means it no longer relies on global context, so it can be
used from a thread or process alike. Also added man page documentation.

Signed-off-by: Jens Axboe <axboe@kernel.dk>
diff --git a/flow.c b/flow.c
new file mode 100644
index 0000000..e5c4a40
--- /dev/null
+++ b/flow.c
@@ -0,0 +1,104 @@
+#include "fio.h"
+#include "mutex.h"
+#include "smalloc.h"
+#include "flist.h"
+
+struct fio_flow {			/* one shared flow counter, looked up by id */
+	unsigned int refs;		/* jobs holding this flow; flow_put() frees it at zero */
+	struct flist_head list;		/* link in the global flow_list */
+	unsigned int id;		/* key that flow_get() matches against the requested id */
+	long long int flow_counter;	/* running balance: each job adds its own flow weight */
+};
+
+static struct flist_head *flow_list;	/* all live flows; allocated in flow_init() */
+static struct fio_mutex *flow_lock;	/* held around flow_list walks and refs changes */
+
+/* Throttle check: 1 = over watermark (napped flow_sleep usec), 0 = counted. */
+int flow_threshold_exceeded(struct thread_data *td)
+{
+	struct fio_flow *f = td->flow;
+	long long int level;
+
+	if (!f)
+		return 0;
+
+	/* Fold in the sign of our weight so that "ahead" is always positive */
+	level = td->o.flow > 0 ? f->flow_counter : -f->flow_counter;
+	if (level <= td->o.flow_watermark) {
+		/* Unlocked on purpose: a slightly stale count is harmless */
+		f->flow_counter += td->o.flow;
+		return 0;
+	}
+	if (td->o.flow_sleep)
+		usleep(td->o.flow_sleep);
+	return 1;
+}
+
+/* Find or create the shared flow for 'id' and take a ref; NULL if smalloc fails. */
+static struct fio_flow *flow_get(unsigned int id)
+{
+	struct fio_flow *flow = NULL;	/* must start NULL: the list may be empty */
+	struct flist_head *n;
+
+	fio_mutex_down(flow_lock);
+
+	flist_for_each(n, flow_list) {
+		flow = flist_entry(n, struct fio_flow, list);
+		if (flow->id == id)
+			break;
+		flow = NULL;	/* no match: don't leave a stale entry behind */
+	}
+	if (!flow) {
+		flow = smalloc(sizeof(*flow));
+		if (!flow)
+			goto out;	/* allocation failed: hand NULL back */
+		flow->refs = 0;
+		INIT_FLIST_HEAD(&flow->list);
+		flow->id = id;
+		flow->flow_counter = 0;
+		flist_add_tail(&flow->list, flow_list);
+	}
+	flow->refs++;
+out:
+	fio_mutex_up(flow_lock);
+	return flow;
+}
+
+/* Drop one reference; the last one unlinks the flow and frees it. */
+static void flow_put(struct fio_flow *flow)
+{
+	fio_mutex_down(flow_lock);
+	flow->refs--;
+	if (flow->refs == 0) {
+		flist_del(&flow->list);
+		sfree(flow);
+	}
+	fio_mutex_up(flow_lock);
+}
+
+void flow_init_job(struct thread_data *td)
+{
+	if (td->o.flow != 0)	/* zero weight: job takes no part in flow control */
+		td->flow = flow_get(td->o.flow_id);	/* holds a ref until flow_exit_job() */
+}
+
+void flow_exit_job(struct thread_data *td)
+{
+	if (!td->flow)
+		return;		/* nothing attached, nothing to release */
+	flow_put(td->flow);
+	td->flow = NULL;	/* a repeated call becomes a no-op */
+}
+
+void flow_init(void)
+{
+	flow_lock = fio_mutex_init(1);	/* presumably 1 = initially available; flow_get() downs it first */
+	flow_list = smalloc(sizeof(*flow_list));	/* NOTE(review): result is used unchecked below */
+	INIT_FLIST_HEAD(flow_list);	/* start with no flows registered */
+}
+
+void flow_exit(void)
+{
+	fio_mutex_remove(flow_lock);	/* tear down the lock set up in flow_init() */
+	sfree(flow_list);		/* frees only the list head; flows are freed by flow_put() */
+}