blob: 0232a9fa3171cf073d6c19122dad7a35c500461c [file] [log] [blame]
Craig Tiller0220cf12015-02-12 17:39:26 -08001/*
2 *
Craig Tiller6169d5f2016-03-31 07:46:18 -07003 * Copyright 2015, Google Inc.
Craig Tiller0220cf12015-02-12 17:39:26 -08004 * All rights reserved.
5 *
6 * Redistribution and use in source and binary forms, with or without
7 * modification, are permitted provided that the following conditions are
8 * met:
9 *
10 * * Redistributions of source code must retain the above copyright
11 * notice, this list of conditions and the following disclaimer.
12 * * Redistributions in binary form must reproduce the above
13 * copyright notice, this list of conditions and the following disclaimer
14 * in the documentation and/or other materials provided with the
15 * distribution.
16 * * Neither the name of Google Inc. nor the names of its
17 * contributors may be used to endorse or promote products derived from
18 * this software without specific prior written permission.
19 *
20 * THIS SOFTWARE IS PROVIDED BY THE COPYRIGHT HOLDERS AND CONTRIBUTORS
21 * "AS IS" AND ANY EXPRESS OR IMPLIED WARRANTIES, INCLUDING, BUT NOT
22 * LIMITED TO, THE IMPLIED WARRANTIES OF MERCHANTABILITY AND FITNESS FOR
23 * A PARTICULAR PURPOSE ARE DISCLAIMED. IN NO EVENT SHALL THE COPYRIGHT
24 * OWNER OR CONTRIBUTORS BE LIABLE FOR ANY DIRECT, INDIRECT, INCIDENTAL,
25 * SPECIAL, EXEMPLARY, OR CONSEQUENTIAL DAMAGES (INCLUDING, BUT NOT
26 * LIMITED TO, PROCUREMENT OF SUBSTITUTE GOODS OR SERVICES; LOSS OF USE,
27 * DATA, OR PROFITS; OR BUSINESS INTERRUPTION) HOWEVER CAUSED AND ON ANY
28 * THEORY OF LIABILITY, WHETHER IN CONTRACT, STRICT LIABILITY, OR TORT
29 * (INCLUDING NEGLIGENCE OR OTHERWISE) ARISING IN ANY WAY OUT OF THE USE
30 * OF THIS SOFTWARE, EVEN IF ADVISED OF THE POSSIBILITY OF SUCH DAMAGE.
31 *
32 */
33
Yang Gaoda699b82015-02-18 01:10:22 -080034#include <memory>
Sree Kuchibhotla944f4cf2016-01-27 14:37:26 -080035#include <thread>
Craig Tiller0220cf12015-02-12 17:39:26 -080036
yang-g8c2be9f2015-08-19 16:28:09 -070037#include <grpc++/channel.h>
Craig Tiller0220cf12015-02-12 17:39:26 -080038#include <grpc++/client_context.h>
39#include <grpc++/create_channel.h>
Craig Tiller0220cf12015-02-12 17:39:26 -080040#include <grpc++/server.h>
41#include <grpc++/server_builder.h>
42#include <grpc++/server_context.h>
Sree Kuchibhotlab0d0c8e2016-01-13 22:52:17 -080043#include <grpc/grpc.h>
44#include <grpc/support/thd.h>
45#include <grpc/support/time.h>
Vijay Paib65eda42016-02-16 13:48:05 -080046#include <grpc/support/tls.h>
Craig Tiller0220cf12015-02-12 17:39:26 -080047#include <gtest/gtest.h>
48
Craig Tiller1b4e3302015-12-17 16:35:00 -080049#include "src/proto/grpc/testing/duplicate/echo_duplicate.grpc.pb.h"
50#include "src/proto/grpc/testing/echo.grpc.pb.h"
Sree Kuchibhotlab0d0c8e2016-01-13 22:52:17 -080051#include "test/core/util/port.h"
52#include "test/core/util/test_config.h"
yang-ge21908f2015-08-25 13:47:51 -070053#include "test/cpp/util/string_ref_helper.h"
Vijay Paidf8b62c2016-05-02 14:34:24 -070054#include "test/cpp/util/test_credentials_provider.h"
Craig Tiller0220cf12015-02-12 17:39:26 -080055
Craig Tiller69f90e62015-08-06 08:32:35 -070056#ifdef GPR_POSIX_SOCKET
Craig Tillerf45496f2016-03-30 07:41:19 -070057#include "src/core/lib/iomgr/ev_posix.h"
Craig Tiller69f90e62015-08-06 08:32:35 -070058#endif
59
Craig Tiller1b4e3302015-12-17 16:35:00 -080060using grpc::testing::EchoRequest;
61using grpc::testing::EchoResponse;
Vijay Paidf8b62c2016-05-02 14:34:24 -070062using grpc::testing::kTlsCredentialsType;
Craig Tiller0220cf12015-02-12 17:39:26 -080063using std::chrono::system_clock;
64
Vijay Paib65eda42016-02-16 13:48:05 -080065GPR_TLS_DECL(g_is_async_end2end_test);
66
Craig Tiller0220cf12015-02-12 17:39:26 -080067namespace grpc {
68namespace testing {
69
70namespace {
71
Craig Tiller7536af02015-12-22 13:49:30 -080072void* tag(int i) { return (void*)(intptr_t)i; }
Sree Kuchibhotlab0d15672016-03-07 10:51:02 -080073int detag(void* p) { return static_cast<int>(reinterpret_cast<intptr_t>(p)); }
Yang Gaoc05b6cb2015-02-13 00:34:10 -080074
Craig Tiller69f90e62015-08-06 08:32:35 -070075#ifdef GPR_POSIX_SOCKET
Vijay Paib65eda42016-02-16 13:48:05 -080076static int maybe_assert_non_blocking_poll(struct pollfd* pfds, nfds_t nfds,
77 int timeout) {
78 if (gpr_tls_get(&g_is_async_end2end_test)) {
79 GPR_ASSERT(timeout == 0);
80 }
81 return poll(pfds, nfds, timeout);
Craig Tiller69f90e62015-08-06 08:32:35 -070082}
83
84class PollOverride {
Craig Tiller06cf3cc2015-05-13 13:11:01 -070085 public:
Craig Tiller69f90e62015-08-06 08:32:35 -070086 PollOverride(grpc_poll_function_type f) {
87 prev_ = grpc_poll_function;
88 grpc_poll_function = f;
89 }
90
Craig Tiller4c06b822015-08-06 08:41:31 -070091 ~PollOverride() { grpc_poll_function = prev_; }
Craig Tiller69f90e62015-08-06 08:32:35 -070092
93 private:
94 grpc_poll_function_type prev_;
95};
96
vjpaicf4daeb2016-02-15 02:33:54 -080097class PollingOverrider : public PollOverride {
Craig Tiller69f90e62015-08-06 08:32:35 -070098 public:
vjpaicf4daeb2016-02-15 02:33:54 -080099 explicit PollingOverrider(bool allow_blocking)
Vijay Paib65eda42016-02-16 13:48:05 -0800100 : PollOverride(allow_blocking ? poll : maybe_assert_non_blocking_poll) {}
Craig Tiller69f90e62015-08-06 08:32:35 -0700101};
102#else
vjpaicf4daeb2016-02-15 02:33:54 -0800103class PollingOverrider {
Craig Tiller69f90e62015-08-06 08:32:35 -0700104 public:
vjpaicf4daeb2016-02-15 02:33:54 -0800105 explicit PollingOverrider(bool allow_blocking) {}
Craig Tiller69f90e62015-08-06 08:32:35 -0700106};
107#endif
108
vjpaicf4daeb2016-02-15 02:33:54 -0800109class Verifier {
Craig Tiller69f90e62015-08-06 08:32:35 -0700110 public:
vjpaicf4daeb2016-02-15 02:33:54 -0800111 explicit Verifier(bool spin) : spin_(spin) {}
Sree Kuchibhotlab0d15672016-03-07 10:51:02 -0800112 // Expect sets the expected ok value for a specific tag
Craig Tiller06cf3cc2015-05-13 13:11:01 -0700113 Verifier& Expect(int i, bool expect_ok) {
114 expectations_[tag(i)] = expect_ok;
115 return *this;
vjpai7aadf462015-03-16 23:58:44 -0700116 }
Sree Kuchibhotla944f4cf2016-01-27 14:37:26 -0800117
Sree Kuchibhotlab0d15672016-03-07 10:51:02 -0800118 // Next waits for 1 async tag to complete, checks its
119 // expectations, and returns the tag
120 int Next(CompletionQueue* cq, bool ignore_ok) {
121 bool ok;
122 void* got_tag;
123 if (spin_) {
124 for (;;) {
125 auto r = cq->AsyncNext(&got_tag, &ok, gpr_time_0(GPR_CLOCK_REALTIME));
126 if (r == CompletionQueue::TIMEOUT) continue;
127 if (r == CompletionQueue::GOT_EVENT) break;
128 gpr_log(GPR_ERROR, "unexpected result from AsyncNext");
129 abort();
130 }
131 } else {
132 EXPECT_TRUE(cq->Next(&got_tag, &ok));
133 }
134 auto it = expectations_.find(got_tag);
135 EXPECT_TRUE(it != expectations_.end());
136 if (!ignore_ok) {
137 EXPECT_EQ(it->second, ok);
138 }
139 expectations_.erase(it);
140 return detag(got_tag);
141 }
142
143 // Verify keeps calling Next until all currently set
144 // expected tags are complete
Sree Kuchibhotla944f4cf2016-01-27 14:37:26 -0800145 void Verify(CompletionQueue* cq) { Verify(cq, false); }
146
Sree Kuchibhotlab0d15672016-03-07 10:51:02 -0800147 // This version of Verify allows optionally ignoring the
148 // outcome of the expectation
Sree Kuchibhotla944f4cf2016-01-27 14:37:26 -0800149 void Verify(CompletionQueue* cq, bool ignore_ok) {
Craig Tiller06cf3cc2015-05-13 13:11:01 -0700150 GPR_ASSERT(!expectations_.empty());
151 while (!expectations_.empty()) {
Sree Kuchibhotlab0d15672016-03-07 10:51:02 -0800152 Next(cq, ignore_ok);
Craig Tiller06cf3cc2015-05-13 13:11:01 -0700153 }
154 }
Sree Kuchibhotlab0d15672016-03-07 10:51:02 -0800155 // This version of Verify stops after a certain deadline
Craig Tillerd6c98df2015-08-18 09:33:44 -0700156 void Verify(CompletionQueue* cq,
157 std::chrono::system_clock::time_point deadline) {
Craig Tiller06cf3cc2015-05-13 13:11:01 -0700158 if (expectations_.empty()) {
159 bool ok;
Craig Tillerd6c98df2015-08-18 09:33:44 -0700160 void* got_tag;
Craig Tiller69f90e62015-08-06 08:32:35 -0700161 if (spin_) {
162 while (std::chrono::system_clock::now() < deadline) {
Craig Tiller4c06b822015-08-06 08:41:31 -0700163 EXPECT_EQ(
164 cq->AsyncNext(&got_tag, &ok, gpr_time_0(GPR_CLOCK_REALTIME)),
165 CompletionQueue::TIMEOUT);
Craig Tiller69f90e62015-08-06 08:32:35 -0700166 }
167 } else {
Craig Tiller4c06b822015-08-06 08:41:31 -0700168 EXPECT_EQ(cq->AsyncNext(&got_tag, &ok, deadline),
169 CompletionQueue::TIMEOUT);
Craig Tiller69f90e62015-08-06 08:32:35 -0700170 }
Craig Tiller06cf3cc2015-05-13 13:11:01 -0700171 } else {
172 while (!expectations_.empty()) {
173 bool ok;
Craig Tillerd6c98df2015-08-18 09:33:44 -0700174 void* got_tag;
Craig Tiller69f90e62015-08-06 08:32:35 -0700175 if (spin_) {
176 for (;;) {
177 GPR_ASSERT(std::chrono::system_clock::now() < deadline);
Craig Tiller4c06b822015-08-06 08:41:31 -0700178 auto r =
179 cq->AsyncNext(&got_tag, &ok, gpr_time_0(GPR_CLOCK_REALTIME));
Craig Tiller69f90e62015-08-06 08:32:35 -0700180 if (r == CompletionQueue::TIMEOUT) continue;
181 if (r == CompletionQueue::GOT_EVENT) break;
182 gpr_log(GPR_ERROR, "unexpected result from AsyncNext");
183 abort();
Craig Tiller4c06b822015-08-06 08:41:31 -0700184 }
Craig Tiller69f90e62015-08-06 08:32:35 -0700185 } else {
Craig Tiller4c06b822015-08-06 08:41:31 -0700186 EXPECT_EQ(cq->AsyncNext(&got_tag, &ok, deadline),
187 CompletionQueue::GOT_EVENT);
Craig Tiller69f90e62015-08-06 08:32:35 -0700188 }
Craig Tiller06cf3cc2015-05-13 13:11:01 -0700189 auto it = expectations_.find(got_tag);
190 EXPECT_TRUE(it != expectations_.end());
191 EXPECT_EQ(it->second, ok);
192 expectations_.erase(it);
193 }
194 }
195 }
196
197 private:
198 std::map<void*, bool> expectations_;
Craig Tiller69f90e62015-08-06 08:32:35 -0700199 bool spin_;
Craig Tiller06cf3cc2015-05-13 13:11:01 -0700200};
vjpai7aadf462015-03-16 23:58:44 -0700201
Vijay Paidf8b62c2016-05-02 14:34:24 -0700202class TestScenario {
203 public:
Vijay Paid7b1e702016-05-02 15:10:21 -0700204 TestScenario(bool non_block, const grpc::string& creds_type,
205 const grpc::string& content)
206 : disable_blocking(non_block),
207 credentials_type(creds_type),
208 message_content(content) {}
209 void Log() const {
210 gpr_log(GPR_INFO,
211 "Scenario: disable_blocking %d, credentials %s, message size %d",
212 disable_blocking, credentials_type.c_str(), message_content.size());
213 }
Vijay Paidf8b62c2016-05-02 14:34:24 -0700214 bool disable_blocking;
215 const grpc::string credentials_type;
Vijay Paid7b1e702016-05-02 15:10:21 -0700216 const grpc::string message_content;
Vijay Paidf8b62c2016-05-02 14:34:24 -0700217};
218
219class AsyncEnd2endTest : public ::testing::TestWithParam<TestScenario> {
Craig Tiller0220cf12015-02-12 17:39:26 -0800220 protected:
Vijay Paid7b1e702016-05-02 15:10:21 -0700221 AsyncEnd2endTest() { GetParam().Log(); }
Craig Tiller0220cf12015-02-12 17:39:26 -0800222
Craig Tillercf133f42015-02-26 14:05:56 -0800223 void SetUp() GRPC_OVERRIDE {
Vijay Paidf8b62c2016-05-02 14:34:24 -0700224 poll_overrider_.reset(new PollingOverrider(!GetParam().disable_blocking));
Vijay Pai018879a2016-02-16 09:20:50 -0800225
Craig Tiller0220cf12015-02-12 17:39:26 -0800226 int port = grpc_pick_unused_port_or_die();
227 server_address_ << "localhost:" << port;
vjpai017ed622015-12-09 10:42:54 -0800228
Craig Tiller0220cf12015-02-12 17:39:26 -0800229 // Setup server
230 ServerBuilder builder;
Vijay Paidf8b62c2016-05-02 14:34:24 -0700231 auto server_creds = GetServerCredentials(GetParam().credentials_type);
232 builder.AddListeningPort(server_address_.str(), server_creds);
Craig Tiller15f383c2016-01-07 12:45:32 -0800233 builder.RegisterService(&service_);
Craig Tiller06cf3cc2015-05-13 13:11:01 -0700234 cq_ = builder.AddCompletionQueue();
Craig Tiller0220cf12015-02-12 17:39:26 -0800235 server_ = builder.BuildAndStart();
Vijay Paib65eda42016-02-16 13:48:05 -0800236
237 gpr_tls_set(&g_is_async_end2end_test, 1);
Craig Tiller0220cf12015-02-12 17:39:26 -0800238 }
239
Craig Tillercf133f42015-02-26 14:05:56 -0800240 void TearDown() GRPC_OVERRIDE {
Craig Tiller492968f2015-02-18 13:14:03 -0800241 server_->Shutdown();
242 void* ignored_tag;
243 bool ignored_ok;
Craig Tiller06cf3cc2015-05-13 13:11:01 -0700244 cq_->Shutdown();
245 while (cq_->Next(&ignored_tag, &ignored_ok))
Craig Tiller492968f2015-02-18 13:14:03 -0800246 ;
Vijay Pai018879a2016-02-16 09:20:50 -0800247 poll_overrider_.reset();
Vijay Paib65eda42016-02-16 13:48:05 -0800248 gpr_tls_set(&g_is_async_end2end_test, 0);
Craig Tiller492968f2015-02-18 13:14:03 -0800249 }
Craig Tiller0220cf12015-02-12 17:39:26 -0800250
251 void ResetStub() {
Vijay Paidf8b62c2016-05-02 14:34:24 -0700252 ChannelArguments args;
253 auto channel_creds =
254 GetChannelCredentials(GetParam().credentials_type, &args);
yang-g730055d2015-08-27 12:29:45 -0700255 std::shared_ptr<Channel> channel =
Vijay Paidf8b62c2016-05-02 14:34:24 -0700256 CreateCustomChannel(server_address_.str(), channel_creds, args);
Sree Kuchibhotla5a05f512016-01-13 22:43:20 -0800257 stub_ = grpc::testing::EchoTestService::NewStub(channel);
Craig Tiller0220cf12015-02-12 17:39:26 -0800258 }
259
Yang Gao406b32f2015-02-13 16:25:33 -0800260 void SendRpc(int num_rpcs) {
261 for (int i = 0; i < num_rpcs; i++) {
262 EchoRequest send_request;
263 EchoRequest recv_request;
264 EchoResponse send_response;
265 EchoResponse recv_response;
266 Status recv_status;
267
268 ClientContext cli_ctx;
269 ServerContext srv_ctx;
270 grpc::ServerAsyncResponseWriter<EchoResponse> response_writer(&srv_ctx);
271
Vijay Paid7b1e702016-05-02 15:10:21 -0700272 send_request.set_message(GetParam().message_content);
Sree Kuchibhotla944f4cf2016-01-27 14:37:26 -0800273 std::unique_ptr<ClientAsyncResponseReader<EchoResponse>> response_reader(
Craig Tiller06cf3cc2015-05-13 13:11:01 -0700274 stub_->AsyncEcho(&cli_ctx, send_request, cq_.get()));
Yang Gao406b32f2015-02-13 16:25:33 -0800275
Craig Tillerd6c98df2015-08-18 09:33:44 -0700276 service_.RequestEcho(&srv_ctx, &recv_request, &response_writer, cq_.get(),
277 cq_.get(), tag(2));
Yang Gao406b32f2015-02-13 16:25:33 -0800278
Vijay Paidf8b62c2016-05-02 14:34:24 -0700279 Verifier(GetParam().disable_blocking).Expect(2, true).Verify(cq_.get());
Yang Gao406b32f2015-02-13 16:25:33 -0800280 EXPECT_EQ(send_request.message(), recv_request.message());
281
282 send_response.set_message(recv_request.message());
283 response_writer.Finish(send_response, Status::OK, tag(3));
Yang Gao3a5e5492015-02-18 14:32:38 -0800284 response_reader->Finish(&recv_response, &recv_status, tag(4));
Craig Tillere4d27482016-05-03 12:19:33 -0700285 Verifier(GetParam().disable_blocking)
286 .Expect(3, true)
287 .Expect(4, true)
288 .Verify(cq_.get());
Yang Gao406b32f2015-02-13 16:25:33 -0800289
290 EXPECT_EQ(send_response.message(), recv_response.message());
Yang Gaoc1a2c312015-06-16 10:59:46 -0700291 EXPECT_TRUE(recv_status.ok());
Yang Gao406b32f2015-02-13 16:25:33 -0800292 }
293 }
294
Craig Tiller06cf3cc2015-05-13 13:11:01 -0700295 std::unique_ptr<ServerCompletionQueue> cq_;
Sree Kuchibhotla5a05f512016-01-13 22:43:20 -0800296 std::unique_ptr<grpc::testing::EchoTestService::Stub> stub_;
Craig Tiller0220cf12015-02-12 17:39:26 -0800297 std::unique_ptr<Server> server_;
Sree Kuchibhotla5a05f512016-01-13 22:43:20 -0800298 grpc::testing::EchoTestService::AsyncService service_;
Craig Tiller0220cf12015-02-12 17:39:26 -0800299 std::ostringstream server_address_;
vjpaicf4daeb2016-02-15 02:33:54 -0800300
Vijay Pai018879a2016-02-16 09:20:50 -0800301 std::unique_ptr<PollingOverrider> poll_overrider_;
Craig Tiller0220cf12015-02-12 17:39:26 -0800302};
303
Craig Tiller69f90e62015-08-06 08:32:35 -0700304TEST_P(AsyncEnd2endTest, SimpleRpc) {
Craig Tiller0220cf12015-02-12 17:39:26 -0800305 ResetStub();
Yang Gao406b32f2015-02-13 16:25:33 -0800306 SendRpc(1);
307}
Yang Gaobb84a302015-02-12 23:30:12 -0800308
Craig Tiller69f90e62015-08-06 08:32:35 -0700309TEST_P(AsyncEnd2endTest, SequentialRpcs) {
Yang Gao406b32f2015-02-13 16:25:33 -0800310 ResetStub();
311 SendRpc(10);
Craig Tiller0220cf12015-02-12 17:39:26 -0800312}
313
vjpai7aadf462015-03-16 23:58:44 -0700314// Test a simple RPC using the async version of Next
Craig Tiller69f90e62015-08-06 08:32:35 -0700315TEST_P(AsyncEnd2endTest, AsyncNextRpc) {
vjpai7aadf462015-03-16 23:58:44 -0700316 ResetStub();
317
318 EchoRequest send_request;
319 EchoRequest recv_request;
320 EchoResponse send_response;
321 EchoResponse recv_response;
322 Status recv_status;
323
324 ClientContext cli_ctx;
325 ServerContext srv_ctx;
326 grpc::ServerAsyncResponseWriter<EchoResponse> response_writer(&srv_ctx);
327
Vijay Paid7b1e702016-05-02 15:10:21 -0700328 send_request.set_message(GetParam().message_content);
Sree Kuchibhotla944f4cf2016-01-27 14:37:26 -0800329 std::unique_ptr<ClientAsyncResponseReader<EchoResponse>> response_reader(
Craig Tiller06cf3cc2015-05-13 13:11:01 -0700330 stub_->AsyncEcho(&cli_ctx, send_request, cq_.get()));
vjpai7aadf462015-03-16 23:58:44 -0700331
Yang Gao757afae2015-03-17 15:49:26 -0700332 std::chrono::system_clock::time_point time_now(
Craig Tillerf51199f2015-05-08 09:32:53 -0700333 std::chrono::system_clock::now());
334 std::chrono::system_clock::time_point time_limit(
335 std::chrono::system_clock::now() + std::chrono::seconds(10));
Vijay Paidf8b62c2016-05-02 14:34:24 -0700336 Verifier(GetParam().disable_blocking).Verify(cq_.get(), time_now);
337 Verifier(GetParam().disable_blocking).Verify(cq_.get(), time_now);
vjpai7aadf462015-03-16 23:58:44 -0700338
Craig Tiller06cf3cc2015-05-13 13:11:01 -0700339 service_.RequestEcho(&srv_ctx, &recv_request, &response_writer, cq_.get(),
340 cq_.get(), tag(2));
vjpai7aadf462015-03-16 23:58:44 -0700341
Vijay Paidf8b62c2016-05-02 14:34:24 -0700342 Verifier(GetParam().disable_blocking)
343 .Expect(2, true)
344 .Verify(cq_.get(), time_limit);
vjpai7aadf462015-03-16 23:58:44 -0700345 EXPECT_EQ(send_request.message(), recv_request.message());
vjpai7aadf462015-03-16 23:58:44 -0700346
347 send_response.set_message(recv_request.message());
348 response_writer.Finish(send_response, Status::OK, tag(3));
vjpai7aadf462015-03-16 23:58:44 -0700349 response_reader->Finish(&recv_response, &recv_status, tag(4));
Vijay Paidf8b62c2016-05-02 14:34:24 -0700350 Verifier(GetParam().disable_blocking)
Craig Tillere4d27482016-05-03 12:19:33 -0700351 .Expect(3, true)
Craig Tiller4c06b822015-08-06 08:41:31 -0700352 .Expect(4, true)
353 .Verify(cq_.get(), std::chrono::system_clock::time_point::max());
vjpai7aadf462015-03-16 23:58:44 -0700354
355 EXPECT_EQ(send_response.message(), recv_response.message());
Yang Gaoc1a2c312015-06-16 10:59:46 -0700356 EXPECT_TRUE(recv_status.ok());
vjpai7aadf462015-03-16 23:58:44 -0700357}
Yang Gao757afae2015-03-17 15:49:26 -0700358
Yang Gao0e0d8e12015-02-13 14:40:41 -0800359// Two pings and a final pong.
Craig Tiller69f90e62015-08-06 08:32:35 -0700360TEST_P(AsyncEnd2endTest, SimpleClientStreaming) {
Yang Gao005f18a2015-02-13 10:22:33 -0800361 ResetStub();
362
363 EchoRequest send_request;
364 EchoRequest recv_request;
365 EchoResponse send_response;
366 EchoResponse recv_response;
367 Status recv_status;
368 ClientContext cli_ctx;
369 ServerContext srv_ctx;
370 ServerAsyncReader<EchoResponse, EchoRequest> srv_stream(&srv_ctx);
371
Vijay Paid7b1e702016-05-02 15:10:21 -0700372 send_request.set_message(GetParam().message_content);
Sree Kuchibhotla944f4cf2016-01-27 14:37:26 -0800373 std::unique_ptr<ClientAsyncWriter<EchoRequest>> cli_stream(
Craig Tiller06cf3cc2015-05-13 13:11:01 -0700374 stub_->AsyncRequestStream(&cli_ctx, &recv_response, cq_.get(), tag(1)));
Yang Gao005f18a2015-02-13 10:22:33 -0800375
Craig Tillerd6c98df2015-08-18 09:33:44 -0700376 service_.RequestRequestStream(&srv_ctx, &srv_stream, cq_.get(), cq_.get(),
377 tag(2));
Yang Gao005f18a2015-02-13 10:22:33 -0800378
Vijay Paidf8b62c2016-05-02 14:34:24 -0700379 Verifier(GetParam().disable_blocking)
380 .Expect(2, true)
381 .Expect(1, true)
382 .Verify(cq_.get());
Yang Gao005f18a2015-02-13 10:22:33 -0800383
384 cli_stream->Write(send_request, tag(3));
Yang Gao005f18a2015-02-13 10:22:33 -0800385 srv_stream.Read(&recv_request, tag(4));
Craig Tillere4d27482016-05-03 12:19:33 -0700386 Verifier(GetParam().disable_blocking)
387 .Expect(3, true)
388 .Expect(4, true)
389 .Verify(cq_.get());
Yang Gao005f18a2015-02-13 10:22:33 -0800390 EXPECT_EQ(send_request.message(), recv_request.message());
391
392 cli_stream->Write(send_request, tag(5));
Yang Gao005f18a2015-02-13 10:22:33 -0800393 srv_stream.Read(&recv_request, tag(6));
Craig Tillere4d27482016-05-03 12:19:33 -0700394 Verifier(GetParam().disable_blocking)
395 .Expect(5, true)
396 .Expect(6, true)
397 .Verify(cq_.get());
Yang Gao005f18a2015-02-13 10:22:33 -0800398
399 EXPECT_EQ(send_request.message(), recv_request.message());
400 cli_stream->WritesDone(tag(7));
Yang Gao005f18a2015-02-13 10:22:33 -0800401 srv_stream.Read(&recv_request, tag(8));
Craig Tillere4d27482016-05-03 12:19:33 -0700402 Verifier(GetParam().disable_blocking)
403 .Expect(7, true)
404 .Expect(8, false)
405 .Verify(cq_.get());
Yang Gao005f18a2015-02-13 10:22:33 -0800406
407 send_response.set_message(recv_request.message());
408 srv_stream.Finish(send_response, Status::OK, tag(9));
Yang Gao005f18a2015-02-13 10:22:33 -0800409 cli_stream->Finish(&recv_status, tag(10));
Craig Tillere4d27482016-05-03 12:19:33 -0700410 Verifier(GetParam().disable_blocking)
411 .Expect(9, true)
412 .Expect(10, true)
413 .Verify(cq_.get());
Yang Gao005f18a2015-02-13 10:22:33 -0800414
415 EXPECT_EQ(send_response.message(), recv_response.message());
Yang Gaoc1a2c312015-06-16 10:59:46 -0700416 EXPECT_TRUE(recv_status.ok());
Yang Gao005f18a2015-02-13 10:22:33 -0800417}
418
Yang Gao0e0d8e12015-02-13 14:40:41 -0800419// One ping, two pongs.
Craig Tiller69f90e62015-08-06 08:32:35 -0700420TEST_P(AsyncEnd2endTest, SimpleServerStreaming) {
Yang Gao0e0d8e12015-02-13 14:40:41 -0800421 ResetStub();
422
423 EchoRequest send_request;
424 EchoRequest recv_request;
425 EchoResponse send_response;
426 EchoResponse recv_response;
427 Status recv_status;
428 ClientContext cli_ctx;
429 ServerContext srv_ctx;
430 ServerAsyncWriter<EchoResponse> srv_stream(&srv_ctx);
431
Vijay Paid7b1e702016-05-02 15:10:21 -0700432 send_request.set_message(GetParam().message_content);
Sree Kuchibhotla944f4cf2016-01-27 14:37:26 -0800433 std::unique_ptr<ClientAsyncReader<EchoResponse>> cli_stream(
Craig Tiller06cf3cc2015-05-13 13:11:01 -0700434 stub_->AsyncResponseStream(&cli_ctx, send_request, cq_.get(), tag(1)));
Yang Gao0e0d8e12015-02-13 14:40:41 -0800435
Craig Tillerf9e6adf2015-05-06 11:45:59 -0700436 service_.RequestResponseStream(&srv_ctx, &recv_request, &srv_stream,
Craig Tiller06cf3cc2015-05-13 13:11:01 -0700437 cq_.get(), cq_.get(), tag(2));
Yang Gao0e0d8e12015-02-13 14:40:41 -0800438
Vijay Paidf8b62c2016-05-02 14:34:24 -0700439 Verifier(GetParam().disable_blocking)
440 .Expect(1, true)
441 .Expect(2, true)
442 .Verify(cq_.get());
Yang Gao0e0d8e12015-02-13 14:40:41 -0800443 EXPECT_EQ(send_request.message(), recv_request.message());
444
445 send_response.set_message(recv_request.message());
446 srv_stream.Write(send_response, tag(3));
Yang Gao0e0d8e12015-02-13 14:40:41 -0800447 cli_stream->Read(&recv_response, tag(4));
Craig Tillere4d27482016-05-03 12:19:33 -0700448 Verifier(GetParam().disable_blocking)
449 .Expect(3, true)
450 .Expect(4, true)
451 .Verify(cq_.get());
Yang Gao0e0d8e12015-02-13 14:40:41 -0800452 EXPECT_EQ(send_response.message(), recv_response.message());
453
454 srv_stream.Write(send_response, tag(5));
Yang Gao0e0d8e12015-02-13 14:40:41 -0800455 cli_stream->Read(&recv_response, tag(6));
Craig Tillere4d27482016-05-03 12:19:33 -0700456 Verifier(GetParam().disable_blocking)
457 .Expect(5, true)
458 .Expect(6, true)
459 .Verify(cq_.get());
Yang Gao0e0d8e12015-02-13 14:40:41 -0800460 EXPECT_EQ(send_response.message(), recv_response.message());
461
462 srv_stream.Finish(Status::OK, tag(7));
Yang Gao0e0d8e12015-02-13 14:40:41 -0800463 cli_stream->Read(&recv_response, tag(8));
Craig Tillere4d27482016-05-03 12:19:33 -0700464 Verifier(GetParam().disable_blocking)
465 .Expect(7, true)
466 .Expect(8, false)
467 .Verify(cq_.get());
Yang Gao0e0d8e12015-02-13 14:40:41 -0800468
469 cli_stream->Finish(&recv_status, tag(9));
Vijay Paidf8b62c2016-05-02 14:34:24 -0700470 Verifier(GetParam().disable_blocking).Expect(9, true).Verify(cq_.get());
Yang Gao0e0d8e12015-02-13 14:40:41 -0800471
Yang Gaoc1a2c312015-06-16 10:59:46 -0700472 EXPECT_TRUE(recv_status.ok());
Yang Gao0e0d8e12015-02-13 14:40:41 -0800473}
474
475// One ping, one pong.
Craig Tiller69f90e62015-08-06 08:32:35 -0700476TEST_P(AsyncEnd2endTest, SimpleBidiStreaming) {
Yang Gaoc05b6cb2015-02-13 00:34:10 -0800477 ResetStub();
478
479 EchoRequest send_request;
480 EchoRequest recv_request;
481 EchoResponse send_response;
482 EchoResponse recv_response;
483 Status recv_status;
484 ClientContext cli_ctx;
485 ServerContext srv_ctx;
486 ServerAsyncReaderWriter<EchoResponse, EchoRequest> srv_stream(&srv_ctx);
487
Vijay Paid7b1e702016-05-02 15:10:21 -0700488 send_request.set_message(GetParam().message_content);
Sree Kuchibhotla944f4cf2016-01-27 14:37:26 -0800489 std::unique_ptr<ClientAsyncReaderWriter<EchoRequest, EchoResponse>>
Craig Tiller06cf3cc2015-05-13 13:11:01 -0700490 cli_stream(stub_->AsyncBidiStream(&cli_ctx, cq_.get(), tag(1)));
Yang Gaoc05b6cb2015-02-13 00:34:10 -0800491
Craig Tillerd6c98df2015-08-18 09:33:44 -0700492 service_.RequestBidiStream(&srv_ctx, &srv_stream, cq_.get(), cq_.get(),
493 tag(2));
Yang Gaoc05b6cb2015-02-13 00:34:10 -0800494
Vijay Paidf8b62c2016-05-02 14:34:24 -0700495 Verifier(GetParam().disable_blocking)
496 .Expect(1, true)
497 .Expect(2, true)
498 .Verify(cq_.get());
Yang Gaoc05b6cb2015-02-13 00:34:10 -0800499
500 cli_stream->Write(send_request, tag(3));
Yang Gaoc05b6cb2015-02-13 00:34:10 -0800501 srv_stream.Read(&recv_request, tag(4));
Craig Tillere4d27482016-05-03 12:19:33 -0700502 Verifier(GetParam().disable_blocking)
503 .Expect(3, true)
504 .Expect(4, true)
505 .Verify(cq_.get());
Yang Gaoc05b6cb2015-02-13 00:34:10 -0800506 EXPECT_EQ(send_request.message(), recv_request.message());
507
508 send_response.set_message(recv_request.message());
509 srv_stream.Write(send_response, tag(5));
Yang Gaoc05b6cb2015-02-13 00:34:10 -0800510 cli_stream->Read(&recv_response, tag(6));
Craig Tillere4d27482016-05-03 12:19:33 -0700511 Verifier(GetParam().disable_blocking)
512 .Expect(5, true)
513 .Expect(6, true)
514 .Verify(cq_.get());
Yang Gaoc05b6cb2015-02-13 00:34:10 -0800515 EXPECT_EQ(send_response.message(), recv_response.message());
516
517 cli_stream->WritesDone(tag(7));
Yang Gaoc05b6cb2015-02-13 00:34:10 -0800518 srv_stream.Read(&recv_request, tag(8));
Craig Tillere4d27482016-05-03 12:19:33 -0700519 Verifier(GetParam().disable_blocking)
520 .Expect(7, true)
521 .Expect(8, false)
522 .Verify(cq_.get());
Yang Gaoc05b6cb2015-02-13 00:34:10 -0800523
524 srv_stream.Finish(Status::OK, tag(9));
Yang Gaoc05b6cb2015-02-13 00:34:10 -0800525 cli_stream->Finish(&recv_status, tag(10));
Craig Tillere4d27482016-05-03 12:19:33 -0700526 Verifier(GetParam().disable_blocking)
527 .Expect(9, true)
528 .Expect(10, true)
529 .Verify(cq_.get());
Yang Gaoc05b6cb2015-02-13 00:34:10 -0800530
Yang Gaoc1a2c312015-06-16 10:59:46 -0700531 EXPECT_TRUE(recv_status.ok());
Yang Gaoc05b6cb2015-02-13 00:34:10 -0800532}
533
Yang Gao406b32f2015-02-13 16:25:33 -0800534// Metadata tests
Craig Tiller69f90e62015-08-06 08:32:35 -0700535TEST_P(AsyncEnd2endTest, ClientInitialMetadataRpc) {
Yang Gao406b32f2015-02-13 16:25:33 -0800536 ResetStub();
537
538 EchoRequest send_request;
539 EchoRequest recv_request;
540 EchoResponse send_response;
541 EchoResponse recv_response;
542 Status recv_status;
543
544 ClientContext cli_ctx;
545 ServerContext srv_ctx;
546 grpc::ServerAsyncResponseWriter<EchoResponse> response_writer(&srv_ctx);
547
Vijay Paid7b1e702016-05-02 15:10:21 -0700548 send_request.set_message(GetParam().message_content);
Yang Gao406b32f2015-02-13 16:25:33 -0800549 std::pair<grpc::string, grpc::string> meta1("key1", "val1");
550 std::pair<grpc::string, grpc::string> meta2("key2", "val2");
Craig Tiller6f871642016-02-03 16:15:31 -0800551 std::pair<grpc::string, grpc::string> meta3("g.r.d-bin", "xyz");
Yang Gao406b32f2015-02-13 16:25:33 -0800552 cli_ctx.AddMetadata(meta1.first, meta1.second);
553 cli_ctx.AddMetadata(meta2.first, meta2.second);
Craig Tiller6f871642016-02-03 16:15:31 -0800554 cli_ctx.AddMetadata(meta3.first, meta3.second);
Yang Gao406b32f2015-02-13 16:25:33 -0800555
Sree Kuchibhotla944f4cf2016-01-27 14:37:26 -0800556 std::unique_ptr<ClientAsyncResponseReader<EchoResponse>> response_reader(
Craig Tiller06cf3cc2015-05-13 13:11:01 -0700557 stub_->AsyncEcho(&cli_ctx, send_request, cq_.get()));
Yang Gao406b32f2015-02-13 16:25:33 -0800558
Craig Tiller06cf3cc2015-05-13 13:11:01 -0700559 service_.RequestEcho(&srv_ctx, &recv_request, &response_writer, cq_.get(),
560 cq_.get(), tag(2));
Vijay Paidf8b62c2016-05-02 14:34:24 -0700561 Verifier(GetParam().disable_blocking).Expect(2, true).Verify(cq_.get());
Yang Gao406b32f2015-02-13 16:25:33 -0800562 EXPECT_EQ(send_request.message(), recv_request.message());
563 auto client_initial_metadata = srv_ctx.client_metadata();
yang-ge21908f2015-08-25 13:47:51 -0700564 EXPECT_EQ(meta1.second,
565 ToString(client_initial_metadata.find(meta1.first)->second));
566 EXPECT_EQ(meta2.second,
567 ToString(client_initial_metadata.find(meta2.first)->second));
Craig Tiller6f871642016-02-03 16:15:31 -0800568 EXPECT_EQ(meta3.second,
569 ToString(client_initial_metadata.find(meta3.first)->second));
Craig Tiller8bf2dca2015-07-10 13:08:41 -0700570 EXPECT_GE(client_initial_metadata.size(), static_cast<size_t>(2));
Yang Gao406b32f2015-02-13 16:25:33 -0800571
572 send_response.set_message(recv_request.message());
573 response_writer.Finish(send_response, Status::OK, tag(3));
Yang Gao3a5e5492015-02-18 14:32:38 -0800574 response_reader->Finish(&recv_response, &recv_status, tag(4));
Craig Tillere4d27482016-05-03 12:19:33 -0700575 Verifier(GetParam().disable_blocking)
576 .Expect(3, true)
577 .Expect(4, true)
578 .Verify(cq_.get());
Yang Gao406b32f2015-02-13 16:25:33 -0800579
580 EXPECT_EQ(send_response.message(), recv_response.message());
Yang Gaoc1a2c312015-06-16 10:59:46 -0700581 EXPECT_TRUE(recv_status.ok());
Yang Gao406b32f2015-02-13 16:25:33 -0800582}
583
Craig Tiller69f90e62015-08-06 08:32:35 -0700584TEST_P(AsyncEnd2endTest, ServerInitialMetadataRpc) {
Yang Gao2b7f5372015-02-18 00:45:53 -0800585 ResetStub();
586
587 EchoRequest send_request;
588 EchoRequest recv_request;
589 EchoResponse send_response;
590 EchoResponse recv_response;
591 Status recv_status;
592
593 ClientContext cli_ctx;
594 ServerContext srv_ctx;
595 grpc::ServerAsyncResponseWriter<EchoResponse> response_writer(&srv_ctx);
596
Vijay Paid7b1e702016-05-02 15:10:21 -0700597 send_request.set_message(GetParam().message_content);
Yang Gao2b7f5372015-02-18 00:45:53 -0800598 std::pair<grpc::string, grpc::string> meta1("key1", "val1");
599 std::pair<grpc::string, grpc::string> meta2("key2", "val2");
600
Sree Kuchibhotla944f4cf2016-01-27 14:37:26 -0800601 std::unique_ptr<ClientAsyncResponseReader<EchoResponse>> response_reader(
Craig Tiller06cf3cc2015-05-13 13:11:01 -0700602 stub_->AsyncEcho(&cli_ctx, send_request, cq_.get()));
Yang Gao2b7f5372015-02-18 00:45:53 -0800603
Craig Tiller06cf3cc2015-05-13 13:11:01 -0700604 service_.RequestEcho(&srv_ctx, &recv_request, &response_writer, cq_.get(),
605 cq_.get(), tag(2));
Vijay Paidf8b62c2016-05-02 14:34:24 -0700606 Verifier(GetParam().disable_blocking).Expect(2, true).Verify(cq_.get());
Yang Gao2b7f5372015-02-18 00:45:53 -0800607 EXPECT_EQ(send_request.message(), recv_request.message());
608 srv_ctx.AddInitialMetadata(meta1.first, meta1.second);
609 srv_ctx.AddInitialMetadata(meta2.first, meta2.second);
610 response_writer.SendInitialMetadata(tag(3));
Vijay Paidf8b62c2016-05-02 14:34:24 -0700611 Verifier(GetParam().disable_blocking).Expect(3, true).Verify(cq_.get());
Yang Gao2b7f5372015-02-18 00:45:53 -0800612
Yang Gao3a5e5492015-02-18 14:32:38 -0800613 response_reader->ReadInitialMetadata(tag(4));
Vijay Paidf8b62c2016-05-02 14:34:24 -0700614 Verifier(GetParam().disable_blocking).Expect(4, true).Verify(cq_.get());
Yang Gao2b7f5372015-02-18 00:45:53 -0800615 auto server_initial_metadata = cli_ctx.GetServerInitialMetadata();
yang-ge21908f2015-08-25 13:47:51 -0700616 EXPECT_EQ(meta1.second,
617 ToString(server_initial_metadata.find(meta1.first)->second));
618 EXPECT_EQ(meta2.second,
619 ToString(server_initial_metadata.find(meta2.first)->second));
vjpaid5577aa2015-02-18 22:26:48 -0800620 EXPECT_EQ(static_cast<size_t>(2), server_initial_metadata.size());
Yang Gao3a5e5492015-02-18 14:32:38 -0800621
622 send_response.set_message(recv_request.message());
623 response_writer.Finish(send_response, Status::OK, tag(5));
Yang Gao3a5e5492015-02-18 14:32:38 -0800624 response_reader->Finish(&recv_response, &recv_status, tag(6));
Craig Tillere4d27482016-05-03 12:19:33 -0700625 Verifier(GetParam().disable_blocking)
626 .Expect(5, true)
627 .Expect(6, true)
628 .Verify(cq_.get());
Yang Gao3a5e5492015-02-18 14:32:38 -0800629
630 EXPECT_EQ(send_response.message(), recv_response.message());
Yang Gaoc1a2c312015-06-16 10:59:46 -0700631 EXPECT_TRUE(recv_status.ok());
Yang Gao2b7f5372015-02-18 00:45:53 -0800632}
633
Craig Tiller69f90e62015-08-06 08:32:35 -0700634TEST_P(AsyncEnd2endTest, ServerTrailingMetadataRpc) {
Yang Gao2b7f5372015-02-18 00:45:53 -0800635 ResetStub();
636
637 EchoRequest send_request;
638 EchoRequest recv_request;
639 EchoResponse send_response;
640 EchoResponse recv_response;
641 Status recv_status;
642
643 ClientContext cli_ctx;
644 ServerContext srv_ctx;
645 grpc::ServerAsyncResponseWriter<EchoResponse> response_writer(&srv_ctx);
646
Vijay Paid7b1e702016-05-02 15:10:21 -0700647 send_request.set_message(GetParam().message_content);
Yang Gao2b7f5372015-02-18 00:45:53 -0800648 std::pair<grpc::string, grpc::string> meta1("key1", "val1");
649 std::pair<grpc::string, grpc::string> meta2("key2", "val2");
650
Sree Kuchibhotla944f4cf2016-01-27 14:37:26 -0800651 std::unique_ptr<ClientAsyncResponseReader<EchoResponse>> response_reader(
Craig Tiller06cf3cc2015-05-13 13:11:01 -0700652 stub_->AsyncEcho(&cli_ctx, send_request, cq_.get()));
Yang Gao2b7f5372015-02-18 00:45:53 -0800653
Craig Tiller06cf3cc2015-05-13 13:11:01 -0700654 service_.RequestEcho(&srv_ctx, &recv_request, &response_writer, cq_.get(),
655 cq_.get(), tag(2));
Vijay Paidf8b62c2016-05-02 14:34:24 -0700656 Verifier(GetParam().disable_blocking).Expect(2, true).Verify(cq_.get());
Yang Gao2b7f5372015-02-18 00:45:53 -0800657 EXPECT_EQ(send_request.message(), recv_request.message());
658 response_writer.SendInitialMetadata(tag(3));
Vijay Paidf8b62c2016-05-02 14:34:24 -0700659 Verifier(GetParam().disable_blocking).Expect(3, true).Verify(cq_.get());
Yang Gao2b7f5372015-02-18 00:45:53 -0800660
661 send_response.set_message(recv_request.message());
662 srv_ctx.AddTrailingMetadata(meta1.first, meta1.second);
663 srv_ctx.AddTrailingMetadata(meta2.first, meta2.second);
664 response_writer.Finish(send_response, Status::OK, tag(4));
Yang Gao3a5e5492015-02-18 14:32:38 -0800665 response_reader->Finish(&recv_response, &recv_status, tag(5));
Craig Tillere4d27482016-05-03 12:19:33 -0700666
667 Verifier(GetParam().disable_blocking)
668 .Expect(4, true)
669 .Expect(5, true)
670 .Verify(cq_.get());
671
Yang Gao2b7f5372015-02-18 00:45:53 -0800672 EXPECT_EQ(send_response.message(), recv_response.message());
Yang Gaoc1a2c312015-06-16 10:59:46 -0700673 EXPECT_TRUE(recv_status.ok());
Yang Gao2b7f5372015-02-18 00:45:53 -0800674 auto server_trailing_metadata = cli_ctx.GetServerTrailingMetadata();
yang-ge21908f2015-08-25 13:47:51 -0700675 EXPECT_EQ(meta1.second,
676 ToString(server_trailing_metadata.find(meta1.first)->second));
677 EXPECT_EQ(meta2.second,
678 ToString(server_trailing_metadata.find(meta2.first)->second));
vjpaid5577aa2015-02-18 22:26:48 -0800679 EXPECT_EQ(static_cast<size_t>(2), server_trailing_metadata.size());
Yang Gao2b7f5372015-02-18 00:45:53 -0800680}
681
Craig Tiller69f90e62015-08-06 08:32:35 -0700682TEST_P(AsyncEnd2endTest, MetadataRpc) {
Yang Gao2b7f5372015-02-18 00:45:53 -0800683 ResetStub();
684
685 EchoRequest send_request;
686 EchoRequest recv_request;
687 EchoResponse send_response;
688 EchoResponse recv_response;
689 Status recv_status;
690
691 ClientContext cli_ctx;
692 ServerContext srv_ctx;
693 grpc::ServerAsyncResponseWriter<EchoResponse> response_writer(&srv_ctx);
694
Vijay Paid7b1e702016-05-02 15:10:21 -0700695 send_request.set_message(GetParam().message_content);
Yang Gao2b7f5372015-02-18 00:45:53 -0800696 std::pair<grpc::string, grpc::string> meta1("key1", "val1");
Yang Gao3a5e5492015-02-18 14:32:38 -0800697 std::pair<grpc::string, grpc::string> meta2(
Vijay Pai92a928f2015-03-26 16:30:22 -0400698 "key2-bin",
Craig Tillerd6c98df2015-08-18 09:33:44 -0700699 grpc::string("\xc0\xc1\xc2\xc3\xc4\xc5\xc6\xc7\xc8\xc9\xca\xcb\xcc", 13));
Yang Gao2b7f5372015-02-18 00:45:53 -0800700 std::pair<grpc::string, grpc::string> meta3("key3", "val3");
Craig Tiller47c83fd2015-02-21 22:45:35 -0800701 std::pair<grpc::string, grpc::string> meta6(
702 "key4-bin",
Vijay Pai92a928f2015-03-26 16:30:22 -0400703 grpc::string("\x10\x11\x12\x13\x14\x15\x16\x17\x18\x19\x1a\x1b\x1c\x1d",
Craig Tillerd6c98df2015-08-18 09:33:44 -0700704 14));
Yang Gao2b7f5372015-02-18 00:45:53 -0800705 std::pair<grpc::string, grpc::string> meta5("key5", "val5");
Craig Tiller47c83fd2015-02-21 22:45:35 -0800706 std::pair<grpc::string, grpc::string> meta4(
707 "key6-bin",
Craig Tillerd6c98df2015-08-18 09:33:44 -0700708 grpc::string(
709 "\xe0\xe1\xe2\xe3\xe4\xe5\xe6\xe7\xe8\xe9\xea\xeb\xec\xed\xee", 15));
Yang Gao2b7f5372015-02-18 00:45:53 -0800710
711 cli_ctx.AddMetadata(meta1.first, meta1.second);
712 cli_ctx.AddMetadata(meta2.first, meta2.second);
713
Sree Kuchibhotla944f4cf2016-01-27 14:37:26 -0800714 std::unique_ptr<ClientAsyncResponseReader<EchoResponse>> response_reader(
Craig Tiller06cf3cc2015-05-13 13:11:01 -0700715 stub_->AsyncEcho(&cli_ctx, send_request, cq_.get()));
Yang Gao2b7f5372015-02-18 00:45:53 -0800716
Craig Tiller06cf3cc2015-05-13 13:11:01 -0700717 service_.RequestEcho(&srv_ctx, &recv_request, &response_writer, cq_.get(),
718 cq_.get(), tag(2));
Vijay Paidf8b62c2016-05-02 14:34:24 -0700719 Verifier(GetParam().disable_blocking).Expect(2, true).Verify(cq_.get());
Yang Gao2b7f5372015-02-18 00:45:53 -0800720 EXPECT_EQ(send_request.message(), recv_request.message());
721 auto client_initial_metadata = srv_ctx.client_metadata();
yang-ge21908f2015-08-25 13:47:51 -0700722 EXPECT_EQ(meta1.second,
723 ToString(client_initial_metadata.find(meta1.first)->second));
724 EXPECT_EQ(meta2.second,
725 ToString(client_initial_metadata.find(meta2.first)->second));
Craig Tiller8bf2dca2015-07-10 13:08:41 -0700726 EXPECT_GE(client_initial_metadata.size(), static_cast<size_t>(2));
Yang Gao2b7f5372015-02-18 00:45:53 -0800727
728 srv_ctx.AddInitialMetadata(meta3.first, meta3.second);
729 srv_ctx.AddInitialMetadata(meta4.first, meta4.second);
730 response_writer.SendInitialMetadata(tag(3));
Vijay Paidf8b62c2016-05-02 14:34:24 -0700731 Verifier(GetParam().disable_blocking).Expect(3, true).Verify(cq_.get());
Yang Gao3a5e5492015-02-18 14:32:38 -0800732 response_reader->ReadInitialMetadata(tag(4));
Vijay Paidf8b62c2016-05-02 14:34:24 -0700733 Verifier(GetParam().disable_blocking).Expect(4, true).Verify(cq_.get());
Yang Gao2b7f5372015-02-18 00:45:53 -0800734 auto server_initial_metadata = cli_ctx.GetServerInitialMetadata();
yang-ge21908f2015-08-25 13:47:51 -0700735 EXPECT_EQ(meta3.second,
736 ToString(server_initial_metadata.find(meta3.first)->second));
737 EXPECT_EQ(meta4.second,
738 ToString(server_initial_metadata.find(meta4.first)->second));
Craig Tiller8bf2dca2015-07-10 13:08:41 -0700739 EXPECT_GE(server_initial_metadata.size(), static_cast<size_t>(2));
Yang Gao3a5e5492015-02-18 14:32:38 -0800740
741 send_response.set_message(recv_request.message());
742 srv_ctx.AddTrailingMetadata(meta5.first, meta5.second);
743 srv_ctx.AddTrailingMetadata(meta6.first, meta6.second);
744 response_writer.Finish(send_response, Status::OK, tag(5));
Yang Gao3a5e5492015-02-18 14:32:38 -0800745 response_reader->Finish(&recv_response, &recv_status, tag(6));
Craig Tillere4d27482016-05-03 12:19:33 -0700746
747 Verifier(GetParam().disable_blocking)
748 .Expect(5, true)
749 .Expect(6, true)
750 .Verify(cq_.get());
751
Yang Gao3a5e5492015-02-18 14:32:38 -0800752 EXPECT_EQ(send_response.message(), recv_response.message());
Yang Gaoc1a2c312015-06-16 10:59:46 -0700753 EXPECT_TRUE(recv_status.ok());
Yang Gao2b7f5372015-02-18 00:45:53 -0800754 auto server_trailing_metadata = cli_ctx.GetServerTrailingMetadata();
yang-ge21908f2015-08-25 13:47:51 -0700755 EXPECT_EQ(meta5.second,
756 ToString(server_trailing_metadata.find(meta5.first)->second));
757 EXPECT_EQ(meta6.second,
758 ToString(server_trailing_metadata.find(meta6.first)->second));
Craig Tiller8bf2dca2015-07-10 13:08:41 -0700759 EXPECT_GE(server_trailing_metadata.size(), static_cast<size_t>(2));
Yang Gao2b7f5372015-02-18 00:45:53 -0800760}
yang-gb3352562015-08-04 14:42:06 -0700761
762// Server uses AsyncNotifyWhenDone API to check for cancellation
Craig Tiller69f90e62015-08-06 08:32:35 -0700763TEST_P(AsyncEnd2endTest, ServerCheckCancellation) {
yang-gb3352562015-08-04 14:42:06 -0700764 ResetStub();
765
766 EchoRequest send_request;
767 EchoRequest recv_request;
768 EchoResponse send_response;
769 EchoResponse recv_response;
770 Status recv_status;
771
772 ClientContext cli_ctx;
773 ServerContext srv_ctx;
774 grpc::ServerAsyncResponseWriter<EchoResponse> response_writer(&srv_ctx);
775
Vijay Paid7b1e702016-05-02 15:10:21 -0700776 send_request.set_message(GetParam().message_content);
Sree Kuchibhotla944f4cf2016-01-27 14:37:26 -0800777 std::unique_ptr<ClientAsyncResponseReader<EchoResponse>> response_reader(
yang-gb3352562015-08-04 14:42:06 -0700778 stub_->AsyncEcho(&cli_ctx, send_request, cq_.get()));
779
780 srv_ctx.AsyncNotifyWhenDone(tag(5));
781 service_.RequestEcho(&srv_ctx, &recv_request, &response_writer, cq_.get(),
782 cq_.get(), tag(2));
783
Vijay Paidf8b62c2016-05-02 14:34:24 -0700784 Verifier(GetParam().disable_blocking).Expect(2, true).Verify(cq_.get());
yang-gb3352562015-08-04 14:42:06 -0700785 EXPECT_EQ(send_request.message(), recv_request.message());
786
787 cli_ctx.TryCancel();
Vijay Paidf8b62c2016-05-02 14:34:24 -0700788 Verifier(GetParam().disable_blocking).Expect(5, true).Verify(cq_.get());
yang-gb3352562015-08-04 14:42:06 -0700789 EXPECT_TRUE(srv_ctx.IsCancelled());
790
791 response_reader->Finish(&recv_response, &recv_status, tag(4));
Vijay Paidf8b62c2016-05-02 14:34:24 -0700792 Verifier(GetParam().disable_blocking).Expect(4, false).Verify(cq_.get());
yang-gb3352562015-08-04 14:42:06 -0700793
794 EXPECT_EQ(StatusCode::CANCELLED, recv_status.error_code());
795}
796
797// Server uses AsyncNotifyWhenDone API to check for normal finish
Craig Tiller69f90e62015-08-06 08:32:35 -0700798TEST_P(AsyncEnd2endTest, ServerCheckDone) {
yang-gb3352562015-08-04 14:42:06 -0700799 ResetStub();
800
801 EchoRequest send_request;
802 EchoRequest recv_request;
803 EchoResponse send_response;
804 EchoResponse recv_response;
805 Status recv_status;
806
807 ClientContext cli_ctx;
808 ServerContext srv_ctx;
809 grpc::ServerAsyncResponseWriter<EchoResponse> response_writer(&srv_ctx);
810
Vijay Paid7b1e702016-05-02 15:10:21 -0700811 send_request.set_message(GetParam().message_content);
Sree Kuchibhotla944f4cf2016-01-27 14:37:26 -0800812 std::unique_ptr<ClientAsyncResponseReader<EchoResponse>> response_reader(
yang-gb3352562015-08-04 14:42:06 -0700813 stub_->AsyncEcho(&cli_ctx, send_request, cq_.get()));
814
815 srv_ctx.AsyncNotifyWhenDone(tag(5));
816 service_.RequestEcho(&srv_ctx, &recv_request, &response_writer, cq_.get(),
817 cq_.get(), tag(2));
818
Vijay Paidf8b62c2016-05-02 14:34:24 -0700819 Verifier(GetParam().disable_blocking).Expect(2, true).Verify(cq_.get());
yang-gb3352562015-08-04 14:42:06 -0700820 EXPECT_EQ(send_request.message(), recv_request.message());
821
822 send_response.set_message(recv_request.message());
823 response_writer.Finish(send_response, Status::OK, tag(3));
yang-gb3352562015-08-04 14:42:06 -0700824 response_reader->Finish(&recv_response, &recv_status, tag(4));
Craig Tillere4d27482016-05-03 12:19:33 -0700825 Verifier(GetParam().disable_blocking)
826 .Expect(3, true)
827 .Expect(4, true)
828 .Expect(5, true)
829 .Verify(cq_.get());
830 EXPECT_FALSE(srv_ctx.IsCancelled());
yang-gb3352562015-08-04 14:42:06 -0700831
832 EXPECT_EQ(send_response.message(), recv_response.message());
833 EXPECT_TRUE(recv_status.ok());
834}
835
Craig Tiller8f7bff72015-08-17 13:23:14 -0700836TEST_P(AsyncEnd2endTest, UnimplementedRpc) {
Vijay Paidf8b62c2016-05-02 14:34:24 -0700837 ChannelArguments args;
838 auto channel_creds =
839 GetChannelCredentials(GetParam().credentials_type, &args);
yang-g730055d2015-08-27 12:29:45 -0700840 std::shared_ptr<Channel> channel =
Vijay Paidf8b62c2016-05-02 14:34:24 -0700841 CreateCustomChannel(server_address_.str(), channel_creds, args);
Craig Tiller1b4e3302015-12-17 16:35:00 -0800842 std::unique_ptr<grpc::testing::UnimplementedService::Stub> stub;
843 stub = grpc::testing::UnimplementedService::NewStub(channel);
yang-g9b7757d2015-08-13 11:15:53 -0700844 EchoRequest send_request;
845 EchoResponse recv_response;
846 Status recv_status;
847
848 ClientContext cli_ctx;
Vijay Paid7b1e702016-05-02 15:10:21 -0700849 send_request.set_message(GetParam().message_content);
Sree Kuchibhotla944f4cf2016-01-27 14:37:26 -0800850 std::unique_ptr<ClientAsyncResponseReader<EchoResponse>> response_reader(
yang-g9b7757d2015-08-13 11:15:53 -0700851 stub->AsyncUnimplemented(&cli_ctx, send_request, cq_.get()));
852
853 response_reader->Finish(&recv_response, &recv_status, tag(4));
Vijay Paidf8b62c2016-05-02 14:34:24 -0700854 Verifier(GetParam().disable_blocking).Expect(4, false).Verify(cq_.get());
yang-g9b7757d2015-08-13 11:15:53 -0700855
856 EXPECT_EQ(StatusCode::UNIMPLEMENTED, recv_status.error_code());
857 EXPECT_EQ("", recv_status.error_message());
858}
859
Sree Kuchibhotla0f242ac2016-01-29 18:12:19 -0800860// This class is for testing scenarios where RPCs are cancelled on the server
Sree Kuchibhotlab0d15672016-03-07 10:51:02 -0800861// by calling ServerContext::TryCancel(). Server uses AsyncNotifyWhenDone
862// API to check for cancellation
Sree Kuchibhotla944f4cf2016-01-27 14:37:26 -0800863class AsyncEnd2endServerTryCancelTest : public AsyncEnd2endTest {
864 protected:
865 typedef enum {
866 DO_NOT_CANCEL = 0,
867 CANCEL_BEFORE_PROCESSING,
868 CANCEL_DURING_PROCESSING,
869 CANCEL_AFTER_PROCESSING
870 } ServerTryCancelRequestPhase;
871
Sree Kuchibhotla0f242ac2016-01-29 18:12:19 -0800872 // Helper for testing client-streaming RPCs which are cancelled on the server.
873 // Depending on the value of server_try_cancel parameter, this will test one
874 // of the following three scenarios:
875 // CANCEL_BEFORE_PROCESSING: Rpc is cancelled by the server before reading
876 // any messages from the client
877 //
878 // CANCEL_DURING_PROCESSING: Rpc is cancelled by the server while reading
879 // messages from the client
880 //
881 // CANCEL_AFTER PROCESSING: Rpc is cancelled by server after reading all
882 // messages from the client (but before sending any status back to the
883 // client)
Sree Kuchibhotla944f4cf2016-01-27 14:37:26 -0800884 void TestClientStreamingServerCancel(
885 ServerTryCancelRequestPhase server_try_cancel) {
886 ResetStub();
887
888 EchoRequest send_request;
889 EchoRequest recv_request;
890 EchoResponse send_response;
891 EchoResponse recv_response;
892 Status recv_status;
893
Sree Kuchibhotla944f4cf2016-01-27 14:37:26 -0800894 ClientContext cli_ctx;
895 ServerContext srv_ctx;
896 ServerAsyncReader<EchoResponse, EchoRequest> srv_stream(&srv_ctx);
897
898 // Initiate the 'RequestStream' call on client
899 std::unique_ptr<ClientAsyncWriter<EchoRequest>> cli_stream(
Sree Kuchibhotla4fb59082016-01-29 11:16:24 -0800900 stub_->AsyncRequestStream(&cli_ctx, &recv_response, cq_.get(), tag(1)));
Vijay Paidf8b62c2016-05-02 14:34:24 -0700901 Verifier(GetParam().disable_blocking).Expect(1, true).Verify(cq_.get());
Sree Kuchibhotla944f4cf2016-01-27 14:37:26 -0800902
903 // On the server, request to be notified of 'RequestStream' calls
904 // and receive the 'RequestStream' call just made by the client
Sree Kuchibhotlab0d15672016-03-07 10:51:02 -0800905 srv_ctx.AsyncNotifyWhenDone(tag(11));
Sree Kuchibhotla944f4cf2016-01-27 14:37:26 -0800906 service_.RequestRequestStream(&srv_ctx, &srv_stream, cq_.get(), cq_.get(),
907 tag(2));
Vijay Paidf8b62c2016-05-02 14:34:24 -0700908 Verifier(GetParam().disable_blocking).Expect(2, true).Verify(cq_.get());
Sree Kuchibhotla944f4cf2016-01-27 14:37:26 -0800909
910 // Client sends 3 messages (tags 3, 4 and 5)
911 for (int tag_idx = 3; tag_idx <= 5; tag_idx++) {
912 send_request.set_message("Ping " + std::to_string(tag_idx));
913 cli_stream->Write(send_request, tag(tag_idx));
Vijay Paidf8b62c2016-05-02 14:34:24 -0700914 Verifier(GetParam().disable_blocking)
915 .Expect(tag_idx, true)
916 .Verify(cq_.get());
Sree Kuchibhotla944f4cf2016-01-27 14:37:26 -0800917 }
918 cli_stream->WritesDone(tag(6));
Vijay Paidf8b62c2016-05-02 14:34:24 -0700919 Verifier(GetParam().disable_blocking).Expect(6, true).Verify(cq_.get());
Sree Kuchibhotla944f4cf2016-01-27 14:37:26 -0800920
921 bool expected_server_cq_result = true;
922 bool ignore_cq_result = false;
Sree Kuchibhotlab0d15672016-03-07 10:51:02 -0800923 bool want_done_tag = false;
Sree Kuchibhotla944f4cf2016-01-27 14:37:26 -0800924
925 if (server_try_cancel == CANCEL_BEFORE_PROCESSING) {
Sree Kuchibhotlab0d15672016-03-07 10:51:02 -0800926 srv_ctx.TryCancel();
Vijay Paidf8b62c2016-05-02 14:34:24 -0700927 Verifier(GetParam().disable_blocking).Expect(11, true).Verify(cq_.get());
Sree Kuchibhotlab0d15672016-03-07 10:51:02 -0800928 EXPECT_TRUE(srv_ctx.IsCancelled());
Sree Kuchibhotla944f4cf2016-01-27 14:37:26 -0800929
930 // Since cancellation is done before server reads any results, we know
931 // for sure that all cq results will return false from this point forward
932 expected_server_cq_result = false;
933 }
934
935 std::thread* server_try_cancel_thd = NULL;
Sree Kuchibhotlab0d15672016-03-07 10:51:02 -0800936
Vijay Paidf8b62c2016-05-02 14:34:24 -0700937 auto verif = Verifier(GetParam().disable_blocking);
Sree Kuchibhotlab0d15672016-03-07 10:51:02 -0800938
Sree Kuchibhotla944f4cf2016-01-27 14:37:26 -0800939 if (server_try_cancel == CANCEL_DURING_PROCESSING) {
Sree Kuchibhotlab0d15672016-03-07 10:51:02 -0800940 server_try_cancel_thd =
941 new std::thread(&ServerContext::TryCancel, &srv_ctx);
Sree Kuchibhotla944f4cf2016-01-27 14:37:26 -0800942 // Server will cancel the RPC in a parallel thread while reading the
943 // requests from the client. Since the cancellation can happen at anytime,
944 // some of the cq results (i.e those until cancellation) might be true but
945 // its non deterministic. So better to ignore the cq results
946 ignore_cq_result = true;
Sree Kuchibhotlab0d15672016-03-07 10:51:02 -0800947 // Expect that we might possibly see the done tag that
948 // indicates cancellation completion in this case
949 want_done_tag = true;
950 verif.Expect(11, true);
Sree Kuchibhotla944f4cf2016-01-27 14:37:26 -0800951 }
952
953 // Server reads 3 messages (tags 6, 7 and 8)
Sree Kuchibhotlab0d15672016-03-07 10:51:02 -0800954 // But if want_done_tag is true, we might also see tag 11
Sree Kuchibhotla944f4cf2016-01-27 14:37:26 -0800955 for (int tag_idx = 6; tag_idx <= 8; tag_idx++) {
956 srv_stream.Read(&recv_request, tag(tag_idx));
Sree Kuchibhotlab0d15672016-03-07 10:51:02 -0800957 // Note that we'll add something to the verifier and verify that
958 // something was seen, but it might be tag 11 and not what we
959 // just added
960 int got_tag = verif.Expect(tag_idx, expected_server_cq_result)
961 .Next(cq_.get(), ignore_cq_result);
962 GPR_ASSERT((got_tag == tag_idx) || (got_tag == 11 && want_done_tag));
963 if (got_tag == 11) {
964 EXPECT_TRUE(srv_ctx.IsCancelled());
965 want_done_tag = false;
966 // Now get the other entry that we were waiting on
967 EXPECT_EQ(verif.Next(cq_.get(), ignore_cq_result), tag_idx);
968 }
Sree Kuchibhotla944f4cf2016-01-27 14:37:26 -0800969 }
970
971 if (server_try_cancel_thd != NULL) {
972 server_try_cancel_thd->join();
973 delete server_try_cancel_thd;
974 }
975
976 if (server_try_cancel == CANCEL_AFTER_PROCESSING) {
Sree Kuchibhotlab0d15672016-03-07 10:51:02 -0800977 srv_ctx.TryCancel();
978 want_done_tag = true;
979 verif.Expect(11, true);
980 }
981
982 if (want_done_tag) {
983 verif.Verify(cq_.get());
984 EXPECT_TRUE(srv_ctx.IsCancelled());
985 want_done_tag = false;
Sree Kuchibhotla944f4cf2016-01-27 14:37:26 -0800986 }
987
Sree Kuchibhotla0f242ac2016-01-29 18:12:19 -0800988 // The RPC has been cancelled at this point for sure (i.e irrespective of
989 // the value of `server_try_cancel` is). So, from this point forward, we
990 // know that cq results are supposed to return false on server.
Sree Kuchibhotla944f4cf2016-01-27 14:37:26 -0800991
Sree Kuchibhotla0f242ac2016-01-29 18:12:19 -0800992 // Server sends the final message and cancelled status (but the RPC is
993 // already cancelled at this point. So we expect the operation to fail)
Sree Kuchibhotla944f4cf2016-01-27 14:37:26 -0800994 srv_stream.Finish(send_response, Status::CANCELLED, tag(9));
Vijay Paidf8b62c2016-05-02 14:34:24 -0700995 Verifier(GetParam().disable_blocking).Expect(9, false).Verify(cq_.get());
Sree Kuchibhotla944f4cf2016-01-27 14:37:26 -0800996
Sree Kuchibhotla0f242ac2016-01-29 18:12:19 -0800997 // Client will see the cancellation
Sree Kuchibhotla944f4cf2016-01-27 14:37:26 -0800998 cli_stream->Finish(&recv_status, tag(10));
Sree Kuchibhotla369a04a2016-02-01 10:53:13 -0800999 // TODO(sreek): The expectation here should be true. This is a bug (github
Sree Kuchibhotla0f242ac2016-01-29 18:12:19 -08001000 // issue #4972)
Vijay Paidf8b62c2016-05-02 14:34:24 -07001001 Verifier(GetParam().disable_blocking).Expect(10, false).Verify(cq_.get());
Sree Kuchibhotla944f4cf2016-01-27 14:37:26 -08001002 EXPECT_FALSE(recv_status.ok());
1003 EXPECT_EQ(::grpc::StatusCode::CANCELLED, recv_status.error_code());
1004 }
1005
Sree Kuchibhotla0f242ac2016-01-29 18:12:19 -08001006 // Helper for testing server-streaming RPCs which are cancelled on the server.
1007 // Depending on the value of server_try_cancel parameter, this will test one
1008 // of the following three scenarios:
1009 // CANCEL_BEFORE_PROCESSING: Rpc is cancelled by the server before sending
1010 // any messages to the client
1011 //
1012 // CANCEL_DURING_PROCESSING: Rpc is cancelled by the server while sending
1013 // messages to the client
1014 //
1015 // CANCEL_AFTER PROCESSING: Rpc is cancelled by server after sending all
1016 // messages to the client (but before sending any status back to the
1017 // client)
Sree Kuchibhotla944f4cf2016-01-27 14:37:26 -08001018 void TestServerStreamingServerCancel(
1019 ServerTryCancelRequestPhase server_try_cancel) {
1020 ResetStub();
1021
1022 EchoRequest send_request;
1023 EchoRequest recv_request;
1024 EchoResponse send_response;
1025 EchoResponse recv_response;
1026 Status recv_status;
1027 ClientContext cli_ctx;
1028 ServerContext srv_ctx;
Sree Kuchibhotla944f4cf2016-01-27 14:37:26 -08001029 ServerAsyncWriter<EchoResponse> srv_stream(&srv_ctx);
1030
1031 send_request.set_message("Ping");
1032 // Initiate the 'ResponseStream' call on the client
1033 std::unique_ptr<ClientAsyncReader<EchoResponse>> cli_stream(
Sree Kuchibhotla4fb59082016-01-29 11:16:24 -08001034 stub_->AsyncResponseStream(&cli_ctx, send_request, cq_.get(), tag(1)));
Vijay Paidf8b62c2016-05-02 14:34:24 -07001035 Verifier(GetParam().disable_blocking).Expect(1, true).Verify(cq_.get());
Sree Kuchibhotla944f4cf2016-01-27 14:37:26 -08001036 // On the server, request to be notified of 'ResponseStream' calls and
1037 // receive the call just made by the client
Sree Kuchibhotlab0d15672016-03-07 10:51:02 -08001038 srv_ctx.AsyncNotifyWhenDone(tag(11));
Sree Kuchibhotla944f4cf2016-01-27 14:37:26 -08001039 service_.RequestResponseStream(&srv_ctx, &recv_request, &srv_stream,
1040 cq_.get(), cq_.get(), tag(2));
Vijay Paidf8b62c2016-05-02 14:34:24 -07001041 Verifier(GetParam().disable_blocking).Expect(2, true).Verify(cq_.get());
Sree Kuchibhotla944f4cf2016-01-27 14:37:26 -08001042 EXPECT_EQ(send_request.message(), recv_request.message());
1043
1044 bool expected_cq_result = true;
1045 bool ignore_cq_result = false;
Sree Kuchibhotlab0d15672016-03-07 10:51:02 -08001046 bool want_done_tag = false;
Sree Kuchibhotla944f4cf2016-01-27 14:37:26 -08001047
1048 if (server_try_cancel == CANCEL_BEFORE_PROCESSING) {
Sree Kuchibhotlab0d15672016-03-07 10:51:02 -08001049 srv_ctx.TryCancel();
Vijay Paidf8b62c2016-05-02 14:34:24 -07001050 Verifier(GetParam().disable_blocking).Expect(11, true).Verify(cq_.get());
Sree Kuchibhotlab0d15672016-03-07 10:51:02 -08001051 EXPECT_TRUE(srv_ctx.IsCancelled());
Sree Kuchibhotla944f4cf2016-01-27 14:37:26 -08001052
1053 // We know for sure that all cq results will be false from this point
1054 // since the server cancelled the RPC
1055 expected_cq_result = false;
1056 }
1057
1058 std::thread* server_try_cancel_thd = NULL;
Sree Kuchibhotlab0d15672016-03-07 10:51:02 -08001059
Vijay Paidf8b62c2016-05-02 14:34:24 -07001060 auto verif = Verifier(GetParam().disable_blocking);
Sree Kuchibhotlab0d15672016-03-07 10:51:02 -08001061
Sree Kuchibhotla944f4cf2016-01-27 14:37:26 -08001062 if (server_try_cancel == CANCEL_DURING_PROCESSING) {
Sree Kuchibhotlab0d15672016-03-07 10:51:02 -08001063 server_try_cancel_thd =
1064 new std::thread(&ServerContext::TryCancel, &srv_ctx);
Sree Kuchibhotla944f4cf2016-01-27 14:37:26 -08001065
1066 // Server will cancel the RPC in a parallel thread while writing responses
1067 // to the client. Since the cancellation can happen at anytime, some of
Sree Kuchibhotla0f242ac2016-01-29 18:12:19 -08001068 // the cq results (i.e those until cancellation) might be true but it is
1069 // non deterministic. So better to ignore the cq results
Sree Kuchibhotla944f4cf2016-01-27 14:37:26 -08001070 ignore_cq_result = true;
Sree Kuchibhotlab0d15672016-03-07 10:51:02 -08001071 // Expect that we might possibly see the done tag that
1072 // indicates cancellation completion in this case
1073 want_done_tag = true;
1074 verif.Expect(11, true);
Sree Kuchibhotla944f4cf2016-01-27 14:37:26 -08001075 }
1076
1077 // Server sends three messages (tags 3, 4 and 5)
Sree Kuchibhotlab0d15672016-03-07 10:51:02 -08001078 // But if want_done tag is true, we might also see tag 11
Sree Kuchibhotla944f4cf2016-01-27 14:37:26 -08001079 for (int tag_idx = 3; tag_idx <= 5; tag_idx++) {
1080 send_response.set_message("Pong " + std::to_string(tag_idx));
1081 srv_stream.Write(send_response, tag(tag_idx));
Sree Kuchibhotlab0d15672016-03-07 10:51:02 -08001082 // Note that we'll add something to the verifier and verify that
1083 // something was seen, but it might be tag 11 and not what we
1084 // just added
1085 int got_tag = verif.Expect(tag_idx, expected_cq_result)
1086 .Next(cq_.get(), ignore_cq_result);
1087 GPR_ASSERT((got_tag == tag_idx) || (got_tag == 11 && want_done_tag));
1088 if (got_tag == 11) {
1089 EXPECT_TRUE(srv_ctx.IsCancelled());
1090 want_done_tag = false;
1091 // Now get the other entry that we were waiting on
1092 EXPECT_EQ(verif.Next(cq_.get(), ignore_cq_result), tag_idx);
1093 }
Sree Kuchibhotla944f4cf2016-01-27 14:37:26 -08001094 }
1095
Sree Kuchibhotla0f242ac2016-01-29 18:12:19 -08001096 if (server_try_cancel_thd != NULL) {
1097 server_try_cancel_thd->join();
1098 delete server_try_cancel_thd;
1099 }
1100
Sree Kuchibhotla944f4cf2016-01-27 14:37:26 -08001101 if (server_try_cancel == CANCEL_AFTER_PROCESSING) {
Sree Kuchibhotlab0d15672016-03-07 10:51:02 -08001102 srv_ctx.TryCancel();
1103 want_done_tag = true;
1104 verif.Expect(11, true);
yang-gad0df7b2016-02-22 10:00:20 -08001105
1106 // Client reads may fail bacause it is notified that the stream is
1107 // cancelled.
1108 ignore_cq_result = true;
Sree Kuchibhotla944f4cf2016-01-27 14:37:26 -08001109 }
1110
Sree Kuchibhotlab0d15672016-03-07 10:51:02 -08001111 if (want_done_tag) {
1112 verif.Verify(cq_.get());
1113 EXPECT_TRUE(srv_ctx.IsCancelled());
1114 want_done_tag = false;
1115 }
1116
Sree Kuchibhotla0f242ac2016-01-29 18:12:19 -08001117 // Client attemts to read the three messages from the server
Sree Kuchibhotla944f4cf2016-01-27 14:37:26 -08001118 for (int tag_idx = 6; tag_idx <= 8; tag_idx++) {
1119 cli_stream->Read(&recv_response, tag(tag_idx));
Vijay Paidf8b62c2016-05-02 14:34:24 -07001120 Verifier(GetParam().disable_blocking)
Sree Kuchibhotla944f4cf2016-01-27 14:37:26 -08001121 .Expect(tag_idx, expected_cq_result)
Sree Kuchibhotla4fb59082016-01-29 11:16:24 -08001122 .Verify(cq_.get(), ignore_cq_result);
Sree Kuchibhotla944f4cf2016-01-27 14:37:26 -08001123 }
1124
Sree Kuchibhotla0f242ac2016-01-29 18:12:19 -08001125 // The RPC has been cancelled at this point for sure (i.e irrespective of
1126 // the value of `server_try_cancel` is). So, from this point forward, we
1127 // know that cq results are supposed to return false on server.
Sree Kuchibhotla944f4cf2016-01-27 14:37:26 -08001128
Sree Kuchibhotla0f242ac2016-01-29 18:12:19 -08001129 // Server finishes the stream (but the RPC is already cancelled)
Sree Kuchibhotla944f4cf2016-01-27 14:37:26 -08001130 srv_stream.Finish(Status::CANCELLED, tag(9));
Vijay Paidf8b62c2016-05-02 14:34:24 -07001131 Verifier(GetParam().disable_blocking).Expect(9, false).Verify(cq_.get());
Sree Kuchibhotla944f4cf2016-01-27 14:37:26 -08001132
Sree Kuchibhotla0f242ac2016-01-29 18:12:19 -08001133 // Client will see the cancellation
Sree Kuchibhotla944f4cf2016-01-27 14:37:26 -08001134 cli_stream->Finish(&recv_status, tag(10));
Vijay Paidf8b62c2016-05-02 14:34:24 -07001135 Verifier(GetParam().disable_blocking).Expect(10, true).Verify(cq_.get());
Sree Kuchibhotla944f4cf2016-01-27 14:37:26 -08001136 EXPECT_FALSE(recv_status.ok());
1137 EXPECT_EQ(::grpc::StatusCode::CANCELLED, recv_status.error_code());
1138 }
1139
Sree Kuchibhotla0f242ac2016-01-29 18:12:19 -08001140 // Helper for testing bidirectinal-streaming RPCs which are cancelled on the
1141 // server.
1142 //
1143 // Depending on the value of server_try_cancel parameter, this will
1144 // test one of the following three scenarios:
1145 // CANCEL_BEFORE_PROCESSING: Rpc is cancelled by the server before reading/
1146 // writing any messages from/to the client
1147 //
1148 // CANCEL_DURING_PROCESSING: Rpc is cancelled by the server while reading
1149 // messages from the client
1150 //
1151 // CANCEL_AFTER PROCESSING: Rpc is cancelled by server after reading all
1152 // messages from the client (but before sending any status back to the
1153 // client)
Sree Kuchibhotla944f4cf2016-01-27 14:37:26 -08001154 void TestBidiStreamingServerCancel(
1155 ServerTryCancelRequestPhase server_try_cancel) {
1156 ResetStub();
1157
1158 EchoRequest send_request;
1159 EchoRequest recv_request;
1160 EchoResponse send_response;
1161 EchoResponse recv_response;
1162 Status recv_status;
Sree Kuchibhotla944f4cf2016-01-27 14:37:26 -08001163 ClientContext cli_ctx;
1164 ServerContext srv_ctx;
1165 ServerAsyncReaderWriter<EchoResponse, EchoRequest> srv_stream(&srv_ctx);
1166
1167 // Initiate the call from the client side
1168 std::unique_ptr<ClientAsyncReaderWriter<EchoRequest, EchoResponse>>
Sree Kuchibhotla4fb59082016-01-29 11:16:24 -08001169 cli_stream(stub_->AsyncBidiStream(&cli_ctx, cq_.get(), tag(1)));
Vijay Paidf8b62c2016-05-02 14:34:24 -07001170 Verifier(GetParam().disable_blocking).Expect(1, true).Verify(cq_.get());
Sree Kuchibhotla944f4cf2016-01-27 14:37:26 -08001171
1172 // On the server, request to be notified of the 'BidiStream' call and
1173 // receive the call just made by the client
Sree Kuchibhotlab0d15672016-03-07 10:51:02 -08001174 srv_ctx.AsyncNotifyWhenDone(tag(11));
Sree Kuchibhotla944f4cf2016-01-27 14:37:26 -08001175 service_.RequestBidiStream(&srv_ctx, &srv_stream, cq_.get(), cq_.get(),
1176 tag(2));
Vijay Paidf8b62c2016-05-02 14:34:24 -07001177 Verifier(GetParam().disable_blocking).Expect(2, true).Verify(cq_.get());
Sree Kuchibhotla944f4cf2016-01-27 14:37:26 -08001178
Sree Kuchibhotla0f242ac2016-01-29 18:12:19 -08001179 // Client sends the first and the only message
Sree Kuchibhotla944f4cf2016-01-27 14:37:26 -08001180 send_request.set_message("Ping");
1181 cli_stream->Write(send_request, tag(3));
Vijay Paidf8b62c2016-05-02 14:34:24 -07001182 Verifier(GetParam().disable_blocking).Expect(3, true).Verify(cq_.get());
Sree Kuchibhotla944f4cf2016-01-27 14:37:26 -08001183
1184 bool expected_cq_result = true;
1185 bool ignore_cq_result = false;
Sree Kuchibhotlab0d15672016-03-07 10:51:02 -08001186 bool want_done_tag = false;
Sree Kuchibhotla0f242ac2016-01-29 18:12:19 -08001187
Sree Kuchibhotla944f4cf2016-01-27 14:37:26 -08001188 if (server_try_cancel == CANCEL_BEFORE_PROCESSING) {
Sree Kuchibhotlab0d15672016-03-07 10:51:02 -08001189 srv_ctx.TryCancel();
Vijay Paidf8b62c2016-05-02 14:34:24 -07001190 Verifier(GetParam().disable_blocking).Expect(11, true).Verify(cq_.get());
Sree Kuchibhotlab0d15672016-03-07 10:51:02 -08001191 EXPECT_TRUE(srv_ctx.IsCancelled());
Sree Kuchibhotla944f4cf2016-01-27 14:37:26 -08001192
1193 // We know for sure that all cq results will be false from this point
1194 // since the server cancelled the RPC
1195 expected_cq_result = false;
1196 }
1197
1198 std::thread* server_try_cancel_thd = NULL;
Sree Kuchibhotlab0d15672016-03-07 10:51:02 -08001199
Vijay Paidf8b62c2016-05-02 14:34:24 -07001200 auto verif = Verifier(GetParam().disable_blocking);
Sree Kuchibhotlab0d15672016-03-07 10:51:02 -08001201
Sree Kuchibhotla944f4cf2016-01-27 14:37:26 -08001202 if (server_try_cancel == CANCEL_DURING_PROCESSING) {
Sree Kuchibhotlab0d15672016-03-07 10:51:02 -08001203 server_try_cancel_thd =
1204 new std::thread(&ServerContext::TryCancel, &srv_ctx);
Sree Kuchibhotla944f4cf2016-01-27 14:37:26 -08001205
1206 // Since server is going to cancel the RPC in a parallel thread, some of
1207 // the cq results (i.e those until the cancellation) might be true. Since
1208 // that number is non-deterministic, it is better to ignore the cq results
1209 ignore_cq_result = true;
Sree Kuchibhotlab0d15672016-03-07 10:51:02 -08001210 // Expect that we might possibly see the done tag that
1211 // indicates cancellation completion in this case
1212 want_done_tag = true;
1213 verif.Expect(11, true);
Sree Kuchibhotla944f4cf2016-01-27 14:37:26 -08001214 }
1215
Sree Kuchibhotlab0d15672016-03-07 10:51:02 -08001216 int got_tag;
Sree Kuchibhotla944f4cf2016-01-27 14:37:26 -08001217 srv_stream.Read(&recv_request, tag(4));
Sree Kuchibhotlab0d15672016-03-07 10:51:02 -08001218 verif.Expect(4, expected_cq_result);
1219 got_tag = verif.Next(cq_.get(), ignore_cq_result);
1220 GPR_ASSERT((got_tag == 4) || (got_tag == 11 && want_done_tag));
1221 if (got_tag == 11) {
1222 EXPECT_TRUE(srv_ctx.IsCancelled());
1223 want_done_tag = false;
1224 // Now get the other entry that we were waiting on
1225 EXPECT_EQ(verif.Next(cq_.get(), ignore_cq_result), 4);
1226 }
Sree Kuchibhotla944f4cf2016-01-27 14:37:26 -08001227
1228 send_response.set_message("Pong");
1229 srv_stream.Write(send_response, tag(5));
Sree Kuchibhotlab0d15672016-03-07 10:51:02 -08001230 verif.Expect(5, expected_cq_result);
1231 got_tag = verif.Next(cq_.get(), ignore_cq_result);
1232 GPR_ASSERT((got_tag == 5) || (got_tag == 11 && want_done_tag));
1233 if (got_tag == 11) {
1234 EXPECT_TRUE(srv_ctx.IsCancelled());
1235 want_done_tag = false;
1236 // Now get the other entry that we were waiting on
1237 EXPECT_EQ(verif.Next(cq_.get(), ignore_cq_result), 5);
1238 }
Sree Kuchibhotla944f4cf2016-01-27 14:37:26 -08001239
1240 cli_stream->Read(&recv_response, tag(6));
Sree Kuchibhotlab0d15672016-03-07 10:51:02 -08001241 verif.Expect(6, expected_cq_result);
1242 got_tag = verif.Next(cq_.get(), ignore_cq_result);
1243 GPR_ASSERT((got_tag == 6) || (got_tag == 11 && want_done_tag));
1244 if (got_tag == 11) {
1245 EXPECT_TRUE(srv_ctx.IsCancelled());
1246 want_done_tag = false;
1247 // Now get the other entry that we were waiting on
1248 EXPECT_EQ(verif.Next(cq_.get(), ignore_cq_result), 6);
1249 }
Sree Kuchibhotla944f4cf2016-01-27 14:37:26 -08001250
1251 // This is expected to succeed in all cases
1252 cli_stream->WritesDone(tag(7));
Sree Kuchibhotlab0d15672016-03-07 10:51:02 -08001253 verif.Expect(7, true);
1254 got_tag = verif.Next(cq_.get(), ignore_cq_result);
1255 GPR_ASSERT((got_tag == 7) || (got_tag == 11 && want_done_tag));
1256 if (got_tag == 11) {
1257 EXPECT_TRUE(srv_ctx.IsCancelled());
1258 want_done_tag = false;
1259 // Now get the other entry that we were waiting on
1260 EXPECT_EQ(verif.Next(cq_.get(), ignore_cq_result), 7);
1261 }
Sree Kuchibhotla944f4cf2016-01-27 14:37:26 -08001262
Sree Kuchibhotla0f242ac2016-01-29 18:12:19 -08001263 // This is expected to fail in all cases i.e for all values of
Vijay Pai018879a2016-02-16 09:20:50 -08001264 // server_try_cancel. This is because at this point, either there are no
Sree Kuchibhotla0f242ac2016-01-29 18:12:19 -08001265 // more msgs from the client (because client called WritesDone) or the RPC
1266 // is cancelled on the server
Sree Kuchibhotla944f4cf2016-01-27 14:37:26 -08001267 srv_stream.Read(&recv_request, tag(8));
Sree Kuchibhotlab0d15672016-03-07 10:51:02 -08001268 verif.Expect(8, false);
1269 got_tag = verif.Next(cq_.get(), ignore_cq_result);
1270 GPR_ASSERT((got_tag == 8) || (got_tag == 11 && want_done_tag));
1271 if (got_tag == 11) {
1272 EXPECT_TRUE(srv_ctx.IsCancelled());
1273 want_done_tag = false;
1274 // Now get the other entry that we were waiting on
1275 EXPECT_EQ(verif.Next(cq_.get(), ignore_cq_result), 8);
1276 }
Sree Kuchibhotla944f4cf2016-01-27 14:37:26 -08001277
1278 if (server_try_cancel_thd != NULL) {
1279 server_try_cancel_thd->join();
1280 delete server_try_cancel_thd;
1281 }
1282
1283 if (server_try_cancel == CANCEL_AFTER_PROCESSING) {
Sree Kuchibhotlab0d15672016-03-07 10:51:02 -08001284 srv_ctx.TryCancel();
1285 want_done_tag = true;
1286 verif.Expect(11, true);
1287 }
1288
1289 if (want_done_tag) {
1290 verif.Verify(cq_.get());
1291 EXPECT_TRUE(srv_ctx.IsCancelled());
1292 want_done_tag = false;
Sree Kuchibhotla944f4cf2016-01-27 14:37:26 -08001293 }
1294
Sree Kuchibhotla0f242ac2016-01-29 18:12:19 -08001295 // The RPC has been cancelled at this point for sure (i.e irrespective of
1296 // the value of `server_try_cancel` is). So, from this point forward, we
1297 // know that cq results are supposed to return false on server.
Sree Kuchibhotla944f4cf2016-01-27 14:37:26 -08001298
1299 srv_stream.Finish(Status::CANCELLED, tag(9));
Vijay Paidf8b62c2016-05-02 14:34:24 -07001300 Verifier(GetParam().disable_blocking).Expect(9, false).Verify(cq_.get());
Sree Kuchibhotla944f4cf2016-01-27 14:37:26 -08001301
1302 cli_stream->Finish(&recv_status, tag(10));
Vijay Paidf8b62c2016-05-02 14:34:24 -07001303 Verifier(GetParam().disable_blocking).Expect(10, true).Verify(cq_.get());
Sree Kuchibhotla944f4cf2016-01-27 14:37:26 -08001304 EXPECT_FALSE(recv_status.ok());
1305 EXPECT_EQ(grpc::StatusCode::CANCELLED, recv_status.error_code());
1306 }
1307};
1308
1309TEST_P(AsyncEnd2endServerTryCancelTest, ClientStreamingServerTryCancelBefore) {
1310 TestClientStreamingServerCancel(CANCEL_BEFORE_PROCESSING);
1311}
1312
1313TEST_P(AsyncEnd2endServerTryCancelTest, ClientStreamingServerTryCancelDuring) {
1314 TestClientStreamingServerCancel(CANCEL_DURING_PROCESSING);
1315}
1316
1317TEST_P(AsyncEnd2endServerTryCancelTest, ClientStreamingServerTryCancelAfter) {
1318 TestClientStreamingServerCancel(CANCEL_AFTER_PROCESSING);
1319}
1320
1321TEST_P(AsyncEnd2endServerTryCancelTest, ServerStreamingServerTryCancelBefore) {
1322 TestServerStreamingServerCancel(CANCEL_BEFORE_PROCESSING);
1323}
1324
1325TEST_P(AsyncEnd2endServerTryCancelTest, ServerStreamingServerTryCancelDuring) {
1326 TestServerStreamingServerCancel(CANCEL_DURING_PROCESSING);
1327}
1328
1329TEST_P(AsyncEnd2endServerTryCancelTest, ServerStreamingServerTryCancelAfter) {
1330 TestServerStreamingServerCancel(CANCEL_AFTER_PROCESSING);
1331}
1332
1333TEST_P(AsyncEnd2endServerTryCancelTest, ServerBidiStreamingTryCancelBefore) {
1334 TestBidiStreamingServerCancel(CANCEL_BEFORE_PROCESSING);
1335}
1336
1337TEST_P(AsyncEnd2endServerTryCancelTest, ServerBidiStreamingTryCancelDuring) {
1338 TestBidiStreamingServerCancel(CANCEL_DURING_PROCESSING);
1339}
1340
1341TEST_P(AsyncEnd2endServerTryCancelTest, ServerBidiStreamingTryCancelAfter) {
1342 TestBidiStreamingServerCancel(CANCEL_AFTER_PROCESSING);
1343}
1344
Vijay Paidf8b62c2016-05-02 14:34:24 -07001345std::vector<TestScenario> CreateTestScenarios(bool test_disable_blocking,
Vijay Paid7b1e702016-05-02 15:10:21 -07001346 bool test_secure,
1347 int test_big_limit) {
Vijay Paidf8b62c2016-05-02 14:34:24 -07001348 std::vector<TestScenario> scenarios;
1349 std::vector<grpc::string> credentials_types;
Vijay Paid7b1e702016-05-02 15:10:21 -07001350 std::vector<grpc::string> messages;
1351
Vijay Paidf8b62c2016-05-02 14:34:24 -07001352 credentials_types.push_back(kInsecureCredentialsType);
Vijay Paid7b1e702016-05-02 15:10:21 -07001353 auto sec_list = GetSecureCredentialsTypeList();
1354 for (auto sec = sec_list.begin(); sec != sec_list.end(); sec++) {
1355 credentials_types.push_back(*sec);
1356 }
1357
1358 messages.push_back("Hello");
1359 for (int sz = 1; sz < test_big_limit; sz *= 2) {
1360 grpc::string big_msg;
1361 for (int i = 0; i < sz * 1024; i++) {
1362 char c = 'a' + (i % 26);
1363 big_msg += c;
1364 }
1365 messages.push_back(big_msg);
1366 }
1367
1368 for (auto cred = credentials_types.begin(); cred != credentials_types.end();
1369 ++cred) {
1370 for (auto msg = messages.begin(); msg != messages.end(); msg++) {
1371 scenarios.push_back(TestScenario(false, *cred, *msg));
1372 if (test_disable_blocking) {
1373 scenarios.push_back(TestScenario(true, *cred, *msg));
1374 }
Vijay Paidf8b62c2016-05-02 14:34:24 -07001375 }
1376 }
1377 return scenarios;
1378}
1379
Craig Tiller4c06b822015-08-06 08:41:31 -07001380INSTANTIATE_TEST_CASE_P(AsyncEnd2end, AsyncEnd2endTest,
Vijay Paid7b1e702016-05-02 15:10:21 -07001381 ::testing::ValuesIn(CreateTestScenarios(true, true,
1382 1024)));
Sree Kuchibhotla944f4cf2016-01-27 14:37:26 -08001383INSTANTIATE_TEST_CASE_P(AsyncEnd2endServerTryCancel,
1384 AsyncEnd2endServerTryCancelTest,
Vijay Paid7b1e702016-05-02 15:10:21 -07001385 ::testing::ValuesIn(CreateTestScenarios(false, false,
1386 0)));
Craig Tiller69f90e62015-08-06 08:32:35 -07001387
Craig Tiller0220cf12015-02-12 17:39:26 -08001388} // namespace
1389} // namespace testing
1390} // namespace grpc
1391
1392int main(int argc, char** argv) {
1393 grpc_test_init(argc, argv);
Vijay Paib65eda42016-02-16 13:48:05 -08001394 gpr_tls_init(&g_is_async_end2end_test);
Craig Tiller0220cf12015-02-12 17:39:26 -08001395 ::testing::InitGoogleTest(&argc, argv);
Vijay Paib65eda42016-02-16 13:48:05 -08001396 int ret = RUN_ALL_TESTS();
1397 gpr_tls_destroy(&g_is_async_end2end_test);
1398 return ret;
Craig Tiller0220cf12015-02-12 17:39:26 -08001399}