Move interarrival timer to Client class so that it can be used for async tests
as well
diff --git a/test/cpp/qps/client.h b/test/cpp/qps/client.h
index 2dc5b38..45481a3 100644
--- a/test/cpp/qps/client.h
+++ b/test/cpp/qps/client.h
@@ -35,6 +35,7 @@
#define TEST_QPS_CLIENT_H
#include "test/cpp/qps/histogram.h"
+#include "test/cpp/qps/interarrival.h"
#include "test/cpp/qps/timer.h"
#include "test/cpp/qps/qpstest.grpc.pb.h"
@@ -46,7 +47,8 @@
class Client {
public:
- explicit Client(const ClientConfig& config) : timer_(new Timer) {
+ explicit Client(const ClientConfig& config) : timer_(new Timer),
+ interarrival_timer_() {
for (int i = 0; i < config.client_channels(); i++) {
channels_.push_back(ClientChannelInfo(
config.server_targets(i % config.server_targets_size()), config));
@@ -105,7 +107,60 @@
void EndThreads() { threads_.clear(); }
virtual bool ThreadFunc(Histogram* histogram, size_t thread_idx) = 0;
+
+  // Configures request pacing for a load test driven by `num_threads` threads.
+  //
+  // For CLOSED_LOOP only the closed_loop_ flag is recorded. For every other
+  // load type, an interarrival distribution is built from
+  // config.load_params(), interarrival_timer_ is initialized with it, and
+  // next_time_ receives one entry per thread (current time plus that thread's
+  // first interarrival sample). An unrecognized load type hits
+  // GPR_ASSERT(false).
+  void SetupLoadTest(const ClientConfig& config, size_t num_threads) {
+    // Drop any schedule left over from an earlier call: NextIssueTime() indexes
+    // next_time_ by thread index, so stale leading entries would otherwise be
+    // the ones that get used.
+    next_time_.clear();
+
+    closed_loop_ = (config.load_type() == CLOSED_LOOP);
+    if (closed_loop_) {
+      return;
+    }
+
+    // The configured parameters are scaled by num_threads below, presumably so
+    // that the aggregate over all threads matches the configured load.
+    std::unique_ptr<RandomDist> random_dist;
+    auto& load = config.load_params();
+    switch (config.load_type()) {
+      case POISSON:
+        random_dist.reset(
+            new ExpDist(load.poisson().offered_load() / num_threads));
+        break;
+      case UNIFORM:
+        random_dist.reset(
+            new UniformDist(load.uniform().interarrival_lo() * num_threads,
+                            load.uniform().interarrival_hi() * num_threads));
+        break;
+      case DETERMINISTIC:
+        random_dist.reset(
+            new DetDist(num_threads / load.determ().offered_load()));
+        break;
+      case PARETO:
+        random_dist.reset(
+            new ParetoDist(load.pareto().interarrival_base() * num_threads,
+                           load.pareto().alpha()));
+        break;
+      default:
+        GPR_ASSERT(false);
+        break;
+    }
+
+    interarrival_timer_.init(*random_dist, num_threads);
+    next_time_.reserve(num_threads);
+    for (size_t i = 0; i < num_threads; i++) {
+      next_time_.push_back(std::chrono::high_resolution_clock::now() +
+                           interarrival_timer_(i));
+    }
+  }
+  // Reports when thread `thread_idx` should issue its next request.
+  //
+  // Returns false and leaves *time_delay untouched when closed_loop_ is set.
+  // Otherwise stores the thread's currently scheduled time in *time_delay,
+  // advances that thread's schedule by a fresh interarrival_timer_ sample, and
+  // returns true.
+  template <class Timepoint>
+  bool NextIssueTime(int thread_idx, Timepoint* time_delay) {
+    if (closed_loop_) {
+      return false;
+    }
+    auto& scheduled = next_time_[thread_idx];
+    *time_delay = scheduled;
+    scheduled += interarrival_timer_(thread_idx);
+    return true;
+  }
+
private:
class Thread {
public:
@@ -166,6 +221,11 @@
std::vector<std::unique_ptr<Thread>> threads_;
std::unique_ptr<Timer> timer_;
+
+ bool closed_loop_;  // Assigned by SetupLoadTest(), read by NextIssueTime(). NOTE(review): not in the constructor's initializer list, so indeterminate until SetupLoadTest() runs - confirm every subclass calls it.
+ InterarrivalTimer interarrival_timer_;  // init()-ed by SetupLoadTest() for non-closed-loop tests; called with a thread index to get that thread's next interval.
+ std::vector<std::chrono::time_point
+ <std::chrono::high_resolution_clock>> next_time_;  // Next issue time per thread, indexed by thread index.
};
std::unique_ptr<Client>
diff --git a/test/cpp/qps/client_sync.cc b/test/cpp/qps/client_sync.cc
index 609d003..8573ddb 100644
--- a/test/cpp/qps/client_sync.cc
+++ b/test/cpp/qps/client_sync.cc
@@ -66,70 +66,25 @@
class SynchronousClient : public Client {
public:
- SynchronousClient(const ClientConfig& config) : Client(config),
- interarrival_timer_() {
+ SynchronousClient(const ClientConfig& config) : Client(config) {  // Derives num_threads_ from the config, then hands pacing setup to Client.
num_threads_ =
config.outstanding_rpcs_per_channel() * config.client_channels();
responses_.resize(num_threads_);
-
- // Now sort out the load test type
- if (config.load_type() == CLOSED_LOOP) {
- closed_loop_ = true;
- }
- else {
- closed_loop_ = false;
-
- std::unique_ptr<RandomDist> random_dist;
- auto& load = config.load_params();
- switch (config.load_type()) {
- case POISSON:
- random_dist.reset
- (new ExpDist(load.poisson().offered_load()/num_threads_));
- break;
- case UNIFORM:
- random_dist.reset
- (new UniformDist(load.uniform().interarrival_lo()*num_threads_,
- load.uniform().interarrival_hi()*num_threads_));
- break;
- case DETERMINISTIC:
- random_dist.reset
- (new DetDist(num_threads_/load.determ().offered_load()));
- break;
- case PARETO:
- random_dist.reset
- (new ParetoDist(load.pareto().interarrival_base()*num_threads_,
- load.pareto().alpha()));
- break;
- default:
- GPR_ASSERT(false);
- break;
- }
-
- interarrival_timer_.init(*random_dist, num_threads_);
- for (size_t i = 0; i<num_threads_; i++) {
- next_time_.push_back(std::chrono::high_resolution_clock::now()
- + interarrival_timer_(i));
- }
- }
+ SetupLoadTest(config, num_threads_);  // Load-type handling lives in Client::SetupLoadTest().
}
virtual ~SynchronousClient() { EndThreads(); }
protected:
void WaitToIssue(int thread_idx) {
- if (!closed_loop_) {
- std::this_thread::sleep_until(next_time_[thread_idx]);
- next_time_[thread_idx] += interarrival_timer_(thread_idx);
+ std::chrono::time_point<std::chrono::high_resolution_clock> next_time;  // Filled in by NextIssueTime() when it returns true.
+ if (NextIssueTime(thread_idx, &next_time)) {  // Returns false for closed-loop tests, in which case there is no wait.
+ std::this_thread::sleep_until(next_time);  // Block until this thread's scheduled issue time.
}
}
size_t num_threads_;
std::vector<SimpleResponse> responses_;
- private:
- bool closed_loop_;
- InterarrivalTimer interarrival_timer_;
- std::vector<std::chrono::time_point
- <std::chrono::high_resolution_clock>> next_time_;
};
class SynchronousUnaryClient GRPC_FINAL : public SynchronousClient {