Merge github.com:grpc/grpc into reuse_port
diff --git a/include/grpc/impl/codegen/grpc_types.h b/include/grpc/impl/codegen/grpc_types.h
index c51ffa4..c0ed139 100644
--- a/include/grpc/impl/codegen/grpc_types.h
+++ b/include/grpc/impl/codegen/grpc_types.h
@@ -156,6 +156,8 @@
 #define GRPC_SSL_TARGET_NAME_OVERRIDE_ARG "grpc.ssl_target_name_override"
 /* Maximum metadata size */
 #define GRPC_ARG_MAX_METADATA_SIZE "grpc.max_metadata_size"
+/** If non-zero, allow the use of SO_REUSEPORT if it's available (default 1) */
+#define GRPC_ARG_ALLOW_REUSEPORT "grpc.so_reuseport"
 
 /** Result of a grpc call. If the caller satisfies the prerequisites of a
     particular operation, the grpc_call_error returned will be GRPC_CALL_OK.
diff --git a/src/core/ext/transport/chttp2/server/insecure/server_chttp2.c b/src/core/ext/transport/chttp2/server/insecure/server_chttp2.c
index 9bae3a9..e5c9879 100644
--- a/src/core/ext/transport/chttp2/server/insecure/server_chttp2.c
+++ b/src/core/ext/transport/chttp2/server/insecure/server_chttp2.c
@@ -97,7 +97,8 @@
     goto error;
   }
 
-  err = grpc_tcp_server_create(NULL, &tcp);
+  err =
+      grpc_tcp_server_create(NULL, grpc_server_get_channel_args(server), &tcp);
   if (err != GRPC_ERROR_NONE) {
     goto error;
   }
diff --git a/src/core/ext/transport/chttp2/server/secure/server_secure_chttp2.c b/src/core/ext/transport/chttp2/server/secure/server_secure_chttp2.c
index ead8a4d..c42810e 100644
--- a/src/core/ext/transport/chttp2/server/secure/server_secure_chttp2.c
+++ b/src/core/ext/transport/chttp2/server/secure/server_secure_chttp2.c
@@ -216,7 +216,8 @@
   state = gpr_malloc(sizeof(*state));
   memset(state, 0, sizeof(*state));
   grpc_closure_init(&state->destroy_closure, destroy_done, state);
-  err = grpc_tcp_server_create(&state->destroy_closure, &tcp);
+  err = grpc_tcp_server_create(&state->destroy_closure,
+                               grpc_server_get_channel_args(server), &tcp);
   if (err != GRPC_ERROR_NONE) {
     goto error;
   }
diff --git a/src/core/lib/iomgr/socket_utils_common_posix.c b/src/core/lib/iomgr/socket_utils_common_posix.c
index 3a13716..d2f6261 100644
--- a/src/core/lib/iomgr/socket_utils_common_posix.c
+++ b/src/core/lib/iomgr/socket_utils_common_posix.c
@@ -169,6 +169,28 @@
   return GRPC_ERROR_NONE;
 }
 
+/* set SO_REUSEPORT on a socket; errors if unsupported or not applied */
+grpc_error *grpc_set_socket_reuse_port(int fd, int reuse) {
+#ifndef SO_REUSEPORT
+  return GRPC_ERROR_CREATE("SO_REUSEPORT unavailable on compiling system");
+#else
+  int val = (reuse != 0);
+  int newval;
+  socklen_t intlen = sizeof(newval);
+  if (0 != setsockopt(fd, SOL_SOCKET, SO_REUSEPORT, &val, sizeof(val))) {
+    return GRPC_OS_ERROR(errno, "setsockopt(SO_REUSEPORT)");
+  }
+  if (0 != getsockopt(fd, SOL_SOCKET, SO_REUSEPORT, &newval, &intlen)) {
+    return GRPC_OS_ERROR(errno, "getsockopt(SO_REUSEPORT)");
+  }
+  if ((newval != 0) != val) {
+    return GRPC_ERROR_CREATE("Failed to set SO_REUSEPORT");
+  }
+
+  return GRPC_ERROR_NONE;
+#endif
+}
+
 /* disable nagle */
 grpc_error *grpc_set_socket_low_latency(int fd, int low_latency) {
   int val = (low_latency != 0);
diff --git a/src/core/lib/iomgr/socket_utils_posix.h b/src/core/lib/iomgr/socket_utils_posix.h
index 30ff39d..7bcc221 100644
--- a/src/core/lib/iomgr/socket_utils_posix.h
+++ b/src/core/lib/iomgr/socket_utils_posix.h
@@ -55,6 +55,9 @@
 /* disable nagle */
 grpc_error *grpc_set_socket_low_latency(int fd, int low_latency);
 
+/* set SO_REUSEPORT */
+grpc_error *grpc_set_socket_reuse_port(int fd, int reuse);
+
 /* Returns true if this system can create AF_INET6 sockets bound to ::1.
    The value is probed once, and cached for the life of the process.
 
diff --git a/src/core/lib/iomgr/tcp_server.h b/src/core/lib/iomgr/tcp_server.h
index 1d6e70d..875a6ca 100644
--- a/src/core/lib/iomgr/tcp_server.h
+++ b/src/core/lib/iomgr/tcp_server.h
@@ -34,6 +34,8 @@
 #ifndef GRPC_CORE_LIB_IOMGR_TCP_SERVER_H
 #define GRPC_CORE_LIB_IOMGR_TCP_SERVER_H
 
+#include <grpc/grpc.h>
+
 #include "src/core/lib/iomgr/closure.h"
 #include "src/core/lib/iomgr/endpoint.h"
 
@@ -59,6 +61,7 @@
    If shutdown_complete is not NULL, it will be used by
    grpc_tcp_server_unref() when the ref count reaches zero. */
 grpc_error *grpc_tcp_server_create(grpc_closure *shutdown_complete,
+                                   const grpc_channel_args *args,
                                    grpc_tcp_server **server);
 
 /* Start listening to bound ports */
diff --git a/src/core/lib/iomgr/tcp_server_posix.c b/src/core/lib/iomgr/tcp_server_posix.c
index 6fb547b..a1a4635 100644
--- a/src/core/lib/iomgr/tcp_server_posix.c
+++ b/src/core/lib/iomgr/tcp_server_posix.c
@@ -112,8 +112,10 @@
   /* destroyed port count: how many ports are completely destroyed */
   size_t destroyed_ports;
 
-  /* is this server shutting down? (boolean) */
-  int shutdown;
+  /* is this server shutting down? */
+  bool shutdown;
+  /* use SO_REUSEPORT */
+  bool so_reuseport;
 
   /* linked list of server ports */
   grpc_tcp_listener *head;
@@ -135,14 +137,42 @@
   size_t next_pollset_to_assign;
 };
 
+static gpr_once check_init = GPR_ONCE_INIT;
+static bool has_so_reuseport; /* written by init(); false if unusable */
+/* One-time probe: can SO_REUSEPORT actually be set on a socket here? */
+static void init(void) {
+  int s = socket(AF_INET, SOCK_STREAM, 0);
+  if (s < 0) s = socket(AF_INET6, SOCK_STREAM, 0); /* IPv6-only host */
+  if (s < 0) return; /* no probe socket: leave SO_REUSEPORT disabled */
+  has_so_reuseport = GRPC_LOG_IF_ERROR("check for SO_REUSEPORT",
+                                       grpc_set_socket_reuse_port(s, 1));
+  close(s);
+}
+
 grpc_error *grpc_tcp_server_create(grpc_closure *shutdown_complete,
+                                   const grpc_channel_args *args,
                                    grpc_tcp_server **server) {
+  gpr_once_init(&check_init, init);
+
   grpc_tcp_server *s = gpr_malloc(sizeof(grpc_tcp_server));
+  s->so_reuseport = has_so_reuseport;
+  for (size_t i = 0; i < (args == NULL ? 0 : args->num_args); i++) {
+    if (0 == strcmp(GRPC_ARG_ALLOW_REUSEPORT, args->args[i].key)) {
+      if (args->args[i].type == GRPC_ARG_INTEGER) {
+        s->so_reuseport =
+            has_so_reuseport && (args->args[i].value.integer != 0);
+      } else {
+        gpr_free(s);
+        return GRPC_ERROR_CREATE(GRPC_ARG_ALLOW_REUSEPORT
+                                 " must be an integer");
+      }
+    }
+  }
   gpr_ref_init(&s->refs, 1);
   gpr_mu_init(&s->mu);
   s->active_ports = 0;
   s->destroyed_ports = 0;
-  s->shutdown = 0;
+  s->shutdown = false;
   s->shutdown_starting.head = NULL;
   s->shutdown_starting.tail = NULL;
   s->shutdown_complete = shutdown_complete;
@@ -218,7 +248,7 @@
   gpr_mu_lock(&s->mu);
 
   GPR_ASSERT(!s->shutdown);
-  s->shutdown = 1;
+  s->shutdown = true;
 
   /* shutdown all fd's */
   if (s->active_ports) {
@@ -268,13 +298,19 @@
 
 /* Prepare a recently-created socket for listening. */
 static grpc_error *prepare_socket(int fd, const struct sockaddr *addr,
-                                  size_t addr_len, int *port) {
+                                  size_t addr_len, bool so_reuseport,
+                                  int *port) {
   struct sockaddr_storage sockname_temp;
   socklen_t sockname_len;
   grpc_error *err = GRPC_ERROR_NONE;
 
   GPR_ASSERT(fd >= 0);
 
+  if (so_reuseport) {
+    err = grpc_set_socket_reuse_port(fd, 1);
+    if (err != GRPC_ERROR_NONE) goto error;
+  }
+
   err = grpc_set_socket_nonblocking(fd, 1);
   if (err != GRPC_ERROR_NONE) goto error;
   err = grpc_set_socket_cloexec(fd, 1);
@@ -407,7 +443,7 @@
   char *addr_str;
   char *name;
 
-  grpc_error *err = prepare_socket(fd, addr, addr_len, &port);
+  grpc_error *err = prepare_socket(fd, addr, addr_len, s->so_reuseport, &port);
   if (err == GRPC_ERROR_NONE) {
     GPR_ASSERT(port > 0);
     grpc_sockaddr_to_string(&addr_str, (struct sockaddr *)&addr, 1);
@@ -443,6 +479,52 @@
   return err;
 }
 
+/* Add `count` extra SO_REUSEPORT listeners on the address of `listener`. */
+static grpc_error *clone_port(grpc_tcp_listener *listener, unsigned count) {
+  grpc_tcp_listener *sp = NULL;
+  char *addr_str, *name;
+  grpc_error *err;
+  /* clones are linked in right after `listener`: renumber later siblings */
+  for (grpc_tcp_listener *l = listener->next; l && l->is_sibling; l = l->next) {
+    l->fd_index += count;
+  }
+  /* one new socket per clone, always prepared with SO_REUSEPORT enabled */
+  for (unsigned i = 0; i < count; i++) {
+    int fd, port;
+    grpc_dualstack_mode dsmode;
+    err = grpc_create_dualstack_socket(&listener->addr.sockaddr, SOCK_STREAM, 0,
+                                       &dsmode, &fd);
+    if (err != GRPC_ERROR_NONE) return err;
+    err = prepare_socket(fd, &listener->addr.sockaddr, listener->addr_len, true,
+                         &port);
+    if (err != GRPC_ERROR_NONE) return err;
+    listener->server->nports++;
+    grpc_sockaddr_to_string(&addr_str, &listener->addr.sockaddr, 1);
+    gpr_asprintf(&name, "tcp-server-listener:%s/clone-%u", addr_str, i);
+    sp = gpr_malloc(sizeof(grpc_tcp_listener));
+    sp->next = listener->next;
+    listener->next = sp;
+    sp->server = listener->server;
+    sp->fd = fd;
+    sp->emfd = grpc_fd_create(fd, name);
+    memcpy(sp->addr.untyped, listener->addr.untyped, listener->addr_len);
+    sp->addr_len = listener->addr_len;
+    sp->port = port;
+    sp->port_index = listener->port_index;
+    sp->fd_index = listener->fd_index + count - i; /* ascending along list */
+    sp->is_sibling = 1;
+    sp->sibling = listener->is_sibling ? listener->sibling : listener;
+    GPR_ASSERT(sp->emfd);
+    while (listener->server->tail->next != NULL) { /* re-find list end */
+      listener->server->tail = listener->server->tail->next;
+    }
+    gpr_free(addr_str);
+    gpr_free(name);
+  }
+
+  return GRPC_ERROR_NONE;
+}
+
 grpc_error *grpc_tcp_server_add_port(grpc_tcp_server *s, const void *addr,
                                      size_t addr_len, int *out_port) {
   grpc_tcp_listener *sp;
@@ -599,14 +681,29 @@
   s->on_accept_cb_arg = on_accept_cb_arg;
   s->pollsets = pollsets;
   s->pollset_count = pollset_count;
-  for (sp = s->head; sp; sp = sp->next) {
-    for (i = 0; i < pollset_count; i++) {
-      grpc_pollset_add_fd(exec_ctx, pollsets[i], sp->emfd);
+  sp = s->head;
+  while (sp != NULL) {
+    if (s->so_reuseport && pollset_count > 1) {
+      GPR_ASSERT(GRPC_LOG_IF_ERROR(
+          "clone_port", clone_port(sp, (unsigned)(pollset_count - 1))));
+      for (i = 0; i < pollset_count; i++) {
+        grpc_pollset_add_fd(exec_ctx, pollsets[i], sp->emfd);
+        sp->read_closure.cb = on_read;
+        sp->read_closure.cb_arg = sp;
+        grpc_fd_notify_on_read(exec_ctx, sp->emfd, &sp->read_closure);
+        s->active_ports++;
+        sp = sp->next;
+      }
+    } else {
+      for (i = 0; i < pollset_count; i++) {
+        grpc_pollset_add_fd(exec_ctx, pollsets[i], sp->emfd);
+      }
+      sp->read_closure.cb = on_read;
+      sp->read_closure.cb_arg = sp;
+      grpc_fd_notify_on_read(exec_ctx, sp->emfd, &sp->read_closure);
+      s->active_ports++;
+      sp = sp->next;
     }
-    sp->read_closure.cb = on_read;
-    sp->read_closure.cb_arg = sp;
-    grpc_fd_notify_on_read(exec_ctx, sp->emfd, &sp->read_closure);
-    s->active_ports++;
   }
   gpr_mu_unlock(&s->mu);
 }
diff --git a/test/core/iomgr/tcp_server_posix_test.c b/test/core/iomgr/tcp_server_posix_test.c
index eda774f..6361e7a 100644
--- a/test/core/iomgr/tcp_server_posix_test.c
+++ b/test/core/iomgr/tcp_server_posix_test.c
@@ -129,7 +129,7 @@
 static void test_no_op(void) {
   grpc_exec_ctx exec_ctx = GRPC_EXEC_CTX_INIT;
   grpc_tcp_server *s;
-  GPR_ASSERT(GRPC_ERROR_NONE == grpc_tcp_server_create(NULL, &s));
+  GPR_ASSERT(GRPC_ERROR_NONE == grpc_tcp_server_create(NULL, NULL, &s));
   grpc_tcp_server_unref(&exec_ctx, s);
   grpc_exec_ctx_finish(&exec_ctx);
 }
@@ -137,7 +137,7 @@
 static void test_no_op_with_start(void) {
   grpc_exec_ctx exec_ctx = GRPC_EXEC_CTX_INIT;
   grpc_tcp_server *s;
-  GPR_ASSERT(GRPC_ERROR_NONE == grpc_tcp_server_create(NULL, &s));
+  GPR_ASSERT(GRPC_ERROR_NONE == grpc_tcp_server_create(NULL, NULL, &s));
   LOG_TEST("test_no_op_with_start");
   grpc_tcp_server_start(&exec_ctx, s, NULL, 0, on_connect, NULL);
   grpc_tcp_server_unref(&exec_ctx, s);
@@ -148,7 +148,7 @@
   grpc_exec_ctx exec_ctx = GRPC_EXEC_CTX_INIT;
   struct sockaddr_in addr;
   grpc_tcp_server *s;
-  GPR_ASSERT(GRPC_ERROR_NONE == grpc_tcp_server_create(NULL, &s));
+  GPR_ASSERT(GRPC_ERROR_NONE == grpc_tcp_server_create(NULL, NULL, &s));
   LOG_TEST("test_no_op_with_port");
 
   memset(&addr, 0, sizeof(addr));
@@ -166,7 +166,7 @@
   grpc_exec_ctx exec_ctx = GRPC_EXEC_CTX_INIT;
   struct sockaddr_in addr;
   grpc_tcp_server *s;
-  GPR_ASSERT(GRPC_ERROR_NONE == grpc_tcp_server_create(NULL, &s));
+  GPR_ASSERT(GRPC_ERROR_NONE == grpc_tcp_server_create(NULL, NULL,  &s));
   LOG_TEST("test_no_op_with_port_and_start");
   int port;
 
@@ -226,7 +226,7 @@
   unsigned svr1_fd_count;
   int svr1_port;
   grpc_tcp_server *s;
-  GPR_ASSERT(GRPC_ERROR_NONE == grpc_tcp_server_create(NULL, &s));
+  GPR_ASSERT(GRPC_ERROR_NONE == grpc_tcp_server_create(NULL, NULL, &s));
   unsigned i;
   server_weak_ref weak_ref;
   server_weak_ref_init(&weak_ref);
diff --git a/test/core/surface/concurrent_connectivity_test.c b/test/core/surface/concurrent_connectivity_test.c
index f447a68..f7567f3 100644
--- a/test/core/surface/concurrent_connectivity_test.c
+++ b/test/core/surface/concurrent_connectivity_test.c
@@ -113,7 +113,7 @@
   socklen_t addr_len = sizeof(addr);
   int port;
   grpc_tcp_server *s;
-  grpc_error *error = grpc_tcp_server_create(NULL, &s);
+  grpc_error *error = grpc_tcp_server_create(NULL, NULL, &s);
   GPR_ASSERT(error == GRPC_ERROR_NONE);
   memset(&addr, 0, sizeof(addr));
   addr.ss_family = AF_INET;
diff --git a/test/core/surface/server_chttp2_test.c b/test/core/surface/server_chttp2_test.c
index f42ca9f..40cfa6b 100644
--- a/test/core/surface/server_chttp2_test.c
+++ b/test/core/surface/server_chttp2_test.c
@@ -49,10 +49,16 @@
 }
 
 void test_add_same_port_twice() {
+  grpc_arg a;
+  a.type = GRPC_ARG_INTEGER;
+  a.key = GRPC_ARG_ALLOW_REUSEPORT;
+  a.value.integer = 0;
+  grpc_channel_args args = {1, &a};
+  
   int port = grpc_pick_unused_port_or_die();
   char *addr = NULL;
   grpc_completion_queue *cq = grpc_completion_queue_create(NULL);
-  grpc_server *server = grpc_server_create(NULL, NULL);
+  grpc_server *server = grpc_server_create(&args, NULL);
   grpc_server_credentials *fake_creds =
       grpc_fake_transport_security_server_credentials_create();
   gpr_join_host_port(&addr, "localhost", port);
diff --git a/test/core/surface/server_test.c b/test/core/surface/server_test.c
index 02eb432..6dd8a43 100644
--- a/test/core/surface/server_test.c
+++ b/test/core/surface/server_test.c
@@ -82,9 +82,15 @@
 }
 
 void test_bind_server_twice(void) {
+  grpc_arg a;
+  a.type = GRPC_ARG_INTEGER;
+  a.key = GRPC_ARG_ALLOW_REUSEPORT;
+  a.value.integer = 0;
+  grpc_channel_args args = {1, &a};
+
   char *addr;
-  grpc_server *server1 = grpc_server_create(NULL, NULL);
-  grpc_server *server2 = grpc_server_create(NULL, NULL);
+  grpc_server *server1 = grpc_server_create(&args, NULL);
+  grpc_server *server2 = grpc_server_create(&args, NULL);
   grpc_completion_queue *cq = grpc_completion_queue_create(NULL);
   int port = grpc_pick_unused_port_or_die();
   gpr_asprintf(&addr, "[::]:%d", port);
diff --git a/test/core/util/test_tcp_server.c b/test/core/util/test_tcp_server.c
index 27c16fc..c1d307f 100644
--- a/test/core/util/test_tcp_server.c
+++ b/test/core/util/test_tcp_server.c
@@ -73,7 +73,7 @@
   memset(&addr.sin_addr, 0, sizeof(addr.sin_addr));
 
   grpc_error *error =
-      grpc_tcp_server_create(&server->shutdown_complete, &server->tcp_server);
+      grpc_tcp_server_create(&server->shutdown_complete, NULL, &server->tcp_server);
   GPR_ASSERT(error == GRPC_ERROR_NONE);
   error = grpc_tcp_server_add_port(server->tcp_server, &addr, sizeof(addr),
                                    &port_added);