Merge branch 'master' of https://github.com/grpc/grpc into connect-auth-doc
diff --git a/CMakeLists.txt b/CMakeLists.txt
index 266f2c0..f71563a 100644
--- a/CMakeLists.txt
+++ b/CMakeLists.txt
@@ -394,6 +394,7 @@
 add_dependencies(buildtests_c bdp_estimator_test)
 add_dependencies(buildtests_c bin_decoder_test)
 add_dependencies(buildtests_c bin_encoder_test)
+add_dependencies(buildtests_c byte_stream_test)
 add_dependencies(buildtests_c census_context_test)
 add_dependencies(buildtests_c census_intrusive_hash_map_test)
 add_dependencies(buildtests_c census_resource_test)
@@ -4785,6 +4786,37 @@
 endif (gRPC_BUILD_TESTS)
 if (gRPC_BUILD_TESTS)
 
+add_executable(byte_stream_test
+  test/core/transport/byte_stream_test.c
+)
+
+
+target_include_directories(byte_stream_test
+  PRIVATE ${CMAKE_CURRENT_SOURCE_DIR}
+  PRIVATE ${CMAKE_CURRENT_SOURCE_DIR}/include
+  PRIVATE ${BORINGSSL_ROOT_DIR}/include
+  PRIVATE ${PROTOBUF_ROOT_DIR}/src
+  PRIVATE ${BENCHMARK_ROOT_DIR}/include
+  PRIVATE ${ZLIB_ROOT_DIR}
+  PRIVATE ${CMAKE_CURRENT_BINARY_DIR}/third_party/zlib
+  PRIVATE ${CARES_BUILD_INCLUDE_DIR}
+  PRIVATE ${CARES_INCLUDE_DIR}
+  PRIVATE ${CARES_PLATFORM_INCLUDE_DIR}
+  PRIVATE ${CMAKE_CURRENT_BINARY_DIR}/third_party/cares/cares
+  PRIVATE ${CMAKE_CURRENT_BINARY_DIR}/third_party/gflags/include
+)
+
+target_link_libraries(byte_stream_test
+  ${_gRPC_ALLTARGETS_LIBRARIES}
+  grpc_test_util
+  grpc
+  gpr_test_util
+  gpr
+)
+
+endif (gRPC_BUILD_TESTS)
+if (gRPC_BUILD_TESTS)
+
 add_executable(census_context_test
   test/core/census/context_test.c
 )
diff --git a/Makefile b/Makefile
index 7b53024..98cfb04 100644
--- a/Makefile
+++ b/Makefile
@@ -954,6 +954,7 @@
 bdp_estimator_test: $(BINDIR)/$(CONFIG)/bdp_estimator_test
 bin_decoder_test: $(BINDIR)/$(CONFIG)/bin_decoder_test
 bin_encoder_test: $(BINDIR)/$(CONFIG)/bin_encoder_test
+byte_stream_test: $(BINDIR)/$(CONFIG)/byte_stream_test
 census_context_test: $(BINDIR)/$(CONFIG)/census_context_test
 census_intrusive_hash_map_test: $(BINDIR)/$(CONFIG)/census_intrusive_hash_map_test
 census_resource_test: $(BINDIR)/$(CONFIG)/census_resource_test
@@ -1345,6 +1346,7 @@
   $(BINDIR)/$(CONFIG)/bdp_estimator_test \
   $(BINDIR)/$(CONFIG)/bin_decoder_test \
   $(BINDIR)/$(CONFIG)/bin_encoder_test \
+  $(BINDIR)/$(CONFIG)/byte_stream_test \
   $(BINDIR)/$(CONFIG)/census_context_test \
   $(BINDIR)/$(CONFIG)/census_intrusive_hash_map_test \
   $(BINDIR)/$(CONFIG)/census_resource_test \
@@ -1746,6 +1748,8 @@
 	$(Q) $(BINDIR)/$(CONFIG)/bin_decoder_test || ( echo test bin_decoder_test failed ; exit 1 )
 	$(E) "[RUN]     Testing bin_encoder_test"
 	$(Q) $(BINDIR)/$(CONFIG)/bin_encoder_test || ( echo test bin_encoder_test failed ; exit 1 )
+	$(E) "[RUN]     Testing byte_stream_test"
+	$(Q) $(BINDIR)/$(CONFIG)/byte_stream_test || ( echo test byte_stream_test failed ; exit 1 )
 	$(E) "[RUN]     Testing census_context_test"
 	$(Q) $(BINDIR)/$(CONFIG)/census_context_test || ( echo test census_context_test failed ; exit 1 )
 	$(E) "[RUN]     Testing census_intrusive_hash_map_test"
@@ -8411,6 +8415,38 @@
 endif
 
 
+BYTE_STREAM_TEST_SRC = \
+    test/core/transport/byte_stream_test.c \
+
+BYTE_STREAM_TEST_OBJS = $(addprefix $(OBJDIR)/$(CONFIG)/, $(addsuffix .o, $(basename $(BYTE_STREAM_TEST_SRC))))
+ifeq ($(NO_SECURE),true)
+
+# You can't build secure targets if you don't have OpenSSL.
+
+$(BINDIR)/$(CONFIG)/byte_stream_test: openssl_dep_error
+
+else
+
+
+
+$(BINDIR)/$(CONFIG)/byte_stream_test: $(BYTE_STREAM_TEST_OBJS) $(LIBDIR)/$(CONFIG)/libgrpc_test_util.a $(LIBDIR)/$(CONFIG)/libgrpc.a $(LIBDIR)/$(CONFIG)/libgpr_test_util.a $(LIBDIR)/$(CONFIG)/libgpr.a
+	$(E) "[LD]      Linking $@"
+	$(Q) mkdir -p `dirname $@`
+	$(Q) $(LD) $(LDFLAGS) $(BYTE_STREAM_TEST_OBJS) $(LIBDIR)/$(CONFIG)/libgrpc_test_util.a $(LIBDIR)/$(CONFIG)/libgrpc.a $(LIBDIR)/$(CONFIG)/libgpr_test_util.a $(LIBDIR)/$(CONFIG)/libgpr.a $(LDLIBS) $(LDLIBS_SECURE) -o $(BINDIR)/$(CONFIG)/byte_stream_test
+
+endif
+
+$(OBJDIR)/$(CONFIG)/test/core/transport/byte_stream_test.o:  $(LIBDIR)/$(CONFIG)/libgrpc_test_util.a $(LIBDIR)/$(CONFIG)/libgrpc.a $(LIBDIR)/$(CONFIG)/libgpr_test_util.a $(LIBDIR)/$(CONFIG)/libgpr.a
+
+deps_byte_stream_test: $(BYTE_STREAM_TEST_OBJS:.o=.dep)
+
+ifneq ($(NO_SECURE),true)
+ifneq ($(NO_DEPS),true)
+-include $(BYTE_STREAM_TEST_OBJS:.o=.dep)
+endif
+endif
+
+
 CENSUS_CONTEXT_TEST_SRC = \
     test/core/census/context_test.c \
 
diff --git a/build.yaml b/build.yaml
index 198467b..9c2504a 100644
--- a/build.yaml
+++ b/build.yaml
@@ -1702,6 +1702,16 @@
   deps:
   - grpc_test_util
   - grpc
+- name: byte_stream_test
+  build: test
+  language: c
+  src:
+  - test/core/transport/byte_stream_test.c
+  deps:
+  - grpc_test_util
+  - grpc
+  - gpr_test_util
+  - gpr
 - name: census_context_test
   build: test
   language: c
diff --git a/gRPC-Core.podspec b/gRPC-Core.podspec
index 90580c5..202af58 100644
--- a/gRPC-Core.podspec
+++ b/gRPC-Core.podspec
@@ -176,7 +176,7 @@
     ss.header_mappings_dir = '.'
     ss.libraries = 'z'
     ss.dependency "#{s.name}/Interface", version
-    ss.dependency 'BoringSSL', '~> 8.0'
+    ss.dependency 'BoringSSL', '~> 9.0'
     ss.dependency 'nanopb', '~> 0.3'
 
     # To save you from scrolling, this is the last part of the podspec.
diff --git a/src/core/ext/filters/client_channel/lb_policy/grpclb/client_load_reporting_filter.c b/src/core/ext/filters/client_channel/lb_policy/grpclb/client_load_reporting_filter.c
index 52c6e38..568bb2b 100644
--- a/src/core/ext/filters/client_channel/lb_policy/grpclb/client_load_reporting_filter.c
+++ b/src/core/ext/filters/client_channel/lb_policy/grpclb/client_load_reporting_filter.c
@@ -88,7 +88,6 @@
   // Record call finished, optionally setting client_failed_to_send and
   // received.
   grpc_grpclb_client_stats_add_call_finished(
-      false /* drop_for_rate_limiting */, false /* drop_for_load_balancing */,
       !calld->send_initial_metadata_succeeded /* client_failed_to_send */,
       calld->recv_initial_metadata_succeeded /* known_received */,
       calld->client_stats);
diff --git a/src/core/ext/filters/client_channel/lb_policy/grpclb/grpclb.c b/src/core/ext/filters/client_channel/lb_policy/grpclb/grpclb.c
index ebce801..bb9217d 100644
--- a/src/core/ext/filters/client_channel/lb_policy/grpclb/grpclb.c
+++ b/src/core/ext/filters/client_channel/lb_policy/grpclb/grpclb.c
@@ -416,9 +416,7 @@
 
 static bool is_server_valid(const grpc_grpclb_server *server, size_t idx,
                             bool log) {
-  if (server->drop_for_rate_limiting || server->drop_for_load_balancing) {
-    return false;
-  }
+  if (server->drop) return false;
   const grpc_grpclb_ip_address *ip = &server->ip_address;
   if (server->port >> 16 != 0) {
     if (log) {
@@ -462,7 +460,7 @@
 static void parse_server(const grpc_grpclb_server *server,
                          grpc_resolved_address *addr) {
   memset(addr, 0, sizeof(*addr));
-  if (server->drop_for_rate_limiting || server->drop_for_load_balancing) return;
+  if (server->drop) return;
   const uint16_t netorder_port = htons((uint16_t)server->port);
   /* the addresses are given in binary format (a in(6)_addr struct) in
    * server->ip_address.bytes. */
@@ -610,7 +608,7 @@
   if (glb_policy->serverlist_index == glb_policy->serverlist->num_servers) {
     glb_policy->serverlist_index = 0;  // Wrap-around.
   }
-  if (server->drop_for_rate_limiting || server->drop_for_load_balancing) {
+  if (server->drop) {
     // Not using the RR policy, so unref it.
     if (GRPC_TRACER_ON(grpc_lb_glb_trace)) {
       gpr_log(GPR_INFO, "Unreffing RR for drop (0x%" PRIxPTR ")",
@@ -622,11 +620,8 @@
     // the client_load_reporting filter, because we do not create a
     // subchannel call (and therefore no client_load_reporting filter)
     // for dropped calls.
-    grpc_grpclb_client_stats_add_call_started(wc_arg->client_stats);
-    grpc_grpclb_client_stats_add_call_finished(
-        server->drop_for_rate_limiting, server->drop_for_load_balancing,
-        false /* failed_to_send */, false /* known_received */,
-        wc_arg->client_stats);
+    grpc_grpclb_client_stats_add_call_dropped_locked(server->load_balance_token,
+                                                     wc_arg->client_stats);
     grpc_grpclb_client_stats_unref(wc_arg->client_stats);
     if (force_async) {
       GPR_ASSERT(wc_arg->wrapped_closure != NULL);
@@ -715,7 +710,6 @@
     return;
   }
   glb_policy->rr_policy = new_rr_policy;
-
   grpc_error *rr_state_error = NULL;
   const grpc_connectivity_state rr_state =
       grpc_lb_policy_check_connectivity_locked(exec_ctx, glb_policy->rr_policy,
@@ -741,7 +735,7 @@
   rr_connectivity->state = rr_state;
 
   /* Subscribe to changes to the connectivity of the new RR */
-  GRPC_LB_POLICY_WEAK_REF(&glb_policy->base, "rr_connectivity_sched");
+  GRPC_LB_POLICY_WEAK_REF(&glb_policy->base, "glb_rr_connectivity_cb");
   grpc_lb_policy_notify_on_state_change_locked(exec_ctx, glb_policy->rr_policy,
                                                &rr_connectivity->state,
                                                &rr_connectivity->on_change);
@@ -806,32 +800,31 @@
                                                void *arg, grpc_error *error) {
   rr_connectivity_data *rr_connectivity = arg;
   glb_lb_policy *glb_policy = rr_connectivity->glb_policy;
-
-  const bool shutting_down = glb_policy->shutting_down;
-  bool unref_needed = false;
-  GRPC_ERROR_REF(error);
-
-  if (rr_connectivity->state == GRPC_CHANNEL_SHUTDOWN || shutting_down) {
-    /* RR policy shutting down. Don't renew subscription and free the arg of
-     * this callback. In addition  we need to stash away the current policy to
-     * be UNREF'd after releasing the lock. Otherwise, if the UNREF is the last
-     * one, the policy would be destroyed, alongside the lock, which would
-     * result in a use-after-free */
-    unref_needed = true;
-    gpr_free(rr_connectivity);
-  } else { /* rr state != SHUTDOWN && !shutting down: biz as usual */
-    update_lb_connectivity_status_locked(
-        exec_ctx, glb_policy, rr_connectivity->state, GRPC_ERROR_REF(error));
-    /* Resubscribe. Reuse the "rr_connectivity_cb" weak ref. */
-    grpc_lb_policy_notify_on_state_change_locked(
-        exec_ctx, glb_policy->rr_policy, &rr_connectivity->state,
-        &rr_connectivity->on_change);
-  }
-  if (unref_needed) {
+  if (glb_policy->shutting_down) {
     GRPC_LB_POLICY_WEAK_UNREF(exec_ctx, &glb_policy->base,
-                              "rr_connectivity_cb");
+                              "glb_rr_connectivity_cb");
+    gpr_free(rr_connectivity);
+    return;
   }
-  GRPC_ERROR_UNREF(error);
+  if (rr_connectivity->state == GRPC_CHANNEL_SHUTDOWN) {
+    /* An RR policy that has transitioned into the SHUTDOWN connectivity state
+     * should not be considered for picks or updates: the SHUTDOWN state is a
+     * sink, policies can't transition back from it. */
+    GRPC_LB_POLICY_UNREF(exec_ctx, glb_policy->rr_policy,
+                         "rr_connectivity_shutdown");
+    glb_policy->rr_policy = NULL;
+    GRPC_LB_POLICY_WEAK_UNREF(exec_ctx, &glb_policy->base,
+                              "glb_rr_connectivity_cb");
+    gpr_free(rr_connectivity);
+    return;
+  }
+  /* rr state != SHUTDOWN && !glb_policy->shutting_down: business as usual */
+  update_lb_connectivity_status_locked(
+      exec_ctx, glb_policy, rr_connectivity->state, GRPC_ERROR_REF(error));
+  /* Resubscribe. Reuse the "glb_rr_connectivity_cb" weak ref. */
+  grpc_lb_policy_notify_on_state_change_locked(exec_ctx, glb_policy->rr_policy,
+                                               &rr_connectivity->state,
+                                               &rr_connectivity->on_change);
 }
 
 static void destroy_balancer_name(grpc_exec_ctx *exec_ctx,
@@ -995,7 +988,6 @@
     gpr_free(glb_policy);
     return NULL;
   }
-
   GRPC_CLOSURE_INIT(&glb_policy->lb_channel_on_connectivity_changed,
                     glb_lb_channel_on_connectivity_changed_cb, glb_policy,
                     grpc_combiner_scheduler(args->combiner));
@@ -1052,7 +1044,7 @@
   glb_policy->pending_picks = NULL;
   pending_ping *pping = glb_policy->pending_pings;
   glb_policy->pending_pings = NULL;
-  if (glb_policy->rr_policy) {
+  if (glb_policy->rr_policy != NULL) {
     GRPC_LB_POLICY_UNREF(exec_ctx, glb_policy->rr_policy, "glb_shutdown");
   }
   // We destroy the LB channel here because
@@ -1309,15 +1301,14 @@
 }
 
 static bool load_report_counters_are_zero(grpc_grpclb_request *request) {
+  grpc_grpclb_dropped_call_counts *drop_entries =
+      request->client_stats.calls_finished_with_drop.arg;
   return request->client_stats.num_calls_started == 0 &&
          request->client_stats.num_calls_finished == 0 &&
-         request->client_stats.num_calls_finished_with_drop_for_rate_limiting ==
-             0 &&
-         request->client_stats
-                 .num_calls_finished_with_drop_for_load_balancing == 0 &&
          request->client_stats.num_calls_finished_with_client_failed_to_send ==
              0 &&
-         request->client_stats.num_calls_finished_known_received == 0;
+         request->client_stats.num_calls_finished_known_received == 0 &&
+         (drop_entries == NULL || drop_entries->num_entries == 0);
 }
 
 static void send_client_load_report_locked(grpc_exec_ctx *exec_ctx, void *arg,
@@ -1332,7 +1323,7 @@
   // Construct message payload.
   GPR_ASSERT(glb_policy->client_load_report_payload == NULL);
   grpc_grpclb_request *request =
-      grpc_grpclb_load_report_request_create(glb_policy->client_stats);
+      grpc_grpclb_load_report_request_create_locked(glb_policy->client_stats);
   // Skip client load report if the counters were all zero in the last
   // report and they are still zero in this one.
   if (load_report_counters_are_zero(request)) {
@@ -1778,7 +1769,8 @@
 
   if (!glb_policy->watching_lb_channel) {
     // Watch the LB channel connectivity for connection.
-    glb_policy->lb_channel_connectivity = GRPC_CHANNEL_INIT;
+    glb_policy->lb_channel_connectivity = grpc_channel_check_connectivity_state(
+        glb_policy->lb_channel, true /* try to connect */);
     grpc_channel_element *client_channel_elem = grpc_channel_stack_last_element(
         grpc_channel_get_channel_stack(glb_policy->lb_channel));
     GPR_ASSERT(client_channel_elem->filter == &grpc_client_channel_filter);
diff --git a/src/core/ext/filters/client_channel/lb_policy/grpclb/grpclb_client_stats.c b/src/core/ext/filters/client_channel/lb_policy/grpclb/grpclb_client_stats.c
index c762443..5b62623 100644
--- a/src/core/ext/filters/client_channel/lb_policy/grpclb/grpclb_client_stats.c
+++ b/src/core/ext/filters/client_channel/lb_policy/grpclb/grpclb_client_stats.c
@@ -18,8 +18,11 @@
 
 #include "src/core/ext/filters/client_channel/lb_policy/grpclb/grpclb_client_stats.h"
 
+#include <string.h>
+
 #include <grpc/support/alloc.h>
 #include <grpc/support/atm.h>
+#include <grpc/support/string_util.h>
 #include <grpc/support/sync.h>
 #include <grpc/support/useful.h>
 
@@ -29,10 +32,11 @@
 
 struct grpc_grpclb_client_stats {
   gpr_refcount refs;
+  // This field must only be accessed via *_locked() methods.
+  grpc_grpclb_dropped_call_counts* drop_token_counts;
+  // These fields may be accessed from multiple threads at a time.
   gpr_atm num_calls_started;
   gpr_atm num_calls_finished;
-  gpr_atm num_calls_finished_with_drop_for_rate_limiting;
-  gpr_atm num_calls_finished_with_drop_for_load_balancing;
   gpr_atm num_calls_finished_with_client_failed_to_send;
   gpr_atm num_calls_finished_known_received;
 };
@@ -51,6 +55,7 @@
 
 void grpc_grpclb_client_stats_unref(grpc_grpclb_client_stats* client_stats) {
   if (gpr_unref(&client_stats->refs)) {
+    grpc_grpclb_dropped_call_counts_destroy(client_stats->drop_token_counts);
     gpr_free(client_stats);
   }
 }
@@ -61,21 +66,9 @@
 }
 
 void grpc_grpclb_client_stats_add_call_finished(
-    bool finished_with_drop_for_rate_limiting,
-    bool finished_with_drop_for_load_balancing,
     bool finished_with_client_failed_to_send, bool finished_known_received,
     grpc_grpclb_client_stats* client_stats) {
   gpr_atm_full_fetch_add(&client_stats->num_calls_finished, (gpr_atm)1);
-  if (finished_with_drop_for_rate_limiting) {
-    gpr_atm_full_fetch_add(
-        &client_stats->num_calls_finished_with_drop_for_rate_limiting,
-        (gpr_atm)1);
-  }
-  if (finished_with_drop_for_load_balancing) {
-    gpr_atm_full_fetch_add(
-        &client_stats->num_calls_finished_with_drop_for_load_balancing,
-        (gpr_atm)1);
-  }
   if (finished_with_client_failed_to_send) {
     gpr_atm_full_fetch_add(
         &client_stats->num_calls_finished_with_client_failed_to_send,
@@ -87,32 +80,70 @@
   }
 }
 
+void grpc_grpclb_client_stats_add_call_dropped_locked(
+    char* token, grpc_grpclb_client_stats* client_stats) {
+  // Increment num_calls_started and num_calls_finished.
+  gpr_atm_full_fetch_add(&client_stats->num_calls_started, (gpr_atm)1);
+  gpr_atm_full_fetch_add(&client_stats->num_calls_finished, (gpr_atm)1);
+  // Record the drop.
+  if (client_stats->drop_token_counts == NULL) {
+    client_stats->drop_token_counts =
+        gpr_zalloc(sizeof(grpc_grpclb_dropped_call_counts));
+  }
+  grpc_grpclb_dropped_call_counts* drop_token_counts =
+      client_stats->drop_token_counts;
+  for (size_t i = 0; i < drop_token_counts->num_entries; ++i) {
+    if (strcmp(drop_token_counts->token_counts[i].token, token) == 0) {
+      ++drop_token_counts->token_counts[i].count;
+      return;
+    }
+  }
+  // Not found, so add a new entry.  We double the size of the array each time.
+  size_t new_num_entries = 2;
+  while (new_num_entries < drop_token_counts->num_entries + 1) {
+    new_num_entries *= 2;
+  }
+  drop_token_counts->token_counts =
+      gpr_realloc(drop_token_counts->token_counts,
+                  new_num_entries * sizeof(grpc_grpclb_drop_token_count));
+  grpc_grpclb_drop_token_count* new_entry =
+      &drop_token_counts->token_counts[drop_token_counts->num_entries++];
+  new_entry->token = gpr_strdup(token);
+  new_entry->count = 1;
+}
+
 static void atomic_get_and_reset_counter(int64_t* value, gpr_atm* counter) {
   *value = (int64_t)gpr_atm_acq_load(counter);
   gpr_atm_full_fetch_add(counter, (gpr_atm)(-*value));
 }
 
-void grpc_grpclb_client_stats_get(
+void grpc_grpclb_client_stats_get_locked(
     grpc_grpclb_client_stats* client_stats, int64_t* num_calls_started,
     int64_t* num_calls_finished,
-    int64_t* num_calls_finished_with_drop_for_rate_limiting,
-    int64_t* num_calls_finished_with_drop_for_load_balancing,
     int64_t* num_calls_finished_with_client_failed_to_send,
-    int64_t* num_calls_finished_known_received) {
+    int64_t* num_calls_finished_known_received,
+    grpc_grpclb_dropped_call_counts** drop_token_counts) {
   atomic_get_and_reset_counter(num_calls_started,
                                &client_stats->num_calls_started);
   atomic_get_and_reset_counter(num_calls_finished,
                                &client_stats->num_calls_finished);
   atomic_get_and_reset_counter(
-      num_calls_finished_with_drop_for_rate_limiting,
-      &client_stats->num_calls_finished_with_drop_for_rate_limiting);
-  atomic_get_and_reset_counter(
-      num_calls_finished_with_drop_for_load_balancing,
-      &client_stats->num_calls_finished_with_drop_for_load_balancing);
-  atomic_get_and_reset_counter(
       num_calls_finished_with_client_failed_to_send,
       &client_stats->num_calls_finished_with_client_failed_to_send);
   atomic_get_and_reset_counter(
       num_calls_finished_known_received,
       &client_stats->num_calls_finished_known_received);
+  *drop_token_counts = client_stats->drop_token_counts;
+  client_stats->drop_token_counts = NULL;
+}
+
+void grpc_grpclb_dropped_call_counts_destroy(
+    grpc_grpclb_dropped_call_counts* drop_entries) {
+  if (drop_entries != NULL) {
+    for (size_t i = 0; i < drop_entries->num_entries; ++i) {
+      gpr_free(drop_entries->token_counts[i].token);
+    }
+    gpr_free(drop_entries->token_counts);
+    gpr_free(drop_entries);
+  }
 }
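
For reference, here is a minimal, hypothetical sketch (not part of the patch) of how the per-token drop accounting introduced above is driven. It only uses the functions declared in grpclb_client_stats.h as modified by this change; in the real code the *_locked() calls happen under the grpclb policy's combiner, which this standalone example glosses over.

#include <inttypes.h>
#include <stdio.h>

#include "src/core/ext/filters/client_channel/lb_policy/grpclb/grpclb_client_stats.h"

/* Hypothetical caller: record two drops for the same LB token, then drain the
 * counters the way send_client_load_report_locked() does. */
static void example_drop_accounting(void) {
  grpc_grpclb_client_stats *stats = grpc_grpclb_client_stats_create();
  char token[] = "lb_token_a";
  /* Each drop bumps num_calls_started/num_calls_finished and the per-token
   * count; both calls below land on the same "lb_token_a" entry. */
  grpc_grpclb_client_stats_add_call_dropped_locked(token, stats);
  grpc_grpclb_client_stats_add_call_dropped_locked(token, stats);
  int64_t started, finished, failed_to_send, known_received;
  grpc_grpclb_dropped_call_counts *drops = NULL;
  /* get_locked() resets the atomic counters and transfers ownership of the
   * drop-token list to the caller. */
  grpc_grpclb_client_stats_get_locked(stats, &started, &finished,
                                      &failed_to_send, &known_received, &drops);
  if (drops != NULL) {
    for (size_t i = 0; i < drops->num_entries; ++i) {
      printf("token=%s count=%" PRId64 "\n", drops->token_counts[i].token,
             drops->token_counts[i].count);
    }
  }
  grpc_grpclb_dropped_call_counts_destroy(drops);
  grpc_grpclb_client_stats_unref(stats);
}
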
diff --git a/src/core/ext/filters/client_channel/lb_policy/grpclb/grpclb_client_stats.h b/src/core/ext/filters/client_channel/lb_policy/grpclb/grpclb_client_stats.h
index 4bb47d5..c51e2a4 100644
--- a/src/core/ext/filters/client_channel/lb_policy/grpclb/grpclb_client_stats.h
+++ b/src/core/ext/filters/client_channel/lb_policy/grpclb/grpclb_client_stats.h
@@ -25,6 +25,16 @@
 
 typedef struct grpc_grpclb_client_stats grpc_grpclb_client_stats;
 
+typedef struct {
+  char* token;
+  int64_t count;
+} grpc_grpclb_drop_token_count;
+
+typedef struct {
+  grpc_grpclb_drop_token_count* token_counts;
+  size_t num_entries;
+} grpc_grpclb_dropped_call_counts;
+
 grpc_grpclb_client_stats* grpc_grpclb_client_stats_create();
 grpc_grpclb_client_stats* grpc_grpclb_client_stats_ref(
     grpc_grpclb_client_stats* client_stats);
@@ -33,18 +43,23 @@
 void grpc_grpclb_client_stats_add_call_started(
     grpc_grpclb_client_stats* client_stats);
 void grpc_grpclb_client_stats_add_call_finished(
-    bool finished_with_drop_for_rate_limiting,
-    bool finished_with_drop_for_load_balancing,
     bool finished_with_client_failed_to_send, bool finished_known_received,
     grpc_grpclb_client_stats* client_stats);
 
-void grpc_grpclb_client_stats_get(
+// This method is not thread-safe; caller must synchronize.
+void grpc_grpclb_client_stats_add_call_dropped_locked(
+    char* token, grpc_grpclb_client_stats* client_stats);
+
+// This method is not thread-safe; caller must synchronize.
+void grpc_grpclb_client_stats_get_locked(
     grpc_grpclb_client_stats* client_stats, int64_t* num_calls_started,
     int64_t* num_calls_finished,
-    int64_t* num_calls_finished_with_drop_for_rate_limiting,
-    int64_t* num_calls_finished_with_drop_for_load_balancing,
     int64_t* num_calls_finished_with_client_failed_to_send,
-    int64_t* num_calls_finished_known_received);
+    int64_t* num_calls_finished_known_received,
+    grpc_grpclb_dropped_call_counts** drop_token_counts);
+
+void grpc_grpclb_dropped_call_counts_destroy(
+    grpc_grpclb_dropped_call_counts* drop_entries);
 
 #endif /* GRPC_CORE_EXT_FILTERS_CLIENT_CHANNEL_LB_POLICY_GRPCLB_GRPCLB_CLIENT_STATS_H \
           */
diff --git a/src/core/ext/filters/client_channel/lb_policy/grpclb/load_balancer_api.c b/src/core/ext/filters/client_channel/lb_policy/grpclb/load_balancer_api.c
index bec7c97..6fa29f3 100644
--- a/src/core/ext/filters/client_channel/lb_policy/grpclb/load_balancer_api.c
+++ b/src/core/ext/filters/client_channel/lb_policy/grpclb/load_balancer_api.c
@@ -76,7 +76,33 @@
   timestamp_pb->nanos = timestamp.tv_nsec;
 }
 
-grpc_grpclb_request *grpc_grpclb_load_report_request_create(
+static bool encode_string(pb_ostream_t *stream, const pb_field_t *field,
+                          void *const *arg) {
+  char *str = *arg;
+  if (!pb_encode_tag_for_field(stream, field)) return false;
+  return pb_encode_string(stream, (uint8_t *)str, strlen(str));
+}
+
+static bool encode_drops(pb_ostream_t *stream, const pb_field_t *field,
+                         void *const *arg) {
+  grpc_grpclb_dropped_call_counts *drop_entries = *arg;
+  if (drop_entries == NULL) return true;
+  for (size_t i = 0; i < drop_entries->num_entries; ++i) {
+    if (!pb_encode_tag_for_field(stream, field)) return false;
+    grpc_lb_v1_ClientStatsPerToken drop_message;
+    drop_message.load_balance_token.funcs.encode = encode_string;
+    drop_message.load_balance_token.arg = drop_entries->token_counts[i].token;
+    drop_message.has_num_calls = true;
+    drop_message.num_calls = drop_entries->token_counts[i].count;
+    if (!pb_encode_submessage(stream, grpc_lb_v1_ClientStatsPerToken_fields,
+                              &drop_message)) {
+      return false;
+    }
+  }
+  return true;
+}
+
+grpc_grpclb_request *grpc_grpclb_load_report_request_create_locked(
     grpc_grpclb_client_stats *client_stats) {
   grpc_grpclb_request *req = gpr_zalloc(sizeof(grpc_grpclb_request));
   req->has_client_stats = true;
@@ -84,18 +110,17 @@
   populate_timestamp(gpr_now(GPR_CLOCK_REALTIME), &req->client_stats.timestamp);
   req->client_stats.has_num_calls_started = true;
   req->client_stats.has_num_calls_finished = true;
-  req->client_stats.has_num_calls_finished_with_drop_for_rate_limiting = true;
-  req->client_stats.has_num_calls_finished_with_drop_for_load_balancing = true;
   req->client_stats.has_num_calls_finished_with_client_failed_to_send = true;
   req->client_stats.has_num_calls_finished_with_client_failed_to_send = true;
   req->client_stats.has_num_calls_finished_known_received = true;
-  grpc_grpclb_client_stats_get(
+  req->client_stats.calls_finished_with_drop.funcs.encode = encode_drops;
+  grpc_grpclb_client_stats_get_locked(
       client_stats, &req->client_stats.num_calls_started,
       &req->client_stats.num_calls_finished,
-      &req->client_stats.num_calls_finished_with_drop_for_rate_limiting,
-      &req->client_stats.num_calls_finished_with_drop_for_load_balancing,
       &req->client_stats.num_calls_finished_with_client_failed_to_send,
-      &req->client_stats.num_calls_finished_known_received);
+      &req->client_stats.num_calls_finished_known_received,
+      (grpc_grpclb_dropped_call_counts **)&req->client_stats
+          .calls_finished_with_drop.arg);
   return req;
 }
 
@@ -117,6 +142,11 @@
 }
 
 void grpc_grpclb_request_destroy(grpc_grpclb_request *request) {
+  if (request->has_client_stats) {
+    grpc_grpclb_dropped_call_counts *drop_entries =
+        request->client_stats.calls_finished_with_drop.arg;
+    grpc_grpclb_dropped_call_counts_destroy(drop_entries);
+  }
   gpr_free(request);
 }
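
Also for reference, a minimal sketch of the nanopb callback-field pattern that encode_drops()/encode_string() above rely on: a CALLBACK-typed field carries a funcs.encode pointer plus an opaque arg, and pb_encode() invokes that callback while serializing the containing message. The include paths, helper names, and buffer handling here are illustrative assumptions, not part of the patch.

/* Illustrative only: include paths and buffer size are assumptions. */
#include <string.h>

#include "pb_encode.h"
#include "src/core/ext/filters/client_channel/lb_policy/grpclb/proto/grpc/lb/v1/load_balancer.pb.h"

/* Same shape as encode_string() above: nanopb calls this back for the
 * CALLBACK-typed load_balance_token field, and we emit the tag plus bytes. */
static bool example_encode_token(pb_ostream_t *stream, const pb_field_t *field,
                                 void *const *arg) {
  const char *token = (const char *)*arg;
  if (!pb_encode_tag_for_field(stream, field)) return false;
  return pb_encode_string(stream, (const uint8_t *)token, strlen(token));
}

/* Serialize one ClientStatsPerToken entry into buf; returns bytes written,
 * or 0 on failure. */
static size_t example_encode_drop_entry(const char *token, int64_t count,
                                        uint8_t *buf, size_t buf_size) {
  grpc_lb_v1_ClientStatsPerToken msg = grpc_lb_v1_ClientStatsPerToken_init_zero;
  msg.load_balance_token.funcs.encode = example_encode_token;
  msg.load_balance_token.arg = (void *)token;
  msg.has_num_calls = true;
  msg.num_calls = count;
  pb_ostream_t stream = pb_ostream_from_buffer(buf, buf_size);
  if (!pb_encode(&stream, grpc_lb_v1_ClientStatsPerToken_fields, &msg)) {
    return 0;
  }
  return stream.bytes_written;
}
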
 
diff --git a/src/core/ext/filters/client_channel/lb_policy/grpclb/load_balancer_api.h b/src/core/ext/filters/client_channel/lb_policy/grpclb/load_balancer_api.h
index ef8d563..c4a9849 100644
--- a/src/core/ext/filters/client_channel/lb_policy/grpclb/load_balancer_api.h
+++ b/src/core/ext/filters/client_channel/lb_policy/grpclb/load_balancer_api.h
@@ -44,7 +44,7 @@
 
 /** Create a request for a gRPC LB service under \a lb_service_name */
 grpc_grpclb_request *grpc_grpclb_request_create(const char *lb_service_name);
-grpc_grpclb_request *grpc_grpclb_load_report_request_create(
+grpc_grpclb_request *grpc_grpclb_load_report_request_create_locked(
     grpc_grpclb_client_stats *client_stats);
 
 /** Protocol Buffers v3-encode \a request */
diff --git a/src/core/ext/filters/client_channel/lb_policy/grpclb/proto/grpc/lb/v1/load_balancer.pb.c b/src/core/ext/filters/client_channel/lb_policy/grpclb/proto/grpc/lb/v1/load_balancer.pb.c
index fb119c7..6a5d54c 100644
--- a/src/core/ext/filters/client_channel/lb_policy/grpclb/proto/grpc/lb/v1/load_balancer.pb.c
+++ b/src/core/ext/filters/client_channel/lb_policy/grpclb/proto/grpc/lb/v1/load_balancer.pb.c
@@ -33,14 +33,19 @@
     PB_LAST_FIELD
 };
 
-const pb_field_t grpc_lb_v1_ClientStats_fields[8] = {
+const pb_field_t grpc_lb_v1_ClientStatsPerToken_fields[3] = {
+    PB_FIELD(  1, STRING  , OPTIONAL, CALLBACK, FIRST, grpc_lb_v1_ClientStatsPerToken, load_balance_token, load_balance_token, 0),
+    PB_FIELD(  2, INT64   , OPTIONAL, STATIC  , OTHER, grpc_lb_v1_ClientStatsPerToken, num_calls, load_balance_token, 0),
+    PB_LAST_FIELD
+};
+
+const pb_field_t grpc_lb_v1_ClientStats_fields[7] = {
     PB_FIELD(  1, MESSAGE , OPTIONAL, STATIC  , FIRST, grpc_lb_v1_ClientStats, timestamp, timestamp, &grpc_lb_v1_Timestamp_fields),
     PB_FIELD(  2, INT64   , OPTIONAL, STATIC  , OTHER, grpc_lb_v1_ClientStats, num_calls_started, timestamp, 0),
     PB_FIELD(  3, INT64   , OPTIONAL, STATIC  , OTHER, grpc_lb_v1_ClientStats, num_calls_finished, num_calls_started, 0),
-    PB_FIELD(  4, INT64   , OPTIONAL, STATIC  , OTHER, grpc_lb_v1_ClientStats, num_calls_finished_with_drop_for_rate_limiting, num_calls_finished, 0),
-    PB_FIELD(  5, INT64   , OPTIONAL, STATIC  , OTHER, grpc_lb_v1_ClientStats, num_calls_finished_with_drop_for_load_balancing, num_calls_finished_with_drop_for_rate_limiting, 0),
-    PB_FIELD(  6, INT64   , OPTIONAL, STATIC  , OTHER, grpc_lb_v1_ClientStats, num_calls_finished_with_client_failed_to_send, num_calls_finished_with_drop_for_load_balancing, 0),
+    PB_FIELD(  6, INT64   , OPTIONAL, STATIC  , OTHER, grpc_lb_v1_ClientStats, num_calls_finished_with_client_failed_to_send, num_calls_finished, 0),
     PB_FIELD(  7, INT64   , OPTIONAL, STATIC  , OTHER, grpc_lb_v1_ClientStats, num_calls_finished_known_received, num_calls_finished_with_client_failed_to_send, 0),
+    PB_FIELD(  8, MESSAGE , REPEATED, CALLBACK, OTHER, grpc_lb_v1_ClientStats, calls_finished_with_drop, num_calls_finished_known_received, &grpc_lb_v1_ClientStatsPerToken_fields),
     PB_LAST_FIELD
 };
 
@@ -62,12 +67,11 @@
     PB_LAST_FIELD
 };
 
-const pb_field_t grpc_lb_v1_Server_fields[6] = {
+const pb_field_t grpc_lb_v1_Server_fields[5] = {
     PB_FIELD(  1, BYTES   , OPTIONAL, STATIC  , FIRST, grpc_lb_v1_Server, ip_address, ip_address, 0),
     PB_FIELD(  2, INT32   , OPTIONAL, STATIC  , OTHER, grpc_lb_v1_Server, port, ip_address, 0),
     PB_FIELD(  3, STRING  , OPTIONAL, STATIC  , OTHER, grpc_lb_v1_Server, load_balance_token, port, 0),
-    PB_FIELD(  4, BOOL    , OPTIONAL, STATIC  , OTHER, grpc_lb_v1_Server, drop_for_rate_limiting, load_balance_token, 0),
-    PB_FIELD(  5, BOOL    , OPTIONAL, STATIC  , OTHER, grpc_lb_v1_Server, drop_for_load_balancing, drop_for_rate_limiting, 0),
+    PB_FIELD(  4, BOOL    , OPTIONAL, STATIC  , OTHER, grpc_lb_v1_Server, drop, load_balance_token, 0),
     PB_LAST_FIELD
 };
 
@@ -81,7 +85,7 @@
  * numbers or field sizes that are larger than what can fit in 8 or 16 bit
  * field descriptors.
  */
-PB_STATIC_ASSERT((pb_membersize(grpc_lb_v1_LoadBalanceRequest, initial_request) < 65536 && pb_membersize(grpc_lb_v1_LoadBalanceRequest, client_stats) < 65536 && pb_membersize(grpc_lb_v1_ClientStats, timestamp) < 65536 && pb_membersize(grpc_lb_v1_LoadBalanceResponse, initial_response) < 65536 && pb_membersize(grpc_lb_v1_LoadBalanceResponse, server_list) < 65536 && pb_membersize(grpc_lb_v1_InitialLoadBalanceResponse, client_stats_report_interval) < 65536 && pb_membersize(grpc_lb_v1_ServerList, servers) < 65536 && pb_membersize(grpc_lb_v1_ServerList, expiration_interval) < 65536), YOU_MUST_DEFINE_PB_FIELD_32BIT_FOR_MESSAGES_grpc_lb_v1_Duration_grpc_lb_v1_Timestamp_grpc_lb_v1_LoadBalanceRequest_grpc_lb_v1_InitialLoadBalanceRequest_grpc_lb_v1_ClientStats_grpc_lb_v1_LoadBalanceResponse_grpc_lb_v1_InitialLoadBalanceResponse_grpc_lb_v1_ServerList_grpc_lb_v1_Server)
+PB_STATIC_ASSERT((pb_membersize(grpc_lb_v1_LoadBalanceRequest, initial_request) < 65536 && pb_membersize(grpc_lb_v1_LoadBalanceRequest, client_stats) < 65536 && pb_membersize(grpc_lb_v1_ClientStats, timestamp) < 65536 && pb_membersize(grpc_lb_v1_ClientStats, calls_finished_with_drop) < 65536 && pb_membersize(grpc_lb_v1_LoadBalanceResponse, initial_response) < 65536 && pb_membersize(grpc_lb_v1_LoadBalanceResponse, server_list) < 65536 && pb_membersize(grpc_lb_v1_InitialLoadBalanceResponse, client_stats_report_interval) < 65536 && pb_membersize(grpc_lb_v1_ServerList, servers) < 65536 && pb_membersize(grpc_lb_v1_ServerList, expiration_interval) < 65536), YOU_MUST_DEFINE_PB_FIELD_32BIT_FOR_MESSAGES_grpc_lb_v1_Duration_grpc_lb_v1_Timestamp_grpc_lb_v1_LoadBalanceRequest_grpc_lb_v1_InitialLoadBalanceRequest_grpc_lb_v1_ClientStatsPerToken_grpc_lb_v1_ClientStats_grpc_lb_v1_LoadBalanceResponse_grpc_lb_v1_InitialLoadBalanceResponse_grpc_lb_v1_ServerList_grpc_lb_v1_Server)
 #endif
 
 #if !defined(PB_FIELD_16BIT) && !defined(PB_FIELD_32BIT)
@@ -92,7 +96,7 @@
  * numbers or field sizes that are larger than what can fit in the default
  * 8 bit descriptors.
  */
-PB_STATIC_ASSERT((pb_membersize(grpc_lb_v1_LoadBalanceRequest, initial_request) < 256 && pb_membersize(grpc_lb_v1_LoadBalanceRequest, client_stats) < 256 && pb_membersize(grpc_lb_v1_ClientStats, timestamp) < 256 && pb_membersize(grpc_lb_v1_LoadBalanceResponse, initial_response) < 256 && pb_membersize(grpc_lb_v1_LoadBalanceResponse, server_list) < 256 && pb_membersize(grpc_lb_v1_InitialLoadBalanceResponse, client_stats_report_interval) < 256 && pb_membersize(grpc_lb_v1_ServerList, servers) < 256 && pb_membersize(grpc_lb_v1_ServerList, expiration_interval) < 256), YOU_MUST_DEFINE_PB_FIELD_16BIT_FOR_MESSAGES_grpc_lb_v1_Duration_grpc_lb_v1_Timestamp_grpc_lb_v1_LoadBalanceRequest_grpc_lb_v1_InitialLoadBalanceRequest_grpc_lb_v1_ClientStats_grpc_lb_v1_LoadBalanceResponse_grpc_lb_v1_InitialLoadBalanceResponse_grpc_lb_v1_ServerList_grpc_lb_v1_Server)
+PB_STATIC_ASSERT((pb_membersize(grpc_lb_v1_LoadBalanceRequest, initial_request) < 256 && pb_membersize(grpc_lb_v1_LoadBalanceRequest, client_stats) < 256 && pb_membersize(grpc_lb_v1_ClientStats, timestamp) < 256 && pb_membersize(grpc_lb_v1_ClientStats, calls_finished_with_drop) < 256 && pb_membersize(grpc_lb_v1_LoadBalanceResponse, initial_response) < 256 && pb_membersize(grpc_lb_v1_LoadBalanceResponse, server_list) < 256 && pb_membersize(grpc_lb_v1_InitialLoadBalanceResponse, client_stats_report_interval) < 256 && pb_membersize(grpc_lb_v1_ServerList, servers) < 256 && pb_membersize(grpc_lb_v1_ServerList, expiration_interval) < 256), YOU_MUST_DEFINE_PB_FIELD_16BIT_FOR_MESSAGES_grpc_lb_v1_Duration_grpc_lb_v1_Timestamp_grpc_lb_v1_LoadBalanceRequest_grpc_lb_v1_InitialLoadBalanceRequest_grpc_lb_v1_ClientStatsPerToken_grpc_lb_v1_ClientStats_grpc_lb_v1_LoadBalanceResponse_grpc_lb_v1_InitialLoadBalanceResponse_grpc_lb_v1_ServerList_grpc_lb_v1_Server)
 #endif
 
 
diff --git a/src/core/ext/filters/client_channel/lb_policy/grpclb/proto/grpc/lb/v1/load_balancer.pb.h b/src/core/ext/filters/client_channel/lb_policy/grpclb/proto/grpc/lb/v1/load_balancer.pb.h
index d3ae919..93333d1 100644
--- a/src/core/ext/filters/client_channel/lb_policy/grpclb/proto/grpc/lb/v1/load_balancer.pb.h
+++ b/src/core/ext/filters/client_channel/lb_policy/grpclb/proto/grpc/lb/v1/load_balancer.pb.h
@@ -14,6 +14,13 @@
 #endif
 
 /* Struct definitions */
+typedef struct _grpc_lb_v1_ClientStatsPerToken {
+    pb_callback_t load_balance_token;
+    bool has_num_calls;
+    int64_t num_calls;
+/* @@protoc_insertion_point(struct:grpc_lb_v1_ClientStatsPerToken) */
+} grpc_lb_v1_ClientStatsPerToken;
+
 typedef struct _grpc_lb_v1_Duration {
     bool has_seconds;
     int64_t seconds;
@@ -36,10 +43,8 @@
     int32_t port;
     bool has_load_balance_token;
     char load_balance_token[50];
-    bool has_drop_for_rate_limiting;
-    bool drop_for_rate_limiting;
-    bool has_drop_for_load_balancing;
-    bool drop_for_load_balancing;
+    bool has_drop;
+    bool drop;
 /* @@protoc_insertion_point(struct:grpc_lb_v1_Server) */
 } grpc_lb_v1_Server;
 
@@ -58,14 +63,11 @@
     int64_t num_calls_started;
     bool has_num_calls_finished;
     int64_t num_calls_finished;
-    bool has_num_calls_finished_with_drop_for_rate_limiting;
-    int64_t num_calls_finished_with_drop_for_rate_limiting;
-    bool has_num_calls_finished_with_drop_for_load_balancing;
-    int64_t num_calls_finished_with_drop_for_load_balancing;
     bool has_num_calls_finished_with_client_failed_to_send;
     int64_t num_calls_finished_with_client_failed_to_send;
     bool has_num_calls_finished_known_received;
     int64_t num_calls_finished_known_received;
+    pb_callback_t calls_finished_with_drop;
 /* @@protoc_insertion_point(struct:grpc_lb_v1_ClientStats) */
 } grpc_lb_v1_ClientStats;
 
@@ -107,39 +109,41 @@
 #define grpc_lb_v1_Timestamp_init_default        {false, 0, false, 0}
 #define grpc_lb_v1_LoadBalanceRequest_init_default {false, grpc_lb_v1_InitialLoadBalanceRequest_init_default, false, grpc_lb_v1_ClientStats_init_default}
 #define grpc_lb_v1_InitialLoadBalanceRequest_init_default {false, ""}
-#define grpc_lb_v1_ClientStats_init_default      {false, grpc_lb_v1_Timestamp_init_default, false, 0, false, 0, false, 0, false, 0, false, 0, false, 0}
+#define grpc_lb_v1_ClientStatsPerToken_init_default {{{NULL}, NULL}, false, 0}
+#define grpc_lb_v1_ClientStats_init_default      {false, grpc_lb_v1_Timestamp_init_default, false, 0, false, 0, false, 0, false, 0, {{NULL}, NULL}}
 #define grpc_lb_v1_LoadBalanceResponse_init_default {false, grpc_lb_v1_InitialLoadBalanceResponse_init_default, false, grpc_lb_v1_ServerList_init_default}
 #define grpc_lb_v1_InitialLoadBalanceResponse_init_default {false, "", false, grpc_lb_v1_Duration_init_default}
 #define grpc_lb_v1_ServerList_init_default       {{{NULL}, NULL}, false, grpc_lb_v1_Duration_init_default}
-#define grpc_lb_v1_Server_init_default           {false, {0, {0}}, false, 0, false, "", false, 0, false, 0}
+#define grpc_lb_v1_Server_init_default           {false, {0, {0}}, false, 0, false, "", false, 0}
 #define grpc_lb_v1_Duration_init_zero            {false, 0, false, 0}
 #define grpc_lb_v1_Timestamp_init_zero           {false, 0, false, 0}
 #define grpc_lb_v1_LoadBalanceRequest_init_zero  {false, grpc_lb_v1_InitialLoadBalanceRequest_init_zero, false, grpc_lb_v1_ClientStats_init_zero}
 #define grpc_lb_v1_InitialLoadBalanceRequest_init_zero {false, ""}
-#define grpc_lb_v1_ClientStats_init_zero         {false, grpc_lb_v1_Timestamp_init_zero, false, 0, false, 0, false, 0, false, 0, false, 0, false, 0}
+#define grpc_lb_v1_ClientStatsPerToken_init_zero {{{NULL}, NULL}, false, 0}
+#define grpc_lb_v1_ClientStats_init_zero         {false, grpc_lb_v1_Timestamp_init_zero, false, 0, false, 0, false, 0, false, 0, {{NULL}, NULL}}
 #define grpc_lb_v1_LoadBalanceResponse_init_zero {false, grpc_lb_v1_InitialLoadBalanceResponse_init_zero, false, grpc_lb_v1_ServerList_init_zero}
 #define grpc_lb_v1_InitialLoadBalanceResponse_init_zero {false, "", false, grpc_lb_v1_Duration_init_zero}
 #define grpc_lb_v1_ServerList_init_zero          {{{NULL}, NULL}, false, grpc_lb_v1_Duration_init_zero}
-#define grpc_lb_v1_Server_init_zero              {false, {0, {0}}, false, 0, false, "", false, 0, false, 0}
+#define grpc_lb_v1_Server_init_zero              {false, {0, {0}}, false, 0, false, "", false, 0}
 
 /* Field tags (for use in manual encoding/decoding) */
+#define grpc_lb_v1_ClientStatsPerToken_load_balance_token_tag 1
+#define grpc_lb_v1_ClientStatsPerToken_num_calls_tag 2
 #define grpc_lb_v1_Duration_seconds_tag          1
 #define grpc_lb_v1_Duration_nanos_tag            2
 #define grpc_lb_v1_InitialLoadBalanceRequest_name_tag 1
 #define grpc_lb_v1_Server_ip_address_tag         1
 #define grpc_lb_v1_Server_port_tag               2
 #define grpc_lb_v1_Server_load_balance_token_tag 3
-#define grpc_lb_v1_Server_drop_for_rate_limiting_tag 4
-#define grpc_lb_v1_Server_drop_for_load_balancing_tag 5
+#define grpc_lb_v1_Server_drop_tag               4
 #define grpc_lb_v1_Timestamp_seconds_tag         1
 #define grpc_lb_v1_Timestamp_nanos_tag           2
 #define grpc_lb_v1_ClientStats_timestamp_tag     1
 #define grpc_lb_v1_ClientStats_num_calls_started_tag 2
 #define grpc_lb_v1_ClientStats_num_calls_finished_tag 3
-#define grpc_lb_v1_ClientStats_num_calls_finished_with_drop_for_rate_limiting_tag 4
-#define grpc_lb_v1_ClientStats_num_calls_finished_with_drop_for_load_balancing_tag 5
 #define grpc_lb_v1_ClientStats_num_calls_finished_with_client_failed_to_send_tag 6
 #define grpc_lb_v1_ClientStats_num_calls_finished_known_received_tag 7
+#define grpc_lb_v1_ClientStats_calls_finished_with_drop_tag 8
 #define grpc_lb_v1_InitialLoadBalanceResponse_load_balancer_delegate_tag 1
 #define grpc_lb_v1_InitialLoadBalanceResponse_client_stats_report_interval_tag 2
 #define grpc_lb_v1_ServerList_servers_tag        1
@@ -154,22 +158,24 @@
 extern const pb_field_t grpc_lb_v1_Timestamp_fields[3];
 extern const pb_field_t grpc_lb_v1_LoadBalanceRequest_fields[3];
 extern const pb_field_t grpc_lb_v1_InitialLoadBalanceRequest_fields[2];
-extern const pb_field_t grpc_lb_v1_ClientStats_fields[8];
+extern const pb_field_t grpc_lb_v1_ClientStatsPerToken_fields[3];
+extern const pb_field_t grpc_lb_v1_ClientStats_fields[7];
 extern const pb_field_t grpc_lb_v1_LoadBalanceResponse_fields[3];
 extern const pb_field_t grpc_lb_v1_InitialLoadBalanceResponse_fields[3];
 extern const pb_field_t grpc_lb_v1_ServerList_fields[3];
-extern const pb_field_t grpc_lb_v1_Server_fields[6];
+extern const pb_field_t grpc_lb_v1_Server_fields[5];
 
 /* Maximum encoded size of messages (where known) */
 #define grpc_lb_v1_Duration_size                 22
 #define grpc_lb_v1_Timestamp_size                22
-#define grpc_lb_v1_LoadBalanceRequest_size       226
+#define grpc_lb_v1_LoadBalanceRequest_size       (140 + grpc_lb_v1_ClientStats_size)
 #define grpc_lb_v1_InitialLoadBalanceRequest_size 131
-#define grpc_lb_v1_ClientStats_size              90
+/* grpc_lb_v1_ClientStatsPerToken_size depends on runtime parameters */
+/* grpc_lb_v1_ClientStats_size depends on runtime parameters */
 #define grpc_lb_v1_LoadBalanceResponse_size      (98 + grpc_lb_v1_ServerList_size)
 #define grpc_lb_v1_InitialLoadBalanceResponse_size 90
 /* grpc_lb_v1_ServerList_size depends on runtime parameters */
-#define grpc_lb_v1_Server_size                   85
+#define grpc_lb_v1_Server_size                   83
 
 /* Message IDs (where set with "msgid" option) */
 #ifdef PB_MSGID
diff --git a/src/core/ext/filters/client_channel/lb_policy/round_robin/round_robin.c b/src/core/ext/filters/client_channel/lb_policy/round_robin/round_robin.c
index bc40165..a7f7e95 100644
--- a/src/core/ext/filters/client_channel/lb_policy/round_robin/round_robin.c
+++ b/src/core/ext/filters/client_channel/lb_policy/round_robin/round_robin.c
@@ -74,6 +74,9 @@
   bool started_picking;
   /** are we shutting down? */
   bool shutdown;
+  /** has the policy gotten into the GRPC_CHANNEL_SHUTDOWN state? No picks can
+   * be serviced after this point; the policy will never transition out. */
+  bool in_connectivity_shutdown;
   /** List of picks that are waiting on connectivity */
   pending_pick *pending_picks;
 
@@ -420,6 +423,8 @@
                           grpc_call_context_element *context, void **user_data,
                           grpc_closure *on_complete) {
   round_robin_lb_policy *p = (round_robin_lb_policy *)pol;
+  GPR_ASSERT(!p->shutdown);
+  GPR_ASSERT(!p->in_connectivity_shutdown);
   if (GRPC_TRACER_ON(grpc_lb_round_robin_trace)) {
     gpr_log(GPR_INFO, "[RR %p] Trying to pick", (void *)pol);
   }
@@ -532,6 +537,7 @@
     grpc_connectivity_state_set(exec_ctx, &p->state_tracker,
                                 GRPC_CHANNEL_SHUTDOWN, GRPC_ERROR_REF(error),
                                 "rr_shutdown");
+    p->in_connectivity_shutdown = true;
     new_state = GRPC_CHANNEL_SHUTDOWN;
   } else if (subchannel_list->num_transient_failures ==
              p->subchannel_list->num_subchannels) { /* 4) TRANSIENT_FAILURE */
diff --git a/src/core/ext/filters/http/client/http_client_filter.c b/src/core/ext/filters/http/client/http_client_filter.c
index 90f0aed..3ca01a4 100644
--- a/src/core/ext/filters/http/client/http_client_filter.c
+++ b/src/core/ext/filters/http/client/http_client_filter.c
@@ -36,41 +36,29 @@
 static const size_t kMaxPayloadSizeForGet = 2048;
 
 typedef struct call_data {
+  // State for handling send_initial_metadata ops.
   grpc_linked_mdelem method;
   grpc_linked_mdelem scheme;
   grpc_linked_mdelem authority;
   grpc_linked_mdelem te_trailers;
   grpc_linked_mdelem content_type;
   grpc_linked_mdelem user_agent;
-
+  // State for handling recv_initial_metadata ops.
   grpc_metadata_batch *recv_initial_metadata;
+  grpc_closure *original_recv_initial_metadata_ready;
+  grpc_closure recv_initial_metadata_ready;
+  // State for handling recv_trailing_metadata ops.
   grpc_metadata_batch *recv_trailing_metadata;
-  uint8_t *payload_bytes;
-
-  /* Vars to read data off of send_message */
-  grpc_transport_stream_op_batch *send_op;
-  uint32_t send_length;
-  uint32_t send_flags;
-  grpc_slice incoming_slice;
-  grpc_slice_buffer_stream replacement_stream;
-  grpc_slice_buffer slices;
-  /* flag that indicates that all slices of send_messages aren't availble */
-  bool send_message_blocked;
-
-  /** Closure to call when finished with the hc_on_recv hook */
-  grpc_closure *on_done_recv_initial_metadata;
-  grpc_closure *on_done_recv_trailing_metadata;
-  grpc_closure *on_complete;
-  grpc_closure *post_send;
-
-  /** Receive closures are chained: we inject this closure as the on_done_recv
-      up-call on transport_op, and remember to call our on_done_recv member
-      after handling it. */
-  grpc_closure hc_on_recv_initial_metadata;
-  grpc_closure hc_on_recv_trailing_metadata;
-  grpc_closure hc_on_complete;
-  grpc_closure got_slice;
-  grpc_closure send_done;
+  grpc_closure *original_recv_trailing_metadata_on_complete;
+  grpc_closure recv_trailing_metadata_on_complete;
+  // State for handling send_message ops.
+  grpc_transport_stream_op_batch *send_message_batch;
+  size_t send_message_bytes_read;
+  grpc_byte_stream_cache send_message_cache;
+  grpc_caching_byte_stream send_message_caching_stream;
+  grpc_closure on_send_message_next_done;
+  grpc_closure *original_send_message_on_complete;
+  grpc_closure send_message_on_complete;
 } call_data;
 
 typedef struct channel_data {
@@ -148,7 +136,7 @@
   return GRPC_ERROR_NONE;
 }
 
-static void hc_on_recv_initial_metadata(grpc_exec_ctx *exec_ctx,
+static void recv_initial_metadata_ready(grpc_exec_ctx *exec_ctx,
                                         void *user_data, grpc_error *error) {
   grpc_call_element *elem = user_data;
   call_data *calld = elem->call_data;
@@ -158,11 +146,13 @@
   } else {
     GRPC_ERROR_REF(error);
   }
-  GRPC_CLOSURE_RUN(exec_ctx, calld->on_done_recv_initial_metadata, error);
+  GRPC_CLOSURE_RUN(exec_ctx, calld->original_recv_initial_metadata_ready,
+                   error);
 }
 
-static void hc_on_recv_trailing_metadata(grpc_exec_ctx *exec_ctx,
-                                         void *user_data, grpc_error *error) {
+static void recv_trailing_metadata_on_complete(grpc_exec_ctx *exec_ctx,
+                                               void *user_data,
+                                               grpc_error *error) {
   grpc_call_element *elem = user_data;
   call_data *calld = elem->call_data;
   if (error == GRPC_ERROR_NONE) {
@@ -171,25 +161,131 @@
   } else {
     GRPC_ERROR_REF(error);
   }
-  GRPC_CLOSURE_RUN(exec_ctx, calld->on_done_recv_trailing_metadata, error);
+  GRPC_CLOSURE_RUN(exec_ctx, calld->original_recv_trailing_metadata_on_complete,
+                   error);
 }
 
-static void hc_on_complete(grpc_exec_ctx *exec_ctx, void *user_data,
-                           grpc_error *error) {
-  grpc_call_element *elem = user_data;
-  call_data *calld = elem->call_data;
-  if (calld->payload_bytes) {
-    gpr_free(calld->payload_bytes);
-    calld->payload_bytes = NULL;
+static void send_message_on_complete(grpc_exec_ctx *exec_ctx, void *arg,
+                                     grpc_error *error) {
+  grpc_call_element *elem = (grpc_call_element *)arg;
+  call_data *calld = (call_data *)elem->call_data;
+  grpc_byte_stream_cache_destroy(exec_ctx, &calld->send_message_cache);
+  GRPC_CLOSURE_RUN(exec_ctx, calld->original_send_message_on_complete,
+                   GRPC_ERROR_REF(error));
+}
+
+// Pulls a slice from the send_message byte stream, updating
+// calld->send_message_bytes_read.
+static grpc_error *pull_slice_from_send_message(grpc_exec_ctx *exec_ctx,
+                                                call_data *calld) {
+  grpc_slice incoming_slice;
+  grpc_error *error = grpc_byte_stream_pull(
+      exec_ctx, &calld->send_message_caching_stream.base, &incoming_slice);
+  if (error == GRPC_ERROR_NONE) {
+    calld->send_message_bytes_read += GRPC_SLICE_LENGTH(incoming_slice);
+    grpc_slice_unref_internal(exec_ctx, incoming_slice);
   }
-  calld->on_complete->cb(exec_ctx, calld->on_complete->cb_arg, error);
+  return error;
 }
 
-static void send_done(grpc_exec_ctx *exec_ctx, void *elemp, grpc_error *error) {
-  grpc_call_element *elem = elemp;
-  call_data *calld = elem->call_data;
-  grpc_slice_buffer_reset_and_unref_internal(exec_ctx, &calld->slices);
-  calld->post_send->cb(exec_ctx, calld->post_send->cb_arg, error);
+// Reads as many slices as possible from the send_message byte stream.
+// Upon successful return, if calld->send_message_bytes_read ==
+// calld->send_message_caching_stream.base.length, then we have completed
+// reading from the byte stream; otherwise, an async read has been dispatched
+// and on_send_message_next_done() will be invoked when it is complete.
+static grpc_error *read_all_available_send_message_data(grpc_exec_ctx *exec_ctx,
+                                                        call_data *calld) {
+  while (grpc_byte_stream_next(exec_ctx,
+                               &calld->send_message_caching_stream.base,
+                               ~(size_t)0, &calld->on_send_message_next_done)) {
+    grpc_error *error = pull_slice_from_send_message(exec_ctx, calld);
+    if (error != GRPC_ERROR_NONE) return error;
+    if (calld->send_message_bytes_read ==
+        calld->send_message_caching_stream.base.length) {
+      break;
+    }
+  }
+  return GRPC_ERROR_NONE;
+}
+
+// Async callback for grpc_byte_stream_next().
+static void on_send_message_next_done(grpc_exec_ctx *exec_ctx, void *arg,
+                                      grpc_error *error) {
+  grpc_call_element *elem = (grpc_call_element *)arg;
+  call_data *calld = (call_data *)elem->call_data;
+  if (error != GRPC_ERROR_NONE) {
+    grpc_transport_stream_op_batch_finish_with_failure(
+        exec_ctx, calld->send_message_batch, error);
+    return;
+  }
+  error = pull_slice_from_send_message(exec_ctx, calld);
+  if (error != GRPC_ERROR_NONE) {
+    grpc_transport_stream_op_batch_finish_with_failure(
+        exec_ctx, calld->send_message_batch, error);
+    return;
+  }
+  // There may or may not be more to read, but we don't care.  If we got
+  // here, then we know that not all of the data was available
+  // synchronously, so we were not able to use the cacheable GET path.
+  // Instead, we just reset the byte stream and send down the batch as-is.
+  grpc_caching_byte_stream_reset(&calld->send_message_caching_stream);
+  grpc_call_next_op(exec_ctx, elem, calld->send_message_batch);
+}
+
+static char *slice_buffer_to_string(grpc_slice_buffer *slice_buffer) {
+  char *payload_bytes = gpr_malloc(slice_buffer->length + 1);
+  size_t offset = 0;
+  for (size_t i = 0; i < slice_buffer->count; ++i) {
+    memcpy(payload_bytes + offset,
+           GRPC_SLICE_START_PTR(slice_buffer->slices[i]),
+           GRPC_SLICE_LENGTH(slice_buffer->slices[i]));
+    offset += GRPC_SLICE_LENGTH(slice_buffer->slices[i]);
+  }
+  *(payload_bytes + offset) = '\0';
+  return payload_bytes;
+}
+
+// Modifies the path entry in the batch's send_initial_metadata to
+// append the base64-encoded query for a GET request.
+static grpc_error *update_path_for_get(grpc_exec_ctx *exec_ctx,
+                                       grpc_call_element *elem,
+                                       grpc_transport_stream_op_batch *batch) {
+  call_data *calld = (call_data *)elem->call_data;
+  grpc_slice path_slice =
+      GRPC_MDVALUE(batch->payload->send_initial_metadata.send_initial_metadata
+                       ->idx.named.path->md);
+  /* sum up the individual components' lengths and allocate enough memory to
+   * hold the combined path+query */
+  size_t estimated_len = GRPC_SLICE_LENGTH(path_slice);
+  estimated_len++; /* for the '?' */
+  estimated_len += grpc_base64_estimate_encoded_size(
+      batch->payload->send_message.send_message->length, true /* url_safe */,
+      false /* multi_line */);
+  grpc_slice path_with_query_slice = GRPC_SLICE_MALLOC(estimated_len);
+  /* memcpy the individual pieces into this slice */
+  char *write_ptr = (char *)GRPC_SLICE_START_PTR(path_with_query_slice);
+  char *original_path = (char *)GRPC_SLICE_START_PTR(path_slice);
+  memcpy(write_ptr, original_path, GRPC_SLICE_LENGTH(path_slice));
+  write_ptr += GRPC_SLICE_LENGTH(path_slice);
+  *write_ptr++ = '?';
+  char *payload_bytes =
+      slice_buffer_to_string(&calld->send_message_cache.cache_buffer);
+  grpc_base64_encode_core((char *)write_ptr, payload_bytes,
+                          batch->payload->send_message.send_message->length,
+                          true /* url_safe */, false /* multi_line */);
+  gpr_free(payload_bytes);
+  /* remove trailing unused memory and add trailing 0 to terminate string */
+  char *t = (char *)GRPC_SLICE_START_PTR(path_with_query_slice);
+  /* safe to use strlen since base64_encode will always add '\0' */
+  path_with_query_slice =
+      grpc_slice_sub_no_ref(path_with_query_slice, 0, strlen(t));
+  /* substitute previous path with the new path+query */
+  grpc_mdelem mdelem_path_and_query =
+      grpc_mdelem_from_slices(exec_ctx, GRPC_MDSTR_PATH, path_with_query_slice);
+  grpc_metadata_batch *b =
+      batch->payload->send_initial_metadata.send_initial_metadata;
+  return grpc_metadata_batch_substitute(exec_ctx, b, b->idx.named.path,
+                                        mdelem_path_and_query);
 }
 
 static void remove_if_present(grpc_exec_ctx *exec_ctx,
@@ -200,273 +296,153 @@
   }
 }
 
-static void continue_send_message(grpc_exec_ctx *exec_ctx,
-                                  grpc_call_element *elem) {
-  call_data *calld = elem->call_data;
-  uint8_t *wrptr = calld->payload_bytes;
-  while (grpc_byte_stream_next(
-      exec_ctx, calld->send_op->payload->send_message.send_message, ~(size_t)0,
-      &calld->got_slice)) {
-    grpc_byte_stream_pull(exec_ctx,
-                          calld->send_op->payload->send_message.send_message,
-                          &calld->incoming_slice);
-    if (GRPC_SLICE_LENGTH(calld->incoming_slice) > 0) {
-      memcpy(wrptr, GRPC_SLICE_START_PTR(calld->incoming_slice),
-             GRPC_SLICE_LENGTH(calld->incoming_slice));
-    }
-    wrptr += GRPC_SLICE_LENGTH(calld->incoming_slice);
-    grpc_slice_buffer_add(&calld->slices, calld->incoming_slice);
-    if (calld->send_length == calld->slices.length) {
-      calld->send_message_blocked = false;
-      break;
-    }
-  }
-}
-
-static void got_slice(grpc_exec_ctx *exec_ctx, void *elemp, grpc_error *error) {
-  grpc_call_element *elem = elemp;
-  call_data *calld = elem->call_data;
-  calld->send_message_blocked = false;
-  if (GRPC_ERROR_NONE !=
-      grpc_byte_stream_pull(exec_ctx,
-                            calld->send_op->payload->send_message.send_message,
-                            &calld->incoming_slice)) {
-    /* Should never reach here */
-    abort();
-  }
-  grpc_slice_buffer_add(&calld->slices, calld->incoming_slice);
-  if (calld->send_length == calld->slices.length) {
-    /* Pass down the original send_message op that was blocked.*/
-    grpc_slice_buffer_stream_init(&calld->replacement_stream, &calld->slices,
-                                  calld->send_flags);
-    calld->send_op->payload->send_message.send_message =
-        &calld->replacement_stream.base;
-    calld->post_send = calld->send_op->on_complete;
-    calld->send_op->on_complete = &calld->send_done;
-    grpc_call_next_op(exec_ctx, elem, calld->send_op);
-  } else {
-    continue_send_message(exec_ctx, elem);
-  }
-}
-
-static grpc_error *hc_mutate_op(grpc_exec_ctx *exec_ctx,
-                                grpc_call_element *elem,
-                                grpc_transport_stream_op_batch *op) {
-  /* grab pointers to our data from the call element */
+static void hc_start_transport_stream_op_batch(
+    grpc_exec_ctx *exec_ctx, grpc_call_element *elem,
+    grpc_transport_stream_op_batch *batch) {
   call_data *calld = elem->call_data;
   channel_data *channeld = elem->channel_data;
-  grpc_error *error;
+  GPR_TIMER_BEGIN("hc_start_transport_stream_op_batch", 0);
+  GRPC_CALL_LOG_OP(GPR_INFO, elem, batch);
 
-  if (op->send_initial_metadata) {
-    /* Decide which HTTP VERB to use. We use GET if the request is marked
-    cacheable, and the operation contains both initial metadata and send
-    message, and the payload is below the size threshold, and all the data
-    for this request is immediately available. */
+  if (batch->recv_initial_metadata) {
+    /* substitute our callback for the higher callback */
+    calld->recv_initial_metadata =
+        batch->payload->recv_initial_metadata.recv_initial_metadata;
+    calld->original_recv_initial_metadata_ready =
+        batch->payload->recv_initial_metadata.recv_initial_metadata_ready;
+    batch->payload->recv_initial_metadata.recv_initial_metadata_ready =
+        &calld->recv_initial_metadata_ready;
+  }
+
+  if (batch->recv_trailing_metadata) {
+    /* substitute our callback for the higher callback */
+    calld->recv_trailing_metadata =
+        batch->payload->recv_trailing_metadata.recv_trailing_metadata;
+    calld->original_recv_trailing_metadata_on_complete = batch->on_complete;
+    batch->on_complete = &calld->recv_trailing_metadata_on_complete;
+  }
+
+  grpc_error *error = GRPC_ERROR_NONE;
+  bool batch_will_be_handled_asynchronously = false;
+  if (batch->send_initial_metadata) {
+    // Decide which HTTP VERB to use. We use GET if the request is marked
+    // cacheable, and the operation contains both initial metadata and send
+    // message, and the payload is below the size threshold, and all the data
+    // for this request is immediately available.
     grpc_mdelem method = GRPC_MDELEM_METHOD_POST;
-    if (op->send_message &&
-        (op->payload->send_initial_metadata.send_initial_metadata_flags &
+    if (batch->send_message &&
+        (batch->payload->send_initial_metadata.send_initial_metadata_flags &
          GRPC_INITIAL_METADATA_CACHEABLE_REQUEST) &&
-        op->payload->send_message.send_message->length <
+        batch->payload->send_message.send_message->length <
             channeld->max_payload_size_for_get) {
-      method = GRPC_MDELEM_METHOD_GET;
-      /* The following write to calld->send_message_blocked isn't racy with
-      reads in hc_start_transport_op (which deals with SEND_MESSAGE ops) because
-      being here means ops->send_message is not NULL, which is primarily
-      guarding the read there. */
-      calld->send_message_blocked = true;
-    } else if (op->payload->send_initial_metadata.send_initial_metadata_flags &
+      calld->send_message_bytes_read = 0;
+      grpc_byte_stream_cache_init(&calld->send_message_cache,
+                                  batch->payload->send_message.send_message);
+      grpc_caching_byte_stream_init(&calld->send_message_caching_stream,
+                                    &calld->send_message_cache);
+      batch->payload->send_message.send_message =
+          &calld->send_message_caching_stream.base;
+      calld->original_send_message_on_complete = batch->on_complete;
+      batch->on_complete = &calld->send_message_on_complete;
+      calld->send_message_batch = batch;
+      error = read_all_available_send_message_data(exec_ctx, calld);
+      if (error != GRPC_ERROR_NONE) goto done;
+      // If all the data has been read, then we can use GET.
+      if (calld->send_message_bytes_read ==
+          calld->send_message_caching_stream.base.length) {
+        method = GRPC_MDELEM_METHOD_GET;
+        error = update_path_for_get(exec_ctx, elem, batch);
+        if (error != GRPC_ERROR_NONE) goto done;
+        batch->send_message = false;
+        grpc_byte_stream_destroy(exec_ctx,
+                                 &calld->send_message_caching_stream.base);
+      } else {
+        // Not all data is available.  The batch will be sent down
+        // asynchronously in on_send_message_next_done().
+        batch_will_be_handled_asynchronously = true;
+        // Fall back to POST.
+        gpr_log(GPR_DEBUG,
+                "Request is marked Cacheable but not all data is available.  "
+                "Falling back to POST");
+      }
+    } else if (batch->payload->send_initial_metadata
+                   .send_initial_metadata_flags &
                GRPC_INITIAL_METADATA_IDEMPOTENT_REQUEST) {
       method = GRPC_MDELEM_METHOD_PUT;
     }
 
-    /* Attempt to read the data from send_message and create a header field. */
-    if (grpc_mdelem_eq(method, GRPC_MDELEM_METHOD_GET)) {
-      /* allocate memory to hold the entire payload */
-      calld->payload_bytes =
-          gpr_malloc(op->payload->send_message.send_message->length);
-
-      /* read slices of send_message and copy into payload_bytes */
-      calld->send_op = op;
-      calld->send_length = op->payload->send_message.send_message->length;
-      calld->send_flags = op->payload->send_message.send_message->flags;
-      continue_send_message(exec_ctx, elem);
-
-      if (calld->send_message_blocked == false) {
-        /* when all the send_message data is available, then modify the path
-         * MDELEM by appending base64 encoded query to the path */
-        const int k_url_safe = 1;
-        const int k_multi_line = 0;
-        const unsigned char k_query_separator = '?';
-
-        grpc_slice path_slice =
-            GRPC_MDVALUE(op->payload->send_initial_metadata
-                             .send_initial_metadata->idx.named.path->md);
-        /* sum up individual component's lengths and allocate enough memory to
-         * hold combined path+query */
-        size_t estimated_len = GRPC_SLICE_LENGTH(path_slice);
-        estimated_len++; /* for the '?' */
-        estimated_len += grpc_base64_estimate_encoded_size(
-            op->payload->send_message.send_message->length, k_url_safe,
-            k_multi_line);
-        grpc_slice path_with_query_slice = GRPC_SLICE_MALLOC(estimated_len);
-
-        /* memcopy individual pieces into this slice */
-        uint8_t *write_ptr =
-            (uint8_t *)GRPC_SLICE_START_PTR(path_with_query_slice);
-        uint8_t *original_path = (uint8_t *)GRPC_SLICE_START_PTR(path_slice);
-        memcpy(write_ptr, original_path, GRPC_SLICE_LENGTH(path_slice));
-        write_ptr += GRPC_SLICE_LENGTH(path_slice);
-
-        *write_ptr = k_query_separator;
-        write_ptr++; /* for the '?' */
-
-        grpc_base64_encode_core((char *)write_ptr, calld->payload_bytes,
-                                op->payload->send_message.send_message->length,
-                                k_url_safe, k_multi_line);
-
-        /* remove trailing unused memory and add trailing 0 to terminate string
-         */
-        char *t = (char *)GRPC_SLICE_START_PTR(path_with_query_slice);
-        /* safe to use strlen since base64_encode will always add '\0' */
-        path_with_query_slice =
-            grpc_slice_sub_no_ref(path_with_query_slice, 0, strlen(t));
-
-        /* substitute previous path with the new path+query */
-        grpc_mdelem mdelem_path_and_query = grpc_mdelem_from_slices(
-            exec_ctx, GRPC_MDSTR_PATH, path_with_query_slice);
-        grpc_metadata_batch *b =
-            op->payload->send_initial_metadata.send_initial_metadata;
-        error = grpc_metadata_batch_substitute(exec_ctx, b, b->idx.named.path,
-                                               mdelem_path_and_query);
-        if (error != GRPC_ERROR_NONE) return error;
-
-        calld->on_complete = op->on_complete;
-        op->on_complete = &calld->hc_on_complete;
-        op->send_message = false;
-      } else {
-        /* Not all data is available. Fall back to POST. */
-        gpr_log(GPR_DEBUG,
-                "Request is marked Cacheable but not all data is available.\
-                            Falling back to POST");
-        method = GRPC_MDELEM_METHOD_POST;
-      }
-    }
-
-    remove_if_present(exec_ctx,
-                      op->payload->send_initial_metadata.send_initial_metadata,
-                      GRPC_BATCH_METHOD);
-    remove_if_present(exec_ctx,
-                      op->payload->send_initial_metadata.send_initial_metadata,
-                      GRPC_BATCH_SCHEME);
-    remove_if_present(exec_ctx,
-                      op->payload->send_initial_metadata.send_initial_metadata,
-                      GRPC_BATCH_TE);
-    remove_if_present(exec_ctx,
-                      op->payload->send_initial_metadata.send_initial_metadata,
-                      GRPC_BATCH_CONTENT_TYPE);
-    remove_if_present(exec_ctx,
-                      op->payload->send_initial_metadata.send_initial_metadata,
-                      GRPC_BATCH_USER_AGENT);
+    remove_if_present(
+        exec_ctx, batch->payload->send_initial_metadata.send_initial_metadata,
+        GRPC_BATCH_METHOD);
+    remove_if_present(
+        exec_ctx, batch->payload->send_initial_metadata.send_initial_metadata,
+        GRPC_BATCH_SCHEME);
+    remove_if_present(
+        exec_ctx, batch->payload->send_initial_metadata.send_initial_metadata,
+        GRPC_BATCH_TE);
+    remove_if_present(
+        exec_ctx, batch->payload->send_initial_metadata.send_initial_metadata,
+        GRPC_BATCH_CONTENT_TYPE);
+    remove_if_present(
+        exec_ctx, batch->payload->send_initial_metadata.send_initial_metadata,
+        GRPC_BATCH_USER_AGENT);
 
     /* Send : prefixed headers, which have to be before any application
        layer headers. */
     error = grpc_metadata_batch_add_head(
-        exec_ctx, op->payload->send_initial_metadata.send_initial_metadata,
+        exec_ctx, batch->payload->send_initial_metadata.send_initial_metadata,
         &calld->method, method);
-    if (error != GRPC_ERROR_NONE) return error;
+    if (error != GRPC_ERROR_NONE) goto done;
     error = grpc_metadata_batch_add_head(
-        exec_ctx, op->payload->send_initial_metadata.send_initial_metadata,
+        exec_ctx, batch->payload->send_initial_metadata.send_initial_metadata,
         &calld->scheme, channeld->static_scheme);
-    if (error != GRPC_ERROR_NONE) return error;
+    if (error != GRPC_ERROR_NONE) goto done;
     error = grpc_metadata_batch_add_tail(
-        exec_ctx, op->payload->send_initial_metadata.send_initial_metadata,
+        exec_ctx, batch->payload->send_initial_metadata.send_initial_metadata,
         &calld->te_trailers, GRPC_MDELEM_TE_TRAILERS);
-    if (error != GRPC_ERROR_NONE) return error;
+    if (error != GRPC_ERROR_NONE) goto done;
     error = grpc_metadata_batch_add_tail(
-        exec_ctx, op->payload->send_initial_metadata.send_initial_metadata,
+        exec_ctx, batch->payload->send_initial_metadata.send_initial_metadata,
         &calld->content_type, GRPC_MDELEM_CONTENT_TYPE_APPLICATION_SLASH_GRPC);
-    if (error != GRPC_ERROR_NONE) return error;
+    if (error != GRPC_ERROR_NONE) goto done;
     error = grpc_metadata_batch_add_tail(
-        exec_ctx, op->payload->send_initial_metadata.send_initial_metadata,
+        exec_ctx, batch->payload->send_initial_metadata.send_initial_metadata,
         &calld->user_agent, GRPC_MDELEM_REF(channeld->user_agent));
-    if (error != GRPC_ERROR_NONE) return error;
+    if (error != GRPC_ERROR_NONE) goto done;
   }
 
-  if (op->recv_initial_metadata) {
-    /* substitute our callback for the higher callback */
-    calld->recv_initial_metadata =
-        op->payload->recv_initial_metadata.recv_initial_metadata;
-    calld->on_done_recv_initial_metadata =
-        op->payload->recv_initial_metadata.recv_initial_metadata_ready;
-    op->payload->recv_initial_metadata.recv_initial_metadata_ready =
-        &calld->hc_on_recv_initial_metadata;
-  }
-
-  if (op->recv_trailing_metadata) {
-    /* substitute our callback for the higher callback */
-    calld->recv_trailing_metadata =
-        op->payload->recv_trailing_metadata.recv_trailing_metadata;
-    calld->on_done_recv_trailing_metadata = op->on_complete;
-    op->on_complete = &calld->hc_on_recv_trailing_metadata;
-  }
-
-  return GRPC_ERROR_NONE;
-}
-
-static void hc_start_transport_op(grpc_exec_ctx *exec_ctx,
-                                  grpc_call_element *elem,
-                                  grpc_transport_stream_op_batch *op) {
-  GPR_TIMER_BEGIN("hc_start_transport_op", 0);
-  GRPC_CALL_LOG_OP(GPR_INFO, elem, op);
-  grpc_error *error = hc_mutate_op(exec_ctx, elem, op);
+done:
   if (error != GRPC_ERROR_NONE) {
-    grpc_transport_stream_op_batch_finish_with_failure(exec_ctx, op, error);
-  } else {
-    call_data *calld = elem->call_data;
-    if (op->send_message && calld->send_message_blocked) {
-      /* Don't forward the op. send_message contains slices that aren't ready
-         yet. The call will be forwarded by the op_complete of slice read call.
-      */
-    } else {
-      grpc_call_next_op(exec_ctx, elem, op);
-    }
+    grpc_transport_stream_op_batch_finish_with_failure(
+        exec_ctx, calld->send_message_batch, error);
+  } else if (!batch_will_be_handled_asynchronously) {
+    grpc_call_next_op(exec_ctx, elem, batch);
   }
-  GPR_TIMER_END("hc_start_transport_op", 0);
+  GPR_TIMER_END("hc_start_transport_stream_op_batch", 0);
 }
 
 /* Constructor for call_data */
 static grpc_error *init_call_elem(grpc_exec_ctx *exec_ctx,
                                   grpc_call_element *elem,
                                   const grpc_call_element_args *args) {
-  call_data *calld = elem->call_data;
-  calld->on_done_recv_initial_metadata = NULL;
-  calld->on_done_recv_trailing_metadata = NULL;
-  calld->on_complete = NULL;
-  calld->payload_bytes = NULL;
-  calld->send_message_blocked = false;
-  grpc_slice_buffer_init(&calld->slices);
-  GRPC_CLOSURE_INIT(&calld->hc_on_recv_initial_metadata,
-                    hc_on_recv_initial_metadata, elem,
+  call_data *calld = (call_data *)elem->call_data;
+  GRPC_CLOSURE_INIT(&calld->recv_initial_metadata_ready,
+                    recv_initial_metadata_ready, elem,
                     grpc_schedule_on_exec_ctx);
-  GRPC_CLOSURE_INIT(&calld->hc_on_recv_trailing_metadata,
-                    hc_on_recv_trailing_metadata, elem,
+  GRPC_CLOSURE_INIT(&calld->recv_trailing_metadata_on_complete,
+                    recv_trailing_metadata_on_complete, elem,
                     grpc_schedule_on_exec_ctx);
-  GRPC_CLOSURE_INIT(&calld->hc_on_complete, hc_on_complete, elem,
-                    grpc_schedule_on_exec_ctx);
-  GRPC_CLOSURE_INIT(&calld->got_slice, got_slice, elem,
-                    grpc_schedule_on_exec_ctx);
-  GRPC_CLOSURE_INIT(&calld->send_done, send_done, elem,
-                    grpc_schedule_on_exec_ctx);
+  GRPC_CLOSURE_INIT(&calld->send_message_on_complete, send_message_on_complete,
+                    elem, grpc_schedule_on_exec_ctx);
+  GRPC_CLOSURE_INIT(&calld->on_send_message_next_done,
+                    on_send_message_next_done, elem, grpc_schedule_on_exec_ctx);
   return GRPC_ERROR_NONE;
 }
 
 /* Destructor for call_data */
 static void destroy_call_elem(grpc_exec_ctx *exec_ctx, grpc_call_element *elem,
                               const grpc_call_final_info *final_info,
-                              grpc_closure *ignored) {
-  call_data *calld = elem->call_data;
-  grpc_slice_buffer_destroy_internal(exec_ctx, &calld->slices);
-}
+                              grpc_closure *ignored) {}
 
 static grpc_mdelem scheme_from_args(const grpc_channel_args *args) {
   unsigned i;
@@ -580,7 +556,7 @@
 }
 
 const grpc_channel_filter grpc_http_client_filter = {
-    hc_start_transport_op,
+    hc_start_transport_stream_op_batch,
     grpc_channel_next_op,
     sizeof(call_data),
     init_call_elem,
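
Both filters touched by this patch consume a send_message byte stream through the same loop: grpc_byte_stream_next() either reports that a slice can be pulled immediately or arranges for the supplied closure to run later, and grpc_byte_stream_pull() hands the slice over. The following is a minimal sketch of that loop; drain_available_data is a hypothetical name, and the real implementations are read_all_available_send_message_data() above and continue_reading_send_message() in the compress filter below.

/* Hypothetical helper: buffer every slice the stream can deliver
   synchronously.  Returns an error from pull(), or GRPC_ERROR_NONE when
   either all data is buffered or next() went asynchronous (in which case
   on_next_done runs later and the caller resumes from its callback). */
static grpc_error *drain_available_data(grpc_exec_ctx *exec_ctx,
                                        grpc_byte_stream *stream,
                                        grpc_slice_buffer *slices,
                                        grpc_closure *on_next_done) {
  while (slices->length < stream->length &&
         grpc_byte_stream_next(exec_ctx, stream, ~(size_t)0, on_next_done)) {
    grpc_slice slice;
    grpc_error *error = grpc_byte_stream_pull(exec_ctx, stream, &slice);
    if (error != GRPC_ERROR_NONE) return error;
    grpc_slice_buffer_add(slices, slice);
  }
  return GRPC_ERROR_NONE;
}
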
diff --git a/src/core/ext/filters/http/message_compress/message_compress_filter.c b/src/core/ext/filters/http/message_compress/message_compress_filter.c
index 71a8bc5..20a3488 100644
--- a/src/core/ext/filters/http/message_compress/message_compress_filter.c
+++ b/src/core/ext/filters/http/message_compress/message_compress_filter.c
@@ -61,14 +61,11 @@
      pointer | CANCELLED_BIT - request was cancelled with error pointed to */
   gpr_atm send_initial_metadata_state;
 
-  grpc_transport_stream_op_batch *send_op;
-  uint32_t send_length;
-  uint32_t send_flags;
-  grpc_slice incoming_slice;
+  grpc_transport_stream_op_batch *send_message_batch;
   grpc_slice_buffer_stream replacement_stream;
-  grpc_closure *post_send;
-  grpc_closure send_done;
-  grpc_closure got_slice;
+  grpc_closure *original_send_message_on_complete;
+  grpc_closure send_message_on_complete;
+  grpc_closure on_send_message_next_done;
 } call_data;
 
 typedef struct channel_data {
@@ -164,24 +161,25 @@
   return error;
 }
 
-static void continue_send_message(grpc_exec_ctx *exec_ctx,
-                                  grpc_call_element *elem);
-
-static void send_done(grpc_exec_ctx *exec_ctx, void *elemp, grpc_error *error) {
-  grpc_call_element *elem = elemp;
-  call_data *calld = elem->call_data;
+static void send_message_on_complete(grpc_exec_ctx *exec_ctx, void *arg,
+                                     grpc_error *error) {
+  grpc_call_element *elem = (grpc_call_element *)arg;
+  call_data *calld = (call_data *)elem->call_data;
   grpc_slice_buffer_reset_and_unref_internal(exec_ctx, &calld->slices);
-  calld->post_send->cb(exec_ctx, calld->post_send->cb_arg, error);
+  GRPC_CLOSURE_RUN(exec_ctx, calld->original_send_message_on_complete,
+                   GRPC_ERROR_REF(error));
 }
 
 static void finish_send_message(grpc_exec_ctx *exec_ctx,
                                 grpc_call_element *elem) {
-  call_data *calld = elem->call_data;
-  int did_compress;
+  call_data *calld = (call_data *)elem->call_data;
+  // Compress the data if appropriate.
   grpc_slice_buffer tmp;
   grpc_slice_buffer_init(&tmp);
-  did_compress = grpc_msg_compress(exec_ctx, calld->compression_algorithm,
-                                   &calld->slices, &tmp);
+  uint32_t send_flags =
+      calld->send_message_batch->payload->send_message.send_message->flags;
+  const bool did_compress = grpc_msg_compress(
+      exec_ctx, calld->compression_algorithm, &calld->slices, &tmp);
   if (did_compress) {
     if (GRPC_TRACER_ON(grpc_compression_trace)) {
       char *algo_name;
@@ -195,7 +193,7 @@
               algo_name, before_size, after_size, 100 * savings_ratio);
     }
     grpc_slice_buffer_swap(&calld->slices, &tmp);
-    calld->send_flags |= GRPC_WRITE_INTERNAL_COMPRESS;
+    send_flags |= GRPC_WRITE_INTERNAL_COMPRESS;
   } else {
     if (GRPC_TRACER_ON(grpc_compression_trace)) {
       char *algo_name;
@@ -207,83 +205,118 @@
               algo_name, calld->slices.length);
     }
   }
-
   grpc_slice_buffer_destroy_internal(exec_ctx, &tmp);
-
+  // Swap out the original byte stream with our new one and send the
+  // batch down.
+  grpc_byte_stream_destroy(
+      exec_ctx, calld->send_message_batch->payload->send_message.send_message);
   grpc_slice_buffer_stream_init(&calld->replacement_stream, &calld->slices,
-                                calld->send_flags);
-  calld->send_op->payload->send_message.send_message =
+                                send_flags);
+  calld->send_message_batch->payload->send_message.send_message =
       &calld->replacement_stream.base;
-  calld->post_send = calld->send_op->on_complete;
-  calld->send_op->on_complete = &calld->send_done;
-
-  grpc_call_next_op(exec_ctx, elem, calld->send_op);
+  calld->original_send_message_on_complete =
+      calld->send_message_batch->on_complete;
+  calld->send_message_batch->on_complete = &calld->send_message_on_complete;
+  grpc_call_next_op(exec_ctx, elem, calld->send_message_batch);
 }
 
-static void got_slice(grpc_exec_ctx *exec_ctx, void *elemp, grpc_error *error) {
-  grpc_call_element *elem = elemp;
-  call_data *calld = elem->call_data;
-  if (GRPC_ERROR_NONE !=
-      grpc_byte_stream_pull(exec_ctx,
-                            calld->send_op->payload->send_message.send_message,
-                            &calld->incoming_slice)) {
-    /* Should never reach here */
-    abort();
+// Pulls a slice from the send_message byte stream and adds it to calld->slices.
+static grpc_error *pull_slice_from_send_message(grpc_exec_ctx *exec_ctx,
+                                                call_data *calld) {
+  grpc_slice incoming_slice;
+  grpc_error *error = grpc_byte_stream_pull(
+      exec_ctx, calld->send_message_batch->payload->send_message.send_message,
+      &incoming_slice);
+  if (error == GRPC_ERROR_NONE) {
+    grpc_slice_buffer_add(&calld->slices, incoming_slice);
   }
-  grpc_slice_buffer_add(&calld->slices, calld->incoming_slice);
-  if (calld->send_length == calld->slices.length) {
-    finish_send_message(exec_ctx, elem);
-  } else {
-    continue_send_message(exec_ctx, elem);
-  }
+  return error;
 }
 
-static void continue_send_message(grpc_exec_ctx *exec_ctx,
-                                  grpc_call_element *elem) {
-  call_data *calld = elem->call_data;
+// Reads as many slices as possible from the send_message byte stream.
+// If all data has been read, invokes finish_send_message().  Otherwise,
+// an async call to grpc_byte_stream_next() has been started, which will
+// eventually result in calling on_send_message_next_done().
+static grpc_error *continue_reading_send_message(grpc_exec_ctx *exec_ctx,
+                                                 grpc_call_element *elem) {
+  call_data *calld = (call_data *)elem->call_data;
   while (grpc_byte_stream_next(
-      exec_ctx, calld->send_op->payload->send_message.send_message, ~(size_t)0,
-      &calld->got_slice)) {
-    grpc_byte_stream_pull(exec_ctx,
-                          calld->send_op->payload->send_message.send_message,
-                          &calld->incoming_slice);
-    grpc_slice_buffer_add(&calld->slices, calld->incoming_slice);
-    if (calld->send_length == calld->slices.length) {
+      exec_ctx, calld->send_message_batch->payload->send_message.send_message,
+      ~(size_t)0, &calld->on_send_message_next_done)) {
+    grpc_error *error = pull_slice_from_send_message(exec_ctx, calld);
+    if (error != GRPC_ERROR_NONE) return error;
+    if (calld->slices.length ==
+        calld->send_message_batch->payload->send_message.send_message->length) {
       finish_send_message(exec_ctx, elem);
       break;
     }
   }
+  return GRPC_ERROR_NONE;
 }
 
-static void handle_send_message_batch(grpc_exec_ctx *exec_ctx,
-                                      grpc_call_element *elem,
-                                      grpc_transport_stream_op_batch *op,
-                                      bool has_compression_algorithm) {
-  call_data *calld = elem->call_data;
-  if (!skip_compression(elem, op->payload->send_message.send_message->flags,
+// Async callback for grpc_byte_stream_next().
+static void on_send_message_next_done(grpc_exec_ctx *exec_ctx, void *arg,
+                                      grpc_error *error) {
+  grpc_call_element *elem = (grpc_call_element *)arg;
+  call_data *calld = (call_data *)elem->call_data;
+  if (error != GRPC_ERROR_NONE) goto fail;
+  error = pull_slice_from_send_message(exec_ctx, calld);
+  if (error != GRPC_ERROR_NONE) goto fail;
+  if (calld->slices.length ==
+      calld->send_message_batch->payload->send_message.send_message->length) {
+    finish_send_message(exec_ctx, elem);
+  } else {
+    // This will either finish reading all of the data and invoke
+    // finish_send_message(), or else it will make an async call to
+    // grpc_byte_stream_next(), which will eventually result in calling
+    // this function again.
+    error = continue_reading_send_message(exec_ctx, elem);
+    if (error != GRPC_ERROR_NONE) goto fail;
+  }
+  return;
+fail:
+  grpc_transport_stream_op_batch_finish_with_failure(
+      exec_ctx, calld->send_message_batch, error);
+}
+
+static void start_send_message_batch(grpc_exec_ctx *exec_ctx,
+                                     grpc_call_element *elem,
+                                     grpc_transport_stream_op_batch *batch,
+                                     bool has_compression_algorithm) {
+  call_data *calld = (call_data *)elem->call_data;
+  if (!skip_compression(elem, batch->payload->send_message.send_message->flags,
                         has_compression_algorithm)) {
-    calld->send_op = op;
-    calld->send_length = op->payload->send_message.send_message->length;
-    calld->send_flags = op->payload->send_message.send_message->flags;
-    continue_send_message(exec_ctx, elem);
+    calld->send_message_batch = batch;
+    // This will either finish reading all of the data and invoke
+    // finish_send_message(), or else it will make an async call to
+    // grpc_byte_stream_next(), which will eventually result in calling
+    // on_send_message_next_done().
+    grpc_error *error = continue_reading_send_message(exec_ctx, elem);
+    if (error != GRPC_ERROR_NONE) {
+      grpc_transport_stream_op_batch_finish_with_failure(
+          exec_ctx, calld->send_message_batch, error);
+    }
   } else {
     /* pass control down the stack */
-    grpc_call_next_op(exec_ctx, elem, op);
+    grpc_call_next_op(exec_ctx, elem, batch);
   }
 }
 
 static void compress_start_transport_stream_op_batch(
     grpc_exec_ctx *exec_ctx, grpc_call_element *elem,
-    grpc_transport_stream_op_batch *op) {
+    grpc_transport_stream_op_batch *batch) {
   call_data *calld = elem->call_data;
 
   GPR_TIMER_BEGIN("compress_start_transport_stream_op_batch", 0);
 
-  if (op->cancel_stream) {
-    GRPC_ERROR_REF(op->payload->cancel_stream.cancel_error);
+  if (batch->cancel_stream) {
+    // TODO(roth): As part of the upcoming call combiner work, change
+    // this to call grpc_byte_stream_shutdown() on the incoming byte
+    // stream, to cancel any in-flight calls to grpc_byte_stream_next().
+    GRPC_ERROR_REF(batch->payload->cancel_stream.cancel_error);
     gpr_atm cur = gpr_atm_full_xchg(
         &calld->send_initial_metadata_state,
-        CANCELLED_BIT | (gpr_atm)op->payload->cancel_stream.cancel_error);
+        CANCELLED_BIT | (gpr_atm)batch->payload->cancel_stream.cancel_error);
     switch (cur) {
       case HAS_COMPRESSION_ALGORITHM:
       case NO_COMPRESSION_ALGORITHM:
@@ -293,7 +326,7 @@
         if ((cur & CANCELLED_BIT) == 0) {
           grpc_transport_stream_op_batch_finish_with_failure(
               exec_ctx, (grpc_transport_stream_op_batch *)cur,
-              GRPC_ERROR_REF(op->payload->cancel_stream.cancel_error));
+              GRPC_ERROR_REF(batch->payload->cancel_stream.cancel_error));
         } else {
           GRPC_ERROR_UNREF((grpc_error *)(cur & ~CANCELLED_BIT));
         }
@@ -301,14 +334,15 @@
     }
   }
 
-  if (op->send_initial_metadata) {
+  if (batch->send_initial_metadata) {
     bool has_compression_algorithm;
     grpc_error *error = process_send_initial_metadata(
         exec_ctx, elem,
-        op->payload->send_initial_metadata.send_initial_metadata,
+        batch->payload->send_initial_metadata.send_initial_metadata,
         &has_compression_algorithm);
     if (error != GRPC_ERROR_NONE) {
-      grpc_transport_stream_op_batch_finish_with_failure(exec_ctx, op, error);
+      grpc_transport_stream_op_batch_finish_with_failure(exec_ctx, batch,
+                                                         error);
       return;
     }
     gpr_atm cur;
@@ -324,32 +358,32 @@
         goto retry_send_im;
       }
       if (cur != INITIAL_METADATA_UNSEEN) {
-        handle_send_message_batch(exec_ctx, elem,
-                                  (grpc_transport_stream_op_batch *)cur,
-                                  has_compression_algorithm);
+        start_send_message_batch(exec_ctx, elem,
+                                 (grpc_transport_stream_op_batch *)cur,
+                                 has_compression_algorithm);
       }
     }
   }
-  if (op->send_message) {
+  if (batch->send_message) {
     gpr_atm cur;
   retry_send:
     cur = gpr_atm_acq_load(&calld->send_initial_metadata_state);
     switch (cur) {
       case INITIAL_METADATA_UNSEEN:
         if (!gpr_atm_rel_cas(&calld->send_initial_metadata_state, cur,
-                             (gpr_atm)op)) {
+                             (gpr_atm)batch)) {
           goto retry_send;
         }
         break;
       case HAS_COMPRESSION_ALGORITHM:
       case NO_COMPRESSION_ALGORITHM:
-        handle_send_message_batch(exec_ctx, elem, op,
-                                  cur == HAS_COMPRESSION_ALGORITHM);
+        start_send_message_batch(exec_ctx, elem, batch,
+                                 cur == HAS_COMPRESSION_ALGORITHM);
         break;
       default:
         if (cur & CANCELLED_BIT) {
           grpc_transport_stream_op_batch_finish_with_failure(
-              exec_ctx, op,
+              exec_ctx, batch,
               GRPC_ERROR_REF((grpc_error *)(cur & ~CANCELLED_BIT)));
         } else {
           /* >1 send_message concurrently */
@@ -358,7 +392,7 @@
     }
   } else {
     /* pass control down the stack */
-    grpc_call_next_op(exec_ctx, elem, op);
+    grpc_call_next_op(exec_ctx, elem, batch);
   }
 
   GPR_TIMER_END("compress_start_transport_stream_op_batch", 0);
@@ -373,10 +407,10 @@
 
   /* initialize members */
   grpc_slice_buffer_init(&calld->slices);
-  GRPC_CLOSURE_INIT(&calld->got_slice, got_slice, elem,
-                    grpc_schedule_on_exec_ctx);
-  GRPC_CLOSURE_INIT(&calld->send_done, send_done, elem,
-                    grpc_schedule_on_exec_ctx);
+  GRPC_CLOSURE_INIT(&calld->on_send_message_next_done,
+                    on_send_message_next_done, elem, grpc_schedule_on_exec_ctx);
+  GRPC_CLOSURE_INIT(&calld->send_message_on_complete, send_message_on_complete,
+                    elem, grpc_schedule_on_exec_ctx);
 
   return GRPC_ERROR_NONE;
 }
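
Another idiom shared by the two filters is intercepting a batch's on_complete closure: save the original closure, substitute the filter's own, and forward the result after cleanup. A minimal sketch with hypothetical names (my_call_data, my_on_complete_cb), assuming the usual closure and transport headers are in scope:

typedef struct {
  grpc_transport_stream_op_batch *send_message_batch;
  grpc_closure *original_on_complete;
  grpc_closure my_on_complete;
} my_call_data;

static void my_on_complete_cb(grpc_exec_ctx *exec_ctx, void *arg,
                              grpc_error *error) {
  my_call_data *calld = (my_call_data *)arg;
  /* per-filter cleanup would go here */
  GRPC_CLOSURE_RUN(exec_ctx, calld->original_on_complete,
                   GRPC_ERROR_REF(error));
}

static void intercept_on_complete(my_call_data *calld,
                                  grpc_transport_stream_op_batch *batch) {
  calld->send_message_batch = batch;
  calld->original_on_complete = batch->on_complete;
  GRPC_CLOSURE_INIT(&calld->my_on_complete, my_on_complete_cb, calld,
                    grpc_schedule_on_exec_ctx);
  batch->on_complete = &calld->my_on_complete;
}
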
diff --git a/src/core/ext/transport/chttp2/transport/chttp2_transport.c b/src/core/ext/transport/chttp2/transport/chttp2_transport.c
index 731ebf4..dc35f48 100644
--- a/src/core/ext/transport/chttp2/transport/chttp2_transport.c
+++ b/src/core/ext/transport/chttp2/transport/chttp2_transport.c
@@ -730,6 +730,14 @@
   grpc_slice_buffer_destroy_internal(exec_ctx,
                                      &s->unprocessed_incoming_frames_buffer);
   grpc_slice_buffer_destroy_internal(exec_ctx, &s->frame_storage);
+  if (s->compressed_data_buffer) {
+    grpc_slice_buffer_destroy_internal(exec_ctx, s->compressed_data_buffer);
+    gpr_free(s->compressed_data_buffer);
+  }
+  if (s->decompressed_data_buffer) {
+    grpc_slice_buffer_destroy_internal(exec_ctx, s->decompressed_data_buffer);
+    gpr_free(s->decompressed_data_buffer);
+  }
 
   grpc_chttp2_list_remove_stalled_by_transport(t, s);
   grpc_chttp2_list_remove_stalled_by_stream(t, s);
@@ -780,6 +788,15 @@
   grpc_chttp2_transport *t = (grpc_chttp2_transport *)gt;
   grpc_chttp2_stream *s = (grpc_chttp2_stream *)gs;
 
+  if (s->stream_compression_ctx != NULL) {
+    grpc_stream_compression_context_destroy(s->stream_compression_ctx);
+    s->stream_compression_ctx = NULL;
+  }
+  if (s->stream_decompression_ctx != NULL) {
+    grpc_stream_compression_context_destroy(s->stream_decompression_ctx);
+    s->stream_decompression_ctx = NULL;
+  }
+
   s->destroy_stream_arg = then_schedule_closure;
   GRPC_CLOSURE_SCHED(
       exec_ctx, GRPC_CLOSURE_INIT(&s->destroy_stream, destroy_stream_locked, s,
@@ -1173,6 +1190,7 @@
       return;  /* early out */
     }
     if (s->fetched_send_message_length == s->fetching_send_message->length) {
+      grpc_byte_stream_destroy(exec_ctx, s->fetching_send_message);
       int64_t notify_offset = s->next_message_end_offset;
       if (notify_offset <= s->flow_controlled_bytes_written) {
         grpc_chttp2_complete_closure_step(
@@ -1195,9 +1213,14 @@
       return; /* early out */
     } else if (grpc_byte_stream_next(exec_ctx, s->fetching_send_message,
                                      UINT32_MAX, &s->complete_fetch_locked)) {
-      grpc_byte_stream_pull(exec_ctx, s->fetching_send_message,
-                            &s->fetching_slice);
-      add_fetched_slice_locked(exec_ctx, t, s);
+      grpc_error *error = grpc_byte_stream_pull(
+          exec_ctx, s->fetching_send_message, &s->fetching_slice);
+      if (error != GRPC_ERROR_NONE) {
+        grpc_byte_stream_destroy(exec_ctx, s->fetching_send_message);
+        grpc_chttp2_cancel_stream(exec_ctx, t, s, error);
+      } else {
+        add_fetched_slice_locked(exec_ctx, t, s);
+      }
     }
   }
 }
@@ -1214,10 +1237,9 @@
       continue_fetching_send_locked(exec_ctx, t, s);
     }
   }
-
   if (error != GRPC_ERROR_NONE) {
-    /* TODO(ctiller): what to do here */
-    abort();
+    grpc_byte_stream_destroy(exec_ctx, s->fetching_send_message);
+    grpc_chttp2_cancel_stream(exec_ctx, t, s, error);
   }
 }
 
@@ -1362,8 +1384,8 @@
           "fetching_send_message_finished");
     } else {
       GPR_ASSERT(s->fetching_send_message == NULL);
-      uint8_t *frame_hdr =
-          grpc_slice_buffer_tiny_add(&s->flow_controlled_buffer, 5);
+      uint8_t *frame_hdr = grpc_slice_buffer_tiny_add(
+          &s->flow_controlled_buffer, GRPC_HEADER_SIZE_IN_BYTES);
       uint32_t flags = op_payload->send_message.send_message->flags;
       frame_hdr[0] = (flags & GRPC_WRITE_INTERNAL_COMPRESS) != 0;
       size_t len = op_payload->send_message.send_message->length;
@@ -1454,14 +1476,9 @@
     s->recv_message_ready = op_payload->recv_message.recv_message_ready;
     s->recv_message = op_payload->recv_message.recv_message;
     if (s->id != 0) {
-      if (s->pending_byte_stream) {
-        already_received = s->frame_storage.length;
-      } else {
-        already_received = s->frame_storage.length +
-                           s->unprocessed_incoming_frames_buffer.length;
-      }
-      incoming_byte_stream_update_flow_control(exec_ctx, t, s, 5,
-                                               already_received);
+      already_received = s->frame_storage.length;
+      incoming_byte_stream_update_flow_control(
+          exec_ctx, t, s, GRPC_HEADER_SIZE_IN_BYTES, already_received);
     }
     grpc_chttp2_maybe_complete_recv_message(exec_ctx, t, s);
   }
@@ -1698,10 +1715,43 @@
         if (s->unprocessed_incoming_frames_buffer.length == 0) {
           grpc_slice_buffer_swap(&s->unprocessed_incoming_frames_buffer,
                                  &s->frame_storage);
+          s->unprocessed_incoming_frames_decompressed = false;
         }
-        error = grpc_deframe_unprocessed_incoming_frames(
-            exec_ctx, &s->data_parser, s,
-            &s->unprocessed_incoming_frames_buffer, NULL, s->recv_message);
+        if (s->stream_compression_recv_enabled &&
+            !s->unprocessed_incoming_frames_decompressed) {
+          GPR_ASSERT(s->decompressed_data_buffer->length == 0);
+          bool end_of_context;
+          if (!s->stream_decompression_ctx) {
+            s->stream_decompression_ctx =
+                grpc_stream_compression_context_create(
+                    GRPC_STREAM_COMPRESSION_DECOMPRESS);
+          }
+          if (!grpc_stream_decompress(s->stream_decompression_ctx,
+                                      &s->unprocessed_incoming_frames_buffer,
+                                      s->decompressed_data_buffer, NULL,
+                                      GRPC_HEADER_SIZE_IN_BYTES,
+                                      &end_of_context)) {
+            grpc_slice_buffer_reset_and_unref_internal(exec_ctx,
+                                                       &s->frame_storage);
+            grpc_slice_buffer_reset_and_unref_internal(
+                exec_ctx, &s->unprocessed_incoming_frames_buffer);
+            error = GRPC_ERROR_CREATE_FROM_STATIC_STRING(
+                "Stream decompression error.");
+          } else {
+            error = grpc_deframe_unprocessed_incoming_frames(
+                exec_ctx, &s->data_parser, s, s->decompressed_data_buffer, NULL,
+                s->recv_message);
+            if (end_of_context) {
+              grpc_stream_compression_context_destroy(
+                  s->stream_decompression_ctx);
+              s->stream_decompression_ctx = NULL;
+            }
+          }
+        } else {
+          error = grpc_deframe_unprocessed_incoming_frames(
+              exec_ctx, &s->data_parser, s,
+              &s->unprocessed_incoming_frames_buffer, NULL, s->recv_message);
+        }
         if (error != GRPC_ERROR_NONE) {
           s->seen_error = true;
           grpc_slice_buffer_reset_and_unref_internal(exec_ctx,
@@ -1739,7 +1789,37 @@
     }
     bool pending_data = s->pending_byte_stream ||
                         s->unprocessed_incoming_frames_buffer.length > 0;
+    if (s->stream_compression_recv_enabled && s->read_closed &&
+        s->frame_storage.length > 0 &&
+        s->unprocessed_incoming_frames_buffer.length == 0 && !pending_data &&
+        !s->seen_error && s->recv_trailing_metadata_finished != NULL) {
+      /* Maybe some SYNC_FLUSH data is left in frame_storage. Consume it and
+       * maybe decompress the next 5 bytes in the stream. */
+      bool end_of_context;
+      if (!s->stream_decompression_ctx) {
+        s->stream_decompression_ctx = grpc_stream_compression_context_create(
+            GRPC_STREAM_COMPRESSION_DECOMPRESS);
+      }
+      if (!grpc_stream_decompress(s->stream_decompression_ctx,
+                                  &s->frame_storage,
+                                  &s->unprocessed_incoming_frames_buffer, NULL,
+                                  GRPC_HEADER_SIZE_IN_BYTES, &end_of_context)) {
+        grpc_slice_buffer_reset_and_unref_internal(exec_ctx, &s->frame_storage);
+        grpc_slice_buffer_reset_and_unref_internal(
+            exec_ctx, &s->unprocessed_incoming_frames_buffer);
+        s->seen_error = true;
+      } else {
+        if (s->unprocessed_incoming_frames_buffer.length > 0) {
+          s->unprocessed_incoming_frames_decompressed = true;
+        }
+        if (end_of_context) {
+          grpc_stream_compression_context_destroy(s->stream_decompression_ctx);
+          s->stream_decompression_ctx = NULL;
+        }
+      }
+    }
     if (s->read_closed && s->frame_storage.length == 0 &&
+        s->unprocessed_incoming_frames_buffer.length == 0 &&
         (!pending_data || s->seen_error) &&
         s->recv_trailing_metadata_finished != NULL) {
       grpc_chttp2_incoming_metadata_buffer_publish(
@@ -2607,6 +2687,7 @@
   if (s->frame_storage.length > 0) {
     grpc_slice_buffer_swap(&s->frame_storage,
                            &s->unprocessed_incoming_frames_buffer);
+    s->unprocessed_incoming_frames_decompressed = false;
     GRPC_CLOSURE_SCHED(exec_ctx, bs->next_action.on_complete, GRPC_ERROR_NONE);
   } else if (s->byte_stream_error != GRPC_ERROR_NONE) {
     GRPC_CLOSURE_SCHED(exec_ctx, bs->next_action.on_complete,
@@ -2668,17 +2749,41 @@
   grpc_chttp2_incoming_byte_stream *bs =
       (grpc_chttp2_incoming_byte_stream *)byte_stream;
   grpc_chttp2_stream *s = bs->stream;
+  grpc_error *error;
 
   if (s->unprocessed_incoming_frames_buffer.length > 0) {
-    grpc_error *error = grpc_deframe_unprocessed_incoming_frames(
+    if (s->stream_compression_recv_enabled &&
+        !s->unprocessed_incoming_frames_decompressed) {
+      bool end_of_context;
+      if (!s->stream_decompression_ctx) {
+        s->stream_decompression_ctx = grpc_stream_compression_context_create(
+            GRPC_STREAM_COMPRESSION_DECOMPRESS);
+      }
+      if (!grpc_stream_decompress(s->stream_decompression_ctx,
+                                  &s->unprocessed_incoming_frames_buffer,
+                                  s->decompressed_data_buffer, NULL, MAX_SIZE_T,
+                                  &end_of_context)) {
+        error =
+            GRPC_ERROR_CREATE_FROM_STATIC_STRING("Stream decompression error.");
+        return error;
+      }
+      GPR_ASSERT(s->unprocessed_incoming_frames_buffer.length == 0);
+      grpc_slice_buffer_swap(&s->unprocessed_incoming_frames_buffer,
+                             s->decompressed_data_buffer);
+      s->unprocessed_incoming_frames_decompressed = true;
+      if (end_of_context) {
+        grpc_stream_compression_context_destroy(s->stream_decompression_ctx);
+        s->stream_decompression_ctx = NULL;
+      }
+    }
+    error = grpc_deframe_unprocessed_incoming_frames(
         exec_ctx, &s->data_parser, s, &s->unprocessed_incoming_frames_buffer,
         slice, NULL);
     if (error != GRPC_ERROR_NONE) {
       return error;
     }
   } else {
-    grpc_error *error =
-        GRPC_ERROR_CREATE_FROM_STATIC_STRING("Truncated message");
+    error = GRPC_ERROR_CREATE_FROM_STATIC_STRING("Truncated message");
     GRPC_CLOSURE_SCHED(exec_ctx, &s->reset_byte_stream, GRPC_ERROR_REF(error));
     return error;
   }
@@ -2686,22 +2791,9 @@
   return GRPC_ERROR_NONE;
 }
 
-static void incoming_byte_stream_destroy(grpc_exec_ctx *exec_ctx,
-                                         grpc_byte_stream *byte_stream);
-
 static void incoming_byte_stream_destroy_locked(grpc_exec_ctx *exec_ctx,
                                                 void *byte_stream,
-                                                grpc_error *error_ignored) {
-  grpc_chttp2_incoming_byte_stream *bs = byte_stream;
-  grpc_chttp2_stream *s = bs->stream;
-  grpc_chttp2_transport *t = s->t;
-
-  GPR_ASSERT(bs->base.destroy == incoming_byte_stream_destroy);
-  incoming_byte_stream_unref(exec_ctx, bs);
-  s->pending_byte_stream = false;
-  grpc_chttp2_maybe_complete_recv_message(exec_ctx, t, s);
-  grpc_chttp2_maybe_complete_recv_trailing_metadata(exec_ctx, t, s);
-}
+                                                grpc_error *error_ignored);
 
 static void incoming_byte_stream_destroy(grpc_exec_ctx *exec_ctx,
                                          grpc_byte_stream *byte_stream) {
@@ -2768,6 +2860,33 @@
   return error;
 }
 
+static void incoming_byte_stream_shutdown(grpc_exec_ctx *exec_ctx,
+                                          grpc_byte_stream *byte_stream,
+                                          grpc_error *error) {
+  grpc_chttp2_incoming_byte_stream *bs =
+      (grpc_chttp2_incoming_byte_stream *)byte_stream;
+  GRPC_ERROR_UNREF(grpc_chttp2_incoming_byte_stream_finished(
+      exec_ctx, bs, error, true /* reset_on_error */));
+}
+
+static const grpc_byte_stream_vtable grpc_chttp2_incoming_byte_stream_vtable = {
+    incoming_byte_stream_next, incoming_byte_stream_pull,
+    incoming_byte_stream_shutdown, incoming_byte_stream_destroy};
+
+static void incoming_byte_stream_destroy_locked(grpc_exec_ctx *exec_ctx,
+                                                void *byte_stream,
+                                                grpc_error *error_ignored) {
+  grpc_chttp2_incoming_byte_stream *bs = byte_stream;
+  grpc_chttp2_stream *s = bs->stream;
+  grpc_chttp2_transport *t = s->t;
+
+  GPR_ASSERT(bs->base.vtable == &grpc_chttp2_incoming_byte_stream_vtable);
+  incoming_byte_stream_unref(exec_ctx, bs);
+  s->pending_byte_stream = false;
+  grpc_chttp2_maybe_complete_recv_message(exec_ctx, t, s);
+  grpc_chttp2_maybe_complete_recv_trailing_metadata(exec_ctx, t, s);
+}
+
 grpc_chttp2_incoming_byte_stream *grpc_chttp2_incoming_byte_stream_create(
     grpc_exec_ctx *exec_ctx, grpc_chttp2_transport *t, grpc_chttp2_stream *s,
     uint32_t frame_size, uint32_t flags) {
@@ -2776,9 +2895,7 @@
   incoming_byte_stream->base.length = frame_size;
   incoming_byte_stream->remaining_bytes = frame_size;
   incoming_byte_stream->base.flags = flags;
-  incoming_byte_stream->base.next = incoming_byte_stream_next;
-  incoming_byte_stream->base.pull = incoming_byte_stream_pull;
-  incoming_byte_stream->base.destroy = incoming_byte_stream_destroy;
+  incoming_byte_stream->base.vtable = &grpc_chttp2_incoming_byte_stream_vtable;
   gpr_ref_init(&incoming_byte_stream->refs, 2);
   incoming_byte_stream->transport = t;
   incoming_byte_stream->stream = s;
diff --git a/src/core/ext/transport/chttp2/transport/frame_data.c b/src/core/ext/transport/chttp2/transport/frame_data.c
index dead6be..222d217 100644
--- a/src/core/ext/transport/chttp2/transport/frame_data.c
+++ b/src/core/ext/transport/chttp2/transport/frame_data.c
@@ -293,7 +293,6 @@
                                           grpc_chttp2_transport *t,
                                           grpc_chttp2_stream *s,
                                           grpc_slice slice, int is_last) {
-  /* grpc_error *error = parse_inner_buffer(exec_ctx, p, t, s, slice); */
   if (!s->pending_byte_stream) {
     grpc_slice_ref_internal(slice);
     grpc_slice_buffer_add(&s->frame_storage, slice);
@@ -304,6 +303,7 @@
     grpc_slice_buffer_add(&s->unprocessed_incoming_frames_buffer, slice);
     GRPC_CLOSURE_SCHED(exec_ctx, s->on_next, GRPC_ERROR_NONE);
     s->on_next = NULL;
+    s->unprocessed_incoming_frames_decompressed = false;
   } else {
     grpc_slice_ref_internal(slice);
     grpc_slice_buffer_add(&s->frame_storage, slice);
diff --git a/src/core/ext/transport/chttp2/transport/internal.h b/src/core/ext/transport/chttp2/transport/internal.h
index 4563b78..b538d1d 100644
--- a/src/core/ext/transport/chttp2/transport/internal.h
+++ b/src/core/ext/transport/chttp2/transport/internal.h
@@ -526,6 +526,26 @@
   grpc_chttp2_write_cb *on_write_finished_cbs;
   grpc_chttp2_write_cb *finish_after_write;
   size_t sending_bytes;
+
+  /** Whether stream compression recv is enabled */
+  bool stream_compression_recv_enabled;
+  /** Whether stream compression send is enabled */
+  bool stream_compression_send_enabled;
+  /** Whether the bytes stored in unprocessed_incoming_frames_buffer have been
+   * decompressed */
+  bool unprocessed_incoming_frames_decompressed;
+  /** Stream compression decompress context */
+  grpc_stream_compression_context *stream_decompression_ctx;
+  /** Stream compression compress context */
+  grpc_stream_compression_context *stream_compression_ctx;
+
+  /** Buffer storing data that is compressed but not sent */
+  grpc_slice_buffer *compressed_data_buffer;
+  /** Number of uncompressed bytes sent out when compressed_data_buffer is
+   * emptied */
+  size_t uncompressed_data_size;
+  /** Temporary buffer storing decompressed data */
+  grpc_slice_buffer *decompressed_data_buffer;
 };
 
 /** Transport writing call flow:
@@ -621,6 +641,9 @@
                                        grpc_closure **pclosure,
                                        grpc_error *error, const char *desc);
 
+#define GRPC_HEADER_SIZE_IN_BYTES 5
+#define MAX_SIZE_T (~(size_t)0)
+
 #define GRPC_CHTTP2_CLIENT_CONNECT_STRING "PRI * HTTP/2.0\r\n\r\nSM\r\n\r\n"
 #define GRPC_CHTTP2_CLIENT_CONNECT_STRLEN \
   (sizeof(GRPC_CHTTP2_CLIENT_CONNECT_STRING) - 1)
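
GRPC_HEADER_SIZE_IN_BYTES names the previously hard-coded 5: every gRPC message on the wire is prefixed by a 1-byte compressed flag followed by a 4-byte big-endian length. A sketch of filling such a header, mirroring the frame_hdr code in chttp2_transport.c above; fill_grpc_message_header is a hypothetical helper:

#include <stdbool.h>
#include <stddef.h>
#include <stdint.h>

static void fill_grpc_message_header(uint8_t *hdr, bool is_compressed,
                                     size_t len) {
  hdr[0] = is_compressed ? 1 : 0; /* GRPC_WRITE_INTERNAL_COMPRESS bit */
  hdr[1] = (uint8_t)(len >> 24);  /* 4-byte big-endian message length */
  hdr[2] = (uint8_t)(len >> 16);
  hdr[3] = (uint8_t)(len >> 8);
  hdr[4] = (uint8_t)(len);
}
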
diff --git a/src/core/ext/transport/chttp2/transport/writing.c b/src/core/ext/transport/chttp2/transport/writing.c
index 315f2a6..c3ede08 100644
--- a/src/core/ext/transport/chttp2/transport/writing.c
+++ b/src/core/ext/transport/chttp2/transport/writing.c
@@ -303,7 +303,9 @@
     }
     if (sent_initial_metadata) {
       /* send any body bytes, if allowed by flow control */
-      if (s->flow_controlled_buffer.length > 0) {
+      if (s->flow_controlled_buffer.length > 0 ||
+          (s->stream_compression_send_enabled &&
+           s->compressed_data_buffer->length > 0)) {
         uint32_t stream_outgoing_window = (uint32_t)GPR_MAX(
             0,
             s->outgoing_window_delta +
@@ -314,21 +316,63 @@
                        [GRPC_CHTTP2_SETTINGS_MAX_FRAME_SIZE],
             GPR_MIN(stream_outgoing_window, t->outgoing_window));
         if (max_outgoing > 0) {
-          uint32_t send_bytes =
-              (uint32_t)GPR_MIN(max_outgoing, s->flow_controlled_buffer.length);
-          bool is_last_data_frame =
-              s->fetching_send_message == NULL &&
-              send_bytes == s->flow_controlled_buffer.length;
-          bool is_last_frame =
-              is_last_data_frame && s->send_trailing_metadata != NULL &&
-              grpc_metadata_batch_is_empty(s->send_trailing_metadata);
-          grpc_chttp2_encode_data(s->id, &s->flow_controlled_buffer, send_bytes,
-                                  is_last_frame, &s->stats.outgoing,
-                                  &t->outbuf);
-          GRPC_CHTTP2_FLOW_DEBIT_STREAM("write", t, s, outgoing_window_delta,
-                                        send_bytes);
-          GRPC_CHTTP2_FLOW_DEBIT_TRANSPORT("write", t, outgoing_window,
-                                           send_bytes);
+          bool is_last_data_frame = false;
+          bool is_last_frame = false;
+          if (s->stream_compression_send_enabled) {
+            while ((s->flow_controlled_buffer.length > 0 ||
+                    s->compressed_data_buffer->length > 0) &&
+                   max_outgoing > 0) {
+              if (s->compressed_data_buffer->length > 0) {
+                uint32_t send_bytes = (uint32_t)GPR_MIN(
+                    max_outgoing, s->compressed_data_buffer->length);
+                is_last_data_frame =
+                    (send_bytes == s->compressed_data_buffer->length &&
+                     s->flow_controlled_buffer.length == 0 &&
+                     s->fetching_send_message == NULL);
+                is_last_frame =
+                    is_last_data_frame && s->send_trailing_metadata != NULL &&
+                    grpc_metadata_batch_is_empty(s->send_trailing_metadata);
+                grpc_chttp2_encode_data(s->id, s->compressed_data_buffer,
+                                        send_bytes, is_last_frame,
+                                        &s->stats.outgoing, &t->outbuf);
+                GRPC_CHTTP2_FLOW_DEBIT_STREAM(
+                    "write", t, s, outgoing_window_delta, send_bytes);
+                GRPC_CHTTP2_FLOW_DEBIT_TRANSPORT("write", t, outgoing_window,
+                                                 send_bytes);
+                max_outgoing -= send_bytes;
+                if (s->compressed_data_buffer->length == 0) {
+                  s->sending_bytes += s->uncompressed_data_size;
+                }
+              } else {
+                if (s->stream_compression_ctx == NULL) {
+                  s->stream_compression_ctx =
+                      grpc_stream_compression_context_create(
+                          GRPC_STREAM_COMPRESSION_COMPRESS);
+                }
+                s->uncompressed_data_size = s->flow_controlled_buffer.length;
+                GPR_ASSERT(grpc_stream_compress(
+                    s->stream_compression_ctx, &s->flow_controlled_buffer,
+                    s->compressed_data_buffer, NULL, MAX_SIZE_T,
+                    GRPC_STREAM_COMPRESSION_FLUSH_SYNC));
+              }
+            }
+          } else {
+            uint32_t send_bytes = (uint32_t)GPR_MIN(
+                max_outgoing, s->flow_controlled_buffer.length);
+            is_last_data_frame = s->fetching_send_message == NULL &&
+                                 send_bytes == s->flow_controlled_buffer.length;
+            is_last_frame =
+                is_last_data_frame && s->send_trailing_metadata != NULL &&
+                grpc_metadata_batch_is_empty(s->send_trailing_metadata);
+            grpc_chttp2_encode_data(s->id, &s->flow_controlled_buffer,
+                                    send_bytes, is_last_frame,
+                                    &s->stats.outgoing, &t->outbuf);
+            GRPC_CHTTP2_FLOW_DEBIT_STREAM("write", t, s, outgoing_window_delta,
+                                          send_bytes);
+            GRPC_CHTTP2_FLOW_DEBIT_TRANSPORT("write", t, outgoing_window,
+                                             send_bytes);
+            s->sending_bytes += send_bytes;
+          }
           t->ping_state.pings_before_data_required =
               t->ping_policy.max_pings_without_data;
           if (!t->is_client) {
@@ -345,9 +389,10 @@
                                                     &s->stats.outgoing));
             }
           }
-          s->sending_bytes += send_bytes;
           now_writing = true;
-          if (s->flow_controlled_buffer.length > 0) {
+          if (s->flow_controlled_buffer.length > 0 ||
+              (s->stream_compression_send_enabled &&
+               s->compressed_data_buffer->length > 0)) {
             GRPC_CHTTP2_STREAM_REF(s, "chttp2_writing:fork");
             grpc_chttp2_list_add_writable_stream(t, s);
           }
@@ -361,7 +406,9 @@
       }
       if (s->send_trailing_metadata != NULL &&
           s->fetching_send_message == NULL &&
-          s->flow_controlled_buffer.length == 0) {
+          s->flow_controlled_buffer.length == 0 &&
+          (!s->stream_compression_send_enabled ||
+           s->compressed_data_buffer->length == 0)) {
         GRPC_CHTTP2_IF_TRACING(gpr_log(GPR_INFO, "sending trailing_metadata"));
         if (grpc_metadata_batch_is_empty(s->send_trailing_metadata)) {
           grpc_chttp2_encode_data(s->id, &s->flow_controlled_buffer, 0, true,
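
The send path above interleaves compression with flow control: when compressed_data_buffer is empty, flow_controlled_buffer is compressed into it with a SYNC flush and the result is framed like any other data. A minimal sketch of that compression step in isolation, assuming the stream_compression.h API used above; compress_pending_bytes and the parameter names are illustrative, and in the patch itself the context lives on the grpc_chttp2_stream and is reused across writes:

static void compress_pending_bytes(grpc_slice_buffer *flow_controlled,
                                   grpc_slice_buffer *compressed) {
  grpc_stream_compression_context *ctx =
      grpc_stream_compression_context_create(GRPC_STREAM_COMPRESSION_COMPRESS);
  /* Compress everything currently buffered; FLUSH_SYNC makes the output
     decodable without waiting for further input. */
  GPR_ASSERT(grpc_stream_compress(ctx, flow_controlled, compressed, NULL,
                                  MAX_SIZE_T,
                                  GRPC_STREAM_COMPRESSION_FLUSH_SYNC));
  grpc_stream_compression_context_destroy(ctx);
}
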
diff --git a/src/core/ext/transport/inproc/inproc_transport.c b/src/core/ext/transport/inproc/inproc_transport.c
index 1449802..6f4b429 100644
--- a/src/core/ext/transport/inproc/inproc_transport.c
+++ b/src/core/ext/transport/inproc/inproc_transport.c
@@ -72,6 +72,7 @@
 typedef struct {
   grpc_byte_stream base;
   sb_list_entry *le;
+  grpc_error *shutdown_error;
 } inproc_slice_byte_stream;
 
 typedef struct {
@@ -201,24 +202,39 @@
                                                  grpc_byte_stream *bs,
                                                  grpc_slice *slice) {
   inproc_slice_byte_stream *stream = (inproc_slice_byte_stream *)bs;
+  if (stream->shutdown_error != GRPC_ERROR_NONE) {
+    return GRPC_ERROR_REF(stream->shutdown_error);
+  }
   *slice = grpc_slice_buffer_take_first(&stream->le->sb);
   return GRPC_ERROR_NONE;
 }
 
+static void inproc_slice_byte_stream_shutdown(grpc_exec_ctx *exec_ctx,
+                                              grpc_byte_stream *bs,
+                                              grpc_error *error) {
+  inproc_slice_byte_stream *stream = (inproc_slice_byte_stream *)bs;
+  GRPC_ERROR_UNREF(stream->shutdown_error);
+  stream->shutdown_error = error;
+}
+
 static void inproc_slice_byte_stream_destroy(grpc_exec_ctx *exec_ctx,
                                              grpc_byte_stream *bs) {
   inproc_slice_byte_stream *stream = (inproc_slice_byte_stream *)bs;
   sb_list_entry_destroy(exec_ctx, stream->le);
+  GRPC_ERROR_UNREF(stream->shutdown_error);
 }
 
+static const grpc_byte_stream_vtable inproc_slice_byte_stream_vtable = {
+    inproc_slice_byte_stream_next, inproc_slice_byte_stream_pull,
+    inproc_slice_byte_stream_shutdown, inproc_slice_byte_stream_destroy};
+
 void inproc_slice_byte_stream_init(inproc_slice_byte_stream *s,
                                    sb_list_entry *le) {
   s->base.length = (uint32_t)le->sb.length;
   s->base.flags = 0;
-  s->base.next = inproc_slice_byte_stream_next;
-  s->base.pull = inproc_slice_byte_stream_pull;
-  s->base.destroy = inproc_slice_byte_stream_destroy;
+  s->base.vtable = &inproc_slice_byte_stream_vtable;
   s->le = le;
+  s->shutdown_error = GRPC_ERROR_NONE;
 }
 
 static void ref_transport(inproc_transport *t) {
@@ -956,11 +972,18 @@
         GPR_ASSERT(grpc_byte_stream_next(exec_ctx,
                                          op->payload->send_message.send_message,
                                          SIZE_MAX, &unused));
-        grpc_byte_stream_pull(exec_ctx, op->payload->send_message.send_message,
-                              &message_slice);
+        error = grpc_byte_stream_pull(
+            exec_ctx, op->payload->send_message.send_message, &message_slice);
+        if (error != GRPC_ERROR_NONE) {
+          cancel_stream_locked(exec_ctx, s, GRPC_ERROR_REF(error));
+          break;
+        }
+        GPR_ASSERT(error == GRPC_ERROR_NONE);
         remaining -= GRPC_SLICE_LENGTH(message_slice);
         grpc_slice_buffer_add(dest, message_slice);
       } while (remaining != 0);
+      grpc_byte_stream_destroy(exec_ctx,
+                               op->payload->send_message.send_message);
     }
     if (error == GRPC_ERROR_NONE && op->send_trailing_metadata) {
       grpc_metadata_batch *dest = (other == NULL) ? &s->write_buffer_trailing_md
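
The recurring change across transports in this patch is the move from per-stream function pointers to a shared grpc_byte_stream_vtable with next/pull/shutdown/destroy entries. A sketch of a trivial implementation, a one-shot stream over a single slice; the static_slice_stream_* names are hypothetical, and the vtable field order is assumed to match the positional initializers used above:

typedef struct {
  grpc_byte_stream base;
  grpc_slice slice;
  grpc_error *shutdown_error;
} static_slice_stream;

static bool static_slice_stream_next(grpc_exec_ctx *exec_ctx,
                                     grpc_byte_stream *bs,
                                     size_t max_size_hint,
                                     grpc_closure *on_complete) {
  return true; /* the single slice is always immediately pullable */
}

static grpc_error *static_slice_stream_pull(grpc_exec_ctx *exec_ctx,
                                            grpc_byte_stream *bs,
                                            grpc_slice *slice) {
  static_slice_stream *stream = (static_slice_stream *)bs;
  if (stream->shutdown_error != GRPC_ERROR_NONE) {
    return GRPC_ERROR_REF(stream->shutdown_error);
  }
  *slice = grpc_slice_ref(stream->slice);
  return GRPC_ERROR_NONE;
}

static void static_slice_stream_shutdown(grpc_exec_ctx *exec_ctx,
                                         grpc_byte_stream *bs,
                                         grpc_error *error) {
  static_slice_stream *stream = (static_slice_stream *)bs;
  GRPC_ERROR_UNREF(stream->shutdown_error);
  stream->shutdown_error = error; /* later pulls report the shutdown error */
}

static void static_slice_stream_destroy(grpc_exec_ctx *exec_ctx,
                                        grpc_byte_stream *bs) {
  static_slice_stream *stream = (static_slice_stream *)bs;
  grpc_slice_unref(stream->slice);
  GRPC_ERROR_UNREF(stream->shutdown_error);
}

static const grpc_byte_stream_vtable static_slice_stream_vtable = {
    static_slice_stream_next, static_slice_stream_pull,
    static_slice_stream_shutdown, static_slice_stream_destroy};

static void static_slice_stream_init(static_slice_stream *stream,
                                     grpc_slice slice) {
  stream->base.length = (uint32_t)GRPC_SLICE_LENGTH(slice);
  stream->base.flags = 0;
  stream->base.vtable = &static_slice_stream_vtable;
  stream->slice = slice;
  stream->shutdown_error = GRPC_ERROR_NONE;
}
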
diff --git a/src/core/lib/surface/completion_queue.c b/src/core/lib/surface/completion_queue.c
index 978d7b4..3851993 100644
--- a/src/core/lib/surface/completion_queue.c
+++ b/src/core/lib/surface/completion_queue.c
@@ -855,8 +855,7 @@
          inconsistent state. If it is the latter, we shold do a 0-timeout poll
          so that the thread comes back quickly from poll to make a second
          attempt at popping. Not doing this can potentially deadlock this
-         thread
-         forever (if the deadline is infinity) */
+         thread forever (if the deadline is infinity) */
       if (cq_event_queue_num_items(&cqd->queue) > 0) {
         iteration_deadline = gpr_time_0(GPR_CLOCK_MONOTONIC);
       }
@@ -869,10 +868,8 @@
       if (cq_event_queue_num_items(&cqd->queue) > 0) {
         /* Go to the beginning of the loop. No point doing a poll because
            (cq->shutdown == true) is only possible when there is no pending
-           work
-           (i.e cq->pending_events == 0) and any outstanding
-           grpc_cq_completion
-           events are already queued on this cq */
+           work (i.e. cq->pending_events == 0) and any outstanding completion
+           events should have already been queued on this cq */
         continue;
       }
 
@@ -909,11 +906,6 @@
     is_finished_arg.first_loop = false;
   }
 
-  GRPC_SURFACE_TRACE_RETURNED_EVENT(cq, &ret);
-  GRPC_CQ_INTERNAL_UNREF(&exec_ctx, cq, "next");
-  grpc_exec_ctx_finish(&exec_ctx);
-  GPR_ASSERT(is_finished_arg.stolen_completion == NULL);
-
   if (cq_event_queue_num_items(&cqd->queue) > 0 &&
       gpr_atm_no_barrier_load(&cqd->pending_events) > 0) {
     gpr_mu_lock(cq->mu);
@@ -921,6 +913,11 @@
     gpr_mu_unlock(cq->mu);
   }
 
+  GRPC_SURFACE_TRACE_RETURNED_EVENT(cq, &ret);
+  GRPC_CQ_INTERNAL_UNREF(&exec_ctx, cq, "next");
+  grpc_exec_ctx_finish(&exec_ctx);
+  GPR_ASSERT(is_finished_arg.stolen_completion == NULL);
+
   GPR_TIMER_END("grpc_completion_queue_next", 0);
 
   return ret;
diff --git a/src/core/lib/transport/byte_stream.c b/src/core/lib/transport/byte_stream.c
index 3355814..fb03a10 100644
--- a/src/core/lib/transport/byte_stream.c
+++ b/src/core/lib/transport/byte_stream.c
@@ -19,29 +19,37 @@
 #include "src/core/lib/transport/byte_stream.h"
 
 #include <stdlib.h>
+#include <string.h>
 
 #include <grpc/support/log.h>
 
 #include "src/core/lib/slice/slice_internal.h"
 
-int grpc_byte_stream_next(grpc_exec_ctx *exec_ctx,
-                          grpc_byte_stream *byte_stream, size_t max_size_hint,
-                          grpc_closure *on_complete) {
-  return byte_stream->next(exec_ctx, byte_stream, max_size_hint, on_complete);
+bool grpc_byte_stream_next(grpc_exec_ctx *exec_ctx,
+                           grpc_byte_stream *byte_stream, size_t max_size_hint,
+                           grpc_closure *on_complete) {
+  return byte_stream->vtable->next(exec_ctx, byte_stream, max_size_hint,
+                                   on_complete);
 }
 
 grpc_error *grpc_byte_stream_pull(grpc_exec_ctx *exec_ctx,
                                   grpc_byte_stream *byte_stream,
                                   grpc_slice *slice) {
-  return byte_stream->pull(exec_ctx, byte_stream, slice);
+  return byte_stream->vtable->pull(exec_ctx, byte_stream, slice);
+}
+
+void grpc_byte_stream_shutdown(grpc_exec_ctx *exec_ctx,
+                               grpc_byte_stream *byte_stream,
+                               grpc_error *error) {
+  byte_stream->vtable->shutdown(exec_ctx, byte_stream, error);
 }
 
 void grpc_byte_stream_destroy(grpc_exec_ctx *exec_ctx,
                               grpc_byte_stream *byte_stream) {
-  byte_stream->destroy(exec_ctx, byte_stream);
+  byte_stream->vtable->destroy(exec_ctx, byte_stream);
 }
 
-/* slice_buffer_stream */
+// grpc_slice_buffer_stream
 
 static bool slice_buffer_stream_next(grpc_exec_ctx *exec_ctx,
                                      grpc_byte_stream *byte_stream,
@@ -56,6 +64,9 @@
                                             grpc_byte_stream *byte_stream,
                                             grpc_slice *slice) {
   grpc_slice_buffer_stream *stream = (grpc_slice_buffer_stream *)byte_stream;
+  if (stream->shutdown_error != GRPC_ERROR_NONE) {
+    return GRPC_ERROR_REF(stream->shutdown_error);
+  }
   GPR_ASSERT(stream->cursor < stream->backing_buffer->count);
   *slice =
       grpc_slice_ref_internal(stream->backing_buffer->slices[stream->cursor]);
@@ -63,8 +74,23 @@
   return GRPC_ERROR_NONE;
 }
 
+static void slice_buffer_stream_shutdown(grpc_exec_ctx *exec_ctx,
+                                         grpc_byte_stream *byte_stream,
+                                         grpc_error *error) {
+  grpc_slice_buffer_stream *stream = (grpc_slice_buffer_stream *)byte_stream;
+  GRPC_ERROR_UNREF(stream->shutdown_error);
+  stream->shutdown_error = error;
+}
+
 static void slice_buffer_stream_destroy(grpc_exec_ctx *exec_ctx,
-                                        grpc_byte_stream *byte_stream) {}
+                                        grpc_byte_stream *byte_stream) {
+  grpc_slice_buffer_stream *stream = (grpc_slice_buffer_stream *)byte_stream;
+  GRPC_ERROR_UNREF(stream->shutdown_error);
+}
+
+static const grpc_byte_stream_vtable slice_buffer_stream_vtable = {
+    slice_buffer_stream_next, slice_buffer_stream_pull,
+    slice_buffer_stream_shutdown, slice_buffer_stream_destroy};
 
 void grpc_slice_buffer_stream_init(grpc_slice_buffer_stream *stream,
                                    grpc_slice_buffer *slice_buffer,
@@ -72,9 +98,89 @@
   GPR_ASSERT(slice_buffer->length <= UINT32_MAX);
   stream->base.length = (uint32_t)slice_buffer->length;
   stream->base.flags = flags;
-  stream->base.next = slice_buffer_stream_next;
-  stream->base.pull = slice_buffer_stream_pull;
-  stream->base.destroy = slice_buffer_stream_destroy;
+  stream->base.vtable = &slice_buffer_stream_vtable;
   stream->backing_buffer = slice_buffer;
   stream->cursor = 0;
+  stream->shutdown_error = GRPC_ERROR_NONE;
+}
+
+// grpc_caching_byte_stream
+
+void grpc_byte_stream_cache_init(grpc_byte_stream_cache *cache,
+                                 grpc_byte_stream *underlying_stream) {
+  cache->underlying_stream = underlying_stream;
+  grpc_slice_buffer_init(&cache->cache_buffer);
+}
+
+void grpc_byte_stream_cache_destroy(grpc_exec_ctx *exec_ctx,
+                                    grpc_byte_stream_cache *cache) {
+  grpc_byte_stream_destroy(exec_ctx, cache->underlying_stream);
+  grpc_slice_buffer_destroy_internal(exec_ctx, &cache->cache_buffer);
+}
+
+static bool caching_byte_stream_next(grpc_exec_ctx *exec_ctx,
+                                     grpc_byte_stream *byte_stream,
+                                     size_t max_size_hint,
+                                     grpc_closure *on_complete) {
+  grpc_caching_byte_stream *stream = (grpc_caching_byte_stream *)byte_stream;
+  if (stream->shutdown_error != GRPC_ERROR_NONE) return true;
+  if (stream->cursor < stream->cache->cache_buffer.count) return true;
+  return grpc_byte_stream_next(exec_ctx, stream->cache->underlying_stream,
+                               max_size_hint, on_complete);
+}
+
+static grpc_error *caching_byte_stream_pull(grpc_exec_ctx *exec_ctx,
+                                            grpc_byte_stream *byte_stream,
+                                            grpc_slice *slice) {
+  grpc_caching_byte_stream *stream = (grpc_caching_byte_stream *)byte_stream;
+  if (stream->shutdown_error != GRPC_ERROR_NONE) {
+    return GRPC_ERROR_REF(stream->shutdown_error);
+  }
+  if (stream->cursor < stream->cache->cache_buffer.count) {
+    *slice = grpc_slice_ref_internal(
+        stream->cache->cache_buffer.slices[stream->cursor]);
+    ++stream->cursor;
+    return GRPC_ERROR_NONE;
+  }
+  grpc_error *error =
+      grpc_byte_stream_pull(exec_ctx, stream->cache->underlying_stream, slice);
+  if (error == GRPC_ERROR_NONE) {
+    ++stream->cursor;
+    grpc_slice_buffer_add(&stream->cache->cache_buffer,
+                          grpc_slice_ref_internal(*slice));
+  }
+  return error;
+}
+
+static void caching_byte_stream_shutdown(grpc_exec_ctx *exec_ctx,
+                                         grpc_byte_stream *byte_stream,
+                                         grpc_error *error) {
+  grpc_caching_byte_stream *stream = (grpc_caching_byte_stream *)byte_stream;
+  GRPC_ERROR_UNREF(stream->shutdown_error);
+  stream->shutdown_error = GRPC_ERROR_REF(error);
+  grpc_byte_stream_shutdown(exec_ctx, stream->cache->underlying_stream, error);
+}
+
+static void caching_byte_stream_destroy(grpc_exec_ctx *exec_ctx,
+                                        grpc_byte_stream *byte_stream) {
+  grpc_caching_byte_stream *stream = (grpc_caching_byte_stream *)byte_stream;
+  GRPC_ERROR_UNREF(stream->shutdown_error);
+}
+
+static const grpc_byte_stream_vtable caching_byte_stream_vtable = {
+    caching_byte_stream_next, caching_byte_stream_pull,
+    caching_byte_stream_shutdown, caching_byte_stream_destroy};
+
+void grpc_caching_byte_stream_init(grpc_caching_byte_stream *stream,
+                                   grpc_byte_stream_cache *cache) {
+  memset(stream, 0, sizeof(*stream));
+  stream->base.length = cache->underlying_stream->length;
+  stream->base.flags = cache->underlying_stream->flags;
+  stream->base.vtable = &caching_byte_stream_vtable;
+  stream->cache = cache;
+  stream->shutdown_error = GRPC_ERROR_NONE;
+}
+
+void grpc_caching_byte_stream_reset(grpc_caching_byte_stream *stream) {
+  stream->cursor = 0;
 }
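
For context on the vtable-based interface introduced above, the following is a minimal sketch (not part of this patch) of how a custom byte stream could be wired to the new grpc_byte_stream_vtable. The type single_slice_stream and its functions are hypothetical; only the vtable members (next/pull/shutdown/destroy) and the base-struct fields (length, flags, vtable) come from the patch, and the usual byte_stream.h / slice_internal.h includes are assumed.

// Hypothetical single-slice byte stream built on the new vtable interface.
typedef struct {
  grpc_byte_stream base;       // must be first so grpc_byte_stream* casts work
  grpc_slice slice;            // the one slice this stream yields
  bool consumed;               // true once pull() has handed out the slice
  grpc_error *shutdown_error;  // error recorded by shutdown(), if any
} single_slice_stream;

static bool single_slice_stream_next(grpc_exec_ctx *exec_ctx,
                                     grpc_byte_stream *byte_stream,
                                     size_t max_size_hint,
                                     grpc_closure *on_complete) {
  return true;  // data is always available synchronously
}

static grpc_error *single_slice_stream_pull(grpc_exec_ctx *exec_ctx,
                                            grpc_byte_stream *byte_stream,
                                            grpc_slice *slice) {
  single_slice_stream *stream = (single_slice_stream *)byte_stream;
  if (stream->shutdown_error != GRPC_ERROR_NONE) {
    return GRPC_ERROR_REF(stream->shutdown_error);
  }
  GPR_ASSERT(!stream->consumed);
  stream->consumed = true;
  *slice = grpc_slice_ref_internal(stream->slice);
  return GRPC_ERROR_NONE;
}

static void single_slice_stream_shutdown(grpc_exec_ctx *exec_ctx,
                                         grpc_byte_stream *byte_stream,
                                         grpc_error *error) {
  single_slice_stream *stream = (single_slice_stream *)byte_stream;
  GRPC_ERROR_UNREF(stream->shutdown_error);
  stream->shutdown_error = error;  // takes ownership; released in destroy()
}

static void single_slice_stream_destroy(grpc_exec_ctx *exec_ctx,
                                        grpc_byte_stream *byte_stream) {
  single_slice_stream *stream = (single_slice_stream *)byte_stream;
  grpc_slice_unref_internal(exec_ctx, stream->slice);
  GRPC_ERROR_UNREF(stream->shutdown_error);
}

static const grpc_byte_stream_vtable single_slice_stream_vtable = {
    single_slice_stream_next, single_slice_stream_pull,
    single_slice_stream_shutdown, single_slice_stream_destroy};

static void single_slice_stream_init(single_slice_stream *stream,
                                     grpc_slice slice) {
  stream->base.length = (uint32_t)GRPC_SLICE_LENGTH(slice);
  stream->base.flags = 0;
  stream->base.vtable = &single_slice_stream_vtable;
  stream->slice = slice;
  stream->consumed = false;
  stream->shutdown_error = GRPC_ERROR_NONE;
}
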
diff --git a/src/core/lib/transport/byte_stream.h b/src/core/lib/transport/byte_stream.h
index f172296..1e1e831 100644
--- a/src/core/lib/transport/byte_stream.h
+++ b/src/core/lib/transport/byte_stream.h
@@ -28,52 +28,109 @@
 /** Mask of all valid internal flags. */
 #define GRPC_WRITE_INTERNAL_USED_MASK (GRPC_WRITE_INTERNAL_COMPRESS)
 
-struct grpc_byte_stream;
 typedef struct grpc_byte_stream grpc_byte_stream;
 
-struct grpc_byte_stream {
-  uint32_t length;
-  uint32_t flags;
+typedef struct {
   bool (*next)(grpc_exec_ctx *exec_ctx, grpc_byte_stream *byte_stream,
                size_t max_size_hint, grpc_closure *on_complete);
   grpc_error *(*pull)(grpc_exec_ctx *exec_ctx, grpc_byte_stream *byte_stream,
                       grpc_slice *slice);
+  void (*shutdown)(grpc_exec_ctx *exec_ctx, grpc_byte_stream *byte_stream,
+                   grpc_error *error);
   void (*destroy)(grpc_exec_ctx *exec_ctx, grpc_byte_stream *byte_stream);
+} grpc_byte_stream_vtable;
+
+struct grpc_byte_stream {
+  uint32_t length;
+  uint32_t flags;
+  const grpc_byte_stream_vtable *vtable;
 };
 
-/* returns 1 if the bytes are available immediately (in which case
- * on_complete will not be called), 0 if the bytes will be available
- * asynchronously.
- *
- * max_size_hint can be set as a hint as to the maximum number
- * of bytes that would be acceptable to read.
- */
-int grpc_byte_stream_next(grpc_exec_ctx *exec_ctx,
-                          grpc_byte_stream *byte_stream, size_t max_size_hint,
-                          grpc_closure *on_complete);
+// Returns true if the bytes are available immediately (in which case
+// on_complete will not be called), false if the bytes will be available
+// asynchronously.
+//
+// max_size_hint can be set as a hint as to the maximum number
+// of bytes that would be acceptable to read.
+bool grpc_byte_stream_next(grpc_exec_ctx *exec_ctx,
+                           grpc_byte_stream *byte_stream, size_t max_size_hint,
+                           grpc_closure *on_complete);
 
-/* returns the next slice in the byte stream when it is ready (indicated by
- * either grpc_byte_stream_next returning 1 or on_complete passed to
- * grpc_byte_stream_next is called).
- *
- * once a slice is returned into *slice, it is owned by the caller.
- */
+// Returns the next slice in the byte stream when it is ready (indicated by
+// either grpc_byte_stream_next returning true or on_complete passed to
+// grpc_byte_stream_next is called).
+//
+// Once a slice is returned into *slice, it is owned by the caller.
 grpc_error *grpc_byte_stream_pull(grpc_exec_ctx *exec_ctx,
                                   grpc_byte_stream *byte_stream,
                                   grpc_slice *slice);
 
+// Shuts down the byte stream.
+//
+// If there is a pending call to on_complete from grpc_byte_stream_next(),
+// it will be invoked with the error passed to grpc_byte_stream_shutdown().
+//
+// The next call to grpc_byte_stream_pull() (if any) will return the error
+// passed to grpc_byte_stream_shutdown().
+void grpc_byte_stream_shutdown(grpc_exec_ctx *exec_ctx,
+                               grpc_byte_stream *byte_stream,
+                               grpc_error *error);
+
 void grpc_byte_stream_destroy(grpc_exec_ctx *exec_ctx,
                               grpc_byte_stream *byte_stream);
 
-/* grpc_byte_stream that wraps a slice buffer */
+// grpc_slice_buffer_stream
+//
+// A grpc_byte_stream that wraps a slice buffer.
+
 typedef struct grpc_slice_buffer_stream {
   grpc_byte_stream base;
   grpc_slice_buffer *backing_buffer;
   size_t cursor;
+  grpc_error *shutdown_error;
 } grpc_slice_buffer_stream;
 
 void grpc_slice_buffer_stream_init(grpc_slice_buffer_stream *stream,
                                    grpc_slice_buffer *slice_buffer,
                                    uint32_t flags);
 
+// grpc_caching_byte_stream
+//
+// A grpc_byte_stream that wraps an underlying byte stream but caches
+// the resulting slices in a slice buffer.  If an initial attempt fails
+// without fully draining the underlying stream, a new caching stream
+// can be created from the same underlying cache, in which case it will
+// return whatever is already in the cache buffer before continuing to read
+// the underlying stream.
+//
+// NOTE: No synchronization is done, so it is not safe to have multiple
+// grpc_caching_byte_streams simultaneously drawing from the same underlying
+// grpc_byte_stream_cache.
+
+typedef struct {
+  grpc_byte_stream *underlying_stream;
+  grpc_slice_buffer cache_buffer;
+} grpc_byte_stream_cache;
+
+// Takes ownership of underlying_stream.
+void grpc_byte_stream_cache_init(grpc_byte_stream_cache *cache,
+                                 grpc_byte_stream *underlying_stream);
+
+// Must not be called while still in use by a grpc_caching_byte_stream.
+void grpc_byte_stream_cache_destroy(grpc_exec_ctx *exec_ctx,
+                                    grpc_byte_stream_cache *cache);
+
+typedef struct {
+  grpc_byte_stream base;
+  grpc_byte_stream_cache *cache;
+  size_t cursor;
+  grpc_error *shutdown_error;
+} grpc_caching_byte_stream;
+
+void grpc_caching_byte_stream_init(grpc_caching_byte_stream *stream,
+                                   grpc_byte_stream_cache *cache);
+
+// Resets the byte stream to the start of the underlying stream.
+void grpc_caching_byte_stream_reset(grpc_caching_byte_stream *stream);
+
 #endif /* GRPC_CORE_LIB_TRANSPORT_BYTE_STREAM_H */
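
The cache/reset pair declared above is aimed at the retry-style use case described in the comment. Below is an illustrative sketch (not part of this patch) of the intended call sequence; send_stream and on_ready are assumed to exist, and the underlying stream is assumed to complete synchronously, as the slice-buffer-backed streams in this patch do.

// Illustrative only: cache a message stream so a failed attempt can be replayed.
static void send_attempt_with_cache(grpc_exec_ctx *exec_ctx,
                                    grpc_byte_stream *send_stream,
                                    grpc_closure *on_ready) {
  grpc_byte_stream_cache cache;
  grpc_byte_stream_cache_init(&cache, send_stream);  // cache takes ownership

  grpc_caching_byte_stream attempt;
  grpc_caching_byte_stream_init(&attempt, &cache);

  size_t remaining = attempt.base.length;
  bool failed = false;
  while (remaining > 0) {
    // A real caller would handle the asynchronous (false) case via on_ready;
    // here we assume synchronous completion.
    GPR_ASSERT(
        grpc_byte_stream_next(exec_ctx, &attempt.base, SIZE_MAX, on_ready));
    grpc_slice slice;
    grpc_error *error = grpc_byte_stream_pull(exec_ctx, &attempt.base, &slice);
    if (error != GRPC_ERROR_NONE) {
      GRPC_ERROR_UNREF(error);
      failed = true;
      break;
    }
    remaining -= GRPC_SLICE_LENGTH(slice);
    // ... hand the slice to the wire ...
    grpc_slice_unref_internal(exec_ctx, slice);
  }

  if (failed) {
    // The same bytes can be replayed: either rewind this stream with
    // grpc_caching_byte_stream_reset(&attempt), or create a fresh
    // grpc_caching_byte_stream over the same cache, and drain again.
  }

  grpc_byte_stream_destroy(exec_ctx, &attempt.base);
  // Destroying the cache also destroys the underlying send_stream.
  grpc_byte_stream_cache_destroy(exec_ctx, &cache);
}
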
diff --git a/src/core/lib/transport/transport.c b/src/core/lib/transport/transport.c
index 7281602..6c61f4b 100644
--- a/src/core/lib/transport/transport.c
+++ b/src/core/lib/transport/transport.c
@@ -207,27 +207,35 @@
   return transport->vtable->get_endpoint(exec_ctx, transport);
 }
 
+// This comment should be sung to the tune of
+// "Supercalifragilisticexpialidocious":
+//
 // grpc_transport_stream_op_batch_finish_with_failure
 // is a function that must always unref cancel_error
 // though it lives in lib, it handles transport stream ops sure
 // it's grpc_transport_stream_op_batch_finish_with_failure
 
 void grpc_transport_stream_op_batch_finish_with_failure(
-    grpc_exec_ctx *exec_ctx, grpc_transport_stream_op_batch *op,
+    grpc_exec_ctx *exec_ctx, grpc_transport_stream_op_batch *batch,
     grpc_error *error) {
-  if (op->recv_message) {
-    GRPC_CLOSURE_SCHED(exec_ctx, op->payload->recv_message.recv_message_ready,
+  if (batch->send_message) {
+    grpc_byte_stream_destroy(exec_ctx,
+                             batch->payload->send_message.send_message);
+  }
+  if (batch->recv_message) {
+    GRPC_CLOSURE_SCHED(exec_ctx,
+                       batch->payload->recv_message.recv_message_ready,
                        GRPC_ERROR_REF(error));
   }
-  if (op->recv_initial_metadata) {
+  if (batch->recv_initial_metadata) {
     GRPC_CLOSURE_SCHED(
         exec_ctx,
-        op->payload->recv_initial_metadata.recv_initial_metadata_ready,
+        batch->payload->recv_initial_metadata.recv_initial_metadata_ready,
         GRPC_ERROR_REF(error));
   }
-  GRPC_CLOSURE_SCHED(exec_ctx, op->on_complete, error);
-  if (op->cancel_stream) {
-    GRPC_ERROR_UNREF(op->payload->cancel_stream.cancel_error);
+  GRPC_CLOSURE_SCHED(exec_ctx, batch->on_complete, error);
+  if (batch->cancel_stream) {
+    GRPC_ERROR_UNREF(batch->payload->cancel_stream.cancel_error);
   }
 }
 
diff --git a/src/core/lib/transport/transport.h b/src/core/lib/transport/transport.h
index 84e53e6..099138e 100644
--- a/src/core/lib/transport/transport.h
+++ b/src/core/lib/transport/transport.h
@@ -159,6 +159,11 @@
   } send_trailing_metadata;
 
   struct {
+    // The transport (or a filter that decides to return a failure before
+    // the op gets down to the transport) is responsible for calling
+    // grpc_byte_stream_destroy() on this.
+    // The batch's on_complete will not be called until after the byte
+    // stream is destroyed.
     grpc_byte_stream *send_message;
   } send_message;
 
@@ -174,6 +179,10 @@
   } recv_initial_metadata;
 
   struct {
+    // Will be set by the transport to point to the byte stream
+    // containing a received message.
+    // The caller is responsible for calling grpc_byte_stream_destroy()
+    // on this byte stream.
     grpc_byte_stream **recv_message;
     /** Should be enqueued when one message is ready to be processed. */
     grpc_closure *recv_message_ready;
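
To make the recv_message ownership comment above concrete, here is a hedged sketch (not part of this patch) of what a recv_message_ready callback now has to do once the transport has filled in the byte stream. The names consume_message and on_next_ready are hypothetical, and the error/async handling is simplified.

// Illustrative only: drain and then destroy a received byte stream.
static void consume_message(grpc_exec_ctx *exec_ctx, grpc_byte_stream *stream,
                            grpc_closure *on_next_ready) {
  grpc_slice slice;
  size_t remaining = stream->length;
  while (remaining > 0) {
    if (!grpc_byte_stream_next(exec_ctx, stream, SIZE_MAX, on_next_ready)) {
      return;  // data not ready yet; on_next_ready will resume this later
    }
    grpc_error *error = grpc_byte_stream_pull(exec_ctx, stream, &slice);
    if (error != GRPC_ERROR_NONE) {
      GRPC_ERROR_UNREF(error);
      break;
    }
    remaining -= GRPC_SLICE_LENGTH(slice);
    // ... process the slice ...
    grpc_slice_unref_internal(exec_ctx, slice);
  }
  // Per the comment above: the caller owns the byte stream and must
  // destroy it once it is done with it.
  grpc_byte_stream_destroy(exec_ctx, stream);
}
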
diff --git a/src/csharp/Grpc.Core.Tests/ThreadingModelTest.cs b/src/csharp/Grpc.Core.Tests/ThreadingModelTest.cs
new file mode 100644
index 0000000..fb18198
--- /dev/null
+++ b/src/csharp/Grpc.Core.Tests/ThreadingModelTest.cs
@@ -0,0 +1,98 @@
+#region Copyright notice and license
+
+// Copyright 2015 gRPC authors.
+//
+// Licensed under the Apache License, Version 2.0 (the "License");
+// you may not use this file except in compliance with the License.
+// You may obtain a copy of the License at
+//
+//     http://www.apache.org/licenses/LICENSE-2.0
+//
+// Unless required by applicable law or agreed to in writing, software
+// distributed under the License is distributed on an "AS IS" BASIS,
+// WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+// See the License for the specific language governing permissions and
+// limitations under the License.
+
+#endregion
+
+using System;
+using NUnit.Framework;
+using System.Threading;
+using System.Threading.Tasks;
+
+namespace Grpc.Core.Tests
+{
+    public class ThreadingModelTest
+    {
+        const string Host = "127.0.0.1";
+
+        MockServiceHelper helper;
+        Server server;
+        Channel channel;
+
+        [SetUp]
+        public void Init()
+        {
+            helper = new MockServiceHelper(Host);
+            server = helper.GetServer();
+            server.Start();
+            channel = helper.GetChannel();
+        }
+
+        [TearDown]
+        public void Cleanup()
+        {
+            channel.ShutdownAsync().Wait();
+            server.ShutdownAsync().Wait();
+        }
+
+        [Test]
+        public void BlockingCallInServerHandlerDoesNotDeadlock()
+        {
+            helper.UnaryHandler = new UnaryServerMethod<string, string>(async (request, context) =>
+            {
+                int recursionDepth = int.Parse(request);
+                if (recursionDepth <= 0) {
+                    return "SUCCESS";
+                }
+                return Calls.BlockingUnaryCall(helper.CreateUnaryCall(), (recursionDepth - 1).ToString());
+            });
+
+            int maxRecursionDepth = Environment.ProcessorCount * 2;  // make sure we have more pending blocking calls than threads in GrpcThreadPool
+            Assert.AreEqual("SUCCESS", Calls.BlockingUnaryCall(helper.CreateUnaryCall(), maxRecursionDepth.ToString()));
+        }
+
+        [Test]
+        public void HandlerDoesNotRunOnGrpcThread()
+        {
+            helper.UnaryHandler = new UnaryServerMethod<string, string>(async (request, context) =>
+            {
+                if (IsRunningOnGrpcThreadPool()) {
+                    return "Server handler should not run on gRPC threadpool thread.";
+                }
+                return request;
+            });
+
+            Assert.AreEqual("ABC", Calls.BlockingUnaryCall(helper.CreateUnaryCall(), "ABC"));
+        }
+
+        [Test]
+        public async Task ContinuationDoesNotRunOnGrpcThread()
+        {
+            helper.UnaryHandler = new UnaryServerMethod<string, string>(async (request, context) =>
+            {
+                return request;
+            });
+
+            await Calls.AsyncUnaryCall(helper.CreateUnaryCall(), "ABC");
+            Assert.IsFalse(IsRunningOnGrpcThreadPool());
+        }
+
+        private static bool IsRunningOnGrpcThreadPool()
+        {
+            var threadName = Thread.CurrentThread.Name ?? "";
+            return threadName.Contains("grpc");
+        }
+    }
+}
diff --git a/src/csharp/Grpc.Core/Grpc.Core.csproj b/src/csharp/Grpc.Core/Grpc.Core.csproj
index 5035829..e32711c 100755
--- a/src/csharp/Grpc.Core/Grpc.Core.csproj
+++ b/src/csharp/Grpc.Core/Grpc.Core.csproj
@@ -64,6 +64,7 @@
   <ItemGroup Condition=" '$(TargetFramework)' == 'netstandard1.5' ">
     <PackageReference Include="System.Runtime.Loader" Version="4.0.0" />
     <PackageReference Include="System.Threading.Thread" Version="4.0.0" />
+    <PackageReference Include="System.Threading.ThreadPool" Version="4.0.0" />
   </ItemGroup>
 
   <Import Project="NativeDeps.csproj.include" />
diff --git a/src/csharp/Grpc.Core/GrpcEnvironment.cs b/src/csharp/Grpc.Core/GrpcEnvironment.cs
index 8d0c66a..0663ee9 100644
--- a/src/csharp/Grpc.Core/GrpcEnvironment.cs
+++ b/src/csharp/Grpc.Core/GrpcEnvironment.cs
@@ -39,6 +39,7 @@
         static int refCount;
         static int? customThreadPoolSize;
         static int? customCompletionQueueCount;
+        static bool inlineHandlers;
         static readonly HashSet<Channel> registeredChannels = new HashSet<Channel>();
         static readonly HashSet<Server> registeredServers = new HashSet<Server>();
 
@@ -218,12 +219,31 @@
         }
 
         /// <summary>
+        /// By default, gRPC's internal event handlers get offloaded to the .NET default thread pool (<c>inlineHandlers=false</c>).
+        /// Setting <c>inlineHandlers</c> to <c>true</c> will allow scheduling the event handlers directly on
+        /// <c>GrpcThreadPool</c>'s internal threads. That can lead to significant performance gains in some situations,
+        /// but requires the user to never block in async code (incorrectly written code can easily lead to deadlocks).
+        /// Inlining handlers is an advanced setting and you should only use it if you know what you are doing.
+        /// Most users should rely on the default value provided by the gRPC library.
+        /// Note: this method is part of an experimental API that can change or be removed without any prior notice.
+        /// Note: <c>inlineHandlers=true</c> was the default in gRPC C# v1.4.x and earlier.
+        /// </summary>
+        public static void SetHandlerInlining(bool inlineHandlers)
+        {
+            lock (staticLock)
+            {
+                GrpcPreconditions.CheckState(instance == null, "Can only be set before GrpcEnvironment is initialized");
+                GrpcEnvironment.inlineHandlers = inlineHandlers;
+            }
+        }
+
+        /// <summary>
         /// Creates gRPC environment.
         /// </summary>
         private GrpcEnvironment()
         {
             GrpcNativeInit();
-            threadPool = new GrpcThreadPool(this, GetThreadPoolSizeOrDefault(), GetCompletionQueueCountOrDefault());
+            threadPool = new GrpcThreadPool(this, GetThreadPoolSizeOrDefault(), GetCompletionQueueCountOrDefault(), inlineHandlers);
             threadPool.Start();
         }
 
diff --git a/src/csharp/Grpc.Core/Internal/GrpcThreadPool.cs b/src/csharp/Grpc.Core/Internal/GrpcThreadPool.cs
index f9ae77c..19b44c2 100644
--- a/src/csharp/Grpc.Core/Internal/GrpcThreadPool.cs
+++ b/src/csharp/Grpc.Core/Internal/GrpcThreadPool.cs
@@ -33,12 +33,15 @@
     internal class GrpcThreadPool
     {
         static readonly ILogger Logger = GrpcEnvironment.Logger.ForType<GrpcThreadPool>();
+        static readonly WaitCallback RunCompletionQueueEventCallbackSuccess = new WaitCallback((callback) => RunCompletionQueueEventCallback((OpCompletionDelegate) callback, true));
+        static readonly WaitCallback RunCompletionQueueEventCallbackFailure = new WaitCallback((callback) => RunCompletionQueueEventCallback((OpCompletionDelegate) callback, false));
 
         readonly GrpcEnvironment environment;
         readonly object myLock = new object();
         readonly List<Thread> threads = new List<Thread>();
         readonly int poolSize;
         readonly int completionQueueCount;
+        readonly bool inlineHandlers;
 
         readonly List<BasicProfiler> threadProfilers = new List<BasicProfiler>();  // profilers assigned to threadpool threads
 
@@ -52,11 +55,13 @@
         /// <param name="environment">Environment.</param>
         /// <param name="poolSize">Pool size.</param>
         /// <param name="completionQueueCount">Completion queue count.</param>
-        public GrpcThreadPool(GrpcEnvironment environment, int poolSize, int completionQueueCount)
+        /// <param name="inlineHandlers">Handler inlining.</param>
+        public GrpcThreadPool(GrpcEnvironment environment, int poolSize, int completionQueueCount, bool inlineHandlers)
         {
             this.environment = environment;
             this.poolSize = poolSize;
             this.completionQueueCount = completionQueueCount;
+            this.inlineHandlers = inlineHandlers;
             GrpcPreconditions.CheckArgument(poolSize >= completionQueueCount,
                 "Thread pool size cannot be smaller than the number of completion queues used.");
         }
@@ -165,11 +170,19 @@
                     try
                     {
                         var callback = cq.CompletionRegistry.Extract(tag);
-                        callback(success);
+                        // Use cached delegates to avoid unnecessary allocations
+                        if (!inlineHandlers)
+                        {
+                            ThreadPool.QueueUserWorkItem(success ? RunCompletionQueueEventCallbackSuccess : RunCompletionQueueEventCallbackFailure, callback);
+                        }
+                        else
+                        {
+                            RunCompletionQueueEventCallback(callback, success);
+                        }
                     }
                     catch (Exception e)
                     {
-                        Logger.Error(e, "Exception occured while invoking completion delegate");
+                        Logger.Error(e, "Exception occurred while extracting event from completion registry.");
                     }
                 }
             }
@@ -186,5 +199,17 @@
             }
             return list.AsReadOnly();
         }
+
+        private static void RunCompletionQueueEventCallback(OpCompletionDelegate callback, bool success)
+        {
+            try
+            {
+                callback(success);
+            }
+            catch (Exception e)
+            {
+                Logger.Error(e, "Exception occurred while invoking completion delegate");
+            }
+        }
     }
 }
diff --git a/src/csharp/Grpc.IntegrationTesting/QpsWorker.cs b/src/csharp/Grpc.IntegrationTesting/QpsWorker.cs
index 7009a93..a579fb8 100644
--- a/src/csharp/Grpc.IntegrationTesting/QpsWorker.cs
+++ b/src/csharp/Grpc.IntegrationTesting/QpsWorker.cs
@@ -63,11 +63,6 @@
 
         private async Task RunAsync()
         {
-            // (ThreadPoolSize == ProcessorCount) gives best throughput in benchmarks
-            // and doesn't seem to harm performance even when server and client
-            // are running on the same machine.
-            GrpcEnvironment.SetThreadPoolSize(Environment.ProcessorCount);
-
             string host = "0.0.0.0";
             int port = options.DriverPort;
 
diff --git a/src/csharp/tests.json b/src/csharp/tests.json
index bc6adbb..7841051 100644
--- a/src/csharp/tests.json
+++ b/src/csharp/tests.json
@@ -31,6 +31,7 @@
     "Grpc.Core.Tests.ShutdownHookPendingCallTest",
     "Grpc.Core.Tests.ShutdownHookServerTest",
     "Grpc.Core.Tests.ShutdownTest",
+    "Grpc.Core.Tests.ThreadingModelTest",
     "Grpc.Core.Tests.TimeoutsTest",
     "Grpc.Core.Tests.UserAgentStringTest"
   ],
diff --git "a/src/objective-c/\041ProtoCompiler-gRPCPlugin.podspec" "b/src/objective-c/\041ProtoCompiler-gRPCPlugin.podspec"
index 22527d1..345eecc 100644
--- "a/src/objective-c/\041ProtoCompiler-gRPCPlugin.podspec"
+++ "b/src/objective-c/\041ProtoCompiler-gRPCPlugin.podspec"
@@ -101,7 +101,7 @@
   s.preserve_paths = plugin
 
   # Restrict the protoc version to the one supported by this plugin.
-  s.dependency '!ProtoCompiler', '3.2.0'
+  s.dependency '!ProtoCompiler', '3.3.0'
   # For the Protobuf dependency not to complain:
   s.ios.deployment_target = '7.0'
   s.osx.deployment_target = '10.9'
diff --git "a/src/objective-c/\041ProtoCompiler.podspec" "b/src/objective-c/\041ProtoCompiler.podspec"
index 2e9b944..c3f95f9 100644
--- "a/src/objective-c/\041ProtoCompiler.podspec"
+++ "b/src/objective-c/\041ProtoCompiler.podspec"
@@ -36,7 +36,7 @@
   # exclamation mark ensures that other "regular" pods will be able to find it as it'll be installed
   # before them.
   s.name     = '!ProtoCompiler'
-  v = '3.2.0'
+  v = '3.3.0'
   s.version  = v
   s.summary  = 'The Protobuf Compiler (protoc) generates Objective-C files from .proto files'
   s.description = <<-DESC
diff --git a/src/objective-c/BoringSSL.podspec b/src/objective-c/BoringSSL.podspec
index 651bd49..37798ec 100644
--- a/src/objective-c/BoringSSL.podspec
+++ b/src/objective-c/BoringSSL.podspec
@@ -31,7 +31,7 @@
 
 Pod::Spec.new do |s|
   s.name     = 'BoringSSL'
-  version = '8.2'
+  version = '9.0'
   s.version  = version
   s.summary  = 'BoringSSL is a fork of OpenSSL that is designed to meet Google’s needs.'
   # Adapted from the homepage:
@@ -69,9 +69,7 @@
 
   s.source = {
     :git => 'https://boringssl.googlesource.com/boringssl',
-    # Restore this version name hack in the next version!!
-    # :tag => "version_for_cocoapods_#{version}",
-    :tag => "version_for_cocoapods_8.0",
+    :tag => "version_for_cocoapods_#{version}",
   }
 
   name = 'openssl'
@@ -169,7 +167,6 @@
       #include "hkdf.h"
       #include "md4.h"
       #include "md5.h"
-      #include "newhope.h"
       #include "obj_mac.h"
       #include "objects.h"
       #include "opensslv.h"
@@ -183,7 +180,6 @@
       #include "ripemd.h"
       #include "safestack.h"
       #include "srtp.h"
-      #include "time_support.h"
       #include "x509.h"
       #include "x509v3.h"
     EOF
@@ -389,42 +385,42 @@
           0x28340c19,
           0x283480ac,
           0x283500ea,
-          0x2c3228ca,
-          0x2c32a8d8,
-          0x2c3328ea,
-          0x2c33a8fc,
-          0x2c342910,
-          0x2c34a922,
-          0x2c35293d,
-          0x2c35a94f,
-          0x2c362962,
+          0x2c3229b1,
+          0x2c32a9bf,
+          0x2c3329d1,
+          0x2c33a9e3,
+          0x2c3429f7,
+          0x2c34aa09,
+          0x2c352a24,
+          0x2c35aa36,
+          0x2c362a49,
           0x2c36832d,
-          0x2c37296f,
-          0x2c37a981,
-          0x2c382994,
-          0x2c38a9ab,
-          0x2c3929b9,
-          0x2c39a9c9,
-          0x2c3a29db,
-          0x2c3aa9ef,
-          0x2c3b2a00,
-          0x2c3baa1f,
-          0x2c3c2a33,
-          0x2c3caa49,
-          0x2c3d2a62,
-          0x2c3daa7f,
-          0x2c3e2a90,
-          0x2c3eaa9e,
-          0x2c3f2ab6,
-          0x2c3faace,
-          0x2c402adb,
+          0x2c372a56,
+          0x2c37aa68,
+          0x2c382a7b,
+          0x2c38aa92,
+          0x2c392aa0,
+          0x2c39aab0,
+          0x2c3a2ac2,
+          0x2c3aaad6,
+          0x2c3b2ae7,
+          0x2c3bab06,
+          0x2c3c2b1a,
+          0x2c3cab30,
+          0x2c3d2b49,
+          0x2c3dab66,
+          0x2c3e2b77,
+          0x2c3eab85,
+          0x2c3f2b9d,
+          0x2c3fabb5,
+          0x2c402bc2,
           0x2c4090e7,
-          0x2c412aec,
-          0x2c41aaff,
+          0x2c412bd3,
+          0x2c41abe6,
           0x2c4210c0,
-          0x2c42ab10,
+          0x2c42abf7,
           0x2c430720,
-          0x2c43aa11,
+          0x2c43aaf8,
           0x30320000,
           0x30328015,
           0x3033001f,
@@ -577,180 +573,189 @@
           0x403b9861,
           0x403c0064,
           0x403c8083,
-          0x403d18aa,
-          0x403d98c0,
-          0x403e18cf,
-          0x403e98e2,
-          0x403f18fc,
-          0x403f990a,
-          0x4040191f,
-          0x40409933,
-          0x40411950,
-          0x4041996b,
-          0x40421984,
-          0x40429997,
-          0x404319ab,
-          0x404399c3,
-          0x404419da,
+          0x403d18c1,
+          0x403d98d7,
+          0x403e18e6,
+          0x403e98f9,
+          0x403f1913,
+          0x403f9921,
+          0x40401936,
+          0x4040994a,
+          0x40411967,
+          0x40419982,
+          0x4042199b,
+          0x404299ae,
+          0x404319c2,
+          0x404399da,
+          0x404419f1,
           0x404480ac,
-          0x404519ef,
-          0x40459a01,
-          0x40461a25,
-          0x40469a45,
-          0x40471a53,
-          0x40479a7a,
-          0x40481ab7,
-          0x40489ad0,
-          0x40491ae7,
-          0x40499b01,
-          0x404a1b18,
-          0x404a9b36,
-          0x404b1b4e,
-          0x404b9b65,
-          0x404c1b7b,
-          0x404c9b8d,
-          0x404d1bae,
-          0x404d9bd0,
-          0x404e1be4,
-          0x404e9bf1,
-          0x404f1c1e,
-          0x404f9c47,
-          0x40501c71,
-          0x40509c85,
-          0x40511ca0,
-          0x40519cb0,
-          0x40521cc7,
-          0x40529ceb,
-          0x40531d03,
-          0x40539d16,
-          0x40541d2b,
-          0x40549d4e,
-          0x40551d5c,
-          0x40559d79,
-          0x40561d86,
-          0x40569d9f,
-          0x40571db7,
-          0x40579dca,
-          0x40581ddf,
-          0x40589e06,
-          0x40591e35,
-          0x40599e62,
-          0x405a1e76,
-          0x405a9e86,
-          0x405b1e9e,
-          0x405b9eaf,
-          0x405c1ec2,
-          0x405c9ed3,
-          0x405d1ee0,
-          0x405d9ef7,
-          0x405e1f17,
+          0x40451a06,
+          0x40459a18,
+          0x40461a3c,
+          0x40469a5c,
+          0x40471a6a,
+          0x40479a91,
+          0x40481ace,
+          0x40489ae7,
+          0x40491afe,
+          0x40499b18,
+          0x404a1b2f,
+          0x404a9b4d,
+          0x404b1b65,
+          0x404b9b7c,
+          0x404c1b92,
+          0x404c9ba4,
+          0x404d1bc5,
+          0x404d9be7,
+          0x404e1bfb,
+          0x404e9c08,
+          0x404f1c35,
+          0x404f9c5e,
+          0x40501c99,
+          0x40509cad,
+          0x40511cc8,
+          0x40519cd8,
+          0x40521cef,
+          0x40529d13,
+          0x40531d2b,
+          0x40539d3e,
+          0x40541d53,
+          0x40549d76,
+          0x40551d84,
+          0x40559da1,
+          0x40561dae,
+          0x40569dc7,
+          0x40571ddf,
+          0x40579df2,
+          0x40581e07,
+          0x40589e2e,
+          0x40591e5d,
+          0x40599e8a,
+          0x405a1e9e,
+          0x405a9eae,
+          0x405b1ec6,
+          0x405b9ed7,
+          0x405c1eea,
+          0x405c9f0b,
+          0x405d1f18,
+          0x405d9f2f,
+          0x405e1f6d,
           0x405e8a95,
-          0x405f1f38,
-          0x405f9f45,
-          0x40601f53,
-          0x40609f75,
-          0x40611f9d,
-          0x40619fb2,
-          0x40621fc9,
-          0x40629fda,
-          0x40631feb,
-          0x4063a000,
-          0x40642017,
-          0x4064a043,
-          0x4065205e,
-          0x4065a075,
-          0x4066208d,
-          0x4066a0b7,
-          0x406720e2,
-          0x4067a103,
-          0x40682116,
-          0x4068a137,
-          0x40692169,
-          0x4069a197,
-          0x406a21b8,
-          0x406aa1d8,
-          0x406b2360,
-          0x406ba383,
-          0x406c2399,
-          0x406ca5c5,
-          0x406d25f4,
-          0x406da61c,
-          0x406e264a,
-          0x406ea662,
-          0x406f2681,
-          0x406fa696,
-          0x407026a9,
-          0x4070a6c6,
+          0x405f1f8e,
+          0x405f9f9b,
+          0x40601fa9,
+          0x40609fcb,
+          0x4061200f,
+          0x4061a047,
+          0x4062205e,
+          0x4062a06f,
+          0x40632080,
+          0x4063a095,
+          0x406420ac,
+          0x4064a0d8,
+          0x406520f3,
+          0x4065a10a,
+          0x40662122,
+          0x4066a14c,
+          0x40672177,
+          0x4067a198,
+          0x406821ab,
+          0x4068a1cc,
+          0x406921fe,
+          0x4069a22c,
+          0x406a224d,
+          0x406aa26d,
+          0x406b23f5,
+          0x406ba418,
+          0x406c242e,
+          0x406ca690,
+          0x406d26bf,
+          0x406da6e7,
+          0x406e2715,
+          0x406ea749,
+          0x406f2768,
+          0x406fa77d,
+          0x40702790,
+          0x4070a7ad,
           0x40710800,
-          0x4071a6d8,
-          0x407226eb,
-          0x4072a704,
-          0x4073271c,
+          0x4071a7bf,
+          0x407227d2,
+          0x4072a7eb,
+          0x40732803,
           0x4073936d,
-          0x40742730,
-          0x4074a74a,
-          0x4075275b,
-          0x4075a76f,
-          0x4076277d,
+          0x40742817,
+          0x4074a831,
+          0x40752842,
+          0x4075a856,
+          0x40762864,
           0x407691aa,
-          0x407727a2,
-          0x4077a7c4,
-          0x407827df,
-          0x4078a818,
-          0x4079282f,
-          0x4079a845,
-          0x407a2851,
-          0x407aa864,
-          0x407b2879,
-          0x407ba88b,
-          0x407c28a0,
-          0x407ca8a9,
-          0x407d2152,
-          0x407d9c57,
-          0x407e27f4,
-          0x407e9e16,
-          0x407f1a67,
+          0x40772889,
+          0x4077a8ab,
+          0x407828c6,
+          0x4078a8ff,
+          0x40792916,
+          0x4079a92c,
+          0x407a2938,
+          0x407aa94b,
+          0x407b2960,
+          0x407ba972,
+          0x407c2987,
+          0x407ca990,
+          0x407d21e7,
+          0x407d9c6e,
+          0x407e28db,
+          0x407e9e3e,
+          0x407f1a7e,
           0x407f9887,
-          0x40801c2e,
-          0x40809a8f,
-          0x40811cd9,
-          0x40819c08,
-          0x40822635,
+          0x40801c45,
+          0x40809aa6,
+          0x40811d01,
+          0x40819c1f,
+          0x40822700,
           0x4082986d,
-          0x40831df1,
-          0x4083a028,
-          0x40841aa3,
-          0x40849e4e,
-          0x41f4228b,
-          0x41f9231d,
-          0x41fe2210,
-          0x41fea3ec,
-          0x41ff24dd,
-          0x420322a4,
-          0x420822c6,
-          0x4208a302,
-          0x420921f4,
-          0x4209a33c,
-          0x420a224b,
-          0x420aa22b,
-          0x420b226b,
-          0x420ba2e4,
-          0x420c24f9,
-          0x420ca3b9,
-          0x420d23d3,
-          0x420da40a,
-          0x42122424,
-          0x421724c0,
-          0x4217a466,
-          0x421c2488,
-          0x421f2443,
-          0x42212510,
-          0x422624a3,
-          0x422b25a9,
-          0x422ba572,
-          0x422c2591,
-          0x422ca54c,
-          0x422d252b,
+          0x40831e19,
+          0x4083a0bd,
+          0x40841aba,
+          0x40849e76,
+          0x40851efb,
+          0x40859ff3,
+          0x40861f4f,
+          0x40869c88,
+          0x4087272d,
+          0x4087a024,
+          0x408818aa,
+          0x41f42320,
+          0x41f923b2,
+          0x41fe22a5,
+          0x41fea481,
+          0x41ff2572,
+          0x42032339,
+          0x4208235b,
+          0x4208a397,
+          0x42092289,
+          0x4209a3d1,
+          0x420a22e0,
+          0x420aa2c0,
+          0x420b2300,
+          0x420ba379,
+          0x420c258e,
+          0x420ca44e,
+          0x420d2468,
+          0x420da49f,
+          0x421224b9,
+          0x42172555,
+          0x4217a4fb,
+          0x421c251d,
+          0x421f24d8,
+          0x422125a5,
+          0x42262538,
+          0x422b2674,
+          0x422ba622,
+          0x422c265c,
+          0x422ca5e1,
+          0x422d25c0,
+          0x422da641,
+          0x422e2607,
           0x4432072b,
           0x4432873a,
           0x44330746,
@@ -793,69 +798,69 @@
           0x4c3d136d,
           0x4c3d937c,
           0x4c3e1389,
-          0x50322b22,
-          0x5032ab31,
-          0x50332b3c,
-          0x5033ab4c,
-          0x50342b65,
-          0x5034ab7f,
-          0x50352b8d,
-          0x5035aba3,
-          0x50362bb5,
-          0x5036abcb,
-          0x50372be4,
-          0x5037abf7,
-          0x50382c0f,
-          0x5038ac20,
-          0x50392c35,
-          0x5039ac49,
-          0x503a2c69,
-          0x503aac7f,
-          0x503b2c97,
-          0x503baca9,
-          0x503c2cc5,
-          0x503cacdc,
-          0x503d2cf5,
-          0x503dad0b,
-          0x503e2d18,
-          0x503ead2e,
-          0x503f2d40,
+          0x50322c09,
+          0x5032ac18,
+          0x50332c23,
+          0x5033ac33,
+          0x50342c4c,
+          0x5034ac66,
+          0x50352c74,
+          0x5035ac8a,
+          0x50362c9c,
+          0x5036acb2,
+          0x50372ccb,
+          0x5037acde,
+          0x50382cf6,
+          0x5038ad07,
+          0x50392d1c,
+          0x5039ad30,
+          0x503a2d50,
+          0x503aad66,
+          0x503b2d7e,
+          0x503bad90,
+          0x503c2dac,
+          0x503cadc3,
+          0x503d2ddc,
+          0x503dadf2,
+          0x503e2dff,
+          0x503eae15,
+          0x503f2e27,
           0x503f8382,
-          0x50402d53,
-          0x5040ad63,
-          0x50412d7d,
-          0x5041ad8c,
-          0x50422da6,
-          0x5042adc3,
-          0x50432dd3,
-          0x5043ade3,
-          0x50442df2,
+          0x50402e3a,
+          0x5040ae4a,
+          0x50412e64,
+          0x5041ae73,
+          0x50422e8d,
+          0x5042aeaa,
+          0x50432eba,
+          0x5043aeca,
+          0x50442ed9,
           0x5044843f,
-          0x50452e06,
-          0x5045ae24,
-          0x50462e37,
-          0x5046ae4d,
-          0x50472e5f,
-          0x5047ae74,
-          0x50482e9a,
-          0x5048aea8,
-          0x50492ebb,
-          0x5049aed0,
-          0x504a2ee6,
-          0x504aaef6,
-          0x504b2f16,
-          0x504baf29,
-          0x504c2f4c,
-          0x504caf7a,
-          0x504d2f8c,
-          0x504dafa9,
-          0x504e2fc4,
-          0x504eafe0,
-          0x504f2ff2,
-          0x504fb009,
-          0x50503018,
+          0x50452eed,
+          0x5045af0b,
+          0x50462f1e,
+          0x5046af34,
+          0x50472f46,
+          0x5047af5b,
+          0x50482f81,
+          0x5048af8f,
+          0x50492fa2,
+          0x5049afb7,
+          0x504a2fcd,
+          0x504aafdd,
+          0x504b2ffd,
+          0x504bb010,
+          0x504c3033,
+          0x504cb061,
+          0x504d3073,
+          0x504db090,
+          0x504e30ab,
+          0x504eb0c7,
+          0x504f30d9,
+          0x504fb0f0,
+          0x505030ff,
           0x505086ef,
-          0x5051302b,
+          0x50513112,
           0x58320ec9,
           0x68320e8b,
           0x68328c25,
@@ -1218,6 +1223,7 @@
           "BIO_NOT_SET\\0"
           "BLOCK_CIPHER_PAD_IS_WRONG\\0"
           "BUFFERED_MESSAGES_ON_CIPHER_CHANGE\\0"
+          "CANNOT_PARSE_LEAF_CERT\\0"
           "CA_DN_LENGTH_MISMATCH\\0"
           "CA_DN_TOO_LONG\\0"
           "CCS_RECEIVED_EARLY\\0"
@@ -1261,6 +1267,7 @@
           "INVALID_COMPRESSION_LIST\\0"
           "INVALID_MESSAGE\\0"
           "INVALID_OUTER_RECORD_TYPE\\0"
+          "INVALID_SCT_LIST\\0"
           "INVALID_SSL_SESSION\\0"
           "INVALID_TICKET_KEYS_LENGTH\\0"
           "LENGTH_MISMATCH\\0"
@@ -1290,15 +1297,19 @@
           "NO_RENEGOTIATION\\0"
           "NO_REQUIRED_DIGEST\\0"
           "NO_SHARED_CIPHER\\0"
+          "NO_SHARED_GROUP\\0"
           "NULL_SSL_CTX\\0"
           "NULL_SSL_METHOD_PASSED\\0"
           "OLD_SESSION_CIPHER_NOT_RETURNED\\0"
+          "OLD_SESSION_PRF_HASH_MISMATCH\\0"
           "OLD_SESSION_VERSION_NOT_RETURNED\\0"
           "PARSE_TLSEXT\\0"
           "PATH_TOO_LONG\\0"
           "PEER_DID_NOT_RETURN_A_CERTIFICATE\\0"
           "PEER_ERROR_UNSUPPORTED_CERTIFICATE_TYPE\\0"
+          "PRE_SHARED_KEY_MUST_BE_LAST\\0"
           "PROTOCOL_IS_SHUTDOWN\\0"
+          "PSK_IDENTITY_BINDER_COUNT_MISMATCH\\0"
           "PSK_IDENTITY_NOT_FOUND\\0"
           "PSK_NO_CLIENT_CB\\0"
           "PSK_NO_SERVER_CB\\0"
@@ -1350,7 +1361,9 @@
           "TLSV1_ALERT_USER_CANCELLED\\0"
           "TLSV1_BAD_CERTIFICATE_HASH_VALUE\\0"
           "TLSV1_BAD_CERTIFICATE_STATUS_RESPONSE\\0"
+          "TLSV1_CERTIFICATE_REQUIRED\\0"
           "TLSV1_CERTIFICATE_UNOBTAINABLE\\0"
+          "TLSV1_UNKNOWN_PSK_IDENTITY\\0"
           "TLSV1_UNRECOGNIZED_NAME\\0"
           "TLSV1_UNSUPPORTED_EXTENSION\\0"
           "TLS_PEER_DID_NOT_RESPOND_WITH_CERTIFICATE_LIST\\0"
@@ -1358,6 +1371,7 @@
           "TOO_MANY_EMPTY_FRAGMENTS\\0"
           "TOO_MANY_KEY_UPDATES\\0"
           "TOO_MANY_WARNING_ALERTS\\0"
+          "TOO_MUCH_SKIPPED_EARLY_DATA\\0"
           "UNABLE_TO_FIND_ECDH_PARAMETERS\\0"
           "UNEXPECTED_EXTENSION\\0"
           "UNEXPECTED_MESSAGE\\0"
diff --git a/src/proto/grpc/lb/v1/load_balancer.proto b/src/proto/grpc/lb/v1/load_balancer.proto
index b13b343..0a33568 100644
--- a/src/proto/grpc/lb/v1/load_balancer.proto
+++ b/src/proto/grpc/lb/v1/load_balancer.proto
@@ -67,6 +67,15 @@
   string name = 1;
 }
 
+// Contains the number of calls finished for a particular load balance token.
+message ClientStatsPerToken {
+  // See Server.load_balance_token.
+  string load_balance_token = 1;
+
+  // The total number of RPCs that finished associated with the token.
+  int64 num_calls = 2;
+}
+
 // Contains client level statistics that are useful to load balancing. Each
 // count except the timestamp should be reset to zero after reporting the stats.
 message ClientStats {
@@ -79,20 +88,17 @@
   // The total number of RPCs that finished.
   int64 num_calls_finished = 3;
 
-  // The total number of RPCs that were dropped by the client because of rate
-  // limiting.
-  int64 num_calls_finished_with_drop_for_rate_limiting = 4;
-
-  // The total number of RPCs that were dropped by the client because of load
-  // balancing.
-  int64 num_calls_finished_with_drop_for_load_balancing = 5;
-
   // The total number of RPCs that failed to reach a server except dropped RPCs.
   int64 num_calls_finished_with_client_failed_to_send = 6;
 
   // The total number of RPCs that finished and are known to have been received
   // by a server.
   int64 num_calls_finished_known_received = 7;
+
+  // The list of dropped calls.
+  repeated ClientStatsPerToken calls_finished_with_drop = 8;
+
+  reserved 4, 5;
 }
 
 message LoadBalanceResponse {
@@ -134,10 +140,8 @@
   Duration expiration_interval = 3;
 }
 
-// Contains server information. When none of the [drop_for_*] fields are true,
-// use the other fields. When drop_for_rate_limiting is true, ignore all other
-// fields. Use drop_for_load_balancing only when it is true and
-// drop_for_rate_limiting is false.
+// Contains server information. When the drop field is not true, use the other
+// fields.
 message Server {
   // A resolved address for the server, serialized in network-byte-order. It may
   // either be an IPv4 or IPv6 address.
@@ -149,16 +153,16 @@
   // An opaque but printable token given to the frontend for each pick. All
   // frontend requests for that pick must include the token in its initial
   // metadata. The token is used by the backend to verify the request and to
-  // allow the backend to report load to the gRPC LB system.
+  // allow the backend to report load to the gRPC LB system. The token is also
+  // used in client stats for reporting dropped calls.
   //
   // Its length is variable but less than 50 bytes.
   string load_balance_token = 3;
 
-  // Indicates whether this particular request should be dropped by the client
-  // for rate limiting.
-  bool drop_for_rate_limiting = 4;
+  // Indicates whether this particular request should be dropped by the client.
+  // If the request is dropped, there will be a corresponding entry in
+  // ClientStats.calls_finished_with_drop.
+  bool drop = 4;
 
-  // Indicates whether this particular request should be dropped by the client
-  // for load balancing.
-  bool drop_for_load_balancing = 5;
+  reserved 5;
 }
diff --git a/src/ruby/lib/grpc/generic/active_call.rb b/src/ruby/lib/grpc/generic/active_call.rb
index 87b29c2..10eb70b 100644
--- a/src/ruby/lib/grpc/generic/active_call.rb
+++ b/src/ruby/lib/grpc/generic/active_call.rb
@@ -480,7 +480,20 @@
     def bidi_streamer(requests, metadata: {}, &blk)
       raise_error_if_already_executed
       # Metadata might have already been sent if this is an operation view
-      merge_metadata_and_send_if_not_already_sent(metadata)
+      begin
+        merge_metadata_and_send_if_not_already_sent(metadata)
+      rescue GRPC::Core::CallError => e
+        batch_result = @call.run_batch(RECV_STATUS_ON_CLIENT => nil)
+        set_input_stream_done
+        set_output_stream_done
+        attach_status_results_and_complete_call(batch_result)
+        raise e
+      rescue => e
+        set_input_stream_done
+        set_output_stream_done
+        raise e
+      end
+
       bd = BidiCall.new(@call,
                         @marshal,
                         @unmarshal,
diff --git a/src/ruby/spec/generic/client_stub_spec.rb b/src/ruby/spec/generic/client_stub_spec.rb
index 42cff40..e1e7a53 100644
--- a/src/ruby/spec/generic/client_stub_spec.rb
+++ b/src/ruby/spec/generic/client_stub_spec.rb
@@ -616,8 +616,22 @@
         th.join
       end
 
-      # TODO: add test for metadata-related ArgumentError in a bidi call once
-      # issue mentioned in https://github.com/grpc/grpc/issues/10526 is fixed
+      it 'should raise ArgumentError if metadata contains invalid values' do
+        @metadata.merge!(k3: 3)
+        stub = GRPC::ClientStub.new(@host, :this_channel_is_insecure)
+        expect do
+          get_responses(stub).collect { |r| r }
+        end.to raise_error(ArgumentError,
+                           /Header values must be of type string or array/)
+      end
+
+      it 'terminates if the call fails to start' do
+        # don't start the server
+        stub = GRPC::ClientStub.new(@host, :this_channel_is_insecure)
+        expect do
+          get_responses(stub, deadline: from_relative_time(0)).collect { |r| r }
+        end.to raise_error(GRPC::BadStatus)
+      end
 
       it 'should send metadata to the server ok' do
         th = run_bidi_streamer_echo_ping_pong(@sent_msgs, @pass, true,
@@ -630,9 +644,9 @@
     end
 
     describe 'without a call operation' do
-      def get_responses(stub)
+      def get_responses(stub, deadline: nil)
         e = stub.bidi_streamer(@method, @sent_msgs, noop, noop,
-                               metadata: @metadata)
+                               metadata: @metadata, deadline: deadline)
         expect(e).to be_a(Enumerator)
         e
       end
@@ -644,10 +658,10 @@
       after(:each) do
         @op.wait # make sure wait doesn't hang
       end
-      def get_responses(stub, run_start_call_first: false)
+      def get_responses(stub, run_start_call_first: false, deadline: nil)
         @op = stub.bidi_streamer(@method, @sent_msgs, noop, noop,
                                  return_op: true,
-                                 metadata: @metadata)
+                                 metadata: @metadata, deadline: deadline)
         expect(@op).to be_a(GRPC::ActiveCall::Operation)
         @op.start_call if run_start_call_first
         e = @op.execute
diff --git a/templates/gRPC-Core.podspec.template b/templates/gRPC-Core.podspec.template
index 3d54534..cfc13cf 100644
--- a/templates/gRPC-Core.podspec.template
+++ b/templates/gRPC-Core.podspec.template
@@ -135,7 +135,7 @@
       ss.header_mappings_dir = '.'
       ss.libraries = 'z'
       ss.dependency "#{s.name}/Interface", version
-      ss.dependency 'BoringSSL', '~> 8.0'
+      ss.dependency 'BoringSSL', '~> 9.0'
       ss.dependency 'nanopb', '~> 0.3'
 
       # To save you from scrolling, this is the last part of the podspec.
diff --git "a/templates/src/objective-c/\041ProtoCompiler-gRPCPlugin.podspec.template" "b/templates/src/objective-c/\041ProtoCompiler-gRPCPlugin.podspec.template"
index 24c167d..b822341 100644
--- "a/templates/src/objective-c/\041ProtoCompiler-gRPCPlugin.podspec.template"
+++ "b/templates/src/objective-c/\041ProtoCompiler-gRPCPlugin.podspec.template"
@@ -103,7 +103,7 @@
     s.preserve_paths = plugin
 
     # Restrict the protoc version to the one supported by this plugin.
-    s.dependency '!ProtoCompiler', '3.2.0'
+    s.dependency '!ProtoCompiler', '3.3.0'
     # For the Protobuf dependency not to complain:
     s.ios.deployment_target = '7.0'
     s.osx.deployment_target = '10.9'
diff --git a/test/core/transport/BUILD b/test/core/transport/BUILD
index 8091cf9..040c0c3 100644
--- a/test/core/transport/BUILD
+++ b/test/core/transport/BUILD
@@ -36,6 +36,18 @@
 )
 
 grpc_cc_test(
+    name = "byte_stream_test",
+    srcs = ["byte_stream_test.c"],
+    language = "C",
+    deps = [
+        "//:gpr",
+        "//:grpc",
+        "//test/core/util:gpr_test_util",
+        "//test/core/util:grpc_test_util",
+    ],
+)
+
+grpc_cc_test(
     name = "connectivity_state_test",
     srcs = ["connectivity_state_test.c"],
     language = "C",
diff --git a/test/core/transport/byte_stream_test.c b/test/core/transport/byte_stream_test.c
new file mode 100644
index 0000000..a0c5f96
--- /dev/null
+++ b/test/core/transport/byte_stream_test.c
@@ -0,0 +1,279 @@
+/*
+ *
+ * Copyright 2017 gRPC authors.
+ *
+ * Licensed under the Apache License, Version 2.0 (the "License");
+ * you may not use this file except in compliance with the License.
+ * You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ *
+ */
+
+#include "src/core/lib/transport/byte_stream.h"
+
+#include <grpc/support/alloc.h>
+#include <grpc/support/log.h>
+#include <grpc/support/useful.h>
+
+#include "src/core/lib/slice/slice_internal.h"
+
+#include "test/core/util/test_config.h"
+
+//
+// grpc_slice_buffer_stream tests
+//
+
+static void not_called_closure(grpc_exec_ctx *exec_ctx, void *arg,
+                               grpc_error *error) {
+  GPR_ASSERT(false);
+}
+
+static void test_slice_buffer_stream_basic(void) {
+  gpr_log(GPR_DEBUG, "test_slice_buffer_stream_basic");
+  grpc_exec_ctx exec_ctx = GRPC_EXEC_CTX_INIT;
+  // Create and populate slice buffer.
+  grpc_slice_buffer buffer;
+  grpc_slice_buffer_init(&buffer);
+  grpc_slice input[] = {
+      grpc_slice_from_static_string("foo"),
+      grpc_slice_from_static_string("bar"),
+  };
+  for (size_t i = 0; i < GPR_ARRAY_SIZE(input); ++i) {
+    grpc_slice_buffer_add(&buffer, input[i]);
+  }
+  // Create byte stream.
+  grpc_slice_buffer_stream stream;
+  grpc_slice_buffer_stream_init(&stream, &buffer, 0);
+  GPR_ASSERT(stream.base.length == 6);
+  grpc_closure closure;
+  GRPC_CLOSURE_INIT(&closure, not_called_closure, NULL,
+                    grpc_schedule_on_exec_ctx);
+  // Read each slice.  Note that next() always returns synchronously.
+  for (size_t i = 0; i < GPR_ARRAY_SIZE(input); ++i) {
+    GPR_ASSERT(
+        grpc_byte_stream_next(&exec_ctx, &stream.base, ~(size_t)0, &closure));
+    grpc_slice output;
+    grpc_error *error = grpc_byte_stream_pull(&exec_ctx, &stream.base, &output);
+    GPR_ASSERT(error == GRPC_ERROR_NONE);
+    GPR_ASSERT(grpc_slice_eq(input[i], output));
+    grpc_slice_unref_internal(&exec_ctx, output);
+  }
+  // Clean up.
+  grpc_byte_stream_destroy(&exec_ctx, &stream.base);
+  grpc_slice_buffer_destroy_internal(&exec_ctx, &buffer);
+  grpc_exec_ctx_finish(&exec_ctx);
+}
+
+static void test_slice_buffer_stream_shutdown(void) {
+  gpr_log(GPR_DEBUG, "test_slice_buffer_stream_shutdown");
+  grpc_exec_ctx exec_ctx = GRPC_EXEC_CTX_INIT;
+  // Create and populate slice buffer.
+  grpc_slice_buffer buffer;
+  grpc_slice_buffer_init(&buffer);
+  grpc_slice input[] = {
+      grpc_slice_from_static_string("foo"),
+      grpc_slice_from_static_string("bar"),
+  };
+  for (size_t i = 0; i < GPR_ARRAY_SIZE(input); ++i) {
+    grpc_slice_buffer_add(&buffer, input[i]);
+  }
+  // Create byte stream.
+  grpc_slice_buffer_stream stream;
+  grpc_slice_buffer_stream_init(&stream, &buffer, 0);
+  GPR_ASSERT(stream.base.length == 6);
+  grpc_closure closure;
+  GRPC_CLOSURE_INIT(&closure, not_called_closure, NULL,
+                    grpc_schedule_on_exec_ctx);
+  // Read the first slice.
+  GPR_ASSERT(
+      grpc_byte_stream_next(&exec_ctx, &stream.base, ~(size_t)0, &closure));
+  grpc_slice output;
+  grpc_error *error = grpc_byte_stream_pull(&exec_ctx, &stream.base, &output);
+  GPR_ASSERT(error == GRPC_ERROR_NONE);
+  GPR_ASSERT(grpc_slice_eq(input[0], output));
+  grpc_slice_unref_internal(&exec_ctx, output);
+  // Now shutdown.
+  grpc_error *shutdown_error =
+      GRPC_ERROR_CREATE_FROM_STATIC_STRING("shutdown error");
+  grpc_byte_stream_shutdown(&exec_ctx, &stream.base,
+                            GRPC_ERROR_REF(shutdown_error));
+  // After shutdown, the next pull() should return the error.
+  GPR_ASSERT(
+      grpc_byte_stream_next(&exec_ctx, &stream.base, ~(size_t)0, &closure));
+  error = grpc_byte_stream_pull(&exec_ctx, &stream.base, &output);
+  GPR_ASSERT(error == shutdown_error);
+  GRPC_ERROR_UNREF(error);
+  GRPC_ERROR_UNREF(shutdown_error);
+  // Clean up.
+  grpc_byte_stream_destroy(&exec_ctx, &stream.base);
+  grpc_slice_buffer_destroy_internal(&exec_ctx, &buffer);
+  grpc_exec_ctx_finish(&exec_ctx);
+}
+
+//
+// grpc_caching_byte_stream tests
+//
+
+static void test_caching_byte_stream_basic(void) {
+  gpr_log(GPR_DEBUG, "test_caching_byte_stream_basic");
+  grpc_exec_ctx exec_ctx = GRPC_EXEC_CTX_INIT;
+  // Create and populate slice buffer byte stream.
+  grpc_slice_buffer buffer;
+  grpc_slice_buffer_init(&buffer);
+  grpc_slice input[] = {
+      grpc_slice_from_static_string("foo"),
+      grpc_slice_from_static_string("bar"),
+  };
+  for (size_t i = 0; i < GPR_ARRAY_SIZE(input); ++i) {
+    grpc_slice_buffer_add(&buffer, input[i]);
+  }
+  grpc_slice_buffer_stream underlying_stream;
+  grpc_slice_buffer_stream_init(&underlying_stream, &buffer, 0);
+  // Create cache and caching stream.
+  grpc_byte_stream_cache cache;
+  grpc_byte_stream_cache_init(&cache, &underlying_stream.base);
+  grpc_caching_byte_stream stream;
+  grpc_caching_byte_stream_init(&stream, &cache);
+  grpc_closure closure;
+  GRPC_CLOSURE_INIT(&closure, not_called_closure, NULL,
+                    grpc_schedule_on_exec_ctx);
+  // Read each slice.  Note that next() always returns synchronously,
+  // because the underlying byte stream always does.
+  for (size_t i = 0; i < GPR_ARRAY_SIZE(input); ++i) {
+    GPR_ASSERT(
+        grpc_byte_stream_next(&exec_ctx, &stream.base, ~(size_t)0, &closure));
+    grpc_slice output;
+    grpc_error *error = grpc_byte_stream_pull(&exec_ctx, &stream.base, &output);
+    GPR_ASSERT(error == GRPC_ERROR_NONE);
+    GPR_ASSERT(grpc_slice_eq(input[i], output));
+    grpc_slice_unref_internal(&exec_ctx, output);
+  }
+  // Clean up.
+  grpc_byte_stream_destroy(&exec_ctx, &stream.base);
+  grpc_byte_stream_cache_destroy(&exec_ctx, &cache);
+  grpc_slice_buffer_destroy_internal(&exec_ctx, &buffer);
+  grpc_exec_ctx_finish(&exec_ctx);
+}
+
+static void test_caching_byte_stream_reset(void) {
+  gpr_log(GPR_DEBUG, "test_caching_byte_stream_reset");
+  grpc_exec_ctx exec_ctx = GRPC_EXEC_CTX_INIT;
+  // Create and populate slice buffer byte stream.
+  grpc_slice_buffer buffer;
+  grpc_slice_buffer_init(&buffer);
+  grpc_slice input[] = {
+      grpc_slice_from_static_string("foo"),
+      grpc_slice_from_static_string("bar"),
+  };
+  for (size_t i = 0; i < GPR_ARRAY_SIZE(input); ++i) {
+    grpc_slice_buffer_add(&buffer, input[i]);
+  }
+  grpc_slice_buffer_stream underlying_stream;
+  grpc_slice_buffer_stream_init(&underlying_stream, &buffer, 0);
+  // Create cache and caching stream.
+  grpc_byte_stream_cache cache;
+  grpc_byte_stream_cache_init(&cache, &underlying_stream.base);
+  grpc_caching_byte_stream stream;
+  grpc_caching_byte_stream_init(&stream, &cache);
+  grpc_closure closure;
+  GRPC_CLOSURE_INIT(&closure, not_called_closure, NULL,
+                    grpc_schedule_on_exec_ctx);
+  // Read one slice.
+  GPR_ASSERT(
+      grpc_byte_stream_next(&exec_ctx, &stream.base, ~(size_t)0, &closure));
+  grpc_slice output;
+  grpc_error *error = grpc_byte_stream_pull(&exec_ctx, &stream.base, &output);
+  GPR_ASSERT(error == GRPC_ERROR_NONE);
+  GPR_ASSERT(grpc_slice_eq(input[0], output));
+  grpc_slice_unref_internal(&exec_ctx, output);
+  // Reset the caching stream.  The reads should start over from the
+  // first slice.
+  grpc_caching_byte_stream_reset(&stream);
+  for (size_t i = 0; i < GPR_ARRAY_SIZE(input); ++i) {
+    GPR_ASSERT(
+        grpc_byte_stream_next(&exec_ctx, &stream.base, ~(size_t)0, &closure));
+    error = grpc_byte_stream_pull(&exec_ctx, &stream.base, &output);
+    GPR_ASSERT(error == GRPC_ERROR_NONE);
+    GPR_ASSERT(grpc_slice_eq(input[i], output));
+    grpc_slice_unref_internal(&exec_ctx, output);
+  }
+  // Clean up.
+  grpc_byte_stream_destroy(&exec_ctx, &stream.base);
+  grpc_byte_stream_cache_destroy(&exec_ctx, &cache);
+  grpc_slice_buffer_destroy_internal(&exec_ctx, &buffer);
+  grpc_exec_ctx_finish(&exec_ctx);
+}
+
+static void test_caching_byte_stream_shared_cache(void) {
+  gpr_log(GPR_DEBUG, "test_caching_byte_stream_shared_cache");
+  grpc_exec_ctx exec_ctx = GRPC_EXEC_CTX_INIT;
+  // Create and populate slice buffer byte stream.
+  grpc_slice_buffer buffer;
+  grpc_slice_buffer_init(&buffer);
+  grpc_slice input[] = {
+      grpc_slice_from_static_string("foo"),
+      grpc_slice_from_static_string("bar"),
+  };
+  for (size_t i = 0; i < GPR_ARRAY_SIZE(input); ++i) {
+    grpc_slice_buffer_add(&buffer, input[i]);
+  }
+  grpc_slice_buffer_stream underlying_stream;
+  grpc_slice_buffer_stream_init(&underlying_stream, &buffer, 0);
+  // Create cache and two caching streams.
+  grpc_byte_stream_cache cache;
+  grpc_byte_stream_cache_init(&cache, &underlying_stream.base);
+  grpc_caching_byte_stream stream1;
+  grpc_caching_byte_stream_init(&stream1, &cache);
+  grpc_caching_byte_stream stream2;
+  grpc_caching_byte_stream_init(&stream2, &cache);
+  grpc_closure closure;
+  GRPC_CLOSURE_INIT(&closure, not_called_closure, NULL,
+                    grpc_schedule_on_exec_ctx);
+  // Read one slice from stream1.
+  GPR_ASSERT(
+      grpc_byte_stream_next(&exec_ctx, &stream1.base, ~(size_t)0, &closure));
+  grpc_slice output;
+  grpc_error *error = grpc_byte_stream_pull(&exec_ctx, &stream1.base, &output);
+  GPR_ASSERT(error == GRPC_ERROR_NONE);
+  GPR_ASSERT(grpc_slice_eq(input[0], output));
+  grpc_slice_unref_internal(&exec_ctx, output);
+  // Read all slices from stream2.
+  for (size_t i = 0; i < GPR_ARRAY_SIZE(input); ++i) {
+    GPR_ASSERT(
+        grpc_byte_stream_next(&exec_ctx, &stream2.base, ~(size_t)0, &closure));
+    error = grpc_byte_stream_pull(&exec_ctx, &stream2.base, &output);
+    GPR_ASSERT(error == GRPC_ERROR_NONE);
+    GPR_ASSERT(grpc_slice_eq(input[i], output));
+    grpc_slice_unref_internal(&exec_ctx, output);
+  }
+  // Now read the second slice from stream1.
+  GPR_ASSERT(
+      grpc_byte_stream_next(&exec_ctx, &stream1.base, ~(size_t)0, &closure));
+  error = grpc_byte_stream_pull(&exec_ctx, &stream1.base, &output);
+  GPR_ASSERT(error == GRPC_ERROR_NONE);
+  GPR_ASSERT(grpc_slice_eq(input[1], output));
+  grpc_slice_unref_internal(&exec_ctx, output);
+  // Clean up.
+  grpc_byte_stream_destroy(&exec_ctx, &stream1.base);
+  grpc_byte_stream_destroy(&exec_ctx, &stream2.base);
+  grpc_byte_stream_cache_destroy(&exec_ctx, &cache);
+  grpc_slice_buffer_destroy_internal(&exec_ctx, &buffer);
+  grpc_exec_ctx_finish(&exec_ctx);
+}
+
+int main(int argc, char **argv) {
+  grpc_test_init(argc, argv);
+  test_slice_buffer_stream_basic();
+  test_slice_buffer_stream_shutdown();
+  test_caching_byte_stream_basic();
+  test_caching_byte_stream_reset();
+  test_caching_byte_stream_shared_cache();
+  return 0;
+}
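
The tests above only ever exercise the synchronous path: grpc_slice_buffer_stream (and the caching stream layered on top of it) completes next() immediately, which is why the registered closure must never run. Other grpc_byte_stream implementations may complete next() asynchronously. Below is a minimal, hedged sketch of a general consumer loop against the same C API the test uses; byte_stream_reader, continue_reading, on_next_done, and byte_stream_reader_start are illustrative names invented for this sketch (not part of this commit), and error handling is reduced to bailing out.

```c
#include <grpc/slice.h>

#include "src/core/lib/slice/slice_internal.h"
#include "src/core/lib/transport/byte_stream.h"

typedef struct {
  grpc_byte_stream *stream;  /* not owned by the reader */
  size_t bytes_read;         /* compared against stream->length */
  grpc_closure on_next_done;
} byte_stream_reader;

static void continue_reading(grpc_exec_ctx *exec_ctx, byte_stream_reader *r);

/* Runs only when next() reported that the slice was not yet available. */
static void on_next_done(grpc_exec_ctx *exec_ctx, void *arg,
                         grpc_error *error) {
  byte_stream_reader *r = (byte_stream_reader *)arg;
  if (error != GRPC_ERROR_NONE) return;  /* stream failed or was shut down */
  grpc_slice slice;
  error = grpc_byte_stream_pull(exec_ctx, r->stream, &slice);
  if (error != GRPC_ERROR_NONE) {
    GRPC_ERROR_UNREF(error);
    return;
  }
  r->bytes_read += GRPC_SLICE_LENGTH(slice);
  /* ... consume the slice here ... */
  grpc_slice_unref_internal(exec_ctx, slice);
  continue_reading(exec_ctx, r);  /* pick the loop back up */
}

static void continue_reading(grpc_exec_ctx *exec_ctx, byte_stream_reader *r) {
  while (r->bytes_read < r->stream->length) {
    if (!grpc_byte_stream_next(exec_ctx, r->stream, ~(size_t)0,
                               &r->on_next_done)) {
      return;  /* on_next_done will be scheduled later and resumes the loop */
    }
    grpc_slice slice;
    grpc_error *error = grpc_byte_stream_pull(exec_ctx, r->stream, &slice);
    if (error != GRPC_ERROR_NONE) {
      GRPC_ERROR_UNREF(error);
      return;
    }
    r->bytes_read += GRPC_SLICE_LENGTH(slice);
    /* ... consume the slice here ... */
    grpc_slice_unref_internal(exec_ctx, slice);
  }
  /* all stream->length bytes have been consumed */
}

static void byte_stream_reader_start(grpc_exec_ctx *exec_ctx,
                                     byte_stream_reader *r,
                                     grpc_byte_stream *stream) {
  r->stream = stream;
  r->bytes_read = 0;
  GRPC_CLOSURE_INIT(&r->on_next_done, on_next_done, r,
                    grpc_schedule_on_exec_ctx);
  continue_reading(exec_ctx, r);
}
```

For grpc_slice_buffer_stream the `return` inside the loop is never taken, which is exactly the property the not_called_closure assertion above relies on.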
diff --git a/test/cpp/end2end/grpclb_end2end_test.cc b/test/cpp/end2end/grpclb_end2end_test.cc
index 1f3255d..4fef535 100644
--- a/test/cpp/end2end/grpclb_end2end_test.cc
+++ b/test/cpp/end2end/grpclb_end2end_test.cc
@@ -45,6 +45,7 @@
 #include "src/proto/grpc/lb/v1/load_balancer.grpc.pb.h"
 #include "src/proto/grpc/testing/echo.grpc.pb.h"
 
+#include <gmock/gmock.h>
 #include <gtest/gtest.h>
 
 // TODO(dgq): Other scenarios in need of testing:
@@ -131,6 +132,19 @@
     IncreaseResponseCount();
     return status;
   }
+
+  // Returns true on its first invocation, false otherwise.
+  bool Shutdown() {
+    std::unique_lock<std::mutex> lock(mu_);
+    const bool prev = !shutdown_;
+    shutdown_ = true;
+    gpr_log(GPR_INFO, "Backend: shut down");
+    return prev;
+  }
+
+ private:
+  std::mutex mu_;
+  bool shutdown_ = false;
 };
 
 grpc::string Ip4ToPackedString(const char* ip_str) {
@@ -142,22 +156,20 @@
 struct ClientStats {
   size_t num_calls_started = 0;
   size_t num_calls_finished = 0;
-  size_t num_calls_finished_with_drop_for_rate_limiting = 0;
-  size_t num_calls_finished_with_drop_for_load_balancing = 0;
   size_t num_calls_finished_with_client_failed_to_send = 0;
   size_t num_calls_finished_known_received = 0;
+  std::map<grpc::string, size_t> drop_token_counts;
 
   ClientStats& operator+=(const ClientStats& other) {
     num_calls_started += other.num_calls_started;
     num_calls_finished += other.num_calls_finished;
-    num_calls_finished_with_drop_for_rate_limiting +=
-        other.num_calls_finished_with_drop_for_rate_limiting;
-    num_calls_finished_with_drop_for_load_balancing +=
-        other.num_calls_finished_with_drop_for_load_balancing;
     num_calls_finished_with_client_failed_to_send +=
         other.num_calls_finished_with_client_failed_to_send;
     num_calls_finished_known_received +=
         other.num_calls_finished_known_received;
+    for (const auto& p : other.drop_token_counts) {
+      drop_token_counts[p.first] += p.second;
+    }
     return *this;
   }
 };
@@ -173,11 +185,12 @@
         shutdown_(false) {}
 
   Status BalanceLoad(ServerContext* context, Stream* stream) override {
-    gpr_log(GPR_INFO, "LB: BalanceLoad");
+    gpr_log(GPR_INFO, "LB[%p]: BalanceLoad", this);
     LoadBalanceRequest request;
     stream->Read(&request);
     IncreaseRequestCount();
-    gpr_log(GPR_INFO, "LB: recv msg '%s'", request.DebugString().c_str());
+    gpr_log(GPR_INFO, "LB[%p]: recv msg '%s'", this,
+            request.DebugString().c_str());
 
     if (client_load_reporting_interval_seconds_ > 0) {
       LoadBalanceResponse initial_response;
@@ -208,7 +221,7 @@
     if (client_load_reporting_interval_seconds_ > 0) {
       request.Clear();
       stream->Read(&request);
-      gpr_log(GPR_INFO, "LB: recv client load report msg: '%s'",
+      gpr_log(GPR_INFO, "LB[%p]: recv client load report msg: '%s'", this,
               request.DebugString().c_str());
       GPR_ASSERT(request.has_client_stats());
       // We need to acquire the lock here in order to prevent the notify_one
@@ -218,21 +231,21 @@
           request.client_stats().num_calls_started();
       client_stats_.num_calls_finished +=
           request.client_stats().num_calls_finished();
-      client_stats_.num_calls_finished_with_drop_for_rate_limiting +=
-          request.client_stats()
-              .num_calls_finished_with_drop_for_rate_limiting();
-      client_stats_.num_calls_finished_with_drop_for_load_balancing +=
-          request.client_stats()
-              .num_calls_finished_with_drop_for_load_balancing();
       client_stats_.num_calls_finished_with_client_failed_to_send +=
           request.client_stats()
               .num_calls_finished_with_client_failed_to_send();
       client_stats_.num_calls_finished_known_received +=
           request.client_stats().num_calls_finished_known_received();
+      for (const auto& drop_token_count :
+           request.client_stats().calls_finished_with_drop()) {
+        client_stats_
+            .drop_token_counts[drop_token_count.load_balance_token()] +=
+            drop_token_count.num_calls();
+      }
       load_report_cond_.notify_one();
     }
   done:
-    gpr_log(GPR_INFO, "LB: done");
+    gpr_log(GPR_INFO, "LB[%p]: done", this);
     return Status::OK;
   }
 
@@ -247,21 +260,20 @@
     std::unique_lock<std::mutex> lock(mu_);
     const bool prev = !shutdown_;
     shutdown_ = true;
-    gpr_log(GPR_INFO, "LB: shut down");
+    gpr_log(GPR_INFO, "LB[%p]: shut down", this);
     return prev;
   }
 
   static LoadBalanceResponse BuildResponseForBackends(
-      const std::vector<int>& backend_ports, int num_drops_for_rate_limiting,
-      int num_drops_for_load_balancing) {
+      const std::vector<int>& backend_ports,
+      const std::map<grpc::string, size_t>& drop_token_counts) {
     LoadBalanceResponse response;
-    for (int i = 0; i < num_drops_for_rate_limiting; ++i) {
-      auto* server = response.mutable_server_list()->add_servers();
-      server->set_drop_for_rate_limiting(true);
-    }
-    for (int i = 0; i < num_drops_for_load_balancing; ++i) {
-      auto* server = response.mutable_server_list()->add_servers();
-      server->set_drop_for_load_balancing(true);
+    for (const auto& drop_token_count : drop_token_counts) {
+      for (size_t i = 0; i < drop_token_count.second; ++i) {
+        auto* server = response.mutable_server_list()->add_servers();
+        server->set_drop(true);
+        server->set_load_balance_token(drop_token_count.first);
+      }
     }
     for (const int& backend_port : backend_ports) {
       auto* server = response.mutable_server_list()->add_servers();
@@ -285,13 +297,13 @@
  private:
   void SendResponse(Stream* stream, const LoadBalanceResponse& response,
                     int delay_ms) {
-    gpr_log(GPR_INFO, "LB: sleeping for %d ms...", delay_ms);
+    gpr_log(GPR_INFO, "LB[%p]: sleeping for %d ms...", this, delay_ms);
     if (delay_ms > 0) {
       gpr_sleep_until(
           gpr_time_add(gpr_now(GPR_CLOCK_REALTIME),
                        gpr_time_from_millis(delay_ms, GPR_TIMESPAN)));
     }
-    gpr_log(GPR_INFO, "LB: Woke up! Sending response '%s'",
+    gpr_log(GPR_INFO, "LB[%p]: Woke up! Sending response '%s'", this,
             response.DebugString().c_str());
     IncreaseResponseCount();
     stream->Write(response);
@@ -341,7 +353,7 @@
 
   void TearDown() override {
     for (size_t i = 0; i < backends_.size(); ++i) {
-      backend_servers_[i].Shutdown();
+      if (backends_[i]->Shutdown()) backend_servers_[i].Shutdown();
     }
     for (size_t i = 0; i < balancers_.size(); ++i) {
       if (balancers_[i]->Shutdown()) balancer_servers_[i].Shutdown();
@@ -499,7 +511,7 @@
 TEST_F(SingleBalancerTest, Vanilla) {
   const size_t kNumRpcsPerAddress = 100;
   ScheduleResponseForBalancer(
-      0, BalancerServiceImpl::BuildResponseForBackends(GetBackendPorts(), 0, 0),
+      0, BalancerServiceImpl::BuildResponseForBackends(GetBackendPorts(), {}),
       0);
   // Make sure that trying to connect works without a call.
   channel_->GetState(true /* try_to_connect */);
@@ -538,7 +550,7 @@
   ScheduleResponseForBalancer(0, LoadBalanceResponse(), 0);
   // Send non-empty serverlist only after kServerlistDelayMs
   ScheduleResponseForBalancer(
-      0, BalancerServiceImpl::BuildResponseForBackends(GetBackendPorts(), 0, 0),
+      0, BalancerServiceImpl::BuildResponseForBackends(GetBackendPorts(), {}),
       kServerlistDelayMs);
 
   const auto t0 = system_clock::now();
@@ -580,11 +592,11 @@
 
   // Send a serverlist right away.
   ScheduleResponseForBalancer(
-      0, BalancerServiceImpl::BuildResponseForBackends(GetBackendPorts(), 0, 0),
+      0, BalancerServiceImpl::BuildResponseForBackends(GetBackendPorts(), {}),
       0);
   // ... and the same one a bit later.
   ScheduleResponseForBalancer(
-      0, BalancerServiceImpl::BuildResponseForBackends(GetBackendPorts(), 0, 0),
+      0, BalancerServiceImpl::BuildResponseForBackends(GetBackendPorts(), {}),
       kServerlistDelayMs);
 
   // Send num_backends/2 requests.
@@ -639,6 +651,61 @@
   EXPECT_EQ("grpclb", channel_->GetLoadBalancingPolicyName());
 }
 
+TEST_F(SingleBalancerTest, BackendsRestart) {
+  const size_t kNumRpcsPerAddress = 100;
+  ScheduleResponseForBalancer(
+      0, BalancerServiceImpl::BuildResponseForBackends(GetBackendPorts(), {}),
+      0);
+  // Make sure that trying to connect works without a call.
+  channel_->GetState(true /* try_to_connect */);
+  // Send 100 RPCs per server.
+  auto statuses_and_responses =
+      SendRpc(kMessage_, kNumRpcsPerAddress * num_backends_);
+  for (const auto& status_and_response : statuses_and_responses) {
+    const Status& status = status_and_response.first;
+    const EchoResponse& response = status_and_response.second;
+    EXPECT_TRUE(status.ok()) << "code=" << status.error_code()
+                             << " message=" << status.error_message();
+    EXPECT_EQ(response.message(), kMessage_);
+  }
+  // Each backend should have gotten 100 requests.
+  for (size_t i = 0; i < backends_.size(); ++i) {
+    EXPECT_EQ(kNumRpcsPerAddress,
+              backend_servers_[i].service_->request_count());
+  }
+  balancers_[0]->NotifyDoneWithServerlists();
+  // The balancer got a single request.
+  EXPECT_EQ(1U, balancer_servers_[0].service_->request_count());
+  // and sent a single response.
+  EXPECT_EQ(1U, balancer_servers_[0].service_->response_count());
+  for (size_t i = 0; i < backends_.size(); ++i) {
+    if (backends_[i]->Shutdown()) backend_servers_[i].Shutdown();
+  }
+  statuses_and_responses = SendRpc(kMessage_, 1);
+  for (const auto& status_and_response : statuses_and_responses) {
+    const Status& status = status_and_response.first;
+    EXPECT_FALSE(status.ok());
+  }
+  for (size_t i = 0; i < num_backends_; ++i) {
+    backends_.emplace_back(new BackendServiceImpl());
+    backend_servers_.emplace_back(ServerThread<BackendService>(
+        "backend", server_host_, backends_.back().get()));
+  }
+  // The following RPC will fail due to the backend ports having changed. It
+  // will nonetheless exercise the grpclb-roundrobin handling of the RR policy
+  // having gone into shutdown.
+  // TODO(dgq): implement the "backend restart" component as well. We need extra
+  // machinery to either update the LB responses "on the fly" or instruct
+  // backends which ports to restart on.
+  statuses_and_responses = SendRpc(kMessage_, 1);
+  for (const auto& status_and_response : statuses_and_responses) {
+    const Status& status = status_and_response.first;
+    EXPECT_FALSE(status.ok());
+  }
+  // Check LB policy name for the channel.
+  EXPECT_EQ("grpclb", channel_->GetLoadBalancingPolicyName());
+}
+
 class UpdatesTest : public GrpclbEnd2endTest {
  public:
   UpdatesTest() : GrpclbEnd2endTest(4, 3, 0) {}
@@ -648,10 +715,9 @@
   const std::vector<int> first_backend{GetBackendPorts()[0]};
   const std::vector<int> second_backend{GetBackendPorts()[1]};
   ScheduleResponseForBalancer(
-      0, BalancerServiceImpl::BuildResponseForBackends(first_backend, 0, 0), 0);
+      0, BalancerServiceImpl::BuildResponseForBackends(first_backend, {}), 0);
   ScheduleResponseForBalancer(
-      1, BalancerServiceImpl::BuildResponseForBackends(second_backend, 0, 0),
-      0);
+      1, BalancerServiceImpl::BuildResponseForBackends(second_backend, {}), 0);
 
   // Start servers and send 10 RPCs per server.
   gpr_log(GPR_INFO, "========= BEFORE FIRST BATCH ==========");
@@ -726,10 +792,9 @@
   const std::vector<int> second_backend{GetBackendPorts()[0]};
 
   ScheduleResponseForBalancer(
-      0, BalancerServiceImpl::BuildResponseForBackends(first_backend, 0, 0), 0);
+      0, BalancerServiceImpl::BuildResponseForBackends(first_backend, {}), 0);
   ScheduleResponseForBalancer(
-      1, BalancerServiceImpl::BuildResponseForBackends(second_backend, 0, 0),
-      0);
+      1, BalancerServiceImpl::BuildResponseForBackends(second_backend, {}), 0);
 
   // Start servers and send 10 RPCs per server.
   gpr_log(GPR_INFO, "========= BEFORE FIRST BATCH ==========");
@@ -809,10 +874,9 @@
   const std::vector<int> second_backend{GetBackendPorts()[1]};
 
   ScheduleResponseForBalancer(
-      0, BalancerServiceImpl::BuildResponseForBackends(first_backend, 0, 0), 0);
+      0, BalancerServiceImpl::BuildResponseForBackends(first_backend, {}), 0);
   ScheduleResponseForBalancer(
-      1, BalancerServiceImpl::BuildResponseForBackends(second_backend, 0, 0),
-      0);
+      1, BalancerServiceImpl::BuildResponseForBackends(second_backend, {}), 0);
 
   // Start servers and send 10 RPCs per server.
   gpr_log(GPR_INFO, "========= BEFORE FIRST BATCH ==========");
@@ -901,7 +965,8 @@
 TEST_F(SingleBalancerTest, Drop) {
   const size_t kNumRpcsPerAddress = 100;
   ScheduleResponseForBalancer(
-      0, BalancerServiceImpl::BuildResponseForBackends(GetBackendPorts(), 1, 2),
+      0, BalancerServiceImpl::BuildResponseForBackends(
+             GetBackendPorts(), {{"rate_limiting", 1}, {"load_balancing", 2}}),
       0);
   // Send 100 RPCs for each server and drop address.
   const auto& statuses_and_responses =
@@ -936,7 +1001,9 @@
 TEST_F(SingleBalancerTest, DropAllFirst) {
   // All registered addresses are marked as "drop".
   ScheduleResponseForBalancer(
-      0, BalancerServiceImpl::BuildResponseForBackends({}, 1, 1), 0);
+      0, BalancerServiceImpl::BuildResponseForBackends(
+             {}, {{"rate_limiting", 1}, {"load_balancing", 1}}),
+      0);
   const auto& statuses_and_responses = SendRpc(kMessage_, 1);
   for (const auto& status_and_response : statuses_and_responses) {
     const Status& status = status_and_response.first;
@@ -947,10 +1014,12 @@
 
 TEST_F(SingleBalancerTest, DropAll) {
   ScheduleResponseForBalancer(
-      0, BalancerServiceImpl::BuildResponseForBackends(GetBackendPorts(), 0, 0),
+      0, BalancerServiceImpl::BuildResponseForBackends(GetBackendPorts(), {}),
       0);
   ScheduleResponseForBalancer(
-      0, BalancerServiceImpl::BuildResponseForBackends({}, 1, 1), 1000);
+      0, BalancerServiceImpl::BuildResponseForBackends(
+             {}, {{"rate_limiting", 1}, {"load_balancing", 1}}),
+      1000);
 
   // First call succeeds.
   auto statuses_and_responses = SendRpc(kMessage_, 1);
@@ -980,7 +1049,7 @@
 TEST_F(SingleBalancerWithClientLoadReportingTest, Vanilla) {
   const size_t kNumRpcsPerAddress = 100;
   ScheduleResponseForBalancer(
-      0, BalancerServiceImpl::BuildResponseForBackends(GetBackendPorts(), 0, 0),
+      0, BalancerServiceImpl::BuildResponseForBackends(GetBackendPorts(), {}),
       0);
   // Send 100 RPCs per server.
   const auto& statuses_and_responses =
@@ -1009,17 +1078,17 @@
   EXPECT_EQ(kNumRpcsPerAddress * num_backends_, client_stats.num_calls_started);
   EXPECT_EQ(kNumRpcsPerAddress * num_backends_,
             client_stats.num_calls_finished);
-  EXPECT_EQ(0U, client_stats.num_calls_finished_with_drop_for_rate_limiting);
-  EXPECT_EQ(0U, client_stats.num_calls_finished_with_drop_for_load_balancing);
   EXPECT_EQ(0U, client_stats.num_calls_finished_with_client_failed_to_send);
   EXPECT_EQ(kNumRpcsPerAddress * num_backends_,
             client_stats.num_calls_finished_known_received);
+  EXPECT_THAT(client_stats.drop_token_counts, ::testing::ElementsAre());
 }
 
 TEST_F(SingleBalancerWithClientLoadReportingTest, Drop) {
   const size_t kNumRpcsPerAddress = 3;
   ScheduleResponseForBalancer(
-      0, BalancerServiceImpl::BuildResponseForBackends(GetBackendPorts(), 2, 1),
+      0, BalancerServiceImpl::BuildResponseForBackends(
+             GetBackendPorts(), {{"rate_limiting", 2}, {"load_balancing", 1}}),
       0);
   // Send 100 RPCs for each server and drop address.
   const auto& statuses_and_responses =
@@ -1056,13 +1125,13 @@
             client_stats.num_calls_started);
   EXPECT_EQ(kNumRpcsPerAddress * (num_backends_ + 3),
             client_stats.num_calls_finished);
-  EXPECT_EQ(kNumRpcsPerAddress * 2,
-            client_stats.num_calls_finished_with_drop_for_rate_limiting);
-  EXPECT_EQ(kNumRpcsPerAddress,
-            client_stats.num_calls_finished_with_drop_for_load_balancing);
   EXPECT_EQ(0U, client_stats.num_calls_finished_with_client_failed_to_send);
   EXPECT_EQ(kNumRpcsPerAddress * num_backends_,
             client_stats.num_calls_finished_known_received);
+  EXPECT_THAT(client_stats.drop_token_counts,
+              ::testing::ElementsAre(
+                  ::testing::Pair("load_balancing", kNumRpcsPerAddress),
+                  ::testing::Pair("rate_limiting", kNumRpcsPerAddress * 2)));
 }
 
 }  // namespace
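
BackendServiceImpl::Shutdown() above mirrors the guard the balancer service already had: it returns true only on its first invocation, so TearDown() (and tests such as BackendsRestart that stop backends mid-test) end up calling backend_servers_[i].Shutdown() at most once. A hedged C sketch of the same shut-down-once guard using gpr_mu follows; the service_guard type and function names are invented for illustration, not part of this change.

```c
#include <stdbool.h>

#include <grpc/support/sync.h>

typedef struct {
  gpr_mu mu;
  bool shutdown;
} service_guard;

static void service_guard_init(service_guard *g) {
  gpr_mu_init(&g->mu);
  g->shutdown = false;
}

/* Returns true only for the caller that performs the transition, so the
 * underlying server is stopped exactly once even if TearDown() runs after a
 * test has already shut the service down. */
static bool service_guard_shutdown(service_guard *g) {
  gpr_mu_lock(&g->mu);
  const bool first = !g->shutdown;
  g->shutdown = true;
  gpr_mu_unlock(&g->mu);
  return first;
}
```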
diff --git a/test/cpp/grpclb/grpclb_api_test.cc b/test/cpp/grpclb/grpclb_api_test.cc
index cec9666..6b0350e 100644
--- a/test/cpp/grpclb/grpclb_api_test.cc
+++ b/test/cpp/grpclb/grpclb_api_test.cc
@@ -91,13 +91,13 @@
   auto* server = serverlist->add_servers();
   server->set_ip_address(Ip4ToPackedString("127.0.0.1"));
   server->set_port(12345);
-  server->set_drop_for_rate_limiting(true);
-  server->set_drop_for_load_balancing(false);
+  server->set_load_balance_token("rate_limiting");
+  server->set_drop(true);
   server = response.mutable_server_list()->add_servers();
   server->set_ip_address(Ip4ToPackedString("10.0.0.1"));
   server->set_port(54321);
-  server->set_drop_for_rate_limiting(false);
-  server->set_drop_for_load_balancing(true);
+  server->set_load_balance_token("load_balancing");
+  server->set_drop(true);
   auto* expiration_interval = serverlist->mutable_expiration_interval();
   expiration_interval->set_seconds(888);
   expiration_interval->set_nanos(999);
@@ -112,14 +112,14 @@
   EXPECT_EQ(PackedStringToIp(c_serverlist->servers[0]->ip_address),
             "127.0.0.1");
   EXPECT_EQ(c_serverlist->servers[0]->port, 12345);
-  EXPECT_TRUE(c_serverlist->servers[0]->drop_for_rate_limiting);
-  EXPECT_FALSE(c_serverlist->servers[0]->drop_for_load_balancing);
+  EXPECT_STREQ(c_serverlist->servers[0]->load_balance_token, "rate_limiting");
+  EXPECT_TRUE(c_serverlist->servers[0]->drop);
   EXPECT_TRUE(c_serverlist->servers[1]->has_ip_address);
 
   EXPECT_EQ(PackedStringToIp(c_serverlist->servers[1]->ip_address), "10.0.0.1");
   EXPECT_EQ(c_serverlist->servers[1]->port, 54321);
-  EXPECT_FALSE(c_serverlist->servers[1]->drop_for_rate_limiting);
-  EXPECT_TRUE(c_serverlist->servers[1]->drop_for_load_balancing);
+  EXPECT_STREQ(c_serverlist->servers[1]->load_balance_token, "load_balancing");
+  EXPECT_TRUE(c_serverlist->servers[1]->drop);
 
   EXPECT_TRUE(c_serverlist->expiration_interval.has_seconds);
   EXPECT_EQ(c_serverlist->expiration_interval.seconds, 888);
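
With this change a serverlist entry signals a drop via a single drop flag plus a load_balance_token, and (as in the end2end test above) the client is expected to account for drops per token rather than per fixed reason. A hedged, self-contained C sketch of that per-token tally follows; lb_server, drop_token_count, and count_drops are stand-ins invented for the example, using only the two fields the assertions above check, not the real generated message types.

```c
#include <stdio.h>
#include <string.h>

typedef struct {
  int drop;                        /* entry is a drop marker, not a backend */
  const char *load_balance_token;  /* which drop category to charge */
} lb_server;

typedef struct {
  const char *token;
  size_t count;
} drop_token_count;

/* Tallies how many serverlist entries ask the client to drop the call,
 * grouped by token, e.g. {"rate_limiting": 1, "load_balancing": 2}. */
static size_t count_drops(const lb_server *servers, size_t num_servers,
                          drop_token_count *out, size_t max_out) {
  size_t num_tokens = 0;
  for (size_t i = 0; i < num_servers; ++i) {
    if (!servers[i].drop) continue;
    size_t t = 0;
    while (t < num_tokens &&
           strcmp(out[t].token, servers[i].load_balance_token) != 0) {
      ++t;
    }
    if (t == num_tokens) {
      if (num_tokens == max_out) continue;  /* tally table full; skip */
      out[num_tokens].token = servers[i].load_balance_token;
      out[num_tokens].count = 0;
      ++num_tokens;
    }
    ++out[t].count;
  }
  return num_tokens;
}

int main(void) {
  const lb_server servers[] = {
      {1, "rate_limiting"},
      {1, "load_balancing"},
      {1, "load_balancing"},
      {0, NULL} /* a real backend entry would carry an address/port instead */
  };
  drop_token_count counts[8];
  const size_t n = count_drops(servers, sizeof(servers) / sizeof(servers[0]),
                               counts, sizeof(counts) / sizeof(counts[0]));
  for (size_t i = 0; i < n; ++i) {
    printf("%s: %zu\n", counts[i].token, counts[i].count);
  }
  return 0;
}
```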
diff --git a/test/cpp/qps/server.h b/test/cpp/qps/server.h
index c0dac96..df27a43 100644
--- a/test/cpp/qps/server.h
+++ b/test/cpp/qps/server.h
@@ -80,8 +80,11 @@
       return false;
     }
     payload->set_type(type);
-    std::unique_ptr<char[]> body(new char[size]());
-    payload->set_body(body.get(), size);
+    // Don't waste time creating a new payload of identical size.
+    if (payload->body().length() != (size_t)size) {
+      std::unique_ptr<char[]> body(new char[size]());
+      payload->set_body(body.get(), size);
+    }
     return true;
   }
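
The qps server tweak above skips rebuilding the payload body when the requested size already matches the cached one. A minimal C sketch of the same guard, assuming a caller-owned buffer, is shown below; set_body, body, and body_len are hypothetical names for illustration, not the C++ helper being patched.

```c
#include <stdlib.h>

/* Reallocate (zero-filled) only when the requested size differs from what is
 * already cached; otherwise keep the existing body untouched. */
static int set_body(char **body, size_t *body_len, size_t size) {
  if (*body == NULL || *body_len != size) {
    char *new_body = (char *)calloc(size, 1); /* like `new char[size]()` */
    if (new_body == NULL) return 0;           /* allocation failed */
    free(*body);
    *body = new_body;
    *body_len = size;
  }
  return 1;
}
```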
 
diff --git a/tools/internal_ci/helper_scripts/prepare_build_macos_rc b/tools/internal_ci/helper_scripts/prepare_build_macos_rc
index 89e8e24..00105d4 100644
--- a/tools/internal_ci/helper_scripts/prepare_build_macos_rc
+++ b/tools/internal_ci/helper_scripts/prepare_build_macos_rc
@@ -27,6 +27,11 @@
 # show current limits
 ulimit -a
 
+# Add GCP credentials for BQ access
+# pip does not install google-api-python-client properly, so use easy_install
+sudo easy_install --upgrade google-api-python-client
+export GOOGLE_APPLICATION_CREDENTIALS=${KOKORO_GFILE_DIR}/GrpcTesting-d0eeee2db331.json
+gcloud auth activate-service-account --key-file=$GOOGLE_APPLICATION_CREDENTIALS
 
 # required to build protobuf
 brew install gflags
diff --git a/tools/internal_ci/helper_scripts/prepare_build_windows.bat b/tools/internal_ci/helper_scripts/prepare_build_windows.bat
index 1634acd..69e087e 100644
--- a/tools/internal_ci/helper_scripts/prepare_build_windows.bat
+++ b/tools/internal_ci/helper_scripts/prepare_build_windows.bat
@@ -18,4 +18,14 @@
 
 bash tools/internal_ci/helper_scripts/gen_report_index.sh
 
+@rem Update DNS settings to:
+@rem 1. allow resolving the metadata.google.internal hostname
+@rem 2. make fetching the default GCE credentials via oauth2client work
+netsh interface ip set dns "Local Area Connection 8" static 169.254.169.254 primary
+netsh interface ip add dnsservers "Local Area Connection 8" 8.8.8.8 index=2
+netsh interface ip add dnsservers "Local Area Connection 8" 8.8.4.4 index=3
+
+@rem Needed for big_query_utils
+python -m pip install google-api-python-client
+
 git submodule update --init
diff --git a/tools/internal_ci/linux/grpc_build_artifacts.sh b/tools/internal_ci/linux/grpc_build_artifacts.sh
index 3997a13..985243e 100755
--- a/tools/internal_ci/linux/grpc_build_artifacts.sh
+++ b/tools/internal_ci/linux/grpc_build_artifacts.sh
@@ -26,4 +26,4 @@
 rvm --default use ruby-2.4.1
 set -ex
 
-tools/run_tests/task_runner.py -f artifact linux
+tools/run_tests/task_runner.py -f artifact linux -j 6
diff --git a/tools/internal_ci/linux/grpc_master.cfg b/tools/internal_ci/linux/pull_request/grpc_basictests_c_cpp_dbg.cfg
similarity index 89%
copy from tools/internal_ci/linux/grpc_master.cfg
copy to tools/internal_ci/linux/pull_request/grpc_basictests_c_cpp_dbg.cfg
index a2784df..dcc6265 100644
--- a/tools/internal_ci/linux/grpc_master.cfg
+++ b/tools/internal_ci/linux/pull_request/grpc_basictests_c_cpp_dbg.cfg
@@ -26,5 +26,5 @@
 
 env_vars {
   key: "RUN_TESTS_FLAGS"
-  value: "-f basictests linux --inner_jobs 16 -j 1 --internal_ci --bq_result_table aggregate_results"
+  value: "-f basictests linux corelang dbg --inner_jobs 16 -j 1 --internal_ci"
 }
diff --git a/tools/internal_ci/linux/grpc_master.cfg b/tools/internal_ci/linux/pull_request/grpc_basictests_c_cpp_opt.cfg
similarity index 89%
rename from tools/internal_ci/linux/grpc_master.cfg
rename to tools/internal_ci/linux/pull_request/grpc_basictests_c_cpp_opt.cfg
index a2784df..f60beaf 100644
--- a/tools/internal_ci/linux/grpc_master.cfg
+++ b/tools/internal_ci/linux/pull_request/grpc_basictests_c_cpp_opt.cfg
@@ -26,5 +26,5 @@
 
 env_vars {
   key: "RUN_TESTS_FLAGS"
-  value: "-f basictests linux --inner_jobs 16 -j 1 --internal_ci --bq_result_table aggregate_results"
+  value: "-f basictests linux corelang opt --inner_jobs 16 -j 1 --internal_ci"
 }
diff --git a/tools/internal_ci/linux/grpc_master.cfg b/tools/internal_ci/linux/pull_request/grpc_basictests_multilang.cfg
similarity index 89%
copy from tools/internal_ci/linux/grpc_master.cfg
copy to tools/internal_ci/linux/pull_request/grpc_basictests_multilang.cfg
index a2784df..7c16cf6 100644
--- a/tools/internal_ci/linux/grpc_master.cfg
+++ b/tools/internal_ci/linux/pull_request/grpc_basictests_multilang.cfg
@@ -26,5 +26,5 @@
 
 env_vars {
   key: "RUN_TESTS_FLAGS"
-  value: "-f basictests linux --inner_jobs 16 -j 1 --internal_ci --bq_result_table aggregate_results"
+  value: "-f basictests linux multilang --inner_jobs 16 -j 2 --internal_ci"
 }
diff --git a/tools/internal_ci/linux/grpc_master.cfg b/tools/internal_ci/macos/grpc_basictests.cfg
similarity index 76%
copy from tools/internal_ci/linux/grpc_master.cfg
copy to tools/internal_ci/macos/grpc_basictests.cfg
index a2784df..e10c2e3 100644
--- a/tools/internal_ci/linux/grpc_master.cfg
+++ b/tools/internal_ci/macos/grpc_basictests.cfg
@@ -15,7 +15,8 @@
 # Config file for the internal CI (in protobuf text format)
 
 # Location of the continuous shell script in repository.
-build_file: "grpc/tools/internal_ci/linux/grpc_run_tests_matrix.sh"
+build_file: "grpc/tools/internal_ci/macos/grpc_run_tests_matrix.sh"
+gfile_resources: "/bigstore/grpc-testing-secrets/gcp_credentials/GrpcTesting-d0eeee2db331.json"
 timeout_mins: 240
 action {
   define_artifacts {
@@ -26,5 +27,5 @@
 
 env_vars {
   key: "RUN_TESTS_FLAGS"
-  value: "-f basictests linux --inner_jobs 16 -j 1 --internal_ci --bq_result_table aggregate_results"
+  value: "-f basictests macos --internal_ci -j 2 --inner_jobs 4 --bq_result_table aggregate_results"
 }
diff --git a/tools/internal_ci/macos/grpc_master.cfg b/tools/internal_ci/macos/grpc_master.cfg
deleted file mode 100644
index 24c021d..0000000
--- a/tools/internal_ci/macos/grpc_master.cfg
+++ /dev/null
@@ -1,25 +0,0 @@
-# Copyright 2017 gRPC authors.
-#
-# Licensed under the Apache License, Version 2.0 (the "License");
-# you may not use this file except in compliance with the License.
-# You may obtain a copy of the License at
-#
-#     http://www.apache.org/licenses/LICENSE-2.0
-#
-# Unless required by applicable law or agreed to in writing, software
-# distributed under the License is distributed on an "AS IS" BASIS,
-# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
-# See the License for the specific language governing permissions and
-# limitations under the License.
-
-# Config file for the internal CI (in protobuf text format)
-
-# Location of the continuous shell script in repository.
-build_file: "grpc/tools/internal_ci/macos/grpc_master.sh"
-timeout_mins: 240
-action {
-  define_artifacts {
-    regex: "**/*sponge_log.xml"
-    regex: "github/grpc/reports/**"
-  }
-}
diff --git a/tools/internal_ci/macos/grpc_master.sh b/tools/internal_ci/macos/grpc_run_tests_matrix.sh
similarity index 88%
rename from tools/internal_ci/macos/grpc_master.sh
rename to tools/internal_ci/macos/grpc_run_tests_matrix.sh
index c64666b..9a43e48 100755
--- a/tools/internal_ci/macos/grpc_master.sh
+++ b/tools/internal_ci/macos/grpc_run_tests_matrix.sh
@@ -20,7 +20,7 @@
 
 source tools/internal_ci/helper_scripts/prepare_build_macos_rc
 
-tools/run_tests/run_tests_matrix.py -f basictests macos --internal_ci -j 2 --inner_jobs 4 || FAILED="true"
+tools/run_tests/run_tests_matrix.py $RUN_TESTS_FLAGS || FAILED="true"
 
 # kill port_server.py to prevent the build from hanging
 ps aux | grep port_server\\.py | awk '{print $2}' | xargs kill -9
diff --git a/tools/internal_ci/linux/grpc_master.cfg b/tools/internal_ci/windows/grpc_basictests.cfg
similarity index 80%
copy from tools/internal_ci/linux/grpc_master.cfg
copy to tools/internal_ci/windows/grpc_basictests.cfg
index a2784df..396d29e 100644
--- a/tools/internal_ci/linux/grpc_master.cfg
+++ b/tools/internal_ci/windows/grpc_basictests.cfg
@@ -15,8 +15,8 @@
 # Config file for the internal CI (in protobuf text format)
 
 # Location of the continuous shell script in repository.
-build_file: "grpc/tools/internal_ci/linux/grpc_run_tests_matrix.sh"
-timeout_mins: 240
+build_file: "grpc/tools/internal_ci/windows/grpc_run_tests_matrix.bat"
+timeout_mins: 360
 action {
   define_artifacts {
     regex: "**/*sponge_log.xml"
@@ -26,5 +26,5 @@
 
 env_vars {
   key: "RUN_TESTS_FLAGS"
-  value: "-f basictests linux --inner_jobs 16 -j 1 --internal_ci --bq_result_table aggregate_results"
+  value: "-f basictests windows -j 1 --inner_jobs 8 --internal_ci --bq_result_table aggregate_results"
 }
diff --git a/tools/internal_ci/windows/grpc_master.bat b/tools/internal_ci/windows/grpc_master.bat
deleted file mode 100644
index 58eefc3..0000000
--- a/tools/internal_ci/windows/grpc_master.bat
+++ /dev/null
@@ -1,24 +0,0 @@
-@rem Copyright 2017 gRPC authors.
-@rem
-@rem Licensed under the Apache License, Version 2.0 (the "License");
-@rem you may not use this file except in compliance with the License.
-@rem You may obtain a copy of the License at
-@rem
-@rem     http://www.apache.org/licenses/LICENSE-2.0
-@rem
-@rem Unless required by applicable law or agreed to in writing, software
-@rem distributed under the License is distributed on an "AS IS" BASIS,
-@rem WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
-@rem See the License for the specific language governing permissions and
-@rem limitations under the License.
-
-@rem enter repo root
-cd /d %~dp0\..\..\..
-
-call tools/internal_ci/helper_scripts/prepare_build_windows.bat
-
-python tools/run_tests/run_tests_matrix.py -f basictests windows -j 1 --inner_jobs 8 --internal_ci || goto :error
-goto :EOF
-
-:error
-exit /b %errorlevel%
diff --git a/tools/internal_ci/windows/grpc_master.cfg b/tools/internal_ci/windows/grpc_master.cfg
deleted file mode 100644
index 786ebc0..0000000
--- a/tools/internal_ci/windows/grpc_master.cfg
+++ /dev/null
@@ -1,25 +0,0 @@
-# Copyright 2017 gRPC authors.
-#
-# Licensed under the Apache License, Version 2.0 (the "License");
-# you may not use this file except in compliance with the License.
-# You may obtain a copy of the License at
-#
-#     http://www.apache.org/licenses/LICENSE-2.0
-#
-# Unless required by applicable law or agreed to in writing, software
-# distributed under the License is distributed on an "AS IS" BASIS,
-# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
-# See the License for the specific language governing permissions and
-# limitations under the License.
-
-# Config file for the internal CI (in protobuf text format)
-
-# Location of the continuous shell script in repository.
-build_file: "grpc/tools/internal_ci/windows/grpc_master.bat"
-timeout_mins: 360
-action {
-  define_artifacts {
-    regex: "**/*sponge_log.xml"
-    regex: "github/grpc/reports/**"
-  }
-}
diff --git a/tools/internal_ci/linux/grpc_master.cfg b/tools/internal_ci/windows/grpc_portability.cfg
similarity index 81%
copy from tools/internal_ci/linux/grpc_master.cfg
copy to tools/internal_ci/windows/grpc_portability.cfg
index a2784df..c395cb4 100644
--- a/tools/internal_ci/linux/grpc_master.cfg
+++ b/tools/internal_ci/windows/grpc_portability.cfg
@@ -15,8 +15,8 @@
 # Config file for the internal CI (in protobuf text format)
 
 # Location of the continuous shell script in repository.
-build_file: "grpc/tools/internal_ci/linux/grpc_run_tests_matrix.sh"
-timeout_mins: 240
+build_file: "grpc/tools/internal_ci/windows/grpc_run_tests_matrix.bat"
+timeout_mins: 360
 action {
   define_artifacts {
     regex: "**/*sponge_log.xml"
@@ -26,5 +26,5 @@
 
 env_vars {
   key: "RUN_TESTS_FLAGS"
-  value: "-f basictests linux --inner_jobs 16 -j 1 --internal_ci --bq_result_table aggregate_results"
+  value: "-f portability windows -j 1 --inner_jobs 8 --internal_ci"
 }
diff --git a/tools/internal_ci/windows/grpc_portability_master.cfg b/tools/internal_ci/windows/grpc_portability_master.cfg
index 85f6927..c395cb4 100644
--- a/tools/internal_ci/windows/grpc_portability_master.cfg
+++ b/tools/internal_ci/windows/grpc_portability_master.cfg
@@ -15,7 +15,7 @@
 # Config file for the internal CI (in protobuf text format)
 
 # Location of the continuous shell script in repository.
-build_file: "grpc/tools/internal_ci/windows/grpc_portability_master.bat"
+build_file: "grpc/tools/internal_ci/windows/grpc_run_tests_matrix.bat"
 timeout_mins: 360
 action {
   define_artifacts {
@@ -23,3 +23,8 @@
     regex: "github/grpc/reports/**"
   }
 }
+
+env_vars {
+  key: "RUN_TESTS_FLAGS"
+  value: "-f portability windows -j 1 --inner_jobs 8 --internal_ci"
+}
diff --git a/tools/internal_ci/windows/grpc_portability_master.bat b/tools/internal_ci/windows/grpc_run_tests_matrix.bat
similarity index 86%
rename from tools/internal_ci/windows/grpc_portability_master.bat
rename to tools/internal_ci/windows/grpc_run_tests_matrix.bat
index c93d56c..08d834f 100644
--- a/tools/internal_ci/windows/grpc_portability_master.bat
+++ b/tools/internal_ci/windows/grpc_run_tests_matrix.bat
@@ -17,7 +17,7 @@
 
 call tools/internal_ci/helper_scripts/prepare_build_windows.bat
 
-python tools/run_tests/run_tests_matrix.py -f portability windows -j 1 --inner_jobs 8 --internal_ci || goto :error
+python tools/run_tests/run_tests_matrix.py %RUN_TESTS_FLAGS% || goto :error
 goto :EOF
 
 :error
diff --git a/tools/internal_ci/linux/grpc_master.cfg b/tools/internal_ci/windows/pull_request/grpc_basictests.cfg
similarity index 81%
copy from tools/internal_ci/linux/grpc_master.cfg
copy to tools/internal_ci/windows/pull_request/grpc_basictests.cfg
index a2784df..a116738 100644
--- a/tools/internal_ci/linux/grpc_master.cfg
+++ b/tools/internal_ci/windows/pull_request/grpc_basictests.cfg
@@ -15,8 +15,8 @@
 # Config file for the internal CI (in protobuf text format)
 
 # Location of the continuous shell script in repository.
-build_file: "grpc/tools/internal_ci/linux/grpc_run_tests_matrix.sh"
-timeout_mins: 240
+build_file: "grpc/tools/internal_ci/windows/grpc_run_tests_matrix.bat"
+timeout_mins: 360
 action {
   define_artifacts {
     regex: "**/*sponge_log.xml"
@@ -26,5 +26,5 @@
 
 env_vars {
   key: "RUN_TESTS_FLAGS"
-  value: "-f basictests linux --inner_jobs 16 -j 1 --internal_ci --bq_result_table aggregate_results"
+  value: "-f basictests windows -j 1 --inner_jobs 8 --internal_ci"
 }
diff --git a/tools/internal_ci/linux/grpc_master.cfg b/tools/internal_ci/windows/pull_request/grpc_portability.cfg
similarity index 81%
copy from tools/internal_ci/linux/grpc_master.cfg
copy to tools/internal_ci/windows/pull_request/grpc_portability.cfg
index a2784df..c395cb4 100644
--- a/tools/internal_ci/linux/grpc_master.cfg
+++ b/tools/internal_ci/windows/pull_request/grpc_portability.cfg
@@ -15,8 +15,8 @@
 # Config file for the internal CI (in protobuf text format)
 
 # Location of the continuous shell script in repository.
-build_file: "grpc/tools/internal_ci/linux/grpc_run_tests_matrix.sh"
-timeout_mins: 240
+build_file: "grpc/tools/internal_ci/windows/grpc_run_tests_matrix.bat"
+timeout_mins: 360
 action {
   define_artifacts {
     regex: "**/*sponge_log.xml"
@@ -26,5 +26,5 @@
 
 env_vars {
   key: "RUN_TESTS_FLAGS"
-  value: "-f basictests linux --inner_jobs 16 -j 1 --internal_ci --bq_result_table aggregate_results"
+  value: "-f portability windows -j 1 --inner_jobs 8 --internal_ci"
 }
diff --git a/tools/run_tests/generated/sources_and_headers.json b/tools/run_tests/generated/sources_and_headers.json
index 24e7ff2..aa76fc0 100644
--- a/tools/run_tests/generated/sources_and_headers.json
+++ b/tools/run_tests/generated/sources_and_headers.json
@@ -174,6 +174,23 @@
     "headers": [], 
     "is_filegroup": false, 
     "language": "c", 
+    "name": "byte_stream_test", 
+    "src": [
+      "test/core/transport/byte_stream_test.c"
+    ], 
+    "third_party": false, 
+    "type": "target"
+  }, 
+  {
+    "deps": [
+      "gpr", 
+      "gpr_test_util", 
+      "grpc", 
+      "grpc_test_util"
+    ], 
+    "headers": [], 
+    "is_filegroup": false, 
+    "language": "c", 
     "name": "census_context_test", 
     "src": [
       "test/core/census/context_test.c"
diff --git a/tools/run_tests/generated/tests.json b/tools/run_tests/generated/tests.json
index ebf3071..83781f9 100644
--- a/tools/run_tests/generated/tests.json
+++ b/tools/run_tests/generated/tests.json
@@ -215,6 +215,28 @@
     "flaky": false, 
     "gtest": false, 
     "language": "c", 
+    "name": "byte_stream_test", 
+    "platforms": [
+      "linux", 
+      "mac", 
+      "posix", 
+      "windows"
+    ]
+  }, 
+  {
+    "args": [], 
+    "ci_platforms": [
+      "linux", 
+      "mac", 
+      "posix", 
+      "windows"
+    ], 
+    "cpu_cost": 1.0, 
+    "exclude_configs": [], 
+    "exclude_iomgrs": [], 
+    "flaky": false, 
+    "gtest": false, 
+    "language": "c", 
     "name": "census_context_test", 
     "platforms": [
       "linux", 
diff --git a/vsprojects/buildtests_c.sln b/vsprojects/buildtests_c.sln
index 55de734..6f13039 100644
--- a/vsprojects/buildtests_c.sln
+++ b/vsprojects/buildtests_c.sln
@@ -118,6 +118,17 @@
 		{29D16885-7228-4C31-81ED-5F9187C7F2A9} = {29D16885-7228-4C31-81ED-5F9187C7F2A9}
 	EndProjectSection
 EndProject
+Project("{8BC9CEB8-8B4A-11D0-8D11-00A0C91BC942}") = "byte_stream_test", "vcxproj\test\byte_stream_test\byte_stream_test.vcxproj", "{9AEDA345-E3E8-BFE9-11BF-64949EF41C9C}"
+	ProjectSection(myProperties) = preProject
+        	lib = "False"
+	EndProjectSection
+	ProjectSection(ProjectDependencies) = postProject
+		{17BCAFC0-5FDC-4C94-AEB9-95F3E220614B} = {17BCAFC0-5FDC-4C94-AEB9-95F3E220614B}
+		{29D16885-7228-4C31-81ED-5F9187C7F2A9} = {29D16885-7228-4C31-81ED-5F9187C7F2A9}
+		{EAB0A629-17A9-44DB-B5FF-E91A721FE037} = {EAB0A629-17A9-44DB-B5FF-E91A721FE037}
+		{B23D3D1A-9438-4EDA-BEB6-9A0A03D17792} = {B23D3D1A-9438-4EDA-BEB6-9A0A03D17792}
+	EndProjectSection
+EndProject
 Project("{8BC9CEB8-8B4A-11D0-8D11-00A0C91BC942}") = "census_context_test", "vcxproj\test\census_context_test\census_context_test.vcxproj", "{5C1CFC2D-AF3C-D7CB-BA74-D267E91CBC73}"
 	ProjectSection(myProperties) = preProject
         	lib = "False"
@@ -1940,6 +1951,22 @@
 		{D5C70922-D68E-0E9D-9988-995E0F9A79AE}.Release-DLL|Win32.Build.0 = Release|Win32
 		{D5C70922-D68E-0E9D-9988-995E0F9A79AE}.Release-DLL|x64.ActiveCfg = Release|x64
 		{D5C70922-D68E-0E9D-9988-995E0F9A79AE}.Release-DLL|x64.Build.0 = Release|x64
+		{9AEDA345-E3E8-BFE9-11BF-64949EF41C9C}.Debug|Win32.ActiveCfg = Debug|Win32
+		{9AEDA345-E3E8-BFE9-11BF-64949EF41C9C}.Debug|x64.ActiveCfg = Debug|x64
+		{9AEDA345-E3E8-BFE9-11BF-64949EF41C9C}.Release|Win32.ActiveCfg = Release|Win32
+		{9AEDA345-E3E8-BFE9-11BF-64949EF41C9C}.Release|x64.ActiveCfg = Release|x64
+		{9AEDA345-E3E8-BFE9-11BF-64949EF41C9C}.Debug|Win32.Build.0 = Debug|Win32
+		{9AEDA345-E3E8-BFE9-11BF-64949EF41C9C}.Debug|x64.Build.0 = Debug|x64
+		{9AEDA345-E3E8-BFE9-11BF-64949EF41C9C}.Release|Win32.Build.0 = Release|Win32
+		{9AEDA345-E3E8-BFE9-11BF-64949EF41C9C}.Release|x64.Build.0 = Release|x64
+		{9AEDA345-E3E8-BFE9-11BF-64949EF41C9C}.Debug-DLL|Win32.ActiveCfg = Debug|Win32
+		{9AEDA345-E3E8-BFE9-11BF-64949EF41C9C}.Debug-DLL|Win32.Build.0 = Debug|Win32
+		{9AEDA345-E3E8-BFE9-11BF-64949EF41C9C}.Debug-DLL|x64.ActiveCfg = Debug|x64
+		{9AEDA345-E3E8-BFE9-11BF-64949EF41C9C}.Debug-DLL|x64.Build.0 = Debug|x64
+		{9AEDA345-E3E8-BFE9-11BF-64949EF41C9C}.Release-DLL|Win32.ActiveCfg = Release|Win32
+		{9AEDA345-E3E8-BFE9-11BF-64949EF41C9C}.Release-DLL|Win32.Build.0 = Release|Win32
+		{9AEDA345-E3E8-BFE9-11BF-64949EF41C9C}.Release-DLL|x64.ActiveCfg = Release|x64
+		{9AEDA345-E3E8-BFE9-11BF-64949EF41C9C}.Release-DLL|x64.Build.0 = Release|x64
 		{5C1CFC2D-AF3C-D7CB-BA74-D267E91CBC73}.Debug|Win32.ActiveCfg = Debug|Win32
 		{5C1CFC2D-AF3C-D7CB-BA74-D267E91CBC73}.Debug|x64.ActiveCfg = Debug|x64
 		{5C1CFC2D-AF3C-D7CB-BA74-D267E91CBC73}.Release|Win32.ActiveCfg = Release|Win32
diff --git a/vsprojects/vcxproj/test/byte_stream_test/byte_stream_test.vcxproj b/vsprojects/vcxproj/test/byte_stream_test/byte_stream_test.vcxproj
new file mode 100644
index 0000000..5d65647
--- /dev/null
+++ b/vsprojects/vcxproj/test/byte_stream_test/byte_stream_test.vcxproj
@@ -0,0 +1,199 @@
+<?xml version="1.0" encoding="utf-8"?>
+<Project DefaultTargets="Build" ToolsVersion="12.0" xmlns="http://schemas.microsoft.com/developer/msbuild/2003">
+  <Import Project="$(SolutionDir)\..\vsprojects\packages\grpc.dependencies.openssl.1.0.204.1\build\native\grpc.dependencies.openssl.props" Condition="Exists('$(SolutionDir)\..\vsprojects\packages\grpc.dependencies.openssl.1.0.204.1\build\native\1.0.204.1.props')" />
+  <ItemGroup Label="ProjectConfigurations">
+    <ProjectConfiguration Include="Debug|Win32">
+      <Configuration>Debug</Configuration>
+      <Platform>Win32</Platform>
+    </ProjectConfiguration>
+    <ProjectConfiguration Include="Debug|x64">
+      <Configuration>Debug</Configuration>
+      <Platform>x64</Platform>
+    </ProjectConfiguration>
+    <ProjectConfiguration Include="Release|Win32">
+      <Configuration>Release</Configuration>
+      <Platform>Win32</Platform>
+    </ProjectConfiguration>
+    <ProjectConfiguration Include="Release|x64">
+      <Configuration>Release</Configuration>
+      <Platform>x64</Platform>
+    </ProjectConfiguration>
+  </ItemGroup>
+  <PropertyGroup Label="Globals">
+    <ProjectGuid>{9AEDA345-E3E8-BFE9-11BF-64949EF41C9C}</ProjectGuid>
+    <IgnoreWarnIntDirInTempDetected>true</IgnoreWarnIntDirInTempDetected>
+    <IntDir>$(SolutionDir)IntDir\$(MSBuildProjectName)\</IntDir>
+  </PropertyGroup>
+  <Import Project="$(VCTargetsPath)\Microsoft.Cpp.Default.props" />
+  <PropertyGroup Condition="'$(VisualStudioVersion)' == '10.0'" Label="Configuration">
+    <PlatformToolset>v100</PlatformToolset>
+  </PropertyGroup>
+  <PropertyGroup Condition="'$(VisualStudioVersion)' == '11.0'" Label="Configuration">
+    <PlatformToolset>v110</PlatformToolset>
+  </PropertyGroup>
+  <PropertyGroup Condition="'$(VisualStudioVersion)' == '12.0'" Label="Configuration">
+    <PlatformToolset>v120</PlatformToolset>
+  </PropertyGroup>
+  <PropertyGroup Condition="'$(VisualStudioVersion)' == '14.0'" Label="Configuration">
+    <PlatformToolset>v140</PlatformToolset>
+  </PropertyGroup>
+  <PropertyGroup Condition="'$(Configuration)'=='Debug'" Label="Configuration">
+    <ConfigurationType>Application</ConfigurationType>
+    <UseDebugLibraries>true</UseDebugLibraries>
+    <CharacterSet>Unicode</CharacterSet>
+  </PropertyGroup>
+  <PropertyGroup Condition="'$(Configuration)'=='Release'" Label="Configuration">
+    <ConfigurationType>Application</ConfigurationType>
+    <UseDebugLibraries>false</UseDebugLibraries>
+    <WholeProgramOptimization>true</WholeProgramOptimization>
+    <CharacterSet>Unicode</CharacterSet>
+  </PropertyGroup>
+  <Import Project="$(VCTargetsPath)\Microsoft.Cpp.props" />
+  <ImportGroup Label="ExtensionSettings">
+  </ImportGroup>
+  <ImportGroup Label="PropertySheets">
+    <Import Project="$(UserRootDir)\Microsoft.Cpp.$(Platform).user.props" Condition="exists('$(UserRootDir)\Microsoft.Cpp.$(Platform).user.props')" Label="LocalAppDataPlatform" />
+    <Import Project="$(SolutionDir)\..\vsprojects\global.props" />
+    <Import Project="$(SolutionDir)\..\vsprojects\openssl.props" />
+    <Import Project="$(SolutionDir)\..\vsprojects\winsock.props" />
+    <Import Project="$(SolutionDir)\..\vsprojects\zlib.props" />
+  </ImportGroup>
+  <PropertyGroup Label="UserMacros" />
+  <PropertyGroup Condition="'$(Configuration)'=='Debug'">
+    <TargetName>byte_stream_test</TargetName>
+    <Linkage-grpc_dependencies_zlib>static</Linkage-grpc_dependencies_zlib>
+    <Configuration-grpc_dependencies_zlib>Debug</Configuration-grpc_dependencies_zlib>
+    <Linkage-grpc_dependencies_openssl>static</Linkage-grpc_dependencies_openssl>
+    <Configuration-grpc_dependencies_openssl>Debug</Configuration-grpc_dependencies_openssl>
+  </PropertyGroup>
+  <PropertyGroup Condition="'$(Configuration)'=='Release'">
+    <TargetName>byte_stream_test</TargetName>
+    <Linkage-grpc_dependencies_zlib>static</Linkage-grpc_dependencies_zlib>
+    <Configuration-grpc_dependencies_zlib>Release</Configuration-grpc_dependencies_zlib>
+    <Linkage-grpc_dependencies_openssl>static</Linkage-grpc_dependencies_openssl>
+    <Configuration-grpc_dependencies_openssl>Release</Configuration-grpc_dependencies_openssl>
+  </PropertyGroup>
+    <ItemDefinitionGroup Condition="'$(Configuration)|$(Platform)'=='Debug|Win32'">
+    <ClCompile>
+      <PrecompiledHeader>NotUsing</PrecompiledHeader>
+      <WarningLevel>Level3</WarningLevel>
+      <Optimization>Disabled</Optimization>
+      <PreprocessorDefinitions>WIN32;_DEBUG;_LIB;%(PreprocessorDefinitions)</PreprocessorDefinitions>
+      <SDLCheck>true</SDLCheck>
+      <RuntimeLibrary>MultiThreadedDebug</RuntimeLibrary>
+      <TreatWarningAsError>true</TreatWarningAsError>
+      <DebugInformationFormat Condition="$(Jenkins)">None</DebugInformationFormat>
+      <MinimalRebuild Condition="$(Jenkins)">false</MinimalRebuild>
+    </ClCompile>
+    <Link>
+      <SubSystem>Console</SubSystem>
+      <GenerateDebugInformation Condition="!$(Jenkins)">true</GenerateDebugInformation>
+      <GenerateDebugInformation Condition="$(Jenkins)">false</GenerateDebugInformation>
+    </Link>
+  </ItemDefinitionGroup>
+
+    <ItemDefinitionGroup Condition="'$(Configuration)|$(Platform)'=='Debug|x64'">
+    <ClCompile>
+      <PrecompiledHeader>NotUsing</PrecompiledHeader>
+      <WarningLevel>Level3</WarningLevel>
+      <Optimization>Disabled</Optimization>
+      <PreprocessorDefinitions>WIN32;_DEBUG;_LIB;%(PreprocessorDefinitions)</PreprocessorDefinitions>
+      <SDLCheck>true</SDLCheck>
+      <RuntimeLibrary>MultiThreadedDebug</RuntimeLibrary>
+      <TreatWarningAsError>true</TreatWarningAsError>
+      <DebugInformationFormat Condition="$(Jenkins)">None</DebugInformationFormat>
+      <MinimalRebuild Condition="$(Jenkins)">false</MinimalRebuild>
+    </ClCompile>
+    <Link>
+      <SubSystem>Console</SubSystem>
+      <GenerateDebugInformation Condition="!$(Jenkins)">true</GenerateDebugInformation>
+      <GenerateDebugInformation Condition="$(Jenkins)">false</GenerateDebugInformation>
+    </Link>
+  </ItemDefinitionGroup>
+
+    <ItemDefinitionGroup Condition="'$(Configuration)|$(Platform)'=='Release|Win32'">
+    <ClCompile>
+      <PrecompiledHeader>NotUsing</PrecompiledHeader>
+      <WarningLevel>Level3</WarningLevel>
+      <Optimization>MaxSpeed</Optimization>
+      <PreprocessorDefinitions>WIN32;NDEBUG;_LIB;%(PreprocessorDefinitions)</PreprocessorDefinitions>
+      <FunctionLevelLinking>true</FunctionLevelLinking>
+      <IntrinsicFunctions>true</IntrinsicFunctions>
+      <SDLCheck>true</SDLCheck>
+      <RuntimeLibrary>MultiThreaded</RuntimeLibrary>
+      <TreatWarningAsError>true</TreatWarningAsError>
+      <DebugInformationFormat Condition="$(Jenkins)">None</DebugInformationFormat>
+      <MinimalRebuild Condition="$(Jenkins)">false</MinimalRebuild>
+    </ClCompile>
+    <Link>
+      <SubSystem>Console</SubSystem>
+      <GenerateDebugInformation Condition="!$(Jenkins)">true</GenerateDebugInformation>
+      <GenerateDebugInformation Condition="$(Jenkins)">false</GenerateDebugInformation>
+      <EnableCOMDATFolding>true</EnableCOMDATFolding>
+      <OptimizeReferences>true</OptimizeReferences>
+    </Link>
+  </ItemDefinitionGroup>
+
+    <ItemDefinitionGroup Condition="'$(Configuration)|$(Platform)'=='Release|x64'">
+    <ClCompile>
+      <PrecompiledHeader>NotUsing</PrecompiledHeader>
+      <WarningLevel>Level3</WarningLevel>
+      <Optimization>MaxSpeed</Optimization>
+      <PreprocessorDefinitions>WIN32;NDEBUG;_LIB;%(PreprocessorDefinitions)</PreprocessorDefinitions>
+      <FunctionLevelLinking>true</FunctionLevelLinking>
+      <IntrinsicFunctions>true</IntrinsicFunctions>
+      <SDLCheck>true</SDLCheck>
+      <RuntimeLibrary>MultiThreaded</RuntimeLibrary>
+      <TreatWarningAsError>true</TreatWarningAsError>
+      <DebugInformationFormat Condition="$(Jenkins)">None</DebugInformationFormat>
+      <MinimalRebuild Condition="$(Jenkins)">false</MinimalRebuild>
+    </ClCompile>
+    <Link>
+      <SubSystem>Console</SubSystem>
+      <GenerateDebugInformation Condition="!$(Jenkins)">true</GenerateDebugInformation>
+      <GenerateDebugInformation Condition="$(Jenkins)">false</GenerateDebugInformation>
+      <EnableCOMDATFolding>true</EnableCOMDATFolding>
+      <OptimizeReferences>true</OptimizeReferences>
+    </Link>
+  </ItemDefinitionGroup>
+
+  <ItemGroup>
+    <ClCompile Include="$(SolutionDir)\..\test\core\transport\byte_stream_test.c">
+    </ClCompile>
+  </ItemGroup>
+  <ItemGroup>
+    <ProjectReference Include="$(SolutionDir)\..\vsprojects\vcxproj\.\grpc_test_util\grpc_test_util.vcxproj">
+      <Project>{17BCAFC0-5FDC-4C94-AEB9-95F3E220614B}</Project>
+    </ProjectReference>
+    <ProjectReference Include="$(SolutionDir)\..\vsprojects\vcxproj\.\grpc\grpc.vcxproj">
+      <Project>{29D16885-7228-4C31-81ED-5F9187C7F2A9}</Project>
+    </ProjectReference>
+    <ProjectReference Include="$(SolutionDir)\..\vsprojects\vcxproj\.\gpr_test_util\gpr_test_util.vcxproj">
+      <Project>{EAB0A629-17A9-44DB-B5FF-E91A721FE037}</Project>
+    </ProjectReference>
+    <ProjectReference Include="$(SolutionDir)\..\vsprojects\vcxproj\.\gpr\gpr.vcxproj">
+      <Project>{B23D3D1A-9438-4EDA-BEB6-9A0A03D17792}</Project>
+    </ProjectReference>
+  </ItemGroup>
+  <ItemGroup>
+    <None Include="packages.config" />
+  </ItemGroup>
+  <Import Project="$(VCTargetsPath)\Microsoft.Cpp.targets" />
+  <ImportGroup Label="ExtensionTargets">
+  <Import Project="$(SolutionDir)\..\vsprojects\packages\grpc.dependencies.zlib.redist.1.2.8.10\build\native\grpc.dependencies.zlib.redist.targets" Condition="Exists('$(SolutionDir)\..\vsprojects\packages\grpc.dependencies.zlib.redist.1.2.8.10\build\native\grpc.dependencies\grpc.dependencies.zlib.targets')" />
+  <Import Project="$(SolutionDir)\..\vsprojects\packages\grpc.dependencies.zlib.1.2.8.10\build\native\grpc.dependencies.zlib.targets" Condition="Exists('$(SolutionDir)\..\vsprojects\packages\grpc.dependencies.zlib.1.2.8.10\build\native\grpc.dependencies\grpc.dependencies.zlib.targets')" />
+  <Import Project="$(SolutionDir)\..\vsprojects\packages\grpc.dependencies.openssl.redist.1.0.204.1\build\native\grpc.dependencies.openssl.redist.targets" Condition="Exists('$(SolutionDir)\..\vsprojects\packages\grpc.dependencies.openssl.redist.1.0.204.1\build\native\grpc.dependencies\grpc.dependencies.openssl.targets')" />
+  <Import Project="$(SolutionDir)\..\vsprojects\packages\grpc.dependencies.openssl.1.0.204.1\build\native\grpc.dependencies.openssl.targets" Condition="Exists('$(SolutionDir)\..\vsprojects\packages\grpc.dependencies.openssl.1.0.204.1\build\native\grpc.dependencies\grpc.dependencies.openssl.targets')" />
+  </ImportGroup>
+  <Target Name="EnsureNuGetPackageBuildImports" BeforeTargets="PrepareForBuild">
+    <PropertyGroup>
+      <ErrorText>This project references NuGet package(s) that are missing on this computer. Enable NuGet Package Restore to download them.  For more information, see http://go.microsoft.com/fwlink/?LinkID=322105. The missing file is {0}.</ErrorText>
+    </PropertyGroup>
+    <Error Condition="!Exists('$(SolutionDir)\..\vsprojects\packages\grpc.dependencies.zlib.redist.1.2.8.10\build\native\grpc.dependencies.zlib.redist.targets')" Text="$([System.String]::Format('$(ErrorText)', '$(SolutionDir)\..\vsprojects\packages\grpc.dependencies.zlib.redist.1.2.8.10\build\native\grpc.dependencies.zlib.redist.targets'))" />
+    <Error Condition="!Exists('$(SolutionDir)\..\vsprojects\packages\grpc.dependencies.zlib.1.2.8.10\build\native\grpc.dependencies.zlib.targets')" Text="$([System.String]::Format('$(ErrorText)', '$(SolutionDir)\..\vsprojects\packages\grpc.dependencies.zlib.1.2.8.10\build\native\grpc.dependencies.zlib.targets'))" />
+    <Error Condition="!Exists('$(SolutionDir)\..\vsprojects\packages\grpc.dependencies.openssl.redist.1.0.204.1\build\native\grpc.dependencies.openssl.redist.targets')" Text="$([System.String]::Format('$(ErrorText)', '$(SolutionDir)\..\vsprojects\packages\grpc.dependencies.openssl.redist.1.0.204.1\build\native\grpc.dependencies.openssl.redist.targets'))" />
+    <Error Condition="!Exists('$(SolutionDir)\..\vsprojects\packages\grpc.dependencies.openssl.1.0.204.1\build\native\grpc.dependencies.openssl.props')" Text="$([System.String]::Format('$(ErrorText)', '$(SolutionDir)\..\vsprojects\packages\grpc.dependencies.openssl.1.0.204.1\build\native\grpc.dependencies.openssl.props'))" />
+    <Error Condition="!Exists('$(SolutionDir)\..\vsprojects\packages\grpc.dependencies.openssl.1.0.204.1\build\native\grpc.dependencies.openssl.targets')" Text="$([System.String]::Format('$(ErrorText)', '$(SolutionDir)\..\vsprojects\packages\grpc.dependencies.openssl.1.0.204.1\build\native\grpc.dependencies.openssl.targets'))" />
+  </Target>
+</Project>
+
diff --git a/vsprojects/vcxproj/test/byte_stream_test/byte_stream_test.vcxproj.filters b/vsprojects/vcxproj/test/byte_stream_test/byte_stream_test.vcxproj.filters
new file mode 100644
index 0000000..65e35b7
--- /dev/null
+++ b/vsprojects/vcxproj/test/byte_stream_test/byte_stream_test.vcxproj.filters
@@ -0,0 +1,21 @@
+<?xml version="1.0" encoding="utf-8"?>
+<Project ToolsVersion="4.0" xmlns="http://schemas.microsoft.com/developer/msbuild/2003">
+  <ItemGroup>
+    <ClCompile Include="$(SolutionDir)\..\test\core\transport\byte_stream_test.c">
+      <Filter>test\core\transport</Filter>
+    </ClCompile>
+  </ItemGroup>
+
+  <ItemGroup>
+    <Filter Include="test">
+      <UniqueIdentifier>{f172d292-4ad6-342a-f27a-096c06d43a31}</UniqueIdentifier>
+    </Filter>
+    <Filter Include="test\core">
+      <UniqueIdentifier>{d7f690de-dfe0-56fc-ff3b-38eec3931699}</UniqueIdentifier>
+    </Filter>
+    <Filter Include="test\core\transport">
+      <UniqueIdentifier>{f78f56ef-47df-c99d-18f0-86277f7013f3}</UniqueIdentifier>
+    </Filter>
+  </ItemGroup>
+</Project>
+