blob: 30056e26ab209bcb52afb799547cd4a83d912752 [file] [log] [blame]
/*
 *
 * Copyright 2015, Google Inc.
 * All rights reserved.
 *
 * Redistribution and use in source and binary forms, with or without
 * modification, are permitted provided that the following conditions are
 * met:
 *
 *     * Redistributions of source code must retain the above copyright
 * notice, this list of conditions and the following disclaimer.
 *     * Redistributions in binary form must reproduce the above
 * copyright notice, this list of conditions and the following disclaimer
 * in the documentation and/or other materials provided with the
 * distribution.
 *     * Neither the name of Google Inc. nor the names of its
 * contributors may be used to endorse or promote products derived from
 * this software without specific prior written permission.
 *
 * THIS SOFTWARE IS PROVIDED BY THE COPYRIGHT HOLDERS AND CONTRIBUTORS
 * "AS IS" AND ANY EXPRESS OR IMPLIED WARRANTIES, INCLUDING, BUT NOT
 * LIMITED TO, THE IMPLIED WARRANTIES OF MERCHANTABILITY AND FITNESS FOR
 * A PARTICULAR PURPOSE ARE DISCLAIMED. IN NO EVENT SHALL THE COPYRIGHT
 * OWNER OR CONTRIBUTORS BE LIABLE FOR ANY DIRECT, INDIRECT, INCIDENTAL,
 * SPECIAL, EXEMPLARY, OR CONSEQUENTIAL DAMAGES (INCLUDING, BUT NOT
 * LIMITED TO, PROCUREMENT OF SUBSTITUTE GOODS OR SERVICES; LOSS OF USE,
 * DATA, OR PROFITS; OR BUSINESS INTERRUPTION) HOWEVER CAUSED AND ON ANY
 * THEORY OF LIABILITY, WHETHER IN CONTRACT, STRICT LIABILITY, OR TORT
 * (INCLUDING NEGLIGENCE OR OTHERWISE) ARISING IN ANY WAY OUT OF THE USE
 * OF THIS SOFTWARE, EVEN IF ADVISED OF THE POSSIBILITY OF SUCH DAMAGE.
 *
 */
33
34#include "test/cpp/interop/interop_client.h"
35
36#include <memory>
37
38#include <unistd.h>
39
40#include <grpc/grpc.h>
41#include <grpc/support/log.h>
42#include <grpc++/channel_interface.h>
43#include <grpc++/client_context.h>
44#include <grpc++/status.h>
45#include <grpc++/stream.h>
Abhishek Kumar1b3e3cd2015-04-16 20:10:29 -070046#include "test/proto/test.grpc.pb.h"
Abhishek Kumar60572d42015-04-16 20:45:25 -070047#include "test/proto/empty.grpc.pb.h"
48#include "test/proto/messages.grpc.pb.h"
Yang Gaoa4002072015-04-09 23:25:21 -070049
50namespace grpc {
51namespace testing {
52
namespace {
// Payload sizes (in bytes) used by the streaming tests. The same values are
// defined by the Java client.
const std::vector<int> request_stream_sizes = {27182, 8, 1828, 45904};
const std::vector<int> response_stream_sizes = {31415, 9, 2653, 58979};

// Parameters of the slow-consumer response-streaming test: how many messages
// are requested, how large each one is, and how long the client pauses after
// receiving each of them.
constexpr int kNumResponseMessages = 2000;
constexpr int kResponseMessageSize = 1030;
constexpr int kReceiveDelayMilliSeconds = 20;

// Request/response payload sizes (in bytes) used by the large unary tests.
constexpr int kLargeRequestSize = 271828;
constexpr int kLargeResponseSize = 314159;
}  // namespace
63
64InteropClient::InteropClient(std::shared_ptr<ChannelInterface> channel)
65 : channel_(channel) {}
66
67void InteropClient::AssertOkOrPrintErrorStatus(const Status& s) {
Yang Gaoc1a2c312015-06-16 10:59:46 -070068 if (s.ok()) {
Yang Gaoa4002072015-04-09 23:25:21 -070069 return;
70 }
Yang Gaoc1a2c312015-06-16 10:59:46 -070071 gpr_log(GPR_INFO, "Error status code: %d, message: %s", s.error_code(),
72 s.error_message().c_str());
Yang Gaoa4002072015-04-09 23:25:21 -070073 GPR_ASSERT(0);
74}
75
76void InteropClient::DoEmpty() {
77 gpr_log(GPR_INFO, "Sending an empty rpc...");
78 std::unique_ptr<TestService::Stub> stub(TestService::NewStub(channel_));
79
80 Empty request = Empty::default_instance();
81 Empty response = Empty::default_instance();
82 ClientContext context;
83
84 Status s = stub->EmptyCall(&context, request, &response);
85 AssertOkOrPrintErrorStatus(s);
86
87 gpr_log(GPR_INFO, "Empty rpc done.");
88}
89
90// Shared code to set large payload, make rpc and check response payload.
91void InteropClient::PerformLargeUnary(SimpleRequest* request,
92 SimpleResponse* response) {
93 std::unique_ptr<TestService::Stub> stub(TestService::NewStub(channel_));
94
95 ClientContext context;
96 request->set_response_type(PayloadType::COMPRESSABLE);
97 request->set_response_size(kLargeResponseSize);
98 grpc::string payload(kLargeRequestSize, '\0');
99 request->mutable_payload()->set_body(payload.c_str(), kLargeRequestSize);
100
101 Status s = stub->UnaryCall(&context, *request, response);
102
103 AssertOkOrPrintErrorStatus(s);
104 GPR_ASSERT(response->payload().type() == PayloadType::COMPRESSABLE);
105 GPR_ASSERT(response->payload().body() ==
106 grpc::string(kLargeResponseSize, '\0'));
107}
108
109void InteropClient::DoComputeEngineCreds(
110 const grpc::string& default_service_account,
111 const grpc::string& oauth_scope) {
112 gpr_log(GPR_INFO,
113 "Sending a large unary rpc with compute engine credentials ...");
114 SimpleRequest request;
115 SimpleResponse response;
116 request.set_fill_username(true);
117 request.set_fill_oauth_scope(true);
118 PerformLargeUnary(&request, &response);
119 gpr_log(GPR_INFO, "Got username %s", response.username().c_str());
120 gpr_log(GPR_INFO, "Got oauth_scope %s", response.oauth_scope().c_str());
121 GPR_ASSERT(!response.username().empty());
122 GPR_ASSERT(response.username().c_str() == default_service_account);
123 GPR_ASSERT(!response.oauth_scope().empty());
124 const char* oauth_scope_str = response.oauth_scope().c_str();
125 GPR_ASSERT(oauth_scope.find(oauth_scope_str) != grpc::string::npos);
126 gpr_log(GPR_INFO, "Large unary with compute engine creds done.");
127}
128
129void InteropClient::DoServiceAccountCreds(const grpc::string& username,
130 const grpc::string& oauth_scope) {
131 gpr_log(GPR_INFO,
132 "Sending a large unary rpc with service account credentials ...");
133 SimpleRequest request;
134 SimpleResponse response;
135 request.set_fill_username(true);
136 request.set_fill_oauth_scope(true);
137 PerformLargeUnary(&request, &response);
138 GPR_ASSERT(!response.username().empty());
139 GPR_ASSERT(!response.oauth_scope().empty());
140 GPR_ASSERT(username.find(response.username()) != grpc::string::npos);
141 const char* oauth_scope_str = response.oauth_scope().c_str();
142 GPR_ASSERT(oauth_scope.find(oauth_scope_str) != grpc::string::npos);
143 gpr_log(GPR_INFO, "Large unary with service account creds done.");
144}
145
yang-gbe5f0592015-07-13 11:11:50 -0700146void InteropClient::DoOauth2AuthToken(const grpc::string& username,
147 const grpc::string& oauth_scope) {
148 gpr_log(GPR_INFO,
149 "Sending a large unary rpc with raw oauth2 access token ...");
150 SimpleRequest request;
151 SimpleResponse response;
152 request.set_fill_username(true);
153 request.set_fill_oauth_scope(true);
154 PerformLargeUnary(&request, &response);
155 GPR_ASSERT(!response.username().empty());
156 GPR_ASSERT(!response.oauth_scope().empty());
157 GPR_ASSERT(username.find(response.username()) != grpc::string::npos);
158 const char* oauth_scope_str = response.oauth_scope().c_str();
159 GPR_ASSERT(oauth_scope.find(oauth_scope_str) != grpc::string::npos);
160 gpr_log(GPR_INFO, "Large unary with oauth2 access token done.");
161}
162
Yang Gaoa4002072015-04-09 23:25:21 -0700163void InteropClient::DoJwtTokenCreds(const grpc::string& username) {
164 gpr_log(GPR_INFO, "Sending a large unary rpc with JWT token credentials ...");
165 SimpleRequest request;
166 SimpleResponse response;
167 request.set_fill_username(true);
168 PerformLargeUnary(&request, &response);
169 GPR_ASSERT(!response.username().empty());
170 GPR_ASSERT(username.find(response.username()) != grpc::string::npos);
171 gpr_log(GPR_INFO, "Large unary with JWT token creds done.");
172}
173
174void InteropClient::DoLargeUnary() {
175 gpr_log(GPR_INFO, "Sending a large unary rpc...");
176 SimpleRequest request;
177 SimpleResponse response;
178 PerformLargeUnary(&request, &response);
179 gpr_log(GPR_INFO, "Large unary done.");
180}
181
182void InteropClient::DoRequestStreaming() {
183 gpr_log(GPR_INFO, "Sending request steaming rpc ...");
184 std::unique_ptr<TestService::Stub> stub(TestService::NewStub(channel_));
185
186 ClientContext context;
187 StreamingInputCallRequest request;
188 StreamingInputCallResponse response;
189
190 std::unique_ptr<ClientWriter<StreamingInputCallRequest>> stream(
191 stub->StreamingInputCall(&context, &response));
192
193 int aggregated_payload_size = 0;
194 for (unsigned int i = 0; i < request_stream_sizes.size(); ++i) {
195 Payload* payload = request.mutable_payload();
196 payload->set_body(grpc::string(request_stream_sizes[i], '\0'));
197 GPR_ASSERT(stream->Write(request));
198 aggregated_payload_size += request_stream_sizes[i];
199 }
200 stream->WritesDone();
201 Status s = stream->Finish();
202
203 GPR_ASSERT(response.aggregated_payload_size() == aggregated_payload_size);
204 AssertOkOrPrintErrorStatus(s);
205 gpr_log(GPR_INFO, "Request streaming done.");
206}
207
208void InteropClient::DoResponseStreaming() {
209 gpr_log(GPR_INFO, "Receiving response steaming rpc ...");
210 std::unique_ptr<TestService::Stub> stub(TestService::NewStub(channel_));
211
212 ClientContext context;
213 StreamingOutputCallRequest request;
214 for (unsigned int i = 0; i < response_stream_sizes.size(); ++i) {
215 ResponseParameters* response_parameter = request.add_response_parameters();
216 response_parameter->set_size(response_stream_sizes[i]);
217 }
218 StreamingOutputCallResponse response;
219 std::unique_ptr<ClientReader<StreamingOutputCallResponse>> stream(
220 stub->StreamingOutputCall(&context, request));
221
222 unsigned int i = 0;
223 while (stream->Read(&response)) {
224 GPR_ASSERT(response.payload().body() ==
225 grpc::string(response_stream_sizes[i], '\0'));
226 ++i;
227 }
228 GPR_ASSERT(response_stream_sizes.size() == i);
229 Status s = stream->Finish();
230
231 AssertOkOrPrintErrorStatus(s);
232 gpr_log(GPR_INFO, "Response streaming done.");
233}
234
235void InteropClient::DoResponseStreamingWithSlowConsumer() {
236 gpr_log(GPR_INFO, "Receiving response steaming rpc with slow consumer ...");
237 std::unique_ptr<TestService::Stub> stub(TestService::NewStub(channel_));
238
239 ClientContext context;
240 StreamingOutputCallRequest request;
241
242 for (int i = 0; i < kNumResponseMessages; ++i) {
243 ResponseParameters* response_parameter = request.add_response_parameters();
244 response_parameter->set_size(kResponseMessageSize);
245 }
246 StreamingOutputCallResponse response;
247 std::unique_ptr<ClientReader<StreamingOutputCallResponse>> stream(
248 stub->StreamingOutputCall(&context, request));
249
250 int i = 0;
251 while (stream->Read(&response)) {
252 GPR_ASSERT(response.payload().body() ==
253 grpc::string(kResponseMessageSize, '\0'));
254 gpr_log(GPR_INFO, "received message %d", i);
255 usleep(kReceiveDelayMilliSeconds * 1000);
256 ++i;
257 }
258 GPR_ASSERT(kNumResponseMessages == i);
259 Status s = stream->Finish();
260
261 AssertOkOrPrintErrorStatus(s);
262 gpr_log(GPR_INFO, "Response streaming done.");
263}
264
265void InteropClient::DoHalfDuplex() {
266 gpr_log(GPR_INFO, "Sending half-duplex streaming rpc ...");
267 std::unique_ptr<TestService::Stub> stub(TestService::NewStub(channel_));
268
269 ClientContext context;
270 std::unique_ptr<ClientReaderWriter<StreamingOutputCallRequest,
271 StreamingOutputCallResponse>>
272 stream(stub->HalfDuplexCall(&context));
273
274 StreamingOutputCallRequest request;
275 ResponseParameters* response_parameter = request.add_response_parameters();
276 for (unsigned int i = 0; i < response_stream_sizes.size(); ++i) {
277 response_parameter->set_size(response_stream_sizes[i]);
278 GPR_ASSERT(stream->Write(request));
279 }
280 stream->WritesDone();
281
282 unsigned int i = 0;
283 StreamingOutputCallResponse response;
284 while (stream->Read(&response)) {
285 GPR_ASSERT(response.payload().has_body());
286 GPR_ASSERT(response.payload().body() ==
287 grpc::string(response_stream_sizes[i], '\0'));
288 ++i;
289 }
290 GPR_ASSERT(response_stream_sizes.size() == i);
291 Status s = stream->Finish();
292 AssertOkOrPrintErrorStatus(s);
293 gpr_log(GPR_INFO, "Half-duplex streaming rpc done.");
294}
295
296void InteropClient::DoPingPong() {
297 gpr_log(GPR_INFO, "Sending Ping Pong streaming rpc ...");
298 std::unique_ptr<TestService::Stub> stub(TestService::NewStub(channel_));
299
300 ClientContext context;
301 std::unique_ptr<ClientReaderWriter<StreamingOutputCallRequest,
302 StreamingOutputCallResponse>>
303 stream(stub->FullDuplexCall(&context));
304
305 StreamingOutputCallRequest request;
306 request.set_response_type(PayloadType::COMPRESSABLE);
307 ResponseParameters* response_parameter = request.add_response_parameters();
308 Payload* payload = request.mutable_payload();
309 StreamingOutputCallResponse response;
310 for (unsigned int i = 0; i < request_stream_sizes.size(); ++i) {
311 response_parameter->set_size(response_stream_sizes[i]);
312 payload->set_body(grpc::string(request_stream_sizes[i], '\0'));
313 GPR_ASSERT(stream->Write(request));
314 GPR_ASSERT(stream->Read(&response));
315 GPR_ASSERT(response.payload().has_body());
316 GPR_ASSERT(response.payload().body() ==
317 grpc::string(response_stream_sizes[i], '\0'));
318 }
319
320 stream->WritesDone();
321 GPR_ASSERT(!stream->Read(&response));
322 Status s = stream->Finish();
323 AssertOkOrPrintErrorStatus(s);
324 gpr_log(GPR_INFO, "Ping pong streaming done.");
325}
326
Yang Gao68d61572015-04-24 14:42:42 -0700327void InteropClient::DoCancelAfterBegin() {
328 gpr_log(GPR_INFO, "Sending request steaming rpc ...");
329 std::unique_ptr<TestService::Stub> stub(TestService::NewStub(channel_));
330
331 ClientContext context;
332 StreamingInputCallRequest request;
333 StreamingInputCallResponse response;
334
335 std::unique_ptr<ClientWriter<StreamingInputCallRequest>> stream(
336 stub->StreamingInputCall(&context, &response));
337
338 gpr_log(GPR_INFO, "Trying to cancel...");
339 context.TryCancel();
340 Status s = stream->Finish();
Yang Gaoc1a2c312015-06-16 10:59:46 -0700341 GPR_ASSERT(s.error_code() == StatusCode::CANCELLED);
Yang Gao68d61572015-04-24 14:42:42 -0700342 gpr_log(GPR_INFO, "Canceling streaming done.");
343}
344
345void InteropClient::DoCancelAfterFirstResponse() {
346 gpr_log(GPR_INFO, "Sending Ping Pong streaming rpc ...");
347 std::unique_ptr<TestService::Stub> stub(TestService::NewStub(channel_));
348
349 ClientContext context;
350 std::unique_ptr<ClientReaderWriter<StreamingOutputCallRequest,
351 StreamingOutputCallResponse>>
352 stream(stub->FullDuplexCall(&context));
353
354 StreamingOutputCallRequest request;
355 request.set_response_type(PayloadType::COMPRESSABLE);
356 ResponseParameters* response_parameter = request.add_response_parameters();
357 response_parameter->set_size(31415);
358 request.mutable_payload()->set_body(grpc::string(27182, '\0'));
359 StreamingOutputCallResponse response;
360 GPR_ASSERT(stream->Write(request));
361 GPR_ASSERT(stream->Read(&response));
362 GPR_ASSERT(response.payload().has_body());
363 GPR_ASSERT(response.payload().body() == grpc::string(31415, '\0'));
364 gpr_log(GPR_INFO, "Trying to cancel...");
365 context.TryCancel();
366
367 Status s = stream->Finish();
368 gpr_log(GPR_INFO, "Canceling pingpong streaming done.");
369}
370
yang-g69563b92015-07-10 15:32:11 -0700371void InteropClient::DoTimeoutOnSleepingServer() {
372 gpr_log(GPR_INFO, "Sending Ping Pong streaming rpc with a short deadline...");
373 std::unique_ptr<TestService::Stub> stub(TestService::NewStub(channel_));
374
375 ClientContext context;
376 std::chrono::system_clock::time_point deadline =
377 std::chrono::system_clock::now() + std::chrono::milliseconds(1);
378 context.set_deadline(deadline);
379 std::unique_ptr<ClientReaderWriter<StreamingOutputCallRequest,
380 StreamingOutputCallResponse>>
381 stream(stub->FullDuplexCall(&context));
382
383 StreamingOutputCallRequest request;
384 request.mutable_payload()->set_body(grpc::string(27182, '\0'));
385 stream->Write(request);
386
387 Status s = stream->Finish();
388 GPR_ASSERT(s.error_code() == StatusCode::DEADLINE_EXCEEDED);
389 gpr_log(GPR_INFO, "Pingpong streaming timeout done.");
390}
391
Yang Gaoa4002072015-04-09 23:25:21 -0700392} // namespace testing
393} // namespace grpc