blob: 1b7a8d26b2c3c3e32ae0ce7c84026c7f353c3801 [file] [log] [blame]
vjpaidea740f2015-02-26 16:35:35 -08001/*
2 *
3 * Copyright 2015, Google Inc.
4 * All rights reserved.
5 *
6 * Redistribution and use in source and binary forms, with or without
7 * modification, are permitted provided that the following conditions are
8 * met:
9 *
10 * * Redistributions of source code must retain the above copyright
11 * notice, this list of conditions and the following disclaimer.
12 * * Redistributions in binary form must reproduce the above
13 * copyright notice, this list of conditions and the following disclaimer
14 * in the documentation and/or other materials provided with the
15 * distribution.
16 * * Neither the name of Google Inc. nor the names of its
17 * contributors may be used to endorse or promote products derived from
18 * this software without specific prior written permission.
19 *
20 * THIS SOFTWARE IS PROVIDED BY THE COPYRIGHT HOLDERS AND CONTRIBUTORS
21 * "AS IS" AND ANY EXPRESS OR IMPLIED WARRANTIES, INCLUDING, BUT NOT
22 * LIMITED TO, THE IMPLIED WARRANTIES OF MERCHANTABILITY AND FITNESS FOR
23 * A PARTICULAR PURPOSE ARE DISCLAIMED. IN NO EVENT SHALL THE COPYRIGHT
24 * OWNER OR CONTRIBUTORS BE LIABLE FOR ANY DIRECT, INDIRECT, INCIDENTAL,
25 * SPECIAL, EXEMPLARY, OR CONSEQUENTIAL DAMAGES (INCLUDING, BUT NOT
26 * LIMITED TO, PROCUREMENT OF SUBSTITUTE GOODS OR SERVICES; LOSS OF USE,
27 * DATA, OR PROFITS; OR BUSINESS INTERRUPTION) HOWEVER CAUSED AND ON ANY
28 * THEORY OF LIABILITY, WHETHER IN CONTRACT, STRICT LIABILITY, OR TORT
29 * (INCLUDING NEGLIGENCE OR OTHERWISE) ARISING IN ANY WAY OUT OF THE USE
30 * OF THIS SOFTWARE, EVEN IF ADVISED OF THE POSSIBILITY OF SUCH DAMAGE.
31 *
32 */
33
34#include <cassert>
Vijay Pai372fd872015-06-08 13:30:08 -070035#include <forward_list>
vjpaidea740f2015-02-26 16:35:35 -080036#include <functional>
Vijay Pai372fd872015-06-08 13:30:08 -070037#include <list>
vjpaidea740f2015-02-26 16:35:35 -080038#include <memory>
Vijay Pai372fd872015-06-08 13:30:08 -070039#include <mutex>
vjpaidea740f2015-02-26 16:35:35 -080040#include <string>
41#include <thread>
42#include <vector>
43#include <sstream>
44
45#include <grpc/grpc.h>
46#include <grpc/support/histogram.h>
47#include <grpc/support/log.h>
48#include <gflags/gflags.h>
49#include <grpc++/async_unary_call.h>
50#include <grpc++/client_context.h>
51#include <grpc++/status.h>
vjpai46f65232015-03-23 10:10:27 -070052#include <grpc++/stream.h>
vjpaidea740f2015-02-26 16:35:35 -080053#include "test/cpp/util/create_test_channel.h"
Nicolas "Pixel" Noble0caebbf2015-04-09 23:08:51 +020054#include "test/cpp/qps/qpstest.grpc.pb.h"
Craig Tiller88568752015-03-04 10:50:43 -080055#include "test/cpp/qps/timer.h"
56#include "test/cpp/qps/client.h"
vjpaidea740f2015-02-26 16:35:35 -080057
Craig Tiller88568752015-03-04 10:50:43 -080058namespace grpc {
59namespace testing {
vjpaidea740f2015-02-26 16:35:35 -080060
Vijay Pai372fd872015-06-08 13:30:08 -070061typedef std::list<grpc_time> deadline_list;
62
Vijay Pai64ac47f2015-02-26 17:59:51 -080063class ClientRpcContext {
64 public:
Yang Gaob6d57e72015-06-09 22:11:12 -070065 explicit ClientRpcContext(int ch) : channel_id_(ch) {}
Vijay Pai64ac47f2015-02-26 17:59:51 -080066 virtual ~ClientRpcContext() {}
vjpai46f65232015-03-23 10:10:27 -070067 // next state, return false if done. Collect stats when appropriate
Vijay Pai1da729a2015-03-23 12:52:56 -070068 virtual bool RunNextState(bool, Histogram* hist) = 0;
Vijay Pai372fd872015-06-08 13:30:08 -070069 virtual ClientRpcContext* StartNewClone() = 0;
Yang Gao6baa9b62015-03-17 10:49:39 -070070 static void* tag(ClientRpcContext* c) { return reinterpret_cast<void*>(c); }
71 static ClientRpcContext* detag(void* t) {
72 return reinterpret_cast<ClientRpcContext*>(t);
Vijay Pai64ac47f2015-02-26 17:59:51 -080073 }
Vijay Pai372fd872015-06-08 13:30:08 -070074
75 deadline_list::iterator deadline_posn() const { return deadline_posn_; }
76 void set_deadline_posn(const deadline_list::iterator& it) {
77 deadline_posn_ = it;
78 }
79 virtual void Start(CompletionQueue* cq) = 0;
80 int channel_id() const { return channel_id_; }
81
82 protected:
83 int channel_id_;
84
85 private:
86 deadline_list::iterator deadline_posn_;
Vijay Pai64ac47f2015-02-26 17:59:51 -080087};
Craig Tiller88568752015-03-04 10:50:43 -080088
Vijay Pai64ac47f2015-02-26 17:59:51 -080089template <class RequestType, class ResponseType>
90class ClientRpcContextUnaryImpl : public ClientRpcContext {
91 public:
Vijay Paicf3fb092015-06-05 03:41:30 -070092 ClientRpcContextUnaryImpl(
Vijay Pai372fd872015-06-08 13:30:08 -070093 int channel_id, TestService::Stub* stub, const RequestType& req,
vjpai4e1e1bc2015-02-27 23:47:12 -080094 std::function<
95 std::unique_ptr<grpc::ClientAsyncResponseReader<ResponseType>>(
Vijay Pai372fd872015-06-08 13:30:08 -070096 TestService::Stub*, grpc::ClientContext*, const RequestType&,
97 CompletionQueue*)> start_req,
Yang Gao6baa9b62015-03-17 10:49:39 -070098 std::function<void(grpc::Status, ResponseType*)> on_done)
Vijay Pai372fd872015-06-08 13:30:08 -070099 : ClientRpcContext(channel_id),
100 context_(),
Craig Tiller88568752015-03-04 10:50:43 -0800101 stub_(stub),
Vijay Pai64ac47f2015-02-26 17:59:51 -0800102 req_(req),
103 response_(),
Craig Tiller3676b382015-05-06 13:01:05 -0700104 next_state_(&ClientRpcContextUnaryImpl::RespDone),
Craig Tillera182bf12015-03-04 13:54:39 -0800105 callback_(on_done),
Vijay Pai372fd872015-06-08 13:30:08 -0700106 start_req_(start_req) {}
107 void Start(CompletionQueue* cq) GRPC_OVERRIDE {
108 start_ = Timer::Now();
109 response_reader_ = start_req_(stub_, &context_, req_, cq);
Craig Tiller3676b382015-05-06 13:01:05 -0700110 response_reader_->Finish(&response_, &status_, ClientRpcContext::tag(this));
111 }
vjpai3c110662015-02-27 07:17:16 -0800112 ~ClientRpcContextUnaryImpl() GRPC_OVERRIDE {}
Vijay Pai1da729a2015-03-23 12:52:56 -0700113 bool RunNextState(bool ok, Histogram* hist) GRPC_OVERRIDE {
vjpai46f65232015-03-23 10:10:27 -0700114 bool ret = (this->*next_state_)(ok);
115 if (!ret) {
116 hist->Add((Timer::Now() - start_) * 1e9);
117 }
118 return ret;
Vijay Pai64ac47f2015-02-26 17:59:51 -0800119 }
120
Vijay Pai372fd872015-06-08 13:30:08 -0700121 ClientRpcContext* StartNewClone() GRPC_OVERRIDE {
122 return new ClientRpcContextUnaryImpl(channel_id_, stub_, req_, start_req_,
123 callback_);
Craig Tilleref638392015-03-04 12:23:12 -0800124 }
125
Vijay Pai64ac47f2015-02-26 17:59:51 -0800126 private:
vjpai46f65232015-03-23 10:10:27 -0700127 bool RespDone(bool) {
Vijay Pai64ac47f2015-02-26 17:59:51 -0800128 next_state_ = &ClientRpcContextUnaryImpl::DoCallBack;
129 return false;
130 }
Vijay Paicf3fb092015-06-05 03:41:30 -0700131 bool DoCallBack(bool) {
Vijay Pai64ac47f2015-02-26 17:59:51 -0800132 callback_(status_, &response_);
Vijay Pai372fd872015-06-08 13:30:08 -0700133 return true; // we're done, this'll be ignored
Vijay Pai64ac47f2015-02-26 17:59:51 -0800134 }
135 grpc::ClientContext context_;
Yang Gao6baa9b62015-03-17 10:49:39 -0700136 TestService::Stub* stub_;
Vijay Pai64ac47f2015-02-26 17:59:51 -0800137 RequestType req_;
138 ResponseType response_;
vjpai46f65232015-03-23 10:10:27 -0700139 bool (ClientRpcContextUnaryImpl::*next_state_)(bool);
Yang Gao6baa9b62015-03-17 10:49:39 -0700140 std::function<void(grpc::Status, ResponseType*)> callback_;
Craig Tillera182bf12015-03-04 13:54:39 -0800141 std::function<std::unique_ptr<grpc::ClientAsyncResponseReader<ResponseType>>(
Vijay Pai372fd872015-06-08 13:30:08 -0700142 TestService::Stub*, grpc::ClientContext*, const RequestType&,
143 CompletionQueue*)> start_req_;
Vijay Pai64ac47f2015-02-26 17:59:51 -0800144 grpc::Status status_;
145 double start_;
146 std::unique_ptr<grpc::ClientAsyncResponseReader<ResponseType>>
147 response_reader_;
148};
vjpaidea740f2015-02-26 16:35:35 -0800149
Vijay Pai372fd872015-06-08 13:30:08 -0700150typedef std::forward_list<ClientRpcContext*> context_list;
151
Vijay Paie10ebf12015-04-30 13:12:31 -0700152class AsyncClient : public Client {
Craig Tiller88568752015-03-04 10:50:43 -0800153 public:
Vijay Pai372fd872015-06-08 13:30:08 -0700154 explicit AsyncClient(
155 const ClientConfig& config,
156 std::function<ClientRpcContext*(int, TestService::Stub*,
157 const SimpleRequest&)> setup_ctx)
158 : Client(config),
159 channel_lock_(config.client_channels()),
160 contexts_(config.client_channels()),
161 max_outstanding_per_channel_(config.outstanding_rpcs_per_channel()),
162 channel_count_(config.client_channels()),
163 pref_channel_inc_(config.async_client_threads()) {
164 SetupLoadTest(config, config.async_client_threads());
165
Craig Tiller88568752015-03-04 10:50:43 -0800166 for (int i = 0; i < config.async_client_threads(); i++) {
167 cli_cqs_.emplace_back(new CompletionQueue);
Vijay Pai372fd872015-06-08 13:30:08 -0700168 if (!closed_loop_) {
169 rpc_deadlines_.emplace_back();
170 next_channel_.push_back(i % channel_count_);
171 issue_allowed_.push_back(true);
172
173 grpc_time next_issue;
174 NextIssueTime(i, &next_issue);
175 next_issue_.push_back(next_issue);
176 }
Craig Tiller88568752015-03-04 10:50:43 -0800177 }
Vijay Pai372fd872015-06-08 13:30:08 -0700178
vjpai6a608022015-05-18 09:16:53 -0700179 int t = 0;
180 for (int i = 0; i < config.outstanding_rpcs_per_channel(); i++) {
Vijay Pai372fd872015-06-08 13:30:08 -0700181 for (int ch = 0; ch < channel_count_; ch++) {
Vijay Pai7b172b22015-06-05 02:03:18 -0700182 auto* cq = cli_cqs_[t].get();
183 t = (t + 1) % cli_cqs_.size();
Vijay Pai372fd872015-06-08 13:30:08 -0700184 auto ctx = setup_ctx(ch, channels_[ch].get_stub(), request_);
185 if (closed_loop_) {
186 ctx->Start(cq);
187 } else {
188 contexts_[ch].push_front(ctx);
189 }
Craig Tiller88568752015-03-04 10:50:43 -0800190 }
191 }
Craig Tiller88568752015-03-04 10:50:43 -0800192 }
Vijay Paie10ebf12015-04-30 13:12:31 -0700193 virtual ~AsyncClient() {
Vijay Pai82dd80a2015-03-24 10:36:08 -0700194 for (auto cq = cli_cqs_.begin(); cq != cli_cqs_.end(); cq++) {
195 (*cq)->Shutdown();
Yang Gao6baa9b62015-03-17 10:49:39 -0700196 void* got_tag;
Craig Tilleref638392015-03-04 12:23:12 -0800197 bool ok;
Vijay Pai82dd80a2015-03-24 10:36:08 -0700198 while ((*cq)->Next(&got_tag, &ok)) {
Craig Tilleref638392015-03-04 12:23:12 -0800199 delete ClientRpcContext::detag(got_tag);
200 }
201 }
202 }
203
Craig Tiller5c8737d2015-05-21 11:42:17 -0700204 bool ThreadFunc(Histogram* histogram,
205 size_t thread_idx) GRPC_OVERRIDE GRPC_FINAL {
Yang Gao6baa9b62015-03-17 10:49:39 -0700206 void* got_tag;
Craig Tiller88568752015-03-04 10:50:43 -0800207 bool ok;
Vijay Pai372fd872015-06-08 13:30:08 -0700208 grpc_time deadline, short_deadline;
209 if (closed_loop_) {
210 deadline = grpc_time_source::now() + std::chrono::seconds(1);
211 short_deadline = deadline;
212 } else {
213 if (rpc_deadlines_[thread_idx].empty()) {
214 deadline = grpc_time_source::now() + std::chrono::seconds(1);
215 } else {
216 deadline = *(rpc_deadlines_[thread_idx].begin());
217 }
218 short_deadline =
219 issue_allowed_[thread_idx] ? next_issue_[thread_idx] : deadline;
220 }
221
222 bool got_event;
223
224 switch (cli_cqs_[thread_idx]->AsyncNext(&got_tag, &ok, short_deadline)) {
Vijay Paicf3fb092015-06-05 03:41:30 -0700225 case CompletionQueue::SHUTDOWN:
226 return false;
vjpai37f72572015-05-12 10:29:07 -0700227 case CompletionQueue::TIMEOUT:
Vijay Pai372fd872015-06-08 13:30:08 -0700228 got_event = false;
229 break;
vjpai37f72572015-05-12 10:29:07 -0700230 case CompletionQueue::GOT_EVENT:
Vijay Pai372fd872015-06-08 13:30:08 -0700231 got_event = true;
232 break;
233 default:
234 GPR_ASSERT(false);
Vijay Pai9dc5c152015-06-03 11:34:53 -0700235 break;
Craig Tiller1c61af72015-04-09 12:08:44 -0700236 }
Vijay Pai372fd872015-06-08 13:30:08 -0700237 if ((closed_loop_ || !rpc_deadlines_[thread_idx].empty()) &&
238 grpc_time_source::now() > deadline) {
Vijay Paid9c7e7e2015-06-08 13:25:52 -0700239 // we have missed some 1-second deadline, which is worth noting
240 gpr_log(GPR_INFO, "Missed an RPC deadline");
241 // Don't give up, as there might be some truly heavy tails
Vijay Pai8eedd4a2015-06-03 15:39:35 -0700242 }
Vijay Pai372fd872015-06-08 13:30:08 -0700243 if (got_event) {
244 ClientRpcContext* ctx = ClientRpcContext::detag(got_tag);
245 if (ctx->RunNextState(ok, histogram) == false) {
246 // call the callback and then clone the ctx
247 ctx->RunNextState(ok, histogram);
248 ClientRpcContext* clone_ctx = ctx->StartNewClone();
249 if (closed_loop_) {
250 clone_ctx->Start(cli_cqs_[thread_idx].get());
251 } else {
252 // Remove the entry from the rpc deadlines list
253 rpc_deadlines_[thread_idx].erase(ctx->deadline_posn());
254 // Put the clone_ctx in the list of idle contexts for this channel
255 // Under lock
256 int ch = clone_ctx->channel_id();
257 std::lock_guard<std::mutex> g(channel_lock_[ch]);
258 contexts_[ch].push_front(clone_ctx);
259 }
260 // delete the old version
261 delete ctx;
262 }
263 if (!closed_loop_)
264 issue_allowed_[thread_idx] =
265 true; // may be ok now even if it hadn't been
266 }
267 if (!closed_loop_ && issue_allowed_[thread_idx] &&
268 grpc_time_source::now() >= next_issue_[thread_idx]) {
269 // Attempt to issue
270 bool issued = false;
271 for (int num_attempts = 0, channel_attempt = next_channel_[thread_idx];
272 num_attempts < channel_count_ && !issued; num_attempts++) {
273 bool can_issue = false;
274 ClientRpcContext* ctx = nullptr;
275 {
276 std::lock_guard<std::mutex> g(channel_lock_[channel_attempt]);
277 if (!contexts_[channel_attempt].empty()) {
278 // Get an idle context from the front of the list
279 ctx = *(contexts_[channel_attempt].begin());
280 contexts_[channel_attempt].pop_front();
281 can_issue = true;
282 }
283 }
284 if (can_issue) {
285 // do the work to issue
286 rpc_deadlines_[thread_idx].emplace_back(grpc_time_source::now() +
287 std::chrono::seconds(1));
288 auto it = rpc_deadlines_[thread_idx].end();
289 --it;
290 ctx->set_deadline_posn(it);
291 ctx->Start(cli_cqs_[thread_idx].get());
292 issued = true;
293 // If we did issue, then next time, try our thread's next
294 // preferred channel
295 next_channel_[thread_idx] += pref_channel_inc_;
296 if (next_channel_[thread_idx] >= channel_count_)
297 next_channel_[thread_idx] = (thread_idx % channel_count_);
298 } else {
299 // Do a modular increment of channel attempt if we couldn't issue
300 channel_attempt = (channel_attempt + 1) % channel_count_;
301 }
302 }
303 if (issued) {
304 // We issued one; see when we can issue the next
305 grpc_time next_issue;
306 NextIssueTime(thread_idx, &next_issue);
307 next_issue_[thread_idx] = next_issue;
308 } else {
309 issue_allowed_[thread_idx] = false;
310 }
311 }
Vijay Paicf3fb092015-06-05 03:41:30 -0700312 return true;
Craig Tiller88568752015-03-04 10:50:43 -0800313 }
Craig Tiller5c8737d2015-05-21 11:42:17 -0700314
Vijay Paie10ebf12015-04-30 13:12:31 -0700315 private:
Craig Tiller88568752015-03-04 10:50:43 -0800316 std::vector<std::unique_ptr<CompletionQueue>> cli_cqs_;
Vijay Pai372fd872015-06-08 13:30:08 -0700317
318 std::vector<deadline_list> rpc_deadlines_; // per thread deadlines
319 std::vector<int> next_channel_; // per thread round-robin channel ctr
320 std::vector<bool> issue_allowed_; // may this thread attempt to issue
321 std::vector<grpc_time> next_issue_; // when should it issue?
322
323 std::vector<std::mutex> channel_lock_;
324 std::vector<context_list> contexts_; // per-channel list of idle contexts
325 int max_outstanding_per_channel_;
326 int channel_count_;
327 int pref_channel_inc_;
Craig Tiller88568752015-03-04 10:50:43 -0800328};
329
Vijay Paie10ebf12015-04-30 13:12:31 -0700330class AsyncUnaryClient GRPC_FINAL : public AsyncClient {
331 public:
Craig Tiller5c8737d2015-05-21 11:42:17 -0700332 explicit AsyncUnaryClient(const ClientConfig& config)
333 : AsyncClient(config, SetupCtx) {
Vijay Paie10ebf12015-04-30 13:12:31 -0700334 StartThreads(config.async_client_threads());
335 }
336 ~AsyncUnaryClient() GRPC_OVERRIDE { EndThreads(); }
Vijay Paicf3fb092015-06-05 03:41:30 -0700337
338 private:
Vijay Pai372fd872015-06-08 13:30:08 -0700339 static ClientRpcContext* SetupCtx(int channel_id, TestService::Stub* stub,
340 const SimpleRequest& req) {
Vijay Paie10ebf12015-04-30 13:12:31 -0700341 auto check_done = [](grpc::Status s, SimpleResponse* response) {};
Vijay Pai372fd872015-06-08 13:30:08 -0700342 auto start_req = [](TestService::Stub* stub, grpc::ClientContext* ctx,
343 const SimpleRequest& request, CompletionQueue* cq) {
Craig Tiller3676b382015-05-06 13:01:05 -0700344 return stub->AsyncUnaryCall(ctx, request, cq);
Vijay Paie10ebf12015-04-30 13:12:31 -0700345 };
Vijay Pai372fd872015-06-08 13:30:08 -0700346 return new ClientRpcContextUnaryImpl<SimpleRequest, SimpleResponse>(
347 channel_id, stub, req, start_req, check_done);
Vijay Paie10ebf12015-04-30 13:12:31 -0700348 }
349};
350
vjpai46f65232015-03-23 10:10:27 -0700351template <class RequestType, class ResponseType>
352class ClientRpcContextStreamingImpl : public ClientRpcContext {
353 public:
Vijay Paicf3fb092015-06-05 03:41:30 -0700354 ClientRpcContextStreamingImpl(
Vijay Pai372fd872015-06-08 13:30:08 -0700355 int channel_id, TestService::Stub* stub, const RequestType& req,
356 std::function<std::unique_ptr<grpc::ClientAsyncReaderWriter<
357 RequestType, ResponseType>>(TestService::Stub*, grpc::ClientContext*,
358 CompletionQueue*, void*)> start_req,
Craig Tiller5c8737d2015-05-21 11:42:17 -0700359 std::function<void(grpc::Status, ResponseType*)> on_done)
Vijay Pai372fd872015-06-08 13:30:08 -0700360 : ClientRpcContext(channel_id),
361 context_(),
vjpai46f65232015-03-23 10:10:27 -0700362 stub_(stub),
363 req_(req),
364 response_(),
365 next_state_(&ClientRpcContextStreamingImpl::ReqSent),
366 callback_(on_done),
367 start_req_(start_req),
Vijay Pai372fd872015-06-08 13:30:08 -0700368 start_(Timer::Now()) {}
vjpai46f65232015-03-23 10:10:27 -0700369 ~ClientRpcContextStreamingImpl() GRPC_OVERRIDE {}
Craig Tiller5c8737d2015-05-21 11:42:17 -0700370 bool RunNextState(bool ok, Histogram* hist) GRPC_OVERRIDE {
vjpai46f65232015-03-23 10:10:27 -0700371 return (this->*next_state_)(ok, hist);
372 }
Vijay Pai372fd872015-06-08 13:30:08 -0700373 ClientRpcContext* StartNewClone() GRPC_OVERRIDE {
374 return new ClientRpcContextStreamingImpl(channel_id_, stub_, req_,
375 start_req_, callback_);
376 }
377 void Start(CompletionQueue* cq) GRPC_OVERRIDE {
378 stream_ = start_req_(stub_, &context_, cq, ClientRpcContext::tag(this));
Vijay Pai7b172b22015-06-05 02:03:18 -0700379 }
Vijay Paicf3fb092015-06-05 03:41:30 -0700380
vjpai46f65232015-03-23 10:10:27 -0700381 private:
Craig Tiller5c8737d2015-05-21 11:42:17 -0700382 bool ReqSent(bool ok, Histogram*) { return StartWrite(ok); }
vjpai46f65232015-03-23 10:10:27 -0700383 bool StartWrite(bool ok) {
384 if (!ok) {
Craig Tiller5c8737d2015-05-21 11:42:17 -0700385 return (false);
vjpai46f65232015-03-23 10:10:27 -0700386 }
387 start_ = Timer::Now();
388 next_state_ = &ClientRpcContextStreamingImpl::WriteDone;
389 stream_->Write(req_, ClientRpcContext::tag(this));
390 return true;
391 }
Craig Tiller5c8737d2015-05-21 11:42:17 -0700392 bool WriteDone(bool ok, Histogram*) {
vjpai46f65232015-03-23 10:10:27 -0700393 if (!ok) {
Craig Tiller5c8737d2015-05-21 11:42:17 -0700394 return (false);
vjpai46f65232015-03-23 10:10:27 -0700395 }
396 next_state_ = &ClientRpcContextStreamingImpl::ReadDone;
Vijay Paie10ebf12015-04-30 13:12:31 -0700397 stream_->Read(&response_, ClientRpcContext::tag(this));
vjpai46f65232015-03-23 10:10:27 -0700398 return true;
399 }
Craig Tiller5c8737d2015-05-21 11:42:17 -0700400 bool ReadDone(bool ok, Histogram* hist) {
vjpai46f65232015-03-23 10:10:27 -0700401 hist->Add((Timer::Now() - start_) * 1e9);
402 return StartWrite(ok);
403 }
404 grpc::ClientContext context_;
Craig Tiller5c8737d2015-05-21 11:42:17 -0700405 TestService::Stub* stub_;
vjpai46f65232015-03-23 10:10:27 -0700406 RequestType req_;
407 ResponseType response_;
Craig Tiller5c8737d2015-05-21 11:42:17 -0700408 bool (ClientRpcContextStreamingImpl::*next_state_)(bool, Histogram*);
409 std::function<void(grpc::Status, ResponseType*)> callback_;
410 std::function<
411 std::unique_ptr<grpc::ClientAsyncReaderWriter<RequestType, ResponseType>>(
Vijay Pai372fd872015-06-08 13:30:08 -0700412 TestService::Stub*, grpc::ClientContext*, CompletionQueue*, void*)>
413 start_req_;
vjpai46f65232015-03-23 10:10:27 -0700414 grpc::Status status_;
415 double start_;
Craig Tiller5c8737d2015-05-21 11:42:17 -0700416 std::unique_ptr<grpc::ClientAsyncReaderWriter<RequestType, ResponseType>>
417 stream_;
vjpai46f65232015-03-23 10:10:27 -0700418};
419
Vijay Paie10ebf12015-04-30 13:12:31 -0700420class AsyncStreamingClient GRPC_FINAL : public AsyncClient {
vjpai46f65232015-03-23 10:10:27 -0700421 public:
Craig Tiller5c8737d2015-05-21 11:42:17 -0700422 explicit AsyncStreamingClient(const ClientConfig& config)
423 : AsyncClient(config, SetupCtx) {
Vijay Pai372fd872015-06-08 13:30:08 -0700424 // async streaming currently only supported closed loop
425 GPR_ASSERT(config.load_type() == CLOSED_LOOP);
426
vjpai46f65232015-03-23 10:10:27 -0700427 StartThreads(config.async_client_threads());
428 }
429
Vijay Paie10ebf12015-04-30 13:12:31 -0700430 ~AsyncStreamingClient() GRPC_OVERRIDE { EndThreads(); }
Vijay Paicf3fb092015-06-05 03:41:30 -0700431
432 private:
Vijay Pai372fd872015-06-08 13:30:08 -0700433 static ClientRpcContext* SetupCtx(int channel_id, TestService::Stub* stub,
434 const SimpleRequest& req) {
Vijay Paie10ebf12015-04-30 13:12:31 -0700435 auto check_done = [](grpc::Status s, SimpleResponse* response) {};
Vijay Pai372fd872015-06-08 13:30:08 -0700436 auto start_req = [](TestService::Stub* stub, grpc::ClientContext* ctx,
437 CompletionQueue* cq, void* tag) {
Vijay Paie10ebf12015-04-30 13:12:31 -0700438 auto stream = stub->AsyncStreamingCall(ctx, cq, tag);
439 return stream;
440 };
Vijay Pai372fd872015-06-08 13:30:08 -0700441 return new ClientRpcContextStreamingImpl<SimpleRequest, SimpleResponse>(
442 channel_id, stub, req, start_req, check_done);
vjpai46f65232015-03-23 10:10:27 -0700443 }
vjpai46f65232015-03-23 10:10:27 -0700444};
445
Vijay Pai1da729a2015-03-23 12:52:56 -0700446std::unique_ptr<Client> CreateAsyncUnaryClient(const ClientConfig& args) {
vjpai46f65232015-03-23 10:10:27 -0700447 return std::unique_ptr<Client>(new AsyncUnaryClient(args));
448}
Vijay Pai1da729a2015-03-23 12:52:56 -0700449std::unique_ptr<Client> CreateAsyncStreamingClient(const ClientConfig& args) {
vjpai46f65232015-03-23 10:10:27 -0700450 return std::unique_ptr<Client>(new AsyncStreamingClient(args));
Craig Tiller88568752015-03-04 10:50:43 -0800451}
452
Craig Tillera182bf12015-03-04 13:54:39 -0800453} // namespace testing
454} // namespace grpc