blob: 48ccf06da988c8d98c4321336906c1dfe2d5f9d4 [file] [log] [blame]
Yang Gaoa4002072015-04-09 23:25:21 -07001/*
2 *
3 * Copyright 2015, Google Inc.
4 * All rights reserved.
5 *
6 * Redistribution and use in source and binary forms, with or without
7 * modification, are permitted provided that the following conditions are
8 * met:
9 *
10 * * Redistributions of source code must retain the above copyright
11 * notice, this list of conditions and the following disclaimer.
12 * * Redistributions in binary form must reproduce the above
13 * copyright notice, this list of conditions and the following disclaimer
14 * in the documentation and/or other materials provided with the
15 * distribution.
16 * * Neither the name of Google Inc. nor the names of its
17 * contributors may be used to endorse or promote products derived from
18 * this software without specific prior written permission.
19 *
20 * THIS SOFTWARE IS PROVIDED BY THE COPYRIGHT HOLDERS AND CONTRIBUTORS
21 * "AS IS" AND ANY EXPRESS OR IMPLIED WARRANTIES, INCLUDING, BUT NOT
22 * LIMITED TO, THE IMPLIED WARRANTIES OF MERCHANTABILITY AND FITNESS FOR
23 * A PARTICULAR PURPOSE ARE DISCLAIMED. IN NO EVENT SHALL THE COPYRIGHT
24 * OWNER OR CONTRIBUTORS BE LIABLE FOR ANY DIRECT, INDIRECT, INCIDENTAL,
25 * SPECIAL, EXEMPLARY, OR CONSEQUENTIAL DAMAGES (INCLUDING, BUT NOT
26 * LIMITED TO, PROCUREMENT OF SUBSTITUTE GOODS OR SERVICES; LOSS OF USE,
27 * DATA, OR PROFITS; OR BUSINESS INTERRUPTION) HOWEVER CAUSED AND ON ANY
28 * THEORY OF LIABILITY, WHETHER IN CONTRACT, STRICT LIABILITY, OR TORT
29 * (INCLUDING NEGLIGENCE OR OTHERWISE) ARISING IN ANY WAY OUT OF THE USE
30 * OF THIS SOFTWARE, EVEN IF ADVISED OF THE POSSIBILITY OF SUCH DAMAGE.
31 *
32 */
33
34#include "test/cpp/interop/interop_client.h"
35
yang-g9e2f90c2015-08-21 15:35:03 -070036#include <unistd.h>
37
David Garcia Quintasc8993192015-07-22 09:10:39 -070038#include <fstream>
Yang Gaoa4002072015-04-09 23:25:21 -070039#include <memory>
40
Yang Gaoa4002072015-04-09 23:25:21 -070041#include <grpc/grpc.h>
42#include <grpc/support/log.h>
David Garcia Quintasc8993192015-07-22 09:10:39 -070043#include <grpc/support/string_util.h>
David Garcia Quintas2a6427f2015-08-18 11:11:40 -070044#include <grpc/support/useful.h>
yang-g8c2be9f2015-08-19 16:28:09 -070045#include <grpc++/channel.h>
Yang Gaoa4002072015-04-09 23:25:21 -070046#include <grpc++/client_context.h>
Julien Boeuf5be92a32015-08-28 16:28:18 -070047#include <grpc++/security/credentials.h>
David Garcia Quintas80f39952015-07-21 16:07:36 -070048
yang-g9e2f90c2015-08-21 15:35:03 -070049#include "src/core/transport/stream_op.h"
David Garcia Quintas80f39952015-07-21 16:07:36 -070050#include "test/cpp/interop/client_helper.h"
Abhishek Kumar1b3e3cd2015-04-16 20:10:29 -070051#include "test/proto/test.grpc.pb.h"
Abhishek Kumar60572d42015-04-16 20:45:25 -070052#include "test/proto/empty.grpc.pb.h"
53#include "test/proto/messages.grpc.pb.h"
Yang Gaoa4002072015-04-09 23:25:21 -070054
55namespace grpc {
56namespace testing {
57
David Garcia Quintasc8993192015-07-22 09:10:39 -070058static const char* kRandomFile = "test/cpp/interop/rnd.dat";
59
Yang Gaoa4002072015-04-09 23:25:21 -070060namespace {
61// The same value is defined by the Java client.
62const std::vector<int> request_stream_sizes = {27182, 8, 1828, 45904};
63const std::vector<int> response_stream_sizes = {31415, 9, 2653, 58979};
64const int kNumResponseMessages = 2000;
65const int kResponseMessageSize = 1030;
66const int kReceiveDelayMilliSeconds = 20;
Xudong Maa5861f32015-05-26 15:24:11 -070067const int kLargeRequestSize = 271828;
68const int kLargeResponseSize = 314159;
David Garcia Quintas81491b62015-08-18 16:46:53 -070069
70CompressionType GetInteropCompressionTypeFromCompressionAlgorithm(
71 grpc_compression_algorithm algorithm) {
72 switch (algorithm) {
73 case GRPC_COMPRESS_NONE:
74 return CompressionType::NONE;
75 case GRPC_COMPRESS_GZIP:
76 return CompressionType::GZIP;
77 case GRPC_COMPRESS_DEFLATE:
78 return CompressionType::DEFLATE;
79 default:
80 GPR_ASSERT(false);
81 }
82}
Yang Gaoa4002072015-04-09 23:25:21 -070083} // namespace
84
yang-g8c2be9f2015-08-19 16:28:09 -070085InteropClient::InteropClient(std::shared_ptr<Channel> channel)
Yang Gaoa4002072015-04-09 23:25:21 -070086 : channel_(channel) {}
87
88void InteropClient::AssertOkOrPrintErrorStatus(const Status& s) {
Yang Gaoc1a2c312015-06-16 10:59:46 -070089 if (s.ok()) {
Yang Gaoa4002072015-04-09 23:25:21 -070090 return;
91 }
Yang Gaoc1a2c312015-06-16 10:59:46 -070092 gpr_log(GPR_INFO, "Error status code: %d, message: %s", s.error_code(),
93 s.error_message().c_str());
Yang Gaoa4002072015-04-09 23:25:21 -070094 GPR_ASSERT(0);
95}
96
97void InteropClient::DoEmpty() {
98 gpr_log(GPR_INFO, "Sending an empty rpc...");
99 std::unique_ptr<TestService::Stub> stub(TestService::NewStub(channel_));
100
101 Empty request = Empty::default_instance();
102 Empty response = Empty::default_instance();
103 ClientContext context;
104
105 Status s = stub->EmptyCall(&context, request, &response);
106 AssertOkOrPrintErrorStatus(s);
107
108 gpr_log(GPR_INFO, "Empty rpc done.");
109}
110
111// Shared code to set large payload, make rpc and check response payload.
112void InteropClient::PerformLargeUnary(SimpleRequest* request,
113 SimpleResponse* response) {
114 std::unique_ptr<TestService::Stub> stub(TestService::NewStub(channel_));
115
116 ClientContext context;
David Garcia Quintascd37d582015-08-09 15:50:21 -0700117 InteropClientContextInspector inspector(context);
David Garcia Quintas93dfab92015-08-13 11:29:50 -0700118 // If the request doesn't already specify the response type, default to
119 // COMPRESSABLE.
David Garcia Quintascd37d582015-08-09 15:50:21 -0700120 request->set_response_size(kLargeResponseSize);
121 grpc::string payload(kLargeRequestSize, '\0');
122 request->mutable_payload()->set_body(payload.c_str(), kLargeRequestSize);
123
David Garcia Quintas2e1bb1b2015-08-10 14:05:57 -0700124 Status s = stub->UnaryCall(&context, *request, response);
David Garcia Quintascd37d582015-08-09 15:50:21 -0700125
David Garcia Quintasc8993192015-07-22 09:10:39 -0700126 // Compression related checks.
David Garcia Quintas80f39952015-07-21 16:07:36 -0700127 GPR_ASSERT(request->response_compression() ==
128 GetInteropCompressionTypeFromCompressionAlgorithm(
129 inspector.GetCallCompressionAlgorithm()));
David Garcia Quintasc8993192015-07-22 09:10:39 -0700130 if (request->response_compression() == NONE) {
131 GPR_ASSERT(!(inspector.GetMessageFlags() & GRPC_WRITE_INTERNAL_COMPRESS));
132 } else if (request->response_type() == PayloadType::COMPRESSABLE) {
133 // requested compression and compressable response => results should always
134 // be compressed.
135 GPR_ASSERT(inspector.GetMessageFlags() & GRPC_WRITE_INTERNAL_COMPRESS);
136 }
137
Yang Gaoa4002072015-04-09 23:25:21 -0700138 AssertOkOrPrintErrorStatus(s);
David Garcia Quintasc8993192015-07-22 09:10:39 -0700139
140 // Payload related checks.
141 if (request->response_type() != PayloadType::RANDOM) {
142 GPR_ASSERT(response->payload().type() == request->response_type());
143 }
144 switch (response->payload().type()) {
145 case PayloadType::COMPRESSABLE:
146 GPR_ASSERT(response->payload().body() ==
147 grpc::string(kLargeResponseSize, '\0'));
148 break;
149 case PayloadType::UNCOMPRESSABLE: {
Craig Tillerd6c98df2015-08-18 09:33:44 -0700150 std::ifstream rnd_file(kRandomFile);
151 GPR_ASSERT(rnd_file.good());
152 for (int i = 0; i < kLargeResponseSize; i++) {
153 GPR_ASSERT(response->payload().body()[i] == (char)rnd_file.get());
David Garcia Quintasc8993192015-07-22 09:10:39 -0700154 }
Craig Tillerd6c98df2015-08-18 09:33:44 -0700155 } break;
David Garcia Quintasc8993192015-07-22 09:10:39 -0700156 default:
157 GPR_ASSERT(false);
158 }
Yang Gaoa4002072015-04-09 23:25:21 -0700159}
160
161void InteropClient::DoComputeEngineCreds(
162 const grpc::string& default_service_account,
163 const grpc::string& oauth_scope) {
164 gpr_log(GPR_INFO,
165 "Sending a large unary rpc with compute engine credentials ...");
166 SimpleRequest request;
167 SimpleResponse response;
168 request.set_fill_username(true);
169 request.set_fill_oauth_scope(true);
David Garcia Quintas80f39952015-07-21 16:07:36 -0700170 request.set_response_type(PayloadType::COMPRESSABLE);
Yang Gaoa4002072015-04-09 23:25:21 -0700171 PerformLargeUnary(&request, &response);
172 gpr_log(GPR_INFO, "Got username %s", response.username().c_str());
173 gpr_log(GPR_INFO, "Got oauth_scope %s", response.oauth_scope().c_str());
174 GPR_ASSERT(!response.username().empty());
175 GPR_ASSERT(response.username().c_str() == default_service_account);
176 GPR_ASSERT(!response.oauth_scope().empty());
177 const char* oauth_scope_str = response.oauth_scope().c_str();
178 GPR_ASSERT(oauth_scope.find(oauth_scope_str) != grpc::string::npos);
179 gpr_log(GPR_INFO, "Large unary with compute engine creds done.");
180}
181
yang-gbe5f0592015-07-13 11:11:50 -0700182void InteropClient::DoOauth2AuthToken(const grpc::string& username,
183 const grpc::string& oauth_scope) {
184 gpr_log(GPR_INFO,
yang-g463cde72015-07-17 15:21:39 -0700185 "Sending a unary rpc with raw oauth2 access token credentials ...");
yang-gbe5f0592015-07-13 11:11:50 -0700186 SimpleRequest request;
187 SimpleResponse response;
188 request.set_fill_username(true);
189 request.set_fill_oauth_scope(true);
yang-g463cde72015-07-17 15:21:39 -0700190 std::unique_ptr<TestService::Stub> stub(TestService::NewStub(channel_));
191
192 ClientContext context;
193
194 Status s = stub->UnaryCall(&context, request, &response);
195
196 AssertOkOrPrintErrorStatus(s);
yang-gbe5f0592015-07-13 11:11:50 -0700197 GPR_ASSERT(!response.username().empty());
198 GPR_ASSERT(!response.oauth_scope().empty());
199 GPR_ASSERT(username.find(response.username()) != grpc::string::npos);
200 const char* oauth_scope_str = response.oauth_scope().c_str();
201 GPR_ASSERT(oauth_scope.find(oauth_scope_str) != grpc::string::npos);
yang-g463cde72015-07-17 15:21:39 -0700202 gpr_log(GPR_INFO, "Unary with oauth2 access token credentials done.");
yang-gbe5f0592015-07-13 11:11:50 -0700203}
204
yang-g5bf510b2015-07-14 10:54:29 -0700205void InteropClient::DoPerRpcCreds(const grpc::string& username,
206 const grpc::string& oauth_scope) {
207 gpr_log(GPR_INFO,
yang-g8c31ee22015-07-15 13:36:27 -0700208 "Sending a unary rpc with per-rpc raw oauth2 access token ...");
yang-g5bf510b2015-07-14 10:54:29 -0700209 SimpleRequest request;
210 SimpleResponse response;
211 request.set_fill_username(true);
212 request.set_fill_oauth_scope(true);
213 std::unique_ptr<TestService::Stub> stub(TestService::NewStub(channel_));
214
215 ClientContext context;
216 grpc::string access_token = GetOauth2AccessToken();
217 std::shared_ptr<Credentials> creds = AccessTokenCredentials(access_token);
218 context.set_credentials(creds);
yang-g5bf510b2015-07-14 10:54:29 -0700219
220 Status s = stub->UnaryCall(&context, request, &response);
221
222 AssertOkOrPrintErrorStatus(s);
yang-g5bf510b2015-07-14 10:54:29 -0700223 GPR_ASSERT(!response.username().empty());
224 GPR_ASSERT(!response.oauth_scope().empty());
225 GPR_ASSERT(username.find(response.username()) != grpc::string::npos);
226 const char* oauth_scope_str = response.oauth_scope().c_str();
227 GPR_ASSERT(oauth_scope.find(oauth_scope_str) != grpc::string::npos);
yang-g8c31ee22015-07-15 13:36:27 -0700228 gpr_log(GPR_INFO, "Unary with per-rpc oauth2 access token done.");
yang-g5bf510b2015-07-14 10:54:29 -0700229}
230
Yang Gaoa4002072015-04-09 23:25:21 -0700231void InteropClient::DoJwtTokenCreds(const grpc::string& username) {
232 gpr_log(GPR_INFO, "Sending a large unary rpc with JWT token credentials ...");
233 SimpleRequest request;
234 SimpleResponse response;
235 request.set_fill_username(true);
David Garcia Quintas80f39952015-07-21 16:07:36 -0700236 request.set_response_type(PayloadType::COMPRESSABLE);
Yang Gaoa4002072015-04-09 23:25:21 -0700237 PerformLargeUnary(&request, &response);
238 GPR_ASSERT(!response.username().empty());
239 GPR_ASSERT(username.find(response.username()) != grpc::string::npos);
240 gpr_log(GPR_INFO, "Large unary with JWT token creds done.");
241}
242
243void InteropClient::DoLargeUnary() {
David Garcia Quintascd37d582015-08-09 15:50:21 -0700244 gpr_log(GPR_INFO, "Sending a large unary rpc...");
245 SimpleRequest request;
246 SimpleResponse response;
David Garcia Quintas93dfab92015-08-13 11:29:50 -0700247 request.set_response_type(PayloadType::COMPRESSABLE);
David Garcia Quintascd37d582015-08-09 15:50:21 -0700248 PerformLargeUnary(&request, &response);
249 gpr_log(GPR_INFO, "Large unary done.");
250}
251
252void InteropClient::DoLargeCompressedUnary() {
David Garcia Quintas80f39952015-07-21 16:07:36 -0700253 const CompressionType compression_types[] = {NONE, GZIP, DEFLATE};
254 const PayloadType payload_types[] = {COMPRESSABLE, UNCOMPRESSABLE, RANDOM};
David Garcia Quintas2a6427f2015-08-18 11:11:40 -0700255 for (size_t i = 0; i < GPR_ARRAY_SIZE(payload_types); i++) {
256 for (size_t j = 0; j < GPR_ARRAY_SIZE(compression_types); j++) {
David Garcia Quintasc8993192015-07-22 09:10:39 -0700257 char* log_suffix;
258 gpr_asprintf(&log_suffix, "(compression=%s; payload=%s)",
David Garcia Quintas2a6427f2015-08-18 11:11:40 -0700259 CompressionType_Name(compression_types[j]).c_str(),
260 PayloadType_Name(payload_types[i]).c_str());
David Garcia Quintasc8993192015-07-22 09:10:39 -0700261
David Garcia Quintas616b3752015-08-11 15:21:02 -0700262 gpr_log(GPR_INFO, "Sending a large compressed unary rpc %s.", log_suffix);
David Garcia Quintas80f39952015-07-21 16:07:36 -0700263 SimpleRequest request;
264 SimpleResponse response;
David Garcia Quintas2a6427f2015-08-18 11:11:40 -0700265 request.set_response_type(payload_types[i]);
266 request.set_response_compression(compression_types[j]);
David Garcia Quintas80f39952015-07-21 16:07:36 -0700267 PerformLargeUnary(&request, &response);
David Garcia Quintas616b3752015-08-11 15:21:02 -0700268 gpr_log(GPR_INFO, "Large compressed unary done %s.", log_suffix);
David Garcia Quintasc8993192015-07-22 09:10:39 -0700269 gpr_free(log_suffix);
David Garcia Quintas80f39952015-07-21 16:07:36 -0700270 }
271 }
Yang Gaoa4002072015-04-09 23:25:21 -0700272}
273
274void InteropClient::DoRequestStreaming() {
275 gpr_log(GPR_INFO, "Sending request steaming rpc ...");
276 std::unique_ptr<TestService::Stub> stub(TestService::NewStub(channel_));
277
278 ClientContext context;
279 StreamingInputCallRequest request;
280 StreamingInputCallResponse response;
281
282 std::unique_ptr<ClientWriter<StreamingInputCallRequest>> stream(
283 stub->StreamingInputCall(&context, &response));
284
285 int aggregated_payload_size = 0;
286 for (unsigned int i = 0; i < request_stream_sizes.size(); ++i) {
287 Payload* payload = request.mutable_payload();
288 payload->set_body(grpc::string(request_stream_sizes[i], '\0'));
289 GPR_ASSERT(stream->Write(request));
290 aggregated_payload_size += request_stream_sizes[i];
291 }
292 stream->WritesDone();
293 Status s = stream->Finish();
294
295 GPR_ASSERT(response.aggregated_payload_size() == aggregated_payload_size);
296 AssertOkOrPrintErrorStatus(s);
297 gpr_log(GPR_INFO, "Request streaming done.");
298}
299
300void InteropClient::DoResponseStreaming() {
David Garcia Quintascd37d582015-08-09 15:50:21 -0700301 gpr_log(GPR_INFO, "Receiving response steaming rpc ...");
302 std::unique_ptr<TestService::Stub> stub(TestService::NewStub(channel_));
303
304 ClientContext context;
305 StreamingOutputCallRequest request;
306 for (unsigned int i = 0; i < response_stream_sizes.size(); ++i) {
307 ResponseParameters* response_parameter = request.add_response_parameters();
308 response_parameter->set_size(response_stream_sizes[i]);
309 }
310 StreamingOutputCallResponse response;
311 std::unique_ptr<ClientReader<StreamingOutputCallResponse>> stream(
312 stub->StreamingOutputCall(&context, request));
313
314 unsigned int i = 0;
315 while (stream->Read(&response)) {
316 GPR_ASSERT(response.payload().body() ==
317 grpc::string(response_stream_sizes[i], '\0'));
318 ++i;
319 }
320 GPR_ASSERT(response_stream_sizes.size() == i);
321 Status s = stream->Finish();
322 AssertOkOrPrintErrorStatus(s);
323 gpr_log(GPR_INFO, "Response streaming done.");
324}
325
326void InteropClient::DoResponseCompressedStreaming() {
Yang Gaoa4002072015-04-09 23:25:21 -0700327 std::unique_ptr<TestService::Stub> stub(TestService::NewStub(channel_));
328
David Garcia Quintasc8993192015-07-22 09:10:39 -0700329 const CompressionType compression_types[] = {NONE, GZIP, DEFLATE};
330 const PayloadType payload_types[] = {COMPRESSABLE, UNCOMPRESSABLE, RANDOM};
David Garcia Quintas2a6427f2015-08-18 11:11:40 -0700331 for (size_t i = 0; i < GPR_ARRAY_SIZE(payload_types); i++) {
332 for (size_t j = 0; j < GPR_ARRAY_SIZE(compression_types); j++) {
David Garcia Quintasc8993192015-07-22 09:10:39 -0700333 ClientContext context;
334 InteropClientContextInspector inspector(context);
335 StreamingOutputCallRequest request;
David Garcia Quintas80f39952015-07-21 16:07:36 -0700336
David Garcia Quintasc8993192015-07-22 09:10:39 -0700337 char* log_suffix;
338 gpr_asprintf(&log_suffix, "(compression=%s; payload=%s)",
David Garcia Quintas2a6427f2015-08-18 11:11:40 -0700339 CompressionType_Name(compression_types[j]).c_str(),
340 PayloadType_Name(payload_types[i]).c_str());
David Garcia Quintasc8993192015-07-22 09:10:39 -0700341
342 gpr_log(GPR_INFO, "Receiving response steaming rpc %s.", log_suffix);
343
David Garcia Quintas2a6427f2015-08-18 11:11:40 -0700344 request.set_response_type(payload_types[i]);
345 request.set_response_compression(compression_types[j]);
David Garcia Quintasc8993192015-07-22 09:10:39 -0700346
David Garcia Quintas04ecfa12015-08-25 14:19:48 -0700347 for (size_t k = 0; k < response_stream_sizes.size(); ++k) {
David Garcia Quintasc8993192015-07-22 09:10:39 -0700348 ResponseParameters* response_parameter =
349 request.add_response_parameters();
David Garcia Quintas04ecfa12015-08-25 14:19:48 -0700350 response_parameter->set_size(response_stream_sizes[k]);
David Garcia Quintasc8993192015-07-22 09:10:39 -0700351 }
352 StreamingOutputCallResponse response;
353
354 std::unique_ptr<ClientReader<StreamingOutputCallResponse>> stream(
355 stub->StreamingOutputCall(&context, request));
356
David Garcia Quintas04ecfa12015-08-25 14:19:48 -0700357 size_t k = 0;
David Garcia Quintasc8993192015-07-22 09:10:39 -0700358 while (stream->Read(&response)) {
David Garcia Quintas04ecfa12015-08-25 14:19:48 -0700359 // Payload related checks.
360 if (request.response_type() != PayloadType::RANDOM) {
361 GPR_ASSERT(response.payload().type() == request.response_type());
362 }
363 switch (response.payload().type()) {
364 case PayloadType::COMPRESSABLE:
365 GPR_ASSERT(response.payload().body() ==
366 grpc::string(response_stream_sizes[k], '\0'));
367 break;
368 case PayloadType::UNCOMPRESSABLE: {
369 std::ifstream rnd_file(kRandomFile);
370 GPR_ASSERT(rnd_file.good());
371 for (int n = 0; n < response_stream_sizes[k]; n++) {
372 GPR_ASSERT(response.payload().body()[n] == (char)rnd_file.get());
373 }
374 } break;
375 default:
376 GPR_ASSERT(false);
377 }
David Garcia Quintasc8993192015-07-22 09:10:39 -0700378
379 // Compression related checks.
380 GPR_ASSERT(request.response_compression() ==
381 GetInteropCompressionTypeFromCompressionAlgorithm(
382 inspector.GetCallCompressionAlgorithm()));
383 if (request.response_compression() == NONE) {
384 GPR_ASSERT(
385 !(inspector.GetMessageFlags() & GRPC_WRITE_INTERNAL_COMPRESS));
386 } else if (request.response_type() == PayloadType::COMPRESSABLE) {
387 // requested compression and compressable response => results should
388 // always be compressed.
389 GPR_ASSERT(inspector.GetMessageFlags() &
390 GRPC_WRITE_INTERNAL_COMPRESS);
391 }
392
David Garcia Quintas04ecfa12015-08-25 14:19:48 -0700393 ++k;
David Garcia Quintasc8993192015-07-22 09:10:39 -0700394 }
395
David Garcia Quintas04ecfa12015-08-25 14:19:48 -0700396 GPR_ASSERT(response_stream_sizes.size() == k);
David Garcia Quintasc8993192015-07-22 09:10:39 -0700397 Status s = stream->Finish();
398
399 AssertOkOrPrintErrorStatus(s);
400 gpr_log(GPR_INFO, "Response streaming done %s.", log_suffix);
401 gpr_free(log_suffix);
402 }
Yang Gaoa4002072015-04-09 23:25:21 -0700403 }
Yang Gaoa4002072015-04-09 23:25:21 -0700404}
405
406void InteropClient::DoResponseStreamingWithSlowConsumer() {
407 gpr_log(GPR_INFO, "Receiving response steaming rpc with slow consumer ...");
408 std::unique_ptr<TestService::Stub> stub(TestService::NewStub(channel_));
409
410 ClientContext context;
411 StreamingOutputCallRequest request;
412
413 for (int i = 0; i < kNumResponseMessages; ++i) {
414 ResponseParameters* response_parameter = request.add_response_parameters();
415 response_parameter->set_size(kResponseMessageSize);
416 }
417 StreamingOutputCallResponse response;
418 std::unique_ptr<ClientReader<StreamingOutputCallResponse>> stream(
419 stub->StreamingOutputCall(&context, request));
420
421 int i = 0;
422 while (stream->Read(&response)) {
423 GPR_ASSERT(response.payload().body() ==
424 grpc::string(kResponseMessageSize, '\0'));
425 gpr_log(GPR_INFO, "received message %d", i);
426 usleep(kReceiveDelayMilliSeconds * 1000);
427 ++i;
428 }
429 GPR_ASSERT(kNumResponseMessages == i);
430 Status s = stream->Finish();
431
432 AssertOkOrPrintErrorStatus(s);
433 gpr_log(GPR_INFO, "Response streaming done.");
434}
435
436void InteropClient::DoHalfDuplex() {
437 gpr_log(GPR_INFO, "Sending half-duplex streaming rpc ...");
438 std::unique_ptr<TestService::Stub> stub(TestService::NewStub(channel_));
439
440 ClientContext context;
441 std::unique_ptr<ClientReaderWriter<StreamingOutputCallRequest,
442 StreamingOutputCallResponse>>
443 stream(stub->HalfDuplexCall(&context));
444
445 StreamingOutputCallRequest request;
446 ResponseParameters* response_parameter = request.add_response_parameters();
447 for (unsigned int i = 0; i < response_stream_sizes.size(); ++i) {
448 response_parameter->set_size(response_stream_sizes[i]);
449 GPR_ASSERT(stream->Write(request));
450 }
451 stream->WritesDone();
452
453 unsigned int i = 0;
454 StreamingOutputCallResponse response;
455 while (stream->Read(&response)) {
Yang Gaoa4002072015-04-09 23:25:21 -0700456 GPR_ASSERT(response.payload().body() ==
457 grpc::string(response_stream_sizes[i], '\0'));
458 ++i;
459 }
460 GPR_ASSERT(response_stream_sizes.size() == i);
461 Status s = stream->Finish();
462 AssertOkOrPrintErrorStatus(s);
463 gpr_log(GPR_INFO, "Half-duplex streaming rpc done.");
464}
465
466void InteropClient::DoPingPong() {
467 gpr_log(GPR_INFO, "Sending Ping Pong streaming rpc ...");
468 std::unique_ptr<TestService::Stub> stub(TestService::NewStub(channel_));
469
470 ClientContext context;
471 std::unique_ptr<ClientReaderWriter<StreamingOutputCallRequest,
472 StreamingOutputCallResponse>>
473 stream(stub->FullDuplexCall(&context));
474
475 StreamingOutputCallRequest request;
476 request.set_response_type(PayloadType::COMPRESSABLE);
477 ResponseParameters* response_parameter = request.add_response_parameters();
478 Payload* payload = request.mutable_payload();
479 StreamingOutputCallResponse response;
480 for (unsigned int i = 0; i < request_stream_sizes.size(); ++i) {
481 response_parameter->set_size(response_stream_sizes[i]);
482 payload->set_body(grpc::string(request_stream_sizes[i], '\0'));
483 GPR_ASSERT(stream->Write(request));
484 GPR_ASSERT(stream->Read(&response));
Yang Gaoa4002072015-04-09 23:25:21 -0700485 GPR_ASSERT(response.payload().body() ==
486 grpc::string(response_stream_sizes[i], '\0'));
487 }
488
489 stream->WritesDone();
490 GPR_ASSERT(!stream->Read(&response));
491 Status s = stream->Finish();
492 AssertOkOrPrintErrorStatus(s);
493 gpr_log(GPR_INFO, "Ping pong streaming done.");
494}
495
Yang Gao68d61572015-04-24 14:42:42 -0700496void InteropClient::DoCancelAfterBegin() {
497 gpr_log(GPR_INFO, "Sending request steaming rpc ...");
498 std::unique_ptr<TestService::Stub> stub(TestService::NewStub(channel_));
499
500 ClientContext context;
501 StreamingInputCallRequest request;
502 StreamingInputCallResponse response;
503
504 std::unique_ptr<ClientWriter<StreamingInputCallRequest>> stream(
505 stub->StreamingInputCall(&context, &response));
506
507 gpr_log(GPR_INFO, "Trying to cancel...");
508 context.TryCancel();
509 Status s = stream->Finish();
Yang Gaoc1a2c312015-06-16 10:59:46 -0700510 GPR_ASSERT(s.error_code() == StatusCode::CANCELLED);
Yang Gao68d61572015-04-24 14:42:42 -0700511 gpr_log(GPR_INFO, "Canceling streaming done.");
512}
513
514void InteropClient::DoCancelAfterFirstResponse() {
515 gpr_log(GPR_INFO, "Sending Ping Pong streaming rpc ...");
516 std::unique_ptr<TestService::Stub> stub(TestService::NewStub(channel_));
517
518 ClientContext context;
519 std::unique_ptr<ClientReaderWriter<StreamingOutputCallRequest,
520 StreamingOutputCallResponse>>
521 stream(stub->FullDuplexCall(&context));
522
523 StreamingOutputCallRequest request;
524 request.set_response_type(PayloadType::COMPRESSABLE);
525 ResponseParameters* response_parameter = request.add_response_parameters();
526 response_parameter->set_size(31415);
527 request.mutable_payload()->set_body(grpc::string(27182, '\0'));
528 StreamingOutputCallResponse response;
529 GPR_ASSERT(stream->Write(request));
530 GPR_ASSERT(stream->Read(&response));
Yang Gao68d61572015-04-24 14:42:42 -0700531 GPR_ASSERT(response.payload().body() == grpc::string(31415, '\0'));
532 gpr_log(GPR_INFO, "Trying to cancel...");
533 context.TryCancel();
534
535 Status s = stream->Finish();
536 gpr_log(GPR_INFO, "Canceling pingpong streaming done.");
537}
538
yang-g69563b92015-07-10 15:32:11 -0700539void InteropClient::DoTimeoutOnSleepingServer() {
540 gpr_log(GPR_INFO, "Sending Ping Pong streaming rpc with a short deadline...");
541 std::unique_ptr<TestService::Stub> stub(TestService::NewStub(channel_));
542
543 ClientContext context;
544 std::chrono::system_clock::time_point deadline =
545 std::chrono::system_clock::now() + std::chrono::milliseconds(1);
546 context.set_deadline(deadline);
547 std::unique_ptr<ClientReaderWriter<StreamingOutputCallRequest,
548 StreamingOutputCallResponse>>
549 stream(stub->FullDuplexCall(&context));
550
551 StreamingOutputCallRequest request;
552 request.mutable_payload()->set_body(grpc::string(27182, '\0'));
553 stream->Write(request);
554
555 Status s = stream->Finish();
556 GPR_ASSERT(s.error_code() == StatusCode::DEADLINE_EXCEEDED);
557 gpr_log(GPR_INFO, "Pingpong streaming timeout done.");
558}
559
Abhishek Kumare1c867d2015-08-05 11:04:45 -0700560void InteropClient::DoStatusWithMessage() {
561 gpr_log(GPR_INFO, "Sending RPC with a request for status code 2 and message");
562 std::unique_ptr<TestService::Stub> stub(TestService::NewStub(channel_));
563
564 ClientContext context;
565 SimpleRequest request;
566 SimpleResponse response;
Craig Tillerd6c98df2015-08-18 09:33:44 -0700567 EchoStatus* requested_status = request.mutable_response_status();
Abhishek Kumare1c867d2015-08-05 11:04:45 -0700568 requested_status->set_code(grpc::StatusCode::UNKNOWN);
569 grpc::string test_msg = "This is a test message";
570 requested_status->set_message(test_msg);
571
572 Status s = stub->UnaryCall(&context, request, &response);
573
574 GPR_ASSERT(s.error_code() == grpc::StatusCode::UNKNOWN);
575 GPR_ASSERT(s.error_message() == test_msg);
576 gpr_log(GPR_INFO, "Done testing Status and Message");
577}
578
Yang Gaoa4002072015-04-09 23:25:21 -0700579} // namespace testing
580} // namespace grpc