Revert "Revert "Open loop sync/async multithreaded testing""
diff --git a/test/cpp/qps/client.h b/test/cpp/qps/client.h
index dc3a9f2..28cd32a 100644
--- a/test/cpp/qps/client.h
+++ b/test/cpp/qps/client.h
@@ -35,6 +35,7 @@
 #define TEST_QPS_CLIENT_H
 
 #include "test/cpp/qps/histogram.h"
+#include "test/cpp/qps/interarrival.h"
 #include "test/cpp/qps/timer.h"
 #include "test/cpp/qps/qpstest.grpc.pb.h"
 
@@ -42,11 +43,31 @@
 #include <mutex>
 
 namespace grpc {
+
+#if defined(__APPLE__)
+// Specialize Timepoint for high res clock as we need that
+template <>
+class TimePoint<std::chrono::high_resolution_clock::time_point> {
+ public:
+  TimePoint(const std::chrono::high_resolution_clock::time_point& time) {
+    TimepointHR2Timespec(time, &time_);
+  }
+  gpr_timespec raw_time() const { return time_; }
+
+ private:
+  gpr_timespec time_;
+};
+#endif
+
 namespace testing {
 
+typedef std::chrono::high_resolution_clock grpc_time_source;
+typedef std::chrono::time_point<grpc_time_source> grpc_time;
+
 class Client {
  public:
-  explicit Client(const ClientConfig& config) : timer_(new Timer) {
+  explicit Client(const ClientConfig& config)
+      : timer_(new Timer), interarrival_timer_() {
     for (int i = 0; i < config.client_channels(); i++) {
       channels_.push_back(ClientChannelInfo(
           config.server_targets(i % config.server_targets_size()), config));
@@ -81,6 +102,7 @@
 
  protected:
   SimpleRequest request_;
+  bool closed_loop_;
 
   class ClientChannelInfo {
    public:
@@ -106,6 +128,61 @@
 
   virtual bool ThreadFunc(Histogram* histogram, size_t thread_idx) = 0;
 
+  void SetupLoadTest(const ClientConfig& config, size_t num_threads) {
+    // Set up the load distribution based on the number of threads
+    if (config.load_type() == CLOSED_LOOP) {
+      closed_loop_ = true;
+    } else {
+      closed_loop_ = false;
+
+      std::unique_ptr<RandomDist> random_dist;
+      const auto& load = config.load_params();
+      switch (config.load_type()) {
+        case POISSON:
+          random_dist.reset(
+              new ExpDist(load.poisson().offered_load() / num_threads));
+          break;
+        case UNIFORM:
+          random_dist.reset(
+              new UniformDist(load.uniform().interarrival_lo() * num_threads,
+                              load.uniform().interarrival_hi() * num_threads));
+          break;
+        case DETERMINISTIC:
+          random_dist.reset(
+              new DetDist(num_threads / load.determ().offered_load()));
+          break;
+        case PARETO:
+          random_dist.reset(
+              new ParetoDist(load.pareto().interarrival_base() * num_threads,
+                             load.pareto().alpha()));
+          break;
+        default:
+          GPR_ASSERT(false);
+          break;
+      }
+
+      interarrival_timer_.init(*random_dist, num_threads);
+      for (size_t i = 0; i < num_threads; i++) {
+        next_time_.push_back(
+            grpc_time_source::now() +
+            std::chrono::duration_cast<grpc_time_source::duration>(
+                interarrival_timer_(i)));
+      }
+    }
+  }
+
+  bool NextIssueTime(int thread_idx, grpc_time* time_delay) {
+    if (closed_loop_) {
+      return false;
+    } else {
+      *time_delay = next_time_[thread_idx];
+      next_time_[thread_idx] +=
+          std::chrono::duration_cast<grpc_time_source::duration>(
+              interarrival_timer_(thread_idx));
+      return true;
+    }
+  }
+
  private:
   class Thread {
    public:
@@ -168,6 +245,9 @@
 
   std::vector<std::unique_ptr<Thread>> threads_;
   std::unique_ptr<Timer> timer_;
+
+  InterarrivalTimer interarrival_timer_;
+  std::vector<grpc_time> next_time_;
 };
 
 std::unique_ptr<Client> CreateSynchronousUnaryClient(const ClientConfig& args);