blob: 5395d02e32f4c1a26a866e2b7179b97a43f937ce [file] [log] [blame]
Craig Tiller26598a32015-03-02 16:16:00 -08001/*
2 *
3 * Copyright 2015, Google Inc.
4 * All rights reserved.
5 *
6 * Redistribution and use in source and binary forms, with or without
7 * modification, are permitted provided that the following conditions are
8 * met:
9 *
10 * * Redistributions of source code must retain the above copyright
11 * notice, this list of conditions and the following disclaimer.
12 * * Redistributions in binary form must reproduce the above
13 * copyright notice, this list of conditions and the following disclaimer
14 * in the documentation and/or other materials provided with the
15 * distribution.
16 * * Neither the name of Google Inc. nor the names of its
17 * contributors may be used to endorse or promote products derived from
18 * this software without specific prior written permission.
19 *
20 * THIS SOFTWARE IS PROVIDED BY THE COPYRIGHT HOLDERS AND CONTRIBUTORS
21 * "AS IS" AND ANY EXPRESS OR IMPLIED WARRANTIES, INCLUDING, BUT NOT
22 * LIMITED TO, THE IMPLIED WARRANTIES OF MERCHANTABILITY AND FITNESS FOR
23 * A PARTICULAR PURPOSE ARE DISCLAIMED. IN NO EVENT SHALL THE COPYRIGHT
24 * OWNER OR CONTRIBUTORS BE LIABLE FOR ANY DIRECT, INDIRECT, INCIDENTAL,
25 * SPECIAL, EXEMPLARY, OR CONSEQUENTIAL DAMAGES (INCLUDING, BUT NOT
26 * LIMITED TO, PROCUREMENT OF SUBSTITUTE GOODS OR SERVICES; LOSS OF USE,
27 * DATA, OR PROFITS; OR BUSINESS INTERRUPTION) HOWEVER CAUSED AND ON ANY
28 * THEORY OF LIABILITY, WHETHER IN CONTRACT, STRICT LIABILITY, OR TORT
29 * (INCLUDING NEGLIGENCE OR OTHERWISE) ARISING IN ANY WAY OUT OF THE USE
30 * OF THIS SOFTWARE, EVEN IF ADVISED OF THE POSSIBILITY OF SUCH DAMAGE.
31 *
32 */
33
34#ifndef TEST_QPS_CLIENT_H
35#define TEST_QPS_CLIENT_H
36
Craig Tiller88568752015-03-04 10:50:43 -080037#include "test/cpp/qps/histogram.h"
Vijay Pai372fd872015-06-08 13:30:08 -070038#include "test/cpp/qps/interarrival.h"
Craig Tiller88568752015-03-04 10:50:43 -080039#include "test/cpp/qps/timer.h"
Nicolas "Pixel" Noble0caebbf2015-04-09 23:08:51 +020040#include "test/cpp/qps/qpstest.grpc.pb.h"
Craig Tiller26598a32015-03-02 16:16:00 -080041
Craig Tillere38ec212015-03-04 11:19:15 -080042#include <condition_variable>
43#include <mutex>
vjpaia9e08302015-07-31 07:55:06 -070044#include <grpc++/config.h>
Craig Tillere38ec212015-03-04 11:19:15 -080045
Craig Tiller26598a32015-03-02 16:16:00 -080046namespace grpc {
Vijay Pai372fd872015-06-08 13:30:08 -070047
48#if defined(__APPLE__)
49// Specialize Timepoint for high res clock as we need that
50template <>
51class TimePoint<std::chrono::high_resolution_clock::time_point> {
52 public:
53 TimePoint(const std::chrono::high_resolution_clock::time_point& time) {
54 TimepointHR2Timespec(time, &time_);
55 }
56 gpr_timespec raw_time() const { return time_; }
57
58 private:
59 gpr_timespec time_;
60};
61#endif
62
Craig Tiller26598a32015-03-02 16:16:00 -080063namespace testing {
64
// Clock used to schedule RPC issue times, and the time_point type it yields.
using grpc_time_source = std::chrono::high_resolution_clock;
using grpc_time = std::chrono::time_point<grpc_time_source>;
67
Craig Tiller26598a32015-03-02 16:16:00 -080068class Client {
69 public:
Vijay Pai372fd872015-06-08 13:30:08 -070070 explicit Client(const ClientConfig& config)
Vijay Pai90e73692015-08-05 19:15:36 -070071 : channels_(config.client_channels()),
72 timer_(new Timer),
73 interarrival_timer_() {
Craig Tiller88568752015-03-04 10:50:43 -080074 for (int i = 0; i < config.client_channels(); i++) {
Vijay Paieed63fa2015-08-05 23:08:34 +000075 channels_[i].init(config.server_targets(i % config.server_targets_size()),
Vijay Pai90e73692015-08-05 19:15:36 -070076 config);
Craig Tiller88568752015-03-04 10:50:43 -080077 }
78 request_.set_response_type(grpc::testing::PayloadType::COMPRESSABLE);
79 request_.set_response_size(config.payload_size());
80 }
Craig Tiller26598a32015-03-02 16:16:00 -080081 virtual ~Client() {}
Craig Tiller6af9ed02015-03-02 22:42:10 -080082
Craig Tiller88568752015-03-04 10:50:43 -080083 ClientStats Mark() {
84 Histogram latencies;
vjpaib1db8692015-08-11 22:41:02 -070085 // avoid std::vector for old compilers that expect a copy constructor
Vijay Pai3ae11042015-08-11 22:43:14 -070086 Histogram* to_merge = new Histogram[threads_.size()];
Craig Tiller88568752015-03-04 10:50:43 -080087 for (size_t i = 0; i < threads_.size(); i++) {
88 threads_[i]->BeginSwap(&to_merge[i]);
89 }
90 std::unique_ptr<Timer> timer(new Timer);
91 timer_.swap(timer);
92 for (size_t i = 0; i < threads_.size(); i++) {
93 threads_[i]->EndSwap();
94 latencies.Merge(&to_merge[i]);
95 }
Vijay Paiad3e00c2015-08-07 17:21:08 +000096 delete[] to_merge;
Craig Tiller88568752015-03-04 10:50:43 -080097
98 auto timer_result = timer->Mark();
99
100 ClientStats stats;
101 latencies.FillProto(stats.mutable_latencies());
102 stats.set_time_elapsed(timer_result.wall);
103 stats.set_time_system(timer_result.system);
104 stats.set_time_user(timer_result.user);
105 return stats;
106 }
107
108 protected:
109 SimpleRequest request_;
Vijay Pai372fd872015-06-08 13:30:08 -0700110 bool closed_loop_;
Craig Tiller88568752015-03-04 10:50:43 -0800111
112 class ClientChannelInfo {
113 public:
Vijay Paieed63fa2015-08-05 23:08:34 +0000114 ClientChannelInfo() {}
vjpaib1db8692015-08-11 22:41:02 -0700115 ClientChannelInfo(const ClientChannelInfo& i) {
Vijay Pai90e73692015-08-05 19:15:36 -0700116 // The copy constructor is to satisfy old compilers
117 // that need it for using std::vector . It is only ever
118 // used for empty entries
Vijay Paieed63fa2015-08-05 23:08:34 +0000119 GPR_ASSERT(!i.channel_ && !i.stub_);
120 }
121 void init(const grpc::string& target, const ClientConfig& config) {
Vijay Pai90e73692015-08-05 19:15:36 -0700122 // We have to use a 2-phase init like this with a default
123 // constructor followed by an initializer function to make
124 // old compilers happy with using this in std::vector
Vijay Paieed63fa2015-08-05 23:08:34 +0000125 channel_ = CreateTestChannel(target, config.enable_ssl());
126 stub_ = TestService::NewStub(channel_);
127 }
yang-g8c2be9f2015-08-19 16:28:09 -0700128 Channel* get_channel() { return channel_.get(); }
Craig Tiller88568752015-03-04 10:50:43 -0800129 TestService::Stub* get_stub() { return stub_.get(); }
Vijay Pai90e73692015-08-05 19:15:36 -0700130
Craig Tiller88568752015-03-04 10:50:43 -0800131 private:
yang-g8c2be9f2015-08-19 16:28:09 -0700132 std::shared_ptr<Channel> channel_;
Craig Tiller88568752015-03-04 10:50:43 -0800133 std::unique_ptr<TestService::Stub> stub_;
134 };
135 std::vector<ClientChannelInfo> channels_;
136
137 void StartThreads(size_t num_threads) {
Craig Tillera182bf12015-03-04 13:54:39 -0800138 for (size_t i = 0; i < num_threads; i++) {
139 threads_.emplace_back(new Thread(this, i));
140 }
Craig Tiller88568752015-03-04 10:50:43 -0800141 }
142
Craig Tillera182bf12015-03-04 13:54:39 -0800143 void EndThreads() { threads_.clear(); }
Craig Tiller88568752015-03-04 10:50:43 -0800144
Craig Tiller8a5a6662015-04-09 11:31:28 -0700145 virtual bool ThreadFunc(Histogram* histogram, size_t thread_idx) = 0;
Vijay Pai85594852015-06-03 11:01:25 -0700146
Vijay Pai372fd872015-06-08 13:30:08 -0700147 void SetupLoadTest(const ClientConfig& config, size_t num_threads) {
148 // Set up the load distribution based on the number of threads
149 if (config.load_type() == CLOSED_LOOP) {
150 closed_loop_ = true;
151 } else {
152 closed_loop_ = false;
153
154 std::unique_ptr<RandomDist> random_dist;
155 const auto& load = config.load_params();
156 switch (config.load_type()) {
157 case POISSON:
158 random_dist.reset(
159 new ExpDist(load.poisson().offered_load() / num_threads));
160 break;
161 case UNIFORM:
162 random_dist.reset(
163 new UniformDist(load.uniform().interarrival_lo() * num_threads,
164 load.uniform().interarrival_hi() * num_threads));
165 break;
166 case DETERMINISTIC:
167 random_dist.reset(
168 new DetDist(num_threads / load.determ().offered_load()));
169 break;
170 case PARETO:
171 random_dist.reset(
172 new ParetoDist(load.pareto().interarrival_base() * num_threads,
173 load.pareto().alpha()));
174 break;
175 default:
176 GPR_ASSERT(false);
177 break;
178 }
179
180 interarrival_timer_.init(*random_dist, num_threads);
181 for (size_t i = 0; i < num_threads; i++) {
182 next_time_.push_back(
183 grpc_time_source::now() +
184 std::chrono::duration_cast<grpc_time_source::duration>(
185 interarrival_timer_(i)));
186 }
187 }
188 }
189
190 bool NextIssueTime(int thread_idx, grpc_time* time_delay) {
191 if (closed_loop_) {
192 return false;
193 } else {
194 *time_delay = next_time_[thread_idx];
195 next_time_[thread_idx] +=
196 std::chrono::duration_cast<grpc_time_source::duration>(
197 interarrival_timer_(thread_idx));
198 return true;
199 }
200 }
201
Craig Tiller88568752015-03-04 10:50:43 -0800202 private:
203 class Thread {
204 public:
205 Thread(Client* client, size_t idx)
Vijay Paiab1dba72015-07-31 09:09:09 -0700206 : done_(false),
207 new_(nullptr),
208 client_(client),
209 idx_(idx),
210 impl_(&Thread::ThreadFunc, this) {}
Craig Tiller88568752015-03-04 10:50:43 -0800211
212 ~Thread() {
213 {
214 std::lock_guard<std::mutex> g(mu_);
215 done_ = true;
216 }
217 impl_.join();
218 }
219
220 void BeginSwap(Histogram* n) {
221 std::lock_guard<std::mutex> g(mu_);
222 new_ = n;
223 }
224
225 void EndSwap() {
226 std::unique_lock<std::mutex> g(mu_);
Vijay Pai784005b2015-07-31 10:53:42 -0700227 while (new_ != nullptr) {
228 cv_.wait(g);
229 };
Craig Tiller88568752015-03-04 10:50:43 -0800230 }
231
232 private:
233 Thread(const Thread&);
234 Thread& operator=(const Thread&);
235
vjpaia9e08302015-07-31 07:55:06 -0700236 void ThreadFunc() {
237 for (;;) {
Vijay Paiab1dba72015-07-31 09:09:09 -0700238 // run the loop body
vjpaib1db8692015-08-11 22:41:02 -0700239 const bool thread_still_ok = client_->ThreadFunc(&histogram_, idx_);
Vijay Paiab1dba72015-07-31 09:09:09 -0700240 // lock, see if we're done
241 std::lock_guard<std::mutex> g(mu_);
242 if (!thread_still_ok) {
243 gpr_log(GPR_ERROR, "Finishing client thread due to RPC error");
244 done_ = true;
245 }
246 if (done_) {
247 return;
248 }
249 // check if we're marking, swap out the histogram if so
250 if (new_) {
251 new_->Swap(&histogram_);
252 new_ = nullptr;
253 cv_.notify_one();
254 }
vjpaia9e08302015-07-31 07:55:06 -0700255 }
256 }
257
Craig Tiller88568752015-03-04 10:50:43 -0800258 TestService::Stub* stub_;
259 ClientConfig config_;
260 std::mutex mu_;
261 std::condition_variable cv_;
262 bool done_;
263 Histogram* new_;
264 Histogram histogram_;
Vijay Paiab1dba72015-07-31 09:09:09 -0700265 Client* client_;
vjpaia9e08302015-07-31 07:55:06 -0700266 size_t idx_;
Craig Tiller88568752015-03-04 10:50:43 -0800267 std::thread impl_;
268 };
269
270 std::vector<std::unique_ptr<Thread>> threads_;
271 std::unique_ptr<Timer> timer_;
Vijay Pai372fd872015-06-08 13:30:08 -0700272
273 InterarrivalTimer interarrival_timer_;
274 std::vector<grpc_time> next_time_;
Craig Tiller26598a32015-03-02 16:16:00 -0800275};
276
Craig Tiller5c8737d2015-05-21 11:42:17 -0700277std::unique_ptr<Client> CreateSynchronousUnaryClient(const ClientConfig& args);
278std::unique_ptr<Client> CreateSynchronousStreamingClient(
279 const ClientConfig& args);
vjpai46f65232015-03-23 10:10:27 -0700280std::unique_ptr<Client> CreateAsyncUnaryClient(const ClientConfig& args);
281std::unique_ptr<Client> CreateAsyncStreamingClient(const ClientConfig& args);
Craig Tiller26598a32015-03-02 16:16:00 -0800282
283} // namespace testing
284} // namespace grpc
285
286#endif