blob: 620103b77dec87b662b8bc286230fde2b9572acc [file] [log] [blame]
Craig Tiller26598a32015-03-02 16:16:00 -08001/*
2 *
3 * Copyright 2015, Google Inc.
4 * All rights reserved.
5 *
6 * Redistribution and use in source and binary forms, with or without
7 * modification, are permitted provided that the following conditions are
8 * met:
9 *
10 * * Redistributions of source code must retain the above copyright
11 * notice, this list of conditions and the following disclaimer.
12 * * Redistributions in binary form must reproduce the above
13 * copyright notice, this list of conditions and the following disclaimer
14 * in the documentation and/or other materials provided with the
15 * distribution.
16 * * Neither the name of Google Inc. nor the names of its
17 * contributors may be used to endorse or promote products derived from
18 * this software without specific prior written permission.
19 *
20 * THIS SOFTWARE IS PROVIDED BY THE COPYRIGHT HOLDERS AND CONTRIBUTORS
21 * "AS IS" AND ANY EXPRESS OR IMPLIED WARRANTIES, INCLUDING, BUT NOT
22 * LIMITED TO, THE IMPLIED WARRANTIES OF MERCHANTABILITY AND FITNESS FOR
23 * A PARTICULAR PURPOSE ARE DISCLAIMED. IN NO EVENT SHALL THE COPYRIGHT
24 * OWNER OR CONTRIBUTORS BE LIABLE FOR ANY DIRECT, INDIRECT, INCIDENTAL,
25 * SPECIAL, EXEMPLARY, OR CONSEQUENTIAL DAMAGES (INCLUDING, BUT NOT
26 * LIMITED TO, PROCUREMENT OF SUBSTITUTE GOODS OR SERVICES; LOSS OF USE,
27 * DATA, OR PROFITS; OR BUSINESS INTERRUPTION) HOWEVER CAUSED AND ON ANY
28 * THEORY OF LIABILITY, WHETHER IN CONTRACT, STRICT LIABILITY, OR TORT
29 * (INCLUDING NEGLIGENCE OR OTHERWISE) ARISING IN ANY WAY OUT OF THE USE
30 * OF THIS SOFTWARE, EVEN IF ADVISED OF THE POSSIBILITY OF SUCH DAMAGE.
31 *
32 */
33
34#ifndef TEST_QPS_CLIENT_H
35#define TEST_QPS_CLIENT_H
36
Craig Tiller88568752015-03-04 10:50:43 -080037#include "test/cpp/qps/histogram.h"
vjpaic6aa60e2015-04-29 10:20:41 -070038#include "test/cpp/qps/interarrival.h"
Craig Tiller88568752015-03-04 10:50:43 -080039#include "test/cpp/qps/timer.h"
Nicolas "Pixel" Noble0caebbf2015-04-09 23:08:51 +020040#include "test/cpp/qps/qpstest.grpc.pb.h"
Craig Tiller26598a32015-03-02 16:16:00 -080041
Craig Tillere38ec212015-03-04 11:19:15 -080042#include <condition_variable>
43#include <mutex>
44
Craig Tiller26598a32015-03-02 16:16:00 -080045namespace grpc {
46namespace testing {
47
48class Client {
49 public:
vjpaic6aa60e2015-04-29 10:20:41 -070050 explicit Client(const ClientConfig& config) : timer_(new Timer),
51 interarrival_timer_() {
Craig Tiller88568752015-03-04 10:50:43 -080052 for (int i = 0; i < config.client_channels(); i++) {
53 channels_.push_back(ClientChannelInfo(
54 config.server_targets(i % config.server_targets_size()), config));
55 }
56 request_.set_response_type(grpc::testing::PayloadType::COMPRESSABLE);
57 request_.set_response_size(config.payload_size());
58 }
Craig Tiller26598a32015-03-02 16:16:00 -080059 virtual ~Client() {}
Craig Tiller6af9ed02015-03-02 22:42:10 -080060
Craig Tiller88568752015-03-04 10:50:43 -080061 ClientStats Mark() {
62 Histogram latencies;
63 std::vector<Histogram> to_merge(threads_.size());
64 for (size_t i = 0; i < threads_.size(); i++) {
65 threads_[i]->BeginSwap(&to_merge[i]);
66 }
67 std::unique_ptr<Timer> timer(new Timer);
68 timer_.swap(timer);
69 for (size_t i = 0; i < threads_.size(); i++) {
70 threads_[i]->EndSwap();
71 latencies.Merge(&to_merge[i]);
72 }
73
74 auto timer_result = timer->Mark();
75
76 ClientStats stats;
77 latencies.FillProto(stats.mutable_latencies());
78 stats.set_time_elapsed(timer_result.wall);
79 stats.set_time_system(timer_result.system);
80 stats.set_time_user(timer_result.user);
81 return stats;
82 }
83
84 protected:
85 SimpleRequest request_;
vjpai37f72572015-05-12 10:29:07 -070086 bool closed_loop_;
Craig Tiller88568752015-03-04 10:50:43 -080087
88 class ClientChannelInfo {
89 public:
Craig Tillera182bf12015-03-04 13:54:39 -080090 ClientChannelInfo(const grpc::string& target, const ClientConfig& config)
Craig Tiller88568752015-03-04 10:50:43 -080091 : channel_(CreateTestChannel(target, config.enable_ssl())),
92 stub_(TestService::NewStub(channel_)) {}
93 ChannelInterface* get_channel() { return channel_.get(); }
94 TestService::Stub* get_stub() { return stub_.get(); }
95
96 private:
97 std::shared_ptr<ChannelInterface> channel_;
98 std::unique_ptr<TestService::Stub> stub_;
99 };
100 std::vector<ClientChannelInfo> channels_;
101
102 void StartThreads(size_t num_threads) {
Craig Tillera182bf12015-03-04 13:54:39 -0800103 for (size_t i = 0; i < num_threads; i++) {
104 threads_.emplace_back(new Thread(this, i));
105 }
Craig Tiller88568752015-03-04 10:50:43 -0800106 }
107
Craig Tillera182bf12015-03-04 13:54:39 -0800108 void EndThreads() { threads_.clear(); }
Craig Tiller88568752015-03-04 10:50:43 -0800109
Craig Tiller8a5a6662015-04-09 11:31:28 -0700110 virtual bool ThreadFunc(Histogram* histogram, size_t thread_idx) = 0;
vjpaic6aa60e2015-04-29 10:20:41 -0700111
112 void SetupLoadTest(const ClientConfig& config, size_t num_threads) {
113 // Set up the load distribution based on the number of threads
114 if (config.load_type() == CLOSED_LOOP) {
115 closed_loop_ = true;
116 }
117 else {
118 closed_loop_ = false;
Craig Tiller88568752015-03-04 10:50:43 -0800119
vjpaic6aa60e2015-04-29 10:20:41 -0700120 std::unique_ptr<RandomDist> random_dist;
121 auto& load = config.load_params();
122 switch (config.load_type()) {
123 case POISSON:
124 random_dist.reset
125 (new ExpDist(load.poisson().offered_load()/num_threads));
126 break;
127 case UNIFORM:
128 random_dist.reset
129 (new UniformDist(load.uniform().interarrival_lo()*num_threads,
130 load.uniform().interarrival_hi()*num_threads));
131 break;
132 case DETERMINISTIC:
133 random_dist.reset
134 (new DetDist(num_threads/load.determ().offered_load()));
135 break;
136 case PARETO:
137 random_dist.reset
138 (new ParetoDist(load.pareto().interarrival_base()*num_threads,
139 load.pareto().alpha()));
140 break;
141 default:
142 GPR_ASSERT(false);
143 break;
144 }
145
146 interarrival_timer_.init(*random_dist, num_threads);
147 for (size_t i = 0; i<num_threads; i++) {
148 next_time_.push_back(std::chrono::high_resolution_clock::now()
149 + interarrival_timer_(i));
150 }
151 }
152 }
153 template<class Timepoint>
154 bool NextIssueTime(int thread_idx, Timepoint *time_delay) {
155 if (closed_loop_) {
156 return false;
157 }
158 else {
159 *time_delay = next_time_[thread_idx];
160 next_time_[thread_idx] += interarrival_timer_(thread_idx);
161 return true;
162 }
163 }
164
Craig Tiller88568752015-03-04 10:50:43 -0800165 private:
166 class Thread {
167 public:
168 Thread(Client* client, size_t idx)
169 : done_(false),
170 new_(nullptr),
171 impl_([this, idx, client]() {
Craig Tiller8a5a6662015-04-09 11:31:28 -0700172 for (;;) {
173 // run the loop body
174 bool thread_still_ok = client->ThreadFunc(&histogram_, idx);
175 // lock, see if we're done
176 std::lock_guard<std::mutex> g(mu_);
177 if (!thread_still_ok) {
178 gpr_log(GPR_ERROR, "Finishing client thread due to RPC error");
179 done_ = true;
180 }
181 if (done_) {return;}
182 // check if we're marking, swap out the histogram if so
183 if (new_) {
184 new_->Swap(&histogram_);
185 new_ = nullptr;
186 cv_.notify_one();
187 }
Craig Tiller88568752015-03-04 10:50:43 -0800188 }
Craig Tiller8a5a6662015-04-09 11:31:28 -0700189 }) {}
Craig Tiller88568752015-03-04 10:50:43 -0800190
191 ~Thread() {
192 {
193 std::lock_guard<std::mutex> g(mu_);
194 done_ = true;
195 }
196 impl_.join();
197 }
198
199 void BeginSwap(Histogram* n) {
200 std::lock_guard<std::mutex> g(mu_);
201 new_ = n;
202 }
203
204 void EndSwap() {
205 std::unique_lock<std::mutex> g(mu_);
206 cv_.wait(g, [this]() { return new_ == nullptr; });
207 }
208
209 private:
210 Thread(const Thread&);
211 Thread& operator=(const Thread&);
212
213 TestService::Stub* stub_;
214 ClientConfig config_;
215 std::mutex mu_;
216 std::condition_variable cv_;
217 bool done_;
218 Histogram* new_;
219 Histogram histogram_;
220 std::thread impl_;
221 };
222
223 std::vector<std::unique_ptr<Thread>> threads_;
224 std::unique_ptr<Timer> timer_;
vjpaic6aa60e2015-04-29 10:20:41 -0700225
vjpaic6aa60e2015-04-29 10:20:41 -0700226 InterarrivalTimer interarrival_timer_;
227 std::vector<std::chrono::time_point
228 <std::chrono::high_resolution_clock>> next_time_;
Craig Tiller26598a32015-03-02 16:16:00 -0800229};
230
vjpai46f65232015-03-23 10:10:27 -0700231std::unique_ptr<Client>
232 CreateSynchronousUnaryClient(const ClientConfig& args);
233std::unique_ptr<Client>
234 CreateSynchronousStreamingClient(const ClientConfig& args);
235std::unique_ptr<Client> CreateAsyncUnaryClient(const ClientConfig& args);
236std::unique_ptr<Client> CreateAsyncStreamingClient(const ClientConfig& args);
Craig Tiller26598a32015-03-02 16:16:00 -0800237
238} // namespace testing
239} // namespace grpc
240
241#endif