Merge pull request #1023 from vjpai/async

Non-blocking Next method for C++ async completion queue
diff --git a/include/grpc++/completion_queue.h b/include/grpc++/completion_queue.h
index f741e3c..d742d85 100644
--- a/include/grpc++/completion_queue.h
+++ b/include/grpc++/completion_queue.h
@@ -34,6 +34,7 @@
 #ifndef GRPCXX_COMPLETION_QUEUE_H
 #define GRPCXX_COMPLETION_QUEUE_H
 
+#include <chrono>
 #include <grpc++/impl/client_unary_call.h>
 
 struct grpc_completion_queue;
@@ -75,10 +76,21 @@
   explicit CompletionQueue(grpc_completion_queue *take);
   ~CompletionQueue();
 
-  // Blocking read from queue.
-  // Returns true if an event was received, false if the queue is ready
-  // for destruction.
-  bool Next(void **tag, bool *ok);
+  // Tri-state return for AsyncNext: SHUTDOWN, GOT_EVENT, TIMEOUT
+  enum NextStatus {SHUTDOWN, GOT_EVENT, TIMEOUT};
+
+  // Read from queue, blocking at most until deadline.
+  // Do not rely on tag or ok unless the return value is GOT_EVENT.
+  NextStatus AsyncNext(void **tag, bool *ok,
+		       std::chrono::system_clock::time_point deadline);
+
+  // Blocking read from queue (waits with no deadline).
+  // Returns true on an event, false if the queue is ready for destruction.
+  bool Next(void **tag, bool *ok) {
+    return (AsyncNext(tag,ok,
+		      std::chrono::system_clock::time_point::max()) !=
+	    SHUTDOWN);
+  }
 
   // Shutdown has to be called, and the CompletionQueue can only be
   // destructed when false is returned from Next().
diff --git a/src/cpp/common/completion_queue.cc b/src/cpp/common/completion_queue.cc
index 414966c..fede2da 100644
--- a/src/cpp/common/completion_queue.cc
+++ b/src/cpp/common/completion_queue.cc
@@ -57,19 +57,26 @@
   }
 };
 
-bool CompletionQueue::Next(void** tag, bool* ok) {
+CompletionQueue::NextStatus
+CompletionQueue::AsyncNext(void** tag, bool* ok,
+			   std::chrono::system_clock::time_point deadline) {
   std::unique_ptr<grpc_event, EventDeleter> ev;
 
+  gpr_timespec gpr_deadline;
+  Timepoint2Timespec(deadline, &gpr_deadline);
   for (;;) {
-    ev.reset(grpc_completion_queue_next(cq_, gpr_inf_future));
+    ev.reset(grpc_completion_queue_next(cq_, gpr_deadline));
+    if (!ev) { /* got a NULL back because deadline passed */
+      return TIMEOUT;
+    }
     if (ev->type == GRPC_QUEUE_SHUTDOWN) {
-      return false;
+      return SHUTDOWN;
     }
     auto cq_tag = static_cast<CompletionQueueTag*>(ev->tag);
     *ok = ev->data.op_complete == GRPC_OP_OK;
     *tag = cq_tag;
     if (cq_tag->FinalizeResult(tag, ok)) {
-      return true;
+      return GOT_EVENT;
     }
   }
 }
diff --git a/test/cpp/end2end/async_end2end_test.cc b/test/cpp/end2end/async_end2end_test.cc
index 70df9e1..e011b78 100644
--- a/test/cpp/end2end/async_end2end_test.cc
+++ b/test/cpp/end2end/async_end2end_test.cc
@@ -76,6 +76,20 @@
   EXPECT_EQ(tag(i), got_tag);
 }
 
+void verify_timed_ok(CompletionQueue* cq, int i, bool expect_ok,
+		     std::chrono::system_clock::time_point deadline =
+		     std::chrono::system_clock::time_point::max(),
+		     CompletionQueue::NextStatus expected_outcome =
+		     CompletionQueue::GOT_EVENT) {
+  bool ok;
+  void* got_tag;
+  EXPECT_EQ(cq->AsyncNext(&got_tag, &ok, deadline), expected_outcome);
+  if (expected_outcome == CompletionQueue::GOT_EVENT) {
+    EXPECT_EQ(expect_ok, ok);
+    EXPECT_EQ(tag(i), got_tag);
+  }
+}
+
 class AsyncEnd2endTest : public ::testing::Test {
  protected:
   AsyncEnd2endTest() : service_(&srv_cq_) {}
@@ -166,6 +180,50 @@
   SendRpc(10);
 }
 
+// Test a simple RPC using the async version of Next
+TEST_F(AsyncEnd2endTest, AsyncNextRpc) {
+  ResetStub();
+
+  EchoRequest send_request;
+  EchoRequest recv_request;
+  EchoResponse send_response;
+  EchoResponse recv_response;
+  Status recv_status;
+
+  ClientContext cli_ctx;
+  ServerContext srv_ctx;
+  grpc::ServerAsyncResponseWriter<EchoResponse> response_writer(&srv_ctx);
+
+  send_request.set_message("Hello");
+  std::unique_ptr<ClientAsyncResponseReader<EchoResponse> >
+    response_reader(stub_->AsyncEcho(&cli_ctx, send_request,
+				     &cli_cq_, tag(1)));
+
+  std::chrono::system_clock::time_point
+    time_now(std::chrono::system_clock::now()),
+    time_limit(std::chrono::system_clock::now()+std::chrono::seconds(5));
+  verify_timed_ok(&srv_cq_, -1, true, time_now, CompletionQueue::TIMEOUT);
+  verify_timed_ok(&cli_cq_, -1, true, time_now, CompletionQueue::TIMEOUT);
+
+  service_.RequestEcho(&srv_ctx, &recv_request, &response_writer, &srv_cq_,
+		       tag(2));
+
+  verify_timed_ok(&srv_cq_, 2, true, time_limit);
+  EXPECT_EQ(send_request.message(), recv_request.message());
+  verify_timed_ok(&cli_cq_, 1, true, time_limit);
+
+  send_response.set_message(recv_request.message());
+  response_writer.Finish(send_response, Status::OK, tag(3));
+  verify_timed_ok(&srv_cq_, 3, true);
+
+  response_reader->Finish(&recv_response, &recv_status, tag(4));
+  verify_timed_ok(&cli_cq_, 4, true);
+
+  EXPECT_EQ(send_response.message(), recv_response.message());
+  EXPECT_TRUE(recv_status.IsOk());
+
+}
+  
 // Two pings and a final pong.
 TEST_F(AsyncEnd2endTest, SimpleClientStreaming) {
   ResetStub();