Add support for limiting rate in only one direction

So now you can say 'limit writes to 10MB/sec' and have reads go
full throttle, for instance.
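
For example (a sketch; the job name and device path below are just
placeholders), a job like this would cap writes at 10MB/sec and leave
reads unthrottled:

    ; sketch -- job name and device path are placeholders
    [write-capped]
    filename=/dev/sdX
    rw=rw
    rate=,10m

Swapping the comma around (rate=10m,) caps only reads instead.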

Signed-off-by: Jens Axboe <jens.axboe@oracle.com>
diff --git a/HOWTO b/HOWTO
index 494b3e6..8f73208 100644
--- a/HOWTO
+++ b/HOWTO
@@ -588,19 +588,29 @@
 		defaults to 1 which will make fio wait 'thinktime' usecs
 		after every block.
 
-rate=int	Cap the bandwidth used by this job to this number of KiB/sec.
+rate=int	Cap the bandwidth used by this job. The number is in bytes/sec,
+		the normal postfix rules apply. You can use rate=500k to limit
+		reads and writes to 500k each, or you can specify reads and
+		writes separately. Using rate=1m,500k would limit reads to
+		1MB/sec and writes to 500KB/sec. Capping only reads or
+		writes can be done with rate=,500k or rate=500k,. The former
+		will only limit writes (to 500KB/sec), the latter will only
+		limit reads.
 
 ratemin=int	Tell fio to do whatever it can to maintain at least this
 		bandwidth. Failing to meet this requirement, will cause
-		the job to exit.
+		the job to exit. The same format as rate is used for
+		read vs write separation.
 
 rate_iops=int	Cap the bandwidth to this number of IOPS. Basically the same
 		as rate, just specified independently of bandwidth. If the
 		job is given a block size range instead of a fixed value,
-		the smallest block size is used as the metric.
+		the smallest block size is used as the metric. The same format
+		as rate is used for read vs write separation.
 
 rate_iops_min=int If fio doesn't meet this rate of IO, it will cause
-		the job to exit.
+		the job to exit. The same format as rate is used for read vs
+		write separation.
 
 ratecycle=int	Average bandwidth for 'rate' and 'ratemin' over this number
 		of milliseconds.
diff --git a/eta.c b/eta.c
index a9082d0..9573e8a 100644
--- a/eta.c
+++ b/eta.c
@@ -168,8 +168,8 @@
 					t_eta -= ramp_left;
 			}
 		}
-		if (td->o.rate) {
-			r_eta = (bytes_total / 1024) / td->o.rate;
+		if (td->o.rate[0] || td->o.rate[1]) {
+			r_eta = bytes_total / (td->o.rate[0] + td->o.rate[1]);
 			r_eta += td->o.start_delay;
 		}
 
@@ -260,10 +260,10 @@
 		    || td->runstate == TD_FSYNCING
 		    || td->runstate == TD_PRE_READING) {
 			nr_running++;
-			t_rate += td->o.rate;
-			m_rate += td->o.ratemin;
-			t_iops += td->o.rate_iops;
-			m_iops += td->o.rate_iops_min;
+			t_rate += td->o.rate[0] + td->o.rate[1];
+			m_rate += td->o.ratemin[0] + td->o.ratemin[1];
+			t_iops += td->o.rate_iops[0] + td->o.rate_iops[1];
+			m_iops += td->o.rate_iops_min[0] + td->o.rate_iops_min[1];
 			files_open += td->nr_open_files;
 		} else if (td->runstate == TD_RAMP) {
 			nr_running++;
@@ -331,9 +331,15 @@
 		return;
 
 	printf("Jobs: %d (f=%d)", nr_running, files_open);
-	if (m_rate || t_rate)
-		printf(", CR=%d/%d KiB/s", t_rate, m_rate);
-	else if (m_iops || t_iops)
+	if (m_rate || t_rate) {
+		char *tr, *mr;
+
+		mr = num2str(m_rate, 4, 0, 1);
+		tr = num2str(t_rate, 4, 0, 1);
+		printf(", CR=%s/%s B/s", tr, mr);
+		free(tr);
+		free(mr);
+	} else if (m_iops || t_iops)
 		printf(", CR=%d/%d IOPS", t_iops, m_iops);
 	if (eta_sec != INT_MAX && nr_running) {
 		char perc_str[32];
diff --git a/fio.c b/fio.c
index 6ac8ef0..ebe06d6 100644
--- a/fio.c
+++ b/fio.c
@@ -172,28 +172,19 @@
 	sigaction(SIGQUIT, &act, NULL);
 }
 
-static inline int should_check_rate(struct thread_data *td)
-{
-	struct thread_options *o = &td->o;
-
-	/*
-	 * If some rate setting was given, we need to check it
-	 */
-	if (o->rate || o->ratemin || o->rate_iops || o->rate_iops_min)
-		return 1;
-
-	return 0;
-}
-
 /*
  * Check if we are above the minimum rate given.
  */
-static int check_min_rate(struct thread_data *td, struct timeval *now)
+static int __check_min_rate(struct thread_data *td, struct timeval *now,
+			    enum fio_ddir ddir)
 {
 	unsigned long long bytes = 0;
 	unsigned long iops = 0;
 	unsigned long spent;
 	unsigned long rate;
+	unsigned int ratemin = 0;
+	unsigned int rate_iops = 0;
+	unsigned int rate_iops_min = 0;
 
 	/*
 	 * allow a 2 second settle period in the beginning
@@ -201,38 +192,35 @@
 	if (mtime_since(&td->start, now) < 2000)
 		return 0;
 
-	if (td_read(td)) {
-		iops += td->io_blocks[DDIR_READ];
-		bytes += td->this_io_bytes[DDIR_READ];
-	}
-	if (td_write(td)) {
-		iops += td->io_blocks[DDIR_WRITE];
-		bytes += td->this_io_bytes[DDIR_WRITE];
-	}
+	iops += td->io_blocks[ddir];
+	bytes += td->this_io_bytes[ddir];
+	ratemin += td->o.ratemin[ddir];
+	rate_iops += td->o.rate_iops[ddir];
+	rate_iops_min += td->o.rate_iops_min[ddir];
 
 	/*
 	 * if rate blocks is set, sample is running
 	 */
-	if (td->rate_bytes || td->rate_blocks) {
-		spent = mtime_since(&td->lastrate, now);
+	if (td->rate_bytes[ddir] || td->rate_blocks[ddir]) {
+		spent = mtime_since(&td->lastrate[ddir], now);
 		if (spent < td->o.ratecycle)
 			return 0;
 
-		if (td->o.rate) {
+		if (td->o.rate[ddir]) {
 			/*
 			 * check bandwidth specified rate
 			 */
-			if (bytes < td->rate_bytes) {
+			if (bytes < td->rate_bytes[ddir]) {
 				log_err("%s: min rate %u not met\n", td->o.name,
-								td->o.ratemin);
+								ratemin);
 				return 1;
 			} else {
-				rate = (bytes - td->rate_bytes) / spent;
-				if (rate < td->o.ratemin ||
-				    bytes < td->rate_bytes) {
+				rate = ((bytes - td->rate_bytes[ddir]) * 1000) / spent;
+				if (rate < ratemin ||
+				    bytes < td->rate_bytes[ddir]) {
 					log_err("%s: min rate %u not met, got"
 						" %luKiB/sec\n", td->o.name,
-							td->o.ratemin, rate);
+							ratemin, rate);
 					return 1;
 				}
 			}
@@ -240,29 +228,41 @@
 			/*
 			 * checks iops specified rate
 			 */
-			if (iops < td->o.rate_iops) {
+			if (iops < rate_iops) {
 				log_err("%s: min iops rate %u not met\n",
-						td->o.name, td->o.rate_iops);
+						td->o.name, rate_iops);
 				return 1;
 			} else {
-				rate = (iops - td->rate_blocks) / spent;
-				if (rate < td->o.rate_iops_min ||
-				    iops < td->rate_blocks) {
+				rate = ((iops - td->rate_blocks[ddir]) * 1000) / spent;
+				if (rate < rate_iops_min ||
+				    iops < td->rate_blocks[ddir]) {
 					log_err("%s: min iops rate %u not met,"
 						" got %lu\n", td->o.name,
-							td->o.rate_iops_min,
-							rate);
+							rate_iops_min, rate);
 				}
 			}
 		}
 	}
 
-	td->rate_bytes = bytes;
-	td->rate_blocks = iops;
-	memcpy(&td->lastrate, now, sizeof(*now));
+	td->rate_bytes[ddir] = bytes;
+	td->rate_blocks[ddir] = iops;
+	memcpy(&td->lastrate[ddir], now, sizeof(*now));
 	return 0;
 }
 
+static int check_min_rate(struct thread_data *td, struct timeval *now,
+			  unsigned long *bytes_done)
+{
+	int ret = 0;
+
+	if (bytes_done[0])
+		ret |= __check_min_rate(td, now, 0);
+	if (bytes_done[1])
+		ret |= __check_min_rate(td, now, 1);
+
+	return ret;
+}
+
 static inline int runtime_exceeded(struct thread_data *td, struct timeval *t)
 {
 	if (!td->o.timeout)
@@ -286,7 +286,7 @@
 	/*
 	 * get immediately available events, if any
 	 */
-	r = io_u_queued_complete(td, 0);
+	r = io_u_queued_complete(td, 0, NULL);
 	if (r < 0)
 		return;
 
@@ -314,7 +314,7 @@
 	}
 
 	if (td->cur_depth)
-		r = io_u_queued_complete(td, td->cur_depth);
+		r = io_u_queued_complete(td, td->cur_depth, NULL);
 }
 
 /*
@@ -344,7 +344,7 @@
 		put_io_u(td, io_u);
 		return 1;
 	} else if (ret == FIO_Q_QUEUED) {
-		if (io_u_queued_complete(td, 1) < 0)
+		if (io_u_queued_complete(td, 1, NULL) < 0)
 			return 1;
 	} else if (ret == FIO_Q_COMPLETED) {
 		if (io_u->error) {
@@ -352,7 +352,7 @@
 			return 1;
 		}
 
-		if (io_u_sync_complete(td, io_u) < 0)
+		if (io_u_sync_complete(td, io_u, NULL) < 0)
 			return 1;
 	} else if (ret == FIO_Q_BUSY) {
 		if (td_io_commit(td))
@@ -456,7 +456,7 @@
 				requeue_io_u(td, &io_u);
 			} else {
 sync_done:
-				ret = io_u_sync_complete(td, io_u);
+				ret = io_u_sync_complete(td, io_u, NULL);
 				if (ret < 0)
 					break;
 			}
@@ -494,7 +494,7 @@
 				 * and do the verification on them through
 				 * the callback handler
 				 */
-				if (io_u_queued_complete(td, min_events) < 0) {
+				if (io_u_queued_complete(td, min_events, NULL) < 0) {
 					ret = -1;
 					break;
 				}
@@ -508,7 +508,7 @@
 		min_events = td->cur_depth;
 
 		if (min_events)
-			ret = io_u_queued_complete(td, min_events);
+			ret = io_u_queued_complete(td, min_events, NULL);
 	} else
 		cleanup_pending_aio(td);
 
@@ -521,7 +521,6 @@
  */
 static void do_io(struct thread_data *td)
 {
-	unsigned long usec;
 	unsigned int i;
 	int ret = 0;
 
@@ -532,7 +531,7 @@
 
 	while ((td->this_io_bytes[0] + td->this_io_bytes[1]) < td->o.size) {
 		struct timeval comp_time;
-		long bytes_done = 0;
+		unsigned long bytes_done[2] = { 0, 0 };
 		int min_evts = 0;
 		struct io_u *io_u;
 		int ret2, full;
@@ -594,12 +593,13 @@
 				requeue_io_u(td, &io_u);
 			} else {
 sync_done:
-				if (should_check_rate(td))
+				if (__should_check_rate(td, 0) ||
+				    __should_check_rate(td, 1))
 					fio_gettime(&comp_time, NULL);
 
-				bytes_done = io_u_sync_complete(td, io_u);
-				if (bytes_done < 0)
-					ret = bytes_done;
+				ret = io_u_sync_complete(td, io_u, bytes_done);
+				if (ret < 0)
+					break;
 			}
 			break;
 		case FIO_Q_QUEUED:
@@ -635,34 +635,25 @@
 			if (full && !min_evts)
 				min_evts = 1;
 
-			if (should_check_rate(td))
+			if (__should_check_rate(td, 0) ||
+			    __should_check_rate(td, 1))
 				fio_gettime(&comp_time, NULL);
 
 			do {
-				ret = io_u_queued_complete(td, min_evts);
-				if (ret <= 0)
+				ret = io_u_queued_complete(td, min_evts, bytes_done);
+				if (ret < 0)
 					break;
 
-				bytes_done += ret;
 			} while (full && (td->cur_depth > td->o.iodepth_low));
 		}
 
 		if (ret < 0)
 			break;
-		if (!bytes_done)
+		if (!(bytes_done[0] + bytes_done[1]))
 			continue;
 
-		/*
-		 * the rate is batched for now, it should work for batches
-		 * of completions except the very first one which may look
-		 * a little bursty
-		 */
-		if (!in_ramp_time(td) && should_check_rate(td)) {
-			usec = utime_since(&td->tv_cache, &comp_time);
-
-			rate_throttle(td, usec, bytes_done);
-
-			if (check_min_rate(td, &comp_time)) {
+		if (!in_ramp_time(td) && should_check_rate(td, bytes_done)) {
+			if (check_min_rate(td, &comp_time, bytes_done)) {
 				if (exitall_on_terminate)
 					terminate_threads(td->groupid);
 				td_verror(td, EIO, "check_min_rate");
@@ -696,7 +687,7 @@
 
 		i = td->cur_depth;
 		if (i)
-			ret = io_u_queued_complete(td, i);
+			ret = io_u_queued_complete(td, i, NULL);
 
 		if (should_fsync(td) && td->o.end_fsync) {
 			td_set_runstate(td, TD_FSYNCING);
@@ -878,8 +869,8 @@
 	td->ts.stat_io_bytes[0] = td->ts.stat_io_bytes[1] = 0;
 	td->this_io_bytes[0] = td->this_io_bytes[1] = 0;
 	td->zone_bytes = 0;
-	td->rate_bytes = 0;
-	td->rate_blocks = 0;
+	td->rate_bytes[0] = td->rate_bytes[1] = 0;
+	td->rate_blocks[0] = td->rate_blocks[1] = 0;
 
 	td->last_was_sync = 0;
 
@@ -1256,8 +1247,8 @@
 		continue;
 reaped:
 		(*nr_running)--;
-		(*m_rate) -= td->o.ratemin;
-		(*t_rate) -= td->o.rate;
+		(*m_rate) -= (td->o.ratemin[0] + td->o.ratemin[1]);
+		(*t_rate) -= (td->o.rate[0] + td->o.rate[1]);
 		if (!td->pid)
 			pending--;
 
@@ -1512,8 +1503,8 @@
 				td_set_runstate(td, TD_RUNNING);
 			nr_running++;
 			nr_started--;
-			m_rate += td->o.ratemin;
-			t_rate += td->o.rate;
+			m_rate += td->o.ratemin[0] + td->o.ratemin[1];
+			t_rate += td->o.rate[0] + td->o.rate[1];
 			todo--;
 			fio_mutex_up(td->mutex);
 		}
diff --git a/fio.h b/fio.h
index 771df35..8e4f9dd 100644
--- a/fio.h
+++ b/fio.h
@@ -228,11 +228,11 @@
 	char *exec_prerun;
 	char *exec_postrun;
 
-	unsigned int rate;
-	unsigned int ratemin;
+	unsigned int rate[2];
+	unsigned int ratemin[2];
 	unsigned int ratecycle;
-	unsigned int rate_iops;
-	unsigned int rate_iops_min;
+	unsigned int rate_iops[2];
+	unsigned int rate_iops_min[2];
 
 	char *ioscheduler;
 
@@ -309,11 +309,11 @@
 	/*
 	 * Rate state
 	 */
-	unsigned long rate_usec_cycle;
-	long rate_pending_usleep;
-	unsigned long rate_bytes;
-	unsigned long rate_blocks;
-	struct timeval lastrate;
+	unsigned long rate_usec_cycle[2];
+	long rate_pending_usleep[2];
+	unsigned long rate_bytes[2];
+	unsigned long rate_blocks[2];
+	struct timeval lastrate[2];
 
 	unsigned long long total_io_size;
 
@@ -454,7 +454,7 @@
 extern unsigned long mtime_since_genesis(void);
 extern void usec_spin(unsigned int);
 extern void usec_sleep(struct thread_data *, unsigned long);
-extern void rate_throttle(struct thread_data *, unsigned long, unsigned int);
+extern long rate_throttle(struct thread_data *, unsigned long, unsigned long, enum fio_ddir);
 extern void fill_start_time(struct timeval *);
 extern void fio_gettime(struct timeval *, void *);
 extern void fio_gtod_init(void);
@@ -603,4 +603,32 @@
 	return buf;
 }
 
+static inline int __should_check_rate(struct thread_data *td,
+				      enum fio_ddir ddir)
+{
+	struct thread_options *o = &td->o;
+
+	/*
+	 * If some rate setting was given, we need to check it
+	 */
+	if (o->rate[ddir] || o->ratemin[ddir] || o->rate_iops[ddir] ||
+	    o->rate_iops_min[ddir])
+		return 1;
+
+	return 0;
+}
+
+static inline int should_check_rate(struct thread_data *td,
+				    unsigned long *bytes_done)
+{
+	int ret = 0;
+
+	if (bytes_done[0])
+		ret |= __should_check_rate(td, 0);
+	if (bytes_done[1])
+		ret |= __should_check_rate(td, 1);
+
+	return ret;
+}
+
 #endif
diff --git a/init.c b/init.c
index 6be72c1..80c329d 100644
--- a/init.c
+++ b/init.c
@@ -175,38 +175,40 @@
 	thread_number--;
 }
 
-static int setup_rate(struct thread_data *td)
+static int __setup_rate(struct thread_data *td, enum fio_ddir ddir)
 {
-	unsigned long nr_reads_per_msec;
+	unsigned int bs = td->o.min_bs[ddir];
 	unsigned long long rate;
-	unsigned int bs;
+	unsigned long ios_per_msec;
 
-	if (!td->o.rate && !td->o.rate_iops)
-		return 0;
-
-	if (td_rw(td))
-		bs = td->o.rw_min_bs;
-	else if (td_read(td))
-		bs = td->o.min_bs[DDIR_READ];
-	else
-		bs = td->o.min_bs[DDIR_WRITE];
-
-	if (td->o.rate) {
-		rate = td->o.rate;
-		nr_reads_per_msec = (rate * 1024 * 1000LL) / bs;
+	if (td->o.rate[ddir]) {
+		rate = td->o.rate[ddir];
+		ios_per_msec = (rate * 1000LL) / bs;
 	} else
-		nr_reads_per_msec = td->o.rate_iops * 1000UL;
+		ios_per_msec = td->o.rate_iops[ddir] * 1000UL;
 
-	if (!nr_reads_per_msec) {
+	if (!ios_per_msec) {
 		log_err("rate lower than supported\n");
 		return -1;
 	}
 
-	td->rate_usec_cycle = 1000000000ULL / nr_reads_per_msec;
-	td->rate_pending_usleep = 0;
+	td->rate_usec_cycle[ddir] = 1000000000ULL / ios_per_msec;
+	td->rate_pending_usleep[ddir] = 0;
 	return 0;
 }
 
+static int setup_rate(struct thread_data *td)
+{
+	int ret = 0;
+
+	if (td->o.rate[DDIR_READ] || td->o.rate_iops[DDIR_READ])
+		ret = __setup_rate(td, DDIR_READ);
+	if (td->o.rate[DDIR_WRITE] || td->o.rate_iops[DDIR_WRITE])
+		ret |= __setup_rate(td, DDIR_WRITE);
+
+	return ret;
+}
+
 static int fixed_block_size(struct thread_options *o)
 {
 	return o->min_bs[DDIR_READ] == o->max_bs[DDIR_READ] &&
@@ -334,11 +336,15 @@
 	if (o->open_files > o->nr_files || !o->open_files)
 		o->open_files = o->nr_files;
 
-	if ((o->rate && o->rate_iops) || (o->ratemin && o->rate_iops_min)) {
+	if (((o->rate[0] + o->rate[1]) && (o->rate_iops[0] + o->rate_iops[1])) ||
+	    ((o->ratemin[0] + o->ratemin[1]) &&
+	     (o->rate_iops_min[0] + o->rate_iops_min[1]))) {
 		log_err("fio: rate and rate_iops are mutually exclusive\n");
 		return 1;
 	}
-	if ((o->rate < o->ratemin) || (o->rate_iops < o->rate_iops_min)) {
+	if ((o->rate[0] < o->ratemin[0]) || (o->rate[1] < o->ratemin[1]) ||
+	    (o->rate_iops[0] < o->rate_iops_min[0]) ||
+	    (o->rate_iops[1] < o->rate_iops_min[1])) {
 		log_err("fio: minimum rate exceeds rate\n");
 		return 1;
 	}
diff --git a/io_u.c b/io_u.c
index 40fd196..e218a30 100644
--- a/io_u.c
+++ b/io_u.c
@@ -307,6 +307,53 @@
 	return DDIR_WRITE;
 }
 
+static enum fio_ddir rate_ddir(struct thread_data *td, enum fio_ddir ddir)
+{
+	enum fio_ddir odir = ddir ^ 1;
+	struct timeval t;
+	long usec;
+
+	if (td->rate_pending_usleep[ddir] <= 0)
+		return ddir;
+
+	/*
+	 * We have too much pending sleep in this direction. See if we
+	 * should switch.
+	 */
+	if (td_rw(td)) {
+		/*
+		 * Other direction does not have too much pending, switch
+		 */
+		if (td->rate_pending_usleep[odir] < 100000)
+			return odir;
+
+		/*
+		 * Both directions have pending sleep. Sleep the minimum time
+		 * and deduct from both.
+		 */
+		if (td->rate_pending_usleep[ddir] <=
+			td->rate_pending_usleep[odir]) {
+			usec = td->rate_pending_usleep[ddir];
+		} else {
+			usec = td->rate_pending_usleep[odir];
+			ddir = odir;
+		}
+	} else
+		usec = td->rate_pending_usleep[ddir];
+
+	fio_gettime(&t, NULL);
+	usec_sleep(td, usec);
+	usec = utime_since_now(&t);
+
+	td->rate_pending_usleep[ddir] -= usec;
+
+	odir = ddir ^ 1;
+	if (td_rw(td) && __should_check_rate(td, odir))
+		td->rate_pending_usleep[odir] -= usec;
+
+	return ddir;
+}
+
 /*
  * Return the data direction for the next io_u. If the job is a
  * mixed read/write workload, check the rwmix cycle and switch if
@@ -314,13 +361,13 @@
  */
 static enum fio_ddir get_rw_ddir(struct thread_data *td)
 {
+	enum fio_ddir ddir;
+
 	if (td_rw(td)) {
 		/*
 		 * Check if it's time to seed a new data direction.
 		 */
 		if (td->io_issues[td->rwmix_ddir] >= td->rwmix_issues) {
-			enum fio_ddir ddir;
-
 			/*
 			 * Put a top limit on how many bytes we do for
 			 * one data direction, to avoid overflowing the
@@ -333,11 +380,14 @@
 
 			td->rwmix_ddir = ddir;
 		}
-		return td->rwmix_ddir;
+		ddir = td->rwmix_ddir;
 	} else if (td_read(td))
-		return DDIR_READ;
+		ddir = DDIR_READ;
 	else
-		return DDIR_WRITE;
+		ddir = DDIR_WRITE;
+
+	td->rwmix_ddir = rate_ddir(td, ddir);
+	return td->rwmix_ddir;
 }
 
 static void put_file_log(struct thread_data *td, struct fio_file *f)
@@ -902,7 +952,8 @@
 		td->this_io_bytes[idx] += bytes;
 
 		if (ramp_time_over(td)) {
-			if (!td->o.disable_clat || !td->o.disable_bw)
+			if (!td->o.disable_clat || !td->o.disable_bw ||
+			    __should_check_rate(td, idx))
 				usec = utime_since(&io_u->issue_time,
 							&icd->time);
 
@@ -912,6 +963,10 @@
 			}
 			if (!td->o.disable_bw)
 				add_bw_sample(td, idx, bytes, &icd->time);
+			if (__should_check_rate(td, idx))
+				td->rate_pending_usleep[idx] += (long) td->rate_usec_cycle[idx] - usec;
+			if (__should_check_rate(td, idx ^ 1))
+				td->rate_pending_usleep[idx ^ 1] -= usec;
 		}
 
 		if (td_write(td) && idx == DDIR_WRITE &&
@@ -961,7 +1016,8 @@
 /*
  * Complete a single io_u for the sync engines.
  */
-long io_u_sync_complete(struct thread_data *td, struct io_u *io_u)
+int io_u_sync_complete(struct thread_data *td, struct io_u *io_u,
+		       unsigned long *bytes)
 {
 	struct io_completion_data icd;
 
@@ -969,17 +1025,24 @@
 	io_completed(td, io_u, &icd);
 	put_io_u(td, io_u);
 
-	if (!icd.error)
-		return icd.bytes_done[0] + icd.bytes_done[1];
+	if (icd.error) {
+		td_verror(td, icd.error, "io_u_sync_complete");
+		return -1;
+	}
 
-	td_verror(td, icd.error, "io_u_sync_complete");
-	return -1;
+	if (bytes) {
+		bytes[0] += icd.bytes_done[0];
+		bytes[1] += icd.bytes_done[1];
+	}
+
+	return 0;
 }
 
 /*
  * Called to complete min_events number of io for the async engines.
  */
-long io_u_queued_complete(struct thread_data *td, int min_evts)
+int io_u_queued_complete(struct thread_data *td, int min_evts,
+			 unsigned long *bytes)
 {
 	struct io_completion_data icd;
 	struct timespec *tvp = NULL;
@@ -1000,11 +1063,17 @@
 
 	init_icd(td, &icd, ret);
 	ios_completed(td, &icd);
-	if (!icd.error)
-		return icd.bytes_done[0] + icd.bytes_done[1];
+	if (icd.error) {
+		td_verror(td, icd.error, "io_u_queued_complete");
+		return -1;
+	}
 
-	td_verror(td, icd.error, "io_u_queued_complete");
-	return -1;
+	if (bytes) {
+		bytes[0] += icd.bytes_done[0];
+		bytes[1] += icd.bytes_done[1];
+	}
+
+	return 0;
 }
 
 /*
diff --git a/ioengine.h b/ioengine.h
index 18496c3..9c0ed9a 100644
--- a/ioengine.h
+++ b/ioengine.h
@@ -140,8 +140,8 @@
 extern struct io_u *get_io_u(struct thread_data *);
 extern void put_io_u(struct thread_data *, struct io_u *);
 extern void requeue_io_u(struct thread_data *, struct io_u **);
-extern long __must_check io_u_sync_complete(struct thread_data *, struct io_u *);
-extern long __must_check io_u_queued_complete(struct thread_data *, int);
+extern int __must_check io_u_sync_complete(struct thread_data *, struct io_u *, unsigned long *);
+extern int __must_check io_u_queued_complete(struct thread_data *, int, unsigned long *);
 extern void io_u_queued(struct thread_data *, struct io_u *);
 extern void io_u_log_error(struct thread_data *, struct io_u *);
 extern void io_u_mark_depth(struct thread_data *, unsigned int);
diff --git a/options.c b/options.c
index 9dcef0c..b2dd4de 100644
--- a/options.c
+++ b/options.c
@@ -1259,27 +1259,31 @@
 	},
 	{
 		.name	= "rate",
-		.type	= FIO_OPT_INT,
-		.off1	= td_var_offset(rate),
+		.type	= FIO_OPT_STR_VAL_INT,
+		.off1	= td_var_offset(rate[0]),
+		.off2	= td_var_offset(rate[1]),
 		.help	= "Set bandwidth rate",
 	},
 	{
 		.name	= "ratemin",
-		.type	= FIO_OPT_INT,
-		.off1	= td_var_offset(ratemin),
+		.type	= FIO_OPT_STR_VAL_INT,
+		.off1	= td_var_offset(ratemin[0]),
+		.off2	= td_var_offset(ratemin[1]),
 		.help	= "Job must meet this rate or it will be shutdown",
 		.parent	= "rate",
 	},
 	{
 		.name	= "rate_iops",
-		.type	= FIO_OPT_INT,
-		.off1	= td_var_offset(rate_iops),
+		.type	= FIO_OPT_STR_VAL_INT,
+		.off1	= td_var_offset(rate_iops[0]),
+		.off2	= td_var_offset(rate_iops[1]),
 		.help	= "Limit IO used to this number of IO operations/sec",
 	},
 	{
 		.name	= "rate_iops_min",
-		.type	= FIO_OPT_INT,
-		.off1	= td_var_offset(rate_iops_min),
+		.type	= FIO_OPT_STR_VAL_INT,
+		.off1	= td_var_offset(rate_iops_min[0]),
+		.off2	= td_var_offset(rate_iops_min[1]),
 		.help	= "Job must meet this rate or it will be shutdown",
 		.parent	= "rate_iops",
 	},
diff --git a/time.c b/time.c
index 643fcea..ee9d33f 100644
--- a/time.c
+++ b/time.c
@@ -122,41 +122,28 @@
 	} while (!td->terminate);
 }
 
-void rate_throttle(struct thread_data *td, unsigned long time_spent,
-		   unsigned int bytes)
+long rate_throttle(struct thread_data *td, unsigned long time_spent,
+		   unsigned long bytes, enum fio_ddir ddir)
 {
+	unsigned int bs = td->o.min_bs[ddir];
 	unsigned long usec_cycle;
-	unsigned int bs;
 
-	if (!td->o.rate && !td->o.rate_iops)
-		return;
+	if (!td->o.rate[ddir] && !td->o.rate_iops[ddir])
+		return 0;
 
-	if (td_rw(td))
-		bs = td->o.rw_min_bs;
-	else if (td_read(td))
-		bs = td->o.min_bs[DDIR_READ];
-	else
-		bs = td->o.min_bs[DDIR_WRITE];
-
-	usec_cycle = td->rate_usec_cycle * (bytes / bs);
+	usec_cycle = td->rate_usec_cycle[ddir] * (bytes / bs);
 
 	if (time_spent < usec_cycle) {
 		unsigned long s = usec_cycle - time_spent;
 
-		td->rate_pending_usleep += s;
-
-		if (td->rate_pending_usleep >= 100000) {
-			struct timeval t;
-
-			fio_gettime(&t, NULL);
-			usec_sleep(td, td->rate_pending_usleep);
-			td->rate_pending_usleep -= utime_since_now(&t);
-		}
+		td->rate_pending_usleep[ddir] += s;
 	} else {
 		long overtime = time_spent - usec_cycle;
 
-		td->rate_pending_usleep -= overtime;
+		td->rate_pending_usleep[ddir] -= overtime;
 	}
+
+	return td->rate_pending_usleep[ddir];
 }
 
 unsigned long mtime_since_genesis(void)