Merge branch 'master' into alarm_openloop
diff --git a/Makefile b/Makefile
index 3c215b3..b377448 100644
--- a/Makefile
+++ b/Makefile
@@ -1597,6 +1597,8 @@
 	$(Q) $(BINDIR)/$(CONFIG)/interop_test || ( echo test interop_test failed ; exit 1 )
 	$(E) "[RUN]     Testing mock_test"
 	$(Q) $(BINDIR)/$(CONFIG)/mock_test || ( echo test mock_test failed ; exit 1 )
+	$(E) "[RUN]     Testing qps_openloop_test"
+	$(Q) $(BINDIR)/$(CONFIG)/qps_openloop_test || ( echo test qps_openloop_test failed ; exit 1 )
 	$(E) "[RUN]     Testing qps_test"
 	$(Q) $(BINDIR)/$(CONFIG)/qps_test || ( echo test qps_test failed ; exit 1 )
 	$(E) "[RUN]     Testing secure_auth_context_test"
diff --git a/build.yaml b/build.yaml
index 7f33ef3..eacd2f0 100644
--- a/build.yaml
+++ b/build.yaml
@@ -2299,7 +2299,6 @@
   - posix
 - name: qps_openloop_test
   build: test
-  run: false
   language: c++
   src:
   - test/cpp/qps/qps_openloop_test.cc
diff --git a/src/core/surface/alarm.c b/src/core/surface/alarm.c
index d753023..fb496f6 100644
--- a/src/core/surface/alarm.c
+++ b/src/core/surface/alarm.c
@@ -63,9 +63,9 @@
   alarm->cq = cq;
   alarm->tag = tag;
 
+  grpc_cq_begin_op(cq, tag);
   grpc_timer_init(&exec_ctx, &alarm->alarm, deadline, alarm_cb, alarm,
                   gpr_now(GPR_CLOCK_MONOTONIC));
-  grpc_cq_begin_op(cq, tag);
   grpc_exec_ctx_finish(&exec_ctx);
   return alarm;
 }
diff --git a/test/cpp/qps/client.h b/test/cpp/qps/client.h
index 50b2bf2..c94a523 100644
--- a/test/cpp/qps/client.h
+++ b/test/cpp/qps/client.h
@@ -41,6 +41,7 @@
 #include <grpc++/support/byte_buffer.h>
 #include <grpc++/support/slice.h>
 #include <grpc/support/log.h>
+#include <grpc/support/time.h>
 
 #include "src/proto/grpc/testing/payloads.grpc.pb.h"
 #include "src/proto/grpc/testing/services.grpc.pb.h"
@@ -52,27 +53,8 @@
 #include "test/cpp/util/create_test_channel.h"
 
 namespace grpc {
-
-#if defined(__APPLE__)
-// Specialize Timepoint for high res clock as we need that
-template <>
-class TimePoint<std::chrono::high_resolution_clock::time_point> {
- public:
-  TimePoint(const std::chrono::high_resolution_clock::time_point& time) {
-    TimepointHR2Timespec(time, &time_);
-  }
-  gpr_timespec raw_time() const { return time_; }
-
- private:
-  gpr_timespec time_;
-};
-#endif
-
 namespace testing {
 
-typedef std::chrono::high_resolution_clock grpc_time_source;
-typedef std::chrono::time_point<grpc_time_source> grpc_time;
-
 template <class RequestType>
 class ClientRequestCreator {
  public:
@@ -184,7 +166,7 @@
     // Set up the load distribution based on the number of threads
     const auto& load = config.load_params();
 
-    std::unique_ptr<RandomDist> random_dist;
+    std::unique_ptr<RandomDistInterface> random_dist;
     switch (load.load_case()) {
       case LoadParams::kClosedLoop:
         // Closed-loop doesn't use random dist at all
@@ -218,25 +200,26 @@
       closed_loop_ = false;
       // set up interarrival timer according to random dist
       interarrival_timer_.init(*random_dist, num_threads);
+      const auto now = gpr_now(GPR_CLOCK_MONOTONIC);
       for (size_t i = 0; i < num_threads; i++) {
-        next_time_.push_back(
-            grpc_time_source::now() +
-            std::chrono::duration_cast<grpc_time_source::duration>(
-                interarrival_timer_(i)));
+        next_time_.push_back(gpr_time_add(
+            now,
+            gpr_time_from_nanos(interarrival_timer_.next(i), GPR_TIMESPAN)));
       }
     }
   }
 
-  bool NextIssueTime(int thread_idx, grpc_time* time_delay) {
-    if (closed_loop_) {
-      return false;
-    } else {
-      *time_delay = next_time_[thread_idx];
-      next_time_[thread_idx] +=
-          std::chrono::duration_cast<grpc_time_source::duration>(
-              interarrival_timer_(thread_idx));
-      return true;
-    }
+  gpr_timespec NextIssueTime(int thread_idx) {
+    const gpr_timespec result = next_time_[thread_idx];
+    next_time_[thread_idx] =
+        gpr_time_add(next_time_[thread_idx],
+                     gpr_time_from_nanos(interarrival_timer_.next(thread_idx),
+                                         GPR_TIMESPAN));
+    return result;
+  }
+  std::function<gpr_timespec()> NextIssuer(int thread_idx) {
+    return closed_loop_ ? std::function<gpr_timespec()>()
+                        : std::bind(&Client::NextIssueTime, this, thread_idx);
   }
 
  private:
@@ -306,7 +289,7 @@
     Histogram* new_stats_;
     Histogram histogram_;
     Client* client_;
-    size_t idx_;
+    const size_t idx_;
     std::thread impl_;
   };
 
@@ -314,7 +297,7 @@
   std::unique_ptr<Timer> timer_;
 
   InterarrivalTimer interarrival_timer_;
-  std::vector<grpc_time> next_time_;
+  std::vector<gpr_timespec> next_time_;
 };
 
 template <class StubType, class RequestType>
@@ -323,9 +306,9 @@
   ClientImpl(const ClientConfig& config,
              std::function<std::unique_ptr<StubType>(std::shared_ptr<Channel>)>
                  create_stub)
-      : channels_(config.client_channels()), create_stub_(create_stub) {
-    cores_ = LimitCores(config.core_list().data(), config.core_list_size());
-
+      : cores_(LimitCores(config.core_list().data(), config.core_list_size())),
+        channels_(config.client_channels()),
+        create_stub_(create_stub) {
     for (int i = 0; i < config.client_channels(); i++) {
       channels_[i].init(config.server_targets(i % config.server_targets_size()),
                         config, create_stub_);
@@ -337,7 +320,7 @@
   virtual ~ClientImpl() {}
 
  protected:
-  int cores_;
+  const int cores_;
   RequestType request_;
 
   class ClientChannelInfo {
diff --git a/test/cpp/qps/client_async.cc b/test/cpp/qps/client_async.cc
index f3f8f37..9e8767d 100644
--- a/test/cpp/qps/client_async.cc
+++ b/test/cpp/qps/client_async.cc
@@ -43,9 +43,9 @@
 #include <vector>
 
 #include <gflags/gflags.h>
+#include <grpc++/alarm.h>
 #include <grpc++/channel.h>
 #include <grpc++/client_context.h>
-#include <grpc++/client_context.h>
 #include <grpc++/generic/generic_stub.h>
 #include <grpc/grpc.h>
 #include <grpc/support/cpu.h>
@@ -60,11 +60,9 @@
 namespace grpc {
 namespace testing {
 
-typedef std::list<grpc_time> deadline_list;
-
 class ClientRpcContext {
  public:
-  explicit ClientRpcContext(int ch) : channel_id_(ch) {}
+  ClientRpcContext() {}
   virtual ~ClientRpcContext() {}
   // next state, return false if done. Collect stats when appropriate
   virtual bool RunNextState(bool, Histogram* hist) = 0;
@@ -74,72 +72,73 @@
     return reinterpret_cast<ClientRpcContext*>(t);
   }
 
-  deadline_list::iterator deadline_posn() const { return deadline_posn_; }
-  void set_deadline_posn(const deadline_list::iterator& it) {
-    deadline_posn_ = it;
-  }
   virtual void Start(CompletionQueue* cq) = 0;
-  int channel_id() const { return channel_id_; }
-
- protected:
-  int channel_id_;
-
- private:
-  deadline_list::iterator deadline_posn_;
 };
 
 template <class RequestType, class ResponseType>
 class ClientRpcContextUnaryImpl : public ClientRpcContext {
  public:
   ClientRpcContextUnaryImpl(
-      int channel_id, BenchmarkService::Stub* stub, const RequestType& req,
+      BenchmarkService::Stub* stub, const RequestType& req,
+      std::function<gpr_timespec()> next_issue,
       std::function<
           std::unique_ptr<grpc::ClientAsyncResponseReader<ResponseType>>(
               BenchmarkService::Stub*, grpc::ClientContext*, const RequestType&,
               CompletionQueue*)> start_req,
       std::function<void(grpc::Status, ResponseType*)> on_done)
-      : ClientRpcContext(channel_id),
-        context_(),
+      : context_(),
         stub_(stub),
+        cq_(nullptr),
         req_(req),
         response_(),
-        next_state_(&ClientRpcContextUnaryImpl::RespDone),
+        next_state_(State::READY),
         callback_(on_done),
+        next_issue_(next_issue),
         start_req_(start_req) {}
-  void Start(CompletionQueue* cq) GRPC_OVERRIDE {
-    start_ = Timer::Now();
-    response_reader_ = start_req_(stub_, &context_, req_, cq);
-    response_reader_->Finish(&response_, &status_, ClientRpcContext::tag(this));
-  }
   ~ClientRpcContextUnaryImpl() GRPC_OVERRIDE {}
-  bool RunNextState(bool ok, Histogram* hist) GRPC_OVERRIDE {
-    bool ret = (this->*next_state_)(ok);
-    if (!ret) {
-      hist->Add((Timer::Now() - start_) * 1e9);
+  void Start(CompletionQueue* cq) GRPC_OVERRIDE {
+    cq_ = cq;
+    if (!next_issue_) {  // ready to issue
+      RunNextState(true, nullptr);
+    } else {  // wait for the issue time
+      alarm_.reset(new Alarm(cq_, next_issue_(), ClientRpcContext::tag(this)));
     }
-    return ret;
   }
-
+  bool RunNextState(bool ok, Histogram* hist) GRPC_OVERRIDE {
+    switch (next_state_) {
+      case State::READY:
+        start_ = Timer::Now();
+        response_reader_ = start_req_(stub_, &context_, req_, cq_);
+        response_reader_->Finish(&response_, &status_,
+                                 ClientRpcContext::tag(this));
+        next_state_ = State::RESP_DONE;
+        return true;
+      case State::RESP_DONE:
+        hist->Add((Timer::Now() - start_) * 1e9);
+        callback_(status_, &response_);
+        next_state_ = State::INVALID;
+        return false;
+      default:
+        GPR_ASSERT(false);
+        return false;
+    }
+  }
   ClientRpcContext* StartNewClone() GRPC_OVERRIDE {
-    return new ClientRpcContextUnaryImpl(channel_id_, stub_, req_, start_req_,
+    return new ClientRpcContextUnaryImpl(stub_, req_, next_issue_, start_req_,
                                          callback_);
   }
 
  private:
-  bool RespDone(bool) {
-    next_state_ = &ClientRpcContextUnaryImpl::DoCallBack;
-    return false;
-  }
-  bool DoCallBack(bool) {
-    callback_(status_, &response_);
-    return true;  // we're done, this'll be ignored
-  }
   grpc::ClientContext context_;
   BenchmarkService::Stub* stub_;
+  CompletionQueue* cq_;
+  std::unique_ptr<Alarm> alarm_;
   RequestType req_;
   ResponseType response_;
-  bool (ClientRpcContextUnaryImpl::*next_state_)(bool);
+  enum State { INVALID, READY, RESP_DONE };
+  State next_state_;
   std::function<void(grpc::Status, ResponseType*)> callback_;
+  std::function<gpr_timespec()> next_issue_;
   std::function<std::unique_ptr<grpc::ClientAsyncResponseReader<ResponseType>>(
       BenchmarkService::Stub*, grpc::ClientContext*, const RequestType&,
       CompletionQueue*)> start_req_;
@@ -157,49 +156,35 @@
   // member name resolution until the template types are fully resolved
  public:
   using Client::SetupLoadTest;
-  using Client::NextIssueTime;
   using Client::closed_loop_;
+  using Client::NextIssuer;
   using ClientImpl<StubType, RequestType>::cores_;
   using ClientImpl<StubType, RequestType>::channels_;
   using ClientImpl<StubType, RequestType>::request_;
   AsyncClient(const ClientConfig& config,
-              std::function<ClientRpcContext*(int, StubType*,
-                                              const RequestType&)> setup_ctx,
+              std::function<ClientRpcContext*(
+                  StubType*, std::function<gpr_timespec()> next_issue,
+                  const RequestType&)> setup_ctx,
               std::function<std::unique_ptr<StubType>(std::shared_ptr<Channel>)>
                   create_stub)
       : ClientImpl<StubType, RequestType>(config, create_stub),
-        num_async_threads_(NumThreads(config)),
-        channel_lock_(new std::mutex[config.client_channels()]),
-        contexts_(config.client_channels()),
-        max_outstanding_per_channel_(config.outstanding_rpcs_per_channel()),
-        channel_count_(config.client_channels()),
-        pref_channel_inc_(num_async_threads_) {
+        num_async_threads_(NumThreads(config)) {
     SetupLoadTest(config, num_async_threads_);
 
     for (int i = 0; i < num_async_threads_; i++) {
       cli_cqs_.emplace_back(new CompletionQueue);
-      if (!closed_loop_) {
-        rpc_deadlines_.emplace_back();
-        next_channel_.push_back(i % channel_count_);
-        issue_allowed_.emplace_back(true);
-
-        grpc_time next_issue;
-        NextIssueTime(i, &next_issue);
-        next_issue_.push_back(next_issue);
-      }
+      next_issuers_.emplace_back(NextIssuer(i));
     }
 
+    using namespace std::placeholders;
     int t = 0;
     for (int i = 0; i < config.outstanding_rpcs_per_channel(); i++) {
-      for (int ch = 0; ch < channel_count_; ch++) {
+      for (int ch = 0; ch < config.client_channels(); ch++) {
         auto* cq = cli_cqs_[t].get();
+        auto ctx =
+            setup_ctx(channels_[ch].get_stub(), next_issuers_[t], request_);
+        ctx->Start(cq);
         t = (t + 1) % cli_cqs_.size();
-        auto ctx = setup_ctx(ch, channels_[ch].get_stub(), request_);
-        if (closed_loop_) {
-          ctx->Start(cq);
-        } else {
-          contexts_[ch].push_front(ctx);
-        }
       }
     }
   }
@@ -212,140 +197,34 @@
         delete ClientRpcContext::detag(got_tag);
       }
     }
-    // Now clear out all the pre-allocated idle contexts
-    for (int ch = 0; ch < channel_count_; ch++) {
-      while (!contexts_[ch].empty()) {
-        // Get an idle context from the front of the list
-        auto* ctx = *(contexts_[ch].begin());
-        contexts_[ch].pop_front();
-        delete ctx;
-      }
-    }
-    delete[] channel_lock_;
   }
 
   bool ThreadFunc(Histogram* histogram,
                   size_t thread_idx) GRPC_OVERRIDE GRPC_FINAL {
     void* got_tag;
     bool ok;
-    grpc_time deadline, short_deadline;
-    if (closed_loop_) {
-      deadline = grpc_time_source::now() + std::chrono::seconds(1);
-      short_deadline = deadline;
-    } else {
-      if (rpc_deadlines_[thread_idx].empty()) {
-        deadline = grpc_time_source::now() + std::chrono::seconds(1);
-      } else {
-        deadline = *(rpc_deadlines_[thread_idx].begin());
-      }
-      short_deadline =
-          issue_allowed_[thread_idx] ? next_issue_[thread_idx] : deadline;
-    }
 
-    bool got_event;
-
-    switch (cli_cqs_[thread_idx]->AsyncNext(&got_tag, &ok, short_deadline)) {
-      case CompletionQueue::SHUTDOWN:
-        return false;
-      case CompletionQueue::TIMEOUT:
-        got_event = false;
-        break;
-      case CompletionQueue::GOT_EVENT:
-        got_event = true;
-        break;
-      default:
-        GPR_ASSERT(false);
-        break;
-    }
-    if (got_event) {
+    if (cli_cqs_[thread_idx]->Next(&got_tag, &ok)) {
+      // Got a regular event, so process it
       ClientRpcContext* ctx = ClientRpcContext::detag(got_tag);
-      if (ctx->RunNextState(ok, histogram) == false) {
-        // call the callback and then clone the ctx
-        ctx->RunNextState(ok, histogram);
-        ClientRpcContext* clone_ctx = ctx->StartNewClone();
-        if (closed_loop_) {
-          clone_ctx->Start(cli_cqs_[thread_idx].get());
-        } else {
-          // Remove the entry from the rpc deadlines list
-          rpc_deadlines_[thread_idx].erase(ctx->deadline_posn());
-          // Put the clone_ctx in the list of idle contexts for this channel
-          // Under lock
-          int ch = clone_ctx->channel_id();
-          std::lock_guard<std::mutex> g(channel_lock_[ch]);
-          contexts_[ch].push_front(clone_ctx);
-        }
+      if (!ctx->RunNextState(ok, histogram)) {
+        // The RPC and callback are done, so clone the ctx
+        // and kickstart the new one
+        auto clone = ctx->StartNewClone();
+        clone->Start(cli_cqs_[thread_idx].get());
         // delete the old version
         delete ctx;
       }
-      if (!closed_loop_)
-        issue_allowed_[thread_idx] =
-            true;  // may be ok now even if it hadn't been
+      return true;
+    } else {  // queue is shutting down
+      return false;
     }
-    if (!closed_loop_ && issue_allowed_[thread_idx] &&
-        grpc_time_source::now() >= next_issue_[thread_idx]) {
-      // Attempt to issue
-      bool issued = false;
-      for (int num_attempts = 0, channel_attempt = next_channel_[thread_idx];
-           num_attempts < channel_count_ && !issued; num_attempts++) {
-        bool can_issue = false;
-        ClientRpcContext* ctx = nullptr;
-        {
-          std::lock_guard<std::mutex> g(channel_lock_[channel_attempt]);
-          if (!contexts_[channel_attempt].empty()) {
-            // Get an idle context from the front of the list
-            ctx = *(contexts_[channel_attempt].begin());
-            contexts_[channel_attempt].pop_front();
-            can_issue = true;
-          }
-        }
-        if (can_issue) {
-          // do the work to issue
-          rpc_deadlines_[thread_idx].emplace_back(grpc_time_source::now() +
-                                                  std::chrono::seconds(1));
-          auto it = rpc_deadlines_[thread_idx].end();
-          --it;
-          ctx->set_deadline_posn(it);
-          ctx->Start(cli_cqs_[thread_idx].get());
-          issued = true;
-          // If we did issue, then next time, try our thread's next
-          // preferred channel
-          next_channel_[thread_idx] += pref_channel_inc_;
-          if (next_channel_[thread_idx] >= channel_count_)
-            next_channel_[thread_idx] = (thread_idx % channel_count_);
-        } else {
-          // Do a modular increment of channel attempt if we couldn't issue
-          channel_attempt = (channel_attempt + 1) % channel_count_;
-        }
-      }
-      if (issued) {
-        // We issued one; see when we can issue the next
-        grpc_time next_issue;
-        NextIssueTime(thread_idx, &next_issue);
-        next_issue_[thread_idx] = next_issue;
-      } else {
-        issue_allowed_[thread_idx] = false;
-      }
-    }
-    return true;
   }
 
  protected:
-  int num_async_threads_;
+  const int num_async_threads_;
 
  private:
-  class boolean {  // exists only to avoid data-race on vector<bool>
-   public:
-    boolean() : val_(false) {}
-    boolean(bool b) : val_(b) {}
-    operator bool() const { return val_; }
-    boolean& operator=(bool b) {
-      val_ = b;
-      return *this;
-    }
-
-   private:
-    bool val_;
-  };
   int NumThreads(const ClientConfig& config) {
     int num_threads = config.async_client_threads();
     if (num_threads <= 0) {  // Use dynamic sizing
@@ -356,18 +235,7 @@
   }
 
   std::vector<std::unique_ptr<CompletionQueue>> cli_cqs_;
-
-  std::vector<deadline_list> rpc_deadlines_;  // per thread deadlines
-  std::vector<int> next_channel_;       // per thread round-robin channel ctr
-  std::vector<boolean> issue_allowed_;  // may this thread attempt to issue
-  std::vector<grpc_time> next_issue_;   // when should it issue?
-
-  std::mutex*
-      channel_lock_;  // a vector, but avoid std::vector for old compilers
-  std::vector<context_list> contexts_;  // per-channel list of idle contexts
-  int max_outstanding_per_channel_;
-  int channel_count_;
-  int pref_channel_inc_;
+  std::vector<std::function<gpr_timespec()>> next_issuers_;
 };
 
 static std::unique_ptr<BenchmarkService::Stub> BenchmarkStubCreator(
@@ -391,11 +259,11 @@
            const SimpleRequest& request, CompletionQueue* cq) {
     return stub->AsyncUnaryCall(ctx, request, cq);
   };
-  static ClientRpcContext* SetupCtx(int channel_id,
-                                    BenchmarkService::Stub* stub,
+  static ClientRpcContext* SetupCtx(BenchmarkService::Stub* stub,
+                                    std::function<gpr_timespec()> next_issue,
                                     const SimpleRequest& req) {
     return new ClientRpcContextUnaryImpl<SimpleRequest, SimpleResponse>(
-        channel_id, stub, req, AsyncUnaryClient::StartReq,
+        stub, req, next_issue, AsyncUnaryClient::StartReq,
         AsyncUnaryClient::CheckDone);
   }
 };
@@ -404,62 +272,94 @@
 class ClientRpcContextStreamingImpl : public ClientRpcContext {
  public:
   ClientRpcContextStreamingImpl(
-      int channel_id, BenchmarkService::Stub* stub, const RequestType& req,
+      BenchmarkService::Stub* stub, const RequestType& req,
+      std::function<gpr_timespec()> next_issue,
       std::function<std::unique_ptr<
           grpc::ClientAsyncReaderWriter<RequestType, ResponseType>>(
           BenchmarkService::Stub*, grpc::ClientContext*, CompletionQueue*,
           void*)> start_req,
       std::function<void(grpc::Status, ResponseType*)> on_done)
-      : ClientRpcContext(channel_id),
-        context_(),
+      : context_(),
         stub_(stub),
+        cq_(nullptr),
         req_(req),
         response_(),
-        next_state_(&ClientRpcContextStreamingImpl::ReqSent),
+        next_state_(State::INVALID),
         callback_(on_done),
+        next_issue_(next_issue),
         start_req_(start_req),
         start_(Timer::Now()) {}
   ~ClientRpcContextStreamingImpl() GRPC_OVERRIDE {}
+  void Start(CompletionQueue* cq) GRPC_OVERRIDE {
+    cq_ = cq;
+    stream_ = start_req_(stub_, &context_, cq, ClientRpcContext::tag(this));
+    next_state_ = State::STREAM_IDLE;
+  }
   bool RunNextState(bool ok, Histogram* hist) GRPC_OVERRIDE {
-    return (this->*next_state_)(ok, hist);
+    while (true) {
+      switch (next_state_) {
+        case State::STREAM_IDLE:
+          if (!next_issue_) {  // ready to issue
+            next_state_ = State::READY_TO_WRITE;
+          } else {
+            next_state_ = State::WAIT;
+          }
+          break;  // loop around, don't return
+        case State::WAIT:
+          alarm_.reset(
+              new Alarm(cq_, next_issue_(), ClientRpcContext::tag(this)));
+          next_state_ = State::READY_TO_WRITE;
+          return true;
+        case State::READY_TO_WRITE:
+          if (!ok) {
+            return false;
+          }
+          start_ = Timer::Now();
+          next_state_ = State::WRITE_DONE;
+          stream_->Write(req_, ClientRpcContext::tag(this));
+          return true;
+        case State::WRITE_DONE:
+          if (!ok) {
+            return false;
+          }
+          next_state_ = State::READ_DONE;
+          stream_->Read(&response_, ClientRpcContext::tag(this));
+          return true;
+          break;
+        case State::READ_DONE:
+          hist->Add((Timer::Now() - start_) * 1e9);
+          callback_(status_, &response_);
+          next_state_ = State::STREAM_IDLE;
+          break;  // loop around
+        default:
+          GPR_ASSERT(false);
+          return false;
+      }
+    }
   }
   ClientRpcContext* StartNewClone() GRPC_OVERRIDE {
-    return new ClientRpcContextStreamingImpl(channel_id_, stub_, req_,
+    return new ClientRpcContextStreamingImpl(stub_, req_, next_issue_,
                                              start_req_, callback_);
   }
-  void Start(CompletionQueue* cq) GRPC_OVERRIDE {
-    stream_ = start_req_(stub_, &context_, cq, ClientRpcContext::tag(this));
-  }
 
  private:
-  bool ReqSent(bool ok, Histogram*) { return StartWrite(ok); }
-  bool StartWrite(bool ok) {
-    if (!ok) {
-      return (false);
-    }
-    start_ = Timer::Now();
-    next_state_ = &ClientRpcContextStreamingImpl::WriteDone;
-    stream_->Write(req_, ClientRpcContext::tag(this));
-    return true;
-  }
-  bool WriteDone(bool ok, Histogram*) {
-    if (!ok) {
-      return (false);
-    }
-    next_state_ = &ClientRpcContextStreamingImpl::ReadDone;
-    stream_->Read(&response_, ClientRpcContext::tag(this));
-    return true;
-  }
-  bool ReadDone(bool ok, Histogram* hist) {
-    hist->Add((Timer::Now() - start_) * 1e9);
-    return StartWrite(ok);
-  }
   grpc::ClientContext context_;
   BenchmarkService::Stub* stub_;
+  CompletionQueue* cq_;
+  std::unique_ptr<Alarm> alarm_;
   RequestType req_;
   ResponseType response_;
-  bool (ClientRpcContextStreamingImpl::*next_state_)(bool, Histogram*);
+  enum State {
+    INVALID,
+    STREAM_IDLE,
+    WAIT,
+    READY_TO_WRITE,
+    WRITE_DONE,
+    READ_DONE
+  };
+  State next_state_;
   std::function<void(grpc::Status, ResponseType*)> callback_;
+  std::function<gpr_timespec()> next_issue_;
   std::function<
       std::unique_ptr<grpc::ClientAsyncReaderWriter<RequestType, ResponseType>>(
           BenchmarkService::Stub*, grpc::ClientContext*, CompletionQueue*,
@@ -475,9 +375,6 @@
  public:
   explicit AsyncStreamingClient(const ClientConfig& config)
       : AsyncClient(config, SetupCtx, BenchmarkStubCreator) {
-    // async streaming currently only supports closed loop
-    GPR_ASSERT(closed_loop_);
-
     StartThreads(num_async_threads_);
   }
 
@@ -492,11 +389,11 @@
     auto stream = stub->AsyncStreamingCall(ctx, cq, tag);
     return stream;
   };
-  static ClientRpcContext* SetupCtx(int channel_id,
-                                    BenchmarkService::Stub* stub,
+  static ClientRpcContext* SetupCtx(BenchmarkService::Stub* stub,
+                                    std::function<gpr_timespec()> next_issue,
                                     const SimpleRequest& req) {
     return new ClientRpcContextStreamingImpl<SimpleRequest, SimpleResponse>(
-        channel_id, stub, req, AsyncStreamingClient::StartReq,
+        stub, req, next_issue, AsyncStreamingClient::StartReq,
         AsyncStreamingClient::CheckDone);
   }
 };
@@ -504,64 +401,96 @@
 class ClientRpcContextGenericStreamingImpl : public ClientRpcContext {
  public:
   ClientRpcContextGenericStreamingImpl(
-      int channel_id, grpc::GenericStub* stub, const ByteBuffer& req,
+      grpc::GenericStub* stub, const ByteBuffer& req,
+      std::function<gpr_timespec()> next_issue,
       std::function<std::unique_ptr<grpc::GenericClientAsyncReaderWriter>(
           grpc::GenericStub*, grpc::ClientContext*,
           const grpc::string& method_name, CompletionQueue*, void*)> start_req,
       std::function<void(grpc::Status, ByteBuffer*)> on_done)
-      : ClientRpcContext(channel_id),
-        context_(),
+      : context_(),
         stub_(stub),
+        cq_(nullptr),
         req_(req),
         response_(),
-        next_state_(&ClientRpcContextGenericStreamingImpl::ReqSent),
+        next_state_(State::INVALID),
         callback_(on_done),
+        next_issue_(next_issue),
         start_req_(start_req),
         start_(Timer::Now()) {}
   ~ClientRpcContextGenericStreamingImpl() GRPC_OVERRIDE {}
-  bool RunNextState(bool ok, Histogram* hist) GRPC_OVERRIDE {
-    return (this->*next_state_)(ok, hist);
-  }
-  ClientRpcContext* StartNewClone() GRPC_OVERRIDE {
-    return new ClientRpcContextGenericStreamingImpl(channel_id_, stub_, req_,
-                                                    start_req_, callback_);
-  }
   void Start(CompletionQueue* cq) GRPC_OVERRIDE {
+    cq_ = cq;
     const grpc::string kMethodName(
         "/grpc.testing.BenchmarkService/StreamingCall");
     stream_ = start_req_(stub_, &context_, kMethodName, cq,
                          ClientRpcContext::tag(this));
+    next_state_ = State::STREAM_IDLE;
+  }
+  bool RunNextState(bool ok, Histogram* hist) GRPC_OVERRIDE {
+    while (true) {
+      switch (next_state_) {
+        case State::STREAM_IDLE:
+          if (!next_issue_) {  // ready to issue
+            next_state_ = State::READY_TO_WRITE;
+          } else {
+            next_state_ = State::WAIT;
+          }
+          break;  // loop around, don't return
+        case State::WAIT:
+          alarm_.reset(
+              new Alarm(cq_, next_issue_(), ClientRpcContext::tag(this)));
+          next_state_ = State::READY_TO_WRITE;
+          return true;
+        case State::READY_TO_WRITE:
+          if (!ok) {
+            return false;
+          }
+          start_ = Timer::Now();
+          next_state_ = State::WRITE_DONE;
+          stream_->Write(req_, ClientRpcContext::tag(this));
+          return true;
+        case State::WRITE_DONE:
+          if (!ok) {
+            return false;
+          }
+          next_state_ = State::READ_DONE;
+          stream_->Read(&response_, ClientRpcContext::tag(this));
+          return true;
+          break;
+        case State::READ_DONE:
+          hist->Add((Timer::Now() - start_) * 1e9);
+          callback_(status_, &response_);
+          next_state_ = State::STREAM_IDLE;
+          break;  // loop around
+        default:
+          GPR_ASSERT(false);
+          return false;
+      }
+    }
+  }
+  ClientRpcContext* StartNewClone() GRPC_OVERRIDE {
+    return new ClientRpcContextGenericStreamingImpl(stub_, req_, next_issue_,
+                                                    start_req_, callback_);
   }
 
  private:
-  bool ReqSent(bool ok, Histogram*) { return StartWrite(ok); }
-  bool StartWrite(bool ok) {
-    if (!ok) {
-      return (false);
-    }
-    start_ = Timer::Now();
-    next_state_ = &ClientRpcContextGenericStreamingImpl::WriteDone;
-    stream_->Write(req_, ClientRpcContext::tag(this));
-    return true;
-  }
-  bool WriteDone(bool ok, Histogram*) {
-    if (!ok) {
-      return (false);
-    }
-    next_state_ = &ClientRpcContextGenericStreamingImpl::ReadDone;
-    stream_->Read(&response_, ClientRpcContext::tag(this));
-    return true;
-  }
-  bool ReadDone(bool ok, Histogram* hist) {
-    hist->Add((Timer::Now() - start_) * 1e9);
-    return StartWrite(ok);
-  }
   grpc::ClientContext context_;
   grpc::GenericStub* stub_;
+  CompletionQueue* cq_;
+  std::unique_ptr<Alarm> alarm_;
   ByteBuffer req_;
   ByteBuffer response_;
-  bool (ClientRpcContextGenericStreamingImpl::*next_state_)(bool, Histogram*);
+  enum State {
+    INVALID,
+    STREAM_IDLE,
+    WAIT,
+    READY_TO_WRITE,
+    WRITE_DONE,
+    READ_DONE
+  };
+  State next_state_;
   std::function<void(grpc::Status, ByteBuffer*)> callback_;
+  std::function<gpr_timespec()> next_issue_;
   std::function<std::unique_ptr<grpc::GenericClientAsyncReaderWriter>(
       grpc::GenericStub*, grpc::ClientContext*, const grpc::string&,
       CompletionQueue*, void*)> start_req_;
@@ -580,9 +509,6 @@
  public:
   explicit GenericAsyncStreamingClient(const ClientConfig& config)
       : AsyncClient(config, SetupCtx, GenericStubCreator) {
-    // async streaming currently only supports closed loop
-    GPR_ASSERT(closed_loop_);
-
     StartThreads(num_async_threads_);
   }
 
@@ -596,10 +522,11 @@
     auto stream = stub->Call(ctx, method_name, cq, tag);
     return stream;
   };
-  static ClientRpcContext* SetupCtx(int channel_id, grpc::GenericStub* stub,
+  static ClientRpcContext* SetupCtx(grpc::GenericStub* stub,
+                                    std::function<gpr_timespec()> next_issue,
                                     const ByteBuffer& req) {
     return new ClientRpcContextGenericStreamingImpl(
-        channel_id, stub, req, GenericAsyncStreamingClient::StartReq,
+        stub, req, next_issue, GenericAsyncStreamingClient::StartReq,
         GenericAsyncStreamingClient::CheckDone);
   }
 };
diff --git a/test/cpp/qps/client_sync.cc b/test/cpp/qps/client_sync.cc
index d93537b..edfc246 100644
--- a/test/cpp/qps/client_sync.cc
+++ b/test/cpp/qps/client_sync.cc
@@ -84,11 +84,8 @@
 
  protected:
   void WaitToIssue(int thread_idx) {
-    grpc_time next_time;
-    if (NextIssueTime(thread_idx, &next_time)) {
-      gpr_timespec next_timespec;
-      TimepointHR2Timespec(next_time, &next_timespec);
-      gpr_sleep_until(next_timespec);
+    if (!closed_loop_) {
+      gpr_sleep_until(NextIssueTime(thread_idx));
     }
   }
 
diff --git a/test/cpp/qps/interarrival.h b/test/cpp/qps/interarrival.h
index 841619e..b6fd67b 100644
--- a/test/cpp/qps/interarrival.h
+++ b/test/cpp/qps/interarrival.h
@@ -1,6 +1,6 @@
 /*
  *
- * Copyright 2015, Google Inc.
+ * Copyright 2015-2016, Google Inc.
  * All rights reserved.
  *
  * Redistribution and use in source and binary forms, with or without
@@ -51,15 +51,15 @@
 // stacks. Thus, this code only uses a uniform distribution of doubles [0,1)
 // and then provides the distribution functions itself.
 
-class RandomDist {
+class RandomDistInterface {
  public:
-  RandomDist() {}
-  virtual ~RandomDist() = 0;
-  // Argument to operator() is a uniform double in the range [0,1)
-  virtual double operator()(double uni) const = 0;
+  RandomDistInterface() {}
+  virtual ~RandomDistInterface() = 0;
+  // Argument to transform is a uniform double in the range [0,1)
+  virtual double transform(double uni) const = 0;
 };
 
-inline RandomDist::~RandomDist() {}
+inline RandomDistInterface::~RandomDistInterface() {}
 
 // ExpDist implements an exponential distribution, which is the
 // interarrival distribution for a Poisson process. The parameter
@@ -69,11 +69,11 @@
 // independent identical stationary sources. For more information,
 // see http://en.wikipedia.org/wiki/Exponential_distribution
 
-class ExpDist GRPC_FINAL : public RandomDist {
+class ExpDist GRPC_FINAL : public RandomDistInterface {
  public:
   explicit ExpDist(double lambda) : lambda_recip_(1.0 / lambda) {}
   ~ExpDist() GRPC_OVERRIDE {}
-  double operator()(double uni) const GRPC_OVERRIDE {
+  double transform(double uni) const GRPC_OVERRIDE {
     // Note: Use 1.0-uni above to avoid NaN if uni is 0
     return lambda_recip_ * (-log(1.0 - uni));
   }
@@ -87,11 +87,11 @@
 // mean interarrival time is (lo+hi)/2. For more information,
 // see http://en.wikipedia.org/wiki/Uniform_distribution_%28continuous%29
 
-class UniformDist GRPC_FINAL : public RandomDist {
+class UniformDist GRPC_FINAL : public RandomDistInterface {
  public:
   UniformDist(double lo, double hi) : lo_(lo), range_(hi - lo) {}
   ~UniformDist() GRPC_OVERRIDE {}
-  double operator()(double uni) const GRPC_OVERRIDE {
+  double transform(double uni) const GRPC_OVERRIDE {
     return uni * range_ + lo_;
   }
 
@@ -106,11 +106,11 @@
 // clients) will not preserve any deterministic interarrival gap across
 // requests.
 
-class DetDist GRPC_FINAL : public RandomDist {
+class DetDist GRPC_FINAL : public RandomDistInterface {
  public:
   explicit DetDist(double val) : val_(val) {}
   ~DetDist() GRPC_OVERRIDE {}
-  double operator()(double uni) const GRPC_OVERRIDE { return val_; }
+  double transform(double uni) const GRPC_OVERRIDE { return val_; }
 
  private:
   double val_;
@@ -123,12 +123,12 @@
 // good representation of the response times of data center jobs. See
 // http://en.wikipedia.org/wiki/Pareto_distribution
 
-class ParetoDist GRPC_FINAL : public RandomDist {
+class ParetoDist GRPC_FINAL : public RandomDistInterface {
  public:
   ParetoDist(double base, double alpha)
       : base_(base), alpha_recip_(1.0 / alpha) {}
   ~ParetoDist() GRPC_OVERRIDE {}
-  double operator()(double uni) const GRPC_OVERRIDE {
+  double transform(double uni) const GRPC_OVERRIDE {
     // Note: Use 1.0-uni above to avoid div by zero if uni is 0
     return base_ / pow(1.0 - uni, alpha_recip_);
   }
@@ -145,13 +145,14 @@
 class InterarrivalTimer {
  public:
   InterarrivalTimer() {}
-  void init(const RandomDist& r, int threads, int entries = 1000000) {
+  void init(const RandomDistInterface& r, int threads, int entries = 1000000) {
     for (int i = 0; i < entries; i++) {
       // rand is the only choice that is portable across POSIX and Windows
       // and that supports new and old compilers
-      const double uniform_0_1 = rand() / RAND_MAX;
+      const double uniform_0_1 =  // in [0,1); uni == 1.0 yields inf
+          static_cast<double>(rand()) / (static_cast<double>(RAND_MAX) + 1.0);
       random_table_.push_back(
-          std::chrono::nanoseconds(static_cast<int64_t>(1e9 * r(uniform_0_1))));
+          static_cast<int64_t>(1e9 * r.transform(uniform_0_1)));
     }
     // Now set up the thread positions
     for (int i = 0; i < threads; i++) {
@@ -160,7 +161,7 @@
   }
   virtual ~InterarrivalTimer(){};
 
-  std::chrono::nanoseconds operator()(int thread_num) {
+  int64_t next(int thread_num) {
     auto ret = *(thread_posns_[thread_num]++);
     if (thread_posns_[thread_num] == random_table_.end())
       thread_posns_[thread_num] = random_table_.begin();
@@ -168,7 +169,7 @@
   }
 
  private:
-  typedef std::vector<std::chrono::nanoseconds> time_table;
+  typedef std::vector<int64_t> time_table;
   std::vector<time_table::const_iterator> thread_posns_;
   time_table random_table_;
 };
diff --git a/test/cpp/qps/qps_interarrival_test.cc b/test/cpp/qps/qps_interarrival_test.cc
index ccda28f..77e81fb 100644
--- a/test/cpp/qps/qps_interarrival_test.cc
+++ b/test/cpp/qps/qps_interarrival_test.cc
@@ -1,6 +1,6 @@
 /*
  *
- * Copyright 2015, Google Inc.
+ * Copyright 2015-2016, Google Inc.
  * All rights reserved.
  *
  * Redistribution and use in source and binary forms, with or without
@@ -39,17 +39,17 @@
 
 #include "test/cpp/qps/interarrival.h"
 
-using grpc::testing::RandomDist;
+using grpc::testing::RandomDistInterface;
 using grpc::testing::InterarrivalTimer;
 
-static void RunTest(RandomDist &&r, int threads, std::string title) {
+static void RunTest(RandomDistInterface &&r, int threads, std::string title) {
   InterarrivalTimer timer;
   timer.init(r, threads);
   gpr_histogram *h(gpr_histogram_create(0.01, 60e9));
 
   for (int i = 0; i < 10000000; i++) {
     for (int j = 0; j < threads; j++) {
-      gpr_histogram_add(h, timer(j).count());
+      gpr_histogram_add(h, timer.next(j));
     }
   }
 
@@ -70,7 +70,7 @@
 int main(int argc, char **argv) {
   RunTest(ExpDist(10.0), 5, std::string("Exponential(10)"));
   RunTest(DetDist(5.0), 5, std::string("Det(5)"));
-  RunTest(UniformDist(0.0, 10.0), 5, std::string("Uniform(1,10)"));
+  RunTest(UniformDist(0.0, 10.0), 5, std::string("Uniform(0,10)"));
   RunTest(ParetoDist(1.0, 1.0), 5, std::string("Pareto(1,1)"));
   return 0;
 }
diff --git a/test/cpp/qps/qps_openloop_test.cc b/test/cpp/qps/qps_openloop_test.cc
index fe5f685..0ac41d9 100644
--- a/test/cpp/qps/qps_openloop_test.cc
+++ b/test/cpp/qps/qps_openloop_test.cc
@@ -53,7 +53,7 @@
   client_config.set_outstanding_rpcs_per_channel(1000);
   client_config.set_client_channels(8);
   client_config.set_async_client_threads(8);
-  client_config.set_rpc_type(UNARY);
+  client_config.set_rpc_type(STREAMING);
   client_config.mutable_load_params()->mutable_poisson()->set_offered_load(
       1000.0);
 
diff --git a/test/cpp/qps/qps_test.cc b/test/cpp/qps/qps_test.cc
index 15054db..27aaf13 100644
--- a/test/cpp/qps/qps_test.cc
+++ b/test/cpp/qps/qps_test.cc
@@ -53,7 +53,7 @@
   client_config.set_outstanding_rpcs_per_channel(1000);
   client_config.set_client_channels(8);
   client_config.set_async_client_threads(8);
-  client_config.set_rpc_type(UNARY);
+  client_config.set_rpc_type(STREAMING);
   client_config.mutable_load_params()->mutable_closed_loop();
 
   ServerConfig server_config;
diff --git a/tools/run_tests/tests.json b/tools/run_tests/tests.json
index 2c73c40..ddf4a12 100644
--- a/tools/run_tests/tests.json
+++ b/tools/run_tests/tests.json
@@ -2084,6 +2084,24 @@
       "mac", 
       "posix"
     ], 
+    "cpu_cost": 1.0, 
+    "exclude_configs": [], 
+    "flaky": false, 
+    "language": "c++", 
+    "name": "qps_openloop_test", 
+    "platforms": [
+      "linux", 
+      "mac", 
+      "posix"
+    ]
+  }, 
+  {
+    "args": [], 
+    "ci_platforms": [
+      "linux", 
+      "mac", 
+      "posix"
+    ], 
     "cpu_cost": 10, 
     "exclude_configs": [
       "tsan"