blob: 33053edc97c53c91d480ba620b0f0125cbd0ed88 [file] [log] [blame]
/*
 *
 * Copyright 2015, Google Inc.
 * All rights reserved.
 *
 * Redistribution and use in source and binary forms, with or without
 * modification, are permitted provided that the following conditions are
 * met:
 *
 *     * Redistributions of source code must retain the above copyright
 * notice, this list of conditions and the following disclaimer.
 *     * Redistributions in binary form must reproduce the above
 * copyright notice, this list of conditions and the following disclaimer
 * in the documentation and/or other materials provided with the
 * distribution.
 *     * Neither the name of Google Inc. nor the names of its
 * contributors may be used to endorse or promote products derived from
 * this software without specific prior written permission.
 *
 * THIS SOFTWARE IS PROVIDED BY THE COPYRIGHT HOLDERS AND CONTRIBUTORS
 * "AS IS" AND ANY EXPRESS OR IMPLIED WARRANTIES, INCLUDING, BUT NOT
 * LIMITED TO, THE IMPLIED WARRANTIES OF MERCHANTABILITY AND FITNESS FOR
 * A PARTICULAR PURPOSE ARE DISCLAIMED. IN NO EVENT SHALL THE COPYRIGHT
 * OWNER OR CONTRIBUTORS BE LIABLE FOR ANY DIRECT, INDIRECT, INCIDENTAL,
 * SPECIAL, EXEMPLARY, OR CONSEQUENTIAL DAMAGES (INCLUDING, BUT NOT
 * LIMITED TO, PROCUREMENT OF SUBSTITUTE GOODS OR SERVICES; LOSS OF USE,
 * DATA, OR PROFITS; OR BUSINESS INTERRUPTION) HOWEVER CAUSED AND ON ANY
 * THEORY OF LIABILITY, WHETHER IN CONTRACT, STRICT LIABILITY, OR TORT
 * (INCLUDING NEGLIGENCE OR OTHERWISE) ARISING IN ANY WAY OUT OF THE USE
 * OF THIS SOFTWARE, EVEN IF ADVISED OF THE POSSIBILITY OF SUCH DAMAGE.
 *
 */
33
#include "test/cpp/interop/interop_client.h"

#include <unistd.h>

#include <chrono>
#include <memory>
#include <thread>
#include <vector>

#include <grpc/grpc.h>
#include <grpc/support/log.h>
#include <grpc++/channel_interface.h>
#include <grpc++/client_context.h>
#include <grpc++/status.h>
#include <grpc++/stream.h>
#include "test/proto/test.grpc.pb.h"
#include "test/cpp/interop/empty.grpc.pb.h"
#include "test/cpp/interop/messages.grpc.pb.h"
49
50namespace grpc {
51namespace testing {
52
namespace {

// Body lengths used by the streaming tests, one entry per message.
// The same value is defined by the Java client.
const std::vector<int> request_stream_sizes{27182, 8, 1828, 45904};
const std::vector<int> response_stream_sizes{31415, 9, 2653, 58979};

// Body lengths used by the large unary test.
const int kLargeRequestSize = 314159;
const int kLargeResponseSize = 271812;

// Parameters of the slow-consumer test: how many responses are requested,
// the body length of each, and the pause between consecutive reads.
const int kNumResponseMessages = 2000;
const int kResponseMessageSize = 1030;
const int kReceiveDelayMilliSeconds = 20;

}  // namespace
63
64InteropClient::InteropClient(std::shared_ptr<ChannelInterface> channel)
65 : channel_(channel) {}
66
67void InteropClient::AssertOkOrPrintErrorStatus(const Status& s) {
68 if (s.IsOk()) {
69 return;
70 }
71 gpr_log(GPR_INFO, "Error status code: %d, message: %s", s.code(),
72 s.details().c_str());
73 GPR_ASSERT(0);
74}
75
76void InteropClient::DoEmpty() {
77 gpr_log(GPR_INFO, "Sending an empty rpc...");
78 std::unique_ptr<TestService::Stub> stub(TestService::NewStub(channel_));
79
80 Empty request = Empty::default_instance();
81 Empty response = Empty::default_instance();
82 ClientContext context;
83
84 Status s = stub->EmptyCall(&context, request, &response);
85 AssertOkOrPrintErrorStatus(s);
86
87 gpr_log(GPR_INFO, "Empty rpc done.");
88}
89
90// Shared code to set large payload, make rpc and check response payload.
91void InteropClient::PerformLargeUnary(SimpleRequest* request,
92 SimpleResponse* response) {
93 std::unique_ptr<TestService::Stub> stub(TestService::NewStub(channel_));
94
95 ClientContext context;
96 request->set_response_type(PayloadType::COMPRESSABLE);
97 request->set_response_size(kLargeResponseSize);
98 grpc::string payload(kLargeRequestSize, '\0');
99 request->mutable_payload()->set_body(payload.c_str(), kLargeRequestSize);
100
101 Status s = stub->UnaryCall(&context, *request, response);
102
103 AssertOkOrPrintErrorStatus(s);
104 GPR_ASSERT(response->payload().type() == PayloadType::COMPRESSABLE);
105 GPR_ASSERT(response->payload().body() ==
106 grpc::string(kLargeResponseSize, '\0'));
107}
108
109void InteropClient::DoComputeEngineCreds(
110 const grpc::string& default_service_account,
111 const grpc::string& oauth_scope) {
112 gpr_log(GPR_INFO,
113 "Sending a large unary rpc with compute engine credentials ...");
114 SimpleRequest request;
115 SimpleResponse response;
116 request.set_fill_username(true);
117 request.set_fill_oauth_scope(true);
118 PerformLargeUnary(&request, &response);
119 gpr_log(GPR_INFO, "Got username %s", response.username().c_str());
120 gpr_log(GPR_INFO, "Got oauth_scope %s", response.oauth_scope().c_str());
121 GPR_ASSERT(!response.username().empty());
122 GPR_ASSERT(response.username().c_str() == default_service_account);
123 GPR_ASSERT(!response.oauth_scope().empty());
124 const char* oauth_scope_str = response.oauth_scope().c_str();
125 GPR_ASSERT(oauth_scope.find(oauth_scope_str) != grpc::string::npos);
126 gpr_log(GPR_INFO, "Large unary with compute engine creds done.");
127}
128
129void InteropClient::DoServiceAccountCreds(const grpc::string& username,
130 const grpc::string& oauth_scope) {
131 gpr_log(GPR_INFO,
132 "Sending a large unary rpc with service account credentials ...");
133 SimpleRequest request;
134 SimpleResponse response;
135 request.set_fill_username(true);
136 request.set_fill_oauth_scope(true);
137 PerformLargeUnary(&request, &response);
138 GPR_ASSERT(!response.username().empty());
139 GPR_ASSERT(!response.oauth_scope().empty());
140 GPR_ASSERT(username.find(response.username()) != grpc::string::npos);
141 const char* oauth_scope_str = response.oauth_scope().c_str();
142 GPR_ASSERT(oauth_scope.find(oauth_scope_str) != grpc::string::npos);
143 gpr_log(GPR_INFO, "Large unary with service account creds done.");
144}
145
146void InteropClient::DoJwtTokenCreds(const grpc::string& username) {
147 gpr_log(GPR_INFO, "Sending a large unary rpc with JWT token credentials ...");
148 SimpleRequest request;
149 SimpleResponse response;
150 request.set_fill_username(true);
151 PerformLargeUnary(&request, &response);
152 GPR_ASSERT(!response.username().empty());
153 GPR_ASSERT(username.find(response.username()) != grpc::string::npos);
154 gpr_log(GPR_INFO, "Large unary with JWT token creds done.");
155}
156
157void InteropClient::DoLargeUnary() {
158 gpr_log(GPR_INFO, "Sending a large unary rpc...");
159 SimpleRequest request;
160 SimpleResponse response;
161 PerformLargeUnary(&request, &response);
162 gpr_log(GPR_INFO, "Large unary done.");
163}
164
165void InteropClient::DoRequestStreaming() {
166 gpr_log(GPR_INFO, "Sending request steaming rpc ...");
167 std::unique_ptr<TestService::Stub> stub(TestService::NewStub(channel_));
168
169 ClientContext context;
170 StreamingInputCallRequest request;
171 StreamingInputCallResponse response;
172
173 std::unique_ptr<ClientWriter<StreamingInputCallRequest>> stream(
174 stub->StreamingInputCall(&context, &response));
175
176 int aggregated_payload_size = 0;
177 for (unsigned int i = 0; i < request_stream_sizes.size(); ++i) {
178 Payload* payload = request.mutable_payload();
179 payload->set_body(grpc::string(request_stream_sizes[i], '\0'));
180 GPR_ASSERT(stream->Write(request));
181 aggregated_payload_size += request_stream_sizes[i];
182 }
183 stream->WritesDone();
184 Status s = stream->Finish();
185
186 GPR_ASSERT(response.aggregated_payload_size() == aggregated_payload_size);
187 AssertOkOrPrintErrorStatus(s);
188 gpr_log(GPR_INFO, "Request streaming done.");
189}
190
191void InteropClient::DoResponseStreaming() {
192 gpr_log(GPR_INFO, "Receiving response steaming rpc ...");
193 std::unique_ptr<TestService::Stub> stub(TestService::NewStub(channel_));
194
195 ClientContext context;
196 StreamingOutputCallRequest request;
197 for (unsigned int i = 0; i < response_stream_sizes.size(); ++i) {
198 ResponseParameters* response_parameter = request.add_response_parameters();
199 response_parameter->set_size(response_stream_sizes[i]);
200 }
201 StreamingOutputCallResponse response;
202 std::unique_ptr<ClientReader<StreamingOutputCallResponse>> stream(
203 stub->StreamingOutputCall(&context, request));
204
205 unsigned int i = 0;
206 while (stream->Read(&response)) {
207 GPR_ASSERT(response.payload().body() ==
208 grpc::string(response_stream_sizes[i], '\0'));
209 ++i;
210 }
211 GPR_ASSERT(response_stream_sizes.size() == i);
212 Status s = stream->Finish();
213
214 AssertOkOrPrintErrorStatus(s);
215 gpr_log(GPR_INFO, "Response streaming done.");
216}
217
218void InteropClient::DoResponseStreamingWithSlowConsumer() {
219 gpr_log(GPR_INFO, "Receiving response steaming rpc with slow consumer ...");
220 std::unique_ptr<TestService::Stub> stub(TestService::NewStub(channel_));
221
222 ClientContext context;
223 StreamingOutputCallRequest request;
224
225 for (int i = 0; i < kNumResponseMessages; ++i) {
226 ResponseParameters* response_parameter = request.add_response_parameters();
227 response_parameter->set_size(kResponseMessageSize);
228 }
229 StreamingOutputCallResponse response;
230 std::unique_ptr<ClientReader<StreamingOutputCallResponse>> stream(
231 stub->StreamingOutputCall(&context, request));
232
233 int i = 0;
234 while (stream->Read(&response)) {
235 GPR_ASSERT(response.payload().body() ==
236 grpc::string(kResponseMessageSize, '\0'));
237 gpr_log(GPR_INFO, "received message %d", i);
238 usleep(kReceiveDelayMilliSeconds * 1000);
239 ++i;
240 }
241 GPR_ASSERT(kNumResponseMessages == i);
242 Status s = stream->Finish();
243
244 AssertOkOrPrintErrorStatus(s);
245 gpr_log(GPR_INFO, "Response streaming done.");
246}
247
248void InteropClient::DoHalfDuplex() {
249 gpr_log(GPR_INFO, "Sending half-duplex streaming rpc ...");
250 std::unique_ptr<TestService::Stub> stub(TestService::NewStub(channel_));
251
252 ClientContext context;
253 std::unique_ptr<ClientReaderWriter<StreamingOutputCallRequest,
254 StreamingOutputCallResponse>>
255 stream(stub->HalfDuplexCall(&context));
256
257 StreamingOutputCallRequest request;
258 ResponseParameters* response_parameter = request.add_response_parameters();
259 for (unsigned int i = 0; i < response_stream_sizes.size(); ++i) {
260 response_parameter->set_size(response_stream_sizes[i]);
261 GPR_ASSERT(stream->Write(request));
262 }
263 stream->WritesDone();
264
265 unsigned int i = 0;
266 StreamingOutputCallResponse response;
267 while (stream->Read(&response)) {
268 GPR_ASSERT(response.payload().has_body());
269 GPR_ASSERT(response.payload().body() ==
270 grpc::string(response_stream_sizes[i], '\0'));
271 ++i;
272 }
273 GPR_ASSERT(response_stream_sizes.size() == i);
274 Status s = stream->Finish();
275 AssertOkOrPrintErrorStatus(s);
276 gpr_log(GPR_INFO, "Half-duplex streaming rpc done.");
277}
278
279void InteropClient::DoPingPong() {
280 gpr_log(GPR_INFO, "Sending Ping Pong streaming rpc ...");
281 std::unique_ptr<TestService::Stub> stub(TestService::NewStub(channel_));
282
283 ClientContext context;
284 std::unique_ptr<ClientReaderWriter<StreamingOutputCallRequest,
285 StreamingOutputCallResponse>>
286 stream(stub->FullDuplexCall(&context));
287
288 StreamingOutputCallRequest request;
289 request.set_response_type(PayloadType::COMPRESSABLE);
290 ResponseParameters* response_parameter = request.add_response_parameters();
291 Payload* payload = request.mutable_payload();
292 StreamingOutputCallResponse response;
293 for (unsigned int i = 0; i < request_stream_sizes.size(); ++i) {
294 response_parameter->set_size(response_stream_sizes[i]);
295 payload->set_body(grpc::string(request_stream_sizes[i], '\0'));
296 GPR_ASSERT(stream->Write(request));
297 GPR_ASSERT(stream->Read(&response));
298 GPR_ASSERT(response.payload().has_body());
299 GPR_ASSERT(response.payload().body() ==
300 grpc::string(response_stream_sizes[i], '\0'));
301 }
302
303 stream->WritesDone();
304 GPR_ASSERT(!stream->Read(&response));
305 Status s = stream->Finish();
306 AssertOkOrPrintErrorStatus(s);
307 gpr_log(GPR_INFO, "Ping pong streaming done.");
308}
309
310} // namespace testing
311} // namespace grpc