blob: 92bc872f0115ca33455bcf733a22a3cd3ce51550 [file] [log] [blame]
Yang Gaoa4002072015-04-09 23:25:21 -07001/*
2 *
3 * Copyright 2015, Google Inc.
4 * All rights reserved.
5 *
6 * Redistribution and use in source and binary forms, with or without
7 * modification, are permitted provided that the following conditions are
8 * met:
9 *
10 * * Redistributions of source code must retain the above copyright
11 * notice, this list of conditions and the following disclaimer.
12 * * Redistributions in binary form must reproduce the above
13 * copyright notice, this list of conditions and the following disclaimer
14 * in the documentation and/or other materials provided with the
15 * distribution.
16 * * Neither the name of Google Inc. nor the names of its
17 * contributors may be used to endorse or promote products derived from
18 * this software without specific prior written permission.
19 *
20 * THIS SOFTWARE IS PROVIDED BY THE COPYRIGHT HOLDERS AND CONTRIBUTORS
21 * "AS IS" AND ANY EXPRESS OR IMPLIED WARRANTIES, INCLUDING, BUT NOT
22 * LIMITED TO, THE IMPLIED WARRANTIES OF MERCHANTABILITY AND FITNESS FOR
23 * A PARTICULAR PURPOSE ARE DISCLAIMED. IN NO EVENT SHALL THE COPYRIGHT
24 * OWNER OR CONTRIBUTORS BE LIABLE FOR ANY DIRECT, INDIRECT, INCIDENTAL,
25 * SPECIAL, EXEMPLARY, OR CONSEQUENTIAL DAMAGES (INCLUDING, BUT NOT
26 * LIMITED TO, PROCUREMENT OF SUBSTITUTE GOODS OR SERVICES; LOSS OF USE,
27 * DATA, OR PROFITS; OR BUSINESS INTERRUPTION) HOWEVER CAUSED AND ON ANY
28 * THEORY OF LIABILITY, WHETHER IN CONTRACT, STRICT LIABILITY, OR TORT
29 * (INCLUDING NEGLIGENCE OR OTHERWISE) ARISING IN ANY WAY OUT OF THE USE
30 * OF THIS SOFTWARE, EVEN IF ADVISED OF THE POSSIBILITY OF SUCH DAMAGE.
31 *
32 */
33
34#include "test/cpp/interop/interop_client.h"
35
36#include <memory>
37
38#include <unistd.h>
39
40#include <grpc/grpc.h>
41#include <grpc/support/log.h>
42#include <grpc++/channel_interface.h>
43#include <grpc++/client_context.h>
yang-g5bf510b2015-07-14 10:54:29 -070044#include <grpc++/credentials.h>
Yang Gaoa4002072015-04-09 23:25:21 -070045#include <grpc++/status.h>
46#include <grpc++/stream.h>
yang-g5bf510b2015-07-14 10:54:29 -070047#include "test/cpp/interop/client_helper.h"
Abhishek Kumar1b3e3cd2015-04-16 20:10:29 -070048#include "test/proto/test.grpc.pb.h"
Abhishek Kumar60572d42015-04-16 20:45:25 -070049#include "test/proto/empty.grpc.pb.h"
50#include "test/proto/messages.grpc.pb.h"
Yang Gaoa4002072015-04-09 23:25:21 -070051
52namespace grpc {
53namespace testing {
54
55namespace {
56// The same value is defined by the Java client.
57const std::vector<int> request_stream_sizes = {27182, 8, 1828, 45904};
58const std::vector<int> response_stream_sizes = {31415, 9, 2653, 58979};
59const int kNumResponseMessages = 2000;
60const int kResponseMessageSize = 1030;
61const int kReceiveDelayMilliSeconds = 20;
Xudong Maa5861f32015-05-26 15:24:11 -070062const int kLargeRequestSize = 271828;
63const int kLargeResponseSize = 314159;
Yang Gaoa4002072015-04-09 23:25:21 -070064} // namespace
65
66InteropClient::InteropClient(std::shared_ptr<ChannelInterface> channel)
67 : channel_(channel) {}
68
69void InteropClient::AssertOkOrPrintErrorStatus(const Status& s) {
Yang Gaoc1a2c312015-06-16 10:59:46 -070070 if (s.ok()) {
Yang Gaoa4002072015-04-09 23:25:21 -070071 return;
72 }
Yang Gaoc1a2c312015-06-16 10:59:46 -070073 gpr_log(GPR_INFO, "Error status code: %d, message: %s", s.error_code(),
74 s.error_message().c_str());
Yang Gaoa4002072015-04-09 23:25:21 -070075 GPR_ASSERT(0);
76}
77
78void InteropClient::DoEmpty() {
79 gpr_log(GPR_INFO, "Sending an empty rpc...");
80 std::unique_ptr<TestService::Stub> stub(TestService::NewStub(channel_));
81
82 Empty request = Empty::default_instance();
83 Empty response = Empty::default_instance();
84 ClientContext context;
85
86 Status s = stub->EmptyCall(&context, request, &response);
87 AssertOkOrPrintErrorStatus(s);
88
89 gpr_log(GPR_INFO, "Empty rpc done.");
90}
91
92// Shared code to set large payload, make rpc and check response payload.
93void InteropClient::PerformLargeUnary(SimpleRequest* request,
94 SimpleResponse* response) {
95 std::unique_ptr<TestService::Stub> stub(TestService::NewStub(channel_));
96
97 ClientContext context;
98 request->set_response_type(PayloadType::COMPRESSABLE);
99 request->set_response_size(kLargeResponseSize);
100 grpc::string payload(kLargeRequestSize, '\0');
101 request->mutable_payload()->set_body(payload.c_str(), kLargeRequestSize);
102
103 Status s = stub->UnaryCall(&context, *request, response);
104
105 AssertOkOrPrintErrorStatus(s);
106 GPR_ASSERT(response->payload().type() == PayloadType::COMPRESSABLE);
107 GPR_ASSERT(response->payload().body() ==
108 grpc::string(kLargeResponseSize, '\0'));
109}
110
111void InteropClient::DoComputeEngineCreds(
112 const grpc::string& default_service_account,
113 const grpc::string& oauth_scope) {
114 gpr_log(GPR_INFO,
115 "Sending a large unary rpc with compute engine credentials ...");
116 SimpleRequest request;
117 SimpleResponse response;
118 request.set_fill_username(true);
119 request.set_fill_oauth_scope(true);
120 PerformLargeUnary(&request, &response);
121 gpr_log(GPR_INFO, "Got username %s", response.username().c_str());
122 gpr_log(GPR_INFO, "Got oauth_scope %s", response.oauth_scope().c_str());
123 GPR_ASSERT(!response.username().empty());
124 GPR_ASSERT(response.username().c_str() == default_service_account);
125 GPR_ASSERT(!response.oauth_scope().empty());
126 const char* oauth_scope_str = response.oauth_scope().c_str();
127 GPR_ASSERT(oauth_scope.find(oauth_scope_str) != grpc::string::npos);
128 gpr_log(GPR_INFO, "Large unary with compute engine creds done.");
129}
130
131void InteropClient::DoServiceAccountCreds(const grpc::string& username,
132 const grpc::string& oauth_scope) {
133 gpr_log(GPR_INFO,
134 "Sending a large unary rpc with service account credentials ...");
135 SimpleRequest request;
136 SimpleResponse response;
137 request.set_fill_username(true);
138 request.set_fill_oauth_scope(true);
139 PerformLargeUnary(&request, &response);
140 GPR_ASSERT(!response.username().empty());
141 GPR_ASSERT(!response.oauth_scope().empty());
142 GPR_ASSERT(username.find(response.username()) != grpc::string::npos);
143 const char* oauth_scope_str = response.oauth_scope().c_str();
144 GPR_ASSERT(oauth_scope.find(oauth_scope_str) != grpc::string::npos);
145 gpr_log(GPR_INFO, "Large unary with service account creds done.");
146}
147
yang-gbe5f0592015-07-13 11:11:50 -0700148void InteropClient::DoOauth2AuthToken(const grpc::string& username,
149 const grpc::string& oauth_scope) {
150 gpr_log(GPR_INFO,
151 "Sending a large unary rpc with raw oauth2 access token ...");
152 SimpleRequest request;
153 SimpleResponse response;
154 request.set_fill_username(true);
155 request.set_fill_oauth_scope(true);
156 PerformLargeUnary(&request, &response);
157 GPR_ASSERT(!response.username().empty());
158 GPR_ASSERT(!response.oauth_scope().empty());
159 GPR_ASSERT(username.find(response.username()) != grpc::string::npos);
160 const char* oauth_scope_str = response.oauth_scope().c_str();
161 GPR_ASSERT(oauth_scope.find(oauth_scope_str) != grpc::string::npos);
162 gpr_log(GPR_INFO, "Large unary with oauth2 access token done.");
163}
164
yang-g5bf510b2015-07-14 10:54:29 -0700165void InteropClient::DoPerRpcCreds(const grpc::string& username,
166 const grpc::string& oauth_scope) {
167 gpr_log(GPR_INFO,
168 "Sending a large unary rpc with per-rpc raw oauth2 access token ...");
169 SimpleRequest request;
170 SimpleResponse response;
171 request.set_fill_username(true);
172 request.set_fill_oauth_scope(true);
173 std::unique_ptr<TestService::Stub> stub(TestService::NewStub(channel_));
174
175 ClientContext context;
176 grpc::string access_token = GetOauth2AccessToken();
177 std::shared_ptr<Credentials> creds = AccessTokenCredentials(access_token);
178 context.set_credentials(creds);
179 request.set_response_type(PayloadType::COMPRESSABLE);
180 request.set_response_size(kLargeResponseSize);
181 grpc::string payload(kLargeRequestSize, '\0');
182 request.mutable_payload()->set_body(payload.c_str(), kLargeRequestSize);
183
184 Status s = stub->UnaryCall(&context, request, &response);
185
186 AssertOkOrPrintErrorStatus(s);
187 GPR_ASSERT(response.payload().type() == PayloadType::COMPRESSABLE);
188 GPR_ASSERT(response.payload().body() ==
189 grpc::string(kLargeResponseSize, '\0'));
190 GPR_ASSERT(!response.username().empty());
191 GPR_ASSERT(!response.oauth_scope().empty());
192 GPR_ASSERT(username.find(response.username()) != grpc::string::npos);
193 const char* oauth_scope_str = response.oauth_scope().c_str();
194 GPR_ASSERT(oauth_scope.find(oauth_scope_str) != grpc::string::npos);
195 gpr_log(GPR_INFO, "Large unary with per-rpc oauth2 access token done.");
196}
197
Yang Gaoa4002072015-04-09 23:25:21 -0700198void InteropClient::DoJwtTokenCreds(const grpc::string& username) {
199 gpr_log(GPR_INFO, "Sending a large unary rpc with JWT token credentials ...");
200 SimpleRequest request;
201 SimpleResponse response;
202 request.set_fill_username(true);
203 PerformLargeUnary(&request, &response);
204 GPR_ASSERT(!response.username().empty());
205 GPR_ASSERT(username.find(response.username()) != grpc::string::npos);
206 gpr_log(GPR_INFO, "Large unary with JWT token creds done.");
207}
208
209void InteropClient::DoLargeUnary() {
210 gpr_log(GPR_INFO, "Sending a large unary rpc...");
211 SimpleRequest request;
212 SimpleResponse response;
213 PerformLargeUnary(&request, &response);
214 gpr_log(GPR_INFO, "Large unary done.");
215}
216
217void InteropClient::DoRequestStreaming() {
218 gpr_log(GPR_INFO, "Sending request steaming rpc ...");
219 std::unique_ptr<TestService::Stub> stub(TestService::NewStub(channel_));
220
221 ClientContext context;
222 StreamingInputCallRequest request;
223 StreamingInputCallResponse response;
224
225 std::unique_ptr<ClientWriter<StreamingInputCallRequest>> stream(
226 stub->StreamingInputCall(&context, &response));
227
228 int aggregated_payload_size = 0;
229 for (unsigned int i = 0; i < request_stream_sizes.size(); ++i) {
230 Payload* payload = request.mutable_payload();
231 payload->set_body(grpc::string(request_stream_sizes[i], '\0'));
232 GPR_ASSERT(stream->Write(request));
233 aggregated_payload_size += request_stream_sizes[i];
234 }
235 stream->WritesDone();
236 Status s = stream->Finish();
237
238 GPR_ASSERT(response.aggregated_payload_size() == aggregated_payload_size);
239 AssertOkOrPrintErrorStatus(s);
240 gpr_log(GPR_INFO, "Request streaming done.");
241}
242
243void InteropClient::DoResponseStreaming() {
244 gpr_log(GPR_INFO, "Receiving response steaming rpc ...");
245 std::unique_ptr<TestService::Stub> stub(TestService::NewStub(channel_));
246
247 ClientContext context;
248 StreamingOutputCallRequest request;
249 for (unsigned int i = 0; i < response_stream_sizes.size(); ++i) {
250 ResponseParameters* response_parameter = request.add_response_parameters();
251 response_parameter->set_size(response_stream_sizes[i]);
252 }
253 StreamingOutputCallResponse response;
254 std::unique_ptr<ClientReader<StreamingOutputCallResponse>> stream(
255 stub->StreamingOutputCall(&context, request));
256
257 unsigned int i = 0;
258 while (stream->Read(&response)) {
259 GPR_ASSERT(response.payload().body() ==
260 grpc::string(response_stream_sizes[i], '\0'));
261 ++i;
262 }
263 GPR_ASSERT(response_stream_sizes.size() == i);
264 Status s = stream->Finish();
265
266 AssertOkOrPrintErrorStatus(s);
267 gpr_log(GPR_INFO, "Response streaming done.");
268}
269
270void InteropClient::DoResponseStreamingWithSlowConsumer() {
271 gpr_log(GPR_INFO, "Receiving response steaming rpc with slow consumer ...");
272 std::unique_ptr<TestService::Stub> stub(TestService::NewStub(channel_));
273
274 ClientContext context;
275 StreamingOutputCallRequest request;
276
277 for (int i = 0; i < kNumResponseMessages; ++i) {
278 ResponseParameters* response_parameter = request.add_response_parameters();
279 response_parameter->set_size(kResponseMessageSize);
280 }
281 StreamingOutputCallResponse response;
282 std::unique_ptr<ClientReader<StreamingOutputCallResponse>> stream(
283 stub->StreamingOutputCall(&context, request));
284
285 int i = 0;
286 while (stream->Read(&response)) {
287 GPR_ASSERT(response.payload().body() ==
288 grpc::string(kResponseMessageSize, '\0'));
289 gpr_log(GPR_INFO, "received message %d", i);
290 usleep(kReceiveDelayMilliSeconds * 1000);
291 ++i;
292 }
293 GPR_ASSERT(kNumResponseMessages == i);
294 Status s = stream->Finish();
295
296 AssertOkOrPrintErrorStatus(s);
297 gpr_log(GPR_INFO, "Response streaming done.");
298}
299
300void InteropClient::DoHalfDuplex() {
301 gpr_log(GPR_INFO, "Sending half-duplex streaming rpc ...");
302 std::unique_ptr<TestService::Stub> stub(TestService::NewStub(channel_));
303
304 ClientContext context;
305 std::unique_ptr<ClientReaderWriter<StreamingOutputCallRequest,
306 StreamingOutputCallResponse>>
307 stream(stub->HalfDuplexCall(&context));
308
309 StreamingOutputCallRequest request;
310 ResponseParameters* response_parameter = request.add_response_parameters();
311 for (unsigned int i = 0; i < response_stream_sizes.size(); ++i) {
312 response_parameter->set_size(response_stream_sizes[i]);
313 GPR_ASSERT(stream->Write(request));
314 }
315 stream->WritesDone();
316
317 unsigned int i = 0;
318 StreamingOutputCallResponse response;
319 while (stream->Read(&response)) {
320 GPR_ASSERT(response.payload().has_body());
321 GPR_ASSERT(response.payload().body() ==
322 grpc::string(response_stream_sizes[i], '\0'));
323 ++i;
324 }
325 GPR_ASSERT(response_stream_sizes.size() == i);
326 Status s = stream->Finish();
327 AssertOkOrPrintErrorStatus(s);
328 gpr_log(GPR_INFO, "Half-duplex streaming rpc done.");
329}
330
331void InteropClient::DoPingPong() {
332 gpr_log(GPR_INFO, "Sending Ping Pong streaming rpc ...");
333 std::unique_ptr<TestService::Stub> stub(TestService::NewStub(channel_));
334
335 ClientContext context;
336 std::unique_ptr<ClientReaderWriter<StreamingOutputCallRequest,
337 StreamingOutputCallResponse>>
338 stream(stub->FullDuplexCall(&context));
339
340 StreamingOutputCallRequest request;
341 request.set_response_type(PayloadType::COMPRESSABLE);
342 ResponseParameters* response_parameter = request.add_response_parameters();
343 Payload* payload = request.mutable_payload();
344 StreamingOutputCallResponse response;
345 for (unsigned int i = 0; i < request_stream_sizes.size(); ++i) {
346 response_parameter->set_size(response_stream_sizes[i]);
347 payload->set_body(grpc::string(request_stream_sizes[i], '\0'));
348 GPR_ASSERT(stream->Write(request));
349 GPR_ASSERT(stream->Read(&response));
350 GPR_ASSERT(response.payload().has_body());
351 GPR_ASSERT(response.payload().body() ==
352 grpc::string(response_stream_sizes[i], '\0'));
353 }
354
355 stream->WritesDone();
356 GPR_ASSERT(!stream->Read(&response));
357 Status s = stream->Finish();
358 AssertOkOrPrintErrorStatus(s);
359 gpr_log(GPR_INFO, "Ping pong streaming done.");
360}
361
Yang Gao68d61572015-04-24 14:42:42 -0700362void InteropClient::DoCancelAfterBegin() {
363 gpr_log(GPR_INFO, "Sending request steaming rpc ...");
364 std::unique_ptr<TestService::Stub> stub(TestService::NewStub(channel_));
365
366 ClientContext context;
367 StreamingInputCallRequest request;
368 StreamingInputCallResponse response;
369
370 std::unique_ptr<ClientWriter<StreamingInputCallRequest>> stream(
371 stub->StreamingInputCall(&context, &response));
372
373 gpr_log(GPR_INFO, "Trying to cancel...");
374 context.TryCancel();
375 Status s = stream->Finish();
Yang Gaoc1a2c312015-06-16 10:59:46 -0700376 GPR_ASSERT(s.error_code() == StatusCode::CANCELLED);
Yang Gao68d61572015-04-24 14:42:42 -0700377 gpr_log(GPR_INFO, "Canceling streaming done.");
378}
379
380void InteropClient::DoCancelAfterFirstResponse() {
381 gpr_log(GPR_INFO, "Sending Ping Pong streaming rpc ...");
382 std::unique_ptr<TestService::Stub> stub(TestService::NewStub(channel_));
383
384 ClientContext context;
385 std::unique_ptr<ClientReaderWriter<StreamingOutputCallRequest,
386 StreamingOutputCallResponse>>
387 stream(stub->FullDuplexCall(&context));
388
389 StreamingOutputCallRequest request;
390 request.set_response_type(PayloadType::COMPRESSABLE);
391 ResponseParameters* response_parameter = request.add_response_parameters();
392 response_parameter->set_size(31415);
393 request.mutable_payload()->set_body(grpc::string(27182, '\0'));
394 StreamingOutputCallResponse response;
395 GPR_ASSERT(stream->Write(request));
396 GPR_ASSERT(stream->Read(&response));
397 GPR_ASSERT(response.payload().has_body());
398 GPR_ASSERT(response.payload().body() == grpc::string(31415, '\0'));
399 gpr_log(GPR_INFO, "Trying to cancel...");
400 context.TryCancel();
401
402 Status s = stream->Finish();
403 gpr_log(GPR_INFO, "Canceling pingpong streaming done.");
404}
405
yang-g69563b92015-07-10 15:32:11 -0700406void InteropClient::DoTimeoutOnSleepingServer() {
407 gpr_log(GPR_INFO, "Sending Ping Pong streaming rpc with a short deadline...");
408 std::unique_ptr<TestService::Stub> stub(TestService::NewStub(channel_));
409
410 ClientContext context;
411 std::chrono::system_clock::time_point deadline =
412 std::chrono::system_clock::now() + std::chrono::milliseconds(1);
413 context.set_deadline(deadline);
414 std::unique_ptr<ClientReaderWriter<StreamingOutputCallRequest,
415 StreamingOutputCallResponse>>
416 stream(stub->FullDuplexCall(&context));
417
418 StreamingOutputCallRequest request;
419 request.mutable_payload()->set_body(grpc::string(27182, '\0'));
420 stream->Write(request);
421
422 Status s = stream->Finish();
423 GPR_ASSERT(s.error_code() == StatusCode::DEADLINE_EXCEEDED);
424 gpr_log(GPR_INFO, "Pingpong streaming timeout done.");
425}
426
Yang Gaoa4002072015-04-09 23:25:21 -0700427} // namespace testing
428} // namespace grpc