/*
 *
 * Copyright 2015 gRPC authors.
 *
 * Licensed under the Apache License, Version 2.0 (the "License");
 * you may not use this file except in compliance with the License.
 * You may obtain a copy of the License at
 *
 *     http://www.apache.org/licenses/LICENSE-2.0
 *
 * Unless required by applicable law or agreed to in writing, software
 * distributed under the License is distributed on an "AS IS" BASIS,
 * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
 * See the License for the specific language governing permissions and
 * limitations under the License.
 *
 */

#include <grpc/support/port_platform.h>

#include "src/core/ext/filters/client_channel/client_channel.h"

#include <inttypes.h>
#include <limits.h>
#include <stdbool.h>
#include <stdio.h>
#include <string.h>

#include <grpc/support/alloc.h>
#include <grpc/support/log.h>
#include <grpc/support/string_util.h>
#include <grpc/support/sync.h>

#include "src/core/ext/filters/client_channel/backup_poller.h"
#include "src/core/ext/filters/client_channel/http_connect_handshaker.h"
#include "src/core/ext/filters/client_channel/lb_policy_registry.h"
#include "src/core/ext/filters/client_channel/method_params.h"
#include "src/core/ext/filters/client_channel/proxy_mapper_registry.h"
#include "src/core/ext/filters/client_channel/resolver_registry.h"
#include "src/core/ext/filters/client_channel/retry_throttle.h"
#include "src/core/ext/filters/client_channel/status_util.h"
#include "src/core/ext/filters/client_channel/subchannel.h"
#include "src/core/ext/filters/deadline/deadline_filter.h"
#include "src/core/lib/backoff/backoff.h"
#include "src/core/lib/channel/channel_args.h"
#include "src/core/lib/channel/connected_channel.h"
#include "src/core/lib/gpr/string.h"
#include "src/core/lib/gprpp/inlined_vector.h"
#include "src/core/lib/gprpp/manual_constructor.h"
#include "src/core/lib/iomgr/combiner.h"
#include "src/core/lib/iomgr/iomgr.h"
#include "src/core/lib/iomgr/polling_entity.h"
#include "src/core/lib/profiling/timers.h"
#include "src/core/lib/slice/slice_internal.h"
#include "src/core/lib/slice/slice_string_helpers.h"
#include "src/core/lib/surface/channel.h"
#include "src/core/lib/transport/connectivity_state.h"
#include "src/core/lib/transport/error_utils.h"
#include "src/core/lib/transport/metadata.h"
#include "src/core/lib/transport/metadata_batch.h"
#include "src/core/lib/transport/service_config.h"
#include "src/core/lib/transport/static_metadata.h"
#include "src/core/lib/transport/status_metadata.h"

using grpc_core::internal::ClientChannelMethodParams;

/* Client channel implementation */

// By default, we buffer 256 KiB per RPC for retries.
// TODO(roth): Do we have any data to suggest a better value?
#define DEFAULT_PER_RPC_RETRY_BUFFER_SIZE (256 << 10)
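// NOTE: The following is an illustrative sketch, not code in this filter:
// a caller could override this default through the
// GRPC_ARG_PER_RPC_RETRY_BUFFER_SIZE channel arg (read in
// cc_init_channel_elem() below), e.g.:
//   grpc_arg arg = grpc_channel_arg_integer_create(
//       const_cast<char*>(GRPC_ARG_PER_RPC_RETRY_BUFFER_SIZE), 1 << 20);
//   grpc_channel_args args = {1, &arg};
// and then pass &args when creating the channel.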

// This value was picked arbitrarily. It can be changed if there is
// any even moderately compelling reason to do so.
#define RETRY_BACKOFF_JITTER 0.2
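// (Illustrative note: with grpc_core::BackOff, a jitter of 0.2 perturbs each
// computed backoff interval by a uniformly random factor in roughly
// [0.8, 1.2]; see src/core/lib/backoff/backoff.h for the authoritative
// semantics.)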

grpc_core::TraceFlag grpc_client_channel_trace(false, "client_channel");

/*************************************************************************
 * CHANNEL-WIDE FUNCTIONS
 */

struct external_connectivity_watcher;

typedef grpc_core::SliceHashTable<
    grpc_core::RefCountedPtr<ClientChannelMethodParams>>
    MethodParamsTable;
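// (The table is keyed by the full RPC path slice, e.g. "/service/method",
// as produced by ServiceConfig::CreateMethodConfigTable() when the service
// config is processed in on_resolver_result_changed_locked() below.)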

typedef struct client_channel_channel_data {
  grpc_core::OrphanablePtr<grpc_core::Resolver> resolver;
  bool started_resolving;
  bool deadline_checking_enabled;
  grpc_client_channel_factory* client_channel_factory;
  bool enable_retries;
  size_t per_rpc_retry_buffer_size;

  /** combiner protecting all variables below in this data structure */
  grpc_combiner* combiner;
  /** currently active load balancer */
  grpc_core::OrphanablePtr<grpc_core::LoadBalancingPolicy> lb_policy;
  /** retry throttle data */
  grpc_server_retry_throttle_data* retry_throttle_data;
  /** maps method names to method_parameters structs */
  grpc_core::RefCountedPtr<MethodParamsTable> method_params_table;
  /** incoming resolver result - set by resolver.next() */
  grpc_channel_args* resolver_result;
  /** a list of closures that are all waiting for resolver result to come in */
  grpc_closure_list waiting_for_resolver_result_closures;
  /** resolver callback */
  grpc_closure on_resolver_result_changed;
  /** connectivity state being tracked */
  grpc_connectivity_state_tracker state_tracker;
  /** when an lb_policy arrives, should we try to exit idle */
  bool exit_idle_when_lb_policy_arrives;
  /** owning stack */
  grpc_channel_stack* owning_stack;
  /** interested parties (owned) */
  grpc_pollset_set* interested_parties;

  /* external_connectivity_watcher_list head is guarded by its own mutex, since
   * counts need to be grabbed immediately without polling on a cq */
  gpr_mu external_connectivity_watcher_list_mu;
  struct external_connectivity_watcher* external_connectivity_watcher_list_head;

  /* the following properties are guarded by a mutex since APIs require them
     to be instantaneously available */
  gpr_mu info_mu;
  char* info_lb_policy_name;
  /** service config in JSON form */
  char* info_service_config_json;
} channel_data;

typedef struct {
  channel_data* chand;
  /** used only as an identifier; do not dereference it, since the LB policy
   * it refers to may no longer exist by the time the callback is run */
  grpc_core::LoadBalancingPolicy* lb_policy;
  grpc_closure closure;
} reresolution_request_args;

/** We create one watcher for each new lb_policy that is returned from a
    resolver, to watch for state changes from the lb_policy. When a state
    change is seen, we update the channel, and create a new watcher. */
typedef struct {
  channel_data* chand;
  grpc_closure on_changed;
  grpc_connectivity_state state;
  grpc_core::LoadBalancingPolicy* lb_policy;
} lb_policy_connectivity_watcher;

static void watch_lb_policy_locked(channel_data* chand,
                                   grpc_core::LoadBalancingPolicy* lb_policy,
                                   grpc_connectivity_state current_state);

static void set_channel_connectivity_state_locked(channel_data* chand,
                                                  grpc_connectivity_state state,
                                                  grpc_error* error,
                                                  const char* reason) {
  /* TODO: Improve failure handling:
   * - Make it possible for policies to return GRPC_CHANNEL_TRANSIENT_FAILURE.
   * - Hand over pending picks from old policies during the switch that happens
   *   when resolver provides an update. */
  if (chand->lb_policy != nullptr) {
    if (state == GRPC_CHANNEL_TRANSIENT_FAILURE) {
      /* cancel picks with wait_for_ready=false */
      chand->lb_policy->CancelMatchingPicksLocked(
          /* mask= */ GRPC_INITIAL_METADATA_WAIT_FOR_READY,
          /* check= */ 0, GRPC_ERROR_REF(error));
    } else if (state == GRPC_CHANNEL_SHUTDOWN) {
      /* cancel all picks */
      chand->lb_policy->CancelMatchingPicksLocked(/* mask= */ 0, /* check= */ 0,
                                                  GRPC_ERROR_REF(error));
    }
  }
  if (grpc_client_channel_trace.enabled()) {
    gpr_log(GPR_DEBUG, "chand=%p: setting connectivity state to %s", chand,
            grpc_connectivity_state_name(state));
  }
  grpc_connectivity_state_set(&chand->state_tracker, state, error, reason);
}

static void on_lb_policy_state_changed_locked(void* arg, grpc_error* error) {
  lb_policy_connectivity_watcher* w =
      static_cast<lb_policy_connectivity_watcher*>(arg);
  /* check if the notification is for the latest policy */
  if (w->lb_policy == w->chand->lb_policy.get()) {
    if (grpc_client_channel_trace.enabled()) {
      gpr_log(GPR_DEBUG, "chand=%p: lb_policy=%p state changed to %s", w->chand,
              w->lb_policy, grpc_connectivity_state_name(w->state));
    }
    set_channel_connectivity_state_locked(w->chand, w->state,
                                          GRPC_ERROR_REF(error), "lb_changed");
    if (w->state != GRPC_CHANNEL_SHUTDOWN) {
      watch_lb_policy_locked(w->chand, w->lb_policy, w->state);
    }
  }
  GRPC_CHANNEL_STACK_UNREF(w->chand->owning_stack, "watch_lb_policy");
  gpr_free(w);
}

static void watch_lb_policy_locked(channel_data* chand,
                                   grpc_core::LoadBalancingPolicy* lb_policy,
                                   grpc_connectivity_state current_state) {
  lb_policy_connectivity_watcher* w =
      static_cast<lb_policy_connectivity_watcher*>(gpr_malloc(sizeof(*w)));
  GRPC_CHANNEL_STACK_REF(chand->owning_stack, "watch_lb_policy");
  w->chand = chand;
  GRPC_CLOSURE_INIT(&w->on_changed, on_lb_policy_state_changed_locked, w,
                    grpc_combiner_scheduler(chand->combiner));
  w->state = current_state;
  w->lb_policy = lb_policy;
  lb_policy->NotifyOnStateChangeLocked(&w->state, &w->on_changed);
}

static void start_resolving_locked(channel_data* chand) {
  if (grpc_client_channel_trace.enabled()) {
    gpr_log(GPR_DEBUG, "chand=%p: starting name resolution", chand);
  }
  GPR_ASSERT(!chand->started_resolving);
  chand->started_resolving = true;
  GRPC_CHANNEL_STACK_REF(chand->owning_stack, "resolver");
  chand->resolver->NextLocked(&chand->resolver_result,
                              &chand->on_resolver_result_changed);
}

typedef struct {
  char* server_name;
  grpc_server_retry_throttle_data* retry_throttle_data;
} service_config_parsing_state;

static void parse_retry_throttle_params(
    const grpc_json* field, service_config_parsing_state* parsing_state) {
  if (strcmp(field->key, "retryThrottling") == 0) {
    if (parsing_state->retry_throttle_data != nullptr) return;  // Duplicate.
    if (field->type != GRPC_JSON_OBJECT) return;
    int max_milli_tokens = 0;
    int milli_token_ratio = 0;
    for (grpc_json* sub_field = field->child; sub_field != nullptr;
         sub_field = sub_field->next) {
      if (sub_field->key == nullptr) return;
      if (strcmp(sub_field->key, "maxTokens") == 0) {
        if (max_milli_tokens != 0) return;  // Duplicate.
        if (sub_field->type != GRPC_JSON_NUMBER) return;
        max_milli_tokens = gpr_parse_nonnegative_int(sub_field->value);
        if (max_milli_tokens == -1) return;
        max_milli_tokens *= 1000;
      } else if (strcmp(sub_field->key, "tokenRatio") == 0) {
        if (milli_token_ratio != 0) return;  // Duplicate.
        if (sub_field->type != GRPC_JSON_NUMBER) return;
        // We support up to 3 decimal digits.
        size_t whole_len = strlen(sub_field->value);
        uint32_t multiplier = 1;
        uint32_t decimal_value = 0;
        const char* decimal_point = strchr(sub_field->value, '.');
        if (decimal_point != nullptr) {
          whole_len = static_cast<size_t>(decimal_point - sub_field->value);
          multiplier = 1000;
          size_t decimal_len = strlen(decimal_point + 1);
          if (decimal_len > 3) decimal_len = 3;
          if (!gpr_parse_bytes_to_uint32(decimal_point + 1, decimal_len,
                                         &decimal_value)) {
            return;
          }
          uint32_t decimal_multiplier = 1;
          for (size_t i = 0; i < (3 - decimal_len); ++i) {
            decimal_multiplier *= 10;
          }
          decimal_value *= decimal_multiplier;
        }
        uint32_t whole_value;
        if (!gpr_parse_bytes_to_uint32(sub_field->value, whole_len,
                                       &whole_value)) {
          return;
        }
        milli_token_ratio =
            static_cast<int>((whole_value * multiplier) + decimal_value);
        if (milli_token_ratio <= 0) return;
      }
    }
    parsing_state->retry_throttle_data =
        grpc_retry_throttle_map_get_data_for_server(
            parsing_state->server_name, max_milli_tokens, milli_token_ratio);
  }
}
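// As a concrete illustration (field values are made up), the function above
// accepts a service config fragment such as:
//   "retryThrottling": {
//     "maxTokens": 10,
//     "tokenRatio": 0.1
//   }
// which yields max_milli_tokens = 10000 and milli_token_ratio = 100: both
// values are stored in milli-units, and tokenRatio is parsed with at most
// three decimal digits of precision.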

static void request_reresolution_locked(void* arg, grpc_error* error) {
  reresolution_request_args* args =
      static_cast<reresolution_request_args*>(arg);
  channel_data* chand = args->chand;
  // If this invocation is for a stale LB policy, treat it as an LB shutdown
  // signal.
  if (args->lb_policy != chand->lb_policy.get() || error != GRPC_ERROR_NONE ||
      chand->resolver == nullptr) {
    GRPC_CHANNEL_STACK_UNREF(chand->owning_stack, "re-resolution");
    gpr_free(args);
    return;
  }
  if (grpc_client_channel_trace.enabled()) {
    gpr_log(GPR_DEBUG, "chand=%p: started name re-resolving", chand);
  }
  chand->resolver->RequestReresolutionLocked();
  // Give back the closure to the LB policy.
  chand->lb_policy->SetReresolutionClosureLocked(&args->closure);
}

static void on_resolver_result_changed_locked(void* arg, grpc_error* error) {
  channel_data* chand = static_cast<channel_data*>(arg);
  if (grpc_client_channel_trace.enabled()) {
    gpr_log(GPR_DEBUG, "chand=%p: got resolver result: error=%s", chand,
            grpc_error_string(error));
  }
  // Extract the following fields from the resolver result, if non-nullptr.
  bool lb_policy_updated = false;
  bool lb_policy_created = false;
  char* lb_policy_name_dup = nullptr;
  bool lb_policy_name_changed = false;
  grpc_core::OrphanablePtr<grpc_core::LoadBalancingPolicy> new_lb_policy;
  char* service_config_json = nullptr;
  grpc_server_retry_throttle_data* retry_throttle_data = nullptr;
  grpc_core::RefCountedPtr<MethodParamsTable> method_params_table;
  if (chand->resolver_result != nullptr) {
    if (chand->resolver != nullptr) {
      // Find LB policy name.
      const grpc_arg* channel_arg = grpc_channel_args_find(
          chand->resolver_result, GRPC_ARG_LB_POLICY_NAME);
      const char* lb_policy_name = grpc_channel_arg_get_string(channel_arg);
      // Special case: If at least one balancer address is present, we use
      // the grpclb policy, regardless of what the resolver actually specified.
      channel_arg =
          grpc_channel_args_find(chand->resolver_result, GRPC_ARG_LB_ADDRESSES);
      if (channel_arg != nullptr && channel_arg->type == GRPC_ARG_POINTER) {
        grpc_lb_addresses* addresses =
            static_cast<grpc_lb_addresses*>(channel_arg->value.pointer.p);
        bool found_balancer_address = false;
        for (size_t i = 0; i < addresses->num_addresses; ++i) {
          if (addresses->addresses[i].is_balancer) {
            found_balancer_address = true;
            break;
          }
        }
        if (found_balancer_address) {
          if (lb_policy_name != nullptr &&
              strcmp(lb_policy_name, "grpclb") != 0) {
            gpr_log(GPR_INFO,
                    "resolver requested LB policy %s but provided at least one "
                    "balancer address -- forcing use of grpclb LB policy",
                    lb_policy_name);
          }
          lb_policy_name = "grpclb";
        }
      }
      // Use pick_first if nothing was specified and we didn't select grpclb
      // above.
      if (lb_policy_name == nullptr) lb_policy_name = "pick_first";
      // Check to see if we're already using the right LB policy.
      // Note: It's safe to use chand->info_lb_policy_name here without
      // taking a lock on chand->info_mu, because this function is the
      // only thing that modifies its value, and it can only be invoked
      // once at any given time.
      lb_policy_name_changed =
          chand->info_lb_policy_name == nullptr ||
          gpr_stricmp(chand->info_lb_policy_name, lb_policy_name) != 0;
      if (chand->lb_policy != nullptr && !lb_policy_name_changed) {
        // Continue using the same LB policy. Update with new addresses.
        lb_policy_updated = true;
        chand->lb_policy->UpdateLocked(*chand->resolver_result);
      } else {
        // Instantiate new LB policy.
        grpc_core::LoadBalancingPolicy::Args lb_policy_args;
        lb_policy_args.combiner = chand->combiner;
        lb_policy_args.client_channel_factory = chand->client_channel_factory;
        lb_policy_args.args = chand->resolver_result;
        new_lb_policy =
            grpc_core::LoadBalancingPolicyRegistry::CreateLoadBalancingPolicy(
                lb_policy_name, lb_policy_args);
        if (new_lb_policy == nullptr) {
          gpr_log(GPR_ERROR, "could not create LB policy \"%s\"",
                  lb_policy_name);
        } else {
          lb_policy_created = true;
          reresolution_request_args* args =
              static_cast<reresolution_request_args*>(
                  gpr_zalloc(sizeof(*args)));
          args->chand = chand;
          args->lb_policy = new_lb_policy.get();
          GRPC_CLOSURE_INIT(&args->closure, request_reresolution_locked, args,
                            grpc_combiner_scheduler(chand->combiner));
          GRPC_CHANNEL_STACK_REF(chand->owning_stack, "re-resolution");
          new_lb_policy->SetReresolutionClosureLocked(&args->closure);
        }
      }
      // Before we clean up, save a copy of lb_policy_name, since it might
      // be pointing to data inside chand->resolver_result.
      // The copy will be saved in chand->lb_policy_name below.
      lb_policy_name_dup = gpr_strdup(lb_policy_name);
      // Find service config.
      channel_arg = grpc_channel_args_find(chand->resolver_result,
                                           GRPC_ARG_SERVICE_CONFIG);
      service_config_json =
          gpr_strdup(grpc_channel_arg_get_string(channel_arg));
      if (service_config_json != nullptr) {
        grpc_core::UniquePtr<grpc_core::ServiceConfig> service_config =
            grpc_core::ServiceConfig::Create(service_config_json);
        if (service_config != nullptr) {
          if (chand->enable_retries) {
            channel_arg = grpc_channel_args_find(chand->resolver_result,
                                                 GRPC_ARG_SERVER_URI);
            const char* server_uri = grpc_channel_arg_get_string(channel_arg);
            GPR_ASSERT(server_uri != nullptr);
            grpc_uri* uri = grpc_uri_parse(server_uri, true);
            GPR_ASSERT(uri->path[0] != '\0');
            service_config_parsing_state parsing_state;
            memset(&parsing_state, 0, sizeof(parsing_state));
            parsing_state.server_name =
                uri->path[0] == '/' ? uri->path + 1 : uri->path;
            service_config->ParseGlobalParams(parse_retry_throttle_params,
                                              &parsing_state);
            grpc_uri_destroy(uri);
            retry_throttle_data = parsing_state.retry_throttle_data;
          }
          method_params_table = service_config->CreateMethodConfigTable(
              ClientChannelMethodParams::CreateFromJson);
        }
      }
    }
    grpc_channel_args_destroy(chand->resolver_result);
    chand->resolver_result = nullptr;
  }
  if (grpc_client_channel_trace.enabled()) {
    gpr_log(GPR_DEBUG,
            "chand=%p: resolver result: lb_policy_name=\"%s\"%s, "
            "service_config=\"%s\"",
            chand, lb_policy_name_dup,
            lb_policy_name_changed ? " (changed)" : "", service_config_json);
  }
  // Now swap out fields in chand. Note that the new values may still
  // be nullptr if (e.g.) the resolver failed to return results or the
  // results did not contain the necessary data.
  //
  // First, swap out the data used by cc_get_channel_info().
  gpr_mu_lock(&chand->info_mu);
  if (lb_policy_name_dup != nullptr) {
    gpr_free(chand->info_lb_policy_name);
    chand->info_lb_policy_name = lb_policy_name_dup;
  }
  if (service_config_json != nullptr) {
    gpr_free(chand->info_service_config_json);
    chand->info_service_config_json = service_config_json;
  }
  gpr_mu_unlock(&chand->info_mu);
  // Swap out the retry throttle data.
  if (chand->retry_throttle_data != nullptr) {
    grpc_server_retry_throttle_data_unref(chand->retry_throttle_data);
  }
  chand->retry_throttle_data = retry_throttle_data;
  // Swap out the method params table.
  chand->method_params_table = std::move(method_params_table);
  // If we have a new LB policy or are shutting down (in which case
  // new_lb_policy will be nullptr), swap out the LB policy, unreffing the
  // old one and removing its fds from chand->interested_parties.
  // Note that we do NOT do this if either (a) we updated the existing
  // LB policy above or (b) we failed to create the new LB policy (in
  // which case we want to continue using the most recent one we had).
  if (new_lb_policy != nullptr || error != GRPC_ERROR_NONE ||
      chand->resolver == nullptr) {
    if (chand->lb_policy != nullptr) {
      if (grpc_client_channel_trace.enabled()) {
        gpr_log(GPR_DEBUG, "chand=%p: unreffing lb_policy=%p", chand,
                chand->lb_policy.get());
      }
      grpc_pollset_set_del_pollset_set(chand->lb_policy->interested_parties(),
                                       chand->interested_parties);
      chand->lb_policy->HandOffPendingPicksLocked(new_lb_policy.get());
      chand->lb_policy.reset();
    }
    chand->lb_policy = std::move(new_lb_policy);
  }
  // Now that we've swapped out the relevant fields of chand, check for
  // error or shutdown.
  if (error != GRPC_ERROR_NONE || chand->resolver == nullptr) {
    if (grpc_client_channel_trace.enabled()) {
      gpr_log(GPR_DEBUG, "chand=%p: shutting down", chand);
    }
    if (chand->resolver != nullptr) {
      if (grpc_client_channel_trace.enabled()) {
        gpr_log(GPR_DEBUG, "chand=%p: shutting down resolver", chand);
      }
      chand->resolver.reset();
    }
    set_channel_connectivity_state_locked(
        chand, GRPC_CHANNEL_SHUTDOWN,
        GRPC_ERROR_CREATE_REFERENCING_FROM_STATIC_STRING(
            "Got resolver result after disconnection", &error, 1),
        "resolver_gone");
    grpc_closure_list_fail_all(&chand->waiting_for_resolver_result_closures,
                               GRPC_ERROR_CREATE_REFERENCING_FROM_STATIC_STRING(
                                   "Channel disconnected", &error, 1));
    GRPC_CLOSURE_LIST_SCHED(&chand->waiting_for_resolver_result_closures);
    GRPC_CHANNEL_STACK_UNREF(chand->owning_stack, "resolver");
  } else {  // Not shutting down.
    grpc_connectivity_state state = GRPC_CHANNEL_TRANSIENT_FAILURE;
    grpc_error* state_error =
        GRPC_ERROR_CREATE_FROM_STATIC_STRING("No load balancing policy");
    if (lb_policy_created) {
      if (grpc_client_channel_trace.enabled()) {
        gpr_log(GPR_DEBUG, "chand=%p: initializing new LB policy", chand);
      }
      GRPC_ERROR_UNREF(state_error);
      state = chand->lb_policy->CheckConnectivityLocked(&state_error);
      grpc_pollset_set_add_pollset_set(chand->lb_policy->interested_parties(),
                                       chand->interested_parties);
      GRPC_CLOSURE_LIST_SCHED(&chand->waiting_for_resolver_result_closures);
      if (chand->exit_idle_when_lb_policy_arrives) {
        chand->lb_policy->ExitIdleLocked();
        chand->exit_idle_when_lb_policy_arrives = false;
      }
      watch_lb_policy_locked(chand, chand->lb_policy.get(), state);
    }
    if (!lb_policy_updated) {
      set_channel_connectivity_state_locked(
          chand, state, GRPC_ERROR_REF(state_error), "new_lb+resolver");
    }
    chand->resolver->NextLocked(&chand->resolver_result,
                                &chand->on_resolver_result_changed);
    GRPC_ERROR_UNREF(state_error);
  }
}

static void start_transport_op_locked(void* arg, grpc_error* error_ignored) {
  grpc_transport_op* op = static_cast<grpc_transport_op*>(arg);
  grpc_channel_element* elem =
      static_cast<grpc_channel_element*>(op->handler_private.extra_arg);
  channel_data* chand = static_cast<channel_data*>(elem->channel_data);

  if (op->on_connectivity_state_change != nullptr) {
    grpc_connectivity_state_notify_on_state_change(
        &chand->state_tracker, op->connectivity_state,
        op->on_connectivity_state_change);
    op->on_connectivity_state_change = nullptr;
    op->connectivity_state = nullptr;
  }

  if (op->send_ping.on_initiate != nullptr || op->send_ping.on_ack != nullptr) {
    if (chand->lb_policy == nullptr) {
      GRPC_CLOSURE_SCHED(
          op->send_ping.on_initiate,
          GRPC_ERROR_CREATE_FROM_STATIC_STRING("Ping with no load balancing"));
      GRPC_CLOSURE_SCHED(
          op->send_ping.on_ack,
          GRPC_ERROR_CREATE_FROM_STATIC_STRING("Ping with no load balancing"));
    } else {
      chand->lb_policy->PingOneLocked(op->send_ping.on_initiate,
                                      op->send_ping.on_ack);
      op->bind_pollset = nullptr;
    }
    op->send_ping.on_initiate = nullptr;
    op->send_ping.on_ack = nullptr;
  }

  if (op->disconnect_with_error != GRPC_ERROR_NONE) {
    if (chand->resolver != nullptr) {
      set_channel_connectivity_state_locked(
          chand, GRPC_CHANNEL_SHUTDOWN,
          GRPC_ERROR_REF(op->disconnect_with_error), "disconnect");
      chand->resolver.reset();
      if (!chand->started_resolving) {
        grpc_closure_list_fail_all(&chand->waiting_for_resolver_result_closures,
                                   GRPC_ERROR_REF(op->disconnect_with_error));
        GRPC_CLOSURE_LIST_SCHED(&chand->waiting_for_resolver_result_closures);
      }
      if (chand->lb_policy != nullptr) {
        grpc_pollset_set_del_pollset_set(chand->lb_policy->interested_parties(),
                                         chand->interested_parties);
        chand->lb_policy.reset();
      }
    }
    GRPC_ERROR_UNREF(op->disconnect_with_error);
  }
  GRPC_CHANNEL_STACK_UNREF(chand->owning_stack, "start_transport_op");

  GRPC_CLOSURE_SCHED(op->on_consumed, GRPC_ERROR_NONE);
}

static void cc_start_transport_op(grpc_channel_element* elem,
                                  grpc_transport_op* op) {
  channel_data* chand = static_cast<channel_data*>(elem->channel_data);

  GPR_ASSERT(op->set_accept_stream == false);
  if (op->bind_pollset != nullptr) {
    grpc_pollset_set_add_pollset(chand->interested_parties, op->bind_pollset);
  }

  op->handler_private.extra_arg = elem;
  GRPC_CHANNEL_STACK_REF(chand->owning_stack, "start_transport_op");
  GRPC_CLOSURE_SCHED(
      GRPC_CLOSURE_INIT(&op->handler_private.closure, start_transport_op_locked,
                        op, grpc_combiner_scheduler(chand->combiner)),
      GRPC_ERROR_NONE);
}

static void cc_get_channel_info(grpc_channel_element* elem,
                                const grpc_channel_info* info) {
  channel_data* chand = static_cast<channel_data*>(elem->channel_data);
  gpr_mu_lock(&chand->info_mu);
  if (info->lb_policy_name != nullptr) {
    *info->lb_policy_name = chand->info_lb_policy_name == nullptr
                                ? nullptr
                                : gpr_strdup(chand->info_lb_policy_name);
  }
  if (info->service_config_json != nullptr) {
    *info->service_config_json =
        chand->info_service_config_json == nullptr
            ? nullptr
            : gpr_strdup(chand->info_service_config_json);
  }
  gpr_mu_unlock(&chand->info_mu);
}
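// Sketch of how an application reaches cc_get_channel_info() through the
// public API (illustrative only; `channel` is assumed to be an existing
// grpc_channel*):
//   char* lb_policy_name = nullptr;
//   grpc_channel_info info;
//   memset(&info, 0, sizeof(info));
//   info.lb_policy_name = &lb_policy_name;
//   grpc_channel_get_info(channel, &info);
//   // On return, lb_policy_name holds a gpr_strdup'd copy (or nullptr),
//   // which the caller must gpr_free().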

/* Constructor for channel_data */
static grpc_error* cc_init_channel_elem(grpc_channel_element* elem,
                                        grpc_channel_element_args* args) {
  channel_data* chand = static_cast<channel_data*>(elem->channel_data);
  GPR_ASSERT(args->is_last);
  GPR_ASSERT(elem->filter == &grpc_client_channel_filter);
  // Initialize data members.
  chand->combiner = grpc_combiner_create();
  gpr_mu_init(&chand->info_mu);
  gpr_mu_init(&chand->external_connectivity_watcher_list_mu);

  gpr_mu_lock(&chand->external_connectivity_watcher_list_mu);
  chand->external_connectivity_watcher_list_head = nullptr;
  gpr_mu_unlock(&chand->external_connectivity_watcher_list_mu);

  chand->owning_stack = args->channel_stack;
  GRPC_CLOSURE_INIT(&chand->on_resolver_result_changed,
                    on_resolver_result_changed_locked, chand,
                    grpc_combiner_scheduler(chand->combiner));
  chand->interested_parties = grpc_pollset_set_create();
  grpc_connectivity_state_init(&chand->state_tracker, GRPC_CHANNEL_IDLE,
                               "client_channel");
  grpc_client_channel_start_backup_polling(chand->interested_parties);
  // Record max per-RPC retry buffer size.
  const grpc_arg* arg = grpc_channel_args_find(
      args->channel_args, GRPC_ARG_PER_RPC_RETRY_BUFFER_SIZE);
  chand->per_rpc_retry_buffer_size =
      static_cast<size_t>(grpc_channel_arg_get_integer(
          arg, {DEFAULT_PER_RPC_RETRY_BUFFER_SIZE, 0, INT_MAX}));
  // Record enable_retries.
  arg = grpc_channel_args_find(args->channel_args, GRPC_ARG_ENABLE_RETRIES);
  chand->enable_retries = grpc_channel_arg_get_bool(arg, true);
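  // For example (an illustrative sketch, not code in this filter): a client
  // that wants to disable retries entirely could create its channel with:
  //   grpc_arg arg = grpc_channel_arg_integer_create(
  //       const_cast<char*>(GRPC_ARG_ENABLE_RETRIES), 0);
  //   grpc_channel_args args = {1, &arg};
  //   grpc_channel* channel =
  //       grpc_insecure_channel_create(target, &args, nullptr);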
  // Record client channel factory.
  arg = grpc_channel_args_find(args->channel_args,
                               GRPC_ARG_CLIENT_CHANNEL_FACTORY);
  if (arg == nullptr) {
    return GRPC_ERROR_CREATE_FROM_STATIC_STRING(
        "Missing client channel factory in args for client channel filter");
  }
  if (arg->type != GRPC_ARG_POINTER) {
    return GRPC_ERROR_CREATE_FROM_STATIC_STRING(
        "client channel factory arg must be a pointer");
  }
  grpc_client_channel_factory_ref(
      static_cast<grpc_client_channel_factory*>(arg->value.pointer.p));
  chand->client_channel_factory =
      static_cast<grpc_client_channel_factory*>(arg->value.pointer.p);
  // Get server name to resolve, using proxy mapper if needed.
  arg = grpc_channel_args_find(args->channel_args, GRPC_ARG_SERVER_URI);
  if (arg == nullptr) {
    return GRPC_ERROR_CREATE_FROM_STATIC_STRING(
        "Missing server uri in args for client channel filter");
  }
  if (arg->type != GRPC_ARG_STRING) {
    return GRPC_ERROR_CREATE_FROM_STATIC_STRING(
        "server uri arg must be a string");
  }
  char* proxy_name = nullptr;
  grpc_channel_args* new_args = nullptr;
  grpc_proxy_mappers_map_name(arg->value.string, args->channel_args,
                              &proxy_name, &new_args);
  // Instantiate resolver.
  chand->resolver = grpc_core::ResolverRegistry::CreateResolver(
      proxy_name != nullptr ? proxy_name : arg->value.string,
      new_args != nullptr ? new_args : args->channel_args,
      chand->interested_parties, chand->combiner);
  if (proxy_name != nullptr) gpr_free(proxy_name);
  if (new_args != nullptr) grpc_channel_args_destroy(new_args);
  if (chand->resolver == nullptr) {
    return GRPC_ERROR_CREATE_FROM_STATIC_STRING("resolver creation failed");
  }
  chand->deadline_checking_enabled =
      grpc_deadline_checking_enabled(args->channel_args);
  return GRPC_ERROR_NONE;
}

static void shutdown_resolver_locked(void* arg, grpc_error* error) {
  grpc_core::Resolver* resolver = static_cast<grpc_core::Resolver*>(arg);
  resolver->Orphan();
}

/* Destructor for channel_data */
static void cc_destroy_channel_elem(grpc_channel_element* elem) {
  channel_data* chand = static_cast<channel_data*>(elem->channel_data);
  if (chand->resolver != nullptr) {
    GRPC_CLOSURE_SCHED(
        GRPC_CLOSURE_CREATE(shutdown_resolver_locked, chand->resolver.release(),
                            grpc_combiner_scheduler(chand->combiner)),
        GRPC_ERROR_NONE);
  }
  if (chand->client_channel_factory != nullptr) {
    grpc_client_channel_factory_unref(chand->client_channel_factory);
  }
  if (chand->lb_policy != nullptr) {
    grpc_pollset_set_del_pollset_set(chand->lb_policy->interested_parties(),
                                     chand->interested_parties);
    chand->lb_policy.reset();
  }
  gpr_free(chand->info_lb_policy_name);
  gpr_free(chand->info_service_config_json);
  if (chand->retry_throttle_data != nullptr) {
    grpc_server_retry_throttle_data_unref(chand->retry_throttle_data);
  }
  if (chand->method_params_table != nullptr) {
    chand->method_params_table.reset();
  }
  grpc_client_channel_stop_backup_polling(chand->interested_parties);
  grpc_connectivity_state_destroy(&chand->state_tracker);
  grpc_pollset_set_destroy(chand->interested_parties);
  GRPC_COMBINER_UNREF(chand->combiner, "client_channel");
  gpr_mu_destroy(&chand->info_mu);
  gpr_mu_destroy(&chand->external_connectivity_watcher_list_mu);
}

/*************************************************************************
 * PER-CALL FUNCTIONS
 */

// Max number of batches that can be pending on a call at any given
// time. This includes one batch for each of the following ops:
//   recv_initial_metadata
//   send_initial_metadata
//   recv_message
//   send_message
//   recv_trailing_metadata
//   send_trailing_metadata
#define MAX_PENDING_BATCHES 6

// Retry support:
//
// In order to support retries, we act as a proxy for stream op batches.
// When we get a batch from the surface, we add it to our list of pending
// batches, and we then use those batches to construct separate "child"
// batches to be started on the subchannel call. When the child batches
// return, we then decide which pending batches have been completed and
// schedule their callbacks accordingly. If a subchannel call fails and
// we want to retry it, we do a new pick and start again, constructing
// new "child" batches for the new subchannel call.
//
// Note that retries are committed when receiving data from the server
// (except for Trailers-Only responses). However, there may be many
// send ops started before receiving any data, so we may have already
// completed some number of send ops (and returned the completions up to
// the surface) by the time we realize that we need to retry. To deal
// with this, we cache data for send ops, so that we can replay them on a
// different subchannel call even after we have completed the original
// batches.
//
// There are two sets of data to maintain:
// - In call_data (in the parent channel), we maintain a list of pending
//   ops and cached data for send ops.
// - In the subchannel call, we maintain state to indicate what ops have
//   already been sent down to that call.
//
// When constructing the "child" batches, we compare those two sets of
// data to see which batches need to be sent to the subchannel call.
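//
// Example timeline (an illustrative sketch, not an exhaustive description):
// suppose the surface starts batches for send_initial_metadata and
// send_message, both complete, and their completions are returned to the
// surface; the call then fails with a retryable status before any data is
// received. Because the send ops were cached, we can do a new pick, create
// a new subchannel call, and construct child batches that replay the cached
// send_initial_metadata and send_message on the new call, even though the
// original batches have already completed.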

// TODO(roth): In subsequent PRs:
// - add support for transparent retries (including initial metadata)
// - figure out how to record stats in census for retries
//   (census filter is on top of this one)
// - add census stats for retries

// State used for starting a retryable batch on a subchannel call.
// This provides its own grpc_transport_stream_op_batch and other data
// structures needed to populate the ops in the batch.
// We allocate one struct on the arena for each attempt at starting a
// batch on a given subchannel call.
typedef struct {
  gpr_refcount refs;
  grpc_call_element* elem;
  grpc_subchannel_call* subchannel_call;  // Holds a ref.
  // The batch to use in the subchannel call.
  // Its payload field points to subchannel_call_retry_state.batch_payload.
  grpc_transport_stream_op_batch batch;
  // For send_initial_metadata.
  // Note that we need to make a copy of the initial metadata for each
  // subchannel call instead of just referring to the copy in call_data,
  // because filters in the subchannel stack will probably add entries,
  // so we need to start in a pristine state for each attempt of the call.
  grpc_linked_mdelem* send_initial_metadata_storage;
  grpc_metadata_batch send_initial_metadata;
  // For send_message.
  grpc_caching_byte_stream send_message;
  // For send_trailing_metadata.
  grpc_linked_mdelem* send_trailing_metadata_storage;
  grpc_metadata_batch send_trailing_metadata;
  // For intercepting recv_initial_metadata.
  grpc_metadata_batch recv_initial_metadata;
  grpc_closure recv_initial_metadata_ready;
  bool trailing_metadata_available;
  // For intercepting recv_message.
  grpc_closure recv_message_ready;
  grpc_byte_stream* recv_message;
  // For intercepting recv_trailing_metadata.
  grpc_metadata_batch recv_trailing_metadata;
  grpc_transport_stream_stats collect_stats;
  // For intercepting on_complete.
  grpc_closure on_complete;
} subchannel_batch_data;

// Retry state associated with a subchannel call.
// Stored in the parent_data of the subchannel call object.
typedef struct {
  // subchannel_batch_data.batch.payload points to this.
  grpc_transport_stream_op_batch_payload batch_payload;
  // These fields indicate which ops have been started and completed on
  // this subchannel call.
  size_t started_send_message_count;
  size_t completed_send_message_count;
  size_t started_recv_message_count;
  size_t completed_recv_message_count;
  bool started_send_initial_metadata : 1;
  bool completed_send_initial_metadata : 1;
  bool started_send_trailing_metadata : 1;
  bool completed_send_trailing_metadata : 1;
  bool started_recv_initial_metadata : 1;
  bool completed_recv_initial_metadata : 1;
  bool started_recv_trailing_metadata : 1;
  bool completed_recv_trailing_metadata : 1;
  // State for callback processing.
  bool retry_dispatched : 1;
  bool recv_initial_metadata_ready_deferred : 1;
  bool recv_message_ready_deferred : 1;
  grpc_error* recv_initial_metadata_error;
  grpc_error* recv_message_error;
} subchannel_call_retry_state;

// Pending batches stored in call data.
typedef struct {
  // The pending batch. If nullptr, this slot is empty.
  grpc_transport_stream_op_batch* batch;
  // Indicates whether payload for send ops has been cached in call data.
  bool send_ops_cached;
} pending_batch;

/** Call data. Holds a pointer to grpc_subchannel_call and the
    associated machinery to create such a pointer.
    Handles queueing of stream ops until a call object is ready, waiting
    for initial metadata before trying to create a call object,
    and handling cancellation gracefully. */
typedef struct client_channel_call_data {
  // State for handling deadlines.
  //   The code in deadline_filter.c requires this to be the first field.
  // TODO(roth): This is slightly sub-optimal in that grpc_deadline_state
  // and this struct both independently store pointers to the call stack
  // and call combiner. If/when we have time, find a way to avoid this
  // without breaking the grpc_deadline_state abstraction.
  grpc_deadline_state deadline_state;

  grpc_slice path;  // Request path.
  gpr_timespec call_start_time;
  grpc_millis deadline;
  gpr_arena* arena;
  grpc_call_stack* owning_call;
  grpc_call_combiner* call_combiner;

  grpc_server_retry_throttle_data* retry_throttle_data;
  grpc_core::RefCountedPtr<ClientChannelMethodParams> method_params;

  grpc_subchannel_call* subchannel_call;

  // Set when we get a cancel_stream op.
  grpc_error* cancel_error;

  grpc_core::LoadBalancingPolicy::PickState pick;
  grpc_closure pick_closure;
  grpc_closure pick_cancel_closure;

  grpc_polling_entity* pollent;

  // Batches are added to this list when received from above.
  // They are removed when we are done handling the batch (i.e., when
  // either we have invoked all of the batch's callbacks or we have
  // passed the batch down to the subchannel call and are not
  // intercepting any of its callbacks).
  pending_batch pending_batches[MAX_PENDING_BATCHES];
  bool pending_send_initial_metadata : 1;
  bool pending_send_message : 1;
  bool pending_send_trailing_metadata : 1;

  // Retry state.
  bool enable_retries : 1;
  bool retry_committed : 1;
  bool last_attempt_got_server_pushback : 1;
  int num_attempts_completed;
  size_t bytes_buffered_for_retry;
  grpc_core::ManualConstructor<grpc_core::BackOff> retry_backoff;
  grpc_timer retry_timer;

  // Cached data for retrying send ops.
  // send_initial_metadata
  bool seen_send_initial_metadata;
  grpc_linked_mdelem* send_initial_metadata_storage;
  grpc_metadata_batch send_initial_metadata;
  uint32_t send_initial_metadata_flags;
  gpr_atm* peer_string;
  // send_message
  // When we get a send_message op, we replace the original byte stream
  // with a grpc_caching_byte_stream that caches the slices to a
  // local buffer for use in retries.
  // Note: We inline the cache for the first 3 send_message ops and use
  // dynamic allocation after that. This number was essentially picked
  // at random; it could be changed in the future to tune performance.
921 // at random; it could be changed in the future to tune performance.
922 grpc_core::InlinedVector<grpc_byte_stream_cache*, 3> send_messages;
923 // send_trailing_metadata
924 bool seen_send_trailing_metadata;
925 grpc_linked_mdelem* send_trailing_metadata_storage;
926 grpc_metadata_batch send_trailing_metadata;
Mark D. Roth2a5959f2016-09-01 08:20:27 -0700927} call_data;
928
// Forward declarations.
static void retry_commit(grpc_call_element* elem,
                         subchannel_call_retry_state* retry_state);
static void start_internal_recv_trailing_metadata(grpc_call_element* elem);
static void on_complete(void* arg, grpc_error* error);
static void start_retriable_subchannel_batches(void* arg, grpc_error* ignored);
static void pick_after_resolver_result_start_locked(grpc_call_element* elem);
static void start_pick_locked(void* arg, grpc_error* ignored);

//
// send op data caching
//

// Caches data for send ops, if not already cached, so that the ops can
// be replayed if the call is retried.
static void maybe_cache_send_ops_for_batch(call_data* calld,
                                           pending_batch* pending) {
  if (pending->send_ops_cached) return;
  pending->send_ops_cached = true;
  grpc_transport_stream_op_batch* batch = pending->batch;
  // Save a copy of metadata for send_initial_metadata ops.
  if (batch->send_initial_metadata) {
    calld->seen_send_initial_metadata = true;
    GPR_ASSERT(calld->send_initial_metadata_storage == nullptr);
    grpc_metadata_batch* send_initial_metadata =
        batch->payload->send_initial_metadata.send_initial_metadata;
    calld->send_initial_metadata_storage = (grpc_linked_mdelem*)gpr_arena_alloc(
        calld->arena,
        sizeof(grpc_linked_mdelem) * send_initial_metadata->list.count);
    grpc_metadata_batch_copy(send_initial_metadata,
                             &calld->send_initial_metadata,
                             calld->send_initial_metadata_storage);
    calld->send_initial_metadata_flags =
        batch->payload->send_initial_metadata.send_initial_metadata_flags;
    calld->peer_string = batch->payload->send_initial_metadata.peer_string;
  }
  // Set up cache for send_message ops.
  if (batch->send_message) {
    grpc_byte_stream_cache* cache = (grpc_byte_stream_cache*)gpr_arena_alloc(
        calld->arena, sizeof(grpc_byte_stream_cache));
    grpc_byte_stream_cache_init(cache,
                                batch->payload->send_message.send_message);
    calld->send_messages.push_back(cache);
  }
  // Save metadata batch for send_trailing_metadata ops.
  if (batch->send_trailing_metadata) {
    calld->seen_send_trailing_metadata = true;
    GPR_ASSERT(calld->send_trailing_metadata_storage == nullptr);
    grpc_metadata_batch* send_trailing_metadata =
        batch->payload->send_trailing_metadata.send_trailing_metadata;
    calld->send_trailing_metadata_storage =
        (grpc_linked_mdelem*)gpr_arena_alloc(
            calld->arena,
            sizeof(grpc_linked_mdelem) * send_trailing_metadata->list.count);
    grpc_metadata_batch_copy(send_trailing_metadata,
                             &calld->send_trailing_metadata,
                             calld->send_trailing_metadata_storage);
  }
}

// Frees cached send ops that have already been completed after
// committing the call.
static void free_cached_send_op_data_after_commit(
    grpc_call_element* elem, subchannel_call_retry_state* retry_state) {
  channel_data* chand = static_cast<channel_data*>(elem->channel_data);
  call_data* calld = static_cast<call_data*>(elem->call_data);
  if (retry_state->completed_send_initial_metadata) {
    grpc_metadata_batch_destroy(&calld->send_initial_metadata);
  }
  for (size_t i = 0; i < retry_state->completed_send_message_count; ++i) {
    if (grpc_client_channel_trace.enabled()) {
      gpr_log(GPR_DEBUG,
              "chand=%p calld=%p: destroying calld->send_messages[%" PRIuPTR
              "]",
              chand, calld, i);
    }
    grpc_byte_stream_cache_destroy(calld->send_messages[i]);
  }
  if (retry_state->completed_send_trailing_metadata) {
    grpc_metadata_batch_destroy(&calld->send_trailing_metadata);
  }
}

// Frees cached send ops that were completed by the completed batch in
// batch_data.  Used when batches are completed after the call is committed.
static void free_cached_send_op_data_for_completed_batch(
    grpc_call_element* elem, subchannel_batch_data* batch_data,
    subchannel_call_retry_state* retry_state) {
  channel_data* chand = static_cast<channel_data*>(elem->channel_data);
  call_data* calld = static_cast<call_data*>(elem->call_data);
  if (batch_data->batch.send_initial_metadata) {
    grpc_metadata_batch_destroy(&calld->send_initial_metadata);
  }
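  // Note: By the time this runs, on_complete has already called
  // update_retry_state_for_completed_batch(), so
  // completed_send_message_count includes the message completed by this
  // batch; hence the "- 1" index below refers to that message.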
  if (batch_data->batch.send_message) {
    if (grpc_client_channel_trace.enabled()) {
      gpr_log(GPR_DEBUG,
              "chand=%p calld=%p: destroying calld->send_messages[%" PRIuPTR
              "]",
              chand, calld, retry_state->completed_send_message_count - 1);
    }
    grpc_byte_stream_cache_destroy(
        calld->send_messages[retry_state->completed_send_message_count - 1]);
  }
  if (batch_data->batch.send_trailing_metadata) {
    grpc_metadata_batch_destroy(&calld->send_trailing_metadata);
  }
}

//
// pending_batches management
//

// Returns the index into calld->pending_batches to be used for batch.
static size_t get_batch_index(grpc_transport_stream_op_batch* batch) {
  // Note: It is important that send_initial_metadata be the first entry
  // here, since the code in pick_subchannel_locked() assumes it will be.
  if (batch->send_initial_metadata) return 0;
  if (batch->send_message) return 1;
  if (batch->send_trailing_metadata) return 2;
  if (batch->recv_initial_metadata) return 3;
  if (batch->recv_message) return 4;
  if (batch->recv_trailing_metadata) return 5;
  GPR_UNREACHABLE_CODE(return (size_t)-1);
}
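
// For example, a batch containing send_initial_metadata, send_message, and
// recv_initial_metadata is keyed by its first op and lands in slot 0.  This
// relies on the surface never having two batches containing the same op in
// flight at once, which is why pending_batches_add() below can assert that
// the chosen slot is empty.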

// This is called via the call combiner, so access to calld is synchronized.
static void pending_batches_add(grpc_call_element* elem,
                                grpc_transport_stream_op_batch* batch) {
  channel_data* chand = static_cast<channel_data*>(elem->channel_data);
  call_data* calld = static_cast<call_data*>(elem->call_data);
  const size_t idx = get_batch_index(batch);
  if (grpc_client_channel_trace.enabled()) {
    gpr_log(GPR_DEBUG,
            "chand=%p calld=%p: adding pending batch at index %" PRIuPTR, chand,
            calld, idx);
  }
  pending_batch* pending = &calld->pending_batches[idx];
  GPR_ASSERT(pending->batch == nullptr);
  pending->batch = batch;
  pending->send_ops_cached = false;
  if (calld->enable_retries) {
    // Update state in calld about pending batches.
    // Also check if the batch takes us over the retry buffer limit.
    // Note: We don't check the size of trailing metadata here, because
    // gRPC clients do not send trailing metadata.
    if (batch->send_initial_metadata) {
      calld->pending_send_initial_metadata = true;
      calld->bytes_buffered_for_retry += grpc_metadata_batch_size(
          batch->payload->send_initial_metadata.send_initial_metadata);
    }
    if (batch->send_message) {
      calld->pending_send_message = true;
      calld->bytes_buffered_for_retry +=
          batch->payload->send_message.send_message->length;
    }
    if (batch->send_trailing_metadata) {
      calld->pending_send_trailing_metadata = true;
    }
    if (calld->bytes_buffered_for_retry > chand->per_rpc_retry_buffer_size) {
      if (grpc_client_channel_trace.enabled()) {
        gpr_log(GPR_DEBUG,
                "chand=%p calld=%p: exceeded retry buffer size, committing",
                chand, calld);
      }
      subchannel_call_retry_state* retry_state =
          calld->subchannel_call == nullptr
              ? nullptr
              : static_cast<subchannel_call_retry_state*>(
                    grpc_connected_subchannel_call_get_parent_data(
                        calld->subchannel_call));
      retry_commit(elem, retry_state);
      // If we are not going to retry and have not yet started, pretend
      // retries are disabled so that we don't bother with retry overhead.
      if (calld->num_attempts_completed == 0) {
        if (grpc_client_channel_trace.enabled()) {
          gpr_log(GPR_DEBUG,
                  "chand=%p calld=%p: disabling retries before first attempt",
                  chand, calld);
        }
        calld->enable_retries = false;
      }
    }
  }
}
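
// Illustrative example (the limit shown is hypothetical; the actual value
// comes from chand->per_rpc_retry_buffer_size): with a 256 KiB per-RPC
// retry buffer limit, a streaming call that has buffered 300 KiB of
// send_message payloads is committed by the check above, and any
// subsequent failure will not be retried.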

static void pending_batch_clear(call_data* calld, pending_batch* pending) {
  if (calld->enable_retries) {
    if (pending->batch->send_initial_metadata) {
      calld->pending_send_initial_metadata = false;
    }
    if (pending->batch->send_message) {
      calld->pending_send_message = false;
    }
    if (pending->batch->send_trailing_metadata) {
      calld->pending_send_trailing_metadata = false;
    }
  }
  pending->batch = nullptr;
}

// This is called via the call combiner, so access to calld is synchronized.
static void fail_pending_batch_in_call_combiner(void* arg, grpc_error* error) {
  grpc_transport_stream_op_batch* batch =
      static_cast<grpc_transport_stream_op_batch*>(arg);
  call_data* calld = static_cast<call_data*>(batch->handler_private.extra_arg);
  // Note: This will release the call combiner.
  grpc_transport_stream_op_batch_finish_with_failure(
      batch, GRPC_ERROR_REF(error), calld->call_combiner);
}

// This is called via the call combiner, so access to calld is synchronized.
// If yield_call_combiner is true, assumes responsibility for yielding
// the call combiner.
static void pending_batches_fail(grpc_call_element* elem, grpc_error* error,
                                 bool yield_call_combiner) {
  GPR_ASSERT(error != GRPC_ERROR_NONE);
  call_data* calld = static_cast<call_data*>(elem->call_data);
  if (grpc_client_channel_trace.enabled()) {
    size_t num_batches = 0;
    for (size_t i = 0; i < GPR_ARRAY_SIZE(calld->pending_batches); ++i) {
      if (calld->pending_batches[i].batch != nullptr) ++num_batches;
    }
    gpr_log(GPR_DEBUG,
            "chand=%p calld=%p: failing %" PRIuPTR " pending batches: %s",
            elem->channel_data, calld, num_batches, grpc_error_string(error));
  }
  grpc_transport_stream_op_batch*
      batches[GPR_ARRAY_SIZE(calld->pending_batches)];
  size_t num_batches = 0;
  for (size_t i = 0; i < GPR_ARRAY_SIZE(calld->pending_batches); ++i) {
    pending_batch* pending = &calld->pending_batches[i];
    grpc_transport_stream_op_batch* batch = pending->batch;
    if (batch != nullptr) {
      batches[num_batches++] = batch;
      pending_batch_clear(calld, pending);
    }
  }
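  // When yielding the call combiner, batches[0] is failed directly below
  // (which releases the combiner), so this loop starts at 1; otherwise,
  // every batch must re-enter the call combiner.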
  for (size_t i = yield_call_combiner ? 1 : 0; i < num_batches; ++i) {
    grpc_transport_stream_op_batch* batch = batches[i];
    batch->handler_private.extra_arg = calld;
    GRPC_CLOSURE_INIT(&batch->handler_private.closure,
                      fail_pending_batch_in_call_combiner, batch,
                      grpc_schedule_on_exec_ctx);
    GRPC_CALL_COMBINER_START(calld->call_combiner,
                             &batch->handler_private.closure,
                             GRPC_ERROR_REF(error), "pending_batches_fail");
  }
  if (yield_call_combiner) {
    if (num_batches > 0) {
      // Note: This will release the call combiner.
      grpc_transport_stream_op_batch_finish_with_failure(
          batches[0], GRPC_ERROR_REF(error), calld->call_combiner);
    } else {
      GRPC_CALL_COMBINER_STOP(calld->call_combiner, "pending_batches_fail");
    }
  }
  GRPC_ERROR_UNREF(error);
}

// This is called via the call combiner, so access to calld is synchronized.
static void resume_pending_batch_in_call_combiner(void* arg,
                                                  grpc_error* ignored) {
  grpc_transport_stream_op_batch* batch =
      static_cast<grpc_transport_stream_op_batch*>(arg);
  grpc_subchannel_call* subchannel_call =
      static_cast<grpc_subchannel_call*>(batch->handler_private.extra_arg);
  // Note: This will release the call combiner.
  grpc_subchannel_call_process_op(subchannel_call, batch);
}

// This is called via the call combiner, so access to calld is synchronized.
static void pending_batches_resume(grpc_call_element* elem) {
  channel_data* chand = static_cast<channel_data*>(elem->channel_data);
  call_data* calld = static_cast<call_data*>(elem->call_data);
  if (calld->enable_retries) {
    start_retriable_subchannel_batches(elem, GRPC_ERROR_NONE);
    return;
  }
  // Retries not enabled; send down batches as-is.
  if (grpc_client_channel_trace.enabled()) {
    size_t num_batches = 0;
    for (size_t i = 0; i < GPR_ARRAY_SIZE(calld->pending_batches); ++i) {
      if (calld->pending_batches[i].batch != nullptr) ++num_batches;
    }
    gpr_log(GPR_DEBUG,
            "chand=%p calld=%p: starting %" PRIuPTR
            " pending batches on subchannel_call=%p",
            chand, calld, num_batches, calld->subchannel_call);
  }
  grpc_transport_stream_op_batch*
      batches[GPR_ARRAY_SIZE(calld->pending_batches)];
  size_t num_batches = 0;
  for (size_t i = 0; i < GPR_ARRAY_SIZE(calld->pending_batches); ++i) {
    pending_batch* pending = &calld->pending_batches[i];
    grpc_transport_stream_op_batch* batch = pending->batch;
    if (batch != nullptr) {
      batches[num_batches++] = batch;
      pending_batch_clear(calld, pending);
    }
  }
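  // batches[0] is started directly at the bottom of this function, since we
  // already hold the call combiner; all other batches must re-enter it.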
  for (size_t i = 1; i < num_batches; ++i) {
    grpc_transport_stream_op_batch* batch = batches[i];
    batch->handler_private.extra_arg = calld->subchannel_call;
    GRPC_CLOSURE_INIT(&batch->handler_private.closure,
                      resume_pending_batch_in_call_combiner, batch,
                      grpc_schedule_on_exec_ctx);
    GRPC_CALL_COMBINER_START(calld->call_combiner,
                             &batch->handler_private.closure, GRPC_ERROR_NONE,
                             "pending_batches_resume");
  }
  GPR_ASSERT(num_batches > 0);
  // Note: This will release the call combiner.
  grpc_subchannel_call_process_op(calld->subchannel_call, batches[0]);
}

static void maybe_clear_pending_batch(grpc_call_element* elem,
                                      pending_batch* pending) {
  channel_data* chand = static_cast<channel_data*>(elem->channel_data);
  call_data* calld = static_cast<call_data*>(elem->call_data);
  grpc_transport_stream_op_batch* batch = pending->batch;
  // We clear the pending batch if all of its callbacks have been
  // scheduled and their pointers reset to nullptr.
  if (batch->on_complete == nullptr &&
      (!batch->recv_initial_metadata ||
       batch->payload->recv_initial_metadata.recv_initial_metadata_ready ==
           nullptr) &&
      (!batch->recv_message ||
       batch->payload->recv_message.recv_message_ready == nullptr)) {
    if (grpc_client_channel_trace.enabled()) {
      gpr_log(GPR_DEBUG, "chand=%p calld=%p: clearing pending batch", chand,
              calld);
    }
    pending_batch_clear(calld, pending);
  }
}

// Returns true if all ops in the pending batch have been completed.
static bool pending_batch_is_completed(
    pending_batch* pending, call_data* calld,
    subchannel_call_retry_state* retry_state) {
  if (pending->batch == nullptr || pending->batch->on_complete == nullptr) {
    return false;
  }
  if (pending->batch->send_initial_metadata &&
      !retry_state->completed_send_initial_metadata) {
    return false;
  }
  if (pending->batch->send_message &&
      retry_state->completed_send_message_count < calld->send_messages.size()) {
    return false;
  }
  if (pending->batch->send_trailing_metadata &&
      !retry_state->completed_send_trailing_metadata) {
    return false;
  }
  if (pending->batch->recv_initial_metadata &&
      !retry_state->completed_recv_initial_metadata) {
    return false;
  }
  if (pending->batch->recv_message &&
      retry_state->completed_recv_message_count <
          retry_state->started_recv_message_count) {
    return false;
  }
  if (pending->batch->recv_trailing_metadata &&
      !retry_state->completed_recv_trailing_metadata) {
    return false;
  }
  return true;
}

// Returns true if any op in the batch was not yet started.
static bool pending_batch_is_unstarted(
    pending_batch* pending, call_data* calld,
    subchannel_call_retry_state* retry_state) {
  if (pending->batch == nullptr || pending->batch->on_complete == nullptr) {
    return false;
  }
  if (pending->batch->send_initial_metadata &&
      !retry_state->started_send_initial_metadata) {
    return true;
  }
  if (pending->batch->send_message &&
      retry_state->started_send_message_count < calld->send_messages.size()) {
    return true;
  }
  if (pending->batch->send_trailing_metadata &&
      !retry_state->started_send_trailing_metadata) {
    return true;
  }
  if (pending->batch->recv_initial_metadata &&
      !retry_state->started_recv_initial_metadata) {
    return true;
  }
  if (pending->batch->recv_message &&
      retry_state->completed_recv_message_count ==
          retry_state->started_recv_message_count) {
    return true;
  }
  if (pending->batch->recv_trailing_metadata &&
      !retry_state->started_recv_trailing_metadata) {
    return true;
  }
  return false;
}

//
// retry code
//

// Commits the call so that no further retry attempts will be performed.
static void retry_commit(grpc_call_element* elem,
                         subchannel_call_retry_state* retry_state) {
  channel_data* chand = static_cast<channel_data*>(elem->channel_data);
  call_data* calld = static_cast<call_data*>(elem->call_data);
  if (calld->retry_committed) return;
  calld->retry_committed = true;
  if (grpc_client_channel_trace.enabled()) {
    gpr_log(GPR_DEBUG, "chand=%p calld=%p: committing retries", chand, calld);
  }
  if (retry_state != nullptr) {
    free_cached_send_op_data_after_commit(elem, retry_state);
  }
}

// Starts a retry after appropriate back-off.
static void do_retry(grpc_call_element* elem,
                     subchannel_call_retry_state* retry_state,
                     grpc_millis server_pushback_ms) {
  channel_data* chand = static_cast<channel_data*>(elem->channel_data);
  call_data* calld = static_cast<call_data*>(elem->call_data);
  GPR_ASSERT(calld->method_params != nullptr);
  const ClientChannelMethodParams::RetryPolicy* retry_policy =
      calld->method_params->retry_policy();
  GPR_ASSERT(retry_policy != nullptr);
  // Reset subchannel call and connected subchannel.
  if (calld->subchannel_call != nullptr) {
    GRPC_SUBCHANNEL_CALL_UNREF(calld->subchannel_call,
                               "client_channel_call_retry");
    calld->subchannel_call = nullptr;
  }
  if (calld->pick.connected_subchannel != nullptr) {
    calld->pick.connected_subchannel.reset();
  }
  // Compute backoff delay.
  grpc_millis next_attempt_time;
  if (server_pushback_ms >= 0) {
    next_attempt_time = grpc_core::ExecCtx::Get()->Now() + server_pushback_ms;
    calld->last_attempt_got_server_pushback = true;
  } else {
    if (calld->num_attempts_completed == 1 ||
        calld->last_attempt_got_server_pushback) {
      calld->retry_backoff.Init(
          grpc_core::BackOff::Options()
              .set_initial_backoff(retry_policy->initial_backoff)
              .set_multiplier(retry_policy->backoff_multiplier)
              .set_jitter(RETRY_BACKOFF_JITTER)
              .set_max_backoff(retry_policy->max_backoff));
      calld->last_attempt_got_server_pushback = false;
    }
    next_attempt_time = calld->retry_backoff->NextAttemptTime();
  }
  if (grpc_client_channel_trace.enabled()) {
    gpr_log(GPR_DEBUG,
            "chand=%p calld=%p: retrying failed call in %" PRIuPTR " ms", chand,
            calld, next_attempt_time - grpc_core::ExecCtx::Get()->Now());
  }
  // Schedule retry after computed delay.
  GRPC_CLOSURE_INIT(&calld->pick_closure, start_pick_locked, elem,
                    grpc_combiner_scheduler(chand->combiner));
  grpc_timer_init(&calld->retry_timer, next_attempt_time, &calld->pick_closure);
  // Update bookkeeping.
  if (retry_state != nullptr) retry_state->retry_dispatched = true;
}
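
// Illustrative back-off timeline (values hypothetical, jitter ignored):
// with initial_backoff=1s, backoff_multiplier=2, and max_backoff=10s, the
// delays computed above are roughly 1s, 2s, 4s, 8s, 10s, 10s, ...  A server
// push-back value overrides the computed delay for that attempt and causes
// the back-off sequence to be re-initialized for the next attempt.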

// Returns true if the call is being retried.
static bool maybe_retry(grpc_call_element* elem,
                        subchannel_batch_data* batch_data,
                        grpc_status_code status,
                        grpc_mdelem* server_pushback_md) {
  channel_data* chand = static_cast<channel_data*>(elem->channel_data);
  call_data* calld = static_cast<call_data*>(elem->call_data);
  // Get retry policy.
  if (calld->method_params == nullptr) return false;
  const ClientChannelMethodParams::RetryPolicy* retry_policy =
      calld->method_params->retry_policy();
  if (retry_policy == nullptr) return false;
  // If we've already dispatched a retry from this call, return true.
  // This catches the case where the batch has multiple callbacks
  // (i.e., it includes either recv_message or recv_initial_metadata).
  subchannel_call_retry_state* retry_state = nullptr;
  if (batch_data != nullptr) {
    retry_state = static_cast<subchannel_call_retry_state*>(
        grpc_connected_subchannel_call_get_parent_data(
            batch_data->subchannel_call));
    if (retry_state->retry_dispatched) {
      if (grpc_client_channel_trace.enabled()) {
        gpr_log(GPR_DEBUG, "chand=%p calld=%p: retry already dispatched", chand,
                calld);
      }
      return true;
    }
  }
  // Check status.
  if (status == GRPC_STATUS_OK) {
    // Note: retry_throttle_data may be null if retry throttling is not
    // configured, so guard the call.
    if (calld->retry_throttle_data != nullptr) {
      grpc_server_retry_throttle_data_record_success(
          calld->retry_throttle_data);
    }
    if (grpc_client_channel_trace.enabled()) {
      gpr_log(GPR_DEBUG, "chand=%p calld=%p: call succeeded", chand, calld);
    }
    return false;
  }
  // Status is not OK.  Check whether the status is retryable.
  if (!retry_policy->retryable_status_codes.Contains(status)) {
    if (grpc_client_channel_trace.enabled()) {
      gpr_log(GPR_DEBUG,
              "chand=%p calld=%p: status %s not configured as retryable", chand,
              calld, grpc_status_code_to_string(status));
    }
    return false;
  }
  // Record the failure and check whether retries are throttled.
  // Note that it's important for this check to come after the status
  // code check above, since we should only record failures whose statuses
  // match the configured retryable status codes, so that we don't count
  // things like failures due to malformed requests (INVALID_ARGUMENT).
  // Conversely, it's important for this to come before the remaining
  // checks, so that we don't fail to record failures due to other factors.
  if (calld->retry_throttle_data != nullptr &&
      !grpc_server_retry_throttle_data_record_failure(
          calld->retry_throttle_data)) {
    if (grpc_client_channel_trace.enabled()) {
      gpr_log(GPR_DEBUG, "chand=%p calld=%p: retries throttled", chand, calld);
    }
    return false;
  }
  // Check whether the call is committed.
  if (calld->retry_committed) {
    if (grpc_client_channel_trace.enabled()) {
      gpr_log(GPR_DEBUG, "chand=%p calld=%p: retries already committed", chand,
              calld);
    }
    return false;
  }
  // Check whether we have retries remaining.
  ++calld->num_attempts_completed;
  if (calld->num_attempts_completed >= retry_policy->max_attempts) {
    if (grpc_client_channel_trace.enabled()) {
      gpr_log(GPR_DEBUG, "chand=%p calld=%p: exceeded %d retry attempts", chand,
              calld, retry_policy->max_attempts);
    }
    return false;
  }
  // If the call was cancelled from the surface, don't retry.
  if (calld->cancel_error != GRPC_ERROR_NONE) {
    if (grpc_client_channel_trace.enabled()) {
      gpr_log(GPR_DEBUG,
              "chand=%p calld=%p: call cancelled from surface, not retrying",
              chand, calld);
    }
    return false;
  }
  // Check server push-back.
  grpc_millis server_pushback_ms = -1;
  if (server_pushback_md != nullptr) {
    // If the value is "-1" or any other unparseable string, we do not retry.
    uint32_t ms;
    if (!grpc_parse_slice_to_uint32(GRPC_MDVALUE(*server_pushback_md), &ms)) {
      if (grpc_client_channel_trace.enabled()) {
        gpr_log(GPR_DEBUG,
                "chand=%p calld=%p: not retrying due to server push-back",
                chand, calld);
      }
      return false;
    } else {
      if (grpc_client_channel_trace.enabled()) {
        gpr_log(GPR_DEBUG,
                "chand=%p calld=%p: server push-back: retry in %u ms", chand,
                calld, ms);
      }
      server_pushback_ms = (grpc_millis)ms;
    }
  }
  do_retry(elem, retry_state, server_pushback_ms);
  return true;
}
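
// For reference, the retry policy consulted above comes from the service
// config.  A minimal sketch of the corresponding config entry (field names
// per the gRPC retry design, gRFC A6; values hypothetical):
//
//   "retryPolicy": {
//     "maxAttempts": 4,
//     "initialBackoff": "0.1s",
//     "maxBackoff": "1s",
//     "backoffMultiplier": 2,
//     "retryableStatusCodes": ["UNAVAILABLE"]
//   }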

//
// subchannel_batch_data
//

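// Allocates a subchannel_batch_data on the call arena.  The refcount is
// expected to equal the number of callbacks that will eventually run for
// the batch (on_complete plus any intercepted recv callbacks), matching
// the batch_data_unref() calls in those paths.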
static subchannel_batch_data* batch_data_create(grpc_call_element* elem,
                                                int refcount) {
  call_data* calld = static_cast<call_data*>(elem->call_data);
  subchannel_call_retry_state* retry_state =
      static_cast<subchannel_call_retry_state*>(
          grpc_connected_subchannel_call_get_parent_data(
              calld->subchannel_call));
  subchannel_batch_data* batch_data = static_cast<subchannel_batch_data*>(
      gpr_arena_alloc(calld->arena, sizeof(*batch_data)));
  batch_data->elem = elem;
  batch_data->subchannel_call =
      GRPC_SUBCHANNEL_CALL_REF(calld->subchannel_call, "batch_data_create");
  batch_data->batch.payload = &retry_state->batch_payload;
  gpr_ref_init(&batch_data->refs, refcount);
  GRPC_CLOSURE_INIT(&batch_data->on_complete, on_complete, batch_data,
                    grpc_schedule_on_exec_ctx);
  batch_data->batch.on_complete = &batch_data->on_complete;
  GRPC_CALL_STACK_REF(calld->owning_call, "batch_data");
  return batch_data;
}

static void batch_data_unref(subchannel_batch_data* batch_data) {
  if (gpr_unref(&batch_data->refs)) {
    if (batch_data->send_initial_metadata_storage != nullptr) {
      grpc_metadata_batch_destroy(&batch_data->send_initial_metadata);
    }
    if (batch_data->send_trailing_metadata_storage != nullptr) {
      grpc_metadata_batch_destroy(&batch_data->send_trailing_metadata);
    }
    if (batch_data->batch.recv_initial_metadata) {
      grpc_metadata_batch_destroy(&batch_data->recv_initial_metadata);
    }
    if (batch_data->batch.recv_trailing_metadata) {
      grpc_metadata_batch_destroy(&batch_data->recv_trailing_metadata);
    }
    GRPC_SUBCHANNEL_CALL_UNREF(batch_data->subchannel_call, "batch_data_unref");
    call_data* calld = static_cast<call_data*>(batch_data->elem->call_data);
    GRPC_CALL_STACK_UNREF(calld->owning_call, "batch_data");
  }
}

//
// recv_initial_metadata callback handling
//

// Invokes recv_initial_metadata_ready for a subchannel batch.
static void invoke_recv_initial_metadata_callback(void* arg,
                                                  grpc_error* error) {
  subchannel_batch_data* batch_data = static_cast<subchannel_batch_data*>(arg);
  channel_data* chand =
      static_cast<channel_data*>(batch_data->elem->channel_data);
  call_data* calld = static_cast<call_data*>(batch_data->elem->call_data);
  // Find pending batch.
  pending_batch* pending = nullptr;
  for (size_t i = 0; i < GPR_ARRAY_SIZE(calld->pending_batches); ++i) {
    grpc_transport_stream_op_batch* batch = calld->pending_batches[i].batch;
    if (batch != nullptr && batch->recv_initial_metadata &&
        batch->payload->recv_initial_metadata.recv_initial_metadata_ready !=
            nullptr) {
      if (grpc_client_channel_trace.enabled()) {
        gpr_log(GPR_DEBUG,
                "chand=%p calld=%p: invoking recv_initial_metadata_ready for "
                "pending batch at index %" PRIuPTR,
                chand, calld, i);
      }
      pending = &calld->pending_batches[i];
      break;
    }
  }
  GPR_ASSERT(pending != nullptr);
  // Return metadata.
  grpc_metadata_batch_move(
      &batch_data->recv_initial_metadata,
      pending->batch->payload->recv_initial_metadata.recv_initial_metadata);
  // Update bookkeeping.
  // Note: Need to do this before invoking the callback, since invoking
  // the callback will result in yielding the call combiner.
  grpc_closure* recv_initial_metadata_ready =
      pending->batch->payload->recv_initial_metadata
          .recv_initial_metadata_ready;
  pending->batch->payload->recv_initial_metadata.recv_initial_metadata_ready =
      nullptr;
  maybe_clear_pending_batch(batch_data->elem, pending);
  batch_data_unref(batch_data);
  // Invoke callback.
  GRPC_CLOSURE_RUN(recv_initial_metadata_ready, GRPC_ERROR_REF(error));
}

// Intercepts recv_initial_metadata_ready callback for retries.
// Commits the call and returns the initial metadata up the stack.
static void recv_initial_metadata_ready(void* arg, grpc_error* error) {
  subchannel_batch_data* batch_data = static_cast<subchannel_batch_data*>(arg);
  grpc_call_element* elem = batch_data->elem;
  channel_data* chand = static_cast<channel_data*>(elem->channel_data);
  call_data* calld = static_cast<call_data*>(elem->call_data);
  if (grpc_client_channel_trace.enabled()) {
    gpr_log(GPR_DEBUG,
            "chand=%p calld=%p: got recv_initial_metadata_ready, error=%s",
            chand, calld, grpc_error_string(error));
  }
  subchannel_call_retry_state* retry_state =
      static_cast<subchannel_call_retry_state*>(
          grpc_connected_subchannel_call_get_parent_data(
              batch_data->subchannel_call));
  // If we got an error or a Trailers-Only response and have not yet gotten
  // the recv_trailing_metadata on_complete callback, then defer
  // propagating this callback back to the surface.  We can evaluate whether
  // to retry when recv_trailing_metadata comes back.
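  // (A Trailers-Only response is one in which the server sends only status
  // and trailing metadata, with no initial metadata or messages; the
  // transport signals it here via batch_data->trailing_metadata_available.)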
  if ((batch_data->trailing_metadata_available || error != GRPC_ERROR_NONE) &&
      !retry_state->completed_recv_trailing_metadata) {
    if (grpc_client_channel_trace.enabled()) {
      gpr_log(GPR_DEBUG,
              "chand=%p calld=%p: deferring recv_initial_metadata_ready "
              "(Trailers-Only)",
              chand, calld);
    }
    retry_state->recv_initial_metadata_ready_deferred = true;
    retry_state->recv_initial_metadata_error = GRPC_ERROR_REF(error);
    if (!retry_state->started_recv_trailing_metadata) {
      // recv_trailing_metadata not yet started by application; start it
      // ourselves to get status.
      start_internal_recv_trailing_metadata(elem);
    } else {
      GRPC_CALL_COMBINER_STOP(
          calld->call_combiner,
          "recv_initial_metadata_ready trailers-only or error");
    }
    return;
  }
  // Received valid initial metadata, so commit the call.
  retry_commit(elem, retry_state);
  // Manually invoking a callback function; it does not take ownership of error.
  invoke_recv_initial_metadata_callback(batch_data, error);
  GRPC_ERROR_UNREF(error);
}

//
// recv_message callback handling
//

// Invokes recv_message_ready for a subchannel batch.
static void invoke_recv_message_callback(void* arg, grpc_error* error) {
  subchannel_batch_data* batch_data = static_cast<subchannel_batch_data*>(arg);
  channel_data* chand =
      static_cast<channel_data*>(batch_data->elem->channel_data);
  call_data* calld = static_cast<call_data*>(batch_data->elem->call_data);
  // Find pending op.
  pending_batch* pending = nullptr;
  for (size_t i = 0; i < GPR_ARRAY_SIZE(calld->pending_batches); ++i) {
    grpc_transport_stream_op_batch* batch = calld->pending_batches[i].batch;
    if (batch != nullptr && batch->recv_message &&
        batch->payload->recv_message.recv_message_ready != nullptr) {
      if (grpc_client_channel_trace.enabled()) {
        gpr_log(GPR_DEBUG,
                "chand=%p calld=%p: invoking recv_message_ready for "
                "pending batch at index %" PRIuPTR,
                chand, calld, i);
      }
      pending = &calld->pending_batches[i];
      break;
    }
  }
  GPR_ASSERT(pending != nullptr);
  // Return payload.
  *pending->batch->payload->recv_message.recv_message =
      batch_data->recv_message;
  // Update bookkeeping.
  // Note: Need to do this before invoking the callback, since invoking
  // the callback will result in yielding the call combiner.
  grpc_closure* recv_message_ready =
      pending->batch->payload->recv_message.recv_message_ready;
  pending->batch->payload->recv_message.recv_message_ready = nullptr;
  maybe_clear_pending_batch(batch_data->elem, pending);
  batch_data_unref(batch_data);
  // Invoke callback.
  GRPC_CLOSURE_RUN(recv_message_ready, GRPC_ERROR_REF(error));
}

// Intercepts recv_message_ready callback for retries.
// Commits the call and returns the message up the stack.
static void recv_message_ready(void* arg, grpc_error* error) {
  subchannel_batch_data* batch_data = static_cast<subchannel_batch_data*>(arg);
  grpc_call_element* elem = batch_data->elem;
  channel_data* chand = static_cast<channel_data*>(elem->channel_data);
  call_data* calld = static_cast<call_data*>(elem->call_data);
  if (grpc_client_channel_trace.enabled()) {
    gpr_log(GPR_DEBUG, "chand=%p calld=%p: got recv_message_ready, error=%s",
            chand, calld, grpc_error_string(error));
  }
  subchannel_call_retry_state* retry_state =
      static_cast<subchannel_call_retry_state*>(
          grpc_connected_subchannel_call_get_parent_data(
              batch_data->subchannel_call));
  // If we got an error or the payload was nullptr and we have not yet gotten
  // the recv_trailing_metadata on_complete callback, then defer
  // propagating this callback back to the surface.  We can evaluate whether
  // to retry when recv_trailing_metadata comes back.
  if ((batch_data->recv_message == nullptr || error != GRPC_ERROR_NONE) &&
      !retry_state->completed_recv_trailing_metadata) {
    if (grpc_client_channel_trace.enabled()) {
      gpr_log(GPR_DEBUG,
              "chand=%p calld=%p: deferring recv_message_ready (nullptr "
              "message and recv_trailing_metadata pending)",
              chand, calld);
    }
    retry_state->recv_message_ready_deferred = true;
    retry_state->recv_message_error = GRPC_ERROR_REF(error);
    if (!retry_state->started_recv_trailing_metadata) {
      // recv_trailing_metadata not yet started by application; start it
      // ourselves to get status.
      start_internal_recv_trailing_metadata(elem);
    } else {
      GRPC_CALL_COMBINER_STOP(calld->call_combiner, "recv_message_ready null");
    }
    return;
  }
  // Received a valid message, so commit the call.
  retry_commit(elem, retry_state);
  // Manually invoking a callback function; it does not take ownership of error.
  invoke_recv_message_callback(batch_data, error);
  GRPC_ERROR_UNREF(error);
}

//
// on_complete callback handling
//

// Updates retry_state to reflect the ops completed in batch_data.
static void update_retry_state_for_completed_batch(
    subchannel_batch_data* batch_data,
    subchannel_call_retry_state* retry_state) {
  if (batch_data->batch.send_initial_metadata) {
    retry_state->completed_send_initial_metadata = true;
  }
  if (batch_data->batch.send_message) {
    ++retry_state->completed_send_message_count;
  }
  if (batch_data->batch.send_trailing_metadata) {
    retry_state->completed_send_trailing_metadata = true;
  }
  if (batch_data->batch.recv_initial_metadata) {
    retry_state->completed_recv_initial_metadata = true;
  }
  if (batch_data->batch.recv_message) {
    ++retry_state->completed_recv_message_count;
  }
  if (batch_data->batch.recv_trailing_metadata) {
    retry_state->completed_recv_trailing_metadata = true;
  }
}

// Represents a closure that needs to run as a result of a completed batch.
typedef struct {
  grpc_closure* closure;
  grpc_error* error;
  const char* reason;
} closure_to_execute;

// Adds any necessary closures for deferred recv_initial_metadata and
// recv_message callbacks to closures, updating *num_closures as needed.
static void add_closures_for_deferred_recv_callbacks(
    subchannel_batch_data* batch_data, subchannel_call_retry_state* retry_state,
    closure_to_execute* closures, size_t* num_closures) {
  if (batch_data->batch.recv_trailing_metadata &&
      retry_state->recv_initial_metadata_ready_deferred) {
    closure_to_execute* closure = &closures[(*num_closures)++];
    closure->closure =
        GRPC_CLOSURE_INIT(&batch_data->recv_initial_metadata_ready,
                          invoke_recv_initial_metadata_callback, batch_data,
                          grpc_schedule_on_exec_ctx);
    closure->error = retry_state->recv_initial_metadata_error;
    closure->reason = "resuming recv_initial_metadata_ready";
  }
  if (batch_data->batch.recv_trailing_metadata &&
      retry_state->recv_message_ready_deferred) {
    closure_to_execute* closure = &closures[(*num_closures)++];
    closure->closure = GRPC_CLOSURE_INIT(&batch_data->recv_message_ready,
                                         invoke_recv_message_callback,
                                         batch_data, grpc_schedule_on_exec_ctx);
    closure->error = retry_state->recv_message_error;
    closure->reason = "resuming recv_message_ready";
  }
}

// If there are any cached ops to replay or pending ops to start on the
// subchannel call, adds a closure to closures to invoke
// start_retriable_subchannel_batches(), updating *num_closures as needed.
static void add_closures_for_replay_or_pending_send_ops(
    grpc_call_element* elem, subchannel_batch_data* batch_data,
    subchannel_call_retry_state* retry_state, closure_to_execute* closures,
    size_t* num_closures) {
  channel_data* chand = static_cast<channel_data*>(elem->channel_data);
  call_data* calld = static_cast<call_data*>(elem->call_data);
  bool have_pending_send_message_ops =
      retry_state->started_send_message_count < calld->send_messages.size();
  bool have_pending_send_trailing_metadata_op =
      calld->seen_send_trailing_metadata &&
      !retry_state->started_send_trailing_metadata;
  if (!have_pending_send_message_ops &&
      !have_pending_send_trailing_metadata_op) {
    for (size_t i = 0; i < GPR_ARRAY_SIZE(calld->pending_batches); ++i) {
      pending_batch* pending = &calld->pending_batches[i];
      grpc_transport_stream_op_batch* batch = pending->batch;
      if (batch == nullptr || pending->send_ops_cached) continue;
      if (batch->send_message) have_pending_send_message_ops = true;
      if (batch->send_trailing_metadata) {
        have_pending_send_trailing_metadata_op = true;
      }
    }
  }
  if (have_pending_send_message_ops || have_pending_send_trailing_metadata_op) {
    if (grpc_client_channel_trace.enabled()) {
      gpr_log(GPR_DEBUG,
              "chand=%p calld=%p: starting next batch for pending send op(s)",
              chand, calld);
    }
    closure_to_execute* closure = &closures[(*num_closures)++];
    closure->closure = GRPC_CLOSURE_INIT(
        &batch_data->batch.handler_private.closure,
        start_retriable_subchannel_batches, elem, grpc_schedule_on_exec_ctx);
    closure->error = GRPC_ERROR_NONE;
    closure->reason = "starting next batch for send_* op(s)";
  }
}

// For any pending batch completed in batch_data, adds the necessary
// completion closures to closures, updating *num_closures as needed.
static void add_closures_for_completed_pending_batches(
    grpc_call_element* elem, subchannel_batch_data* batch_data,
    subchannel_call_retry_state* retry_state, grpc_error* error,
    closure_to_execute* closures, size_t* num_closures) {
  channel_data* chand = static_cast<channel_data*>(elem->channel_data);
  call_data* calld = static_cast<call_data*>(elem->call_data);
  for (size_t i = 0; i < GPR_ARRAY_SIZE(calld->pending_batches); ++i) {
    pending_batch* pending = &calld->pending_batches[i];
    if (pending_batch_is_completed(pending, calld, retry_state)) {
      if (grpc_client_channel_trace.enabled()) {
        gpr_log(GPR_DEBUG,
                "chand=%p calld=%p: pending batch completed at index %" PRIuPTR,
                chand, calld, i);
      }
      // Copy the trailing metadata to return it to the surface.
      if (batch_data->batch.recv_trailing_metadata) {
        grpc_metadata_batch_move(&batch_data->recv_trailing_metadata,
                                 pending->batch->payload->recv_trailing_metadata
                                     .recv_trailing_metadata);
      }
      closure_to_execute* closure = &closures[(*num_closures)++];
      closure->closure = pending->batch->on_complete;
      closure->error = GRPC_ERROR_REF(error);
      closure->reason = "on_complete for pending batch";
      pending->batch->on_complete = nullptr;
      maybe_clear_pending_batch(elem, pending);
    }
  }
  GRPC_ERROR_UNREF(error);
}

// For any pending batch containing an op that has not yet been started,
// adds the pending batch's completion closures to closures, updating
// *num_closures as needed.
static void add_closures_to_fail_unstarted_pending_batches(
    grpc_call_element* elem, subchannel_call_retry_state* retry_state,
    grpc_error* error, closure_to_execute* closures, size_t* num_closures) {
  channel_data* chand = static_cast<channel_data*>(elem->channel_data);
  call_data* calld = static_cast<call_data*>(elem->call_data);
  for (size_t i = 0; i < GPR_ARRAY_SIZE(calld->pending_batches); ++i) {
    pending_batch* pending = &calld->pending_batches[i];
    if (pending_batch_is_unstarted(pending, calld, retry_state)) {
      if (grpc_client_channel_trace.enabled()) {
        gpr_log(GPR_DEBUG,
                "chand=%p calld=%p: failing unstarted pending batch at index "
                "%" PRIuPTR,
                chand, calld, i);
      }
      if (pending->batch->recv_initial_metadata) {
        closure_to_execute* closure = &closures[(*num_closures)++];
        closure->closure = pending->batch->payload->recv_initial_metadata
                               .recv_initial_metadata_ready;
        closure->error = GRPC_ERROR_REF(error);
        closure->reason =
            "failing recv_initial_metadata_ready for pending batch";
        pending->batch->payload->recv_initial_metadata
            .recv_initial_metadata_ready = nullptr;
      }
      if (pending->batch->recv_message) {
        *pending->batch->payload->recv_message.recv_message = nullptr;
        closure_to_execute* closure = &closures[(*num_closures)++];
        closure->closure =
            pending->batch->payload->recv_message.recv_message_ready;
        closure->error = GRPC_ERROR_REF(error);
        closure->reason = "failing recv_message_ready for pending batch";
        pending->batch->payload->recv_message.recv_message_ready = nullptr;
      }
      closure_to_execute* closure = &closures[(*num_closures)++];
      closure->closure = pending->batch->on_complete;
      closure->error = GRPC_ERROR_REF(error);
      closure->reason = "failing on_complete for pending batch";
      pending->batch->on_complete = nullptr;
      maybe_clear_pending_batch(elem, pending);
    }
  }
  GRPC_ERROR_UNREF(error);
}

// Callback used to intercept on_complete from subchannel calls.
// Called only when retries are enabled.
static void on_complete(void* arg, grpc_error* error) {
  subchannel_batch_data* batch_data = static_cast<subchannel_batch_data*>(arg);
  grpc_call_element* elem = batch_data->elem;
  channel_data* chand = static_cast<channel_data*>(elem->channel_data);
  call_data* calld = static_cast<call_data*>(elem->call_data);
  if (grpc_client_channel_trace.enabled()) {
    char* batch_str = grpc_transport_stream_op_batch_string(&batch_data->batch);
    gpr_log(GPR_DEBUG, "chand=%p calld=%p: got on_complete, error=%s, batch=%s",
            chand, calld, grpc_error_string(error), batch_str);
    gpr_free(batch_str);
  }
  subchannel_call_retry_state* retry_state =
      static_cast<subchannel_call_retry_state*>(
          grpc_connected_subchannel_call_get_parent_data(
              batch_data->subchannel_call));
  // If we have previously completed recv_trailing_metadata, then the
  // call is finished.
  bool call_finished = retry_state->completed_recv_trailing_metadata;
  // Update bookkeeping in retry_state.
  update_retry_state_for_completed_batch(batch_data, retry_state);
  if (call_finished) {
    if (grpc_client_channel_trace.enabled()) {
      gpr_log(GPR_DEBUG, "chand=%p calld=%p: call already finished", chand,
              calld);
    }
  } else {
    // Check if this batch finished the call, and if so, get its status.
    // The call is finished if either (a) this callback was invoked with
    // an error or (b) we receive status.
    grpc_status_code status = GRPC_STATUS_OK;
    grpc_mdelem* server_pushback_md = nullptr;
    if (error != GRPC_ERROR_NONE) {  // Case (a).
      call_finished = true;
      grpc_error_get_status(error, calld->deadline, &status, nullptr, nullptr,
                            nullptr);
    } else if (batch_data->batch.recv_trailing_metadata) {  // Case (b).
      call_finished = true;
      grpc_metadata_batch* md_batch =
          batch_data->batch.payload->recv_trailing_metadata
              .recv_trailing_metadata;
      GPR_ASSERT(md_batch->idx.named.grpc_status != nullptr);
      status = grpc_get_status_code_from_metadata(
          md_batch->idx.named.grpc_status->md);
      if (md_batch->idx.named.grpc_retry_pushback_ms != nullptr) {
        server_pushback_md = &md_batch->idx.named.grpc_retry_pushback_ms->md;
      }
    } else if (retry_state->completed_recv_trailing_metadata) {
      call_finished = true;
    }
    if (call_finished && grpc_client_channel_trace.enabled()) {
      gpr_log(GPR_DEBUG, "chand=%p calld=%p: call finished, status=%s", chand,
              calld, grpc_status_code_to_string(status));
    }
    // If the call is finished, check if we should retry.
    if (call_finished &&
        maybe_retry(elem, batch_data, status, server_pushback_md)) {
      // Unref batch_data for deferred recv_initial_metadata_ready or
      // recv_message_ready callbacks, if any.
      if (batch_data->batch.recv_trailing_metadata &&
          retry_state->recv_initial_metadata_ready_deferred) {
        batch_data_unref(batch_data);
        GRPC_ERROR_UNREF(retry_state->recv_initial_metadata_error);
      }
      if (batch_data->batch.recv_trailing_metadata &&
          retry_state->recv_message_ready_deferred) {
        batch_data_unref(batch_data);
        GRPC_ERROR_UNREF(retry_state->recv_message_error);
      }
      batch_data_unref(batch_data);
      return;
    }
  }
  // If the call is finished or retries are committed, free cached data for
  // send ops that we've just completed.
  if (call_finished || calld->retry_committed) {
    free_cached_send_op_data_for_completed_batch(elem, batch_data, retry_state);
  }
  // Call not being retried.
  // Construct list of closures to execute.
  // Max number of closures is number of pending batches plus one for
  // each of:
  // - recv_initial_metadata_ready (either deferred or unstarted)
  // - recv_message_ready (either deferred or unstarted)
  // - starting a new batch for pending send ops
  closure_to_execute closures[GPR_ARRAY_SIZE(calld->pending_batches) + 3];
  size_t num_closures = 0;
  // If there are deferred recv_initial_metadata_ready or recv_message_ready
  // callbacks, add them to closures.
  add_closures_for_deferred_recv_callbacks(batch_data, retry_state, closures,
                                           &num_closures);
  // Find pending batches whose ops are now complete and add their
  // on_complete callbacks to closures.
  add_closures_for_completed_pending_batches(elem, batch_data, retry_state,
                                             GRPC_ERROR_REF(error), closures,
                                             &num_closures);
  // Add closures to handle any pending batches that have not yet been started.
  // If the call is finished, we fail these batches; otherwise, we add a
  // callback to start_retriable_subchannel_batches() to start them on
  // the subchannel call.
  if (call_finished) {
    add_closures_to_fail_unstarted_pending_batches(
        elem, retry_state, GRPC_ERROR_REF(error), closures, &num_closures);
  } else {
    add_closures_for_replay_or_pending_send_ops(elem, batch_data, retry_state,
                                                closures, &num_closures);
  }
  // Don't need batch_data anymore.
  batch_data_unref(batch_data);
  // Schedule all of the closures identified above.
  // Note that the call combiner will be yielded for each closure that
  // we schedule.  We're already running in the call combiner, so one of
  // the closures can be scheduled directly, but the others will
  // have to re-enter the call combiner.
  if (num_closures > 0) {
    GRPC_CLOSURE_SCHED(closures[0].closure, closures[0].error);
    for (size_t i = 1; i < num_closures; ++i) {
      GRPC_CALL_COMBINER_START(calld->call_combiner, closures[i].closure,
                               closures[i].error, closures[i].reason);
    }
  } else {
    GRPC_CALL_COMBINER_STOP(calld->call_combiner,
                            "no closures to run for on_complete");
  }
}

//
// subchannel batch construction
//

// Helper function used to start a subchannel batch in the call combiner.
static void start_batch_in_call_combiner(void* arg, grpc_error* ignored) {
  grpc_transport_stream_op_batch* batch =
      static_cast<grpc_transport_stream_op_batch*>(arg);
  grpc_subchannel_call* subchannel_call =
      static_cast<grpc_subchannel_call*>(batch->handler_private.extra_arg);
  // Note: This will release the call combiner.
  grpc_subchannel_call_process_op(subchannel_call, batch);
}

// Adds retriable send_initial_metadata op to batch_data.
static void add_retriable_send_initial_metadata_op(
    call_data* calld, subchannel_call_retry_state* retry_state,
    subchannel_batch_data* batch_data) {
  // Maps the number of retries to the corresponding metadata value slice.
  static const grpc_slice* retry_count_strings[] = {
      &GRPC_MDSTR_1, &GRPC_MDSTR_2, &GRPC_MDSTR_3, &GRPC_MDSTR_4};
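  // Only four strings are needed because max_attempts is validated
  // elsewhere (in the method params parsing) to a small bound, so
  // num_attempts_completed - 1 is expected to stay within this array.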
  // We need to make a copy of the metadata batch for each attempt, since
  // the filters in the subchannel stack may modify this batch, and we don't
  // want those modifications to be passed forward to subsequent attempts.
  //
  // If we've already completed one or more attempts, add the
  // grpc-previous-rpc-attempts header.
  batch_data->send_initial_metadata_storage =
      static_cast<grpc_linked_mdelem*>(gpr_arena_alloc(
          calld->arena, sizeof(grpc_linked_mdelem) *
                            (calld->send_initial_metadata.list.count +
                             (calld->num_attempts_completed > 0))));
  grpc_metadata_batch_copy(&calld->send_initial_metadata,
                           &batch_data->send_initial_metadata,
                           batch_data->send_initial_metadata_storage);
  if (batch_data->send_initial_metadata.idx.named.grpc_previous_rpc_attempts !=
      nullptr) {
    grpc_metadata_batch_remove(
        &batch_data->send_initial_metadata,
        batch_data->send_initial_metadata.idx.named.grpc_previous_rpc_attempts);
  }
  if (calld->num_attempts_completed > 0) {
    grpc_mdelem retry_md = grpc_mdelem_from_slices(
        GRPC_MDSTR_GRPC_PREVIOUS_RPC_ATTEMPTS,
        *retry_count_strings[calld->num_attempts_completed - 1]);
    grpc_error* error = grpc_metadata_batch_add_tail(
        &batch_data->send_initial_metadata,
        &batch_data->send_initial_metadata_storage[calld->send_initial_metadata
                                                       .list.count],
        retry_md);
    if (error != GRPC_ERROR_NONE) {
      gpr_log(GPR_ERROR, "error adding retry metadata: %s",
              grpc_error_string(error));
      GPR_ASSERT(false);
    }
  }
  retry_state->started_send_initial_metadata = true;
  batch_data->batch.send_initial_metadata = true;
  batch_data->batch.payload->send_initial_metadata.send_initial_metadata =
      &batch_data->send_initial_metadata;
  batch_data->batch.payload->send_initial_metadata.send_initial_metadata_flags =
      calld->send_initial_metadata_flags;
  batch_data->batch.payload->send_initial_metadata.peer_string =
      calld->peer_string;
}
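
// For example, on the third attempt of a call (num_attempts_completed == 2),
// the attempt's initial metadata carries "grpc-previous-rpc-attempts: 2".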

// Adds retriable send_message op to batch_data.
static void add_retriable_send_message_op(
    grpc_call_element* elem, subchannel_call_retry_state* retry_state,
    subchannel_batch_data* batch_data) {
  channel_data* chand = static_cast<channel_data*>(elem->channel_data);
  call_data* calld = static_cast<call_data*>(elem->call_data);
  if (grpc_client_channel_trace.enabled()) {
    gpr_log(GPR_DEBUG,
            "chand=%p calld=%p: starting calld->send_messages[%" PRIuPTR "]",
            chand, calld, retry_state->started_send_message_count);
  }
  grpc_byte_stream_cache* cache =
      calld->send_messages[retry_state->started_send_message_count];
  ++retry_state->started_send_message_count;
  grpc_caching_byte_stream_init(&batch_data->send_message, cache);
  batch_data->batch.send_message = true;
  batch_data->batch.payload->send_message.send_message =
      &batch_data->send_message.base;
}
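// Replay note: each message the application sends is cached in
// calld->send_messages as a grpc_byte_stream_cache, and
// grpc_caching_byte_stream_init() above wraps that cache in a fresh stream
// for this attempt, so the same payload can be re-read if the attempt is
// later retried.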

// Adds retriable send_trailing_metadata op to batch_data.
static void add_retriable_send_trailing_metadata_op(
    call_data* calld, subchannel_call_retry_state* retry_state,
    subchannel_batch_data* batch_data) {
  // We need to make a copy of the metadata batch for each attempt, since
  // the filters in the subchannel stack may modify this batch, and we don't
  // want those modifications to be passed forward to subsequent attempts.
  batch_data->send_trailing_metadata_storage =
      static_cast<grpc_linked_mdelem*>(gpr_arena_alloc(
          calld->arena, sizeof(grpc_linked_mdelem) *
                            calld->send_trailing_metadata.list.count));
  grpc_metadata_batch_copy(&calld->send_trailing_metadata,
                           &batch_data->send_trailing_metadata,
                           batch_data->send_trailing_metadata_storage);
  retry_state->started_send_trailing_metadata = true;
  batch_data->batch.send_trailing_metadata = true;
  batch_data->batch.payload->send_trailing_metadata.send_trailing_metadata =
      &batch_data->send_trailing_metadata;
}

// Adds retriable recv_initial_metadata op to batch_data.
static void add_retriable_recv_initial_metadata_op(
    call_data* calld, subchannel_call_retry_state* retry_state,
    subchannel_batch_data* batch_data) {
  retry_state->started_recv_initial_metadata = true;
  batch_data->batch.recv_initial_metadata = true;
  grpc_metadata_batch_init(&batch_data->recv_initial_metadata);
  batch_data->batch.payload->recv_initial_metadata.recv_initial_metadata =
      &batch_data->recv_initial_metadata;
  batch_data->batch.payload->recv_initial_metadata.trailing_metadata_available =
      &batch_data->trailing_metadata_available;
  GRPC_CLOSURE_INIT(&batch_data->recv_initial_metadata_ready,
                    recv_initial_metadata_ready, batch_data,
                    grpc_schedule_on_exec_ctx);
  batch_data->batch.payload->recv_initial_metadata.recv_initial_metadata_ready =
      &batch_data->recv_initial_metadata_ready;
}

// Adds retriable recv_message op to batch_data.
static void add_retriable_recv_message_op(
    call_data* calld, subchannel_call_retry_state* retry_state,
    subchannel_batch_data* batch_data) {
  ++retry_state->started_recv_message_count;
  batch_data->batch.recv_message = true;
  batch_data->batch.payload->recv_message.recv_message =
      &batch_data->recv_message;
  GRPC_CLOSURE_INIT(&batch_data->recv_message_ready, recv_message_ready,
                    batch_data, grpc_schedule_on_exec_ctx);
  batch_data->batch.payload->recv_message.recv_message_ready =
      &batch_data->recv_message_ready;
}

// Adds retriable recv_trailing_metadata op to batch_data.
static void add_retriable_recv_trailing_metadata_op(
    call_data* calld, subchannel_call_retry_state* retry_state,
    subchannel_batch_data* batch_data) {
  retry_state->started_recv_trailing_metadata = true;
  batch_data->batch.recv_trailing_metadata = true;
  grpc_metadata_batch_init(&batch_data->recv_trailing_metadata);
  batch_data->batch.payload->recv_trailing_metadata.recv_trailing_metadata =
      &batch_data->recv_trailing_metadata;
  batch_data->batch.collect_stats = true;
  batch_data->batch.payload->collect_stats.collect_stats =
      &batch_data->collect_stats;
}

// Helper function used to start a recv_trailing_metadata batch. This
// is used in the case where a recv_initial_metadata or recv_message
// op fails in a way that tells us the call is over, but the application
// has not yet started its own recv_trailing_metadata op.
static void start_internal_recv_trailing_metadata(grpc_call_element* elem) {
  channel_data* chand = static_cast<channel_data*>(elem->channel_data);
  call_data* calld = static_cast<call_data*>(elem->call_data);
  if (grpc_client_channel_trace.enabled()) {
    gpr_log(GPR_DEBUG,
            "chand=%p calld=%p: call failed but recv_trailing_metadata not "
            "started; starting it internally",
            chand, calld);
  }
  subchannel_call_retry_state* retry_state =
      static_cast<subchannel_call_retry_state*>(
          grpc_connected_subchannel_call_get_parent_data(
              calld->subchannel_call));
  subchannel_batch_data* batch_data = batch_data_create(elem, 1);
  add_retriable_recv_trailing_metadata_op(calld, retry_state, batch_data);
  // Note: This will release the call combiner.
  grpc_subchannel_call_process_op(calld->subchannel_call, &batch_data->batch);
}
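// Rationale: the internally started batch pulls in the server's trailing
// metadata and status (plus stats via collect_stats), which is what the
// retry code presumably needs in order to decide whether the failed attempt
// is retryable, even though the application has not yet asked for trailing
// metadata itself.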

// If there are any cached send ops that need to be replayed on the
// current subchannel call, creates and returns a new subchannel batch
// to replay those ops. Otherwise, returns nullptr.
static subchannel_batch_data* maybe_create_subchannel_batch_for_replay(
    grpc_call_element* elem, subchannel_call_retry_state* retry_state) {
  channel_data* chand = static_cast<channel_data*>(elem->channel_data);
  call_data* calld = static_cast<call_data*>(elem->call_data);
  subchannel_batch_data* replay_batch_data = nullptr;
  // send_initial_metadata.
  if (calld->seen_send_initial_metadata &&
      !retry_state->started_send_initial_metadata &&
      !calld->pending_send_initial_metadata) {
    if (grpc_client_channel_trace.enabled()) {
      gpr_log(GPR_DEBUG,
              "chand=%p calld=%p: replaying previously completed "
              "send_initial_metadata op",
              chand, calld);
    }
    replay_batch_data = batch_data_create(elem, 1);
    add_retriable_send_initial_metadata_op(calld, retry_state,
                                           replay_batch_data);
  }
  // send_message.
  // Note that we can only have one send_message op in flight at a time.
  if (retry_state->started_send_message_count < calld->send_messages.size() &&
      retry_state->started_send_message_count ==
          retry_state->completed_send_message_count &&
      !calld->pending_send_message) {
    if (grpc_client_channel_trace.enabled()) {
      gpr_log(GPR_DEBUG,
              "chand=%p calld=%p: replaying previously completed "
              "send_message op",
              chand, calld);
    }
    if (replay_batch_data == nullptr) {
      replay_batch_data = batch_data_create(elem, 1);
    }
    add_retriable_send_message_op(elem, retry_state, replay_batch_data);
  }
  // send_trailing_metadata.
  // Note that we only add this op if we have no more send_message ops
  // to start, since we can't send down any more send_message ops after
  // send_trailing_metadata.
  if (calld->seen_send_trailing_metadata &&
      retry_state->started_send_message_count == calld->send_messages.size() &&
      !retry_state->started_send_trailing_metadata &&
      !calld->pending_send_trailing_metadata) {
    if (grpc_client_channel_trace.enabled()) {
      gpr_log(GPR_DEBUG,
              "chand=%p calld=%p: replaying previously completed "
              "send_trailing_metadata op",
              chand, calld);
    }
    if (replay_batch_data == nullptr) {
      replay_batch_data = batch_data_create(elem, 1);
    }
    add_retriable_send_trailing_metadata_op(calld, retry_state,
                                            replay_batch_data);
  }
  return replay_batch_data;
}
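// Worked example (hypothetical numbers): suppose attempt 1 completed
// send_initial_metadata and two of three cached send_message ops before
// failing. On the next attempt, this function returns a single batch that
// replays send_initial_metadata plus the first cached message; the
// remaining messages are replayed one at a time as each send_message
// completes, and send_trailing_metadata is added only once every cached
// message has been started.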

// Adds subchannel batches for pending batches to batches, updating
// *num_batches as needed.
static void add_subchannel_batches_for_pending_batches(
    grpc_call_element* elem, subchannel_call_retry_state* retry_state,
    grpc_transport_stream_op_batch** batches, size_t* num_batches) {
  call_data* calld = static_cast<call_data*>(elem->call_data);
  for (size_t i = 0; i < GPR_ARRAY_SIZE(calld->pending_batches); ++i) {
    pending_batch* pending = &calld->pending_batches[i];
    grpc_transport_stream_op_batch* batch = pending->batch;
    if (batch == nullptr) continue;
    // Skip any batch that either (a) has already been started on this
    // subchannel call or (b) we can't start yet because we're still
    // replaying send ops that need to be completed first.
    // TODO(roth): Note that if any one op in the batch can't be sent
    // yet due to ops that we're replaying, we don't start any of the ops
    // in the batch. This is probably okay, but it could conceivably
    // lead to increased latency in some cases -- e.g., we could delay
    // starting a recv op due to it being in the same batch with a send
    // op. If/when we revamp the callback protocol in
    // transport_stream_op_batch, we may be able to fix this.
    if (batch->send_initial_metadata &&
        retry_state->started_send_initial_metadata) {
      continue;
    }
    if (batch->send_message && retry_state->completed_send_message_count <
                                   retry_state->started_send_message_count) {
      continue;
    }
    // Note that we only start send_trailing_metadata if we have no more
    // send_message ops to start, since we can't send down any more
    // send_message ops after send_trailing_metadata.
    if (batch->send_trailing_metadata &&
        (retry_state->started_send_message_count + batch->send_message <
             calld->send_messages.size() ||
         retry_state->started_send_trailing_metadata)) {
      continue;
    }
    if (batch->recv_initial_metadata &&
        retry_state->started_recv_initial_metadata) {
      continue;
    }
    if (batch->recv_message && retry_state->completed_recv_message_count <
                                   retry_state->started_recv_message_count) {
      continue;
    }
    if (batch->recv_trailing_metadata &&
        retry_state->started_recv_trailing_metadata) {
      continue;
    }
    // If we're not retrying, just send the batch as-is.
    if (calld->method_params == nullptr ||
        calld->method_params->retry_policy() == nullptr ||
        calld->retry_committed) {
      batches[(*num_batches)++] = batch;
      pending_batch_clear(calld, pending);
      continue;
    }
    // Create batch with the right number of callbacks: one for the batch's
    // on_complete callback, plus one each for the recv_initial_metadata_ready
    // and recv_message_ready callbacks, if those ops are present.
    const int num_callbacks =
        1 + batch->recv_initial_metadata + batch->recv_message;
    subchannel_batch_data* batch_data = batch_data_create(elem, num_callbacks);
    // Cache send ops if needed.
    maybe_cache_send_ops_for_batch(calld, pending);
    // send_initial_metadata.
    if (batch->send_initial_metadata) {
      add_retriable_send_initial_metadata_op(calld, retry_state, batch_data);
    }
    // send_message.
    if (batch->send_message) {
      add_retriable_send_message_op(elem, retry_state, batch_data);
    }
    // send_trailing_metadata.
    if (batch->send_trailing_metadata) {
      add_retriable_send_trailing_metadata_op(calld, retry_state, batch_data);
    }
    // recv_initial_metadata.
    if (batch->recv_initial_metadata) {
      // recv_flags is only used on the server side.
      GPR_ASSERT(batch->payload->recv_initial_metadata.recv_flags == nullptr);
      add_retriable_recv_initial_metadata_op(calld, retry_state, batch_data);
    }
    // recv_message.
    if (batch->recv_message) {
      add_retriable_recv_message_op(calld, retry_state, batch_data);
    }
    // recv_trailing_metadata.
    if (batch->recv_trailing_metadata) {
      GPR_ASSERT(batch->collect_stats);
      add_retriable_recv_trailing_metadata_op(calld, retry_state, batch_data);
    }
    batches[(*num_batches)++] = &batch_data->batch;
  }
}

// Constructs and starts whatever subchannel batches are needed on the
// subchannel call.
static void start_retriable_subchannel_batches(void* arg, grpc_error* ignored) {
  grpc_call_element* elem = static_cast<grpc_call_element*>(arg);
  channel_data* chand = static_cast<channel_data*>(elem->channel_data);
  call_data* calld = static_cast<call_data*>(elem->call_data);
  if (grpc_client_channel_trace.enabled()) {
    gpr_log(GPR_DEBUG, "chand=%p calld=%p: constructing retriable batches",
            chand, calld);
  }
  subchannel_call_retry_state* retry_state =
      static_cast<subchannel_call_retry_state*>(
          grpc_connected_subchannel_call_get_parent_data(
              calld->subchannel_call));
  // We can start up to 6 batches.
  grpc_transport_stream_op_batch*
      batches[GPR_ARRAY_SIZE(calld->pending_batches)];
  size_t num_batches = 0;
  // Replay previously-returned send_* ops if needed.
  subchannel_batch_data* replay_batch_data =
      maybe_create_subchannel_batch_for_replay(elem, retry_state);
  if (replay_batch_data != nullptr) {
    batches[num_batches++] = &replay_batch_data->batch;
  }
  // Now add pending batches.
  add_subchannel_batches_for_pending_batches(elem, retry_state, batches,
                                             &num_batches);
  // Start batches on subchannel call.
  // Note that the call combiner will be yielded for each batch that we
  // send down. We're already running in the call combiner, so one of
  // the batches can be started directly, but the others will have to
  // re-enter the call combiner.
  if (grpc_client_channel_trace.enabled()) {
    gpr_log(GPR_DEBUG,
            "chand=%p calld=%p: starting %" PRIuPTR
            " retriable batches on subchannel_call=%p",
            chand, calld, num_batches, calld->subchannel_call);
  }
  if (num_batches == 0) {
    // This should be fairly rare, but it can happen when (e.g.) an
    // attempt completes before it has finished replaying all
    // previously sent messages.
    GRPC_CALL_COMBINER_STOP(calld->call_combiner,
                            "no retriable subchannel batches to start");
  } else {
    for (size_t i = 1; i < num_batches; ++i) {
      if (grpc_client_channel_trace.enabled()) {
        char* batch_str = grpc_transport_stream_op_batch_string(batches[i]);
        gpr_log(GPR_DEBUG,
                "chand=%p calld=%p: starting batch in call combiner: %s", chand,
                calld, batch_str);
        gpr_free(batch_str);
      }
      batches[i]->handler_private.extra_arg = calld->subchannel_call;
      GRPC_CLOSURE_INIT(&batches[i]->handler_private.closure,
                        start_batch_in_call_combiner, batches[i],
                        grpc_schedule_on_exec_ctx);
      GRPC_CALL_COMBINER_START(calld->call_combiner,
                               &batches[i]->handler_private.closure,
                               GRPC_ERROR_NONE, "start_subchannel_batch");
    }
    if (grpc_client_channel_trace.enabled()) {
      char* batch_str = grpc_transport_stream_op_batch_string(batches[0]);
      gpr_log(GPR_DEBUG, "chand=%p calld=%p: starting batch: %s", chand, calld,
              batch_str);
      gpr_free(batch_str);
    }
    // Note: This will release the call combiner.
    grpc_subchannel_call_process_op(calld->subchannel_call, batches[0]);
  }
}

//
// LB pick
//

static void create_subchannel_call(grpc_call_element* elem, grpc_error* error) {
  channel_data* chand = static_cast<channel_data*>(elem->channel_data);
  call_data* calld = static_cast<call_data*>(elem->call_data);
  const size_t parent_data_size =
      calld->enable_retries ? sizeof(subchannel_call_retry_state) : 0;
  const grpc_core::ConnectedSubchannel::CallArgs call_args = {
      calld->pollent,                       // pollent
      calld->path,                          // path
      calld->call_start_time,               // start_time
      calld->deadline,                      // deadline
      calld->arena,                         // arena
      calld->pick.subchannel_call_context,  // context
      calld->call_combiner,                 // call_combiner
      parent_data_size                      // parent_data_size
  };
  grpc_error* new_error = calld->pick.connected_subchannel->CreateCall(
      call_args, &calld->subchannel_call);
  if (grpc_client_channel_trace.enabled()) {
    gpr_log(GPR_DEBUG, "chand=%p calld=%p: create subchannel_call=%p: error=%s",
            chand, calld, calld->subchannel_call, grpc_error_string(new_error));
  }
  if (new_error != GRPC_ERROR_NONE) {
    new_error = grpc_error_add_child(new_error, error);
    pending_batches_fail(elem, new_error, true /* yield_call_combiner */);
  } else {
    if (parent_data_size > 0) {
      subchannel_call_retry_state* retry_state =
          static_cast<subchannel_call_retry_state*>(
              grpc_connected_subchannel_call_get_parent_data(
                  calld->subchannel_call));
      retry_state->batch_payload.context = calld->pick.subchannel_call_context;
    }
    pending_batches_resume(elem);
  }
  GRPC_ERROR_UNREF(error);
}

// Invoked when a pick is completed, on either success or failure.
static void pick_done(void* arg, grpc_error* error) {
  grpc_call_element* elem = static_cast<grpc_call_element*>(arg);
  channel_data* chand = static_cast<channel_data*>(elem->channel_data);
  call_data* calld = static_cast<call_data*>(elem->call_data);
  if (calld->pick.connected_subchannel == nullptr) {
    // Failed to create subchannel.
    // If there was no error, this is an LB policy drop, in which case
    // we return an error; otherwise, we may retry.
    grpc_status_code status = GRPC_STATUS_OK;
    grpc_error_get_status(error, calld->deadline, &status, nullptr, nullptr,
                          nullptr);
    if (error == GRPC_ERROR_NONE || !calld->enable_retries ||
        !maybe_retry(elem, nullptr /* batch_data */, status,
                     nullptr /* server_pushback_md */)) {
      grpc_error* new_error =
          error == GRPC_ERROR_NONE
              ? GRPC_ERROR_CREATE_FROM_STATIC_STRING(
                    "Call dropped by load balancing policy")
              : GRPC_ERROR_CREATE_REFERENCING_FROM_STATIC_STRING(
                    "Failed to create subchannel", &error, 1);
      if (grpc_client_channel_trace.enabled()) {
        gpr_log(GPR_DEBUG,
                "chand=%p calld=%p: failed to create subchannel: error=%s",
                chand, calld, grpc_error_string(new_error));
      }
      pending_batches_fail(elem, new_error, true /* yield_call_combiner */);
    }
  } else {
    /* Create call on subchannel. */
    create_subchannel_call(elem, GRPC_ERROR_REF(error));
  }
}

// Invoked when a pick is completed to leave the client_channel combiner
// and continue processing in the call combiner.
static void pick_done_locked(grpc_call_element* elem, grpc_error* error) {
  call_data* calld = static_cast<call_data*>(elem->call_data);
  GRPC_CLOSURE_INIT(&calld->pick_closure, pick_done, elem,
                    grpc_schedule_on_exec_ctx);
  GRPC_CLOSURE_SCHED(&calld->pick_closure, error);
}

// A wrapper around pick_done_locked() that is used in cases where
// either (a) the pick was deferred pending a resolver result or (b) the
// pick was done asynchronously. Removes the call's polling entity from
// chand->interested_parties before invoking pick_done_locked().
static void async_pick_done_locked(grpc_call_element* elem, grpc_error* error) {
  channel_data* chand = static_cast<channel_data*>(elem->channel_data);
  call_data* calld = static_cast<call_data*>(elem->call_data);
  grpc_polling_entity_del_from_pollset_set(calld->pollent,
                                           chand->interested_parties);
  pick_done_locked(elem, error);
}

// Note: This runs under the client_channel combiner, but will NOT be
// holding the call combiner.
static void pick_callback_cancel_locked(void* arg, grpc_error* error) {
  grpc_call_element* elem = static_cast<grpc_call_element*>(arg);
  channel_data* chand = static_cast<channel_data*>(elem->channel_data);
  call_data* calld = static_cast<call_data*>(elem->call_data);
  // Note: chand->lb_policy may have changed since we started our pick,
  // in which case we will be cancelling the pick on a policy other than
  // the one we started it on. However, this will just be a no-op.
  if (error != GRPC_ERROR_NONE && chand->lb_policy != nullptr) {
    if (grpc_client_channel_trace.enabled()) {
      gpr_log(GPR_DEBUG, "chand=%p calld=%p: cancelling pick from LB policy %p",
              chand, calld, chand->lb_policy.get());
    }
    chand->lb_policy->CancelPickLocked(&calld->pick, GRPC_ERROR_REF(error));
  }
  GRPC_CALL_STACK_UNREF(calld->owning_call, "pick_callback_cancel");
}

// Callback invoked by LoadBalancingPolicy::PickLocked() for async picks.
// Unrefs the LB policy and invokes async_pick_done_locked().
static void pick_callback_done_locked(void* arg, grpc_error* error) {
  grpc_call_element* elem = static_cast<grpc_call_element*>(arg);
  channel_data* chand = static_cast<channel_data*>(elem->channel_data);
  call_data* calld = static_cast<call_data*>(elem->call_data);
  if (grpc_client_channel_trace.enabled()) {
    gpr_log(GPR_DEBUG, "chand=%p calld=%p: pick completed asynchronously",
            chand, calld);
  }
  async_pick_done_locked(elem, GRPC_ERROR_REF(error));
  GRPC_CALL_STACK_UNREF(calld->owning_call, "pick_callback");
}

// Applies service config to the call. Must be invoked once we know
// that the resolver has returned results to the channel.
static void apply_service_config_to_call_locked(grpc_call_element* elem) {
  channel_data* chand = static_cast<channel_data*>(elem->channel_data);
  call_data* calld = static_cast<call_data*>(elem->call_data);
  if (grpc_client_channel_trace.enabled()) {
    gpr_log(GPR_DEBUG, "chand=%p calld=%p: applying service config to call",
            chand, calld);
  }
  if (chand->retry_throttle_data != nullptr) {
    calld->retry_throttle_data =
        grpc_server_retry_throttle_data_ref(chand->retry_throttle_data);
  }
  if (chand->method_params_table != nullptr) {
    calld->method_params = grpc_core::ServiceConfig::MethodConfigTableLookup(
        *chand->method_params_table, calld->path);
    if (calld->method_params != nullptr) {
      // If the deadline from the service config is shorter than the one
      // from the client API, reset the deadline timer.
      if (chand->deadline_checking_enabled &&
          calld->method_params->timeout() != 0) {
        const grpc_millis per_method_deadline =
            grpc_timespec_to_millis_round_up(calld->call_start_time) +
            calld->method_params->timeout();
        if (per_method_deadline < calld->deadline) {
          calld->deadline = per_method_deadline;
          grpc_deadline_state_reset(elem, calld->deadline);
        }
      }
    }
  }
  // If no retry policy, disable retries.
  // TODO(roth): Remove this when adding support for transparent retries.
  if (calld->method_params == nullptr ||
      calld->method_params->retry_policy() == nullptr) {
    calld->enable_retries = false;
  }
}
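// For reference, the per-method settings consumed above come from the
// service config returned by the resolver. A config shaped like the
// following (service/method names and values are hypothetical; field names
// per the service config spec) would drive the timeout, wait_for_ready, and
// retry policy handling in this filter:
//
//   {
//     "methodConfig": [ {
//       "name": [ { "service": "MyService", "method": "MyMethod" } ],
//       "timeout": "5s",
//       "waitForReady": true,
//       "retryPolicy": {
//         "maxAttempts": 4,
//         "initialBackoff": "0.1s",
//         "maxBackoff": "1s",
//         "backoffMultiplier": 2,
//         "retryableStatusCodes": [ "UNAVAILABLE" ]
//       }
//     } ]
//   }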

// Starts a pick on chand->lb_policy.
// Returns true if pick is completed synchronously.
static bool pick_callback_start_locked(grpc_call_element* elem) {
  channel_data* chand = static_cast<channel_data*>(elem->channel_data);
  call_data* calld = static_cast<call_data*>(elem->call_data);
  if (grpc_client_channel_trace.enabled()) {
    gpr_log(GPR_DEBUG, "chand=%p calld=%p: starting pick on lb_policy=%p",
            chand, calld, chand->lb_policy.get());
  }
  // Only get service config data on the first attempt.
  if (calld->num_attempts_completed == 0) {
    apply_service_config_to_call_locked(elem);
  }
  // If the application explicitly set wait_for_ready, use that.
  // Otherwise, if the service config specified a value for this
  // method, use that.
  //
  // The send_initial_metadata batch will be the first one in the list,
  // as set by get_batch_index() above.
  calld->pick.initial_metadata =
      calld->seen_send_initial_metadata
          ? &calld->send_initial_metadata
          : calld->pending_batches[0]
                .batch->payload->send_initial_metadata.send_initial_metadata;
  uint32_t send_initial_metadata_flags =
      calld->seen_send_initial_metadata
          ? calld->send_initial_metadata_flags
          : calld->pending_batches[0]
                .batch->payload->send_initial_metadata
                .send_initial_metadata_flags;
  const bool wait_for_ready_set_from_api =
      send_initial_metadata_flags &
      GRPC_INITIAL_METADATA_WAIT_FOR_READY_EXPLICITLY_SET;
  const bool wait_for_ready_set_from_service_config =
      calld->method_params != nullptr &&
      calld->method_params->wait_for_ready() !=
          ClientChannelMethodParams::WAIT_FOR_READY_UNSET;
  if (!wait_for_ready_set_from_api && wait_for_ready_set_from_service_config) {
    if (calld->method_params->wait_for_ready() ==
        ClientChannelMethodParams::WAIT_FOR_READY_TRUE) {
      send_initial_metadata_flags |= GRPC_INITIAL_METADATA_WAIT_FOR_READY;
    } else {
      send_initial_metadata_flags &= ~GRPC_INITIAL_METADATA_WAIT_FOR_READY;
    }
  }
  calld->pick.initial_metadata_flags = send_initial_metadata_flags;
  GRPC_CLOSURE_INIT(&calld->pick_closure, pick_callback_done_locked, elem,
                    grpc_combiner_scheduler(chand->combiner));
  calld->pick.on_complete = &calld->pick_closure;
  GRPC_CALL_STACK_REF(calld->owning_call, "pick_callback");
  const bool pick_done = chand->lb_policy->PickLocked(&calld->pick);
  if (pick_done) {
    // Pick completed synchronously.
    if (grpc_client_channel_trace.enabled()) {
      gpr_log(GPR_DEBUG, "chand=%p calld=%p: pick completed synchronously",
              chand, calld);
    }
    GRPC_CALL_STACK_UNREF(calld->owning_call, "pick_callback");
  } else {
    GRPC_CALL_STACK_REF(calld->owning_call, "pick_callback_cancel");
    grpc_call_combiner_set_notify_on_cancel(
        calld->call_combiner,
        GRPC_CLOSURE_INIT(&calld->pick_cancel_closure,
                          pick_callback_cancel_locked, elem,
                          grpc_combiner_scheduler(chand->combiner)));
  }
  return pick_done;
}
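// Precedence note: an application-supplied wait_for_ready always wins. For
// example, if the call was started without
// GRPC_INITIAL_METADATA_WAIT_FOR_READY_EXPLICITLY_SET and the service
// config says WAIT_FOR_READY_TRUE, the pick proceeds with
// GRPC_INITIAL_METADATA_WAIT_FOR_READY added; an explicit API setting is
// never overridden by the config.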

typedef struct {
  grpc_call_element* elem;
  bool finished;
  grpc_closure closure;
  grpc_closure cancel_closure;
} pick_after_resolver_result_args;

// Note: This runs under the client_channel combiner, but will NOT be
// holding the call combiner.
static void pick_after_resolver_result_cancel_locked(void* arg,
                                                     grpc_error* error) {
  pick_after_resolver_result_args* args =
      static_cast<pick_after_resolver_result_args*>(arg);
  if (args->finished) {
    gpr_free(args);
    return;
  }
  // If we don't yet have a resolver result, then a closure for
  // pick_after_resolver_result_done_locked() will have been added to
  // chand->waiting_for_resolver_result_closures, and it may not be invoked
  // until after this call has been destroyed. We mark the operation as
  // finished, so that when pick_after_resolver_result_done_locked()
  // is called, it will be a no-op. We also immediately invoke
  // async_pick_done_locked() to propagate the error back to the caller.
  args->finished = true;
  grpc_call_element* elem = args->elem;
  channel_data* chand = static_cast<channel_data*>(elem->channel_data);
  call_data* calld = static_cast<call_data*>(elem->call_data);
  if (grpc_client_channel_trace.enabled()) {
    gpr_log(GPR_DEBUG,
            "chand=%p calld=%p: cancelling pick waiting for resolver result",
            chand, calld);
  }
  // Note: Although we are not in the call combiner here, we are
  // basically stealing the call combiner from the pending pick, so
  // it's safe to call async_pick_done_locked() here -- we are
  // essentially calling it here instead of calling it in
  // pick_after_resolver_result_done_locked().
  async_pick_done_locked(elem, GRPC_ERROR_CREATE_REFERENCING_FROM_STATIC_STRING(
                                   "Pick cancelled", &error, 1));
}

static void pick_after_resolver_result_done_locked(void* arg,
                                                   grpc_error* error) {
  pick_after_resolver_result_args* args =
      static_cast<pick_after_resolver_result_args*>(arg);
  if (args->finished) {
    /* cancelled, do nothing */
    if (grpc_client_channel_trace.enabled()) {
      gpr_log(GPR_DEBUG, "call cancelled before resolver result");
    }
    gpr_free(args);
    return;
  }
  args->finished = true;
  grpc_call_element* elem = args->elem;
  channel_data* chand = static_cast<channel_data*>(elem->channel_data);
  call_data* calld = static_cast<call_data*>(elem->call_data);
  if (error != GRPC_ERROR_NONE) {
    if (grpc_client_channel_trace.enabled()) {
      gpr_log(GPR_DEBUG, "chand=%p calld=%p: resolver failed to return data",
              chand, calld);
    }
    async_pick_done_locked(elem, GRPC_ERROR_REF(error));
  } else if (chand->lb_policy != nullptr) {
    if (grpc_client_channel_trace.enabled()) {
      gpr_log(GPR_DEBUG, "chand=%p calld=%p: resolver returned, doing pick",
              chand, calld);
    }
    if (pick_callback_start_locked(elem)) {
      // Even if the LB policy returns a result synchronously, we have
      // already added our polling entity to chand->interested_parties
      // in order to wait for the resolver result, so we need to
      // remove it here. Therefore, we call async_pick_done_locked()
      // instead of pick_done_locked().
      async_pick_done_locked(elem, GRPC_ERROR_NONE);
    }
  }
  // TODO(roth): It should be impossible for chand->lb_policy to be nullptr
  // here, so the rest of this code should never actually be executed.
  // However, we have reports of a crash on iOS that triggers this case,
  // so we are temporarily adding this to restore branches that were
  // removed in https://github.com/grpc/grpc/pull/12297. Need to figure
  // out what is actually causing this to occur and then figure out the
  // right way to deal with it.
  else if (chand->resolver != nullptr) {
    // No LB policy, so try again.
    if (grpc_client_channel_trace.enabled()) {
      gpr_log(GPR_DEBUG,
              "chand=%p calld=%p: resolver returned but no LB policy, "
              "trying again",
              chand, calld);
    }
    pick_after_resolver_result_start_locked(elem);
  } else {
    if (grpc_client_channel_trace.enabled()) {
      gpr_log(GPR_DEBUG, "chand=%p calld=%p: resolver disconnected", chand,
              calld);
    }
    async_pick_done_locked(
        elem, GRPC_ERROR_CREATE_FROM_STATIC_STRING("Disconnected"));
  }
}

static void pick_after_resolver_result_start_locked(grpc_call_element* elem) {
  channel_data* chand = static_cast<channel_data*>(elem->channel_data);
  call_data* calld = static_cast<call_data*>(elem->call_data);
  if (grpc_client_channel_trace.enabled()) {
    gpr_log(GPR_DEBUG,
            "chand=%p calld=%p: deferring pick pending resolver result", chand,
            calld);
  }
  pick_after_resolver_result_args* args =
      static_cast<pick_after_resolver_result_args*>(gpr_zalloc(sizeof(*args)));
  args->elem = elem;
  GRPC_CLOSURE_INIT(&args->closure, pick_after_resolver_result_done_locked,
                    args, grpc_combiner_scheduler(chand->combiner));
  grpc_closure_list_append(&chand->waiting_for_resolver_result_closures,
                           &args->closure, GRPC_ERROR_NONE);
  grpc_call_combiner_set_notify_on_cancel(
      calld->call_combiner,
      GRPC_CLOSURE_INIT(&args->cancel_closure,
                        pick_after_resolver_result_cancel_locked, args,
                        grpc_combiner_scheduler(chand->combiner)));
}

static void start_pick_locked(void* arg, grpc_error* ignored) {
  grpc_call_element* elem = static_cast<grpc_call_element*>(arg);
  call_data* calld = static_cast<call_data*>(elem->call_data);
  channel_data* chand = static_cast<channel_data*>(elem->channel_data);
  GPR_ASSERT(calld->pick.connected_subchannel == nullptr);
  GPR_ASSERT(calld->subchannel_call == nullptr);
  if (chand->lb_policy != nullptr) {
    // We already have an LB policy, so ask it for a pick.
    if (pick_callback_start_locked(elem)) {
      // Pick completed synchronously.
      pick_done_locked(elem, GRPC_ERROR_NONE);
      return;
    }
  } else {
    // We do not yet have an LB policy, so wait for a resolver result.
    if (chand->resolver == nullptr) {
      pick_done_locked(elem,
                       GRPC_ERROR_CREATE_FROM_STATIC_STRING("Disconnected"));
      return;
    }
    if (!chand->started_resolving) {
      start_resolving_locked(chand);
    }
    pick_after_resolver_result_start_locked(elem);
  }
  // We need to wait for either a resolver result or for an async result
  // from the LB policy. Add the polling entity from call_data to the
  // channel_data's interested_parties, so that the I/O of the LB policy
  // and resolver can be done under it. The polling entity will be
  // removed in async_pick_done_locked().
  grpc_polling_entity_add_to_pollset_set(calld->pollent,
                                         chand->interested_parties);
}

//
// filter call vtable functions
//

static void cc_start_transport_stream_op_batch(
    grpc_call_element* elem, grpc_transport_stream_op_batch* batch) {
  GPR_TIMER_SCOPE("cc_start_transport_stream_op_batch", 0);
  call_data* calld = static_cast<call_data*>(elem->call_data);
  channel_data* chand = static_cast<channel_data*>(elem->channel_data);
  if (chand->deadline_checking_enabled) {
    grpc_deadline_state_client_start_transport_stream_op_batch(elem, batch);
  }
  // If we've previously been cancelled, immediately fail any new batches.
  if (calld->cancel_error != GRPC_ERROR_NONE) {
    if (grpc_client_channel_trace.enabled()) {
      gpr_log(GPR_DEBUG, "chand=%p calld=%p: failing batch with error: %s",
              chand, calld, grpc_error_string(calld->cancel_error));
    }
    // Note: This will release the call combiner.
    grpc_transport_stream_op_batch_finish_with_failure(
        batch, GRPC_ERROR_REF(calld->cancel_error), calld->call_combiner);
    return;
  }
  // Handle cancellation.
  if (batch->cancel_stream) {
    // Stash a copy of cancel_error in our call data, so that we can use
    // it for subsequent operations. This ensures that if the call is
    // cancelled before any batches are passed down (e.g., if the deadline
    // is in the past when the call starts), we can return the right
    // error to the caller when the first batch does get passed down.
    GRPC_ERROR_UNREF(calld->cancel_error);
    calld->cancel_error =
        GRPC_ERROR_REF(batch->payload->cancel_stream.cancel_error);
    if (grpc_client_channel_trace.enabled()) {
      gpr_log(GPR_DEBUG, "chand=%p calld=%p: recording cancel_error=%s", chand,
              calld, grpc_error_string(calld->cancel_error));
    }
    // If we do not have a subchannel call (i.e., a pick has not yet
    // been started), fail all pending batches. Otherwise, send the
    // cancellation down to the subchannel call.
    if (calld->subchannel_call == nullptr) {
      pending_batches_fail(elem, GRPC_ERROR_REF(calld->cancel_error),
                           false /* yield_call_combiner */);
      // Note: This will release the call combiner.
      grpc_transport_stream_op_batch_finish_with_failure(
          batch, GRPC_ERROR_REF(calld->cancel_error), calld->call_combiner);
    } else {
      // Note: This will release the call combiner.
      grpc_subchannel_call_process_op(calld->subchannel_call, batch);
    }
    return;
  }
  // Add the batch to the pending list.
  pending_batches_add(elem, batch);
  // Check if we've already gotten a subchannel call.
  // Note that once we have completed the pick, we do not need to enter
  // the channel combiner, which is more efficient (especially for
  // streaming calls).
  if (calld->subchannel_call != nullptr) {
    if (grpc_client_channel_trace.enabled()) {
      gpr_log(GPR_DEBUG,
              "chand=%p calld=%p: starting batch on subchannel_call=%p", chand,
              calld, calld->subchannel_call);
    }
    pending_batches_resume(elem);
    return;
  }
  // We do not yet have a subchannel call.
  // For batches containing a send_initial_metadata op, enter the channel
  // combiner to start a pick.
  if (batch->send_initial_metadata) {
    if (grpc_client_channel_trace.enabled()) {
      gpr_log(GPR_DEBUG, "chand=%p calld=%p: entering client_channel combiner",
              chand, calld);
    }
    GRPC_CLOSURE_SCHED(
        GRPC_CLOSURE_INIT(&batch->handler_private.closure, start_pick_locked,
                          elem, grpc_combiner_scheduler(chand->combiner)),
        GRPC_ERROR_NONE);
  } else {
    // For all other batches, release the call combiner.
    if (grpc_client_channel_trace.enabled()) {
      gpr_log(GPR_DEBUG,
              "chand=%p calld=%p: saved batch, yielding call combiner", chand,
              calld);
    }
    GRPC_CALL_COMBINER_STOP(calld->call_combiner,
                            "batch does not include send_initial_metadata");
  }
}
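// Flow summary for the function above: a batch takes one of five paths:
// (1) the call was already cancelled -> fail the batch immediately;
// (2) cancel_stream -> record the error, then fail pending batches or
//     forward the cancellation to the subchannel call;
// (3) a subchannel call already exists -> resume pending batches on it;
// (4) the batch carries send_initial_metadata -> enter the client_channel
//     combiner to start an LB pick;
// (5) anything else -> leave the batch parked in pending_batches and yield
//     the call combiner until the pick completes.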

/* Constructor for call_data */
static grpc_error* cc_init_call_elem(grpc_call_element* elem,
                                     const grpc_call_element_args* args) {
  call_data* calld = static_cast<call_data*>(elem->call_data);
  channel_data* chand = static_cast<channel_data*>(elem->channel_data);
  // Initialize data members.
  calld->path = grpc_slice_ref_internal(args->path);
  calld->call_start_time = args->start_time;
  calld->deadline = args->deadline;
  calld->arena = args->arena;
  calld->owning_call = args->call_stack;
  calld->call_combiner = args->call_combiner;
  if (chand->deadline_checking_enabled) {
    grpc_deadline_state_init(elem, args->call_stack, args->call_combiner,
                             calld->deadline);
  }
  calld->enable_retries = chand->enable_retries;
  return GRPC_ERROR_NONE;
}

/* Destructor for call_data */
static void cc_destroy_call_elem(grpc_call_element* elem,
                                 const grpc_call_final_info* final_info,
                                 grpc_closure* then_schedule_closure) {
  call_data* calld = static_cast<call_data*>(elem->call_data);
  channel_data* chand = static_cast<channel_data*>(elem->channel_data);
  if (chand->deadline_checking_enabled) {
    grpc_deadline_state_destroy(elem);
  }
  grpc_slice_unref_internal(calld->path);
  calld->method_params.reset();
  GRPC_ERROR_UNREF(calld->cancel_error);
  if (calld->subchannel_call != nullptr) {
    grpc_subchannel_call_set_cleanup_closure(calld->subchannel_call,
                                             then_schedule_closure);
    then_schedule_closure = nullptr;
    GRPC_SUBCHANNEL_CALL_UNREF(calld->subchannel_call,
                               "client_channel_destroy_call");
  }
  for (size_t i = 0; i < GPR_ARRAY_SIZE(calld->pending_batches); ++i) {
    GPR_ASSERT(calld->pending_batches[i].batch == nullptr);
  }
  if (calld->pick.connected_subchannel != nullptr) {
    calld->pick.connected_subchannel.reset();
  }
  for (size_t i = 0; i < GRPC_CONTEXT_COUNT; ++i) {
    if (calld->pick.subchannel_call_context[i].value != nullptr) {
      calld->pick.subchannel_call_context[i].destroy(
          calld->pick.subchannel_call_context[i].value);
    }
  }
  GRPC_CLOSURE_SCHED(then_schedule_closure, GRPC_ERROR_NONE);
}

static void cc_set_pollset_or_pollset_set(grpc_call_element* elem,
                                          grpc_polling_entity* pollent) {
  call_data* calld = static_cast<call_data*>(elem->call_data);
  calld->pollent = pollent;
}

/*************************************************************************
 * EXPORTED SYMBOLS
 */

const grpc_channel_filter grpc_client_channel_filter = {
    cc_start_transport_stream_op_batch,
    cc_start_transport_op,
    sizeof(call_data),
    cc_init_call_elem,
    cc_set_pollset_or_pollset_set,
    cc_destroy_call_elem,
    sizeof(channel_data),
    cc_init_channel_elem,
    cc_destroy_channel_elem,
    cc_get_channel_info,
    "client-channel",
};
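// The positional initializers above map onto the grpc_channel_filter vtable
// of this era: the two transport entry points (stream-op-batch and
// transport-op), call-data size and call-element constructor, the
// polling-entity setter, the call-element destructor, channel-data size and
// channel-element constructor/destructor, the channel-info getter, and the
// filter name used in channel stack traces.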

static void try_to_connect_locked(void* arg, grpc_error* error_ignored) {
  channel_data* chand = static_cast<channel_data*>(arg);
  if (chand->lb_policy != nullptr) {
    chand->lb_policy->ExitIdleLocked();
  } else {
    chand->exit_idle_when_lb_policy_arrives = true;
    if (!chand->started_resolving && chand->resolver != nullptr) {
      start_resolving_locked(chand);
    }
  }
  GRPC_CHANNEL_STACK_UNREF(chand->owning_stack, "try_to_connect");
}

grpc_connectivity_state grpc_client_channel_check_connectivity_state(
    grpc_channel_element* elem, int try_to_connect) {
  channel_data* chand = static_cast<channel_data*>(elem->channel_data);
  grpc_connectivity_state out =
      grpc_connectivity_state_check(&chand->state_tracker);
  if (out == GRPC_CHANNEL_IDLE && try_to_connect) {
    GRPC_CHANNEL_STACK_REF(chand->owning_stack, "try_to_connect");
    GRPC_CLOSURE_SCHED(
        GRPC_CLOSURE_CREATE(try_to_connect_locked, chand,
                            grpc_combiner_scheduler(chand->combiner)),
        GRPC_ERROR_NONE);
  }
  return out;
}
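// Usage sketch (assuming the public C API of this era): this function is
// reached from
//   grpc_connectivity_state st =
//       grpc_channel_check_connectivity_state(channel, 1 /* try_to_connect */);
// Passing a nonzero try_to_connect on an IDLE channel schedules
// try_to_connect_locked(), which either tells the LB policy to exit idle
// state or kicks off name resolution.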

typedef struct external_connectivity_watcher {
  channel_data* chand;
  grpc_polling_entity pollent;
  grpc_closure* on_complete;
  grpc_closure* watcher_timer_init;
  grpc_connectivity_state* state;
  grpc_closure my_closure;
  struct external_connectivity_watcher* next;
} external_connectivity_watcher;

static external_connectivity_watcher* lookup_external_connectivity_watcher(
    channel_data* chand, grpc_closure* on_complete) {
  gpr_mu_lock(&chand->external_connectivity_watcher_list_mu);
  external_connectivity_watcher* w =
      chand->external_connectivity_watcher_list_head;
  while (w != nullptr && w->on_complete != on_complete) {
    w = w->next;
  }
  gpr_mu_unlock(&chand->external_connectivity_watcher_list_mu);
  return w;
}

static void external_connectivity_watcher_list_append(
    channel_data* chand, external_connectivity_watcher* w) {
  GPR_ASSERT(!lookup_external_connectivity_watcher(chand, w->on_complete));

  gpr_mu_lock(&w->chand->external_connectivity_watcher_list_mu);
  GPR_ASSERT(!w->next);
  w->next = chand->external_connectivity_watcher_list_head;
  chand->external_connectivity_watcher_list_head = w;
  gpr_mu_unlock(&w->chand->external_connectivity_watcher_list_mu);
}
3079
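// Unlinks a previously registered watcher from the channel's watcher list.
// The watcher must be present in the list.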
static void external_connectivity_watcher_list_remove(
    channel_data* chand, external_connectivity_watcher* to_remove) {
  GPR_ASSERT(
      lookup_external_connectivity_watcher(chand, to_remove->on_complete));
  gpr_mu_lock(&chand->external_connectivity_watcher_list_mu);
  if (to_remove == chand->external_connectivity_watcher_list_head) {
    chand->external_connectivity_watcher_list_head = to_remove->next;
    gpr_mu_unlock(&chand->external_connectivity_watcher_list_mu);
    return;
  }
  external_connectivity_watcher* w =
      chand->external_connectivity_watcher_list_head;
  while (w != nullptr) {
    if (w->next == to_remove) {
      w->next = w->next->next;
      gpr_mu_unlock(&chand->external_connectivity_watcher_list_mu);
      return;
    }
    w = w->next;
  }
  GPR_UNREACHABLE_CODE(return);
}

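// Returns the number of external connectivity watchers currently registered
// on the channel. Walks the whole list under the lock, so it is intended
// for occasional bookkeeping checks (e.g. in tests) rather than hot paths.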
int grpc_client_channel_num_external_connectivity_watchers(
    grpc_channel_element* elem) {
  channel_data* chand = static_cast<channel_data*>(elem->channel_data);
  int count = 0;

  gpr_mu_lock(&chand->external_connectivity_watcher_list_mu);
  external_connectivity_watcher* w =
      chand->external_connectivity_watcher_list_head;
  while (w != nullptr) {
    count++;
    w = w->next;
  }
  gpr_mu_unlock(&chand->external_connectivity_watcher_list_mu);

  return count;
}

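// Runs on the combiner when the watched connectivity state changes (or the
// watch is cancelled): detaches the watcher from the channel's pollset set
// and watcher list, drops its ref on the channel stack, frees it, and then
// runs the caller's on_complete closure with the resulting error.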
static void on_external_watch_complete_locked(void* arg, grpc_error* error) {
  external_connectivity_watcher* w =
      static_cast<external_connectivity_watcher*>(arg);
  grpc_closure* follow_up = w->on_complete;
  grpc_polling_entity_del_from_pollset_set(&w->pollent,
                                           w->chand->interested_parties);
  GRPC_CHANNEL_STACK_UNREF(w->chand->owning_stack,
                           "external_connectivity_watcher");
  external_connectivity_watcher_list_remove(w->chand, w);
  gpr_free(w);
  GRPC_CLOSURE_RUN(follow_up, GRPC_ERROR_REF(error));
}

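// Runs on the combiner to start or cancel a watch. If w->state is non-null,
// this registers the watcher with the channel's connectivity state tracker;
// if w->state is null, it looks up the previously registered watcher with
// the same on_complete closure and cancels its pending notification.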
static void watch_connectivity_state_locked(void* arg,
                                            grpc_error* error_ignored) {
  external_connectivity_watcher* w =
      static_cast<external_connectivity_watcher*>(arg);
  external_connectivity_watcher* found = nullptr;
  if (w->state != nullptr) {
    external_connectivity_watcher_list_append(w->chand, w);
    GRPC_CLOSURE_RUN(w->watcher_timer_init, GRPC_ERROR_NONE);
    GRPC_CLOSURE_INIT(&w->my_closure, on_external_watch_complete_locked, w,
                      grpc_combiner_scheduler(w->chand->combiner));
    grpc_connectivity_state_notify_on_state_change(&w->chand->state_tracker,
                                                   w->state, &w->my_closure);
  } else {
    GPR_ASSERT(w->watcher_timer_init == nullptr);
    found = lookup_external_connectivity_watcher(w->chand, w->on_complete);
    if (found) {
      GPR_ASSERT(found->on_complete == w->on_complete);
      grpc_connectivity_state_notify_on_state_change(
          &found->chand->state_tracker, nullptr, &found->my_closure);
    }
    grpc_polling_entity_del_from_pollset_set(&w->pollent,
                                             w->chand->interested_parties);
    GRPC_CHANNEL_STACK_UNREF(w->chand->owning_stack,
                             "external_connectivity_watcher");
    gpr_free(w);
  }
}

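// Public entry point for watching the channel's connectivity state.
// Allocates a watcher, adds its polling entity to the channel's interested
// parties, takes a ref on the channel stack, and hops onto the combiner,
// where watch_connectivity_state_locked performs the actual registration
// (or cancellation, when state is null).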
void grpc_client_channel_watch_connectivity_state(
    grpc_channel_element* elem, grpc_polling_entity pollent,
    grpc_connectivity_state* state, grpc_closure* closure,
    grpc_closure* watcher_timer_init) {
  channel_data* chand = static_cast<channel_data*>(elem->channel_data);
  external_connectivity_watcher* w =
      static_cast<external_connectivity_watcher*>(gpr_zalloc(sizeof(*w)));
  w->chand = chand;
  w->pollent = pollent;
  w->on_complete = closure;
  w->state = state;
  w->watcher_timer_init = watcher_timer_init;
  grpc_polling_entity_add_to_pollset_set(&w->pollent,
                                         chand->interested_parties);
  GRPC_CHANNEL_STACK_REF(w->chand->owning_stack,
                         "external_connectivity_watcher");
  GRPC_CLOSURE_SCHED(
      GRPC_CLOSURE_INIT(&w->my_closure, watch_connectivity_state_locked, w,
                        grpc_combiner_scheduler(chand->combiner)),
      GRPC_ERROR_NONE);
}

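// Returns the subchannel call bound to this client-channel call element,
// or nullptr if no subchannel call has been set.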
grpc_subchannel_call* grpc_client_channel_get_subchannel_call(
    grpc_call_element* elem) {
  call_data* calld = static_cast<call_data*>(elem->call_data);
  return calld->subchannel_call;
}