Move interarrival timer to Client class so that it can be used for async tests
as well
diff --git a/test/cpp/qps/client.h b/test/cpp/qps/client.h
index 2dc5b38..45481a3 100644
--- a/test/cpp/qps/client.h
+++ b/test/cpp/qps/client.h
@@ -35,6 +35,7 @@
#define TEST_QPS_CLIENT_H
#include "test/cpp/qps/histogram.h"
+#include "test/cpp/qps/interarrival.h"
#include "test/cpp/qps/timer.h"
#include "test/cpp/qps/qpstest.grpc.pb.h"
@@ -46,7 +47,8 @@
class Client {
public:
- explicit Client(const ClientConfig& config) : timer_(new Timer) {
+ explicit Client(const ClientConfig& config) : timer_(new Timer),
+ interarrival_timer_() {
for (int i = 0; i < config.client_channels(); i++) {
channels_.push_back(ClientChannelInfo(
config.server_targets(i % config.server_targets_size()), config));
@@ -105,7 +107,60 @@
void EndThreads() { threads_.clear(); }
virtual bool ThreadFunc(Histogram* histogram, size_t thread_idx) = 0;
+
+ void SetupLoadTest(const ClientConfig& config, size_t num_threads) {
+ // Set up the load distribution based on the number of threads
+ if (config.load_type() == CLOSED_LOOP) {
+ closed_loop_ = true;
+ }
+ else {
+ closed_loop_ = false;
+ std::unique_ptr<RandomDist> random_dist;
+ auto& load = config.load_params();
+ switch (config.load_type()) {
+ case POISSON:
+ random_dist.reset
+ (new ExpDist(load.poisson().offered_load()/num_threads));
+ break;
+ case UNIFORM:
+ random_dist.reset
+ (new UniformDist(load.uniform().interarrival_lo()*num_threads,
+ load.uniform().interarrival_hi()*num_threads));
+ break;
+ case DETERMINISTIC:
+ random_dist.reset
+ (new DetDist(num_threads/load.determ().offered_load()));
+ break;
+ case PARETO:
+ random_dist.reset
+ (new ParetoDist(load.pareto().interarrival_base()*num_threads,
+ load.pareto().alpha()));
+ break;
+ default:
+ GPR_ASSERT(false);
+ break;
+ }
+
+ interarrival_timer_.init(*random_dist, num_threads);
+ for (size_t i = 0; i<num_threads; i++) {
+ next_time_.push_back(std::chrono::high_resolution_clock::now()
+ + interarrival_timer_(i));
+ }
+ }
+ }
+ template<class Timepoint>
+ bool NextIssueTime(int thread_idx, Timepoint *time_delay) {
+ if (closed_loop_) {
+ return false;
+ }
+ else {
+ *time_delay = next_time_[thread_idx];
+ next_time_[thread_idx] += interarrival_timer_(thread_idx);
+ return true;
+ }
+ }
+
private:
class Thread {
public:
@@ -166,6 +221,11 @@
std::vector<std::unique_ptr<Thread>> threads_;
std::unique_ptr<Timer> timer_;
+
+ bool closed_loop_;
+ InterarrivalTimer interarrival_timer_;
+ std::vector<std::chrono::time_point
+ <std::chrono::high_resolution_clock>> next_time_;
};
std::unique_ptr<Client>