blob: 0de6c74c47d6c5bf8f09bf238cd01bdd00a94ac1 [file] [log] [blame]
Craig Tiller0220cf12015-02-12 17:39:26 -08001/*
2 *
Craig Tiller6169d5f2016-03-31 07:46:18 -07003 * Copyright 2015, Google Inc.
Craig Tiller0220cf12015-02-12 17:39:26 -08004 * All rights reserved.
5 *
6 * Redistribution and use in source and binary forms, with or without
7 * modification, are permitted provided that the following conditions are
8 * met:
9 *
10 * * Redistributions of source code must retain the above copyright
11 * notice, this list of conditions and the following disclaimer.
12 * * Redistributions in binary form must reproduce the above
13 * copyright notice, this list of conditions and the following disclaimer
14 * in the documentation and/or other materials provided with the
15 * distribution.
16 * * Neither the name of Google Inc. nor the names of its
17 * contributors may be used to endorse or promote products derived from
18 * this software without specific prior written permission.
19 *
20 * THIS SOFTWARE IS PROVIDED BY THE COPYRIGHT HOLDERS AND CONTRIBUTORS
21 * "AS IS" AND ANY EXPRESS OR IMPLIED WARRANTIES, INCLUDING, BUT NOT
22 * LIMITED TO, THE IMPLIED WARRANTIES OF MERCHANTABILITY AND FITNESS FOR
23 * A PARTICULAR PURPOSE ARE DISCLAIMED. IN NO EVENT SHALL THE COPYRIGHT
24 * OWNER OR CONTRIBUTORS BE LIABLE FOR ANY DIRECT, INDIRECT, INCIDENTAL,
25 * SPECIAL, EXEMPLARY, OR CONSEQUENTIAL DAMAGES (INCLUDING, BUT NOT
26 * LIMITED TO, PROCUREMENT OF SUBSTITUTE GOODS OR SERVICES; LOSS OF USE,
27 * DATA, OR PROFITS; OR BUSINESS INTERRUPTION) HOWEVER CAUSED AND ON ANY
28 * THEORY OF LIABILITY, WHETHER IN CONTRACT, STRICT LIABILITY, OR TORT
29 * (INCLUDING NEGLIGENCE OR OTHERWISE) ARISING IN ANY WAY OUT OF THE USE
30 * OF THIS SOFTWARE, EVEN IF ADVISED OF THE POSSIBILITY OF SUCH DAMAGE.
31 *
32 */
33
Yang Gaoda699b82015-02-18 01:10:22 -080034#include <memory>
Sree Kuchibhotla944f4cf2016-01-27 14:37:26 -080035#include <thread>
Craig Tiller0220cf12015-02-12 17:39:26 -080036
yang-g8c2be9f2015-08-19 16:28:09 -070037#include <grpc++/channel.h>
Craig Tiller0220cf12015-02-12 17:39:26 -080038#include <grpc++/client_context.h>
39#include <grpc++/create_channel.h>
Craig Tiller0220cf12015-02-12 17:39:26 -080040#include <grpc++/server.h>
41#include <grpc++/server_builder.h>
42#include <grpc++/server_context.h>
Sree Kuchibhotlab0d0c8e2016-01-13 22:52:17 -080043#include <grpc/grpc.h>
44#include <grpc/support/thd.h>
45#include <grpc/support/time.h>
Vijay Paib65eda42016-02-16 13:48:05 -080046#include <grpc/support/tls.h>
Craig Tiller0220cf12015-02-12 17:39:26 -080047#include <gtest/gtest.h>
48
Craig Tiller1b4e3302015-12-17 16:35:00 -080049#include "src/proto/grpc/testing/duplicate/echo_duplicate.grpc.pb.h"
50#include "src/proto/grpc/testing/echo.grpc.pb.h"
Sree Kuchibhotlab0d0c8e2016-01-13 22:52:17 -080051#include "test/core/util/port.h"
52#include "test/core/util/test_config.h"
yang-ge21908f2015-08-25 13:47:51 -070053#include "test/cpp/util/string_ref_helper.h"
Craig Tiller0220cf12015-02-12 17:39:26 -080054
Craig Tiller69f90e62015-08-06 08:32:35 -070055#ifdef GPR_POSIX_SOCKET
Craig Tillerf45496f2016-03-30 07:41:19 -070056#include "src/core/lib/iomgr/ev_posix.h"
Craig Tiller69f90e62015-08-06 08:32:35 -070057#endif
58
Craig Tiller1b4e3302015-12-17 16:35:00 -080059using grpc::testing::EchoRequest;
60using grpc::testing::EchoResponse;
Craig Tiller0220cf12015-02-12 17:39:26 -080061using std::chrono::system_clock;
62
Vijay Paib65eda42016-02-16 13:48:05 -080063GPR_TLS_DECL(g_is_async_end2end_test);
64
Craig Tiller0220cf12015-02-12 17:39:26 -080065namespace grpc {
66namespace testing {
67
68namespace {
69
Craig Tiller7536af02015-12-22 13:49:30 -080070void* tag(int i) { return (void*)(intptr_t)i; }
Sree Kuchibhotlab0d15672016-03-07 10:51:02 -080071int detag(void* p) { return static_cast<int>(reinterpret_cast<intptr_t>(p)); }
Yang Gaoc05b6cb2015-02-13 00:34:10 -080072
Craig Tiller69f90e62015-08-06 08:32:35 -070073#ifdef GPR_POSIX_SOCKET
Vijay Paib65eda42016-02-16 13:48:05 -080074static int maybe_assert_non_blocking_poll(struct pollfd* pfds, nfds_t nfds,
75 int timeout) {
76 if (gpr_tls_get(&g_is_async_end2end_test)) {
77 GPR_ASSERT(timeout == 0);
78 }
79 return poll(pfds, nfds, timeout);
Craig Tiller69f90e62015-08-06 08:32:35 -070080}
81
82class PollOverride {
Craig Tiller06cf3cc2015-05-13 13:11:01 -070083 public:
Craig Tiller69f90e62015-08-06 08:32:35 -070084 PollOverride(grpc_poll_function_type f) {
85 prev_ = grpc_poll_function;
86 grpc_poll_function = f;
87 }
88
Craig Tiller4c06b822015-08-06 08:41:31 -070089 ~PollOverride() { grpc_poll_function = prev_; }
Craig Tiller69f90e62015-08-06 08:32:35 -070090
91 private:
92 grpc_poll_function_type prev_;
93};
94
vjpaicf4daeb2016-02-15 02:33:54 -080095class PollingOverrider : public PollOverride {
Craig Tiller69f90e62015-08-06 08:32:35 -070096 public:
vjpaicf4daeb2016-02-15 02:33:54 -080097 explicit PollingOverrider(bool allow_blocking)
Vijay Paib65eda42016-02-16 13:48:05 -080098 : PollOverride(allow_blocking ? poll : maybe_assert_non_blocking_poll) {}
Craig Tiller69f90e62015-08-06 08:32:35 -070099};
100#else
vjpaicf4daeb2016-02-15 02:33:54 -0800101class PollingOverrider {
Craig Tiller69f90e62015-08-06 08:32:35 -0700102 public:
vjpaicf4daeb2016-02-15 02:33:54 -0800103 explicit PollingOverrider(bool allow_blocking) {}
Craig Tiller69f90e62015-08-06 08:32:35 -0700104};
105#endif
106
vjpaicf4daeb2016-02-15 02:33:54 -0800107class Verifier {
Craig Tiller69f90e62015-08-06 08:32:35 -0700108 public:
vjpaicf4daeb2016-02-15 02:33:54 -0800109 explicit Verifier(bool spin) : spin_(spin) {}
Sree Kuchibhotlab0d15672016-03-07 10:51:02 -0800110 // Expect sets the expected ok value for a specific tag
Craig Tiller06cf3cc2015-05-13 13:11:01 -0700111 Verifier& Expect(int i, bool expect_ok) {
112 expectations_[tag(i)] = expect_ok;
113 return *this;
vjpai7aadf462015-03-16 23:58:44 -0700114 }
Sree Kuchibhotla944f4cf2016-01-27 14:37:26 -0800115
Sree Kuchibhotlab0d15672016-03-07 10:51:02 -0800116 // Next waits for 1 async tag to complete, checks its
117 // expectations, and returns the tag
118 int Next(CompletionQueue* cq, bool ignore_ok) {
119 bool ok;
120 void* got_tag;
121 if (spin_) {
122 for (;;) {
123 auto r = cq->AsyncNext(&got_tag, &ok, gpr_time_0(GPR_CLOCK_REALTIME));
124 if (r == CompletionQueue::TIMEOUT) continue;
125 if (r == CompletionQueue::GOT_EVENT) break;
126 gpr_log(GPR_ERROR, "unexpected result from AsyncNext");
127 abort();
128 }
129 } else {
130 EXPECT_TRUE(cq->Next(&got_tag, &ok));
131 }
132 auto it = expectations_.find(got_tag);
133 EXPECT_TRUE(it != expectations_.end());
134 if (!ignore_ok) {
135 EXPECT_EQ(it->second, ok);
136 }
137 expectations_.erase(it);
138 return detag(got_tag);
139 }
140
141 // Verify keeps calling Next until all currently set
142 // expected tags are complete
Sree Kuchibhotla944f4cf2016-01-27 14:37:26 -0800143 void Verify(CompletionQueue* cq) { Verify(cq, false); }
144
Sree Kuchibhotlab0d15672016-03-07 10:51:02 -0800145 // This version of Verify allows optionally ignoring the
146 // outcome of the expectation
Sree Kuchibhotla944f4cf2016-01-27 14:37:26 -0800147 void Verify(CompletionQueue* cq, bool ignore_ok) {
Craig Tiller06cf3cc2015-05-13 13:11:01 -0700148 GPR_ASSERT(!expectations_.empty());
149 while (!expectations_.empty()) {
Sree Kuchibhotlab0d15672016-03-07 10:51:02 -0800150 Next(cq, ignore_ok);
Craig Tiller06cf3cc2015-05-13 13:11:01 -0700151 }
152 }
Sree Kuchibhotlab0d15672016-03-07 10:51:02 -0800153 // This version of Verify stops after a certain deadline
Craig Tillerd6c98df2015-08-18 09:33:44 -0700154 void Verify(CompletionQueue* cq,
155 std::chrono::system_clock::time_point deadline) {
Craig Tiller06cf3cc2015-05-13 13:11:01 -0700156 if (expectations_.empty()) {
157 bool ok;
Craig Tillerd6c98df2015-08-18 09:33:44 -0700158 void* got_tag;
Craig Tiller69f90e62015-08-06 08:32:35 -0700159 if (spin_) {
160 while (std::chrono::system_clock::now() < deadline) {
Craig Tiller4c06b822015-08-06 08:41:31 -0700161 EXPECT_EQ(
162 cq->AsyncNext(&got_tag, &ok, gpr_time_0(GPR_CLOCK_REALTIME)),
163 CompletionQueue::TIMEOUT);
Craig Tiller69f90e62015-08-06 08:32:35 -0700164 }
165 } else {
Craig Tiller4c06b822015-08-06 08:41:31 -0700166 EXPECT_EQ(cq->AsyncNext(&got_tag, &ok, deadline),
167 CompletionQueue::TIMEOUT);
Craig Tiller69f90e62015-08-06 08:32:35 -0700168 }
Craig Tiller06cf3cc2015-05-13 13:11:01 -0700169 } else {
170 while (!expectations_.empty()) {
171 bool ok;
Craig Tillerd6c98df2015-08-18 09:33:44 -0700172 void* got_tag;
Craig Tiller69f90e62015-08-06 08:32:35 -0700173 if (spin_) {
174 for (;;) {
175 GPR_ASSERT(std::chrono::system_clock::now() < deadline);
Craig Tiller4c06b822015-08-06 08:41:31 -0700176 auto r =
177 cq->AsyncNext(&got_tag, &ok, gpr_time_0(GPR_CLOCK_REALTIME));
Craig Tiller69f90e62015-08-06 08:32:35 -0700178 if (r == CompletionQueue::TIMEOUT) continue;
179 if (r == CompletionQueue::GOT_EVENT) break;
180 gpr_log(GPR_ERROR, "unexpected result from AsyncNext");
181 abort();
Craig Tiller4c06b822015-08-06 08:41:31 -0700182 }
Craig Tiller69f90e62015-08-06 08:32:35 -0700183 } else {
Craig Tiller4c06b822015-08-06 08:41:31 -0700184 EXPECT_EQ(cq->AsyncNext(&got_tag, &ok, deadline),
185 CompletionQueue::GOT_EVENT);
Craig Tiller69f90e62015-08-06 08:32:35 -0700186 }
Craig Tiller06cf3cc2015-05-13 13:11:01 -0700187 auto it = expectations_.find(got_tag);
188 EXPECT_TRUE(it != expectations_.end());
189 EXPECT_EQ(it->second, ok);
190 expectations_.erase(it);
191 }
192 }
193 }
194
195 private:
196 std::map<void*, bool> expectations_;
Craig Tiller69f90e62015-08-06 08:32:35 -0700197 bool spin_;
Craig Tiller06cf3cc2015-05-13 13:11:01 -0700198};
vjpai7aadf462015-03-16 23:58:44 -0700199
Yuchen Zenga42ec212016-04-29 13:03:06 -0700200// This class disables the server builder plugins that may add sync services to
201// the server. If there are sync services, UnimplementedRpc test will triger
202// the sync unkown rpc routine on the server side, rather than the async one
203// that needs to be tested here.
204class ServerBuilderSyncPluginDisabler : public ::grpc::ServerBuilderOption {
205 public:
206 void UpdateArguments(ChannelArguments* arg) GRPC_OVERRIDE {}
207
208 void UpdatePlugins(
209 std::map<grpc::string, std::unique_ptr<ServerBuilderPlugin>>* plugins)
210 GRPC_OVERRIDE {
211 auto plugin = plugins->begin();
212 while (plugin != plugins->end()) {
213 if ((*plugin).second->has_sync_methods()) {
214 plugins->erase(plugin++);
215 } else {
216 plugin++;
217 }
218 }
219 }
220};
221
Craig Tiller69f90e62015-08-06 08:32:35 -0700222class AsyncEnd2endTest : public ::testing::TestWithParam<bool> {
Craig Tiller0220cf12015-02-12 17:39:26 -0800223 protected:
Vijay Pai018879a2016-02-16 09:20:50 -0800224 AsyncEnd2endTest() {}
Craig Tiller0220cf12015-02-12 17:39:26 -0800225
Craig Tillercf133f42015-02-26 14:05:56 -0800226 void SetUp() GRPC_OVERRIDE {
Vijay Pai018879a2016-02-16 09:20:50 -0800227 poll_overrider_.reset(new PollingOverrider(!GetParam()));
228
Craig Tiller0220cf12015-02-12 17:39:26 -0800229 int port = grpc_pick_unused_port_or_die();
230 server_address_ << "localhost:" << port;
vjpai017ed622015-12-09 10:42:54 -0800231
Craig Tiller0220cf12015-02-12 17:39:26 -0800232 // Setup server
233 ServerBuilder builder;
Craig Tillerd6c98df2015-08-18 09:33:44 -0700234 builder.AddListeningPort(server_address_.str(),
235 grpc::InsecureServerCredentials());
Craig Tiller15f383c2016-01-07 12:45:32 -0800236 builder.RegisterService(&service_);
Craig Tiller06cf3cc2015-05-13 13:11:01 -0700237 cq_ = builder.AddCompletionQueue();
Yuchen Zenga42ec212016-04-29 13:03:06 -0700238
239 // TODO(zyc): make a test option to choose wheather sync plugins should be
240 // deleted
241 std::unique_ptr<ServerBuilderOption> sync_plugin_disabler(
242 new ServerBuilderSyncPluginDisabler());
243 builder.SetOption(move(sync_plugin_disabler));
Craig Tiller0220cf12015-02-12 17:39:26 -0800244 server_ = builder.BuildAndStart();
Vijay Paib65eda42016-02-16 13:48:05 -0800245
246 gpr_tls_set(&g_is_async_end2end_test, 1);
Craig Tiller0220cf12015-02-12 17:39:26 -0800247 }
248
Craig Tillercf133f42015-02-26 14:05:56 -0800249 void TearDown() GRPC_OVERRIDE {
Craig Tiller492968f2015-02-18 13:14:03 -0800250 server_->Shutdown();
251 void* ignored_tag;
252 bool ignored_ok;
Craig Tiller06cf3cc2015-05-13 13:11:01 -0700253 cq_->Shutdown();
254 while (cq_->Next(&ignored_tag, &ignored_ok))
Craig Tiller492968f2015-02-18 13:14:03 -0800255 ;
Vijay Pai018879a2016-02-16 09:20:50 -0800256 poll_overrider_.reset();
Vijay Paib65eda42016-02-16 13:48:05 -0800257 gpr_tls_set(&g_is_async_end2end_test, 0);
Craig Tiller492968f2015-02-18 13:14:03 -0800258 }
Craig Tiller0220cf12015-02-12 17:39:26 -0800259
260 void ResetStub() {
yang-g730055d2015-08-27 12:29:45 -0700261 std::shared_ptr<Channel> channel =
Julien Boeufe5adc0e2015-10-12 14:08:10 -0700262 CreateChannel(server_address_.str(), InsecureChannelCredentials());
Sree Kuchibhotla5a05f512016-01-13 22:43:20 -0800263 stub_ = grpc::testing::EchoTestService::NewStub(channel);
Craig Tiller0220cf12015-02-12 17:39:26 -0800264 }
265
Yang Gao406b32f2015-02-13 16:25:33 -0800266 void SendRpc(int num_rpcs) {
267 for (int i = 0; i < num_rpcs; i++) {
268 EchoRequest send_request;
269 EchoRequest recv_request;
270 EchoResponse send_response;
271 EchoResponse recv_response;
272 Status recv_status;
273
274 ClientContext cli_ctx;
275 ServerContext srv_ctx;
276 grpc::ServerAsyncResponseWriter<EchoResponse> response_writer(&srv_ctx);
277
278 send_request.set_message("Hello");
Sree Kuchibhotla944f4cf2016-01-27 14:37:26 -0800279 std::unique_ptr<ClientAsyncResponseReader<EchoResponse>> response_reader(
Craig Tiller06cf3cc2015-05-13 13:11:01 -0700280 stub_->AsyncEcho(&cli_ctx, send_request, cq_.get()));
Yang Gao406b32f2015-02-13 16:25:33 -0800281
Craig Tillerd6c98df2015-08-18 09:33:44 -0700282 service_.RequestEcho(&srv_ctx, &recv_request, &response_writer, cq_.get(),
283 cq_.get(), tag(2));
Yang Gao406b32f2015-02-13 16:25:33 -0800284
Craig Tiller69f90e62015-08-06 08:32:35 -0700285 Verifier(GetParam()).Expect(2, true).Verify(cq_.get());
Yang Gao406b32f2015-02-13 16:25:33 -0800286 EXPECT_EQ(send_request.message(), recv_request.message());
287
288 send_response.set_message(recv_request.message());
289 response_writer.Finish(send_response, Status::OK, tag(3));
Craig Tiller69f90e62015-08-06 08:32:35 -0700290 Verifier(GetParam()).Expect(3, true).Verify(cq_.get());
Yang Gao406b32f2015-02-13 16:25:33 -0800291
Yang Gao3a5e5492015-02-18 14:32:38 -0800292 response_reader->Finish(&recv_response, &recv_status, tag(4));
Craig Tiller69f90e62015-08-06 08:32:35 -0700293 Verifier(GetParam()).Expect(4, true).Verify(cq_.get());
Yang Gao406b32f2015-02-13 16:25:33 -0800294
295 EXPECT_EQ(send_response.message(), recv_response.message());
Yang Gaoc1a2c312015-06-16 10:59:46 -0700296 EXPECT_TRUE(recv_status.ok());
Yang Gao406b32f2015-02-13 16:25:33 -0800297 }
298 }
299
Craig Tiller06cf3cc2015-05-13 13:11:01 -0700300 std::unique_ptr<ServerCompletionQueue> cq_;
Sree Kuchibhotla5a05f512016-01-13 22:43:20 -0800301 std::unique_ptr<grpc::testing::EchoTestService::Stub> stub_;
Craig Tiller0220cf12015-02-12 17:39:26 -0800302 std::unique_ptr<Server> server_;
Sree Kuchibhotla5a05f512016-01-13 22:43:20 -0800303 grpc::testing::EchoTestService::AsyncService service_;
Craig Tiller0220cf12015-02-12 17:39:26 -0800304 std::ostringstream server_address_;
vjpaicf4daeb2016-02-15 02:33:54 -0800305
Vijay Pai018879a2016-02-16 09:20:50 -0800306 std::unique_ptr<PollingOverrider> poll_overrider_;
Craig Tiller0220cf12015-02-12 17:39:26 -0800307};
308
Craig Tiller69f90e62015-08-06 08:32:35 -0700309TEST_P(AsyncEnd2endTest, SimpleRpc) {
Craig Tiller0220cf12015-02-12 17:39:26 -0800310 ResetStub();
Yang Gao406b32f2015-02-13 16:25:33 -0800311 SendRpc(1);
312}
Yang Gaobb84a302015-02-12 23:30:12 -0800313
Craig Tiller69f90e62015-08-06 08:32:35 -0700314TEST_P(AsyncEnd2endTest, SequentialRpcs) {
Yang Gao406b32f2015-02-13 16:25:33 -0800315 ResetStub();
316 SendRpc(10);
Craig Tiller0220cf12015-02-12 17:39:26 -0800317}
318
vjpai7aadf462015-03-16 23:58:44 -0700319// Test a simple RPC using the async version of Next
Craig Tiller69f90e62015-08-06 08:32:35 -0700320TEST_P(AsyncEnd2endTest, AsyncNextRpc) {
vjpai7aadf462015-03-16 23:58:44 -0700321 ResetStub();
322
323 EchoRequest send_request;
324 EchoRequest recv_request;
325 EchoResponse send_response;
326 EchoResponse recv_response;
327 Status recv_status;
328
329 ClientContext cli_ctx;
330 ServerContext srv_ctx;
331 grpc::ServerAsyncResponseWriter<EchoResponse> response_writer(&srv_ctx);
332
333 send_request.set_message("Hello");
Sree Kuchibhotla944f4cf2016-01-27 14:37:26 -0800334 std::unique_ptr<ClientAsyncResponseReader<EchoResponse>> response_reader(
Craig Tiller06cf3cc2015-05-13 13:11:01 -0700335 stub_->AsyncEcho(&cli_ctx, send_request, cq_.get()));
vjpai7aadf462015-03-16 23:58:44 -0700336
Yang Gao757afae2015-03-17 15:49:26 -0700337 std::chrono::system_clock::time_point time_now(
Craig Tillerf51199f2015-05-08 09:32:53 -0700338 std::chrono::system_clock::now());
339 std::chrono::system_clock::time_point time_limit(
340 std::chrono::system_clock::now() + std::chrono::seconds(10));
Craig Tiller69f90e62015-08-06 08:32:35 -0700341 Verifier(GetParam()).Verify(cq_.get(), time_now);
342 Verifier(GetParam()).Verify(cq_.get(), time_now);
vjpai7aadf462015-03-16 23:58:44 -0700343
Craig Tiller06cf3cc2015-05-13 13:11:01 -0700344 service_.RequestEcho(&srv_ctx, &recv_request, &response_writer, cq_.get(),
345 cq_.get(), tag(2));
vjpai7aadf462015-03-16 23:58:44 -0700346
Craig Tiller69f90e62015-08-06 08:32:35 -0700347 Verifier(GetParam()).Expect(2, true).Verify(cq_.get(), time_limit);
vjpai7aadf462015-03-16 23:58:44 -0700348 EXPECT_EQ(send_request.message(), recv_request.message());
vjpai7aadf462015-03-16 23:58:44 -0700349
350 send_response.set_message(recv_request.message());
351 response_writer.Finish(send_response, Status::OK, tag(3));
Craig Tiller4c06b822015-08-06 08:41:31 -0700352 Verifier(GetParam())
353 .Expect(3, true)
354 .Verify(cq_.get(), std::chrono::system_clock::time_point::max());
vjpai7aadf462015-03-16 23:58:44 -0700355
356 response_reader->Finish(&recv_response, &recv_status, tag(4));
Craig Tiller4c06b822015-08-06 08:41:31 -0700357 Verifier(GetParam())
358 .Expect(4, true)
359 .Verify(cq_.get(), std::chrono::system_clock::time_point::max());
vjpai7aadf462015-03-16 23:58:44 -0700360
361 EXPECT_EQ(send_response.message(), recv_response.message());
Yang Gaoc1a2c312015-06-16 10:59:46 -0700362 EXPECT_TRUE(recv_status.ok());
vjpai7aadf462015-03-16 23:58:44 -0700363}
Yang Gao757afae2015-03-17 15:49:26 -0700364
Yang Gao0e0d8e12015-02-13 14:40:41 -0800365// Two pings and a final pong.
Craig Tiller69f90e62015-08-06 08:32:35 -0700366TEST_P(AsyncEnd2endTest, SimpleClientStreaming) {
Yang Gao005f18a2015-02-13 10:22:33 -0800367 ResetStub();
368
369 EchoRequest send_request;
370 EchoRequest recv_request;
371 EchoResponse send_response;
372 EchoResponse recv_response;
373 Status recv_status;
374 ClientContext cli_ctx;
375 ServerContext srv_ctx;
376 ServerAsyncReader<EchoResponse, EchoRequest> srv_stream(&srv_ctx);
377
378 send_request.set_message("Hello");
Sree Kuchibhotla944f4cf2016-01-27 14:37:26 -0800379 std::unique_ptr<ClientAsyncWriter<EchoRequest>> cli_stream(
Craig Tiller06cf3cc2015-05-13 13:11:01 -0700380 stub_->AsyncRequestStream(&cli_ctx, &recv_response, cq_.get(), tag(1)));
Yang Gao005f18a2015-02-13 10:22:33 -0800381
Craig Tillerd6c98df2015-08-18 09:33:44 -0700382 service_.RequestRequestStream(&srv_ctx, &srv_stream, cq_.get(), cq_.get(),
383 tag(2));
Yang Gao005f18a2015-02-13 10:22:33 -0800384
Craig Tiller69f90e62015-08-06 08:32:35 -0700385 Verifier(GetParam()).Expect(2, true).Expect(1, true).Verify(cq_.get());
Yang Gao005f18a2015-02-13 10:22:33 -0800386
387 cli_stream->Write(send_request, tag(3));
Craig Tiller69f90e62015-08-06 08:32:35 -0700388 Verifier(GetParam()).Expect(3, true).Verify(cq_.get());
Yang Gao005f18a2015-02-13 10:22:33 -0800389
390 srv_stream.Read(&recv_request, tag(4));
Craig Tiller69f90e62015-08-06 08:32:35 -0700391 Verifier(GetParam()).Expect(4, true).Verify(cq_.get());
Yang Gao005f18a2015-02-13 10:22:33 -0800392 EXPECT_EQ(send_request.message(), recv_request.message());
393
394 cli_stream->Write(send_request, tag(5));
Craig Tiller69f90e62015-08-06 08:32:35 -0700395 Verifier(GetParam()).Expect(5, true).Verify(cq_.get());
Yang Gao005f18a2015-02-13 10:22:33 -0800396
397 srv_stream.Read(&recv_request, tag(6));
Craig Tiller69f90e62015-08-06 08:32:35 -0700398 Verifier(GetParam()).Expect(6, true).Verify(cq_.get());
Yang Gao005f18a2015-02-13 10:22:33 -0800399
400 EXPECT_EQ(send_request.message(), recv_request.message());
401 cli_stream->WritesDone(tag(7));
Craig Tiller69f90e62015-08-06 08:32:35 -0700402 Verifier(GetParam()).Expect(7, true).Verify(cq_.get());
Yang Gao005f18a2015-02-13 10:22:33 -0800403
404 srv_stream.Read(&recv_request, tag(8));
Craig Tiller69f90e62015-08-06 08:32:35 -0700405 Verifier(GetParam()).Expect(8, false).Verify(cq_.get());
Yang Gao005f18a2015-02-13 10:22:33 -0800406
407 send_response.set_message(recv_request.message());
408 srv_stream.Finish(send_response, Status::OK, tag(9));
Craig Tiller69f90e62015-08-06 08:32:35 -0700409 Verifier(GetParam()).Expect(9, true).Verify(cq_.get());
Yang Gao005f18a2015-02-13 10:22:33 -0800410
411 cli_stream->Finish(&recv_status, tag(10));
Craig Tiller69f90e62015-08-06 08:32:35 -0700412 Verifier(GetParam()).Expect(10, true).Verify(cq_.get());
Yang Gao005f18a2015-02-13 10:22:33 -0800413
414 EXPECT_EQ(send_response.message(), recv_response.message());
Yang Gaoc1a2c312015-06-16 10:59:46 -0700415 EXPECT_TRUE(recv_status.ok());
Yang Gao005f18a2015-02-13 10:22:33 -0800416}
417
Yang Gao0e0d8e12015-02-13 14:40:41 -0800418// One ping, two pongs.
Craig Tiller69f90e62015-08-06 08:32:35 -0700419TEST_P(AsyncEnd2endTest, SimpleServerStreaming) {
Yang Gao0e0d8e12015-02-13 14:40:41 -0800420 ResetStub();
421
422 EchoRequest send_request;
423 EchoRequest recv_request;
424 EchoResponse send_response;
425 EchoResponse recv_response;
426 Status recv_status;
427 ClientContext cli_ctx;
428 ServerContext srv_ctx;
429 ServerAsyncWriter<EchoResponse> srv_stream(&srv_ctx);
430
431 send_request.set_message("Hello");
Sree Kuchibhotla944f4cf2016-01-27 14:37:26 -0800432 std::unique_ptr<ClientAsyncReader<EchoResponse>> cli_stream(
Craig Tiller06cf3cc2015-05-13 13:11:01 -0700433 stub_->AsyncResponseStream(&cli_ctx, send_request, cq_.get(), tag(1)));
Yang Gao0e0d8e12015-02-13 14:40:41 -0800434
Craig Tillerf9e6adf2015-05-06 11:45:59 -0700435 service_.RequestResponseStream(&srv_ctx, &recv_request, &srv_stream,
Craig Tiller06cf3cc2015-05-13 13:11:01 -0700436 cq_.get(), cq_.get(), tag(2));
Yang Gao0e0d8e12015-02-13 14:40:41 -0800437
Craig Tiller69f90e62015-08-06 08:32:35 -0700438 Verifier(GetParam()).Expect(1, true).Expect(2, true).Verify(cq_.get());
Yang Gao0e0d8e12015-02-13 14:40:41 -0800439 EXPECT_EQ(send_request.message(), recv_request.message());
440
441 send_response.set_message(recv_request.message());
442 srv_stream.Write(send_response, tag(3));
Craig Tiller69f90e62015-08-06 08:32:35 -0700443 Verifier(GetParam()).Expect(3, true).Verify(cq_.get());
Yang Gao0e0d8e12015-02-13 14:40:41 -0800444
445 cli_stream->Read(&recv_response, tag(4));
Craig Tiller69f90e62015-08-06 08:32:35 -0700446 Verifier(GetParam()).Expect(4, true).Verify(cq_.get());
Yang Gao0e0d8e12015-02-13 14:40:41 -0800447 EXPECT_EQ(send_response.message(), recv_response.message());
448
449 srv_stream.Write(send_response, tag(5));
Craig Tiller69f90e62015-08-06 08:32:35 -0700450 Verifier(GetParam()).Expect(5, true).Verify(cq_.get());
Yang Gao0e0d8e12015-02-13 14:40:41 -0800451
452 cli_stream->Read(&recv_response, tag(6));
Craig Tiller69f90e62015-08-06 08:32:35 -0700453 Verifier(GetParam()).Expect(6, true).Verify(cq_.get());
Yang Gao0e0d8e12015-02-13 14:40:41 -0800454 EXPECT_EQ(send_response.message(), recv_response.message());
455
456 srv_stream.Finish(Status::OK, tag(7));
Craig Tiller69f90e62015-08-06 08:32:35 -0700457 Verifier(GetParam()).Expect(7, true).Verify(cq_.get());
Yang Gao0e0d8e12015-02-13 14:40:41 -0800458
459 cli_stream->Read(&recv_response, tag(8));
Craig Tiller69f90e62015-08-06 08:32:35 -0700460 Verifier(GetParam()).Expect(8, false).Verify(cq_.get());
Yang Gao0e0d8e12015-02-13 14:40:41 -0800461
462 cli_stream->Finish(&recv_status, tag(9));
Craig Tiller69f90e62015-08-06 08:32:35 -0700463 Verifier(GetParam()).Expect(9, true).Verify(cq_.get());
Yang Gao0e0d8e12015-02-13 14:40:41 -0800464
Yang Gaoc1a2c312015-06-16 10:59:46 -0700465 EXPECT_TRUE(recv_status.ok());
Yang Gao0e0d8e12015-02-13 14:40:41 -0800466}
467
468// One ping, one pong.
Craig Tiller69f90e62015-08-06 08:32:35 -0700469TEST_P(AsyncEnd2endTest, SimpleBidiStreaming) {
Yang Gaoc05b6cb2015-02-13 00:34:10 -0800470 ResetStub();
471
472 EchoRequest send_request;
473 EchoRequest recv_request;
474 EchoResponse send_response;
475 EchoResponse recv_response;
476 Status recv_status;
477 ClientContext cli_ctx;
478 ServerContext srv_ctx;
479 ServerAsyncReaderWriter<EchoResponse, EchoRequest> srv_stream(&srv_ctx);
480
481 send_request.set_message("Hello");
Sree Kuchibhotla944f4cf2016-01-27 14:37:26 -0800482 std::unique_ptr<ClientAsyncReaderWriter<EchoRequest, EchoResponse>>
Craig Tiller06cf3cc2015-05-13 13:11:01 -0700483 cli_stream(stub_->AsyncBidiStream(&cli_ctx, cq_.get(), tag(1)));
Yang Gaoc05b6cb2015-02-13 00:34:10 -0800484
Craig Tillerd6c98df2015-08-18 09:33:44 -0700485 service_.RequestBidiStream(&srv_ctx, &srv_stream, cq_.get(), cq_.get(),
486 tag(2));
Yang Gaoc05b6cb2015-02-13 00:34:10 -0800487
Craig Tiller69f90e62015-08-06 08:32:35 -0700488 Verifier(GetParam()).Expect(1, true).Expect(2, true).Verify(cq_.get());
Yang Gaoc05b6cb2015-02-13 00:34:10 -0800489
490 cli_stream->Write(send_request, tag(3));
Craig Tiller69f90e62015-08-06 08:32:35 -0700491 Verifier(GetParam()).Expect(3, true).Verify(cq_.get());
Yang Gaoc05b6cb2015-02-13 00:34:10 -0800492
493 srv_stream.Read(&recv_request, tag(4));
Craig Tiller69f90e62015-08-06 08:32:35 -0700494 Verifier(GetParam()).Expect(4, true).Verify(cq_.get());
Yang Gaoc05b6cb2015-02-13 00:34:10 -0800495 EXPECT_EQ(send_request.message(), recv_request.message());
496
497 send_response.set_message(recv_request.message());
498 srv_stream.Write(send_response, tag(5));
Craig Tiller69f90e62015-08-06 08:32:35 -0700499 Verifier(GetParam()).Expect(5, true).Verify(cq_.get());
Yang Gaoc05b6cb2015-02-13 00:34:10 -0800500
501 cli_stream->Read(&recv_response, tag(6));
Craig Tiller69f90e62015-08-06 08:32:35 -0700502 Verifier(GetParam()).Expect(6, true).Verify(cq_.get());
Yang Gaoc05b6cb2015-02-13 00:34:10 -0800503 EXPECT_EQ(send_response.message(), recv_response.message());
504
505 cli_stream->WritesDone(tag(7));
Craig Tiller69f90e62015-08-06 08:32:35 -0700506 Verifier(GetParam()).Expect(7, true).Verify(cq_.get());
Yang Gaoc05b6cb2015-02-13 00:34:10 -0800507
508 srv_stream.Read(&recv_request, tag(8));
Craig Tiller69f90e62015-08-06 08:32:35 -0700509 Verifier(GetParam()).Expect(8, false).Verify(cq_.get());
Yang Gaoc05b6cb2015-02-13 00:34:10 -0800510
511 srv_stream.Finish(Status::OK, tag(9));
Craig Tiller69f90e62015-08-06 08:32:35 -0700512 Verifier(GetParam()).Expect(9, true).Verify(cq_.get());
Yang Gaoc05b6cb2015-02-13 00:34:10 -0800513
514 cli_stream->Finish(&recv_status, tag(10));
Craig Tiller69f90e62015-08-06 08:32:35 -0700515 Verifier(GetParam()).Expect(10, true).Verify(cq_.get());
Yang Gaoc05b6cb2015-02-13 00:34:10 -0800516
Yang Gaoc1a2c312015-06-16 10:59:46 -0700517 EXPECT_TRUE(recv_status.ok());
Yang Gaoc05b6cb2015-02-13 00:34:10 -0800518}
519
Yang Gao406b32f2015-02-13 16:25:33 -0800520// Metadata tests
Craig Tiller69f90e62015-08-06 08:32:35 -0700521TEST_P(AsyncEnd2endTest, ClientInitialMetadataRpc) {
Yang Gao406b32f2015-02-13 16:25:33 -0800522 ResetStub();
523
524 EchoRequest send_request;
525 EchoRequest recv_request;
526 EchoResponse send_response;
527 EchoResponse recv_response;
528 Status recv_status;
529
530 ClientContext cli_ctx;
531 ServerContext srv_ctx;
532 grpc::ServerAsyncResponseWriter<EchoResponse> response_writer(&srv_ctx);
533
534 send_request.set_message("Hello");
535 std::pair<grpc::string, grpc::string> meta1("key1", "val1");
536 std::pair<grpc::string, grpc::string> meta2("key2", "val2");
Craig Tiller6f871642016-02-03 16:15:31 -0800537 std::pair<grpc::string, grpc::string> meta3("g.r.d-bin", "xyz");
Yang Gao406b32f2015-02-13 16:25:33 -0800538 cli_ctx.AddMetadata(meta1.first, meta1.second);
539 cli_ctx.AddMetadata(meta2.first, meta2.second);
Craig Tiller6f871642016-02-03 16:15:31 -0800540 cli_ctx.AddMetadata(meta3.first, meta3.second);
Yang Gao406b32f2015-02-13 16:25:33 -0800541
Sree Kuchibhotla944f4cf2016-01-27 14:37:26 -0800542 std::unique_ptr<ClientAsyncResponseReader<EchoResponse>> response_reader(
Craig Tiller06cf3cc2015-05-13 13:11:01 -0700543 stub_->AsyncEcho(&cli_ctx, send_request, cq_.get()));
Yang Gao406b32f2015-02-13 16:25:33 -0800544
Craig Tiller06cf3cc2015-05-13 13:11:01 -0700545 service_.RequestEcho(&srv_ctx, &recv_request, &response_writer, cq_.get(),
546 cq_.get(), tag(2));
Craig Tiller69f90e62015-08-06 08:32:35 -0700547 Verifier(GetParam()).Expect(2, true).Verify(cq_.get());
Yang Gao406b32f2015-02-13 16:25:33 -0800548 EXPECT_EQ(send_request.message(), recv_request.message());
549 auto client_initial_metadata = srv_ctx.client_metadata();
yang-ge21908f2015-08-25 13:47:51 -0700550 EXPECT_EQ(meta1.second,
551 ToString(client_initial_metadata.find(meta1.first)->second));
552 EXPECT_EQ(meta2.second,
553 ToString(client_initial_metadata.find(meta2.first)->second));
Craig Tiller6f871642016-02-03 16:15:31 -0800554 EXPECT_EQ(meta3.second,
555 ToString(client_initial_metadata.find(meta3.first)->second));
Craig Tiller8bf2dca2015-07-10 13:08:41 -0700556 EXPECT_GE(client_initial_metadata.size(), static_cast<size_t>(2));
Yang Gao406b32f2015-02-13 16:25:33 -0800557
558 send_response.set_message(recv_request.message());
559 response_writer.Finish(send_response, Status::OK, tag(3));
560
Craig Tiller69f90e62015-08-06 08:32:35 -0700561 Verifier(GetParam()).Expect(3, true).Verify(cq_.get());
Yang Gao406b32f2015-02-13 16:25:33 -0800562
Yang Gao3a5e5492015-02-18 14:32:38 -0800563 response_reader->Finish(&recv_response, &recv_status, tag(4));
Craig Tiller69f90e62015-08-06 08:32:35 -0700564 Verifier(GetParam()).Expect(4, true).Verify(cq_.get());
Yang Gao406b32f2015-02-13 16:25:33 -0800565
566 EXPECT_EQ(send_response.message(), recv_response.message());
Yang Gaoc1a2c312015-06-16 10:59:46 -0700567 EXPECT_TRUE(recv_status.ok());
Yang Gao406b32f2015-02-13 16:25:33 -0800568}
569
Craig Tiller69f90e62015-08-06 08:32:35 -0700570TEST_P(AsyncEnd2endTest, ServerInitialMetadataRpc) {
Yang Gao2b7f5372015-02-18 00:45:53 -0800571 ResetStub();
572
573 EchoRequest send_request;
574 EchoRequest recv_request;
575 EchoResponse send_response;
576 EchoResponse recv_response;
577 Status recv_status;
578
579 ClientContext cli_ctx;
580 ServerContext srv_ctx;
581 grpc::ServerAsyncResponseWriter<EchoResponse> response_writer(&srv_ctx);
582
583 send_request.set_message("Hello");
584 std::pair<grpc::string, grpc::string> meta1("key1", "val1");
585 std::pair<grpc::string, grpc::string> meta2("key2", "val2");
586
Sree Kuchibhotla944f4cf2016-01-27 14:37:26 -0800587 std::unique_ptr<ClientAsyncResponseReader<EchoResponse>> response_reader(
Craig Tiller06cf3cc2015-05-13 13:11:01 -0700588 stub_->AsyncEcho(&cli_ctx, send_request, cq_.get()));
Yang Gao2b7f5372015-02-18 00:45:53 -0800589
Craig Tiller06cf3cc2015-05-13 13:11:01 -0700590 service_.RequestEcho(&srv_ctx, &recv_request, &response_writer, cq_.get(),
591 cq_.get(), tag(2));
Craig Tiller69f90e62015-08-06 08:32:35 -0700592 Verifier(GetParam()).Expect(2, true).Verify(cq_.get());
Yang Gao2b7f5372015-02-18 00:45:53 -0800593 EXPECT_EQ(send_request.message(), recv_request.message());
594 srv_ctx.AddInitialMetadata(meta1.first, meta1.second);
595 srv_ctx.AddInitialMetadata(meta2.first, meta2.second);
596 response_writer.SendInitialMetadata(tag(3));
Craig Tiller69f90e62015-08-06 08:32:35 -0700597 Verifier(GetParam()).Expect(3, true).Verify(cq_.get());
Yang Gao2b7f5372015-02-18 00:45:53 -0800598
Yang Gao3a5e5492015-02-18 14:32:38 -0800599 response_reader->ReadInitialMetadata(tag(4));
Craig Tiller69f90e62015-08-06 08:32:35 -0700600 Verifier(GetParam()).Expect(4, true).Verify(cq_.get());
Yang Gao2b7f5372015-02-18 00:45:53 -0800601 auto server_initial_metadata = cli_ctx.GetServerInitialMetadata();
yang-ge21908f2015-08-25 13:47:51 -0700602 EXPECT_EQ(meta1.second,
603 ToString(server_initial_metadata.find(meta1.first)->second));
604 EXPECT_EQ(meta2.second,
605 ToString(server_initial_metadata.find(meta2.first)->second));
vjpaid5577aa2015-02-18 22:26:48 -0800606 EXPECT_EQ(static_cast<size_t>(2), server_initial_metadata.size());
Yang Gao3a5e5492015-02-18 14:32:38 -0800607
608 send_response.set_message(recv_request.message());
609 response_writer.Finish(send_response, Status::OK, tag(5));
Craig Tiller69f90e62015-08-06 08:32:35 -0700610 Verifier(GetParam()).Expect(5, true).Verify(cq_.get());
Yang Gao3a5e5492015-02-18 14:32:38 -0800611
612 response_reader->Finish(&recv_response, &recv_status, tag(6));
Craig Tiller69f90e62015-08-06 08:32:35 -0700613 Verifier(GetParam()).Expect(6, true).Verify(cq_.get());
Yang Gao3a5e5492015-02-18 14:32:38 -0800614
615 EXPECT_EQ(send_response.message(), recv_response.message());
Yang Gaoc1a2c312015-06-16 10:59:46 -0700616 EXPECT_TRUE(recv_status.ok());
Yang Gao2b7f5372015-02-18 00:45:53 -0800617}
618
Craig Tiller69f90e62015-08-06 08:32:35 -0700619TEST_P(AsyncEnd2endTest, ServerTrailingMetadataRpc) {
Yang Gao2b7f5372015-02-18 00:45:53 -0800620 ResetStub();
621
622 EchoRequest send_request;
623 EchoRequest recv_request;
624 EchoResponse send_response;
625 EchoResponse recv_response;
626 Status recv_status;
627
628 ClientContext cli_ctx;
629 ServerContext srv_ctx;
630 grpc::ServerAsyncResponseWriter<EchoResponse> response_writer(&srv_ctx);
631
632 send_request.set_message("Hello");
633 std::pair<grpc::string, grpc::string> meta1("key1", "val1");
634 std::pair<grpc::string, grpc::string> meta2("key2", "val2");
635
Sree Kuchibhotla944f4cf2016-01-27 14:37:26 -0800636 std::unique_ptr<ClientAsyncResponseReader<EchoResponse>> response_reader(
Craig Tiller06cf3cc2015-05-13 13:11:01 -0700637 stub_->AsyncEcho(&cli_ctx, send_request, cq_.get()));
Yang Gao2b7f5372015-02-18 00:45:53 -0800638
Craig Tiller06cf3cc2015-05-13 13:11:01 -0700639 service_.RequestEcho(&srv_ctx, &recv_request, &response_writer, cq_.get(),
640 cq_.get(), tag(2));
Craig Tiller69f90e62015-08-06 08:32:35 -0700641 Verifier(GetParam()).Expect(2, true).Verify(cq_.get());
Yang Gao2b7f5372015-02-18 00:45:53 -0800642 EXPECT_EQ(send_request.message(), recv_request.message());
643 response_writer.SendInitialMetadata(tag(3));
Craig Tiller69f90e62015-08-06 08:32:35 -0700644 Verifier(GetParam()).Expect(3, true).Verify(cq_.get());
Yang Gao2b7f5372015-02-18 00:45:53 -0800645
646 send_response.set_message(recv_request.message());
647 srv_ctx.AddTrailingMetadata(meta1.first, meta1.second);
648 srv_ctx.AddTrailingMetadata(meta2.first, meta2.second);
649 response_writer.Finish(send_response, Status::OK, tag(4));
650
Craig Tiller69f90e62015-08-06 08:32:35 -0700651 Verifier(GetParam()).Expect(4, true).Verify(cq_.get());
Yang Gao2b7f5372015-02-18 00:45:53 -0800652
Yang Gao3a5e5492015-02-18 14:32:38 -0800653 response_reader->Finish(&recv_response, &recv_status, tag(5));
Craig Tiller69f90e62015-08-06 08:32:35 -0700654 Verifier(GetParam()).Expect(5, true).Verify(cq_.get());
Yang Gao2b7f5372015-02-18 00:45:53 -0800655 EXPECT_EQ(send_response.message(), recv_response.message());
Yang Gaoc1a2c312015-06-16 10:59:46 -0700656 EXPECT_TRUE(recv_status.ok());
Yang Gao2b7f5372015-02-18 00:45:53 -0800657 auto server_trailing_metadata = cli_ctx.GetServerTrailingMetadata();
yang-ge21908f2015-08-25 13:47:51 -0700658 EXPECT_EQ(meta1.second,
659 ToString(server_trailing_metadata.find(meta1.first)->second));
660 EXPECT_EQ(meta2.second,
661 ToString(server_trailing_metadata.find(meta2.first)->second));
vjpaid5577aa2015-02-18 22:26:48 -0800662 EXPECT_EQ(static_cast<size_t>(2), server_trailing_metadata.size());
Yang Gao2b7f5372015-02-18 00:45:53 -0800663}
664
Craig Tiller69f90e62015-08-06 08:32:35 -0700665TEST_P(AsyncEnd2endTest, MetadataRpc) {
Yang Gao2b7f5372015-02-18 00:45:53 -0800666 ResetStub();
667
668 EchoRequest send_request;
669 EchoRequest recv_request;
670 EchoResponse send_response;
671 EchoResponse recv_response;
672 Status recv_status;
673
674 ClientContext cli_ctx;
675 ServerContext srv_ctx;
676 grpc::ServerAsyncResponseWriter<EchoResponse> response_writer(&srv_ctx);
677
678 send_request.set_message("Hello");
679 std::pair<grpc::string, grpc::string> meta1("key1", "val1");
Yang Gao3a5e5492015-02-18 14:32:38 -0800680 std::pair<grpc::string, grpc::string> meta2(
Vijay Pai92a928f2015-03-26 16:30:22 -0400681 "key2-bin",
Craig Tillerd6c98df2015-08-18 09:33:44 -0700682 grpc::string("\xc0\xc1\xc2\xc3\xc4\xc5\xc6\xc7\xc8\xc9\xca\xcb\xcc", 13));
Yang Gao2b7f5372015-02-18 00:45:53 -0800683 std::pair<grpc::string, grpc::string> meta3("key3", "val3");
Craig Tiller47c83fd2015-02-21 22:45:35 -0800684 std::pair<grpc::string, grpc::string> meta6(
685 "key4-bin",
Vijay Pai92a928f2015-03-26 16:30:22 -0400686 grpc::string("\x10\x11\x12\x13\x14\x15\x16\x17\x18\x19\x1a\x1b\x1c\x1d",
Craig Tillerd6c98df2015-08-18 09:33:44 -0700687 14));
Yang Gao2b7f5372015-02-18 00:45:53 -0800688 std::pair<grpc::string, grpc::string> meta5("key5", "val5");
Craig Tiller47c83fd2015-02-21 22:45:35 -0800689 std::pair<grpc::string, grpc::string> meta4(
690 "key6-bin",
Craig Tillerd6c98df2015-08-18 09:33:44 -0700691 grpc::string(
692 "\xe0\xe1\xe2\xe3\xe4\xe5\xe6\xe7\xe8\xe9\xea\xeb\xec\xed\xee", 15));
Yang Gao2b7f5372015-02-18 00:45:53 -0800693
694 cli_ctx.AddMetadata(meta1.first, meta1.second);
695 cli_ctx.AddMetadata(meta2.first, meta2.second);
696
Sree Kuchibhotla944f4cf2016-01-27 14:37:26 -0800697 std::unique_ptr<ClientAsyncResponseReader<EchoResponse>> response_reader(
Craig Tiller06cf3cc2015-05-13 13:11:01 -0700698 stub_->AsyncEcho(&cli_ctx, send_request, cq_.get()));
Yang Gao2b7f5372015-02-18 00:45:53 -0800699
Craig Tiller06cf3cc2015-05-13 13:11:01 -0700700 service_.RequestEcho(&srv_ctx, &recv_request, &response_writer, cq_.get(),
701 cq_.get(), tag(2));
Craig Tiller69f90e62015-08-06 08:32:35 -0700702 Verifier(GetParam()).Expect(2, true).Verify(cq_.get());
Yang Gao2b7f5372015-02-18 00:45:53 -0800703 EXPECT_EQ(send_request.message(), recv_request.message());
704 auto client_initial_metadata = srv_ctx.client_metadata();
yang-ge21908f2015-08-25 13:47:51 -0700705 EXPECT_EQ(meta1.second,
706 ToString(client_initial_metadata.find(meta1.first)->second));
707 EXPECT_EQ(meta2.second,
708 ToString(client_initial_metadata.find(meta2.first)->second));
Craig Tiller8bf2dca2015-07-10 13:08:41 -0700709 EXPECT_GE(client_initial_metadata.size(), static_cast<size_t>(2));
Yang Gao2b7f5372015-02-18 00:45:53 -0800710
711 srv_ctx.AddInitialMetadata(meta3.first, meta3.second);
712 srv_ctx.AddInitialMetadata(meta4.first, meta4.second);
713 response_writer.SendInitialMetadata(tag(3));
Craig Tiller69f90e62015-08-06 08:32:35 -0700714 Verifier(GetParam()).Expect(3, true).Verify(cq_.get());
Yang Gao3a5e5492015-02-18 14:32:38 -0800715 response_reader->ReadInitialMetadata(tag(4));
Craig Tiller69f90e62015-08-06 08:32:35 -0700716 Verifier(GetParam()).Expect(4, true).Verify(cq_.get());
Yang Gao2b7f5372015-02-18 00:45:53 -0800717 auto server_initial_metadata = cli_ctx.GetServerInitialMetadata();
yang-ge21908f2015-08-25 13:47:51 -0700718 EXPECT_EQ(meta3.second,
719 ToString(server_initial_metadata.find(meta3.first)->second));
720 EXPECT_EQ(meta4.second,
721 ToString(server_initial_metadata.find(meta4.first)->second));
Craig Tiller8bf2dca2015-07-10 13:08:41 -0700722 EXPECT_GE(server_initial_metadata.size(), static_cast<size_t>(2));
Yang Gao3a5e5492015-02-18 14:32:38 -0800723
724 send_response.set_message(recv_request.message());
725 srv_ctx.AddTrailingMetadata(meta5.first, meta5.second);
726 srv_ctx.AddTrailingMetadata(meta6.first, meta6.second);
727 response_writer.Finish(send_response, Status::OK, tag(5));
728
Craig Tiller69f90e62015-08-06 08:32:35 -0700729 Verifier(GetParam()).Expect(5, true).Verify(cq_.get());
Yang Gao3a5e5492015-02-18 14:32:38 -0800730
Yang Gao3a5e5492015-02-18 14:32:38 -0800731 response_reader->Finish(&recv_response, &recv_status, tag(6));
Craig Tiller69f90e62015-08-06 08:32:35 -0700732 Verifier(GetParam()).Expect(6, true).Verify(cq_.get());
Yang Gao3a5e5492015-02-18 14:32:38 -0800733 EXPECT_EQ(send_response.message(), recv_response.message());
Yang Gaoc1a2c312015-06-16 10:59:46 -0700734 EXPECT_TRUE(recv_status.ok());
Yang Gao2b7f5372015-02-18 00:45:53 -0800735 auto server_trailing_metadata = cli_ctx.GetServerTrailingMetadata();
yang-ge21908f2015-08-25 13:47:51 -0700736 EXPECT_EQ(meta5.second,
737 ToString(server_trailing_metadata.find(meta5.first)->second));
738 EXPECT_EQ(meta6.second,
739 ToString(server_trailing_metadata.find(meta6.first)->second));
Craig Tiller8bf2dca2015-07-10 13:08:41 -0700740 EXPECT_GE(server_trailing_metadata.size(), static_cast<size_t>(2));
Yang Gao2b7f5372015-02-18 00:45:53 -0800741}
yang-gb3352562015-08-04 14:42:06 -0700742
743// Server uses AsyncNotifyWhenDone API to check for cancellation
Craig Tiller69f90e62015-08-06 08:32:35 -0700744TEST_P(AsyncEnd2endTest, ServerCheckCancellation) {
yang-gb3352562015-08-04 14:42:06 -0700745 ResetStub();
746
747 EchoRequest send_request;
748 EchoRequest recv_request;
749 EchoResponse send_response;
750 EchoResponse recv_response;
751 Status recv_status;
752
753 ClientContext cli_ctx;
754 ServerContext srv_ctx;
755 grpc::ServerAsyncResponseWriter<EchoResponse> response_writer(&srv_ctx);
756
757 send_request.set_message("Hello");
Sree Kuchibhotla944f4cf2016-01-27 14:37:26 -0800758 std::unique_ptr<ClientAsyncResponseReader<EchoResponse>> response_reader(
yang-gb3352562015-08-04 14:42:06 -0700759 stub_->AsyncEcho(&cli_ctx, send_request, cq_.get()));
760
761 srv_ctx.AsyncNotifyWhenDone(tag(5));
762 service_.RequestEcho(&srv_ctx, &recv_request, &response_writer, cq_.get(),
763 cq_.get(), tag(2));
764
Craig Tiller69f90e62015-08-06 08:32:35 -0700765 Verifier(GetParam()).Expect(2, true).Verify(cq_.get());
yang-gb3352562015-08-04 14:42:06 -0700766 EXPECT_EQ(send_request.message(), recv_request.message());
767
768 cli_ctx.TryCancel();
Craig Tiller69f90e62015-08-06 08:32:35 -0700769 Verifier(GetParam()).Expect(5, true).Verify(cq_.get());
yang-gb3352562015-08-04 14:42:06 -0700770 EXPECT_TRUE(srv_ctx.IsCancelled());
771
772 response_reader->Finish(&recv_response, &recv_status, tag(4));
Craig Tiller69f90e62015-08-06 08:32:35 -0700773 Verifier(GetParam()).Expect(4, false).Verify(cq_.get());
yang-gb3352562015-08-04 14:42:06 -0700774
775 EXPECT_EQ(StatusCode::CANCELLED, recv_status.error_code());
776}
777
778// Server uses AsyncNotifyWhenDone API to check for normal finish
Craig Tiller69f90e62015-08-06 08:32:35 -0700779TEST_P(AsyncEnd2endTest, ServerCheckDone) {
yang-gb3352562015-08-04 14:42:06 -0700780 ResetStub();
781
782 EchoRequest send_request;
783 EchoRequest recv_request;
784 EchoResponse send_response;
785 EchoResponse recv_response;
786 Status recv_status;
787
788 ClientContext cli_ctx;
789 ServerContext srv_ctx;
790 grpc::ServerAsyncResponseWriter<EchoResponse> response_writer(&srv_ctx);
791
792 send_request.set_message("Hello");
Sree Kuchibhotla944f4cf2016-01-27 14:37:26 -0800793 std::unique_ptr<ClientAsyncResponseReader<EchoResponse>> response_reader(
yang-gb3352562015-08-04 14:42:06 -0700794 stub_->AsyncEcho(&cli_ctx, send_request, cq_.get()));
795
796 srv_ctx.AsyncNotifyWhenDone(tag(5));
797 service_.RequestEcho(&srv_ctx, &recv_request, &response_writer, cq_.get(),
798 cq_.get(), tag(2));
799
Craig Tiller69f90e62015-08-06 08:32:35 -0700800 Verifier(GetParam()).Expect(2, true).Verify(cq_.get());
yang-gb3352562015-08-04 14:42:06 -0700801 EXPECT_EQ(send_request.message(), recv_request.message());
802
803 send_response.set_message(recv_request.message());
804 response_writer.Finish(send_response, Status::OK, tag(3));
Craig Tiller69f90e62015-08-06 08:32:35 -0700805 Verifier(GetParam()).Expect(3, true).Verify(cq_.get());
806 Verifier(GetParam()).Expect(5, true).Verify(cq_.get());
yang-gb3352562015-08-04 14:42:06 -0700807 EXPECT_FALSE(srv_ctx.IsCancelled());
808
809 response_reader->Finish(&recv_response, &recv_status, tag(4));
Craig Tiller69f90e62015-08-06 08:32:35 -0700810 Verifier(GetParam()).Expect(4, true).Verify(cq_.get());
yang-gb3352562015-08-04 14:42:06 -0700811
812 EXPECT_EQ(send_response.message(), recv_response.message());
813 EXPECT_TRUE(recv_status.ok());
814}
815
Craig Tiller8f7bff72015-08-17 13:23:14 -0700816TEST_P(AsyncEnd2endTest, UnimplementedRpc) {
yang-g730055d2015-08-27 12:29:45 -0700817 std::shared_ptr<Channel> channel =
Julien Boeufe5adc0e2015-10-12 14:08:10 -0700818 CreateChannel(server_address_.str(), InsecureChannelCredentials());
Craig Tiller1b4e3302015-12-17 16:35:00 -0800819 std::unique_ptr<grpc::testing::UnimplementedService::Stub> stub;
820 stub = grpc::testing::UnimplementedService::NewStub(channel);
yang-g9b7757d2015-08-13 11:15:53 -0700821 EchoRequest send_request;
822 EchoResponse recv_response;
823 Status recv_status;
824
825 ClientContext cli_ctx;
826 send_request.set_message("Hello");
Sree Kuchibhotla944f4cf2016-01-27 14:37:26 -0800827 std::unique_ptr<ClientAsyncResponseReader<EchoResponse>> response_reader(
yang-g9b7757d2015-08-13 11:15:53 -0700828 stub->AsyncUnimplemented(&cli_ctx, send_request, cq_.get()));
829
830 response_reader->Finish(&recv_response, &recv_status, tag(4));
Craig Tiller8f7bff72015-08-17 13:23:14 -0700831 Verifier(GetParam()).Expect(4, false).Verify(cq_.get());
yang-g9b7757d2015-08-13 11:15:53 -0700832
833 EXPECT_EQ(StatusCode::UNIMPLEMENTED, recv_status.error_code());
834 EXPECT_EQ("", recv_status.error_message());
835}
836
Sree Kuchibhotla0f242ac2016-01-29 18:12:19 -0800837// This class is for testing scenarios where RPCs are cancelled on the server
Sree Kuchibhotlab0d15672016-03-07 10:51:02 -0800838// by calling ServerContext::TryCancel(). Server uses AsyncNotifyWhenDone
839// API to check for cancellation
Sree Kuchibhotla944f4cf2016-01-27 14:37:26 -0800840class AsyncEnd2endServerTryCancelTest : public AsyncEnd2endTest {
841 protected:
842 typedef enum {
843 DO_NOT_CANCEL = 0,
844 CANCEL_BEFORE_PROCESSING,
845 CANCEL_DURING_PROCESSING,
846 CANCEL_AFTER_PROCESSING
847 } ServerTryCancelRequestPhase;
848
Sree Kuchibhotla0f242ac2016-01-29 18:12:19 -0800849 // Helper for testing client-streaming RPCs which are cancelled on the server.
850 // Depending on the value of server_try_cancel parameter, this will test one
851 // of the following three scenarios:
852 // CANCEL_BEFORE_PROCESSING: Rpc is cancelled by the server before reading
853 // any messages from the client
854 //
855 // CANCEL_DURING_PROCESSING: Rpc is cancelled by the server while reading
856 // messages from the client
857 //
858 // CANCEL_AFTER PROCESSING: Rpc is cancelled by server after reading all
859 // messages from the client (but before sending any status back to the
860 // client)
Sree Kuchibhotla944f4cf2016-01-27 14:37:26 -0800861 void TestClientStreamingServerCancel(
862 ServerTryCancelRequestPhase server_try_cancel) {
863 ResetStub();
864
865 EchoRequest send_request;
866 EchoRequest recv_request;
867 EchoResponse send_response;
868 EchoResponse recv_response;
869 Status recv_status;
870
Sree Kuchibhotla944f4cf2016-01-27 14:37:26 -0800871 ClientContext cli_ctx;
872 ServerContext srv_ctx;
873 ServerAsyncReader<EchoResponse, EchoRequest> srv_stream(&srv_ctx);
874
875 // Initiate the 'RequestStream' call on client
876 std::unique_ptr<ClientAsyncWriter<EchoRequest>> cli_stream(
Sree Kuchibhotla4fb59082016-01-29 11:16:24 -0800877 stub_->AsyncRequestStream(&cli_ctx, &recv_response, cq_.get(), tag(1)));
878 Verifier(GetParam()).Expect(1, true).Verify(cq_.get());
Sree Kuchibhotla944f4cf2016-01-27 14:37:26 -0800879
880 // On the server, request to be notified of 'RequestStream' calls
881 // and receive the 'RequestStream' call just made by the client
Sree Kuchibhotlab0d15672016-03-07 10:51:02 -0800882 srv_ctx.AsyncNotifyWhenDone(tag(11));
Sree Kuchibhotla944f4cf2016-01-27 14:37:26 -0800883 service_.RequestRequestStream(&srv_ctx, &srv_stream, cq_.get(), cq_.get(),
884 tag(2));
885 Verifier(GetParam()).Expect(2, true).Verify(cq_.get());
886
887 // Client sends 3 messages (tags 3, 4 and 5)
888 for (int tag_idx = 3; tag_idx <= 5; tag_idx++) {
889 send_request.set_message("Ping " + std::to_string(tag_idx));
890 cli_stream->Write(send_request, tag(tag_idx));
Sree Kuchibhotla4fb59082016-01-29 11:16:24 -0800891 Verifier(GetParam()).Expect(tag_idx, true).Verify(cq_.get());
Sree Kuchibhotla944f4cf2016-01-27 14:37:26 -0800892 }
893 cli_stream->WritesDone(tag(6));
Sree Kuchibhotla4fb59082016-01-29 11:16:24 -0800894 Verifier(GetParam()).Expect(6, true).Verify(cq_.get());
Sree Kuchibhotla944f4cf2016-01-27 14:37:26 -0800895
896 bool expected_server_cq_result = true;
897 bool ignore_cq_result = false;
Sree Kuchibhotlab0d15672016-03-07 10:51:02 -0800898 bool want_done_tag = false;
Sree Kuchibhotla944f4cf2016-01-27 14:37:26 -0800899
900 if (server_try_cancel == CANCEL_BEFORE_PROCESSING) {
Sree Kuchibhotlab0d15672016-03-07 10:51:02 -0800901 srv_ctx.TryCancel();
902 Verifier(GetParam()).Expect(11, true).Verify(cq_.get());
903 EXPECT_TRUE(srv_ctx.IsCancelled());
Sree Kuchibhotla944f4cf2016-01-27 14:37:26 -0800904
905 // Since cancellation is done before server reads any results, we know
906 // for sure that all cq results will return false from this point forward
907 expected_server_cq_result = false;
908 }
909
910 std::thread* server_try_cancel_thd = NULL;
Sree Kuchibhotlab0d15672016-03-07 10:51:02 -0800911
912 auto verif = Verifier(GetParam());
913
Sree Kuchibhotla944f4cf2016-01-27 14:37:26 -0800914 if (server_try_cancel == CANCEL_DURING_PROCESSING) {
Sree Kuchibhotlab0d15672016-03-07 10:51:02 -0800915 server_try_cancel_thd =
916 new std::thread(&ServerContext::TryCancel, &srv_ctx);
Sree Kuchibhotla944f4cf2016-01-27 14:37:26 -0800917 // Server will cancel the RPC in a parallel thread while reading the
918 // requests from the client. Since the cancellation can happen at anytime,
919 // some of the cq results (i.e those until cancellation) might be true but
920 // its non deterministic. So better to ignore the cq results
921 ignore_cq_result = true;
Sree Kuchibhotlab0d15672016-03-07 10:51:02 -0800922 // Expect that we might possibly see the done tag that
923 // indicates cancellation completion in this case
924 want_done_tag = true;
925 verif.Expect(11, true);
Sree Kuchibhotla944f4cf2016-01-27 14:37:26 -0800926 }
927
928 // Server reads 3 messages (tags 6, 7 and 8)
Sree Kuchibhotlab0d15672016-03-07 10:51:02 -0800929 // But if want_done_tag is true, we might also see tag 11
Sree Kuchibhotla944f4cf2016-01-27 14:37:26 -0800930 for (int tag_idx = 6; tag_idx <= 8; tag_idx++) {
931 srv_stream.Read(&recv_request, tag(tag_idx));
Sree Kuchibhotlab0d15672016-03-07 10:51:02 -0800932 // Note that we'll add something to the verifier and verify that
933 // something was seen, but it might be tag 11 and not what we
934 // just added
935 int got_tag = verif.Expect(tag_idx, expected_server_cq_result)
936 .Next(cq_.get(), ignore_cq_result);
937 GPR_ASSERT((got_tag == tag_idx) || (got_tag == 11 && want_done_tag));
938 if (got_tag == 11) {
939 EXPECT_TRUE(srv_ctx.IsCancelled());
940 want_done_tag = false;
941 // Now get the other entry that we were waiting on
942 EXPECT_EQ(verif.Next(cq_.get(), ignore_cq_result), tag_idx);
943 }
Sree Kuchibhotla944f4cf2016-01-27 14:37:26 -0800944 }
945
946 if (server_try_cancel_thd != NULL) {
947 server_try_cancel_thd->join();
948 delete server_try_cancel_thd;
949 }
950
951 if (server_try_cancel == CANCEL_AFTER_PROCESSING) {
Sree Kuchibhotlab0d15672016-03-07 10:51:02 -0800952 srv_ctx.TryCancel();
953 want_done_tag = true;
954 verif.Expect(11, true);
955 }
956
957 if (want_done_tag) {
958 verif.Verify(cq_.get());
959 EXPECT_TRUE(srv_ctx.IsCancelled());
960 want_done_tag = false;
Sree Kuchibhotla944f4cf2016-01-27 14:37:26 -0800961 }
962
Sree Kuchibhotla0f242ac2016-01-29 18:12:19 -0800963 // The RPC has been cancelled at this point for sure (i.e irrespective of
964 // the value of `server_try_cancel` is). So, from this point forward, we
965 // know that cq results are supposed to return false on server.
Sree Kuchibhotla944f4cf2016-01-27 14:37:26 -0800966
Sree Kuchibhotla0f242ac2016-01-29 18:12:19 -0800967 // Server sends the final message and cancelled status (but the RPC is
968 // already cancelled at this point. So we expect the operation to fail)
Sree Kuchibhotla944f4cf2016-01-27 14:37:26 -0800969 srv_stream.Finish(send_response, Status::CANCELLED, tag(9));
970 Verifier(GetParam()).Expect(9, false).Verify(cq_.get());
971
Sree Kuchibhotla0f242ac2016-01-29 18:12:19 -0800972 // Client will see the cancellation
Sree Kuchibhotla944f4cf2016-01-27 14:37:26 -0800973 cli_stream->Finish(&recv_status, tag(10));
Sree Kuchibhotla369a04a2016-02-01 10:53:13 -0800974 // TODO(sreek): The expectation here should be true. This is a bug (github
Sree Kuchibhotla0f242ac2016-01-29 18:12:19 -0800975 // issue #4972)
Sree Kuchibhotla4fb59082016-01-29 11:16:24 -0800976 Verifier(GetParam()).Expect(10, false).Verify(cq_.get());
Sree Kuchibhotla944f4cf2016-01-27 14:37:26 -0800977 EXPECT_FALSE(recv_status.ok());
978 EXPECT_EQ(::grpc::StatusCode::CANCELLED, recv_status.error_code());
979 }
980
Sree Kuchibhotla0f242ac2016-01-29 18:12:19 -0800981 // Helper for testing server-streaming RPCs which are cancelled on the server.
982 // Depending on the value of server_try_cancel parameter, this will test one
983 // of the following three scenarios:
984 // CANCEL_BEFORE_PROCESSING: Rpc is cancelled by the server before sending
985 // any messages to the client
986 //
987 // CANCEL_DURING_PROCESSING: Rpc is cancelled by the server while sending
988 // messages to the client
989 //
990 // CANCEL_AFTER PROCESSING: Rpc is cancelled by server after sending all
991 // messages to the client (but before sending any status back to the
992 // client)
Sree Kuchibhotla944f4cf2016-01-27 14:37:26 -0800993 void TestServerStreamingServerCancel(
994 ServerTryCancelRequestPhase server_try_cancel) {
995 ResetStub();
996
997 EchoRequest send_request;
998 EchoRequest recv_request;
999 EchoResponse send_response;
1000 EchoResponse recv_response;
1001 Status recv_status;
1002 ClientContext cli_ctx;
1003 ServerContext srv_ctx;
Sree Kuchibhotla944f4cf2016-01-27 14:37:26 -08001004 ServerAsyncWriter<EchoResponse> srv_stream(&srv_ctx);
1005
1006 send_request.set_message("Ping");
1007 // Initiate the 'ResponseStream' call on the client
1008 std::unique_ptr<ClientAsyncReader<EchoResponse>> cli_stream(
Sree Kuchibhotla4fb59082016-01-29 11:16:24 -08001009 stub_->AsyncResponseStream(&cli_ctx, send_request, cq_.get(), tag(1)));
1010 Verifier(GetParam()).Expect(1, true).Verify(cq_.get());
Sree Kuchibhotla944f4cf2016-01-27 14:37:26 -08001011 // On the server, request to be notified of 'ResponseStream' calls and
1012 // receive the call just made by the client
Sree Kuchibhotlab0d15672016-03-07 10:51:02 -08001013 srv_ctx.AsyncNotifyWhenDone(tag(11));
Sree Kuchibhotla944f4cf2016-01-27 14:37:26 -08001014 service_.RequestResponseStream(&srv_ctx, &recv_request, &srv_stream,
1015 cq_.get(), cq_.get(), tag(2));
1016 Verifier(GetParam()).Expect(2, true).Verify(cq_.get());
1017 EXPECT_EQ(send_request.message(), recv_request.message());
1018
1019 bool expected_cq_result = true;
1020 bool ignore_cq_result = false;
Sree Kuchibhotlab0d15672016-03-07 10:51:02 -08001021 bool want_done_tag = false;
Sree Kuchibhotla944f4cf2016-01-27 14:37:26 -08001022
1023 if (server_try_cancel == CANCEL_BEFORE_PROCESSING) {
Sree Kuchibhotlab0d15672016-03-07 10:51:02 -08001024 srv_ctx.TryCancel();
1025 Verifier(GetParam()).Expect(11, true).Verify(cq_.get());
1026 EXPECT_TRUE(srv_ctx.IsCancelled());
Sree Kuchibhotla944f4cf2016-01-27 14:37:26 -08001027
1028 // We know for sure that all cq results will be false from this point
1029 // since the server cancelled the RPC
1030 expected_cq_result = false;
1031 }
1032
1033 std::thread* server_try_cancel_thd = NULL;
Sree Kuchibhotlab0d15672016-03-07 10:51:02 -08001034
1035 auto verif = Verifier(GetParam());
1036
Sree Kuchibhotla944f4cf2016-01-27 14:37:26 -08001037 if (server_try_cancel == CANCEL_DURING_PROCESSING) {
Sree Kuchibhotlab0d15672016-03-07 10:51:02 -08001038 server_try_cancel_thd =
1039 new std::thread(&ServerContext::TryCancel, &srv_ctx);
Sree Kuchibhotla944f4cf2016-01-27 14:37:26 -08001040
1041 // Server will cancel the RPC in a parallel thread while writing responses
1042 // to the client. Since the cancellation can happen at anytime, some of
Sree Kuchibhotla0f242ac2016-01-29 18:12:19 -08001043 // the cq results (i.e those until cancellation) might be true but it is
1044 // non deterministic. So better to ignore the cq results
Sree Kuchibhotla944f4cf2016-01-27 14:37:26 -08001045 ignore_cq_result = true;
Sree Kuchibhotlab0d15672016-03-07 10:51:02 -08001046 // Expect that we might possibly see the done tag that
1047 // indicates cancellation completion in this case
1048 want_done_tag = true;
1049 verif.Expect(11, true);
Sree Kuchibhotla944f4cf2016-01-27 14:37:26 -08001050 }
1051
1052 // Server sends three messages (tags 3, 4 and 5)
Sree Kuchibhotlab0d15672016-03-07 10:51:02 -08001053 // But if want_done tag is true, we might also see tag 11
Sree Kuchibhotla944f4cf2016-01-27 14:37:26 -08001054 for (int tag_idx = 3; tag_idx <= 5; tag_idx++) {
1055 send_response.set_message("Pong " + std::to_string(tag_idx));
1056 srv_stream.Write(send_response, tag(tag_idx));
Sree Kuchibhotlab0d15672016-03-07 10:51:02 -08001057 // Note that we'll add something to the verifier and verify that
1058 // something was seen, but it might be tag 11 and not what we
1059 // just added
1060 int got_tag = verif.Expect(tag_idx, expected_cq_result)
1061 .Next(cq_.get(), ignore_cq_result);
1062 GPR_ASSERT((got_tag == tag_idx) || (got_tag == 11 && want_done_tag));
1063 if (got_tag == 11) {
1064 EXPECT_TRUE(srv_ctx.IsCancelled());
1065 want_done_tag = false;
1066 // Now get the other entry that we were waiting on
1067 EXPECT_EQ(verif.Next(cq_.get(), ignore_cq_result), tag_idx);
1068 }
Sree Kuchibhotla944f4cf2016-01-27 14:37:26 -08001069 }
1070
Sree Kuchibhotla0f242ac2016-01-29 18:12:19 -08001071 if (server_try_cancel_thd != NULL) {
1072 server_try_cancel_thd->join();
1073 delete server_try_cancel_thd;
1074 }
1075
Sree Kuchibhotla944f4cf2016-01-27 14:37:26 -08001076 if (server_try_cancel == CANCEL_AFTER_PROCESSING) {
Sree Kuchibhotlab0d15672016-03-07 10:51:02 -08001077 srv_ctx.TryCancel();
1078 want_done_tag = true;
1079 verif.Expect(11, true);
yang-gad0df7b2016-02-22 10:00:20 -08001080
1081 // Client reads may fail bacause it is notified that the stream is
1082 // cancelled.
1083 ignore_cq_result = true;
Sree Kuchibhotla944f4cf2016-01-27 14:37:26 -08001084 }
1085
Sree Kuchibhotlab0d15672016-03-07 10:51:02 -08001086 if (want_done_tag) {
1087 verif.Verify(cq_.get());
1088 EXPECT_TRUE(srv_ctx.IsCancelled());
1089 want_done_tag = false;
1090 }
1091
Sree Kuchibhotla0f242ac2016-01-29 18:12:19 -08001092 // Client attemts to read the three messages from the server
Sree Kuchibhotla944f4cf2016-01-27 14:37:26 -08001093 for (int tag_idx = 6; tag_idx <= 8; tag_idx++) {
1094 cli_stream->Read(&recv_response, tag(tag_idx));
1095 Verifier(GetParam())
1096 .Expect(tag_idx, expected_cq_result)
Sree Kuchibhotla4fb59082016-01-29 11:16:24 -08001097 .Verify(cq_.get(), ignore_cq_result);
Sree Kuchibhotla944f4cf2016-01-27 14:37:26 -08001098 }
1099
Sree Kuchibhotla0f242ac2016-01-29 18:12:19 -08001100 // The RPC has been cancelled at this point for sure (i.e irrespective of
1101 // the value of `server_try_cancel` is). So, from this point forward, we
1102 // know that cq results are supposed to return false on server.
Sree Kuchibhotla944f4cf2016-01-27 14:37:26 -08001103
Sree Kuchibhotla0f242ac2016-01-29 18:12:19 -08001104 // Server finishes the stream (but the RPC is already cancelled)
Sree Kuchibhotla944f4cf2016-01-27 14:37:26 -08001105 srv_stream.Finish(Status::CANCELLED, tag(9));
1106 Verifier(GetParam()).Expect(9, false).Verify(cq_.get());
1107
Sree Kuchibhotla0f242ac2016-01-29 18:12:19 -08001108 // Client will see the cancellation
Sree Kuchibhotla944f4cf2016-01-27 14:37:26 -08001109 cli_stream->Finish(&recv_status, tag(10));
Sree Kuchibhotla4fb59082016-01-29 11:16:24 -08001110 Verifier(GetParam()).Expect(10, true).Verify(cq_.get());
Sree Kuchibhotla944f4cf2016-01-27 14:37:26 -08001111 EXPECT_FALSE(recv_status.ok());
1112 EXPECT_EQ(::grpc::StatusCode::CANCELLED, recv_status.error_code());
1113 }
1114
Sree Kuchibhotla0f242ac2016-01-29 18:12:19 -08001115 // Helper for testing bidirectinal-streaming RPCs which are cancelled on the
1116 // server.
1117 //
1118 // Depending on the value of server_try_cancel parameter, this will
1119 // test one of the following three scenarios:
1120 // CANCEL_BEFORE_PROCESSING: Rpc is cancelled by the server before reading/
1121 // writing any messages from/to the client
1122 //
1123 // CANCEL_DURING_PROCESSING: Rpc is cancelled by the server while reading
1124 // messages from the client
1125 //
1126 // CANCEL_AFTER PROCESSING: Rpc is cancelled by server after reading all
1127 // messages from the client (but before sending any status back to the
1128 // client)
Sree Kuchibhotla944f4cf2016-01-27 14:37:26 -08001129 void TestBidiStreamingServerCancel(
1130 ServerTryCancelRequestPhase server_try_cancel) {
1131 ResetStub();
1132
1133 EchoRequest send_request;
1134 EchoRequest recv_request;
1135 EchoResponse send_response;
1136 EchoResponse recv_response;
1137 Status recv_status;
Sree Kuchibhotla944f4cf2016-01-27 14:37:26 -08001138 ClientContext cli_ctx;
1139 ServerContext srv_ctx;
1140 ServerAsyncReaderWriter<EchoResponse, EchoRequest> srv_stream(&srv_ctx);
1141
1142 // Initiate the call from the client side
1143 std::unique_ptr<ClientAsyncReaderWriter<EchoRequest, EchoResponse>>
Sree Kuchibhotla4fb59082016-01-29 11:16:24 -08001144 cli_stream(stub_->AsyncBidiStream(&cli_ctx, cq_.get(), tag(1)));
1145 Verifier(GetParam()).Expect(1, true).Verify(cq_.get());
Sree Kuchibhotla944f4cf2016-01-27 14:37:26 -08001146
1147 // On the server, request to be notified of the 'BidiStream' call and
1148 // receive the call just made by the client
Sree Kuchibhotlab0d15672016-03-07 10:51:02 -08001149 srv_ctx.AsyncNotifyWhenDone(tag(11));
Sree Kuchibhotla944f4cf2016-01-27 14:37:26 -08001150 service_.RequestBidiStream(&srv_ctx, &srv_stream, cq_.get(), cq_.get(),
1151 tag(2));
1152 Verifier(GetParam()).Expect(2, true).Verify(cq_.get());
1153
Sree Kuchibhotla0f242ac2016-01-29 18:12:19 -08001154 // Client sends the first and the only message
Sree Kuchibhotla944f4cf2016-01-27 14:37:26 -08001155 send_request.set_message("Ping");
1156 cli_stream->Write(send_request, tag(3));
Sree Kuchibhotla4fb59082016-01-29 11:16:24 -08001157 Verifier(GetParam()).Expect(3, true).Verify(cq_.get());
Sree Kuchibhotla944f4cf2016-01-27 14:37:26 -08001158
1159 bool expected_cq_result = true;
1160 bool ignore_cq_result = false;
Sree Kuchibhotlab0d15672016-03-07 10:51:02 -08001161 bool want_done_tag = false;
Sree Kuchibhotla0f242ac2016-01-29 18:12:19 -08001162
Sree Kuchibhotla944f4cf2016-01-27 14:37:26 -08001163 if (server_try_cancel == CANCEL_BEFORE_PROCESSING) {
Sree Kuchibhotlab0d15672016-03-07 10:51:02 -08001164 srv_ctx.TryCancel();
1165 Verifier(GetParam()).Expect(11, true).Verify(cq_.get());
1166 EXPECT_TRUE(srv_ctx.IsCancelled());
Sree Kuchibhotla944f4cf2016-01-27 14:37:26 -08001167
1168 // We know for sure that all cq results will be false from this point
1169 // since the server cancelled the RPC
1170 expected_cq_result = false;
1171 }
1172
1173 std::thread* server_try_cancel_thd = NULL;
Sree Kuchibhotlab0d15672016-03-07 10:51:02 -08001174
1175 auto verif = Verifier(GetParam());
1176
Sree Kuchibhotla944f4cf2016-01-27 14:37:26 -08001177 if (server_try_cancel == CANCEL_DURING_PROCESSING) {
Sree Kuchibhotlab0d15672016-03-07 10:51:02 -08001178 server_try_cancel_thd =
1179 new std::thread(&ServerContext::TryCancel, &srv_ctx);
Sree Kuchibhotla944f4cf2016-01-27 14:37:26 -08001180
1181 // Since server is going to cancel the RPC in a parallel thread, some of
1182 // the cq results (i.e those until the cancellation) might be true. Since
1183 // that number is non-deterministic, it is better to ignore the cq results
1184 ignore_cq_result = true;
Sree Kuchibhotlab0d15672016-03-07 10:51:02 -08001185 // Expect that we might possibly see the done tag that
1186 // indicates cancellation completion in this case
1187 want_done_tag = true;
1188 verif.Expect(11, true);
Sree Kuchibhotla944f4cf2016-01-27 14:37:26 -08001189 }
1190
Sree Kuchibhotlab0d15672016-03-07 10:51:02 -08001191 int got_tag;
Sree Kuchibhotla944f4cf2016-01-27 14:37:26 -08001192 srv_stream.Read(&recv_request, tag(4));
Sree Kuchibhotlab0d15672016-03-07 10:51:02 -08001193 verif.Expect(4, expected_cq_result);
1194 got_tag = verif.Next(cq_.get(), ignore_cq_result);
1195 GPR_ASSERT((got_tag == 4) || (got_tag == 11 && want_done_tag));
1196 if (got_tag == 11) {
1197 EXPECT_TRUE(srv_ctx.IsCancelled());
1198 want_done_tag = false;
1199 // Now get the other entry that we were waiting on
1200 EXPECT_EQ(verif.Next(cq_.get(), ignore_cq_result), 4);
1201 }
Sree Kuchibhotla944f4cf2016-01-27 14:37:26 -08001202
1203 send_response.set_message("Pong");
1204 srv_stream.Write(send_response, tag(5));
Sree Kuchibhotlab0d15672016-03-07 10:51:02 -08001205 verif.Expect(5, expected_cq_result);
1206 got_tag = verif.Next(cq_.get(), ignore_cq_result);
1207 GPR_ASSERT((got_tag == 5) || (got_tag == 11 && want_done_tag));
1208 if (got_tag == 11) {
1209 EXPECT_TRUE(srv_ctx.IsCancelled());
1210 want_done_tag = false;
1211 // Now get the other entry that we were waiting on
1212 EXPECT_EQ(verif.Next(cq_.get(), ignore_cq_result), 5);
1213 }
Sree Kuchibhotla944f4cf2016-01-27 14:37:26 -08001214
1215 cli_stream->Read(&recv_response, tag(6));
Sree Kuchibhotlab0d15672016-03-07 10:51:02 -08001216 verif.Expect(6, expected_cq_result);
1217 got_tag = verif.Next(cq_.get(), ignore_cq_result);
1218 GPR_ASSERT((got_tag == 6) || (got_tag == 11 && want_done_tag));
1219 if (got_tag == 11) {
1220 EXPECT_TRUE(srv_ctx.IsCancelled());
1221 want_done_tag = false;
1222 // Now get the other entry that we were waiting on
1223 EXPECT_EQ(verif.Next(cq_.get(), ignore_cq_result), 6);
1224 }
Sree Kuchibhotla944f4cf2016-01-27 14:37:26 -08001225
1226 // This is expected to succeed in all cases
1227 cli_stream->WritesDone(tag(7));
Sree Kuchibhotlab0d15672016-03-07 10:51:02 -08001228 verif.Expect(7, true);
1229 got_tag = verif.Next(cq_.get(), ignore_cq_result);
1230 GPR_ASSERT((got_tag == 7) || (got_tag == 11 && want_done_tag));
1231 if (got_tag == 11) {
1232 EXPECT_TRUE(srv_ctx.IsCancelled());
1233 want_done_tag = false;
1234 // Now get the other entry that we were waiting on
1235 EXPECT_EQ(verif.Next(cq_.get(), ignore_cq_result), 7);
1236 }
Sree Kuchibhotla944f4cf2016-01-27 14:37:26 -08001237
Sree Kuchibhotla0f242ac2016-01-29 18:12:19 -08001238 // This is expected to fail in all cases i.e for all values of
Vijay Pai018879a2016-02-16 09:20:50 -08001239 // server_try_cancel. This is because at this point, either there are no
Sree Kuchibhotla0f242ac2016-01-29 18:12:19 -08001240 // more msgs from the client (because client called WritesDone) or the RPC
1241 // is cancelled on the server
Sree Kuchibhotla944f4cf2016-01-27 14:37:26 -08001242 srv_stream.Read(&recv_request, tag(8));
Sree Kuchibhotlab0d15672016-03-07 10:51:02 -08001243 verif.Expect(8, false);
1244 got_tag = verif.Next(cq_.get(), ignore_cq_result);
1245 GPR_ASSERT((got_tag == 8) || (got_tag == 11 && want_done_tag));
1246 if (got_tag == 11) {
1247 EXPECT_TRUE(srv_ctx.IsCancelled());
1248 want_done_tag = false;
1249 // Now get the other entry that we were waiting on
1250 EXPECT_EQ(verif.Next(cq_.get(), ignore_cq_result), 8);
1251 }
Sree Kuchibhotla944f4cf2016-01-27 14:37:26 -08001252
1253 if (server_try_cancel_thd != NULL) {
1254 server_try_cancel_thd->join();
1255 delete server_try_cancel_thd;
1256 }
1257
1258 if (server_try_cancel == CANCEL_AFTER_PROCESSING) {
Sree Kuchibhotlab0d15672016-03-07 10:51:02 -08001259 srv_ctx.TryCancel();
1260 want_done_tag = true;
1261 verif.Expect(11, true);
1262 }
1263
1264 if (want_done_tag) {
1265 verif.Verify(cq_.get());
1266 EXPECT_TRUE(srv_ctx.IsCancelled());
1267 want_done_tag = false;
Sree Kuchibhotla944f4cf2016-01-27 14:37:26 -08001268 }
1269
Sree Kuchibhotla0f242ac2016-01-29 18:12:19 -08001270 // The RPC has been cancelled at this point for sure (i.e irrespective of
1271 // the value of `server_try_cancel` is). So, from this point forward, we
1272 // know that cq results are supposed to return false on server.
Sree Kuchibhotla944f4cf2016-01-27 14:37:26 -08001273
1274 srv_stream.Finish(Status::CANCELLED, tag(9));
1275 Verifier(GetParam()).Expect(9, false).Verify(cq_.get());
1276
1277 cli_stream->Finish(&recv_status, tag(10));
Sree Kuchibhotla4fb59082016-01-29 11:16:24 -08001278 Verifier(GetParam()).Expect(10, true).Verify(cq_.get());
Sree Kuchibhotla944f4cf2016-01-27 14:37:26 -08001279 EXPECT_FALSE(recv_status.ok());
1280 EXPECT_EQ(grpc::StatusCode::CANCELLED, recv_status.error_code());
1281 }
1282};
1283
1284TEST_P(AsyncEnd2endServerTryCancelTest, ClientStreamingServerTryCancelBefore) {
1285 TestClientStreamingServerCancel(CANCEL_BEFORE_PROCESSING);
1286}
1287
1288TEST_P(AsyncEnd2endServerTryCancelTest, ClientStreamingServerTryCancelDuring) {
1289 TestClientStreamingServerCancel(CANCEL_DURING_PROCESSING);
1290}
1291
1292TEST_P(AsyncEnd2endServerTryCancelTest, ClientStreamingServerTryCancelAfter) {
1293 TestClientStreamingServerCancel(CANCEL_AFTER_PROCESSING);
1294}
1295
1296TEST_P(AsyncEnd2endServerTryCancelTest, ServerStreamingServerTryCancelBefore) {
1297 TestServerStreamingServerCancel(CANCEL_BEFORE_PROCESSING);
1298}
1299
1300TEST_P(AsyncEnd2endServerTryCancelTest, ServerStreamingServerTryCancelDuring) {
1301 TestServerStreamingServerCancel(CANCEL_DURING_PROCESSING);
1302}
1303
1304TEST_P(AsyncEnd2endServerTryCancelTest, ServerStreamingServerTryCancelAfter) {
1305 TestServerStreamingServerCancel(CANCEL_AFTER_PROCESSING);
1306}
1307
1308TEST_P(AsyncEnd2endServerTryCancelTest, ServerBidiStreamingTryCancelBefore) {
1309 TestBidiStreamingServerCancel(CANCEL_BEFORE_PROCESSING);
1310}
1311
1312TEST_P(AsyncEnd2endServerTryCancelTest, ServerBidiStreamingTryCancelDuring) {
1313 TestBidiStreamingServerCancel(CANCEL_DURING_PROCESSING);
1314}
1315
1316TEST_P(AsyncEnd2endServerTryCancelTest, ServerBidiStreamingTryCancelAfter) {
1317 TestBidiStreamingServerCancel(CANCEL_AFTER_PROCESSING);
1318}
1319
Craig Tiller4c06b822015-08-06 08:41:31 -07001320INSTANTIATE_TEST_CASE_P(AsyncEnd2end, AsyncEnd2endTest,
1321 ::testing::Values(false, true));
Sree Kuchibhotla944f4cf2016-01-27 14:37:26 -08001322INSTANTIATE_TEST_CASE_P(AsyncEnd2endServerTryCancel,
1323 AsyncEnd2endServerTryCancelTest,
1324 ::testing::Values(false));
Craig Tiller69f90e62015-08-06 08:32:35 -07001325
Craig Tiller0220cf12015-02-12 17:39:26 -08001326} // namespace
1327} // namespace testing
1328} // namespace grpc
1329
1330int main(int argc, char** argv) {
1331 grpc_test_init(argc, argv);
Vijay Paib65eda42016-02-16 13:48:05 -08001332 gpr_tls_init(&g_is_async_end2end_test);
Craig Tiller0220cf12015-02-12 17:39:26 -08001333 ::testing::InitGoogleTest(&argc, argv);
Vijay Paib65eda42016-02-16 13:48:05 -08001334 int ret = RUN_ALL_TESTS();
1335 gpr_tls_destroy(&g_is_async_end2end_test);
1336 return ret;
Craig Tiller0220cf12015-02-12 17:39:26 -08001337}