blob: 46f6fdac4092ee99fb039c8da832f8c7f2fac069 [file] [log] [blame]
Yang Gaoa4002072015-04-09 23:25:21 -07001/*
2 *
murgatroid993466c4b2016-01-12 10:26:04 -08003 * Copyright 2015-2016, Google Inc.
Yang Gaoa4002072015-04-09 23:25:21 -07004 * All rights reserved.
5 *
6 * Redistribution and use in source and binary forms, with or without
7 * modification, are permitted provided that the following conditions are
8 * met:
9 *
10 * * Redistributions of source code must retain the above copyright
11 * notice, this list of conditions and the following disclaimer.
12 * * Redistributions in binary form must reproduce the above
13 * copyright notice, this list of conditions and the following disclaimer
14 * in the documentation and/or other materials provided with the
15 * distribution.
16 * * Neither the name of Google Inc. nor the names of its
17 * contributors may be used to endorse or promote products derived from
18 * this software without specific prior written permission.
19 *
20 * THIS SOFTWARE IS PROVIDED BY THE COPYRIGHT HOLDERS AND CONTRIBUTORS
21 * "AS IS" AND ANY EXPRESS OR IMPLIED WARRANTIES, INCLUDING, BUT NOT
22 * LIMITED TO, THE IMPLIED WARRANTIES OF MERCHANTABILITY AND FITNESS FOR
23 * A PARTICULAR PURPOSE ARE DISCLAIMED. IN NO EVENT SHALL THE COPYRIGHT
24 * OWNER OR CONTRIBUTORS BE LIABLE FOR ANY DIRECT, INDIRECT, INCIDENTAL,
25 * SPECIAL, EXEMPLARY, OR CONSEQUENTIAL DAMAGES (INCLUDING, BUT NOT
26 * LIMITED TO, PROCUREMENT OF SUBSTITUTE GOODS OR SERVICES; LOSS OF USE,
27 * DATA, OR PROFITS; OR BUSINESS INTERRUPTION) HOWEVER CAUSED AND ON ANY
28 * THEORY OF LIABILITY, WHETHER IN CONTRACT, STRICT LIABILITY, OR TORT
29 * (INCLUDING NEGLIGENCE OR OTHERWISE) ARISING IN ANY WAY OUT OF THE USE
30 * OF THIS SOFTWARE, EVEN IF ADVISED OF THE POSSIBILITY OF SUCH DAMAGE.
31 *
32 */
33
34#include "test/cpp/interop/interop_client.h"
35
yang-g9e2f90c2015-08-21 15:35:03 -070036#include <unistd.h>
37
David Garcia Quintasc8993192015-07-22 09:10:39 -070038#include <fstream>
Yang Gaoa4002072015-04-09 23:25:21 -070039#include <memory>
40
Sree Kuchibhotla7cecfa62016-01-06 08:09:02 -080041#include <grpc++/channel.h>
42#include <grpc++/client_context.h>
43#include <grpc++/security/credentials.h>
Yang Gaoa4002072015-04-09 23:25:21 -070044#include <grpc/grpc.h>
45#include <grpc/support/log.h>
David Garcia Quintasc8993192015-07-22 09:10:39 -070046#include <grpc/support/string_util.h>
David Garcia Quintas2a6427f2015-08-18 11:11:40 -070047#include <grpc/support/useful.h>
David Garcia Quintas80f39952015-07-21 16:07:36 -070048
Craig Tiller93b94472015-11-02 14:18:30 -080049#include "src/core/transport/byte_stream.h"
Craig Tiller1b4e3302015-12-17 16:35:00 -080050#include "src/proto/grpc/testing/empty.grpc.pb.h"
Craig Tiller7cc0f462016-01-07 16:37:03 -080051#include "src/proto/grpc/testing/test.grpc.pb.h"
Craig Tiller1b4e3302015-12-17 16:35:00 -080052#include "src/proto/grpc/testing/messages.grpc.pb.h"
David Garcia Quintas80f39952015-07-21 16:07:36 -070053#include "test/cpp/interop/client_helper.h"
Yang Gaoa4002072015-04-09 23:25:21 -070054
55namespace grpc {
56namespace testing {
57
// Relative path of the reference file whose bytes UNCOMPRESSABLE response
// payloads are compared against. Both the pointer and the characters are
// const so the path cannot be re-pointed by accident.
static const char* const kRandomFile = "test/cpp/interop/rnd.dat";
59
Yang Gaoa4002072015-04-09 23:25:21 -070060namespace {
// The same value is defined by the Java client.
// Payload sizes (bytes) written on client-streaming / ping-pong requests.
const std::vector<int> request_stream_sizes{27182, 8, 1828, 45904};
// Payload sizes (bytes) requested from the server on streaming responses.
const std::vector<int> response_stream_sizes{31415, 9, 2653, 58979};
// Slow-consumer test: message count, per-message size and the pause taken
// after each received message.
const int kNumResponseMessages = 2000;
const int kResponseMessageSize = 1030;
const int kReceiveDelayMilliSeconds = 20;
// Request / response payload sizes (bytes) used by PerformLargeUnary.
const int kLargeRequestSize = 271828;
const int kLargeResponseSize = 314159;
David Garcia Quintas81491b62015-08-18 16:46:53 -070069
70CompressionType GetInteropCompressionTypeFromCompressionAlgorithm(
71 grpc_compression_algorithm algorithm) {
72 switch (algorithm) {
73 case GRPC_COMPRESS_NONE:
74 return CompressionType::NONE;
75 case GRPC_COMPRESS_GZIP:
76 return CompressionType::GZIP;
77 case GRPC_COMPRESS_DEFLATE:
78 return CompressionType::DEFLATE;
79 default:
80 GPR_ASSERT(false);
81 }
82}
David Garcia Quintasa2b78172015-12-15 22:25:53 -080083
84void NoopChecks(const InteropClientContextInspector& inspector,
85 const SimpleRequest* request, const SimpleResponse* response) {}
86
87void CompressionChecks(const InteropClientContextInspector& inspector,
88 const SimpleRequest* request,
89 const SimpleResponse* response) {
90 GPR_ASSERT(request->response_compression() ==
91 GetInteropCompressionTypeFromCompressionAlgorithm(
92 inspector.GetCallCompressionAlgorithm()));
93 if (request->response_compression() == NONE) {
94 GPR_ASSERT(!(inspector.GetMessageFlags() & GRPC_WRITE_INTERNAL_COMPRESS));
95 } else if (request->response_type() == PayloadType::COMPRESSABLE) {
96 // requested compression and compressable response => results should always
97 // be compressed.
98 GPR_ASSERT(inspector.GetMessageFlags() & GRPC_WRITE_INTERNAL_COMPRESS);
99 }
100}
Yang Gaoa4002072015-04-09 23:25:21 -0700101} // namespace
102
Sree Kuchibhotlaac7edec2015-10-20 16:45:52 -0700103InteropClient::ServiceStub::ServiceStub(std::shared_ptr<Channel> channel,
104 bool new_stub_every_call)
105 : channel_(channel), new_stub_every_call_(new_stub_every_call) {
106 // If new_stub_every_call is false, then this is our chance to initialize
107 // stub_. (see Get())
108 if (!new_stub_every_call) {
109 stub_ = TestService::NewStub(channel);
110 }
111}
112
113TestService::Stub* InteropClient::ServiceStub::Get() {
114 if (new_stub_every_call_) {
115 stub_ = TestService::NewStub(channel_);
116 }
117
118 return stub_.get();
119}
120
121void InteropClient::ServiceStub::Reset(std::shared_ptr<Channel> channel) {
122 channel_ = channel;
123
Sree Kuchibhotla41b21432015-10-22 10:42:40 -0700124 // Update stub_ as well. Note: If new_stub_every_call_ is true, we can reset
125 // the stub_ since the next call to Get() will create a new stub
Sree Kuchibhotlaac7edec2015-10-20 16:45:52 -0700126 if (new_stub_every_call_) {
Sree Kuchibhotla41b21432015-10-22 10:42:40 -0700127 stub_.reset();
Sree Kuchibhotlaac7edec2015-10-20 16:45:52 -0700128 } else {
129 stub_ = TestService::NewStub(channel);
130 }
131}
132
133void InteropClient::Reset(std::shared_ptr<Channel> channel) {
134 serviceStub_.Reset(channel);
135}
136
yang-g8c2be9f2015-08-19 16:28:09 -0700137InteropClient::InteropClient(std::shared_ptr<Channel> channel)
Sree Kuchibhotlaac7edec2015-10-20 16:45:52 -0700138 : serviceStub_(channel, true) {}
139
140InteropClient::InteropClient(std::shared_ptr<Channel> channel,
141 bool new_stub_every_test_case)
142 : serviceStub_(channel, new_stub_every_test_case) {}
Yang Gaoa4002072015-04-09 23:25:21 -0700143
144void InteropClient::AssertOkOrPrintErrorStatus(const Status& s) {
Yang Gaoc1a2c312015-06-16 10:59:46 -0700145 if (s.ok()) {
Yang Gaoa4002072015-04-09 23:25:21 -0700146 return;
147 }
Sree Kuchibhotla7cecfa62016-01-06 08:09:02 -0800148 gpr_log(GPR_ERROR, "Error status code: %d, message: %s", s.error_code(),
Yang Gaoc1a2c312015-06-16 10:59:46 -0700149 s.error_message().c_str());
Yang Gaoa4002072015-04-09 23:25:21 -0700150 GPR_ASSERT(0);
151}
152
153void InteropClient::DoEmpty() {
Sree Kuchibhotla7cecfa62016-01-06 08:09:02 -0800154 gpr_log(GPR_DEBUG, "Sending an empty rpc...");
Yang Gaoa4002072015-04-09 23:25:21 -0700155
156 Empty request = Empty::default_instance();
157 Empty response = Empty::default_instance();
158 ClientContext context;
159
Sree Kuchibhotlaac7edec2015-10-20 16:45:52 -0700160 Status s = serviceStub_.Get()->EmptyCall(&context, request, &response);
Yang Gaoa4002072015-04-09 23:25:21 -0700161 AssertOkOrPrintErrorStatus(s);
162
Sree Kuchibhotla7cecfa62016-01-06 08:09:02 -0800163 gpr_log(GPR_DEBUG, "Empty rpc done.");
Yang Gaoa4002072015-04-09 23:25:21 -0700164}
165
Yang Gaoa4002072015-04-09 23:25:21 -0700166void InteropClient::PerformLargeUnary(SimpleRequest* request,
167 SimpleResponse* response) {
David Garcia Quintasa2b78172015-12-15 22:25:53 -0800168 PerformLargeUnary(request, response, NoopChecks);
169}
170
171void InteropClient::PerformLargeUnary(SimpleRequest* request,
172 SimpleResponse* response,
173 CheckerFn custom_checks_fn) {
Yang Gaoa4002072015-04-09 23:25:21 -0700174 ClientContext context;
David Garcia Quintascd37d582015-08-09 15:50:21 -0700175 InteropClientContextInspector inspector(context);
David Garcia Quintas93dfab92015-08-13 11:29:50 -0700176 // If the request doesn't already specify the response type, default to
177 // COMPRESSABLE.
David Garcia Quintascd37d582015-08-09 15:50:21 -0700178 request->set_response_size(kLargeResponseSize);
179 grpc::string payload(kLargeRequestSize, '\0');
180 request->mutable_payload()->set_body(payload.c_str(), kLargeRequestSize);
181
Sree Kuchibhotlaac7edec2015-10-20 16:45:52 -0700182 Status s = serviceStub_.Get()->UnaryCall(&context, *request, response);
Yang Gaoa4002072015-04-09 23:25:21 -0700183 AssertOkOrPrintErrorStatus(s);
David Garcia Quintasc8993192015-07-22 09:10:39 -0700184
David Garcia Quintasa2b78172015-12-15 22:25:53 -0800185 custom_checks_fn(inspector, request, response);
186
David Garcia Quintasc8993192015-07-22 09:10:39 -0700187 // Payload related checks.
188 if (request->response_type() != PayloadType::RANDOM) {
189 GPR_ASSERT(response->payload().type() == request->response_type());
190 }
191 switch (response->payload().type()) {
192 case PayloadType::COMPRESSABLE:
193 GPR_ASSERT(response->payload().body() ==
194 grpc::string(kLargeResponseSize, '\0'));
195 break;
196 case PayloadType::UNCOMPRESSABLE: {
Craig Tillerd6c98df2015-08-18 09:33:44 -0700197 std::ifstream rnd_file(kRandomFile);
198 GPR_ASSERT(rnd_file.good());
199 for (int i = 0; i < kLargeResponseSize; i++) {
200 GPR_ASSERT(response->payload().body()[i] == (char)rnd_file.get());
David Garcia Quintasc8993192015-07-22 09:10:39 -0700201 }
Craig Tillerd6c98df2015-08-18 09:33:44 -0700202 } break;
David Garcia Quintasc8993192015-07-22 09:10:39 -0700203 default:
204 GPR_ASSERT(false);
205 }
Yang Gaoa4002072015-04-09 23:25:21 -0700206}
207
208void InteropClient::DoComputeEngineCreds(
209 const grpc::string& default_service_account,
210 const grpc::string& oauth_scope) {
Sree Kuchibhotla7cecfa62016-01-06 08:09:02 -0800211 gpr_log(GPR_DEBUG,
Yang Gaoa4002072015-04-09 23:25:21 -0700212 "Sending a large unary rpc with compute engine credentials ...");
213 SimpleRequest request;
214 SimpleResponse response;
215 request.set_fill_username(true);
216 request.set_fill_oauth_scope(true);
David Garcia Quintas80f39952015-07-21 16:07:36 -0700217 request.set_response_type(PayloadType::COMPRESSABLE);
Yang Gaoa4002072015-04-09 23:25:21 -0700218 PerformLargeUnary(&request, &response);
Sree Kuchibhotla7cecfa62016-01-06 08:09:02 -0800219 gpr_log(GPR_DEBUG, "Got username %s", response.username().c_str());
220 gpr_log(GPR_DEBUG, "Got oauth_scope %s", response.oauth_scope().c_str());
Yang Gaoa4002072015-04-09 23:25:21 -0700221 GPR_ASSERT(!response.username().empty());
222 GPR_ASSERT(response.username().c_str() == default_service_account);
223 GPR_ASSERT(!response.oauth_scope().empty());
224 const char* oauth_scope_str = response.oauth_scope().c_str();
225 GPR_ASSERT(oauth_scope.find(oauth_scope_str) != grpc::string::npos);
Sree Kuchibhotla7cecfa62016-01-06 08:09:02 -0800226 gpr_log(GPR_DEBUG, "Large unary with compute engine creds done.");
Yang Gaoa4002072015-04-09 23:25:21 -0700227}
228
yang-gbe5f0592015-07-13 11:11:50 -0700229void InteropClient::DoOauth2AuthToken(const grpc::string& username,
230 const grpc::string& oauth_scope) {
Sree Kuchibhotla7cecfa62016-01-06 08:09:02 -0800231 gpr_log(GPR_DEBUG,
yang-g463cde72015-07-17 15:21:39 -0700232 "Sending a unary rpc with raw oauth2 access token credentials ...");
yang-gbe5f0592015-07-13 11:11:50 -0700233 SimpleRequest request;
234 SimpleResponse response;
235 request.set_fill_username(true);
236 request.set_fill_oauth_scope(true);
yang-g463cde72015-07-17 15:21:39 -0700237
238 ClientContext context;
239
Sree Kuchibhotlaac7edec2015-10-20 16:45:52 -0700240 Status s = serviceStub_.Get()->UnaryCall(&context, request, &response);
yang-g463cde72015-07-17 15:21:39 -0700241
242 AssertOkOrPrintErrorStatus(s);
yang-gbe5f0592015-07-13 11:11:50 -0700243 GPR_ASSERT(!response.username().empty());
244 GPR_ASSERT(!response.oauth_scope().empty());
yang-g867d0c12015-09-02 14:33:15 -0700245 GPR_ASSERT(username == response.username());
yang-gbe5f0592015-07-13 11:11:50 -0700246 const char* oauth_scope_str = response.oauth_scope().c_str();
247 GPR_ASSERT(oauth_scope.find(oauth_scope_str) != grpc::string::npos);
Sree Kuchibhotla7cecfa62016-01-06 08:09:02 -0800248 gpr_log(GPR_DEBUG, "Unary with oauth2 access token credentials done.");
yang-gbe5f0592015-07-13 11:11:50 -0700249}
250
yang-g867d0c12015-09-02 14:33:15 -0700251void InteropClient::DoPerRpcCreds(const grpc::string& json_key) {
Sree Kuchibhotla7cecfa62016-01-06 08:09:02 -0800252 gpr_log(GPR_DEBUG, "Sending a unary rpc with per-rpc JWT access token ...");
yang-g5bf510b2015-07-14 10:54:29 -0700253 SimpleRequest request;
254 SimpleResponse response;
255 request.set_fill_username(true);
yang-g5bf510b2015-07-14 10:54:29 -0700256
257 ClientContext context;
yang-g867d0c12015-09-02 14:33:15 -0700258 std::chrono::seconds token_lifetime = std::chrono::hours(1);
Julien Boeufe5adc0e2015-10-12 14:08:10 -0700259 std::shared_ptr<CallCredentials> creds =
yang-g867d0c12015-09-02 14:33:15 -0700260 ServiceAccountJWTAccessCredentials(json_key, token_lifetime.count());
261
yang-g5bf510b2015-07-14 10:54:29 -0700262 context.set_credentials(creds);
yang-g5bf510b2015-07-14 10:54:29 -0700263
Sree Kuchibhotlaac7edec2015-10-20 16:45:52 -0700264 Status s = serviceStub_.Get()->UnaryCall(&context, request, &response);
yang-g5bf510b2015-07-14 10:54:29 -0700265
266 AssertOkOrPrintErrorStatus(s);
yang-g5bf510b2015-07-14 10:54:29 -0700267 GPR_ASSERT(!response.username().empty());
yang-g867d0c12015-09-02 14:33:15 -0700268 GPR_ASSERT(json_key.find(response.username()) != grpc::string::npos);
Sree Kuchibhotla7cecfa62016-01-06 08:09:02 -0800269 gpr_log(GPR_DEBUG, "Unary with per-rpc JWT access token done.");
yang-g5bf510b2015-07-14 10:54:29 -0700270}
271
Yang Gaoa4002072015-04-09 23:25:21 -0700272void InteropClient::DoJwtTokenCreds(const grpc::string& username) {
Sree Kuchibhotla7cecfa62016-01-06 08:09:02 -0800273 gpr_log(GPR_DEBUG,
274 "Sending a large unary rpc with JWT token credentials ...");
Yang Gaoa4002072015-04-09 23:25:21 -0700275 SimpleRequest request;
276 SimpleResponse response;
277 request.set_fill_username(true);
David Garcia Quintas80f39952015-07-21 16:07:36 -0700278 request.set_response_type(PayloadType::COMPRESSABLE);
Yang Gaoa4002072015-04-09 23:25:21 -0700279 PerformLargeUnary(&request, &response);
280 GPR_ASSERT(!response.username().empty());
281 GPR_ASSERT(username.find(response.username()) != grpc::string::npos);
Sree Kuchibhotla7cecfa62016-01-06 08:09:02 -0800282 gpr_log(GPR_DEBUG, "Large unary with JWT token creds done.");
Yang Gaoa4002072015-04-09 23:25:21 -0700283}
284
285void InteropClient::DoLargeUnary() {
Sree Kuchibhotla7cecfa62016-01-06 08:09:02 -0800286 gpr_log(GPR_DEBUG, "Sending a large unary rpc...");
David Garcia Quintascd37d582015-08-09 15:50:21 -0700287 SimpleRequest request;
288 SimpleResponse response;
David Garcia Quintas93dfab92015-08-13 11:29:50 -0700289 request.set_response_type(PayloadType::COMPRESSABLE);
David Garcia Quintascd37d582015-08-09 15:50:21 -0700290 PerformLargeUnary(&request, &response);
Sree Kuchibhotla7cecfa62016-01-06 08:09:02 -0800291 gpr_log(GPR_DEBUG, "Large unary done.");
David Garcia Quintascd37d582015-08-09 15:50:21 -0700292}
293
294void InteropClient::DoLargeCompressedUnary() {
David Garcia Quintas80f39952015-07-21 16:07:36 -0700295 const CompressionType compression_types[] = {NONE, GZIP, DEFLATE};
296 const PayloadType payload_types[] = {COMPRESSABLE, UNCOMPRESSABLE, RANDOM};
David Garcia Quintas2a6427f2015-08-18 11:11:40 -0700297 for (size_t i = 0; i < GPR_ARRAY_SIZE(payload_types); i++) {
298 for (size_t j = 0; j < GPR_ARRAY_SIZE(compression_types); j++) {
David Garcia Quintasc8993192015-07-22 09:10:39 -0700299 char* log_suffix;
300 gpr_asprintf(&log_suffix, "(compression=%s; payload=%s)",
David Garcia Quintas2a6427f2015-08-18 11:11:40 -0700301 CompressionType_Name(compression_types[j]).c_str(),
302 PayloadType_Name(payload_types[i]).c_str());
David Garcia Quintasc8993192015-07-22 09:10:39 -0700303
Sree Kuchibhotla7cecfa62016-01-06 08:09:02 -0800304 gpr_log(GPR_DEBUG, "Sending a large compressed unary rpc %s.",
305 log_suffix);
David Garcia Quintas80f39952015-07-21 16:07:36 -0700306 SimpleRequest request;
307 SimpleResponse response;
David Garcia Quintas2a6427f2015-08-18 11:11:40 -0700308 request.set_response_type(payload_types[i]);
309 request.set_response_compression(compression_types[j]);
David Garcia Quintasa2b78172015-12-15 22:25:53 -0800310 PerformLargeUnary(&request, &response, CompressionChecks);
Sree Kuchibhotla7cecfa62016-01-06 08:09:02 -0800311 gpr_log(GPR_DEBUG, "Large compressed unary done %s.", log_suffix);
David Garcia Quintasc8993192015-07-22 09:10:39 -0700312 gpr_free(log_suffix);
David Garcia Quintas80f39952015-07-21 16:07:36 -0700313 }
314 }
Yang Gaoa4002072015-04-09 23:25:21 -0700315}
316
317void InteropClient::DoRequestStreaming() {
Sree Kuchibhotla7cecfa62016-01-06 08:09:02 -0800318 gpr_log(GPR_DEBUG, "Sending request steaming rpc ...");
Yang Gaoa4002072015-04-09 23:25:21 -0700319
320 ClientContext context;
321 StreamingInputCallRequest request;
322 StreamingInputCallResponse response;
323
324 std::unique_ptr<ClientWriter<StreamingInputCallRequest>> stream(
Sree Kuchibhotlaac7edec2015-10-20 16:45:52 -0700325 serviceStub_.Get()->StreamingInputCall(&context, &response));
Yang Gaoa4002072015-04-09 23:25:21 -0700326
327 int aggregated_payload_size = 0;
328 for (unsigned int i = 0; i < request_stream_sizes.size(); ++i) {
329 Payload* payload = request.mutable_payload();
330 payload->set_body(grpc::string(request_stream_sizes[i], '\0'));
331 GPR_ASSERT(stream->Write(request));
332 aggregated_payload_size += request_stream_sizes[i];
333 }
334 stream->WritesDone();
335 Status s = stream->Finish();
336
337 GPR_ASSERT(response.aggregated_payload_size() == aggregated_payload_size);
338 AssertOkOrPrintErrorStatus(s);
Sree Kuchibhotla7cecfa62016-01-06 08:09:02 -0800339 gpr_log(GPR_DEBUG, "Request streaming done.");
Yang Gaoa4002072015-04-09 23:25:21 -0700340}
341
342void InteropClient::DoResponseStreaming() {
Sree Kuchibhotla7cecfa62016-01-06 08:09:02 -0800343 gpr_log(GPR_DEBUG, "Receiving response steaming rpc ...");
David Garcia Quintascd37d582015-08-09 15:50:21 -0700344
345 ClientContext context;
346 StreamingOutputCallRequest request;
347 for (unsigned int i = 0; i < response_stream_sizes.size(); ++i) {
348 ResponseParameters* response_parameter = request.add_response_parameters();
349 response_parameter->set_size(response_stream_sizes[i]);
350 }
351 StreamingOutputCallResponse response;
352 std::unique_ptr<ClientReader<StreamingOutputCallResponse>> stream(
Sree Kuchibhotlaac7edec2015-10-20 16:45:52 -0700353 serviceStub_.Get()->StreamingOutputCall(&context, request));
David Garcia Quintascd37d582015-08-09 15:50:21 -0700354
355 unsigned int i = 0;
356 while (stream->Read(&response)) {
357 GPR_ASSERT(response.payload().body() ==
358 grpc::string(response_stream_sizes[i], '\0'));
359 ++i;
360 }
361 GPR_ASSERT(response_stream_sizes.size() == i);
362 Status s = stream->Finish();
363 AssertOkOrPrintErrorStatus(s);
Sree Kuchibhotla7cecfa62016-01-06 08:09:02 -0800364 gpr_log(GPR_DEBUG, "Response streaming done.");
David Garcia Quintascd37d582015-08-09 15:50:21 -0700365}
366
367void InteropClient::DoResponseCompressedStreaming() {
David Garcia Quintasc8993192015-07-22 09:10:39 -0700368 const CompressionType compression_types[] = {NONE, GZIP, DEFLATE};
369 const PayloadType payload_types[] = {COMPRESSABLE, UNCOMPRESSABLE, RANDOM};
David Garcia Quintas2a6427f2015-08-18 11:11:40 -0700370 for (size_t i = 0; i < GPR_ARRAY_SIZE(payload_types); i++) {
371 for (size_t j = 0; j < GPR_ARRAY_SIZE(compression_types); j++) {
David Garcia Quintasc8993192015-07-22 09:10:39 -0700372 ClientContext context;
373 InteropClientContextInspector inspector(context);
374 StreamingOutputCallRequest request;
David Garcia Quintas80f39952015-07-21 16:07:36 -0700375
David Garcia Quintasc8993192015-07-22 09:10:39 -0700376 char* log_suffix;
377 gpr_asprintf(&log_suffix, "(compression=%s; payload=%s)",
David Garcia Quintas2a6427f2015-08-18 11:11:40 -0700378 CompressionType_Name(compression_types[j]).c_str(),
379 PayloadType_Name(payload_types[i]).c_str());
David Garcia Quintasc8993192015-07-22 09:10:39 -0700380
Sree Kuchibhotla7cecfa62016-01-06 08:09:02 -0800381 gpr_log(GPR_DEBUG, "Receiving response steaming rpc %s.", log_suffix);
David Garcia Quintasc8993192015-07-22 09:10:39 -0700382
David Garcia Quintas2a6427f2015-08-18 11:11:40 -0700383 request.set_response_type(payload_types[i]);
384 request.set_response_compression(compression_types[j]);
David Garcia Quintasc8993192015-07-22 09:10:39 -0700385
David Garcia Quintas04ecfa12015-08-25 14:19:48 -0700386 for (size_t k = 0; k < response_stream_sizes.size(); ++k) {
David Garcia Quintasc8993192015-07-22 09:10:39 -0700387 ResponseParameters* response_parameter =
388 request.add_response_parameters();
David Garcia Quintas04ecfa12015-08-25 14:19:48 -0700389 response_parameter->set_size(response_stream_sizes[k]);
David Garcia Quintasc8993192015-07-22 09:10:39 -0700390 }
391 StreamingOutputCallResponse response;
392
393 std::unique_ptr<ClientReader<StreamingOutputCallResponse>> stream(
Sree Kuchibhotlaac7edec2015-10-20 16:45:52 -0700394 serviceStub_.Get()->StreamingOutputCall(&context, request));
David Garcia Quintasc8993192015-07-22 09:10:39 -0700395
David Garcia Quintas04ecfa12015-08-25 14:19:48 -0700396 size_t k = 0;
David Garcia Quintasc8993192015-07-22 09:10:39 -0700397 while (stream->Read(&response)) {
David Garcia Quintas04ecfa12015-08-25 14:19:48 -0700398 // Payload related checks.
399 if (request.response_type() != PayloadType::RANDOM) {
400 GPR_ASSERT(response.payload().type() == request.response_type());
401 }
402 switch (response.payload().type()) {
403 case PayloadType::COMPRESSABLE:
404 GPR_ASSERT(response.payload().body() ==
405 grpc::string(response_stream_sizes[k], '\0'));
406 break;
407 case PayloadType::UNCOMPRESSABLE: {
408 std::ifstream rnd_file(kRandomFile);
409 GPR_ASSERT(rnd_file.good());
410 for (int n = 0; n < response_stream_sizes[k]; n++) {
411 GPR_ASSERT(response.payload().body()[n] == (char)rnd_file.get());
412 }
413 } break;
414 default:
415 GPR_ASSERT(false);
416 }
David Garcia Quintasc8993192015-07-22 09:10:39 -0700417
418 // Compression related checks.
419 GPR_ASSERT(request.response_compression() ==
420 GetInteropCompressionTypeFromCompressionAlgorithm(
421 inspector.GetCallCompressionAlgorithm()));
422 if (request.response_compression() == NONE) {
423 GPR_ASSERT(
424 !(inspector.GetMessageFlags() & GRPC_WRITE_INTERNAL_COMPRESS));
425 } else if (request.response_type() == PayloadType::COMPRESSABLE) {
426 // requested compression and compressable response => results should
427 // always be compressed.
428 GPR_ASSERT(inspector.GetMessageFlags() &
429 GRPC_WRITE_INTERNAL_COMPRESS);
430 }
431
David Garcia Quintas04ecfa12015-08-25 14:19:48 -0700432 ++k;
David Garcia Quintasc8993192015-07-22 09:10:39 -0700433 }
434
David Garcia Quintas04ecfa12015-08-25 14:19:48 -0700435 GPR_ASSERT(response_stream_sizes.size() == k);
David Garcia Quintasc8993192015-07-22 09:10:39 -0700436 Status s = stream->Finish();
437
438 AssertOkOrPrintErrorStatus(s);
Sree Kuchibhotla7cecfa62016-01-06 08:09:02 -0800439 gpr_log(GPR_DEBUG, "Response streaming done %s.", log_suffix);
David Garcia Quintasc8993192015-07-22 09:10:39 -0700440 gpr_free(log_suffix);
441 }
Yang Gaoa4002072015-04-09 23:25:21 -0700442 }
Yang Gaoa4002072015-04-09 23:25:21 -0700443}
444
445void InteropClient::DoResponseStreamingWithSlowConsumer() {
Sree Kuchibhotla7cecfa62016-01-06 08:09:02 -0800446 gpr_log(GPR_DEBUG, "Receiving response steaming rpc with slow consumer ...");
Yang Gaoa4002072015-04-09 23:25:21 -0700447
448 ClientContext context;
449 StreamingOutputCallRequest request;
450
451 for (int i = 0; i < kNumResponseMessages; ++i) {
452 ResponseParameters* response_parameter = request.add_response_parameters();
453 response_parameter->set_size(kResponseMessageSize);
454 }
455 StreamingOutputCallResponse response;
456 std::unique_ptr<ClientReader<StreamingOutputCallResponse>> stream(
Sree Kuchibhotlaac7edec2015-10-20 16:45:52 -0700457 serviceStub_.Get()->StreamingOutputCall(&context, request));
Yang Gaoa4002072015-04-09 23:25:21 -0700458
459 int i = 0;
460 while (stream->Read(&response)) {
461 GPR_ASSERT(response.payload().body() ==
462 grpc::string(kResponseMessageSize, '\0'));
Sree Kuchibhotla7cecfa62016-01-06 08:09:02 -0800463 gpr_log(GPR_DEBUG, "received message %d", i);
Yang Gaoa4002072015-04-09 23:25:21 -0700464 usleep(kReceiveDelayMilliSeconds * 1000);
465 ++i;
466 }
467 GPR_ASSERT(kNumResponseMessages == i);
468 Status s = stream->Finish();
469
470 AssertOkOrPrintErrorStatus(s);
Sree Kuchibhotla7cecfa62016-01-06 08:09:02 -0800471 gpr_log(GPR_DEBUG, "Response streaming done.");
Yang Gaoa4002072015-04-09 23:25:21 -0700472}
473
474void InteropClient::DoHalfDuplex() {
Sree Kuchibhotla7cecfa62016-01-06 08:09:02 -0800475 gpr_log(GPR_DEBUG, "Sending half-duplex streaming rpc ...");
Yang Gaoa4002072015-04-09 23:25:21 -0700476
477 ClientContext context;
478 std::unique_ptr<ClientReaderWriter<StreamingOutputCallRequest,
479 StreamingOutputCallResponse>>
Sree Kuchibhotlaac7edec2015-10-20 16:45:52 -0700480 stream(serviceStub_.Get()->HalfDuplexCall(&context));
Yang Gaoa4002072015-04-09 23:25:21 -0700481
482 StreamingOutputCallRequest request;
483 ResponseParameters* response_parameter = request.add_response_parameters();
484 for (unsigned int i = 0; i < response_stream_sizes.size(); ++i) {
485 response_parameter->set_size(response_stream_sizes[i]);
486 GPR_ASSERT(stream->Write(request));
487 }
488 stream->WritesDone();
489
490 unsigned int i = 0;
491 StreamingOutputCallResponse response;
492 while (stream->Read(&response)) {
Yang Gaoa4002072015-04-09 23:25:21 -0700493 GPR_ASSERT(response.payload().body() ==
494 grpc::string(response_stream_sizes[i], '\0'));
495 ++i;
496 }
497 GPR_ASSERT(response_stream_sizes.size() == i);
498 Status s = stream->Finish();
499 AssertOkOrPrintErrorStatus(s);
Sree Kuchibhotla7cecfa62016-01-06 08:09:02 -0800500 gpr_log(GPR_DEBUG, "Half-duplex streaming rpc done.");
Yang Gaoa4002072015-04-09 23:25:21 -0700501}
502
503void InteropClient::DoPingPong() {
Sree Kuchibhotla7cecfa62016-01-06 08:09:02 -0800504 gpr_log(GPR_DEBUG, "Sending Ping Pong streaming rpc ...");
Yang Gaoa4002072015-04-09 23:25:21 -0700505
506 ClientContext context;
507 std::unique_ptr<ClientReaderWriter<StreamingOutputCallRequest,
508 StreamingOutputCallResponse>>
Sree Kuchibhotlaac7edec2015-10-20 16:45:52 -0700509 stream(serviceStub_.Get()->FullDuplexCall(&context));
Yang Gaoa4002072015-04-09 23:25:21 -0700510
511 StreamingOutputCallRequest request;
512 request.set_response_type(PayloadType::COMPRESSABLE);
513 ResponseParameters* response_parameter = request.add_response_parameters();
514 Payload* payload = request.mutable_payload();
515 StreamingOutputCallResponse response;
516 for (unsigned int i = 0; i < request_stream_sizes.size(); ++i) {
517 response_parameter->set_size(response_stream_sizes[i]);
518 payload->set_body(grpc::string(request_stream_sizes[i], '\0'));
519 GPR_ASSERT(stream->Write(request));
520 GPR_ASSERT(stream->Read(&response));
Yang Gaoa4002072015-04-09 23:25:21 -0700521 GPR_ASSERT(response.payload().body() ==
522 grpc::string(response_stream_sizes[i], '\0'));
523 }
524
525 stream->WritesDone();
526 GPR_ASSERT(!stream->Read(&response));
527 Status s = stream->Finish();
528 AssertOkOrPrintErrorStatus(s);
Sree Kuchibhotla7cecfa62016-01-06 08:09:02 -0800529 gpr_log(GPR_DEBUG, "Ping pong streaming done.");
Yang Gaoa4002072015-04-09 23:25:21 -0700530}
531
Yang Gao68d61572015-04-24 14:42:42 -0700532void InteropClient::DoCancelAfterBegin() {
Sree Kuchibhotla7cecfa62016-01-06 08:09:02 -0800533 gpr_log(GPR_DEBUG, "Sending request steaming rpc ...");
Yang Gao68d61572015-04-24 14:42:42 -0700534
535 ClientContext context;
536 StreamingInputCallRequest request;
537 StreamingInputCallResponse response;
538
539 std::unique_ptr<ClientWriter<StreamingInputCallRequest>> stream(
Sree Kuchibhotlaac7edec2015-10-20 16:45:52 -0700540 serviceStub_.Get()->StreamingInputCall(&context, &response));
Yang Gao68d61572015-04-24 14:42:42 -0700541
Sree Kuchibhotla7cecfa62016-01-06 08:09:02 -0800542 gpr_log(GPR_DEBUG, "Trying to cancel...");
Yang Gao68d61572015-04-24 14:42:42 -0700543 context.TryCancel();
544 Status s = stream->Finish();
Yang Gaoc1a2c312015-06-16 10:59:46 -0700545 GPR_ASSERT(s.error_code() == StatusCode::CANCELLED);
Sree Kuchibhotla7cecfa62016-01-06 08:09:02 -0800546 gpr_log(GPR_DEBUG, "Canceling streaming done.");
Yang Gao68d61572015-04-24 14:42:42 -0700547}
548
549void InteropClient::DoCancelAfterFirstResponse() {
Sree Kuchibhotla7cecfa62016-01-06 08:09:02 -0800550 gpr_log(GPR_DEBUG, "Sending Ping Pong streaming rpc ...");
Yang Gao68d61572015-04-24 14:42:42 -0700551
552 ClientContext context;
553 std::unique_ptr<ClientReaderWriter<StreamingOutputCallRequest,
554 StreamingOutputCallResponse>>
Sree Kuchibhotlaac7edec2015-10-20 16:45:52 -0700555 stream(serviceStub_.Get()->FullDuplexCall(&context));
Yang Gao68d61572015-04-24 14:42:42 -0700556
557 StreamingOutputCallRequest request;
558 request.set_response_type(PayloadType::COMPRESSABLE);
559 ResponseParameters* response_parameter = request.add_response_parameters();
560 response_parameter->set_size(31415);
561 request.mutable_payload()->set_body(grpc::string(27182, '\0'));
562 StreamingOutputCallResponse response;
563 GPR_ASSERT(stream->Write(request));
564 GPR_ASSERT(stream->Read(&response));
Yang Gao68d61572015-04-24 14:42:42 -0700565 GPR_ASSERT(response.payload().body() == grpc::string(31415, '\0'));
Sree Kuchibhotla7cecfa62016-01-06 08:09:02 -0800566 gpr_log(GPR_DEBUG, "Trying to cancel...");
Yang Gao68d61572015-04-24 14:42:42 -0700567 context.TryCancel();
568
569 Status s = stream->Finish();
Sree Kuchibhotla7cecfa62016-01-06 08:09:02 -0800570 gpr_log(GPR_DEBUG, "Canceling pingpong streaming done.");
Yang Gao68d61572015-04-24 14:42:42 -0700571}
572
yang-g69563b92015-07-10 15:32:11 -0700573void InteropClient::DoTimeoutOnSleepingServer() {
Sree Kuchibhotla7cecfa62016-01-06 08:09:02 -0800574 gpr_log(GPR_DEBUG,
575 "Sending Ping Pong streaming rpc with a short deadline...");
yang-g69563b92015-07-10 15:32:11 -0700576
577 ClientContext context;
578 std::chrono::system_clock::time_point deadline =
579 std::chrono::system_clock::now() + std::chrono::milliseconds(1);
580 context.set_deadline(deadline);
581 std::unique_ptr<ClientReaderWriter<StreamingOutputCallRequest,
582 StreamingOutputCallResponse>>
Sree Kuchibhotlaac7edec2015-10-20 16:45:52 -0700583 stream(serviceStub_.Get()->FullDuplexCall(&context));
yang-g69563b92015-07-10 15:32:11 -0700584
585 StreamingOutputCallRequest request;
586 request.mutable_payload()->set_body(grpc::string(27182, '\0'));
587 stream->Write(request);
588
589 Status s = stream->Finish();
590 GPR_ASSERT(s.error_code() == StatusCode::DEADLINE_EXCEEDED);
Sree Kuchibhotla7cecfa62016-01-06 08:09:02 -0800591 gpr_log(GPR_DEBUG, "Pingpong streaming timeout done.");
yang-g69563b92015-07-10 15:32:11 -0700592}
593
yang-g78bddc62015-09-21 10:32:17 -0700594void InteropClient::DoEmptyStream() {
Sree Kuchibhotla7cecfa62016-01-06 08:09:02 -0800595 gpr_log(GPR_DEBUG, "Starting empty_stream.");
yang-g78bddc62015-09-21 10:32:17 -0700596
597 ClientContext context;
598 std::unique_ptr<ClientReaderWriter<StreamingOutputCallRequest,
599 StreamingOutputCallResponse>>
Sree Kuchibhotlaac7edec2015-10-20 16:45:52 -0700600 stream(serviceStub_.Get()->FullDuplexCall(&context));
yang-g78bddc62015-09-21 10:32:17 -0700601 stream->WritesDone();
602 StreamingOutputCallResponse response;
603 GPR_ASSERT(stream->Read(&response) == false);
604 Status s = stream->Finish();
605 AssertOkOrPrintErrorStatus(s);
Sree Kuchibhotla7cecfa62016-01-06 08:09:02 -0800606 gpr_log(GPR_DEBUG, "empty_stream done.");
yang-g78bddc62015-09-21 10:32:17 -0700607}
608
Abhishek Kumare1c867d2015-08-05 11:04:45 -0700609void InteropClient::DoStatusWithMessage() {
Sree Kuchibhotla7cecfa62016-01-06 08:09:02 -0800610 gpr_log(GPR_DEBUG,
611 "Sending RPC with a request for status code 2 and message");
Abhishek Kumare1c867d2015-08-05 11:04:45 -0700612
613 ClientContext context;
614 SimpleRequest request;
615 SimpleResponse response;
Craig Tillerd6c98df2015-08-18 09:33:44 -0700616 EchoStatus* requested_status = request.mutable_response_status();
Abhishek Kumare1c867d2015-08-05 11:04:45 -0700617 requested_status->set_code(grpc::StatusCode::UNKNOWN);
618 grpc::string test_msg = "This is a test message";
619 requested_status->set_message(test_msg);
620
Sree Kuchibhotlaac7edec2015-10-20 16:45:52 -0700621 Status s = serviceStub_.Get()->UnaryCall(&context, request, &response);
Abhishek Kumare1c867d2015-08-05 11:04:45 -0700622
623 GPR_ASSERT(s.error_code() == grpc::StatusCode::UNKNOWN);
624 GPR_ASSERT(s.error_message() == test_msg);
Sree Kuchibhotla7cecfa62016-01-06 08:09:02 -0800625 gpr_log(GPR_DEBUG, "Done testing Status and Message");
Abhishek Kumare1c867d2015-08-05 11:04:45 -0700626}
627
yang-gc10348a2016-02-19 16:05:10 -0800628void InteropClient::DoCustomMetadata() {
629 const grpc::string kEchoInitialMetadataKey("x-grpc-test-echo-initial");
630 const grpc::string kInitialMetadataValue("test_initial_metadata_value");
631 const grpc::string kEchoTrailingBinMetadataKey(
632 "x-grpc-test-echo-trailing-bin");
633 const grpc::string kTrailingBinValue("\x0a\x0b\x0a\x0b\x0a\x0b");
634 ;
635
636 {
637 gpr_log(GPR_DEBUG, "Sending RPC with custom metadata");
638 ClientContext context;
639 context.AddMetadata(kEchoInitialMetadataKey, kInitialMetadataValue);
640 context.AddMetadata(kEchoTrailingBinMetadataKey, kTrailingBinValue);
641 SimpleRequest request;
642 SimpleResponse response;
643 request.set_response_size(kLargeResponseSize);
644 grpc::string payload(kLargeRequestSize, '\0');
645 request.mutable_payload()->set_body(payload.c_str(), kLargeRequestSize);
646
647 Status s = serviceStub_.Get()->UnaryCall(&context, request, &response);
648 AssertOkOrPrintErrorStatus(s);
649 const auto& server_initial_metadata = context.GetServerInitialMetadata();
650 auto iter = server_initial_metadata.find(kEchoInitialMetadataKey);
651 GPR_ASSERT(iter != server_initial_metadata.end());
652 GPR_ASSERT(iter->second.data() == kInitialMetadataValue);
653 const auto& server_trailing_metadata = context.GetServerTrailingMetadata();
654 iter = server_trailing_metadata.find(kEchoTrailingBinMetadataKey);
655 GPR_ASSERT(iter != server_trailing_metadata.end());
656 GPR_ASSERT(grpc::string(iter->second.begin(), iter->second.end()) ==
657 kTrailingBinValue);
658
659 gpr_log(GPR_DEBUG, "Done testing RPC with custom metadata");
660 }
661
662 {
663 gpr_log(GPR_DEBUG, "Sending stream with custom metadata");
664 ClientContext context;
665 context.AddMetadata(kEchoInitialMetadataKey, kInitialMetadataValue);
666 context.AddMetadata(kEchoTrailingBinMetadataKey, kTrailingBinValue);
667 std::unique_ptr<ClientReaderWriter<StreamingOutputCallRequest,
668 StreamingOutputCallResponse>>
669 stream(serviceStub_.Get()->FullDuplexCall(&context));
670
671 StreamingOutputCallRequest request;
672 request.set_response_type(PayloadType::COMPRESSABLE);
673 ResponseParameters* response_parameter = request.add_response_parameters();
674 response_parameter->set_size(kLargeResponseSize);
675 grpc::string payload(kLargeRequestSize, '\0');
676 request.mutable_payload()->set_body(payload.c_str(), kLargeRequestSize);
677 StreamingOutputCallResponse response;
678 GPR_ASSERT(stream->Write(request));
679 stream->WritesDone();
680 GPR_ASSERT(stream->Read(&response));
681 GPR_ASSERT(response.payload().body() ==
682 grpc::string(kLargeResponseSize, '\0'));
683 GPR_ASSERT(!stream->Read(&response));
684 Status s = stream->Finish();
685 AssertOkOrPrintErrorStatus(s);
686 const auto& server_initial_metadata = context.GetServerInitialMetadata();
687 auto iter = server_initial_metadata.find(kEchoInitialMetadataKey);
688 GPR_ASSERT(iter != server_initial_metadata.end());
689 GPR_ASSERT(iter->second.data() == kInitialMetadataValue);
690 const auto& server_trailing_metadata = context.GetServerTrailingMetadata();
691 iter = server_trailing_metadata.find(kEchoTrailingBinMetadataKey);
692 GPR_ASSERT(iter != server_trailing_metadata.end());
693 GPR_ASSERT(grpc::string(iter->second.begin(), iter->second.end()) ==
694 kTrailingBinValue);
695
696 gpr_log(GPR_DEBUG, "Done testing stream with custom metadata");
697 }
698}
699
Yang Gaoa4002072015-04-09 23:25:21 -0700700} // namespace testing
701} // namespace grpc