Merge pull request #13450 from daniel-j-born/tcp_client

Refactor POSIX TCP client connect.
diff --git a/src/core/lib/iomgr/tcp_client_posix.cc b/src/core/lib/iomgr/tcp_client_posix.cc
index 15062a5..24ccab1 100644
--- a/src/core/lib/iomgr/tcp_client_posix.cc
+++ b/src/core/lib/iomgr/tcp_client_posix.cc
@@ -236,65 +236,68 @@
   GRPC_CLOSURE_SCHED(closure, error);
 }
 
-static void tcp_client_connect_impl(grpc_closure* closure, grpc_endpoint** ep,
-                                    grpc_pollset_set* interested_parties,
-                                    const grpc_channel_args* channel_args,
-                                    const grpc_resolved_address* addr,
-                                    grpc_millis deadline) {
-  int fd;
+grpc_error* grpc_tcp_client_prepare_fd(const grpc_channel_args* channel_args,
+                                       const grpc_resolved_address* addr,
+                                       grpc_resolved_address* mapped_addr,
+                                       grpc_fd** fdobj) {
   grpc_dualstack_mode dsmode;
-  int err;
-  async_connect* ac;
-  grpc_resolved_address addr6_v4mapped;
-  grpc_resolved_address addr4_copy;
-  grpc_fd* fdobj;
+  int fd;
+  grpc_error* error;
   char* name;
   char* addr_str;
-  grpc_error* error;
-
-  *ep = nullptr;
-
-  /* Use dualstack sockets where available. */
-  if (grpc_sockaddr_to_v4mapped(addr, &addr6_v4mapped)) {
-    addr = &addr6_v4mapped;
+  *fdobj = nullptr;
+  /* Use dualstack sockets where available. Set mapped_addr to v6 or v4 mapped
+     to v6. */
+  if (!grpc_sockaddr_to_v4mapped(addr, mapped_addr)) {
+    /* addr is v4 mapped to v6 or v6. */
+    memcpy(mapped_addr, addr, sizeof(*mapped_addr));
   }
-
-  error = grpc_create_dualstack_socket(addr, SOCK_STREAM, 0, &dsmode, &fd);
+  error =
+      grpc_create_dualstack_socket(mapped_addr, SOCK_STREAM, 0, &dsmode, &fd);
   if (error != GRPC_ERROR_NONE) {
-    GRPC_CLOSURE_SCHED(closure, error);
-    return;
+    return error;
   }
   if (dsmode == GRPC_DSMODE_IPV4) {
-    /* If we got an AF_INET socket, map the address back to IPv4. */
-    GPR_ASSERT(grpc_sockaddr_is_v4mapped(addr, &addr4_copy));
-    addr = &addr4_copy;
+    /* Original addr is either v4 or v4 mapped to v6. Set mapped_addr to v4. */
+    if (!grpc_sockaddr_is_v4mapped(addr, mapped_addr)) {
+      memcpy(mapped_addr, addr, sizeof(*mapped_addr));
+    }
   }
-  if ((error = prepare_socket(addr, fd, channel_args)) != GRPC_ERROR_NONE) {
-    GRPC_CLOSURE_SCHED(closure, error);
-    return;
+  if ((error = prepare_socket(mapped_addr, fd, channel_args)) !=
+      GRPC_ERROR_NONE) {
+    return error;
   }
+  addr_str = grpc_sockaddr_to_uri(mapped_addr);
+  gpr_asprintf(&name, "tcp-client:%s", addr_str);
+  *fdobj = grpc_fd_create(fd, name);
+  gpr_free(name);
+  gpr_free(addr_str);
+  return GRPC_ERROR_NONE;
+}
 
+void grpc_tcp_client_create_from_prepared_fd(
+    grpc_pollset_set* interested_parties, grpc_closure* closure, grpc_fd* fdobj,
+    const grpc_channel_args* channel_args, const grpc_resolved_address* addr,
+    grpc_millis deadline, grpc_endpoint** ep) {
+  const int fd = grpc_fd_wrapped_fd(fdobj);
+  int err;
+  async_connect* ac;
   do {
     GPR_ASSERT(addr->len < ~(socklen_t)0);
     err = connect(fd, (const struct sockaddr*)addr->addr, (socklen_t)addr->len);
   } while (err < 0 && errno == EINTR);
-
-  addr_str = grpc_sockaddr_to_uri(addr);
-  gpr_asprintf(&name, "tcp-client:%s", addr_str);
-
-  fdobj = grpc_fd_create(fd, name);
-
   if (err >= 0) {
+    char* addr_str = grpc_sockaddr_to_uri(addr);
     *ep = grpc_tcp_client_create_from_fd(fdobj, channel_args, addr_str);
+    gpr_free(addr_str);
     GRPC_CLOSURE_SCHED(closure, GRPC_ERROR_NONE);
-    goto done;
+    return;
   }
-
   if (errno != EWOULDBLOCK && errno != EINPROGRESS) {
     grpc_fd_orphan(fdobj, nullptr, nullptr, false /* already_closed */,
                    "tcp_client_connect_error");
     GRPC_CLOSURE_SCHED(closure, GRPC_OS_ERROR(errno, "connect"));
-    goto done;
+    return;
   }
 
   grpc_pollset_set_add_fd(interested_parties, fdobj);
@@ -304,8 +307,7 @@
   ac->ep = ep;
   ac->fd = fdobj;
   ac->interested_parties = interested_parties;
-  ac->addr_str = addr_str;
-  addr_str = nullptr;
+  ac->addr_str = grpc_sockaddr_to_uri(addr);
   gpr_mu_init(&ac->mu);
   ac->refs = 2;
   GRPC_CLOSURE_INIT(&ac->write_closure, on_writable, ac,
@@ -322,10 +324,25 @@
   grpc_timer_init(&ac->alarm, deadline, &ac->on_alarm);
   grpc_fd_notify_on_write(ac->fd, &ac->write_closure);
   gpr_mu_unlock(&ac->mu);
+}
 
-done:
-  gpr_free(name);
-  gpr_free(addr_str);
+static void tcp_client_connect_impl(grpc_closure* closure, grpc_endpoint** ep,
+                                    grpc_pollset_set* interested_parties,
+                                    const grpc_channel_args* channel_args,
+                                    const grpc_resolved_address* addr,
+                                    grpc_millis deadline) {
+  grpc_resolved_address mapped_addr;
+  grpc_fd* fdobj = nullptr;
+  grpc_error* error;
+  *ep = nullptr;
+  if ((error = grpc_tcp_client_prepare_fd(channel_args, addr, &mapped_addr,
+                                          &fdobj)) != GRPC_ERROR_NONE) {
+    GRPC_CLOSURE_SCHED(closure, error);
+    return;
+  }
+  grpc_tcp_client_create_from_prepared_fd(interested_parties, closure, fdobj,
+                                          channel_args, &mapped_addr, deadline,
+                                          ep);
 }
 
 // overridden by api_fuzzer.c
diff --git a/src/core/lib/iomgr/tcp_client_posix.h b/src/core/lib/iomgr/tcp_client_posix.h
index 7d0f133..57e50a6 100644
--- a/src/core/lib/iomgr/tcp_client_posix.h
+++ b/src/core/lib/iomgr/tcp_client_posix.h
@@ -23,7 +23,44 @@
 #include "src/core/lib/iomgr/ev_posix.h"
 #include "src/core/lib/iomgr/tcp_client.h"
 
+/* Create an endpoint from a connected grpc_fd.
+
+   fd: a connected FD. Ownership is taken.
+   channel_args: may contain custom settings for the endpoint
+   addr_str: destination address in printable format
+   Returns: a new endpoint
+*/
 grpc_endpoint* grpc_tcp_client_create_from_fd(
     grpc_fd* fd, const grpc_channel_args* channel_args, const char* addr_str);
 
+/* Return a configured, unbound, unconnected TCP client grpc_fd.
+
+   channel_args: may contain custom settings for the fd
+   addr: the destination address
+   mapped_addr: out parameter. addr mapped to an address appropriate to the
+     type of socket FD created. For example, if addr is IPv4 and dual stack
+     sockets are available, mapped_addr will be an IPv4-mapped IPv6 address
+   fdobj: out parameter. The new FD
+   Returns: error, if any. On error, *fdobj is null; mapped_addr is unspecified
+*/
+grpc_error* grpc_tcp_client_prepare_fd(const grpc_channel_args* channel_args,
+                                       const grpc_resolved_address* addr,
+                                       grpc_resolved_address* mapped_addr,
+                                       grpc_fd** fdobj);
+
+/* Connect a configured TCP client grpc_fd to addr.
+
+   interested_parties: a set of pollsets that would be interested in this
+     connection being established (in order to continue their work)
+   closure: called when complete. On success, *ep will be set.
+   fdobj: an FD returned from grpc_tcp_client_prepare_fd(). Ownership is taken
+   channel_args: may contain custom settings for the endpoint
+   deadline: connection deadline
+   ep: out parameter. Set before closure is called if successful
+*/
+void grpc_tcp_client_create_from_prepared_fd(
+    grpc_pollset_set* interested_parties, grpc_closure* closure, grpc_fd* fdobj,
+    const grpc_channel_args* channel_args, const grpc_resolved_address* addr,
+    grpc_millis deadline, grpc_endpoint** ep);
+
 #endif /* GRPC_CORE_LIB_IOMGR_TCP_CLIENT_POSIX_H */
diff --git a/test/core/end2end/dualstack_socket_test.cc b/test/core/end2end/dualstack_socket_test.cc
index ad2b24f..2ba1c17 100644
--- a/test/core/end2end/dualstack_socket_test.cc
+++ b/test/core/end2end/dualstack_socket_test.cc
@@ -29,7 +29,9 @@
 #include <grpc/support/log.h>
 #include <grpc/support/string_util.h>
 
+#include "src/core/lib/iomgr/error.h"
 #include "src/core/lib/iomgr/resolve_address.h"
+#include "src/core/lib/iomgr/sockaddr_utils.h"
 #include "src/core/lib/iomgr/socket_utils_posix.h"
 #include "src/core/lib/slice/slice_string_helpers.h"
 #include "src/core/lib/support/string.h"
@@ -54,6 +56,21 @@
 
 static void do_nothing(void* ignored) {}
 
+static void log_resolved_addrs(const char* label, const char* hostname) {
+  grpc_resolved_addresses* res = nullptr;
+  grpc_error* error = grpc_blocking_resolve_address(hostname, "80", &res);
+  if (error != GRPC_ERROR_NONE || res == nullptr) {
+    GRPC_LOG_IF_ERROR(hostname, error);
+    return;
+  }
+  for (size_t i = 0; i < res->naddrs; ++i) {
+    char* addr_str = grpc_sockaddr_to_uri(&res->addrs[i]);
+    gpr_log(GPR_INFO, "%s: %s", label, addr_str);
+    gpr_free(addr_str);
+  }
+  grpc_resolved_addresses_destroy(res);
+}
+
 void test_connect(const char* server_host, const char* client_host, int port,
                   int expect_ok) {
   char* client_hostport;
@@ -140,6 +157,8 @@
 
   gpr_log(GPR_INFO, "Testing with server=%s client=%s (expecting %s)",
           server_hostport, client_hostport, expect_ok ? "success" : "failure");
+  log_resolved_addrs("server resolved addr", server_host);
+  log_resolved_addrs("client resolved addr", client_host);
 
   gpr_free(client_hostport);
   gpr_free(server_hostport);
@@ -236,6 +255,8 @@
     CQ_EXPECT_COMPLETION(cqv, tag(1), 1);
     cq_verify(cqv);
 
+    gpr_log(GPR_INFO, "status: %d (expected: %d)", status,
+            GRPC_STATUS_UNAVAILABLE);
     GPR_ASSERT(status == GRPC_STATUS_UNAVAILABLE);
   }