blob: 8d58726f13de0abf5f74fd9e70ed57bb7bd12b6d [file] [log] [blame]
Craig Tiller0220cf12015-02-12 17:39:26 -08001/*
2 *
Craig Tiller6169d5f2016-03-31 07:46:18 -07003 * Copyright 2015, Google Inc.
Craig Tiller0220cf12015-02-12 17:39:26 -08004 * All rights reserved.
5 *
6 * Redistribution and use in source and binary forms, with or without
7 * modification, are permitted provided that the following conditions are
8 * met:
9 *
10 * * Redistributions of source code must retain the above copyright
11 * notice, this list of conditions and the following disclaimer.
12 * * Redistributions in binary form must reproduce the above
13 * copyright notice, this list of conditions and the following disclaimer
14 * in the documentation and/or other materials provided with the
15 * distribution.
16 * * Neither the name of Google Inc. nor the names of its
17 * contributors may be used to endorse or promote products derived from
18 * this software without specific prior written permission.
19 *
20 * THIS SOFTWARE IS PROVIDED BY THE COPYRIGHT HOLDERS AND CONTRIBUTORS
21 * "AS IS" AND ANY EXPRESS OR IMPLIED WARRANTIES, INCLUDING, BUT NOT
22 * LIMITED TO, THE IMPLIED WARRANTIES OF MERCHANTABILITY AND FITNESS FOR
23 * A PARTICULAR PURPOSE ARE DISCLAIMED. IN NO EVENT SHALL THE COPYRIGHT
24 * OWNER OR CONTRIBUTORS BE LIABLE FOR ANY DIRECT, INDIRECT, INCIDENTAL,
25 * SPECIAL, EXEMPLARY, OR CONSEQUENTIAL DAMAGES (INCLUDING, BUT NOT
26 * LIMITED TO, PROCUREMENT OF SUBSTITUTE GOODS OR SERVICES; LOSS OF USE,
27 * DATA, OR PROFITS; OR BUSINESS INTERRUPTION) HOWEVER CAUSED AND ON ANY
28 * THEORY OF LIABILITY, WHETHER IN CONTRACT, STRICT LIABILITY, OR TORT
29 * (INCLUDING NEGLIGENCE OR OTHERWISE) ARISING IN ANY WAY OUT OF THE USE
30 * OF THIS SOFTWARE, EVEN IF ADVISED OF THE POSSIBILITY OF SUCH DAMAGE.
31 *
32 */
33
Yang Gaoda699b82015-02-18 01:10:22 -080034#include <memory>
Sree Kuchibhotla944f4cf2016-01-27 14:37:26 -080035#include <thread>
Craig Tiller0220cf12015-02-12 17:39:26 -080036
yang-g8c2be9f2015-08-19 16:28:09 -070037#include <grpc++/channel.h>
Craig Tiller0220cf12015-02-12 17:39:26 -080038#include <grpc++/client_context.h>
39#include <grpc++/create_channel.h>
Craig Tiller0220cf12015-02-12 17:39:26 -080040#include <grpc++/server.h>
41#include <grpc++/server_builder.h>
42#include <grpc++/server_context.h>
Sree Kuchibhotlab0d0c8e2016-01-13 22:52:17 -080043#include <grpc/grpc.h>
44#include <grpc/support/thd.h>
45#include <grpc/support/time.h>
Vijay Paib65eda42016-02-16 13:48:05 -080046#include <grpc/support/tls.h>
Craig Tiller0220cf12015-02-12 17:39:26 -080047#include <gtest/gtest.h>
48
Craig Tiller1b4e3302015-12-17 16:35:00 -080049#include "src/proto/grpc/testing/duplicate/echo_duplicate.grpc.pb.h"
50#include "src/proto/grpc/testing/echo.grpc.pb.h"
Sree Kuchibhotlab0d0c8e2016-01-13 22:52:17 -080051#include "test/core/util/port.h"
52#include "test/core/util/test_config.h"
yang-ge21908f2015-08-25 13:47:51 -070053#include "test/cpp/util/string_ref_helper.h"
Vijay Paidf8b62c2016-05-02 14:34:24 -070054#include "test/cpp/util/test_credentials_provider.h"
Craig Tiller0220cf12015-02-12 17:39:26 -080055
Craig Tiller69f90e62015-08-06 08:32:35 -070056#ifdef GPR_POSIX_SOCKET
Craig Tillerf45496f2016-03-30 07:41:19 -070057#include "src/core/lib/iomgr/ev_posix.h"
Craig Tiller69f90e62015-08-06 08:32:35 -070058#endif
59
Craig Tiller1b4e3302015-12-17 16:35:00 -080060using grpc::testing::EchoRequest;
61using grpc::testing::EchoResponse;
Vijay Paidf8b62c2016-05-02 14:34:24 -070062using grpc::testing::kTlsCredentialsType;
Craig Tiller0220cf12015-02-12 17:39:26 -080063using std::chrono::system_clock;
64
Vijay Paib65eda42016-02-16 13:48:05 -080065GPR_TLS_DECL(g_is_async_end2end_test);
66
Craig Tiller0220cf12015-02-12 17:39:26 -080067namespace grpc {
68namespace testing {
69
70namespace {
71
Craig Tiller7536af02015-12-22 13:49:30 -080072void* tag(int i) { return (void*)(intptr_t)i; }
Sree Kuchibhotlab0d15672016-03-07 10:51:02 -080073int detag(void* p) { return static_cast<int>(reinterpret_cast<intptr_t>(p)); }
Yang Gaoc05b6cb2015-02-13 00:34:10 -080074
Craig Tiller69f90e62015-08-06 08:32:35 -070075#ifdef GPR_POSIX_SOCKET
Vijay Paib65eda42016-02-16 13:48:05 -080076static int maybe_assert_non_blocking_poll(struct pollfd* pfds, nfds_t nfds,
77 int timeout) {
78 if (gpr_tls_get(&g_is_async_end2end_test)) {
79 GPR_ASSERT(timeout == 0);
80 }
81 return poll(pfds, nfds, timeout);
Craig Tiller69f90e62015-08-06 08:32:35 -070082}
83
84class PollOverride {
Craig Tiller06cf3cc2015-05-13 13:11:01 -070085 public:
Craig Tiller69f90e62015-08-06 08:32:35 -070086 PollOverride(grpc_poll_function_type f) {
87 prev_ = grpc_poll_function;
88 grpc_poll_function = f;
89 }
90
Craig Tiller4c06b822015-08-06 08:41:31 -070091 ~PollOverride() { grpc_poll_function = prev_; }
Craig Tiller69f90e62015-08-06 08:32:35 -070092
93 private:
94 grpc_poll_function_type prev_;
95};
96
vjpaicf4daeb2016-02-15 02:33:54 -080097class PollingOverrider : public PollOverride {
Craig Tiller69f90e62015-08-06 08:32:35 -070098 public:
vjpaicf4daeb2016-02-15 02:33:54 -080099 explicit PollingOverrider(bool allow_blocking)
Vijay Paib65eda42016-02-16 13:48:05 -0800100 : PollOverride(allow_blocking ? poll : maybe_assert_non_blocking_poll) {}
Craig Tiller69f90e62015-08-06 08:32:35 -0700101};
102#else
vjpaicf4daeb2016-02-15 02:33:54 -0800103class PollingOverrider {
Craig Tiller69f90e62015-08-06 08:32:35 -0700104 public:
vjpaicf4daeb2016-02-15 02:33:54 -0800105 explicit PollingOverrider(bool allow_blocking) {}
Craig Tiller69f90e62015-08-06 08:32:35 -0700106};
107#endif
108
vjpaicf4daeb2016-02-15 02:33:54 -0800109class Verifier {
Craig Tiller69f90e62015-08-06 08:32:35 -0700110 public:
vjpaicf4daeb2016-02-15 02:33:54 -0800111 explicit Verifier(bool spin) : spin_(spin) {}
Sree Kuchibhotlab0d15672016-03-07 10:51:02 -0800112 // Expect sets the expected ok value for a specific tag
Craig Tiller06cf3cc2015-05-13 13:11:01 -0700113 Verifier& Expect(int i, bool expect_ok) {
114 expectations_[tag(i)] = expect_ok;
115 return *this;
vjpai7aadf462015-03-16 23:58:44 -0700116 }
Sree Kuchibhotla944f4cf2016-01-27 14:37:26 -0800117
Sree Kuchibhotlab0d15672016-03-07 10:51:02 -0800118 // Next waits for 1 async tag to complete, checks its
119 // expectations, and returns the tag
120 int Next(CompletionQueue* cq, bool ignore_ok) {
121 bool ok;
122 void* got_tag;
123 if (spin_) {
124 for (;;) {
125 auto r = cq->AsyncNext(&got_tag, &ok, gpr_time_0(GPR_CLOCK_REALTIME));
126 if (r == CompletionQueue::TIMEOUT) continue;
127 if (r == CompletionQueue::GOT_EVENT) break;
128 gpr_log(GPR_ERROR, "unexpected result from AsyncNext");
129 abort();
130 }
131 } else {
132 EXPECT_TRUE(cq->Next(&got_tag, &ok));
133 }
134 auto it = expectations_.find(got_tag);
135 EXPECT_TRUE(it != expectations_.end());
136 if (!ignore_ok) {
137 EXPECT_EQ(it->second, ok);
138 }
139 expectations_.erase(it);
140 return detag(got_tag);
141 }
142
143 // Verify keeps calling Next until all currently set
144 // expected tags are complete
Sree Kuchibhotla944f4cf2016-01-27 14:37:26 -0800145 void Verify(CompletionQueue* cq) { Verify(cq, false); }
146
Sree Kuchibhotlab0d15672016-03-07 10:51:02 -0800147 // This version of Verify allows optionally ignoring the
148 // outcome of the expectation
Sree Kuchibhotla944f4cf2016-01-27 14:37:26 -0800149 void Verify(CompletionQueue* cq, bool ignore_ok) {
Craig Tiller06cf3cc2015-05-13 13:11:01 -0700150 GPR_ASSERT(!expectations_.empty());
151 while (!expectations_.empty()) {
Sree Kuchibhotlab0d15672016-03-07 10:51:02 -0800152 Next(cq, ignore_ok);
Craig Tiller06cf3cc2015-05-13 13:11:01 -0700153 }
154 }
Sree Kuchibhotlab0d15672016-03-07 10:51:02 -0800155 // This version of Verify stops after a certain deadline
Craig Tillerd6c98df2015-08-18 09:33:44 -0700156 void Verify(CompletionQueue* cq,
157 std::chrono::system_clock::time_point deadline) {
Craig Tiller06cf3cc2015-05-13 13:11:01 -0700158 if (expectations_.empty()) {
159 bool ok;
Craig Tillerd6c98df2015-08-18 09:33:44 -0700160 void* got_tag;
Craig Tiller69f90e62015-08-06 08:32:35 -0700161 if (spin_) {
162 while (std::chrono::system_clock::now() < deadline) {
Craig Tiller4c06b822015-08-06 08:41:31 -0700163 EXPECT_EQ(
164 cq->AsyncNext(&got_tag, &ok, gpr_time_0(GPR_CLOCK_REALTIME)),
165 CompletionQueue::TIMEOUT);
Craig Tiller69f90e62015-08-06 08:32:35 -0700166 }
167 } else {
Craig Tiller4c06b822015-08-06 08:41:31 -0700168 EXPECT_EQ(cq->AsyncNext(&got_tag, &ok, deadline),
169 CompletionQueue::TIMEOUT);
Craig Tiller69f90e62015-08-06 08:32:35 -0700170 }
Craig Tiller06cf3cc2015-05-13 13:11:01 -0700171 } else {
172 while (!expectations_.empty()) {
173 bool ok;
Craig Tillerd6c98df2015-08-18 09:33:44 -0700174 void* got_tag;
Craig Tiller69f90e62015-08-06 08:32:35 -0700175 if (spin_) {
176 for (;;) {
177 GPR_ASSERT(std::chrono::system_clock::now() < deadline);
Craig Tiller4c06b822015-08-06 08:41:31 -0700178 auto r =
179 cq->AsyncNext(&got_tag, &ok, gpr_time_0(GPR_CLOCK_REALTIME));
Craig Tiller69f90e62015-08-06 08:32:35 -0700180 if (r == CompletionQueue::TIMEOUT) continue;
181 if (r == CompletionQueue::GOT_EVENT) break;
182 gpr_log(GPR_ERROR, "unexpected result from AsyncNext");
183 abort();
Craig Tiller4c06b822015-08-06 08:41:31 -0700184 }
Craig Tiller69f90e62015-08-06 08:32:35 -0700185 } else {
Craig Tiller4c06b822015-08-06 08:41:31 -0700186 EXPECT_EQ(cq->AsyncNext(&got_tag, &ok, deadline),
187 CompletionQueue::GOT_EVENT);
Craig Tiller69f90e62015-08-06 08:32:35 -0700188 }
Craig Tiller06cf3cc2015-05-13 13:11:01 -0700189 auto it = expectations_.find(got_tag);
190 EXPECT_TRUE(it != expectations_.end());
191 EXPECT_EQ(it->second, ok);
192 expectations_.erase(it);
193 }
194 }
195 }
196
197 private:
198 std::map<void*, bool> expectations_;
Craig Tiller69f90e62015-08-06 08:32:35 -0700199 bool spin_;
Craig Tiller06cf3cc2015-05-13 13:11:01 -0700200};
vjpai7aadf462015-03-16 23:58:44 -0700201
Vijay Paidf8b62c2016-05-02 14:34:24 -0700202class TestScenario {
203 public:
204 TestScenario(bool non_block, const grpc::string& creds_type)
205 : disable_blocking(non_block), credentials_type(creds_type) {}
206 bool disable_blocking;
207 const grpc::string credentials_type;
208};
209
210class AsyncEnd2endTest : public ::testing::TestWithParam<TestScenario> {
Craig Tiller0220cf12015-02-12 17:39:26 -0800211 protected:
Vijay Pai018879a2016-02-16 09:20:50 -0800212 AsyncEnd2endTest() {}
Craig Tiller0220cf12015-02-12 17:39:26 -0800213
Craig Tillercf133f42015-02-26 14:05:56 -0800214 void SetUp() GRPC_OVERRIDE {
Vijay Paidf8b62c2016-05-02 14:34:24 -0700215 poll_overrider_.reset(new PollingOverrider(!GetParam().disable_blocking));
Vijay Pai018879a2016-02-16 09:20:50 -0800216
Craig Tiller0220cf12015-02-12 17:39:26 -0800217 int port = grpc_pick_unused_port_or_die();
218 server_address_ << "localhost:" << port;
vjpai017ed622015-12-09 10:42:54 -0800219
Craig Tiller0220cf12015-02-12 17:39:26 -0800220 // Setup server
221 ServerBuilder builder;
Vijay Paidf8b62c2016-05-02 14:34:24 -0700222 auto server_creds = GetServerCredentials(GetParam().credentials_type);
223 builder.AddListeningPort(server_address_.str(), server_creds);
Craig Tiller15f383c2016-01-07 12:45:32 -0800224 builder.RegisterService(&service_);
Craig Tiller06cf3cc2015-05-13 13:11:01 -0700225 cq_ = builder.AddCompletionQueue();
Craig Tiller0220cf12015-02-12 17:39:26 -0800226 server_ = builder.BuildAndStart();
Vijay Paib65eda42016-02-16 13:48:05 -0800227
228 gpr_tls_set(&g_is_async_end2end_test, 1);
Craig Tiller0220cf12015-02-12 17:39:26 -0800229 }
230
Craig Tillercf133f42015-02-26 14:05:56 -0800231 void TearDown() GRPC_OVERRIDE {
Craig Tiller492968f2015-02-18 13:14:03 -0800232 server_->Shutdown();
233 void* ignored_tag;
234 bool ignored_ok;
Craig Tiller06cf3cc2015-05-13 13:11:01 -0700235 cq_->Shutdown();
236 while (cq_->Next(&ignored_tag, &ignored_ok))
Craig Tiller492968f2015-02-18 13:14:03 -0800237 ;
Vijay Pai018879a2016-02-16 09:20:50 -0800238 poll_overrider_.reset();
Vijay Paib65eda42016-02-16 13:48:05 -0800239 gpr_tls_set(&g_is_async_end2end_test, 0);
Craig Tiller492968f2015-02-18 13:14:03 -0800240 }
Craig Tiller0220cf12015-02-12 17:39:26 -0800241
242 void ResetStub() {
Vijay Paidf8b62c2016-05-02 14:34:24 -0700243 ChannelArguments args;
244 auto channel_creds =
245 GetChannelCredentials(GetParam().credentials_type, &args);
yang-g730055d2015-08-27 12:29:45 -0700246 std::shared_ptr<Channel> channel =
Vijay Paidf8b62c2016-05-02 14:34:24 -0700247 CreateCustomChannel(server_address_.str(), channel_creds, args);
Sree Kuchibhotla5a05f512016-01-13 22:43:20 -0800248 stub_ = grpc::testing::EchoTestService::NewStub(channel);
Craig Tiller0220cf12015-02-12 17:39:26 -0800249 }
250
Yang Gao406b32f2015-02-13 16:25:33 -0800251 void SendRpc(int num_rpcs) {
252 for (int i = 0; i < num_rpcs; i++) {
253 EchoRequest send_request;
254 EchoRequest recv_request;
255 EchoResponse send_response;
256 EchoResponse recv_response;
257 Status recv_status;
258
259 ClientContext cli_ctx;
260 ServerContext srv_ctx;
261 grpc::ServerAsyncResponseWriter<EchoResponse> response_writer(&srv_ctx);
262
263 send_request.set_message("Hello");
Sree Kuchibhotla944f4cf2016-01-27 14:37:26 -0800264 std::unique_ptr<ClientAsyncResponseReader<EchoResponse>> response_reader(
Craig Tiller06cf3cc2015-05-13 13:11:01 -0700265 stub_->AsyncEcho(&cli_ctx, send_request, cq_.get()));
Yang Gao406b32f2015-02-13 16:25:33 -0800266
Craig Tillerd6c98df2015-08-18 09:33:44 -0700267 service_.RequestEcho(&srv_ctx, &recv_request, &response_writer, cq_.get(),
268 cq_.get(), tag(2));
Yang Gao406b32f2015-02-13 16:25:33 -0800269
Vijay Paidf8b62c2016-05-02 14:34:24 -0700270 Verifier(GetParam().disable_blocking).Expect(2, true).Verify(cq_.get());
Yang Gao406b32f2015-02-13 16:25:33 -0800271 EXPECT_EQ(send_request.message(), recv_request.message());
272
273 send_response.set_message(recv_request.message());
274 response_writer.Finish(send_response, Status::OK, tag(3));
Vijay Paidf8b62c2016-05-02 14:34:24 -0700275 Verifier(GetParam().disable_blocking).Expect(3, true).Verify(cq_.get());
Yang Gao406b32f2015-02-13 16:25:33 -0800276
Yang Gao3a5e5492015-02-18 14:32:38 -0800277 response_reader->Finish(&recv_response, &recv_status, tag(4));
Vijay Paidf8b62c2016-05-02 14:34:24 -0700278 Verifier(GetParam().disable_blocking).Expect(4, true).Verify(cq_.get());
Yang Gao406b32f2015-02-13 16:25:33 -0800279
280 EXPECT_EQ(send_response.message(), recv_response.message());
Yang Gaoc1a2c312015-06-16 10:59:46 -0700281 EXPECT_TRUE(recv_status.ok());
Yang Gao406b32f2015-02-13 16:25:33 -0800282 }
283 }
284
Craig Tiller06cf3cc2015-05-13 13:11:01 -0700285 std::unique_ptr<ServerCompletionQueue> cq_;
Sree Kuchibhotla5a05f512016-01-13 22:43:20 -0800286 std::unique_ptr<grpc::testing::EchoTestService::Stub> stub_;
Craig Tiller0220cf12015-02-12 17:39:26 -0800287 std::unique_ptr<Server> server_;
Sree Kuchibhotla5a05f512016-01-13 22:43:20 -0800288 grpc::testing::EchoTestService::AsyncService service_;
Craig Tiller0220cf12015-02-12 17:39:26 -0800289 std::ostringstream server_address_;
vjpaicf4daeb2016-02-15 02:33:54 -0800290
Vijay Pai018879a2016-02-16 09:20:50 -0800291 std::unique_ptr<PollingOverrider> poll_overrider_;
Craig Tiller0220cf12015-02-12 17:39:26 -0800292};
293
Craig Tiller69f90e62015-08-06 08:32:35 -0700294TEST_P(AsyncEnd2endTest, SimpleRpc) {
Craig Tiller0220cf12015-02-12 17:39:26 -0800295 ResetStub();
Yang Gao406b32f2015-02-13 16:25:33 -0800296 SendRpc(1);
297}
Yang Gaobb84a302015-02-12 23:30:12 -0800298
Craig Tiller69f90e62015-08-06 08:32:35 -0700299TEST_P(AsyncEnd2endTest, SequentialRpcs) {
Yang Gao406b32f2015-02-13 16:25:33 -0800300 ResetStub();
301 SendRpc(10);
Craig Tiller0220cf12015-02-12 17:39:26 -0800302}
303
vjpai7aadf462015-03-16 23:58:44 -0700304// Test a simple RPC using the async version of Next
Craig Tiller69f90e62015-08-06 08:32:35 -0700305TEST_P(AsyncEnd2endTest, AsyncNextRpc) {
vjpai7aadf462015-03-16 23:58:44 -0700306 ResetStub();
307
308 EchoRequest send_request;
309 EchoRequest recv_request;
310 EchoResponse send_response;
311 EchoResponse recv_response;
312 Status recv_status;
313
314 ClientContext cli_ctx;
315 ServerContext srv_ctx;
316 grpc::ServerAsyncResponseWriter<EchoResponse> response_writer(&srv_ctx);
317
318 send_request.set_message("Hello");
Sree Kuchibhotla944f4cf2016-01-27 14:37:26 -0800319 std::unique_ptr<ClientAsyncResponseReader<EchoResponse>> response_reader(
Craig Tiller06cf3cc2015-05-13 13:11:01 -0700320 stub_->AsyncEcho(&cli_ctx, send_request, cq_.get()));
vjpai7aadf462015-03-16 23:58:44 -0700321
Yang Gao757afae2015-03-17 15:49:26 -0700322 std::chrono::system_clock::time_point time_now(
Craig Tillerf51199f2015-05-08 09:32:53 -0700323 std::chrono::system_clock::now());
324 std::chrono::system_clock::time_point time_limit(
325 std::chrono::system_clock::now() + std::chrono::seconds(10));
Vijay Paidf8b62c2016-05-02 14:34:24 -0700326 Verifier(GetParam().disable_blocking).Verify(cq_.get(), time_now);
327 Verifier(GetParam().disable_blocking).Verify(cq_.get(), time_now);
vjpai7aadf462015-03-16 23:58:44 -0700328
Craig Tiller06cf3cc2015-05-13 13:11:01 -0700329 service_.RequestEcho(&srv_ctx, &recv_request, &response_writer, cq_.get(),
330 cq_.get(), tag(2));
vjpai7aadf462015-03-16 23:58:44 -0700331
Vijay Paidf8b62c2016-05-02 14:34:24 -0700332 Verifier(GetParam().disable_blocking)
333 .Expect(2, true)
334 .Verify(cq_.get(), time_limit);
vjpai7aadf462015-03-16 23:58:44 -0700335 EXPECT_EQ(send_request.message(), recv_request.message());
vjpai7aadf462015-03-16 23:58:44 -0700336
337 send_response.set_message(recv_request.message());
338 response_writer.Finish(send_response, Status::OK, tag(3));
Vijay Paidf8b62c2016-05-02 14:34:24 -0700339 Verifier(GetParam().disable_blocking)
Craig Tiller4c06b822015-08-06 08:41:31 -0700340 .Expect(3, true)
341 .Verify(cq_.get(), std::chrono::system_clock::time_point::max());
vjpai7aadf462015-03-16 23:58:44 -0700342
343 response_reader->Finish(&recv_response, &recv_status, tag(4));
Vijay Paidf8b62c2016-05-02 14:34:24 -0700344 Verifier(GetParam().disable_blocking)
Craig Tiller4c06b822015-08-06 08:41:31 -0700345 .Expect(4, true)
346 .Verify(cq_.get(), std::chrono::system_clock::time_point::max());
vjpai7aadf462015-03-16 23:58:44 -0700347
348 EXPECT_EQ(send_response.message(), recv_response.message());
Yang Gaoc1a2c312015-06-16 10:59:46 -0700349 EXPECT_TRUE(recv_status.ok());
vjpai7aadf462015-03-16 23:58:44 -0700350}
Yang Gao757afae2015-03-17 15:49:26 -0700351
Yang Gao0e0d8e12015-02-13 14:40:41 -0800352// Two pings and a final pong.
Craig Tiller69f90e62015-08-06 08:32:35 -0700353TEST_P(AsyncEnd2endTest, SimpleClientStreaming) {
Yang Gao005f18a2015-02-13 10:22:33 -0800354 ResetStub();
355
356 EchoRequest send_request;
357 EchoRequest recv_request;
358 EchoResponse send_response;
359 EchoResponse recv_response;
360 Status recv_status;
361 ClientContext cli_ctx;
362 ServerContext srv_ctx;
363 ServerAsyncReader<EchoResponse, EchoRequest> srv_stream(&srv_ctx);
364
365 send_request.set_message("Hello");
Sree Kuchibhotla944f4cf2016-01-27 14:37:26 -0800366 std::unique_ptr<ClientAsyncWriter<EchoRequest>> cli_stream(
Craig Tiller06cf3cc2015-05-13 13:11:01 -0700367 stub_->AsyncRequestStream(&cli_ctx, &recv_response, cq_.get(), tag(1)));
Yang Gao005f18a2015-02-13 10:22:33 -0800368
Craig Tillerd6c98df2015-08-18 09:33:44 -0700369 service_.RequestRequestStream(&srv_ctx, &srv_stream, cq_.get(), cq_.get(),
370 tag(2));
Yang Gao005f18a2015-02-13 10:22:33 -0800371
Vijay Paidf8b62c2016-05-02 14:34:24 -0700372 Verifier(GetParam().disable_blocking)
373 .Expect(2, true)
374 .Expect(1, true)
375 .Verify(cq_.get());
Yang Gao005f18a2015-02-13 10:22:33 -0800376
377 cli_stream->Write(send_request, tag(3));
Vijay Paidf8b62c2016-05-02 14:34:24 -0700378 Verifier(GetParam().disable_blocking).Expect(3, true).Verify(cq_.get());
Yang Gao005f18a2015-02-13 10:22:33 -0800379
380 srv_stream.Read(&recv_request, tag(4));
Vijay Paidf8b62c2016-05-02 14:34:24 -0700381 Verifier(GetParam().disable_blocking).Expect(4, true).Verify(cq_.get());
Yang Gao005f18a2015-02-13 10:22:33 -0800382 EXPECT_EQ(send_request.message(), recv_request.message());
383
384 cli_stream->Write(send_request, tag(5));
Vijay Paidf8b62c2016-05-02 14:34:24 -0700385 Verifier(GetParam().disable_blocking).Expect(5, true).Verify(cq_.get());
Yang Gao005f18a2015-02-13 10:22:33 -0800386
387 srv_stream.Read(&recv_request, tag(6));
Vijay Paidf8b62c2016-05-02 14:34:24 -0700388 Verifier(GetParam().disable_blocking).Expect(6, true).Verify(cq_.get());
Yang Gao005f18a2015-02-13 10:22:33 -0800389
390 EXPECT_EQ(send_request.message(), recv_request.message());
391 cli_stream->WritesDone(tag(7));
Vijay Paidf8b62c2016-05-02 14:34:24 -0700392 Verifier(GetParam().disable_blocking).Expect(7, true).Verify(cq_.get());
Yang Gao005f18a2015-02-13 10:22:33 -0800393
394 srv_stream.Read(&recv_request, tag(8));
Vijay Paidf8b62c2016-05-02 14:34:24 -0700395 Verifier(GetParam().disable_blocking).Expect(8, false).Verify(cq_.get());
Yang Gao005f18a2015-02-13 10:22:33 -0800396
397 send_response.set_message(recv_request.message());
398 srv_stream.Finish(send_response, Status::OK, tag(9));
Vijay Paidf8b62c2016-05-02 14:34:24 -0700399 Verifier(GetParam().disable_blocking).Expect(9, true).Verify(cq_.get());
Yang Gao005f18a2015-02-13 10:22:33 -0800400
401 cli_stream->Finish(&recv_status, tag(10));
Vijay Paidf8b62c2016-05-02 14:34:24 -0700402 Verifier(GetParam().disable_blocking).Expect(10, true).Verify(cq_.get());
Yang Gao005f18a2015-02-13 10:22:33 -0800403
404 EXPECT_EQ(send_response.message(), recv_response.message());
Yang Gaoc1a2c312015-06-16 10:59:46 -0700405 EXPECT_TRUE(recv_status.ok());
Yang Gao005f18a2015-02-13 10:22:33 -0800406}
407
Yang Gao0e0d8e12015-02-13 14:40:41 -0800408// One ping, two pongs.
Craig Tiller69f90e62015-08-06 08:32:35 -0700409TEST_P(AsyncEnd2endTest, SimpleServerStreaming) {
Yang Gao0e0d8e12015-02-13 14:40:41 -0800410 ResetStub();
411
412 EchoRequest send_request;
413 EchoRequest recv_request;
414 EchoResponse send_response;
415 EchoResponse recv_response;
416 Status recv_status;
417 ClientContext cli_ctx;
418 ServerContext srv_ctx;
419 ServerAsyncWriter<EchoResponse> srv_stream(&srv_ctx);
420
421 send_request.set_message("Hello");
Sree Kuchibhotla944f4cf2016-01-27 14:37:26 -0800422 std::unique_ptr<ClientAsyncReader<EchoResponse>> cli_stream(
Craig Tiller06cf3cc2015-05-13 13:11:01 -0700423 stub_->AsyncResponseStream(&cli_ctx, send_request, cq_.get(), tag(1)));
Yang Gao0e0d8e12015-02-13 14:40:41 -0800424
Craig Tillerf9e6adf2015-05-06 11:45:59 -0700425 service_.RequestResponseStream(&srv_ctx, &recv_request, &srv_stream,
Craig Tiller06cf3cc2015-05-13 13:11:01 -0700426 cq_.get(), cq_.get(), tag(2));
Yang Gao0e0d8e12015-02-13 14:40:41 -0800427
Vijay Paidf8b62c2016-05-02 14:34:24 -0700428 Verifier(GetParam().disable_blocking)
429 .Expect(1, true)
430 .Expect(2, true)
431 .Verify(cq_.get());
Yang Gao0e0d8e12015-02-13 14:40:41 -0800432 EXPECT_EQ(send_request.message(), recv_request.message());
433
434 send_response.set_message(recv_request.message());
435 srv_stream.Write(send_response, tag(3));
Vijay Paidf8b62c2016-05-02 14:34:24 -0700436 Verifier(GetParam().disable_blocking).Expect(3, true).Verify(cq_.get());
Yang Gao0e0d8e12015-02-13 14:40:41 -0800437
438 cli_stream->Read(&recv_response, tag(4));
Vijay Paidf8b62c2016-05-02 14:34:24 -0700439 Verifier(GetParam().disable_blocking).Expect(4, true).Verify(cq_.get());
Yang Gao0e0d8e12015-02-13 14:40:41 -0800440 EXPECT_EQ(send_response.message(), recv_response.message());
441
442 srv_stream.Write(send_response, tag(5));
Vijay Paidf8b62c2016-05-02 14:34:24 -0700443 Verifier(GetParam().disable_blocking).Expect(5, true).Verify(cq_.get());
Yang Gao0e0d8e12015-02-13 14:40:41 -0800444
445 cli_stream->Read(&recv_response, tag(6));
Vijay Paidf8b62c2016-05-02 14:34:24 -0700446 Verifier(GetParam().disable_blocking).Expect(6, true).Verify(cq_.get());
Yang Gao0e0d8e12015-02-13 14:40:41 -0800447 EXPECT_EQ(send_response.message(), recv_response.message());
448
449 srv_stream.Finish(Status::OK, tag(7));
Vijay Paidf8b62c2016-05-02 14:34:24 -0700450 Verifier(GetParam().disable_blocking).Expect(7, true).Verify(cq_.get());
Yang Gao0e0d8e12015-02-13 14:40:41 -0800451
452 cli_stream->Read(&recv_response, tag(8));
Vijay Paidf8b62c2016-05-02 14:34:24 -0700453 Verifier(GetParam().disable_blocking).Expect(8, false).Verify(cq_.get());
Yang Gao0e0d8e12015-02-13 14:40:41 -0800454
455 cli_stream->Finish(&recv_status, tag(9));
Vijay Paidf8b62c2016-05-02 14:34:24 -0700456 Verifier(GetParam().disable_blocking).Expect(9, true).Verify(cq_.get());
Yang Gao0e0d8e12015-02-13 14:40:41 -0800457
Yang Gaoc1a2c312015-06-16 10:59:46 -0700458 EXPECT_TRUE(recv_status.ok());
Yang Gao0e0d8e12015-02-13 14:40:41 -0800459}
460
461// One ping, one pong.
Craig Tiller69f90e62015-08-06 08:32:35 -0700462TEST_P(AsyncEnd2endTest, SimpleBidiStreaming) {
Yang Gaoc05b6cb2015-02-13 00:34:10 -0800463 ResetStub();
464
465 EchoRequest send_request;
466 EchoRequest recv_request;
467 EchoResponse send_response;
468 EchoResponse recv_response;
469 Status recv_status;
470 ClientContext cli_ctx;
471 ServerContext srv_ctx;
472 ServerAsyncReaderWriter<EchoResponse, EchoRequest> srv_stream(&srv_ctx);
473
474 send_request.set_message("Hello");
Sree Kuchibhotla944f4cf2016-01-27 14:37:26 -0800475 std::unique_ptr<ClientAsyncReaderWriter<EchoRequest, EchoResponse>>
Craig Tiller06cf3cc2015-05-13 13:11:01 -0700476 cli_stream(stub_->AsyncBidiStream(&cli_ctx, cq_.get(), tag(1)));
Yang Gaoc05b6cb2015-02-13 00:34:10 -0800477
Craig Tillerd6c98df2015-08-18 09:33:44 -0700478 service_.RequestBidiStream(&srv_ctx, &srv_stream, cq_.get(), cq_.get(),
479 tag(2));
Yang Gaoc05b6cb2015-02-13 00:34:10 -0800480
Vijay Paidf8b62c2016-05-02 14:34:24 -0700481 Verifier(GetParam().disable_blocking)
482 .Expect(1, true)
483 .Expect(2, true)
484 .Verify(cq_.get());
Yang Gaoc05b6cb2015-02-13 00:34:10 -0800485
486 cli_stream->Write(send_request, tag(3));
Vijay Paidf8b62c2016-05-02 14:34:24 -0700487 Verifier(GetParam().disable_blocking).Expect(3, true).Verify(cq_.get());
Yang Gaoc05b6cb2015-02-13 00:34:10 -0800488
489 srv_stream.Read(&recv_request, tag(4));
Vijay Paidf8b62c2016-05-02 14:34:24 -0700490 Verifier(GetParam().disable_blocking).Expect(4, true).Verify(cq_.get());
Yang Gaoc05b6cb2015-02-13 00:34:10 -0800491 EXPECT_EQ(send_request.message(), recv_request.message());
492
493 send_response.set_message(recv_request.message());
494 srv_stream.Write(send_response, tag(5));
Vijay Paidf8b62c2016-05-02 14:34:24 -0700495 Verifier(GetParam().disable_blocking).Expect(5, true).Verify(cq_.get());
Yang Gaoc05b6cb2015-02-13 00:34:10 -0800496
497 cli_stream->Read(&recv_response, tag(6));
Vijay Paidf8b62c2016-05-02 14:34:24 -0700498 Verifier(GetParam().disable_blocking).Expect(6, true).Verify(cq_.get());
Yang Gaoc05b6cb2015-02-13 00:34:10 -0800499 EXPECT_EQ(send_response.message(), recv_response.message());
500
501 cli_stream->WritesDone(tag(7));
Vijay Paidf8b62c2016-05-02 14:34:24 -0700502 Verifier(GetParam().disable_blocking).Expect(7, true).Verify(cq_.get());
Yang Gaoc05b6cb2015-02-13 00:34:10 -0800503
504 srv_stream.Read(&recv_request, tag(8));
Vijay Paidf8b62c2016-05-02 14:34:24 -0700505 Verifier(GetParam().disable_blocking).Expect(8, false).Verify(cq_.get());
Yang Gaoc05b6cb2015-02-13 00:34:10 -0800506
507 srv_stream.Finish(Status::OK, tag(9));
Vijay Paidf8b62c2016-05-02 14:34:24 -0700508 Verifier(GetParam().disable_blocking).Expect(9, true).Verify(cq_.get());
Yang Gaoc05b6cb2015-02-13 00:34:10 -0800509
510 cli_stream->Finish(&recv_status, tag(10));
Vijay Paidf8b62c2016-05-02 14:34:24 -0700511 Verifier(GetParam().disable_blocking).Expect(10, true).Verify(cq_.get());
Yang Gaoc05b6cb2015-02-13 00:34:10 -0800512
Yang Gaoc1a2c312015-06-16 10:59:46 -0700513 EXPECT_TRUE(recv_status.ok());
Yang Gaoc05b6cb2015-02-13 00:34:10 -0800514}
515
Yang Gao406b32f2015-02-13 16:25:33 -0800516// Metadata tests
Craig Tiller69f90e62015-08-06 08:32:35 -0700517TEST_P(AsyncEnd2endTest, ClientInitialMetadataRpc) {
Yang Gao406b32f2015-02-13 16:25:33 -0800518 ResetStub();
519
520 EchoRequest send_request;
521 EchoRequest recv_request;
522 EchoResponse send_response;
523 EchoResponse recv_response;
524 Status recv_status;
525
526 ClientContext cli_ctx;
527 ServerContext srv_ctx;
528 grpc::ServerAsyncResponseWriter<EchoResponse> response_writer(&srv_ctx);
529
530 send_request.set_message("Hello");
531 std::pair<grpc::string, grpc::string> meta1("key1", "val1");
532 std::pair<grpc::string, grpc::string> meta2("key2", "val2");
Craig Tiller6f871642016-02-03 16:15:31 -0800533 std::pair<grpc::string, grpc::string> meta3("g.r.d-bin", "xyz");
Yang Gao406b32f2015-02-13 16:25:33 -0800534 cli_ctx.AddMetadata(meta1.first, meta1.second);
535 cli_ctx.AddMetadata(meta2.first, meta2.second);
Craig Tiller6f871642016-02-03 16:15:31 -0800536 cli_ctx.AddMetadata(meta3.first, meta3.second);
Yang Gao406b32f2015-02-13 16:25:33 -0800537
Sree Kuchibhotla944f4cf2016-01-27 14:37:26 -0800538 std::unique_ptr<ClientAsyncResponseReader<EchoResponse>> response_reader(
Craig Tiller06cf3cc2015-05-13 13:11:01 -0700539 stub_->AsyncEcho(&cli_ctx, send_request, cq_.get()));
Yang Gao406b32f2015-02-13 16:25:33 -0800540
Craig Tiller06cf3cc2015-05-13 13:11:01 -0700541 service_.RequestEcho(&srv_ctx, &recv_request, &response_writer, cq_.get(),
542 cq_.get(), tag(2));
Vijay Paidf8b62c2016-05-02 14:34:24 -0700543 Verifier(GetParam().disable_blocking).Expect(2, true).Verify(cq_.get());
Yang Gao406b32f2015-02-13 16:25:33 -0800544 EXPECT_EQ(send_request.message(), recv_request.message());
545 auto client_initial_metadata = srv_ctx.client_metadata();
yang-ge21908f2015-08-25 13:47:51 -0700546 EXPECT_EQ(meta1.second,
547 ToString(client_initial_metadata.find(meta1.first)->second));
548 EXPECT_EQ(meta2.second,
549 ToString(client_initial_metadata.find(meta2.first)->second));
Craig Tiller6f871642016-02-03 16:15:31 -0800550 EXPECT_EQ(meta3.second,
551 ToString(client_initial_metadata.find(meta3.first)->second));
Craig Tiller8bf2dca2015-07-10 13:08:41 -0700552 EXPECT_GE(client_initial_metadata.size(), static_cast<size_t>(2));
Yang Gao406b32f2015-02-13 16:25:33 -0800553
554 send_response.set_message(recv_request.message());
555 response_writer.Finish(send_response, Status::OK, tag(3));
556
Vijay Paidf8b62c2016-05-02 14:34:24 -0700557 Verifier(GetParam().disable_blocking).Expect(3, true).Verify(cq_.get());
Yang Gao406b32f2015-02-13 16:25:33 -0800558
Yang Gao3a5e5492015-02-18 14:32:38 -0800559 response_reader->Finish(&recv_response, &recv_status, tag(4));
Vijay Paidf8b62c2016-05-02 14:34:24 -0700560 Verifier(GetParam().disable_blocking).Expect(4, true).Verify(cq_.get());
Yang Gao406b32f2015-02-13 16:25:33 -0800561
562 EXPECT_EQ(send_response.message(), recv_response.message());
Yang Gaoc1a2c312015-06-16 10:59:46 -0700563 EXPECT_TRUE(recv_status.ok());
Yang Gao406b32f2015-02-13 16:25:33 -0800564}
565
Craig Tiller69f90e62015-08-06 08:32:35 -0700566TEST_P(AsyncEnd2endTest, ServerInitialMetadataRpc) {
Yang Gao2b7f5372015-02-18 00:45:53 -0800567 ResetStub();
568
569 EchoRequest send_request;
570 EchoRequest recv_request;
571 EchoResponse send_response;
572 EchoResponse recv_response;
573 Status recv_status;
574
575 ClientContext cli_ctx;
576 ServerContext srv_ctx;
577 grpc::ServerAsyncResponseWriter<EchoResponse> response_writer(&srv_ctx);
578
579 send_request.set_message("Hello");
580 std::pair<grpc::string, grpc::string> meta1("key1", "val1");
581 std::pair<grpc::string, grpc::string> meta2("key2", "val2");
582
Sree Kuchibhotla944f4cf2016-01-27 14:37:26 -0800583 std::unique_ptr<ClientAsyncResponseReader<EchoResponse>> response_reader(
Craig Tiller06cf3cc2015-05-13 13:11:01 -0700584 stub_->AsyncEcho(&cli_ctx, send_request, cq_.get()));
Yang Gao2b7f5372015-02-18 00:45:53 -0800585
Craig Tiller06cf3cc2015-05-13 13:11:01 -0700586 service_.RequestEcho(&srv_ctx, &recv_request, &response_writer, cq_.get(),
587 cq_.get(), tag(2));
Vijay Paidf8b62c2016-05-02 14:34:24 -0700588 Verifier(GetParam().disable_blocking).Expect(2, true).Verify(cq_.get());
Yang Gao2b7f5372015-02-18 00:45:53 -0800589 EXPECT_EQ(send_request.message(), recv_request.message());
590 srv_ctx.AddInitialMetadata(meta1.first, meta1.second);
591 srv_ctx.AddInitialMetadata(meta2.first, meta2.second);
592 response_writer.SendInitialMetadata(tag(3));
Vijay Paidf8b62c2016-05-02 14:34:24 -0700593 Verifier(GetParam().disable_blocking).Expect(3, true).Verify(cq_.get());
Yang Gao2b7f5372015-02-18 00:45:53 -0800594
Yang Gao3a5e5492015-02-18 14:32:38 -0800595 response_reader->ReadInitialMetadata(tag(4));
Vijay Paidf8b62c2016-05-02 14:34:24 -0700596 Verifier(GetParam().disable_blocking).Expect(4, true).Verify(cq_.get());
Yang Gao2b7f5372015-02-18 00:45:53 -0800597 auto server_initial_metadata = cli_ctx.GetServerInitialMetadata();
yang-ge21908f2015-08-25 13:47:51 -0700598 EXPECT_EQ(meta1.second,
599 ToString(server_initial_metadata.find(meta1.first)->second));
600 EXPECT_EQ(meta2.second,
601 ToString(server_initial_metadata.find(meta2.first)->second));
vjpaid5577aa2015-02-18 22:26:48 -0800602 EXPECT_EQ(static_cast<size_t>(2), server_initial_metadata.size());
Yang Gao3a5e5492015-02-18 14:32:38 -0800603
604 send_response.set_message(recv_request.message());
605 response_writer.Finish(send_response, Status::OK, tag(5));
Vijay Paidf8b62c2016-05-02 14:34:24 -0700606 Verifier(GetParam().disable_blocking).Expect(5, true).Verify(cq_.get());
Yang Gao3a5e5492015-02-18 14:32:38 -0800607
608 response_reader->Finish(&recv_response, &recv_status, tag(6));
Vijay Paidf8b62c2016-05-02 14:34:24 -0700609 Verifier(GetParam().disable_blocking).Expect(6, true).Verify(cq_.get());
Yang Gao3a5e5492015-02-18 14:32:38 -0800610
611 EXPECT_EQ(send_response.message(), recv_response.message());
Yang Gaoc1a2c312015-06-16 10:59:46 -0700612 EXPECT_TRUE(recv_status.ok());
Yang Gao2b7f5372015-02-18 00:45:53 -0800613}
614
Craig Tiller69f90e62015-08-06 08:32:35 -0700615TEST_P(AsyncEnd2endTest, ServerTrailingMetadataRpc) {
Yang Gao2b7f5372015-02-18 00:45:53 -0800616 ResetStub();
617
618 EchoRequest send_request;
619 EchoRequest recv_request;
620 EchoResponse send_response;
621 EchoResponse recv_response;
622 Status recv_status;
623
624 ClientContext cli_ctx;
625 ServerContext srv_ctx;
626 grpc::ServerAsyncResponseWriter<EchoResponse> response_writer(&srv_ctx);
627
628 send_request.set_message("Hello");
629 std::pair<grpc::string, grpc::string> meta1("key1", "val1");
630 std::pair<grpc::string, grpc::string> meta2("key2", "val2");
631
Sree Kuchibhotla944f4cf2016-01-27 14:37:26 -0800632 std::unique_ptr<ClientAsyncResponseReader<EchoResponse>> response_reader(
Craig Tiller06cf3cc2015-05-13 13:11:01 -0700633 stub_->AsyncEcho(&cli_ctx, send_request, cq_.get()));
Yang Gao2b7f5372015-02-18 00:45:53 -0800634
Craig Tiller06cf3cc2015-05-13 13:11:01 -0700635 service_.RequestEcho(&srv_ctx, &recv_request, &response_writer, cq_.get(),
636 cq_.get(), tag(2));
Vijay Paidf8b62c2016-05-02 14:34:24 -0700637 Verifier(GetParam().disable_blocking).Expect(2, true).Verify(cq_.get());
Yang Gao2b7f5372015-02-18 00:45:53 -0800638 EXPECT_EQ(send_request.message(), recv_request.message());
639 response_writer.SendInitialMetadata(tag(3));
Vijay Paidf8b62c2016-05-02 14:34:24 -0700640 Verifier(GetParam().disable_blocking).Expect(3, true).Verify(cq_.get());
Yang Gao2b7f5372015-02-18 00:45:53 -0800641
642 send_response.set_message(recv_request.message());
643 srv_ctx.AddTrailingMetadata(meta1.first, meta1.second);
644 srv_ctx.AddTrailingMetadata(meta2.first, meta2.second);
645 response_writer.Finish(send_response, Status::OK, tag(4));
646
Vijay Paidf8b62c2016-05-02 14:34:24 -0700647 Verifier(GetParam().disable_blocking).Expect(4, true).Verify(cq_.get());
Yang Gao2b7f5372015-02-18 00:45:53 -0800648
Yang Gao3a5e5492015-02-18 14:32:38 -0800649 response_reader->Finish(&recv_response, &recv_status, tag(5));
Vijay Paidf8b62c2016-05-02 14:34:24 -0700650 Verifier(GetParam().disable_blocking).Expect(5, true).Verify(cq_.get());
Yang Gao2b7f5372015-02-18 00:45:53 -0800651 EXPECT_EQ(send_response.message(), recv_response.message());
Yang Gaoc1a2c312015-06-16 10:59:46 -0700652 EXPECT_TRUE(recv_status.ok());
Yang Gao2b7f5372015-02-18 00:45:53 -0800653 auto server_trailing_metadata = cli_ctx.GetServerTrailingMetadata();
yang-ge21908f2015-08-25 13:47:51 -0700654 EXPECT_EQ(meta1.second,
655 ToString(server_trailing_metadata.find(meta1.first)->second));
656 EXPECT_EQ(meta2.second,
657 ToString(server_trailing_metadata.find(meta2.first)->second));
vjpaid5577aa2015-02-18 22:26:48 -0800658 EXPECT_EQ(static_cast<size_t>(2), server_trailing_metadata.size());
Yang Gao2b7f5372015-02-18 00:45:53 -0800659}
660
Craig Tiller69f90e62015-08-06 08:32:35 -0700661TEST_P(AsyncEnd2endTest, MetadataRpc) {
Yang Gao2b7f5372015-02-18 00:45:53 -0800662 ResetStub();
663
664 EchoRequest send_request;
665 EchoRequest recv_request;
666 EchoResponse send_response;
667 EchoResponse recv_response;
668 Status recv_status;
669
670 ClientContext cli_ctx;
671 ServerContext srv_ctx;
672 grpc::ServerAsyncResponseWriter<EchoResponse> response_writer(&srv_ctx);
673
674 send_request.set_message("Hello");
675 std::pair<grpc::string, grpc::string> meta1("key1", "val1");
Yang Gao3a5e5492015-02-18 14:32:38 -0800676 std::pair<grpc::string, grpc::string> meta2(
Vijay Pai92a928f2015-03-26 16:30:22 -0400677 "key2-bin",
Craig Tillerd6c98df2015-08-18 09:33:44 -0700678 grpc::string("\xc0\xc1\xc2\xc3\xc4\xc5\xc6\xc7\xc8\xc9\xca\xcb\xcc", 13));
Yang Gao2b7f5372015-02-18 00:45:53 -0800679 std::pair<grpc::string, grpc::string> meta3("key3", "val3");
Craig Tiller47c83fd2015-02-21 22:45:35 -0800680 std::pair<grpc::string, grpc::string> meta6(
681 "key4-bin",
Vijay Pai92a928f2015-03-26 16:30:22 -0400682 grpc::string("\x10\x11\x12\x13\x14\x15\x16\x17\x18\x19\x1a\x1b\x1c\x1d",
Craig Tillerd6c98df2015-08-18 09:33:44 -0700683 14));
Yang Gao2b7f5372015-02-18 00:45:53 -0800684 std::pair<grpc::string, grpc::string> meta5("key5", "val5");
Craig Tiller47c83fd2015-02-21 22:45:35 -0800685 std::pair<grpc::string, grpc::string> meta4(
686 "key6-bin",
Craig Tillerd6c98df2015-08-18 09:33:44 -0700687 grpc::string(
688 "\xe0\xe1\xe2\xe3\xe4\xe5\xe6\xe7\xe8\xe9\xea\xeb\xec\xed\xee", 15));
Yang Gao2b7f5372015-02-18 00:45:53 -0800689
690 cli_ctx.AddMetadata(meta1.first, meta1.second);
691 cli_ctx.AddMetadata(meta2.first, meta2.second);
692
Sree Kuchibhotla944f4cf2016-01-27 14:37:26 -0800693 std::unique_ptr<ClientAsyncResponseReader<EchoResponse>> response_reader(
Craig Tiller06cf3cc2015-05-13 13:11:01 -0700694 stub_->AsyncEcho(&cli_ctx, send_request, cq_.get()));
Yang Gao2b7f5372015-02-18 00:45:53 -0800695
Craig Tiller06cf3cc2015-05-13 13:11:01 -0700696 service_.RequestEcho(&srv_ctx, &recv_request, &response_writer, cq_.get(),
697 cq_.get(), tag(2));
Vijay Paidf8b62c2016-05-02 14:34:24 -0700698 Verifier(GetParam().disable_blocking).Expect(2, true).Verify(cq_.get());
Yang Gao2b7f5372015-02-18 00:45:53 -0800699 EXPECT_EQ(send_request.message(), recv_request.message());
700 auto client_initial_metadata = srv_ctx.client_metadata();
yang-ge21908f2015-08-25 13:47:51 -0700701 EXPECT_EQ(meta1.second,
702 ToString(client_initial_metadata.find(meta1.first)->second));
703 EXPECT_EQ(meta2.second,
704 ToString(client_initial_metadata.find(meta2.first)->second));
Craig Tiller8bf2dca2015-07-10 13:08:41 -0700705 EXPECT_GE(client_initial_metadata.size(), static_cast<size_t>(2));
Yang Gao2b7f5372015-02-18 00:45:53 -0800706
707 srv_ctx.AddInitialMetadata(meta3.first, meta3.second);
708 srv_ctx.AddInitialMetadata(meta4.first, meta4.second);
709 response_writer.SendInitialMetadata(tag(3));
Vijay Paidf8b62c2016-05-02 14:34:24 -0700710 Verifier(GetParam().disable_blocking).Expect(3, true).Verify(cq_.get());
Yang Gao3a5e5492015-02-18 14:32:38 -0800711 response_reader->ReadInitialMetadata(tag(4));
Vijay Paidf8b62c2016-05-02 14:34:24 -0700712 Verifier(GetParam().disable_blocking).Expect(4, true).Verify(cq_.get());
Yang Gao2b7f5372015-02-18 00:45:53 -0800713 auto server_initial_metadata = cli_ctx.GetServerInitialMetadata();
yang-ge21908f2015-08-25 13:47:51 -0700714 EXPECT_EQ(meta3.second,
715 ToString(server_initial_metadata.find(meta3.first)->second));
716 EXPECT_EQ(meta4.second,
717 ToString(server_initial_metadata.find(meta4.first)->second));
Craig Tiller8bf2dca2015-07-10 13:08:41 -0700718 EXPECT_GE(server_initial_metadata.size(), static_cast<size_t>(2));
Yang Gao3a5e5492015-02-18 14:32:38 -0800719
720 send_response.set_message(recv_request.message());
721 srv_ctx.AddTrailingMetadata(meta5.first, meta5.second);
722 srv_ctx.AddTrailingMetadata(meta6.first, meta6.second);
723 response_writer.Finish(send_response, Status::OK, tag(5));
724
Vijay Paidf8b62c2016-05-02 14:34:24 -0700725 Verifier(GetParam().disable_blocking).Expect(5, true).Verify(cq_.get());
Yang Gao3a5e5492015-02-18 14:32:38 -0800726
Yang Gao3a5e5492015-02-18 14:32:38 -0800727 response_reader->Finish(&recv_response, &recv_status, tag(6));
Vijay Paidf8b62c2016-05-02 14:34:24 -0700728 Verifier(GetParam().disable_blocking).Expect(6, true).Verify(cq_.get());
Yang Gao3a5e5492015-02-18 14:32:38 -0800729 EXPECT_EQ(send_response.message(), recv_response.message());
Yang Gaoc1a2c312015-06-16 10:59:46 -0700730 EXPECT_TRUE(recv_status.ok());
Yang Gao2b7f5372015-02-18 00:45:53 -0800731 auto server_trailing_metadata = cli_ctx.GetServerTrailingMetadata();
yang-ge21908f2015-08-25 13:47:51 -0700732 EXPECT_EQ(meta5.second,
733 ToString(server_trailing_metadata.find(meta5.first)->second));
734 EXPECT_EQ(meta6.second,
735 ToString(server_trailing_metadata.find(meta6.first)->second));
Craig Tiller8bf2dca2015-07-10 13:08:41 -0700736 EXPECT_GE(server_trailing_metadata.size(), static_cast<size_t>(2));
Yang Gao2b7f5372015-02-18 00:45:53 -0800737}
yang-gb3352562015-08-04 14:42:06 -0700738
739// Server uses AsyncNotifyWhenDone API to check for cancellation
Craig Tiller69f90e62015-08-06 08:32:35 -0700740TEST_P(AsyncEnd2endTest, ServerCheckCancellation) {
yang-gb3352562015-08-04 14:42:06 -0700741 ResetStub();
742
743 EchoRequest send_request;
744 EchoRequest recv_request;
745 EchoResponse send_response;
746 EchoResponse recv_response;
747 Status recv_status;
748
749 ClientContext cli_ctx;
750 ServerContext srv_ctx;
751 grpc::ServerAsyncResponseWriter<EchoResponse> response_writer(&srv_ctx);
752
753 send_request.set_message("Hello");
Sree Kuchibhotla944f4cf2016-01-27 14:37:26 -0800754 std::unique_ptr<ClientAsyncResponseReader<EchoResponse>> response_reader(
yang-gb3352562015-08-04 14:42:06 -0700755 stub_->AsyncEcho(&cli_ctx, send_request, cq_.get()));
756
757 srv_ctx.AsyncNotifyWhenDone(tag(5));
758 service_.RequestEcho(&srv_ctx, &recv_request, &response_writer, cq_.get(),
759 cq_.get(), tag(2));
760
Vijay Paidf8b62c2016-05-02 14:34:24 -0700761 Verifier(GetParam().disable_blocking).Expect(2, true).Verify(cq_.get());
yang-gb3352562015-08-04 14:42:06 -0700762 EXPECT_EQ(send_request.message(), recv_request.message());
763
764 cli_ctx.TryCancel();
Vijay Paidf8b62c2016-05-02 14:34:24 -0700765 Verifier(GetParam().disable_blocking).Expect(5, true).Verify(cq_.get());
yang-gb3352562015-08-04 14:42:06 -0700766 EXPECT_TRUE(srv_ctx.IsCancelled());
767
768 response_reader->Finish(&recv_response, &recv_status, tag(4));
Vijay Paidf8b62c2016-05-02 14:34:24 -0700769 Verifier(GetParam().disable_blocking).Expect(4, false).Verify(cq_.get());
yang-gb3352562015-08-04 14:42:06 -0700770
771 EXPECT_EQ(StatusCode::CANCELLED, recv_status.error_code());
772}
773
774// Server uses AsyncNotifyWhenDone API to check for normal finish
Craig Tiller69f90e62015-08-06 08:32:35 -0700775TEST_P(AsyncEnd2endTest, ServerCheckDone) {
yang-gb3352562015-08-04 14:42:06 -0700776 ResetStub();
777
778 EchoRequest send_request;
779 EchoRequest recv_request;
780 EchoResponse send_response;
781 EchoResponse recv_response;
782 Status recv_status;
783
784 ClientContext cli_ctx;
785 ServerContext srv_ctx;
786 grpc::ServerAsyncResponseWriter<EchoResponse> response_writer(&srv_ctx);
787
788 send_request.set_message("Hello");
Sree Kuchibhotla944f4cf2016-01-27 14:37:26 -0800789 std::unique_ptr<ClientAsyncResponseReader<EchoResponse>> response_reader(
yang-gb3352562015-08-04 14:42:06 -0700790 stub_->AsyncEcho(&cli_ctx, send_request, cq_.get()));
791
792 srv_ctx.AsyncNotifyWhenDone(tag(5));
793 service_.RequestEcho(&srv_ctx, &recv_request, &response_writer, cq_.get(),
794 cq_.get(), tag(2));
795
Vijay Paidf8b62c2016-05-02 14:34:24 -0700796 Verifier(GetParam().disable_blocking).Expect(2, true).Verify(cq_.get());
yang-gb3352562015-08-04 14:42:06 -0700797 EXPECT_EQ(send_request.message(), recv_request.message());
798
799 send_response.set_message(recv_request.message());
800 response_writer.Finish(send_response, Status::OK, tag(3));
Vijay Paidf8b62c2016-05-02 14:34:24 -0700801 Verifier(GetParam().disable_blocking).Expect(3, true).Verify(cq_.get());
802 Verifier(GetParam().disable_blocking).Expect(5, true).Verify(cq_.get());
yang-gb3352562015-08-04 14:42:06 -0700803 EXPECT_FALSE(srv_ctx.IsCancelled());
804
805 response_reader->Finish(&recv_response, &recv_status, tag(4));
Vijay Paidf8b62c2016-05-02 14:34:24 -0700806 Verifier(GetParam().disable_blocking).Expect(4, true).Verify(cq_.get());
yang-gb3352562015-08-04 14:42:06 -0700807
808 EXPECT_EQ(send_response.message(), recv_response.message());
809 EXPECT_TRUE(recv_status.ok());
810}
811
Craig Tiller8f7bff72015-08-17 13:23:14 -0700812TEST_P(AsyncEnd2endTest, UnimplementedRpc) {
Vijay Paidf8b62c2016-05-02 14:34:24 -0700813 ChannelArguments args;
814 auto channel_creds =
815 GetChannelCredentials(GetParam().credentials_type, &args);
yang-g730055d2015-08-27 12:29:45 -0700816 std::shared_ptr<Channel> channel =
Vijay Paidf8b62c2016-05-02 14:34:24 -0700817 CreateCustomChannel(server_address_.str(), channel_creds, args);
Craig Tiller1b4e3302015-12-17 16:35:00 -0800818 std::unique_ptr<grpc::testing::UnimplementedService::Stub> stub;
819 stub = grpc::testing::UnimplementedService::NewStub(channel);
yang-g9b7757d2015-08-13 11:15:53 -0700820 EchoRequest send_request;
821 EchoResponse recv_response;
822 Status recv_status;
823
824 ClientContext cli_ctx;
825 send_request.set_message("Hello");
Sree Kuchibhotla944f4cf2016-01-27 14:37:26 -0800826 std::unique_ptr<ClientAsyncResponseReader<EchoResponse>> response_reader(
yang-g9b7757d2015-08-13 11:15:53 -0700827 stub->AsyncUnimplemented(&cli_ctx, send_request, cq_.get()));
828
829 response_reader->Finish(&recv_response, &recv_status, tag(4));
Vijay Paidf8b62c2016-05-02 14:34:24 -0700830 Verifier(GetParam().disable_blocking).Expect(4, false).Verify(cq_.get());
yang-g9b7757d2015-08-13 11:15:53 -0700831
832 EXPECT_EQ(StatusCode::UNIMPLEMENTED, recv_status.error_code());
833 EXPECT_EQ("", recv_status.error_message());
834}
835
Sree Kuchibhotla0f242ac2016-01-29 18:12:19 -0800836// This class is for testing scenarios where RPCs are cancelled on the server
Sree Kuchibhotlab0d15672016-03-07 10:51:02 -0800837// by calling ServerContext::TryCancel(). Server uses AsyncNotifyWhenDone
838// API to check for cancellation
Sree Kuchibhotla944f4cf2016-01-27 14:37:26 -0800839class AsyncEnd2endServerTryCancelTest : public AsyncEnd2endTest {
840 protected:
841 typedef enum {
842 DO_NOT_CANCEL = 0,
843 CANCEL_BEFORE_PROCESSING,
844 CANCEL_DURING_PROCESSING,
845 CANCEL_AFTER_PROCESSING
846 } ServerTryCancelRequestPhase;
847
Sree Kuchibhotla0f242ac2016-01-29 18:12:19 -0800848 // Helper for testing client-streaming RPCs which are cancelled on the server.
849 // Depending on the value of server_try_cancel parameter, this will test one
850 // of the following three scenarios:
851 // CANCEL_BEFORE_PROCESSING: Rpc is cancelled by the server before reading
852 // any messages from the client
853 //
854 // CANCEL_DURING_PROCESSING: Rpc is cancelled by the server while reading
855 // messages from the client
856 //
857 // CANCEL_AFTER PROCESSING: Rpc is cancelled by server after reading all
858 // messages from the client (but before sending any status back to the
859 // client)
Sree Kuchibhotla944f4cf2016-01-27 14:37:26 -0800860 void TestClientStreamingServerCancel(
861 ServerTryCancelRequestPhase server_try_cancel) {
862 ResetStub();
863
864 EchoRequest send_request;
865 EchoRequest recv_request;
866 EchoResponse send_response;
867 EchoResponse recv_response;
868 Status recv_status;
869
Sree Kuchibhotla944f4cf2016-01-27 14:37:26 -0800870 ClientContext cli_ctx;
871 ServerContext srv_ctx;
872 ServerAsyncReader<EchoResponse, EchoRequest> srv_stream(&srv_ctx);
873
874 // Initiate the 'RequestStream' call on client
875 std::unique_ptr<ClientAsyncWriter<EchoRequest>> cli_stream(
Sree Kuchibhotla4fb59082016-01-29 11:16:24 -0800876 stub_->AsyncRequestStream(&cli_ctx, &recv_response, cq_.get(), tag(1)));
Vijay Paidf8b62c2016-05-02 14:34:24 -0700877 Verifier(GetParam().disable_blocking).Expect(1, true).Verify(cq_.get());
Sree Kuchibhotla944f4cf2016-01-27 14:37:26 -0800878
879 // On the server, request to be notified of 'RequestStream' calls
880 // and receive the 'RequestStream' call just made by the client
Sree Kuchibhotlab0d15672016-03-07 10:51:02 -0800881 srv_ctx.AsyncNotifyWhenDone(tag(11));
Sree Kuchibhotla944f4cf2016-01-27 14:37:26 -0800882 service_.RequestRequestStream(&srv_ctx, &srv_stream, cq_.get(), cq_.get(),
883 tag(2));
Vijay Paidf8b62c2016-05-02 14:34:24 -0700884 Verifier(GetParam().disable_blocking).Expect(2, true).Verify(cq_.get());
Sree Kuchibhotla944f4cf2016-01-27 14:37:26 -0800885
886 // Client sends 3 messages (tags 3, 4 and 5)
887 for (int tag_idx = 3; tag_idx <= 5; tag_idx++) {
888 send_request.set_message("Ping " + std::to_string(tag_idx));
889 cli_stream->Write(send_request, tag(tag_idx));
Vijay Paidf8b62c2016-05-02 14:34:24 -0700890 Verifier(GetParam().disable_blocking)
891 .Expect(tag_idx, true)
892 .Verify(cq_.get());
Sree Kuchibhotla944f4cf2016-01-27 14:37:26 -0800893 }
894 cli_stream->WritesDone(tag(6));
Vijay Paidf8b62c2016-05-02 14:34:24 -0700895 Verifier(GetParam().disable_blocking).Expect(6, true).Verify(cq_.get());
Sree Kuchibhotla944f4cf2016-01-27 14:37:26 -0800896
897 bool expected_server_cq_result = true;
898 bool ignore_cq_result = false;
Sree Kuchibhotlab0d15672016-03-07 10:51:02 -0800899 bool want_done_tag = false;
Sree Kuchibhotla944f4cf2016-01-27 14:37:26 -0800900
901 if (server_try_cancel == CANCEL_BEFORE_PROCESSING) {
Sree Kuchibhotlab0d15672016-03-07 10:51:02 -0800902 srv_ctx.TryCancel();
Vijay Paidf8b62c2016-05-02 14:34:24 -0700903 Verifier(GetParam().disable_blocking).Expect(11, true).Verify(cq_.get());
Sree Kuchibhotlab0d15672016-03-07 10:51:02 -0800904 EXPECT_TRUE(srv_ctx.IsCancelled());
Sree Kuchibhotla944f4cf2016-01-27 14:37:26 -0800905
906 // Since cancellation is done before server reads any results, we know
907 // for sure that all cq results will return false from this point forward
908 expected_server_cq_result = false;
909 }
910
911 std::thread* server_try_cancel_thd = NULL;
Sree Kuchibhotlab0d15672016-03-07 10:51:02 -0800912
Vijay Paidf8b62c2016-05-02 14:34:24 -0700913 auto verif = Verifier(GetParam().disable_blocking);
Sree Kuchibhotlab0d15672016-03-07 10:51:02 -0800914
Sree Kuchibhotla944f4cf2016-01-27 14:37:26 -0800915 if (server_try_cancel == CANCEL_DURING_PROCESSING) {
Sree Kuchibhotlab0d15672016-03-07 10:51:02 -0800916 server_try_cancel_thd =
917 new std::thread(&ServerContext::TryCancel, &srv_ctx);
Sree Kuchibhotla944f4cf2016-01-27 14:37:26 -0800918 // Server will cancel the RPC in a parallel thread while reading the
919 // requests from the client. Since the cancellation can happen at anytime,
920 // some of the cq results (i.e those until cancellation) might be true but
921 // its non deterministic. So better to ignore the cq results
922 ignore_cq_result = true;
Sree Kuchibhotlab0d15672016-03-07 10:51:02 -0800923 // Expect that we might possibly see the done tag that
924 // indicates cancellation completion in this case
925 want_done_tag = true;
926 verif.Expect(11, true);
Sree Kuchibhotla944f4cf2016-01-27 14:37:26 -0800927 }
928
929 // Server reads 3 messages (tags 6, 7 and 8)
Sree Kuchibhotlab0d15672016-03-07 10:51:02 -0800930 // But if want_done_tag is true, we might also see tag 11
Sree Kuchibhotla944f4cf2016-01-27 14:37:26 -0800931 for (int tag_idx = 6; tag_idx <= 8; tag_idx++) {
932 srv_stream.Read(&recv_request, tag(tag_idx));
Sree Kuchibhotlab0d15672016-03-07 10:51:02 -0800933 // Note that we'll add something to the verifier and verify that
934 // something was seen, but it might be tag 11 and not what we
935 // just added
936 int got_tag = verif.Expect(tag_idx, expected_server_cq_result)
937 .Next(cq_.get(), ignore_cq_result);
938 GPR_ASSERT((got_tag == tag_idx) || (got_tag == 11 && want_done_tag));
939 if (got_tag == 11) {
940 EXPECT_TRUE(srv_ctx.IsCancelled());
941 want_done_tag = false;
942 // Now get the other entry that we were waiting on
943 EXPECT_EQ(verif.Next(cq_.get(), ignore_cq_result), tag_idx);
944 }
Sree Kuchibhotla944f4cf2016-01-27 14:37:26 -0800945 }
946
947 if (server_try_cancel_thd != NULL) {
948 server_try_cancel_thd->join();
949 delete server_try_cancel_thd;
950 }
951
952 if (server_try_cancel == CANCEL_AFTER_PROCESSING) {
Sree Kuchibhotlab0d15672016-03-07 10:51:02 -0800953 srv_ctx.TryCancel();
954 want_done_tag = true;
955 verif.Expect(11, true);
956 }
957
958 if (want_done_tag) {
959 verif.Verify(cq_.get());
960 EXPECT_TRUE(srv_ctx.IsCancelled());
961 want_done_tag = false;
Sree Kuchibhotla944f4cf2016-01-27 14:37:26 -0800962 }
963
Sree Kuchibhotla0f242ac2016-01-29 18:12:19 -0800964 // The RPC has been cancelled at this point for sure (i.e irrespective of
965 // the value of `server_try_cancel` is). So, from this point forward, we
966 // know that cq results are supposed to return false on server.
Sree Kuchibhotla944f4cf2016-01-27 14:37:26 -0800967
Sree Kuchibhotla0f242ac2016-01-29 18:12:19 -0800968 // Server sends the final message and cancelled status (but the RPC is
969 // already cancelled at this point. So we expect the operation to fail)
Sree Kuchibhotla944f4cf2016-01-27 14:37:26 -0800970 srv_stream.Finish(send_response, Status::CANCELLED, tag(9));
Vijay Paidf8b62c2016-05-02 14:34:24 -0700971 Verifier(GetParam().disable_blocking).Expect(9, false).Verify(cq_.get());
Sree Kuchibhotla944f4cf2016-01-27 14:37:26 -0800972
Sree Kuchibhotla0f242ac2016-01-29 18:12:19 -0800973 // Client will see the cancellation
Sree Kuchibhotla944f4cf2016-01-27 14:37:26 -0800974 cli_stream->Finish(&recv_status, tag(10));
Sree Kuchibhotla369a04a2016-02-01 10:53:13 -0800975 // TODO(sreek): The expectation here should be true. This is a bug (github
Sree Kuchibhotla0f242ac2016-01-29 18:12:19 -0800976 // issue #4972)
Vijay Paidf8b62c2016-05-02 14:34:24 -0700977 Verifier(GetParam().disable_blocking).Expect(10, false).Verify(cq_.get());
Sree Kuchibhotla944f4cf2016-01-27 14:37:26 -0800978 EXPECT_FALSE(recv_status.ok());
979 EXPECT_EQ(::grpc::StatusCode::CANCELLED, recv_status.error_code());
980 }
981
Sree Kuchibhotla0f242ac2016-01-29 18:12:19 -0800982 // Helper for testing server-streaming RPCs which are cancelled on the server.
983 // Depending on the value of server_try_cancel parameter, this will test one
984 // of the following three scenarios:
985 // CANCEL_BEFORE_PROCESSING: Rpc is cancelled by the server before sending
986 // any messages to the client
987 //
988 // CANCEL_DURING_PROCESSING: Rpc is cancelled by the server while sending
989 // messages to the client
990 //
991 // CANCEL_AFTER PROCESSING: Rpc is cancelled by server after sending all
992 // messages to the client (but before sending any status back to the
993 // client)
Sree Kuchibhotla944f4cf2016-01-27 14:37:26 -0800994 void TestServerStreamingServerCancel(
995 ServerTryCancelRequestPhase server_try_cancel) {
996 ResetStub();
997
998 EchoRequest send_request;
999 EchoRequest recv_request;
1000 EchoResponse send_response;
1001 EchoResponse recv_response;
1002 Status recv_status;
1003 ClientContext cli_ctx;
1004 ServerContext srv_ctx;
Sree Kuchibhotla944f4cf2016-01-27 14:37:26 -08001005 ServerAsyncWriter<EchoResponse> srv_stream(&srv_ctx);
1006
1007 send_request.set_message("Ping");
1008 // Initiate the 'ResponseStream' call on the client
1009 std::unique_ptr<ClientAsyncReader<EchoResponse>> cli_stream(
Sree Kuchibhotla4fb59082016-01-29 11:16:24 -08001010 stub_->AsyncResponseStream(&cli_ctx, send_request, cq_.get(), tag(1)));
Vijay Paidf8b62c2016-05-02 14:34:24 -07001011 Verifier(GetParam().disable_blocking).Expect(1, true).Verify(cq_.get());
Sree Kuchibhotla944f4cf2016-01-27 14:37:26 -08001012 // On the server, request to be notified of 'ResponseStream' calls and
1013 // receive the call just made by the client
Sree Kuchibhotlab0d15672016-03-07 10:51:02 -08001014 srv_ctx.AsyncNotifyWhenDone(tag(11));
Sree Kuchibhotla944f4cf2016-01-27 14:37:26 -08001015 service_.RequestResponseStream(&srv_ctx, &recv_request, &srv_stream,
1016 cq_.get(), cq_.get(), tag(2));
Vijay Paidf8b62c2016-05-02 14:34:24 -07001017 Verifier(GetParam().disable_blocking).Expect(2, true).Verify(cq_.get());
Sree Kuchibhotla944f4cf2016-01-27 14:37:26 -08001018 EXPECT_EQ(send_request.message(), recv_request.message());
1019
1020 bool expected_cq_result = true;
1021 bool ignore_cq_result = false;
Sree Kuchibhotlab0d15672016-03-07 10:51:02 -08001022 bool want_done_tag = false;
Sree Kuchibhotla944f4cf2016-01-27 14:37:26 -08001023
1024 if (server_try_cancel == CANCEL_BEFORE_PROCESSING) {
Sree Kuchibhotlab0d15672016-03-07 10:51:02 -08001025 srv_ctx.TryCancel();
Vijay Paidf8b62c2016-05-02 14:34:24 -07001026 Verifier(GetParam().disable_blocking).Expect(11, true).Verify(cq_.get());
Sree Kuchibhotlab0d15672016-03-07 10:51:02 -08001027 EXPECT_TRUE(srv_ctx.IsCancelled());
Sree Kuchibhotla944f4cf2016-01-27 14:37:26 -08001028
1029 // We know for sure that all cq results will be false from this point
1030 // since the server cancelled the RPC
1031 expected_cq_result = false;
1032 }
1033
1034 std::thread* server_try_cancel_thd = NULL;
Sree Kuchibhotlab0d15672016-03-07 10:51:02 -08001035
Vijay Paidf8b62c2016-05-02 14:34:24 -07001036 auto verif = Verifier(GetParam().disable_blocking);
Sree Kuchibhotlab0d15672016-03-07 10:51:02 -08001037
Sree Kuchibhotla944f4cf2016-01-27 14:37:26 -08001038 if (server_try_cancel == CANCEL_DURING_PROCESSING) {
Sree Kuchibhotlab0d15672016-03-07 10:51:02 -08001039 server_try_cancel_thd =
1040 new std::thread(&ServerContext::TryCancel, &srv_ctx);
Sree Kuchibhotla944f4cf2016-01-27 14:37:26 -08001041
1042 // Server will cancel the RPC in a parallel thread while writing responses
1043 // to the client. Since the cancellation can happen at anytime, some of
Sree Kuchibhotla0f242ac2016-01-29 18:12:19 -08001044 // the cq results (i.e those until cancellation) might be true but it is
1045 // non deterministic. So better to ignore the cq results
Sree Kuchibhotla944f4cf2016-01-27 14:37:26 -08001046 ignore_cq_result = true;
Sree Kuchibhotlab0d15672016-03-07 10:51:02 -08001047 // Expect that we might possibly see the done tag that
1048 // indicates cancellation completion in this case
1049 want_done_tag = true;
1050 verif.Expect(11, true);
Sree Kuchibhotla944f4cf2016-01-27 14:37:26 -08001051 }
1052
1053 // Server sends three messages (tags 3, 4 and 5)
Sree Kuchibhotlab0d15672016-03-07 10:51:02 -08001054 // But if want_done tag is true, we might also see tag 11
Sree Kuchibhotla944f4cf2016-01-27 14:37:26 -08001055 for (int tag_idx = 3; tag_idx <= 5; tag_idx++) {
1056 send_response.set_message("Pong " + std::to_string(tag_idx));
1057 srv_stream.Write(send_response, tag(tag_idx));
Sree Kuchibhotlab0d15672016-03-07 10:51:02 -08001058 // Note that we'll add something to the verifier and verify that
1059 // something was seen, but it might be tag 11 and not what we
1060 // just added
1061 int got_tag = verif.Expect(tag_idx, expected_cq_result)
1062 .Next(cq_.get(), ignore_cq_result);
1063 GPR_ASSERT((got_tag == tag_idx) || (got_tag == 11 && want_done_tag));
1064 if (got_tag == 11) {
1065 EXPECT_TRUE(srv_ctx.IsCancelled());
1066 want_done_tag = false;
1067 // Now get the other entry that we were waiting on
1068 EXPECT_EQ(verif.Next(cq_.get(), ignore_cq_result), tag_idx);
1069 }
Sree Kuchibhotla944f4cf2016-01-27 14:37:26 -08001070 }
1071
Sree Kuchibhotla0f242ac2016-01-29 18:12:19 -08001072 if (server_try_cancel_thd != NULL) {
1073 server_try_cancel_thd->join();
1074 delete server_try_cancel_thd;
1075 }
1076
Sree Kuchibhotla944f4cf2016-01-27 14:37:26 -08001077 if (server_try_cancel == CANCEL_AFTER_PROCESSING) {
Sree Kuchibhotlab0d15672016-03-07 10:51:02 -08001078 srv_ctx.TryCancel();
1079 want_done_tag = true;
1080 verif.Expect(11, true);
yang-gad0df7b2016-02-22 10:00:20 -08001081
1082 // Client reads may fail bacause it is notified that the stream is
1083 // cancelled.
1084 ignore_cq_result = true;
Sree Kuchibhotla944f4cf2016-01-27 14:37:26 -08001085 }
1086
Sree Kuchibhotlab0d15672016-03-07 10:51:02 -08001087 if (want_done_tag) {
1088 verif.Verify(cq_.get());
1089 EXPECT_TRUE(srv_ctx.IsCancelled());
1090 want_done_tag = false;
1091 }
1092
Sree Kuchibhotla0f242ac2016-01-29 18:12:19 -08001093 // Client attemts to read the three messages from the server
Sree Kuchibhotla944f4cf2016-01-27 14:37:26 -08001094 for (int tag_idx = 6; tag_idx <= 8; tag_idx++) {
1095 cli_stream->Read(&recv_response, tag(tag_idx));
Vijay Paidf8b62c2016-05-02 14:34:24 -07001096 Verifier(GetParam().disable_blocking)
Sree Kuchibhotla944f4cf2016-01-27 14:37:26 -08001097 .Expect(tag_idx, expected_cq_result)
Sree Kuchibhotla4fb59082016-01-29 11:16:24 -08001098 .Verify(cq_.get(), ignore_cq_result);
Sree Kuchibhotla944f4cf2016-01-27 14:37:26 -08001099 }
1100
Sree Kuchibhotla0f242ac2016-01-29 18:12:19 -08001101 // The RPC has been cancelled at this point for sure (i.e irrespective of
1102 // the value of `server_try_cancel` is). So, from this point forward, we
1103 // know that cq results are supposed to return false on server.
Sree Kuchibhotla944f4cf2016-01-27 14:37:26 -08001104
Sree Kuchibhotla0f242ac2016-01-29 18:12:19 -08001105 // Server finishes the stream (but the RPC is already cancelled)
Sree Kuchibhotla944f4cf2016-01-27 14:37:26 -08001106 srv_stream.Finish(Status::CANCELLED, tag(9));
Vijay Paidf8b62c2016-05-02 14:34:24 -07001107 Verifier(GetParam().disable_blocking).Expect(9, false).Verify(cq_.get());
Sree Kuchibhotla944f4cf2016-01-27 14:37:26 -08001108
Sree Kuchibhotla0f242ac2016-01-29 18:12:19 -08001109 // Client will see the cancellation
Sree Kuchibhotla944f4cf2016-01-27 14:37:26 -08001110 cli_stream->Finish(&recv_status, tag(10));
Vijay Paidf8b62c2016-05-02 14:34:24 -07001111 Verifier(GetParam().disable_blocking).Expect(10, true).Verify(cq_.get());
Sree Kuchibhotla944f4cf2016-01-27 14:37:26 -08001112 EXPECT_FALSE(recv_status.ok());
1113 EXPECT_EQ(::grpc::StatusCode::CANCELLED, recv_status.error_code());
1114 }
1115
Sree Kuchibhotla0f242ac2016-01-29 18:12:19 -08001116 // Helper for testing bidirectinal-streaming RPCs which are cancelled on the
1117 // server.
1118 //
1119 // Depending on the value of server_try_cancel parameter, this will
1120 // test one of the following three scenarios:
1121 // CANCEL_BEFORE_PROCESSING: Rpc is cancelled by the server before reading/
1122 // writing any messages from/to the client
1123 //
1124 // CANCEL_DURING_PROCESSING: Rpc is cancelled by the server while reading
1125 // messages from the client
1126 //
1127 // CANCEL_AFTER PROCESSING: Rpc is cancelled by server after reading all
1128 // messages from the client (but before sending any status back to the
1129 // client)
Sree Kuchibhotla944f4cf2016-01-27 14:37:26 -08001130 void TestBidiStreamingServerCancel(
1131 ServerTryCancelRequestPhase server_try_cancel) {
1132 ResetStub();
1133
1134 EchoRequest send_request;
1135 EchoRequest recv_request;
1136 EchoResponse send_response;
1137 EchoResponse recv_response;
1138 Status recv_status;
Sree Kuchibhotla944f4cf2016-01-27 14:37:26 -08001139 ClientContext cli_ctx;
1140 ServerContext srv_ctx;
1141 ServerAsyncReaderWriter<EchoResponse, EchoRequest> srv_stream(&srv_ctx);
1142
1143 // Initiate the call from the client side
1144 std::unique_ptr<ClientAsyncReaderWriter<EchoRequest, EchoResponse>>
Sree Kuchibhotla4fb59082016-01-29 11:16:24 -08001145 cli_stream(stub_->AsyncBidiStream(&cli_ctx, cq_.get(), tag(1)));
Vijay Paidf8b62c2016-05-02 14:34:24 -07001146 Verifier(GetParam().disable_blocking).Expect(1, true).Verify(cq_.get());
Sree Kuchibhotla944f4cf2016-01-27 14:37:26 -08001147
1148 // On the server, request to be notified of the 'BidiStream' call and
1149 // receive the call just made by the client
Sree Kuchibhotlab0d15672016-03-07 10:51:02 -08001150 srv_ctx.AsyncNotifyWhenDone(tag(11));
Sree Kuchibhotla944f4cf2016-01-27 14:37:26 -08001151 service_.RequestBidiStream(&srv_ctx, &srv_stream, cq_.get(), cq_.get(),
1152 tag(2));
Vijay Paidf8b62c2016-05-02 14:34:24 -07001153 Verifier(GetParam().disable_blocking).Expect(2, true).Verify(cq_.get());
Sree Kuchibhotla944f4cf2016-01-27 14:37:26 -08001154
Sree Kuchibhotla0f242ac2016-01-29 18:12:19 -08001155 // Client sends the first and the only message
Sree Kuchibhotla944f4cf2016-01-27 14:37:26 -08001156 send_request.set_message("Ping");
1157 cli_stream->Write(send_request, tag(3));
Vijay Paidf8b62c2016-05-02 14:34:24 -07001158 Verifier(GetParam().disable_blocking).Expect(3, true).Verify(cq_.get());
Sree Kuchibhotla944f4cf2016-01-27 14:37:26 -08001159
1160 bool expected_cq_result = true;
1161 bool ignore_cq_result = false;
Sree Kuchibhotlab0d15672016-03-07 10:51:02 -08001162 bool want_done_tag = false;
Sree Kuchibhotla0f242ac2016-01-29 18:12:19 -08001163
Sree Kuchibhotla944f4cf2016-01-27 14:37:26 -08001164 if (server_try_cancel == CANCEL_BEFORE_PROCESSING) {
Sree Kuchibhotlab0d15672016-03-07 10:51:02 -08001165 srv_ctx.TryCancel();
Vijay Paidf8b62c2016-05-02 14:34:24 -07001166 Verifier(GetParam().disable_blocking).Expect(11, true).Verify(cq_.get());
Sree Kuchibhotlab0d15672016-03-07 10:51:02 -08001167 EXPECT_TRUE(srv_ctx.IsCancelled());
Sree Kuchibhotla944f4cf2016-01-27 14:37:26 -08001168
1169 // We know for sure that all cq results will be false from this point
1170 // since the server cancelled the RPC
1171 expected_cq_result = false;
1172 }
1173
1174 std::thread* server_try_cancel_thd = NULL;
Sree Kuchibhotlab0d15672016-03-07 10:51:02 -08001175
Vijay Paidf8b62c2016-05-02 14:34:24 -07001176 auto verif = Verifier(GetParam().disable_blocking);
Sree Kuchibhotlab0d15672016-03-07 10:51:02 -08001177
Sree Kuchibhotla944f4cf2016-01-27 14:37:26 -08001178 if (server_try_cancel == CANCEL_DURING_PROCESSING) {
Sree Kuchibhotlab0d15672016-03-07 10:51:02 -08001179 server_try_cancel_thd =
1180 new std::thread(&ServerContext::TryCancel, &srv_ctx);
Sree Kuchibhotla944f4cf2016-01-27 14:37:26 -08001181
1182 // Since server is going to cancel the RPC in a parallel thread, some of
1183 // the cq results (i.e those until the cancellation) might be true. Since
1184 // that number is non-deterministic, it is better to ignore the cq results
1185 ignore_cq_result = true;
Sree Kuchibhotlab0d15672016-03-07 10:51:02 -08001186 // Expect that we might possibly see the done tag that
1187 // indicates cancellation completion in this case
1188 want_done_tag = true;
1189 verif.Expect(11, true);
Sree Kuchibhotla944f4cf2016-01-27 14:37:26 -08001190 }
1191
Sree Kuchibhotlab0d15672016-03-07 10:51:02 -08001192 int got_tag;
Sree Kuchibhotla944f4cf2016-01-27 14:37:26 -08001193 srv_stream.Read(&recv_request, tag(4));
Sree Kuchibhotlab0d15672016-03-07 10:51:02 -08001194 verif.Expect(4, expected_cq_result);
1195 got_tag = verif.Next(cq_.get(), ignore_cq_result);
1196 GPR_ASSERT((got_tag == 4) || (got_tag == 11 && want_done_tag));
1197 if (got_tag == 11) {
1198 EXPECT_TRUE(srv_ctx.IsCancelled());
1199 want_done_tag = false;
1200 // Now get the other entry that we were waiting on
1201 EXPECT_EQ(verif.Next(cq_.get(), ignore_cq_result), 4);
1202 }
Sree Kuchibhotla944f4cf2016-01-27 14:37:26 -08001203
1204 send_response.set_message("Pong");
1205 srv_stream.Write(send_response, tag(5));
Sree Kuchibhotlab0d15672016-03-07 10:51:02 -08001206 verif.Expect(5, expected_cq_result);
1207 got_tag = verif.Next(cq_.get(), ignore_cq_result);
1208 GPR_ASSERT((got_tag == 5) || (got_tag == 11 && want_done_tag));
1209 if (got_tag == 11) {
1210 EXPECT_TRUE(srv_ctx.IsCancelled());
1211 want_done_tag = false;
1212 // Now get the other entry that we were waiting on
1213 EXPECT_EQ(verif.Next(cq_.get(), ignore_cq_result), 5);
1214 }
Sree Kuchibhotla944f4cf2016-01-27 14:37:26 -08001215
1216 cli_stream->Read(&recv_response, tag(6));
Sree Kuchibhotlab0d15672016-03-07 10:51:02 -08001217 verif.Expect(6, expected_cq_result);
1218 got_tag = verif.Next(cq_.get(), ignore_cq_result);
1219 GPR_ASSERT((got_tag == 6) || (got_tag == 11 && want_done_tag));
1220 if (got_tag == 11) {
1221 EXPECT_TRUE(srv_ctx.IsCancelled());
1222 want_done_tag = false;
1223 // Now get the other entry that we were waiting on
1224 EXPECT_EQ(verif.Next(cq_.get(), ignore_cq_result), 6);
1225 }
Sree Kuchibhotla944f4cf2016-01-27 14:37:26 -08001226
1227 // This is expected to succeed in all cases
1228 cli_stream->WritesDone(tag(7));
Sree Kuchibhotlab0d15672016-03-07 10:51:02 -08001229 verif.Expect(7, true);
1230 got_tag = verif.Next(cq_.get(), ignore_cq_result);
1231 GPR_ASSERT((got_tag == 7) || (got_tag == 11 && want_done_tag));
1232 if (got_tag == 11) {
1233 EXPECT_TRUE(srv_ctx.IsCancelled());
1234 want_done_tag = false;
1235 // Now get the other entry that we were waiting on
1236 EXPECT_EQ(verif.Next(cq_.get(), ignore_cq_result), 7);
1237 }
Sree Kuchibhotla944f4cf2016-01-27 14:37:26 -08001238
Sree Kuchibhotla0f242ac2016-01-29 18:12:19 -08001239 // This is expected to fail in all cases i.e for all values of
Vijay Pai018879a2016-02-16 09:20:50 -08001240 // server_try_cancel. This is because at this point, either there are no
Sree Kuchibhotla0f242ac2016-01-29 18:12:19 -08001241 // more msgs from the client (because client called WritesDone) or the RPC
1242 // is cancelled on the server
Sree Kuchibhotla944f4cf2016-01-27 14:37:26 -08001243 srv_stream.Read(&recv_request, tag(8));
Sree Kuchibhotlab0d15672016-03-07 10:51:02 -08001244 verif.Expect(8, false);
1245 got_tag = verif.Next(cq_.get(), ignore_cq_result);
1246 GPR_ASSERT((got_tag == 8) || (got_tag == 11 && want_done_tag));
1247 if (got_tag == 11) {
1248 EXPECT_TRUE(srv_ctx.IsCancelled());
1249 want_done_tag = false;
1250 // Now get the other entry that we were waiting on
1251 EXPECT_EQ(verif.Next(cq_.get(), ignore_cq_result), 8);
1252 }
Sree Kuchibhotla944f4cf2016-01-27 14:37:26 -08001253
1254 if (server_try_cancel_thd != NULL) {
1255 server_try_cancel_thd->join();
1256 delete server_try_cancel_thd;
1257 }
1258
1259 if (server_try_cancel == CANCEL_AFTER_PROCESSING) {
Sree Kuchibhotlab0d15672016-03-07 10:51:02 -08001260 srv_ctx.TryCancel();
1261 want_done_tag = true;
1262 verif.Expect(11, true);
1263 }
1264
1265 if (want_done_tag) {
1266 verif.Verify(cq_.get());
1267 EXPECT_TRUE(srv_ctx.IsCancelled());
1268 want_done_tag = false;
Sree Kuchibhotla944f4cf2016-01-27 14:37:26 -08001269 }
1270
Sree Kuchibhotla0f242ac2016-01-29 18:12:19 -08001271 // The RPC has been cancelled at this point for sure (i.e irrespective of
1272 // the value of `server_try_cancel` is). So, from this point forward, we
1273 // know that cq results are supposed to return false on server.
Sree Kuchibhotla944f4cf2016-01-27 14:37:26 -08001274
1275 srv_stream.Finish(Status::CANCELLED, tag(9));
Vijay Paidf8b62c2016-05-02 14:34:24 -07001276 Verifier(GetParam().disable_blocking).Expect(9, false).Verify(cq_.get());
Sree Kuchibhotla944f4cf2016-01-27 14:37:26 -08001277
1278 cli_stream->Finish(&recv_status, tag(10));
Vijay Paidf8b62c2016-05-02 14:34:24 -07001279 Verifier(GetParam().disable_blocking).Expect(10, true).Verify(cq_.get());
Sree Kuchibhotla944f4cf2016-01-27 14:37:26 -08001280 EXPECT_FALSE(recv_status.ok());
1281 EXPECT_EQ(grpc::StatusCode::CANCELLED, recv_status.error_code());
1282 }
1283};
1284
1285TEST_P(AsyncEnd2endServerTryCancelTest, ClientStreamingServerTryCancelBefore) {
1286 TestClientStreamingServerCancel(CANCEL_BEFORE_PROCESSING);
1287}
1288
1289TEST_P(AsyncEnd2endServerTryCancelTest, ClientStreamingServerTryCancelDuring) {
1290 TestClientStreamingServerCancel(CANCEL_DURING_PROCESSING);
1291}
1292
1293TEST_P(AsyncEnd2endServerTryCancelTest, ClientStreamingServerTryCancelAfter) {
1294 TestClientStreamingServerCancel(CANCEL_AFTER_PROCESSING);
1295}
1296
1297TEST_P(AsyncEnd2endServerTryCancelTest, ServerStreamingServerTryCancelBefore) {
1298 TestServerStreamingServerCancel(CANCEL_BEFORE_PROCESSING);
1299}
1300
1301TEST_P(AsyncEnd2endServerTryCancelTest, ServerStreamingServerTryCancelDuring) {
1302 TestServerStreamingServerCancel(CANCEL_DURING_PROCESSING);
1303}
1304
1305TEST_P(AsyncEnd2endServerTryCancelTest, ServerStreamingServerTryCancelAfter) {
1306 TestServerStreamingServerCancel(CANCEL_AFTER_PROCESSING);
1307}
1308
1309TEST_P(AsyncEnd2endServerTryCancelTest, ServerBidiStreamingTryCancelBefore) {
1310 TestBidiStreamingServerCancel(CANCEL_BEFORE_PROCESSING);
1311}
1312
1313TEST_P(AsyncEnd2endServerTryCancelTest, ServerBidiStreamingTryCancelDuring) {
1314 TestBidiStreamingServerCancel(CANCEL_DURING_PROCESSING);
1315}
1316
1317TEST_P(AsyncEnd2endServerTryCancelTest, ServerBidiStreamingTryCancelAfter) {
1318 TestBidiStreamingServerCancel(CANCEL_AFTER_PROCESSING);
1319}
1320
Vijay Paidf8b62c2016-05-02 14:34:24 -07001321std::vector<TestScenario> CreateTestScenarios(bool test_disable_blocking,
1322 bool test_secure) {
1323 std::vector<TestScenario> scenarios;
1324 std::vector<grpc::string> credentials_types;
1325 if (test_secure) {
1326 credentials_types = GetSecureCredentialsTypeList();
1327 }
1328 credentials_types.push_back(kInsecureCredentialsType);
1329 for (auto it = credentials_types.begin(); it != credentials_types.end();
1330 ++it) {
1331 scenarios.push_back(TestScenario(false, *it));
1332 if (test_disable_blocking) {
1333 scenarios.push_back(TestScenario(true, *it));
1334 }
1335 }
1336 return scenarios;
1337}
1338
Craig Tiller4c06b822015-08-06 08:41:31 -07001339INSTANTIATE_TEST_CASE_P(AsyncEnd2end, AsyncEnd2endTest,
Vijay Paidf8b62c2016-05-02 14:34:24 -07001340 ::testing::ValuesIn(CreateTestScenarios(true, true)));
Sree Kuchibhotla944f4cf2016-01-27 14:37:26 -08001341INSTANTIATE_TEST_CASE_P(AsyncEnd2endServerTryCancel,
1342 AsyncEnd2endServerTryCancelTest,
Vijay Paidf8b62c2016-05-02 14:34:24 -07001343 ::testing::ValuesIn(CreateTestScenarios(false, false)));
Craig Tiller69f90e62015-08-06 08:32:35 -07001344
Craig Tiller0220cf12015-02-12 17:39:26 -08001345} // namespace
1346} // namespace testing
1347} // namespace grpc
1348
1349int main(int argc, char** argv) {
1350 grpc_test_init(argc, argv);
Vijay Paib65eda42016-02-16 13:48:05 -08001351 gpr_tls_init(&g_is_async_end2end_test);
Craig Tiller0220cf12015-02-12 17:39:26 -08001352 ::testing::InitGoogleTest(&argc, argv);
Vijay Paib65eda42016-02-16 13:48:05 -08001353 int ret = RUN_ALL_TESTS();
1354 gpr_tls_destroy(&g_is_async_end2end_test);
1355 return ret;
Craig Tiller0220cf12015-02-12 17:39:26 -08001356}