blob: 5cd99950d83a8f11c05bf21f7bb0f7da26cc5d83 [file] [log] [blame]
Craig Tiller26598a32015-03-02 16:16:00 -08001/*
2 *
3 * Copyright 2015, Google Inc.
4 * All rights reserved.
5 *
6 * Redistribution and use in source and binary forms, with or without
7 * modification, are permitted provided that the following conditions are
8 * met:
9 *
10 * * Redistributions of source code must retain the above copyright
11 * notice, this list of conditions and the following disclaimer.
12 * * Redistributions in binary form must reproduce the above
13 * copyright notice, this list of conditions and the following disclaimer
14 * in the documentation and/or other materials provided with the
15 * distribution.
16 * * Neither the name of Google Inc. nor the names of its
17 * contributors may be used to endorse or promote products derived from
18 * this software without specific prior written permission.
19 *
20 * THIS SOFTWARE IS PROVIDED BY THE COPYRIGHT HOLDERS AND CONTRIBUTORS
21 * "AS IS" AND ANY EXPRESS OR IMPLIED WARRANTIES, INCLUDING, BUT NOT
22 * LIMITED TO, THE IMPLIED WARRANTIES OF MERCHANTABILITY AND FITNESS FOR
23 * A PARTICULAR PURPOSE ARE DISCLAIMED. IN NO EVENT SHALL THE COPYRIGHT
24 * OWNER OR CONTRIBUTORS BE LIABLE FOR ANY DIRECT, INDIRECT, INCIDENTAL,
25 * SPECIAL, EXEMPLARY, OR CONSEQUENTIAL DAMAGES (INCLUDING, BUT NOT
26 * LIMITED TO, PROCUREMENT OF SUBSTITUTE GOODS OR SERVICES; LOSS OF USE,
27 * DATA, OR PROFITS; OR BUSINESS INTERRUPTION) HOWEVER CAUSED AND ON ANY
28 * THEORY OF LIABILITY, WHETHER IN CONTRACT, STRICT LIABILITY, OR TORT
29 * (INCLUDING NEGLIGENCE OR OTHERWISE) ARISING IN ANY WAY OUT OF THE USE
30 * OF THIS SOFTWARE, EVEN IF ADVISED OF THE POSSIBILITY OF SUCH DAMAGE.
31 *
32 */
33
34#ifndef TEST_QPS_CLIENT_H
35#define TEST_QPS_CLIENT_H
36
Craig Tiller88568752015-03-04 10:50:43 -080037#include "test/cpp/qps/histogram.h"
Vijay Pai372fd872015-06-08 13:30:08 -070038#include "test/cpp/qps/interarrival.h"
Craig Tiller88568752015-03-04 10:50:43 -080039#include "test/cpp/qps/timer.h"
Nicolas "Pixel" Noble0caebbf2015-04-09 23:08:51 +020040#include "test/cpp/qps/qpstest.grpc.pb.h"
Craig Tiller26598a32015-03-02 16:16:00 -080041
Craig Tillere38ec212015-03-04 11:19:15 -080042#include <condition_variable>
43#include <mutex>
vjpaia9e08302015-07-31 07:55:06 -070044#include <grpc++/config.h>
45#include <grpc++/config.h>
Craig Tillere38ec212015-03-04 11:19:15 -080046
Craig Tiller26598a32015-03-02 16:16:00 -080047namespace grpc {
Vijay Pai372fd872015-06-08 13:30:08 -070048
49#if defined(__APPLE__)
50// Specialize Timepoint for high res clock as we need that
51template <>
52class TimePoint<std::chrono::high_resolution_clock::time_point> {
53 public:
54 TimePoint(const std::chrono::high_resolution_clock::time_point& time) {
55 TimepointHR2Timespec(time, &time_);
56 }
57 gpr_timespec raw_time() const { return time_; }
58
59 private:
60 gpr_timespec time_;
61};
62#endif
63
Craig Tiller26598a32015-03-02 16:16:00 -080064namespace testing {
65
// Clock used to schedule RPC issue times in the QPS client.
using grpc_time_source = std::chrono::high_resolution_clock;
// A point in time as measured by grpc_time_source.
using grpc_time = std::chrono::time_point<grpc_time_source>;
68
Craig Tiller26598a32015-03-02 16:16:00 -080069class Client {
70 public:
Vijay Pai372fd872015-06-08 13:30:08 -070071 explicit Client(const ClientConfig& config)
72 : timer_(new Timer), interarrival_timer_() {
Craig Tiller88568752015-03-04 10:50:43 -080073 for (int i = 0; i < config.client_channels(); i++) {
vjpaie88bb072015-07-31 08:39:31 -070074 channels_.emplace_back(config.server_targets(i % config.server_targets_size()), config);
Craig Tiller88568752015-03-04 10:50:43 -080075 }
76 request_.set_response_type(grpc::testing::PayloadType::COMPRESSABLE);
77 request_.set_response_size(config.payload_size());
78 }
Craig Tiller26598a32015-03-02 16:16:00 -080079 virtual ~Client() {}
Craig Tiller6af9ed02015-03-02 22:42:10 -080080
Craig Tiller88568752015-03-04 10:50:43 -080081 ClientStats Mark() {
82 Histogram latencies;
83 std::vector<Histogram> to_merge(threads_.size());
84 for (size_t i = 0; i < threads_.size(); i++) {
85 threads_[i]->BeginSwap(&to_merge[i]);
86 }
87 std::unique_ptr<Timer> timer(new Timer);
88 timer_.swap(timer);
89 for (size_t i = 0; i < threads_.size(); i++) {
90 threads_[i]->EndSwap();
91 latencies.Merge(&to_merge[i]);
92 }
93
94 auto timer_result = timer->Mark();
95
96 ClientStats stats;
97 latencies.FillProto(stats.mutable_latencies());
98 stats.set_time_elapsed(timer_result.wall);
99 stats.set_time_system(timer_result.system);
100 stats.set_time_user(timer_result.user);
101 return stats;
102 }
103
104 protected:
105 SimpleRequest request_;
Vijay Pai372fd872015-06-08 13:30:08 -0700106 bool closed_loop_;
Craig Tiller88568752015-03-04 10:50:43 -0800107
108 class ClientChannelInfo {
109 public:
Craig Tillera182bf12015-03-04 13:54:39 -0800110 ClientChannelInfo(const grpc::string& target, const ClientConfig& config)
Craig Tiller88568752015-03-04 10:50:43 -0800111 : channel_(CreateTestChannel(target, config.enable_ssl())),
112 stub_(TestService::NewStub(channel_)) {}
113 ChannelInterface* get_channel() { return channel_.get(); }
114 TestService::Stub* get_stub() { return stub_.get(); }
115
116 private:
117 std::shared_ptr<ChannelInterface> channel_;
118 std::unique_ptr<TestService::Stub> stub_;
119 };
120 std::vector<ClientChannelInfo> channels_;
121
122 void StartThreads(size_t num_threads) {
Craig Tillera182bf12015-03-04 13:54:39 -0800123 for (size_t i = 0; i < num_threads; i++) {
124 threads_.emplace_back(new Thread(this, i));
125 }
Craig Tiller88568752015-03-04 10:50:43 -0800126 }
127
Craig Tillera182bf12015-03-04 13:54:39 -0800128 void EndThreads() { threads_.clear(); }
Craig Tiller88568752015-03-04 10:50:43 -0800129
Craig Tiller8a5a6662015-04-09 11:31:28 -0700130 virtual bool ThreadFunc(Histogram* histogram, size_t thread_idx) = 0;
Vijay Pai85594852015-06-03 11:01:25 -0700131
Vijay Pai372fd872015-06-08 13:30:08 -0700132 void SetupLoadTest(const ClientConfig& config, size_t num_threads) {
133 // Set up the load distribution based on the number of threads
134 if (config.load_type() == CLOSED_LOOP) {
135 closed_loop_ = true;
136 } else {
137 closed_loop_ = false;
138
139 std::unique_ptr<RandomDist> random_dist;
140 const auto& load = config.load_params();
141 switch (config.load_type()) {
142 case POISSON:
143 random_dist.reset(
144 new ExpDist(load.poisson().offered_load() / num_threads));
145 break;
146 case UNIFORM:
147 random_dist.reset(
148 new UniformDist(load.uniform().interarrival_lo() * num_threads,
149 load.uniform().interarrival_hi() * num_threads));
150 break;
151 case DETERMINISTIC:
152 random_dist.reset(
153 new DetDist(num_threads / load.determ().offered_load()));
154 break;
155 case PARETO:
156 random_dist.reset(
157 new ParetoDist(load.pareto().interarrival_base() * num_threads,
158 load.pareto().alpha()));
159 break;
160 default:
161 GPR_ASSERT(false);
162 break;
163 }
164
165 interarrival_timer_.init(*random_dist, num_threads);
166 for (size_t i = 0; i < num_threads; i++) {
167 next_time_.push_back(
168 grpc_time_source::now() +
169 std::chrono::duration_cast<grpc_time_source::duration>(
170 interarrival_timer_(i)));
171 }
172 }
173 }
174
175 bool NextIssueTime(int thread_idx, grpc_time* time_delay) {
176 if (closed_loop_) {
177 return false;
178 } else {
179 *time_delay = next_time_[thread_idx];
180 next_time_[thread_idx] +=
181 std::chrono::duration_cast<grpc_time_source::duration>(
182 interarrival_timer_(thread_idx));
183 return true;
184 }
185 }
186
Craig Tiller88568752015-03-04 10:50:43 -0800187 private:
188 class Thread {
189 public:
190 Thread(Client* client, size_t idx)
vjpaia9e08302015-07-31 07:55:06 -0700191 : done_(false), new_(nullptr), client_(client), idx_(idx),
192 impl_(&Thread::ThreadFunc, this) {}
Craig Tiller88568752015-03-04 10:50:43 -0800193
194 ~Thread() {
195 {
196 std::lock_guard<std::mutex> g(mu_);
197 done_ = true;
198 }
199 impl_.join();
200 }
201
202 void BeginSwap(Histogram* n) {
203 std::lock_guard<std::mutex> g(mu_);
204 new_ = n;
205 }
206
207 void EndSwap() {
208 std::unique_lock<std::mutex> g(mu_);
209 cv_.wait(g, [this]() { return new_ == nullptr; });
210 }
211
212 private:
213 Thread(const Thread&);
214 Thread& operator=(const Thread&);
215
vjpaia9e08302015-07-31 07:55:06 -0700216 void ThreadFunc() {
217 for (;;) {
218 // run the loop body
219 bool thread_still_ok = client_->ThreadFunc(&histogram_, idx_);
220 // lock, see if we're done
221 std::lock_guard<std::mutex> g(mu_);
222 if (!thread_still_ok) {
223 gpr_log(GPR_ERROR, "Finishing client thread due to RPC error");
224 done_ = true;
225 }
226 if (done_) {
227 return;
228 }
229 // check if we're marking, swap out the histogram if so
230 if (new_) {
231 new_->Swap(&histogram_);
232 new_ = nullptr;
233 cv_.notify_one();
234 }
235 }
236 }
237
Craig Tiller88568752015-03-04 10:50:43 -0800238 TestService::Stub* stub_;
239 ClientConfig config_;
240 std::mutex mu_;
241 std::condition_variable cv_;
242 bool done_;
243 Histogram* new_;
244 Histogram histogram_;
vjpaia9e08302015-07-31 07:55:06 -0700245 Client *client_;
246 size_t idx_;
Craig Tiller88568752015-03-04 10:50:43 -0800247 std::thread impl_;
248 };
249
250 std::vector<std::unique_ptr<Thread>> threads_;
251 std::unique_ptr<Timer> timer_;
Vijay Pai372fd872015-06-08 13:30:08 -0700252
253 InterarrivalTimer interarrival_timer_;
254 std::vector<grpc_time> next_time_;
Craig Tiller26598a32015-03-02 16:16:00 -0800255};
256
Craig Tiller5c8737d2015-05-21 11:42:17 -0700257std::unique_ptr<Client> CreateSynchronousUnaryClient(const ClientConfig& args);
258std::unique_ptr<Client> CreateSynchronousStreamingClient(
259 const ClientConfig& args);
vjpai46f65232015-03-23 10:10:27 -0700260std::unique_ptr<Client> CreateAsyncUnaryClient(const ClientConfig& args);
261std::unique_ptr<Client> CreateAsyncStreamingClient(const ClientConfig& args);
Craig Tiller26598a32015-03-02 16:16:00 -0800262
263} // namespace testing
264} // namespace grpc
265
266#endif