Merge pull request #13554 from danzh2010/asyncio

udp_server async I/O
diff --git a/src/core/lib/iomgr/udp_server.cc b/src/core/lib/iomgr/udp_server.cc
index 68ab935..7b7d694 100644
--- a/src/core/lib/iomgr/udp_server.cc
+++ b/src/core/lib/iomgr/udp_server.cc
@@ -47,6 +47,7 @@
 #include "src/core/lib/channel/channel_args.h"
 #include "src/core/lib/iomgr/error.h"
 #include "src/core/lib/iomgr/ev_posix.h"
+#include "src/core/lib/iomgr/executor.h"
 #include "src/core/lib/iomgr/resolve_address.h"
 #include "src/core/lib/iomgr/sockaddr.h"
 #include "src/core/lib/iomgr/sockaddr_utils.h"
@@ -71,14 +72,22 @@
   grpc_udp_server_read_cb read_cb;
   grpc_udp_server_write_cb write_cb;
   grpc_udp_server_orphan_cb orphan_cb;
+  // To be scheduled on another thread to actually read/write.
+  grpc_closure do_read_closure;
+  grpc_closure do_write_closure;
+  grpc_closure notify_on_write_closure;
   // True if orphan_cb is trigered.
   bool orphan_notified;
+  // True if write notification is armed and not yet consumed by do_write().
+  bool notify_on_write_armed;
+  // True if fd has been shut down.
+  bool already_shutdown;
 
   struct grpc_udp_listener* next;
 };
 
 struct shutdown_fd_args {
-  grpc_fd* fd;
+  grpc_udp_listener* sp;
   gpr_mu* server_mu;
 };
 
@@ -144,8 +153,17 @@
 static void shutdown_fd(grpc_exec_ctx* exec_ctx, void* args,
                         grpc_error* error) {
   struct shutdown_fd_args* shutdown_args = (struct shutdown_fd_args*)args;
+  grpc_udp_listener* sp = shutdown_args->sp;
+  gpr_log(GPR_DEBUG, "shutdown fd %d", sp->fd);
   gpr_mu_lock(shutdown_args->server_mu);
-  grpc_fd_shutdown(exec_ctx, shutdown_args->fd, GRPC_ERROR_REF(error));
+  grpc_fd_shutdown(exec_ctx, sp->emfd, GRPC_ERROR_REF(error));
+  sp->already_shutdown = true;
+  if (!sp->notify_on_write_armed) {
+    // Re-arm write notification to notify listener with error. This is
+    // necessary to decrement active_ports.
+    sp->notify_on_write_armed = true;
+    grpc_fd_notify_on_write(exec_ctx, sp->emfd, &sp->write_closure);
+  }
   gpr_mu_unlock(shutdown_args->server_mu);
   gpr_free(shutdown_args);
 }
@@ -161,6 +179,7 @@
 
   gpr_mu_destroy(&s->mu);
 
+  gpr_log(GPR_DEBUG, "Destroy all listeners.");
   while (s->head) {
     grpc_udp_listener* sp = s->head;
     s->head = sp->next;
@@ -207,9 +226,10 @@
         /* Call the orphan_cb to signal that the FD is about to be closed and
          * should no longer be used. Because at this point, all listening ports
          * have been shutdown already, no need to shutdown again.*/
-        GRPC_CLOSURE_INIT(&sp->orphan_fd_closure, dummy_cb, sp->emfd,
+        GRPC_CLOSURE_INIT(&sp->orphan_fd_closure, dummy_cb, sp,
                           grpc_schedule_on_exec_ctx);
         GPR_ASSERT(sp->orphan_cb);
+        gpr_log(GPR_DEBUG, "Orphan fd %d", sp->fd);
         sp->orphan_cb(exec_ctx, sp->emfd, &sp->orphan_fd_closure,
                       sp->server->user_data);
       }
@@ -233,13 +253,14 @@
 
   s->shutdown_complete = on_done;
 
+  gpr_log(GPR_DEBUG, "start to destroy udp_server");
   /* shutdown all fd's */
   if (s->active_ports) {
     for (sp = s->head; sp; sp = sp->next) {
       GPR_ASSERT(sp->orphan_cb);
       struct shutdown_fd_args* args =
           (struct shutdown_fd_args*)gpr_malloc(sizeof(*args));
-      args->fd = sp->emfd;
+      args->sp = sp;
       args->server_mu = &s->mu;
       GRPC_CLOSURE_INIT(&sp->orphan_fd_closure, shutdown_fd, args,
                         grpc_schedule_on_exec_ctx);
@@ -329,6 +350,28 @@
   return -1;
 }
 
+static void do_read(grpc_exec_ctx* exec_ctx, void* arg, grpc_error* error) {
+  grpc_udp_listener* sp = reinterpret_cast<grpc_udp_listener*>(arg);
+  GPR_ASSERT(sp->read_cb && error == GRPC_ERROR_NONE);
+  /* Scheduled on the executor by on_read() when read_cb reports more data.
+   * TODO: server->mu is held here merely to prevent fd shutdown while reading,
+   * but that also blocks do_write(). Switch to a read lock if available. */
+  gpr_mu_lock(&sp->server->mu);
+  /* Tell the registered callback that data is available to read. */
+  if (!sp->already_shutdown &&
+      sp->read_cb(exec_ctx, sp->emfd, sp->server->user_data)) {
+    /* There may be more packets to read. Re-schedule do_read_closure so that
+     * read_cb gets called again. */
+    GRPC_CLOSURE_SCHED(exec_ctx, &sp->do_read_closure, GRPC_ERROR_NONE);
+  } else {
+    /* Finished reading all the packets: re-arm the notification event so we
+     * get another chance to read. Or the fd is already shut down: re-arm to
+     * get a notification with the shutdown error. */
+    grpc_fd_notify_on_read(exec_ctx, sp->emfd, &sp->read_closure);
+  }
+  gpr_mu_unlock(&sp->server->mu);
+}
+
 /* event manager callback when reads are ready */
 static void on_read(grpc_exec_ctx* exec_ctx, void* arg, grpc_error* error) {
   grpc_udp_listener* sp = (grpc_udp_listener*)arg;
@@ -343,13 +386,51 @@
     }
     return;
   }
-
-  /* Tell the registered callback that data is available to read. */
+  /* Read once. If there is more data to read, offload the work to another
+   * thread to finish. */
   GPR_ASSERT(sp->read_cb);
-  sp->read_cb(exec_ctx, sp->emfd, sp->server->user_data);
+  if (sp->read_cb(exec_ctx, sp->emfd, sp->server->user_data)) {
+    /* There may be more packets to read. Schedule do_read_closure to finish
+     * the work on the executor. */
+    GRPC_CLOSURE_INIT(&sp->do_read_closure, do_read, arg,
+                      grpc_executor_scheduler(GRPC_EXECUTOR_LONG));
+    GRPC_CLOSURE_SCHED(exec_ctx, &sp->do_read_closure, GRPC_ERROR_NONE);
+  } else {
+    /* Finished reading all the packets: re-arm the notification event so we
+     * get another chance to read. Or the fd is already shut down: re-arm to
+     * get a notification with the shutdown error. */
+    grpc_fd_notify_on_read(exec_ctx, sp->emfd, &sp->read_closure);
+  }
+  gpr_mu_unlock(&sp->server->mu);
+}
 
-  /* Re-arm the notification event so we get another chance to read. */
-  grpc_fd_notify_on_read(exec_ctx, sp->emfd, &sp->read_closure);
+// File-local closure callback: arms grpc_fd_notify_on_write() if not armed.
+static void fd_notify_on_write_wrapper(grpc_exec_ctx* exec_ctx, void* arg,
+                                       grpc_error* error) {
+  grpc_udp_listener* sp = reinterpret_cast<grpc_udp_listener*>(arg);
+  gpr_mu_lock(&sp->server->mu);
+  if (!sp->notify_on_write_armed) {
+    grpc_fd_notify_on_write(exec_ctx, sp->emfd, &sp->write_closure);
+    sp->notify_on_write_armed = true;
+  }
+  gpr_mu_unlock(&sp->server->mu);
+}
+
+static void do_write(grpc_exec_ctx* exec_ctx, void* arg, grpc_error* error) {
+  grpc_udp_listener* sp = reinterpret_cast<grpc_udp_listener*>(arg);
+  gpr_mu_lock(&(sp->server->mu));
+  if (sp->already_shutdown) {
+    // If fd has been shut down, don't write any more and re-arm notification.
+    grpc_fd_notify_on_write(exec_ctx, sp->emfd, &sp->write_closure);
+  } else {
+    sp->notify_on_write_armed = false;  // this notification is now consumed
+    /* Tell write_cb the socket is writeable; it gets a closure to re-arm. */
+    GPR_ASSERT(sp->write_cb && error == GRPC_ERROR_NONE);
+    GRPC_CLOSURE_INIT(&sp->notify_on_write_closure, fd_notify_on_write_wrapper,
+                      arg, grpc_schedule_on_exec_ctx);
+    sp->write_cb(exec_ctx, sp->emfd, sp->server->user_data,
+                 &sp->notify_on_write_closure);
+  }
   gpr_mu_unlock(&sp->server->mu);
 }
 
@@ -367,12 +448,11 @@
     return;
   }
 
-  /* Tell the registered callback that the socket is writeable. */
-  GPR_ASSERT(sp->write_cb);
-  sp->write_cb(exec_ctx, sp->emfd, sp->server->user_data);
+  /* Schedule actual write in another thread. */
+  GRPC_CLOSURE_INIT(&sp->do_write_closure, do_write, arg,
+                    grpc_executor_scheduler(GRPC_EXECUTOR_LONG));
 
-  /* Re-arm the notification event so we get another chance to write. */
-  grpc_fd_notify_on_write(exec_ctx, sp->emfd, &sp->write_closure);
+  GRPC_CLOSURE_SCHED(exec_ctx, &sp->do_write_closure, GRPC_ERROR_NONE);
   gpr_mu_unlock(&sp->server->mu);
 }
 
@@ -409,6 +489,7 @@
     sp->write_cb = write_cb;
     sp->orphan_cb = orphan_cb;
     sp->orphan_notified = false;
+    sp->already_shutdown = false;
     GPR_ASSERT(sp->emfd);
     gpr_mu_unlock(&s->mu);
     gpr_free(name);
@@ -533,6 +614,7 @@
 
     GRPC_CLOSURE_INIT(&sp->write_closure, on_write, sp,
                       grpc_schedule_on_exec_ctx);
+    sp->notify_on_write_armed = true;
     grpc_fd_notify_on_write(exec_ctx, sp->emfd, &sp->write_closure);
 
     /* Registered for both read and write callbacks: increment active_ports
diff --git a/src/core/lib/iomgr/udp_server.h b/src/core/lib/iomgr/udp_server.h
index 8cc08d3..1bd6922 100644
--- a/src/core/lib/iomgr/udp_server.h
+++ b/src/core/lib/iomgr/udp_server.h
@@ -30,13 +30,16 @@
 /* Forward decl of grpc_udp_server */
 typedef struct grpc_udp_server grpc_udp_server;
 
-/* Called when data is available to read from the socket. */
-typedef void (*grpc_udp_server_read_cb)(grpc_exec_ctx* exec_ctx, grpc_fd* emfd,
+/* Called when data is available to read from the socket.
+ * Return true if there is more data to read from fd. */
+typedef bool (*grpc_udp_server_read_cb)(grpc_exec_ctx* exec_ctx, grpc_fd* emfd,
                                         void* user_data);
 
-/* Called when the socket is writeable. */
+/* Called when the socket is writeable. The given closure should be scheduled
+ * when the socket becomes blocked next time. */
 typedef void (*grpc_udp_server_write_cb)(grpc_exec_ctx* exec_ctx, grpc_fd* emfd,
-                                         void* user_data);
+                                         void* user_data,
+                                         grpc_closure* notify_on_write_closure);
 
 /* Called when the grpc_fd is about to be orphaned (and the FD closed). */
 typedef void (*grpc_udp_server_orphan_cb)(grpc_exec_ctx* exec_ctx,
diff --git a/test/core/iomgr/udp_server_test.cc b/test/core/iomgr/udp_server_test.cc
index 803f017..6e17be9 100644
--- a/test/core/iomgr/udp_server_test.cc
+++ b/test/core/iomgr/udp_server_test.cc
@@ -50,7 +50,7 @@
 static int g_number_of_bytes_read = 0;
 static int g_number_of_orphan_calls = 0;
 
-static void on_read(grpc_exec_ctx* exec_ctx, grpc_fd* emfd, void* user_data) {
+static bool on_read(grpc_exec_ctx* exec_ctx, grpc_fd* emfd, void* user_data) {
   char read_buffer[512];
   ssize_t byte_count;
 
@@ -64,9 +64,11 @@
   GPR_ASSERT(GRPC_LOG_IF_ERROR(
       "pollset_kick", grpc_pollset_kick(exec_ctx, g_pollset, nullptr)));
   gpr_mu_unlock(g_mu);
+  return false;
 }
 
-static void on_write(grpc_exec_ctx* exec_ctx, grpc_fd* emfd, void* user_data) {
+static void on_write(grpc_exec_ctx* exec_ctx, grpc_fd* emfd, void* user_data,
+                     grpc_closure* notify_on_write_closure) {
   gpr_mu_lock(g_mu);
   g_number_of_writes++;
 
@@ -79,6 +81,7 @@
                            grpc_closure* closure, void* user_data) {
   gpr_log(GPR_INFO, "gRPC FD about to be orphaned: %d",
           grpc_fd_wrapped_fd(emfd));
+  GRPC_CLOSURE_SCHED(exec_ctx, closure, GRPC_ERROR_NONE);
   g_number_of_orphan_calls++;
 }
 
@@ -226,7 +229,6 @@
   int clifd, svrfd;
   grpc_udp_server* s = grpc_udp_server_create(nullptr);
   int i;
-  int number_of_reads_before;
   grpc_millis deadline;
   grpc_pollset* pollsets[1];
   LOG_TEST("test_receive");
@@ -256,14 +258,14 @@
     deadline =
         grpc_timespec_to_millis_round_up(grpc_timeout_seconds_to_deadline(10));
 
-    number_of_reads_before = g_number_of_reads;
+    int number_of_bytes_read_before = g_number_of_bytes_read;
     /* Create a socket, send a packet to the UDP server. */
     clifd = socket(addr->ss_family, SOCK_DGRAM, 0);
     GPR_ASSERT(clifd >= 0);
     GPR_ASSERT(connect(clifd, (struct sockaddr*)addr,
                        (socklen_t)resolved_addr.len) == 0);
     GPR_ASSERT(5 == write(clifd, "hello", 5));
-    while (g_number_of_reads == number_of_reads_before &&
+    while (g_number_of_bytes_read < (number_of_bytes_read_before + 5) &&
            deadline > grpc_exec_ctx_now(&exec_ctx)) {
       grpc_pollset_worker* worker = nullptr;
       GPR_ASSERT(GRPC_LOG_IF_ERROR(
@@ -273,7 +275,6 @@
       grpc_exec_ctx_flush(&exec_ctx);
       gpr_mu_lock(g_mu);
     }
-    GPR_ASSERT(g_number_of_reads == number_of_reads_before + 1);
     close(clifd);
   }
   GPR_ASSERT(g_number_of_bytes_read == 5 * number_of_clients);
@@ -286,9 +287,6 @@
   /* The server had a single FD, which is orphaned exactly once in *
    * grpc_udp_server_destroy. */
   GPR_ASSERT(g_number_of_orphan_calls == 1);
-
-  /* The write callback should have fired a few times. */
-  GPR_ASSERT(g_number_of_writes > 0);
 }
 
 static void destroy_pollset(grpc_exec_ctx* exec_ctx, void* p,