Revert "Revert "Properly integrate async API with server-side cancellations.""
diff --git a/test/cpp/end2end/async_end2end_test.cc b/test/cpp/end2end/async_end2end_test.cc
index 9ca3bf9..dc8c2bb 100644
--- a/test/cpp/end2end/async_end2end_test.cc
+++ b/test/cpp/end2end/async_end2end_test.cc
@@ -68,6 +68,7 @@
 namespace {
 
 void* tag(int i) { return (void*)(intptr_t)i; }
+int detag(void* p) { return static_cast<int>(reinterpret_cast<intptr_t>(p)); }
 
 #ifdef GPR_POSIX_SOCKET
 static int maybe_assert_non_blocking_poll(struct pollfd* pfds, nfds_t nfds,
@@ -106,37 +107,50 @@
 class Verifier {
  public:
   explicit Verifier(bool spin) : spin_(spin) {}
+  // Expect sets the expected ok value for a specific tag
   Verifier& Expect(int i, bool expect_ok) {
     expectations_[tag(i)] = expect_ok;
     return *this;
   }
 
+  // Next waits for one completion on cq, checks it against the recorded
+  // expectations and returns its tag as an int. When ignore_ok is true only
+  // the tag is checked, not the ok value. An unexpected tag fails GPR_ASSERT.
+  int Next(CompletionQueue* cq, bool ignore_ok) {
+    bool ok = false;
+    void* got_tag = nullptr;
+    if (spin_) {
+      for (;;) {
+        auto r = cq->AsyncNext(&got_tag, &ok, gpr_time_0(GPR_CLOCK_REALTIME));
+        if (r == CompletionQueue::TIMEOUT) continue;
+        if (r == CompletionQueue::GOT_EVENT) break;
+        gpr_log(GPR_ERROR, "unexpected result from AsyncNext");
+        abort();
+      }
+    } else {
+      EXPECT_TRUE(cq->Next(&got_tag, &ok));
+    }
+    auto it = expectations_.find(got_tag);
+    // Dereferencing or erasing end() is undefined behaviour, so stop here
+    GPR_ASSERT(it != expectations_.end());
+    if (!ignore_ok) EXPECT_EQ(it->second, ok);
+    expectations_.erase(it);
+    return detag(got_tag);
+  }
+
+  // Verify keeps calling Next until all currently set
+  // expected tags are complete
   void Verify(CompletionQueue* cq) { Verify(cq, false); }
 
+  // This version of Verify allows optionally ignoring the
+  // outcome of the expectation
   void Verify(CompletionQueue* cq, bool ignore_ok) {
     GPR_ASSERT(!expectations_.empty());
     while (!expectations_.empty()) {
-      bool ok;
-      void* got_tag;
-      if (spin_) {
-        for (;;) {
-          auto r = cq->AsyncNext(&got_tag, &ok, gpr_time_0(GPR_CLOCK_REALTIME));
-          if (r == CompletionQueue::TIMEOUT) continue;
-          if (r == CompletionQueue::GOT_EVENT) break;
-          gpr_log(GPR_ERROR, "unexpected result from AsyncNext");
-          abort();
-        }
-      } else {
-        EXPECT_TRUE(cq->Next(&got_tag, &ok));
-      }
-      auto it = expectations_.find(got_tag);
-      EXPECT_TRUE(it != expectations_.end());
-      if (!ignore_ok) {
-        EXPECT_EQ(it->second, ok);
-      }
-      expectations_.erase(it);
+      Next(cq, ignore_ok);
     }
   }
+  // This version of Verify stops after a certain deadline
   void Verify(CompletionQueue* cq,
               std::chrono::system_clock::time_point deadline) {
     if (expectations_.empty()) {
@@ -793,7 +807,8 @@
 }
 
 // This class is for testing scenarios where RPCs are cancelled on the server
-// by calling ServerContext::TryCancel()
+// by calling ServerContext::TryCancel(). The server uses the
+// AsyncNotifyWhenDone API to check for cancellation
 class AsyncEnd2endServerTryCancelTest : public AsyncEnd2endTest {
  protected:
   typedef enum {
@@ -803,13 +818,6 @@
     CANCEL_AFTER_PROCESSING
   } ServerTryCancelRequestPhase;
 
-  void ServerTryCancel(ServerContext* context) {
-    EXPECT_FALSE(context->IsCancelled());
-    context->TryCancel();
-    gpr_log(GPR_INFO, "Server called TryCancel()");
-    EXPECT_TRUE(context->IsCancelled());
-  }
-
   // Helper for testing client-streaming RPCs which are cancelled on the server.
   // Depending on the value of server_try_cancel parameter, this will test one
   // of the following three scenarios:
@@ -843,6 +851,7 @@
 
     // On the server, request to be notified of 'RequestStream' calls
     // and receive the 'RequestStream' call just made by the client
+    srv_ctx.AsyncNotifyWhenDone(tag(11));
     service_.RequestRequestStream(&srv_ctx, &srv_stream, cq_.get(), cq_.get(),
                                   tag(2));
     Verifier(GetParam()).Expect(2, true).Verify(cq_.get());
@@ -858,9 +867,12 @@
 
     bool expected_server_cq_result = true;
     bool ignore_cq_result = false;
+    bool want_done_tag = false;
 
     if (server_try_cancel == CANCEL_BEFORE_PROCESSING) {
-      ServerTryCancel(&srv_ctx);
+      srv_ctx.TryCancel();
+      Verifier(GetParam()).Expect(11, true).Verify(cq_.get());
+      EXPECT_TRUE(srv_ctx.IsCancelled());
 
       // Since cancellation is done before server reads any results, we know
       // for sure that all cq results will return false from this point forward
@@ -868,22 +880,39 @@
     }
 
     std::thread* server_try_cancel_thd = NULL;
+
+    auto verif = Verifier(GetParam());
+
     if (server_try_cancel == CANCEL_DURING_PROCESSING) {
-      server_try_cancel_thd = new std::thread(
-          &AsyncEnd2endServerTryCancelTest::ServerTryCancel, this, &srv_ctx);
+      server_try_cancel_thd =
+          new std::thread(&ServerContext::TryCancel, &srv_ctx);
       // Server will cancel the RPC in a parallel thread while reading the
       // requests from the client. Since the cancellation can happen at anytime,
       // some of the cq results (i.e those until cancellation) might be true but
       // its non deterministic. So better to ignore the cq results
       ignore_cq_result = true;
+      // Expect that we might possibly see the done tag that
+      // indicates cancellation completion in this case
+      want_done_tag = true;
+      verif.Expect(11, true);
     }
 
     // Server reads 3 messages (tags 6, 7 and 8)
+    // But if want_done_tag is true, we might also see tag 11
     for (int tag_idx = 6; tag_idx <= 8; tag_idx++) {
       srv_stream.Read(&recv_request, tag(tag_idx));
-      Verifier(GetParam())
-          .Expect(tag_idx, expected_server_cq_result)
-          .Verify(cq_.get(), ignore_cq_result);
+      // Note that we'll add something to the verifier and verify that
+      // something was seen, but it might be tag 11 and not what we
+      // just added
+      int got_tag = verif.Expect(tag_idx, expected_server_cq_result)
+                        .Next(cq_.get(), ignore_cq_result);
+      GPR_ASSERT((got_tag == tag_idx) || (got_tag == 11 && want_done_tag));
+      if (got_tag == 11) {
+        EXPECT_TRUE(srv_ctx.IsCancelled());
+        want_done_tag = false;
+        // Now get the other entry that we were waiting on
+        EXPECT_EQ(verif.Next(cq_.get(), ignore_cq_result), tag_idx);
+      }
     }
 
     if (server_try_cancel_thd != NULL) {
@@ -892,7 +921,15 @@
     }
 
     if (server_try_cancel == CANCEL_AFTER_PROCESSING) {
-      ServerTryCancel(&srv_ctx);
+      srv_ctx.TryCancel();
+      want_done_tag = true;
+      verif.Expect(11, true);
+    }
+
+    if (want_done_tag) {
+      verif.Verify(cq_.get());
+      EXPECT_TRUE(srv_ctx.IsCancelled());
+      want_done_tag = false;
     }
 
     // The RPC has been cancelled at this point for sure (i.e irrespective of
@@ -945,6 +982,7 @@
     Verifier(GetParam()).Expect(1, true).Verify(cq_.get());
     // On the server, request to be notified of 'ResponseStream' calls and
     // receive the call just made by the client
+    srv_ctx.AsyncNotifyWhenDone(tag(11));
     service_.RequestResponseStream(&srv_ctx, &recv_request, &srv_stream,
                                    cq_.get(), cq_.get(), tag(2));
     Verifier(GetParam()).Expect(2, true).Verify(cq_.get());
@@ -952,9 +990,12 @@
 
     bool expected_cq_result = true;
     bool ignore_cq_result = false;
+    bool want_done_tag = false;
 
     if (server_try_cancel == CANCEL_BEFORE_PROCESSING) {
-      ServerTryCancel(&srv_ctx);
+      srv_ctx.TryCancel();
+      Verifier(GetParam()).Expect(11, true).Verify(cq_.get());
+      EXPECT_TRUE(srv_ctx.IsCancelled());
 
       // We know for sure that all cq results will be false from this point
       // since the server cancelled the RPC
@@ -962,24 +1003,41 @@
     }
 
     std::thread* server_try_cancel_thd = NULL;
+
+    auto verif = Verifier(GetParam());
+
     if (server_try_cancel == CANCEL_DURING_PROCESSING) {
-      server_try_cancel_thd = new std::thread(
-          &AsyncEnd2endServerTryCancelTest::ServerTryCancel, this, &srv_ctx);
+      server_try_cancel_thd =
+          new std::thread(&ServerContext::TryCancel, &srv_ctx);
 
       // Server will cancel the RPC in a parallel thread while writing responses
       // to the client. Since the cancellation can happen at anytime, some of
       // the cq results (i.e those until cancellation) might be true but it is
       // non deterministic. So better to ignore the cq results
       ignore_cq_result = true;
+      // Expect that we might possibly see the done tag that
+      // indicates cancellation completion in this case
+      want_done_tag = true;
+      verif.Expect(11, true);
     }
 
     // Server sends three messages (tags 3, 4 and 5)
+    // But if want_done_tag is true, we might also see tag 11
     for (int tag_idx = 3; tag_idx <= 5; tag_idx++) {
       send_response.set_message("Pong " + std::to_string(tag_idx));
       srv_stream.Write(send_response, tag(tag_idx));
-      Verifier(GetParam())
-          .Expect(tag_idx, expected_cq_result)
-          .Verify(cq_.get(), ignore_cq_result);
+      // Note that we'll add something to the verifier and verify that
+      // something was seen, but it might be tag 11 and not what we
+      // just added
+      int got_tag = verif.Expect(tag_idx, expected_cq_result)
+                        .Next(cq_.get(), ignore_cq_result);
+      GPR_ASSERT((got_tag == tag_idx) || (got_tag == 11 && want_done_tag));
+      if (got_tag == 11) {
+        EXPECT_TRUE(srv_ctx.IsCancelled());
+        want_done_tag = false;
+        // Now get the other entry that we were waiting on
+        EXPECT_EQ(verif.Next(cq_.get(), ignore_cq_result), tag_idx);
+      }
     }
 
     if (server_try_cancel_thd != NULL) {
@@ -988,13 +1046,21 @@
     }
 
     if (server_try_cancel == CANCEL_AFTER_PROCESSING) {
-      ServerTryCancel(&srv_ctx);
+      srv_ctx.TryCancel();
+      want_done_tag = true;
+      verif.Expect(11, true);
 
       // Client reads may fail bacause it is notified that the stream is
       // cancelled.
       ignore_cq_result = true;
     }
 
+    if (want_done_tag) {
+      verif.Verify(cq_.get());
+      EXPECT_TRUE(srv_ctx.IsCancelled());
+      want_done_tag = false;
+    }
+
     // Client attemts to read the three messages from the server
     for (int tag_idx = 6; tag_idx <= 8; tag_idx++) {
       cli_stream->Read(&recv_response, tag(tag_idx));
@@ -1052,6 +1118,7 @@
 
     // On the server, request to be notified of the 'BidiStream' call and
     // receive the call just made by the client
+    srv_ctx.AsyncNotifyWhenDone(tag(11));
     service_.RequestBidiStream(&srv_ctx, &srv_stream, cq_.get(), cq_.get(),
                                tag(2));
     Verifier(GetParam()).Expect(2, true).Verify(cq_.get());
@@ -1063,9 +1130,12 @@
 
     bool expected_cq_result = true;
     bool ignore_cq_result = false;
+    bool want_done_tag = false;
 
     if (server_try_cancel == CANCEL_BEFORE_PROCESSING) {
-      ServerTryCancel(&srv_ctx);
+      srv_ctx.TryCancel();
+      Verifier(GetParam()).Expect(11, true).Verify(cq_.get());
+      EXPECT_TRUE(srv_ctx.IsCancelled());
 
       // We know for sure that all cq results will be false from this point
       // since the server cancelled the RPC
@@ -1073,42 +1143,84 @@
     }
 
     std::thread* server_try_cancel_thd = NULL;
+
+    auto verif = Verifier(GetParam());
+
     if (server_try_cancel == CANCEL_DURING_PROCESSING) {
-      server_try_cancel_thd = new std::thread(
-          &AsyncEnd2endServerTryCancelTest::ServerTryCancel, this, &srv_ctx);
+      server_try_cancel_thd =
+          new std::thread(&ServerContext::TryCancel, &srv_ctx);
 
       // Since server is going to cancel the RPC in a parallel thread, some of
       // the cq results (i.e those until the cancellation) might be true. Since
       // that number is non-deterministic, it is better to ignore the cq results
       ignore_cq_result = true;
+      // Expect that we might possibly see the done tag that
+      // indicates cancellation completion in this case
+      want_done_tag = true;
+      verif.Expect(11, true);
     }
 
+    int got_tag;
     srv_stream.Read(&recv_request, tag(4));
-    Verifier(GetParam())
-        .Expect(4, expected_cq_result)
-        .Verify(cq_.get(), ignore_cq_result);
+    verif.Expect(4, expected_cq_result);
+    got_tag = verif.Next(cq_.get(), ignore_cq_result);
+    GPR_ASSERT((got_tag == 4) || (got_tag == 11 && want_done_tag));
+    if (got_tag == 11) {
+      EXPECT_TRUE(srv_ctx.IsCancelled());
+      want_done_tag = false;
+      // Now get the other entry that we were waiting on
+      EXPECT_EQ(verif.Next(cq_.get(), ignore_cq_result), 4);
+    }
 
     send_response.set_message("Pong");
     srv_stream.Write(send_response, tag(5));
-    Verifier(GetParam())
-        .Expect(5, expected_cq_result)
-        .Verify(cq_.get(), ignore_cq_result);
+    verif.Expect(5, expected_cq_result);
+    got_tag = verif.Next(cq_.get(), ignore_cq_result);
+    GPR_ASSERT((got_tag == 5) || (got_tag == 11 && want_done_tag));
+    if (got_tag == 11) {
+      EXPECT_TRUE(srv_ctx.IsCancelled());
+      want_done_tag = false;
+      // Now get the other entry that we were waiting on
+      EXPECT_EQ(verif.Next(cq_.get(), ignore_cq_result), 5);
+    }
 
     cli_stream->Read(&recv_response, tag(6));
-    Verifier(GetParam())
-        .Expect(6, expected_cq_result)
-        .Verify(cq_.get(), ignore_cq_result);
+    verif.Expect(6, expected_cq_result);
+    got_tag = verif.Next(cq_.get(), ignore_cq_result);
+    GPR_ASSERT((got_tag == 6) || (got_tag == 11 && want_done_tag));
+    if (got_tag == 11) {
+      EXPECT_TRUE(srv_ctx.IsCancelled());
+      want_done_tag = false;
+      // Now get the other entry that we were waiting on
+      EXPECT_EQ(verif.Next(cq_.get(), ignore_cq_result), 6);
+    }
 
     // This is expected to succeed in all cases
     cli_stream->WritesDone(tag(7));
-    Verifier(GetParam()).Expect(7, true).Verify(cq_.get());
+    verif.Expect(7, true);
+    got_tag = verif.Next(cq_.get(), ignore_cq_result);
+    GPR_ASSERT((got_tag == 7) || (got_tag == 11 && want_done_tag));
+    if (got_tag == 11) {
+      EXPECT_TRUE(srv_ctx.IsCancelled());
+      want_done_tag = false;
+      // Now get the other entry that we were waiting on
+      EXPECT_EQ(verif.Next(cq_.get(), ignore_cq_result), 7);
+    }
 
     // This is expected to fail in all cases i.e for all values of
     // server_try_cancel. This is because at this point, either there are no
     // more msgs from the client (because client called WritesDone) or the RPC
     // is cancelled on the server
     srv_stream.Read(&recv_request, tag(8));
-    Verifier(GetParam()).Expect(8, false).Verify(cq_.get());
+    verif.Expect(8, false);
+    got_tag = verif.Next(cq_.get(), ignore_cq_result);
+    GPR_ASSERT((got_tag == 8) || (got_tag == 11 && want_done_tag));
+    if (got_tag == 11) {
+      EXPECT_TRUE(srv_ctx.IsCancelled());
+      want_done_tag = false;
+      // Now get the other entry that we were waiting on
+      EXPECT_EQ(verif.Next(cq_.get(), ignore_cq_result), 8);
+    }
 
     if (server_try_cancel_thd != NULL) {
       server_try_cancel_thd->join();
@@ -1116,7 +1228,15 @@
     }
 
     if (server_try_cancel == CANCEL_AFTER_PROCESSING) {
-      ServerTryCancel(&srv_ctx);
+      srv_ctx.TryCancel();
+      want_done_tag = true;
+      verif.Expect(11, true);
+    }
+
+    if (want_done_tag) {
+      verif.Verify(cq_.get());
+      EXPECT_TRUE(srv_ctx.IsCancelled());
+      want_done_tag = false;
     }
 
     // The RPC has been cancelled at this point for sure (i.e irrespective of