blob: 38f1311eb71f782ed8c2a8c90904f63c8f128a58 [file] [log] [blame]
Craig Tiller0220cf12015-02-12 17:39:26 -08001/*
2 *
Craig Tiller6169d5f2016-03-31 07:46:18 -07003 * Copyright 2015, Google Inc.
Craig Tiller0220cf12015-02-12 17:39:26 -08004 * All rights reserved.
5 *
6 * Redistribution and use in source and binary forms, with or without
7 * modification, are permitted provided that the following conditions are
8 * met:
9 *
10 * * Redistributions of source code must retain the above copyright
11 * notice, this list of conditions and the following disclaimer.
12 * * Redistributions in binary form must reproduce the above
13 * copyright notice, this list of conditions and the following disclaimer
14 * in the documentation and/or other materials provided with the
15 * distribution.
16 * * Neither the name of Google Inc. nor the names of its
17 * contributors may be used to endorse or promote products derived from
18 * this software without specific prior written permission.
19 *
20 * THIS SOFTWARE IS PROVIDED BY THE COPYRIGHT HOLDERS AND CONTRIBUTORS
21 * "AS IS" AND ANY EXPRESS OR IMPLIED WARRANTIES, INCLUDING, BUT NOT
22 * LIMITED TO, THE IMPLIED WARRANTIES OF MERCHANTABILITY AND FITNESS FOR
23 * A PARTICULAR PURPOSE ARE DISCLAIMED. IN NO EVENT SHALL THE COPYRIGHT
24 * OWNER OR CONTRIBUTORS BE LIABLE FOR ANY DIRECT, INDIRECT, INCIDENTAL,
25 * SPECIAL, EXEMPLARY, OR CONSEQUENTIAL DAMAGES (INCLUDING, BUT NOT
26 * LIMITED TO, PROCUREMENT OF SUBSTITUTE GOODS OR SERVICES; LOSS OF USE,
27 * DATA, OR PROFITS; OR BUSINESS INTERRUPTION) HOWEVER CAUSED AND ON ANY
28 * THEORY OF LIABILITY, WHETHER IN CONTRACT, STRICT LIABILITY, OR TORT
29 * (INCLUDING NEGLIGENCE OR OTHERWISE) ARISING IN ANY WAY OUT OF THE USE
30 * OF THIS SOFTWARE, EVEN IF ADVISED OF THE POSSIBILITY OF SUCH DAMAGE.
31 *
32 */
33
Yang Gaoda699b82015-02-18 01:10:22 -080034#include <memory>
Sree Kuchibhotla944f4cf2016-01-27 14:37:26 -080035#include <thread>
Craig Tiller0220cf12015-02-12 17:39:26 -080036
yang-g8c2be9f2015-08-19 16:28:09 -070037#include <grpc++/channel.h>
Craig Tiller0220cf12015-02-12 17:39:26 -080038#include <grpc++/client_context.h>
39#include <grpc++/create_channel.h>
Craig Tiller0220cf12015-02-12 17:39:26 -080040#include <grpc++/server.h>
41#include <grpc++/server_builder.h>
42#include <grpc++/server_context.h>
Sree Kuchibhotlab0d0c8e2016-01-13 22:52:17 -080043#include <grpc/grpc.h>
44#include <grpc/support/thd.h>
45#include <grpc/support/time.h>
Vijay Paib65eda42016-02-16 13:48:05 -080046#include <grpc/support/tls.h>
Craig Tiller0220cf12015-02-12 17:39:26 -080047#include <gtest/gtest.h>
48
Craig Tiller1b4e3302015-12-17 16:35:00 -080049#include "src/proto/grpc/testing/duplicate/echo_duplicate.grpc.pb.h"
50#include "src/proto/grpc/testing/echo.grpc.pb.h"
Sree Kuchibhotlab0d0c8e2016-01-13 22:52:17 -080051#include "test/core/util/port.h"
52#include "test/core/util/test_config.h"
yang-ge21908f2015-08-25 13:47:51 -070053#include "test/cpp/util/string_ref_helper.h"
Craig Tiller0220cf12015-02-12 17:39:26 -080054
Craig Tiller69f90e62015-08-06 08:32:35 -070055#ifdef GPR_POSIX_SOCKET
Craig Tiller9533d042016-03-25 17:11:06 -070056#include "src/core/lib/iomgr/pollset_posix.h"
Craig Tiller69f90e62015-08-06 08:32:35 -070057#endif
58
Craig Tiller1b4e3302015-12-17 16:35:00 -080059using grpc::testing::EchoRequest;
60using grpc::testing::EchoResponse;
Craig Tiller0220cf12015-02-12 17:39:26 -080061using std::chrono::system_clock;
62
Vijay Paib65eda42016-02-16 13:48:05 -080063GPR_TLS_DECL(g_is_async_end2end_test);
64
Craig Tiller0220cf12015-02-12 17:39:26 -080065namespace grpc {
66namespace testing {
67
68namespace {
69
Craig Tiller7536af02015-12-22 13:49:30 -080070void* tag(int i) { return (void*)(intptr_t)i; }
Sree Kuchibhotlab0d15672016-03-07 10:51:02 -080071int detag(void* p) { return static_cast<int>(reinterpret_cast<intptr_t>(p)); }
Yang Gaoc05b6cb2015-02-13 00:34:10 -080072
Craig Tiller69f90e62015-08-06 08:32:35 -070073#ifdef GPR_POSIX_SOCKET
Vijay Paib65eda42016-02-16 13:48:05 -080074static int maybe_assert_non_blocking_poll(struct pollfd* pfds, nfds_t nfds,
75 int timeout) {
76 if (gpr_tls_get(&g_is_async_end2end_test)) {
77 GPR_ASSERT(timeout == 0);
78 }
79 return poll(pfds, nfds, timeout);
Craig Tiller69f90e62015-08-06 08:32:35 -070080}
81
82class PollOverride {
Craig Tiller06cf3cc2015-05-13 13:11:01 -070083 public:
Craig Tiller69f90e62015-08-06 08:32:35 -070084 PollOverride(grpc_poll_function_type f) {
85 prev_ = grpc_poll_function;
86 grpc_poll_function = f;
87 }
88
Craig Tiller4c06b822015-08-06 08:41:31 -070089 ~PollOverride() { grpc_poll_function = prev_; }
Craig Tiller69f90e62015-08-06 08:32:35 -070090
91 private:
92 grpc_poll_function_type prev_;
93};
94
vjpaicf4daeb2016-02-15 02:33:54 -080095class PollingOverrider : public PollOverride {
Craig Tiller69f90e62015-08-06 08:32:35 -070096 public:
vjpaicf4daeb2016-02-15 02:33:54 -080097 explicit PollingOverrider(bool allow_blocking)
Vijay Paib65eda42016-02-16 13:48:05 -080098 : PollOverride(allow_blocking ? poll : maybe_assert_non_blocking_poll) {}
Craig Tiller69f90e62015-08-06 08:32:35 -070099};
100#else
vjpaicf4daeb2016-02-15 02:33:54 -0800101class PollingOverrider {
Craig Tiller69f90e62015-08-06 08:32:35 -0700102 public:
vjpaicf4daeb2016-02-15 02:33:54 -0800103 explicit PollingOverrider(bool allow_blocking) {}
Craig Tiller69f90e62015-08-06 08:32:35 -0700104};
105#endif
106
vjpaicf4daeb2016-02-15 02:33:54 -0800107class Verifier {
Craig Tiller69f90e62015-08-06 08:32:35 -0700108 public:
vjpaicf4daeb2016-02-15 02:33:54 -0800109 explicit Verifier(bool spin) : spin_(spin) {}
Sree Kuchibhotlab0d15672016-03-07 10:51:02 -0800110 // Expect sets the expected ok value for a specific tag
Craig Tiller06cf3cc2015-05-13 13:11:01 -0700111 Verifier& Expect(int i, bool expect_ok) {
112 expectations_[tag(i)] = expect_ok;
113 return *this;
vjpai7aadf462015-03-16 23:58:44 -0700114 }
Sree Kuchibhotla944f4cf2016-01-27 14:37:26 -0800115
Sree Kuchibhotlab0d15672016-03-07 10:51:02 -0800116 // Next waits for 1 async tag to complete, checks its
117 // expectations, and returns the tag
118 int Next(CompletionQueue* cq, bool ignore_ok) {
119 bool ok;
120 void* got_tag;
121 if (spin_) {
122 for (;;) {
123 auto r = cq->AsyncNext(&got_tag, &ok, gpr_time_0(GPR_CLOCK_REALTIME));
124 if (r == CompletionQueue::TIMEOUT) continue;
125 if (r == CompletionQueue::GOT_EVENT) break;
126 gpr_log(GPR_ERROR, "unexpected result from AsyncNext");
127 abort();
128 }
129 } else {
130 EXPECT_TRUE(cq->Next(&got_tag, &ok));
131 }
132 auto it = expectations_.find(got_tag);
133 EXPECT_TRUE(it != expectations_.end());
134 if (!ignore_ok) {
135 EXPECT_EQ(it->second, ok);
136 }
137 expectations_.erase(it);
138 return detag(got_tag);
139 }
140
141 // Verify keeps calling Next until all currently set
142 // expected tags are complete
Sree Kuchibhotla944f4cf2016-01-27 14:37:26 -0800143 void Verify(CompletionQueue* cq) { Verify(cq, false); }
144
Sree Kuchibhotlab0d15672016-03-07 10:51:02 -0800145 // This version of Verify allows optionally ignoring the
146 // outcome of the expectation
Sree Kuchibhotla944f4cf2016-01-27 14:37:26 -0800147 void Verify(CompletionQueue* cq, bool ignore_ok) {
Craig Tiller06cf3cc2015-05-13 13:11:01 -0700148 GPR_ASSERT(!expectations_.empty());
149 while (!expectations_.empty()) {
Sree Kuchibhotlab0d15672016-03-07 10:51:02 -0800150 Next(cq, ignore_ok);
Craig Tiller06cf3cc2015-05-13 13:11:01 -0700151 }
152 }
Sree Kuchibhotlab0d15672016-03-07 10:51:02 -0800153 // This version of Verify stops after a certain deadline
Craig Tillerd6c98df2015-08-18 09:33:44 -0700154 void Verify(CompletionQueue* cq,
155 std::chrono::system_clock::time_point deadline) {
Craig Tiller06cf3cc2015-05-13 13:11:01 -0700156 if (expectations_.empty()) {
157 bool ok;
Craig Tillerd6c98df2015-08-18 09:33:44 -0700158 void* got_tag;
Craig Tiller69f90e62015-08-06 08:32:35 -0700159 if (spin_) {
160 while (std::chrono::system_clock::now() < deadline) {
Craig Tiller4c06b822015-08-06 08:41:31 -0700161 EXPECT_EQ(
162 cq->AsyncNext(&got_tag, &ok, gpr_time_0(GPR_CLOCK_REALTIME)),
163 CompletionQueue::TIMEOUT);
Craig Tiller69f90e62015-08-06 08:32:35 -0700164 }
165 } else {
Craig Tiller4c06b822015-08-06 08:41:31 -0700166 EXPECT_EQ(cq->AsyncNext(&got_tag, &ok, deadline),
167 CompletionQueue::TIMEOUT);
Craig Tiller69f90e62015-08-06 08:32:35 -0700168 }
Craig Tiller06cf3cc2015-05-13 13:11:01 -0700169 } else {
170 while (!expectations_.empty()) {
171 bool ok;
Craig Tillerd6c98df2015-08-18 09:33:44 -0700172 void* got_tag;
Craig Tiller69f90e62015-08-06 08:32:35 -0700173 if (spin_) {
174 for (;;) {
175 GPR_ASSERT(std::chrono::system_clock::now() < deadline);
Craig Tiller4c06b822015-08-06 08:41:31 -0700176 auto r =
177 cq->AsyncNext(&got_tag, &ok, gpr_time_0(GPR_CLOCK_REALTIME));
Craig Tiller69f90e62015-08-06 08:32:35 -0700178 if (r == CompletionQueue::TIMEOUT) continue;
179 if (r == CompletionQueue::GOT_EVENT) break;
180 gpr_log(GPR_ERROR, "unexpected result from AsyncNext");
181 abort();
Craig Tiller4c06b822015-08-06 08:41:31 -0700182 }
Craig Tiller69f90e62015-08-06 08:32:35 -0700183 } else {
Craig Tiller4c06b822015-08-06 08:41:31 -0700184 EXPECT_EQ(cq->AsyncNext(&got_tag, &ok, deadline),
185 CompletionQueue::GOT_EVENT);
Craig Tiller69f90e62015-08-06 08:32:35 -0700186 }
Craig Tiller06cf3cc2015-05-13 13:11:01 -0700187 auto it = expectations_.find(got_tag);
188 EXPECT_TRUE(it != expectations_.end());
189 EXPECT_EQ(it->second, ok);
190 expectations_.erase(it);
191 }
192 }
193 }
194
195 private:
196 std::map<void*, bool> expectations_;
Craig Tiller69f90e62015-08-06 08:32:35 -0700197 bool spin_;
Craig Tiller06cf3cc2015-05-13 13:11:01 -0700198};
vjpai7aadf462015-03-16 23:58:44 -0700199
Craig Tiller69f90e62015-08-06 08:32:35 -0700200class AsyncEnd2endTest : public ::testing::TestWithParam<bool> {
Craig Tiller0220cf12015-02-12 17:39:26 -0800201 protected:
Vijay Pai018879a2016-02-16 09:20:50 -0800202 AsyncEnd2endTest() {}
Craig Tiller0220cf12015-02-12 17:39:26 -0800203
Craig Tillercf133f42015-02-26 14:05:56 -0800204 void SetUp() GRPC_OVERRIDE {
Vijay Pai018879a2016-02-16 09:20:50 -0800205 poll_overrider_.reset(new PollingOverrider(!GetParam()));
206
Craig Tiller0220cf12015-02-12 17:39:26 -0800207 int port = grpc_pick_unused_port_or_die();
208 server_address_ << "localhost:" << port;
vjpai017ed622015-12-09 10:42:54 -0800209
Craig Tiller0220cf12015-02-12 17:39:26 -0800210 // Setup server
211 ServerBuilder builder;
Craig Tillerd6c98df2015-08-18 09:33:44 -0700212 builder.AddListeningPort(server_address_.str(),
213 grpc::InsecureServerCredentials());
Craig Tiller15f383c2016-01-07 12:45:32 -0800214 builder.RegisterService(&service_);
Craig Tiller06cf3cc2015-05-13 13:11:01 -0700215 cq_ = builder.AddCompletionQueue();
Craig Tiller0220cf12015-02-12 17:39:26 -0800216 server_ = builder.BuildAndStart();
Vijay Paib65eda42016-02-16 13:48:05 -0800217
218 gpr_tls_set(&g_is_async_end2end_test, 1);
Craig Tiller0220cf12015-02-12 17:39:26 -0800219 }
220
Craig Tillercf133f42015-02-26 14:05:56 -0800221 void TearDown() GRPC_OVERRIDE {
Craig Tiller492968f2015-02-18 13:14:03 -0800222 server_->Shutdown();
223 void* ignored_tag;
224 bool ignored_ok;
Craig Tiller06cf3cc2015-05-13 13:11:01 -0700225 cq_->Shutdown();
226 while (cq_->Next(&ignored_tag, &ignored_ok))
Craig Tiller492968f2015-02-18 13:14:03 -0800227 ;
Vijay Pai018879a2016-02-16 09:20:50 -0800228 poll_overrider_.reset();
Vijay Paib65eda42016-02-16 13:48:05 -0800229 gpr_tls_set(&g_is_async_end2end_test, 0);
Craig Tiller492968f2015-02-18 13:14:03 -0800230 }
Craig Tiller0220cf12015-02-12 17:39:26 -0800231
232 void ResetStub() {
yang-g730055d2015-08-27 12:29:45 -0700233 std::shared_ptr<Channel> channel =
Julien Boeufe5adc0e2015-10-12 14:08:10 -0700234 CreateChannel(server_address_.str(), InsecureChannelCredentials());
Sree Kuchibhotla5a05f512016-01-13 22:43:20 -0800235 stub_ = grpc::testing::EchoTestService::NewStub(channel);
Craig Tiller0220cf12015-02-12 17:39:26 -0800236 }
237
Yang Gao406b32f2015-02-13 16:25:33 -0800238 void SendRpc(int num_rpcs) {
239 for (int i = 0; i < num_rpcs; i++) {
240 EchoRequest send_request;
241 EchoRequest recv_request;
242 EchoResponse send_response;
243 EchoResponse recv_response;
244 Status recv_status;
245
246 ClientContext cli_ctx;
247 ServerContext srv_ctx;
248 grpc::ServerAsyncResponseWriter<EchoResponse> response_writer(&srv_ctx);
249
250 send_request.set_message("Hello");
Sree Kuchibhotla944f4cf2016-01-27 14:37:26 -0800251 std::unique_ptr<ClientAsyncResponseReader<EchoResponse>> response_reader(
Craig Tiller06cf3cc2015-05-13 13:11:01 -0700252 stub_->AsyncEcho(&cli_ctx, send_request, cq_.get()));
Yang Gao406b32f2015-02-13 16:25:33 -0800253
Craig Tillerd6c98df2015-08-18 09:33:44 -0700254 service_.RequestEcho(&srv_ctx, &recv_request, &response_writer, cq_.get(),
255 cq_.get(), tag(2));
Yang Gao406b32f2015-02-13 16:25:33 -0800256
Craig Tiller69f90e62015-08-06 08:32:35 -0700257 Verifier(GetParam()).Expect(2, true).Verify(cq_.get());
Yang Gao406b32f2015-02-13 16:25:33 -0800258 EXPECT_EQ(send_request.message(), recv_request.message());
259
260 send_response.set_message(recv_request.message());
261 response_writer.Finish(send_response, Status::OK, tag(3));
Craig Tiller69f90e62015-08-06 08:32:35 -0700262 Verifier(GetParam()).Expect(3, true).Verify(cq_.get());
Yang Gao406b32f2015-02-13 16:25:33 -0800263
Yang Gao3a5e5492015-02-18 14:32:38 -0800264 response_reader->Finish(&recv_response, &recv_status, tag(4));
Craig Tiller69f90e62015-08-06 08:32:35 -0700265 Verifier(GetParam()).Expect(4, true).Verify(cq_.get());
Yang Gao406b32f2015-02-13 16:25:33 -0800266
267 EXPECT_EQ(send_response.message(), recv_response.message());
Yang Gaoc1a2c312015-06-16 10:59:46 -0700268 EXPECT_TRUE(recv_status.ok());
Yang Gao406b32f2015-02-13 16:25:33 -0800269 }
270 }
271
Craig Tiller06cf3cc2015-05-13 13:11:01 -0700272 std::unique_ptr<ServerCompletionQueue> cq_;
Sree Kuchibhotla5a05f512016-01-13 22:43:20 -0800273 std::unique_ptr<grpc::testing::EchoTestService::Stub> stub_;
Craig Tiller0220cf12015-02-12 17:39:26 -0800274 std::unique_ptr<Server> server_;
Sree Kuchibhotla5a05f512016-01-13 22:43:20 -0800275 grpc::testing::EchoTestService::AsyncService service_;
Craig Tiller0220cf12015-02-12 17:39:26 -0800276 std::ostringstream server_address_;
vjpaicf4daeb2016-02-15 02:33:54 -0800277
Vijay Pai018879a2016-02-16 09:20:50 -0800278 std::unique_ptr<PollingOverrider> poll_overrider_;
Craig Tiller0220cf12015-02-12 17:39:26 -0800279};
280
Craig Tiller69f90e62015-08-06 08:32:35 -0700281TEST_P(AsyncEnd2endTest, SimpleRpc) {
Craig Tiller0220cf12015-02-12 17:39:26 -0800282 ResetStub();
Yang Gao406b32f2015-02-13 16:25:33 -0800283 SendRpc(1);
284}
Yang Gaobb84a302015-02-12 23:30:12 -0800285
Craig Tiller69f90e62015-08-06 08:32:35 -0700286TEST_P(AsyncEnd2endTest, SequentialRpcs) {
Yang Gao406b32f2015-02-13 16:25:33 -0800287 ResetStub();
288 SendRpc(10);
Craig Tiller0220cf12015-02-12 17:39:26 -0800289}
290
vjpai7aadf462015-03-16 23:58:44 -0700291// Test a simple RPC using the async version of Next
Craig Tiller69f90e62015-08-06 08:32:35 -0700292TEST_P(AsyncEnd2endTest, AsyncNextRpc) {
vjpai7aadf462015-03-16 23:58:44 -0700293 ResetStub();
294
295 EchoRequest send_request;
296 EchoRequest recv_request;
297 EchoResponse send_response;
298 EchoResponse recv_response;
299 Status recv_status;
300
301 ClientContext cli_ctx;
302 ServerContext srv_ctx;
303 grpc::ServerAsyncResponseWriter<EchoResponse> response_writer(&srv_ctx);
304
305 send_request.set_message("Hello");
Sree Kuchibhotla944f4cf2016-01-27 14:37:26 -0800306 std::unique_ptr<ClientAsyncResponseReader<EchoResponse>> response_reader(
Craig Tiller06cf3cc2015-05-13 13:11:01 -0700307 stub_->AsyncEcho(&cli_ctx, send_request, cq_.get()));
vjpai7aadf462015-03-16 23:58:44 -0700308
Yang Gao757afae2015-03-17 15:49:26 -0700309 std::chrono::system_clock::time_point time_now(
Craig Tillerf51199f2015-05-08 09:32:53 -0700310 std::chrono::system_clock::now());
311 std::chrono::system_clock::time_point time_limit(
312 std::chrono::system_clock::now() + std::chrono::seconds(10));
Craig Tiller69f90e62015-08-06 08:32:35 -0700313 Verifier(GetParam()).Verify(cq_.get(), time_now);
314 Verifier(GetParam()).Verify(cq_.get(), time_now);
vjpai7aadf462015-03-16 23:58:44 -0700315
Craig Tiller06cf3cc2015-05-13 13:11:01 -0700316 service_.RequestEcho(&srv_ctx, &recv_request, &response_writer, cq_.get(),
317 cq_.get(), tag(2));
vjpai7aadf462015-03-16 23:58:44 -0700318
Craig Tiller69f90e62015-08-06 08:32:35 -0700319 Verifier(GetParam()).Expect(2, true).Verify(cq_.get(), time_limit);
vjpai7aadf462015-03-16 23:58:44 -0700320 EXPECT_EQ(send_request.message(), recv_request.message());
vjpai7aadf462015-03-16 23:58:44 -0700321
322 send_response.set_message(recv_request.message());
323 response_writer.Finish(send_response, Status::OK, tag(3));
Craig Tiller4c06b822015-08-06 08:41:31 -0700324 Verifier(GetParam())
325 .Expect(3, true)
326 .Verify(cq_.get(), std::chrono::system_clock::time_point::max());
vjpai7aadf462015-03-16 23:58:44 -0700327
328 response_reader->Finish(&recv_response, &recv_status, tag(4));
Craig Tiller4c06b822015-08-06 08:41:31 -0700329 Verifier(GetParam())
330 .Expect(4, true)
331 .Verify(cq_.get(), std::chrono::system_clock::time_point::max());
vjpai7aadf462015-03-16 23:58:44 -0700332
333 EXPECT_EQ(send_response.message(), recv_response.message());
Yang Gaoc1a2c312015-06-16 10:59:46 -0700334 EXPECT_TRUE(recv_status.ok());
vjpai7aadf462015-03-16 23:58:44 -0700335}
Yang Gao757afae2015-03-17 15:49:26 -0700336
Yang Gao0e0d8e12015-02-13 14:40:41 -0800337// Two pings and a final pong.
Craig Tiller69f90e62015-08-06 08:32:35 -0700338TEST_P(AsyncEnd2endTest, SimpleClientStreaming) {
Yang Gao005f18a2015-02-13 10:22:33 -0800339 ResetStub();
340
341 EchoRequest send_request;
342 EchoRequest recv_request;
343 EchoResponse send_response;
344 EchoResponse recv_response;
345 Status recv_status;
346 ClientContext cli_ctx;
347 ServerContext srv_ctx;
348 ServerAsyncReader<EchoResponse, EchoRequest> srv_stream(&srv_ctx);
349
350 send_request.set_message("Hello");
Sree Kuchibhotla944f4cf2016-01-27 14:37:26 -0800351 std::unique_ptr<ClientAsyncWriter<EchoRequest>> cli_stream(
Craig Tiller06cf3cc2015-05-13 13:11:01 -0700352 stub_->AsyncRequestStream(&cli_ctx, &recv_response, cq_.get(), tag(1)));
Yang Gao005f18a2015-02-13 10:22:33 -0800353
Craig Tillerd6c98df2015-08-18 09:33:44 -0700354 service_.RequestRequestStream(&srv_ctx, &srv_stream, cq_.get(), cq_.get(),
355 tag(2));
Yang Gao005f18a2015-02-13 10:22:33 -0800356
Craig Tiller69f90e62015-08-06 08:32:35 -0700357 Verifier(GetParam()).Expect(2, true).Expect(1, true).Verify(cq_.get());
Yang Gao005f18a2015-02-13 10:22:33 -0800358
359 cli_stream->Write(send_request, tag(3));
Craig Tiller69f90e62015-08-06 08:32:35 -0700360 Verifier(GetParam()).Expect(3, true).Verify(cq_.get());
Yang Gao005f18a2015-02-13 10:22:33 -0800361
362 srv_stream.Read(&recv_request, tag(4));
Craig Tiller69f90e62015-08-06 08:32:35 -0700363 Verifier(GetParam()).Expect(4, true).Verify(cq_.get());
Yang Gao005f18a2015-02-13 10:22:33 -0800364 EXPECT_EQ(send_request.message(), recv_request.message());
365
366 cli_stream->Write(send_request, tag(5));
Craig Tiller69f90e62015-08-06 08:32:35 -0700367 Verifier(GetParam()).Expect(5, true).Verify(cq_.get());
Yang Gao005f18a2015-02-13 10:22:33 -0800368
369 srv_stream.Read(&recv_request, tag(6));
Craig Tiller69f90e62015-08-06 08:32:35 -0700370 Verifier(GetParam()).Expect(6, true).Verify(cq_.get());
Yang Gao005f18a2015-02-13 10:22:33 -0800371
372 EXPECT_EQ(send_request.message(), recv_request.message());
373 cli_stream->WritesDone(tag(7));
Craig Tiller69f90e62015-08-06 08:32:35 -0700374 Verifier(GetParam()).Expect(7, true).Verify(cq_.get());
Yang Gao005f18a2015-02-13 10:22:33 -0800375
376 srv_stream.Read(&recv_request, tag(8));
Craig Tiller69f90e62015-08-06 08:32:35 -0700377 Verifier(GetParam()).Expect(8, false).Verify(cq_.get());
Yang Gao005f18a2015-02-13 10:22:33 -0800378
379 send_response.set_message(recv_request.message());
380 srv_stream.Finish(send_response, Status::OK, tag(9));
Craig Tiller69f90e62015-08-06 08:32:35 -0700381 Verifier(GetParam()).Expect(9, true).Verify(cq_.get());
Yang Gao005f18a2015-02-13 10:22:33 -0800382
383 cli_stream->Finish(&recv_status, tag(10));
Craig Tiller69f90e62015-08-06 08:32:35 -0700384 Verifier(GetParam()).Expect(10, true).Verify(cq_.get());
Yang Gao005f18a2015-02-13 10:22:33 -0800385
386 EXPECT_EQ(send_response.message(), recv_response.message());
Yang Gaoc1a2c312015-06-16 10:59:46 -0700387 EXPECT_TRUE(recv_status.ok());
Yang Gao005f18a2015-02-13 10:22:33 -0800388}
389
Yang Gao0e0d8e12015-02-13 14:40:41 -0800390// One ping, two pongs.
Craig Tiller69f90e62015-08-06 08:32:35 -0700391TEST_P(AsyncEnd2endTest, SimpleServerStreaming) {
Yang Gao0e0d8e12015-02-13 14:40:41 -0800392 ResetStub();
393
394 EchoRequest send_request;
395 EchoRequest recv_request;
396 EchoResponse send_response;
397 EchoResponse recv_response;
398 Status recv_status;
399 ClientContext cli_ctx;
400 ServerContext srv_ctx;
401 ServerAsyncWriter<EchoResponse> srv_stream(&srv_ctx);
402
403 send_request.set_message("Hello");
Sree Kuchibhotla944f4cf2016-01-27 14:37:26 -0800404 std::unique_ptr<ClientAsyncReader<EchoResponse>> cli_stream(
Craig Tiller06cf3cc2015-05-13 13:11:01 -0700405 stub_->AsyncResponseStream(&cli_ctx, send_request, cq_.get(), tag(1)));
Yang Gao0e0d8e12015-02-13 14:40:41 -0800406
Craig Tillerf9e6adf2015-05-06 11:45:59 -0700407 service_.RequestResponseStream(&srv_ctx, &recv_request, &srv_stream,
Craig Tiller06cf3cc2015-05-13 13:11:01 -0700408 cq_.get(), cq_.get(), tag(2));
Yang Gao0e0d8e12015-02-13 14:40:41 -0800409
Craig Tiller69f90e62015-08-06 08:32:35 -0700410 Verifier(GetParam()).Expect(1, true).Expect(2, true).Verify(cq_.get());
Yang Gao0e0d8e12015-02-13 14:40:41 -0800411 EXPECT_EQ(send_request.message(), recv_request.message());
412
413 send_response.set_message(recv_request.message());
414 srv_stream.Write(send_response, tag(3));
Craig Tiller69f90e62015-08-06 08:32:35 -0700415 Verifier(GetParam()).Expect(3, true).Verify(cq_.get());
Yang Gao0e0d8e12015-02-13 14:40:41 -0800416
417 cli_stream->Read(&recv_response, tag(4));
Craig Tiller69f90e62015-08-06 08:32:35 -0700418 Verifier(GetParam()).Expect(4, true).Verify(cq_.get());
Yang Gao0e0d8e12015-02-13 14:40:41 -0800419 EXPECT_EQ(send_response.message(), recv_response.message());
420
421 srv_stream.Write(send_response, tag(5));
Craig Tiller69f90e62015-08-06 08:32:35 -0700422 Verifier(GetParam()).Expect(5, true).Verify(cq_.get());
Yang Gao0e0d8e12015-02-13 14:40:41 -0800423
424 cli_stream->Read(&recv_response, tag(6));
Craig Tiller69f90e62015-08-06 08:32:35 -0700425 Verifier(GetParam()).Expect(6, true).Verify(cq_.get());
Yang Gao0e0d8e12015-02-13 14:40:41 -0800426 EXPECT_EQ(send_response.message(), recv_response.message());
427
428 srv_stream.Finish(Status::OK, tag(7));
Craig Tiller69f90e62015-08-06 08:32:35 -0700429 Verifier(GetParam()).Expect(7, true).Verify(cq_.get());
Yang Gao0e0d8e12015-02-13 14:40:41 -0800430
431 cli_stream->Read(&recv_response, tag(8));
Craig Tiller69f90e62015-08-06 08:32:35 -0700432 Verifier(GetParam()).Expect(8, false).Verify(cq_.get());
Yang Gao0e0d8e12015-02-13 14:40:41 -0800433
434 cli_stream->Finish(&recv_status, tag(9));
Craig Tiller69f90e62015-08-06 08:32:35 -0700435 Verifier(GetParam()).Expect(9, true).Verify(cq_.get());
Yang Gao0e0d8e12015-02-13 14:40:41 -0800436
Yang Gaoc1a2c312015-06-16 10:59:46 -0700437 EXPECT_TRUE(recv_status.ok());
Yang Gao0e0d8e12015-02-13 14:40:41 -0800438}
439
440// One ping, one pong.
Craig Tiller69f90e62015-08-06 08:32:35 -0700441TEST_P(AsyncEnd2endTest, SimpleBidiStreaming) {
Yang Gaoc05b6cb2015-02-13 00:34:10 -0800442 ResetStub();
443
444 EchoRequest send_request;
445 EchoRequest recv_request;
446 EchoResponse send_response;
447 EchoResponse recv_response;
448 Status recv_status;
449 ClientContext cli_ctx;
450 ServerContext srv_ctx;
451 ServerAsyncReaderWriter<EchoResponse, EchoRequest> srv_stream(&srv_ctx);
452
453 send_request.set_message("Hello");
Sree Kuchibhotla944f4cf2016-01-27 14:37:26 -0800454 std::unique_ptr<ClientAsyncReaderWriter<EchoRequest, EchoResponse>>
Craig Tiller06cf3cc2015-05-13 13:11:01 -0700455 cli_stream(stub_->AsyncBidiStream(&cli_ctx, cq_.get(), tag(1)));
Yang Gaoc05b6cb2015-02-13 00:34:10 -0800456
Craig Tillerd6c98df2015-08-18 09:33:44 -0700457 service_.RequestBidiStream(&srv_ctx, &srv_stream, cq_.get(), cq_.get(),
458 tag(2));
Yang Gaoc05b6cb2015-02-13 00:34:10 -0800459
Craig Tiller69f90e62015-08-06 08:32:35 -0700460 Verifier(GetParam()).Expect(1, true).Expect(2, true).Verify(cq_.get());
Yang Gaoc05b6cb2015-02-13 00:34:10 -0800461
462 cli_stream->Write(send_request, tag(3));
Craig Tiller69f90e62015-08-06 08:32:35 -0700463 Verifier(GetParam()).Expect(3, true).Verify(cq_.get());
Yang Gaoc05b6cb2015-02-13 00:34:10 -0800464
465 srv_stream.Read(&recv_request, tag(4));
Craig Tiller69f90e62015-08-06 08:32:35 -0700466 Verifier(GetParam()).Expect(4, true).Verify(cq_.get());
Yang Gaoc05b6cb2015-02-13 00:34:10 -0800467 EXPECT_EQ(send_request.message(), recv_request.message());
468
469 send_response.set_message(recv_request.message());
470 srv_stream.Write(send_response, tag(5));
Craig Tiller69f90e62015-08-06 08:32:35 -0700471 Verifier(GetParam()).Expect(5, true).Verify(cq_.get());
Yang Gaoc05b6cb2015-02-13 00:34:10 -0800472
473 cli_stream->Read(&recv_response, tag(6));
Craig Tiller69f90e62015-08-06 08:32:35 -0700474 Verifier(GetParam()).Expect(6, true).Verify(cq_.get());
Yang Gaoc05b6cb2015-02-13 00:34:10 -0800475 EXPECT_EQ(send_response.message(), recv_response.message());
476
477 cli_stream->WritesDone(tag(7));
Craig Tiller69f90e62015-08-06 08:32:35 -0700478 Verifier(GetParam()).Expect(7, true).Verify(cq_.get());
Yang Gaoc05b6cb2015-02-13 00:34:10 -0800479
480 srv_stream.Read(&recv_request, tag(8));
Craig Tiller69f90e62015-08-06 08:32:35 -0700481 Verifier(GetParam()).Expect(8, false).Verify(cq_.get());
Yang Gaoc05b6cb2015-02-13 00:34:10 -0800482
483 srv_stream.Finish(Status::OK, tag(9));
Craig Tiller69f90e62015-08-06 08:32:35 -0700484 Verifier(GetParam()).Expect(9, true).Verify(cq_.get());
Yang Gaoc05b6cb2015-02-13 00:34:10 -0800485
486 cli_stream->Finish(&recv_status, tag(10));
Craig Tiller69f90e62015-08-06 08:32:35 -0700487 Verifier(GetParam()).Expect(10, true).Verify(cq_.get());
Yang Gaoc05b6cb2015-02-13 00:34:10 -0800488
Yang Gaoc1a2c312015-06-16 10:59:46 -0700489 EXPECT_TRUE(recv_status.ok());
Yang Gaoc05b6cb2015-02-13 00:34:10 -0800490}
491
Yang Gao406b32f2015-02-13 16:25:33 -0800492// Metadata tests
Craig Tiller69f90e62015-08-06 08:32:35 -0700493TEST_P(AsyncEnd2endTest, ClientInitialMetadataRpc) {
Yang Gao406b32f2015-02-13 16:25:33 -0800494 ResetStub();
495
496 EchoRequest send_request;
497 EchoRequest recv_request;
498 EchoResponse send_response;
499 EchoResponse recv_response;
500 Status recv_status;
501
502 ClientContext cli_ctx;
503 ServerContext srv_ctx;
504 grpc::ServerAsyncResponseWriter<EchoResponse> response_writer(&srv_ctx);
505
506 send_request.set_message("Hello");
507 std::pair<grpc::string, grpc::string> meta1("key1", "val1");
508 std::pair<grpc::string, grpc::string> meta2("key2", "val2");
Craig Tiller6f871642016-02-03 16:15:31 -0800509 std::pair<grpc::string, grpc::string> meta3("g.r.d-bin", "xyz");
Yang Gao406b32f2015-02-13 16:25:33 -0800510 cli_ctx.AddMetadata(meta1.first, meta1.second);
511 cli_ctx.AddMetadata(meta2.first, meta2.second);
Craig Tiller6f871642016-02-03 16:15:31 -0800512 cli_ctx.AddMetadata(meta3.first, meta3.second);
Yang Gao406b32f2015-02-13 16:25:33 -0800513
Sree Kuchibhotla944f4cf2016-01-27 14:37:26 -0800514 std::unique_ptr<ClientAsyncResponseReader<EchoResponse>> response_reader(
Craig Tiller06cf3cc2015-05-13 13:11:01 -0700515 stub_->AsyncEcho(&cli_ctx, send_request, cq_.get()));
Yang Gao406b32f2015-02-13 16:25:33 -0800516
Craig Tiller06cf3cc2015-05-13 13:11:01 -0700517 service_.RequestEcho(&srv_ctx, &recv_request, &response_writer, cq_.get(),
518 cq_.get(), tag(2));
Craig Tiller69f90e62015-08-06 08:32:35 -0700519 Verifier(GetParam()).Expect(2, true).Verify(cq_.get());
Yang Gao406b32f2015-02-13 16:25:33 -0800520 EXPECT_EQ(send_request.message(), recv_request.message());
521 auto client_initial_metadata = srv_ctx.client_metadata();
yang-ge21908f2015-08-25 13:47:51 -0700522 EXPECT_EQ(meta1.second,
523 ToString(client_initial_metadata.find(meta1.first)->second));
524 EXPECT_EQ(meta2.second,
525 ToString(client_initial_metadata.find(meta2.first)->second));
Craig Tiller6f871642016-02-03 16:15:31 -0800526 EXPECT_EQ(meta3.second,
527 ToString(client_initial_metadata.find(meta3.first)->second));
Craig Tiller8bf2dca2015-07-10 13:08:41 -0700528 EXPECT_GE(client_initial_metadata.size(), static_cast<size_t>(2));
Yang Gao406b32f2015-02-13 16:25:33 -0800529
530 send_response.set_message(recv_request.message());
531 response_writer.Finish(send_response, Status::OK, tag(3));
532
Craig Tiller69f90e62015-08-06 08:32:35 -0700533 Verifier(GetParam()).Expect(3, true).Verify(cq_.get());
Yang Gao406b32f2015-02-13 16:25:33 -0800534
Yang Gao3a5e5492015-02-18 14:32:38 -0800535 response_reader->Finish(&recv_response, &recv_status, tag(4));
Craig Tiller69f90e62015-08-06 08:32:35 -0700536 Verifier(GetParam()).Expect(4, true).Verify(cq_.get());
Yang Gao406b32f2015-02-13 16:25:33 -0800537
538 EXPECT_EQ(send_response.message(), recv_response.message());
Yang Gaoc1a2c312015-06-16 10:59:46 -0700539 EXPECT_TRUE(recv_status.ok());
Yang Gao406b32f2015-02-13 16:25:33 -0800540}
541
Craig Tiller69f90e62015-08-06 08:32:35 -0700542TEST_P(AsyncEnd2endTest, ServerInitialMetadataRpc) {
Yang Gao2b7f5372015-02-18 00:45:53 -0800543 ResetStub();
544
545 EchoRequest send_request;
546 EchoRequest recv_request;
547 EchoResponse send_response;
548 EchoResponse recv_response;
549 Status recv_status;
550
551 ClientContext cli_ctx;
552 ServerContext srv_ctx;
553 grpc::ServerAsyncResponseWriter<EchoResponse> response_writer(&srv_ctx);
554
555 send_request.set_message("Hello");
556 std::pair<grpc::string, grpc::string> meta1("key1", "val1");
557 std::pair<grpc::string, grpc::string> meta2("key2", "val2");
558
Sree Kuchibhotla944f4cf2016-01-27 14:37:26 -0800559 std::unique_ptr<ClientAsyncResponseReader<EchoResponse>> response_reader(
Craig Tiller06cf3cc2015-05-13 13:11:01 -0700560 stub_->AsyncEcho(&cli_ctx, send_request, cq_.get()));
Yang Gao2b7f5372015-02-18 00:45:53 -0800561
Craig Tiller06cf3cc2015-05-13 13:11:01 -0700562 service_.RequestEcho(&srv_ctx, &recv_request, &response_writer, cq_.get(),
563 cq_.get(), tag(2));
Craig Tiller69f90e62015-08-06 08:32:35 -0700564 Verifier(GetParam()).Expect(2, true).Verify(cq_.get());
Yang Gao2b7f5372015-02-18 00:45:53 -0800565 EXPECT_EQ(send_request.message(), recv_request.message());
566 srv_ctx.AddInitialMetadata(meta1.first, meta1.second);
567 srv_ctx.AddInitialMetadata(meta2.first, meta2.second);
568 response_writer.SendInitialMetadata(tag(3));
Craig Tiller69f90e62015-08-06 08:32:35 -0700569 Verifier(GetParam()).Expect(3, true).Verify(cq_.get());
Yang Gao2b7f5372015-02-18 00:45:53 -0800570
Yang Gao3a5e5492015-02-18 14:32:38 -0800571 response_reader->ReadInitialMetadata(tag(4));
Craig Tiller69f90e62015-08-06 08:32:35 -0700572 Verifier(GetParam()).Expect(4, true).Verify(cq_.get());
Yang Gao2b7f5372015-02-18 00:45:53 -0800573 auto server_initial_metadata = cli_ctx.GetServerInitialMetadata();
yang-ge21908f2015-08-25 13:47:51 -0700574 EXPECT_EQ(meta1.second,
575 ToString(server_initial_metadata.find(meta1.first)->second));
576 EXPECT_EQ(meta2.second,
577 ToString(server_initial_metadata.find(meta2.first)->second));
vjpaid5577aa2015-02-18 22:26:48 -0800578 EXPECT_EQ(static_cast<size_t>(2), server_initial_metadata.size());
Yang Gao3a5e5492015-02-18 14:32:38 -0800579
580 send_response.set_message(recv_request.message());
581 response_writer.Finish(send_response, Status::OK, tag(5));
Craig Tiller69f90e62015-08-06 08:32:35 -0700582 Verifier(GetParam()).Expect(5, true).Verify(cq_.get());
Yang Gao3a5e5492015-02-18 14:32:38 -0800583
584 response_reader->Finish(&recv_response, &recv_status, tag(6));
Craig Tiller69f90e62015-08-06 08:32:35 -0700585 Verifier(GetParam()).Expect(6, true).Verify(cq_.get());
Yang Gao3a5e5492015-02-18 14:32:38 -0800586
587 EXPECT_EQ(send_response.message(), recv_response.message());
Yang Gaoc1a2c312015-06-16 10:59:46 -0700588 EXPECT_TRUE(recv_status.ok());
Yang Gao2b7f5372015-02-18 00:45:53 -0800589}
590
Craig Tiller69f90e62015-08-06 08:32:35 -0700591TEST_P(AsyncEnd2endTest, ServerTrailingMetadataRpc) {
Yang Gao2b7f5372015-02-18 00:45:53 -0800592 ResetStub();
593
594 EchoRequest send_request;
595 EchoRequest recv_request;
596 EchoResponse send_response;
597 EchoResponse recv_response;
598 Status recv_status;
599
600 ClientContext cli_ctx;
601 ServerContext srv_ctx;
602 grpc::ServerAsyncResponseWriter<EchoResponse> response_writer(&srv_ctx);
603
604 send_request.set_message("Hello");
605 std::pair<grpc::string, grpc::string> meta1("key1", "val1");
606 std::pair<grpc::string, grpc::string> meta2("key2", "val2");
607
Sree Kuchibhotla944f4cf2016-01-27 14:37:26 -0800608 std::unique_ptr<ClientAsyncResponseReader<EchoResponse>> response_reader(
Craig Tiller06cf3cc2015-05-13 13:11:01 -0700609 stub_->AsyncEcho(&cli_ctx, send_request, cq_.get()));
Yang Gao2b7f5372015-02-18 00:45:53 -0800610
Craig Tiller06cf3cc2015-05-13 13:11:01 -0700611 service_.RequestEcho(&srv_ctx, &recv_request, &response_writer, cq_.get(),
612 cq_.get(), tag(2));
Craig Tiller69f90e62015-08-06 08:32:35 -0700613 Verifier(GetParam()).Expect(2, true).Verify(cq_.get());
Yang Gao2b7f5372015-02-18 00:45:53 -0800614 EXPECT_EQ(send_request.message(), recv_request.message());
615 response_writer.SendInitialMetadata(tag(3));
Craig Tiller69f90e62015-08-06 08:32:35 -0700616 Verifier(GetParam()).Expect(3, true).Verify(cq_.get());
Yang Gao2b7f5372015-02-18 00:45:53 -0800617
618 send_response.set_message(recv_request.message());
619 srv_ctx.AddTrailingMetadata(meta1.first, meta1.second);
620 srv_ctx.AddTrailingMetadata(meta2.first, meta2.second);
621 response_writer.Finish(send_response, Status::OK, tag(4));
622
Craig Tiller69f90e62015-08-06 08:32:35 -0700623 Verifier(GetParam()).Expect(4, true).Verify(cq_.get());
Yang Gao2b7f5372015-02-18 00:45:53 -0800624
Yang Gao3a5e5492015-02-18 14:32:38 -0800625 response_reader->Finish(&recv_response, &recv_status, tag(5));
Craig Tiller69f90e62015-08-06 08:32:35 -0700626 Verifier(GetParam()).Expect(5, true).Verify(cq_.get());
Yang Gao2b7f5372015-02-18 00:45:53 -0800627 EXPECT_EQ(send_response.message(), recv_response.message());
Yang Gaoc1a2c312015-06-16 10:59:46 -0700628 EXPECT_TRUE(recv_status.ok());
Yang Gao2b7f5372015-02-18 00:45:53 -0800629 auto server_trailing_metadata = cli_ctx.GetServerTrailingMetadata();
yang-ge21908f2015-08-25 13:47:51 -0700630 EXPECT_EQ(meta1.second,
631 ToString(server_trailing_metadata.find(meta1.first)->second));
632 EXPECT_EQ(meta2.second,
633 ToString(server_trailing_metadata.find(meta2.first)->second));
vjpaid5577aa2015-02-18 22:26:48 -0800634 EXPECT_EQ(static_cast<size_t>(2), server_trailing_metadata.size());
Yang Gao2b7f5372015-02-18 00:45:53 -0800635}
636
Craig Tiller69f90e62015-08-06 08:32:35 -0700637TEST_P(AsyncEnd2endTest, MetadataRpc) {
Yang Gao2b7f5372015-02-18 00:45:53 -0800638 ResetStub();
639
640 EchoRequest send_request;
641 EchoRequest recv_request;
642 EchoResponse send_response;
643 EchoResponse recv_response;
644 Status recv_status;
645
646 ClientContext cli_ctx;
647 ServerContext srv_ctx;
648 grpc::ServerAsyncResponseWriter<EchoResponse> response_writer(&srv_ctx);
649
650 send_request.set_message("Hello");
651 std::pair<grpc::string, grpc::string> meta1("key1", "val1");
Yang Gao3a5e5492015-02-18 14:32:38 -0800652 std::pair<grpc::string, grpc::string> meta2(
Vijay Pai92a928f2015-03-26 16:30:22 -0400653 "key2-bin",
Craig Tillerd6c98df2015-08-18 09:33:44 -0700654 grpc::string("\xc0\xc1\xc2\xc3\xc4\xc5\xc6\xc7\xc8\xc9\xca\xcb\xcc", 13));
Yang Gao2b7f5372015-02-18 00:45:53 -0800655 std::pair<grpc::string, grpc::string> meta3("key3", "val3");
Craig Tiller47c83fd2015-02-21 22:45:35 -0800656 std::pair<grpc::string, grpc::string> meta6(
657 "key4-bin",
Vijay Pai92a928f2015-03-26 16:30:22 -0400658 grpc::string("\x10\x11\x12\x13\x14\x15\x16\x17\x18\x19\x1a\x1b\x1c\x1d",
Craig Tillerd6c98df2015-08-18 09:33:44 -0700659 14));
Yang Gao2b7f5372015-02-18 00:45:53 -0800660 std::pair<grpc::string, grpc::string> meta5("key5", "val5");
Craig Tiller47c83fd2015-02-21 22:45:35 -0800661 std::pair<grpc::string, grpc::string> meta4(
662 "key6-bin",
Craig Tillerd6c98df2015-08-18 09:33:44 -0700663 grpc::string(
664 "\xe0\xe1\xe2\xe3\xe4\xe5\xe6\xe7\xe8\xe9\xea\xeb\xec\xed\xee", 15));
Yang Gao2b7f5372015-02-18 00:45:53 -0800665
666 cli_ctx.AddMetadata(meta1.first, meta1.second);
667 cli_ctx.AddMetadata(meta2.first, meta2.second);
668
Sree Kuchibhotla944f4cf2016-01-27 14:37:26 -0800669 std::unique_ptr<ClientAsyncResponseReader<EchoResponse>> response_reader(
Craig Tiller06cf3cc2015-05-13 13:11:01 -0700670 stub_->AsyncEcho(&cli_ctx, send_request, cq_.get()));
Yang Gao2b7f5372015-02-18 00:45:53 -0800671
Craig Tiller06cf3cc2015-05-13 13:11:01 -0700672 service_.RequestEcho(&srv_ctx, &recv_request, &response_writer, cq_.get(),
673 cq_.get(), tag(2));
Craig Tiller69f90e62015-08-06 08:32:35 -0700674 Verifier(GetParam()).Expect(2, true).Verify(cq_.get());
Yang Gao2b7f5372015-02-18 00:45:53 -0800675 EXPECT_EQ(send_request.message(), recv_request.message());
676 auto client_initial_metadata = srv_ctx.client_metadata();
yang-ge21908f2015-08-25 13:47:51 -0700677 EXPECT_EQ(meta1.second,
678 ToString(client_initial_metadata.find(meta1.first)->second));
679 EXPECT_EQ(meta2.second,
680 ToString(client_initial_metadata.find(meta2.first)->second));
Craig Tiller8bf2dca2015-07-10 13:08:41 -0700681 EXPECT_GE(client_initial_metadata.size(), static_cast<size_t>(2));
Yang Gao2b7f5372015-02-18 00:45:53 -0800682
683 srv_ctx.AddInitialMetadata(meta3.first, meta3.second);
684 srv_ctx.AddInitialMetadata(meta4.first, meta4.second);
685 response_writer.SendInitialMetadata(tag(3));
Craig Tiller69f90e62015-08-06 08:32:35 -0700686 Verifier(GetParam()).Expect(3, true).Verify(cq_.get());
Yang Gao3a5e5492015-02-18 14:32:38 -0800687 response_reader->ReadInitialMetadata(tag(4));
Craig Tiller69f90e62015-08-06 08:32:35 -0700688 Verifier(GetParam()).Expect(4, true).Verify(cq_.get());
Yang Gao2b7f5372015-02-18 00:45:53 -0800689 auto server_initial_metadata = cli_ctx.GetServerInitialMetadata();
yang-ge21908f2015-08-25 13:47:51 -0700690 EXPECT_EQ(meta3.second,
691 ToString(server_initial_metadata.find(meta3.first)->second));
692 EXPECT_EQ(meta4.second,
693 ToString(server_initial_metadata.find(meta4.first)->second));
Craig Tiller8bf2dca2015-07-10 13:08:41 -0700694 EXPECT_GE(server_initial_metadata.size(), static_cast<size_t>(2));
Yang Gao3a5e5492015-02-18 14:32:38 -0800695
696 send_response.set_message(recv_request.message());
697 srv_ctx.AddTrailingMetadata(meta5.first, meta5.second);
698 srv_ctx.AddTrailingMetadata(meta6.first, meta6.second);
699 response_writer.Finish(send_response, Status::OK, tag(5));
700
Craig Tiller69f90e62015-08-06 08:32:35 -0700701 Verifier(GetParam()).Expect(5, true).Verify(cq_.get());
Yang Gao3a5e5492015-02-18 14:32:38 -0800702
Yang Gao3a5e5492015-02-18 14:32:38 -0800703 response_reader->Finish(&recv_response, &recv_status, tag(6));
Craig Tiller69f90e62015-08-06 08:32:35 -0700704 Verifier(GetParam()).Expect(6, true).Verify(cq_.get());
Yang Gao3a5e5492015-02-18 14:32:38 -0800705 EXPECT_EQ(send_response.message(), recv_response.message());
Yang Gaoc1a2c312015-06-16 10:59:46 -0700706 EXPECT_TRUE(recv_status.ok());
Yang Gao2b7f5372015-02-18 00:45:53 -0800707 auto server_trailing_metadata = cli_ctx.GetServerTrailingMetadata();
yang-ge21908f2015-08-25 13:47:51 -0700708 EXPECT_EQ(meta5.second,
709 ToString(server_trailing_metadata.find(meta5.first)->second));
710 EXPECT_EQ(meta6.second,
711 ToString(server_trailing_metadata.find(meta6.first)->second));
Craig Tiller8bf2dca2015-07-10 13:08:41 -0700712 EXPECT_GE(server_trailing_metadata.size(), static_cast<size_t>(2));
Yang Gao2b7f5372015-02-18 00:45:53 -0800713}
yang-gb3352562015-08-04 14:42:06 -0700714
715// Server uses AsyncNotifyWhenDone API to check for cancellation
Craig Tiller69f90e62015-08-06 08:32:35 -0700716TEST_P(AsyncEnd2endTest, ServerCheckCancellation) {
yang-gb3352562015-08-04 14:42:06 -0700717 ResetStub();
718
719 EchoRequest send_request;
720 EchoRequest recv_request;
721 EchoResponse send_response;
722 EchoResponse recv_response;
723 Status recv_status;
724
725 ClientContext cli_ctx;
726 ServerContext srv_ctx;
727 grpc::ServerAsyncResponseWriter<EchoResponse> response_writer(&srv_ctx);
728
729 send_request.set_message("Hello");
Sree Kuchibhotla944f4cf2016-01-27 14:37:26 -0800730 std::unique_ptr<ClientAsyncResponseReader<EchoResponse>> response_reader(
yang-gb3352562015-08-04 14:42:06 -0700731 stub_->AsyncEcho(&cli_ctx, send_request, cq_.get()));
732
733 srv_ctx.AsyncNotifyWhenDone(tag(5));
734 service_.RequestEcho(&srv_ctx, &recv_request, &response_writer, cq_.get(),
735 cq_.get(), tag(2));
736
Craig Tiller69f90e62015-08-06 08:32:35 -0700737 Verifier(GetParam()).Expect(2, true).Verify(cq_.get());
yang-gb3352562015-08-04 14:42:06 -0700738 EXPECT_EQ(send_request.message(), recv_request.message());
739
740 cli_ctx.TryCancel();
Craig Tiller69f90e62015-08-06 08:32:35 -0700741 Verifier(GetParam()).Expect(5, true).Verify(cq_.get());
yang-gb3352562015-08-04 14:42:06 -0700742 EXPECT_TRUE(srv_ctx.IsCancelled());
743
744 response_reader->Finish(&recv_response, &recv_status, tag(4));
Craig Tiller69f90e62015-08-06 08:32:35 -0700745 Verifier(GetParam()).Expect(4, false).Verify(cq_.get());
yang-gb3352562015-08-04 14:42:06 -0700746
747 EXPECT_EQ(StatusCode::CANCELLED, recv_status.error_code());
748}
749
750// Server uses AsyncNotifyWhenDone API to check for normal finish
Craig Tiller69f90e62015-08-06 08:32:35 -0700751TEST_P(AsyncEnd2endTest, ServerCheckDone) {
yang-gb3352562015-08-04 14:42:06 -0700752 ResetStub();
753
754 EchoRequest send_request;
755 EchoRequest recv_request;
756 EchoResponse send_response;
757 EchoResponse recv_response;
758 Status recv_status;
759
760 ClientContext cli_ctx;
761 ServerContext srv_ctx;
762 grpc::ServerAsyncResponseWriter<EchoResponse> response_writer(&srv_ctx);
763
764 send_request.set_message("Hello");
Sree Kuchibhotla944f4cf2016-01-27 14:37:26 -0800765 std::unique_ptr<ClientAsyncResponseReader<EchoResponse>> response_reader(
yang-gb3352562015-08-04 14:42:06 -0700766 stub_->AsyncEcho(&cli_ctx, send_request, cq_.get()));
767
768 srv_ctx.AsyncNotifyWhenDone(tag(5));
769 service_.RequestEcho(&srv_ctx, &recv_request, &response_writer, cq_.get(),
770 cq_.get(), tag(2));
771
Craig Tiller69f90e62015-08-06 08:32:35 -0700772 Verifier(GetParam()).Expect(2, true).Verify(cq_.get());
yang-gb3352562015-08-04 14:42:06 -0700773 EXPECT_EQ(send_request.message(), recv_request.message());
774
775 send_response.set_message(recv_request.message());
776 response_writer.Finish(send_response, Status::OK, tag(3));
Craig Tiller69f90e62015-08-06 08:32:35 -0700777 Verifier(GetParam()).Expect(3, true).Verify(cq_.get());
778 Verifier(GetParam()).Expect(5, true).Verify(cq_.get());
yang-gb3352562015-08-04 14:42:06 -0700779 EXPECT_FALSE(srv_ctx.IsCancelled());
780
781 response_reader->Finish(&recv_response, &recv_status, tag(4));
Craig Tiller69f90e62015-08-06 08:32:35 -0700782 Verifier(GetParam()).Expect(4, true).Verify(cq_.get());
yang-gb3352562015-08-04 14:42:06 -0700783
784 EXPECT_EQ(send_response.message(), recv_response.message());
785 EXPECT_TRUE(recv_status.ok());
786}
787
Craig Tiller8f7bff72015-08-17 13:23:14 -0700788TEST_P(AsyncEnd2endTest, UnimplementedRpc) {
yang-g730055d2015-08-27 12:29:45 -0700789 std::shared_ptr<Channel> channel =
Julien Boeufe5adc0e2015-10-12 14:08:10 -0700790 CreateChannel(server_address_.str(), InsecureChannelCredentials());
Craig Tiller1b4e3302015-12-17 16:35:00 -0800791 std::unique_ptr<grpc::testing::UnimplementedService::Stub> stub;
792 stub = grpc::testing::UnimplementedService::NewStub(channel);
yang-g9b7757d2015-08-13 11:15:53 -0700793 EchoRequest send_request;
794 EchoResponse recv_response;
795 Status recv_status;
796
797 ClientContext cli_ctx;
798 send_request.set_message("Hello");
Sree Kuchibhotla944f4cf2016-01-27 14:37:26 -0800799 std::unique_ptr<ClientAsyncResponseReader<EchoResponse>> response_reader(
yang-g9b7757d2015-08-13 11:15:53 -0700800 stub->AsyncUnimplemented(&cli_ctx, send_request, cq_.get()));
801
802 response_reader->Finish(&recv_response, &recv_status, tag(4));
Craig Tiller8f7bff72015-08-17 13:23:14 -0700803 Verifier(GetParam()).Expect(4, false).Verify(cq_.get());
yang-g9b7757d2015-08-13 11:15:53 -0700804
805 EXPECT_EQ(StatusCode::UNIMPLEMENTED, recv_status.error_code());
806 EXPECT_EQ("", recv_status.error_message());
807}
808
Sree Kuchibhotla0f242ac2016-01-29 18:12:19 -0800809// This class is for testing scenarios where RPCs are cancelled on the server
Sree Kuchibhotlab0d15672016-03-07 10:51:02 -0800810// by calling ServerContext::TryCancel(). Server uses AsyncNotifyWhenDone
811// API to check for cancellation
Sree Kuchibhotla944f4cf2016-01-27 14:37:26 -0800812class AsyncEnd2endServerTryCancelTest : public AsyncEnd2endTest {
813 protected:
814 typedef enum {
815 DO_NOT_CANCEL = 0,
816 CANCEL_BEFORE_PROCESSING,
817 CANCEL_DURING_PROCESSING,
818 CANCEL_AFTER_PROCESSING
819 } ServerTryCancelRequestPhase;
820
Sree Kuchibhotla0f242ac2016-01-29 18:12:19 -0800821 // Helper for testing client-streaming RPCs which are cancelled on the server.
822 // Depending on the value of server_try_cancel parameter, this will test one
823 // of the following three scenarios:
824 // CANCEL_BEFORE_PROCESSING: Rpc is cancelled by the server before reading
825 // any messages from the client
826 //
827 // CANCEL_DURING_PROCESSING: Rpc is cancelled by the server while reading
828 // messages from the client
829 //
830 // CANCEL_AFTER PROCESSING: Rpc is cancelled by server after reading all
831 // messages from the client (but before sending any status back to the
832 // client)
Sree Kuchibhotla944f4cf2016-01-27 14:37:26 -0800833 void TestClientStreamingServerCancel(
834 ServerTryCancelRequestPhase server_try_cancel) {
835 ResetStub();
836
837 EchoRequest send_request;
838 EchoRequest recv_request;
839 EchoResponse send_response;
840 EchoResponse recv_response;
841 Status recv_status;
842
Sree Kuchibhotla944f4cf2016-01-27 14:37:26 -0800843 ClientContext cli_ctx;
844 ServerContext srv_ctx;
845 ServerAsyncReader<EchoResponse, EchoRequest> srv_stream(&srv_ctx);
846
847 // Initiate the 'RequestStream' call on client
848 std::unique_ptr<ClientAsyncWriter<EchoRequest>> cli_stream(
Sree Kuchibhotla4fb59082016-01-29 11:16:24 -0800849 stub_->AsyncRequestStream(&cli_ctx, &recv_response, cq_.get(), tag(1)));
850 Verifier(GetParam()).Expect(1, true).Verify(cq_.get());
Sree Kuchibhotla944f4cf2016-01-27 14:37:26 -0800851
852 // On the server, request to be notified of 'RequestStream' calls
853 // and receive the 'RequestStream' call just made by the client
Sree Kuchibhotlab0d15672016-03-07 10:51:02 -0800854 srv_ctx.AsyncNotifyWhenDone(tag(11));
Sree Kuchibhotla944f4cf2016-01-27 14:37:26 -0800855 service_.RequestRequestStream(&srv_ctx, &srv_stream, cq_.get(), cq_.get(),
856 tag(2));
857 Verifier(GetParam()).Expect(2, true).Verify(cq_.get());
858
859 // Client sends 3 messages (tags 3, 4 and 5)
860 for (int tag_idx = 3; tag_idx <= 5; tag_idx++) {
861 send_request.set_message("Ping " + std::to_string(tag_idx));
862 cli_stream->Write(send_request, tag(tag_idx));
Sree Kuchibhotla4fb59082016-01-29 11:16:24 -0800863 Verifier(GetParam()).Expect(tag_idx, true).Verify(cq_.get());
Sree Kuchibhotla944f4cf2016-01-27 14:37:26 -0800864 }
865 cli_stream->WritesDone(tag(6));
Sree Kuchibhotla4fb59082016-01-29 11:16:24 -0800866 Verifier(GetParam()).Expect(6, true).Verify(cq_.get());
Sree Kuchibhotla944f4cf2016-01-27 14:37:26 -0800867
868 bool expected_server_cq_result = true;
869 bool ignore_cq_result = false;
Sree Kuchibhotlab0d15672016-03-07 10:51:02 -0800870 bool want_done_tag = false;
Sree Kuchibhotla944f4cf2016-01-27 14:37:26 -0800871
872 if (server_try_cancel == CANCEL_BEFORE_PROCESSING) {
Sree Kuchibhotlab0d15672016-03-07 10:51:02 -0800873 srv_ctx.TryCancel();
874 Verifier(GetParam()).Expect(11, true).Verify(cq_.get());
875 EXPECT_TRUE(srv_ctx.IsCancelled());
Sree Kuchibhotla944f4cf2016-01-27 14:37:26 -0800876
877 // Since cancellation is done before server reads any results, we know
878 // for sure that all cq results will return false from this point forward
879 expected_server_cq_result = false;
880 }
881
882 std::thread* server_try_cancel_thd = NULL;
Sree Kuchibhotlab0d15672016-03-07 10:51:02 -0800883
884 auto verif = Verifier(GetParam());
885
Sree Kuchibhotla944f4cf2016-01-27 14:37:26 -0800886 if (server_try_cancel == CANCEL_DURING_PROCESSING) {
Sree Kuchibhotlab0d15672016-03-07 10:51:02 -0800887 server_try_cancel_thd =
888 new std::thread(&ServerContext::TryCancel, &srv_ctx);
Sree Kuchibhotla944f4cf2016-01-27 14:37:26 -0800889 // Server will cancel the RPC in a parallel thread while reading the
890 // requests from the client. Since the cancellation can happen at anytime,
891 // some of the cq results (i.e those until cancellation) might be true but
892 // its non deterministic. So better to ignore the cq results
893 ignore_cq_result = true;
Sree Kuchibhotlab0d15672016-03-07 10:51:02 -0800894 // Expect that we might possibly see the done tag that
895 // indicates cancellation completion in this case
896 want_done_tag = true;
897 verif.Expect(11, true);
Sree Kuchibhotla944f4cf2016-01-27 14:37:26 -0800898 }
899
900 // Server reads 3 messages (tags 6, 7 and 8)
Sree Kuchibhotlab0d15672016-03-07 10:51:02 -0800901 // But if want_done_tag is true, we might also see tag 11
Sree Kuchibhotla944f4cf2016-01-27 14:37:26 -0800902 for (int tag_idx = 6; tag_idx <= 8; tag_idx++) {
903 srv_stream.Read(&recv_request, tag(tag_idx));
Sree Kuchibhotlab0d15672016-03-07 10:51:02 -0800904 // Note that we'll add something to the verifier and verify that
905 // something was seen, but it might be tag 11 and not what we
906 // just added
907 int got_tag = verif.Expect(tag_idx, expected_server_cq_result)
908 .Next(cq_.get(), ignore_cq_result);
909 GPR_ASSERT((got_tag == tag_idx) || (got_tag == 11 && want_done_tag));
910 if (got_tag == 11) {
911 EXPECT_TRUE(srv_ctx.IsCancelled());
912 want_done_tag = false;
913 // Now get the other entry that we were waiting on
914 EXPECT_EQ(verif.Next(cq_.get(), ignore_cq_result), tag_idx);
915 }
Sree Kuchibhotla944f4cf2016-01-27 14:37:26 -0800916 }
917
918 if (server_try_cancel_thd != NULL) {
919 server_try_cancel_thd->join();
920 delete server_try_cancel_thd;
921 }
922
923 if (server_try_cancel == CANCEL_AFTER_PROCESSING) {
Sree Kuchibhotlab0d15672016-03-07 10:51:02 -0800924 srv_ctx.TryCancel();
925 want_done_tag = true;
926 verif.Expect(11, true);
927 }
928
929 if (want_done_tag) {
930 verif.Verify(cq_.get());
931 EXPECT_TRUE(srv_ctx.IsCancelled());
932 want_done_tag = false;
Sree Kuchibhotla944f4cf2016-01-27 14:37:26 -0800933 }
934
Sree Kuchibhotla0f242ac2016-01-29 18:12:19 -0800935 // The RPC has been cancelled at this point for sure (i.e irrespective of
936 // the value of `server_try_cancel` is). So, from this point forward, we
937 // know that cq results are supposed to return false on server.
Sree Kuchibhotla944f4cf2016-01-27 14:37:26 -0800938
Sree Kuchibhotla0f242ac2016-01-29 18:12:19 -0800939 // Server sends the final message and cancelled status (but the RPC is
940 // already cancelled at this point. So we expect the operation to fail)
Sree Kuchibhotla944f4cf2016-01-27 14:37:26 -0800941 srv_stream.Finish(send_response, Status::CANCELLED, tag(9));
942 Verifier(GetParam()).Expect(9, false).Verify(cq_.get());
943
Sree Kuchibhotla0f242ac2016-01-29 18:12:19 -0800944 // Client will see the cancellation
Sree Kuchibhotla944f4cf2016-01-27 14:37:26 -0800945 cli_stream->Finish(&recv_status, tag(10));
Sree Kuchibhotla369a04a2016-02-01 10:53:13 -0800946 // TODO(sreek): The expectation here should be true. This is a bug (github
Sree Kuchibhotla0f242ac2016-01-29 18:12:19 -0800947 // issue #4972)
Sree Kuchibhotla4fb59082016-01-29 11:16:24 -0800948 Verifier(GetParam()).Expect(10, false).Verify(cq_.get());
Sree Kuchibhotla944f4cf2016-01-27 14:37:26 -0800949 EXPECT_FALSE(recv_status.ok());
950 EXPECT_EQ(::grpc::StatusCode::CANCELLED, recv_status.error_code());
951 }
952
Sree Kuchibhotla0f242ac2016-01-29 18:12:19 -0800953 // Helper for testing server-streaming RPCs which are cancelled on the server.
954 // Depending on the value of server_try_cancel parameter, this will test one
955 // of the following three scenarios:
956 // CANCEL_BEFORE_PROCESSING: Rpc is cancelled by the server before sending
957 // any messages to the client
958 //
959 // CANCEL_DURING_PROCESSING: Rpc is cancelled by the server while sending
960 // messages to the client
961 //
962 // CANCEL_AFTER PROCESSING: Rpc is cancelled by server after sending all
963 // messages to the client (but before sending any status back to the
964 // client)
Sree Kuchibhotla944f4cf2016-01-27 14:37:26 -0800965 void TestServerStreamingServerCancel(
966 ServerTryCancelRequestPhase server_try_cancel) {
967 ResetStub();
968
969 EchoRequest send_request;
970 EchoRequest recv_request;
971 EchoResponse send_response;
972 EchoResponse recv_response;
973 Status recv_status;
974 ClientContext cli_ctx;
975 ServerContext srv_ctx;
Sree Kuchibhotla944f4cf2016-01-27 14:37:26 -0800976 ServerAsyncWriter<EchoResponse> srv_stream(&srv_ctx);
977
978 send_request.set_message("Ping");
979 // Initiate the 'ResponseStream' call on the client
980 std::unique_ptr<ClientAsyncReader<EchoResponse>> cli_stream(
Sree Kuchibhotla4fb59082016-01-29 11:16:24 -0800981 stub_->AsyncResponseStream(&cli_ctx, send_request, cq_.get(), tag(1)));
982 Verifier(GetParam()).Expect(1, true).Verify(cq_.get());
Sree Kuchibhotla944f4cf2016-01-27 14:37:26 -0800983 // On the server, request to be notified of 'ResponseStream' calls and
984 // receive the call just made by the client
Sree Kuchibhotlab0d15672016-03-07 10:51:02 -0800985 srv_ctx.AsyncNotifyWhenDone(tag(11));
Sree Kuchibhotla944f4cf2016-01-27 14:37:26 -0800986 service_.RequestResponseStream(&srv_ctx, &recv_request, &srv_stream,
987 cq_.get(), cq_.get(), tag(2));
988 Verifier(GetParam()).Expect(2, true).Verify(cq_.get());
989 EXPECT_EQ(send_request.message(), recv_request.message());
990
991 bool expected_cq_result = true;
992 bool ignore_cq_result = false;
Sree Kuchibhotlab0d15672016-03-07 10:51:02 -0800993 bool want_done_tag = false;
Sree Kuchibhotla944f4cf2016-01-27 14:37:26 -0800994
995 if (server_try_cancel == CANCEL_BEFORE_PROCESSING) {
Sree Kuchibhotlab0d15672016-03-07 10:51:02 -0800996 srv_ctx.TryCancel();
997 Verifier(GetParam()).Expect(11, true).Verify(cq_.get());
998 EXPECT_TRUE(srv_ctx.IsCancelled());
Sree Kuchibhotla944f4cf2016-01-27 14:37:26 -0800999
1000 // We know for sure that all cq results will be false from this point
1001 // since the server cancelled the RPC
1002 expected_cq_result = false;
1003 }
1004
1005 std::thread* server_try_cancel_thd = NULL;
Sree Kuchibhotlab0d15672016-03-07 10:51:02 -08001006
1007 auto verif = Verifier(GetParam());
1008
Sree Kuchibhotla944f4cf2016-01-27 14:37:26 -08001009 if (server_try_cancel == CANCEL_DURING_PROCESSING) {
Sree Kuchibhotlab0d15672016-03-07 10:51:02 -08001010 server_try_cancel_thd =
1011 new std::thread(&ServerContext::TryCancel, &srv_ctx);
Sree Kuchibhotla944f4cf2016-01-27 14:37:26 -08001012
1013 // Server will cancel the RPC in a parallel thread while writing responses
1014 // to the client. Since the cancellation can happen at anytime, some of
Sree Kuchibhotla0f242ac2016-01-29 18:12:19 -08001015 // the cq results (i.e those until cancellation) might be true but it is
1016 // non deterministic. So better to ignore the cq results
Sree Kuchibhotla944f4cf2016-01-27 14:37:26 -08001017 ignore_cq_result = true;
Sree Kuchibhotlab0d15672016-03-07 10:51:02 -08001018 // Expect that we might possibly see the done tag that
1019 // indicates cancellation completion in this case
1020 want_done_tag = true;
1021 verif.Expect(11, true);
Sree Kuchibhotla944f4cf2016-01-27 14:37:26 -08001022 }
1023
1024 // Server sends three messages (tags 3, 4 and 5)
Sree Kuchibhotlab0d15672016-03-07 10:51:02 -08001025 // But if want_done tag is true, we might also see tag 11
Sree Kuchibhotla944f4cf2016-01-27 14:37:26 -08001026 for (int tag_idx = 3; tag_idx <= 5; tag_idx++) {
1027 send_response.set_message("Pong " + std::to_string(tag_idx));
1028 srv_stream.Write(send_response, tag(tag_idx));
Sree Kuchibhotlab0d15672016-03-07 10:51:02 -08001029 // Note that we'll add something to the verifier and verify that
1030 // something was seen, but it might be tag 11 and not what we
1031 // just added
1032 int got_tag = verif.Expect(tag_idx, expected_cq_result)
1033 .Next(cq_.get(), ignore_cq_result);
1034 GPR_ASSERT((got_tag == tag_idx) || (got_tag == 11 && want_done_tag));
1035 if (got_tag == 11) {
1036 EXPECT_TRUE(srv_ctx.IsCancelled());
1037 want_done_tag = false;
1038 // Now get the other entry that we were waiting on
1039 EXPECT_EQ(verif.Next(cq_.get(), ignore_cq_result), tag_idx);
1040 }
Sree Kuchibhotla944f4cf2016-01-27 14:37:26 -08001041 }
1042
Sree Kuchibhotla0f242ac2016-01-29 18:12:19 -08001043 if (server_try_cancel_thd != NULL) {
1044 server_try_cancel_thd->join();
1045 delete server_try_cancel_thd;
1046 }
1047
Sree Kuchibhotla944f4cf2016-01-27 14:37:26 -08001048 if (server_try_cancel == CANCEL_AFTER_PROCESSING) {
Sree Kuchibhotlab0d15672016-03-07 10:51:02 -08001049 srv_ctx.TryCancel();
1050 want_done_tag = true;
1051 verif.Expect(11, true);
yang-gad0df7b2016-02-22 10:00:20 -08001052
1053 // Client reads may fail bacause it is notified that the stream is
1054 // cancelled.
1055 ignore_cq_result = true;
Sree Kuchibhotla944f4cf2016-01-27 14:37:26 -08001056 }
1057
Sree Kuchibhotlab0d15672016-03-07 10:51:02 -08001058 if (want_done_tag) {
1059 verif.Verify(cq_.get());
1060 EXPECT_TRUE(srv_ctx.IsCancelled());
1061 want_done_tag = false;
1062 }
1063
Sree Kuchibhotla0f242ac2016-01-29 18:12:19 -08001064 // Client attemts to read the three messages from the server
Sree Kuchibhotla944f4cf2016-01-27 14:37:26 -08001065 for (int tag_idx = 6; tag_idx <= 8; tag_idx++) {
1066 cli_stream->Read(&recv_response, tag(tag_idx));
1067 Verifier(GetParam())
1068 .Expect(tag_idx, expected_cq_result)
Sree Kuchibhotla4fb59082016-01-29 11:16:24 -08001069 .Verify(cq_.get(), ignore_cq_result);
Sree Kuchibhotla944f4cf2016-01-27 14:37:26 -08001070 }
1071
Sree Kuchibhotla0f242ac2016-01-29 18:12:19 -08001072 // The RPC has been cancelled at this point for sure (i.e irrespective of
1073 // the value of `server_try_cancel` is). So, from this point forward, we
1074 // know that cq results are supposed to return false on server.
Sree Kuchibhotla944f4cf2016-01-27 14:37:26 -08001075
Sree Kuchibhotla0f242ac2016-01-29 18:12:19 -08001076 // Server finishes the stream (but the RPC is already cancelled)
Sree Kuchibhotla944f4cf2016-01-27 14:37:26 -08001077 srv_stream.Finish(Status::CANCELLED, tag(9));
1078 Verifier(GetParam()).Expect(9, false).Verify(cq_.get());
1079
Sree Kuchibhotla0f242ac2016-01-29 18:12:19 -08001080 // Client will see the cancellation
Sree Kuchibhotla944f4cf2016-01-27 14:37:26 -08001081 cli_stream->Finish(&recv_status, tag(10));
Sree Kuchibhotla4fb59082016-01-29 11:16:24 -08001082 Verifier(GetParam()).Expect(10, true).Verify(cq_.get());
Sree Kuchibhotla944f4cf2016-01-27 14:37:26 -08001083 EXPECT_FALSE(recv_status.ok());
1084 EXPECT_EQ(::grpc::StatusCode::CANCELLED, recv_status.error_code());
1085 }
1086
Sree Kuchibhotla0f242ac2016-01-29 18:12:19 -08001087 // Helper for testing bidirectinal-streaming RPCs which are cancelled on the
1088 // server.
1089 //
1090 // Depending on the value of server_try_cancel parameter, this will
1091 // test one of the following three scenarios:
1092 // CANCEL_BEFORE_PROCESSING: Rpc is cancelled by the server before reading/
1093 // writing any messages from/to the client
1094 //
1095 // CANCEL_DURING_PROCESSING: Rpc is cancelled by the server while reading
1096 // messages from the client
1097 //
1098 // CANCEL_AFTER PROCESSING: Rpc is cancelled by server after reading all
1099 // messages from the client (but before sending any status back to the
1100 // client)
Sree Kuchibhotla944f4cf2016-01-27 14:37:26 -08001101 void TestBidiStreamingServerCancel(
1102 ServerTryCancelRequestPhase server_try_cancel) {
1103 ResetStub();
1104
1105 EchoRequest send_request;
1106 EchoRequest recv_request;
1107 EchoResponse send_response;
1108 EchoResponse recv_response;
1109 Status recv_status;
Sree Kuchibhotla944f4cf2016-01-27 14:37:26 -08001110 ClientContext cli_ctx;
1111 ServerContext srv_ctx;
1112 ServerAsyncReaderWriter<EchoResponse, EchoRequest> srv_stream(&srv_ctx);
1113
1114 // Initiate the call from the client side
1115 std::unique_ptr<ClientAsyncReaderWriter<EchoRequest, EchoResponse>>
Sree Kuchibhotla4fb59082016-01-29 11:16:24 -08001116 cli_stream(stub_->AsyncBidiStream(&cli_ctx, cq_.get(), tag(1)));
1117 Verifier(GetParam()).Expect(1, true).Verify(cq_.get());
Sree Kuchibhotla944f4cf2016-01-27 14:37:26 -08001118
1119 // On the server, request to be notified of the 'BidiStream' call and
1120 // receive the call just made by the client
Sree Kuchibhotlab0d15672016-03-07 10:51:02 -08001121 srv_ctx.AsyncNotifyWhenDone(tag(11));
Sree Kuchibhotla944f4cf2016-01-27 14:37:26 -08001122 service_.RequestBidiStream(&srv_ctx, &srv_stream, cq_.get(), cq_.get(),
1123 tag(2));
1124 Verifier(GetParam()).Expect(2, true).Verify(cq_.get());
1125
Sree Kuchibhotla0f242ac2016-01-29 18:12:19 -08001126 // Client sends the first and the only message
Sree Kuchibhotla944f4cf2016-01-27 14:37:26 -08001127 send_request.set_message("Ping");
1128 cli_stream->Write(send_request, tag(3));
Sree Kuchibhotla4fb59082016-01-29 11:16:24 -08001129 Verifier(GetParam()).Expect(3, true).Verify(cq_.get());
Sree Kuchibhotla944f4cf2016-01-27 14:37:26 -08001130
1131 bool expected_cq_result = true;
1132 bool ignore_cq_result = false;
Sree Kuchibhotlab0d15672016-03-07 10:51:02 -08001133 bool want_done_tag = false;
Sree Kuchibhotla0f242ac2016-01-29 18:12:19 -08001134
Sree Kuchibhotla944f4cf2016-01-27 14:37:26 -08001135 if (server_try_cancel == CANCEL_BEFORE_PROCESSING) {
Sree Kuchibhotlab0d15672016-03-07 10:51:02 -08001136 srv_ctx.TryCancel();
1137 Verifier(GetParam()).Expect(11, true).Verify(cq_.get());
1138 EXPECT_TRUE(srv_ctx.IsCancelled());
Sree Kuchibhotla944f4cf2016-01-27 14:37:26 -08001139
1140 // We know for sure that all cq results will be false from this point
1141 // since the server cancelled the RPC
1142 expected_cq_result = false;
1143 }
1144
1145 std::thread* server_try_cancel_thd = NULL;
Sree Kuchibhotlab0d15672016-03-07 10:51:02 -08001146
1147 auto verif = Verifier(GetParam());
1148
Sree Kuchibhotla944f4cf2016-01-27 14:37:26 -08001149 if (server_try_cancel == CANCEL_DURING_PROCESSING) {
Sree Kuchibhotlab0d15672016-03-07 10:51:02 -08001150 server_try_cancel_thd =
1151 new std::thread(&ServerContext::TryCancel, &srv_ctx);
Sree Kuchibhotla944f4cf2016-01-27 14:37:26 -08001152
1153 // Since server is going to cancel the RPC in a parallel thread, some of
1154 // the cq results (i.e those until the cancellation) might be true. Since
1155 // that number is non-deterministic, it is better to ignore the cq results
1156 ignore_cq_result = true;
Sree Kuchibhotlab0d15672016-03-07 10:51:02 -08001157 // Expect that we might possibly see the done tag that
1158 // indicates cancellation completion in this case
1159 want_done_tag = true;
1160 verif.Expect(11, true);
Sree Kuchibhotla944f4cf2016-01-27 14:37:26 -08001161 }
1162
Sree Kuchibhotlab0d15672016-03-07 10:51:02 -08001163 int got_tag;
Sree Kuchibhotla944f4cf2016-01-27 14:37:26 -08001164 srv_stream.Read(&recv_request, tag(4));
Sree Kuchibhotlab0d15672016-03-07 10:51:02 -08001165 verif.Expect(4, expected_cq_result);
1166 got_tag = verif.Next(cq_.get(), ignore_cq_result);
1167 GPR_ASSERT((got_tag == 4) || (got_tag == 11 && want_done_tag));
1168 if (got_tag == 11) {
1169 EXPECT_TRUE(srv_ctx.IsCancelled());
1170 want_done_tag = false;
1171 // Now get the other entry that we were waiting on
1172 EXPECT_EQ(verif.Next(cq_.get(), ignore_cq_result), 4);
1173 }
Sree Kuchibhotla944f4cf2016-01-27 14:37:26 -08001174
1175 send_response.set_message("Pong");
1176 srv_stream.Write(send_response, tag(5));
Sree Kuchibhotlab0d15672016-03-07 10:51:02 -08001177 verif.Expect(5, expected_cq_result);
1178 got_tag = verif.Next(cq_.get(), ignore_cq_result);
1179 GPR_ASSERT((got_tag == 5) || (got_tag == 11 && want_done_tag));
1180 if (got_tag == 11) {
1181 EXPECT_TRUE(srv_ctx.IsCancelled());
1182 want_done_tag = false;
1183 // Now get the other entry that we were waiting on
1184 EXPECT_EQ(verif.Next(cq_.get(), ignore_cq_result), 5);
1185 }
Sree Kuchibhotla944f4cf2016-01-27 14:37:26 -08001186
1187 cli_stream->Read(&recv_response, tag(6));
Sree Kuchibhotlab0d15672016-03-07 10:51:02 -08001188 verif.Expect(6, expected_cq_result);
1189 got_tag = verif.Next(cq_.get(), ignore_cq_result);
1190 GPR_ASSERT((got_tag == 6) || (got_tag == 11 && want_done_tag));
1191 if (got_tag == 11) {
1192 EXPECT_TRUE(srv_ctx.IsCancelled());
1193 want_done_tag = false;
1194 // Now get the other entry that we were waiting on
1195 EXPECT_EQ(verif.Next(cq_.get(), ignore_cq_result), 6);
1196 }
Sree Kuchibhotla944f4cf2016-01-27 14:37:26 -08001197
1198 // This is expected to succeed in all cases
1199 cli_stream->WritesDone(tag(7));
Sree Kuchibhotlab0d15672016-03-07 10:51:02 -08001200 verif.Expect(7, true);
1201 got_tag = verif.Next(cq_.get(), ignore_cq_result);
1202 GPR_ASSERT((got_tag == 7) || (got_tag == 11 && want_done_tag));
1203 if (got_tag == 11) {
1204 EXPECT_TRUE(srv_ctx.IsCancelled());
1205 want_done_tag = false;
1206 // Now get the other entry that we were waiting on
1207 EXPECT_EQ(verif.Next(cq_.get(), ignore_cq_result), 7);
1208 }
Sree Kuchibhotla944f4cf2016-01-27 14:37:26 -08001209
Sree Kuchibhotla0f242ac2016-01-29 18:12:19 -08001210 // This is expected to fail in all cases i.e for all values of
Vijay Pai018879a2016-02-16 09:20:50 -08001211 // server_try_cancel. This is because at this point, either there are no
Sree Kuchibhotla0f242ac2016-01-29 18:12:19 -08001212 // more msgs from the client (because client called WritesDone) or the RPC
1213 // is cancelled on the server
Sree Kuchibhotla944f4cf2016-01-27 14:37:26 -08001214 srv_stream.Read(&recv_request, tag(8));
Sree Kuchibhotlab0d15672016-03-07 10:51:02 -08001215 verif.Expect(8, false);
1216 got_tag = verif.Next(cq_.get(), ignore_cq_result);
1217 GPR_ASSERT((got_tag == 8) || (got_tag == 11 && want_done_tag));
1218 if (got_tag == 11) {
1219 EXPECT_TRUE(srv_ctx.IsCancelled());
1220 want_done_tag = false;
1221 // Now get the other entry that we were waiting on
1222 EXPECT_EQ(verif.Next(cq_.get(), ignore_cq_result), 8);
1223 }
Sree Kuchibhotla944f4cf2016-01-27 14:37:26 -08001224
1225 if (server_try_cancel_thd != NULL) {
1226 server_try_cancel_thd->join();
1227 delete server_try_cancel_thd;
1228 }
1229
1230 if (server_try_cancel == CANCEL_AFTER_PROCESSING) {
Sree Kuchibhotlab0d15672016-03-07 10:51:02 -08001231 srv_ctx.TryCancel();
1232 want_done_tag = true;
1233 verif.Expect(11, true);
1234 }
1235
1236 if (want_done_tag) {
1237 verif.Verify(cq_.get());
1238 EXPECT_TRUE(srv_ctx.IsCancelled());
1239 want_done_tag = false;
Sree Kuchibhotla944f4cf2016-01-27 14:37:26 -08001240 }
1241
Sree Kuchibhotla0f242ac2016-01-29 18:12:19 -08001242 // The RPC has been cancelled at this point for sure (i.e irrespective of
1243 // the value of `server_try_cancel` is). So, from this point forward, we
1244 // know that cq results are supposed to return false on server.
Sree Kuchibhotla944f4cf2016-01-27 14:37:26 -08001245
1246 srv_stream.Finish(Status::CANCELLED, tag(9));
1247 Verifier(GetParam()).Expect(9, false).Verify(cq_.get());
1248
1249 cli_stream->Finish(&recv_status, tag(10));
Sree Kuchibhotla4fb59082016-01-29 11:16:24 -08001250 Verifier(GetParam()).Expect(10, true).Verify(cq_.get());
Sree Kuchibhotla944f4cf2016-01-27 14:37:26 -08001251 EXPECT_FALSE(recv_status.ok());
1252 EXPECT_EQ(grpc::StatusCode::CANCELLED, recv_status.error_code());
1253 }
1254};
1255
1256TEST_P(AsyncEnd2endServerTryCancelTest, ClientStreamingServerTryCancelBefore) {
1257 TestClientStreamingServerCancel(CANCEL_BEFORE_PROCESSING);
1258}
1259
1260TEST_P(AsyncEnd2endServerTryCancelTest, ClientStreamingServerTryCancelDuring) {
1261 TestClientStreamingServerCancel(CANCEL_DURING_PROCESSING);
1262}
1263
1264TEST_P(AsyncEnd2endServerTryCancelTest, ClientStreamingServerTryCancelAfter) {
1265 TestClientStreamingServerCancel(CANCEL_AFTER_PROCESSING);
1266}
1267
1268TEST_P(AsyncEnd2endServerTryCancelTest, ServerStreamingServerTryCancelBefore) {
1269 TestServerStreamingServerCancel(CANCEL_BEFORE_PROCESSING);
1270}
1271
1272TEST_P(AsyncEnd2endServerTryCancelTest, ServerStreamingServerTryCancelDuring) {
1273 TestServerStreamingServerCancel(CANCEL_DURING_PROCESSING);
1274}
1275
1276TEST_P(AsyncEnd2endServerTryCancelTest, ServerStreamingServerTryCancelAfter) {
1277 TestServerStreamingServerCancel(CANCEL_AFTER_PROCESSING);
1278}
1279
1280TEST_P(AsyncEnd2endServerTryCancelTest, ServerBidiStreamingTryCancelBefore) {
1281 TestBidiStreamingServerCancel(CANCEL_BEFORE_PROCESSING);
1282}
1283
1284TEST_P(AsyncEnd2endServerTryCancelTest, ServerBidiStreamingTryCancelDuring) {
1285 TestBidiStreamingServerCancel(CANCEL_DURING_PROCESSING);
1286}
1287
1288TEST_P(AsyncEnd2endServerTryCancelTest, ServerBidiStreamingTryCancelAfter) {
1289 TestBidiStreamingServerCancel(CANCEL_AFTER_PROCESSING);
1290}
1291
Craig Tiller4c06b822015-08-06 08:41:31 -07001292INSTANTIATE_TEST_CASE_P(AsyncEnd2end, AsyncEnd2endTest,
1293 ::testing::Values(false, true));
Sree Kuchibhotla944f4cf2016-01-27 14:37:26 -08001294INSTANTIATE_TEST_CASE_P(AsyncEnd2endServerTryCancel,
1295 AsyncEnd2endServerTryCancelTest,
1296 ::testing::Values(false));
Craig Tiller69f90e62015-08-06 08:32:35 -07001297
Craig Tiller0220cf12015-02-12 17:39:26 -08001298} // namespace
1299} // namespace testing
1300} // namespace grpc
1301
1302int main(int argc, char** argv) {
1303 grpc_test_init(argc, argv);
Vijay Paib65eda42016-02-16 13:48:05 -08001304 gpr_tls_init(&g_is_async_end2end_test);
Craig Tiller0220cf12015-02-12 17:39:26 -08001305 ::testing::InitGoogleTest(&argc, argv);
Vijay Paib65eda42016-02-16 13:48:05 -08001306 int ret = RUN_ALL_TESTS();
1307 gpr_tls_destroy(&g_is_async_end2end_test);
1308 return ret;
Craig Tiller0220cf12015-02-12 17:39:26 -08001309}