Merge pull request #16288 from yashykt/combinernotify

Explictly Flush exec_ctx after resetting call_combiner_set_notify_on_…
diff --git a/CMakeLists.txt b/CMakeLists.txt
index 855b921..f242ee9 100644
--- a/CMakeLists.txt
+++ b/CMakeLists.txt
@@ -662,12 +662,8 @@
 if(_gRPC_PLATFORM_LINUX OR _gRPC_PLATFORM_MAC OR _gRPC_PLATFORM_POSIX)
 add_dependencies(buildtests_cxx writes_per_rpc_test)
 endif()
-if(_gRPC_PLATFORM_LINUX OR _gRPC_PLATFORM_MAC OR _gRPC_PLATFORM_POSIX)
 add_dependencies(buildtests_cxx resolver_component_test_unsecure)
-endif()
-if(_gRPC_PLATFORM_LINUX OR _gRPC_PLATFORM_MAC OR _gRPC_PLATFORM_POSIX)
 add_dependencies(buildtests_cxx resolver_component_test)
-endif()
 if(_gRPC_PLATFORM_LINUX OR _gRPC_PLATFORM_MAC OR _gRPC_PLATFORM_POSIX)
 add_dependencies(buildtests_cxx resolver_component_tests_runner_invoker_unsecure)
 endif()
@@ -676,9 +672,7 @@
 endif()
 add_dependencies(buildtests_cxx address_sorting_test_unsecure)
 add_dependencies(buildtests_cxx address_sorting_test)
-if(_gRPC_PLATFORM_LINUX OR _gRPC_PLATFORM_MAC OR _gRPC_PLATFORM_POSIX)
 add_dependencies(buildtests_cxx cancel_ares_query_test)
-endif()
 
 add_custom_target(buildtests
   DEPENDS buildtests_c buildtests_cxx)
@@ -16213,7 +16207,6 @@
 
 endif (gRPC_BUILD_TESTS)
 if (gRPC_BUILD_TESTS)
-if(_gRPC_PLATFORM_LINUX OR _gRPC_PLATFORM_MAC OR _gRPC_PLATFORM_POSIX)
 
 add_executable(resolver_component_test_unsecure
   test/cpp/naming/resolver_component_test.cc
@@ -16253,10 +16246,8 @@
   ${_gRPC_GFLAGS_LIBRARIES}
 )
 
-endif()
 endif (gRPC_BUILD_TESTS)
 if (gRPC_BUILD_TESTS)
-if(_gRPC_PLATFORM_LINUX OR _gRPC_PLATFORM_MAC OR _gRPC_PLATFORM_POSIX)
 
 add_executable(resolver_component_test
   test/cpp/naming/resolver_component_test.cc
@@ -16296,7 +16287,6 @@
   ${_gRPC_GFLAGS_LIBRARIES}
 )
 
-endif()
 endif (gRPC_BUILD_TESTS)
 if (gRPC_BUILD_TESTS)
 if(_gRPC_PLATFORM_LINUX OR _gRPC_PLATFORM_MAC OR _gRPC_PLATFORM_POSIX)
@@ -16467,7 +16457,6 @@
 
 endif (gRPC_BUILD_TESTS)
 if (gRPC_BUILD_TESTS)
-if(_gRPC_PLATFORM_LINUX OR _gRPC_PLATFORM_MAC OR _gRPC_PLATFORM_POSIX)
 
 add_executable(cancel_ares_query_test
   test/cpp/naming/cancel_ares_query_test.cc
@@ -16507,7 +16496,6 @@
   ${_gRPC_GFLAGS_LIBRARIES}
 )
 
-endif()
 endif (gRPC_BUILD_TESTS)
 if (gRPC_BUILD_TESTS)
 
diff --git a/doc/server-reflection.md b/doc/server-reflection.md
index c9b4765..3716dc5 100644
--- a/doc/server-reflection.md
+++ b/doc/server-reflection.md
@@ -191,6 +191,6 @@
 - [Go](https://github.com/grpc/grpc-go/blob/master/Documentation/server-reflection-tutorial.md#enable-server-reflection)
 - [C++](https://grpc.io/grpc/cpp/md_doc_server_reflection_tutorial.html)
 - [C#](https://github.com/grpc/grpc/blob/master/doc/csharp/server_reflection.md)
-- Python: (tutorial not yet written)
+- [Python](https://github.com/grpc/grpc/blob/master/doc/python/server_reflection.md)
 - Ruby: not yet implemented [#2567](https://github.com/grpc/grpc/issues/2567)
 - Node: not yet implemented [#2568](https://github.com/grpc/grpc/issues/2568)
diff --git a/setup.py b/setup.py
index b87843f..388e629 100644
--- a/setup.py
+++ b/setup.py
@@ -276,11 +276,11 @@
 }
 
 INSTALL_REQUIRES = (
-    "six>=1.5.2",
-    "futures>=2.2.0 ; python_version<'3.2'",
-    "enum34>=1.0.4 ; python_version<'3.4'"
+    'six>=1.5.2',
 )
 
+if not PY3:
+  INSTALL_REQUIRES += ('futures>=2.2.0', 'enum34>=1.0.4')
 
 SETUP_REQUIRES = INSTALL_REQUIRES + (
     'sphinx>=1.3',
diff --git a/src/core/ext/filters/client_channel/resolver/dns/c_ares/grpc_ares_ev_driver.cc b/src/core/ext/filters/client_channel/resolver/dns/c_ares/grpc_ares_ev_driver.cc
index 0068d0d..fdbd07e 100644
--- a/src/core/ext/filters/client_channel/resolver/dns/c_ares/grpc_ares_ev_driver.cc
+++ b/src/core/ext/filters/client_channel/resolver/dns/c_ares/grpc_ares_ev_driver.cc
@@ -74,6 +74,8 @@
   bool shutting_down;
   /** request object that's using this ev driver */
   grpc_ares_request* request;
+  /** Owned by the ev_driver. Creates new GrpcPolledFd's */
+  grpc_core::UniquePtr<grpc_core::GrpcPolledFdFactory> polled_fd_factory;
 };
 
 static void grpc_ares_notify_on_event_locked(grpc_ares_ev_driver* ev_driver);
@@ -93,7 +95,7 @@
     GRPC_COMBINER_UNREF(ev_driver->combiner, "free ares event driver");
     ares_destroy(ev_driver->channel);
     grpc_ares_complete_request_locked(ev_driver->request);
-    gpr_free(ev_driver);
+    grpc_core::Delete(ev_driver);
   }
 }
 
@@ -118,13 +120,11 @@
                                               grpc_pollset_set* pollset_set,
                                               grpc_combiner* combiner,
                                               grpc_ares_request* request) {
-  *ev_driver = static_cast<grpc_ares_ev_driver*>(
-      gpr_malloc(sizeof(grpc_ares_ev_driver)));
+  *ev_driver = grpc_core::New<grpc_ares_ev_driver>();
   ares_options opts;
   memset(&opts, 0, sizeof(opts));
   opts.flags |= ARES_FLAG_STAYOPEN;
   int status = ares_init_options(&(*ev_driver)->channel, &opts, ARES_OPT_FLAGS);
-  grpc_core::ConfigureAresChannelLocked(&(*ev_driver)->channel);
   gpr_log(GPR_DEBUG, "grpc_ares_ev_driver_create_locked");
   if (status != ARES_SUCCESS) {
     char* err_msg;
@@ -142,6 +142,10 @@
   (*ev_driver)->working = false;
   (*ev_driver)->shutting_down = false;
   (*ev_driver)->request = request;
+  (*ev_driver)->polled_fd_factory =
+      grpc_core::NewGrpcPolledFdFactory((*ev_driver)->combiner);
+  (*ev_driver)
+      ->polled_fd_factory->ConfigureAresChannelLocked((*ev_driver)->channel);
   return GRPC_ERROR_NONE;
 }
 
@@ -245,8 +249,9 @@
         // Create a new fd_node if sock[i] is not in the fd_node list.
         if (fdn == nullptr) {
           fdn = static_cast<fd_node*>(gpr_malloc(sizeof(fd_node)));
-          fdn->grpc_polled_fd = grpc_core::NewGrpcPolledFdLocked(
-              socks[i], ev_driver->pollset_set);
+          fdn->grpc_polled_fd =
+              ev_driver->polled_fd_factory->NewGrpcPolledFdLocked(
+                  socks[i], ev_driver->pollset_set, ev_driver->combiner);
           gpr_log(GPR_DEBUG, "new fd: %s", fdn->grpc_polled_fd->GetName());
           fdn->ev_driver = ev_driver;
           fdn->readable_registered = false;
diff --git a/src/core/ext/filters/client_channel/resolver/dns/c_ares/grpc_ares_ev_driver.h b/src/core/ext/filters/client_channel/resolver/dns/c_ares/grpc_ares_ev_driver.h
index 2c9db71..671c537 100644
--- a/src/core/ext/filters/client_channel/resolver/dns/c_ares/grpc_ares_ev_driver.h
+++ b/src/core/ext/filters/client_channel/resolver/dns/c_ares/grpc_ares_ev_driver.h
@@ -81,10 +81,24 @@
   GRPC_ABSTRACT_BASE_CLASS
 };
 
-/* Creates a new wrapped fd for the current platform */
-GrpcPolledFd* NewGrpcPolledFdLocked(ares_socket_t as,
-                                    grpc_pollset_set* driver_pollset_set);
-void ConfigureAresChannelLocked(ares_channel* channel);
+/* A GrpcPolledFdFactory is 1-to-1 with and owned by the
+ * ares event driver. It knows how to create GrpcPolledFd's
+ * for the current platform, and the ares driver uses it for all of
+ * its fd's. */
+class GrpcPolledFdFactory {
+ public:
+  virtual ~GrpcPolledFdFactory() {}
+  /* Creates a new wrapped fd for the current platform */
+  virtual GrpcPolledFd* NewGrpcPolledFdLocked(
+      ares_socket_t as, grpc_pollset_set* driver_pollset_set,
+      grpc_combiner* combiner) GRPC_ABSTRACT;
+  /* Optionally configures the ares channel after creation */
+  virtual void ConfigureAresChannelLocked(ares_channel channel) GRPC_ABSTRACT;
+
+  GRPC_ABSTRACT_BASE_CLASS
+};
+
+UniquePtr<GrpcPolledFdFactory> NewGrpcPolledFdFactory(grpc_combiner* combiner);
 
 }  // namespace grpc_core
 
diff --git a/src/core/ext/filters/client_channel/resolver/dns/c_ares/grpc_ares_ev_driver_posix.cc b/src/core/ext/filters/client_channel/resolver/dns/c_ares/grpc_ares_ev_driver_posix.cc
index fffe9ed..aa58e1a 100644
--- a/src/core/ext/filters/client_channel/resolver/dns/c_ares/grpc_ares_ev_driver_posix.cc
+++ b/src/core/ext/filters/client_channel/resolver/dns/c_ares/grpc_ares_ev_driver_posix.cc
@@ -86,12 +86,20 @@
   grpc_pollset_set* driver_pollset_set_;
 };
 
-GrpcPolledFd* NewGrpcPolledFdLocked(ares_socket_t as,
-                                    grpc_pollset_set* driver_pollset_set) {
-  return grpc_core::New<GrpcPolledFdPosix>(as, driver_pollset_set);
-}
+class GrpcPolledFdFactoryPosix : public GrpcPolledFdFactory {
+ public:
+  GrpcPolledFd* NewGrpcPolledFdLocked(ares_socket_t as,
+                                      grpc_pollset_set* driver_pollset_set,
+                                      grpc_combiner* combiner) override {
+    return New<GrpcPolledFdPosix>(as, driver_pollset_set);
+  }
 
-void ConfigureAresChannelLocked(ares_channel* channel) {}
+  void ConfigureAresChannelLocked(ares_channel channel) override {}
+};
+
+UniquePtr<GrpcPolledFdFactory> NewGrpcPolledFdFactory(grpc_combiner* combiner) {
+  return UniquePtr<GrpcPolledFdFactory>(New<GrpcPolledFdFactoryPosix>());
+}
 
 }  // namespace grpc_core
 
diff --git a/src/core/ext/filters/client_channel/resolver/dns/c_ares/grpc_ares_ev_driver_windows.cc b/src/core/ext/filters/client_channel/resolver/dns/c_ares/grpc_ares_ev_driver_windows.cc
index 5d65ae3..02121aa 100644
--- a/src/core/ext/filters/client_channel/resolver/dns/c_ares/grpc_ares_ev_driver_windows.cc
+++ b/src/core/ext/filters/client_channel/resolver/dns/c_ares/grpc_ares_ev_driver_windows.cc
@@ -21,38 +21,516 @@
 #if GRPC_ARES == 1 && defined(GPR_WINDOWS)
 
 #include <ares.h>
+
+#include <grpc/support/alloc.h>
+#include <grpc/support/log.h>
+#include <grpc/support/log_windows.h>
+#include <grpc/support/string_util.h>
+#include <grpc/support/time.h>
 #include <string.h>
+#include "src/core/lib/gpr/string.h"
 #include "src/core/lib/gprpp/memory.h"
+#include "src/core/lib/iomgr/combiner.h"
+#include "src/core/lib/iomgr/socket_windows.h"
+#include "src/core/lib/iomgr/tcp_windows.h"
+#include "src/core/lib/slice/slice_internal.h"
 
 #include "src/core/ext/filters/client_channel/resolver/dns/c_ares/grpc_ares_ev_driver.h"
+#include "src/core/ext/filters/client_channel/resolver/dns/c_ares/grpc_ares_wrapper.h"
+
+/* TODO(apolcyn): remove this hack after fixing upstream.
+ * Our grpc/c-ares code on Windows uses the ares_set_socket_functions API,
+ * which uses "struct iovec" type, which on Windows is defined inside of
+ * a c-ares header that is not public.
+ * See https://github.com/c-ares/c-ares/issues/206. */
+struct iovec {
+  void* iov_base;
+  size_t iov_len;
+};
 
 namespace grpc_core {
 
-/* TODO: fill in the body of GrpcPolledFdWindows to enable c-ares on Windows.
-   This dummy implementation only allows grpc to compile on windows with
-   GRPC_ARES=1. */
+/* c-ares creates its own sockets and is meant to read them when readable and
+ * write them when writeable. To fit this socket usage model into the grpc
+ * windows poller (which gives notifications when attempted reads and writes are
+ * actually fulfilled rather than possible), this GrpcPolledFdWindows class
+ * takes advantage of the ares_set_socket_functions API and acts as a virtual
+ * socket. It holds its own read and write buffers which are written to and read
+ * from c-ares and are used with the grpc windows poller, and it, e.g.,
+ * manufactures virtual socket error codes when it e.g. needs to tell the c-ares
+ * library to wait for an async read. */
 class GrpcPolledFdWindows : public GrpcPolledFd {
  public:
-  GrpcPolledFdWindows() { abort(); }
-  ~GrpcPolledFdWindows() { abort(); }
+  enum WriteState {
+    WRITE_IDLE,
+    WRITE_REQUESTED,
+    WRITE_PENDING,
+    WRITE_WAITING_FOR_VERIFICATION_UPON_RETRY,
+  };
+
+  GrpcPolledFdWindows(ares_socket_t as, grpc_combiner* combiner)
+      : read_buf_(grpc_empty_slice()),
+        write_buf_(grpc_empty_slice()),
+        write_state_(WRITE_IDLE),
+        gotten_into_driver_list_(false) {
+    gpr_asprintf(&name_, "c-ares socket: %" PRIdPTR, as);
+    winsocket_ = grpc_winsocket_create(as, name_);
+    combiner_ = GRPC_COMBINER_REF(combiner, name_);
+    GRPC_CLOSURE_INIT(&outer_read_closure_,
+                      &GrpcPolledFdWindows::OnIocpReadable, this,
+                      grpc_combiner_scheduler(combiner_));
+    GRPC_CLOSURE_INIT(&outer_write_closure_,
+                      &GrpcPolledFdWindows::OnIocpWriteable, this,
+                      grpc_combiner_scheduler(combiner_));
+  }
+
+  ~GrpcPolledFdWindows() {
+    GRPC_COMBINER_UNREF(combiner_, name_);
+    grpc_slice_unref_internal(read_buf_);
+    grpc_slice_unref_internal(write_buf_);
+    GPR_ASSERT(read_closure_ == nullptr);
+    GPR_ASSERT(write_closure_ == nullptr);
+    grpc_winsocket_destroy(winsocket_);
+    gpr_free(name_);
+  }
+
+  void ScheduleAndNullReadClosure(grpc_error* error) {
+    GRPC_CLOSURE_SCHED(read_closure_, error);
+    read_closure_ = nullptr;
+  }
+
+  void ScheduleAndNullWriteClosure(grpc_error* error) {
+    GRPC_CLOSURE_SCHED(write_closure_, error);
+    write_closure_ = nullptr;
+  }
+
   void RegisterForOnReadableLocked(grpc_closure* read_closure) override {
-    abort();
+    GPR_ASSERT(read_closure_ == nullptr);
+    read_closure_ = read_closure;
+    GPR_ASSERT(GRPC_SLICE_LENGTH(read_buf_) == 0);
+    grpc_slice_unref_internal(read_buf_);
+    read_buf_ = GRPC_SLICE_MALLOC(4192);
+    WSABUF buffer;
+    buffer.buf = (char*)GRPC_SLICE_START_PTR(read_buf_);
+    buffer.len = GRPC_SLICE_LENGTH(read_buf_);
+    memset(&winsocket_->read_info.overlapped, 0, sizeof(OVERLAPPED));
+    recv_from_source_addr_len_ = sizeof(recv_from_source_addr_);
+    DWORD flags = 0;
+    if (WSARecvFrom(grpc_winsocket_wrapped_socket(winsocket_), &buffer, 1,
+                    nullptr, &flags, (sockaddr*)recv_from_source_addr_,
+                    &recv_from_source_addr_len_,
+                    &winsocket_->read_info.overlapped, nullptr)) {
+      char* msg = gpr_format_message(WSAGetLastError());
+      grpc_error* error = GRPC_ERROR_CREATE_FROM_COPIED_STRING(msg);
+      GRPC_CARES_TRACE_LOG(
+          "RegisterForOnReadableLocked: WSARecvFrom error:|%s|. fd:|%s|", msg,
+          GetName());
+      gpr_free(msg);
+      if (WSAGetLastError() != WSA_IO_PENDING) {
+        ScheduleAndNullReadClosure(error);
+        return;
+      }
+    }
+    grpc_socket_notify_on_read(winsocket_, &outer_read_closure_);
   }
+
   void RegisterForOnWriteableLocked(grpc_closure* write_closure) override {
+    GRPC_CARES_TRACE_LOG(
+        "RegisterForOnWriteableLocked. fd:|%s|. Current write state: %d",
+        GetName(), write_state_);
+    GPR_ASSERT(write_closure_ == nullptr);
+    write_closure_ = write_closure;
+    switch (write_state_) {
+      case WRITE_IDLE:
+        ScheduleAndNullWriteClosure(GRPC_ERROR_NONE);
+        break;
+      case WRITE_REQUESTED:
+        write_state_ = WRITE_PENDING;
+        SendWriteBuf(nullptr, &winsocket_->write_info.overlapped);
+        grpc_socket_notify_on_write(winsocket_, &outer_write_closure_);
+        break;
+      case WRITE_PENDING:
+      case WRITE_WAITING_FOR_VERIFICATION_UPON_RETRY:
+        abort();
+    }
+  }
+
+  bool IsFdStillReadableLocked() override {
+    return GRPC_SLICE_LENGTH(read_buf_) > 0;
+  }
+
+  void ShutdownLocked(grpc_error* error) override {
+    grpc_winsocket_shutdown(winsocket_);
+  }
+
+  ares_socket_t GetWrappedAresSocketLocked() override {
+    return grpc_winsocket_wrapped_socket(winsocket_);
+  }
+
+  const char* GetName() override { return name_; }
+
+  ares_ssize_t RecvFrom(void* data, ares_socket_t data_len, int flags,
+                        struct sockaddr* from, ares_socklen_t* from_len) {
+    GRPC_CARES_TRACE_LOG(
+        "RecvFrom called on fd:|%s|. Current read buf length:|%d|", GetName(),
+        GRPC_SLICE_LENGTH(read_buf_));
+    if (GRPC_SLICE_LENGTH(read_buf_) == 0) {
+      WSASetLastError(WSAEWOULDBLOCK);
+      return -1;
+    }
+    ares_ssize_t bytes_read = 0;
+    for (size_t i = 0; i < GRPC_SLICE_LENGTH(read_buf_) && i < data_len; i++) {
+      ((char*)data)[i] = GRPC_SLICE_START_PTR(read_buf_)[i];
+      bytes_read++;
+    }
+    read_buf_ = grpc_slice_sub_no_ref(read_buf_, bytes_read,
+                                      GRPC_SLICE_LENGTH(read_buf_));
+    /* c-ares overloads this recv_from virtual socket function to receive
+     * data on both UDP and TCP sockets, and from is nullptr for TCP. */
+    if (from != nullptr) {
+      GPR_ASSERT(*from_len <= recv_from_source_addr_len_);
+      memcpy(from, &recv_from_source_addr_, recv_from_source_addr_len_);
+      *from_len = recv_from_source_addr_len_;
+    }
+    return bytes_read;
+  }
+
+  grpc_slice FlattenIovec(const struct iovec* iov, int iov_count) {
+    int total = 0;
+    for (int i = 0; i < iov_count; i++) {
+      total += iov[i].iov_len;
+    }
+    grpc_slice out = GRPC_SLICE_MALLOC(total);
+    size_t cur = 0;
+    for (int i = 0; i < iov_count; i++) {
+      for (int k = 0; k < iov[i].iov_len; k++) {
+        GRPC_SLICE_START_PTR(out)[cur++] = ((char*)iov[i].iov_base)[k];
+      }
+    }
+    return out;
+  }
+
+  int SendWriteBuf(LPDWORD bytes_sent_ptr, LPWSAOVERLAPPED overlapped) {
+    WSABUF buf;
+    buf.len = GRPC_SLICE_LENGTH(write_buf_);
+    buf.buf = (char*)GRPC_SLICE_START_PTR(write_buf_);
+    DWORD flags = 0;
+    int out = WSASend(grpc_winsocket_wrapped_socket(winsocket_), &buf, 1,
+                      bytes_sent_ptr, flags, overlapped, nullptr);
+    GRPC_CARES_TRACE_LOG(
+        "WSASend: name:%s. buf len:%d. bytes sent: %d. overlapped %p. return "
+        "val: %d",
+        GetName(), buf.len, *bytes_sent_ptr, overlapped, out);
+    return out;
+  }
+
+  ares_ssize_t TrySendWriteBufSyncNonBlocking() {
+    GPR_ASSERT(write_state_ == WRITE_IDLE);
+    ares_ssize_t total_sent;
+    DWORD bytes_sent = 0;
+    if (SendWriteBuf(&bytes_sent, nullptr) != 0) {
+      char* msg = gpr_format_message(WSAGetLastError());
+      GRPC_CARES_TRACE_LOG(
+          "TrySendWriteBufSyncNonBlocking: SendWriteBuf error:|%s|. fd:|%s|",
+          msg, GetName());
+      gpr_free(msg);
+      if (WSAGetLastError() == WSA_IO_PENDING) {
+        WSASetLastError(WSAEWOULDBLOCK);
+        write_state_ = WRITE_REQUESTED;
+      }
+    }
+    write_buf_ = grpc_slice_sub_no_ref(write_buf_, bytes_sent,
+                                       GRPC_SLICE_LENGTH(write_buf_));
+    return bytes_sent;
+  }
+
+  ares_ssize_t SendV(const struct iovec* iov, int iov_count) {
+    GRPC_CARES_TRACE_LOG("SendV called on fd:|%s|. Current write state: %d",
+                         GetName(), write_state_);
+    switch (write_state_) {
+      case WRITE_IDLE:
+        GPR_ASSERT(GRPC_SLICE_LENGTH(write_buf_) == 0);
+        grpc_slice_unref_internal(write_buf_);
+        write_buf_ = FlattenIovec(iov, iov_count);
+        return TrySendWriteBufSyncNonBlocking();
+      case WRITE_REQUESTED:
+      case WRITE_PENDING:
+        WSASetLastError(WSAEWOULDBLOCK);
+        return -1;
+      case WRITE_WAITING_FOR_VERIFICATION_UPON_RETRY:
+        grpc_slice currently_attempted = FlattenIovec(iov, iov_count);
+        GPR_ASSERT(GRPC_SLICE_LENGTH(currently_attempted) >=
+                   GRPC_SLICE_LENGTH(write_buf_));
+        ares_ssize_t total_sent = 0;
+        for (size_t i = 0; i < GRPC_SLICE_LENGTH(write_buf_); i++) {
+          GPR_ASSERT(GRPC_SLICE_START_PTR(currently_attempted)[i] ==
+                     GRPC_SLICE_START_PTR(write_buf_)[i]);
+          total_sent++;
+        }
+        grpc_slice_unref_internal(write_buf_);
+        write_buf_ =
+            grpc_slice_sub_no_ref(currently_attempted, total_sent,
+                                  GRPC_SLICE_LENGTH(currently_attempted));
+        write_state_ = WRITE_IDLE;
+        total_sent += TrySendWriteBufSyncNonBlocking();
+        return total_sent;
+    }
     abort();
   }
-  bool IsFdStillReadableLocked() override { abort(); }
-  void ShutdownLocked(grpc_error* error) override { abort(); }
-  ares_socket_t GetWrappedAresSocketLocked() override { abort(); }
-  const char* GetName() override { abort(); }
+
+  int Connect(const struct sockaddr* target, ares_socklen_t target_len) {
+    SOCKET s = grpc_winsocket_wrapped_socket(winsocket_);
+    GRPC_CARES_TRACE_LOG("Connect: fd:|%s|", GetName());
+    int out =
+        WSAConnect(s, target, target_len, nullptr, nullptr, nullptr, nullptr);
+    if (out != 0) {
+      char* msg = gpr_format_message(WSAGetLastError());
+      GRPC_CARES_TRACE_LOG("Connect error code:|%d|, msg:|%s|. fd:|%s|",
+                           WSAGetLastError(), msg, GetName());
+      gpr_free(msg);
+      // c-ares expects a posix-style connect API
+      out = -1;
+    }
+    return out;
+  }
+
+  static void OnIocpReadable(void* arg, grpc_error* error) {
+    GrpcPolledFdWindows* polled_fd = static_cast<GrpcPolledFdWindows*>(arg);
+    polled_fd->OnIocpReadableInner(error);
+  }
+
+  void OnIocpReadableInner(grpc_error* error) {
+    if (error == GRPC_ERROR_NONE) {
+      if (winsocket_->read_info.wsa_error != 0) {
+        /* WSAEMSGSIZE would be due to receiving more data
+         * than our read buffer's fixed capacity. Assume that
+         * the connection is TCP and read the leftovers
+         * in subsequent c-ares reads. */
+        if (winsocket_->read_info.wsa_error != WSAEMSGSIZE) {
+          GRPC_ERROR_UNREF(error);
+          char* msg = gpr_format_message(winsocket_->read_info.wsa_error);
+          GRPC_CARES_TRACE_LOG(
+              "OnIocpReadableInner. winsocket error:|%s|. fd:|%s|", msg,
+              GetName());
+          error = GRPC_ERROR_CREATE_FROM_COPIED_STRING(msg);
+          gpr_free(msg);
+        }
+      }
+    }
+    if (error == GRPC_ERROR_NONE) {
+      read_buf_ = grpc_slice_sub_no_ref(read_buf_, 0,
+                                        winsocket_->read_info.bytes_transfered);
+    } else {
+      grpc_slice_unref_internal(read_buf_);
+      read_buf_ = grpc_empty_slice();
+    }
+    GRPC_CARES_TRACE_LOG(
+        "OnIocpReadable finishing. read buf length now:|%d|. :fd:|%s|",
+        GRPC_SLICE_LENGTH(read_buf_), GetName());
+    ScheduleAndNullReadClosure(error);
+  }
+
+  static void OnIocpWriteable(void* arg, grpc_error* error) {
+    GrpcPolledFdWindows* polled_fd = static_cast<GrpcPolledFdWindows*>(arg);
+    polled_fd->OnIocpWriteableInner(error);
+  }
+
+  void OnIocpWriteableInner(grpc_error* error) {
+    GRPC_CARES_TRACE_LOG("OnIocpWriteableInner. fd:|%s|", GetName());
+    if (error == GRPC_ERROR_NONE) {
+      if (winsocket_->write_info.wsa_error != 0) {
+        char* msg = gpr_format_message(winsocket_->write_info.wsa_error);
+        GRPC_CARES_TRACE_LOG(
+            "OnIocpWriteableInner. winsocket error:|%s|. fd:|%s|", msg,
+            GetName());
+        GRPC_ERROR_UNREF(error);
+        error = GRPC_ERROR_CREATE_FROM_COPIED_STRING(msg);
+        gpr_free(msg);
+      }
+    }
+    GPR_ASSERT(write_state_ == WRITE_PENDING);
+    if (error == GRPC_ERROR_NONE) {
+      write_state_ = WRITE_WAITING_FOR_VERIFICATION_UPON_RETRY;
+      write_buf_ = grpc_slice_sub_no_ref(
+          write_buf_, 0, winsocket_->write_info.bytes_transfered);
+    } else {
+      grpc_slice_unref_internal(write_buf_);
+      write_buf_ = grpc_empty_slice();
+    }
+    ScheduleAndNullWriteClosure(error);
+  }
+
+  bool gotten_into_driver_list() const { return gotten_into_driver_list_; }
+  void set_gotten_into_driver_list() { gotten_into_driver_list_ = true; }
+
+  grpc_combiner* combiner_;
+  char recv_from_source_addr_[200];
+  ares_socklen_t recv_from_source_addr_len_;
+  grpc_slice read_buf_;
+  grpc_slice write_buf_;
+  grpc_closure* read_closure_ = nullptr;
+  grpc_closure* write_closure_ = nullptr;
+  grpc_closure outer_read_closure_;
+  grpc_closure outer_write_closure_;
+  grpc_winsocket* winsocket_;
+  WriteState write_state_;
+  char* name_ = nullptr;
+  bool gotten_into_driver_list_;
 };
 
-GrpcPolledFd* NewGrpcPolledFdLocked(ares_socket_t as,
-                                    grpc_pollset_set* driver_pollset_set) {
-  return nullptr;
-}
+struct SockToPolledFdEntry {
+  SockToPolledFdEntry(SOCKET s, GrpcPolledFdWindows* fd)
+      : socket(s), polled_fd(fd) {}
+  SOCKET socket;
+  GrpcPolledFdWindows* polled_fd;
+  SockToPolledFdEntry* next = nullptr;
+};
 
-void ConfigureAresChannelLocked(ares_channel* channel) { abort(); }
+/* A SockToPolledFdMap can make ares_socket_t types (SOCKET's on windows)
+ * to GrpcPolledFdWindow's, and is used to find the appropriate
+ * GrpcPolledFdWindows to handle a virtual socket call when c-ares makes that
+ * socket call on the ares_socket_t type. Instances are owned by and one-to-one
+ * with a GrpcPolledFdWindows factory and event driver */
+class SockToPolledFdMap {
+ public:
+  SockToPolledFdMap(grpc_combiner* combiner) {
+    combiner_ = GRPC_COMBINER_REF(combiner, "sock to polled fd map");
+  }
+
+  ~SockToPolledFdMap() {
+    GPR_ASSERT(head_ == nullptr);
+    GRPC_COMBINER_UNREF(combiner_, "sock to polled fd map");
+  }
+
+  void AddNewSocket(SOCKET s, GrpcPolledFdWindows* polled_fd) {
+    SockToPolledFdEntry* new_node = New<SockToPolledFdEntry>(s, polled_fd);
+    new_node->next = head_;
+    head_ = new_node;
+  }
+
+  GrpcPolledFdWindows* LookupPolledFd(SOCKET s) {
+    for (SockToPolledFdEntry* node = head_; node != nullptr;
+         node = node->next) {
+      if (node->socket == s) {
+        GPR_ASSERT(node->polled_fd != nullptr);
+        return node->polled_fd;
+      }
+    }
+    abort();
+  }
+
+  void RemoveEntry(SOCKET s) {
+    GPR_ASSERT(head_ != nullptr);
+    SockToPolledFdEntry** prev = &head_;
+    for (SockToPolledFdEntry* node = head_; node != nullptr;
+         node = node->next) {
+      if (node->socket == s) {
+        *prev = node->next;
+        Delete(node);
+        return;
+      }
+      prev = &node->next;
+    }
+    abort();
+  }
+
+  /* These virtual socket functions are called from within the c-ares
+   * library. These methods generally dispatch those socket calls to the
+   * appropriate methods. The virtual "socket" and "close" methods are
+   * special and instead create/add and remove/destroy GrpcPolledFdWindows
+   * objects.
+   */
+  static ares_socket_t Socket(int af, int type, int protocol, void* user_data) {
+    SockToPolledFdMap* map = static_cast<SockToPolledFdMap*>(user_data);
+    SOCKET s = WSASocket(af, type, protocol, nullptr, 0, WSA_FLAG_OVERLAPPED);
+    if (s == INVALID_SOCKET) {
+      return s;
+    }
+    grpc_tcp_set_non_block(s);
+    GrpcPolledFdWindows* polled_fd =
+        New<GrpcPolledFdWindows>(s, map->combiner_);
+    map->AddNewSocket(s, polled_fd);
+    return s;
+  }
+
+  static int Connect(ares_socket_t as, const struct sockaddr* target,
+                     ares_socklen_t target_len, void* user_data) {
+    SockToPolledFdMap* map = static_cast<SockToPolledFdMap*>(user_data);
+    GrpcPolledFdWindows* polled_fd = map->LookupPolledFd(as);
+    return polled_fd->Connect(target, target_len);
+  }
+
+  static ares_ssize_t SendV(ares_socket_t as, const struct iovec* iov,
+                            int iovec_count, void* user_data) {
+    SockToPolledFdMap* map = static_cast<SockToPolledFdMap*>(user_data);
+    GrpcPolledFdWindows* polled_fd = map->LookupPolledFd(as);
+    return polled_fd->SendV(iov, iovec_count);
+  }
+
+  static ares_ssize_t RecvFrom(ares_socket_t as, void* data, size_t data_len,
+                               int flags, struct sockaddr* from,
+                               ares_socklen_t* from_len, void* user_data) {
+    SockToPolledFdMap* map = static_cast<SockToPolledFdMap*>(user_data);
+    GrpcPolledFdWindows* polled_fd = map->LookupPolledFd(as);
+    return polled_fd->RecvFrom(data, data_len, flags, from, from_len);
+  }
+
+  static int CloseSocket(SOCKET s, void* user_data) {
+    SockToPolledFdMap* map = static_cast<SockToPolledFdMap*>(user_data);
+    GrpcPolledFdWindows* polled_fd = map->LookupPolledFd(s);
+    map->RemoveEntry(s);
+    // If a gRPC polled fd has not made it in to the driver's list yet, then
+    // the driver has not and will never see this socket.
+    if (!polled_fd->gotten_into_driver_list()) {
+      polled_fd->ShutdownLocked(GRPC_ERROR_CREATE_FROM_STATIC_STRING(
+          "Shut down c-ares fd before without it ever having made it into the "
+          "driver's list"));
+      return 0;
+    }
+    return 0;
+  }
+
+ private:
+  SockToPolledFdEntry* head_ = nullptr;
+  grpc_combiner* combiner_;
+};
+
+const struct ares_socket_functions custom_ares_sock_funcs = {
+    &SockToPolledFdMap::Socket /* socket */,
+    &SockToPolledFdMap::CloseSocket /* close */,
+    &SockToPolledFdMap::Connect /* connect */,
+    &SockToPolledFdMap::RecvFrom /* recvfrom */,
+    &SockToPolledFdMap::SendV /* sendv */,
+};
+
+class GrpcPolledFdFactoryWindows : public GrpcPolledFdFactory {
+ public:
+  GrpcPolledFdFactoryWindows(grpc_combiner* combiner)
+      : sock_to_polled_fd_map_(combiner) {}
+
+  GrpcPolledFd* NewGrpcPolledFdLocked(ares_socket_t as,
+                                      grpc_pollset_set* driver_pollset_set,
+                                      grpc_combiner* combiner) override {
+    GrpcPolledFdWindows* polled_fd = sock_to_polled_fd_map_.LookupPolledFd(as);
+    // Set a flag so that the virtual socket "close" method knows it
+    // doesn't need to call ShutdownLocked, since now the driver will.
+    polled_fd->set_gotten_into_driver_list();
+    return polled_fd;
+  }
+
+  void ConfigureAresChannelLocked(ares_channel channel) override {
+    ares_set_socket_functions(channel, &custom_ares_sock_funcs,
+                              &sock_to_polled_fd_map_);
+  }
+
+ private:
+  SockToPolledFdMap sock_to_polled_fd_map_;
+};
+
+UniquePtr<GrpcPolledFdFactory> NewGrpcPolledFdFactory(grpc_combiner* combiner) {
+  return UniquePtr<GrpcPolledFdFactory>(
+      New<GrpcPolledFdFactoryWindows>(combiner));
+}
 
 }  // namespace grpc_core
 
diff --git a/src/core/ext/filters/client_channel/resolver/dns/c_ares/grpc_ares_wrapper.cc b/src/core/ext/filters/client_channel/resolver/dns/c_ares/grpc_ares_wrapper.cc
index b3d6437..485998f 100644
--- a/src/core/ext/filters/client_channel/resolver/dns/c_ares/grpc_ares_wrapper.cc
+++ b/src/core/ext/filters/client_channel/resolver/dns/c_ares/grpc_ares_wrapper.cc
@@ -49,6 +49,8 @@
 grpc_core::TraceFlag grpc_trace_cares_address_sorting(false,
                                                       "cares_address_sorting");
 
+grpc_core::TraceFlag grpc_trace_cares_resolver(false, "cares_resolver");
+
 struct grpc_ares_request {
   /** indicates the DNS server to use, if specified */
   struct ares_addr_port_node dns_server_addr;
diff --git a/src/core/ext/filters/client_channel/resolver/dns/c_ares/grpc_ares_wrapper.h b/src/core/ext/filters/client_channel/resolver/dns/c_ares/grpc_ares_wrapper.h
index 17eaa7c..ca5779e 100644
--- a/src/core/ext/filters/client_channel/resolver/dns/c_ares/grpc_ares_wrapper.h
+++ b/src/core/ext/filters/client_channel/resolver/dns/c_ares/grpc_ares_wrapper.h
@@ -28,6 +28,13 @@
 
 extern grpc_core::TraceFlag grpc_trace_cares_address_sorting;
 
+extern grpc_core::TraceFlag grpc_trace_cares_resolver;
+
+#define GRPC_CARES_TRACE_LOG(format, ...)                         \
+  if (grpc_trace_cares_resolver.enabled()) {                      \
+    gpr_log(GPR_DEBUG, "(c-ares resolver) " format, __VA_ARGS__); \
+  }
+
 typedef struct grpc_ares_request grpc_ares_request;
 
 /* Asynchronously resolve \a name. Use \a default_port if a port isn't
diff --git a/src/core/lib/iomgr/iocp_windows.cc b/src/core/lib/iomgr/iocp_windows.cc
index ce77231..ad325fe 100644
--- a/src/core/lib/iomgr/iocp_windows.cc
+++ b/src/core/lib/iomgr/iocp_windows.cc
@@ -89,10 +89,15 @@
   } else {
     abort();
   }
-  success = WSAGetOverlappedResult(socket->socket, &info->overlapped, &bytes,
-                                   FALSE, &flags);
-  info->bytes_transfered = bytes;
-  info->wsa_error = success ? 0 : WSAGetLastError();
+  if (socket->shutdown_called) {
+    info->bytes_transfered = 0;
+    info->wsa_error = WSA_OPERATION_ABORTED;
+  } else {
+    success = WSAGetOverlappedResult(socket->socket, &info->overlapped, &bytes,
+                                     FALSE, &flags);
+    info->bytes_transfered = bytes;
+    info->wsa_error = success ? 0 : WSAGetLastError();
+  }
   GPR_ASSERT(overlapped == &info->overlapped);
   grpc_socket_become_ready(socket, info);
   return GRPC_IOCP_WORK_WORK;
diff --git a/src/core/lib/iomgr/socket_windows.cc b/src/core/lib/iomgr/socket_windows.cc
index 4ad31cb..999c664 100644
--- a/src/core/lib/iomgr/socket_windows.cc
+++ b/src/core/lib/iomgr/socket_windows.cc
@@ -52,6 +52,10 @@
   return r;
 }
 
+SOCKET grpc_winsocket_wrapped_socket(grpc_winsocket* socket) {
+  return socket->socket;
+}
+
 /* Schedule a shutdown of the socket operations. Will call the pending
    operations to abort them. We need to do that this way because of the
    various callsites of that function, which happens to be in various
diff --git a/src/core/lib/iomgr/socket_windows.h b/src/core/lib/iomgr/socket_windows.h
index b09b9da..46d7d58 100644
--- a/src/core/lib/iomgr/socket_windows.h
+++ b/src/core/lib/iomgr/socket_windows.h
@@ -92,6 +92,8 @@
    it will be responsible for closing it. */
 grpc_winsocket* grpc_winsocket_create(SOCKET socket, const char* name);
 
+SOCKET grpc_winsocket_wrapped_socket(grpc_winsocket* socket);
+
 /* Initiate an asynchronous shutdown of the socket. Will call off any pending
    operation to cancel them. */
 void grpc_winsocket_shutdown(grpc_winsocket* socket);
diff --git a/src/core/lib/iomgr/tcp_windows.cc b/src/core/lib/iomgr/tcp_windows.cc
index 5d316d4..b3cb442 100644
--- a/src/core/lib/iomgr/tcp_windows.cc
+++ b/src/core/lib/iomgr/tcp_windows.cc
@@ -53,7 +53,7 @@
 
 extern grpc_core::TraceFlag grpc_tcp_trace;
 
-static grpc_error* set_non_block(SOCKET sock) {
+grpc_error* grpc_tcp_set_non_block(SOCKET sock) {
   int status;
   uint32_t param = 1;
   DWORD ret;
@@ -90,7 +90,7 @@
 
 grpc_error* grpc_tcp_prepare_socket(SOCKET sock) {
   grpc_error* err;
-  err = set_non_block(sock);
+  err = grpc_tcp_set_non_block(sock);
   if (err != GRPC_ERROR_NONE) return err;
   err = set_dualstack(sock);
   if (err != GRPC_ERROR_NONE) return err;
diff --git a/src/core/lib/iomgr/tcp_windows.h b/src/core/lib/iomgr/tcp_windows.h
index 161a545..04ef810 100644
--- a/src/core/lib/iomgr/tcp_windows.h
+++ b/src/core/lib/iomgr/tcp_windows.h
@@ -46,6 +46,8 @@
 
 grpc_error* grpc_tcp_prepare_socket(SOCKET sock);
 
+grpc_error* grpc_tcp_set_non_block(SOCKET sock);
+
 #endif
 
 #endif /* GRPC_CORE_LIB_IOMGR_TCP_WINDOWS_H */
diff --git a/src/cpp/server/load_reporter/util.cc b/src/cpp/server/load_reporter/util.cc
index a2f2f11..89bdf57 100644
--- a/src/cpp/server/load_reporter/util.cc
+++ b/src/cpp/server/load_reporter/util.cc
@@ -20,6 +20,8 @@
 
 #include <grpcpp/ext/server_load_reporting.h>
 
+#include <cmath>
+
 #include <grpc/support/log.h>
 
 namespace grpc {
diff --git a/templates/test/cpp/naming/resolver_component_tests_defs.include b/templates/test/cpp/naming/resolver_component_tests_defs.include
index bc981dc..b34845e 100644
--- a/templates/test/cpp/naming/resolver_component_tests_defs.include
+++ b/templates/test/cpp/naming/resolver_component_tests_defs.include
@@ -22,6 +22,7 @@
 import os
 import time
 import signal
+import platform
 
 
 argp = argparse.ArgumentParser(description='Run c-ares resolver tests')
@@ -43,6 +44,11 @@
 def test_runner_log(msg):
   sys.stderr.write('\n%s: %s\n' % (__file__, msg))
 
+def python_args(arg_list):
+  if platform.system() == 'Windows':
+    return [sys.executable] + arg_list
+  return arg_list
+
 cur_resolver = os.environ.get('GRPC_DNS_RESOLVER')
 if cur_resolver and cur_resolver != 'ares':
   test_runner_log(('WARNING: cur resolver set to %s. This set of tests '
@@ -50,26 +56,27 @@
   test_runner_log('Exit 1 without running tests.')
   sys.exit(1)
 os.environ.update({'GRPC_DNS_RESOLVER': 'ares'})
+os.environ.update({'GRPC_TRACE': 'cares_resolver'})
 
 def wait_until_dns_server_is_up(args,
                                 dns_server_subprocess,
                                 dns_server_subprocess_output):
   for i in range(0, 30):
     test_runner_log('Health check: attempt to connect to DNS server over TCP.')
-    tcp_connect_subprocess = subprocess.Popen([
+    tcp_connect_subprocess = subprocess.Popen(python_args([
         args.tcp_connect_bin_path,
         '--server_host', '127.0.0.1',
         '--server_port', str(args.dns_server_port),
-        '--timeout', str(1)])
+        '--timeout', str(1)]))
     tcp_connect_subprocess.communicate()
     if tcp_connect_subprocess.returncode == 0:
       test_runner_log(('Health check: attempt to make an A-record '
                        'query to DNS server.'))
-      dns_resolver_subprocess = subprocess.Popen([
+      dns_resolver_subprocess = subprocess.Popen(python_args([
           args.dns_resolver_bin_path,
           '--qname', 'health-check-local-dns-server-is-alive.resolver-tests.grpctestingexp',
           '--server_host', '127.0.0.1',
-          '--server_port', str(args.dns_server_port)],
+          '--server_port', str(args.dns_server_port)]),
           stdout=subprocess.PIPE)
       dns_resolver_stdout, _ = dns_resolver_subprocess.communicate()
       if dns_resolver_subprocess.returncode == 0:
@@ -91,10 +98,10 @@
 
 dns_server_subprocess_output = tempfile.mktemp()
 with open(dns_server_subprocess_output, 'w') as l:
-  dns_server_subprocess = subprocess.Popen([
+  dns_server_subprocess = subprocess.Popen(python_args([
       args.dns_server_bin_path,
       '--port', str(args.dns_server_port),
-      '--records_config_path', args.records_config_path],
+      '--records_config_path', args.records_config_path]),
       stdin=subprocess.PIPE,
       stdout=l,
       stderr=l)
diff --git a/test/cpp/naming/cancel_ares_query_test.cc b/test/cpp/naming/cancel_ares_query_test.cc
index 0d59bf6..dec7c17 100644
--- a/test/cpp/naming/cancel_ares_query_test.cc
+++ b/test/cpp/naming/cancel_ares_query_test.cc
@@ -45,11 +45,14 @@
 #include "test/core/util/port.h"
 #include "test/core/util/test_config.h"
 
-// TODO: pull in different headers when enabling this
-// test on windows. Also set BAD_SOCKET_RETURN_VAL
-// to INVALID_SOCKET on windows.
+#ifdef GPR_WINDOWS
+#include "src/core/lib/iomgr/sockaddr_windows.h"
+#include "src/core/lib/iomgr/socket_windows.h"
+#define BAD_SOCKET_RETURN_VAL INVALID_SOCKET
+#else
 #include "src/core/lib/iomgr/sockaddr_posix.h"
 #define BAD_SOCKET_RETURN_VAL -1
+#endif
 
 namespace {
 
@@ -91,7 +94,13 @@
       abort();
     }
   }
-  ~FakeNonResponsiveDNSServer() { close(socket_); }
+  ~FakeNonResponsiveDNSServer() {
+#ifdef GPR_WINDOWS
+    closesocket(socket_);
+#else
+    close(socket_);
+#endif
+  }
 
  private:
   int socket_;
@@ -193,6 +202,38 @@
   TestCancelActiveDNSQuery(&args);
 }
 
+#ifdef GPR_WINDOWS
+
+void MaybePollArbitraryPollsetTwice() {
+  grpc_pollset* pollset = (grpc_pollset*)gpr_zalloc(grpc_pollset_size());
+  gpr_mu* mu;
+  grpc_pollset_init(pollset, &mu);
+  grpc_pollset_worker* worker = nullptr;
+  // Make a zero timeout poll
+  gpr_mu_lock(mu);
+  GRPC_LOG_IF_ERROR(
+      "pollset_work",
+      grpc_pollset_work(pollset, &worker, grpc_core::ExecCtx::Get()->Now()));
+  gpr_mu_unlock(mu);
+  grpc_core::ExecCtx::Get()->Flush();
+  // Make a second zero-timeout poll (in case the first one
+  // short-circuited by picking up a previous "kick")
+  gpr_mu_lock(mu);
+  GRPC_LOG_IF_ERROR(
+      "pollset_work",
+      grpc_pollset_work(pollset, &worker, grpc_core::ExecCtx::Get()->Now()));
+  gpr_mu_unlock(mu);
+  grpc_core::ExecCtx::Get()->Flush();
+  grpc_pollset_destroy(pollset);
+  gpr_free(pollset);
+}
+
+#else
+
+void MaybePollArbitraryPollsetTwice() {}
+
+#endif
+
 TEST(CancelDuringAresQuery, TestFdsAreDeletedFromPollsetSet) {
   grpc_core::ExecCtx exec_ctx;
   ArgsStruct args;
@@ -209,6 +250,12 @@
   // this test. This test only cares about what happens to fd's that c-ares
   // opens.
   TestCancelActiveDNSQuery(&args);
+  // This test relies on the assumption that cancelling a c-ares query
+  // will flush out all callbacks on the current exec ctx, which is true
+  // on posix platforms but not on Windows, because fd shutdown on Windows
+  // requires a trip through the polling loop to schedule the callback.
+  // So we need to do extra polling work on Windows to free things up.
+  MaybePollArbitraryPollsetTwice();
   EXPECT_EQ(grpc_iomgr_count_objects_for_testing(), 0u);
   grpc_pollset_set_destroy(fake_other_pollset_set);
 }
diff --git a/test/cpp/naming/gen_build_yaml.py b/test/cpp/naming/gen_build_yaml.py
index 5dad2ea..1c9d067 100755
--- a/test/cpp/naming/gen_build_yaml.py
+++ b/test/cpp/naming/gen_build_yaml.py
@@ -68,7 +68,7 @@
               'gtest': False,
               'run': False,
               'src': ['test/cpp/naming/resolver_component_test.cc'],
-              'platforms': ['linux', 'posix', 'mac'],
+              'platforms': ['linux', 'posix', 'mac', 'windows'],
               'deps': [
                   'grpc++_test_util' + unsecure_build_config_suffix,
                   'grpc_test_util' + unsecure_build_config_suffix,
@@ -129,7 +129,7 @@
           'gtest': True,
           'run': True,
           'src': ['test/cpp/naming/cancel_ares_query_test.cc'],
-          'platforms': ['linux', 'posix', 'mac'],
+          'platforms': ['linux', 'posix', 'mac', 'windows'],
           'deps': [
               'grpc++_test_util',
               'grpc_test_util',
diff --git a/test/cpp/naming/manual_run_resolver_component_test.py b/test/cpp/naming/manual_run_resolver_component_test.py
new file mode 100644
index 0000000..fb21577
--- /dev/null
+++ b/test/cpp/naming/manual_run_resolver_component_test.py
@@ -0,0 +1,36 @@
+#!/usr/bin/env python
+# Copyright 2015 gRPC authors.
+#
+# Licensed under the Apache License, Version 2.0 (the "License");
+# you may not use this file except in compliance with the License.
+# You may obtain a copy of the License at
+#
+#     http://www.apache.org/licenses/LICENSE-2.0
+#
+# Unless required by applicable law or agreed to in writing, software
+# distributed under the License is distributed on an "AS IS" BASIS,
+# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+# See the License for the specific language governing permissions and
+# limitations under the License.
+
+import os
+import subprocess
+import sys
+
+# The c-ares test suite doesn't get ran regularly on Windows, but
+# this script provides a way to run a lot of the tests manually.
+_MSBUILD_CONFIG = os.environ['CONFIG']
+os.chdir(os.path.join('..', '..', os.getcwd()))
+# This port is arbitrary, but it needs to be available.
+_DNS_SERVER_PORT = 15353
+
+subprocess.call([
+    sys.executable,
+    'test\\cpp\\naming\\resolver_component_tests_runner.py',
+    '--test_bin_path', 'cmake\\build\\%s\\resolver_component_test.exe' % _MSBUILD_CONFIG,
+    '--dns_server_bin_path', 'test\\cpp\\naming\\utils\\dns_server.py',
+    '--records_config_path', 'test\\cpp\\naming\\resolver_test_record_groups.yaml',
+    '--dns_server_port', str(_DNS_SERVER_PORT),
+    '--dns_resolver_bin_path', 'test\\cpp\\naming\\utils\\dns_resolver.py',
+    '--tcp_connect_bin_path', 'test\\cpp\\naming\\utils\\tcp_connect.py',
+])
diff --git a/test/cpp/naming/resolver_component_test.cc b/test/cpp/naming/resolver_component_test.cc
index 6ac5481..3dc6e71 100644
--- a/test/cpp/naming/resolver_component_test.cc
+++ b/test/cpp/naming/resolver_component_test.cc
@@ -16,6 +16,8 @@
  *
  */
 
+#include <grpc/support/port_platform.h>
+
 #include <grpc/grpc.h>
 #include <grpc/support/alloc.h>
 #include <grpc/support/log.h>
@@ -55,8 +57,15 @@
 // TODO: pull in different headers when enabling this
 // test on windows. Also set BAD_SOCKET_RETURN_VAL
 // to INVALID_SOCKET on windows.
+#ifdef GPR_WINDOWS
+#include "src/core/lib/iomgr/sockaddr_windows.h"
+#include "src/core/lib/iomgr/socket_windows.h"
+#include "src/core/lib/iomgr/tcp_windows.h"
+#define BAD_SOCKET_RETURN_VAL INVALID_SOCKET
+#else
 #include "src/core/lib/iomgr/sockaddr_posix.h"
 #define BAD_SOCKET_RETURN_VAL -1
+#endif
 
 using grpc::SubProcess;
 using std::vector;
@@ -241,6 +250,62 @@
   }
 }
 
+#ifdef GPR_WINDOWS
+void OpenAndCloseSocketsStressLoop(int dummy_port, gpr_event* done_ev) {
+  sockaddr_in6 addr;
+  memset(&addr, 0, sizeof(addr));
+  addr.sin6_family = AF_INET6;
+  addr.sin6_port = htons(dummy_port);
+  ((char*)&addr.sin6_addr)[15] = 1;
+  for (;;) {
+    if (gpr_event_get(done_ev)) {
+      return;
+    }
+    std::vector<int> sockets;
+    for (size_t i = 0; i < 50; i++) {
+      SOCKET s = WSASocket(AF_INET6, SOCK_STREAM, IPPROTO_TCP, nullptr, 0,
+                           WSA_FLAG_OVERLAPPED);
+      ASSERT_TRUE(s != BAD_SOCKET_RETURN_VAL)
+          << "Failed to create TCP ipv6 socket";
+      gpr_log(GPR_DEBUG, "Opened socket: %d", s);
+      char val = 1;
+      ASSERT_TRUE(setsockopt(s, SOL_SOCKET, SO_REUSEADDR, &val, sizeof(val)) !=
+                  SOCKET_ERROR)
+          << "Failed to set socketopt reuseaddr. WSA error: " +
+                 std::to_string(WSAGetLastError());
+      ASSERT_TRUE(grpc_tcp_set_non_block(s) == GRPC_ERROR_NONE)
+          << "Failed to set socket non-blocking";
+      ASSERT_TRUE(bind(s, (const sockaddr*)&addr, sizeof(addr)) != SOCKET_ERROR)
+          << "Failed to bind socket " + std::to_string(s) +
+                 " to [::1]:" + std::to_string(dummy_port) +
+                 ". WSA error: " + std::to_string(WSAGetLastError());
+      ASSERT_TRUE(listen(s, 1) != SOCKET_ERROR)
+          << "Failed to listen on socket " + std::to_string(s) +
+                 ". WSA error: " + std::to_string(WSAGetLastError());
+      sockets.push_back(s);
+    }
+    // Do a non-blocking accept followed by a close on all of those sockets.
+    // Do this in a separate loop to try to induce a time window to hit races.
+    for (size_t i = 0; i < sockets.size(); i++) {
+      gpr_log(GPR_DEBUG, "non-blocking accept then close on %d", sockets[i]);
+      ASSERT_TRUE(accept(sockets[i], nullptr, nullptr) == INVALID_SOCKET)
+          << "Accept on dummy socket unexpectedly accepted actual connection.";
+      ASSERT_TRUE(WSAGetLastError() == WSAEWOULDBLOCK)
+          << "OpenAndCloseSocketsStressLoop accept on socket " +
+                 std::to_string(sockets[i]) +
+                 " failed in "
+                 "an unexpected way. "
+                 "WSA error: " +
+                 std::to_string(WSAGetLastError()) +
+                 ". Socket use-after-close bugs are likely.";
+      ASSERT_TRUE(closesocket(sockets[i]) != SOCKET_ERROR)
+          << "Failed to close socket: " + std::to_string(sockets[i]) +
+                 ". WSA error: " + std::to_string(WSAGetLastError());
+    }
+  }
+  return;
+}
+#else
 void OpenAndCloseSocketsStressLoop(int dummy_port, gpr_event* done_ev) {
   // The goal of this loop is to catch socket
   // "use after close" bugs within the c-ares resolver by acting
@@ -311,6 +376,7 @@
     }
   }
 }
+#endif
 
 void CheckResolverResultLocked(void* argsp, grpc_error* err) {
   EXPECT_EQ(err, GRPC_ERROR_NONE);
@@ -372,9 +438,9 @@
   args.expected_lb_policy = FLAGS_expected_lb_policy;
   // maybe build the address with an authority
   char* whole_uri = nullptr;
-  GPR_ASSERT(asprintf(&whole_uri, "dns://%s/%s",
-                      FLAGS_local_dns_server_address.c_str(),
-                      FLAGS_target_name.c_str()));
+  GPR_ASSERT(gpr_asprintf(&whole_uri, "dns://%s/%s",
+                          FLAGS_local_dns_server_address.c_str(),
+                          FLAGS_target_name.c_str()));
   // create resolver and resolve
   grpc_core::OrphanablePtr<grpc_core::Resolver> resolver =
       grpc_core::ResolverRegistry::CreateResolver(whole_uri, nullptr,
diff --git a/test/cpp/naming/resolver_component_tests_runner.py b/test/cpp/naming/resolver_component_tests_runner.py
index 69386eb..1873eec 100755
--- a/test/cpp/naming/resolver_component_tests_runner.py
+++ b/test/cpp/naming/resolver_component_tests_runner.py
@@ -22,6 +22,7 @@
 import os
 import time
 import signal
+import platform
 
 
 argp = argparse.ArgumentParser(description='Run c-ares resolver tests')
@@ -43,6 +44,11 @@
 def test_runner_log(msg):
   sys.stderr.write('\n%s: %s\n' % (__file__, msg))
 
+def python_args(arg_list):
+  if platform.system() == 'Windows':
+    return [sys.executable] + arg_list
+  return arg_list
+
 cur_resolver = os.environ.get('GRPC_DNS_RESOLVER')
 if cur_resolver and cur_resolver != 'ares':
   test_runner_log(('WARNING: cur resolver set to %s. This set of tests '
@@ -50,26 +56,27 @@
   test_runner_log('Exit 1 without running tests.')
   sys.exit(1)
 os.environ.update({'GRPC_DNS_RESOLVER': 'ares'})
+os.environ.update({'GRPC_TRACE': 'cares_resolver'})
 
 def wait_until_dns_server_is_up(args,
                                 dns_server_subprocess,
                                 dns_server_subprocess_output):
   for i in range(0, 30):
     test_runner_log('Health check: attempt to connect to DNS server over TCP.')
-    tcp_connect_subprocess = subprocess.Popen([
+    tcp_connect_subprocess = subprocess.Popen(python_args([
         args.tcp_connect_bin_path,
         '--server_host', '127.0.0.1',
         '--server_port', str(args.dns_server_port),
-        '--timeout', str(1)])
+        '--timeout', str(1)]))
     tcp_connect_subprocess.communicate()
     if tcp_connect_subprocess.returncode == 0:
       test_runner_log(('Health check: attempt to make an A-record '
                        'query to DNS server.'))
-      dns_resolver_subprocess = subprocess.Popen([
+      dns_resolver_subprocess = subprocess.Popen(python_args([
           args.dns_resolver_bin_path,
           '--qname', 'health-check-local-dns-server-is-alive.resolver-tests.grpctestingexp',
           '--server_host', '127.0.0.1',
-          '--server_port', str(args.dns_server_port)],
+          '--server_port', str(args.dns_server_port)]),
           stdout=subprocess.PIPE)
       dns_resolver_stdout, _ = dns_resolver_subprocess.communicate()
       if dns_resolver_subprocess.returncode == 0:
@@ -91,10 +98,10 @@
 
 dns_server_subprocess_output = tempfile.mktemp()
 with open(dns_server_subprocess_output, 'w') as l:
-  dns_server_subprocess = subprocess.Popen([
+  dns_server_subprocess = subprocess.Popen(python_args([
       args.dns_server_bin_path,
       '--port', str(args.dns_server_port),
-      '--records_config_path', args.records_config_path],
+      '--records_config_path', args.records_config_path]),
       stdin=subprocess.PIPE,
       stdout=l,
       stderr=l)
@@ -112,6 +119,18 @@
                             dns_server_subprocess_output)
 num_test_failures = 0
 
+test_runner_log('Run test with target: %s' % 'no-srv-ipv4-single-target.resolver-tests-version-4.grpctestingexp.')
+current_test_subprocess = subprocess.Popen([
+  args.test_bin_path,
+  '--target_name', 'no-srv-ipv4-single-target.resolver-tests-version-4.grpctestingexp.',
+  '--expected_addrs', '5.5.5.5:443,False',
+  '--expected_chosen_service_config', '',
+  '--expected_lb_policy', '',
+  '--local_dns_server_address', '127.0.0.1:%d' % args.dns_server_port])
+current_test_subprocess.communicate()
+if current_test_subprocess.returncode != 0:
+  num_test_failures += 1
+
 test_runner_log('Run test with target: %s' % 'srv-ipv4-single-target.resolver-tests-version-4.grpctestingexp.')
 current_test_subprocess = subprocess.Popen([
   args.test_bin_path,
diff --git a/test/cpp/naming/resolver_test_record_groups.yaml b/test/cpp/naming/resolver_test_record_groups.yaml
index 6c4f89d..3c51a00 100644
--- a/test/cpp/naming/resolver_test_record_groups.yaml
+++ b/test/cpp/naming/resolver_test_record_groups.yaml
@@ -1,6 +1,14 @@
 resolver_tests_common_zone_name: resolver-tests-version-4.grpctestingexp.
 resolver_component_tests:
 - expected_addrs:
+  - {address: '5.5.5.5:443', is_balancer: false}
+  expected_chosen_service_config: null
+  expected_lb_policy: null
+  record_to_resolve: no-srv-ipv4-single-target
+  records:
+    no-srv-ipv4-single-target:
+    - {TTL: '2100', data: 5.5.5.5, type: A}
+- expected_addrs:
   - {address: '1.2.3.4:1234', is_balancer: true}
   expected_chosen_service_config: null
   expected_lb_policy: null
diff --git a/tools/run_tests/generated/tests.json b/tools/run_tests/generated/tests.json
index 5815f82..cf3b54e 100644
--- a/tools/run_tests/generated/tests.json
+++ b/tools/run_tests/generated/tests.json
@@ -5784,7 +5784,8 @@
     "ci_platforms": [
       "linux", 
       "mac", 
-      "posix"
+      "posix", 
+      "windows"
     ], 
     "cpu_cost": 1.0, 
     "exclude_configs": [], 
@@ -5796,7 +5797,8 @@
     "platforms": [
       "linux", 
       "mac", 
-      "posix"
+      "posix", 
+      "windows"
     ], 
     "uses_polling": true
   },