blob: 7ad0f31575e34cf3629bd85e0016f677ca0e6cef [file] [log] [blame]
Yang Gaoa4002072015-04-09 23:25:21 -07001/*
2 *
3 * Copyright 2015, Google Inc.
4 * All rights reserved.
5 *
6 * Redistribution and use in source and binary forms, with or without
7 * modification, are permitted provided that the following conditions are
8 * met:
9 *
10 * * Redistributions of source code must retain the above copyright
11 * notice, this list of conditions and the following disclaimer.
12 * * Redistributions in binary form must reproduce the above
13 * copyright notice, this list of conditions and the following disclaimer
14 * in the documentation and/or other materials provided with the
15 * distribution.
16 * * Neither the name of Google Inc. nor the names of its
17 * contributors may be used to endorse or promote products derived from
18 * this software without specific prior written permission.
19 *
20 * THIS SOFTWARE IS PROVIDED BY THE COPYRIGHT HOLDERS AND CONTRIBUTORS
21 * "AS IS" AND ANY EXPRESS OR IMPLIED WARRANTIES, INCLUDING, BUT NOT
22 * LIMITED TO, THE IMPLIED WARRANTIES OF MERCHANTABILITY AND FITNESS FOR
23 * A PARTICULAR PURPOSE ARE DISCLAIMED. IN NO EVENT SHALL THE COPYRIGHT
24 * OWNER OR CONTRIBUTORS BE LIABLE FOR ANY DIRECT, INDIRECT, INCIDENTAL,
25 * SPECIAL, EXEMPLARY, OR CONSEQUENTIAL DAMAGES (INCLUDING, BUT NOT
26 * LIMITED TO, PROCUREMENT OF SUBSTITUTE GOODS OR SERVICES; LOSS OF USE,
27 * DATA, OR PROFITS; OR BUSINESS INTERRUPTION) HOWEVER CAUSED AND ON ANY
28 * THEORY OF LIABILITY, WHETHER IN CONTRACT, STRICT LIABILITY, OR TORT
29 * (INCLUDING NEGLIGENCE OR OTHERWISE) ARISING IN ANY WAY OUT OF THE USE
30 * OF THIS SOFTWARE, EVEN IF ADVISED OF THE POSSIBILITY OF SUCH DAMAGE.
31 *
32 */
33
34#include "test/cpp/interop/interop_client.h"
35
David Garcia Quintasc8993192015-07-22 09:10:39 -070036#include <fstream>
Yang Gaoa4002072015-04-09 23:25:21 -070037#include <memory>
38
39#include <unistd.h>
40
41#include <grpc/grpc.h>
42#include <grpc/support/log.h>
David Garcia Quintasc8993192015-07-22 09:10:39 -070043#include <grpc/support/string_util.h>
Yang Gaoa4002072015-04-09 23:25:21 -070044#include <grpc++/channel_interface.h>
45#include <grpc++/client_context.h>
yang-g5bf510b2015-07-14 10:54:29 -070046#include <grpc++/credentials.h>
Yang Gaoa4002072015-04-09 23:25:21 -070047#include <grpc++/status.h>
48#include <grpc++/stream.h>
David Garcia Quintas80f39952015-07-21 16:07:36 -070049
50#include "test/cpp/interop/client_helper.h"
Abhishek Kumar1b3e3cd2015-04-16 20:10:29 -070051#include "test/proto/test.grpc.pb.h"
Abhishek Kumar60572d42015-04-16 20:45:25 -070052#include "test/proto/empty.grpc.pb.h"
53#include "test/proto/messages.grpc.pb.h"
David Garcia Quintasc8993192015-07-22 09:10:39 -070054#include "src/core/transport/stream_op.h"
Yang Gaoa4002072015-04-09 23:25:21 -070055
56namespace grpc {
57namespace testing {
58
// Relative path of the file whose bytes are compared against UNCOMPRESSABLE
// response payloads.
static const char* kRandomFile = "test/cpp/interop/rnd.dat";

namespace {
// Per-message payload sizes used by the streaming tests. The same values are
// defined by the Java client.
const std::vector<int> request_stream_sizes = {27182, 8, 1828, 45904};
const std::vector<int> response_stream_sizes = {31415, 9, 2653, 58979};

// Parameters of the slow-consumer response streaming test: how many messages
// are requested, the size of each, and the pause taken after every read.
const int kNumResponseMessages = 2000;
const int kResponseMessageSize = 1030;
const int kReceiveDelayMilliSeconds = 20;

// Request/response payload sizes used by the large unary tests.
const int kLargeRequestSize = 271828;
const int kLargeResponseSize = 314159;
}  // namespace
71
72InteropClient::InteropClient(std::shared_ptr<ChannelInterface> channel)
73 : channel_(channel) {}
74
75void InteropClient::AssertOkOrPrintErrorStatus(const Status& s) {
Yang Gaoc1a2c312015-06-16 10:59:46 -070076 if (s.ok()) {
Yang Gaoa4002072015-04-09 23:25:21 -070077 return;
78 }
Yang Gaoc1a2c312015-06-16 10:59:46 -070079 gpr_log(GPR_INFO, "Error status code: %d, message: %s", s.error_code(),
80 s.error_message().c_str());
Yang Gaoa4002072015-04-09 23:25:21 -070081 GPR_ASSERT(0);
82}
83
84void InteropClient::DoEmpty() {
85 gpr_log(GPR_INFO, "Sending an empty rpc...");
86 std::unique_ptr<TestService::Stub> stub(TestService::NewStub(channel_));
87
88 Empty request = Empty::default_instance();
89 Empty response = Empty::default_instance();
90 ClientContext context;
91
92 Status s = stub->EmptyCall(&context, request, &response);
93 AssertOkOrPrintErrorStatus(s);
94
95 gpr_log(GPR_INFO, "Empty rpc done.");
96}
97
98// Shared code to set large payload, make rpc and check response payload.
99void InteropClient::PerformLargeUnary(SimpleRequest* request,
100 SimpleResponse* response) {
101 std::unique_ptr<TestService::Stub> stub(TestService::NewStub(channel_));
102
103 ClientContext context;
David Garcia Quintascd37d582015-08-09 15:50:21 -0700104 request->set_response_type(PayloadType::COMPRESSABLE);
Yang Gaoa4002072015-04-09 23:25:21 -0700105 request->set_response_size(kLargeResponseSize);
106 grpc::string payload(kLargeRequestSize, '\0');
107 request->mutable_payload()->set_body(payload.c_str(), kLargeRequestSize);
108
109 Status s = stub->UnaryCall(&context, *request, response);
110
David Garcia Quintascd37d582015-08-09 15:50:21 -0700111 AssertOkOrPrintErrorStatus(s);
112}
113
114// Shared code to set large payload, make rpc and check response payload.
115void InteropClient::PerformLargeCompressedUnary(SimpleRequest* request,
116 SimpleResponse* response) {
117 std::unique_ptr<TestService::Stub> stub(TestService::NewStub(channel_));
118
119 ClientContext context;
120 InteropClientContextInspector inspector(context);
121 request->set_response_size(kLargeResponseSize);
122 grpc::string payload(kLargeRequestSize, '\0');
123 request->mutable_payload()->set_body(payload.c_str(), kLargeRequestSize);
124
125 Status s = stub->CompressedUnaryCall(&context, *request, response);
126
David Garcia Quintasc8993192015-07-22 09:10:39 -0700127 // Compression related checks.
David Garcia Quintas80f39952015-07-21 16:07:36 -0700128 GPR_ASSERT(request->response_compression() ==
129 GetInteropCompressionTypeFromCompressionAlgorithm(
130 inspector.GetCallCompressionAlgorithm()));
David Garcia Quintasc8993192015-07-22 09:10:39 -0700131 if (request->response_compression() == NONE) {
132 GPR_ASSERT(!(inspector.GetMessageFlags() & GRPC_WRITE_INTERNAL_COMPRESS));
133 } else if (request->response_type() == PayloadType::COMPRESSABLE) {
134 // requested compression and compressable response => results should always
135 // be compressed.
136 GPR_ASSERT(inspector.GetMessageFlags() & GRPC_WRITE_INTERNAL_COMPRESS);
137 }
138
Yang Gaoa4002072015-04-09 23:25:21 -0700139 AssertOkOrPrintErrorStatus(s);
David Garcia Quintasc8993192015-07-22 09:10:39 -0700140
141 // Payload related checks.
142 if (request->response_type() != PayloadType::RANDOM) {
143 GPR_ASSERT(response->payload().type() == request->response_type());
144 }
145 switch (response->payload().type()) {
146 case PayloadType::COMPRESSABLE:
147 GPR_ASSERT(response->payload().body() ==
148 grpc::string(kLargeResponseSize, '\0'));
149 break;
150 case PayloadType::UNCOMPRESSABLE: {
151 std::ifstream rnd_file(kRandomFile);
152 GPR_ASSERT(rnd_file.good());
153 for (int i = 0; i < kLargeResponseSize; i++) {
154 GPR_ASSERT(response->payload().body()[i] == (char)rnd_file.get());
155 }
156 }
157 break;
158 default:
159 GPR_ASSERT(false);
160 }
Yang Gaoa4002072015-04-09 23:25:21 -0700161}
162
163void InteropClient::DoComputeEngineCreds(
164 const grpc::string& default_service_account,
165 const grpc::string& oauth_scope) {
166 gpr_log(GPR_INFO,
167 "Sending a large unary rpc with compute engine credentials ...");
168 SimpleRequest request;
169 SimpleResponse response;
170 request.set_fill_username(true);
171 request.set_fill_oauth_scope(true);
David Garcia Quintas80f39952015-07-21 16:07:36 -0700172 request.set_response_type(PayloadType::COMPRESSABLE);
Yang Gaoa4002072015-04-09 23:25:21 -0700173 PerformLargeUnary(&request, &response);
174 gpr_log(GPR_INFO, "Got username %s", response.username().c_str());
175 gpr_log(GPR_INFO, "Got oauth_scope %s", response.oauth_scope().c_str());
176 GPR_ASSERT(!response.username().empty());
177 GPR_ASSERT(response.username().c_str() == default_service_account);
178 GPR_ASSERT(!response.oauth_scope().empty());
179 const char* oauth_scope_str = response.oauth_scope().c_str();
180 GPR_ASSERT(oauth_scope.find(oauth_scope_str) != grpc::string::npos);
181 gpr_log(GPR_INFO, "Large unary with compute engine creds done.");
182}
183
184void InteropClient::DoServiceAccountCreds(const grpc::string& username,
185 const grpc::string& oauth_scope) {
186 gpr_log(GPR_INFO,
187 "Sending a large unary rpc with service account credentials ...");
188 SimpleRequest request;
189 SimpleResponse response;
190 request.set_fill_username(true);
191 request.set_fill_oauth_scope(true);
David Garcia Quintas80f39952015-07-21 16:07:36 -0700192 request.set_response_type(PayloadType::COMPRESSABLE);
Yang Gaoa4002072015-04-09 23:25:21 -0700193 PerformLargeUnary(&request, &response);
194 GPR_ASSERT(!response.username().empty());
195 GPR_ASSERT(!response.oauth_scope().empty());
196 GPR_ASSERT(username.find(response.username()) != grpc::string::npos);
197 const char* oauth_scope_str = response.oauth_scope().c_str();
198 GPR_ASSERT(oauth_scope.find(oauth_scope_str) != grpc::string::npos);
199 gpr_log(GPR_INFO, "Large unary with service account creds done.");
200}
201
yang-gbe5f0592015-07-13 11:11:50 -0700202void InteropClient::DoOauth2AuthToken(const grpc::string& username,
203 const grpc::string& oauth_scope) {
204 gpr_log(GPR_INFO,
yang-g463cde72015-07-17 15:21:39 -0700205 "Sending a unary rpc with raw oauth2 access token credentials ...");
yang-gbe5f0592015-07-13 11:11:50 -0700206 SimpleRequest request;
207 SimpleResponse response;
208 request.set_fill_username(true);
209 request.set_fill_oauth_scope(true);
yang-g463cde72015-07-17 15:21:39 -0700210 std::unique_ptr<TestService::Stub> stub(TestService::NewStub(channel_));
211
212 ClientContext context;
213
214 Status s = stub->UnaryCall(&context, request, &response);
215
216 AssertOkOrPrintErrorStatus(s);
yang-gbe5f0592015-07-13 11:11:50 -0700217 GPR_ASSERT(!response.username().empty());
218 GPR_ASSERT(!response.oauth_scope().empty());
219 GPR_ASSERT(username.find(response.username()) != grpc::string::npos);
220 const char* oauth_scope_str = response.oauth_scope().c_str();
221 GPR_ASSERT(oauth_scope.find(oauth_scope_str) != grpc::string::npos);
yang-g463cde72015-07-17 15:21:39 -0700222 gpr_log(GPR_INFO, "Unary with oauth2 access token credentials done.");
yang-gbe5f0592015-07-13 11:11:50 -0700223}
224
yang-g5bf510b2015-07-14 10:54:29 -0700225void InteropClient::DoPerRpcCreds(const grpc::string& username,
226 const grpc::string& oauth_scope) {
227 gpr_log(GPR_INFO,
yang-g8c31ee22015-07-15 13:36:27 -0700228 "Sending a unary rpc with per-rpc raw oauth2 access token ...");
yang-g5bf510b2015-07-14 10:54:29 -0700229 SimpleRequest request;
230 SimpleResponse response;
231 request.set_fill_username(true);
232 request.set_fill_oauth_scope(true);
233 std::unique_ptr<TestService::Stub> stub(TestService::NewStub(channel_));
234
235 ClientContext context;
236 grpc::string access_token = GetOauth2AccessToken();
237 std::shared_ptr<Credentials> creds = AccessTokenCredentials(access_token);
238 context.set_credentials(creds);
yang-g5bf510b2015-07-14 10:54:29 -0700239
240 Status s = stub->UnaryCall(&context, request, &response);
241
242 AssertOkOrPrintErrorStatus(s);
yang-g5bf510b2015-07-14 10:54:29 -0700243 GPR_ASSERT(!response.username().empty());
244 GPR_ASSERT(!response.oauth_scope().empty());
245 GPR_ASSERT(username.find(response.username()) != grpc::string::npos);
246 const char* oauth_scope_str = response.oauth_scope().c_str();
247 GPR_ASSERT(oauth_scope.find(oauth_scope_str) != grpc::string::npos);
yang-g8c31ee22015-07-15 13:36:27 -0700248 gpr_log(GPR_INFO, "Unary with per-rpc oauth2 access token done.");
yang-g5bf510b2015-07-14 10:54:29 -0700249}
250
Yang Gaoa4002072015-04-09 23:25:21 -0700251void InteropClient::DoJwtTokenCreds(const grpc::string& username) {
252 gpr_log(GPR_INFO, "Sending a large unary rpc with JWT token credentials ...");
253 SimpleRequest request;
254 SimpleResponse response;
255 request.set_fill_username(true);
David Garcia Quintas80f39952015-07-21 16:07:36 -0700256 request.set_response_type(PayloadType::COMPRESSABLE);
Yang Gaoa4002072015-04-09 23:25:21 -0700257 PerformLargeUnary(&request, &response);
258 GPR_ASSERT(!response.username().empty());
259 GPR_ASSERT(username.find(response.username()) != grpc::string::npos);
260 gpr_log(GPR_INFO, "Large unary with JWT token creds done.");
261}
262
263void InteropClient::DoLargeUnary() {
David Garcia Quintascd37d582015-08-09 15:50:21 -0700264 gpr_log(GPR_INFO, "Sending a large unary rpc...");
265 SimpleRequest request;
266 SimpleResponse response;
267 PerformLargeUnary(&request, &response);
268 gpr_log(GPR_INFO, "Large unary done.");
269}
270
271void InteropClient::DoLargeCompressedUnary() {
David Garcia Quintas80f39952015-07-21 16:07:36 -0700272 const CompressionType compression_types[] = {NONE, GZIP, DEFLATE};
273 const PayloadType payload_types[] = {COMPRESSABLE, UNCOMPRESSABLE, RANDOM};
274 for (const auto payload_type : payload_types) {
275 for (const auto compression_type : compression_types) {
David Garcia Quintasc8993192015-07-22 09:10:39 -0700276 char* log_suffix;
277 gpr_asprintf(&log_suffix, "(compression=%s; payload=%s)",
278 CompressionType_Name(compression_type).c_str(),
279 PayloadType_Name(payload_type).c_str());
280
281 gpr_log(GPR_INFO, "Sending a large unary rpc %s.", log_suffix);
David Garcia Quintas80f39952015-07-21 16:07:36 -0700282 SimpleRequest request;
283 SimpleResponse response;
284 request.set_response_type(payload_type);
285 request.set_response_compression(compression_type);
286 PerformLargeUnary(&request, &response);
David Garcia Quintasc8993192015-07-22 09:10:39 -0700287 gpr_log(GPR_INFO, "Large unary done %s.", log_suffix);
288 gpr_free(log_suffix);
David Garcia Quintas80f39952015-07-21 16:07:36 -0700289 }
290 }
Yang Gaoa4002072015-04-09 23:25:21 -0700291}
292
293void InteropClient::DoRequestStreaming() {
294 gpr_log(GPR_INFO, "Sending request steaming rpc ...");
295 std::unique_ptr<TestService::Stub> stub(TestService::NewStub(channel_));
296
297 ClientContext context;
298 StreamingInputCallRequest request;
299 StreamingInputCallResponse response;
300
301 std::unique_ptr<ClientWriter<StreamingInputCallRequest>> stream(
302 stub->StreamingInputCall(&context, &response));
303
304 int aggregated_payload_size = 0;
305 for (unsigned int i = 0; i < request_stream_sizes.size(); ++i) {
306 Payload* payload = request.mutable_payload();
307 payload->set_body(grpc::string(request_stream_sizes[i], '\0'));
308 GPR_ASSERT(stream->Write(request));
309 aggregated_payload_size += request_stream_sizes[i];
310 }
311 stream->WritesDone();
312 Status s = stream->Finish();
313
314 GPR_ASSERT(response.aggregated_payload_size() == aggregated_payload_size);
315 AssertOkOrPrintErrorStatus(s);
316 gpr_log(GPR_INFO, "Request streaming done.");
317}
318
319void InteropClient::DoResponseStreaming() {
David Garcia Quintascd37d582015-08-09 15:50:21 -0700320 gpr_log(GPR_INFO, "Receiving response steaming rpc ...");
321 std::unique_ptr<TestService::Stub> stub(TestService::NewStub(channel_));
322
323 ClientContext context;
324 StreamingOutputCallRequest request;
325 for (unsigned int i = 0; i < response_stream_sizes.size(); ++i) {
326 ResponseParameters* response_parameter = request.add_response_parameters();
327 response_parameter->set_size(response_stream_sizes[i]);
328 }
329 StreamingOutputCallResponse response;
330 std::unique_ptr<ClientReader<StreamingOutputCallResponse>> stream(
331 stub->StreamingOutputCall(&context, request));
332
333 unsigned int i = 0;
334 while (stream->Read(&response)) {
335 GPR_ASSERT(response.payload().body() ==
336 grpc::string(response_stream_sizes[i], '\0'));
337 ++i;
338 }
339 GPR_ASSERT(response_stream_sizes.size() == i);
340 Status s = stream->Finish();
341 AssertOkOrPrintErrorStatus(s);
342 gpr_log(GPR_INFO, "Response streaming done.");
343}
344
345void InteropClient::DoResponseCompressedStreaming() {
Yang Gaoa4002072015-04-09 23:25:21 -0700346 std::unique_ptr<TestService::Stub> stub(TestService::NewStub(channel_));
347
David Garcia Quintasc8993192015-07-22 09:10:39 -0700348 const CompressionType compression_types[] = {NONE, GZIP, DEFLATE};
349 const PayloadType payload_types[] = {COMPRESSABLE, UNCOMPRESSABLE, RANDOM};
350 for (const auto payload_type : payload_types) {
351 for (const auto compression_type : compression_types) {
352 ClientContext context;
353 InteropClientContextInspector inspector(context);
354 StreamingOutputCallRequest request;
David Garcia Quintas80f39952015-07-21 16:07:36 -0700355
David Garcia Quintasc8993192015-07-22 09:10:39 -0700356 char* log_suffix;
357 gpr_asprintf(&log_suffix, "(compression=%s; payload=%s)",
358 CompressionType_Name(compression_type).c_str(),
359 PayloadType_Name(payload_type).c_str());
360
361 gpr_log(GPR_INFO, "Receiving response steaming rpc %s.", log_suffix);
362
363 request.set_response_type(payload_type);
364 request.set_response_compression(compression_type);
365
366 for (unsigned int i = 0; i < response_stream_sizes.size(); ++i) {
367 ResponseParameters* response_parameter =
368 request.add_response_parameters();
369 response_parameter->set_size(response_stream_sizes[i]);
370 }
371 StreamingOutputCallResponse response;
372
373 std::unique_ptr<ClientReader<StreamingOutputCallResponse>> stream(
374 stub->StreamingOutputCall(&context, request));
375
376 unsigned int i = 0;
377 while (stream->Read(&response)) {
378 GPR_ASSERT(response.payload().body() ==
379 grpc::string(response_stream_sizes[i], '\0'));
380
381 // Compression related checks.
382 GPR_ASSERT(request.response_compression() ==
383 GetInteropCompressionTypeFromCompressionAlgorithm(
384 inspector.GetCallCompressionAlgorithm()));
385 if (request.response_compression() == NONE) {
386 GPR_ASSERT(
387 !(inspector.GetMessageFlags() & GRPC_WRITE_INTERNAL_COMPRESS));
388 } else if (request.response_type() == PayloadType::COMPRESSABLE) {
389 // requested compression and compressable response => results should
390 // always be compressed.
391 GPR_ASSERT(inspector.GetMessageFlags() &
392 GRPC_WRITE_INTERNAL_COMPRESS);
393 }
394
395 ++i;
396 }
397
398 GPR_ASSERT(response_stream_sizes.size() == i);
399 Status s = stream->Finish();
400
401 AssertOkOrPrintErrorStatus(s);
402 gpr_log(GPR_INFO, "Response streaming done %s.", log_suffix);
403 gpr_free(log_suffix);
404 }
Yang Gaoa4002072015-04-09 23:25:21 -0700405 }
Yang Gaoa4002072015-04-09 23:25:21 -0700406}
407
408void InteropClient::DoResponseStreamingWithSlowConsumer() {
409 gpr_log(GPR_INFO, "Receiving response steaming rpc with slow consumer ...");
410 std::unique_ptr<TestService::Stub> stub(TestService::NewStub(channel_));
411
412 ClientContext context;
413 StreamingOutputCallRequest request;
414
415 for (int i = 0; i < kNumResponseMessages; ++i) {
416 ResponseParameters* response_parameter = request.add_response_parameters();
417 response_parameter->set_size(kResponseMessageSize);
418 }
419 StreamingOutputCallResponse response;
420 std::unique_ptr<ClientReader<StreamingOutputCallResponse>> stream(
421 stub->StreamingOutputCall(&context, request));
422
423 int i = 0;
424 while (stream->Read(&response)) {
425 GPR_ASSERT(response.payload().body() ==
426 grpc::string(kResponseMessageSize, '\0'));
427 gpr_log(GPR_INFO, "received message %d", i);
428 usleep(kReceiveDelayMilliSeconds * 1000);
429 ++i;
430 }
431 GPR_ASSERT(kNumResponseMessages == i);
432 Status s = stream->Finish();
433
434 AssertOkOrPrintErrorStatus(s);
435 gpr_log(GPR_INFO, "Response streaming done.");
436}
437
438void InteropClient::DoHalfDuplex() {
439 gpr_log(GPR_INFO, "Sending half-duplex streaming rpc ...");
440 std::unique_ptr<TestService::Stub> stub(TestService::NewStub(channel_));
441
442 ClientContext context;
443 std::unique_ptr<ClientReaderWriter<StreamingOutputCallRequest,
444 StreamingOutputCallResponse>>
445 stream(stub->HalfDuplexCall(&context));
446
447 StreamingOutputCallRequest request;
448 ResponseParameters* response_parameter = request.add_response_parameters();
449 for (unsigned int i = 0; i < response_stream_sizes.size(); ++i) {
450 response_parameter->set_size(response_stream_sizes[i]);
451 GPR_ASSERT(stream->Write(request));
452 }
453 stream->WritesDone();
454
455 unsigned int i = 0;
456 StreamingOutputCallResponse response;
457 while (stream->Read(&response)) {
458 GPR_ASSERT(response.payload().has_body());
459 GPR_ASSERT(response.payload().body() ==
460 grpc::string(response_stream_sizes[i], '\0'));
461 ++i;
462 }
463 GPR_ASSERT(response_stream_sizes.size() == i);
464 Status s = stream->Finish();
465 AssertOkOrPrintErrorStatus(s);
466 gpr_log(GPR_INFO, "Half-duplex streaming rpc done.");
467}
468
469void InteropClient::DoPingPong() {
470 gpr_log(GPR_INFO, "Sending Ping Pong streaming rpc ...");
471 std::unique_ptr<TestService::Stub> stub(TestService::NewStub(channel_));
472
473 ClientContext context;
474 std::unique_ptr<ClientReaderWriter<StreamingOutputCallRequest,
475 StreamingOutputCallResponse>>
476 stream(stub->FullDuplexCall(&context));
477
478 StreamingOutputCallRequest request;
479 request.set_response_type(PayloadType::COMPRESSABLE);
480 ResponseParameters* response_parameter = request.add_response_parameters();
481 Payload* payload = request.mutable_payload();
482 StreamingOutputCallResponse response;
483 for (unsigned int i = 0; i < request_stream_sizes.size(); ++i) {
484 response_parameter->set_size(response_stream_sizes[i]);
485 payload->set_body(grpc::string(request_stream_sizes[i], '\0'));
486 GPR_ASSERT(stream->Write(request));
487 GPR_ASSERT(stream->Read(&response));
488 GPR_ASSERT(response.payload().has_body());
489 GPR_ASSERT(response.payload().body() ==
490 grpc::string(response_stream_sizes[i], '\0'));
491 }
492
493 stream->WritesDone();
494 GPR_ASSERT(!stream->Read(&response));
495 Status s = stream->Finish();
496 AssertOkOrPrintErrorStatus(s);
497 gpr_log(GPR_INFO, "Ping pong streaming done.");
498}
499
Yang Gao68d61572015-04-24 14:42:42 -0700500void InteropClient::DoCancelAfterBegin() {
501 gpr_log(GPR_INFO, "Sending request steaming rpc ...");
502 std::unique_ptr<TestService::Stub> stub(TestService::NewStub(channel_));
503
504 ClientContext context;
505 StreamingInputCallRequest request;
506 StreamingInputCallResponse response;
507
508 std::unique_ptr<ClientWriter<StreamingInputCallRequest>> stream(
509 stub->StreamingInputCall(&context, &response));
510
511 gpr_log(GPR_INFO, "Trying to cancel...");
512 context.TryCancel();
513 Status s = stream->Finish();
Yang Gaoc1a2c312015-06-16 10:59:46 -0700514 GPR_ASSERT(s.error_code() == StatusCode::CANCELLED);
Yang Gao68d61572015-04-24 14:42:42 -0700515 gpr_log(GPR_INFO, "Canceling streaming done.");
516}
517
518void InteropClient::DoCancelAfterFirstResponse() {
519 gpr_log(GPR_INFO, "Sending Ping Pong streaming rpc ...");
520 std::unique_ptr<TestService::Stub> stub(TestService::NewStub(channel_));
521
522 ClientContext context;
523 std::unique_ptr<ClientReaderWriter<StreamingOutputCallRequest,
524 StreamingOutputCallResponse>>
525 stream(stub->FullDuplexCall(&context));
526
527 StreamingOutputCallRequest request;
528 request.set_response_type(PayloadType::COMPRESSABLE);
529 ResponseParameters* response_parameter = request.add_response_parameters();
530 response_parameter->set_size(31415);
531 request.mutable_payload()->set_body(grpc::string(27182, '\0'));
532 StreamingOutputCallResponse response;
533 GPR_ASSERT(stream->Write(request));
534 GPR_ASSERT(stream->Read(&response));
535 GPR_ASSERT(response.payload().has_body());
536 GPR_ASSERT(response.payload().body() == grpc::string(31415, '\0'));
537 gpr_log(GPR_INFO, "Trying to cancel...");
538 context.TryCancel();
539
540 Status s = stream->Finish();
541 gpr_log(GPR_INFO, "Canceling pingpong streaming done.");
542}
543
yang-g69563b92015-07-10 15:32:11 -0700544void InteropClient::DoTimeoutOnSleepingServer() {
545 gpr_log(GPR_INFO, "Sending Ping Pong streaming rpc with a short deadline...");
546 std::unique_ptr<TestService::Stub> stub(TestService::NewStub(channel_));
547
548 ClientContext context;
549 std::chrono::system_clock::time_point deadline =
550 std::chrono::system_clock::now() + std::chrono::milliseconds(1);
551 context.set_deadline(deadline);
552 std::unique_ptr<ClientReaderWriter<StreamingOutputCallRequest,
553 StreamingOutputCallResponse>>
554 stream(stub->FullDuplexCall(&context));
555
556 StreamingOutputCallRequest request;
557 request.mutable_payload()->set_body(grpc::string(27182, '\0'));
558 stream->Write(request);
559
560 Status s = stream->Finish();
561 GPR_ASSERT(s.error_code() == StatusCode::DEADLINE_EXCEEDED);
562 gpr_log(GPR_INFO, "Pingpong streaming timeout done.");
563}
564
Abhishek Kumare1c867d2015-08-05 11:04:45 -0700565void InteropClient::DoStatusWithMessage() {
566 gpr_log(GPR_INFO, "Sending RPC with a request for status code 2 and message");
567 std::unique_ptr<TestService::Stub> stub(TestService::NewStub(channel_));
568
569 ClientContext context;
570 SimpleRequest request;
571 SimpleResponse response;
572 EchoStatus *requested_status = request.mutable_response_status();
573 requested_status->set_code(grpc::StatusCode::UNKNOWN);
574 grpc::string test_msg = "This is a test message";
575 requested_status->set_message(test_msg);
576
577 Status s = stub->UnaryCall(&context, request, &response);
578
579 GPR_ASSERT(s.error_code() == grpc::StatusCode::UNKNOWN);
580 GPR_ASSERT(s.error_message() == test_msg);
581 gpr_log(GPR_INFO, "Done testing Status and Message");
582}
583
Yang Gaoa4002072015-04-09 23:25:21 -0700584} // namespace testing
585} // namespace grpc