Merge pull request #16558 from ncteisen/user-agent

Stop Unconditionally Surfacing User Agent to Server
diff --git a/include/grpcpp/impl/codegen/async_stream.h b/include/grpcpp/impl/codegen/async_stream.h
index b213459..b306cd3 100644
--- a/include/grpcpp/impl/codegen/async_stream.h
+++ b/include/grpcpp/impl/codegen/async_stream.h
@@ -195,6 +195,13 @@
     assert(size == sizeof(ClientAsyncReader));
   }
 
+  // This operator should never be called as the memory should be freed as part
+  // of the arena destruction. It only exists to provide a matching operator
+  // delete to the operator new so that some compilers will not complain (see
+  // https://github.com/grpc/grpc/issues/11301) Note at the time of adding this
+  // there are no tests catching the compiler warning.
+  static void operator delete(void*, void*) { assert(0); }
+
   void StartCall(void* tag) override {
     assert(!started_);
     started_ = true;
@@ -336,6 +343,13 @@
     assert(size == sizeof(ClientAsyncWriter));
   }
 
+  // This operator should never be called as the memory should be freed as part
+  // of the arena destruction. It only exists to provide a matching operator
+  // delete to the operator new so that some compilers will not complain (see
+  // https://github.com/grpc/grpc/issues/11301) Note at the time of adding this
+  // there are no tests catching the compiler warning.
+  static void operator delete(void*, void*) { assert(0); }
+
   void StartCall(void* tag) override {
     assert(!started_);
     started_ = true;
@@ -496,6 +510,13 @@
     assert(size == sizeof(ClientAsyncReaderWriter));
   }
 
+  // This operator should never be called as the memory should be freed as part
+  // of the arena destruction. It only exists to provide a matching operator
+  // delete to the operator new so that some compilers will not complain (see
+  // https://github.com/grpc/grpc/issues/11301) Note at the time of adding this
+  // there are no tests catching the compiler warning.
+  static void operator delete(void*, void*) { assert(0); }
+
   void StartCall(void* tag) override {
     assert(!started_);
     started_ = true;
diff --git a/include/grpcpp/impl/codegen/completion_queue.h b/include/grpcpp/impl/codegen/completion_queue.h
index 3f7d4fb..6c8428e 100644
--- a/include/grpcpp/impl/codegen/completion_queue.h
+++ b/include/grpcpp/impl/codegen/completion_queue.h
@@ -384,6 +384,7 @@
 
   grpc_cq_polling_type polling_type_;
   friend class ServerBuilder;
+  friend class Server;
 };
 
 }  // namespace grpc
diff --git a/src/core/ext/filters/client_channel/parse_address.cc b/src/core/ext/filters/client_channel/parse_address.cc
index b390011..7e726dd 100644
--- a/src/core/ext/filters/client_channel/parse_address.cc
+++ b/src/core/ext/filters/client_channel/parse_address.cc
@@ -125,9 +125,16 @@
   char* host_end = static_cast<char*>(gpr_memrchr(host, '%', strlen(host)));
   if (host_end != nullptr) {
     GPR_ASSERT(host_end >= host);
-    char host_without_scope[GRPC_INET6_ADDRSTRLEN];
+    char host_without_scope[GRPC_INET6_ADDRSTRLEN + 1];
     size_t host_without_scope_len = static_cast<size_t>(host_end - host);
     uint32_t sin6_scope_id = 0;
+    if (host_without_scope_len > GRPC_INET6_ADDRSTRLEN) {
+      gpr_log(GPR_ERROR,
+              "invalid ipv6 address length %zu. Length cannot be greater than "
+              "GRPC_INET6_ADDRSTRLEN i.e %d)",
+              host_without_scope_len, GRPC_INET6_ADDRSTRLEN);
+      goto done;
+    }
     strncpy(host_without_scope, host, host_without_scope_len);
     host_without_scope[host_without_scope_len] = '\0';
     if (grpc_inet_pton(GRPC_AF_INET6, host_without_scope, &in6->sin6_addr) ==
diff --git a/src/core/ext/filters/client_channel/subchannel_index.cc b/src/core/ext/filters/client_channel/subchannel_index.cc
index cb02b1a..f2b6c24 100644
--- a/src/core/ext/filters/client_channel/subchannel_index.cc
+++ b/src/core/ext/filters/client_channel/subchannel_index.cc
@@ -42,7 +42,7 @@
   grpc_subchannel_args args;
 };
 
-static bool g_force_creation = false;
+static gpr_atm g_force_creation = false;
 
 static grpc_subchannel_key* create_key(
     const grpc_subchannel_args* args,
@@ -73,7 +73,8 @@
 
 int grpc_subchannel_key_compare(const grpc_subchannel_key* a,
                                 const grpc_subchannel_key* b) {
-  if (g_force_creation) return false;
+  // To pretend the keys are different, return a non-zero value.
+  if (GPR_UNLIKELY(gpr_atm_no_barrier_load(&g_force_creation))) return 1;
   int c = GPR_ICMP(a->args.filter_count, b->args.filter_count);
   if (c != 0) return c;
   if (a->args.filter_count > 0) {
@@ -250,5 +251,5 @@
 }
 
 void grpc_subchannel_index_test_only_set_force_creation(bool force_creation) {
-  g_force_creation = force_creation;
+  gpr_atm_no_barrier_store(&g_force_creation, force_creation);
 }
diff --git a/src/core/ext/filters/client_channel/subchannel_index.h b/src/core/ext/filters/client_channel/subchannel_index.h
index a7dae9d..c135613 100644
--- a/src/core/ext/filters/client_channel/subchannel_index.h
+++ b/src/core/ext/filters/client_channel/subchannel_index.h
@@ -65,13 +65,10 @@
 void grpc_subchannel_index_unref(void);
 
 /** \em TEST ONLY.
- * If \a force_creation is true, all key comparisons will be false, resulting in
+ * If \a force_creation is true, all keys are regarded different, resulting in
  * new subchannels always being created. Otherwise, the keys will be compared as
  * usual.
  *
- * This function is *not* threadsafe on purpose: it should *only* be used in
- * test code.
- *
  * Tests using this function \em MUST run tests with and without \a
  * force_creation set. */
 void grpc_subchannel_index_test_only_set_force_creation(bool force_creation);
diff --git a/src/core/ext/filters/http/client/http_client_filter.cc b/src/core/ext/filters/http/client/http_client_filter.cc
index f44dc03..91fa163 100644
--- a/src/core/ext/filters/http/client/http_client_filter.cc
+++ b/src/core/ext/filters/http/client/http_client_filter.cc
@@ -79,7 +79,12 @@
 static grpc_error* client_filter_incoming_metadata(grpc_call_element* elem,
                                                    grpc_metadata_batch* b) {
   if (b->idx.named.status != nullptr) {
-    if (grpc_mdelem_eq(b->idx.named.status->md, GRPC_MDELEM_STATUS_200)) {
+    /* If both gRPC status and HTTP status are provided in the response, we
+     * should prefer the gRPC status code, as mentioned in
+     * https://github.com/grpc/grpc/blob/master/doc/http-grpc-status-mapping.md.
+     */
+    if (b->idx.named.grpc_status != nullptr ||
+        grpc_mdelem_eq(b->idx.named.status->md, GRPC_MDELEM_STATUS_200)) {
       grpc_metadata_batch_remove(b, b->idx.named.status);
     } else {
       char* val = grpc_dump_slice(GRPC_MDVALUE(b->idx.named.status->md),
diff --git a/src/core/lib/security/security_connector/security_connector.cc b/src/core/lib/security/security_connector/security_connector.cc
index 04b4c87..6246613 100644
--- a/src/core/lib/security/security_connector/security_connector.cc
+++ b/src/core/lib/security/security_connector/security_connector.cc
@@ -59,8 +59,8 @@
 
 /** Environment variable used as a flag to enable/disable loading system root
     certificates from the OS trust store. */
-#ifndef GRPC_USE_SYSTEM_SSL_ROOTS_ENV_VAR
-#define GRPC_USE_SYSTEM_SSL_ROOTS_ENV_VAR "GRPC_USE_SYSTEM_SSL_ROOTS"
+#ifndef GRPC_NOT_USE_SYSTEM_SSL_ROOTS_ENV_VAR
+#define GRPC_NOT_USE_SYSTEM_SSL_ROOTS_ENV_VAR "GRPC_NOT_USE_SYSTEM_SSL_ROOTS"
 #endif
 
 #ifndef TSI_OPENSSL_ALPN_SUPPORT
@@ -1192,10 +1192,10 @@
 
 grpc_slice DefaultSslRootStore::ComputePemRootCerts() {
   grpc_slice result = grpc_empty_slice();
-  char* use_system_roots_env_value =
-      gpr_getenv(GRPC_USE_SYSTEM_SSL_ROOTS_ENV_VAR);
-  const bool use_system_roots = gpr_is_true(use_system_roots_env_value);
-  gpr_free(use_system_roots_env_value);
+  char* not_use_system_roots_env_value =
+      gpr_getenv(GRPC_NOT_USE_SYSTEM_SSL_ROOTS_ENV_VAR);
+  const bool not_use_system_roots = gpr_is_true(not_use_system_roots_env_value);
+  gpr_free(not_use_system_roots_env_value);
   // First try to load the roots from the environment.
   char* default_root_certs_path =
       gpr_getenv(GRPC_DEFAULT_SSL_ROOTS_FILE_PATH_ENV_VAR);
@@ -1218,7 +1218,7 @@
     gpr_free(pem_root_certs);
   }
   // Try loading roots from OS trust store if flag is enabled.
-  if (GRPC_SLICE_IS_EMPTY(result) && use_system_roots) {
+  if (GRPC_SLICE_IS_EMPTY(result) && !not_use_system_roots) {
     result = LoadSystemRootCerts();
   }
   // Fallback to roots manually shipped with gRPC.
diff --git a/src/core/tsi/alts/handshaker/alts_handshaker_client.cc b/src/core/tsi/alts/handshaker/alts_handshaker_client.cc
index b5268ad..17e8026 100644
--- a/src/core/tsi/alts/handshaker/alts_handshaker_client.cc
+++ b/src/core/tsi/alts/handshaker/alts_handshaker_client.cc
@@ -24,6 +24,7 @@
 #include <grpc/support/alloc.h>
 #include <grpc/support/log.h>
 
+#include "src/core/lib/slice/slice_internal.h"
 #include "src/core/tsi/alts/handshaker/alts_handshaker_service_api.h"
 
 const int kHandshakerClientOpNum = 4;
@@ -109,7 +110,7 @@
   if (ok) {
     buffer = grpc_raw_byte_buffer_create(&slice, 1 /* number of slices */);
   }
-  grpc_slice_unref(slice);
+  grpc_slice_unref_internal(slice);
   gpr_free(target_name);
   grpc_gcp_handshaker_req_destroy(req);
   return buffer;
@@ -157,7 +158,7 @@
   if (ok) {
     buffer = grpc_raw_byte_buffer_create(&req_slice, 1 /* number of slices */);
   }
-  grpc_slice_unref(req_slice);
+  grpc_slice_unref_internal(req_slice);
   grpc_gcp_handshaker_req_destroy(req);
   return buffer;
 }
@@ -195,7 +196,7 @@
   if (ok) {
     buffer = grpc_raw_byte_buffer_create(&req_slice, 1 /* number of slices */);
   }
-  grpc_slice_unref(req_slice);
+  grpc_slice_unref_internal(req_slice);
   grpc_gcp_handshaker_req_destroy(req);
   return buffer;
 }
@@ -258,7 +259,7 @@
       grpc_slice_from_static_string(ALTS_SERVICE_METHOD), &slice,
       gpr_inf_future(GPR_CLOCK_REALTIME), nullptr);
   client->base.vtable = &vtable;
-  grpc_slice_unref(slice);
+  grpc_slice_unref_internal(slice);
   return &client->base;
 }
 
diff --git a/src/core/tsi/alts/handshaker/alts_handshaker_service_api_util.cc b/src/core/tsi/alts/handshaker/alts_handshaker_service_api_util.cc
index e0e4184..d63d353 100644
--- a/src/core/tsi/alts/handshaker/alts_handshaker_service_api_util.cc
+++ b/src/core/tsi/alts/handshaker/alts_handshaker_service_api_util.cc
@@ -20,6 +20,8 @@
 
 #include "src/core/tsi/alts/handshaker/alts_handshaker_service_api_util.h"
 
+#include "src/core/lib/slice/slice_internal.h"
+
 void add_repeated_field(repeated_field** head, const void* data) {
   repeated_field* field =
       static_cast<repeated_field*>(gpr_zalloc(sizeof(*field)));
@@ -67,7 +69,7 @@
 
 void destroy_slice(grpc_slice* slice) {
   if (slice != nullptr) {
-    grpc_slice_unref(*slice);
+    grpc_slice_unref_internal(*slice);
     gpr_free(slice);
   }
 }
diff --git a/src/core/tsi/alts/handshaker/alts_tsi_event.cc b/src/core/tsi/alts/handshaker/alts_tsi_event.cc
index ec0bf12..cb36d5e 100644
--- a/src/core/tsi/alts/handshaker/alts_tsi_event.cc
+++ b/src/core/tsi/alts/handshaker/alts_tsi_event.cc
@@ -24,6 +24,8 @@
 #include <grpc/support/alloc.h>
 #include <grpc/support/log.h>
 
+#include "src/core/lib/slice/slice_internal.h"
+
 tsi_result alts_tsi_event_create(alts_tsi_handshaker* handshaker,
                                  tsi_handshaker_on_next_done_cb cb,
                                  void* user_data,
@@ -66,8 +68,8 @@
   grpc_byte_buffer_destroy(event->recv_buffer);
   grpc_metadata_array_destroy(&event->initial_metadata);
   grpc_metadata_array_destroy(&event->trailing_metadata);
-  grpc_slice_unref(event->details);
-  grpc_slice_unref(event->target_name);
+  grpc_slice_unref_internal(event->details);
+  grpc_slice_unref_internal(event->target_name);
   grpc_alts_credentials_options_destroy(event->options);
   gpr_free(event);
 }
diff --git a/src/core/tsi/alts/handshaker/alts_tsi_handshaker.cc b/src/core/tsi/alts/handshaker/alts_tsi_handshaker.cc
index 1df1021..34608a3 100644
--- a/src/core/tsi/alts/handshaker/alts_tsi_handshaker.cc
+++ b/src/core/tsi/alts/handshaker/alts_tsi_handshaker.cc
@@ -31,6 +31,7 @@
 
 #include "src/core/lib/gpr/host_port.h"
 #include "src/core/lib/gprpp/thd.h"
+#include "src/core/lib/slice/slice_internal.h"
 #include "src/core/tsi/alts/frame_protector/alts_frame_protector.h"
 #include "src/core/tsi/alts/handshaker/alts_handshaker_client.h"
 #include "src/core/tsi/alts/handshaker/alts_tsi_utils.h"
@@ -182,7 +183,7 @@
   gpr_free(result->peer_identity);
   gpr_free(result->key_data);
   gpr_free(result->unused_bytes);
-  grpc_slice_unref(result->rpc_versions);
+  grpc_slice_unref_internal(result->rpc_versions);
   gpr_free(result);
 }
 
@@ -269,12 +270,12 @@
     handshaker->has_sent_start_message = true;
   } else {
     if (!GRPC_SLICE_IS_EMPTY(handshaker->recv_bytes)) {
-      grpc_slice_unref(handshaker->recv_bytes);
+      grpc_slice_unref_internal(handshaker->recv_bytes);
     }
     handshaker->recv_bytes = grpc_slice_ref(slice);
     ok = alts_handshaker_client_next(handshaker->client, event, &slice);
   }
-  grpc_slice_unref(slice);
+  grpc_slice_unref_internal(slice);
   if (ok != TSI_OK) {
     gpr_log(GPR_ERROR, "Failed to schedule ALTS handshaker requests");
     return ok;
@@ -299,8 +300,8 @@
   alts_tsi_handshaker* handshaker =
       reinterpret_cast<alts_tsi_handshaker*>(self);
   alts_handshaker_client_destroy(handshaker->client);
-  grpc_slice_unref(handshaker->recv_bytes);
-  grpc_slice_unref(handshaker->target_name);
+  grpc_slice_unref_internal(handshaker->recv_bytes);
+  grpc_slice_unref_internal(handshaker->target_name);
   grpc_alts_credentials_options_destroy(handshaker->options);
   gpr_free(handshaker->buffer);
   gpr_free(handshaker);
diff --git a/src/core/tsi/alts/handshaker/alts_tsi_utils.cc b/src/core/tsi/alts/handshaker/alts_tsi_utils.cc
index d9b5e6c..1747f1a 100644
--- a/src/core/tsi/alts/handshaker/alts_tsi_utils.cc
+++ b/src/core/tsi/alts/handshaker/alts_tsi_utils.cc
@@ -22,6 +22,8 @@
 
 #include <grpc/byte_buffer_reader.h>
 
+#include "src/core/lib/slice/slice_internal.h"
+
 tsi_result alts_tsi_utils_convert_to_tsi_result(grpc_status_code code) {
   switch (code) {
     case GRPC_STATUS_OK:
@@ -47,7 +49,7 @@
   grpc_slice slice = grpc_byte_buffer_reader_readall(&bbr);
   grpc_gcp_handshaker_resp* resp = grpc_gcp_handshaker_resp_create();
   bool ok = grpc_gcp_handshaker_resp_decode(slice, resp);
-  grpc_slice_unref(slice);
+  grpc_slice_unref_internal(slice);
   grpc_byte_buffer_reader_destroy(&bbr);
   if (!ok) {
     grpc_gcp_handshaker_resp_destroy(resp);
diff --git a/src/core/tsi/alts/zero_copy_frame_protector/alts_grpc_privacy_integrity_record_protocol.cc b/src/core/tsi/alts/zero_copy_frame_protector/alts_grpc_privacy_integrity_record_protocol.cc
index d4fd88d..e789090 100644
--- a/src/core/tsi/alts/zero_copy_frame_protector/alts_grpc_privacy_integrity_record_protocol.cc
+++ b/src/core/tsi/alts/zero_copy_frame_protector/alts_grpc_privacy_integrity_record_protocol.cc
@@ -61,7 +61,7 @@
   if (status != GRPC_STATUS_OK) {
     gpr_log(GPR_ERROR, "Failed to protect, %s", error_details);
     gpr_free(error_details);
-    grpc_slice_unref(protected_slice);
+    grpc_slice_unref_internal(protected_slice);
     return TSI_INTERNAL_ERROR;
   }
   grpc_slice_buffer_add(protected_slices, protected_slice);
@@ -106,7 +106,7 @@
   if (status != GRPC_STATUS_OK) {
     gpr_log(GPR_ERROR, "Failed to unprotect, %s", error_details);
     gpr_free(error_details);
-    grpc_slice_unref(unprotected_slice);
+    grpc_slice_unref_internal(unprotected_slice);
     return TSI_INTERNAL_ERROR;
   }
   grpc_slice_buffer_reset_and_unref_internal(&rp->header_sb);
diff --git a/src/core/tsi/ssl/session_cache/ssl_session_cache.cc b/src/core/tsi/ssl/session_cache/ssl_session_cache.cc
index ce74fde..f9184bc 100644
--- a/src/core/tsi/ssl/session_cache/ssl_session_cache.cc
+++ b/src/core/tsi/ssl/session_cache/ssl_session_cache.cc
@@ -19,6 +19,7 @@
 #include <grpc/support/port_platform.h>
 
 #include "src/core/lib/gprpp/mutex_lock.h"
+#include "src/core/lib/slice/slice_internal.h"
 #include "src/core/tsi/ssl/session_cache/ssl_session.h"
 #include "src/core/tsi/ssl/session_cache/ssl_session_cache.h"
 
@@ -53,7 +54,7 @@
     SetSession(std::move(session));
   }
 
-  ~Node() { grpc_slice_unref(key_); }
+  ~Node() { grpc_slice_unref_internal(key_); }
 
   // Not copyable nor movable.
   Node(const Node&) = delete;
diff --git a/src/cpp/server/health/default_health_check_service.cc b/src/cpp/server/health/default_health_check_service.cc
index bfda67d..fc3db1b 100644
--- a/src/cpp/server/health/default_health_check_service.cc
+++ b/src/cpp/server/health/default_health_check_service.cc
@@ -30,29 +30,162 @@
 #include "src/cpp/server/health/health.pb.h"
 
 namespace grpc {
+
+//
+// DefaultHealthCheckService
+//
+
+DefaultHealthCheckService::DefaultHealthCheckService() {
+  services_map_[""].SetServingStatus(SERVING);
+}
+
+void DefaultHealthCheckService::SetServingStatus(
+    const grpc::string& service_name, bool serving) {
+  std::unique_lock<std::mutex> lock(mu_);
+  services_map_[service_name].SetServingStatus(serving ? SERVING : NOT_SERVING);
+}
+
+void DefaultHealthCheckService::SetServingStatus(bool serving) {
+  const ServingStatus status = serving ? SERVING : NOT_SERVING;
+  std::unique_lock<std::mutex> lock(mu_);
+  for (auto& p : services_map_) {
+    ServiceData& service_data = p.second;
+    service_data.SetServingStatus(status);
+  }
+}
+
+DefaultHealthCheckService::ServingStatus
+DefaultHealthCheckService::GetServingStatus(
+    const grpc::string& service_name) const {
+  std::lock_guard<std::mutex> lock(mu_);
+  auto it = services_map_.find(service_name);
+  if (it == services_map_.end()) {
+    return NOT_FOUND;
+  }
+  const ServiceData& service_data = it->second;
+  return service_data.GetServingStatus();
+}
+
+void DefaultHealthCheckService::RegisterCallHandler(
+    const grpc::string& service_name,
+    std::shared_ptr<HealthCheckServiceImpl::CallHandler> handler) {
+  std::unique_lock<std::mutex> lock(mu_);
+  ServiceData& service_data = services_map_[service_name];
+  service_data.AddCallHandler(handler /* copies ref */);
+  handler->SendHealth(std::move(handler), service_data.GetServingStatus());
+}
+
+void DefaultHealthCheckService::UnregisterCallHandler(
+    const grpc::string& service_name,
+    std::shared_ptr<HealthCheckServiceImpl::CallHandler> handler) {
+  std::unique_lock<std::mutex> lock(mu_);
+  auto it = services_map_.find(service_name);
+  if (it == services_map_.end()) return;
+  ServiceData& service_data = it->second;
+  service_data.RemoveCallHandler(std::move(handler));
+  if (service_data.Unused()) {
+    services_map_.erase(it);
+  }
+}
+
+DefaultHealthCheckService::HealthCheckServiceImpl*
+DefaultHealthCheckService::GetHealthCheckService(
+    std::unique_ptr<ServerCompletionQueue> cq) {
+  GPR_ASSERT(impl_ == nullptr);
+  impl_.reset(new HealthCheckServiceImpl(this, std::move(cq)));
+  return impl_.get();
+}
+
+//
+// DefaultHealthCheckService::ServiceData
+//
+
+void DefaultHealthCheckService::ServiceData::SetServingStatus(
+    ServingStatus status) {
+  status_ = status;
+  for (auto& call_handler : call_handlers_) {
+    call_handler->SendHealth(call_handler /* copies ref */, status);
+  }
+}
+
+void DefaultHealthCheckService::ServiceData::AddCallHandler(
+    std::shared_ptr<HealthCheckServiceImpl::CallHandler> handler) {
+  call_handlers_.insert(std::move(handler));
+}
+
+void DefaultHealthCheckService::ServiceData::RemoveCallHandler(
+    std::shared_ptr<HealthCheckServiceImpl::CallHandler> handler) {
+  call_handlers_.erase(std::move(handler));
+}
+
+//
+// DefaultHealthCheckService::HealthCheckServiceImpl
+//
+
 namespace {
 const char kHealthCheckMethodName[] = "/grpc.health.v1.Health/Check";
+const char kHealthWatchMethodName[] = "/grpc.health.v1.Health/Watch";
 }  // namespace
 
 DefaultHealthCheckService::HealthCheckServiceImpl::HealthCheckServiceImpl(
-    DefaultHealthCheckService* service)
-    : service_(service), method_(nullptr) {
-  internal::MethodHandler* handler =
-      new internal::RpcMethodHandler<HealthCheckServiceImpl, ByteBuffer,
-                                     ByteBuffer>(
-          std::mem_fn(&HealthCheckServiceImpl::Check), this);
-  method_ = new internal::RpcServiceMethod(
-      kHealthCheckMethodName, internal::RpcMethod::NORMAL_RPC, handler);
-  AddMethod(method_);
+    DefaultHealthCheckService* database,
+    std::unique_ptr<ServerCompletionQueue> cq)
+    : database_(database), cq_(std::move(cq)) {
+  // Add Check() method.
+  check_method_ = new internal::RpcServiceMethod(
+      kHealthCheckMethodName, internal::RpcMethod::NORMAL_RPC, nullptr);
+  AddMethod(check_method_);
+  // Add Watch() method.
+  watch_method_ = new internal::RpcServiceMethod(
+      kHealthWatchMethodName, internal::RpcMethod::SERVER_STREAMING, nullptr);
+  AddMethod(watch_method_);
+  // Create serving thread.
+  thread_ = std::unique_ptr<::grpc_core::Thread>(
+      new ::grpc_core::Thread("grpc_health_check_service", Serve, this));
 }
 
-Status DefaultHealthCheckService::HealthCheckServiceImpl::Check(
-    ServerContext* context, const ByteBuffer* request, ByteBuffer* response) {
-  // Decode request.
-  std::vector<Slice> slices;
-  if (!request->Dump(&slices).ok()) {
-    return Status(StatusCode::INVALID_ARGUMENT, "");
+DefaultHealthCheckService::HealthCheckServiceImpl::~HealthCheckServiceImpl() {
+  // We will reach here after the server starts shutting down.
+  shutdown_ = true;
+  {
+    std::unique_lock<std::mutex> lock(cq_shutdown_mu_);
+    cq_->Shutdown();
   }
+  thread_->Join();
+}
+
+void DefaultHealthCheckService::HealthCheckServiceImpl::StartServingThread() {
+  thread_->Start();
+}
+
+void DefaultHealthCheckService::HealthCheckServiceImpl::Serve(void* arg) {
+  HealthCheckServiceImpl* service =
+      reinterpret_cast<HealthCheckServiceImpl*>(arg);
+  // TODO(juanlishen): This is a workaround to wait for the cq to be ready.
+  // Need to figure out why cq is not ready after service starts.
+  gpr_sleep_until(gpr_time_add(gpr_now(GPR_CLOCK_MONOTONIC),
+                               gpr_time_from_seconds(1, GPR_TIMESPAN)));
+  CheckCallHandler::CreateAndStart(service->cq_.get(), service->database_,
+                                   service);
+  WatchCallHandler::CreateAndStart(service->cq_.get(), service->database_,
+                                   service);
+  void* tag;
+  bool ok;
+  while (true) {
+    if (!service->cq_->Next(&tag, &ok)) {
+      // The completion queue is shutting down.
+      GPR_ASSERT(service->shutdown_);
+      break;
+    }
+    auto* next_step = static_cast<CallableTag*>(tag);
+    next_step->Run(ok);
+  }
+}
+
+bool DefaultHealthCheckService::HealthCheckServiceImpl::DecodeRequest(
+    const ByteBuffer& request, grpc::string* service_name) {
+  std::vector<Slice> slices;
+  if (!request.Dump(&slices).ok()) return false;
   uint8_t* request_bytes = nullptr;
   bool request_bytes_owned = false;
   size_t request_size = 0;
@@ -64,14 +197,13 @@
     request_size = slices[0].size();
   } else {
     request_bytes_owned = true;
-    request_bytes = static_cast<uint8_t*>(gpr_malloc(request->Length()));
+    request_bytes = static_cast<uint8_t*>(gpr_malloc(request.Length()));
     uint8_t* copy_to = request_bytes;
     for (size_t i = 0; i < slices.size(); i++) {
       memcpy(copy_to, slices[i].begin(), slices[i].size());
       copy_to += slices[i].size();
     }
   }
-
   if (request_bytes != nullptr) {
     pb_istream_t istream = pb_istream_from_buffer(request_bytes, request_size);
     bool decode_status = pb_decode(
@@ -79,26 +211,22 @@
     if (request_bytes_owned) {
       gpr_free(request_bytes);
     }
-    if (!decode_status) {
-      return Status(StatusCode::INVALID_ARGUMENT, "");
-    }
+    if (!decode_status) return false;
   }
+  *service_name = request_struct.has_service ? request_struct.service : "";
+  return true;
+}
 
-  // Check status from the associated default health checking service.
-  DefaultHealthCheckService::ServingStatus serving_status =
-      service_->GetServingStatus(
-          request_struct.has_service ? request_struct.service : "");
-  if (serving_status == DefaultHealthCheckService::NOT_FOUND) {
-    return Status(StatusCode::NOT_FOUND, "");
-  }
-
-  // Encode response
+bool DefaultHealthCheckService::HealthCheckServiceImpl::EncodeResponse(
+    ServingStatus status, ByteBuffer* response) {
   grpc_health_v1_HealthCheckResponse response_struct;
   response_struct.has_status = true;
   response_struct.status =
-      serving_status == DefaultHealthCheckService::SERVING
-          ? grpc_health_v1_HealthCheckResponse_ServingStatus_SERVING
-          : grpc_health_v1_HealthCheckResponse_ServingStatus_NOT_SERVING;
+      status == NOT_FOUND
+          ? grpc_health_v1_HealthCheckResponse_ServingStatus_SERVICE_UNKNOWN
+          : status == SERVING
+                ? grpc_health_v1_HealthCheckResponse_ServingStatus_SERVING
+                : grpc_health_v1_HealthCheckResponse_ServingStatus_NOT_SERVING;
   pb_ostream_t ostream;
   memset(&ostream, 0, sizeof(ostream));
   pb_encode(&ostream, grpc_health_v1_HealthCheckResponse_fields,
@@ -108,48 +236,282 @@
                                    GRPC_SLICE_LENGTH(response_slice));
   bool encode_status = pb_encode(
       &ostream, grpc_health_v1_HealthCheckResponse_fields, &response_struct);
-  if (!encode_status) {
-    return Status(StatusCode::INTERNAL, "Failed to encode response.");
-  }
+  if (!encode_status) return false;
   Slice encoded_response(response_slice, Slice::STEAL_REF);
   ByteBuffer response_buffer(&encoded_response, 1);
   response->Swap(&response_buffer);
-  return Status::OK;
+  return true;
 }
 
-DefaultHealthCheckService::DefaultHealthCheckService() {
-  services_map_.emplace("", true);
-}
+//
+// DefaultHealthCheckService::HealthCheckServiceImpl::CheckCallHandler
+//
 
-void DefaultHealthCheckService::SetServingStatus(
-    const grpc::string& service_name, bool serving) {
-  std::lock_guard<std::mutex> lock(mu_);
-  services_map_[service_name] = serving;
-}
-
-void DefaultHealthCheckService::SetServingStatus(bool serving) {
-  std::lock_guard<std::mutex> lock(mu_);
-  for (auto iter = services_map_.begin(); iter != services_map_.end(); ++iter) {
-    iter->second = serving;
+void DefaultHealthCheckService::HealthCheckServiceImpl::CheckCallHandler::
+    CreateAndStart(ServerCompletionQueue* cq,
+                   DefaultHealthCheckService* database,
+                   HealthCheckServiceImpl* service) {
+  std::shared_ptr<CallHandler> self =
+      std::make_shared<CheckCallHandler>(cq, database, service);
+  CheckCallHandler* handler = static_cast<CheckCallHandler*>(self.get());
+  {
+    std::unique_lock<std::mutex> lock(service->cq_shutdown_mu_);
+    if (service->shutdown_) return;
+    // Request a Check() call.
+    handler->next_ =
+        CallableTag(std::bind(&CheckCallHandler::OnCallReceived, handler,
+                              std::placeholders::_1, std::placeholders::_2),
+                    std::move(self));
+    service->RequestAsyncUnary(0, &handler->ctx_, &handler->request_,
+                               &handler->writer_, cq, cq, &handler->next_);
   }
 }
 
-DefaultHealthCheckService::ServingStatus
-DefaultHealthCheckService::GetServingStatus(
-    const grpc::string& service_name) const {
-  std::lock_guard<std::mutex> lock(mu_);
-  const auto& iter = services_map_.find(service_name);
-  if (iter == services_map_.end()) {
-    return NOT_FOUND;
+DefaultHealthCheckService::HealthCheckServiceImpl::CheckCallHandler::
+    CheckCallHandler(ServerCompletionQueue* cq,
+                     DefaultHealthCheckService* database,
+                     HealthCheckServiceImpl* service)
+    : cq_(cq), database_(database), service_(service), writer_(&ctx_) {}
+
+void DefaultHealthCheckService::HealthCheckServiceImpl::CheckCallHandler::
+    OnCallReceived(std::shared_ptr<CallHandler> self, bool ok) {
+  if (!ok) {
+    // The value of ok being false means that the server is shutting down.
+    return;
   }
-  return iter->second ? SERVING : NOT_SERVING;
+  // Spawn a new handler instance to serve the next new client. Every handler
+  // instance will deallocate itself when it's done.
+  CreateAndStart(cq_, database_, service_);
+  // Process request.
+  gpr_log(GPR_DEBUG, "[HCS %p] Health check started for handler %p", service_,
+          this);
+  grpc::string service_name;
+  grpc::Status status = Status::OK;
+  ByteBuffer response;
+  if (!service_->DecodeRequest(request_, &service_name)) {
+    status = Status(StatusCode::INVALID_ARGUMENT, "");
+  } else {
+    ServingStatus serving_status = database_->GetServingStatus(service_name);
+    if (serving_status == NOT_FOUND) {
+      status = Status(StatusCode::NOT_FOUND, "service name unknown");
+    } else if (!service_->EncodeResponse(serving_status, &response)) {
+      status = Status(StatusCode::INTERNAL, "");
+    }
+  }
+  // Send response.
+  {
+    std::unique_lock<std::mutex> lock(service_->cq_shutdown_mu_);
+    if (!service_->shutdown_) {
+      next_ =
+          CallableTag(std::bind(&CheckCallHandler::OnFinishDone, this,
+                                std::placeholders::_1, std::placeholders::_2),
+                      std::move(self));
+      if (status.ok()) {
+        writer_.Finish(response, status, &next_);
+      } else {
+        writer_.FinishWithError(status, &next_);
+      }
+    }
+  }
 }
 
-DefaultHealthCheckService::HealthCheckServiceImpl*
-DefaultHealthCheckService::GetHealthCheckService() {
-  GPR_ASSERT(impl_ == nullptr);
-  impl_.reset(new HealthCheckServiceImpl(this));
-  return impl_.get();
+void DefaultHealthCheckService::HealthCheckServiceImpl::CheckCallHandler::
+    OnFinishDone(std::shared_ptr<CallHandler> self, bool ok) {
+  if (ok) {
+    gpr_log(GPR_DEBUG, "[HCS %p] Health check call finished for handler %p",
+            service_, this);
+  }
+}
+
+//
+// DefaultHealthCheckService::HealthCheckServiceImpl::WatchCallHandler
+//
+
+void DefaultHealthCheckService::HealthCheckServiceImpl::WatchCallHandler::
+    CreateAndStart(ServerCompletionQueue* cq,
+                   DefaultHealthCheckService* database,
+                   HealthCheckServiceImpl* service) {
+  std::shared_ptr<CallHandler> self =
+      std::make_shared<WatchCallHandler>(cq, database, service);
+  WatchCallHandler* handler = static_cast<WatchCallHandler*>(self.get());
+  {
+    std::unique_lock<std::mutex> lock(service->cq_shutdown_mu_);
+    if (service->shutdown_) return;
+    // Request AsyncNotifyWhenDone().
+    handler->on_done_notified_ =
+        CallableTag(std::bind(&WatchCallHandler::OnDoneNotified, handler,
+                              std::placeholders::_1, std::placeholders::_2),
+                    self /* copies ref */);
+    handler->ctx_.AsyncNotifyWhenDone(&handler->on_done_notified_);
+    // Request a Watch() call.
+    handler->next_ =
+        CallableTag(std::bind(&WatchCallHandler::OnCallReceived, handler,
+                              std::placeholders::_1, std::placeholders::_2),
+                    std::move(self));
+    service->RequestAsyncServerStreaming(1, &handler->ctx_, &handler->request_,
+                                         &handler->stream_, cq, cq,
+                                         &handler->next_);
+  }
+}
+
+DefaultHealthCheckService::HealthCheckServiceImpl::WatchCallHandler::
+    WatchCallHandler(ServerCompletionQueue* cq,
+                     DefaultHealthCheckService* database,
+                     HealthCheckServiceImpl* service)
+    : cq_(cq),
+      database_(database),
+      service_(service),
+      stream_(&ctx_),
+      call_state_(WAITING_FOR_CALL) {}
+
+// Completion callback for the Watch() request armed by CreateAndStart().
+//
+// On success, spawns a replacement handler for the next client, decodes the
+// requested service name and registers this handler with the database so it
+// receives status updates. If the request cannot be decoded, the call is
+// finished with INVALID_ARGUMENT instead.
+// If `ok` is false (server shutting down) or this handler has already been
+// shut down, the handler is shut down.
+void DefaultHealthCheckService::HealthCheckServiceImpl::WatchCallHandler::
+    OnCallReceived(std::shared_ptr<CallHandler> self, bool ok) {
+  if (ok) {
+    call_state_ = CALL_RECEIVED;
+  } else {
+    // AsyncNotifyWhenDone() needs to be called before the call starts, but the
+    // tag will not pop out if the call never starts (
+    // https://github.com/grpc/grpc/issues/10136). So we need to manually
+    // release the ownership of the handler in this case.
+    GPR_ASSERT(on_done_notified_.ReleaseHandler() != nullptr);
+  }
+  if (!ok || shutdown_) {
+    // The value of ok being false means that the server is shutting down.
+    Shutdown(std::move(self), "OnCallReceived");
+    return;
+  }
+  // Spawn a new handler instance to serve the next new client. Every handler
+  // instance will deallocate itself when it's done.
+  CreateAndStart(cq_, database_, service_);
+  // Parse request.
+  if (!service_->DecodeRequest(request_, &service_name_)) {
+    // Like every other operation started on the service's cq, Finish() may
+    // only be issued while holding cq_shutdown_mu_ and while the service is
+    // still running; otherwise its tag could be queued onto a cq that has
+    // already been shut down.
+    std::unique_lock<std::mutex> cq_lock(service_->cq_shutdown_mu_);
+    if (service_->shutdown_) return;
+    on_finish_done_ =
+        CallableTag(std::bind(&WatchCallHandler::OnFinishDone, this,
+                              std::placeholders::_1, std::placeholders::_2),
+                    std::move(self));
+    stream_.Finish(Status(StatusCode::INVALID_ARGUMENT, ""), &on_finish_done_);
+    call_state_ = FINISH_CALLED;
+    return;
+  }
+  // Register the call for updates to the service.
+  // cq_shutdown_mu_ is not held here: registration may push the current status
+  // via SendHealth(), which acquires that mutex itself.
+  gpr_log(GPR_DEBUG,
+          "[HCS %p] Health check watch started for service \"%s\" "
+          "(handler: %p)",
+          service_, service_name_.c_str(), this);
+  database_->RegisterCallHandler(service_name_, std::move(self));
+}
+
+// Pushes a new serving status to the watching client.
+// At most one Write() is outstanding at a time. While one is in flight, the
+// newest status replaces whatever is in pending_status_, and
+// OnSendHealthDone() sends it once the outstanding write completes.
+void DefaultHealthCheckService::HealthCheckServiceImpl::WatchCallHandler::
+    SendHealth(std::shared_ptr<CallHandler> self, ServingStatus status) {
+  std::unique_lock<std::mutex> lock(mu_);
+  if (!send_in_flight_) {
+    // Nothing outstanding: the write can be started right away.
+    SendHealthLocked(std::move(self), status);
+    return;
+  }
+  // A write is already outstanding: coalesce into the pending slot.
+  pending_status_ = status;
+}
+
+// Starts writing `status` to the client. Requires holding mu_.
+//
+// If the service is already shutting down, this handler is shut down instead.
+// If `status` cannot be encoded, the call is finished with INTERNAL. In that
+// case send_in_flight_ is left set (OnSendHealthDone() will never run), so
+// later SendHealth() calls only update pending_status_ and never start a
+// Write() after Finish().
+void DefaultHealthCheckService::HealthCheckServiceImpl::WatchCallHandler::
+    SendHealthLocked(std::shared_ptr<CallHandler> self, ServingStatus status) {
+  std::unique_lock<std::mutex> cq_lock(service_->cq_shutdown_mu_);
+  if (service_->shutdown_) {
+    // Shutdown() acquires cq_shutdown_mu_ itself, so release it first.
+    cq_lock.unlock();
+    Shutdown(std::move(self), "SendHealthLocked");
+    return;
+  }
+  send_in_flight_ = true;
+  call_state_ = SEND_MESSAGE_PENDING;
+  // Construct response.
+  ByteBuffer response;
+  if (!service_->EncodeResponse(status, &response)) {
+    on_finish_done_ =
+        CallableTag(std::bind(&WatchCallHandler::OnFinishDone, this,
+                              std::placeholders::_1, std::placeholders::_2),
+                    std::move(self));
+    // Record that Finish() has been issued. Otherwise a later Shutdown()
+    // (e.g. from OnDoneNotified()) would still see SEND_MESSAGE_PENDING and
+    // call Finish() a second time, overwriting the pending on_finish_done_.
+    // The state is set before Finish() so that no handler state is touched
+    // after the tag has been handed to gRPC.
+    call_state_ = FINISH_CALLED;
+    stream_.Finish(Status(StatusCode::INTERNAL, ""), &on_finish_done_);
+    return;
+  }
+  next_ = CallableTag(std::bind(&WatchCallHandler::OnSendHealthDone, this,
+                                std::placeholders::_1, std::placeholders::_2),
+                      std::move(self));
+  stream_.Write(response, &next_);
+}
+
+// Completion callback for the Write() started by SendHealthLocked().
+// Shuts the handler down if the write failed or the handler is already
+// shutting down. Otherwise clears send_in_flight_ and, if SendHealth() stashed
+// a newer status in the meantime, starts sending that one (still under mu_).
+void DefaultHealthCheckService::HealthCheckServiceImpl::WatchCallHandler::
+    OnSendHealthDone(std::shared_ptr<CallHandler> self, bool ok) {
+  const bool write_failed = !ok;
+  if (write_failed || shutdown_) {
+    Shutdown(std::move(self), "OnSendHealthDone");
+    return;
+  }
+  call_state_ = CALL_RECEIVED;
+  std::unique_lock<std::mutex> lock(mu_);
+  send_in_flight_ = false;
+  // NOT_FOUND doubles as the "nothing pending" marker.
+  if (pending_status_ == NOT_FOUND) return;
+  const ServingStatus next_status = pending_status_;
+  pending_status_ = NOT_FOUND;
+  SendHealthLocked(std::move(self), next_status);
+}
+
+// Completion callback for the AsyncNotifyWhenDone() tag armed in
+// CreateAndStart(). Records that the call is done (and whether the context
+// reports it as cancelled), logs it, and shuts the handler down.
+void DefaultHealthCheckService::HealthCheckServiceImpl::WatchCallHandler::
+    OnDoneNotified(std::shared_ptr<CallHandler> self, bool ok) {
+  GPR_ASSERT(ok);
+  done_notified_ = true;
+  if (ctx_.IsCancelled()) {
+    is_cancelled_ = true;
+  }
+  gpr_log(GPR_DEBUG,
+          "[HCS %p] Health check call is notified done (handler: %p, "
+          "is_cancelled: %d).",
+          service_, this, static_cast<int>(is_cancelled_));
+  Shutdown(std::move(self), "OnDoneNotified");
+}
+
+// TODO(roth): This method currently assumes that there will be only one
+// thread polling the cq and invoking the corresponding callbacks.  If
+// that changes, we will need to add synchronization here.
+//
+// Puts the handler into the shut-down state, logging only on the first call.
+// If the call has been received but Finish() has not been issued yet, and
+// the service itself is still running, the stream is finished with CANCELLED
+// and ownership of `self` moves into on_finish_done_.
+// NOTE(review): nothing here unregisters the handler from database_, whose
+// ServiceData keeps a strong reference; confirm UnregisterCallHandler() is
+// invoked elsewhere.
+void DefaultHealthCheckService::HealthCheckServiceImpl::WatchCallHandler::
+    Shutdown(std::shared_ptr<CallHandler> self, const char* reason) {
+  const bool first_shutdown = !shutdown_;
+  if (first_shutdown) {
+    gpr_log(GPR_DEBUG,
+            "[HCS %p] Shutting down the handler (service_name: \"%s\", "
+            "handler: %p, reason: %s).",
+            service_, service_name_.c_str(), this, reason);
+    shutdown_ = true;
+  }
+  // OnCallReceived() may be called after OnDoneNotified(), so we need to
+  // try to Finish() every time we are in Shutdown().
+  const bool needs_finish =
+      call_state_ >= CALL_RECEIVED && call_state_ < FINISH_CALLED;
+  if (!needs_finish) return;
+  std::unique_lock<std::mutex> lock(service_->cq_shutdown_mu_);
+  // Never queue a new tag once the service has been shut down.
+  if (service_->shutdown_) return;
+  on_finish_done_ =
+      CallableTag(std::bind(&WatchCallHandler::OnFinishDone, this,
+                            std::placeholders::_1, std::placeholders::_2),
+                  std::move(self));
+  // TODO(juanlishen): Maybe add a message proto for the client to
+  // explicitly cancel the stream so that we can return OK status in such
+  // cases.
+  stream_.Finish(Status::CANCELLED, &on_finish_done_);
+  call_state_ = FINISH_CALLED;
+}
+
+// Completion callback for Finish(). It only logs a successful finish. The
+// reference in `self` is dropped on return, which releases the handler if no
+// other owner remains.
+void DefaultHealthCheckService::HealthCheckServiceImpl::WatchCallHandler::
+    OnFinishDone(std::shared_ptr<CallHandler> self, bool ok) {
+  if (!ok) return;
+  gpr_log(GPR_DEBUG,
+          "[HCS %p] Health check call finished (service_name: \"%s\", "
+          "handler: %p).",
+          service_, service_name_.c_str(), this);
 }
 
 }  // namespace grpc
diff --git a/src/cpp/server/health/default_health_check_service.h b/src/cpp/server/health/default_health_check_service.h
index a1ce5aa..edad594 100644
--- a/src/cpp/server/health/default_health_check_service.h
+++ b/src/cpp/server/health/default_health_check_service.h
@@ -19,42 +19,268 @@
 #ifndef GRPC_INTERNAL_CPP_SERVER_DEFAULT_HEALTH_CHECK_SERVICE_H
 #define GRPC_INTERNAL_CPP_SERVER_DEFAULT_HEALTH_CHECK_SERVICE_H
 
+#include <atomic>
 #include <mutex>
+#include <set>
 
+#include <grpc/support/log.h>
+#include <grpcpp/grpcpp.h>
 #include <grpcpp/health_check_service_interface.h>
+#include <grpcpp/impl/codegen/async_generic_service.h>
+#include <grpcpp/impl/codegen/async_unary_call.h>
 #include <grpcpp/impl/codegen/service_type.h>
 #include <grpcpp/support/byte_buffer.h>
 
+#include "src/core/lib/gprpp/thd.h"
+
 namespace grpc {
 
 // Default implementation of HealthCheckServiceInterface. Server will create and
 // own it.
 class DefaultHealthCheckService final : public HealthCheckServiceInterface {
  public:
+  enum ServingStatus { NOT_FOUND, SERVING, NOT_SERVING };
+
   // The service impl to register with the server.
   class HealthCheckServiceImpl : public Service {
    public:
-    explicit HealthCheckServiceImpl(DefaultHealthCheckService* service);
+    // Base class for call handlers.
+    // Polymorphic so that Check and Watch handlers can both be owned through
+    // std::shared_ptr<CallHandler> (e.g. in ServiceData::call_handlers_).
+    class CallHandler {
+     public:
+      virtual ~CallHandler() = default;
+      // Pushes `status` to the client. `self` is a shared_ptr to this same
+      // handler; implementations may move it into a CallableTag to keep the
+      // handler alive across the resulting asynchronous operation.
+      virtual void SendHealth(std::shared_ptr<CallHandler> self,
+                              ServingStatus status) = 0;
+    };
 
-    Status Check(ServerContext* context, const ByteBuffer* request,
-                 ByteBuffer* response);
+    HealthCheckServiceImpl(DefaultHealthCheckService* database,
+                           std::unique_ptr<ServerCompletionQueue> cq);
+
+    ~HealthCheckServiceImpl();
+
+    void StartServingThread();
 
    private:
-    const DefaultHealthCheckService* const service_;
-    internal::RpcServiceMethod* method_;
+    // A tag that can be called with a bool argument. It's tailored for
+    // CallHandler's use. Before being used, it should be constructed with a
+    // method of CallHandler and a shared pointer to the handler. The
+    // shared pointer will be moved to the invoked function and the function
+    // can only be invoked once. That makes ref counting of the handler easier,
+    // because the shared pointer is not bound to the function and can be gone
+    // once the invoked function returns (if not used any more).
+    class CallableTag {
+     public:
+      using HandlerFunction =
+          std::function<void(std::shared_ptr<CallHandler>, bool)>;
+
+      // Creates an empty tag. It must be assigned a real tag before it is
+      // handed to gRPC; Run() asserts otherwise.
+      CallableTag() {}
+
+      CallableTag(HandlerFunction func, std::shared_ptr<CallHandler> handler)
+          : handler_function_(std::move(func)), handler_(std::move(handler)) {
+        GPR_ASSERT(handler_function_ != nullptr);
+        GPR_ASSERT(handler_ != nullptr);
+      }
+
+      // Runs the tag. This should be called only once. The handler is no
+      // longer owned by this tag after this method is invoked.
+      // Tags are members of the handler they own, so if the moved reference
+      // was the last one, the handler (and this tag) may be destroyed before
+      // Run() returns. Nothing in the tag is touched after the call.
+      void Run(bool ok) {
+        GPR_ASSERT(handler_function_ != nullptr);
+        GPR_ASSERT(handler_ != nullptr);
+        handler_function_(std::move(handler_), ok);
+      }
+
+      // Releases and returns the shared pointer to the handler.
+      // Leaves the tag without a handler; returns nullptr if the handler was
+      // already released or moved out by Run().
+      std::shared_ptr<CallHandler> ReleaseHandler() {
+        return std::move(handler_);
+      }
+
+     private:
+      HandlerFunction handler_function_ = nullptr;
+      std::shared_ptr<CallHandler> handler_;
+    };
+
+    // Call handler for Check method.
+    // Each handler takes care of one call. It contains per-call data and it
+    // will access the members of the parent class (i.e.,
+    // DefaultHealthCheckService) for per-service health data.
+    class CheckCallHandler : public CallHandler {
+     public:
+      // Instantiates a CheckCallHandler and requests the next health check
+      // call. The handler object will manage its own lifetime, so no action is
+      // needed from the caller any more regarding that object.
+      static void CreateAndStart(ServerCompletionQueue* cq,
+                                 DefaultHealthCheckService* database,
+                                 HealthCheckServiceImpl* service);
+
+      // This ctor is public because we want to use std::make_shared<> in
+      // CreateAndStart(). This ctor shouldn't be used elsewhere.
+      CheckCallHandler(ServerCompletionQueue* cq,
+                       DefaultHealthCheckService* database,
+                       HealthCheckServiceImpl* service);
+
+      // Not used for Check: the call is answered with a single response
+      // through writer_, so there is nothing to push updates on. This is a
+      // deliberate no-op; the parameter names are commented out so that
+      // -Wunused-parameter stays quiet for this inline header definition.
+      void SendHealth(std::shared_ptr<CallHandler> /*self*/,
+                      ServingStatus /*status*/) override {}
+
+     private:
+      // Called when we receive a call.
+      // Spawns a new handler so that we can keep servicing future calls.
+      void OnCallReceived(std::shared_ptr<CallHandler> self, bool ok);
+
+      // Called when Finish() is done.
+      void OnFinishDone(std::shared_ptr<CallHandler> self, bool ok);
+
+      // The members passed down from HealthCheckServiceImpl.
+      ServerCompletionQueue* cq_;
+      DefaultHealthCheckService* database_;
+      HealthCheckServiceImpl* service_;
+
+      ByteBuffer request_;
+      GenericServerAsyncResponseWriter writer_;
+      ServerContext ctx_;
+
+      CallableTag next_;
+    };
+
+    // Call handler for Watch method.
+    // Each handler takes care of one call. It contains per-call data and it
+    // will access the members of the parent class (i.e.,
+    // DefaultHealthCheckService) for per-service health data.
+    class WatchCallHandler : public CallHandler {
+     public:
+      // Instantiates a WatchCallHandler and requests the next health check
+      // call. The handler object will manage its own lifetime, so no action is
+      // needed from the caller any more regarding that object.
+      static void CreateAndStart(ServerCompletionQueue* cq,
+                                 DefaultHealthCheckService* database,
+                                 HealthCheckServiceImpl* service);
+
+      // This ctor is public because we want to use std::make_shared<> in
+      // CreateAndStart(). This ctor shouldn't be used elsewhere.
+      WatchCallHandler(ServerCompletionQueue* cq,
+                       DefaultHealthCheckService* database,
+                       HealthCheckServiceImpl* service);
+
+      // Sends `status` to the watching client, or stores it in
+      // pending_status_ if a Write() is already outstanding. Takes mu_.
+      void SendHealth(std::shared_ptr<CallHandler> self,
+                      ServingStatus status) override;
+
+     private:
+      // Called when we receive a call.
+      // Spawns a new handler so that we can keep servicing future calls.
+      void OnCallReceived(std::shared_ptr<CallHandler> self, bool ok);
+
+      // Requires holding mu_.
+      void SendHealthLocked(std::shared_ptr<CallHandler> self,
+                            ServingStatus status);
+
+      // When sending a health result finishes.
+      void OnSendHealthDone(std::shared_ptr<CallHandler> self, bool ok);
+
+      // Called when Finish() is done.
+      void OnFinishDone(std::shared_ptr<CallHandler> self, bool ok);
+
+      // Called when AsyncNotifyWhenDone() notifies us.
+      void OnDoneNotified(std::shared_ptr<CallHandler> self, bool ok);
+
+      // Marks the handler as shut down and, if the call was received but not
+      // yet finished, finishes it with CANCELLED. `reason` is only logged.
+      void Shutdown(std::shared_ptr<CallHandler> self, const char* reason);
+
+      // The members passed down from HealthCheckServiceImpl.
+      ServerCompletionQueue* cq_;
+      DefaultHealthCheckService* database_;
+      HealthCheckServiceImpl* service_;
+
+      // Raw request; service_name_ is decoded from it in OnCallReceived().
+      ByteBuffer request_;
+      grpc::string service_name_;
+      // Bound to ctx_ by the constructor.
+      GenericServerAsyncWriter stream_;
+      ServerContext ctx_;
+
+      std::mutex mu_;
+      bool send_in_flight_ = false;               // Guarded by mu_.
+      ServingStatus pending_status_ = NOT_FOUND;  // Guarded by mu_.
+
+      // The state of the RPC progress.
+      // Enumerator order matters: Shutdown() compares states with >= and <.
+      enum CallState {
+        WAITING_FOR_CALL,
+        CALL_RECEIVED,
+        SEND_MESSAGE_PENDING,
+        FINISH_CALLED
+      } call_state_;
+
+      // Per-handler flag, distinct from service_->shutdown_. Plain bool: this
+      // relies on the single cq-polling-thread assumption noted on Shutdown().
+      bool shutdown_ = false;
+      // NOTE(review): set in OnDoneNotified(); no reader appears in the code
+      // reviewed here -- confirm it is still needed.
+      bool done_notified_ = false;
+      bool is_cancelled_ = false;
+      // Tag for the Watch() request, then reused for each Write().
+      CallableTag next_;
+      // Tag for AsyncNotifyWhenDone(); holds an extra ref to the handler.
+      CallableTag on_done_notified_;
+      // Tag for Finish().
+      CallableTag on_finish_done_;
+    };
+
+    // Handles the incoming requests and drives the completion queue in a loop.
+    static void Serve(void* arg);
+
+    // Returns true on success.
+    static bool DecodeRequest(const ByteBuffer& request,
+                              grpc::string* service_name);
+    static bool EncodeResponse(ServingStatus status, ByteBuffer* response);
+
+    // Needed to appease Windows compilers, which don't seem to allow
+    // nested classes to access protected members in the parent's
+    // superclass.
+    using Service::RequestAsyncServerStreaming;
+    using Service::RequestAsyncUnary;
+
+    DefaultHealthCheckService* database_;
+    std::unique_ptr<ServerCompletionQueue> cq_;
+    internal::RpcServiceMethod* check_method_;
+    internal::RpcServiceMethod* watch_method_;
+
+    // To synchronize the operations related to shutdown state of cq_, so that
+    // we don't enqueue new tags into cq_ after it is already shut down.
+    std::mutex cq_shutdown_mu_;
+    std::atomic_bool shutdown_{false};
+    std::unique_ptr<::grpc_core::Thread> thread_;
   };
 
   DefaultHealthCheckService();
+
   void SetServingStatus(const grpc::string& service_name,
                         bool serving) override;
   void SetServingStatus(bool serving) override;
-  enum ServingStatus { NOT_FOUND, SERVING, NOT_SERVING };
+
   ServingStatus GetServingStatus(const grpc::string& service_name) const;
-  HealthCheckServiceImpl* GetHealthCheckService();
+
+  HealthCheckServiceImpl* GetHealthCheckService(
+      std::unique_ptr<ServerCompletionQueue> cq);
 
  private:
+  // Stores the current serving status of a service and any call
+  // handlers registered for updates when the service's status changes.
+  // Not internally synchronized: instances live in services_map_, which is
+  // guarded by DefaultHealthCheckService::mu_.
+  class ServiceData {
+   public:
+    void SetServingStatus(ServingStatus status);
+    ServingStatus GetServingStatus() const { return status_; }
+    void AddCallHandler(
+        std::shared_ptr<HealthCheckServiceImpl::CallHandler> handler);
+    void RemoveCallHandler(
+        std::shared_ptr<HealthCheckServiceImpl::CallHandler> handler);
+    // True when no handler is registered and status_ is NOT_FOUND, i.e. the
+    // entry carries no information (presumably so the owner can erase it
+    // from services_map_ -- confirm against the .cc).
+    bool Unused() const {
+      return call_handlers_.empty() && status_ == NOT_FOUND;
+    }
+
+   private:
+    ServingStatus status_ = NOT_FOUND;
+    // Strong references: a registered handler stays alive until removed.
+    std::set<std::shared_ptr<HealthCheckServiceImpl::CallHandler>>
+        call_handlers_;
+  };
+
+  void RegisterCallHandler(
+      const grpc::string& service_name,
+      std::shared_ptr<HealthCheckServiceImpl::CallHandler> handler);
+
+  void UnregisterCallHandler(
+      const grpc::string& service_name,
+      std::shared_ptr<HealthCheckServiceImpl::CallHandler> handler);
+
   mutable std::mutex mu_;
-  std::map<grpc::string, bool> services_map_;
+  std::map<grpc::string, ServiceData> services_map_;  // Guarded by mu_.
   std::unique_ptr<HealthCheckServiceImpl> impl_;
 };
 
diff --git a/src/cpp/server/health/health.pb.c b/src/cpp/server/health/health.pb.c
index 09bd98a..5c214c7 100644
--- a/src/cpp/server/health/health.pb.c
+++ b/src/cpp/server/health/health.pb.c
@@ -2,7 +2,6 @@
 /* Generated by nanopb-0.3.7-dev */
 
 #include "src/cpp/server/health/health.pb.h"
-
 /* @@protoc_insertion_point(includes) */
 #if PB_PROTO_HEADER_VERSION != 30
 #error Regenerate this file with the current version of nanopb generator.
diff --git a/src/cpp/server/health/health.pb.h b/src/cpp/server/health/health.pb.h
index 29e1f3b..9d54ccd 100644
--- a/src/cpp/server/health/health.pb.h
+++ b/src/cpp/server/health/health.pb.h
@@ -17,11 +17,12 @@
 typedef enum _grpc_health_v1_HealthCheckResponse_ServingStatus {
     grpc_health_v1_HealthCheckResponse_ServingStatus_UNKNOWN = 0,
     grpc_health_v1_HealthCheckResponse_ServingStatus_SERVING = 1,
-    grpc_health_v1_HealthCheckResponse_ServingStatus_NOT_SERVING = 2
+    grpc_health_v1_HealthCheckResponse_ServingStatus_NOT_SERVING = 2,
+    grpc_health_v1_HealthCheckResponse_ServingStatus_SERVICE_UNKNOWN = 3
 } grpc_health_v1_HealthCheckResponse_ServingStatus;
 #define _grpc_health_v1_HealthCheckResponse_ServingStatus_MIN grpc_health_v1_HealthCheckResponse_ServingStatus_UNKNOWN
-#define _grpc_health_v1_HealthCheckResponse_ServingStatus_MAX grpc_health_v1_HealthCheckResponse_ServingStatus_NOT_SERVING
-#define _grpc_health_v1_HealthCheckResponse_ServingStatus_ARRAYSIZE ((grpc_health_v1_HealthCheckResponse_ServingStatus)(grpc_health_v1_HealthCheckResponse_ServingStatus_NOT_SERVING+1))
+#define _grpc_health_v1_HealthCheckResponse_ServingStatus_MAX grpc_health_v1_HealthCheckResponse_ServingStatus_SERVICE_UNKNOWN
+#define _grpc_health_v1_HealthCheckResponse_ServingStatus_ARRAYSIZE ((grpc_health_v1_HealthCheckResponse_ServingStatus)(grpc_health_v1_HealthCheckResponse_ServingStatus_SERVICE_UNKNOWN+1))
 
 /* Struct definitions */
 typedef struct _grpc_health_v1_HealthCheckRequest {
diff --git a/src/cpp/server/server_cc.cc b/src/cpp/server/server_cc.cc
index b8ba704..48f0f11 100644
--- a/src/cpp/server/server_cc.cc
+++ b/src/cpp/server/server_cc.cc
@@ -559,16 +559,20 @@
 
   // Only create default health check service when user did not provide an
   // explicit one.
+  ServerCompletionQueue* health_check_cq = nullptr;
+  DefaultHealthCheckService::HealthCheckServiceImpl*
+      default_health_check_service_impl = nullptr;
   if (health_check_service_ == nullptr && !health_check_service_disabled_ &&
       DefaultHealthCheckServiceEnabled()) {
-    if (sync_server_cqs_ == nullptr || sync_server_cqs_->empty()) {
-      gpr_log(GPR_INFO,
-              "Default health check service disabled at async-only server.");
-    } else {
-      auto* default_hc_service = new DefaultHealthCheckService;
-      health_check_service_.reset(default_hc_service);
-      RegisterService(nullptr, default_hc_service->GetHealthCheckService());
-    }
+    auto* default_hc_service = new DefaultHealthCheckService;
+    health_check_service_.reset(default_hc_service);
+    health_check_cq = new ServerCompletionQueue(GRPC_CQ_DEFAULT_POLLING);
+    grpc_server_register_completion_queue(server_, health_check_cq->cq(),
+                                          nullptr);
+    default_health_check_service_impl =
+        default_hc_service->GetHealthCheckService(
+            std::unique_ptr<ServerCompletionQueue>(health_check_cq));
+    RegisterService(nullptr, default_health_check_service_impl);
   }
 
   grpc_server_start(server_);
@@ -583,6 +587,9 @@
         new UnimplementedAsyncRequest(this, cqs[i]);
       }
     }
+    if (health_check_cq != nullptr) {
+      new UnimplementedAsyncRequest(this, health_check_cq);
+    }
   }
 
   // If this server has any support for synchronous methods (has any sync
@@ -595,6 +602,10 @@
   for (auto it = sync_req_mgrs_.begin(); it != sync_req_mgrs_.end(); it++) {
     (*it)->Start();
   }
+
+  if (default_health_check_service_impl != nullptr) {
+    default_health_check_service_impl->StartServingThread();
+  }
 }
 
 void Server::ShutdownInternal(gpr_timespec deadline) {
diff --git a/src/csharp/Grpc.Core.Tests/ChannelConnectivityTest.cs b/src/csharp/Grpc.Core.Tests/ChannelConnectivityTest.cs
index a43040f..0834dda 100644
--- a/src/csharp/Grpc.Core.Tests/ChannelConnectivityTest.cs
+++ b/src/csharp/Grpc.Core.Tests/ChannelConnectivityTest.cs
@@ -57,23 +57,27 @@
         [Test]
         public async Task Channel_WaitForStateChangedAsync()
         {
-            helper.UnaryHandler = new UnaryServerMethod<string, string>((request, context) =>
-            {
-                return Task.FromResult(request);
-            });
-
             Assert.ThrowsAsync(typeof(TaskCanceledException), 
-                async () => await channel.WaitForStateChangedAsync(channel.State, DateTime.UtcNow.AddMilliseconds(10)));
+                async () => await channel.WaitForStateChangedAsync(channel.State, DateTime.UtcNow.AddMilliseconds(0)));
 
             var stateChangedTask = channel.WaitForStateChangedAsync(channel.State);
-
-            await Calls.AsyncUnaryCall(helper.CreateUnaryCall(), "abc");
-
+            await channel.ConnectAsync(DateTime.UtcNow.AddMilliseconds(5000));
             await stateChangedTask;
             Assert.AreEqual(ChannelState.Ready, channel.State);
         }
 
         [Test]
+        public async Task Channel_TryWaitForStateChangedAsync()
+        {
+            // With a deadline that has already been reached, the wait reports
+            // "no change" by returning false instead of throwing.
+            var initialState = channel.State;
+            bool changedInTime = await channel.TryWaitForStateChangedAsync(initialState, DateTime.UtcNow.AddMilliseconds(0));
+            Assert.IsFalse(changedInTime);
+
+            // Start waiting first, then connect: the pending wait must complete with true.
+            var stateChangedTask = channel.TryWaitForStateChangedAsync(channel.State);
+            await channel.ConnectAsync(DateTime.UtcNow.AddMilliseconds(5000));
+            bool changed = await stateChangedTask;
+            Assert.IsTrue(changed);
+            Assert.AreEqual(ChannelState.Ready, channel.State);
+        }
+
+        [Test]
         public async Task Channel_ConnectAsync()
         {
             await channel.ConnectAsync();
diff --git a/src/csharp/Grpc.Core.Tests/Internal/AsyncCallTest.cs b/src/csharp/Grpc.Core.Tests/Internal/AsyncCallTest.cs
index 9aab54d..775849d 100644
--- a/src/csharp/Grpc.Core.Tests/Internal/AsyncCallTest.cs
+++ b/src/csharp/Grpc.Core.Tests/Internal/AsyncCallTest.cs
@@ -107,6 +107,42 @@
         }
 
         [Test]
+        public void AsyncUnary_RequestSerializationExceptionDoesntLeakResources()
+        {
+            // Serializing a null request throws ArgumentNullException.
+            string nullRequest = null;
+            Assert.Throws<ArgumentNullException>(() => asyncCall.UnaryCallAsync(nullRequest));
+
+            // The failed call must not stay registered with the channel and must release the native call.
+            Assert.AreEqual(0, channel.GetCallReferenceCount());
+            Assert.IsTrue(fakeCall.IsDisposed);
+        }
+
+        [Test]
+        public void AsyncUnary_StartCallFailureDoesntLeakResources()
+        {
+            // From now on every Start* on the fake native call throws InvalidOperationException.
+            fakeCall.MakeStartCallFail();
+            Assert.Throws<InvalidOperationException>(() => asyncCall.UnaryCallAsync("request1"));
+
+            Assert.AreEqual(0, channel.GetCallReferenceCount());
+            Assert.IsTrue(fakeCall.IsDisposed);
+        }
+
+        [Test]
+        public void SyncUnary_RequestSerializationExceptionDoesntLeakResources()
+        {
+            // Serializing a null request throws ArgumentNullException.
+            string nullRequest = null;
+            Assert.Throws<ArgumentNullException>(() => asyncCall.UnaryCall(nullRequest));
+
+            // The failed call must not stay registered with the channel and must release the native call.
+            Assert.AreEqual(0, channel.GetCallReferenceCount());
+            Assert.IsTrue(fakeCall.IsDisposed);
+        }
+
+        [Test]
+        public void SyncUnary_StartCallFailureDoesntLeakResources()
+        {
+            // From now on every Start* on the fake native call throws InvalidOperationException.
+            fakeCall.MakeStartCallFail();
+            Assert.Throws<InvalidOperationException>(() => asyncCall.UnaryCall("request1"));
+
+            Assert.AreEqual(0, channel.GetCallReferenceCount());
+            Assert.IsTrue(fakeCall.IsDisposed);
+        }
+
+        [Test]
         public void ClientStreaming_StreamingReadNotAllowed()
         {
             asyncCall.ClientStreamingCallAsync();
@@ -328,6 +364,15 @@
         }
 
         [Test]
+        public void ClientStreaming_StartCallFailureDoesntLeakResources()
+        {
+            // From now on every Start* on the fake native call throws InvalidOperationException.
+            fakeCall.MakeStartCallFail();
+            Assert.Throws<InvalidOperationException>(() => asyncCall.ClientStreamingCallAsync());
+
+            Assert.AreEqual(0, channel.GetCallReferenceCount());
+            Assert.IsTrue(fakeCall.IsDisposed);
+        }
+
+        [Test]
         public void ServerStreaming_StreamingSendNotAllowed()
         {
             asyncCall.StartServerStreamingCall("request1");
@@ -402,6 +447,27 @@
         }
 
         [Test]
+        public void ServerStreaming_RequestSerializationExceptionDoesntLeakResources()
+        {
+            // Serializing a null request throws ArgumentNullException.
+            string nullRequest = null;
+            Assert.Throws<ArgumentNullException>(() => asyncCall.StartServerStreamingCall(nullRequest));
+            Assert.AreEqual(0, channel.GetCallReferenceCount());
+            Assert.IsTrue(fakeCall.IsDisposed);
+
+            // Reading from the failed call's response stream must not throw synchronously.
+            // NOTE(review): readTask is never awaited or asserted on; consider checking how it completes.
+            var responseStream = new ClientResponseStream<string, string>(asyncCall);
+            var readTask = responseStream.MoveNext();
+        }
+
+        [Test]
+        public void ServerStreaming_StartCallFailureDoesntLeakResources()
+        {
+            // From now on every Start* on the fake native call throws InvalidOperationException.
+            fakeCall.MakeStartCallFail();
+            Assert.Throws<InvalidOperationException>(() => asyncCall.StartServerStreamingCall("request1"));
+
+            Assert.AreEqual(0, channel.GetCallReferenceCount());
+            Assert.IsTrue(fakeCall.IsDisposed);
+        }
+
+        [Test]
         public void DuplexStreaming_NoRequestNoResponse_Success()
         {
             asyncCall.StartDuplexStreamingCall();
@@ -558,6 +624,15 @@
             AssertStreamingResponseError(asyncCall, fakeCall, readTask2, StatusCode.Cancelled);
         }
 
+        [Test]
+        public void DuplexStreaming_StartCallFailureDoesntLeakResources()
+        {
+            // From now on every Start* on the fake native call throws InvalidOperationException.
+            fakeCall.MakeStartCallFail();
+            Assert.Throws<InvalidOperationException>(() => asyncCall.StartDuplexStreamingCall());
+
+            Assert.AreEqual(0, channel.GetCallReferenceCount());
+            Assert.IsTrue(fakeCall.IsDisposed);
+        }
+
         ClientSideStatus CreateClientSideStatus(StatusCode statusCode)
         {
             return new ClientSideStatus(new Status(statusCode, ""), new Metadata());
diff --git a/src/csharp/Grpc.Core.Tests/Internal/FakeNativeCall.cs b/src/csharp/Grpc.Core.Tests/Internal/FakeNativeCall.cs
index 581ac33..ef67918 100644
--- a/src/csharp/Grpc.Core.Tests/Internal/FakeNativeCall.cs
+++ b/src/csharp/Grpc.Core.Tests/Internal/FakeNativeCall.cs
@@ -31,6 +31,7 @@
     /// </summary>
     internal class FakeNativeCall : INativeCall
     {
+        private bool shouldStartCallFail;
         public IUnaryResponseClientCallback UnaryResponseClientCallback
         {
             get;
@@ -102,26 +103,31 @@
 
         public void StartUnary(IUnaryResponseClientCallback callback, byte[] payload, WriteFlags writeFlags, MetadataArraySafeHandle metadataArray, CallFlags callFlags)
         {
+            StartCallMaybeFail();
             UnaryResponseClientCallback = callback;
         }
 
         public void StartUnary(BatchContextSafeHandle ctx, byte[] payload, WriteFlags writeFlags, MetadataArraySafeHandle metadataArray, CallFlags callFlags)
         {
+            StartCallMaybeFail();
             throw new NotImplementedException();
         }
 
         public void StartClientStreaming(IUnaryResponseClientCallback callback, MetadataArraySafeHandle metadataArray, CallFlags callFlags)
         {
+            StartCallMaybeFail();
             UnaryResponseClientCallback = callback;
         }
 
         public void StartServerStreaming(IReceivedStatusOnClientCallback callback, byte[] payload, WriteFlags writeFlags, MetadataArraySafeHandle metadataArray, CallFlags callFlags)
         {
+            StartCallMaybeFail();
             ReceivedStatusOnClientCallback = callback;
         }
 
         public void StartDuplexStreaming(IReceivedStatusOnClientCallback callback, MetadataArraySafeHandle metadataArray, CallFlags callFlags)
         {
+            StartCallMaybeFail();
             ReceivedStatusOnClientCallback = callback;
         }
 
@@ -165,5 +171,22 @@
         {
             IsDisposed = true;
         }
+
+        /// <summary>
+        /// Emulate CallSafeHandle.CheckOk() failure for all future attempts
+        /// to start a call.
+        /// </summary>
+        /// <remarks>
+        /// Once set, each Start* method of this fake throws
+        /// <see cref="InvalidOperationException"/> before recording its callback.
+        /// </remarks>
+        public void MakeStartCallFail()
+        {
+            shouldStartCallFail = true;
+        }
+
+        /// <summary>
+        /// Throws <see cref="InvalidOperationException"/> if <see cref="MakeStartCallFail"/>
+        /// has been called; otherwise does nothing.
+        /// </summary>
+        private void StartCallMaybeFail()
+        {
+            if (!shouldStartCallFail)
+            {
+                return;
+            }
+            throw new InvalidOperationException("Start call has failed.");
+        }
     }
 }
diff --git a/src/csharp/Grpc.Core/Channel.cs b/src/csharp/Grpc.Core/Channel.cs
index 7912b06..7ce929d 100644
--- a/src/csharp/Grpc.Core/Channel.cs
+++ b/src/csharp/Grpc.Core/Channel.cs
@@ -136,7 +136,7 @@
         /// </summary>
         public async Task WaitForStateChangedAsync(ChannelState lastObservedState, DateTime? deadline = null)
         {
-            var result = await WaitForStateChangedInternalAsync(lastObservedState, deadline).ConfigureAwait(false);
+            var result = await TryWaitForStateChangedAsync(lastObservedState, deadline).ConfigureAwait(false);
             if (!result)
             {
                 throw new TaskCanceledException("Reached deadline.");
@@ -147,7 +147,7 @@
         /// Returned tasks completes once channel state has become different from
         /// given lastObservedState (<c>true</c> is returned) or if the wait has timed out (<c>false</c> is returned).
         /// </summary>
-        internal Task<bool> WaitForStateChangedInternalAsync(ChannelState lastObservedState, DateTime? deadline = null)
+        public Task<bool> TryWaitForStateChangedAsync(ChannelState lastObservedState, DateTime? deadline = null)
         {
             GrpcPreconditions.CheckArgument(lastObservedState != ChannelState.Shutdown,
                 "Shutdown is a terminal state. No further state changes can occur.");
@@ -297,6 +297,12 @@
             activeCallCounter.Decrement();
         }
 
+        /// <summary>
+        /// Returns the current value of activeCallCounter, i.e. the number of
+        /// call references currently held on this channel. For testing only
+        /// (tests use it to check that failed calls do not leak references).
+        /// </summary>
+        internal long GetCallReferenceCount()
+        {
+            return activeCallCounter.Count;
+        }
+
         private ChannelState GetConnectivityState(bool tryToConnect)
         {
             try
diff --git a/src/csharp/Grpc.Core/Internal/AsyncCall.cs b/src/csharp/Grpc.Core/Internal/AsyncCall.cs
index 66902f3..4cdf0ee 100644
--- a/src/csharp/Grpc.Core/Internal/AsyncCall.cs
+++ b/src/csharp/Grpc.Core/Internal/AsyncCall.cs
@@ -17,6 +17,7 @@
 #endregion
 
 using System;
+using System.Threading;
 using System.Threading.Tasks;
 using Grpc.Core.Logging;
 using Grpc.Core.Profiling;
@@ -34,6 +35,8 @@
         readonly CallInvocationDetails<TRequest, TResponse> details;
         readonly INativeCall injectedNativeCall;  // for testing
 
+        bool registeredWithChannel;
+
         // Dispose of to de-register cancellation token registration
         IDisposable cancellationTokenRegistration;
 
@@ -77,43 +80,59 @@
             using (profiler.NewScope("AsyncCall.UnaryCall"))
             using (CompletionQueueSafeHandle cq = CompletionQueueSafeHandle.CreateSync())
             {
-                byte[] payload = UnsafeSerialize(msg);
-
-                unaryResponseTcs = new TaskCompletionSource<TResponse>();
-
-                lock (myLock)
+                bool callStartedOk = false;
+                try
                 {
-                    GrpcPreconditions.CheckState(!started);
-                    started = true;
-                    Initialize(cq);
+                    unaryResponseTcs = new TaskCompletionSource<TResponse>();
 
-                    halfcloseRequested = true;
-                    readingDone = true;
-                }
-
-                using (var metadataArray = MetadataArraySafeHandle.Create(details.Options.Headers))
-                {
-                    var ctx = details.Channel.Environment.BatchContextPool.Lease();
-                    try
+                    lock (myLock)
                     {
-                        call.StartUnary(ctx, payload, GetWriteFlagsForCall(), metadataArray, details.Options.Flags);
-                        var ev = cq.Pluck(ctx.Handle);
-                        bool success = (ev.success != 0);
+                        GrpcPreconditions.CheckState(!started);
+                        started = true;
+                        Initialize(cq);
+
+                        halfcloseRequested = true;
+                        readingDone = true;
+                    }
+
+                    byte[] payload = UnsafeSerialize(msg);
+
+                    using (var metadataArray = MetadataArraySafeHandle.Create(details.Options.Headers))
+                    {
+                        var ctx = details.Channel.Environment.BatchContextPool.Lease();
                         try
                         {
-                            using (profiler.NewScope("AsyncCall.UnaryCall.HandleBatch"))
+                            call.StartUnary(ctx, payload, GetWriteFlagsForCall(), metadataArray, details.Options.Flags);
+                            callStartedOk = true;
+
+                            var ev = cq.Pluck(ctx.Handle);
+                            bool success = (ev.success != 0);
+                            try
                             {
-                                HandleUnaryResponse(success, ctx.GetReceivedStatusOnClient(), ctx.GetReceivedMessage(), ctx.GetReceivedInitialMetadata());
+                                using (profiler.NewScope("AsyncCall.UnaryCall.HandleBatch"))
+                                {
+                                    HandleUnaryResponse(success, ctx.GetReceivedStatusOnClient(), ctx.GetReceivedMessage(), ctx.GetReceivedInitialMetadata());
+                                }
+                            }
+                            catch (Exception e)
+                            {
+                                Logger.Error(e, "Exception occurred while invoking completion delegate.");
                             }
                         }
-                        catch (Exception e)
+                        finally
                         {
-                            Logger.Error(e, "Exception occurred while invoking completion delegate.");
+                            ctx.Recycle();
                         }
                     }
-                    finally
+                }
+                finally
+                {
+                    if (!callStartedOk)
                     {
-                        ctx.Recycle();
+                        lock (myLock)
+                        {
+                            OnFailedToStartCallLocked();
+                        }
                     }
                 }
                     
@@ -130,22 +149,35 @@
         {
             lock (myLock)
             {
-                GrpcPreconditions.CheckState(!started);
-                started = true;
-
-                Initialize(details.Channel.CompletionQueue);
-
-                halfcloseRequested = true;
-                readingDone = true;
-
-                byte[] payload = UnsafeSerialize(msg);
-
-                unaryResponseTcs = new TaskCompletionSource<TResponse>();
-                using (var metadataArray = MetadataArraySafeHandle.Create(details.Options.Headers))
+                bool callStartedOk = false;
+                try
                 {
-                    call.StartUnary(UnaryResponseClientCallback, payload, GetWriteFlagsForCall(), metadataArray, details.Options.Flags);
+                    GrpcPreconditions.CheckState(!started);
+                    started = true;
+
+                    Initialize(details.Channel.CompletionQueue);
+
+                    halfcloseRequested = true;
+                    readingDone = true;
+
+                    byte[] payload = UnsafeSerialize(msg);
+
+                    unaryResponseTcs = new TaskCompletionSource<TResponse>();
+                    using (var metadataArray = MetadataArraySafeHandle.Create(details.Options.Headers))
+                    {
+                        call.StartUnary(UnaryResponseClientCallback, payload, GetWriteFlagsForCall(), metadataArray, details.Options.Flags);
+                        callStartedOk = true;
+                    }
+
+                    return unaryResponseTcs.Task;
                 }
-                return unaryResponseTcs.Task;
+                finally
+                {
+                    if (!callStartedOk)
+                    {
+                        OnFailedToStartCallLocked();
+                    }
+                }
             }
         }
 
@@ -157,20 +189,32 @@
         {
             lock (myLock)
             {
-                GrpcPreconditions.CheckState(!started);
-                started = true;
-
-                Initialize(details.Channel.CompletionQueue);
-
-                readingDone = true;
-
-                unaryResponseTcs = new TaskCompletionSource<TResponse>();
-                using (var metadataArray = MetadataArraySafeHandle.Create(details.Options.Headers))
+                bool callStartedOk = false;
+                try
                 {
-                    call.StartClientStreaming(UnaryResponseClientCallback, metadataArray, details.Options.Flags);
-                }
+                    GrpcPreconditions.CheckState(!started);
+                    started = true;
 
-                return unaryResponseTcs.Task;
+                    Initialize(details.Channel.CompletionQueue);
+
+                    readingDone = true;
+
+                    unaryResponseTcs = new TaskCompletionSource<TResponse>();
+                    using (var metadataArray = MetadataArraySafeHandle.Create(details.Options.Headers))
+                    {
+                        call.StartClientStreaming(UnaryResponseClientCallback, metadataArray, details.Options.Flags);
+                        callStartedOk = true;
+                    }
+
+                    return unaryResponseTcs.Task;
+                }
+                finally
+                {
+                    if (!callStartedOk)
+                    {
+                        OnFailedToStartCallLocked();
+                    }
+                }
             }
         }
 
@@ -181,21 +225,33 @@
         {
             lock (myLock)
             {
-                GrpcPreconditions.CheckState(!started);
-                started = true;
-
-                Initialize(details.Channel.CompletionQueue);
-
-                halfcloseRequested = true;
-
-                byte[] payload = UnsafeSerialize(msg);
-
-                streamingResponseCallFinishedTcs = new TaskCompletionSource<object>();
-                using (var metadataArray = MetadataArraySafeHandle.Create(details.Options.Headers))
+                bool callStartedOk = false;
+                try
                 {
-                    call.StartServerStreaming(ReceivedStatusOnClientCallback, payload, GetWriteFlagsForCall(), metadataArray, details.Options.Flags);
+                    GrpcPreconditions.CheckState(!started);
+                    started = true;
+
+                    Initialize(details.Channel.CompletionQueue);
+
+                    halfcloseRequested = true;
+
+                    byte[] payload = UnsafeSerialize(msg);
+
+                    streamingResponseCallFinishedTcs = new TaskCompletionSource<object>();
+                    using (var metadataArray = MetadataArraySafeHandle.Create(details.Options.Headers))
+                    {
+                        call.StartServerStreaming(ReceivedStatusOnClientCallback, payload, GetWriteFlagsForCall(), metadataArray, details.Options.Flags);
+                        callStartedOk = true;
+                    }
+                    call.StartReceiveInitialMetadata(ReceivedResponseHeadersCallback);
                 }
-                call.StartReceiveInitialMetadata(ReceivedResponseHeadersCallback);
+                finally
+                {
+                    if (!callStartedOk)
+                    {
+                        OnFailedToStartCallLocked();
+                    }
+                }
             }
         }
 
@@ -207,17 +263,29 @@
         {
             lock (myLock)
             {
-                GrpcPreconditions.CheckState(!started);
-                started = true;
-
-                Initialize(details.Channel.CompletionQueue);
-
-                streamingResponseCallFinishedTcs = new TaskCompletionSource<object>();
-                using (var metadataArray = MetadataArraySafeHandle.Create(details.Options.Headers))
+                bool callStartedOk = false;
+                try
                 {
-                    call.StartDuplexStreaming(ReceivedStatusOnClientCallback, metadataArray, details.Options.Flags);
+                    GrpcPreconditions.CheckState(!started);
+                    started = true;
+
+                    Initialize(details.Channel.CompletionQueue);
+
+                    streamingResponseCallFinishedTcs = new TaskCompletionSource<object>();
+                    using (var metadataArray = MetadataArraySafeHandle.Create(details.Options.Headers))
+                    {
+                        call.StartDuplexStreaming(ReceivedStatusOnClientCallback, metadataArray, details.Options.Flags);
+                        callStartedOk = true;
+                    }
+                    call.StartReceiveInitialMetadata(ReceivedResponseHeadersCallback);
                 }
-                call.StartReceiveInitialMetadata(ReceivedResponseHeadersCallback);
+                finally
+                {
+                    if (!callStartedOk)
+                    {
+                        OnFailedToStartCallLocked();
+                    }
+                }
             }
         }
 
@@ -327,7 +395,11 @@
 
         protected override void OnAfterReleaseResourcesLocked()
         {
-            details.Channel.RemoveCallReference(this);
+            if (registeredWithChannel)
+            {
+                details.Channel.RemoveCallReference(this);
+                registeredWithChannel = false;
+            }
         }
 
         protected override void OnAfterReleaseResourcesUnlocked()
@@ -394,10 +466,27 @@
             var call = CreateNativeCall(cq);
 
             details.Channel.AddCallReference(this);
+            registeredWithChannel = true;
             InitializeInternal(call);
+
             RegisterCancellationCallback();
         }
 
+        private void OnFailedToStartCallLocked()
+        {
+            ReleaseResources();
+
+            // We need to execute the hook that disposes the cancellation token
+            // registration, but it cannot be done from under a lock.
+            // To make things simple, we just schedule the unregistering
+            // on a threadpool.
+            // - Once the native call is disposed, the Cancel() calls are ignored anyway
+            // - We don't care about the overhead as OnFailedToStartCallLocked() only happens
+            //   when something goes very bad when initializing a call and that should
+            //   never happen when gRPC is used correctly.
+            ThreadPool.QueueUserWorkItem((state) => OnAfterReleaseResourcesUnlocked());
+        }
+
         private INativeCall CreateNativeCall(CompletionQueueSafeHandle cq)
         {
             if (injectedNativeCall != null)
diff --git a/src/csharp/Grpc.Core/Internal/AsyncCallBase.cs b/src/csharp/Grpc.Core/Internal/AsyncCallBase.cs
index 5a53049..a93dc34 100644
--- a/src/csharp/Grpc.Core/Internal/AsyncCallBase.cs
+++ b/src/csharp/Grpc.Core/Internal/AsyncCallBase.cs
@@ -189,7 +189,7 @@
         /// </summary>
         protected abstract Exception GetRpcExceptionClientOnly();
 
-        private void ReleaseResources()
+        protected void ReleaseResources()
         {
             if (call != null)
             {
diff --git a/src/csharp/Grpc.Core/Internal/NativeMetadataCredentialsPlugin.cs b/src/csharp/Grpc.Core/Internal/NativeMetadataCredentialsPlugin.cs
index 4d695e8..faeb51e 100644
--- a/src/csharp/Grpc.Core/Internal/NativeMetadataCredentialsPlugin.cs
+++ b/src/csharp/Grpc.Core/Internal/NativeMetadataCredentialsPlugin.cs
@@ -68,8 +68,8 @@
             }
             catch (Exception e)
             {
-                Native.grpcsharp_metadata_credentials_notify_from_plugin(callbackPtr, userDataPtr, MetadataArraySafeHandle.Create(Metadata.Empty), StatusCode.Unknown, GetMetadataExceptionStatusMsg);
-                Logger.Error(e, GetMetadataExceptionLogMsg);
+                // eat the exception, we must not throw when inside callback from native code.
+                Logger.Error(e, "Exception occurred while invoking native metadata interceptor handler.");
             }
         }
 
@@ -87,7 +87,8 @@
             }
             catch (Exception e)
             {
-                Native.grpcsharp_metadata_credentials_notify_from_plugin(callbackPtr, userDataPtr, MetadataArraySafeHandle.Create(Metadata.Empty), StatusCode.Unknown, GetMetadataExceptionStatusMsg);
+                string detail = GetMetadataExceptionStatusMsg + " " + e.ToString();
+                Native.grpcsharp_metadata_credentials_notify_from_plugin(callbackPtr, userDataPtr, MetadataArraySafeHandle.Create(Metadata.Empty), StatusCode.Unknown, detail);
                 Logger.Error(e, GetMetadataExceptionLogMsg);
             }
         }
diff --git a/src/csharp/Grpc.Core/RpcException.cs b/src/csharp/Grpc.Core/RpcException.cs
index 94429d7..ff89897 100644
--- a/src/csharp/Grpc.Core/RpcException.cs
+++ b/src/csharp/Grpc.Core/RpcException.cs
@@ -33,10 +33,8 @@
         /// Creates a new <c>RpcException</c> associated with given status.
         /// </summary>
         /// <param name="status">Resulting status of a call.</param>
-        public RpcException(Status status) : base(status.ToString())
+        public RpcException(Status status) : this(status, Metadata.Empty, status.ToString())
         {
-            this.status = status;
-            this.trailers = Metadata.Empty;
         }
 
         /// <summary>
@@ -44,10 +42,8 @@
         /// </summary>
         /// <param name="status">Resulting status of a call.</param>
         /// <param name="message">The exception message.</param> 
-        public RpcException(Status status, string message) : base(message)
+        public RpcException(Status status, string message) : this(status, Metadata.Empty, message)
         {
-            this.status = status;
-            this.trailers = Metadata.Empty;
         }
 
         /// <summary>
@@ -55,7 +51,17 @@
         /// </summary>
         /// <param name="status">Resulting status of a call.</param>
         /// <param name="trailers">Response trailing metadata.</param> 
-        public RpcException(Status status, Metadata trailers) : base(status.ToString())
+        public RpcException(Status status, Metadata trailers) : this(status, trailers, status.ToString())
+        {
+        }
+
+        /// <summary>
+        /// Creates a new <c>RpcException</c> associated with given status, message and trailing response metadata.
+        /// </summary>
+        /// <param name="status">Resulting status of a call.</param>
+        /// <param name="trailers">Response trailing metadata.</param>
+        /// <param name="message">The exception message.</param>
+        public RpcException(Status status, Metadata trailers, string message) : base(message)
         {
             this.status = status;
             this.trailers = GrpcPreconditions.CheckNotNull(trailers);
diff --git a/src/csharp/Grpc.IntegrationTesting/MetadataCredentialsTest.cs b/src/csharp/Grpc.IntegrationTesting/MetadataCredentialsTest.cs
index c83ccd2..4044785 100644
--- a/src/csharp/Grpc.IntegrationTesting/MetadataCredentialsTest.cs
+++ b/src/csharp/Grpc.IntegrationTesting/MetadataCredentialsTest.cs
@@ -153,9 +153,10 @@
         [Test]
         public void MetadataCredentials_InterceptorThrows()
         {
+            var authInterceptorExceptionMessage = "Auth interceptor throws";
             var callCredentials = CallCredentials.FromInterceptor(new AsyncAuthInterceptor((context, metadata) =>
             {
-                throw new Exception("Auth interceptor throws");
+                throw new Exception(authInterceptorExceptionMessage);
             }));
             var channelCredentials = ChannelCredentials.Create(TestCredentials.CreateSslCredentials(), callCredentials);
             channel = new Channel(Host, server.Ports.Single().BoundPort, channelCredentials, options);
@@ -163,6 +164,7 @@
 
             var ex = Assert.Throws<RpcException>(() => client.UnaryCall(new SimpleRequest { }));
             Assert.AreEqual(StatusCode.Unavailable, ex.Status.StatusCode);
+            StringAssert.Contains(authInterceptorExceptionMessage, ex.Status.Detail);
         }
 
         private class FakeTestService : TestService.TestServiceBase
diff --git a/src/proto/grpc/health/v1/health.proto b/src/proto/grpc/health/v1/health.proto
index 4b4677b..38843ff 100644
--- a/src/proto/grpc/health/v1/health.proto
+++ b/src/proto/grpc/health/v1/health.proto
@@ -34,10 +34,30 @@
     UNKNOWN = 0;
     SERVING = 1;
     NOT_SERVING = 2;
+    SERVICE_UNKNOWN = 3;  // Used only by the Watch method.
   }
   ServingStatus status = 1;
 }
 
 service Health {
+  // If the requested service is unknown, the call will fail with status
+  // NOT_FOUND.
   rpc Check(HealthCheckRequest) returns (HealthCheckResponse);
+
+  // Performs a watch for the serving status of the requested service.
+  // The server will immediately send back a message indicating the current
+  // serving status.  It will then subsequently send a new message whenever
+  // the service's serving status changes.
+  //
+  // If the requested service is unknown when the call is received, the
+  // server will send a message setting the serving status to
+  // SERVICE_UNKNOWN but will *not* terminate the call.  If at some
+  // future point, the serving status of the service becomes known, the
+  // server will send a new message with the service's serving status.
+  //
+  // If the call terminates with status UNIMPLEMENTED, then clients
+  // should assume this method is not supported and should not retry the
+  // call.  If the call terminates with any other status (including OK),
+  // clients should retry the call with appropriate exponential backoff.
+  rpc Watch(HealthCheckRequest) returns (stream HealthCheckResponse);
 }
diff --git a/src/python/grpcio/grpc/BUILD.bazel b/src/python/grpcio/grpc/BUILD.bazel
index 3f214bf..2e6839e 100644
--- a/src/python/grpcio/grpc/BUILD.bazel
+++ b/src/python/grpcio/grpc/BUILD.bazel
@@ -2,7 +2,7 @@
 
 package(default_visibility = ["//visibility:public"])
 
-py_binary(
+py_library(
     name = "grpcio",
     srcs = ["__init__.py"],
     deps = [
@@ -22,7 +22,6 @@
     data = [
         "//:grpc",
     ],
-    main = "__init__.py",
     imports = ["../",],
 )
 
diff --git a/src/python/grpcio/grpc/_channel.py b/src/python/grpcio/grpc/_channel.py
index 6876601..3494c9b 100644
--- a/src/python/grpcio/grpc/_channel.py
+++ b/src/python/grpcio/grpc/_channel.py
@@ -24,6 +24,7 @@
 from grpc._cython import cygrpc
 from grpc.framework.foundation import callable_util
 
+logging.basicConfig()
 _LOGGER = logging.getLogger(__name__)
 
 _USER_AGENT = 'grpc-python/{}'.format(_grpcio_metadata.__version__)
diff --git a/src/python/grpcio/grpc/_common.py b/src/python/grpcio/grpc/_common.py
index 8358cbe..3805c7e 100644
--- a/src/python/grpcio/grpc/_common.py
+++ b/src/python/grpcio/grpc/_common.py
@@ -20,6 +20,7 @@
 import grpc
 from grpc._cython import cygrpc
 
+logging.basicConfig()
 _LOGGER = logging.getLogger(__name__)
 
 CYGRPC_CONNECTIVITY_STATE_TO_CHANNEL_CONNECTIVITY = {
diff --git a/src/python/grpcio/grpc/_cython/BUILD.bazel b/src/python/grpcio/grpc/_cython/BUILD.bazel
index 7124e83..cfd3a51 100644
--- a/src/python/grpcio/grpc/_cython/BUILD.bazel
+++ b/src/python/grpcio/grpc/_cython/BUILD.bazel
@@ -8,6 +8,7 @@
         "__init__.py",
         "cygrpc.pxd",
         "cygrpc.pyx",
+        "_cygrpc/_hooks.pyx.pxi",
         "_cygrpc/grpc_string.pyx.pxi",
         "_cygrpc/arguments.pyx.pxi",
         "_cygrpc/call.pyx.pxi",
@@ -15,6 +16,7 @@
         "_cygrpc/credentials.pyx.pxi",
         "_cygrpc/completion_queue.pyx.pxi",
         "_cygrpc/event.pyx.pxi",
+        "_cygrpc/fork_posix.pyx.pxi",
         "_cygrpc/metadata.pyx.pxi",
         "_cygrpc/operation.pyx.pxi",
         "_cygrpc/records.pyx.pxi",
@@ -24,12 +26,14 @@
         "_cygrpc/time.pyx.pxi",
         "_cygrpc/grpc_gevent.pyx.pxi",
         "_cygrpc/grpc.pxi",
+        "_cygrpc/_hooks.pxd.pxi",
         "_cygrpc/arguments.pxd.pxi",
         "_cygrpc/call.pxd.pxi",
         "_cygrpc/channel.pxd.pxi",
         "_cygrpc/credentials.pxd.pxi",
         "_cygrpc/completion_queue.pxd.pxi",
         "_cygrpc/event.pxd.pxi",
+        "_cygrpc/fork_posix.pxd.pxi",
         "_cygrpc/metadata.pxd.pxi",
         "_cygrpc/operation.pxd.pxi",
         "_cygrpc/records.pxd.pxi",
diff --git a/src/python/grpcio/grpc/_cython/_cygrpc/grpc_string.pyx.pxi b/src/python/grpcio/grpc/_cython/_cygrpc/grpc_string.pyx.pxi
index 00a1b23..334e561 100644
--- a/src/python/grpcio/grpc/_cython/_cygrpc/grpc_string.pyx.pxi
+++ b/src/python/grpcio/grpc/_cython/_cygrpc/grpc_string.pyx.pxi
@@ -14,6 +14,7 @@
 
 import logging
 
+logging.basicConfig()
 _LOGGER = logging.getLogger(__name__)
 
 # This function will ascii encode unicode string inputs if neccesary.
diff --git a/src/python/grpcio/grpc/_cython/_cygrpc/server.pyx.pxi b/src/python/grpcio/grpc/_cython/_cygrpc/server.pyx.pxi
index ce70172..5779437 100644
--- a/src/python/grpcio/grpc/_cython/_cygrpc/server.pyx.pxi
+++ b/src/python/grpcio/grpc/_cython/_cygrpc/server.pyx.pxi
@@ -18,6 +18,7 @@
 import time
 import grpc
 
+logging.basicConfig()
 _LOGGER = logging.getLogger(__name__)
 
 cdef class Server:
diff --git a/src/python/grpcio/grpc/_plugin_wrapping.py b/src/python/grpcio/grpc/_plugin_wrapping.py
index 916ee08..88ab4d8 100644
--- a/src/python/grpcio/grpc/_plugin_wrapping.py
+++ b/src/python/grpcio/grpc/_plugin_wrapping.py
@@ -20,6 +20,7 @@
 from grpc import _common
 from grpc._cython import cygrpc
 
+logging.basicConfig()
 _LOGGER = logging.getLogger(__name__)
 
 
diff --git a/src/python/grpcio/grpc/_server.py b/src/python/grpcio/grpc/_server.py
index 7276a7f..daa000a 100644
--- a/src/python/grpcio/grpc/_server.py
+++ b/src/python/grpcio/grpc/_server.py
@@ -27,6 +27,7 @@
 from grpc._cython import cygrpc
 from grpc.framework.foundation import callable_util
 
+logging.basicConfig()
 _LOGGER = logging.getLogger(__name__)
 
 _SHUTDOWN_TAG = 'shutdown'
diff --git a/src/python/grpcio/grpc/framework/foundation/callable_util.py b/src/python/grpcio/grpc/framework/foundation/callable_util.py
index 24daf34..fb8d5f7 100644
--- a/src/python/grpcio/grpc/framework/foundation/callable_util.py
+++ b/src/python/grpcio/grpc/framework/foundation/callable_util.py
@@ -21,6 +21,7 @@
 
 import six
 
+logging.basicConfig()
 _LOGGER = logging.getLogger(__name__)
 
 
diff --git a/src/python/grpcio/grpc/framework/foundation/logging_pool.py b/src/python/grpcio/grpc/framework/foundation/logging_pool.py
index 216e399..7702d17 100644
--- a/src/python/grpcio/grpc/framework/foundation/logging_pool.py
+++ b/src/python/grpcio/grpc/framework/foundation/logging_pool.py
@@ -17,6 +17,7 @@
 
 from concurrent import futures
 
+logging.basicConfig()
 _LOGGER = logging.getLogger(__name__)
 
 
diff --git a/src/python/grpcio/grpc/framework/foundation/stream_util.py b/src/python/grpcio/grpc/framework/foundation/stream_util.py
index 1faaf29..9184f95 100644
--- a/src/python/grpcio/grpc/framework/foundation/stream_util.py
+++ b/src/python/grpcio/grpc/framework/foundation/stream_util.py
@@ -19,6 +19,7 @@
 from grpc.framework.foundation import stream
 
 _NO_VALUE = object()
+logging.basicConfig()
 _LOGGER = logging.getLogger(__name__)
 
 
diff --git a/src/python/grpcio_testing/grpc_testing/_channel/_invocation.py b/src/python/grpcio_testing/grpc_testing/_channel/_invocation.py
index 191b1c1..d7205ca 100644
--- a/src/python/grpcio_testing/grpc_testing/_channel/_invocation.py
+++ b/src/python/grpcio_testing/grpc_testing/_channel/_invocation.py
@@ -18,6 +18,7 @@
 import grpc
 
 _NOT_YET_OBSERVED = object()
+logging.basicConfig()
 _LOGGER = logging.getLogger(__name__)
 
 
diff --git a/src/python/grpcio_testing/grpc_testing/_server/_rpc.py b/src/python/grpcio_testing/grpc_testing/_server/_rpc.py
index b856da1..736b714 100644
--- a/src/python/grpcio_testing/grpc_testing/_server/_rpc.py
+++ b/src/python/grpcio_testing/grpc_testing/_server/_rpc.py
@@ -18,6 +18,7 @@
 import grpc
 from grpc_testing import _common
 
+logging.basicConfig()
 _LOGGER = logging.getLogger(__name__)
 
 
diff --git a/src/python/grpcio_testing/grpc_testing/_time.py b/src/python/grpcio_testing/grpc_testing/_time.py
index 75e6db3..9692c34 100644
--- a/src/python/grpcio_testing/grpc_testing/_time.py
+++ b/src/python/grpcio_testing/grpc_testing/_time.py
@@ -21,6 +21,7 @@
 import grpc
 import grpc_testing
 
+logging.basicConfig()
 _LOGGER = logging.getLogger(__name__)
 
 
diff --git a/src/python/grpcio_tests/tests/interop/server.py b/src/python/grpcio_tests/tests/interop/server.py
index fd28d49..768cdaf 100644
--- a/src/python/grpcio_tests/tests/interop/server.py
+++ b/src/python/grpcio_tests/tests/interop/server.py
@@ -25,6 +25,7 @@
 from tests.interop import resources
 from tests.unit import test_common
 
+logging.basicConfig()
 _ONE_DAY_IN_SECONDS = 60 * 60 * 24
 _LOGGER = logging.getLogger(__name__)
 
diff --git a/templates/tools/dockerfile/interoptest/grpc_interop_dart/Dockerfile.template b/templates/tools/dockerfile/interoptest/grpc_interop_dart/Dockerfile.template
index 4fb7b46..34ea645 100644
--- a/templates/tools/dockerfile/interoptest/grpc_interop_dart/Dockerfile.template
+++ b/templates/tools/dockerfile/interoptest/grpc_interop_dart/Dockerfile.template
@@ -14,7 +14,7 @@
   # See the License for the specific language governing permissions and
   # limitations under the License.
 
-  FROM google/dart:latest
+  FROM google/dart:2.0
 
   # Upgrade Dart to version 2.
   RUN apt-get update && apt-get upgrade -y dart
diff --git a/test/core/client_channel/parse_address_test.cc b/test/core/client_channel/parse_address_test.cc
index ae157fb..004549f 100644
--- a/test/core/client_channel/parse_address_test.cc
+++ b/test/core/client_channel/parse_address_test.cc
@@ -91,6 +91,15 @@
   grpc_uri_destroy(uri);
 }
 
+/* Test parsing invalid ipv6 addresses (valid uri_text but invalid ipv6 addr) */
+static void test_grpc_parse_ipv6_invalid(const char* uri_text) {
+  grpc_core::ExecCtx exec_ctx;
+  grpc_uri* uri = grpc_uri_parse(uri_text, 0);
+  grpc_resolved_address addr;
+  GPR_ASSERT(!grpc_parse_ipv6(uri, &addr));
+  grpc_uri_destroy(uri);
+}
+
 int main(int argc, char** argv) {
   grpc_test_init(argc, argv);
   grpc_init();
@@ -100,5 +109,10 @@
   test_grpc_parse_ipv6("ipv6:[2001:db8::1]:12345", "2001:db8::1", 12345, 0);
   test_grpc_parse_ipv6("ipv6:[2001:db8::1%252]:12345", "2001:db8::1", 12345, 2);
 
+  /* Address length greater than GRPC_INET6_ADDRSTRLEN */
+  test_grpc_parse_ipv6_invalid(
+      "ipv6:WWWWWWWWWWWWWWWWWWWWWWWWWWWWWWWWWWWWWWWWWWWWWWWWWWWWWWWWWWWWWWW45%"
+      "v6:45%x$1*");
+
   grpc_shutdown();
 }
diff --git a/test/core/end2end/tests/filter_status_code.cc b/test/core/end2end/tests/filter_status_code.cc
index ba3cbfa..5ffc3d0 100644
--- a/test/core/end2end/tests/filter_status_code.cc
+++ b/test/core/end2end/tests/filter_status_code.cc
@@ -16,6 +16,14 @@
  *
  */
 
+/* This test verifies -
+ * 1) grpc_call_final_info passed to the filters on destroying a call contains
+ * the proper status.
+ * 2) If the response has both an HTTP status code and a gRPC status code, then
+ * we should prefer the gRPC status code as mentioned in
+ * https://github.com/grpc/grpc/blob/master/doc/http-grpc-status-mapping.md
+ */
+
 #include "test/core/end2end/end2end_tests.h"
 
 #include <limits.h>
@@ -249,6 +257,22 @@
   grpc_call_stack* call;
 } final_status_data;
 
+static void server_start_transport_stream_op_batch(
+    grpc_call_element* elem, grpc_transport_stream_op_batch* op) {
+  auto* data = static_cast<final_status_data*>(elem->call_data);
+  if (data->call == g_server_call_stack) {
+    if (op->send_initial_metadata) {
+      auto* batch = op->payload->send_initial_metadata.send_initial_metadata;
+      if (batch->idx.named.status != nullptr) {
+        /* Replace the HTTP status with 404 */
+        grpc_metadata_batch_substitute(batch, batch->idx.named.status,
+                                       GRPC_MDELEM_STATUS_404);
+      }
+    }
+  }
+  grpc_call_next_op(elem, op);
+}
+
 static grpc_error* init_call_elem(grpc_call_element* elem,
                                   const grpc_call_element_args* args) {
   final_status_data* data = static_cast<final_status_data*>(elem->call_data);
@@ -307,7 +331,7 @@
     "client_filter_status_code"};
 
 static const grpc_channel_filter test_server_filter = {
-    grpc_call_next_op,
+    server_start_transport_stream_op_batch,
     grpc_channel_next_op,
     sizeof(final_status_data),
     init_call_elem,
diff --git a/test/core/security/linux_system_roots_test.cc b/test/core/security/linux_system_roots_test.cc
index fce9c8d..24d446d 100644
--- a/test/core/security/linux_system_roots_test.cc
+++ b/test/core/security/linux_system_roots_test.cc
@@ -41,10 +41,6 @@
 
 #include "gtest/gtest.h"
 
-#ifndef GRPC_USE_SYSTEM_SSL_ROOTS_ENV_VAR
-#define GRPC_USE_SYSTEM_SSL_ROOTS_ENV_VAR "GRPC_USE_SYSTEM_SSL_ROOTS"
-#endif
-
 namespace grpc {
 namespace {
 
@@ -68,7 +64,6 @@
 }
 
 TEST(CreateRootCertsBundleTest, BundlesCorrectly) {
-  gpr_setenv(GRPC_USE_SYSTEM_SSL_ROOTS_ENV_VAR, "true");
   // Test that CreateRootCertsBundle returns a correct slice.
   grpc_slice roots_bundle = grpc_empty_slice();
   GRPC_LOG_IF_ERROR(
@@ -81,7 +76,6 @@
   char* bundle_str = grpc_slice_to_c_string(roots_bundle);
   EXPECT_STREQ(result_str, bundle_str);
   // Clean up.
-  unsetenv(GRPC_USE_SYSTEM_SSL_ROOTS_ENV_VAR);
   gpr_free(result_str);
   gpr_free(bundle_str);
   grpc_slice_unref(roots_bundle);
diff --git a/test/core/security/security_connector_test.cc b/test/core/security/security_connector_test.cc
index 82d77ee..9dd37b9 100644
--- a/test/core/security/security_connector_test.cc
+++ b/test/core/security/security_connector_test.cc
@@ -415,6 +415,7 @@
 
   /* Now setup a permanent failure for the overridden roots and we should get
      an empty slice. */
+  gpr_setenv("GRPC_NOT_USE_SYSTEM_SSL_ROOTS", "true");
   grpc_set_ssl_roots_override_callback(override_roots_permanent_failure);
   roots = grpc_core::TestDefaultSslRootStore::ComputePemRootCertsForTesting();
   GPR_ASSERT(GRPC_SLICE_IS_EMPTY(roots));
diff --git a/test/cpp/end2end/health_service_end2end_test.cc b/test/cpp/end2end/health_service_end2end_test.cc
index 1c48b9d..fca65df 100644
--- a/test/cpp/end2end/health_service_end2end_test.cc
+++ b/test/cpp/end2end/health_service_end2end_test.cc
@@ -64,6 +64,29 @@
     return Status::OK;
   }
 
+  Status Watch(ServerContext* context, const HealthCheckRequest* request,
+               ::grpc::ServerWriter<HealthCheckResponse>* writer) override {
+    auto last_state = HealthCheckResponse::UNKNOWN;
+    while (!context->IsCancelled()) {  // poll ~once per second
+      HealthCheckResponse response;
+      {
+        std::lock_guard<std::mutex> lock(mu_);
+        auto iter = status_map_.find(request->service());
+        response.set_status(iter == status_map_.end()
+                                ? HealthCheckResponse::SERVICE_UNKNOWN
+                                : iter->second);
+      }
+      // Send only on change; Write() happens outside mu_ so SetStatus() can't stall.
+      if (response.status() != last_state) {
+        writer->Write(response, ::grpc::WriteOptions());
+        last_state = response.status();
+      }
+      gpr_sleep_until(gpr_time_add(gpr_now(GPR_CLOCK_MONOTONIC),
+                                   gpr_time_from_millis(1000, GPR_TIMESPAN)));
+    }
+    return Status::OK;
+  }
+
   void SetStatus(const grpc::string& service_name,
                  HealthCheckResponse::ServingStatus status) {
     std::lock_guard<std::mutex> lock(mu_);
@@ -106,14 +129,6 @@
   HealthCheckServiceImpl* impl_;  // not owned
 };
 
-void LoopCompletionQueue(ServerCompletionQueue* cq) {
-  void* tag;
-  bool ok;
-  while (cq->Next(&tag, &ok)) {
-    abort();  // Nothing should come out of the cq.
-  }
-}
-
 class HealthServiceEnd2endTest : public ::testing::Test {
  protected:
   HealthServiceEnd2endTest() {}
@@ -218,6 +233,33 @@
                        Status(StatusCode::NOT_FOUND, ""));
   }
 
+  void VerifyHealthCheckServiceStreaming() {  // Exercises Health.Watch.
+    const grpc::string kServiceName("service_name");
+    HealthCheckServiceInterface* service = server_->GetHealthCheckService();
+    // Start a Watch for kServiceName (assumed to have no status set yet).
+    ClientContext context;
+    HealthCheckRequest request;
+    request.set_service(kServiceName);
+    std::unique_ptr<::grpc::ClientReaderInterface<HealthCheckResponse>> reader =
+        hc_stub_->Watch(&context, request);
+    // The first streamed message is expected to report SERVICE_UNKNOWN.
+    HealthCheckResponse response;
+    EXPECT_TRUE(reader->Read(&response));
+    EXPECT_EQ(response.SERVICE_UNKNOWN, response.status());
+    response.Clear();
+    // Set the service to NOT_SERVING and expect a matching update.
+    service->SetServingStatus(kServiceName, false);
+    EXPECT_TRUE(reader->Read(&response));
+    EXPECT_EQ(response.NOT_SERVING, response.status());
+    response.Clear();
+    // Set the service to SERVING and expect another update.
+    service->SetServingStatus(kServiceName, true);
+    EXPECT_TRUE(reader->Read(&response));
+    EXPECT_EQ(response.SERVING, response.status());
+    // End the stream by cancelling it; the reader is not drained or Finish()ed.
+    context.TryCancel();
+  }
+
   TestServiceImpl echo_test_service_;
   HealthCheckServiceImpl health_check_service_impl_;
   std::unique_ptr<Health::Stub> hc_stub_;
@@ -245,6 +287,7 @@
   EXPECT_TRUE(DefaultHealthCheckServiceEnabled());
   SetUpServer(true, false, false, nullptr);
   VerifyHealthCheckService();
+  VerifyHealthCheckServiceStreaming();
 
   // The default service has a size limit of the service name.
   const grpc::string kTooLongServiceName(201, 'x');
@@ -252,22 +295,6 @@
                      Status(StatusCode::INVALID_ARGUMENT, ""));
 }
 
-// The server has no sync service.
-TEST_F(HealthServiceEnd2endTest, DefaultHealthServiceAsyncOnly) {
-  EnableDefaultHealthCheckService(true);
-  EXPECT_TRUE(DefaultHealthCheckServiceEnabled());
-  SetUpServer(false, true, false, nullptr);
-  cq_thread_ = std::thread(LoopCompletionQueue, cq_.get());
-
-  HealthCheckServiceInterface* default_service =
-      server_->GetHealthCheckService();
-  EXPECT_TRUE(default_service == nullptr);
-
-  ResetStubs();
-
-  SendHealthCheckRpc("", Status(StatusCode::UNIMPLEMENTED, ""));
-}
-
 // Provide an empty service to disable the default service.
 TEST_F(HealthServiceEnd2endTest, ExplicitlyDisableViaOverride) {
   EnableDefaultHealthCheckService(true);
@@ -296,6 +323,7 @@
   ResetStubs();
 
   VerifyHealthCheckService();
+  VerifyHealthCheckServiceStreaming();
 }
 
 }  // namespace
diff --git a/test/cpp/qps/qps_benchmark_script.bzl b/test/cpp/qps/qps_benchmark_script.bzl
index 0904ceb..b2b67d9 100644
--- a/test/cpp/qps/qps_benchmark_script.bzl
+++ b/test/cpp/qps/qps_benchmark_script.bzl
@@ -49,6 +49,9 @@
                 "//test/cpp/util:test_config",
                 "//test/cpp/util:test_util",
             ],
+            tags = [
+                "qps_json_driver",
+            ],
         )
 
 def json_run_localhost_batch():
@@ -71,4 +74,7 @@
                 "//test/cpp/util:test_config",
                 "//test/cpp/util:test_util",
             ],
+            tags = [
+                "json_run_localhost",
+            ],
         )
diff --git a/test/cpp/util/cli_credentials.cc b/test/cpp/util/cli_credentials.cc
index acf4ef8..0a92261 100644
--- a/test/cpp/util/cli_credentials.cc
+++ b/test/cpp/util/cli_credentials.cc
@@ -28,7 +28,8 @@
             "--channel_creds_type=gdc.");
 DEFINE_string(
     access_token, "",
-    "The access token that will be sent to the server to authenticate RPCs.");
+    "The access token that will be sent to the server to authenticate RPCs. "
+    "Deprecated. Use --call_creds=access_token=<token>.");
 DEFINE_string(
     ssl_target, "",
     "If not empty, treat the server host name as this for ssl/tls certificate "
@@ -37,10 +38,34 @@
     channel_creds_type, "",
     "The channel creds type: insecure, ssl, gdc (Google Default Credentials) "
     "or alts.");
+DEFINE_string(
+    call_creds, "",
+    "Call credentials to use: none (default), or access_token=<token>. If "
+    "provided, the call creds are composited on top of channel creds.");
 
 namespace grpc {
 namespace testing {
 
+namespace {
+// Prefix that marks a --call_creds value as an access token.
+const char ACCESS_TOKEN_PREFIX[] = "access_token=";
+constexpr size_t ACCESS_TOKEN_PREFIX_LEN =
+    sizeof(ACCESS_TOKEN_PREFIX) / sizeof(*ACCESS_TOKEN_PREFIX) - 1;
+// True iff auth is ACCESS_TOKEN_PREFIX followed by a non-empty token.
+bool IsAccessToken(const grpc::string& auth) {
+  return auth.length() > ACCESS_TOKEN_PREFIX_LEN &&
+         auth.compare(0, ACCESS_TOKEN_PREFIX_LEN, ACCESS_TOKEN_PREFIX) == 0;
+}
+// Returns the token after ACCESS_TOKEN_PREFIX, or "" if auth is not one.
+grpc::string AccessToken(const grpc::string& auth) {
+  if (!IsAccessToken(auth)) {
+    return "";
+  }
+  return grpc::string(auth, ACCESS_TOKEN_PREFIX_LEN);
+}
+
+}  // namespace
+
 grpc::string CliCredentials::GetDefaultChannelCredsType() const {
   // Compatibility logic for --enable_ssl.
   if (FLAGS_enable_ssl) {
@@ -59,6 +84,16 @@
   return "insecure";
 }
 
+grpc::string CliCredentials::GetDefaultCallCreds() const {
+  if (!FLAGS_access_token.empty()) {  // legacy --access_token was given
+    fprintf(stderr,
+            "warning: --access_token is deprecated. Use "
+            "--call_creds=access_token=<token>.\n");
+    return grpc::string(ACCESS_TOKEN_PREFIX) + FLAGS_access_token;
+  }
+  return "none";  // no legacy flag set: no call credentials
+}
+
 std::shared_ptr<grpc::ChannelCredentials>
 CliCredentials::GetChannelCredentials() const {
   if (FLAGS_channel_creds_type.compare("insecure") == 0) {
@@ -80,18 +115,30 @@
 
 std::shared_ptr<grpc::CallCredentials> CliCredentials::GetCallCredentials()
     const {
-  if (!FLAGS_access_token.empty()) {
-    if (FLAGS_use_auth) {
-      fprintf(stderr,
-              "warning: use_auth is ignored when access_token is provided.");
-    }
-    return grpc::AccessTokenCredentials(FLAGS_access_token);
+  if (IsAccessToken(FLAGS_call_creds)) {
+    return grpc::AccessTokenCredentials(AccessToken(FLAGS_call_creds));
   }
+  if (FLAGS_call_creds.compare("none") == 0) {
+    // "none": nothing to do; creds, if any, are baked into the channel.
+    return std::shared_ptr<grpc::CallCredentials>();
+  }
+  fprintf(stderr,  // any other value is unrecognized
+          "--call_creds=%s invalid; must be none "
+          "or access_token=<token>.\n",
+          FLAGS_call_creds.c_str());
   return std::shared_ptr<grpc::CallCredentials>();
 }
 
 std::shared_ptr<grpc::ChannelCredentials> CliCredentials::GetCredentials()
     const {
+  if (FLAGS_call_creds.empty()) {
+    FLAGS_call_creds = GetDefaultCallCreds();
+  } else if (!FLAGS_access_token.empty() && !IsAccessToken(FLAGS_call_creds)) {
+    fprintf(stderr,
+            "warning: ignoring --access_token because --call_creds "
+            "already set to %s.\n",
+            FLAGS_call_creds.c_str());
+  }
   if (FLAGS_channel_creds_type.empty()) {
     FLAGS_channel_creds_type = GetDefaultChannelCredsType();
   } else if (FLAGS_enable_ssl && FLAGS_channel_creds_type.compare("ssl") != 0) {
@@ -106,7 +153,7 @@
             FLAGS_channel_creds_type.c_str());
   }
   // Legacy transport upgrade logic for insecure requests.
-  if (!FLAGS_access_token.empty() &&
+  if (IsAccessToken(FLAGS_call_creds) &&
       FLAGS_channel_creds_type.compare("insecure") == 0) {
     fprintf(stderr,
             "warning: --channel_creds_type=insecure upgraded to ssl because "
@@ -126,10 +173,14 @@
   return "    --enable_ssl             ; Set whether to use ssl (deprecated)\n"
          "    --use_auth               ; Set whether to create default google"
          " credentials\n"
+         "                             ; (deprecated)\n"
          "    --access_token           ; Set the access token in metadata,"
          " overrides --use_auth\n"
+         "                             ; (deprecated)\n"
          "    --ssl_target             ; Set server host for ssl validation\n"
-         "    --channel_creds_type     ; Set to insecure, ssl, gdc, or alts\n";
+         "    --channel_creds_type     ; Set to insecure, ssl, gdc, or alts\n"
+         "    --call_creds             ; Set to none, or"
+         " access_token=<token>\n";
 }
 
 const grpc::string CliCredentials::GetSslTargetNameOverride() const {
diff --git a/test/cpp/util/cli_credentials.h b/test/cpp/util/cli_credentials.h
index 4636d3c..472c7ab 100644
--- a/test/cpp/util/cli_credentials.h
+++ b/test/cpp/util/cli_credentials.h
@@ -36,6 +36,9 @@
   // Returns the appropriate channel_creds_type value for the set of legacy
   // flag arguments.
   virtual grpc::string GetDefaultChannelCredsType() const;
+  // Returns the appropriate call_creds value for the set of legacy flag
+  // arguments.
+  virtual grpc::string GetDefaultCallCreds() const;
   // Returns the base transport channel credentials. Child classes can override
   // to support additional channel_creds_types unknown to this base class.
   virtual std::shared_ptr<grpc::ChannelCredentials> GetChannelCredentials()
diff --git a/tools/distrib/check_nanopb_output.sh b/tools/distrib/check_nanopb_output.sh
index 6b98619..1c2ef9b 100755
--- a/tools/distrib/check_nanopb_output.sh
+++ b/tools/distrib/check_nanopb_output.sh
@@ -16,6 +16,7 @@
 set -ex
 
 readonly NANOPB_ALTS_TMP_OUTPUT="$(mktemp -d)"
+readonly NANOPB_HEALTH_TMP_OUTPUT="$(mktemp -d)"
 readonly NANOPB_TMP_OUTPUT="$(mktemp -d)"
 readonly PROTOBUF_INSTALL_PREFIX="$(mktemp -d)"
 
@@ -68,6 +69,23 @@
 fi
 
 #
+# Checks for health.proto
+#
+readonly HEALTH_GRPC_OUTPUT_PATH='src/cpp/server/health'
+# nanopb-compile the proto to a temp location
+./tools/codegen/core/gen_nano_proto.sh \
+  src/proto/grpc/health/v1/health.proto \
+  "$NANOPB_HEALTH_TMP_OUTPUT" \
+  "$HEALTH_GRPC_OUTPUT_PATH"
+# compare outputs to checked compiled code
+for NANOPB_OUTPUT_FILE in "$NANOPB_HEALTH_TMP_OUTPUT"/*.pb.*; do
+  if ! diff "$NANOPB_OUTPUT_FILE" "$HEALTH_GRPC_OUTPUT_PATH/$(basename "$NANOPB_OUTPUT_FILE")"; then
+    echo "Outputs differ: $NANOPB_HEALTH_TMP_OUTPUT vs $HEALTH_GRPC_OUTPUT_PATH"
+    exit 2
+  fi
+done
+
+#
 # Checks for handshaker.proto and transport_security_common.proto
 #
 readonly HANDSHAKER_GRPC_OUTPUT_PATH='src/core/tsi/alts/handshaker'
diff --git a/tools/dockerfile/interoptest/grpc_interop_dart/Dockerfile b/tools/dockerfile/interoptest/grpc_interop_dart/Dockerfile
index bff20b5..8973548 100644
--- a/tools/dockerfile/interoptest/grpc_interop_dart/Dockerfile
+++ b/tools/dockerfile/interoptest/grpc_interop_dart/Dockerfile
@@ -12,7 +12,7 @@
 # See the License for the specific language governing permissions and
 # limitations under the License.
 
-FROM google/dart:latest
+FROM google/dart:2.0
 
 # Upgrade Dart to version 2.
 RUN apt-get update && apt-get upgrade -y dart
diff --git a/tools/internal_ci/linux/grpc_asan_on_foundry.sh b/tools/internal_ci/linux/grpc_asan_on_foundry.sh
old mode 100644
new mode 100755
index a6367ad..dfef004
--- a/tools/internal_ci/linux/grpc_asan_on_foundry.sh
+++ b/tools/internal_ci/linux/grpc_asan_on_foundry.sh
@@ -15,5 +15,6 @@
 
 export UPLOAD_TEST_RESULTS=true
 EXTRA_FLAGS="--copt=-gmlt --strip=never --copt=-fsanitize=address --linkopt=-fsanitize=address --test_timeout=3600 --cache_test_results=no"
-github/grpc/tools/internal_ci/linux/grpc_bazel_on_foundry_base.sh "${EXTRA_FLAGS}"
+EXCLUDE_TESTS="--test_tag_filters=-qps_json_driver,-json_run_localhost"
+github/grpc/tools/internal_ci/linux/grpc_bazel_on_foundry_base.sh "${EXTRA_FLAGS}" "${EXCLUDE_TESTS}"
 
diff --git a/tools/internal_ci/linux/grpc_bazel_on_foundry_base.sh b/tools/internal_ci/linux/grpc_bazel_on_foundry_base.sh
index 8b42779..bb2a851 100755
--- a/tools/internal_ci/linux/grpc_bazel_on_foundry_base.sh
+++ b/tools/internal_ci/linux/grpc_bazel_on_foundry_base.sh
@@ -60,7 +60,7 @@
   --platforms=//third_party/toolchains:rbe_ubuntu1604 \
   --test_env=GRPC_VERBOSITY=debug \
   --remote_instance_name=projects/grpc-testing/instances/default_instance \
-  $1 \
+  $@ \
   -- //test/... || FAILED="true"
 
 if [ "$UPLOAD_TEST_RESULTS" != "" ]
diff --git a/tools/internal_ci/linux/grpc_tsan_on_foundry.sh b/tools/internal_ci/linux/grpc_tsan_on_foundry.sh
index 2ba7d46..366b5cb 100644
--- a/tools/internal_ci/linux/grpc_tsan_on_foundry.sh
+++ b/tools/internal_ci/linux/grpc_tsan_on_foundry.sh
@@ -15,4 +15,5 @@
 
 export UPLOAD_TEST_RESULTS=true
 EXTRA_FLAGS="--copt=-gmlt --strip=never --copt=-fsanitize=thread --linkopt=-fsanitize=thread --test_timeout=3600 --action_env=TSAN_OPTIONS=suppressions=test/core/util/tsan_suppressions.txt:halt_on_error=1:second_deadlock_stack=1 --cache_test_results=no"
-github/grpc/tools/internal_ci/linux/grpc_bazel_on_foundry_base.sh "${EXTRA_FLAGS}"
+EXCLUDE_TESTS="--test_tag_filters=-qps_json_driver,-json_run_localhost"
+github/grpc/tools/internal_ci/linux/grpc_bazel_on_foundry_base.sh "${EXTRA_FLAGS}" "${EXCLUDE_TESTS}"
diff --git a/tools/internal_ci/linux/pull_request/grpc_asan_on_foundry.sh b/tools/internal_ci/linux/pull_request/grpc_asan_on_foundry.sh
index 2aebb65..39c991f 100644
--- a/tools/internal_ci/linux/pull_request/grpc_asan_on_foundry.sh
+++ b/tools/internal_ci/linux/pull_request/grpc_asan_on_foundry.sh
@@ -14,5 +14,6 @@
 # limitations under the License.
 
 EXTRA_FLAGS="--copt=-gmlt --strip=never --copt=-fsanitize=address --linkopt=-fsanitize=address --test_timeout=3600"
-github/grpc/tools/internal_ci/linux/grpc_bazel_on_foundry_base.sh "${EXTRA_FLAGS}"
+EXCLUDE_TESTS="--test_tag_filters=-qps_json_driver,-json_run_localhost"
+github/grpc/tools/internal_ci/linux/grpc_bazel_on_foundry_base.sh "${EXTRA_FLAGS}" "${EXCLUDE_TESTS}"
 
diff --git a/tools/internal_ci/linux/pull_request/grpc_tsan_on_foundry.sh b/tools/internal_ci/linux/pull_request/grpc_tsan_on_foundry.sh
index edd8f92..3dee115 100644
--- a/tools/internal_ci/linux/pull_request/grpc_tsan_on_foundry.sh
+++ b/tools/internal_ci/linux/pull_request/grpc_tsan_on_foundry.sh
@@ -14,4 +14,5 @@
 # limitations under the License.
 
 EXTRA_FLAGS="--copt=-gmlt --strip=never --copt=-fsanitize=thread --linkopt=-fsanitize=thread --test_timeout=3600 --action_env=TSAN_OPTIONS=suppressions=test/core/util/tsan_suppressions.txt:halt_on_error=1:second_deadlock_stack=1"
-github/grpc/tools/internal_ci/linux/grpc_bazel_on_foundry_base.sh "${EXTRA_FLAGS}"
+EXCLUDE_TESTS="--test_tag_filters=-qps_json_driver,-json_run_localhost"
+github/grpc/tools/internal_ci/linux/grpc_bazel_on_foundry_base.sh "${EXTRA_FLAGS}" "${EXCLUDE_TESTS}"