/*
 *
 * Copyright 2015 gRPC authors.
 *
 * Licensed under the Apache License, Version 2.0 (the "License");
 * you may not use this file except in compliance with the License.
 * You may obtain a copy of the License at
 *
 *     http://www.apache.org/licenses/LICENSE-2.0
 *
 * Unless required by applicable law or agreed to in writing, software
 * distributed under the License is distributed on an "AS IS" BASIS,
 * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
 * See the License for the specific language governing permissions and
 * limitations under the License.
 *
 */

#include <grpc/support/port_platform.h>

#include "src/core/ext/filters/client_channel/client_channel.h"

#include <inttypes.h>
#include <limits.h>
#include <stdbool.h>
#include <stdio.h>
#include <string.h>

#include <grpc/support/alloc.h>
#include <grpc/support/log.h>
#include <grpc/support/string_util.h>
#include <grpc/support/sync.h>

#include "src/core/ext/filters/client_channel/backup_poller.h"
#include "src/core/ext/filters/client_channel/http_connect_handshaker.h"
#include "src/core/ext/filters/client_channel/lb_policy_registry.h"
#include "src/core/ext/filters/client_channel/method_params.h"
#include "src/core/ext/filters/client_channel/proxy_mapper_registry.h"
#include "src/core/ext/filters/client_channel/resolver_registry.h"
#include "src/core/ext/filters/client_channel/retry_throttle.h"
#include "src/core/ext/filters/client_channel/subchannel.h"
#include "src/core/ext/filters/deadline/deadline_filter.h"
#include "src/core/lib/backoff/backoff.h"
#include "src/core/lib/channel/channel_args.h"
#include "src/core/lib/channel/connected_channel.h"
#include "src/core/lib/channel/status_util.h"
#include "src/core/lib/gpr/string.h"
#include "src/core/lib/gprpp/inlined_vector.h"
#include "src/core/lib/gprpp/manual_constructor.h"
#include "src/core/lib/iomgr/combiner.h"
#include "src/core/lib/iomgr/iomgr.h"
#include "src/core/lib/iomgr/polling_entity.h"
#include "src/core/lib/profiling/timers.h"
#include "src/core/lib/slice/slice_internal.h"
#include "src/core/lib/slice/slice_string_helpers.h"
#include "src/core/lib/surface/channel.h"
#include "src/core/lib/transport/connectivity_state.h"
#include "src/core/lib/transport/error_utils.h"
#include "src/core/lib/transport/metadata.h"
#include "src/core/lib/transport/metadata_batch.h"
#include "src/core/lib/transport/service_config.h"
#include "src/core/lib/transport/static_metadata.h"
#include "src/core/lib/transport/status_metadata.h"

using grpc_core::internal::ClientChannelMethodParams;

/* Client channel implementation */

// By default, we buffer 256 KiB per RPC for retries.
// TODO(roth): Do we have any data to suggest a better value?
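// This default can be overridden on a per-channel basis via the
// GRPC_ARG_PER_RPC_RETRY_BUFFER_SIZE channel arg, which is read in
// cc_init_channel_elem() below.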
71#define DEFAULT_PER_RPC_RETRY_BUFFER_SIZE (256 << 10)

// This value was picked arbitrarily. It can be changed if there is
// any even moderately compelling reason to do so.
#define RETRY_BACKOFF_JITTER 0.2

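// Trace output from this flag can be turned on at runtime via the GRPC_TRACE
// environment variable (e.g. GRPC_TRACE=client_channel).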
grpc_core::TraceFlag grpc_client_channel_trace(false, "client_channel");

/*************************************************************************
 * CHANNEL-WIDE FUNCTIONS
 */

struct external_connectivity_watcher;

typedef grpc_core::SliceHashTable<
    grpc_core::RefCountedPtr<ClientChannelMethodParams>>
    MethodParamsTable;

typedef struct client_channel_channel_data {
  grpc_core::OrphanablePtr<grpc_core::Resolver> resolver;
  bool started_resolving;
  bool deadline_checking_enabled;
  grpc_client_channel_factory* client_channel_factory;
  bool enable_retries;
  size_t per_rpc_retry_buffer_size;

  /** combiner protecting all variables below in this data structure */
  grpc_combiner* combiner;
  /** currently active load balancer */
  grpc_core::OrphanablePtr<grpc_core::LoadBalancingPolicy> lb_policy;
  /** retry throttle data */
  grpc_server_retry_throttle_data* retry_throttle_data;
  /** maps method names to method_parameters structs */
  grpc_core::RefCountedPtr<MethodParamsTable> method_params_table;
  /** incoming resolver result - set by resolver.next() */
  grpc_channel_args* resolver_result;
  /** a list of closures that are all waiting for resolver result to come in */
  grpc_closure_list waiting_for_resolver_result_closures;
  /** resolver callback */
  grpc_closure on_resolver_result_changed;
  /** connectivity state being tracked */
  grpc_connectivity_state_tracker state_tracker;
  /** when an lb_policy arrives, should we try to exit idle */
  bool exit_idle_when_lb_policy_arrives;
  /** owning stack */
  grpc_channel_stack* owning_stack;
  /** interested parties (owned) */
  grpc_pollset_set* interested_parties;

  /* external_connectivity_watcher_list head is guarded by its own mutex, since
   * counts need to be grabbed immediately without polling on a cq */
  gpr_mu external_connectivity_watcher_list_mu;
  struct external_connectivity_watcher* external_connectivity_watcher_list_head;

  /* the following properties are guarded by a mutex since APIs require them
     to be instantaneously available */
  gpr_mu info_mu;
  char* info_lb_policy_name;
  /** service config in JSON form */
  char* info_service_config_json;
} channel_data;

typedef struct {
  channel_data* chand;
  /** used as an identifier, don't dereference it because the LB policy may be
   * non-existing when the callback is run */
  grpc_core::LoadBalancingPolicy* lb_policy;
  grpc_closure closure;
} reresolution_request_args;

/** We create one watcher for each new lb_policy that is returned from a
    resolver, to watch for state changes from the lb_policy. When a state
    change is seen, we update the channel, and create a new watcher. */
typedef struct {
  channel_data* chand;
  grpc_closure on_changed;
  grpc_connectivity_state state;
  grpc_core::LoadBalancingPolicy* lb_policy;
} lb_policy_connectivity_watcher;

static void watch_lb_policy_locked(channel_data* chand,
                                   grpc_core::LoadBalancingPolicy* lb_policy,
                                   grpc_connectivity_state current_state);

static void set_channel_connectivity_state_locked(channel_data* chand,
                                                  grpc_connectivity_state state,
                                                  grpc_error* error,
                                                  const char* reason) {
  /* TODO: Improve failure handling:
   * - Make it possible for policies to return GRPC_CHANNEL_TRANSIENT_FAILURE.
   * - Hand over pending picks from old policies during the switch that happens
   *   when resolver provides an update. */
  if (chand->lb_policy != nullptr) {
    if (state == GRPC_CHANNEL_TRANSIENT_FAILURE) {
      /* cancel picks with wait_for_ready=false */
      chand->lb_policy->CancelMatchingPicksLocked(
          /* mask= */ GRPC_INITIAL_METADATA_WAIT_FOR_READY,
          /* check= */ 0, GRPC_ERROR_REF(error));
    } else if (state == GRPC_CHANNEL_SHUTDOWN) {
      /* cancel all picks */
      chand->lb_policy->CancelMatchingPicksLocked(/* mask= */ 0, /* check= */ 0,
                                                  GRPC_ERROR_REF(error));
    }
  }
  if (grpc_client_channel_trace.enabled()) {
    gpr_log(GPR_DEBUG, "chand=%p: setting connectivity state to %s", chand,
            grpc_connectivity_state_name(state));
  }
  grpc_connectivity_state_set(&chand->state_tracker, state, error, reason);
}

static void on_lb_policy_state_changed_locked(void* arg, grpc_error* error) {
  lb_policy_connectivity_watcher* w =
      static_cast<lb_policy_connectivity_watcher*>(arg);
  /* check if the notification is for the latest policy */
  if (w->lb_policy == w->chand->lb_policy.get()) {
    if (grpc_client_channel_trace.enabled()) {
      gpr_log(GPR_DEBUG, "chand=%p: lb_policy=%p state changed to %s", w->chand,
              w->lb_policy, grpc_connectivity_state_name(w->state));
    }
    set_channel_connectivity_state_locked(w->chand, w->state,
                                          GRPC_ERROR_REF(error), "lb_changed");
    if (w->state != GRPC_CHANNEL_SHUTDOWN) {
      watch_lb_policy_locked(w->chand, w->lb_policy, w->state);
    }
  }
  GRPC_CHANNEL_STACK_UNREF(w->chand->owning_stack, "watch_lb_policy");
  gpr_free(w);
}

static void watch_lb_policy_locked(channel_data* chand,
                                   grpc_core::LoadBalancingPolicy* lb_policy,
                                   grpc_connectivity_state current_state) {
  lb_policy_connectivity_watcher* w =
      static_cast<lb_policy_connectivity_watcher*>(gpr_malloc(sizeof(*w)));
  GRPC_CHANNEL_STACK_REF(chand->owning_stack, "watch_lb_policy");
  w->chand = chand;
  GRPC_CLOSURE_INIT(&w->on_changed, on_lb_policy_state_changed_locked, w,
                    grpc_combiner_scheduler(chand->combiner));
  w->state = current_state;
  w->lb_policy = lb_policy;
  lb_policy->NotifyOnStateChangeLocked(&w->state, &w->on_changed);
}

static void start_resolving_locked(channel_data* chand) {
  if (grpc_client_channel_trace.enabled()) {
    gpr_log(GPR_DEBUG, "chand=%p: starting name resolution", chand);
  }
  GPR_ASSERT(!chand->started_resolving);
  chand->started_resolving = true;
  GRPC_CHANNEL_STACK_REF(chand->owning_stack, "resolver");
  chand->resolver->NextLocked(&chand->resolver_result,
                              &chand->on_resolver_result_changed);
}

typedef struct {
  char* server_name;
  grpc_server_retry_throttle_data* retry_throttle_data;
} service_config_parsing_state;

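// Illustrative service config fragment handled by the parser below
// (field names are taken from the code; the values are hypothetical):
//
//   "retryThrottling": { "maxTokens": 10, "tokenRatio": 0.5 }
//
// Both values are tracked in milli-token units: maxTokens=10 is stored as
// 10000 milli-tokens, and tokenRatio=0.5 (up to 3 decimal digits are
// honored) is stored as a milli_token_ratio of 500.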
static void parse_retry_throttle_params(
    const grpc_json* field, service_config_parsing_state* parsing_state) {
  if (strcmp(field->key, "retryThrottling") == 0) {
    if (parsing_state->retry_throttle_data != nullptr) return;  // Duplicate.
    if (field->type != GRPC_JSON_OBJECT) return;
    int max_milli_tokens = 0;
    int milli_token_ratio = 0;
    for (grpc_json* sub_field = field->child; sub_field != nullptr;
         sub_field = sub_field->next) {
      if (sub_field->key == nullptr) return;
      if (strcmp(sub_field->key, "maxTokens") == 0) {
        if (max_milli_tokens != 0) return;  // Duplicate.
        if (sub_field->type != GRPC_JSON_NUMBER) return;
        max_milli_tokens = gpr_parse_nonnegative_int(sub_field->value);
        if (max_milli_tokens == -1) return;
        max_milli_tokens *= 1000;
      } else if (strcmp(sub_field->key, "tokenRatio") == 0) {
        if (milli_token_ratio != 0) return;  // Duplicate.
        if (sub_field->type != GRPC_JSON_NUMBER) return;
        // We support up to 3 decimal digits.
        size_t whole_len = strlen(sub_field->value);
        uint32_t multiplier = 1;
        uint32_t decimal_value = 0;
        const char* decimal_point = strchr(sub_field->value, '.');
        if (decimal_point != nullptr) {
          whole_len = static_cast<size_t>(decimal_point - sub_field->value);
          multiplier = 1000;
          size_t decimal_len = strlen(decimal_point + 1);
          if (decimal_len > 3) decimal_len = 3;
          if (!gpr_parse_bytes_to_uint32(decimal_point + 1, decimal_len,
                                         &decimal_value)) {
            return;
          }
          uint32_t decimal_multiplier = 1;
          for (size_t i = 0; i < (3 - decimal_len); ++i) {
            decimal_multiplier *= 10;
          }
          decimal_value *= decimal_multiplier;
        }
        uint32_t whole_value;
        if (!gpr_parse_bytes_to_uint32(sub_field->value, whole_len,
                                       &whole_value)) {
          return;
        }
        milli_token_ratio =
            static_cast<int>((whole_value * multiplier) + decimal_value);
        if (milli_token_ratio <= 0) return;
      }
    }
    parsing_state->retry_throttle_data =
        grpc_retry_throttle_map_get_data_for_server(
            parsing_state->server_name, max_milli_tokens, milli_token_ratio);
  }
}

static void request_reresolution_locked(void* arg, grpc_error* error) {
  reresolution_request_args* args =
      static_cast<reresolution_request_args*>(arg);
  channel_data* chand = args->chand;
  // If this invocation is for a stale LB policy, treat it as an LB shutdown
  // signal.
  if (args->lb_policy != chand->lb_policy.get() || error != GRPC_ERROR_NONE ||
      chand->resolver == nullptr) {
    GRPC_CHANNEL_STACK_UNREF(chand->owning_stack, "re-resolution");
    gpr_free(args);
    return;
  }
  if (grpc_client_channel_trace.enabled()) {
    gpr_log(GPR_DEBUG, "chand=%p: started name re-resolving", chand);
  }
  chand->resolver->RequestReresolutionLocked();
  // Give back the closure to the LB policy.
  chand->lb_policy->SetReresolutionClosureLocked(&args->closure);
}

// TODO(roth): The logic in this function is very hard to follow. We
// should refactor this so that it's easier to understand, perhaps as
// part of changing the resolver API to more clearly differentiate
// between transient failures and shutdown.
static void on_resolver_result_changed_locked(void* arg, grpc_error* error) {
  channel_data* chand = static_cast<channel_data*>(arg);
  if (grpc_client_channel_trace.enabled()) {
    gpr_log(GPR_DEBUG,
            "chand=%p: got resolver result: resolver_result=%p error=%s", chand,
            chand->resolver_result, grpc_error_string(error));
  }
  // Extract the following fields from the resolver result, if non-nullptr.
  bool lb_policy_updated = false;
  bool lb_policy_created = false;
  char* lb_policy_name_dup = nullptr;
  bool lb_policy_name_changed = false;
  grpc_core::OrphanablePtr<grpc_core::LoadBalancingPolicy> new_lb_policy;
  char* service_config_json = nullptr;
  grpc_server_retry_throttle_data* retry_throttle_data = nullptr;
  grpc_core::RefCountedPtr<MethodParamsTable> method_params_table;
  if (chand->resolver_result != nullptr) {
    if (chand->resolver != nullptr) {
      // Find LB policy name.
      const grpc_arg* channel_arg = grpc_channel_args_find(
          chand->resolver_result, GRPC_ARG_LB_POLICY_NAME);
      const char* lb_policy_name = grpc_channel_arg_get_string(channel_arg);
      // Special case: If at least one balancer address is present, we use
      // the grpclb policy, regardless of what the resolver actually specified.
      channel_arg =
          grpc_channel_args_find(chand->resolver_result, GRPC_ARG_LB_ADDRESSES);
      if (channel_arg != nullptr && channel_arg->type == GRPC_ARG_POINTER) {
        grpc_lb_addresses* addresses =
            static_cast<grpc_lb_addresses*>(channel_arg->value.pointer.p);
        bool found_balancer_address = false;
        for (size_t i = 0; i < addresses->num_addresses; ++i) {
          if (addresses->addresses[i].is_balancer) {
            found_balancer_address = true;
            break;
          }
        }
        if (found_balancer_address) {
          if (lb_policy_name != nullptr &&
              strcmp(lb_policy_name, "grpclb") != 0) {
            gpr_log(GPR_INFO,
                    "resolver requested LB policy %s but provided at least one "
                    "balancer address -- forcing use of grpclb LB policy",
                    lb_policy_name);
          }
          lb_policy_name = "grpclb";
        }
      }
      // Use pick_first if nothing was specified and we didn't select grpclb
      // above.
      if (lb_policy_name == nullptr) lb_policy_name = "pick_first";
      // Check to see if we're already using the right LB policy.
      // Note: It's safe to use chand->info_lb_policy_name here without
      // taking a lock on chand->info_mu, because this function is the
      // only thing that modifies its value, and it can only be invoked
      // once at any given time.
      lb_policy_name_changed =
          chand->info_lb_policy_name == nullptr ||
          gpr_stricmp(chand->info_lb_policy_name, lb_policy_name) != 0;
      if (chand->lb_policy != nullptr && !lb_policy_name_changed) {
        // Continue using the same LB policy. Update with new addresses.
        lb_policy_updated = true;
        chand->lb_policy->UpdateLocked(*chand->resolver_result);
      } else {
        // Instantiate new LB policy.
        grpc_core::LoadBalancingPolicy::Args lb_policy_args;
        lb_policy_args.combiner = chand->combiner;
        lb_policy_args.client_channel_factory = chand->client_channel_factory;
        lb_policy_args.args = chand->resolver_result;
        new_lb_policy =
            grpc_core::LoadBalancingPolicyRegistry::CreateLoadBalancingPolicy(
                lb_policy_name, lb_policy_args);
        if (new_lb_policy == nullptr) {
          gpr_log(GPR_ERROR, "could not create LB policy \"%s\"",
                  lb_policy_name);
        } else {
          lb_policy_created = true;
          reresolution_request_args* args =
              static_cast<reresolution_request_args*>(
                  gpr_zalloc(sizeof(*args)));
          args->chand = chand;
          args->lb_policy = new_lb_policy.get();
          GRPC_CLOSURE_INIT(&args->closure, request_reresolution_locked, args,
                            grpc_combiner_scheduler(chand->combiner));
          GRPC_CHANNEL_STACK_REF(chand->owning_stack, "re-resolution");
          new_lb_policy->SetReresolutionClosureLocked(&args->closure);
        }
      }
      // Before we clean up, save a copy of lb_policy_name, since it might
      // be pointing to data inside chand->resolver_result.
      // The copy will be saved in chand->lb_policy_name below.
      lb_policy_name_dup = gpr_strdup(lb_policy_name);
      // Find service config.
      channel_arg = grpc_channel_args_find(chand->resolver_result,
                                           GRPC_ARG_SERVICE_CONFIG);
      service_config_json =
          gpr_strdup(grpc_channel_arg_get_string(channel_arg));
      if (service_config_json != nullptr) {
        grpc_core::UniquePtr<grpc_core::ServiceConfig> service_config =
            grpc_core::ServiceConfig::Create(service_config_json);
        if (service_config != nullptr) {
          if (chand->enable_retries) {
            channel_arg = grpc_channel_args_find(chand->resolver_result,
                                                 GRPC_ARG_SERVER_URI);
            const char* server_uri = grpc_channel_arg_get_string(channel_arg);
            GPR_ASSERT(server_uri != nullptr);
            grpc_uri* uri = grpc_uri_parse(server_uri, true);
            GPR_ASSERT(uri->path[0] != '\0');
            service_config_parsing_state parsing_state;
            memset(&parsing_state, 0, sizeof(parsing_state));
            parsing_state.server_name =
                uri->path[0] == '/' ? uri->path + 1 : uri->path;
            service_config->ParseGlobalParams(parse_retry_throttle_params,
                                              &parsing_state);
            grpc_uri_destroy(uri);
            retry_throttle_data = parsing_state.retry_throttle_data;
          }
          method_params_table = service_config->CreateMethodConfigTable(
              ClientChannelMethodParams::CreateFromJson);
        }
      }
    }
  }
  if (grpc_client_channel_trace.enabled()) {
    gpr_log(GPR_DEBUG,
            "chand=%p: resolver result: lb_policy_name=\"%s\"%s, "
            "service_config=\"%s\"",
            chand, lb_policy_name_dup,
            lb_policy_name_changed ? " (changed)" : "", service_config_json);
  }
  // Now swap out fields in chand. Note that the new values may still
  // be nullptr if (e.g.) the resolver failed to return results or the
  // results did not contain the necessary data.
  //
  // First, swap out the data used by cc_get_channel_info().
  gpr_mu_lock(&chand->info_mu);
  if (lb_policy_name_dup != nullptr) {
    gpr_free(chand->info_lb_policy_name);
    chand->info_lb_policy_name = lb_policy_name_dup;
  }
  if (service_config_json != nullptr) {
    gpr_free(chand->info_service_config_json);
    chand->info_service_config_json = service_config_json;
  }
  gpr_mu_unlock(&chand->info_mu);
  // Swap out the retry throttle data.
  if (chand->retry_throttle_data != nullptr) {
    grpc_server_retry_throttle_data_unref(chand->retry_throttle_data);
  }
  chand->retry_throttle_data = retry_throttle_data;
  // Swap out the method params table.
  chand->method_params_table = std::move(method_params_table);
  // If we have a new LB policy or are shutting down (in which case
  // new_lb_policy will be nullptr), swap out the LB policy, unreffing the
  // old one and removing its fds from chand->interested_parties.
  // Note that we do NOT do this if either (a) we updated the existing
  // LB policy above or (b) we failed to create the new LB policy (in
  // which case we want to continue using the most recent one we had).
  if (new_lb_policy != nullptr || error != GRPC_ERROR_NONE ||
      chand->resolver == nullptr) {
    if (chand->lb_policy != nullptr) {
      if (grpc_client_channel_trace.enabled()) {
        gpr_log(GPR_DEBUG, "chand=%p: unreffing lb_policy=%p", chand,
                chand->lb_policy.get());
      }
      grpc_pollset_set_del_pollset_set(chand->lb_policy->interested_parties(),
                                       chand->interested_parties);
      chand->lb_policy->HandOffPendingPicksLocked(new_lb_policy.get());
      chand->lb_policy.reset();
    }
    chand->lb_policy = std::move(new_lb_policy);
  }
  // Now that we've swapped out the relevant fields of chand, check for
  // error or shutdown.
  if (error != GRPC_ERROR_NONE || chand->resolver == nullptr) {
    if (grpc_client_channel_trace.enabled()) {
      gpr_log(GPR_DEBUG, "chand=%p: shutting down", chand);
    }
    if (chand->resolver != nullptr) {
      if (grpc_client_channel_trace.enabled()) {
        gpr_log(GPR_DEBUG, "chand=%p: shutting down resolver", chand);
      }
      chand->resolver.reset();
    }
    set_channel_connectivity_state_locked(
        chand, GRPC_CHANNEL_SHUTDOWN,
        GRPC_ERROR_CREATE_REFERENCING_FROM_STATIC_STRING(
            "Got resolver result after disconnection", &error, 1),
        "resolver_gone");
    grpc_closure_list_fail_all(&chand->waiting_for_resolver_result_closures,
                               GRPC_ERROR_CREATE_REFERENCING_FROM_STATIC_STRING(
                                   "Channel disconnected", &error, 1));
    GRPC_CLOSURE_LIST_SCHED(&chand->waiting_for_resolver_result_closures);
    GRPC_CHANNEL_STACK_UNREF(chand->owning_stack, "resolver");
    grpc_channel_args_destroy(chand->resolver_result);
    chand->resolver_result = nullptr;
  } else {  // Not shutting down.
    grpc_connectivity_state state = GRPC_CHANNEL_TRANSIENT_FAILURE;
    grpc_error* state_error =
        GRPC_ERROR_CREATE_FROM_STATIC_STRING("No load balancing policy");
    if (lb_policy_created) {
      if (grpc_client_channel_trace.enabled()) {
        gpr_log(GPR_DEBUG, "chand=%p: initializing new LB policy", chand);
      }
      GRPC_ERROR_UNREF(state_error);
      state = chand->lb_policy->CheckConnectivityLocked(&state_error);
      grpc_pollset_set_add_pollset_set(chand->lb_policy->interested_parties(),
                                       chand->interested_parties);
      GRPC_CLOSURE_LIST_SCHED(&chand->waiting_for_resolver_result_closures);
      if (chand->exit_idle_when_lb_policy_arrives) {
        chand->lb_policy->ExitIdleLocked();
        chand->exit_idle_when_lb_policy_arrives = false;
      }
      watch_lb_policy_locked(chand, chand->lb_policy.get(), state);
    } else if (chand->resolver_result == nullptr) {
      // Transient failure.
      GRPC_CLOSURE_LIST_SCHED(&chand->waiting_for_resolver_result_closures);
    }
    if (!lb_policy_updated) {
      set_channel_connectivity_state_locked(
          chand, state, GRPC_ERROR_REF(state_error), "new_lb+resolver");
    }
    grpc_channel_args_destroy(chand->resolver_result);
    chand->resolver_result = nullptr;
    chand->resolver->NextLocked(&chand->resolver_result,
                                &chand->on_resolver_result_changed);
    GRPC_ERROR_UNREF(state_error);
  }
}

static void start_transport_op_locked(void* arg, grpc_error* error_ignored) {
  grpc_transport_op* op = static_cast<grpc_transport_op*>(arg);
  grpc_channel_element* elem =
      static_cast<grpc_channel_element*>(op->handler_private.extra_arg);
  channel_data* chand = static_cast<channel_data*>(elem->channel_data);

  if (op->on_connectivity_state_change != nullptr) {
    grpc_connectivity_state_notify_on_state_change(
        &chand->state_tracker, op->connectivity_state,
        op->on_connectivity_state_change);
    op->on_connectivity_state_change = nullptr;
    op->connectivity_state = nullptr;
  }

  if (op->send_ping.on_initiate != nullptr || op->send_ping.on_ack != nullptr) {
    if (chand->lb_policy == nullptr) {
      GRPC_CLOSURE_SCHED(
          op->send_ping.on_initiate,
          GRPC_ERROR_CREATE_FROM_STATIC_STRING("Ping with no load balancing"));
      GRPC_CLOSURE_SCHED(
          op->send_ping.on_ack,
          GRPC_ERROR_CREATE_FROM_STATIC_STRING("Ping with no load balancing"));
    } else {
      chand->lb_policy->PingOneLocked(op->send_ping.on_initiate,
                                      op->send_ping.on_ack);
      op->bind_pollset = nullptr;
    }
    op->send_ping.on_initiate = nullptr;
    op->send_ping.on_ack = nullptr;
  }

  if (op->disconnect_with_error != GRPC_ERROR_NONE) {
    if (chand->resolver != nullptr) {
      set_channel_connectivity_state_locked(
          chand, GRPC_CHANNEL_SHUTDOWN,
          GRPC_ERROR_REF(op->disconnect_with_error), "disconnect");
      chand->resolver.reset();
      if (!chand->started_resolving) {
        grpc_closure_list_fail_all(&chand->waiting_for_resolver_result_closures,
                                   GRPC_ERROR_REF(op->disconnect_with_error));
        GRPC_CLOSURE_LIST_SCHED(&chand->waiting_for_resolver_result_closures);
      }
      if (chand->lb_policy != nullptr) {
        grpc_pollset_set_del_pollset_set(chand->lb_policy->interested_parties(),
                                         chand->interested_parties);
        chand->lb_policy.reset();
      }
    }
    GRPC_ERROR_UNREF(op->disconnect_with_error);
  }
  GRPC_CHANNEL_STACK_UNREF(chand->owning_stack, "start_transport_op");

  GRPC_CLOSURE_SCHED(op->on_consumed, GRPC_ERROR_NONE);
}

static void cc_start_transport_op(grpc_channel_element* elem,
                                  grpc_transport_op* op) {
  channel_data* chand = static_cast<channel_data*>(elem->channel_data);

  GPR_ASSERT(op->set_accept_stream == false);
  if (op->bind_pollset != nullptr) {
    grpc_pollset_set_add_pollset(chand->interested_parties, op->bind_pollset);
  }

  op->handler_private.extra_arg = elem;
  GRPC_CHANNEL_STACK_REF(chand->owning_stack, "start_transport_op");
  GRPC_CLOSURE_SCHED(
      GRPC_CLOSURE_INIT(&op->handler_private.closure, start_transport_op_locked,
                        op, grpc_combiner_scheduler(chand->combiner)),
      GRPC_ERROR_NONE);
}

static void cc_get_channel_info(grpc_channel_element* elem,
                                const grpc_channel_info* info) {
  channel_data* chand = static_cast<channel_data*>(elem->channel_data);
  gpr_mu_lock(&chand->info_mu);
  if (info->lb_policy_name != nullptr) {
    *info->lb_policy_name = chand->info_lb_policy_name == nullptr
                                ? nullptr
                                : gpr_strdup(chand->info_lb_policy_name);
  }
  if (info->service_config_json != nullptr) {
    *info->service_config_json =
        chand->info_service_config_json == nullptr
            ? nullptr
            : gpr_strdup(chand->info_service_config_json);
  }
  gpr_mu_unlock(&chand->info_mu);
}

/* Constructor for channel_data */
static grpc_error* cc_init_channel_elem(grpc_channel_element* elem,
                                        grpc_channel_element_args* args) {
  channel_data* chand = static_cast<channel_data*>(elem->channel_data);
  GPR_ASSERT(args->is_last);
  GPR_ASSERT(elem->filter == &grpc_client_channel_filter);
  // Initialize data members.
  chand->combiner = grpc_combiner_create();
  gpr_mu_init(&chand->info_mu);
  gpr_mu_init(&chand->external_connectivity_watcher_list_mu);

  gpr_mu_lock(&chand->external_connectivity_watcher_list_mu);
  chand->external_connectivity_watcher_list_head = nullptr;
  gpr_mu_unlock(&chand->external_connectivity_watcher_list_mu);

  chand->owning_stack = args->channel_stack;
  GRPC_CLOSURE_INIT(&chand->on_resolver_result_changed,
                    on_resolver_result_changed_locked, chand,
                    grpc_combiner_scheduler(chand->combiner));
  chand->interested_parties = grpc_pollset_set_create();
  grpc_connectivity_state_init(&chand->state_tracker, GRPC_CHANNEL_IDLE,
                               "client_channel");
  grpc_client_channel_start_backup_polling(chand->interested_parties);
  // Record max per-RPC retry buffer size.
  const grpc_arg* arg = grpc_channel_args_find(
      args->channel_args, GRPC_ARG_PER_RPC_RETRY_BUFFER_SIZE);
  chand->per_rpc_retry_buffer_size = static_cast<size_t>(
      grpc_channel_arg_get_integer(
          arg, {DEFAULT_PER_RPC_RETRY_BUFFER_SIZE, 0, INT_MAX}));
  // Record enable_retries.
  arg = grpc_channel_args_find(args->channel_args, GRPC_ARG_ENABLE_RETRIES);
  chand->enable_retries = grpc_channel_arg_get_bool(arg, true);
  // Record client channel factory.
  arg = grpc_channel_args_find(args->channel_args,
                               GRPC_ARG_CLIENT_CHANNEL_FACTORY);
  if (arg == nullptr) {
    return GRPC_ERROR_CREATE_FROM_STATIC_STRING(
        "Missing client channel factory in args for client channel filter");
  }
  if (arg->type != GRPC_ARG_POINTER) {
    return GRPC_ERROR_CREATE_FROM_STATIC_STRING(
        "client channel factory arg must be a pointer");
  }
  grpc_client_channel_factory_ref(
      static_cast<grpc_client_channel_factory*>(arg->value.pointer.p));
  chand->client_channel_factory =
      static_cast<grpc_client_channel_factory*>(arg->value.pointer.p);
  // Get server name to resolve, using proxy mapper if needed.
  arg = grpc_channel_args_find(args->channel_args, GRPC_ARG_SERVER_URI);
  if (arg == nullptr) {
    return GRPC_ERROR_CREATE_FROM_STATIC_STRING(
        "Missing server uri in args for client channel filter");
  }
  if (arg->type != GRPC_ARG_STRING) {
    return GRPC_ERROR_CREATE_FROM_STATIC_STRING(
        "server uri arg must be a string");
  }
  char* proxy_name = nullptr;
  grpc_channel_args* new_args = nullptr;
  grpc_proxy_mappers_map_name(arg->value.string, args->channel_args,
                              &proxy_name, &new_args);
  // Instantiate resolver.
  chand->resolver = grpc_core::ResolverRegistry::CreateResolver(
      proxy_name != nullptr ? proxy_name : arg->value.string,
      new_args != nullptr ? new_args : args->channel_args,
      chand->interested_parties, chand->combiner);
  if (proxy_name != nullptr) gpr_free(proxy_name);
  if (new_args != nullptr) grpc_channel_args_destroy(new_args);
  if (chand->resolver == nullptr) {
    return GRPC_ERROR_CREATE_FROM_STATIC_STRING("resolver creation failed");
  }
  chand->deadline_checking_enabled =
      grpc_deadline_checking_enabled(args->channel_args);
  return GRPC_ERROR_NONE;
}

static void shutdown_resolver_locked(void* arg, grpc_error* error) {
  grpc_core::Resolver* resolver = static_cast<grpc_core::Resolver*>(arg);
  resolver->Orphan();
}

/* Destructor for channel_data */
static void cc_destroy_channel_elem(grpc_channel_element* elem) {
  channel_data* chand = static_cast<channel_data*>(elem->channel_data);
  if (chand->resolver != nullptr) {
    GRPC_CLOSURE_SCHED(
        GRPC_CLOSURE_CREATE(shutdown_resolver_locked, chand->resolver.release(),
                            grpc_combiner_scheduler(chand->combiner)),
        GRPC_ERROR_NONE);
  }
  if (chand->client_channel_factory != nullptr) {
    grpc_client_channel_factory_unref(chand->client_channel_factory);
  }
  if (chand->lb_policy != nullptr) {
    grpc_pollset_set_del_pollset_set(chand->lb_policy->interested_parties(),
                                     chand->interested_parties);
    chand->lb_policy.reset();
  }
  gpr_free(chand->info_lb_policy_name);
  gpr_free(chand->info_service_config_json);
  if (chand->retry_throttle_data != nullptr) {
    grpc_server_retry_throttle_data_unref(chand->retry_throttle_data);
  }
  if (chand->method_params_table != nullptr) {
    chand->method_params_table.reset();
  }
  grpc_client_channel_stop_backup_polling(chand->interested_parties);
  grpc_connectivity_state_destroy(&chand->state_tracker);
  grpc_pollset_set_destroy(chand->interested_parties);
  GRPC_COMBINER_UNREF(chand->combiner, "client_channel");
  gpr_mu_destroy(&chand->info_mu);
  gpr_mu_destroy(&chand->external_connectivity_watcher_list_mu);
}

/*************************************************************************
 * PER-CALL FUNCTIONS
 */

// Max number of batches that can be pending on a call at any given
// time. This includes one batch for each of the following ops:
//   recv_initial_metadata
//   send_initial_metadata
//   recv_message
//   send_message
//   recv_trailing_metadata
//   send_trailing_metadata
#define MAX_PENDING_BATCHES 6

// Retry support:
//
// In order to support retries, we act as a proxy for stream op batches.
// When we get a batch from the surface, we add it to our list of pending
// batches, and we then use those batches to construct separate "child"
// batches to be started on the subchannel call. When the child batches
// return, we then decide which pending batches have been completed and
// schedule their callbacks accordingly. If a subchannel call fails and
// we want to retry it, we do a new pick and start again, constructing
// new "child" batches for the new subchannel call.
//
// Note that retries are committed when receiving data from the server
// (except for Trailers-Only responses). However, there may be many
// send ops started before receiving any data, so we may have already
// completed some number of send ops (and returned the completions up to
// the surface) by the time we realize that we need to retry. To deal
// with this, we cache data for send ops, so that we can replay them on a
// different subchannel call even after we have completed the original
// batches.
//
// There are two sets of data to maintain:
// - In call_data (in the parent channel), we maintain a list of pending
//   ops and cached data for send ops.
// - In the subchannel call, we maintain state to indicate what ops have
//   already been sent down to that call.
//
// When constructing the "child" batches, we compare those two sets of
// data to see which batches need to be sent to the subchannel call.
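//
// Rough lifecycle sketch (an illustrative summary of the above, not an
// exhaustive description of the implementation):
//   1. A batch arrives from the surface and is stored in a slot of
//      call_data.pending_batches[]; payloads for send ops are cached.
//   2. Once a pick completes and a subchannel call exists, "child"
//      batches are constructed from the pending batches and started on
//      the subchannel call.
//   3. As child batches complete, the corresponding pending batches'
//      callbacks are scheduled and their slots are freed.
//   4. If the attempt fails before retries are committed, a new pick is
//      performed and the cached send ops are replayed on a new
//      subchannel call.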
784
785// TODO(roth): In subsequent PRs:
786// - add support for transparent retries (including initial metadata)
787// - figure out how to record stats in census for retries
788// (census filter is on top of this one)
789// - add census stats for retries
790
791// State used for starting a retryable batch on a subchannel call.
792// This provides its own grpc_transport_stream_op_batch and other data
793// structures needed to populate the ops in the batch.
794// We allocate one struct on the arena for each attempt at starting a
795// batch on a given subchannel call.
796typedef struct {
797 gpr_refcount refs;
798 grpc_call_element* elem;
799 grpc_subchannel_call* subchannel_call; // Holds a ref.
800 // The batch to use in the subchannel call.
801 // Its payload field points to subchannel_call_retry_state.batch_payload.
802 grpc_transport_stream_op_batch batch;
803 // For send_initial_metadata.
804 // Note that we need to make a copy of the initial metadata for each
805 // subchannel call instead of just referring to the copy in call_data,
806 // because filters in the subchannel stack will probably add entries,
807 // so we need to start in a pristine state for each attempt of the call.
808 grpc_linked_mdelem* send_initial_metadata_storage;
809 grpc_metadata_batch send_initial_metadata;
810 // For send_message.
Mark D. Roth3d8b32d2018-03-09 13:25:40 -0800811 grpc_core::ManualConstructor<grpc_core::ByteStreamCache::CachingByteStream>
812 send_message;
Mark D. Roth718c8342018-02-28 13:00:04 -0800813 // For send_trailing_metadata.
814 grpc_linked_mdelem* send_trailing_metadata_storage;
815 grpc_metadata_batch send_trailing_metadata;
816 // For intercepting recv_initial_metadata.
817 grpc_metadata_batch recv_initial_metadata;
818 grpc_closure recv_initial_metadata_ready;
819 bool trailing_metadata_available;
820 // For intercepting recv_message.
821 grpc_closure recv_message_ready;
Mark D. Roth3d8b32d2018-03-09 13:25:40 -0800822 grpc_core::OrphanablePtr<grpc_core::ByteStream> recv_message;
Mark D. Roth718c8342018-02-28 13:00:04 -0800823 // For intercepting recv_trailing_metadata.
824 grpc_metadata_batch recv_trailing_metadata;
825 grpc_transport_stream_stats collect_stats;
826 // For intercepting on_complete.
827 grpc_closure on_complete;
828} subchannel_batch_data;

// Retry state associated with a subchannel call.
// Stored in the parent_data of the subchannel call object.
typedef struct {
  // subchannel_batch_data.batch.payload points to this.
  grpc_transport_stream_op_batch_payload batch_payload;
  // These fields indicate which ops have been started and completed on
  // this subchannel call.
  size_t started_send_message_count;
  size_t completed_send_message_count;
  size_t started_recv_message_count;
  size_t completed_recv_message_count;
  bool started_send_initial_metadata : 1;
  bool completed_send_initial_metadata : 1;
  bool started_send_trailing_metadata : 1;
  bool completed_send_trailing_metadata : 1;
  bool started_recv_initial_metadata : 1;
  bool completed_recv_initial_metadata : 1;
  bool started_recv_trailing_metadata : 1;
  bool completed_recv_trailing_metadata : 1;
  // State for callback processing.
  bool retry_dispatched : 1;
  bool recv_initial_metadata_ready_deferred : 1;
  bool recv_message_ready_deferred : 1;
  grpc_error* recv_initial_metadata_error;
  grpc_error* recv_message_error;
} subchannel_call_retry_state;

// Pending batches stored in call data.
typedef struct {
  // The pending batch.  If nullptr, this slot is empty.
  grpc_transport_stream_op_batch* batch;
  // Indicates whether payload for send ops has been cached in call data.
  bool send_ops_cached;
} pending_batch;

/** Call data.  Holds a pointer to grpc_subchannel_call and the
    associated machinery to create such a pointer.
    Handles queueing of stream ops until a call object is ready, waiting
    for initial metadata before trying to create a call object,
    and handling cancellation gracefully. */
typedef struct client_channel_call_data {
  // State for handling deadlines.
  // The code in deadline_filter.c requires this to be the first field.
  // TODO(roth): This is slightly sub-optimal in that grpc_deadline_state
  // and this struct both independently store pointers to the call stack
  // and call combiner.  If/when we have time, find a way to avoid this
  // without breaking the grpc_deadline_state abstraction.
  grpc_deadline_state deadline_state;

  grpc_slice path;  // Request path.
  gpr_timespec call_start_time;
  grpc_millis deadline;
  gpr_arena* arena;
  grpc_call_stack* owning_call;
  grpc_call_combiner* call_combiner;

  grpc_server_retry_throttle_data* retry_throttle_data;
  grpc_core::RefCountedPtr<ClientChannelMethodParams> method_params;

  grpc_subchannel_call* subchannel_call;

  // Set when we get a cancel_stream op.
  grpc_error* cancel_error;

  grpc_core::LoadBalancingPolicy::PickState pick;
  grpc_closure pick_closure;
  grpc_closure pick_cancel_closure;

  grpc_polling_entity* pollent;

  // Batches are added to this list when received from above.
  // They are removed when we are done handling the batch (i.e., when
  // either we have invoked all of the batch's callbacks or we have
  // passed the batch down to the subchannel call and are not
  // intercepting any of its callbacks).
  pending_batch pending_batches[MAX_PENDING_BATCHES];
  bool pending_send_initial_metadata : 1;
  bool pending_send_message : 1;
  bool pending_send_trailing_metadata : 1;

  // Retry state.
  bool enable_retries : 1;
  bool retry_committed : 1;
  bool last_attempt_got_server_pushback : 1;
  int num_attempts_completed;
  size_t bytes_buffered_for_retry;
  grpc_core::ManualConstructor<grpc_core::BackOff> retry_backoff;
  grpc_timer retry_timer;

  // Cached data for retrying send ops.
  // send_initial_metadata
  bool seen_send_initial_metadata;
  grpc_linked_mdelem* send_initial_metadata_storage;
  grpc_metadata_batch send_initial_metadata;
  uint32_t send_initial_metadata_flags;
  gpr_atm* peer_string;
  // send_message
  // When we get a send_message op, we replace the original byte stream
  // with a CachingByteStream that caches the slices to a local buffer for
  // use in retries.
  // Note: We inline the cache for the first 3 send_message ops and use
  // dynamic allocation after that.  This number was essentially picked
  // at random; it could be changed in the future to tune performance.
  grpc_core::ManualConstructor<
      grpc_core::InlinedVector<grpc_core::ByteStreamCache*, 3>>
      send_messages;
  // send_trailing_metadata
  bool seen_send_trailing_metadata;
  grpc_linked_mdelem* send_trailing_metadata_storage;
  grpc_metadata_batch send_trailing_metadata;
} call_data;

// Forward declarations.
static void retry_commit(grpc_call_element* elem,
                         subchannel_call_retry_state* retry_state);
static void start_internal_recv_trailing_metadata(grpc_call_element* elem);
static void on_complete(void* arg, grpc_error* error);
static void start_retriable_subchannel_batches(void* arg, grpc_error* ignored);
static void pick_after_resolver_result_start_locked(grpc_call_element* elem);
static void start_pick_locked(void* arg, grpc_error* ignored);

//
// send op data caching
//

// Caches data for send ops so that it can be retried later, if not
// already cached.
static void maybe_cache_send_ops_for_batch(call_data* calld,
                                           pending_batch* pending) {
  if (pending->send_ops_cached) return;
  pending->send_ops_cached = true;
  grpc_transport_stream_op_batch* batch = pending->batch;
  // Save a copy of metadata for send_initial_metadata ops.
  if (batch->send_initial_metadata) {
    calld->seen_send_initial_metadata = true;
    GPR_ASSERT(calld->send_initial_metadata_storage == nullptr);
    grpc_metadata_batch* send_initial_metadata =
        batch->payload->send_initial_metadata.send_initial_metadata;
    calld->send_initial_metadata_storage = (grpc_linked_mdelem*)gpr_arena_alloc(
        calld->arena,
        sizeof(grpc_linked_mdelem) * send_initial_metadata->list.count);
    grpc_metadata_batch_copy(send_initial_metadata,
                             &calld->send_initial_metadata,
                             calld->send_initial_metadata_storage);
    calld->send_initial_metadata_flags =
        batch->payload->send_initial_metadata.send_initial_metadata_flags;
    calld->peer_string = batch->payload->send_initial_metadata.peer_string;
  }
  // Set up cache for send_message ops.
  if (batch->send_message) {
    grpc_core::ByteStreamCache* cache =
        static_cast<grpc_core::ByteStreamCache*>(
            gpr_arena_alloc(calld->arena, sizeof(grpc_core::ByteStreamCache)));
    new (cache) grpc_core::ByteStreamCache(
        std::move(batch->payload->send_message.send_message));
    calld->send_messages->push_back(cache);
  }
  // Save metadata batch for send_trailing_metadata ops.
  if (batch->send_trailing_metadata) {
    calld->seen_send_trailing_metadata = true;
    GPR_ASSERT(calld->send_trailing_metadata_storage == nullptr);
    grpc_metadata_batch* send_trailing_metadata =
        batch->payload->send_trailing_metadata.send_trailing_metadata;
    calld->send_trailing_metadata_storage =
        (grpc_linked_mdelem*)gpr_arena_alloc(
            calld->arena,
            sizeof(grpc_linked_mdelem) * send_trailing_metadata->list.count);
    grpc_metadata_batch_copy(send_trailing_metadata,
                             &calld->send_trailing_metadata,
                             calld->send_trailing_metadata_storage);
  }
}
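
// Illustrative sketch (not part of the surrounding functions): once send
// ops are cached as above, a later attempt can replay them without the
// surface resending anything.  Assuming one message was cached, a replay
// amounts to roughly:
//
//   grpc_core::ByteStreamCache* cache = (*calld->send_messages)[0];
//   batch_data->send_message.Init(cache);  // fresh CachingByteStream
//   batch_data->batch.payload->send_message.send_message.reset(
//       batch_data->send_message.get());
//
// See add_retriable_send_message_op() below for the real version.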

// Frees cached send ops that have already been completed after
// committing the call.
static void free_cached_send_op_data_after_commit(
    grpc_call_element* elem, subchannel_call_retry_state* retry_state) {
  channel_data* chand = static_cast<channel_data*>(elem->channel_data);
  call_data* calld = static_cast<call_data*>(elem->call_data);
  if (retry_state->completed_send_initial_metadata) {
    grpc_metadata_batch_destroy(&calld->send_initial_metadata);
  }
  for (size_t i = 0; i < retry_state->completed_send_message_count; ++i) {
    if (grpc_client_channel_trace.enabled()) {
      gpr_log(GPR_DEBUG,
              "chand=%p calld=%p: destroying calld->send_messages[%" PRIuPTR
              "]",
              chand, calld, i);
    }
    (*calld->send_messages)[i]->Destroy();
  }
  if (retry_state->completed_send_trailing_metadata) {
    grpc_metadata_batch_destroy(&calld->send_trailing_metadata);
  }
}

// Frees cached send ops that were completed by the completed batch in
// batch_data.  Used when batches are completed after the call is committed.
static void free_cached_send_op_data_for_completed_batch(
    grpc_call_element* elem, subchannel_batch_data* batch_data,
    subchannel_call_retry_state* retry_state) {
  channel_data* chand = static_cast<channel_data*>(elem->channel_data);
  call_data* calld = static_cast<call_data*>(elem->call_data);
  if (batch_data->batch.send_initial_metadata) {
    grpc_metadata_batch_destroy(&calld->send_initial_metadata);
  }
  if (batch_data->batch.send_message) {
    if (grpc_client_channel_trace.enabled()) {
      gpr_log(GPR_DEBUG,
              "chand=%p calld=%p: destroying calld->send_messages[%" PRIuPTR
              "]",
              chand, calld, retry_state->completed_send_message_count - 1);
    }
    (*calld->send_messages)[retry_state->completed_send_message_count - 1]
        ->Destroy();
  }
  if (batch_data->batch.send_trailing_metadata) {
    grpc_metadata_batch_destroy(&calld->send_trailing_metadata);
  }
}

//
// pending_batches management
//

// Returns the index into calld->pending_batches to be used for batch.
static size_t get_batch_index(grpc_transport_stream_op_batch* batch) {
  // Note: It is important that send_initial_metadata be the first entry
  // here, since the code in pick_subchannel_locked() assumes it will be.
  if (batch->send_initial_metadata) return 0;
  if (batch->send_message) return 1;
  if (batch->send_trailing_metadata) return 2;
  if (batch->recv_initial_metadata) return 3;
  if (batch->recv_message) return 4;
  if (batch->recv_trailing_metadata) return 5;
  GPR_UNREACHABLE_CODE(return (size_t)-1);
}
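
// For reference, the resulting slot layout (each op type has exactly one
// slot, so at most one pending batch per op type can be in flight):
//
//   pending_batches[0]: send_initial_metadata
//   pending_batches[1]: send_message
//   pending_batches[2]: send_trailing_metadata
//   pending_batches[3]: recv_initial_metadata
//   pending_batches[4]: recv_message
//   pending_batches[5]: recv_trailing_metadata
//
// A batch containing several ops is filed under the first of its ops
// matched by the if-chain above.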

// This is called via the call combiner, so access to calld is synchronized.
static void pending_batches_add(grpc_call_element* elem,
                                grpc_transport_stream_op_batch* batch) {
  channel_data* chand = static_cast<channel_data*>(elem->channel_data);
  call_data* calld = static_cast<call_data*>(elem->call_data);
  const size_t idx = get_batch_index(batch);
  if (grpc_client_channel_trace.enabled()) {
    gpr_log(GPR_DEBUG,
            "chand=%p calld=%p: adding pending batch at index %" PRIuPTR, chand,
            calld, idx);
  }
  pending_batch* pending = &calld->pending_batches[idx];
  GPR_ASSERT(pending->batch == nullptr);
  pending->batch = batch;
  pending->send_ops_cached = false;
  if (calld->enable_retries) {
    // Update state in calld about pending batches.
    // Also check if the batch takes us over the retry buffer limit.
    // Note: We don't check the size of trailing metadata here, because
    // gRPC clients do not send trailing metadata.
    if (batch->send_initial_metadata) {
      calld->pending_send_initial_metadata = true;
      calld->bytes_buffered_for_retry += grpc_metadata_batch_size(
          batch->payload->send_initial_metadata.send_initial_metadata);
    }
    if (batch->send_message) {
      calld->pending_send_message = true;
      calld->bytes_buffered_for_retry +=
          batch->payload->send_message.send_message->length();
    }
    if (batch->send_trailing_metadata) {
      calld->pending_send_trailing_metadata = true;
    }
    if (calld->bytes_buffered_for_retry > chand->per_rpc_retry_buffer_size) {
      if (grpc_client_channel_trace.enabled()) {
        gpr_log(GPR_DEBUG,
                "chand=%p calld=%p: exceeded retry buffer size, committing",
                chand, calld);
      }
      subchannel_call_retry_state* retry_state =
          calld->subchannel_call == nullptr
              ? nullptr
              : static_cast<subchannel_call_retry_state*>(
                    grpc_connected_subchannel_call_get_parent_data(
                        calld->subchannel_call));
      retry_commit(elem, retry_state);
      // If we are not going to retry and have not yet started, pretend
      // retries are disabled so that we don't bother with retry overhead.
      if (calld->num_attempts_completed == 0) {
        if (grpc_client_channel_trace.enabled()) {
          gpr_log(GPR_DEBUG,
                  "chand=%p calld=%p: disabling retries before first attempt",
                  chand, calld);
        }
        calld->enable_retries = false;
      }
    }
  }
}

static void pending_batch_clear(call_data* calld, pending_batch* pending) {
  if (calld->enable_retries) {
    if (pending->batch->send_initial_metadata) {
      calld->pending_send_initial_metadata = false;
    }
    if (pending->batch->send_message) {
      calld->pending_send_message = false;
    }
    if (pending->batch->send_trailing_metadata) {
      calld->pending_send_trailing_metadata = false;
    }
  }
  pending->batch = nullptr;
}

// This is called via the call combiner, so access to calld is synchronized.
static void fail_pending_batch_in_call_combiner(void* arg, grpc_error* error) {
  grpc_transport_stream_op_batch* batch =
      static_cast<grpc_transport_stream_op_batch*>(arg);
  call_data* calld = static_cast<call_data*>(batch->handler_private.extra_arg);
  // Note: This will release the call combiner.
  grpc_transport_stream_op_batch_finish_with_failure(
      batch, GRPC_ERROR_REF(error), calld->call_combiner);
}

// This is called via the call combiner, so access to calld is synchronized.
// If yield_call_combiner is true, assumes responsibility for yielding
// the call combiner.
static void pending_batches_fail(grpc_call_element* elem, grpc_error* error,
                                 bool yield_call_combiner) {
  GPR_ASSERT(error != GRPC_ERROR_NONE);
  call_data* calld = static_cast<call_data*>(elem->call_data);
  if (grpc_client_channel_trace.enabled()) {
    size_t num_batches = 0;
    for (size_t i = 0; i < GPR_ARRAY_SIZE(calld->pending_batches); ++i) {
      if (calld->pending_batches[i].batch != nullptr) ++num_batches;
    }
    gpr_log(GPR_DEBUG,
            "chand=%p calld=%p: failing %" PRIuPTR " pending batches: %s",
            elem->channel_data, calld, num_batches, grpc_error_string(error));
  }
  grpc_transport_stream_op_batch*
      batches[GPR_ARRAY_SIZE(calld->pending_batches)];
  size_t num_batches = 0;
  for (size_t i = 0; i < GPR_ARRAY_SIZE(calld->pending_batches); ++i) {
    pending_batch* pending = &calld->pending_batches[i];
    grpc_transport_stream_op_batch* batch = pending->batch;
    if (batch != nullptr) {
      batches[num_batches++] = batch;
      pending_batch_clear(calld, pending);
    }
  }
  for (size_t i = yield_call_combiner ? 1 : 0; i < num_batches; ++i) {
    grpc_transport_stream_op_batch* batch = batches[i];
    batch->handler_private.extra_arg = calld;
    GRPC_CLOSURE_INIT(&batch->handler_private.closure,
                      fail_pending_batch_in_call_combiner, batch,
                      grpc_schedule_on_exec_ctx);
    GRPC_CALL_COMBINER_START(calld->call_combiner,
                             &batch->handler_private.closure,
                             GRPC_ERROR_REF(error), "pending_batches_fail");
  }
  if (yield_call_combiner) {
    if (num_batches > 0) {
      // Note: This will release the call combiner.
      grpc_transport_stream_op_batch_finish_with_failure(
          batches[0], GRPC_ERROR_REF(error), calld->call_combiner);
    } else {
      GRPC_CALL_COMBINER_STOP(calld->call_combiner, "pending_batches_fail");
    }
  }
  GRPC_ERROR_UNREF(error);
}

// This is called via the call combiner, so access to calld is synchronized.
static void resume_pending_batch_in_call_combiner(void* arg,
                                                  grpc_error* ignored) {
  grpc_transport_stream_op_batch* batch =
      static_cast<grpc_transport_stream_op_batch*>(arg);
  grpc_subchannel_call* subchannel_call =
      static_cast<grpc_subchannel_call*>(batch->handler_private.extra_arg);
  // Note: This will release the call combiner.
  grpc_subchannel_call_process_op(subchannel_call, batch);
}

// This is called via the call combiner, so access to calld is synchronized.
static void pending_batches_resume(grpc_call_element* elem) {
  channel_data* chand = static_cast<channel_data*>(elem->channel_data);
  call_data* calld = static_cast<call_data*>(elem->call_data);
  if (calld->enable_retries) {
    start_retriable_subchannel_batches(elem, GRPC_ERROR_NONE);
    return;
  }
  // Retries not enabled; send down batches as-is.
  if (grpc_client_channel_trace.enabled()) {
    size_t num_batches = 0;
    for (size_t i = 0; i < GPR_ARRAY_SIZE(calld->pending_batches); ++i) {
      if (calld->pending_batches[i].batch != nullptr) ++num_batches;
    }
    gpr_log(GPR_DEBUG,
            "chand=%p calld=%p: starting %" PRIuPTR
            " pending batches on subchannel_call=%p",
            chand, calld, num_batches, calld->subchannel_call);
  }
  grpc_transport_stream_op_batch*
      batches[GPR_ARRAY_SIZE(calld->pending_batches)];
  size_t num_batches = 0;
  for (size_t i = 0; i < GPR_ARRAY_SIZE(calld->pending_batches); ++i) {
    pending_batch* pending = &calld->pending_batches[i];
    grpc_transport_stream_op_batch* batch = pending->batch;
    if (batch != nullptr) {
      batches[num_batches++] = batch;
      pending_batch_clear(calld, pending);
    }
  }
  for (size_t i = 1; i < num_batches; ++i) {
    grpc_transport_stream_op_batch* batch = batches[i];
    batch->handler_private.extra_arg = calld->subchannel_call;
    GRPC_CLOSURE_INIT(&batch->handler_private.closure,
                      resume_pending_batch_in_call_combiner, batch,
                      grpc_schedule_on_exec_ctx);
    GRPC_CALL_COMBINER_START(calld->call_combiner,
                             &batch->handler_private.closure, GRPC_ERROR_NONE,
                             "pending_batches_resume");
  }
  GPR_ASSERT(num_batches > 0);
  // Note: This will release the call combiner.
  grpc_subchannel_call_process_op(calld->subchannel_call, batches[0]);
}
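
// A note on the pattern used in both functions above: the caller already
// holds the call combiner, so exactly one batch may be processed inline,
// while every other batch must re-enter the combiner.  Sketched, with N
// collected batches:
//
//   for (size_t i = 1; i < N; ++i) {
//     GRPC_CALL_COMBINER_START(...);   // batches[1..N-1] re-enter combiner
//   }
//   process(batches[0]);               // inline; releases the combiner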

static void maybe_clear_pending_batch(grpc_call_element* elem,
                                      pending_batch* pending) {
  channel_data* chand = static_cast<channel_data*>(elem->channel_data);
  call_data* calld = static_cast<call_data*>(elem->call_data);
  grpc_transport_stream_op_batch* batch = pending->batch;
  // We clear the pending batch if all of its callbacks have been
  // scheduled and reset to nullptr.
  if (batch->on_complete == nullptr &&
      (!batch->recv_initial_metadata ||
       batch->payload->recv_initial_metadata.recv_initial_metadata_ready ==
           nullptr) &&
      (!batch->recv_message ||
       batch->payload->recv_message.recv_message_ready == nullptr)) {
    if (grpc_client_channel_trace.enabled()) {
      gpr_log(GPR_DEBUG, "chand=%p calld=%p: clearing pending batch", chand,
              calld);
    }
    pending_batch_clear(calld, pending);
  }
}

// Returns true if all ops in the pending batch have been completed.
static bool pending_batch_is_completed(
    pending_batch* pending, call_data* calld,
    subchannel_call_retry_state* retry_state) {
  if (pending->batch == nullptr || pending->batch->on_complete == nullptr) {
    return false;
  }
  if (pending->batch->send_initial_metadata &&
      !retry_state->completed_send_initial_metadata) {
    return false;
  }
  if (pending->batch->send_message &&
      retry_state->completed_send_message_count <
          calld->send_messages->size()) {
    return false;
  }
  if (pending->batch->send_trailing_metadata &&
      !retry_state->completed_send_trailing_metadata) {
    return false;
  }
  if (pending->batch->recv_initial_metadata &&
      !retry_state->completed_recv_initial_metadata) {
    return false;
  }
  if (pending->batch->recv_message &&
      retry_state->completed_recv_message_count <
          retry_state->started_recv_message_count) {
    return false;
  }
  if (pending->batch->recv_trailing_metadata &&
      !retry_state->completed_recv_trailing_metadata) {
    return false;
  }
  return true;
}

// Returns true if any op in the batch was not yet started.
static bool pending_batch_is_unstarted(
    pending_batch* pending, call_data* calld,
    subchannel_call_retry_state* retry_state) {
  if (pending->batch == nullptr || pending->batch->on_complete == nullptr) {
    return false;
  }
  if (pending->batch->send_initial_metadata &&
      !retry_state->started_send_initial_metadata) {
    return true;
  }
  if (pending->batch->send_message &&
      retry_state->started_send_message_count < calld->send_messages->size()) {
    return true;
  }
  if (pending->batch->send_trailing_metadata &&
      !retry_state->started_send_trailing_metadata) {
    return true;
  }
  if (pending->batch->recv_initial_metadata &&
      !retry_state->started_recv_initial_metadata) {
    return true;
  }
  if (pending->batch->recv_message &&
      retry_state->completed_recv_message_count ==
          retry_state->started_recv_message_count) {
    return true;
  }
  if (pending->batch->recv_trailing_metadata &&
      !retry_state->started_recv_trailing_metadata) {
    return true;
  }
  return false;
}

//
// retry code
//

// Commits the call so that no further retry attempts will be performed.
static void retry_commit(grpc_call_element* elem,
                         subchannel_call_retry_state* retry_state) {
  channel_data* chand = static_cast<channel_data*>(elem->channel_data);
  call_data* calld = static_cast<call_data*>(elem->call_data);
  if (calld->retry_committed) return;
  calld->retry_committed = true;
  if (grpc_client_channel_trace.enabled()) {
    gpr_log(GPR_DEBUG, "chand=%p calld=%p: committing retries", chand, calld);
  }
  if (retry_state != nullptr) {
    free_cached_send_op_data_after_commit(elem, retry_state);
  }
}

// Starts a retry after appropriate back-off.
static void do_retry(grpc_call_element* elem,
                     subchannel_call_retry_state* retry_state,
                     grpc_millis server_pushback_ms) {
  channel_data* chand = static_cast<channel_data*>(elem->channel_data);
  call_data* calld = static_cast<call_data*>(elem->call_data);
  GPR_ASSERT(calld->method_params != nullptr);
  const ClientChannelMethodParams::RetryPolicy* retry_policy =
      calld->method_params->retry_policy();
  GPR_ASSERT(retry_policy != nullptr);
  // Reset subchannel call and connected subchannel.
  if (calld->subchannel_call != nullptr) {
    GRPC_SUBCHANNEL_CALL_UNREF(calld->subchannel_call,
                               "client_channel_call_retry");
    calld->subchannel_call = nullptr;
  }
  if (calld->pick.connected_subchannel != nullptr) {
    calld->pick.connected_subchannel.reset();
  }
  // Compute backoff delay.
  grpc_millis next_attempt_time;
  if (server_pushback_ms >= 0) {
    next_attempt_time = grpc_core::ExecCtx::Get()->Now() + server_pushback_ms;
    calld->last_attempt_got_server_pushback = true;
  } else {
    if (calld->num_attempts_completed == 1 ||
        calld->last_attempt_got_server_pushback) {
      calld->retry_backoff.Init(
          grpc_core::BackOff::Options()
              .set_initial_backoff(retry_policy->initial_backoff)
              .set_multiplier(retry_policy->backoff_multiplier)
              .set_jitter(RETRY_BACKOFF_JITTER)
              .set_max_backoff(retry_policy->max_backoff));
      calld->last_attempt_got_server_pushback = false;
    }
    next_attempt_time = calld->retry_backoff->NextAttemptTime();
  }
  if (grpc_client_channel_trace.enabled()) {
    gpr_log(GPR_DEBUG,
            "chand=%p calld=%p: retrying failed call in %" PRIuPTR " ms", chand,
            calld, next_attempt_time - grpc_core::ExecCtx::Get()->Now());
  }
  // Schedule retry after computed delay.
  GRPC_CLOSURE_INIT(&calld->pick_closure, start_pick_locked, elem,
                    grpc_combiner_scheduler(chand->combiner));
  grpc_timer_init(&calld->retry_timer, next_attempt_time, &calld->pick_closure);
  // Update bookkeeping.
  if (retry_state != nullptr) retry_state->retry_dispatched = true;
}
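
// Back-off behavior of do_retry(), summarized (the exact jitter
// distribution lives in grpc_core::BackOff): absent server push-back, the
// delay before attempt n+1 is approximately
//
//   delay(n) = min(initial_backoff * backoff_multiplier^(n-1), max_backoff)
//              * (1 +/- RETRY_BACKOFF_JITTER)
//
// whereas a parseable grpc-retry-pushback-ms value overrides the computed
// delay entirely, and the back-off sequence is re-initialized on the next
// retry after a push-back.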

// Returns true if the call is being retried.
static bool maybe_retry(grpc_call_element* elem,
                        subchannel_batch_data* batch_data,
                        grpc_status_code status,
                        grpc_mdelem* server_pushback_md) {
  channel_data* chand = static_cast<channel_data*>(elem->channel_data);
  call_data* calld = static_cast<call_data*>(elem->call_data);
  // Get retry policy.
  if (calld->method_params == nullptr) return false;
  const ClientChannelMethodParams::RetryPolicy* retry_policy =
      calld->method_params->retry_policy();
  if (retry_policy == nullptr) return false;
  // If we've already dispatched a retry from this call, return true.
  // This catches the case where the batch has multiple callbacks
  // (i.e., it includes either recv_message or recv_initial_metadata).
  subchannel_call_retry_state* retry_state = nullptr;
  if (batch_data != nullptr) {
    retry_state = static_cast<subchannel_call_retry_state*>(
        grpc_connected_subchannel_call_get_parent_data(
            batch_data->subchannel_call));
    if (retry_state->retry_dispatched) {
      if (grpc_client_channel_trace.enabled()) {
        gpr_log(GPR_DEBUG, "chand=%p calld=%p: retry already dispatched", chand,
                calld);
      }
      return true;
    }
  }
  // Check status.
  if (status == GRPC_STATUS_OK) {
    grpc_server_retry_throttle_data_record_success(calld->retry_throttle_data);
    if (grpc_client_channel_trace.enabled()) {
      gpr_log(GPR_DEBUG, "chand=%p calld=%p: call succeeded", chand, calld);
    }
    return false;
  }
  // Status is not OK.  Check whether the status is retryable.
  if (!retry_policy->retryable_status_codes.Contains(status)) {
    if (grpc_client_channel_trace.enabled()) {
      gpr_log(GPR_DEBUG,
              "chand=%p calld=%p: status %s not configured as retryable", chand,
              calld, grpc_status_code_to_string(status));
    }
    return false;
  }
  // Record the failure and check whether retries are throttled.
  // Note that it's important for this check to come after the status
  // code check above, since we should only record failures whose statuses
  // match the configured retryable status codes, so that we don't count
  // things like failures due to malformed requests (INVALID_ARGUMENT).
  // Conversely, it's important for this to come before the remaining
  // checks, so that we don't fail to record failures due to other factors.
  if (!grpc_server_retry_throttle_data_record_failure(
          calld->retry_throttle_data)) {
    if (grpc_client_channel_trace.enabled()) {
      gpr_log(GPR_DEBUG, "chand=%p calld=%p: retries throttled", chand, calld);
    }
    return false;
  }
  // Check whether the call is committed.
  if (calld->retry_committed) {
    if (grpc_client_channel_trace.enabled()) {
      gpr_log(GPR_DEBUG, "chand=%p calld=%p: retries already committed", chand,
              calld);
    }
    return false;
  }
  // Check whether we have retries remaining.
  ++calld->num_attempts_completed;
  if (calld->num_attempts_completed >= retry_policy->max_attempts) {
    if (grpc_client_channel_trace.enabled()) {
      gpr_log(GPR_DEBUG, "chand=%p calld=%p: exceeded %d retry attempts", chand,
              calld, retry_policy->max_attempts);
    }
    return false;
  }
  // If the call was cancelled from the surface, don't retry.
  if (calld->cancel_error != GRPC_ERROR_NONE) {
    if (grpc_client_channel_trace.enabled()) {
      gpr_log(GPR_DEBUG,
              "chand=%p calld=%p: call cancelled from surface, not retrying",
              chand, calld);
    }
    return false;
  }
  // Check server push-back.
  grpc_millis server_pushback_ms = -1;
  if (server_pushback_md != nullptr) {
    // If the value is "-1" or any other unparseable string, we do not retry.
    uint32_t ms;
    if (!grpc_parse_slice_to_uint32(GRPC_MDVALUE(*server_pushback_md), &ms)) {
      if (grpc_client_channel_trace.enabled()) {
        gpr_log(GPR_DEBUG,
                "chand=%p calld=%p: not retrying due to server push-back",
                chand, calld);
      }
      return false;
    } else {
      if (grpc_client_channel_trace.enabled()) {
        gpr_log(GPR_DEBUG,
                "chand=%p calld=%p: server push-back: retry in %u ms", chand,
                calld, ms);
      }
      server_pushback_ms = (grpc_millis)ms;
    }
  }
  do_retry(elem, retry_state, server_pushback_ms);
  return true;
}
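
// Worked example (hypothetical values, for illustration): with a retry
// policy of max_attempts=3 and retryable_status_codes={UNAVAILABLE}, an
// attempt failing with UNAVAILABLE and carrying
// "grpc-retry-pushback-ms: 2000" passes every check above, so maybe_retry()
// records the failure with the throttle, bumps num_attempts_completed, and
// calls do_retry() with server_pushback_ms == 2000.  The same failure with
// status INVALID_ARGUMENT would return false at the retryable-status check.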

//
// subchannel_batch_data
//

static subchannel_batch_data* batch_data_create(grpc_call_element* elem,
                                                int refcount) {
  call_data* calld = static_cast<call_data*>(elem->call_data);
  subchannel_call_retry_state* retry_state =
      static_cast<subchannel_call_retry_state*>(
          grpc_connected_subchannel_call_get_parent_data(
              calld->subchannel_call));
  subchannel_batch_data* batch_data = static_cast<subchannel_batch_data*>(
      gpr_arena_alloc(calld->arena, sizeof(*batch_data)));
  batch_data->elem = elem;
  batch_data->subchannel_call =
      GRPC_SUBCHANNEL_CALL_REF(calld->subchannel_call, "batch_data_create");
  batch_data->batch.payload = &retry_state->batch_payload;
  gpr_ref_init(&batch_data->refs, refcount);
  GRPC_CLOSURE_INIT(&batch_data->on_complete, on_complete, batch_data,
                    grpc_schedule_on_exec_ctx);
  batch_data->batch.on_complete = &batch_data->on_complete;
  GRPC_CALL_STACK_REF(calld->owning_call, "batch_data");
  return batch_data;
}

static void batch_data_unref(subchannel_batch_data* batch_data) {
  if (gpr_unref(&batch_data->refs)) {
    if (batch_data->send_initial_metadata_storage != nullptr) {
      grpc_metadata_batch_destroy(&batch_data->send_initial_metadata);
    }
    if (batch_data->send_trailing_metadata_storage != nullptr) {
      grpc_metadata_batch_destroy(&batch_data->send_trailing_metadata);
    }
    if (batch_data->batch.recv_initial_metadata) {
      grpc_metadata_batch_destroy(&batch_data->recv_initial_metadata);
    }
    if (batch_data->batch.recv_trailing_metadata) {
      grpc_metadata_batch_destroy(&batch_data->recv_trailing_metadata);
    }
    GRPC_SUBCHANNEL_CALL_UNREF(batch_data->subchannel_call, "batch_data_unref");
    call_data* calld = static_cast<call_data*>(batch_data->elem->call_data);
    GRPC_CALL_STACK_UNREF(calld->owning_call, "batch_data");
  }
}

//
// recv_initial_metadata callback handling
//

// Invokes recv_initial_metadata_ready for a subchannel batch.
static void invoke_recv_initial_metadata_callback(void* arg,
                                                  grpc_error* error) {
  subchannel_batch_data* batch_data = static_cast<subchannel_batch_data*>(arg);
  channel_data* chand =
      static_cast<channel_data*>(batch_data->elem->channel_data);
  call_data* calld = static_cast<call_data*>(batch_data->elem->call_data);
  // Find pending batch.
  pending_batch* pending = nullptr;
  for (size_t i = 0; i < GPR_ARRAY_SIZE(calld->pending_batches); ++i) {
    grpc_transport_stream_op_batch* batch = calld->pending_batches[i].batch;
    if (batch != nullptr && batch->recv_initial_metadata &&
        batch->payload->recv_initial_metadata.recv_initial_metadata_ready !=
            nullptr) {
      if (grpc_client_channel_trace.enabled()) {
        gpr_log(GPR_DEBUG,
                "chand=%p calld=%p: invoking recv_initial_metadata_ready for "
                "pending batch at index %" PRIuPTR,
                chand, calld, i);
      }
      pending = &calld->pending_batches[i];
      break;
    }
  }
  GPR_ASSERT(pending != nullptr);
  // Return metadata.
  grpc_metadata_batch_move(
      &batch_data->recv_initial_metadata,
      pending->batch->payload->recv_initial_metadata.recv_initial_metadata);
  // Update bookkeeping.
  // Note: Need to do this before invoking the callback, since invoking
  // the callback will result in yielding the call combiner.
  grpc_closure* recv_initial_metadata_ready =
      pending->batch->payload->recv_initial_metadata
          .recv_initial_metadata_ready;
  pending->batch->payload->recv_initial_metadata.recv_initial_metadata_ready =
      nullptr;
  maybe_clear_pending_batch(batch_data->elem, pending);
  batch_data_unref(batch_data);
  // Invoke callback.
  GRPC_CLOSURE_RUN(recv_initial_metadata_ready, GRPC_ERROR_REF(error));
}

// Intercepts recv_initial_metadata_ready callback for retries.
// Commits the call and returns the initial metadata up the stack.
static void recv_initial_metadata_ready(void* arg, grpc_error* error) {
  subchannel_batch_data* batch_data = static_cast<subchannel_batch_data*>(arg);
  grpc_call_element* elem = batch_data->elem;
  channel_data* chand = static_cast<channel_data*>(elem->channel_data);
  call_data* calld = static_cast<call_data*>(elem->call_data);
  if (grpc_client_channel_trace.enabled()) {
    gpr_log(GPR_DEBUG,
            "chand=%p calld=%p: got recv_initial_metadata_ready, error=%s",
            chand, calld, grpc_error_string(error));
  }
  subchannel_call_retry_state* retry_state =
      static_cast<subchannel_call_retry_state*>(
          grpc_connected_subchannel_call_get_parent_data(
              batch_data->subchannel_call));
  // If we got an error or a Trailers-Only response and have not yet gotten
  // the recv_trailing_metadata on_complete callback, then defer
  // propagating this callback back to the surface.  We can evaluate whether
  // to retry when recv_trailing_metadata comes back.
  if ((batch_data->trailing_metadata_available || error != GRPC_ERROR_NONE) &&
      !retry_state->completed_recv_trailing_metadata) {
    if (grpc_client_channel_trace.enabled()) {
      gpr_log(GPR_DEBUG,
              "chand=%p calld=%p: deferring recv_initial_metadata_ready "
              "(Trailers-Only)",
              chand, calld);
    }
    retry_state->recv_initial_metadata_ready_deferred = true;
    retry_state->recv_initial_metadata_error = GRPC_ERROR_REF(error);
    if (!retry_state->started_recv_trailing_metadata) {
      // recv_trailing_metadata not yet started by application; start it
      // ourselves to get status.
      start_internal_recv_trailing_metadata(elem);
    } else {
      GRPC_CALL_COMBINER_STOP(
          calld->call_combiner,
          "recv_initial_metadata_ready trailers-only or error");
    }
    return;
  }
  // Received valid initial metadata, so commit the call.
  retry_commit(elem, retry_state);
  // Manually invoking a callback function; it does not take ownership of
  // error.
  invoke_recv_initial_metadata_callback(batch_data, error);
  GRPC_ERROR_UNREF(error);
}
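
// Timeline sketch of the deferral above (illustration): for a failed
// attempt that still turns out to be retryable, the surface never sees the
// error:
//
//   recv_initial_metadata_ready(error)
//       -> deferred; start_internal_recv_trailing_metadata()
//   on_complete(recv_trailing_metadata)
//       -> maybe_retry() returns true -> new attempt replays cached ops
//
// Only when the call is committed is the deferred callback, with its saved
// error (if any), propagated up the stack.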

//
// recv_message callback handling
//

// Invokes recv_message_ready for a subchannel batch.
static void invoke_recv_message_callback(void* arg, grpc_error* error) {
  subchannel_batch_data* batch_data = static_cast<subchannel_batch_data*>(arg);
  channel_data* chand =
      static_cast<channel_data*>(batch_data->elem->channel_data);
  call_data* calld = static_cast<call_data*>(batch_data->elem->call_data);
  // Find pending op.
  pending_batch* pending = nullptr;
  for (size_t i = 0; i < GPR_ARRAY_SIZE(calld->pending_batches); ++i) {
    grpc_transport_stream_op_batch* batch = calld->pending_batches[i].batch;
    if (batch != nullptr && batch->recv_message &&
        batch->payload->recv_message.recv_message_ready != nullptr) {
      if (grpc_client_channel_trace.enabled()) {
        gpr_log(GPR_DEBUG,
                "chand=%p calld=%p: invoking recv_message_ready for "
                "pending batch at index %" PRIuPTR,
                chand, calld, i);
      }
      pending = &calld->pending_batches[i];
      break;
    }
  }
  GPR_ASSERT(pending != nullptr);
  // Return payload.
  *pending->batch->payload->recv_message.recv_message =
      std::move(batch_data->recv_message);
  // Update bookkeeping.
  // Note: Need to do this before invoking the callback, since invoking
  // the callback will result in yielding the call combiner.
  grpc_closure* recv_message_ready =
      pending->batch->payload->recv_message.recv_message_ready;
  pending->batch->payload->recv_message.recv_message_ready = nullptr;
  maybe_clear_pending_batch(batch_data->elem, pending);
  batch_data_unref(batch_data);
  // Invoke callback.
  GRPC_CLOSURE_RUN(recv_message_ready, GRPC_ERROR_REF(error));
}

// Intercepts recv_message_ready callback for retries.
// Commits the call and returns the message up the stack.
static void recv_message_ready(void* arg, grpc_error* error) {
  subchannel_batch_data* batch_data = static_cast<subchannel_batch_data*>(arg);
  grpc_call_element* elem = batch_data->elem;
  channel_data* chand = static_cast<channel_data*>(elem->channel_data);
  call_data* calld = static_cast<call_data*>(elem->call_data);
  if (grpc_client_channel_trace.enabled()) {
    gpr_log(GPR_DEBUG, "chand=%p calld=%p: got recv_message_ready, error=%s",
            chand, calld, grpc_error_string(error));
  }
  subchannel_call_retry_state* retry_state =
      static_cast<subchannel_call_retry_state*>(
          grpc_connected_subchannel_call_get_parent_data(
              batch_data->subchannel_call));
  // If we got an error or the payload was nullptr and we have not yet gotten
  // the recv_trailing_metadata on_complete callback, then defer
  // propagating this callback back to the surface.  We can evaluate whether
  // to retry when recv_trailing_metadata comes back.
  if ((batch_data->recv_message == nullptr || error != GRPC_ERROR_NONE) &&
      !retry_state->completed_recv_trailing_metadata) {
    if (grpc_client_channel_trace.enabled()) {
      gpr_log(GPR_DEBUG,
              "chand=%p calld=%p: deferring recv_message_ready (nullptr "
              "message and recv_trailing_metadata pending)",
              chand, calld);
    }
    retry_state->recv_message_ready_deferred = true;
    retry_state->recv_message_error = GRPC_ERROR_REF(error);
    if (!retry_state->started_recv_trailing_metadata) {
      // recv_trailing_metadata not yet started by application; start it
      // ourselves to get status.
      start_internal_recv_trailing_metadata(elem);
    } else {
      GRPC_CALL_COMBINER_STOP(calld->call_combiner, "recv_message_ready null");
    }
    return;
  }
  // Received a valid message, so commit the call.
  retry_commit(elem, retry_state);
  // Manually invoking a callback function; it does not take ownership of
  // error.
  invoke_recv_message_callback(batch_data, error);
  GRPC_ERROR_UNREF(error);
}

//
// on_complete callback handling
//

// Updates retry_state to reflect the ops completed in batch_data.
static void update_retry_state_for_completed_batch(
    subchannel_batch_data* batch_data,
    subchannel_call_retry_state* retry_state) {
  if (batch_data->batch.send_initial_metadata) {
    retry_state->completed_send_initial_metadata = true;
  }
  if (batch_data->batch.send_message) {
    ++retry_state->completed_send_message_count;
  }
  if (batch_data->batch.send_trailing_metadata) {
    retry_state->completed_send_trailing_metadata = true;
  }
  if (batch_data->batch.recv_initial_metadata) {
    retry_state->completed_recv_initial_metadata = true;
  }
  if (batch_data->batch.recv_message) {
    ++retry_state->completed_recv_message_count;
  }
  if (batch_data->batch.recv_trailing_metadata) {
    retry_state->completed_recv_trailing_metadata = true;
  }
}

// Represents a closure that needs to run as a result of a completed batch.
typedef struct {
  grpc_closure* closure;
  grpc_error* error;
  const char* reason;
} closure_to_execute;

// Adds any necessary closures for deferred recv_initial_metadata and
// recv_message callbacks to closures, updating *num_closures as needed.
static void add_closures_for_deferred_recv_callbacks(
    subchannel_batch_data* batch_data, subchannel_call_retry_state* retry_state,
    closure_to_execute* closures, size_t* num_closures) {
  if (batch_data->batch.recv_trailing_metadata &&
      retry_state->recv_initial_metadata_ready_deferred) {
    closure_to_execute* closure = &closures[(*num_closures)++];
    closure->closure =
        GRPC_CLOSURE_INIT(&batch_data->recv_initial_metadata_ready,
                          invoke_recv_initial_metadata_callback, batch_data,
                          grpc_schedule_on_exec_ctx);
    closure->error = retry_state->recv_initial_metadata_error;
    closure->reason = "resuming recv_initial_metadata_ready";
  }
  if (batch_data->batch.recv_trailing_metadata &&
      retry_state->recv_message_ready_deferred) {
    closure_to_execute* closure = &closures[(*num_closures)++];
    closure->closure = GRPC_CLOSURE_INIT(&batch_data->recv_message_ready,
                                         invoke_recv_message_callback,
                                         batch_data, grpc_schedule_on_exec_ctx);
    closure->error = retry_state->recv_message_error;
    closure->reason = "resuming recv_message_ready";
  }
}

// If there are any cached ops to replay or pending ops to start on the
// subchannel call, adds a closure to closures to invoke
// start_retriable_subchannel_batches(), updating *num_closures as needed.
static void add_closures_for_replay_or_pending_send_ops(
    grpc_call_element* elem, subchannel_batch_data* batch_data,
    subchannel_call_retry_state* retry_state, closure_to_execute* closures,
    size_t* num_closures) {
  channel_data* chand = static_cast<channel_data*>(elem->channel_data);
  call_data* calld = static_cast<call_data*>(elem->call_data);
  bool have_pending_send_message_ops =
      retry_state->started_send_message_count < calld->send_messages->size();
  bool have_pending_send_trailing_metadata_op =
      calld->seen_send_trailing_metadata &&
      !retry_state->started_send_trailing_metadata;
  if (!have_pending_send_message_ops &&
      !have_pending_send_trailing_metadata_op) {
    for (size_t i = 0; i < GPR_ARRAY_SIZE(calld->pending_batches); ++i) {
      pending_batch* pending = &calld->pending_batches[i];
      grpc_transport_stream_op_batch* batch = pending->batch;
      if (batch == nullptr || pending->send_ops_cached) continue;
      if (batch->send_message) have_pending_send_message_ops = true;
      if (batch->send_trailing_metadata) {
        have_pending_send_trailing_metadata_op = true;
      }
    }
  }
  if (have_pending_send_message_ops || have_pending_send_trailing_metadata_op) {
    if (grpc_client_channel_trace.enabled()) {
      gpr_log(GPR_DEBUG,
              "chand=%p calld=%p: starting next batch for pending send op(s)",
              chand, calld);
    }
    closure_to_execute* closure = &closures[(*num_closures)++];
    closure->closure = GRPC_CLOSURE_INIT(
        &batch_data->batch.handler_private.closure,
        start_retriable_subchannel_batches, elem, grpc_schedule_on_exec_ctx);
    closure->error = GRPC_ERROR_NONE;
    closure->reason = "starting next batch for send_* op(s)";
  }
}

// For any pending batch completed in batch_data, adds the necessary
// completion closures to closures, updating *num_closures as needed.
static void add_closures_for_completed_pending_batches(
    grpc_call_element* elem, subchannel_batch_data* batch_data,
    subchannel_call_retry_state* retry_state, grpc_error* error,
    closure_to_execute* closures, size_t* num_closures) {
  channel_data* chand = static_cast<channel_data*>(elem->channel_data);
  call_data* calld = static_cast<call_data*>(elem->call_data);
  for (size_t i = 0; i < GPR_ARRAY_SIZE(calld->pending_batches); ++i) {
    pending_batch* pending = &calld->pending_batches[i];
    if (pending_batch_is_completed(pending, calld, retry_state)) {
      if (grpc_client_channel_trace.enabled()) {
        gpr_log(GPR_DEBUG,
                "chand=%p calld=%p: pending batch completed at index %" PRIuPTR,
                chand, calld, i);
      }
      // Copy the trailing metadata to return it to the surface.
      if (batch_data->batch.recv_trailing_metadata) {
        grpc_metadata_batch_move(&batch_data->recv_trailing_metadata,
                                 pending->batch->payload->recv_trailing_metadata
                                     .recv_trailing_metadata);
      }
      closure_to_execute* closure = &closures[(*num_closures)++];
      closure->closure = pending->batch->on_complete;
      closure->error = GRPC_ERROR_REF(error);
      closure->reason = "on_complete for pending batch";
      pending->batch->on_complete = nullptr;
      maybe_clear_pending_batch(elem, pending);
    }
  }
  GRPC_ERROR_UNREF(error);
}

// For any pending batch containing an op that has not yet been started,
// adds the pending batch's completion closures to closures, updating
// *num_closures as needed.
static void add_closures_to_fail_unstarted_pending_batches(
    grpc_call_element* elem, subchannel_call_retry_state* retry_state,
    grpc_error* error, closure_to_execute* closures, size_t* num_closures) {
  channel_data* chand = static_cast<channel_data*>(elem->channel_data);
  call_data* calld = static_cast<call_data*>(elem->call_data);
  for (size_t i = 0; i < GPR_ARRAY_SIZE(calld->pending_batches); ++i) {
    pending_batch* pending = &calld->pending_batches[i];
    if (pending_batch_is_unstarted(pending, calld, retry_state)) {
      if (grpc_client_channel_trace.enabled()) {
        gpr_log(GPR_DEBUG,
                "chand=%p calld=%p: failing unstarted pending batch at index "
                "%" PRIuPTR,
                chand, calld, i);
      }
      if (pending->batch->recv_initial_metadata) {
        closure_to_execute* closure = &closures[(*num_closures)++];
        closure->closure = pending->batch->payload->recv_initial_metadata
                               .recv_initial_metadata_ready;
        closure->error = GRPC_ERROR_REF(error);
        closure->reason =
            "failing recv_initial_metadata_ready for pending batch";
        pending->batch->payload->recv_initial_metadata
            .recv_initial_metadata_ready = nullptr;
      }
      if (pending->batch->recv_message) {
        *pending->batch->payload->recv_message.recv_message = nullptr;
        closure_to_execute* closure = &closures[(*num_closures)++];
        closure->closure =
            pending->batch->payload->recv_message.recv_message_ready;
        closure->error = GRPC_ERROR_REF(error);
        closure->reason = "failing recv_message_ready for pending batch";
        pending->batch->payload->recv_message.recv_message_ready = nullptr;
      }
      closure_to_execute* closure = &closures[(*num_closures)++];
      closure->closure = pending->batch->on_complete;
      closure->error = GRPC_ERROR_REF(error);
      closure->reason = "failing on_complete for pending batch";
      pending->batch->on_complete = nullptr;
      maybe_clear_pending_batch(elem, pending);
    }
  }
  GRPC_ERROR_UNREF(error);
}

// Callback used to intercept on_complete from subchannel calls.
// Called only when retries are enabled.
static void on_complete(void* arg, grpc_error* error) {
  subchannel_batch_data* batch_data = static_cast<subchannel_batch_data*>(arg);
  grpc_call_element* elem = batch_data->elem;
  channel_data* chand = static_cast<channel_data*>(elem->channel_data);
  call_data* calld = static_cast<call_data*>(elem->call_data);
  if (grpc_client_channel_trace.enabled()) {
    char* batch_str = grpc_transport_stream_op_batch_string(&batch_data->batch);
    gpr_log(GPR_DEBUG, "chand=%p calld=%p: got on_complete, error=%s, batch=%s",
            chand, calld, grpc_error_string(error), batch_str);
    gpr_free(batch_str);
  }
  subchannel_call_retry_state* retry_state =
      static_cast<subchannel_call_retry_state*>(
          grpc_connected_subchannel_call_get_parent_data(
              batch_data->subchannel_call));
  // If we have previously completed recv_trailing_metadata, then the
  // call is finished.
  bool call_finished = retry_state->completed_recv_trailing_metadata;
  // Update bookkeeping in retry_state.
  update_retry_state_for_completed_batch(batch_data, retry_state);
  if (call_finished) {
    if (grpc_client_channel_trace.enabled()) {
      gpr_log(GPR_DEBUG, "chand=%p calld=%p: call already finished", chand,
              calld);
    }
  } else {
    // Check if this batch finished the call, and if so, get its status.
    // The call is finished if either (a) this callback was invoked with
    // an error or (b) we receive status.
    grpc_status_code status = GRPC_STATUS_OK;
    grpc_mdelem* server_pushback_md = nullptr;
    if (error != GRPC_ERROR_NONE) {  // Case (a).
      call_finished = true;
      grpc_error_get_status(error, calld->deadline, &status, nullptr, nullptr,
                            nullptr);
    } else if (batch_data->batch.recv_trailing_metadata) {  // Case (b).
      call_finished = true;
      grpc_metadata_batch* md_batch =
          batch_data->batch.payload->recv_trailing_metadata
              .recv_trailing_metadata;
      GPR_ASSERT(md_batch->idx.named.grpc_status != nullptr);
      status = grpc_get_status_code_from_metadata(
          md_batch->idx.named.grpc_status->md);
      if (md_batch->idx.named.grpc_retry_pushback_ms != nullptr) {
        server_pushback_md = &md_batch->idx.named.grpc_retry_pushback_ms->md;
      }
    } else if (retry_state->completed_recv_trailing_metadata) {
      call_finished = true;
    }
    if (call_finished && grpc_client_channel_trace.enabled()) {
      gpr_log(GPR_DEBUG, "chand=%p calld=%p: call finished, status=%s", chand,
              calld, grpc_status_code_to_string(status));
    }
    // If the call is finished, check if we should retry.
    if (call_finished &&
        maybe_retry(elem, batch_data, status, server_pushback_md)) {
      // Unref batch_data for deferred recv_initial_metadata_ready or
      // recv_message_ready callbacks, if any.
      if (batch_data->batch.recv_trailing_metadata &&
          retry_state->recv_initial_metadata_ready_deferred) {
        batch_data_unref(batch_data);
        GRPC_ERROR_UNREF(retry_state->recv_initial_metadata_error);
      }
      if (batch_data->batch.recv_trailing_metadata &&
          retry_state->recv_message_ready_deferred) {
        batch_data_unref(batch_data);
        GRPC_ERROR_UNREF(retry_state->recv_message_error);
      }
      batch_data_unref(batch_data);
      return;
    }
  }
  // If the call is finished or retries are committed, free cached data for
  // send ops that we've just completed.
  if (call_finished || calld->retry_committed) {
    free_cached_send_op_data_for_completed_batch(elem, batch_data, retry_state);
  }
  // Call not being retried.
  // Construct list of closures to execute.
  // Max number of closures is number of pending batches plus one for
  // each of:
  // - recv_initial_metadata_ready (either deferred or unstarted)
  // - recv_message_ready (either deferred or unstarted)
  // - starting a new batch for pending send ops
  closure_to_execute closures[GPR_ARRAY_SIZE(calld->pending_batches) + 3];
  size_t num_closures = 0;
  // If there are deferred recv_initial_metadata_ready or recv_message_ready
  // callbacks, add them to closures.
  add_closures_for_deferred_recv_callbacks(batch_data, retry_state, closures,
                                           &num_closures);
  // Find pending batches whose ops are now complete and add their
  // on_complete callbacks to closures.
  add_closures_for_completed_pending_batches(elem, batch_data, retry_state,
                                             GRPC_ERROR_REF(error), closures,
                                             &num_closures);
  // Add closures to handle any pending batches that have not yet been started.
  // If the call is finished, we fail these batches; otherwise, we add a
  // callback to start_retriable_subchannel_batches() to start them on
  // the subchannel call.
  if (call_finished) {
    add_closures_to_fail_unstarted_pending_batches(
        elem, retry_state, GRPC_ERROR_REF(error), closures, &num_closures);
  } else {
    add_closures_for_replay_or_pending_send_ops(elem, batch_data, retry_state,
                                                closures, &num_closures);
  }
  // Don't need batch_data anymore.
  batch_data_unref(batch_data);
  // Schedule all of the closures identified above.
  // Note that the call combiner will be yielded for each closure that
  // we schedule.  We're already running in the call combiner, so one of
  // the closures can be scheduled directly, but the others will
  // have to re-enter the call combiner.
  if (num_closures > 0) {
    GRPC_CLOSURE_SCHED(closures[0].closure, closures[0].error);
    for (size_t i = 1; i < num_closures; ++i) {
      GRPC_CALL_COMBINER_START(calld->call_combiner, closures[i].closure,
                               closures[i].error, closures[i].reason);
    }
  } else {
    GRPC_CALL_COMBINER_STOP(calld->call_combiner,
                            "no closures to run for on_complete");
  }
}
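
// Sizing note for the closures array above (sketch of the invariant): each
// op type occupies exactly one pending_batches slot, so across all pending
// batches there can be at most one recv_initial_metadata_ready closure and
// one recv_message_ready closure, plus at most one closure for replaying
// send ops.  Together with one on_complete closure per pending batch, that
// bounds num_closures by GPR_ARRAY_SIZE(calld->pending_batches) + 3.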

//
// subchannel batch construction
//

// Helper function used to start a subchannel batch in the call combiner.
static void start_batch_in_call_combiner(void* arg, grpc_error* ignored) {
  grpc_transport_stream_op_batch* batch =
      static_cast<grpc_transport_stream_op_batch*>(arg);
  grpc_subchannel_call* subchannel_call =
      static_cast<grpc_subchannel_call*>(batch->handler_private.extra_arg);
  // Note: This will release the call combiner.
  grpc_subchannel_call_process_op(subchannel_call, batch);
}

// Adds retriable send_initial_metadata op to batch_data.
static void add_retriable_send_initial_metadata_op(
    call_data* calld, subchannel_call_retry_state* retry_state,
    subchannel_batch_data* batch_data) {
  // Maps the number of retries to the corresponding metadata value slice.
  static const grpc_slice* retry_count_strings[] = {
      &GRPC_MDSTR_1, &GRPC_MDSTR_2, &GRPC_MDSTR_3, &GRPC_MDSTR_4};
  // We need to make a copy of the metadata batch for each attempt, since
  // the filters in the subchannel stack may modify this batch, and we don't
  // want those modifications to be passed forward to subsequent attempts.
  //
  // If we've already completed one or more attempts, add the
  // grpc-previous-rpc-attempts header.
  batch_data->send_initial_metadata_storage =
      static_cast<grpc_linked_mdelem*>(gpr_arena_alloc(
          calld->arena, sizeof(grpc_linked_mdelem) *
                            (calld->send_initial_metadata.list.count +
                             (calld->num_attempts_completed > 0))));
  grpc_metadata_batch_copy(&calld->send_initial_metadata,
                           &batch_data->send_initial_metadata,
                           batch_data->send_initial_metadata_storage);
  if (batch_data->send_initial_metadata.idx.named.grpc_previous_rpc_attempts !=
      nullptr) {
    grpc_metadata_batch_remove(
        &batch_data->send_initial_metadata,
        batch_data->send_initial_metadata.idx.named.grpc_previous_rpc_attempts);
  }
  if (calld->num_attempts_completed > 0) {
    grpc_mdelem retry_md = grpc_mdelem_from_slices(
        GRPC_MDSTR_GRPC_PREVIOUS_RPC_ATTEMPTS,
        *retry_count_strings[calld->num_attempts_completed - 1]);
    grpc_error* error = grpc_metadata_batch_add_tail(
        &batch_data->send_initial_metadata,
        &batch_data->send_initial_metadata_storage[calld->send_initial_metadata
                                                       .list.count],
        retry_md);
    if (error != GRPC_ERROR_NONE) {
      gpr_log(GPR_ERROR, "error adding retry metadata: %s",
              grpc_error_string(error));
      GPR_ASSERT(false);
    }
  }
  retry_state->started_send_initial_metadata = true;
  batch_data->batch.send_initial_metadata = true;
  batch_data->batch.payload->send_initial_metadata.send_initial_metadata =
      &batch_data->send_initial_metadata;
  batch_data->batch.payload->send_initial_metadata.send_initial_metadata_flags =
      calld->send_initial_metadata_flags;
  batch_data->batch.payload->send_initial_metadata.peer_string =
      calld->peer_string;
}
2130
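// Example: on the third attempt (calld->num_attempts_completed == 2),
// the copied batch built above carries the header
// "grpc-previous-rpc-attempts: 2", using retry_count_strings[1].
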
// Adds retriable send_message op to batch_data.
static void add_retriable_send_message_op(
    grpc_call_element* elem, subchannel_call_retry_state* retry_state,
    subchannel_batch_data* batch_data) {
  channel_data* chand = static_cast<channel_data*>(elem->channel_data);
  call_data* calld = static_cast<call_data*>(elem->call_data);
  if (grpc_client_channel_trace.enabled()) {
    gpr_log(GPR_DEBUG,
            "chand=%p calld=%p: starting calld->send_messages[%" PRIuPTR "]",
            chand, calld, retry_state->started_send_message_count);
  }
  grpc_core::ByteStreamCache* cache =
      (*calld->send_messages)[retry_state->started_send_message_count];
  ++retry_state->started_send_message_count;
  batch_data->send_message.Init(cache);
  batch_data->batch.send_message = true;
  batch_data->batch.payload->send_message.send_message.reset(
      batch_data->send_message.get());
}

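// Note: replaying send_message ops is possible because each message the
// application sends is cached in calld->send_messages as a
// ByteStreamCache, so a later attempt can re-read the same bytes that an
// earlier attempt already consumed from the stream.
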
// Adds retriable send_trailing_metadata op to batch_data.
static void add_retriable_send_trailing_metadata_op(
    call_data* calld, subchannel_call_retry_state* retry_state,
    subchannel_batch_data* batch_data) {
  // We need to make a copy of the metadata batch for each attempt, since
  // the filters in the subchannel stack may modify this batch, and we don't
  // want those modifications to be passed forward to subsequent attempts.
  batch_data->send_trailing_metadata_storage =
      static_cast<grpc_linked_mdelem*>(gpr_arena_alloc(
          calld->arena, sizeof(grpc_linked_mdelem) *
                            calld->send_trailing_metadata.list.count));
  grpc_metadata_batch_copy(&calld->send_trailing_metadata,
                           &batch_data->send_trailing_metadata,
                           batch_data->send_trailing_metadata_storage);
  retry_state->started_send_trailing_metadata = true;
  batch_data->batch.send_trailing_metadata = true;
  batch_data->batch.payload->send_trailing_metadata.send_trailing_metadata =
      &batch_data->send_trailing_metadata;
}

// Adds retriable recv_initial_metadata op to batch_data.
static void add_retriable_recv_initial_metadata_op(
    call_data* calld, subchannel_call_retry_state* retry_state,
    subchannel_batch_data* batch_data) {
  retry_state->started_recv_initial_metadata = true;
  batch_data->batch.recv_initial_metadata = true;
  grpc_metadata_batch_init(&batch_data->recv_initial_metadata);
  batch_data->batch.payload->recv_initial_metadata.recv_initial_metadata =
      &batch_data->recv_initial_metadata;
  batch_data->batch.payload->recv_initial_metadata.trailing_metadata_available =
      &batch_data->trailing_metadata_available;
  GRPC_CLOSURE_INIT(&batch_data->recv_initial_metadata_ready,
                    recv_initial_metadata_ready, batch_data,
                    grpc_schedule_on_exec_ctx);
  batch_data->batch.payload->recv_initial_metadata.recv_initial_metadata_ready =
      &batch_data->recv_initial_metadata_ready;
}

// Adds retriable recv_message op to batch_data.
static void add_retriable_recv_message_op(
    call_data* calld, subchannel_call_retry_state* retry_state,
    subchannel_batch_data* batch_data) {
  ++retry_state->started_recv_message_count;
  batch_data->batch.recv_message = true;
  batch_data->batch.payload->recv_message.recv_message =
      &batch_data->recv_message;
  GRPC_CLOSURE_INIT(&batch_data->recv_message_ready, recv_message_ready,
                    batch_data, grpc_schedule_on_exec_ctx);
  batch_data->batch.payload->recv_message.recv_message_ready =
      &batch_data->recv_message_ready;
}

// Adds retriable recv_trailing_metadata op to batch_data.
static void add_retriable_recv_trailing_metadata_op(
    call_data* calld, subchannel_call_retry_state* retry_state,
    subchannel_batch_data* batch_data) {
  retry_state->started_recv_trailing_metadata = true;
  batch_data->batch.recv_trailing_metadata = true;
  grpc_metadata_batch_init(&batch_data->recv_trailing_metadata);
  batch_data->batch.payload->recv_trailing_metadata.recv_trailing_metadata =
      &batch_data->recv_trailing_metadata;
  batch_data->batch.collect_stats = true;
  batch_data->batch.payload->collect_stats.collect_stats =
      &batch_data->collect_stats;
}

// Helper function used to start a recv_trailing_metadata batch. This
// is used in the case where a recv_initial_metadata or recv_message
// op fails in a way that tells us the call is over, but the application
// has not yet started its own recv_trailing_metadata op.
static void start_internal_recv_trailing_metadata(grpc_call_element* elem) {
  channel_data* chand = static_cast<channel_data*>(elem->channel_data);
  call_data* calld = static_cast<call_data*>(elem->call_data);
  if (grpc_client_channel_trace.enabled()) {
    gpr_log(GPR_DEBUG,
            "chand=%p calld=%p: call failed but recv_trailing_metadata not "
            "started; starting it internally",
            chand, calld);
  }
  subchannel_call_retry_state* retry_state =
      static_cast<subchannel_call_retry_state*>(
          grpc_connected_subchannel_call_get_parent_data(
              calld->subchannel_call));
  subchannel_batch_data* batch_data = batch_data_create(elem, 1);
  add_retriable_recv_trailing_metadata_op(calld, retry_state, batch_data);
  // Note: This will release the call combiner.
  grpc_subchannel_call_process_op(calld->subchannel_call, &batch_data->batch);
}

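// Starting recv_trailing_metadata internally lets us see the final
// status (and any server pushback metadata) so that maybe_retry() can
// decide whether to make another attempt, even though the application
// has not asked for trailing metadata yet.
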
// If there are any cached send ops that need to be replayed on the
// current subchannel call, creates and returns a new subchannel batch
// to replay those ops. Otherwise, returns nullptr.
static subchannel_batch_data* maybe_create_subchannel_batch_for_replay(
    grpc_call_element* elem, subchannel_call_retry_state* retry_state) {
  channel_data* chand = static_cast<channel_data*>(elem->channel_data);
  call_data* calld = static_cast<call_data*>(elem->call_data);
  subchannel_batch_data* replay_batch_data = nullptr;
  // send_initial_metadata.
  if (calld->seen_send_initial_metadata &&
      !retry_state->started_send_initial_metadata &&
      !calld->pending_send_initial_metadata) {
    if (grpc_client_channel_trace.enabled()) {
      gpr_log(GPR_DEBUG,
              "chand=%p calld=%p: replaying previously completed "
              "send_initial_metadata op",
              chand, calld);
    }
    replay_batch_data = batch_data_create(elem, 1);
    add_retriable_send_initial_metadata_op(calld, retry_state,
                                           replay_batch_data);
  }
  // send_message.
  // Note that we can only have one send_message op in flight at a time.
  if (retry_state->started_send_message_count < calld->send_messages->size() &&
      retry_state->started_send_message_count ==
          retry_state->completed_send_message_count &&
      !calld->pending_send_message) {
    if (grpc_client_channel_trace.enabled()) {
      gpr_log(GPR_DEBUG,
              "chand=%p calld=%p: replaying previously completed "
              "send_message op",
              chand, calld);
    }
    if (replay_batch_data == nullptr) {
      replay_batch_data = batch_data_create(elem, 1);
    }
    add_retriable_send_message_op(elem, retry_state, replay_batch_data);
  }
  // send_trailing_metadata.
  // Note that we only add this op if we have no more send_message ops
  // to start, since we can't send down any more send_message ops after
  // send_trailing_metadata.
  if (calld->seen_send_trailing_metadata &&
      retry_state->started_send_message_count ==
          calld->send_messages->size() &&
      !retry_state->started_send_trailing_metadata &&
      !calld->pending_send_trailing_metadata) {
    if (grpc_client_channel_trace.enabled()) {
      gpr_log(GPR_DEBUG,
              "chand=%p calld=%p: replaying previously completed "
              "send_trailing_metadata op",
              chand, calld);
    }
    if (replay_batch_data == nullptr) {
      replay_batch_data = batch_data_create(elem, 1);
    }
    add_retriable_send_trailing_metadata_op(calld, retry_state,
                                            replay_batch_data);
  }
  return replay_batch_data;
}

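// Example: if attempt 1 completed send_initial_metadata and one
// send_message before failing, and the application has no newer send ops
// pending, the next attempt starts with a replay batch containing copies
// of those two ops; send_trailing_metadata is replayed only once every
// cached message has been re-sent.
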
// Adds subchannel batches for pending batches to batches, updating
// *num_batches as needed.
static void add_subchannel_batches_for_pending_batches(
    grpc_call_element* elem, subchannel_call_retry_state* retry_state,
    grpc_transport_stream_op_batch** batches, size_t* num_batches) {
  call_data* calld = static_cast<call_data*>(elem->call_data);
  for (size_t i = 0; i < GPR_ARRAY_SIZE(calld->pending_batches); ++i) {
    pending_batch* pending = &calld->pending_batches[i];
    grpc_transport_stream_op_batch* batch = pending->batch;
    if (batch == nullptr) continue;
    // Skip any batch that either (a) has already been started on this
    // subchannel call or (b) we can't start yet because we're still
    // replaying send ops that need to be completed first.
    // TODO(roth): Note that if any one op in the batch can't be sent
    // yet due to ops that we're replaying, we don't start any of the ops
    // in the batch. This is probably okay, but it could conceivably
    // lead to increased latency in some cases -- e.g., we could delay
    // starting a recv op due to it being in the same batch with a send
    // op. If/when we revamp the callback protocol in
    // transport_stream_op_batch, we may be able to fix this.
    if (batch->send_initial_metadata &&
        retry_state->started_send_initial_metadata) {
      continue;
    }
    if (batch->send_message && retry_state->completed_send_message_count <
                                   retry_state->started_send_message_count) {
      continue;
    }
    // Note that we only start send_trailing_metadata if we have no more
    // send_message ops to start, since we can't send down any more
    // send_message ops after send_trailing_metadata.
    if (batch->send_trailing_metadata &&
        (retry_state->started_send_message_count + batch->send_message <
             calld->send_messages->size() ||
         retry_state->started_send_trailing_metadata)) {
      continue;
    }
    if (batch->recv_initial_metadata &&
        retry_state->started_recv_initial_metadata) {
      continue;
    }
    if (batch->recv_message && retry_state->completed_recv_message_count <
                                   retry_state->started_recv_message_count) {
      continue;
    }
    if (batch->recv_trailing_metadata &&
        retry_state->started_recv_trailing_metadata) {
      continue;
    }
    // If we're not retrying, just send the batch as-is.
    if (calld->method_params == nullptr ||
        calld->method_params->retry_policy() == nullptr ||
        calld->retry_committed) {
      batches[(*num_batches)++] = batch;
      pending_batch_clear(calld, pending);
      continue;
    }
    // Create batch with the right number of callbacks.
    const int num_callbacks =
        1 + batch->recv_initial_metadata + batch->recv_message;
    subchannel_batch_data* batch_data = batch_data_create(elem, num_callbacks);
    // Cache send ops if needed.
    maybe_cache_send_ops_for_batch(calld, pending);
    // send_initial_metadata.
    if (batch->send_initial_metadata) {
      add_retriable_send_initial_metadata_op(calld, retry_state, batch_data);
    }
    // send_message.
    if (batch->send_message) {
      add_retriable_send_message_op(elem, retry_state, batch_data);
    }
    // send_trailing_metadata.
    if (batch->send_trailing_metadata) {
      add_retriable_send_trailing_metadata_op(calld, retry_state, batch_data);
    }
    // recv_initial_metadata.
    if (batch->recv_initial_metadata) {
      // recv_flags is only used on the server side.
      GPR_ASSERT(batch->payload->recv_initial_metadata.recv_flags == nullptr);
      add_retriable_recv_initial_metadata_op(calld, retry_state, batch_data);
    }
    // recv_message.
    if (batch->recv_message) {
      add_retriable_recv_message_op(calld, retry_state, batch_data);
    }
    // recv_trailing_metadata.
    if (batch->recv_trailing_metadata) {
      GPR_ASSERT(batch->collect_stats);
      add_retriable_recv_trailing_metadata_op(calld, retry_state, batch_data);
    }
    batches[(*num_batches)++] = &batch_data->batch;
  }
}

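// To summarize the loop above: each pending batch is either passed
// through as-is (when retries are not in effect), skipped until replay
// catches up, or wrapped in a subchannel_batch_data whose refcount
// matches the callbacks we expect back from the transport: on_complete
// plus recv_initial_metadata_ready and/or recv_message_ready.
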
// Constructs and starts whatever subchannel batches are needed on the
// subchannel call.
static void start_retriable_subchannel_batches(void* arg, grpc_error* ignored) {
  grpc_call_element* elem = static_cast<grpc_call_element*>(arg);
  channel_data* chand = static_cast<channel_data*>(elem->channel_data);
  call_data* calld = static_cast<call_data*>(elem->call_data);
  if (grpc_client_channel_trace.enabled()) {
    gpr_log(GPR_DEBUG, "chand=%p calld=%p: constructing retriable batches",
            chand, calld);
  }
  subchannel_call_retry_state* retry_state =
      static_cast<subchannel_call_retry_state*>(
          grpc_connected_subchannel_call_get_parent_data(
              calld->subchannel_call));
  // We can start up to 6 batches.
  grpc_transport_stream_op_batch*
      batches[GPR_ARRAY_SIZE(calld->pending_batches)];
  size_t num_batches = 0;
  // Replay previously-returned send_* ops if needed.
  subchannel_batch_data* replay_batch_data =
      maybe_create_subchannel_batch_for_replay(elem, retry_state);
  if (replay_batch_data != nullptr) {
    batches[num_batches++] = &replay_batch_data->batch;
  }
  // Now add pending batches.
  add_subchannel_batches_for_pending_batches(elem, retry_state, batches,
                                             &num_batches);
  // Start batches on subchannel call.
  // Note that the call combiner will be yielded for each batch that we
  // send down. We're already running in the call combiner, so one of
  // the batches can be started directly, but the others will have to
  // re-enter the call combiner.
  if (grpc_client_channel_trace.enabled()) {
    gpr_log(GPR_DEBUG,
            "chand=%p calld=%p: starting %" PRIuPTR
            " retriable batches on subchannel_call=%p",
            chand, calld, num_batches, calld->subchannel_call);
  }
  if (num_batches == 0) {
    // This should be fairly rare, but it can happen when (e.g.) an
    // attempt completes before it has finished replaying all
    // previously sent messages.
    GRPC_CALL_COMBINER_STOP(calld->call_combiner,
                            "no retriable subchannel batches to start");
  } else {
    for (size_t i = 1; i < num_batches; ++i) {
      if (grpc_client_channel_trace.enabled()) {
        char* batch_str = grpc_transport_stream_op_batch_string(batches[i]);
        gpr_log(GPR_DEBUG,
                "chand=%p calld=%p: starting batch in call combiner: %s", chand,
                calld, batch_str);
        gpr_free(batch_str);
      }
      batches[i]->handler_private.extra_arg = calld->subchannel_call;
      GRPC_CLOSURE_INIT(&batches[i]->handler_private.closure,
                        start_batch_in_call_combiner, batches[i],
                        grpc_schedule_on_exec_ctx);
      GRPC_CALL_COMBINER_START(calld->call_combiner,
                               &batches[i]->handler_private.closure,
                               GRPC_ERROR_NONE, "start_subchannel_batch");
    }
    if (grpc_client_channel_trace.enabled()) {
      char* batch_str = grpc_transport_stream_op_batch_string(batches[0]);
      gpr_log(GPR_DEBUG, "chand=%p calld=%p: starting batch: %s", chand, calld,
              batch_str);
      gpr_free(batch_str);
    }
    // Note: This will release the call combiner.
    grpc_subchannel_call_process_op(calld->subchannel_call, batches[0]);
  }
}

//
// LB pick
//

static void create_subchannel_call(grpc_call_element* elem, grpc_error* error) {
  channel_data* chand = static_cast<channel_data*>(elem->channel_data);
  call_data* calld = static_cast<call_data*>(elem->call_data);
  const size_t parent_data_size =
      calld->enable_retries ? sizeof(subchannel_call_retry_state) : 0;
  const grpc_core::ConnectedSubchannel::CallArgs call_args = {
      calld->pollent,                       // pollent
      calld->path,                          // path
      calld->call_start_time,               // start_time
      calld->deadline,                      // deadline
      calld->arena,                         // arena
      calld->pick.subchannel_call_context,  // context
      calld->call_combiner,                 // call_combiner
      parent_data_size                      // parent_data_size
  };
  grpc_error* new_error = calld->pick.connected_subchannel->CreateCall(
      call_args, &calld->subchannel_call);
  if (grpc_client_channel_trace.enabled()) {
    gpr_log(GPR_DEBUG, "chand=%p calld=%p: create subchannel_call=%p: error=%s",
            chand, calld, calld->subchannel_call, grpc_error_string(new_error));
  }
  if (new_error != GRPC_ERROR_NONE) {
    new_error = grpc_error_add_child(new_error, error);
    pending_batches_fail(elem, new_error, true /* yield_call_combiner */);
  } else {
    if (parent_data_size > 0) {
      subchannel_call_retry_state* retry_state =
          static_cast<subchannel_call_retry_state*>(
              grpc_connected_subchannel_call_get_parent_data(
                  calld->subchannel_call));
      retry_state->batch_payload.context = calld->pick.subchannel_call_context;
    }
    pending_batches_resume(elem);
  }
  GRPC_ERROR_UNREF(error);
}

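// Note: when retries are enabled, per-attempt retry state lives in the
// parent_data region (parent_data_size bytes) that the connected
// subchannel allocates alongside the subchannel call; that is what
// grpc_connected_subchannel_call_get_parent_data() returns elsewhere in
// this file.
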
// Invoked when a pick is completed, on both success and failure.
static void pick_done(void* arg, grpc_error* error) {
  grpc_call_element* elem = static_cast<grpc_call_element*>(arg);
  channel_data* chand = static_cast<channel_data*>(elem->channel_data);
  call_data* calld = static_cast<call_data*>(elem->call_data);
  if (calld->pick.connected_subchannel == nullptr) {
    // Failed to create subchannel.
    // If there was no error, this is an LB policy drop, in which case
    // we return an error; otherwise, we may retry.
    grpc_status_code status = GRPC_STATUS_OK;
    grpc_error_get_status(error, calld->deadline, &status, nullptr, nullptr,
                          nullptr);
    if (error == GRPC_ERROR_NONE || !calld->enable_retries ||
        !maybe_retry(elem, nullptr /* batch_data */, status,
                     nullptr /* server_pushback_md */)) {
      grpc_error* new_error =
          error == GRPC_ERROR_NONE
              ? GRPC_ERROR_CREATE_FROM_STATIC_STRING(
                    "Call dropped by load balancing policy")
              : GRPC_ERROR_CREATE_REFERENCING_FROM_STATIC_STRING(
                    "Failed to create subchannel", &error, 1);
      if (grpc_client_channel_trace.enabled()) {
        gpr_log(GPR_DEBUG,
                "chand=%p calld=%p: failed to create subchannel: error=%s",
                chand, calld, grpc_error_string(new_error));
      }
      pending_batches_fail(elem, new_error, true /* yield_call_combiner */);
    }
  } else {
    /* Create call on subchannel. */
    create_subchannel_call(elem, GRPC_ERROR_REF(error));
  }
}

// Invoked when a pick is completed to leave the client_channel combiner
// and continue processing in the call combiner.
static void pick_done_locked(grpc_call_element* elem, grpc_error* error) {
  call_data* calld = static_cast<call_data*>(elem->call_data);
  GRPC_CLOSURE_INIT(&calld->pick_closure, pick_done, elem,
                    grpc_schedule_on_exec_ctx);
  GRPC_CLOSURE_SCHED(&calld->pick_closure, error);
}

// A wrapper around pick_done_locked() that is used in cases where
// either (a) the pick was deferred pending a resolver result or (b) the
// pick was done asynchronously. Removes the call's polling entity from
// chand->interested_parties before invoking pick_done_locked().
static void async_pick_done_locked(grpc_call_element* elem, grpc_error* error) {
  channel_data* chand = static_cast<channel_data*>(elem->channel_data);
  call_data* calld = static_cast<call_data*>(elem->call_data);
  grpc_polling_entity_del_from_pollset_set(calld->pollent,
                                           chand->interested_parties);
  pick_done_locked(elem, error);
}

// Note: This runs under the client_channel combiner, but will NOT be
// holding the call combiner.
static void pick_callback_cancel_locked(void* arg, grpc_error* error) {
  grpc_call_element* elem = static_cast<grpc_call_element*>(arg);
  channel_data* chand = static_cast<channel_data*>(elem->channel_data);
  call_data* calld = static_cast<call_data*>(elem->call_data);
  // Note: chand->lb_policy may have changed since we started our pick,
  // in which case we will be cancelling the pick on a policy other than
  // the one we started it on. However, this will just be a no-op.
  if (error != GRPC_ERROR_NONE && chand->lb_policy != nullptr) {
    if (grpc_client_channel_trace.enabled()) {
      gpr_log(GPR_DEBUG, "chand=%p calld=%p: cancelling pick from LB policy %p",
              chand, calld, chand->lb_policy.get());
    }
    chand->lb_policy->CancelPickLocked(&calld->pick, GRPC_ERROR_REF(error));
  }
  GRPC_CALL_STACK_UNREF(calld->owning_call, "pick_callback_cancel");
}

// Callback invoked by LoadBalancingPolicy::PickLocked() for async picks.
// Releases the "pick_callback" ref on the call stack and invokes
// async_pick_done_locked().
static void pick_callback_done_locked(void* arg, grpc_error* error) {
  grpc_call_element* elem = static_cast<grpc_call_element*>(arg);
  channel_data* chand = static_cast<channel_data*>(elem->channel_data);
  call_data* calld = static_cast<call_data*>(elem->call_data);
  if (grpc_client_channel_trace.enabled()) {
    gpr_log(GPR_DEBUG, "chand=%p calld=%p: pick completed asynchronously",
            chand, calld);
  }
  async_pick_done_locked(elem, GRPC_ERROR_REF(error));
  GRPC_CALL_STACK_UNREF(calld->owning_call, "pick_callback");
}

// Applies service config to the call. Must be invoked once we know
// that the resolver has returned results to the channel.
static void apply_service_config_to_call_locked(grpc_call_element* elem) {
  channel_data* chand = static_cast<channel_data*>(elem->channel_data);
  call_data* calld = static_cast<call_data*>(elem->call_data);
  if (grpc_client_channel_trace.enabled()) {
    gpr_log(GPR_DEBUG, "chand=%p calld=%p: applying service config to call",
            chand, calld);
  }
  if (chand->retry_throttle_data != nullptr) {
    calld->retry_throttle_data =
        grpc_server_retry_throttle_data_ref(chand->retry_throttle_data);
  }
  if (chand->method_params_table != nullptr) {
    calld->method_params = grpc_core::ServiceConfig::MethodConfigTableLookup(
        *chand->method_params_table, calld->path);
    if (calld->method_params != nullptr) {
      // If the deadline from the service config is shorter than the one
      // from the client API, reset the deadline timer.
      if (chand->deadline_checking_enabled &&
          calld->method_params->timeout() != 0) {
        const grpc_millis per_method_deadline =
            grpc_timespec_to_millis_round_up(calld->call_start_time) +
            calld->method_params->timeout();
        if (per_method_deadline < calld->deadline) {
          calld->deadline = per_method_deadline;
          grpc_deadline_state_reset(elem, calld->deadline);
        }
      }
    }
  }
  // If no retry policy, disable retries.
  // TODO(roth): Remove this when adding support for transparent retries.
  if (calld->method_params == nullptr ||
      calld->method_params->retry_policy() == nullptr) {
    calld->enable_retries = false;
  }
}

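// Example of the deadline math above: with a client-set deadline 30s
// after call start and a service config timeout of 10s,
// per_method_deadline = call_start_time + 10s, which is sooner, so the
// deadline is lowered and the timer reset to fire at that point.
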
// Starts a pick on chand->lb_policy.
// Returns true if pick is completed synchronously.
static bool pick_callback_start_locked(grpc_call_element* elem) {
  channel_data* chand = static_cast<channel_data*>(elem->channel_data);
  call_data* calld = static_cast<call_data*>(elem->call_data);
  if (grpc_client_channel_trace.enabled()) {
    gpr_log(GPR_DEBUG, "chand=%p calld=%p: starting pick on lb_policy=%p",
            chand, calld, chand->lb_policy.get());
  }
  // Only get service config data on the first attempt.
  if (calld->num_attempts_completed == 0) {
    apply_service_config_to_call_locked(elem);
  }
  // If the application explicitly set wait_for_ready, use that.
  // Otherwise, if the service config specified a value for this
  // method, use that.
  //
  // The send_initial_metadata batch will be the first one in the list,
  // as set by get_batch_index() above.
  calld->pick.initial_metadata =
      calld->seen_send_initial_metadata
          ? &calld->send_initial_metadata
          : calld->pending_batches[0]
                .batch->payload->send_initial_metadata.send_initial_metadata;
  uint32_t send_initial_metadata_flags =
      calld->seen_send_initial_metadata
          ? calld->send_initial_metadata_flags
          : calld->pending_batches[0]
                .batch->payload->send_initial_metadata
                .send_initial_metadata_flags;
  const bool wait_for_ready_set_from_api =
      send_initial_metadata_flags &
      GRPC_INITIAL_METADATA_WAIT_FOR_READY_EXPLICITLY_SET;
  const bool wait_for_ready_set_from_service_config =
      calld->method_params != nullptr &&
      calld->method_params->wait_for_ready() !=
          ClientChannelMethodParams::WAIT_FOR_READY_UNSET;
  if (!wait_for_ready_set_from_api && wait_for_ready_set_from_service_config) {
    if (calld->method_params->wait_for_ready() ==
        ClientChannelMethodParams::WAIT_FOR_READY_TRUE) {
      send_initial_metadata_flags |= GRPC_INITIAL_METADATA_WAIT_FOR_READY;
    } else {
      send_initial_metadata_flags &= ~GRPC_INITIAL_METADATA_WAIT_FOR_READY;
    }
  }
  calld->pick.initial_metadata_flags = send_initial_metadata_flags;
  GRPC_CLOSURE_INIT(&calld->pick_closure, pick_callback_done_locked, elem,
                    grpc_combiner_scheduler(chand->combiner));
  calld->pick.on_complete = &calld->pick_closure;
  GRPC_CALL_STACK_REF(calld->owning_call, "pick_callback");
  const bool pick_done = chand->lb_policy->PickLocked(&calld->pick);
  if (pick_done) {
    // Pick completed synchronously.
    if (grpc_client_channel_trace.enabled()) {
      gpr_log(GPR_DEBUG, "chand=%p calld=%p: pick completed synchronously",
              chand, calld);
    }
    GRPC_CALL_STACK_UNREF(calld->owning_call, "pick_callback");
  } else {
    GRPC_CALL_STACK_REF(calld->owning_call, "pick_callback_cancel");
    grpc_call_combiner_set_notify_on_cancel(
        calld->call_combiner,
        GRPC_CLOSURE_INIT(&calld->pick_cancel_closure,
                          pick_callback_cancel_locked, elem,
                          grpc_combiner_scheduler(chand->combiner)));
  }
  return pick_done;
}

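// Precedence of wait_for_ready, as computed above: an explicit setting
// from the application always wins; otherwise WAIT_FOR_READY_TRUE in the
// service config sets the flag and WAIT_FOR_READY_FALSE clears it; if
// neither source specifies it, the flag is left untouched.
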
typedef struct {
  grpc_call_element* elem;
  bool finished;
  grpc_closure closure;
  grpc_closure cancel_closure;
} pick_after_resolver_result_args;

// Note: This runs under the client_channel combiner, but will NOT be
// holding the call combiner.
static void pick_after_resolver_result_cancel_locked(void* arg,
                                                     grpc_error* error) {
  pick_after_resolver_result_args* args =
      static_cast<pick_after_resolver_result_args*>(arg);
  if (args->finished) {
    gpr_free(args);
    return;
  }
  // If we don't yet have a resolver result, then a closure for
  // pick_after_resolver_result_done_locked() will have been added to
  // chand->waiting_for_resolver_result_closures, and it may not be invoked
  // until after this call has been destroyed. We mark the operation as
  // finished, so that when pick_after_resolver_result_done_locked()
  // is called, it will be a no-op. We also immediately invoke
  // async_pick_done_locked() to propagate the error back to the caller.
  args->finished = true;
  grpc_call_element* elem = args->elem;
  channel_data* chand = static_cast<channel_data*>(elem->channel_data);
  call_data* calld = static_cast<call_data*>(elem->call_data);
  if (grpc_client_channel_trace.enabled()) {
    gpr_log(GPR_DEBUG,
            "chand=%p calld=%p: cancelling pick waiting for resolver result",
            chand, calld);
  }
  // Note: Although we are not in the call combiner here, we are
  // basically stealing the call combiner from the pending pick, so
  // it's safe to call async_pick_done_locked() here -- we are
  // essentially calling it here instead of calling it in
  // pick_after_resolver_result_done_locked().
  async_pick_done_locked(elem, GRPC_ERROR_CREATE_REFERENCING_FROM_STATIC_STRING(
                                   "Pick cancelled", &error, 1));
}

static void pick_after_resolver_result_done_locked(void* arg,
                                                   grpc_error* error) {
  pick_after_resolver_result_args* args =
      static_cast<pick_after_resolver_result_args*>(arg);
  if (args->finished) {
    /* cancelled, do nothing */
    if (grpc_client_channel_trace.enabled()) {
      gpr_log(GPR_DEBUG, "call cancelled before resolver result");
    }
    gpr_free(args);
    return;
  }
  args->finished = true;
  grpc_call_element* elem = args->elem;
  channel_data* chand = static_cast<channel_data*>(elem->channel_data);
  call_data* calld = static_cast<call_data*>(elem->call_data);
  if (error != GRPC_ERROR_NONE) {
    if (grpc_client_channel_trace.enabled()) {
      gpr_log(GPR_DEBUG, "chand=%p calld=%p: resolver failed to return data",
              chand, calld);
    }
    async_pick_done_locked(elem, GRPC_ERROR_REF(error));
  } else if (chand->resolver == nullptr) {
    // Shutting down.
    if (grpc_client_channel_trace.enabled()) {
      gpr_log(GPR_DEBUG, "chand=%p calld=%p: resolver disconnected", chand,
              calld);
    }
    async_pick_done_locked(
        elem, GRPC_ERROR_CREATE_FROM_STATIC_STRING("Disconnected"));
  } else if (chand->lb_policy == nullptr) {
    // Transient resolver failure.
    // If call has wait_for_ready=true, try again; otherwise, fail.
    uint32_t send_initial_metadata_flags =
        calld->seen_send_initial_metadata
            ? calld->send_initial_metadata_flags
            : calld->pending_batches[0]
                  .batch->payload->send_initial_metadata
                  .send_initial_metadata_flags;
    if (send_initial_metadata_flags & GRPC_INITIAL_METADATA_WAIT_FOR_READY) {
      if (grpc_client_channel_trace.enabled()) {
        gpr_log(GPR_DEBUG,
                "chand=%p calld=%p: resolver returned but no LB policy; "
                "wait_for_ready=true; trying again",
                chand, calld);
      }
      pick_after_resolver_result_start_locked(elem);
    } else {
      if (grpc_client_channel_trace.enabled()) {
        gpr_log(GPR_DEBUG,
                "chand=%p calld=%p: resolver returned but no LB policy; "
                "wait_for_ready=false; failing",
                chand, calld);
      }
      async_pick_done_locked(
          elem,
          grpc_error_set_int(
              GRPC_ERROR_CREATE_FROM_STATIC_STRING("Name resolution failure"),
              GRPC_ERROR_INT_GRPC_STATUS, GRPC_STATUS_UNAVAILABLE));
    }
  } else {
    if (grpc_client_channel_trace.enabled()) {
      gpr_log(GPR_DEBUG, "chand=%p calld=%p: resolver returned, doing pick",
              chand, calld);
    }
    if (pick_callback_start_locked(elem)) {
      // Even if the LB policy returns a result synchronously, we have
      // already added our polling entity to chand->interested_parties
      // in order to wait for the resolver result, so we need to
      // remove it here. Therefore, we call async_pick_done_locked()
      // instead of pick_done_locked().
      async_pick_done_locked(elem, GRPC_ERROR_NONE);
    }
  }
}

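// Summary of the cases handled above: resolver error -> fail the pick;
// resolver shut down -> fail with "Disconnected"; resolver returned but
// produced no LB policy -> queue another wait if wait_for_ready is set,
// else fail with UNAVAILABLE; resolver returned with an LB policy ->
// start the pick immediately.
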
static void pick_after_resolver_result_start_locked(grpc_call_element* elem) {
  channel_data* chand = static_cast<channel_data*>(elem->channel_data);
  call_data* calld = static_cast<call_data*>(elem->call_data);
  if (grpc_client_channel_trace.enabled()) {
    gpr_log(GPR_DEBUG,
            "chand=%p calld=%p: deferring pick pending resolver result", chand,
            calld);
  }
  pick_after_resolver_result_args* args =
      static_cast<pick_after_resolver_result_args*>(gpr_zalloc(sizeof(*args)));
  args->elem = elem;
  GRPC_CLOSURE_INIT(&args->closure, pick_after_resolver_result_done_locked,
                    args, grpc_combiner_scheduler(chand->combiner));
  grpc_closure_list_append(&chand->waiting_for_resolver_result_closures,
                           &args->closure, GRPC_ERROR_NONE);
  grpc_call_combiner_set_notify_on_cancel(
      calld->call_combiner,
      GRPC_CLOSURE_INIT(&args->cancel_closure,
                        pick_after_resolver_result_cancel_locked, args,
                        grpc_combiner_scheduler(chand->combiner)));
}

static void start_pick_locked(void* arg, grpc_error* ignored) {
  grpc_call_element* elem = static_cast<grpc_call_element*>(arg);
  call_data* calld = static_cast<call_data*>(elem->call_data);
  channel_data* chand = static_cast<channel_data*>(elem->channel_data);
  GPR_ASSERT(calld->pick.connected_subchannel == nullptr);
  GPR_ASSERT(calld->subchannel_call == nullptr);
  if (chand->lb_policy != nullptr) {
    // We already have an LB policy, so ask it for a pick.
    if (pick_callback_start_locked(elem)) {
      // Pick completed synchronously.
      pick_done_locked(elem, GRPC_ERROR_NONE);
      return;
    }
  } else {
    // We do not yet have an LB policy, so wait for a resolver result.
    if (chand->resolver == nullptr) {
      pick_done_locked(elem,
                       GRPC_ERROR_CREATE_FROM_STATIC_STRING("Disconnected"));
      return;
    }
    if (!chand->started_resolving) {
      start_resolving_locked(chand);
    }
    pick_after_resolver_result_start_locked(elem);
  }
  // We need to wait for either a resolver result or for an async result
  // from the LB policy. Add the polling entity from call_data to the
  // channel_data's interested_parties, so that the I/O of the LB policy
  // and resolver can be done under it. The polling entity will be
  // removed in async_pick_done_locked().
  grpc_polling_entity_add_to_pollset_set(calld->pollent,
                                         chand->interested_parties);
}

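// Rough lifecycle of a pick, per the logic above:
//   1. If an LB policy already exists, ask it for a pick (which may
//      complete synchronously).
//   2. Otherwise, kick off resolution if needed and queue the pick
//      until a resolver result arrives.
//   3. On any asynchronous path, park the call's polling entity on
//      chand->interested_parties until async_pick_done_locked() removes
//      it.
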
//
// filter call vtable functions
//

static void cc_start_transport_stream_op_batch(
    grpc_call_element* elem, grpc_transport_stream_op_batch* batch) {
  GPR_TIMER_SCOPE("cc_start_transport_stream_op_batch", 0);
  call_data* calld = static_cast<call_data*>(elem->call_data);
  channel_data* chand = static_cast<channel_data*>(elem->channel_data);
  if (chand->deadline_checking_enabled) {
    grpc_deadline_state_client_start_transport_stream_op_batch(elem, batch);
  }
  // If we've previously been cancelled, immediately fail any new batches.
  if (calld->cancel_error != GRPC_ERROR_NONE) {
    if (grpc_client_channel_trace.enabled()) {
      gpr_log(GPR_DEBUG, "chand=%p calld=%p: failing batch with error: %s",
              chand, calld, grpc_error_string(calld->cancel_error));
    }
    // Note: This will release the call combiner.
    grpc_transport_stream_op_batch_finish_with_failure(
        batch, GRPC_ERROR_REF(calld->cancel_error), calld->call_combiner);
    return;
  }
  // Handle cancellation.
  if (batch->cancel_stream) {
    // Stash a copy of cancel_error in our call data, so that we can use
    // it for subsequent operations. This ensures that if the call is
    // cancelled before any batches are passed down (e.g., if the deadline
    // is in the past when the call starts), we can return the right
    // error to the caller when the first batch does get passed down.
    GRPC_ERROR_UNREF(calld->cancel_error);
    calld->cancel_error =
        GRPC_ERROR_REF(batch->payload->cancel_stream.cancel_error);
    if (grpc_client_channel_trace.enabled()) {
      gpr_log(GPR_DEBUG, "chand=%p calld=%p: recording cancel_error=%s", chand,
              calld, grpc_error_string(calld->cancel_error));
    }
    // If we do not have a subchannel call (i.e., a pick has not yet
    // been started), fail all pending batches. Otherwise, send the
    // cancellation down to the subchannel call.
    if (calld->subchannel_call == nullptr) {
      pending_batches_fail(elem, GRPC_ERROR_REF(calld->cancel_error),
                           false /* yield_call_combiner */);
      // Note: This will release the call combiner.
      grpc_transport_stream_op_batch_finish_with_failure(
          batch, GRPC_ERROR_REF(calld->cancel_error), calld->call_combiner);
    } else {
      // Note: This will release the call combiner.
      grpc_subchannel_call_process_op(calld->subchannel_call, batch);
    }
    return;
  }
  // Add the batch to the pending list.
  pending_batches_add(elem, batch);
  // Check if we've already gotten a subchannel call.
  // Note that once we have completed the pick, we do not need to enter
  // the channel combiner, which is more efficient (especially for
  // streaming calls).
  if (calld->subchannel_call != nullptr) {
    if (grpc_client_channel_trace.enabled()) {
      gpr_log(GPR_DEBUG,
              "chand=%p calld=%p: starting batch on subchannel_call=%p", chand,
              calld, calld->subchannel_call);
    }
    pending_batches_resume(elem);
    return;
  }
  // We do not yet have a subchannel call.
  // For batches containing a send_initial_metadata op, enter the channel
  // combiner to start a pick.
  if (batch->send_initial_metadata) {
    if (grpc_client_channel_trace.enabled()) {
      gpr_log(GPR_DEBUG, "chand=%p calld=%p: entering client_channel combiner",
              chand, calld);
    }
    GRPC_CLOSURE_SCHED(
        GRPC_CLOSURE_INIT(&batch->handler_private.closure, start_pick_locked,
                          elem, grpc_combiner_scheduler(chand->combiner)),
        GRPC_ERROR_NONE);
  } else {
    // For all other batches, release the call combiner.
    if (grpc_client_channel_trace.enabled()) {
      gpr_log(GPR_DEBUG,
              "chand=%p calld=%p: saved batch, yielding call combiner", chand,
              calld);
    }
    GRPC_CALL_COMBINER_STOP(calld->call_combiner,
                            "batch does not include send_initial_metadata");
  }
}

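// Decision tree for an incoming batch, as implemented above: call
// already cancelled -> fail the batch immediately; cancel_stream op ->
// record the error, then either fail all pending batches (no subchannel
// call yet) or forward the cancellation down; subchannel call already
// picked -> resume pending batches on it; otherwise queue the batch,
// entering the combiner to start a pick only for batches that carry
// send_initial_metadata.
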
/* Constructor for call_data */
static grpc_error* cc_init_call_elem(grpc_call_element* elem,
                                     const grpc_call_element_args* args) {
  call_data* calld = static_cast<call_data*>(elem->call_data);
  channel_data* chand = static_cast<channel_data*>(elem->channel_data);
  // Initialize data members.
  calld->path = grpc_slice_ref_internal(args->path);
  calld->call_start_time = args->start_time;
  calld->deadline = args->deadline;
  calld->arena = args->arena;
  calld->owning_call = args->call_stack;
  calld->call_combiner = args->call_combiner;
  if (chand->deadline_checking_enabled) {
    grpc_deadline_state_init(elem, args->call_stack, args->call_combiner,
                             calld->deadline);
  }
  calld->enable_retries = chand->enable_retries;
  calld->send_messages.Init();
  return GRPC_ERROR_NONE;
}

/* Destructor for call_data */
static void cc_destroy_call_elem(grpc_call_element* elem,
                                 const grpc_call_final_info* final_info,
                                 grpc_closure* then_schedule_closure) {
  call_data* calld = static_cast<call_data*>(elem->call_data);
  channel_data* chand = static_cast<channel_data*>(elem->channel_data);
  if (chand->deadline_checking_enabled) {
    grpc_deadline_state_destroy(elem);
  }
  grpc_slice_unref_internal(calld->path);
  calld->method_params.reset();
  GRPC_ERROR_UNREF(calld->cancel_error);
  if (calld->subchannel_call != nullptr) {
    grpc_subchannel_call_set_cleanup_closure(calld->subchannel_call,
                                             then_schedule_closure);
    then_schedule_closure = nullptr;
    GRPC_SUBCHANNEL_CALL_UNREF(calld->subchannel_call,
                               "client_channel_destroy_call");
  }
  for (size_t i = 0; i < GPR_ARRAY_SIZE(calld->pending_batches); ++i) {
    GPR_ASSERT(calld->pending_batches[i].batch == nullptr);
  }
  if (calld->pick.connected_subchannel != nullptr) {
    calld->pick.connected_subchannel.reset();
  }
  for (size_t i = 0; i < GRPC_CONTEXT_COUNT; ++i) {
    if (calld->pick.subchannel_call_context[i].value != nullptr) {
      calld->pick.subchannel_call_context[i].destroy(
          calld->pick.subchannel_call_context[i].value);
    }
  }
  calld->send_messages.Destroy();
  GRPC_CLOSURE_SCHED(then_schedule_closure, GRPC_ERROR_NONE);
}

static void cc_set_pollset_or_pollset_set(grpc_call_element* elem,
                                          grpc_polling_entity* pollent) {
  call_data* calld = static_cast<call_data*>(elem->call_data);
  calld->pollent = pollent;
}

/*************************************************************************
 * EXPORTED SYMBOLS
 */

const grpc_channel_filter grpc_client_channel_filter = {
    cc_start_transport_stream_op_batch,
    cc_start_transport_op,
    sizeof(call_data),
    cc_init_call_elem,
    cc_set_pollset_or_pollset_set,
    cc_destroy_call_elem,
    sizeof(channel_data),
    cc_init_channel_elem,
    cc_destroy_channel_elem,
    cc_get_channel_info,
    "client-channel",
};

static void try_to_connect_locked(void* arg, grpc_error* error_ignored) {
  channel_data* chand = static_cast<channel_data*>(arg);
  if (chand->lb_policy != nullptr) {
    chand->lb_policy->ExitIdleLocked();
  } else {
    chand->exit_idle_when_lb_policy_arrives = true;
    if (!chand->started_resolving && chand->resolver != nullptr) {
      start_resolving_locked(chand);
    }
  }
  GRPC_CHANNEL_STACK_UNREF(chand->owning_stack, "try_to_connect");
}

grpc_connectivity_state grpc_client_channel_check_connectivity_state(
    grpc_channel_element* elem, int try_to_connect) {
  channel_data* chand = static_cast<channel_data*>(elem->channel_data);
  grpc_connectivity_state out =
      grpc_connectivity_state_check(&chand->state_tracker);
  if (out == GRPC_CHANNEL_IDLE && try_to_connect) {
    GRPC_CHANNEL_STACK_REF(chand->owning_stack, "try_to_connect");
    GRPC_CLOSURE_SCHED(
        GRPC_CLOSURE_CREATE(try_to_connect_locked, chand,
                            grpc_combiner_scheduler(chand->combiner)),
        GRPC_ERROR_NONE);
  }
  return out;
}

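// State for one external (API-level) connectivity watch. Watchers are kept
// in a singly-linked list on the channel, keyed by their on_complete closure.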
typedef struct external_connectivity_watcher {
  channel_data* chand;
  grpc_polling_entity pollent;
  grpc_closure* on_complete;
  grpc_closure* watcher_timer_init;
  grpc_connectivity_state* state;
  grpc_closure my_closure;
  struct external_connectivity_watcher* next;
} external_connectivity_watcher;

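// Returns the watcher registered under the given on_complete closure, or
// nullptr if there is none. Takes the watcher-list mutex internally.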
static external_connectivity_watcher* lookup_external_connectivity_watcher(
    channel_data* chand, grpc_closure* on_complete) {
  gpr_mu_lock(&chand->external_connectivity_watcher_list_mu);
  external_connectivity_watcher* w =
      chand->external_connectivity_watcher_list_head;
  while (w != nullptr && w->on_complete != on_complete) {
    w = w->next;
  }
  gpr_mu_unlock(&chand->external_connectivity_watcher_list_mu);
  return w;
}

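// Registers w in the channel's watcher list (pushed at the head, despite the
// name). Asserts that no watcher is already registered under the same
// on_complete closure.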
static void external_connectivity_watcher_list_append(
    channel_data* chand, external_connectivity_watcher* w) {
  GPR_ASSERT(!lookup_external_connectivity_watcher(chand, w->on_complete));

  gpr_mu_lock(&w->chand->external_connectivity_watcher_list_mu);
  GPR_ASSERT(!w->next);
  w->next = chand->external_connectivity_watcher_list_head;
  chand->external_connectivity_watcher_list_head = w;
  gpr_mu_unlock(&w->chand->external_connectivity_watcher_list_mu);
}

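// Unlinks to_remove from the channel's watcher list; the watcher must be
// present.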
static void external_connectivity_watcher_list_remove(
    channel_data* chand, external_connectivity_watcher* to_remove) {
  GPR_ASSERT(
      lookup_external_connectivity_watcher(chand, to_remove->on_complete));
  gpr_mu_lock(&chand->external_connectivity_watcher_list_mu);
  if (to_remove == chand->external_connectivity_watcher_list_head) {
    chand->external_connectivity_watcher_list_head = to_remove->next;
    gpr_mu_unlock(&chand->external_connectivity_watcher_list_mu);
    return;
  }
  external_connectivity_watcher* w =
      chand->external_connectivity_watcher_list_head;
  while (w != nullptr) {
    if (w->next == to_remove) {
      w->next = w->next->next;
      gpr_mu_unlock(&chand->external_connectivity_watcher_list_mu);
      return;
    }
    w = w->next;
  }
  GPR_UNREACHABLE_CODE(return);
}

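// Returns the number of external connectivity watchers currently registered
// on the channel.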
int grpc_client_channel_num_external_connectivity_watchers(
    grpc_channel_element* elem) {
  channel_data* chand = static_cast<channel_data*>(elem->channel_data);
  int count = 0;

  gpr_mu_lock(&chand->external_connectivity_watcher_list_mu);
  external_connectivity_watcher* w =
      chand->external_connectivity_watcher_list_head;
  while (w != nullptr) {
    count++;
    w = w->next;
  }
  gpr_mu_unlock(&chand->external_connectivity_watcher_list_mu);

  return count;
}

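// Runs in the combiner when the state tracker reports a change (or the watch
// is cancelled): drops the watcher's polling interest and channel-stack ref,
// removes it from the list, frees it, and then hands the result to the
// caller's on_complete closure.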
static void on_external_watch_complete_locked(void* arg, grpc_error* error) {
  external_connectivity_watcher* w =
      static_cast<external_connectivity_watcher*>(arg);
  grpc_closure* follow_up = w->on_complete;
  grpc_polling_entity_del_from_pollset_set(&w->pollent,
                                           w->chand->interested_parties);
  GRPC_CHANNEL_STACK_UNREF(w->chand->owning_stack,
                           "external_connectivity_watcher");
  external_connectivity_watcher_list_remove(w->chand, w);
  gpr_free(w);
  GRPC_CLOSURE_RUN(follow_up, GRPC_ERROR_REF(error));
}

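// Runs in the combiner. A non-null w->state starts a new watch: the watcher
// is registered and the state tracker will invoke my_closure on the next
// state change. A null w->state cancels the watch previously registered
// under the same on_complete closure, if any, and frees this request.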
static void watch_connectivity_state_locked(void* arg,
                                            grpc_error* error_ignored) {
  external_connectivity_watcher* w =
      static_cast<external_connectivity_watcher*>(arg);
  external_connectivity_watcher* found = nullptr;
  if (w->state != nullptr) {
    external_connectivity_watcher_list_append(w->chand, w);
    GRPC_CLOSURE_RUN(w->watcher_timer_init, GRPC_ERROR_NONE);
    GRPC_CLOSURE_INIT(&w->my_closure, on_external_watch_complete_locked, w,
                      grpc_combiner_scheduler(w->chand->combiner));
    grpc_connectivity_state_notify_on_state_change(&w->chand->state_tracker,
                                                   w->state, &w->my_closure);
  } else {
    GPR_ASSERT(w->watcher_timer_init == nullptr);
    found = lookup_external_connectivity_watcher(w->chand, w->on_complete);
    if (found) {
      GPR_ASSERT(found->on_complete == w->on_complete);
      grpc_connectivity_state_notify_on_state_change(
          &found->chand->state_tracker, nullptr, &found->my_closure);
    }
    grpc_polling_entity_del_from_pollset_set(&w->pollent,
                                             w->chand->interested_parties);
    GRPC_CHANNEL_STACK_UNREF(w->chand->owning_stack,
                             "external_connectivity_watcher");
    gpr_free(w);
  }
}

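// Entry point for external connectivity watches. Allocates a watcher, ties
// it to the caller's polling entity, and bounces into the combiner to
// register (state != nullptr) or cancel (state == nullptr) the watch.
//
// A rough caller-side sketch (names simplified; not code from this file):
//
//   grpc_connectivity_state state = GRPC_CHANNEL_IDLE;
//   grpc_client_channel_watch_connectivity_state(
//       elem, grpc_polling_entity_create_from_pollset(pollset), &state,
//       on_state_changed_closure, watcher_timer_init_closure);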
void grpc_client_channel_watch_connectivity_state(
    grpc_channel_element* elem, grpc_polling_entity pollent,
    grpc_connectivity_state* state, grpc_closure* closure,
    grpc_closure* watcher_timer_init) {
  channel_data* chand = static_cast<channel_data*>(elem->channel_data);
  external_connectivity_watcher* w =
      static_cast<external_connectivity_watcher*>(gpr_zalloc(sizeof(*w)));
  w->chand = chand;
  w->pollent = pollent;
  w->on_complete = closure;
  w->state = state;
  w->watcher_timer_init = watcher_timer_init;
  grpc_polling_entity_add_to_pollset_set(&w->pollent,
                                         chand->interested_parties);
  GRPC_CHANNEL_STACK_REF(w->chand->owning_stack,
                         "external_connectivity_watcher");
  GRPC_CLOSURE_SCHED(
      GRPC_CLOSURE_INIT(&w->my_closure, watch_connectivity_state_locked, w,
                        grpc_combiner_scheduler(chand->combiner)),
      GRPC_ERROR_NONE);
}

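// Returns the subchannel call currently bound to this client channel call,
// if one has been created by a completed pick.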
grpc_subchannel_call* grpc_client_channel_get_subchannel_call(
    grpc_call_element* elem) {
  call_data* calld = static_cast<call_data*>(elem->call_data);
  return calld->subchannel_call;
}