blob: 7d75ed7d742abf269530e7e0a57637f29448257b [file] [log] [blame]
Craig Tiller26598a32015-03-02 16:16:00 -08001/*
2 *
3 * Copyright 2015, Google Inc.
4 * All rights reserved.
5 *
6 * Redistribution and use in source and binary forms, with or without
7 * modification, are permitted provided that the following conditions are
8 * met:
9 *
10 * * Redistributions of source code must retain the above copyright
11 * notice, this list of conditions and the following disclaimer.
12 * * Redistributions in binary form must reproduce the above
13 * copyright notice, this list of conditions and the following disclaimer
14 * in the documentation and/or other materials provided with the
15 * distribution.
16 * * Neither the name of Google Inc. nor the names of its
17 * contributors may be used to endorse or promote products derived from
18 * this software without specific prior written permission.
19 *
20 * THIS SOFTWARE IS PROVIDED BY THE COPYRIGHT HOLDERS AND CONTRIBUTORS
21 * "AS IS" AND ANY EXPRESS OR IMPLIED WARRANTIES, INCLUDING, BUT NOT
22 * LIMITED TO, THE IMPLIED WARRANTIES OF MERCHANTABILITY AND FITNESS FOR
23 * A PARTICULAR PURPOSE ARE DISCLAIMED. IN NO EVENT SHALL THE COPYRIGHT
24 * OWNER OR CONTRIBUTORS BE LIABLE FOR ANY DIRECT, INDIRECT, INCIDENTAL,
25 * SPECIAL, EXEMPLARY, OR CONSEQUENTIAL DAMAGES (INCLUDING, BUT NOT
26 * LIMITED TO, PROCUREMENT OF SUBSTITUTE GOODS OR SERVICES; LOSS OF USE,
27 * DATA, OR PROFITS; OR BUSINESS INTERRUPTION) HOWEVER CAUSED AND ON ANY
28 * THEORY OF LIABILITY, WHETHER IN CONTRACT, STRICT LIABILITY, OR TORT
29 * (INCLUDING NEGLIGENCE OR OTHERWISE) ARISING IN ANY WAY OUT OF THE USE
30 * OF THIS SOFTWARE, EVEN IF ADVISED OF THE POSSIBILITY OF SUCH DAMAGE.
31 *
32 */
33
34#ifndef TEST_QPS_CLIENT_H
35#define TEST_QPS_CLIENT_H
36
Craig Tiller88568752015-03-04 10:50:43 -080037#include "test/cpp/qps/histogram.h"
vjpaic6aa60e2015-04-29 10:20:41 -070038#include "test/cpp/qps/interarrival.h"
Craig Tiller88568752015-03-04 10:50:43 -080039#include "test/cpp/qps/timer.h"
Nicolas "Pixel" Noble0caebbf2015-04-09 23:08:51 +020040#include "test/cpp/qps/qpstest.grpc.pb.h"
Craig Tiller26598a32015-03-02 16:16:00 -080041
Craig Tillere38ec212015-03-04 11:19:15 -080042#include <condition_variable>
43#include <mutex>
44
Craig Tiller26598a32015-03-02 16:16:00 -080045namespace grpc {
vjpai3beb20c2015-06-02 13:58:44 -070046
Vijay Pai9dc5c152015-06-03 11:34:53 -070047#if defined(__APPLE__)
48// Specialize Timepoint for high res clock as we need that
vjpai3beb20c2015-06-02 13:58:44 -070049template <>
50class TimePoint<std::chrono::high_resolution_clock::time_point> {
51 public:
52 TimePoint(const std::chrono::high_resolution_clock::time_point& time) {
Vijay Pai9dc5c152015-06-03 11:34:53 -070053 TimepointHR2Timespec(time, &time_);
vjpai3beb20c2015-06-02 13:58:44 -070054 }
55 gpr_timespec raw_time() const { return time_; }
56 private:
57 gpr_timespec time_;
58};
Vijay Pai9dc5c152015-06-03 11:34:53 -070059#endif
vjpai3beb20c2015-06-02 13:58:44 -070060
Craig Tiller26598a32015-03-02 16:16:00 -080061namespace testing {
62
vjpai3beb20c2015-06-02 13:58:44 -070063typedef std::chrono::high_resolution_clock grpc_time_source;
vjpai924d4592015-06-02 10:26:52 -070064typedef std::chrono::time_point<grpc_time_source> grpc_time;
65
Craig Tiller26598a32015-03-02 16:16:00 -080066class Client {
67 public:
vjpaic6aa60e2015-04-29 10:20:41 -070068 explicit Client(const ClientConfig& config) : timer_(new Timer),
69 interarrival_timer_() {
Craig Tiller88568752015-03-04 10:50:43 -080070 for (int i = 0; i < config.client_channels(); i++) {
71 channels_.push_back(ClientChannelInfo(
72 config.server_targets(i % config.server_targets_size()), config));
73 }
74 request_.set_response_type(grpc::testing::PayloadType::COMPRESSABLE);
75 request_.set_response_size(config.payload_size());
76 }
Craig Tiller26598a32015-03-02 16:16:00 -080077 virtual ~Client() {}
Craig Tiller6af9ed02015-03-02 22:42:10 -080078
Craig Tiller88568752015-03-04 10:50:43 -080079 ClientStats Mark() {
80 Histogram latencies;
81 std::vector<Histogram> to_merge(threads_.size());
82 for (size_t i = 0; i < threads_.size(); i++) {
83 threads_[i]->BeginSwap(&to_merge[i]);
84 }
85 std::unique_ptr<Timer> timer(new Timer);
86 timer_.swap(timer);
87 for (size_t i = 0; i < threads_.size(); i++) {
88 threads_[i]->EndSwap();
89 latencies.Merge(&to_merge[i]);
90 }
91
92 auto timer_result = timer->Mark();
93
94 ClientStats stats;
95 latencies.FillProto(stats.mutable_latencies());
96 stats.set_time_elapsed(timer_result.wall);
97 stats.set_time_system(timer_result.system);
98 stats.set_time_user(timer_result.user);
99 return stats;
100 }
101
102 protected:
103 SimpleRequest request_;
vjpai37f72572015-05-12 10:29:07 -0700104 bool closed_loop_;
Craig Tiller88568752015-03-04 10:50:43 -0800105
106 class ClientChannelInfo {
107 public:
Craig Tillera182bf12015-03-04 13:54:39 -0800108 ClientChannelInfo(const grpc::string& target, const ClientConfig& config)
Craig Tiller88568752015-03-04 10:50:43 -0800109 : channel_(CreateTestChannel(target, config.enable_ssl())),
110 stub_(TestService::NewStub(channel_)) {}
111 ChannelInterface* get_channel() { return channel_.get(); }
112 TestService::Stub* get_stub() { return stub_.get(); }
113
114 private:
115 std::shared_ptr<ChannelInterface> channel_;
116 std::unique_ptr<TestService::Stub> stub_;
117 };
118 std::vector<ClientChannelInfo> channels_;
119
120 void StartThreads(size_t num_threads) {
Craig Tillera182bf12015-03-04 13:54:39 -0800121 for (size_t i = 0; i < num_threads; i++) {
122 threads_.emplace_back(new Thread(this, i));
123 }
Craig Tiller88568752015-03-04 10:50:43 -0800124 }
125
Craig Tillera182bf12015-03-04 13:54:39 -0800126 void EndThreads() { threads_.clear(); }
Craig Tiller88568752015-03-04 10:50:43 -0800127
Craig Tiller8a5a6662015-04-09 11:31:28 -0700128 virtual bool ThreadFunc(Histogram* histogram, size_t thread_idx) = 0;
Vijay Pai85594852015-06-03 11:01:25 -0700129
vjpaic6aa60e2015-04-29 10:20:41 -0700130 void SetupLoadTest(const ClientConfig& config, size_t num_threads) {
131 // Set up the load distribution based on the number of threads
132 if (config.load_type() == CLOSED_LOOP) {
133 closed_loop_ = true;
134 }
135 else {
136 closed_loop_ = false;
Craig Tiller88568752015-03-04 10:50:43 -0800137
vjpaic6aa60e2015-04-29 10:20:41 -0700138 std::unique_ptr<RandomDist> random_dist;
139 auto& load = config.load_params();
140 switch (config.load_type()) {
141 case POISSON:
142 random_dist.reset
143 (new ExpDist(load.poisson().offered_load()/num_threads));
144 break;
145 case UNIFORM:
146 random_dist.reset
147 (new UniformDist(load.uniform().interarrival_lo()*num_threads,
148 load.uniform().interarrival_hi()*num_threads));
149 break;
150 case DETERMINISTIC:
151 random_dist.reset
152 (new DetDist(num_threads/load.determ().offered_load()));
153 break;
154 case PARETO:
155 random_dist.reset
156 (new ParetoDist(load.pareto().interarrival_base()*num_threads,
157 load.pareto().alpha()));
158 break;
159 default:
160 GPR_ASSERT(false);
161 break;
162 }
163
164 interarrival_timer_.init(*random_dist, num_threads);
165 for (size_t i = 0; i<num_threads; i++) {
vjpai924d4592015-06-02 10:26:52 -0700166 next_time_.push_back(grpc_time_source::now() +
167 std::chrono::duration_cast<grpc_time_source::duration>(interarrival_timer_(i)));
vjpaic6aa60e2015-04-29 10:20:41 -0700168 }
169 }
170 }
vjpai924d4592015-06-02 10:26:52 -0700171 bool NextIssueTime(int thread_idx, grpc_time *time_delay) {
vjpaic6aa60e2015-04-29 10:20:41 -0700172 if (closed_loop_) {
173 return false;
174 }
175 else {
176 *time_delay = next_time_[thread_idx];
vjpai924d4592015-06-02 10:26:52 -0700177 next_time_[thread_idx] += std::chrono::duration_cast<grpc_time_source::duration>(interarrival_timer_(thread_idx));
vjpaic6aa60e2015-04-29 10:20:41 -0700178 return true;
179 }
180 }
181
Craig Tiller88568752015-03-04 10:50:43 -0800182 private:
183 class Thread {
184 public:
185 Thread(Client* client, size_t idx)
186 : done_(false),
187 new_(nullptr),
188 impl_([this, idx, client]() {
Craig Tiller5c8737d2015-05-21 11:42:17 -0700189 for (;;) {
190 // run the loop body
191 bool thread_still_ok = client->ThreadFunc(&histogram_, idx);
192 // lock, see if we're done
193 std::lock_guard<std::mutex> g(mu_);
194 if (!thread_still_ok) {
195 gpr_log(GPR_ERROR, "Finishing client thread due to RPC error");
196 done_ = true;
Craig Tiller88568752015-03-04 10:50:43 -0800197 }
Craig Tiller5c8737d2015-05-21 11:42:17 -0700198 if (done_) {
199 return;
200 }
201 // check if we're marking, swap out the histogram if so
202 if (new_) {
203 new_->Swap(&histogram_);
204 new_ = nullptr;
205 cv_.notify_one();
206 }
207 }
208 }) {}
Craig Tiller88568752015-03-04 10:50:43 -0800209
210 ~Thread() {
211 {
212 std::lock_guard<std::mutex> g(mu_);
213 done_ = true;
214 }
215 impl_.join();
216 }
217
218 void BeginSwap(Histogram* n) {
219 std::lock_guard<std::mutex> g(mu_);
220 new_ = n;
221 }
222
223 void EndSwap() {
224 std::unique_lock<std::mutex> g(mu_);
225 cv_.wait(g, [this]() { return new_ == nullptr; });
226 }
227
228 private:
229 Thread(const Thread&);
230 Thread& operator=(const Thread&);
231
232 TestService::Stub* stub_;
233 ClientConfig config_;
234 std::mutex mu_;
235 std::condition_variable cv_;
236 bool done_;
237 Histogram* new_;
238 Histogram histogram_;
239 std::thread impl_;
240 };
241
242 std::vector<std::unique_ptr<Thread>> threads_;
243 std::unique_ptr<Timer> timer_;
vjpaic6aa60e2015-04-29 10:20:41 -0700244
vjpaic6aa60e2015-04-29 10:20:41 -0700245 InterarrivalTimer interarrival_timer_;
vjpai924d4592015-06-02 10:26:52 -0700246 std::vector<grpc_time> next_time_;
Craig Tiller26598a32015-03-02 16:16:00 -0800247};
248
Craig Tiller5c8737d2015-05-21 11:42:17 -0700249std::unique_ptr<Client> CreateSynchronousUnaryClient(const ClientConfig& args);
250std::unique_ptr<Client> CreateSynchronousStreamingClient(
251 const ClientConfig& args);
vjpai46f65232015-03-23 10:10:27 -0700252std::unique_ptr<Client> CreateAsyncUnaryClient(const ClientConfig& args);
253std::unique_ptr<Client> CreateAsyncStreamingClient(const ClientConfig& args);
Craig Tiller26598a32015-03-02 16:16:00 -0800254
255} // namespace testing
256} // namespace grpc
257
258#endif