blob: 516994576207adca463433b0c97648d32feea142 [file] [log] [blame]
Yang Gaoa4002072015-04-09 23:25:21 -07001/*
2 *
3 * Copyright 2015, Google Inc.
4 * All rights reserved.
5 *
6 * Redistribution and use in source and binary forms, with or without
7 * modification, are permitted provided that the following conditions are
8 * met:
9 *
10 * * Redistributions of source code must retain the above copyright
11 * notice, this list of conditions and the following disclaimer.
12 * * Redistributions in binary form must reproduce the above
13 * copyright notice, this list of conditions and the following disclaimer
14 * in the documentation and/or other materials provided with the
15 * distribution.
16 * * Neither the name of Google Inc. nor the names of its
17 * contributors may be used to endorse or promote products derived from
18 * this software without specific prior written permission.
19 *
20 * THIS SOFTWARE IS PROVIDED BY THE COPYRIGHT HOLDERS AND CONTRIBUTORS
21 * "AS IS" AND ANY EXPRESS OR IMPLIED WARRANTIES, INCLUDING, BUT NOT
22 * LIMITED TO, THE IMPLIED WARRANTIES OF MERCHANTABILITY AND FITNESS FOR
23 * A PARTICULAR PURPOSE ARE DISCLAIMED. IN NO EVENT SHALL THE COPYRIGHT
24 * OWNER OR CONTRIBUTORS BE LIABLE FOR ANY DIRECT, INDIRECT, INCIDENTAL,
25 * SPECIAL, EXEMPLARY, OR CONSEQUENTIAL DAMAGES (INCLUDING, BUT NOT
26 * LIMITED TO, PROCUREMENT OF SUBSTITUTE GOODS OR SERVICES; LOSS OF USE,
27 * DATA, OR PROFITS; OR BUSINESS INTERRUPTION) HOWEVER CAUSED AND ON ANY
28 * THEORY OF LIABILITY, WHETHER IN CONTRACT, STRICT LIABILITY, OR TORT
29 * (INCLUDING NEGLIGENCE OR OTHERWISE) ARISING IN ANY WAY OUT OF THE USE
30 * OF THIS SOFTWARE, EVEN IF ADVISED OF THE POSSIBILITY OF SUCH DAMAGE.
31 *
32 */
33
34#include "test/cpp/interop/interop_client.h"
35
yang-g9e2f90c2015-08-21 15:35:03 -070036#include <unistd.h>
37
David Garcia Quintasc8993192015-07-22 09:10:39 -070038#include <fstream>
Yang Gaoa4002072015-04-09 23:25:21 -070039#include <memory>
40
Yang Gaoa4002072015-04-09 23:25:21 -070041#include <grpc/grpc.h>
42#include <grpc/support/log.h>
David Garcia Quintasc8993192015-07-22 09:10:39 -070043#include <grpc/support/string_util.h>
David Garcia Quintas2a6427f2015-08-18 11:11:40 -070044#include <grpc/support/useful.h>
yang-g8c2be9f2015-08-19 16:28:09 -070045#include <grpc++/channel.h>
Yang Gaoa4002072015-04-09 23:25:21 -070046#include <grpc++/client_context.h>
Julien Boeuf5be92a32015-08-28 16:28:18 -070047#include <grpc++/security/credentials.h>
David Garcia Quintas80f39952015-07-21 16:07:36 -070048
yang-g9e2f90c2015-08-21 15:35:03 -070049#include "src/core/transport/stream_op.h"
David Garcia Quintas80f39952015-07-21 16:07:36 -070050#include "test/cpp/interop/client_helper.h"
Abhishek Kumar1b3e3cd2015-04-16 20:10:29 -070051#include "test/proto/test.grpc.pb.h"
Abhishek Kumar60572d42015-04-16 20:45:25 -070052#include "test/proto/empty.grpc.pb.h"
53#include "test/proto/messages.grpc.pb.h"
Yang Gaoa4002072015-04-09 23:25:21 -070054
55namespace grpc {
56namespace testing {
57
David Garcia Quintasc8993192015-07-22 09:10:39 -070058static const char* kRandomFile = "test/cpp/interop/rnd.dat";
59
Yang Gaoa4002072015-04-09 23:25:21 -070060namespace {
61// The same value is defined by the Java client.
62const std::vector<int> request_stream_sizes = {27182, 8, 1828, 45904};
63const std::vector<int> response_stream_sizes = {31415, 9, 2653, 58979};
64const int kNumResponseMessages = 2000;
65const int kResponseMessageSize = 1030;
66const int kReceiveDelayMilliSeconds = 20;
Xudong Maa5861f32015-05-26 15:24:11 -070067const int kLargeRequestSize = 271828;
68const int kLargeResponseSize = 314159;
David Garcia Quintas81491b62015-08-18 16:46:53 -070069
70CompressionType GetInteropCompressionTypeFromCompressionAlgorithm(
71 grpc_compression_algorithm algorithm) {
72 switch (algorithm) {
73 case GRPC_COMPRESS_NONE:
74 return CompressionType::NONE;
75 case GRPC_COMPRESS_GZIP:
76 return CompressionType::GZIP;
77 case GRPC_COMPRESS_DEFLATE:
78 return CompressionType::DEFLATE;
79 default:
80 GPR_ASSERT(false);
81 }
82}
Yang Gaoa4002072015-04-09 23:25:21 -070083} // namespace
84
yang-g8c2be9f2015-08-19 16:28:09 -070085InteropClient::InteropClient(std::shared_ptr<Channel> channel)
Yang Gaoa4002072015-04-09 23:25:21 -070086 : channel_(channel) {}
87
88void InteropClient::AssertOkOrPrintErrorStatus(const Status& s) {
Yang Gaoc1a2c312015-06-16 10:59:46 -070089 if (s.ok()) {
Yang Gaoa4002072015-04-09 23:25:21 -070090 return;
91 }
Yang Gaoc1a2c312015-06-16 10:59:46 -070092 gpr_log(GPR_INFO, "Error status code: %d, message: %s", s.error_code(),
93 s.error_message().c_str());
Yang Gaoa4002072015-04-09 23:25:21 -070094 GPR_ASSERT(0);
95}
96
97void InteropClient::DoEmpty() {
98 gpr_log(GPR_INFO, "Sending an empty rpc...");
99 std::unique_ptr<TestService::Stub> stub(TestService::NewStub(channel_));
100
101 Empty request = Empty::default_instance();
102 Empty response = Empty::default_instance();
103 ClientContext context;
104
105 Status s = stub->EmptyCall(&context, request, &response);
106 AssertOkOrPrintErrorStatus(s);
107
108 gpr_log(GPR_INFO, "Empty rpc done.");
109}
110
111// Shared code to set large payload, make rpc and check response payload.
112void InteropClient::PerformLargeUnary(SimpleRequest* request,
113 SimpleResponse* response) {
114 std::unique_ptr<TestService::Stub> stub(TestService::NewStub(channel_));
115
116 ClientContext context;
David Garcia Quintascd37d582015-08-09 15:50:21 -0700117 InteropClientContextInspector inspector(context);
David Garcia Quintas93dfab92015-08-13 11:29:50 -0700118 // If the request doesn't already specify the response type, default to
119 // COMPRESSABLE.
David Garcia Quintascd37d582015-08-09 15:50:21 -0700120 request->set_response_size(kLargeResponseSize);
121 grpc::string payload(kLargeRequestSize, '\0');
122 request->mutable_payload()->set_body(payload.c_str(), kLargeRequestSize);
123
David Garcia Quintas2e1bb1b2015-08-10 14:05:57 -0700124 Status s = stub->UnaryCall(&context, *request, response);
David Garcia Quintascd37d582015-08-09 15:50:21 -0700125
David Garcia Quintasc8993192015-07-22 09:10:39 -0700126 // Compression related checks.
David Garcia Quintas80f39952015-07-21 16:07:36 -0700127 GPR_ASSERT(request->response_compression() ==
128 GetInteropCompressionTypeFromCompressionAlgorithm(
129 inspector.GetCallCompressionAlgorithm()));
David Garcia Quintasc8993192015-07-22 09:10:39 -0700130 if (request->response_compression() == NONE) {
131 GPR_ASSERT(!(inspector.GetMessageFlags() & GRPC_WRITE_INTERNAL_COMPRESS));
132 } else if (request->response_type() == PayloadType::COMPRESSABLE) {
133 // requested compression and compressable response => results should always
134 // be compressed.
135 GPR_ASSERT(inspector.GetMessageFlags() & GRPC_WRITE_INTERNAL_COMPRESS);
136 }
137
Yang Gaoa4002072015-04-09 23:25:21 -0700138 AssertOkOrPrintErrorStatus(s);
David Garcia Quintasc8993192015-07-22 09:10:39 -0700139
140 // Payload related checks.
141 if (request->response_type() != PayloadType::RANDOM) {
142 GPR_ASSERT(response->payload().type() == request->response_type());
143 }
144 switch (response->payload().type()) {
145 case PayloadType::COMPRESSABLE:
146 GPR_ASSERT(response->payload().body() ==
147 grpc::string(kLargeResponseSize, '\0'));
148 break;
149 case PayloadType::UNCOMPRESSABLE: {
Craig Tillerd6c98df2015-08-18 09:33:44 -0700150 std::ifstream rnd_file(kRandomFile);
151 GPR_ASSERT(rnd_file.good());
152 for (int i = 0; i < kLargeResponseSize; i++) {
153 GPR_ASSERT(response->payload().body()[i] == (char)rnd_file.get());
David Garcia Quintasc8993192015-07-22 09:10:39 -0700154 }
Craig Tillerd6c98df2015-08-18 09:33:44 -0700155 } break;
David Garcia Quintasc8993192015-07-22 09:10:39 -0700156 default:
157 GPR_ASSERT(false);
158 }
Yang Gaoa4002072015-04-09 23:25:21 -0700159}
160
161void InteropClient::DoComputeEngineCreds(
162 const grpc::string& default_service_account,
163 const grpc::string& oauth_scope) {
164 gpr_log(GPR_INFO,
165 "Sending a large unary rpc with compute engine credentials ...");
166 SimpleRequest request;
167 SimpleResponse response;
168 request.set_fill_username(true);
169 request.set_fill_oauth_scope(true);
David Garcia Quintas80f39952015-07-21 16:07:36 -0700170 request.set_response_type(PayloadType::COMPRESSABLE);
Yang Gaoa4002072015-04-09 23:25:21 -0700171 PerformLargeUnary(&request, &response);
172 gpr_log(GPR_INFO, "Got username %s", response.username().c_str());
173 gpr_log(GPR_INFO, "Got oauth_scope %s", response.oauth_scope().c_str());
174 GPR_ASSERT(!response.username().empty());
175 GPR_ASSERT(response.username().c_str() == default_service_account);
176 GPR_ASSERT(!response.oauth_scope().empty());
177 const char* oauth_scope_str = response.oauth_scope().c_str();
178 GPR_ASSERT(oauth_scope.find(oauth_scope_str) != grpc::string::npos);
179 gpr_log(GPR_INFO, "Large unary with compute engine creds done.");
180}
181
yang-gbe5f0592015-07-13 11:11:50 -0700182void InteropClient::DoOauth2AuthToken(const grpc::string& username,
183 const grpc::string& oauth_scope) {
184 gpr_log(GPR_INFO,
yang-g463cde72015-07-17 15:21:39 -0700185 "Sending a unary rpc with raw oauth2 access token credentials ...");
yang-gbe5f0592015-07-13 11:11:50 -0700186 SimpleRequest request;
187 SimpleResponse response;
188 request.set_fill_username(true);
189 request.set_fill_oauth_scope(true);
yang-g463cde72015-07-17 15:21:39 -0700190 std::unique_ptr<TestService::Stub> stub(TestService::NewStub(channel_));
191
192 ClientContext context;
193
194 Status s = stub->UnaryCall(&context, request, &response);
195
196 AssertOkOrPrintErrorStatus(s);
yang-gbe5f0592015-07-13 11:11:50 -0700197 GPR_ASSERT(!response.username().empty());
198 GPR_ASSERT(!response.oauth_scope().empty());
yang-g867d0c12015-09-02 14:33:15 -0700199 GPR_ASSERT(username == response.username());
yang-gbe5f0592015-07-13 11:11:50 -0700200 const char* oauth_scope_str = response.oauth_scope().c_str();
201 GPR_ASSERT(oauth_scope.find(oauth_scope_str) != grpc::string::npos);
yang-g463cde72015-07-17 15:21:39 -0700202 gpr_log(GPR_INFO, "Unary with oauth2 access token credentials done.");
yang-gbe5f0592015-07-13 11:11:50 -0700203}
204
yang-g867d0c12015-09-02 14:33:15 -0700205void InteropClient::DoPerRpcCreds(const grpc::string& json_key) {
Craig Tiller71a0f9d2015-09-28 17:22:01 -0700206 gpr_log(GPR_INFO, "Sending a unary rpc with per-rpc JWT access token ...");
yang-g5bf510b2015-07-14 10:54:29 -0700207 SimpleRequest request;
208 SimpleResponse response;
209 request.set_fill_username(true);
yang-g5bf510b2015-07-14 10:54:29 -0700210 std::unique_ptr<TestService::Stub> stub(TestService::NewStub(channel_));
211
212 ClientContext context;
yang-g867d0c12015-09-02 14:33:15 -0700213 std::chrono::seconds token_lifetime = std::chrono::hours(1);
214 std::shared_ptr<Credentials> creds =
215 ServiceAccountJWTAccessCredentials(json_key, token_lifetime.count());
216
yang-g5bf510b2015-07-14 10:54:29 -0700217 context.set_credentials(creds);
yang-g5bf510b2015-07-14 10:54:29 -0700218
219 Status s = stub->UnaryCall(&context, request, &response);
220
221 AssertOkOrPrintErrorStatus(s);
yang-g5bf510b2015-07-14 10:54:29 -0700222 GPR_ASSERT(!response.username().empty());
yang-g867d0c12015-09-02 14:33:15 -0700223 GPR_ASSERT(json_key.find(response.username()) != grpc::string::npos);
yang-g201ef592015-09-02 15:55:10 -0700224 gpr_log(GPR_INFO, "Unary with per-rpc JWT access token done.");
yang-g5bf510b2015-07-14 10:54:29 -0700225}
226
Yang Gaoa4002072015-04-09 23:25:21 -0700227void InteropClient::DoJwtTokenCreds(const grpc::string& username) {
228 gpr_log(GPR_INFO, "Sending a large unary rpc with JWT token credentials ...");
229 SimpleRequest request;
230 SimpleResponse response;
231 request.set_fill_username(true);
David Garcia Quintas80f39952015-07-21 16:07:36 -0700232 request.set_response_type(PayloadType::COMPRESSABLE);
Yang Gaoa4002072015-04-09 23:25:21 -0700233 PerformLargeUnary(&request, &response);
234 GPR_ASSERT(!response.username().empty());
235 GPR_ASSERT(username.find(response.username()) != grpc::string::npos);
236 gpr_log(GPR_INFO, "Large unary with JWT token creds done.");
237}
238
239void InteropClient::DoLargeUnary() {
David Garcia Quintascd37d582015-08-09 15:50:21 -0700240 gpr_log(GPR_INFO, "Sending a large unary rpc...");
241 SimpleRequest request;
242 SimpleResponse response;
David Garcia Quintas93dfab92015-08-13 11:29:50 -0700243 request.set_response_type(PayloadType::COMPRESSABLE);
David Garcia Quintascd37d582015-08-09 15:50:21 -0700244 PerformLargeUnary(&request, &response);
245 gpr_log(GPR_INFO, "Large unary done.");
246}
247
248void InteropClient::DoLargeCompressedUnary() {
David Garcia Quintas80f39952015-07-21 16:07:36 -0700249 const CompressionType compression_types[] = {NONE, GZIP, DEFLATE};
250 const PayloadType payload_types[] = {COMPRESSABLE, UNCOMPRESSABLE, RANDOM};
David Garcia Quintas2a6427f2015-08-18 11:11:40 -0700251 for (size_t i = 0; i < GPR_ARRAY_SIZE(payload_types); i++) {
252 for (size_t j = 0; j < GPR_ARRAY_SIZE(compression_types); j++) {
David Garcia Quintasc8993192015-07-22 09:10:39 -0700253 char* log_suffix;
254 gpr_asprintf(&log_suffix, "(compression=%s; payload=%s)",
David Garcia Quintas2a6427f2015-08-18 11:11:40 -0700255 CompressionType_Name(compression_types[j]).c_str(),
256 PayloadType_Name(payload_types[i]).c_str());
David Garcia Quintasc8993192015-07-22 09:10:39 -0700257
David Garcia Quintas616b3752015-08-11 15:21:02 -0700258 gpr_log(GPR_INFO, "Sending a large compressed unary rpc %s.", log_suffix);
David Garcia Quintas80f39952015-07-21 16:07:36 -0700259 SimpleRequest request;
260 SimpleResponse response;
David Garcia Quintas2a6427f2015-08-18 11:11:40 -0700261 request.set_response_type(payload_types[i]);
262 request.set_response_compression(compression_types[j]);
David Garcia Quintas80f39952015-07-21 16:07:36 -0700263 PerformLargeUnary(&request, &response);
David Garcia Quintas616b3752015-08-11 15:21:02 -0700264 gpr_log(GPR_INFO, "Large compressed unary done %s.", log_suffix);
David Garcia Quintasc8993192015-07-22 09:10:39 -0700265 gpr_free(log_suffix);
David Garcia Quintas80f39952015-07-21 16:07:36 -0700266 }
267 }
Yang Gaoa4002072015-04-09 23:25:21 -0700268}
269
270void InteropClient::DoRequestStreaming() {
271 gpr_log(GPR_INFO, "Sending request steaming rpc ...");
272 std::unique_ptr<TestService::Stub> stub(TestService::NewStub(channel_));
273
274 ClientContext context;
275 StreamingInputCallRequest request;
276 StreamingInputCallResponse response;
277
278 std::unique_ptr<ClientWriter<StreamingInputCallRequest>> stream(
279 stub->StreamingInputCall(&context, &response));
280
281 int aggregated_payload_size = 0;
282 for (unsigned int i = 0; i < request_stream_sizes.size(); ++i) {
283 Payload* payload = request.mutable_payload();
284 payload->set_body(grpc::string(request_stream_sizes[i], '\0'));
285 GPR_ASSERT(stream->Write(request));
286 aggregated_payload_size += request_stream_sizes[i];
287 }
288 stream->WritesDone();
289 Status s = stream->Finish();
290
291 GPR_ASSERT(response.aggregated_payload_size() == aggregated_payload_size);
292 AssertOkOrPrintErrorStatus(s);
293 gpr_log(GPR_INFO, "Request streaming done.");
294}
295
296void InteropClient::DoResponseStreaming() {
David Garcia Quintascd37d582015-08-09 15:50:21 -0700297 gpr_log(GPR_INFO, "Receiving response steaming rpc ...");
298 std::unique_ptr<TestService::Stub> stub(TestService::NewStub(channel_));
299
300 ClientContext context;
301 StreamingOutputCallRequest request;
302 for (unsigned int i = 0; i < response_stream_sizes.size(); ++i) {
303 ResponseParameters* response_parameter = request.add_response_parameters();
304 response_parameter->set_size(response_stream_sizes[i]);
305 }
306 StreamingOutputCallResponse response;
307 std::unique_ptr<ClientReader<StreamingOutputCallResponse>> stream(
308 stub->StreamingOutputCall(&context, request));
309
310 unsigned int i = 0;
311 while (stream->Read(&response)) {
312 GPR_ASSERT(response.payload().body() ==
313 grpc::string(response_stream_sizes[i], '\0'));
314 ++i;
315 }
316 GPR_ASSERT(response_stream_sizes.size() == i);
317 Status s = stream->Finish();
318 AssertOkOrPrintErrorStatus(s);
319 gpr_log(GPR_INFO, "Response streaming done.");
320}
321
322void InteropClient::DoResponseCompressedStreaming() {
Yang Gaoa4002072015-04-09 23:25:21 -0700323 std::unique_ptr<TestService::Stub> stub(TestService::NewStub(channel_));
324
David Garcia Quintasc8993192015-07-22 09:10:39 -0700325 const CompressionType compression_types[] = {NONE, GZIP, DEFLATE};
326 const PayloadType payload_types[] = {COMPRESSABLE, UNCOMPRESSABLE, RANDOM};
David Garcia Quintas2a6427f2015-08-18 11:11:40 -0700327 for (size_t i = 0; i < GPR_ARRAY_SIZE(payload_types); i++) {
328 for (size_t j = 0; j < GPR_ARRAY_SIZE(compression_types); j++) {
David Garcia Quintasc8993192015-07-22 09:10:39 -0700329 ClientContext context;
330 InteropClientContextInspector inspector(context);
331 StreamingOutputCallRequest request;
David Garcia Quintas80f39952015-07-21 16:07:36 -0700332
David Garcia Quintasc8993192015-07-22 09:10:39 -0700333 char* log_suffix;
334 gpr_asprintf(&log_suffix, "(compression=%s; payload=%s)",
David Garcia Quintas2a6427f2015-08-18 11:11:40 -0700335 CompressionType_Name(compression_types[j]).c_str(),
336 PayloadType_Name(payload_types[i]).c_str());
David Garcia Quintasc8993192015-07-22 09:10:39 -0700337
338 gpr_log(GPR_INFO, "Receiving response steaming rpc %s.", log_suffix);
339
David Garcia Quintas2a6427f2015-08-18 11:11:40 -0700340 request.set_response_type(payload_types[i]);
341 request.set_response_compression(compression_types[j]);
David Garcia Quintasc8993192015-07-22 09:10:39 -0700342
David Garcia Quintas04ecfa12015-08-25 14:19:48 -0700343 for (size_t k = 0; k < response_stream_sizes.size(); ++k) {
David Garcia Quintasc8993192015-07-22 09:10:39 -0700344 ResponseParameters* response_parameter =
345 request.add_response_parameters();
David Garcia Quintas04ecfa12015-08-25 14:19:48 -0700346 response_parameter->set_size(response_stream_sizes[k]);
David Garcia Quintasc8993192015-07-22 09:10:39 -0700347 }
348 StreamingOutputCallResponse response;
349
350 std::unique_ptr<ClientReader<StreamingOutputCallResponse>> stream(
351 stub->StreamingOutputCall(&context, request));
352
David Garcia Quintas04ecfa12015-08-25 14:19:48 -0700353 size_t k = 0;
David Garcia Quintasc8993192015-07-22 09:10:39 -0700354 while (stream->Read(&response)) {
David Garcia Quintas04ecfa12015-08-25 14:19:48 -0700355 // Payload related checks.
356 if (request.response_type() != PayloadType::RANDOM) {
357 GPR_ASSERT(response.payload().type() == request.response_type());
358 }
359 switch (response.payload().type()) {
360 case PayloadType::COMPRESSABLE:
361 GPR_ASSERT(response.payload().body() ==
362 grpc::string(response_stream_sizes[k], '\0'));
363 break;
364 case PayloadType::UNCOMPRESSABLE: {
365 std::ifstream rnd_file(kRandomFile);
366 GPR_ASSERT(rnd_file.good());
367 for (int n = 0; n < response_stream_sizes[k]; n++) {
368 GPR_ASSERT(response.payload().body()[n] == (char)rnd_file.get());
369 }
370 } break;
371 default:
372 GPR_ASSERT(false);
373 }
David Garcia Quintasc8993192015-07-22 09:10:39 -0700374
375 // Compression related checks.
376 GPR_ASSERT(request.response_compression() ==
377 GetInteropCompressionTypeFromCompressionAlgorithm(
378 inspector.GetCallCompressionAlgorithm()));
379 if (request.response_compression() == NONE) {
380 GPR_ASSERT(
381 !(inspector.GetMessageFlags() & GRPC_WRITE_INTERNAL_COMPRESS));
382 } else if (request.response_type() == PayloadType::COMPRESSABLE) {
383 // requested compression and compressable response => results should
384 // always be compressed.
385 GPR_ASSERT(inspector.GetMessageFlags() &
386 GRPC_WRITE_INTERNAL_COMPRESS);
387 }
388
David Garcia Quintas04ecfa12015-08-25 14:19:48 -0700389 ++k;
David Garcia Quintasc8993192015-07-22 09:10:39 -0700390 }
391
David Garcia Quintas04ecfa12015-08-25 14:19:48 -0700392 GPR_ASSERT(response_stream_sizes.size() == k);
David Garcia Quintasc8993192015-07-22 09:10:39 -0700393 Status s = stream->Finish();
394
395 AssertOkOrPrintErrorStatus(s);
396 gpr_log(GPR_INFO, "Response streaming done %s.", log_suffix);
397 gpr_free(log_suffix);
398 }
Yang Gaoa4002072015-04-09 23:25:21 -0700399 }
Yang Gaoa4002072015-04-09 23:25:21 -0700400}
401
402void InteropClient::DoResponseStreamingWithSlowConsumer() {
403 gpr_log(GPR_INFO, "Receiving response steaming rpc with slow consumer ...");
404 std::unique_ptr<TestService::Stub> stub(TestService::NewStub(channel_));
405
406 ClientContext context;
407 StreamingOutputCallRequest request;
408
409 for (int i = 0; i < kNumResponseMessages; ++i) {
410 ResponseParameters* response_parameter = request.add_response_parameters();
411 response_parameter->set_size(kResponseMessageSize);
412 }
413 StreamingOutputCallResponse response;
414 std::unique_ptr<ClientReader<StreamingOutputCallResponse>> stream(
415 stub->StreamingOutputCall(&context, request));
416
417 int i = 0;
418 while (stream->Read(&response)) {
419 GPR_ASSERT(response.payload().body() ==
420 grpc::string(kResponseMessageSize, '\0'));
421 gpr_log(GPR_INFO, "received message %d", i);
422 usleep(kReceiveDelayMilliSeconds * 1000);
423 ++i;
424 }
425 GPR_ASSERT(kNumResponseMessages == i);
426 Status s = stream->Finish();
427
428 AssertOkOrPrintErrorStatus(s);
429 gpr_log(GPR_INFO, "Response streaming done.");
430}
431
432void InteropClient::DoHalfDuplex() {
433 gpr_log(GPR_INFO, "Sending half-duplex streaming rpc ...");
434 std::unique_ptr<TestService::Stub> stub(TestService::NewStub(channel_));
435
436 ClientContext context;
437 std::unique_ptr<ClientReaderWriter<StreamingOutputCallRequest,
438 StreamingOutputCallResponse>>
439 stream(stub->HalfDuplexCall(&context));
440
441 StreamingOutputCallRequest request;
442 ResponseParameters* response_parameter = request.add_response_parameters();
443 for (unsigned int i = 0; i < response_stream_sizes.size(); ++i) {
444 response_parameter->set_size(response_stream_sizes[i]);
445 GPR_ASSERT(stream->Write(request));
446 }
447 stream->WritesDone();
448
449 unsigned int i = 0;
450 StreamingOutputCallResponse response;
451 while (stream->Read(&response)) {
Yang Gaoa4002072015-04-09 23:25:21 -0700452 GPR_ASSERT(response.payload().body() ==
453 grpc::string(response_stream_sizes[i], '\0'));
454 ++i;
455 }
456 GPR_ASSERT(response_stream_sizes.size() == i);
457 Status s = stream->Finish();
458 AssertOkOrPrintErrorStatus(s);
459 gpr_log(GPR_INFO, "Half-duplex streaming rpc done.");
460}
461
462void InteropClient::DoPingPong() {
463 gpr_log(GPR_INFO, "Sending Ping Pong streaming rpc ...");
464 std::unique_ptr<TestService::Stub> stub(TestService::NewStub(channel_));
465
466 ClientContext context;
467 std::unique_ptr<ClientReaderWriter<StreamingOutputCallRequest,
468 StreamingOutputCallResponse>>
469 stream(stub->FullDuplexCall(&context));
470
471 StreamingOutputCallRequest request;
472 request.set_response_type(PayloadType::COMPRESSABLE);
473 ResponseParameters* response_parameter = request.add_response_parameters();
474 Payload* payload = request.mutable_payload();
475 StreamingOutputCallResponse response;
476 for (unsigned int i = 0; i < request_stream_sizes.size(); ++i) {
477 response_parameter->set_size(response_stream_sizes[i]);
478 payload->set_body(grpc::string(request_stream_sizes[i], '\0'));
479 GPR_ASSERT(stream->Write(request));
480 GPR_ASSERT(stream->Read(&response));
Yang Gaoa4002072015-04-09 23:25:21 -0700481 GPR_ASSERT(response.payload().body() ==
482 grpc::string(response_stream_sizes[i], '\0'));
483 }
484
485 stream->WritesDone();
486 GPR_ASSERT(!stream->Read(&response));
487 Status s = stream->Finish();
488 AssertOkOrPrintErrorStatus(s);
489 gpr_log(GPR_INFO, "Ping pong streaming done.");
490}
491
Yang Gao68d61572015-04-24 14:42:42 -0700492void InteropClient::DoCancelAfterBegin() {
493 gpr_log(GPR_INFO, "Sending request steaming rpc ...");
494 std::unique_ptr<TestService::Stub> stub(TestService::NewStub(channel_));
495
496 ClientContext context;
497 StreamingInputCallRequest request;
498 StreamingInputCallResponse response;
499
500 std::unique_ptr<ClientWriter<StreamingInputCallRequest>> stream(
501 stub->StreamingInputCall(&context, &response));
502
503 gpr_log(GPR_INFO, "Trying to cancel...");
504 context.TryCancel();
505 Status s = stream->Finish();
Yang Gaoc1a2c312015-06-16 10:59:46 -0700506 GPR_ASSERT(s.error_code() == StatusCode::CANCELLED);
Yang Gao68d61572015-04-24 14:42:42 -0700507 gpr_log(GPR_INFO, "Canceling streaming done.");
508}
509
510void InteropClient::DoCancelAfterFirstResponse() {
511 gpr_log(GPR_INFO, "Sending Ping Pong streaming rpc ...");
512 std::unique_ptr<TestService::Stub> stub(TestService::NewStub(channel_));
513
514 ClientContext context;
515 std::unique_ptr<ClientReaderWriter<StreamingOutputCallRequest,
516 StreamingOutputCallResponse>>
517 stream(stub->FullDuplexCall(&context));
518
519 StreamingOutputCallRequest request;
520 request.set_response_type(PayloadType::COMPRESSABLE);
521 ResponseParameters* response_parameter = request.add_response_parameters();
522 response_parameter->set_size(31415);
523 request.mutable_payload()->set_body(grpc::string(27182, '\0'));
524 StreamingOutputCallResponse response;
525 GPR_ASSERT(stream->Write(request));
526 GPR_ASSERT(stream->Read(&response));
Yang Gao68d61572015-04-24 14:42:42 -0700527 GPR_ASSERT(response.payload().body() == grpc::string(31415, '\0'));
528 gpr_log(GPR_INFO, "Trying to cancel...");
529 context.TryCancel();
530
531 Status s = stream->Finish();
532 gpr_log(GPR_INFO, "Canceling pingpong streaming done.");
533}
534
yang-g69563b92015-07-10 15:32:11 -0700535void InteropClient::DoTimeoutOnSleepingServer() {
536 gpr_log(GPR_INFO, "Sending Ping Pong streaming rpc with a short deadline...");
537 std::unique_ptr<TestService::Stub> stub(TestService::NewStub(channel_));
538
539 ClientContext context;
540 std::chrono::system_clock::time_point deadline =
541 std::chrono::system_clock::now() + std::chrono::milliseconds(1);
542 context.set_deadline(deadline);
543 std::unique_ptr<ClientReaderWriter<StreamingOutputCallRequest,
544 StreamingOutputCallResponse>>
545 stream(stub->FullDuplexCall(&context));
546
547 StreamingOutputCallRequest request;
548 request.mutable_payload()->set_body(grpc::string(27182, '\0'));
549 stream->Write(request);
550
551 Status s = stream->Finish();
552 GPR_ASSERT(s.error_code() == StatusCode::DEADLINE_EXCEEDED);
553 gpr_log(GPR_INFO, "Pingpong streaming timeout done.");
554}
555
yang-g78bddc62015-09-21 10:32:17 -0700556void InteropClient::DoEmptyStream() {
557 gpr_log(GPR_INFO, "Starting empty_stream.");
558 std::unique_ptr<TestService::Stub> stub(TestService::NewStub(channel_));
559
560 ClientContext context;
561 std::unique_ptr<ClientReaderWriter<StreamingOutputCallRequest,
562 StreamingOutputCallResponse>>
563 stream(stub->FullDuplexCall(&context));
564 stream->WritesDone();
565 StreamingOutputCallResponse response;
566 GPR_ASSERT(stream->Read(&response) == false);
567 Status s = stream->Finish();
568 AssertOkOrPrintErrorStatus(s);
569 gpr_log(GPR_INFO, "empty_stream done.");
570}
571
Abhishek Kumare1c867d2015-08-05 11:04:45 -0700572void InteropClient::DoStatusWithMessage() {
573 gpr_log(GPR_INFO, "Sending RPC with a request for status code 2 and message");
574 std::unique_ptr<TestService::Stub> stub(TestService::NewStub(channel_));
575
576 ClientContext context;
577 SimpleRequest request;
578 SimpleResponse response;
Craig Tillerd6c98df2015-08-18 09:33:44 -0700579 EchoStatus* requested_status = request.mutable_response_status();
Abhishek Kumare1c867d2015-08-05 11:04:45 -0700580 requested_status->set_code(grpc::StatusCode::UNKNOWN);
581 grpc::string test_msg = "This is a test message";
582 requested_status->set_message(test_msg);
583
584 Status s = stub->UnaryCall(&context, request, &response);
585
586 GPR_ASSERT(s.error_code() == grpc::StatusCode::UNKNOWN);
587 GPR_ASSERT(s.error_message() == test_msg);
588 gpr_log(GPR_INFO, "Done testing Status and Message");
589}
590
Yang Gaoa4002072015-04-09 23:25:21 -0700591} // namespace testing
592} // namespace grpc