Retry throttling implementation.
diff --git a/src/core/ext/client_channel/client_channel.c b/src/core/ext/client_channel/client_channel.c
index 6cbc333..1cc2b94 100644
--- a/src/core/ext/client_channel/client_channel.c
+++ b/src/core/ext/client_channel/client_channel.c
@@ -47,6 +47,7 @@
 #include "src/core/ext/client_channel/lb_policy_registry.h"
 #include "src/core/ext/client_channel/proxy_mapper_registry.h"
 #include "src/core/ext/client_channel/resolver_registry.h"
+#include "src/core/ext/client_channel/retry_throttle.h"
 #include "src/core/ext/client_channel/subchannel.h"
 #include "src/core/lib/channel/channel_args.h"
 #include "src/core/lib/channel/connected_channel.h"
@@ -165,6 +166,8 @@
   grpc_combiner *combiner;
   /** currently active load balancer */
   grpc_lb_policy *lb_policy;
+  /** retry throttle data */
+  grpc_server_retry_throttle_data *retry_throttle_data;
   /** maps method names to method_parameters structs */
   grpc_slice_hash_table *method_params_table;
   /** incoming resolver result - set by resolver.next() */
@@ -260,6 +263,64 @@
                                         &w->on_changed);
 }
 
+typedef struct {
+  char *server_name;  // Server name passed to the throttle map lookup.
+  grpc_server_retry_throttle_data *retry_throttle_data;  // Parse result.
+} service_config_parsing_state;
+
+/** Service-config global-params callback (arg: service_config_parsing_state).
+    Parses "retryThrottling"; on success stores throttle data in the state.
+    A duplicate or malformed entry returns early and stores nothing new. */
+static void parse_retry_throttle_params(const grpc_json *field, void *arg) {
+  service_config_parsing_state *parsing_state = arg;
+  if (strcmp(field->key, "retryThrottling") == 0) {
+    if (parsing_state->retry_throttle_data != NULL) return;  // Duplicate.
+    if (field->type != GRPC_JSON_OBJECT) return;
+    int max_milli_tokens = 0;
+    int milli_token_ratio = 0;
+    for (grpc_json *sub_field = field->child; sub_field != NULL;
+         sub_field = sub_field->next) {
+      if (sub_field->key == NULL) continue;
+      if (strcmp(sub_field->key, "maxTokens") == 0) {
+        if (max_milli_tokens != 0) return;  // Duplicate.
+        if (sub_field->type != GRPC_JSON_NUMBER) return;
+        max_milli_tokens = gpr_parse_nonnegative_int(sub_field->value);
+        if (max_milli_tokens == -1) return;
+        max_milli_tokens *= 1000;
+      } else if (strcmp(sub_field->key, "tokenRatio") == 0) {
+        if (milli_token_ratio != 0) return;  // Duplicate.
+        if (sub_field->type != GRPC_JSON_NUMBER) return;
+        // We support up to 3 decimal digits.
+        size_t whole_len = strlen(sub_field->value);
+        uint32_t decimal_value = 0;
+        const char *decimal_point = strchr(sub_field->value, '.');
+        if (decimal_point != NULL) {
+          whole_len = (size_t)(decimal_point - sub_field->value);
+          size_t decimal_len = strlen(decimal_point + 1);
+          if (decimal_len > 3) decimal_len = 3;
+          if (!gpr_parse_bytes_to_uint32(decimal_point + 1, decimal_len,
+                                         &decimal_value)) {
+            return;
+          }
+          // Right-pad to three digits, e.g. ".5" -> 500 thousandths.
+          for (size_t i = decimal_len; i < 3; ++i) decimal_value *= 10;
+        }
+        uint32_t whole_value;
+        if (!gpr_parse_bytes_to_uint32(sub_field->value, whole_len,
+                                       &whole_value)) {
+          return;
+        }
+        // Scale the whole part to thousandths even without a decimal point.
+        milli_token_ratio = (int)((whole_value * 1000) + decimal_value);
+        if (milli_token_ratio <= 0) return;  // Reject non-positive ratios.
+      }
+    }
+    parsing_state->retry_throttle_data =
+        grpc_retry_throttle_map_get_data_for_server(
+            parsing_state->server_name, max_milli_tokens, milli_token_ratio);
+  }
+}
+
 static void on_resolver_result_changed_locked(grpc_exec_ctx *exec_ctx,
                                               void *arg, grpc_error *error) {
   channel_data *chand = arg;
@@ -271,6 +332,8 @@
   bool exit_idle = false;
   grpc_error *state_error = GRPC_ERROR_CREATE("No load balancing policy");
   char *service_config_json = NULL;
+  service_config_parsing_state parsing_state;
+  memset(&parsing_state, 0, sizeof(parsing_state));
 
   if (chand->resolver_result != NULL) {
     // Find LB policy name.
@@ -330,6 +393,18 @@
       grpc_service_config *service_config =
           grpc_service_config_create(service_config_json);
       if (service_config != NULL) {
+        channel_arg =
+            grpc_channel_args_find(chand->resolver_result, GRPC_ARG_SERVER_URI);
+        GPR_ASSERT(channel_arg != NULL);
+        GPR_ASSERT(channel_arg->type == GRPC_ARG_STRING);
+        grpc_uri *uri = grpc_uri_parse(channel_arg->value.string, true);
+        GPR_ASSERT(uri->path[0] != '\0');
+        parsing_state.server_name =
+            uri->path[0] == '/' ? uri->path + 1 : uri->path;
+        grpc_service_config_parse_global_params(
+            service_config, parse_retry_throttle_params, &parsing_state);
+        parsing_state.server_name = NULL;
+        grpc_uri_destroy(uri);
         method_params_table = grpc_service_config_create_method_config_table(
             exec_ctx, service_config, method_parameters_create_from_json,
             &method_parameters_vtable);
@@ -361,6 +436,11 @@
     chand->info_service_config_json = service_config_json;
   }
   gpr_mu_unlock(&chand->info_mu);
+
+  if (chand->retry_throttle_data != NULL) {
+    grpc_server_retry_throttle_data_unref(chand->retry_throttle_data);
+  }
+  chand->retry_throttle_data = parsing_state.retry_throttle_data;
   if (chand->method_params_table != NULL) {
     grpc_slice_hash_table_unref(exec_ctx, chand->method_params_table);
   }
@@ -589,6 +669,9 @@
   }
   gpr_free(chand->info_lb_policy_name);
   gpr_free(chand->info_service_config_json);
+  if (chand->retry_throttle_data != NULL) {
+    grpc_server_retry_throttle_data_unref(chand->retry_throttle_data);
+  }
   if (chand->method_params_table != NULL) {
     grpc_slice_hash_table_unref(exec_ctx, chand->method_params_table);
   }
@@ -651,6 +734,9 @@
   grpc_call_stack *owning_call;
 
   grpc_linked_mdelem lb_token_mdelem;
+
+  grpc_closure on_complete;
+  grpc_closure *original_on_complete;
 } call_data;
 
 grpc_subchannel_call *grpc_client_channel_get_subchannel_call(
@@ -977,20 +1063,54 @@
   add_waiting_locked(calld, op);
 }
 
-static void cc_start_transport_stream_op_locked(grpc_exec_ctx *exec_ctx,
-                                                void *arg,
-                                                grpc_error *error_ignored) {
-  GPR_TIMER_BEGIN("cc_start_transport_stream_op_locked", 0);
+/** Intercepts on_complete for batches that receive trailing metadata:
+    records the call's outcome in the channel's retry throttle data (if any),
+    then runs the closure that was originally attached to the batch. */
+static void on_complete_locked(grpc_exec_ctx *exec_ctx, void *arg,
+                               grpc_error *error) {
+  grpc_call_element *elem = arg;
+  channel_data *chand = elem->channel_data;
+  call_data *calld = elem->call_data;
+  if (chand->retry_throttle_data != NULL) {
+    if (error == GRPC_ERROR_NONE) {
+      grpc_server_retry_throttle_data_record_success(
+          chand->retry_throttle_data);
+    } else {
+      // TODO(roth): In a subsequent PR, check the return value here and
+      // decide whether or not to retry.
+      grpc_server_retry_throttle_data_record_failure(
+          chand->retry_throttle_data);
+    }
+  }
+  // grpc_closure_run() consumes a ref; our error is borrowed, so add one.
+  grpc_closure_run(exec_ctx, calld->original_on_complete,
+                   GRPC_ERROR_REF(error));
+}
+
+/** Starts the transport stream op passed as \a arg, under the combiner. */
+static void start_transport_stream_op_locked(grpc_exec_ctx *exec_ctx, void *arg,
+                                             grpc_error *error_ignored) {
+  GPR_TIMER_BEGIN("start_transport_stream_op_locked", 0);
 
   grpc_transport_stream_op *op = arg;
   grpc_call_element *elem = op->handler_private.args[0];
+  channel_data *chand = elem->channel_data;
   call_data *calld = elem->call_data;
 
+  // Intercept on_complete so the call's outcome reaches the retry throttle.
+  if (op->recv_trailing_metadata != NULL) {
+    GPR_ASSERT(op->on_complete != NULL);
+    calld->original_on_complete = op->on_complete;
+    grpc_closure_init(&calld->on_complete, on_complete_locked, elem,
+                      grpc_combiner_scheduler(chand->combiner, false));
+    op->on_complete = &calld->on_complete;
+  }
+
   start_transport_stream_op_locked_inner(exec_ctx, op, elem);
 
   GRPC_CALL_STACK_UNREF(exec_ctx, calld->owning_call,
                         "start_transport_stream_op");
-  GPR_TIMER_END("cc_start_transport_stream_op_locked", 0);
+  GPR_TIMER_END("start_transport_stream_op_locked", 0);
 }
 
 /* The logic here is fairly complicated, due to (a) the fact that we
@@ -1030,7 +1143,7 @@
   grpc_closure_sched(
       exec_ctx,
       grpc_closure_init(&op->handler_private.closure,
-                        cc_start_transport_stream_op_locked, op,
+                        start_transport_stream_op_locked, op,
                         grpc_combiner_scheduler(chand->combiner, false)),
       GRPC_ERROR_NONE);
   GPR_TIMER_END("cc_start_transport_stream_op", 0);