blob: c77cbce5c41589f188cccdbfd73df72c37c1c796 [file] [log] [blame]
Yang Gaoa4002072015-04-09 23:25:21 -07001/*
2 *
3 * Copyright 2015, Google Inc.
4 * All rights reserved.
5 *
6 * Redistribution and use in source and binary forms, with or without
7 * modification, are permitted provided that the following conditions are
8 * met:
9 *
10 * * Redistributions of source code must retain the above copyright
11 * notice, this list of conditions and the following disclaimer.
12 * * Redistributions in binary form must reproduce the above
13 * copyright notice, this list of conditions and the following disclaimer
14 * in the documentation and/or other materials provided with the
15 * distribution.
16 * * Neither the name of Google Inc. nor the names of its
17 * contributors may be used to endorse or promote products derived from
18 * this software without specific prior written permission.
19 *
20 * THIS SOFTWARE IS PROVIDED BY THE COPYRIGHT HOLDERS AND CONTRIBUTORS
21 * "AS IS" AND ANY EXPRESS OR IMPLIED WARRANTIES, INCLUDING, BUT NOT
22 * LIMITED TO, THE IMPLIED WARRANTIES OF MERCHANTABILITY AND FITNESS FOR
23 * A PARTICULAR PURPOSE ARE DISCLAIMED. IN NO EVENT SHALL THE COPYRIGHT
24 * OWNER OR CONTRIBUTORS BE LIABLE FOR ANY DIRECT, INDIRECT, INCIDENTAL,
25 * SPECIAL, EXEMPLARY, OR CONSEQUENTIAL DAMAGES (INCLUDING, BUT NOT
26 * LIMITED TO, PROCUREMENT OF SUBSTITUTE GOODS OR SERVICES; LOSS OF USE,
27 * DATA, OR PROFITS; OR BUSINESS INTERRUPTION) HOWEVER CAUSED AND ON ANY
28 * THEORY OF LIABILITY, WHETHER IN CONTRACT, STRICT LIABILITY, OR TORT
29 * (INCLUDING NEGLIGENCE OR OTHERWISE) ARISING IN ANY WAY OUT OF THE USE
30 * OF THIS SOFTWARE, EVEN IF ADVISED OF THE POSSIBILITY OF SUCH DAMAGE.
31 *
32 */
33
34#include "test/cpp/interop/interop_client.h"
35
36#include <memory>
37
38#include <unistd.h>
39
40#include <grpc/grpc.h>
41#include <grpc/support/log.h>
42#include <grpc++/channel_interface.h>
43#include <grpc++/client_context.h>
yang-g5bf510b2015-07-14 10:54:29 -070044#include <grpc++/credentials.h>
Yang Gaoa4002072015-04-09 23:25:21 -070045#include <grpc++/status.h>
46#include <grpc++/stream.h>
yang-g5bf510b2015-07-14 10:54:29 -070047#include "test/cpp/interop/client_helper.h"
Abhishek Kumar1b3e3cd2015-04-16 20:10:29 -070048#include "test/proto/test.grpc.pb.h"
Abhishek Kumar60572d42015-04-16 20:45:25 -070049#include "test/proto/empty.grpc.pb.h"
50#include "test/proto/messages.grpc.pb.h"
Yang Gaoa4002072015-04-09 23:25:21 -070051
52namespace grpc {
53namespace testing {
54
namespace {
// Per-message payload sizes used by the streaming tests.
// The same value is defined by the Java client.
const std::vector<int> request_stream_sizes{27182, 8, 1828, 45904};
const std::vector<int> response_stream_sizes{31415, 9, 2653, 58979};

// Parameters of the slow-consumer response-streaming test: how many messages
// are requested, how large each one is, and how long the client pauses after
// receiving each of them.
const int kNumResponseMessages = 2000;
const int kResponseMessageSize = 1030;
const int kReceiveDelayMilliSeconds = 20;

// Request/response payload sizes used by the large unary call.
const int kLargeRequestSize = 271828;
const int kLargeResponseSize = 314159;
}  // namespace
65
66InteropClient::InteropClient(std::shared_ptr<ChannelInterface> channel)
67 : channel_(channel) {}
68
69void InteropClient::AssertOkOrPrintErrorStatus(const Status& s) {
Yang Gaoc1a2c312015-06-16 10:59:46 -070070 if (s.ok()) {
Yang Gaoa4002072015-04-09 23:25:21 -070071 return;
72 }
Yang Gaoc1a2c312015-06-16 10:59:46 -070073 gpr_log(GPR_INFO, "Error status code: %d, message: %s", s.error_code(),
74 s.error_message().c_str());
Yang Gaoa4002072015-04-09 23:25:21 -070075 GPR_ASSERT(0);
76}
77
78void InteropClient::DoEmpty() {
79 gpr_log(GPR_INFO, "Sending an empty rpc...");
80 std::unique_ptr<TestService::Stub> stub(TestService::NewStub(channel_));
81
82 Empty request = Empty::default_instance();
83 Empty response = Empty::default_instance();
84 ClientContext context;
85
86 Status s = stub->EmptyCall(&context, request, &response);
87 AssertOkOrPrintErrorStatus(s);
88
89 gpr_log(GPR_INFO, "Empty rpc done.");
90}
91
92// Shared code to set large payload, make rpc and check response payload.
93void InteropClient::PerformLargeUnary(SimpleRequest* request,
94 SimpleResponse* response) {
95 std::unique_ptr<TestService::Stub> stub(TestService::NewStub(channel_));
96
97 ClientContext context;
98 request->set_response_type(PayloadType::COMPRESSABLE);
99 request->set_response_size(kLargeResponseSize);
100 grpc::string payload(kLargeRequestSize, '\0');
101 request->mutable_payload()->set_body(payload.c_str(), kLargeRequestSize);
102
103 Status s = stub->UnaryCall(&context, *request, response);
104
105 AssertOkOrPrintErrorStatus(s);
106 GPR_ASSERT(response->payload().type() == PayloadType::COMPRESSABLE);
107 GPR_ASSERT(response->payload().body() ==
108 grpc::string(kLargeResponseSize, '\0'));
109}
110
111void InteropClient::DoComputeEngineCreds(
112 const grpc::string& default_service_account,
113 const grpc::string& oauth_scope) {
114 gpr_log(GPR_INFO,
115 "Sending a large unary rpc with compute engine credentials ...");
116 SimpleRequest request;
117 SimpleResponse response;
118 request.set_fill_username(true);
119 request.set_fill_oauth_scope(true);
120 PerformLargeUnary(&request, &response);
121 gpr_log(GPR_INFO, "Got username %s", response.username().c_str());
122 gpr_log(GPR_INFO, "Got oauth_scope %s", response.oauth_scope().c_str());
123 GPR_ASSERT(!response.username().empty());
124 GPR_ASSERT(response.username().c_str() == default_service_account);
125 GPR_ASSERT(!response.oauth_scope().empty());
126 const char* oauth_scope_str = response.oauth_scope().c_str();
127 GPR_ASSERT(oauth_scope.find(oauth_scope_str) != grpc::string::npos);
128 gpr_log(GPR_INFO, "Large unary with compute engine creds done.");
129}
130
131void InteropClient::DoServiceAccountCreds(const grpc::string& username,
132 const grpc::string& oauth_scope) {
133 gpr_log(GPR_INFO,
134 "Sending a large unary rpc with service account credentials ...");
135 SimpleRequest request;
136 SimpleResponse response;
137 request.set_fill_username(true);
138 request.set_fill_oauth_scope(true);
139 PerformLargeUnary(&request, &response);
140 GPR_ASSERT(!response.username().empty());
141 GPR_ASSERT(!response.oauth_scope().empty());
142 GPR_ASSERT(username.find(response.username()) != grpc::string::npos);
143 const char* oauth_scope_str = response.oauth_scope().c_str();
144 GPR_ASSERT(oauth_scope.find(oauth_scope_str) != grpc::string::npos);
145 gpr_log(GPR_INFO, "Large unary with service account creds done.");
146}
147
yang-gbe5f0592015-07-13 11:11:50 -0700148void InteropClient::DoOauth2AuthToken(const grpc::string& username,
149 const grpc::string& oauth_scope) {
150 gpr_log(GPR_INFO,
151 "Sending a large unary rpc with raw oauth2 access token ...");
152 SimpleRequest request;
153 SimpleResponse response;
154 request.set_fill_username(true);
155 request.set_fill_oauth_scope(true);
156 PerformLargeUnary(&request, &response);
157 GPR_ASSERT(!response.username().empty());
158 GPR_ASSERT(!response.oauth_scope().empty());
159 GPR_ASSERT(username.find(response.username()) != grpc::string::npos);
160 const char* oauth_scope_str = response.oauth_scope().c_str();
161 GPR_ASSERT(oauth_scope.find(oauth_scope_str) != grpc::string::npos);
162 gpr_log(GPR_INFO, "Large unary with oauth2 access token done.");
163}
164
yang-g5bf510b2015-07-14 10:54:29 -0700165void InteropClient::DoPerRpcCreds(const grpc::string& username,
166 const grpc::string& oauth_scope) {
167 gpr_log(GPR_INFO,
yang-g8c31ee22015-07-15 13:36:27 -0700168 "Sending a unary rpc with per-rpc raw oauth2 access token ...");
yang-g5bf510b2015-07-14 10:54:29 -0700169 SimpleRequest request;
170 SimpleResponse response;
171 request.set_fill_username(true);
172 request.set_fill_oauth_scope(true);
173 std::unique_ptr<TestService::Stub> stub(TestService::NewStub(channel_));
174
175 ClientContext context;
176 grpc::string access_token = GetOauth2AccessToken();
177 std::shared_ptr<Credentials> creds = AccessTokenCredentials(access_token);
178 context.set_credentials(creds);
yang-g5bf510b2015-07-14 10:54:29 -0700179
180 Status s = stub->UnaryCall(&context, request, &response);
181
182 AssertOkOrPrintErrorStatus(s);
yang-g5bf510b2015-07-14 10:54:29 -0700183 GPR_ASSERT(!response.username().empty());
184 GPR_ASSERT(!response.oauth_scope().empty());
185 GPR_ASSERT(username.find(response.username()) != grpc::string::npos);
186 const char* oauth_scope_str = response.oauth_scope().c_str();
187 GPR_ASSERT(oauth_scope.find(oauth_scope_str) != grpc::string::npos);
yang-g8c31ee22015-07-15 13:36:27 -0700188 gpr_log(GPR_INFO, "Unary with per-rpc oauth2 access token done.");
yang-g5bf510b2015-07-14 10:54:29 -0700189}
190
Yang Gaoa4002072015-04-09 23:25:21 -0700191void InteropClient::DoJwtTokenCreds(const grpc::string& username) {
192 gpr_log(GPR_INFO, "Sending a large unary rpc with JWT token credentials ...");
193 SimpleRequest request;
194 SimpleResponse response;
195 request.set_fill_username(true);
196 PerformLargeUnary(&request, &response);
197 GPR_ASSERT(!response.username().empty());
198 GPR_ASSERT(username.find(response.username()) != grpc::string::npos);
199 gpr_log(GPR_INFO, "Large unary with JWT token creds done.");
200}
201
202void InteropClient::DoLargeUnary() {
203 gpr_log(GPR_INFO, "Sending a large unary rpc...");
204 SimpleRequest request;
205 SimpleResponse response;
206 PerformLargeUnary(&request, &response);
207 gpr_log(GPR_INFO, "Large unary done.");
208}
209
210void InteropClient::DoRequestStreaming() {
211 gpr_log(GPR_INFO, "Sending request steaming rpc ...");
212 std::unique_ptr<TestService::Stub> stub(TestService::NewStub(channel_));
213
214 ClientContext context;
215 StreamingInputCallRequest request;
216 StreamingInputCallResponse response;
217
218 std::unique_ptr<ClientWriter<StreamingInputCallRequest>> stream(
219 stub->StreamingInputCall(&context, &response));
220
221 int aggregated_payload_size = 0;
222 for (unsigned int i = 0; i < request_stream_sizes.size(); ++i) {
223 Payload* payload = request.mutable_payload();
224 payload->set_body(grpc::string(request_stream_sizes[i], '\0'));
225 GPR_ASSERT(stream->Write(request));
226 aggregated_payload_size += request_stream_sizes[i];
227 }
228 stream->WritesDone();
229 Status s = stream->Finish();
230
231 GPR_ASSERT(response.aggregated_payload_size() == aggregated_payload_size);
232 AssertOkOrPrintErrorStatus(s);
233 gpr_log(GPR_INFO, "Request streaming done.");
234}
235
236void InteropClient::DoResponseStreaming() {
237 gpr_log(GPR_INFO, "Receiving response steaming rpc ...");
238 std::unique_ptr<TestService::Stub> stub(TestService::NewStub(channel_));
239
240 ClientContext context;
241 StreamingOutputCallRequest request;
242 for (unsigned int i = 0; i < response_stream_sizes.size(); ++i) {
243 ResponseParameters* response_parameter = request.add_response_parameters();
244 response_parameter->set_size(response_stream_sizes[i]);
245 }
246 StreamingOutputCallResponse response;
247 std::unique_ptr<ClientReader<StreamingOutputCallResponse>> stream(
248 stub->StreamingOutputCall(&context, request));
249
250 unsigned int i = 0;
251 while (stream->Read(&response)) {
252 GPR_ASSERT(response.payload().body() ==
253 grpc::string(response_stream_sizes[i], '\0'));
254 ++i;
255 }
256 GPR_ASSERT(response_stream_sizes.size() == i);
257 Status s = stream->Finish();
258
259 AssertOkOrPrintErrorStatus(s);
260 gpr_log(GPR_INFO, "Response streaming done.");
261}
262
263void InteropClient::DoResponseStreamingWithSlowConsumer() {
264 gpr_log(GPR_INFO, "Receiving response steaming rpc with slow consumer ...");
265 std::unique_ptr<TestService::Stub> stub(TestService::NewStub(channel_));
266
267 ClientContext context;
268 StreamingOutputCallRequest request;
269
270 for (int i = 0; i < kNumResponseMessages; ++i) {
271 ResponseParameters* response_parameter = request.add_response_parameters();
272 response_parameter->set_size(kResponseMessageSize);
273 }
274 StreamingOutputCallResponse response;
275 std::unique_ptr<ClientReader<StreamingOutputCallResponse>> stream(
276 stub->StreamingOutputCall(&context, request));
277
278 int i = 0;
279 while (stream->Read(&response)) {
280 GPR_ASSERT(response.payload().body() ==
281 grpc::string(kResponseMessageSize, '\0'));
282 gpr_log(GPR_INFO, "received message %d", i);
283 usleep(kReceiveDelayMilliSeconds * 1000);
284 ++i;
285 }
286 GPR_ASSERT(kNumResponseMessages == i);
287 Status s = stream->Finish();
288
289 AssertOkOrPrintErrorStatus(s);
290 gpr_log(GPR_INFO, "Response streaming done.");
291}
292
293void InteropClient::DoHalfDuplex() {
294 gpr_log(GPR_INFO, "Sending half-duplex streaming rpc ...");
295 std::unique_ptr<TestService::Stub> stub(TestService::NewStub(channel_));
296
297 ClientContext context;
298 std::unique_ptr<ClientReaderWriter<StreamingOutputCallRequest,
299 StreamingOutputCallResponse>>
300 stream(stub->HalfDuplexCall(&context));
301
302 StreamingOutputCallRequest request;
303 ResponseParameters* response_parameter = request.add_response_parameters();
304 for (unsigned int i = 0; i < response_stream_sizes.size(); ++i) {
305 response_parameter->set_size(response_stream_sizes[i]);
306 GPR_ASSERT(stream->Write(request));
307 }
308 stream->WritesDone();
309
310 unsigned int i = 0;
311 StreamingOutputCallResponse response;
312 while (stream->Read(&response)) {
313 GPR_ASSERT(response.payload().has_body());
314 GPR_ASSERT(response.payload().body() ==
315 grpc::string(response_stream_sizes[i], '\0'));
316 ++i;
317 }
318 GPR_ASSERT(response_stream_sizes.size() == i);
319 Status s = stream->Finish();
320 AssertOkOrPrintErrorStatus(s);
321 gpr_log(GPR_INFO, "Half-duplex streaming rpc done.");
322}
323
324void InteropClient::DoPingPong() {
325 gpr_log(GPR_INFO, "Sending Ping Pong streaming rpc ...");
326 std::unique_ptr<TestService::Stub> stub(TestService::NewStub(channel_));
327
328 ClientContext context;
329 std::unique_ptr<ClientReaderWriter<StreamingOutputCallRequest,
330 StreamingOutputCallResponse>>
331 stream(stub->FullDuplexCall(&context));
332
333 StreamingOutputCallRequest request;
334 request.set_response_type(PayloadType::COMPRESSABLE);
335 ResponseParameters* response_parameter = request.add_response_parameters();
336 Payload* payload = request.mutable_payload();
337 StreamingOutputCallResponse response;
338 for (unsigned int i = 0; i < request_stream_sizes.size(); ++i) {
339 response_parameter->set_size(response_stream_sizes[i]);
340 payload->set_body(grpc::string(request_stream_sizes[i], '\0'));
341 GPR_ASSERT(stream->Write(request));
342 GPR_ASSERT(stream->Read(&response));
343 GPR_ASSERT(response.payload().has_body());
344 GPR_ASSERT(response.payload().body() ==
345 grpc::string(response_stream_sizes[i], '\0'));
346 }
347
348 stream->WritesDone();
349 GPR_ASSERT(!stream->Read(&response));
350 Status s = stream->Finish();
351 AssertOkOrPrintErrorStatus(s);
352 gpr_log(GPR_INFO, "Ping pong streaming done.");
353}
354
Yang Gao68d61572015-04-24 14:42:42 -0700355void InteropClient::DoCancelAfterBegin() {
356 gpr_log(GPR_INFO, "Sending request steaming rpc ...");
357 std::unique_ptr<TestService::Stub> stub(TestService::NewStub(channel_));
358
359 ClientContext context;
360 StreamingInputCallRequest request;
361 StreamingInputCallResponse response;
362
363 std::unique_ptr<ClientWriter<StreamingInputCallRequest>> stream(
364 stub->StreamingInputCall(&context, &response));
365
366 gpr_log(GPR_INFO, "Trying to cancel...");
367 context.TryCancel();
368 Status s = stream->Finish();
Yang Gaoc1a2c312015-06-16 10:59:46 -0700369 GPR_ASSERT(s.error_code() == StatusCode::CANCELLED);
Yang Gao68d61572015-04-24 14:42:42 -0700370 gpr_log(GPR_INFO, "Canceling streaming done.");
371}
372
373void InteropClient::DoCancelAfterFirstResponse() {
374 gpr_log(GPR_INFO, "Sending Ping Pong streaming rpc ...");
375 std::unique_ptr<TestService::Stub> stub(TestService::NewStub(channel_));
376
377 ClientContext context;
378 std::unique_ptr<ClientReaderWriter<StreamingOutputCallRequest,
379 StreamingOutputCallResponse>>
380 stream(stub->FullDuplexCall(&context));
381
382 StreamingOutputCallRequest request;
383 request.set_response_type(PayloadType::COMPRESSABLE);
384 ResponseParameters* response_parameter = request.add_response_parameters();
385 response_parameter->set_size(31415);
386 request.mutable_payload()->set_body(grpc::string(27182, '\0'));
387 StreamingOutputCallResponse response;
388 GPR_ASSERT(stream->Write(request));
389 GPR_ASSERT(stream->Read(&response));
390 GPR_ASSERT(response.payload().has_body());
391 GPR_ASSERT(response.payload().body() == grpc::string(31415, '\0'));
392 gpr_log(GPR_INFO, "Trying to cancel...");
393 context.TryCancel();
394
395 Status s = stream->Finish();
396 gpr_log(GPR_INFO, "Canceling pingpong streaming done.");
397}
398
yang-g69563b92015-07-10 15:32:11 -0700399void InteropClient::DoTimeoutOnSleepingServer() {
400 gpr_log(GPR_INFO, "Sending Ping Pong streaming rpc with a short deadline...");
401 std::unique_ptr<TestService::Stub> stub(TestService::NewStub(channel_));
402
403 ClientContext context;
404 std::chrono::system_clock::time_point deadline =
405 std::chrono::system_clock::now() + std::chrono::milliseconds(1);
406 context.set_deadline(deadline);
407 std::unique_ptr<ClientReaderWriter<StreamingOutputCallRequest,
408 StreamingOutputCallResponse>>
409 stream(stub->FullDuplexCall(&context));
410
411 StreamingOutputCallRequest request;
412 request.mutable_payload()->set_body(grpc::string(27182, '\0'));
413 stream->Write(request);
414
415 Status s = stream->Finish();
416 GPR_ASSERT(s.error_code() == StatusCode::DEADLINE_EXCEEDED);
417 gpr_log(GPR_INFO, "Pingpong streaming timeout done.");
418}
419
Yang Gaoa4002072015-04-09 23:25:21 -0700420} // namespace testing
421} // namespace grpc