/*
 *
 * Copyright 2015 gRPC authors.
 *
 * Licensed under the Apache License, Version 2.0 (the "License");
 * you may not use this file except in compliance with the License.
 * You may obtain a copy of the License at
 *
 *     http://www.apache.org/licenses/LICENSE-2.0
 *
 * Unless required by applicable law or agreed to in writing, software
 * distributed under the License is distributed on an "AS IS" BASIS,
 * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
 * See the License for the specific language governing permissions and
 * limitations under the License.
 *
 */

#include <grpc/support/port_platform.h>

#include "src/core/ext/filters/client_channel/client_channel.h"

#include <inttypes.h>
#include <limits.h>
#include <stdbool.h>
#include <stdio.h>
#include <string.h>

#include <grpc/support/alloc.h>
#include <grpc/support/log.h>
#include <grpc/support/string_util.h>
#include <grpc/support/sync.h>

#include "src/core/ext/filters/client_channel/backup_poller.h"
#include "src/core/ext/filters/client_channel/http_connect_handshaker.h"
#include "src/core/ext/filters/client_channel/lb_policy_registry.h"
#include "src/core/ext/filters/client_channel/method_params.h"
#include "src/core/ext/filters/client_channel/proxy_mapper_registry.h"
#include "src/core/ext/filters/client_channel/resolver_registry.h"
#include "src/core/ext/filters/client_channel/retry_throttle.h"
#include "src/core/ext/filters/client_channel/subchannel.h"
#include "src/core/ext/filters/deadline/deadline_filter.h"
#include "src/core/lib/backoff/backoff.h"
#include "src/core/lib/channel/channel_args.h"
#include "src/core/lib/channel/connected_channel.h"
#include "src/core/lib/channel/status_util.h"
#include "src/core/lib/gpr/string.h"
#include "src/core/lib/gprpp/inlined_vector.h"
#include "src/core/lib/gprpp/manual_constructor.h"
#include "src/core/lib/iomgr/combiner.h"
#include "src/core/lib/iomgr/iomgr.h"
#include "src/core/lib/iomgr/polling_entity.h"
#include "src/core/lib/profiling/timers.h"
#include "src/core/lib/slice/slice_internal.h"
#include "src/core/lib/slice/slice_string_helpers.h"
#include "src/core/lib/surface/channel.h"
#include "src/core/lib/transport/connectivity_state.h"
#include "src/core/lib/transport/error_utils.h"
#include "src/core/lib/transport/metadata.h"
#include "src/core/lib/transport/metadata_batch.h"
#include "src/core/lib/transport/service_config.h"
#include "src/core/lib/transport/static_metadata.h"
#include "src/core/lib/transport/status_metadata.h"

using grpc_core::internal::ClientChannelMethodParams;
using grpc_core::internal::ServerRetryThrottleData;

/* Client channel implementation */

// By default, we buffer 256 KiB per RPC for retries.
// TODO(roth): Do we have any data to suggest a better value?
#define DEFAULT_PER_RPC_RETRY_BUFFER_SIZE (256 << 10)
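
// For illustration (a sketch, not used in this file): a client can override
// this default through the GRPC_ARG_PER_RPC_RETRY_BUFFER_SIZE channel arg,
// which cc_init_channel_elem() below reads via grpc_channel_arg_get_integer().
// The 64 KiB value and the target string are example values only:
//
//   grpc_arg arg = grpc_channel_arg_integer_create(
//       const_cast<char*>(GRPC_ARG_PER_RPC_RETRY_BUFFER_SIZE), 64 << 10);
//   grpc_channel_args args = {1, &arg};
//   grpc_channel* channel =
//       grpc_insecure_channel_create("dns:///example.com:443", &args, nullptr);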

// This value was picked arbitrarily.  It can be changed if there is
// any even moderately compelling reason to do so.
#define RETRY_BACKOFF_JITTER 0.2
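
// Explanatory note: grpc_core::BackOff (src/core/lib/backoff/backoff.h)
// applies this as a random perturbation of roughly +/-20% around each
// computed backoff interval, i.e. a value in approximately
// [interval * (1 - 0.2), interval * (1 + 0.2)]; see the BackOff
// implementation for the exact formula.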

grpc_core::TraceFlag grpc_client_channel_trace(false, "client_channel");

/*************************************************************************
 * CHANNEL-WIDE FUNCTIONS
 */

struct external_connectivity_watcher;

typedef grpc_core::SliceHashTable<
    grpc_core::RefCountedPtr<ClientChannelMethodParams>>
    MethodParamsTable;

typedef struct client_channel_channel_data {
  grpc_core::OrphanablePtr<grpc_core::Resolver> resolver;
  bool started_resolving;
  bool deadline_checking_enabled;
  grpc_client_channel_factory* client_channel_factory;
  bool enable_retries;
  size_t per_rpc_retry_buffer_size;

  /** combiner protecting all variables below in this data structure */
  grpc_combiner* combiner;
  /** currently active load balancer */
  grpc_core::OrphanablePtr<grpc_core::LoadBalancingPolicy> lb_policy;
  /** retry throttle data */
  grpc_core::RefCountedPtr<ServerRetryThrottleData> retry_throttle_data;
  /** maps method names to method_parameters structs */
  grpc_core::RefCountedPtr<MethodParamsTable> method_params_table;
  /** incoming resolver result - set by resolver.next() */
  grpc_channel_args* resolver_result;
  /** a list of closures that are all waiting for resolver result to come in */
  grpc_closure_list waiting_for_resolver_result_closures;
  /** resolver callback */
  grpc_closure on_resolver_result_changed;
  /** connectivity state being tracked */
  grpc_connectivity_state_tracker state_tracker;
  /** when an lb_policy arrives, should we try to exit idle */
  bool exit_idle_when_lb_policy_arrives;
  /** owning stack */
  grpc_channel_stack* owning_stack;
  /** interested parties (owned) */
  grpc_pollset_set* interested_parties;

  /* external_connectivity_watcher_list head is guarded by its own mutex, since
   * counts need to be grabbed immediately without polling on a cq */
  gpr_mu external_connectivity_watcher_list_mu;
  struct external_connectivity_watcher* external_connectivity_watcher_list_head;

  /* the following properties are guarded by a mutex since APIs require them
     to be instantaneously available */
  gpr_mu info_mu;
  char* info_lb_policy_name;
  /** service config in JSON form */
  char* info_service_config_json;
} channel_data;

typedef struct {
  channel_data* chand;
  /** used only as an identifier; do not dereference it, because the LB
   * policy may no longer exist by the time the callback is run */
  grpc_core::LoadBalancingPolicy* lb_policy;
  grpc_closure closure;
} reresolution_request_args;

/** We create one watcher for each new lb_policy that is returned from a
    resolver, to watch for state changes from the lb_policy.  When a state
    change is seen, we update the channel, and create a new watcher. */
typedef struct {
  channel_data* chand;
  grpc_closure on_changed;
  grpc_connectivity_state state;
  grpc_core::LoadBalancingPolicy* lb_policy;
} lb_policy_connectivity_watcher;
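
// Lifecycle sketch for the watcher (explanatory comment; the functions
// involved are defined below):
//   watch_lb_policy_locked()             // allocate w, ref owning_stack
//     -> lb_policy->NotifyOnStateChangeLocked(&w->state, &w->on_changed)
//   on_lb_policy_state_changed_locked()  // runs on each state change
//     -> set_channel_connectivity_state_locked(...)
//     -> watch_lb_policy_locked(...)     // re-arm unless SHUTDOWN
// If the notification is for a stale policy or the state is SHUTDOWN,
// the watcher is not re-armed; w is freed and the stack ref is dropped.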

static void watch_lb_policy_locked(channel_data* chand,
                                   grpc_core::LoadBalancingPolicy* lb_policy,
                                   grpc_connectivity_state current_state);

static void set_channel_connectivity_state_locked(channel_data* chand,
                                                  grpc_connectivity_state state,
                                                  grpc_error* error,
                                                  const char* reason) {
  /* TODO: Improve failure handling:
   * - Make it possible for policies to return GRPC_CHANNEL_TRANSIENT_FAILURE.
   * - Hand over pending picks from old policies during the switch that happens
   *   when resolver provides an update. */
  if (chand->lb_policy != nullptr) {
    if (state == GRPC_CHANNEL_TRANSIENT_FAILURE) {
      /* cancel picks with wait_for_ready=false */
      chand->lb_policy->CancelMatchingPicksLocked(
          /* mask= */ GRPC_INITIAL_METADATA_WAIT_FOR_READY,
          /* check= */ 0, GRPC_ERROR_REF(error));
    } else if (state == GRPC_CHANNEL_SHUTDOWN) {
      /* cancel all picks */
      chand->lb_policy->CancelMatchingPicksLocked(/* mask= */ 0, /* check= */ 0,
                                                  GRPC_ERROR_REF(error));
    }
  }
  if (grpc_client_channel_trace.enabled()) {
    gpr_log(GPR_INFO, "chand=%p: setting connectivity state to %s", chand,
            grpc_connectivity_state_name(state));
  }
  grpc_connectivity_state_set(&chand->state_tracker, state, error, reason);
}

static void on_lb_policy_state_changed_locked(void* arg, grpc_error* error) {
  lb_policy_connectivity_watcher* w =
      static_cast<lb_policy_connectivity_watcher*>(arg);
  /* check if the notification is for the latest policy */
  if (w->lb_policy == w->chand->lb_policy.get()) {
    if (grpc_client_channel_trace.enabled()) {
      gpr_log(GPR_INFO, "chand=%p: lb_policy=%p state changed to %s", w->chand,
              w->lb_policy, grpc_connectivity_state_name(w->state));
    }
    set_channel_connectivity_state_locked(w->chand, w->state,
                                          GRPC_ERROR_REF(error), "lb_changed");
    if (w->state != GRPC_CHANNEL_SHUTDOWN) {
      watch_lb_policy_locked(w->chand, w->lb_policy, w->state);
    }
  }
  GRPC_CHANNEL_STACK_UNREF(w->chand->owning_stack, "watch_lb_policy");
  gpr_free(w);
}

static void watch_lb_policy_locked(channel_data* chand,
                                   grpc_core::LoadBalancingPolicy* lb_policy,
                                   grpc_connectivity_state current_state) {
  lb_policy_connectivity_watcher* w =
      static_cast<lb_policy_connectivity_watcher*>(gpr_malloc(sizeof(*w)));
  GRPC_CHANNEL_STACK_REF(chand->owning_stack, "watch_lb_policy");
  w->chand = chand;
  GRPC_CLOSURE_INIT(&w->on_changed, on_lb_policy_state_changed_locked, w,
                    grpc_combiner_scheduler(chand->combiner));
  w->state = current_state;
  w->lb_policy = lb_policy;
  lb_policy->NotifyOnStateChangeLocked(&w->state, &w->on_changed);
}

static void start_resolving_locked(channel_data* chand) {
  if (grpc_client_channel_trace.enabled()) {
    gpr_log(GPR_INFO, "chand=%p: starting name resolution", chand);
  }
  GPR_ASSERT(!chand->started_resolving);
  chand->started_resolving = true;
  GRPC_CHANNEL_STACK_REF(chand->owning_stack, "resolver");
  chand->resolver->NextLocked(&chand->resolver_result,
                              &chand->on_resolver_result_changed);
}

typedef struct {
  char* server_name;
  grpc_core::RefCountedPtr<ServerRetryThrottleData> retry_throttle_data;
} service_config_parsing_state;

static void parse_retry_throttle_params(
    const grpc_json* field, service_config_parsing_state* parsing_state) {
  if (strcmp(field->key, "retryThrottling") == 0) {
    if (parsing_state->retry_throttle_data != nullptr) return;  // Duplicate.
    if (field->type != GRPC_JSON_OBJECT) return;
    int max_milli_tokens = 0;
    int milli_token_ratio = 0;
    for (grpc_json* sub_field = field->child; sub_field != nullptr;
         sub_field = sub_field->next) {
      if (sub_field->key == nullptr) return;
      if (strcmp(sub_field->key, "maxTokens") == 0) {
        if (max_milli_tokens != 0) return;  // Duplicate.
        if (sub_field->type != GRPC_JSON_NUMBER) return;
        max_milli_tokens = gpr_parse_nonnegative_int(sub_field->value);
        if (max_milli_tokens == -1) return;
        max_milli_tokens *= 1000;
      } else if (strcmp(sub_field->key, "tokenRatio") == 0) {
        if (milli_token_ratio != 0) return;  // Duplicate.
        if (sub_field->type != GRPC_JSON_NUMBER) return;
        // We support up to 3 decimal digits.
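        // For example, a value of 0.25 is parsed below as whole_value = 0,
        // multiplier = 1000, decimal_value = 250, yielding
        // milli_token_ratio = 0 * 1000 + 250 = 250.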
        size_t whole_len = strlen(sub_field->value);
        uint32_t multiplier = 1;
        uint32_t decimal_value = 0;
        const char* decimal_point = strchr(sub_field->value, '.');
        if (decimal_point != nullptr) {
          whole_len = static_cast<size_t>(decimal_point - sub_field->value);
          multiplier = 1000;
          size_t decimal_len = strlen(decimal_point + 1);
          if (decimal_len > 3) decimal_len = 3;
          if (!gpr_parse_bytes_to_uint32(decimal_point + 1, decimal_len,
                                         &decimal_value)) {
            return;
          }
          uint32_t decimal_multiplier = 1;
          for (size_t i = 0; i < (3 - decimal_len); ++i) {
            decimal_multiplier *= 10;
          }
          decimal_value *= decimal_multiplier;
        }
        uint32_t whole_value;
        if (!gpr_parse_bytes_to_uint32(sub_field->value, whole_len,
                                       &whole_value)) {
          return;
        }
        milli_token_ratio =
            static_cast<int>((whole_value * multiplier) + decimal_value);
        if (milli_token_ratio <= 0) return;
      }
    }
    parsing_state->retry_throttle_data =
        grpc_core::internal::ServerRetryThrottleMap::GetDataForServer(
            parsing_state->server_name, max_milli_tokens, milli_token_ratio);
  }
}

static void request_reresolution_locked(void* arg, grpc_error* error) {
  reresolution_request_args* args =
      static_cast<reresolution_request_args*>(arg);
  channel_data* chand = args->chand;
  // If this invocation is for a stale LB policy, treat it as an LB shutdown
  // signal.
  if (args->lb_policy != chand->lb_policy.get() || error != GRPC_ERROR_NONE ||
      chand->resolver == nullptr) {
    GRPC_CHANNEL_STACK_UNREF(chand->owning_stack, "re-resolution");
    gpr_free(args);
    return;
  }
  if (grpc_client_channel_trace.enabled()) {
    gpr_log(GPR_INFO, "chand=%p: started name re-resolving", chand);
  }
  chand->resolver->RequestReresolutionLocked();
  // Give back the closure to the LB policy.
  chand->lb_policy->SetReresolutionClosureLocked(&args->closure);
}

// TODO(roth): The logic in this function is very hard to follow.  We
// should refactor this so that it's easier to understand, perhaps as
// part of changing the resolver API to more clearly differentiate
// between transient failures and shutdown.
static void on_resolver_result_changed_locked(void* arg, grpc_error* error) {
  channel_data* chand = static_cast<channel_data*>(arg);
  if (grpc_client_channel_trace.enabled()) {
    gpr_log(GPR_INFO,
            "chand=%p: got resolver result: resolver_result=%p error=%s", chand,
            chand->resolver_result, grpc_error_string(error));
  }
  // Extract the following fields from the resolver result, if non-nullptr.
  bool lb_policy_updated = false;
  bool lb_policy_created = false;
  char* lb_policy_name_dup = nullptr;
  bool lb_policy_name_changed = false;
  grpc_core::OrphanablePtr<grpc_core::LoadBalancingPolicy> new_lb_policy;
  char* service_config_json = nullptr;
  grpc_core::RefCountedPtr<ServerRetryThrottleData> retry_throttle_data;
  grpc_core::RefCountedPtr<MethodParamsTable> method_params_table;
  if (chand->resolver_result != nullptr) {
    if (chand->resolver != nullptr) {
      // Find LB policy name.
      const grpc_arg* channel_arg = grpc_channel_args_find(
          chand->resolver_result, GRPC_ARG_LB_POLICY_NAME);
      const char* lb_policy_name = grpc_channel_arg_get_string(channel_arg);
      // Special case: If at least one balancer address is present, we use
      // the grpclb policy, regardless of what the resolver actually specified.
      channel_arg =
          grpc_channel_args_find(chand->resolver_result, GRPC_ARG_LB_ADDRESSES);
      if (channel_arg != nullptr && channel_arg->type == GRPC_ARG_POINTER) {
        grpc_lb_addresses* addresses =
            static_cast<grpc_lb_addresses*>(channel_arg->value.pointer.p);
        bool found_balancer_address = false;
        for (size_t i = 0; i < addresses->num_addresses; ++i) {
          if (addresses->addresses[i].is_balancer) {
            found_balancer_address = true;
            break;
          }
        }
        if (found_balancer_address) {
          if (lb_policy_name != nullptr &&
              strcmp(lb_policy_name, "grpclb") != 0) {
            gpr_log(GPR_INFO,
                    "resolver requested LB policy %s but provided at least one "
                    "balancer address -- forcing use of grpclb LB policy",
                    lb_policy_name);
          }
          lb_policy_name = "grpclb";
        }
      }
      // Use pick_first if nothing was specified and we didn't select grpclb
      // above.
      if (lb_policy_name == nullptr) lb_policy_name = "pick_first";
      // Check to see if we're already using the right LB policy.
      // Note: It's safe to use chand->info_lb_policy_name here without
      // taking a lock on chand->info_mu, because this function is the
      // only thing that modifies its value, and it can only be invoked
      // once at any given time.
      lb_policy_name_changed =
          chand->info_lb_policy_name == nullptr ||
          gpr_stricmp(chand->info_lb_policy_name, lb_policy_name) != 0;
      if (chand->lb_policy != nullptr && !lb_policy_name_changed) {
        // Continue using the same LB policy.  Update with new addresses.
        lb_policy_updated = true;
        chand->lb_policy->UpdateLocked(*chand->resolver_result);
      } else {
        // Instantiate new LB policy.
        grpc_core::LoadBalancingPolicy::Args lb_policy_args;
        lb_policy_args.combiner = chand->combiner;
        lb_policy_args.client_channel_factory = chand->client_channel_factory;
        lb_policy_args.args = chand->resolver_result;
        new_lb_policy =
            grpc_core::LoadBalancingPolicyRegistry::CreateLoadBalancingPolicy(
                lb_policy_name, lb_policy_args);
        if (GPR_UNLIKELY(new_lb_policy == nullptr)) {
          gpr_log(GPR_ERROR, "could not create LB policy \"%s\"",
                  lb_policy_name);
        } else {
          lb_policy_created = true;
          reresolution_request_args* args =
              static_cast<reresolution_request_args*>(
                  gpr_zalloc(sizeof(*args)));
          args->chand = chand;
          args->lb_policy = new_lb_policy.get();
          GRPC_CLOSURE_INIT(&args->closure, request_reresolution_locked, args,
                            grpc_combiner_scheduler(chand->combiner));
          GRPC_CHANNEL_STACK_REF(chand->owning_stack, "re-resolution");
          new_lb_policy->SetReresolutionClosureLocked(&args->closure);
        }
      }
      // Before we clean up, save a copy of lb_policy_name, since it might
      // be pointing to data inside chand->resolver_result.
      // The copy will be saved in chand->lb_policy_name below.
      lb_policy_name_dup = gpr_strdup(lb_policy_name);
      // Find service config.
      channel_arg = grpc_channel_args_find(chand->resolver_result,
                                           GRPC_ARG_SERVICE_CONFIG);
      service_config_json =
          gpr_strdup(grpc_channel_arg_get_string(channel_arg));
      if (service_config_json != nullptr) {
        grpc_core::UniquePtr<grpc_core::ServiceConfig> service_config =
            grpc_core::ServiceConfig::Create(service_config_json);
        if (service_config != nullptr) {
          if (chand->enable_retries) {
            channel_arg = grpc_channel_args_find(chand->resolver_result,
                                                 GRPC_ARG_SERVER_URI);
            const char* server_uri = grpc_channel_arg_get_string(channel_arg);
            GPR_ASSERT(server_uri != nullptr);
            grpc_uri* uri = grpc_uri_parse(server_uri, true);
            GPR_ASSERT(uri->path[0] != '\0');
            service_config_parsing_state parsing_state;
            memset(&parsing_state, 0, sizeof(parsing_state));
            parsing_state.server_name =
                uri->path[0] == '/' ? uri->path + 1 : uri->path;
            service_config->ParseGlobalParams(parse_retry_throttle_params,
                                              &parsing_state);
            grpc_uri_destroy(uri);
            retry_throttle_data = std::move(parsing_state.retry_throttle_data);
          }
          method_params_table = service_config->CreateMethodConfigTable(
              ClientChannelMethodParams::CreateFromJson);
        }
      }
    }
  }
  if (grpc_client_channel_trace.enabled()) {
    gpr_log(GPR_INFO,
            "chand=%p: resolver result: lb_policy_name=\"%s\"%s, "
            "service_config=\"%s\"",
            chand, lb_policy_name_dup,
            lb_policy_name_changed ? " (changed)" : "", service_config_json);
  }
  // Now swap out fields in chand.  Note that the new values may still
  // be nullptr if (e.g.) the resolver failed to return results or the
  // results did not contain the necessary data.
  //
  // First, swap out the data used by cc_get_channel_info().
  gpr_mu_lock(&chand->info_mu);
  if (lb_policy_name_dup != nullptr) {
    gpr_free(chand->info_lb_policy_name);
    chand->info_lb_policy_name = lb_policy_name_dup;
  }
  if (service_config_json != nullptr) {
    gpr_free(chand->info_service_config_json);
    chand->info_service_config_json = service_config_json;
  }
  gpr_mu_unlock(&chand->info_mu);
  // Swap out the retry throttle data.
  chand->retry_throttle_data = std::move(retry_throttle_data);
  // Swap out the method params table.
  chand->method_params_table = std::move(method_params_table);
  // If we have a new LB policy or are shutting down (in which case
  // new_lb_policy will be nullptr), swap out the LB policy, unreffing the
  // old one and removing its fds from chand->interested_parties.
  // Note that we do NOT do this if either (a) we updated the existing
  // LB policy above or (b) we failed to create the new LB policy (in
  // which case we want to continue using the most recent one we had).
  if (new_lb_policy != nullptr || error != GRPC_ERROR_NONE ||
      chand->resolver == nullptr) {
    if (chand->lb_policy != nullptr) {
      if (grpc_client_channel_trace.enabled()) {
        gpr_log(GPR_INFO, "chand=%p: unreffing lb_policy=%p", chand,
                chand->lb_policy.get());
      }
      grpc_pollset_set_del_pollset_set(chand->lb_policy->interested_parties(),
                                       chand->interested_parties);
      chand->lb_policy->HandOffPendingPicksLocked(new_lb_policy.get());
      chand->lb_policy.reset();
    }
    chand->lb_policy = std::move(new_lb_policy);
  }
  // Now that we've swapped out the relevant fields of chand, check for
  // error or shutdown.
  if (error != GRPC_ERROR_NONE || chand->resolver == nullptr) {
    if (grpc_client_channel_trace.enabled()) {
      gpr_log(GPR_INFO, "chand=%p: shutting down", chand);
    }
    if (chand->resolver != nullptr) {
      if (grpc_client_channel_trace.enabled()) {
        gpr_log(GPR_INFO, "chand=%p: shutting down resolver", chand);
      }
      chand->resolver.reset();
    }
    set_channel_connectivity_state_locked(
        chand, GRPC_CHANNEL_SHUTDOWN,
        GRPC_ERROR_CREATE_REFERENCING_FROM_STATIC_STRING(
            "Got resolver result after disconnection", &error, 1),
        "resolver_gone");
    grpc_closure_list_fail_all(&chand->waiting_for_resolver_result_closures,
                               GRPC_ERROR_CREATE_REFERENCING_FROM_STATIC_STRING(
                                   "Channel disconnected", &error, 1));
    GRPC_CLOSURE_LIST_SCHED(&chand->waiting_for_resolver_result_closures);
    GRPC_CHANNEL_STACK_UNREF(chand->owning_stack, "resolver");
    grpc_channel_args_destroy(chand->resolver_result);
    chand->resolver_result = nullptr;
  } else {  // Not shutting down.
    grpc_connectivity_state state = GRPC_CHANNEL_TRANSIENT_FAILURE;
    grpc_error* state_error =
        GRPC_ERROR_CREATE_FROM_STATIC_STRING("No load balancing policy");
    if (lb_policy_created) {
      if (grpc_client_channel_trace.enabled()) {
        gpr_log(GPR_INFO, "chand=%p: initializing new LB policy", chand);
      }
      GRPC_ERROR_UNREF(state_error);
      state = chand->lb_policy->CheckConnectivityLocked(&state_error);
      grpc_pollset_set_add_pollset_set(chand->lb_policy->interested_parties(),
                                       chand->interested_parties);
      GRPC_CLOSURE_LIST_SCHED(&chand->waiting_for_resolver_result_closures);
      if (chand->exit_idle_when_lb_policy_arrives) {
        chand->lb_policy->ExitIdleLocked();
        chand->exit_idle_when_lb_policy_arrives = false;
      }
      watch_lb_policy_locked(chand, chand->lb_policy.get(), state);
    } else if (chand->resolver_result == nullptr) {
      // Transient failure.
      GRPC_CLOSURE_LIST_SCHED(&chand->waiting_for_resolver_result_closures);
    }
    if (!lb_policy_updated) {
      set_channel_connectivity_state_locked(
          chand, state, GRPC_ERROR_REF(state_error), "new_lb+resolver");
    }
    grpc_channel_args_destroy(chand->resolver_result);
    chand->resolver_result = nullptr;
    chand->resolver->NextLocked(&chand->resolver_result,
                                &chand->on_resolver_result_changed);
    GRPC_ERROR_UNREF(state_error);
  }
}

static void start_transport_op_locked(void* arg, grpc_error* error_ignored) {
  grpc_transport_op* op = static_cast<grpc_transport_op*>(arg);
  grpc_channel_element* elem =
      static_cast<grpc_channel_element*>(op->handler_private.extra_arg);
  channel_data* chand = static_cast<channel_data*>(elem->channel_data);

  if (op->on_connectivity_state_change != nullptr) {
    grpc_connectivity_state_notify_on_state_change(
        &chand->state_tracker, op->connectivity_state,
        op->on_connectivity_state_change);
    op->on_connectivity_state_change = nullptr;
    op->connectivity_state = nullptr;
  }

  if (op->send_ping.on_initiate != nullptr || op->send_ping.on_ack != nullptr) {
    if (chand->lb_policy == nullptr) {
      GRPC_CLOSURE_SCHED(
          op->send_ping.on_initiate,
          GRPC_ERROR_CREATE_FROM_STATIC_STRING("Ping with no load balancing"));
      GRPC_CLOSURE_SCHED(
          op->send_ping.on_ack,
          GRPC_ERROR_CREATE_FROM_STATIC_STRING("Ping with no load balancing"));
    } else {
      chand->lb_policy->PingOneLocked(op->send_ping.on_initiate,
                                      op->send_ping.on_ack);
      op->bind_pollset = nullptr;
    }
    op->send_ping.on_initiate = nullptr;
    op->send_ping.on_ack = nullptr;
  }

  if (op->disconnect_with_error != GRPC_ERROR_NONE) {
    if (chand->resolver != nullptr) {
      set_channel_connectivity_state_locked(
          chand, GRPC_CHANNEL_SHUTDOWN,
          GRPC_ERROR_REF(op->disconnect_with_error), "disconnect");
      chand->resolver.reset();
      if (!chand->started_resolving) {
        grpc_closure_list_fail_all(&chand->waiting_for_resolver_result_closures,
                                   GRPC_ERROR_REF(op->disconnect_with_error));
        GRPC_CLOSURE_LIST_SCHED(&chand->waiting_for_resolver_result_closures);
      }
      if (chand->lb_policy != nullptr) {
        grpc_pollset_set_del_pollset_set(chand->lb_policy->interested_parties(),
                                         chand->interested_parties);
        chand->lb_policy.reset();
      }
    }
    GRPC_ERROR_UNREF(op->disconnect_with_error);
  }
  GRPC_CHANNEL_STACK_UNREF(chand->owning_stack, "start_transport_op");

  GRPC_CLOSURE_SCHED(op->on_consumed, GRPC_ERROR_NONE);
}

static void cc_start_transport_op(grpc_channel_element* elem,
                                  grpc_transport_op* op) {
  channel_data* chand = static_cast<channel_data*>(elem->channel_data);

  GPR_ASSERT(op->set_accept_stream == false);
  if (op->bind_pollset != nullptr) {
    grpc_pollset_set_add_pollset(chand->interested_parties, op->bind_pollset);
  }

  op->handler_private.extra_arg = elem;
  GRPC_CHANNEL_STACK_REF(chand->owning_stack, "start_transport_op");
  GRPC_CLOSURE_SCHED(
      GRPC_CLOSURE_INIT(&op->handler_private.closure, start_transport_op_locked,
                        op, grpc_combiner_scheduler(chand->combiner)),
      GRPC_ERROR_NONE);
}

static void cc_get_channel_info(grpc_channel_element* elem,
                                const grpc_channel_info* info) {
  channel_data* chand = static_cast<channel_data*>(elem->channel_data);
  gpr_mu_lock(&chand->info_mu);
  if (info->lb_policy_name != nullptr) {
    *info->lb_policy_name = chand->info_lb_policy_name == nullptr
                                ? nullptr
                                : gpr_strdup(chand->info_lb_policy_name);
  }
  if (info->service_config_json != nullptr) {
    *info->service_config_json =
        chand->info_service_config_json == nullptr
            ? nullptr
            : gpr_strdup(chand->info_service_config_json);
  }
  gpr_mu_unlock(&chand->info_mu);
}
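
// Illustrative surface-level usage (a sketch; grpc_channel_get_info() in
// grpc/grpc.h reaches this filter through the channel stack, and the
// caller owns the returned strings):
//   char* lb_policy_name = nullptr;
//   char* service_config_json = nullptr;
//   grpc_channel_info info = {&lb_policy_name, &service_config_json};
//   grpc_channel_get_info(channel, &info);
//   // ... use the strings, then gpr_free() them.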

/* Constructor for channel_data */
static grpc_error* cc_init_channel_elem(grpc_channel_element* elem,
                                        grpc_channel_element_args* args) {
  channel_data* chand = static_cast<channel_data*>(elem->channel_data);
  GPR_ASSERT(args->is_last);
  GPR_ASSERT(elem->filter == &grpc_client_channel_filter);
  // Initialize data members.
  chand->combiner = grpc_combiner_create();
  gpr_mu_init(&chand->info_mu);
  gpr_mu_init(&chand->external_connectivity_watcher_list_mu);

  gpr_mu_lock(&chand->external_connectivity_watcher_list_mu);
  chand->external_connectivity_watcher_list_head = nullptr;
  gpr_mu_unlock(&chand->external_connectivity_watcher_list_mu);

  chand->owning_stack = args->channel_stack;
  GRPC_CLOSURE_INIT(&chand->on_resolver_result_changed,
                    on_resolver_result_changed_locked, chand,
                    grpc_combiner_scheduler(chand->combiner));
  chand->interested_parties = grpc_pollset_set_create();
  grpc_connectivity_state_init(&chand->state_tracker, GRPC_CHANNEL_IDLE,
                               "client_channel");
  grpc_client_channel_start_backup_polling(chand->interested_parties);
  // Record max per-RPC retry buffer size.
  const grpc_arg* arg = grpc_channel_args_find(
      args->channel_args, GRPC_ARG_PER_RPC_RETRY_BUFFER_SIZE);
  chand->per_rpc_retry_buffer_size = (size_t)grpc_channel_arg_get_integer(
      arg, {DEFAULT_PER_RPC_RETRY_BUFFER_SIZE, 0, INT_MAX});
  // Record enable_retries.
  arg = grpc_channel_args_find(args->channel_args, GRPC_ARG_ENABLE_RETRIES);
  chand->enable_retries = grpc_channel_arg_get_bool(arg, true);
  // Record client channel factory.
  arg = grpc_channel_args_find(args->channel_args,
                               GRPC_ARG_CLIENT_CHANNEL_FACTORY);
  if (arg == nullptr) {
    return GRPC_ERROR_CREATE_FROM_STATIC_STRING(
        "Missing client channel factory in args for client channel filter");
  }
  if (arg->type != GRPC_ARG_POINTER) {
    return GRPC_ERROR_CREATE_FROM_STATIC_STRING(
        "client channel factory arg must be a pointer");
  }
  grpc_client_channel_factory_ref(
      static_cast<grpc_client_channel_factory*>(arg->value.pointer.p));
  chand->client_channel_factory =
      static_cast<grpc_client_channel_factory*>(arg->value.pointer.p);
  // Get server name to resolve, using proxy mapper if needed.
  arg = grpc_channel_args_find(args->channel_args, GRPC_ARG_SERVER_URI);
  if (arg == nullptr) {
    return GRPC_ERROR_CREATE_FROM_STATIC_STRING(
        "Missing server uri in args for client channel filter");
  }
  if (arg->type != GRPC_ARG_STRING) {
    return GRPC_ERROR_CREATE_FROM_STATIC_STRING(
        "server uri arg must be a string");
  }
  char* proxy_name = nullptr;
  grpc_channel_args* new_args = nullptr;
  grpc_proxy_mappers_map_name(arg->value.string, args->channel_args,
                              &proxy_name, &new_args);
  // Instantiate resolver.
  chand->resolver = grpc_core::ResolverRegistry::CreateResolver(
      proxy_name != nullptr ? proxy_name : arg->value.string,
      new_args != nullptr ? new_args : args->channel_args,
      chand->interested_parties, chand->combiner);
  if (proxy_name != nullptr) gpr_free(proxy_name);
  if (new_args != nullptr) grpc_channel_args_destroy(new_args);
  if (chand->resolver == nullptr) {
    return GRPC_ERROR_CREATE_FROM_STATIC_STRING("resolver creation failed");
  }
  chand->deadline_checking_enabled =
      grpc_deadline_checking_enabled(args->channel_args);
  return GRPC_ERROR_NONE;
}

static void shutdown_resolver_locked(void* arg, grpc_error* error) {
  grpc_core::Resolver* resolver = static_cast<grpc_core::Resolver*>(arg);
  resolver->Orphan();
}

/* Destructor for channel_data */
static void cc_destroy_channel_elem(grpc_channel_element* elem) {
  channel_data* chand = static_cast<channel_data*>(elem->channel_data);
  if (chand->resolver != nullptr) {
    GRPC_CLOSURE_SCHED(
        GRPC_CLOSURE_CREATE(shutdown_resolver_locked, chand->resolver.release(),
                            grpc_combiner_scheduler(chand->combiner)),
        GRPC_ERROR_NONE);
  }
  if (chand->client_channel_factory != nullptr) {
    grpc_client_channel_factory_unref(chand->client_channel_factory);
  }
  if (chand->lb_policy != nullptr) {
    grpc_pollset_set_del_pollset_set(chand->lb_policy->interested_parties(),
                                     chand->interested_parties);
    chand->lb_policy.reset();
  }
  gpr_free(chand->info_lb_policy_name);
  gpr_free(chand->info_service_config_json);
  chand->retry_throttle_data.reset();
  chand->method_params_table.reset();
  grpc_client_channel_stop_backup_polling(chand->interested_parties);
  grpc_connectivity_state_destroy(&chand->state_tracker);
  grpc_pollset_set_destroy(chand->interested_parties);
  GRPC_COMBINER_UNREF(chand->combiner, "client_channel");
  gpr_mu_destroy(&chand->info_mu);
  gpr_mu_destroy(&chand->external_connectivity_watcher_list_mu);
}

/*************************************************************************
 * PER-CALL FUNCTIONS
 */

// Max number of batches that can be pending on a call at any given
// time.  This includes one batch for each of the following ops:
//   recv_initial_metadata
//   send_initial_metadata
//   recv_message
//   send_message
//   recv_trailing_metadata
//   send_trailing_metadata
// Each op can be pending in at most one batch at a time, so six slots
// are sufficient.
#define MAX_PENDING_BATCHES 6

// Retry support:
//
// In order to support retries, we act as a proxy for stream op batches.
// When we get a batch from the surface, we add it to our list of pending
// batches, and we then use those batches to construct separate "child"
// batches to be started on the subchannel call.  When the child batches
// return, we then decide which pending batches have been completed and
// schedule their callbacks accordingly.  If a subchannel call fails and
// we want to retry it, we do a new pick and start again, constructing
// new "child" batches for the new subchannel call.
//
// Note that retries are committed when receiving data from the server
// (except for Trailers-Only responses).  However, there may be many
// send ops started before receiving any data, so we may have already
// completed some number of send ops (and returned the completions up to
// the surface) by the time we realize that we need to retry.  To deal
// with this, we cache data for send ops, so that we can replay them on a
// different subchannel call even after we have completed the original
// batches.
//
// There are two sets of data to maintain:
// - In call_data (in the parent channel), we maintain a list of pending
//   ops and cached data for send ops.
// - In the subchannel call, we maintain state to indicate what ops have
//   already been sent down to that call.
//
// When constructing the "child" batches, we compare those two sets of
// data to see which batches need to be sent to the subchannel call.

// TODO(roth): In subsequent PRs:
// - add support for transparent retries (including initial metadata)
// - figure out how to record stats in census for retries
//   (census filter is on top of this one)
// - add census stats for retries
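
// Rough shape of the proxying described above (explanatory sketch only):
//
//   surface batch -> pending_batches[] (send-op payloads also cached in
//                    call_data)
//     -> "child" batch on subchannel call, attempt 1
//          (attempt fails before the retry is committed)
//     -> "child" batch on subchannel call, attempt 2, replaying the
//        cached send ops
//
// subchannel_call_retry_state (below) tracks which ops each attempt has
// started and completed, so each new child batch contains only the ops
// that are still needed.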
784
785// State used for starting a retryable batch on a subchannel call.
786// This provides its own grpc_transport_stream_op_batch and other data
787// structures needed to populate the ops in the batch.
788// We allocate one struct on the arena for each attempt at starting a
789// batch on a given subchannel call.
790typedef struct {
791 gpr_refcount refs;
792 grpc_call_element* elem;
793 grpc_subchannel_call* subchannel_call; // Holds a ref.
794 // The batch to use in the subchannel call.
795 // Its payload field points to subchannel_call_retry_state.batch_payload.
796 grpc_transport_stream_op_batch batch;
797 // For send_initial_metadata.
798 // Note that we need to make a copy of the initial metadata for each
799 // subchannel call instead of just referring to the copy in call_data,
800 // because filters in the subchannel stack will probably add entries,
801 // so we need to start in a pristine state for each attempt of the call.
802 grpc_linked_mdelem* send_initial_metadata_storage;
803 grpc_metadata_batch send_initial_metadata;
804 // For send_message.
Mark D. Roth3d8b32d2018-03-09 13:25:40 -0800805 grpc_core::ManualConstructor<grpc_core::ByteStreamCache::CachingByteStream>
806 send_message;
Mark D. Roth718c8342018-02-28 13:00:04 -0800807 // For send_trailing_metadata.
808 grpc_linked_mdelem* send_trailing_metadata_storage;
809 grpc_metadata_batch send_trailing_metadata;
810 // For intercepting recv_initial_metadata.
811 grpc_metadata_batch recv_initial_metadata;
812 grpc_closure recv_initial_metadata_ready;
813 bool trailing_metadata_available;
814 // For intercepting recv_message.
815 grpc_closure recv_message_ready;
Mark D. Roth3d8b32d2018-03-09 13:25:40 -0800816 grpc_core::OrphanablePtr<grpc_core::ByteStream> recv_message;
Mark D. Roth718c8342018-02-28 13:00:04 -0800817 // For intercepting recv_trailing_metadata.
818 grpc_metadata_batch recv_trailing_metadata;
819 grpc_transport_stream_stats collect_stats;
820 // For intercepting on_complete.
821 grpc_closure on_complete;
822} subchannel_batch_data;
823
824// Retry state associated with a subchannel call.
825// Stored in the parent_data of the subchannel call object.
826typedef struct {
827 // subchannel_batch_data.batch.payload points to this.
828 grpc_transport_stream_op_batch_payload batch_payload;
829 // These fields indicate which ops have been started and completed on
830 // this subchannel call.
831 size_t started_send_message_count;
832 size_t completed_send_message_count;
833 size_t started_recv_message_count;
834 size_t completed_recv_message_count;
835 bool started_send_initial_metadata : 1;
836 bool completed_send_initial_metadata : 1;
837 bool started_send_trailing_metadata : 1;
838 bool completed_send_trailing_metadata : 1;
839 bool started_recv_initial_metadata : 1;
840 bool completed_recv_initial_metadata : 1;
841 bool started_recv_trailing_metadata : 1;
842 bool completed_recv_trailing_metadata : 1;
843 // State for callback processing.
844 bool retry_dispatched : 1;
Mark D. Rothde077ac2018-04-12 08:05:44 -0700845 subchannel_batch_data* recv_initial_metadata_ready_deferred_batch;
Mark D. Roth718c8342018-02-28 13:00:04 -0800846 grpc_error* recv_initial_metadata_error;
Mark D. Rothde077ac2018-04-12 08:05:44 -0700847 subchannel_batch_data* recv_message_ready_deferred_batch;
Mark D. Roth718c8342018-02-28 13:00:04 -0800848 grpc_error* recv_message_error;
Mark D. Rothde077ac2018-04-12 08:05:44 -0700849 subchannel_batch_data* recv_trailing_metadata_internal_batch;
Mark D. Roth718c8342018-02-28 13:00:04 -0800850} subchannel_call_retry_state;
851
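// Usage sketch: code in this file retrieves this struct from the
// subchannel call's parent_data, e.g.:
//
//   subchannel_call_retry_state* retry_state =
//       static_cast<subchannel_call_retry_state*>(
//           grpc_connected_subchannel_call_get_parent_data(
//               calld->subchannel_call));
//
// Since parent_data for a newly created subchannel call starts
// zero-initialized (an assumption these flags and counters rely on), a
// fresh retry attempt sees all started_*/completed_* fields at zero and
// replays the cached send ops from the beginning.
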
852// Pending batches stored in call data.
853typedef struct {
854 // The pending batch. If nullptr, this slot is empty.
855 grpc_transport_stream_op_batch* batch;
856 // Indicates whether payload for send ops has been cached in call data.
857 bool send_ops_cached;
858} pending_batch;
Mark D. Roth0ca0be82017-06-20 07:49:33 -0700859
Mark D. Roth2a5959f2016-09-01 08:20:27 -0700860/** Call data. Holds a pointer to grpc_subchannel_call and the
861 associated machinery to create such a pointer.
862 Handles queueing of stream ops until a call object is ready, waiting
863 for initial metadata before trying to create a call object,
864 and handling cancellation gracefully. */
865typedef struct client_channel_call_data {
Mark D. Roth72f6da82016-09-02 13:42:38 -0700866 // State for handling deadlines.
867 // The code in deadline_filter.c requires this to be the first field.
Mark D. Roth72f6da82016-09-02 13:42:38 -0700868 // TODO(roth): This is slightly sub-optimal in that grpc_deadline_state
Mark D. Roth66f3d2b2017-09-01 09:02:17 -0700869 // and this struct both independently store pointers to the call stack
870 // and call combiner. If/when we have time, find a way to avoid this
871 // without breaking the grpc_deadline_state abstraction.
Mark D. Roth72f6da82016-09-02 13:42:38 -0700872 grpc_deadline_state deadline_state;
Mark D. Rothf28763c2016-09-14 15:18:40 -0700873
Craig Tiller7c70b6c2017-01-23 07:48:42 -0800874 grpc_slice path; // Request path.
Mark D. Rothe40dd292016-10-05 14:58:37 -0700875 gpr_timespec call_start_time;
Craig Tiller89c14282017-07-19 15:32:27 -0700876 grpc_millis deadline;
Craig Tillerbaa14a92017-11-03 09:09:36 -0700877 gpr_arena* arena;
878 grpc_call_stack* owning_call;
879 grpc_call_combiner* call_combiner;
Mark D. Roth76e264b2017-08-25 09:03:33 -0700880
Mark D. Roth9db86fc2018-03-28 07:42:20 -0700881 grpc_core::RefCountedPtr<ServerRetryThrottleData> retry_throttle_data;
Mark D. Roth3e7f2df2018-02-26 13:17:06 -0800882 grpc_core::RefCountedPtr<ClientChannelMethodParams> method_params;
Mark D. Rothaa850a72016-09-26 13:38:02 -0700883
Craig Tillerbaa14a92017-11-03 09:09:36 -0700884 grpc_subchannel_call* subchannel_call;
Mark D. Roth718c8342018-02-28 13:00:04 -0800885
886 // Set when we get a cancel_stream op.
887 grpc_error* cancel_error;
Mark D. Roth2a5959f2016-09-01 08:20:27 -0700888
Mark D. Rothc8875492018-02-20 08:33:48 -0800889 grpc_core::LoadBalancingPolicy::PickState pick;
Mark D. Roth718c8342018-02-28 13:00:04 -0800890 grpc_closure pick_closure;
891 grpc_closure pick_cancel_closure;
Mark D. Roth60751fe2017-07-07 12:50:33 -0700892
Craig Tillerbaa14a92017-11-03 09:09:36 -0700893 grpc_polling_entity* pollent;
Mark D. Roth7e0e2022018-06-01 12:04:16 -0700894 bool pollent_added_to_interested_parties;
Mark D. Roth2a5959f2016-09-01 08:20:27 -0700895
Mark D. Roth718c8342018-02-28 13:00:04 -0800896 // Batches are added to this list when received from above.
897 // They are removed when we are done handling the batch (i.e., when
898 // either we have invoked all of the batch's callbacks or we have
899 // passed the batch down to the subchannel call and are not
900 // intercepting any of its callbacks).
901 pending_batch pending_batches[MAX_PENDING_BATCHES];
902 bool pending_send_initial_metadata : 1;
903 bool pending_send_message : 1;
904 bool pending_send_trailing_metadata : 1;
Mark D. Roth2a5959f2016-09-01 08:20:27 -0700905
Mark D. Roth718c8342018-02-28 13:00:04 -0800906 // Retry state.
907 bool enable_retries : 1;
908 bool retry_committed : 1;
909 bool last_attempt_got_server_pushback : 1;
910 int num_attempts_completed;
911 size_t bytes_buffered_for_retry;
912 grpc_core::ManualConstructor<grpc_core::BackOff> retry_backoff;
913 grpc_timer retry_timer;
David Garcia Quintasd1a47f12016-09-02 12:46:44 +0200914
Mark D. Roth4f9e0032018-05-24 09:30:09 -0700915 // The number of pending retriable subchannel batches containing send ops.
916 // We hold a ref to the call stack while this is non-zero, since replay
917 // batches may not complete until after all callbacks have been returned
918 // to the surface, and we need to make sure that the call is not destroyed
919 // until all of these batches have completed.
920 // Note that we actually only need to track replay batches, but it's
921 // easier to track all batches with send ops.
922 int num_pending_retriable_subchannel_send_batches;
923
Mark D. Roth718c8342018-02-28 13:00:04 -0800924 // Cached data for retrying send ops.
925 // send_initial_metadata
926 bool seen_send_initial_metadata;
927 grpc_linked_mdelem* send_initial_metadata_storage;
928 grpc_metadata_batch send_initial_metadata;
929 uint32_t send_initial_metadata_flags;
930 gpr_atm* peer_string;
931 // send_message
932 // When we get a send_message op, we replace the original byte stream
Mark D. Roth3d8b32d2018-03-09 13:25:40 -0800933 // with a CachingByteStream that caches the slices to a local buffer for
934 // use in retries.
Mark D. Roth718c8342018-02-28 13:00:04 -0800935  // Note: We inline the cache for the first 3 send_message ops and use
936  // dynamic allocation after that. This number was picked somewhat
937  // arbitrarily; it could be tuned later (see the sketch after this struct).
Mark D. Rothefcd45b2018-03-28 10:49:59 -0700938 grpc_core::ManualConstructor<
939 grpc_core::InlinedVector<grpc_core::ByteStreamCache*, 3>>
940 send_messages;
Mark D. Roth718c8342018-02-28 13:00:04 -0800941 // send_trailing_metadata
942 bool seen_send_trailing_metadata;
943 grpc_linked_mdelem* send_trailing_metadata_storage;
944 grpc_metadata_batch send_trailing_metadata;
Mark D. Roth2a5959f2016-09-01 08:20:27 -0700945} call_data;
946
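// Illustrative sketch of the send_messages cache mentioned above,
// assuming the InlinedVector was Init()ed when the call element was
// created (cache1..cache4 are hypothetical ByteStreamCache* values):
//
//   calld->send_messages->push_back(cache1);  // inline storage
//   calld->send_messages->push_back(cache2);  // inline storage
//   calld->send_messages->push_back(cache3);  // inline storage
//   calld->send_messages->push_back(cache4);  // falls back to heap
//
// So a unary or short streaming call typically caches all of its
// messages without any heap allocation for the vector itself.
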
Mark D. Roth718c8342018-02-28 13:00:04 -0800947// Forward declarations.
948static void retry_commit(grpc_call_element* elem,
949 subchannel_call_retry_state* retry_state);
950static void start_internal_recv_trailing_metadata(grpc_call_element* elem);
951static void on_complete(void* arg, grpc_error* error);
952static void start_retriable_subchannel_batches(void* arg, grpc_error* ignored);
Mark D. Roth718c8342018-02-28 13:00:04 -0800953static void start_pick_locked(void* arg, grpc_error* ignored);
Craig Tiller8b1d59c2016-12-27 15:15:30 -0800954
Mark D. Roth718c8342018-02-28 13:00:04 -0800955//
956// send op data caching
957//
958
959// Caches data for send ops so that it can be retried later, if not
960// already cached.
961static void maybe_cache_send_ops_for_batch(call_data* calld,
962 pending_batch* pending) {
963 if (pending->send_ops_cached) return;
964 pending->send_ops_cached = true;
965 grpc_transport_stream_op_batch* batch = pending->batch;
966 // Save a copy of metadata for send_initial_metadata ops.
Mark D. Roth76e264b2017-08-25 09:03:33 -0700967 if (batch->send_initial_metadata) {
Mark D. Roth718c8342018-02-28 13:00:04 -0800968 calld->seen_send_initial_metadata = true;
969 GPR_ASSERT(calld->send_initial_metadata_storage == nullptr);
970 grpc_metadata_batch* send_initial_metadata =
971 batch->payload->send_initial_metadata.send_initial_metadata;
972    calld->send_initial_metadata_storage = static_cast<grpc_linked_mdelem*>(
973        gpr_arena_alloc(calld->arena, sizeof(grpc_linked_mdelem) *
974                                          send_initial_metadata->list.count));
975 grpc_metadata_batch_copy(send_initial_metadata,
976 &calld->send_initial_metadata,
977 calld->send_initial_metadata_storage);
978 calld->send_initial_metadata_flags =
979 batch->payload->send_initial_metadata.send_initial_metadata_flags;
980 calld->peer_string = batch->payload->send_initial_metadata.peer_string;
981 }
982 // Set up cache for send_message ops.
983 if (batch->send_message) {
Mark D. Roth3d8b32d2018-03-09 13:25:40 -0800984 grpc_core::ByteStreamCache* cache =
985 static_cast<grpc_core::ByteStreamCache*>(
986 gpr_arena_alloc(calld->arena, sizeof(grpc_core::ByteStreamCache)));
987 new (cache) grpc_core::ByteStreamCache(
988 std::move(batch->payload->send_message.send_message));
Mark D. Rothefcd45b2018-03-28 10:49:59 -0700989 calld->send_messages->push_back(cache);
Mark D. Roth718c8342018-02-28 13:00:04 -0800990 }
991 // Save metadata batch for send_trailing_metadata ops.
992 if (batch->send_trailing_metadata) {
993 calld->seen_send_trailing_metadata = true;
994 GPR_ASSERT(calld->send_trailing_metadata_storage == nullptr);
995 grpc_metadata_batch* send_trailing_metadata =
996 batch->payload->send_trailing_metadata.send_trailing_metadata;
997    calld->send_trailing_metadata_storage =
998        static_cast<grpc_linked_mdelem*>(gpr_arena_alloc(
999            calld->arena,
1000            sizeof(grpc_linked_mdelem) * send_trailing_metadata->list.count));
1001 grpc_metadata_batch_copy(send_trailing_metadata,
1002 &calld->send_trailing_metadata,
1003 calld->send_trailing_metadata_storage);
Mark D. Roth76e264b2017-08-25 09:03:33 -07001004 }
Mark D. Roth2a5959f2016-09-01 08:20:27 -07001005}
1006
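// Note that the send_ops_cached flag makes this idempotent; callers can
// safely invoke it each time they (re)start batches:
//
//   maybe_cache_send_ops_for_batch(calld, pending);  // caches send ops
//   maybe_cache_send_ops_for_batch(calld, pending);  // no-op
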
Mark D. Rothde077ac2018-04-12 08:05:44 -07001007// Frees cached send_initial_metadata.
1008static void free_cached_send_initial_metadata(channel_data* chand,
1009 call_data* calld) {
1010 if (grpc_client_channel_trace.enabled()) {
Mark D. Roth48854d22018-04-25 13:05:26 -07001011 gpr_log(GPR_INFO,
Mark D. Rothde077ac2018-04-12 08:05:44 -07001012 "chand=%p calld=%p: destroying calld->send_initial_metadata", chand,
1013 calld);
1014 }
1015 grpc_metadata_batch_destroy(&calld->send_initial_metadata);
1016}
1017
1018// Frees cached send_message at index idx.
1019static void free_cached_send_message(channel_data* chand, call_data* calld,
1020 size_t idx) {
1021 if (grpc_client_channel_trace.enabled()) {
Mark D. Roth48854d22018-04-25 13:05:26 -07001022 gpr_log(GPR_INFO,
Mark D. Rothde077ac2018-04-12 08:05:44 -07001023 "chand=%p calld=%p: destroying calld->send_messages[%" PRIuPTR "]",
1024 chand, calld, idx);
1025 }
1026 (*calld->send_messages)[idx]->Destroy();
1027}
1028
1029// Frees cached send_trailing_metadata.
1030static void free_cached_send_trailing_metadata(channel_data* chand,
1031 call_data* calld) {
1032 if (grpc_client_channel_trace.enabled()) {
Mark D. Roth48854d22018-04-25 13:05:26 -07001033 gpr_log(GPR_INFO,
Mark D. Rothde077ac2018-04-12 08:05:44 -07001034 "chand=%p calld=%p: destroying calld->send_trailing_metadata",
1035 chand, calld);
1036 }
1037 grpc_metadata_batch_destroy(&calld->send_trailing_metadata);
1038}
1039
Mark D. Roth718c8342018-02-28 13:00:04 -08001040// Frees cached send ops that have already been completed after
1041// committing the call.
1042static void free_cached_send_op_data_after_commit(
1043 grpc_call_element* elem, subchannel_call_retry_state* retry_state) {
Noah Eisenbe82e642018-02-09 09:16:55 -08001044 channel_data* chand = static_cast<channel_data*>(elem->channel_data);
1045 call_data* calld = static_cast<call_data*>(elem->call_data);
Mark D. Roth718c8342018-02-28 13:00:04 -08001046 if (retry_state->completed_send_initial_metadata) {
Mark D. Rothde077ac2018-04-12 08:05:44 -07001047 free_cached_send_initial_metadata(chand, calld);
Mark D. Roth60751fe2017-07-07 12:50:33 -07001048 }
Mark D. Roth718c8342018-02-28 13:00:04 -08001049 for (size_t i = 0; i < retry_state->completed_send_message_count; ++i) {
Mark D. Rothde077ac2018-04-12 08:05:44 -07001050 free_cached_send_message(chand, calld, i);
Mark D. Roth2a5959f2016-09-01 08:20:27 -07001051 }
Mark D. Roth718c8342018-02-28 13:00:04 -08001052 if (retry_state->completed_send_trailing_metadata) {
Mark D. Rothde077ac2018-04-12 08:05:44 -07001053 free_cached_send_trailing_metadata(chand, calld);
Mark D. Roth718c8342018-02-28 13:00:04 -08001054 }
Mark D. Roth2a5959f2016-09-01 08:20:27 -07001055}
1056
Mark D. Roth718c8342018-02-28 13:00:04 -08001057// Frees cached send ops that were completed by the completed batch in
1058// batch_data. Used when batches are completed after the call is committed.
1059static void free_cached_send_op_data_for_completed_batch(
1060 grpc_call_element* elem, subchannel_batch_data* batch_data,
1061 subchannel_call_retry_state* retry_state) {
Noah Eisenbe82e642018-02-09 09:16:55 -08001062 channel_data* chand = static_cast<channel_data*>(elem->channel_data);
1063 call_data* calld = static_cast<call_data*>(elem->call_data);
Mark D. Roth718c8342018-02-28 13:00:04 -08001064 if (batch_data->batch.send_initial_metadata) {
Mark D. Rothde077ac2018-04-12 08:05:44 -07001065 free_cached_send_initial_metadata(chand, calld);
Mark D. Roth718c8342018-02-28 13:00:04 -08001066 }
1067 if (batch_data->batch.send_message) {
Mark D. Rothde077ac2018-04-12 08:05:44 -07001068 free_cached_send_message(chand, calld,
1069 retry_state->completed_send_message_count - 1);
Mark D. Roth718c8342018-02-28 13:00:04 -08001070 }
1071 if (batch_data->batch.send_trailing_metadata) {
Mark D. Rothde077ac2018-04-12 08:05:44 -07001072 free_cached_send_trailing_metadata(chand, calld);
Mark D. Roth718c8342018-02-28 13:00:04 -08001073 }
1074}
1075
1076//
1077// pending_batches management
1078//
1079
1080// Returns the index into calld->pending_batches to be used for batch.
1081static size_t get_batch_index(grpc_transport_stream_op_batch* batch) {
1082  // Note: It is important that send_initial_metadata be the first entry
1083 // here, since the code in pick_subchannel_locked() assumes it will be.
1084 if (batch->send_initial_metadata) return 0;
1085 if (batch->send_message) return 1;
1086 if (batch->send_trailing_metadata) return 2;
1087 if (batch->recv_initial_metadata) return 3;
1088 if (batch->recv_message) return 4;
1089 if (batch->recv_trailing_metadata) return 5;
1090  GPR_UNREACHABLE_CODE(return static_cast<size_t>(-1));
1091}
1092
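// For example (illustration only): a batch from the surface that sets
// both send_initial_metadata and recv_initial_metadata lands in slot 0,
// because send_initial_metadata is checked first:
//
//   grpc_transport_stream_op_batch* batch = ...;  // from the surface
//   batch->send_initial_metadata = true;
//   batch->recv_initial_metadata = true;
//   size_t idx = get_batch_index(batch);  // idx == 0
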
1093// This is called via the call combiner, so access to calld is synchronized.
1094static void pending_batches_add(grpc_call_element* elem,
1095 grpc_transport_stream_op_batch* batch) {
1096 channel_data* chand = static_cast<channel_data*>(elem->channel_data);
1097 call_data* calld = static_cast<call_data*>(elem->call_data);
1098 const size_t idx = get_batch_index(batch);
Craig Tiller6014e8a2017-10-16 13:50:29 -07001099 if (grpc_client_channel_trace.enabled()) {
Mark D. Roth48854d22018-04-25 13:05:26 -07001100 gpr_log(GPR_INFO,
Mark D. Roth718c8342018-02-28 13:00:04 -08001101 "chand=%p calld=%p: adding pending batch at index %" PRIuPTR, chand,
1102 calld, idx);
Mark D. Roth60751fe2017-07-07 12:50:33 -07001103 }
Mark D. Roth718c8342018-02-28 13:00:04 -08001104 pending_batch* pending = &calld->pending_batches[idx];
1105 GPR_ASSERT(pending->batch == nullptr);
1106 pending->batch = batch;
1107 pending->send_ops_cached = false;
1108 if (calld->enable_retries) {
1109 // Update state in calld about pending batches.
1110 // Also check if the batch takes us over the retry buffer limit.
1111 // Note: We don't check the size of trailing metadata here, because
1112 // gRPC clients do not send trailing metadata.
1113 if (batch->send_initial_metadata) {
1114 calld->pending_send_initial_metadata = true;
1115 calld->bytes_buffered_for_retry += grpc_metadata_batch_size(
1116 batch->payload->send_initial_metadata.send_initial_metadata);
1117 }
1118 if (batch->send_message) {
1119 calld->pending_send_message = true;
1120 calld->bytes_buffered_for_retry +=
Mark D. Roth3d8b32d2018-03-09 13:25:40 -08001121 batch->payload->send_message.send_message->length();
Mark D. Roth718c8342018-02-28 13:00:04 -08001122 }
1123 if (batch->send_trailing_metadata) {
1124 calld->pending_send_trailing_metadata = true;
1125 }
Yash Tibrewal8c5b8832018-05-14 11:44:18 -07001126 if (GPR_UNLIKELY(calld->bytes_buffered_for_retry >
1127 chand->per_rpc_retry_buffer_size)) {
Mark D. Roth718c8342018-02-28 13:00:04 -08001128 if (grpc_client_channel_trace.enabled()) {
Mark D. Roth48854d22018-04-25 13:05:26 -07001129 gpr_log(GPR_INFO,
Mark D. Roth718c8342018-02-28 13:00:04 -08001130 "chand=%p calld=%p: exceeded retry buffer size, committing",
1131 chand, calld);
1132 }
1133 subchannel_call_retry_state* retry_state =
1134 calld->subchannel_call == nullptr
1135 ? nullptr
1136 : static_cast<subchannel_call_retry_state*>(
1137 grpc_connected_subchannel_call_get_parent_data(
1138 calld->subchannel_call));
1139 retry_commit(elem, retry_state);
1140      // If we are not going to retry and have not yet started any attempt,
1141      // pretend retries are disabled so that we don't bother with retry overhead.
1142 if (calld->num_attempts_completed == 0) {
1143 if (grpc_client_channel_trace.enabled()) {
Mark D. Roth48854d22018-04-25 13:05:26 -07001144 gpr_log(GPR_INFO,
Mark D. Roth718c8342018-02-28 13:00:04 -08001145 "chand=%p calld=%p: disabling retries before first attempt",
1146 chand, calld);
Mark D. Roth0ca0be82017-06-20 07:49:33 -07001147 }
Mark D. Roth718c8342018-02-28 13:00:04 -08001148 calld->enable_retries = false;
Craig Tiller11c17d42017-03-13 13:36:34 -07001149 }
1150 }
1151 }
Craig Tiller11c17d42017-03-13 13:36:34 -07001152}
Craig Tillerea4a4f12017-03-13 13:36:52 -07001153
Mark D. Roth718c8342018-02-28 13:00:04 -08001154static void pending_batch_clear(call_data* calld, pending_batch* pending) {
1155 if (calld->enable_retries) {
1156 if (pending->batch->send_initial_metadata) {
1157 calld->pending_send_initial_metadata = false;
1158 }
1159 if (pending->batch->send_message) {
1160 calld->pending_send_message = false;
1161 }
1162 if (pending->batch->send_trailing_metadata) {
1163 calld->pending_send_trailing_metadata = false;
1164 }
1165 }
1166 pending->batch = nullptr;
1167}
1168
1169// This is called via the call combiner, so access to calld is synchronized.
1170static void fail_pending_batch_in_call_combiner(void* arg, grpc_error* error) {
1171 grpc_transport_stream_op_batch* batch =
1172 static_cast<grpc_transport_stream_op_batch*>(arg);
1173 call_data* calld = static_cast<call_data*>(batch->handler_private.extra_arg);
1174 // Note: This will release the call combiner.
1175 grpc_transport_stream_op_batch_finish_with_failure(
1176 batch, GRPC_ERROR_REF(error), calld->call_combiner);
1177}
1178
1179// This is called via the call combiner, so access to calld is synchronized.
1180// If yield_call_combiner is true, assumes responsibility for yielding
1181// the call combiner.
1182static void pending_batches_fail(grpc_call_element* elem, grpc_error* error,
1183 bool yield_call_combiner) {
1184 GPR_ASSERT(error != GRPC_ERROR_NONE);
1185 call_data* calld = static_cast<call_data*>(elem->call_data);
1186 if (grpc_client_channel_trace.enabled()) {
1187 size_t num_batches = 0;
1188 for (size_t i = 0; i < GPR_ARRAY_SIZE(calld->pending_batches); ++i) {
1189 if (calld->pending_batches[i].batch != nullptr) ++num_batches;
1190 }
Mark D. Roth48854d22018-04-25 13:05:26 -07001191 gpr_log(GPR_INFO,
Mark D. Roth718c8342018-02-28 13:00:04 -08001192 "chand=%p calld=%p: failing %" PRIuPTR " pending batches: %s",
1193 elem->channel_data, calld, num_batches, grpc_error_string(error));
1194 }
Mark D. Rothf3715132018-06-08 14:22:12 -07001195 grpc_transport_stream_op_batch*
1196 batches[GPR_ARRAY_SIZE(calld->pending_batches)];
1197 size_t num_batches = 0;
Mark D. Roth718c8342018-02-28 13:00:04 -08001198 for (size_t i = 0; i < GPR_ARRAY_SIZE(calld->pending_batches); ++i) {
1199 pending_batch* pending = &calld->pending_batches[i];
1200 grpc_transport_stream_op_batch* batch = pending->batch;
1201 if (batch != nullptr) {
Mark D. Rothf3715132018-06-08 14:22:12 -07001202 batches[num_batches++] = batch;
Mark D. Roth718c8342018-02-28 13:00:04 -08001203 pending_batch_clear(calld, pending);
1204 }
1205 }
Mark D. Rothf3715132018-06-08 14:22:12 -07001206 for (size_t i = yield_call_combiner ? 1 : 0; i < num_batches; ++i) {
1207 grpc_transport_stream_op_batch* batch = batches[i];
1208 batch->handler_private.extra_arg = calld;
1209 GRPC_CLOSURE_INIT(&batch->handler_private.closure,
1210 fail_pending_batch_in_call_combiner, batch,
1211 grpc_schedule_on_exec_ctx);
1212 GRPC_CALL_COMBINER_START(calld->call_combiner,
1213 &batch->handler_private.closure,
1214 GRPC_ERROR_REF(error), "pending_batches_fail");
1215 }
Mark D. Roth718c8342018-02-28 13:00:04 -08001216 if (yield_call_combiner) {
Mark D. Rothf3715132018-06-08 14:22:12 -07001217 if (num_batches > 0) {
1218 // Note: This will release the call combiner.
1219 grpc_transport_stream_op_batch_finish_with_failure(
1220 batches[0], GRPC_ERROR_REF(error), calld->call_combiner);
1221 } else {
1222 GRPC_CALL_COMBINER_STOP(calld->call_combiner, "pending_batches_fail");
1223 }
Mark D. Roth718c8342018-02-28 13:00:04 -08001224 }
1225 GRPC_ERROR_UNREF(error);
1226}
1227
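// Caller-side sketch (assumed usage): a caller that wants this function
// to also release the call combiner, e.g. when failing everything after
// a cancellation, passes yield_call_combiner=true, so that batches[0]'s
// completion doubles as the yield:
//
//   pending_batches_fail(elem, GRPC_ERROR_REF(calld->cancel_error),
//                        true /* yield_call_combiner */);
//
// A caller that still needs the call combiner afterwards passes false.
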
1228// This is called via the call combiner, so access to calld is synchronized.
1229static void resume_pending_batch_in_call_combiner(void* arg,
1230 grpc_error* ignored) {
1231 grpc_transport_stream_op_batch* batch =
1232 static_cast<grpc_transport_stream_op_batch*>(arg);
1233 grpc_subchannel_call* subchannel_call =
1234 static_cast<grpc_subchannel_call*>(batch->handler_private.extra_arg);
1235 // Note: This will release the call combiner.
1236 grpc_subchannel_call_process_op(subchannel_call, batch);
1237}
1238
1239// This is called via the call combiner, so access to calld is synchronized.
1240static void pending_batches_resume(grpc_call_element* elem) {
Noah Eisenbe82e642018-02-09 09:16:55 -08001241 channel_data* chand = static_cast<channel_data*>(elem->channel_data);
1242 call_data* calld = static_cast<call_data*>(elem->call_data);
Mark D. Roth718c8342018-02-28 13:00:04 -08001243 if (calld->enable_retries) {
1244 start_retriable_subchannel_batches(elem, GRPC_ERROR_NONE);
1245 return;
1246 }
1247 // Retries not enabled; send down batches as-is.
1248 if (grpc_client_channel_trace.enabled()) {
1249 size_t num_batches = 0;
1250 for (size_t i = 0; i < GPR_ARRAY_SIZE(calld->pending_batches); ++i) {
1251 if (calld->pending_batches[i].batch != nullptr) ++num_batches;
1252 }
Mark D. Roth48854d22018-04-25 13:05:26 -07001253 gpr_log(GPR_INFO,
Mark D. Roth718c8342018-02-28 13:00:04 -08001254 "chand=%p calld=%p: starting %" PRIuPTR
1255 " pending batches on subchannel_call=%p",
1256 chand, calld, num_batches, calld->subchannel_call);
1257 }
Mark D. Rothf3715132018-06-08 14:22:12 -07001258 grpc_transport_stream_op_batch*
1259 batches[GPR_ARRAY_SIZE(calld->pending_batches)];
1260 size_t num_batches = 0;
Mark D. Roth718c8342018-02-28 13:00:04 -08001261 for (size_t i = 0; i < GPR_ARRAY_SIZE(calld->pending_batches); ++i) {
1262 pending_batch* pending = &calld->pending_batches[i];
1263 grpc_transport_stream_op_batch* batch = pending->batch;
1264 if (batch != nullptr) {
Mark D. Rothf3715132018-06-08 14:22:12 -07001265 batches[num_batches++] = batch;
Mark D. Roth718c8342018-02-28 13:00:04 -08001266 pending_batch_clear(calld, pending);
1267 }
1268 }
Mark D. Rothf3715132018-06-08 14:22:12 -07001269 for (size_t i = 1; i < num_batches; ++i) {
1270 grpc_transport_stream_op_batch* batch = batches[i];
1271 batch->handler_private.extra_arg = calld->subchannel_call;
1272 GRPC_CLOSURE_INIT(&batch->handler_private.closure,
1273 resume_pending_batch_in_call_combiner, batch,
1274 grpc_schedule_on_exec_ctx);
1275 GRPC_CALL_COMBINER_START(calld->call_combiner,
1276 &batch->handler_private.closure, GRPC_ERROR_NONE,
1277 "pending_batches_resume");
1278 }
1279 GPR_ASSERT(num_batches > 0);
Mark D. Roth718c8342018-02-28 13:00:04 -08001280 // Note: This will release the call combiner.
Mark D. Rothf3715132018-06-08 14:22:12 -07001281 grpc_subchannel_call_process_op(calld->subchannel_call, batches[0]);
Mark D. Roth718c8342018-02-28 13:00:04 -08001282}
1283
1284static void maybe_clear_pending_batch(grpc_call_element* elem,
1285 pending_batch* pending) {
1286 channel_data* chand = static_cast<channel_data*>(elem->channel_data);
1287 call_data* calld = static_cast<call_data*>(elem->call_data);
1288 grpc_transport_stream_op_batch* batch = pending->batch;
1289  // We clear the pending batch once all of its callbacks have been
1290  // scheduled and their pointers reset to nullptr.
1291 if (batch->on_complete == nullptr &&
1292 (!batch->recv_initial_metadata ||
1293 batch->payload->recv_initial_metadata.recv_initial_metadata_ready ==
1294 nullptr) &&
1295 (!batch->recv_message ||
Mark D. Rothf3715132018-06-08 14:22:12 -07001296 batch->payload->recv_message.recv_message_ready == nullptr)) {
Mark D. Roth718c8342018-02-28 13:00:04 -08001297 if (grpc_client_channel_trace.enabled()) {
Mark D. Roth48854d22018-04-25 13:05:26 -07001298 gpr_log(GPR_INFO, "chand=%p calld=%p: clearing pending batch", chand,
Mark D. Roth718c8342018-02-28 13:00:04 -08001299 calld);
1300 }
1301 pending_batch_clear(calld, pending);
1302 }
1303}
1304
Mark D. Rothf3715132018-06-08 14:22:12 -07001305// Returns true if all ops in the pending batch have been completed.
1306static bool pending_batch_is_completed(
1307 pending_batch* pending, call_data* calld,
1308 subchannel_call_retry_state* retry_state) {
1309 if (pending->batch == nullptr || pending->batch->on_complete == nullptr) {
1310 return false;
Mark D. Roth718c8342018-02-28 13:00:04 -08001311 }
Mark D. Rothf3715132018-06-08 14:22:12 -07001312 if (pending->batch->send_initial_metadata &&
1313 !retry_state->completed_send_initial_metadata) {
1314 return false;
1315 }
1316 if (pending->batch->send_message &&
1317 retry_state->completed_send_message_count <
1318 calld->send_messages->size()) {
1319 return false;
1320 }
1321 if (pending->batch->send_trailing_metadata &&
1322 !retry_state->completed_send_trailing_metadata) {
1323 return false;
1324 }
1325 if (pending->batch->recv_initial_metadata &&
1326 !retry_state->completed_recv_initial_metadata) {
1327 return false;
1328 }
1329 if (pending->batch->recv_message &&
1330 retry_state->completed_recv_message_count <
1331 retry_state->started_recv_message_count) {
1332 return false;
1333 }
1334 if (pending->batch->recv_trailing_metadata &&
1335 !retry_state->completed_recv_trailing_metadata) {
1336 return false;
1337 }
1338 return true;
1339}
1340
1341 // Returns true if any op in the batch has not yet been started.
1342static bool pending_batch_is_unstarted(
1343 pending_batch* pending, call_data* calld,
1344 subchannel_call_retry_state* retry_state) {
1345 if (pending->batch == nullptr || pending->batch->on_complete == nullptr) {
1346 return false;
1347 }
1348 if (pending->batch->send_initial_metadata &&
1349 !retry_state->started_send_initial_metadata) {
1350 return true;
1351 }
1352 if (pending->batch->send_message &&
1353 retry_state->started_send_message_count < calld->send_messages->size()) {
1354 return true;
1355 }
1356 if (pending->batch->send_trailing_metadata &&
1357 !retry_state->started_send_trailing_metadata) {
1358 return true;
1359 }
1360 if (pending->batch->recv_initial_metadata &&
1361 !retry_state->started_recv_initial_metadata) {
1362 return true;
1363 }
1364 if (pending->batch->recv_message &&
1365 retry_state->completed_recv_message_count ==
1366 retry_state->started_recv_message_count) {
1367 return true;
1368 }
1369 if (pending->batch->recv_trailing_metadata &&
1370 !retry_state->started_recv_trailing_metadata) {
1371 return true;
1372 }
1373 return false;
Mark D. Roth718c8342018-02-28 13:00:04 -08001374}
1375
1376//
1377// retry code
1378//
1379
1380// Commits the call so that no further retry attempts will be performed.
1381static void retry_commit(grpc_call_element* elem,
1382 subchannel_call_retry_state* retry_state) {
1383 channel_data* chand = static_cast<channel_data*>(elem->channel_data);
1384 call_data* calld = static_cast<call_data*>(elem->call_data);
1385 if (calld->retry_committed) return;
1386 calld->retry_committed = true;
1387 if (grpc_client_channel_trace.enabled()) {
Mark D. Roth48854d22018-04-25 13:05:26 -07001388 gpr_log(GPR_INFO, "chand=%p calld=%p: committing retries", chand, calld);
Mark D. Roth718c8342018-02-28 13:00:04 -08001389 }
1390 if (retry_state != nullptr) {
1391 free_cached_send_op_data_after_commit(elem, retry_state);
1392 }
1393}
1394
1395// Starts a retry after appropriate back-off.
1396static void do_retry(grpc_call_element* elem,
1397 subchannel_call_retry_state* retry_state,
1398 grpc_millis server_pushback_ms) {
1399 channel_data* chand = static_cast<channel_data*>(elem->channel_data);
1400 call_data* calld = static_cast<call_data*>(elem->call_data);
1401 GPR_ASSERT(calld->method_params != nullptr);
1402 const ClientChannelMethodParams::RetryPolicy* retry_policy =
1403 calld->method_params->retry_policy();
1404 GPR_ASSERT(retry_policy != nullptr);
1405 // Reset subchannel call and connected subchannel.
1406 if (calld->subchannel_call != nullptr) {
1407 GRPC_SUBCHANNEL_CALL_UNREF(calld->subchannel_call,
1408 "client_channel_call_retry");
1409 calld->subchannel_call = nullptr;
1410 }
1411 if (calld->pick.connected_subchannel != nullptr) {
1412 calld->pick.connected_subchannel.reset();
1413 }
1414 // Compute backoff delay.
1415 grpc_millis next_attempt_time;
1416 if (server_pushback_ms >= 0) {
1417 next_attempt_time = grpc_core::ExecCtx::Get()->Now() + server_pushback_ms;
1418 calld->last_attempt_got_server_pushback = true;
1419 } else {
1420 if (calld->num_attempts_completed == 1 ||
1421 calld->last_attempt_got_server_pushback) {
1422 calld->retry_backoff.Init(
1423 grpc_core::BackOff::Options()
1424 .set_initial_backoff(retry_policy->initial_backoff)
1425 .set_multiplier(retry_policy->backoff_multiplier)
1426 .set_jitter(RETRY_BACKOFF_JITTER)
1427 .set_max_backoff(retry_policy->max_backoff));
1428 calld->last_attempt_got_server_pushback = false;
1429 }
1430 next_attempt_time = calld->retry_backoff->NextAttemptTime();
1431 }
1432 if (grpc_client_channel_trace.enabled()) {
Mark D. Roth48854d22018-04-25 13:05:26 -07001433 gpr_log(GPR_INFO,
Sree Kuchibhotla1dd12c02018-04-11 18:05:48 -07001434 "chand=%p calld=%p: retrying failed call in %" PRId64 " ms", chand,
Mark D. Roth718c8342018-02-28 13:00:04 -08001435 calld, next_attempt_time - grpc_core::ExecCtx::Get()->Now());
1436 }
1437 // Schedule retry after computed delay.
1438 GRPC_CLOSURE_INIT(&calld->pick_closure, start_pick_locked, elem,
1439 grpc_combiner_scheduler(chand->combiner));
1440 grpc_timer_init(&calld->retry_timer, next_attempt_time, &calld->pick_closure);
1441 // Update bookkeeping.
1442 if (retry_state != nullptr) retry_state->retry_dispatched = true;
1443}
1444
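// Worked example (illustrative numbers, not values from this file):
// with initial_backoff = 1000 ms, backoff_multiplier = 2.0, and
// max_backoff = 10000 ms, successive retries are delayed roughly
// 1000, 2000, 4000, 8000, then 10000 ms (capped), each perturbed by
// RETRY_BACKOFF_JITTER. A valid server push-back value overrides the
// computed delay and, because last_attempt_got_server_pushback is set,
// causes retry_backoff to be re-Init()ed on the next attempt.
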
1445// Returns true if the call is being retried.
1446static bool maybe_retry(grpc_call_element* elem,
1447 subchannel_batch_data* batch_data,
1448 grpc_status_code status,
1449 grpc_mdelem* server_pushback_md) {
1450 channel_data* chand = static_cast<channel_data*>(elem->channel_data);
1451 call_data* calld = static_cast<call_data*>(elem->call_data);
1452 // Get retry policy.
1453 if (calld->method_params == nullptr) return false;
1454 const ClientChannelMethodParams::RetryPolicy* retry_policy =
1455 calld->method_params->retry_policy();
1456 if (retry_policy == nullptr) return false;
1457 // If we've already dispatched a retry from this call, return true.
1458 // This catches the case where the batch has multiple callbacks
1459 // (i.e., it includes either recv_message or recv_initial_metadata).
1460 subchannel_call_retry_state* retry_state = nullptr;
1461 if (batch_data != nullptr) {
1462 retry_state = static_cast<subchannel_call_retry_state*>(
1463 grpc_connected_subchannel_call_get_parent_data(
1464 batch_data->subchannel_call));
1465 if (retry_state->retry_dispatched) {
1466 if (grpc_client_channel_trace.enabled()) {
Mark D. Roth48854d22018-04-25 13:05:26 -07001467 gpr_log(GPR_INFO, "chand=%p calld=%p: retry already dispatched", chand,
Mark D. Roth718c8342018-02-28 13:00:04 -08001468 calld);
1469 }
1470 return true;
1471 }
1472 }
1473 // Check status.
Yash Tibrewal8c5b8832018-05-14 11:44:18 -07001474 if (GPR_LIKELY(status == GRPC_STATUS_OK)) {
Mark D. Roth9db86fc2018-03-28 07:42:20 -07001475 if (calld->retry_throttle_data != nullptr) {
1476 calld->retry_throttle_data->RecordSuccess();
1477 }
Mark D. Roth718c8342018-02-28 13:00:04 -08001478 if (grpc_client_channel_trace.enabled()) {
Mark D. Roth48854d22018-04-25 13:05:26 -07001479 gpr_log(GPR_INFO, "chand=%p calld=%p: call succeeded", chand, calld);
Mark D. Roth718c8342018-02-28 13:00:04 -08001480 }
1481 return false;
1482 }
1483 // Status is not OK. Check whether the status is retryable.
1484 if (!retry_policy->retryable_status_codes.Contains(status)) {
1485 if (grpc_client_channel_trace.enabled()) {
Mark D. Roth48854d22018-04-25 13:05:26 -07001486 gpr_log(GPR_INFO,
Mark D. Roth718c8342018-02-28 13:00:04 -08001487 "chand=%p calld=%p: status %s not configured as retryable", chand,
1488 calld, grpc_status_code_to_string(status));
1489 }
1490 return false;
1491 }
1492 // Record the failure and check whether retries are throttled.
1493 // Note that it's important for this check to come after the status
1494 // code check above, since we should only record failures whose statuses
1495 // match the configured retryable status codes, so that we don't count
1496 // things like failures due to malformed requests (INVALID_ARGUMENT).
1497 // Conversely, it's important for this to come before the remaining
1498 // checks, so that we don't fail to record failures due to other factors.
Mark D. Roth9db86fc2018-03-28 07:42:20 -07001499 if (calld->retry_throttle_data != nullptr &&
1500 !calld->retry_throttle_data->RecordFailure()) {
Mark D. Roth718c8342018-02-28 13:00:04 -08001501 if (grpc_client_channel_trace.enabled()) {
Mark D. Roth48854d22018-04-25 13:05:26 -07001502 gpr_log(GPR_INFO, "chand=%p calld=%p: retries throttled", chand, calld);
Mark D. Roth718c8342018-02-28 13:00:04 -08001503 }
1504 return false;
1505 }
1506 // Check whether the call is committed.
1507 if (calld->retry_committed) {
1508 if (grpc_client_channel_trace.enabled()) {
Mark D. Roth48854d22018-04-25 13:05:26 -07001509 gpr_log(GPR_INFO, "chand=%p calld=%p: retries already committed", chand,
Mark D. Roth718c8342018-02-28 13:00:04 -08001510 calld);
1511 }
1512 return false;
1513 }
1514 // Check whether we have retries remaining.
1515 ++calld->num_attempts_completed;
1516 if (calld->num_attempts_completed >= retry_policy->max_attempts) {
1517 if (grpc_client_channel_trace.enabled()) {
Mark D. Roth48854d22018-04-25 13:05:26 -07001518 gpr_log(GPR_INFO, "chand=%p calld=%p: exceeded %d retry attempts", chand,
Mark D. Roth718c8342018-02-28 13:00:04 -08001519 calld, retry_policy->max_attempts);
1520 }
1521 return false;
1522 }
1523 // If the call was cancelled from the surface, don't retry.
1524 if (calld->cancel_error != GRPC_ERROR_NONE) {
1525 if (grpc_client_channel_trace.enabled()) {
Mark D. Roth48854d22018-04-25 13:05:26 -07001526 gpr_log(GPR_INFO,
Mark D. Roth718c8342018-02-28 13:00:04 -08001527 "chand=%p calld=%p: call cancelled from surface, not retrying",
1528 chand, calld);
1529 }
1530 return false;
1531 }
1532 // Check server push-back.
1533 grpc_millis server_pushback_ms = -1;
1534 if (server_pushback_md != nullptr) {
1535 // If the value is "-1" or any other unparseable string, we do not retry.
1536 uint32_t ms;
1537 if (!grpc_parse_slice_to_uint32(GRPC_MDVALUE(*server_pushback_md), &ms)) {
1538 if (grpc_client_channel_trace.enabled()) {
Mark D. Roth48854d22018-04-25 13:05:26 -07001539 gpr_log(GPR_INFO,
Mark D. Roth718c8342018-02-28 13:00:04 -08001540 "chand=%p calld=%p: not retrying due to server push-back",
1541 chand, calld);
1542 }
1543 return false;
1544 } else {
1545 if (grpc_client_channel_trace.enabled()) {
Mark D. Roth48854d22018-04-25 13:05:26 -07001546 gpr_log(GPR_INFO, "chand=%p calld=%p: server push-back: retry in %u ms",
1547 chand, calld, ms);
Mark D. Roth718c8342018-02-28 13:00:04 -08001548 }
1549      server_pushback_ms = static_cast<grpc_millis>(ms);
1550 }
1551 }
1552 do_retry(elem, retry_state, server_pushback_ms);
1553 return true;
1554}
1555
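// For reference, the retry policy consulted above is configured
// per-method via the service config. An illustrative sketch (field
// names per the gRPC retry design; the exact parsing lives elsewhere):
//
//   "retryPolicy": {
//     "maxAttempts": 4,
//     "initialBackoff": "0.1s",
//     "maxBackoff": "1s",
//     "backoffMultiplier": 2,
//     "retryableStatusCodes": [ "UNAVAILABLE" ]
//   }
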
1556//
1557// subchannel_batch_data
1558//
1559
1560static subchannel_batch_data* batch_data_create(grpc_call_element* elem,
Mark D. Rothf3715132018-06-08 14:22:12 -07001561 int refcount) {
Mark D. Roth718c8342018-02-28 13:00:04 -08001562 call_data* calld = static_cast<call_data*>(elem->call_data);
1563 subchannel_call_retry_state* retry_state =
1564 static_cast<subchannel_call_retry_state*>(
1565 grpc_connected_subchannel_call_get_parent_data(
1566 calld->subchannel_call));
1567 subchannel_batch_data* batch_data = static_cast<subchannel_batch_data*>(
1568 gpr_arena_alloc(calld->arena, sizeof(*batch_data)));
1569 batch_data->elem = elem;
1570 batch_data->subchannel_call =
1571 GRPC_SUBCHANNEL_CALL_REF(calld->subchannel_call, "batch_data_create");
1572 batch_data->batch.payload = &retry_state->batch_payload;
1573 gpr_ref_init(&batch_data->refs, refcount);
Mark D. Rothf3715132018-06-08 14:22:12 -07001574 GRPC_CLOSURE_INIT(&batch_data->on_complete, on_complete, batch_data,
1575 grpc_schedule_on_exec_ctx);
1576 batch_data->batch.on_complete = &batch_data->on_complete;
Mark D. Roth718c8342018-02-28 13:00:04 -08001577 GRPC_CALL_STACK_REF(calld->owning_call, "batch_data");
1578 return batch_data;
1579}
1580
1581static void batch_data_unref(subchannel_batch_data* batch_data) {
1582 if (gpr_unref(&batch_data->refs)) {
1583 if (batch_data->send_initial_metadata_storage != nullptr) {
1584 grpc_metadata_batch_destroy(&batch_data->send_initial_metadata);
1585 }
1586 if (batch_data->send_trailing_metadata_storage != nullptr) {
1587 grpc_metadata_batch_destroy(&batch_data->send_trailing_metadata);
1588 }
1589 if (batch_data->batch.recv_initial_metadata) {
1590 grpc_metadata_batch_destroy(&batch_data->recv_initial_metadata);
1591 }
1592 if (batch_data->batch.recv_trailing_metadata) {
1593 grpc_metadata_batch_destroy(&batch_data->recv_trailing_metadata);
1594 }
1595 GRPC_SUBCHANNEL_CALL_UNREF(batch_data->subchannel_call, "batch_data_unref");
1596 call_data* calld = static_cast<call_data*>(batch_data->elem->call_data);
1597 GRPC_CALL_STACK_UNREF(calld->owning_call, "batch_data");
1598 }
1599}
1600
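// Refcounting sketch (assumed convention): the refcount passed to
// batch_data_create() is the number of callbacks that will eventually
// call batch_data_unref(). For example, a batch carrying both
// recv_initial_metadata and on_complete would be created with 2:
//
//   subchannel_batch_data* batch_data = batch_data_create(elem, 2);
//   // ...later, the recv_initial_metadata_ready path unrefs once and
//   // on_complete unrefs once, destroying the metadata batches above.
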
1601//
1602// recv_initial_metadata callback handling
1603//
1604
1605// Invokes recv_initial_metadata_ready for a subchannel batch.
1606static void invoke_recv_initial_metadata_callback(void* arg,
1607 grpc_error* error) {
1608 subchannel_batch_data* batch_data = static_cast<subchannel_batch_data*>(arg);
Mark D. Rothf3715132018-06-08 14:22:12 -07001609 channel_data* chand =
1610 static_cast<channel_data*>(batch_data->elem->channel_data);
1611 call_data* calld = static_cast<call_data*>(batch_data->elem->call_data);
Mark D. Roth718c8342018-02-28 13:00:04 -08001612 // Find pending batch.
Mark D. Rothf3715132018-06-08 14:22:12 -07001613 pending_batch* pending = nullptr;
1614 for (size_t i = 0; i < GPR_ARRAY_SIZE(calld->pending_batches); ++i) {
1615 grpc_transport_stream_op_batch* batch = calld->pending_batches[i].batch;
1616 if (batch != nullptr && batch->recv_initial_metadata &&
1617 batch->payload->recv_initial_metadata.recv_initial_metadata_ready !=
1618 nullptr) {
1619 if (grpc_client_channel_trace.enabled()) {
1620 gpr_log(GPR_INFO,
1621 "chand=%p calld=%p: invoking recv_initial_metadata_ready for "
1622 "pending batch at index %" PRIuPTR,
1623 chand, calld, i);
1624 }
1625 pending = &calld->pending_batches[i];
1626 break;
1627 }
1628 }
Mark D. Roth718c8342018-02-28 13:00:04 -08001629 GPR_ASSERT(pending != nullptr);
1630 // Return metadata.
1631 grpc_metadata_batch_move(
1632 &batch_data->recv_initial_metadata,
1633 pending->batch->payload->recv_initial_metadata.recv_initial_metadata);
1634 // Update bookkeeping.
1635 // Note: Need to do this before invoking the callback, since invoking
1636 // the callback will result in yielding the call combiner.
1637 grpc_closure* recv_initial_metadata_ready =
1638 pending->batch->payload->recv_initial_metadata
1639 .recv_initial_metadata_ready;
1640 pending->batch->payload->recv_initial_metadata.recv_initial_metadata_ready =
1641 nullptr;
1642 maybe_clear_pending_batch(batch_data->elem, pending);
1643 batch_data_unref(batch_data);
1644 // Invoke callback.
1645 GRPC_CLOSURE_RUN(recv_initial_metadata_ready, GRPC_ERROR_REF(error));
1646}
1647
1648// Intercepts recv_initial_metadata_ready callback for retries.
1649// Commits the call and returns the initial metadata up the stack.
1650static void recv_initial_metadata_ready(void* arg, grpc_error* error) {
1651 subchannel_batch_data* batch_data = static_cast<subchannel_batch_data*>(arg);
1652 grpc_call_element* elem = batch_data->elem;
1653 channel_data* chand = static_cast<channel_data*>(elem->channel_data);
1654 call_data* calld = static_cast<call_data*>(elem->call_data);
1655 if (grpc_client_channel_trace.enabled()) {
Mark D. Roth48854d22018-04-25 13:05:26 -07001656 gpr_log(GPR_INFO,
Mark D. Roth718c8342018-02-28 13:00:04 -08001657 "chand=%p calld=%p: got recv_initial_metadata_ready, error=%s",
1658 chand, calld, grpc_error_string(error));
1659 }
1660 subchannel_call_retry_state* retry_state =
1661 static_cast<subchannel_call_retry_state*>(
1662 grpc_connected_subchannel_call_get_parent_data(
1663 batch_data->subchannel_call));
1664 // If we got an error or a Trailers-Only response and have not yet gotten
Mark D. Rothf3715132018-06-08 14:22:12 -07001665 // the recv_trailing_metadata on_complete callback, then defer
1666 // propagating this callback back to the surface. We can evaluate whether
1667 // to retry when recv_trailing_metadata comes back.
Yash Tibrewal8c5b8832018-05-14 11:44:18 -07001668 if (GPR_UNLIKELY((batch_data->trailing_metadata_available ||
1669 error != GRPC_ERROR_NONE) &&
1670 !retry_state->completed_recv_trailing_metadata)) {
Mark D. Roth718c8342018-02-28 13:00:04 -08001671 if (grpc_client_channel_trace.enabled()) {
Mark D. Roth48854d22018-04-25 13:05:26 -07001672 gpr_log(GPR_INFO,
Mark D. Roth718c8342018-02-28 13:00:04 -08001673 "chand=%p calld=%p: deferring recv_initial_metadata_ready "
1674 "(Trailers-Only)",
1675 chand, calld);
1676 }
Mark D. Rothde077ac2018-04-12 08:05:44 -07001677 retry_state->recv_initial_metadata_ready_deferred_batch = batch_data;
Mark D. Roth718c8342018-02-28 13:00:04 -08001678 retry_state->recv_initial_metadata_error = GRPC_ERROR_REF(error);
1679 if (!retry_state->started_recv_trailing_metadata) {
1680 // recv_trailing_metadata not yet started by application; start it
1681 // ourselves to get status.
1682 start_internal_recv_trailing_metadata(elem);
1683 } else {
1684 GRPC_CALL_COMBINER_STOP(
1685 calld->call_combiner,
1686 "recv_initial_metadata_ready trailers-only or error");
1687 }
1688 return;
1689 }
1690 // Received valid initial metadata, so commit the call.
1691 retry_commit(elem, retry_state);
1692 // Manually invoking a callback function; it does not take ownership of error.
1693 invoke_recv_initial_metadata_callback(batch_data, error);
Mark D. Rothf3715132018-06-08 14:22:12 -07001694 GRPC_ERROR_UNREF(error);
Mark D. Roth718c8342018-02-28 13:00:04 -08001695}
1696
1697//
1698// recv_message callback handling
1699//
1700
1701// Invokes recv_message_ready for a subchannel batch.
1702static void invoke_recv_message_callback(void* arg, grpc_error* error) {
1703 subchannel_batch_data* batch_data = static_cast<subchannel_batch_data*>(arg);
Mark D. Rothf3715132018-06-08 14:22:12 -07001704 channel_data* chand =
1705 static_cast<channel_data*>(batch_data->elem->channel_data);
1706 call_data* calld = static_cast<call_data*>(batch_data->elem->call_data);
Mark D. Roth718c8342018-02-28 13:00:04 -08001707 // Find pending op.
Mark D. Rothf3715132018-06-08 14:22:12 -07001708 pending_batch* pending = nullptr;
1709 for (size_t i = 0; i < GPR_ARRAY_SIZE(calld->pending_batches); ++i) {
1710 grpc_transport_stream_op_batch* batch = calld->pending_batches[i].batch;
1711 if (batch != nullptr && batch->recv_message &&
1712 batch->payload->recv_message.recv_message_ready != nullptr) {
1713 if (grpc_client_channel_trace.enabled()) {
1714 gpr_log(GPR_INFO,
1715 "chand=%p calld=%p: invoking recv_message_ready for "
1716 "pending batch at index %" PRIuPTR,
1717 chand, calld, i);
1718 }
1719 pending = &calld->pending_batches[i];
1720 break;
1721 }
1722 }
Mark D. Roth718c8342018-02-28 13:00:04 -08001723 GPR_ASSERT(pending != nullptr);
1724 // Return payload.
1725 *pending->batch->payload->recv_message.recv_message =
Mark D. Roth3d8b32d2018-03-09 13:25:40 -08001726 std::move(batch_data->recv_message);
Mark D. Roth718c8342018-02-28 13:00:04 -08001727 // Update bookkeeping.
1728 // Note: Need to do this before invoking the callback, since invoking
1729 // the callback will result in yielding the call combiner.
1730 grpc_closure* recv_message_ready =
1731 pending->batch->payload->recv_message.recv_message_ready;
1732 pending->batch->payload->recv_message.recv_message_ready = nullptr;
1733 maybe_clear_pending_batch(batch_data->elem, pending);
1734 batch_data_unref(batch_data);
1735 // Invoke callback.
1736 GRPC_CLOSURE_RUN(recv_message_ready, GRPC_ERROR_REF(error));
1737}
1738
1739// Intercepts recv_message_ready callback for retries.
1740// Commits the call and returns the message up the stack.
1741static void recv_message_ready(void* arg, grpc_error* error) {
1742 subchannel_batch_data* batch_data = static_cast<subchannel_batch_data*>(arg);
1743 grpc_call_element* elem = batch_data->elem;
1744 channel_data* chand = static_cast<channel_data*>(elem->channel_data);
1745 call_data* calld = static_cast<call_data*>(elem->call_data);
1746 if (grpc_client_channel_trace.enabled()) {
Mark D. Roth48854d22018-04-25 13:05:26 -07001747 gpr_log(GPR_INFO, "chand=%p calld=%p: got recv_message_ready, error=%s",
Mark D. Roth718c8342018-02-28 13:00:04 -08001748 chand, calld, grpc_error_string(error));
1749 }
1750 subchannel_call_retry_state* retry_state =
1751 static_cast<subchannel_call_retry_state*>(
1752 grpc_connected_subchannel_call_get_parent_data(
1753 batch_data->subchannel_call));
1754 // If we got an error or the payload was nullptr and we have not yet gotten
Mark D. Rothf3715132018-06-08 14:22:12 -07001755 // the recv_trailing_metadata on_complete callback, then defer
1756 // propagating this callback back to the surface. We can evaluate whether
1757 // to retry when recv_trailing_metadata comes back.
Yash Tibrewal8c5b8832018-05-14 11:44:18 -07001758 if (GPR_UNLIKELY(
1759 (batch_data->recv_message == nullptr || error != GRPC_ERROR_NONE) &&
1760 !retry_state->completed_recv_trailing_metadata)) {
Mark D. Roth718c8342018-02-28 13:00:04 -08001761 if (grpc_client_channel_trace.enabled()) {
Mark D. Roth48854d22018-04-25 13:05:26 -07001762 gpr_log(GPR_INFO,
Mark D. Roth718c8342018-02-28 13:00:04 -08001763 "chand=%p calld=%p: deferring recv_message_ready (nullptr "
1764 "message and recv_trailing_metadata pending)",
1765 chand, calld);
1766 }
Mark D. Rothde077ac2018-04-12 08:05:44 -07001767 retry_state->recv_message_ready_deferred_batch = batch_data;
Mark D. Roth718c8342018-02-28 13:00:04 -08001768 retry_state->recv_message_error = GRPC_ERROR_REF(error);
1769 if (!retry_state->started_recv_trailing_metadata) {
1770 // recv_trailing_metadata not yet started by application; start it
1771 // ourselves to get status.
1772 start_internal_recv_trailing_metadata(elem);
1773 } else {
1774 GRPC_CALL_COMBINER_STOP(calld->call_combiner, "recv_message_ready null");
1775 }
1776 return;
1777 }
1778 // Received a valid message, so commit the call.
1779 retry_commit(elem, retry_state);
1780 // Manually invoking a callback function; it does not take ownership of error.
1781 invoke_recv_message_callback(batch_data, error);
Mark D. Roth8ae0ff02018-06-08 08:37:23 -07001782 GRPC_ERROR_UNREF(error);
1783}
1784
Mark D. Rothf3715132018-06-08 14:22:12 -07001785//
1786// list of closures to execute in call combiner
1787//
1788
1789// Represents a closure that needs to run in the call combiner as part of
1790// starting or completing a batch.
1791typedef struct {
1792 grpc_closure* closure;
1793 grpc_error* error;
1794 const char* reason;
1795 bool free_reason = false;
1796} closure_to_execute;
1797
1798static void execute_closures_in_call_combiner(grpc_call_element* elem,
1799 const char* caller,
1800 closure_to_execute* closures,
1801 size_t num_closures) {
Mark D. Roth8ae0ff02018-06-08 08:37:23 -07001802 channel_data* chand = static_cast<channel_data*>(elem->channel_data);
1803 call_data* calld = static_cast<call_data*>(elem->call_data);
Mark D. Rothf3715132018-06-08 14:22:12 -07001804 // Note that the call combiner will be yielded for each closure that
1805 // we schedule. We're already running in the call combiner, so one of
1806 // the closures can be scheduled directly, but the others will
1807 // have to re-enter the call combiner.
1808 if (num_closures > 0) {
1809 if (grpc_client_channel_trace.enabled()) {
1810 gpr_log(GPR_INFO, "chand=%p calld=%p: %s starting closure: %s", chand,
1811 calld, caller, closures[0].reason);
1812 }
1813 GRPC_CLOSURE_SCHED(closures[0].closure, closures[0].error);
1814 if (closures[0].free_reason) {
1815 gpr_free(const_cast<char*>(closures[0].reason));
1816 }
1817 for (size_t i = 1; i < num_closures; ++i) {
1818 if (grpc_client_channel_trace.enabled()) {
1819 gpr_log(GPR_INFO,
1820 "chand=%p calld=%p: %s starting closure in call combiner: %s",
1821 chand, calld, caller, closures[i].reason);
1822 }
1823 GRPC_CALL_COMBINER_START(calld->call_combiner, closures[i].closure,
1824 closures[i].error, closures[i].reason);
1825 if (closures[i].free_reason) {
1826 gpr_free(const_cast<char*>(closures[i].reason));
1827 }
1828 }
Mark D. Roth8ae0ff02018-06-08 08:37:23 -07001829 } else {
Mark D. Rothf3715132018-06-08 14:22:12 -07001830 if (grpc_client_channel_trace.enabled()) {
1831 gpr_log(GPR_INFO, "chand=%p calld=%p: no closures to run for %s", chand,
1832 calld, caller);
Mark D. Roth8ae0ff02018-06-08 08:37:23 -07001833 }
Mark D. Rothf3715132018-06-08 14:22:12 -07001834 GRPC_CALL_COMBINER_STOP(calld->call_combiner, "no closures to run");
Mark D. Roth8ae0ff02018-06-08 08:37:23 -07001835 }
Mark D. Rothde077ac2018-04-12 08:05:44 -07001836}
1837
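// Usage sketch (assumed): callers build a small stack array of
// closure_to_execute, let the add_closures_*() helpers below populate
// it, and then hand it off in one shot:
//
//   closure_to_execute closures[3];
//   size_t num_closures = 0;
//   add_closures_for_deferred_recv_callbacks(batch_data, retry_state,
//                                            closures, &num_closures);
//   execute_closures_in_call_combiner(elem, "on_complete", closures,
//                                     num_closures);
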
1838//
Mark D. Roth718c8342018-02-28 13:00:04 -08001839// on_complete callback handling
1840//
1841
Mark D. Rothf3715132018-06-08 14:22:12 -07001842// Updates retry_state to reflect the ops completed in batch_data.
1843static void update_retry_state_for_completed_batch(
1844 subchannel_batch_data* batch_data,
1845 subchannel_call_retry_state* retry_state) {
1846 if (batch_data->batch.send_initial_metadata) {
1847 retry_state->completed_send_initial_metadata = true;
Mark D. Roth718c8342018-02-28 13:00:04 -08001848 }
Mark D. Rothf3715132018-06-08 14:22:12 -07001849 if (batch_data->batch.send_message) {
1850 ++retry_state->completed_send_message_count;
1851 }
1852 if (batch_data->batch.send_trailing_metadata) {
1853 retry_state->completed_send_trailing_metadata = true;
1854 }
1855 if (batch_data->batch.recv_initial_metadata) {
1856 retry_state->completed_recv_initial_metadata = true;
1857 }
1858 if (batch_data->batch.recv_message) {
1859 ++retry_state->completed_recv_message_count;
1860 }
1861 if (batch_data->batch.recv_trailing_metadata) {
1862 retry_state->completed_recv_trailing_metadata = true;
1863 }
1864}
1865
1866// Adds any necessary closures for deferred recv_initial_metadata and
1867// recv_message callbacks to closures, updating *num_closures as needed.
1868static void add_closures_for_deferred_recv_callbacks(
1869 subchannel_batch_data* batch_data, subchannel_call_retry_state* retry_state,
1870 closure_to_execute* closures, size_t* num_closures) {
1871 if (batch_data->batch.recv_trailing_metadata) {
1872 // Add closure for deferred recv_initial_metadata_ready.
1873 if (GPR_UNLIKELY(retry_state->recv_initial_metadata_ready_deferred_batch !=
1874 nullptr)) {
1875 closure_to_execute* closure = &closures[(*num_closures)++];
1876 closure->closure = GRPC_CLOSURE_INIT(
1877 &batch_data->recv_initial_metadata_ready,
1878 invoke_recv_initial_metadata_callback,
1879 retry_state->recv_initial_metadata_ready_deferred_batch,
1880 grpc_schedule_on_exec_ctx);
1881 closure->error = retry_state->recv_initial_metadata_error;
1882 closure->reason = "resuming recv_initial_metadata_ready";
1883 retry_state->recv_initial_metadata_ready_deferred_batch = nullptr;
1884 }
1885 // Add closure for deferred recv_message_ready.
1886 if (GPR_UNLIKELY(retry_state->recv_message_ready_deferred_batch !=
1887 nullptr)) {
1888 closure_to_execute* closure = &closures[(*num_closures)++];
1889 closure->closure = GRPC_CLOSURE_INIT(
1890 &batch_data->recv_message_ready, invoke_recv_message_callback,
1891 retry_state->recv_message_ready_deferred_batch,
1892 grpc_schedule_on_exec_ctx);
1893 closure->error = retry_state->recv_message_error;
1894 closure->reason = "resuming recv_message_ready";
1895 retry_state->recv_message_ready_deferred_batch = nullptr;
1896 }
1897 }
Mark D. Roth718c8342018-02-28 13:00:04 -08001898}
1899
1900// If there are any cached ops to replay or pending ops to start on the
1901// subchannel call, adds a closure to closures to invoke
Mark D. Rothf3715132018-06-08 14:22:12 -07001902// start_retriable_subchannel_batches(), updating *num_closures as needed.
Mark D. Roth718c8342018-02-28 13:00:04 -08001903static void add_closures_for_replay_or_pending_send_ops(
1904 grpc_call_element* elem, subchannel_batch_data* batch_data,
Mark D. Rothf3715132018-06-08 14:22:12 -07001905 subchannel_call_retry_state* retry_state, closure_to_execute* closures,
1906 size_t* num_closures) {
Mark D. Roth718c8342018-02-28 13:00:04 -08001907 channel_data* chand = static_cast<channel_data*>(elem->channel_data);
1908 call_data* calld = static_cast<call_data*>(elem->call_data);
1909 bool have_pending_send_message_ops =
Mark D. Rothefcd45b2018-03-28 10:49:59 -07001910 retry_state->started_send_message_count < calld->send_messages->size();
Mark D. Roth718c8342018-02-28 13:00:04 -08001911 bool have_pending_send_trailing_metadata_op =
1912 calld->seen_send_trailing_metadata &&
1913 !retry_state->started_send_trailing_metadata;
1914 if (!have_pending_send_message_ops &&
1915 !have_pending_send_trailing_metadata_op) {
1916 for (size_t i = 0; i < GPR_ARRAY_SIZE(calld->pending_batches); ++i) {
1917 pending_batch* pending = &calld->pending_batches[i];
1918 grpc_transport_stream_op_batch* batch = pending->batch;
1919 if (batch == nullptr || pending->send_ops_cached) continue;
1920 if (batch->send_message) have_pending_send_message_ops = true;
1921 if (batch->send_trailing_metadata) {
1922 have_pending_send_trailing_metadata_op = true;
1923 }
1924 }
1925 }
1926 if (have_pending_send_message_ops || have_pending_send_trailing_metadata_op) {
1927 if (grpc_client_channel_trace.enabled()) {
Mark D. Roth48854d22018-04-25 13:05:26 -07001928 gpr_log(GPR_INFO,
Mark D. Roth718c8342018-02-28 13:00:04 -08001929 "chand=%p calld=%p: starting next batch for pending send op(s)",
1930 chand, calld);
1931 }
Mark D. Rothf3715132018-06-08 14:22:12 -07001932 closure_to_execute* closure = &closures[(*num_closures)++];
1933 closure->closure = GRPC_CLOSURE_INIT(
1934 &batch_data->batch.handler_private.closure,
1935 start_retriable_subchannel_batches, elem, grpc_schedule_on_exec_ctx);
1936 closure->error = GRPC_ERROR_NONE;
1937 closure->reason = "starting next batch for send_* op(s)";
Mark D. Roth718c8342018-02-28 13:00:04 -08001938 }
1939}
1940
Mark D. Rothf3715132018-06-08 14:22:12 -07001941// For any pending batch completed in batch_data, adds the necessary
1942// completion closures to closures, updating *num_closures as needed.
1943static void add_closures_for_completed_pending_batches(
1944 grpc_call_element* elem, subchannel_batch_data* batch_data,
1945 subchannel_call_retry_state* retry_state, grpc_error* error,
1946 closure_to_execute* closures, size_t* num_closures) {
1947 channel_data* chand = static_cast<channel_data*>(elem->channel_data);
1948 call_data* calld = static_cast<call_data*>(elem->call_data);
1949 for (size_t i = 0; i < GPR_ARRAY_SIZE(calld->pending_batches); ++i) {
1950 pending_batch* pending = &calld->pending_batches[i];
1951 if (pending_batch_is_completed(pending, calld, retry_state)) {
1952 if (grpc_client_channel_trace.enabled()) {
1953 gpr_log(GPR_INFO,
1954 "chand=%p calld=%p: pending batch completed at index %" PRIuPTR,
1955 chand, calld, i);
1956 }
1957 // Copy the trailing metadata to return it to the surface.
1958 if (batch_data->batch.recv_trailing_metadata) {
1959 grpc_metadata_batch_move(&batch_data->recv_trailing_metadata,
1960 pending->batch->payload->recv_trailing_metadata
1961 .recv_trailing_metadata);
1962 }
1963 closure_to_execute* closure = &closures[(*num_closures)++];
1964 closure->closure = pending->batch->on_complete;
1965 closure->error = GRPC_ERROR_REF(error);
1966 closure->reason = "on_complete for pending batch";
1967 pending->batch->on_complete = nullptr;
1968 maybe_clear_pending_batch(elem, pending);
1969 }
1970 }
1971 GRPC_ERROR_UNREF(error);
1972}
1973
1974// For any pending batch containing an op that has not yet been started,
1975// adds the pending batch's completion closures to closures, updating
1976// *num_closures as needed.
1977static void add_closures_to_fail_unstarted_pending_batches(
1978 grpc_call_element* elem, subchannel_call_retry_state* retry_state,
1979 grpc_error* error, closure_to_execute* closures, size_t* num_closures) {
1980 channel_data* chand = static_cast<channel_data*>(elem->channel_data);
1981 call_data* calld = static_cast<call_data*>(elem->call_data);
1982 for (size_t i = 0; i < GPR_ARRAY_SIZE(calld->pending_batches); ++i) {
1983 pending_batch* pending = &calld->pending_batches[i];
1984 if (pending_batch_is_unstarted(pending, calld, retry_state)) {
1985 if (grpc_client_channel_trace.enabled()) {
1986 gpr_log(GPR_INFO,
1987 "chand=%p calld=%p: failing unstarted pending batch at index "
1988 "%" PRIuPTR,
1989 chand, calld, i);
1990 }
1991 if (pending->batch->recv_initial_metadata) {
1992 closure_to_execute* closure = &closures[(*num_closures)++];
1993 closure->closure = pending->batch->payload->recv_initial_metadata
1994 .recv_initial_metadata_ready;
1995 closure->error = GRPC_ERROR_REF(error);
1996 closure->reason =
1997 "failing recv_initial_metadata_ready for pending batch";
1998 pending->batch->payload->recv_initial_metadata
1999 .recv_initial_metadata_ready = nullptr;
2000 }
2001 if (pending->batch->recv_message) {
2002 *pending->batch->payload->recv_message.recv_message = nullptr;
2003 closure_to_execute* closure = &closures[(*num_closures)++];
2004 closure->closure =
2005 pending->batch->payload->recv_message.recv_message_ready;
2006 closure->error = GRPC_ERROR_REF(error);
2007 closure->reason = "failing recv_message_ready for pending batch";
2008 pending->batch->payload->recv_message.recv_message_ready = nullptr;
2009 }
2010 closure_to_execute* closure = &closures[(*num_closures)++];
2011 closure->closure = pending->batch->on_complete;
2012 closure->error = GRPC_ERROR_REF(error);
2013 closure->reason = "failing on_complete for pending batch";
2014 pending->batch->on_complete = nullptr;
2015 maybe_clear_pending_batch(elem, pending);
2016 }
2017 }
2018 GRPC_ERROR_UNREF(error);
2019}
2020
Mark D. Roth718c8342018-02-28 13:00:04 -08002021// Callback used to intercept on_complete from subchannel calls.
2022// Called only when retries are enabled.
2023static void on_complete(void* arg, grpc_error* error) {
2024 subchannel_batch_data* batch_data = static_cast<subchannel_batch_data*>(arg);
2025 grpc_call_element* elem = batch_data->elem;
2026 channel_data* chand = static_cast<channel_data*>(elem->channel_data);
2027 call_data* calld = static_cast<call_data*>(elem->call_data);
2028 if (grpc_client_channel_trace.enabled()) {
2029 char* batch_str = grpc_transport_stream_op_batch_string(&batch_data->batch);
Mark D. Roth48854d22018-04-25 13:05:26 -07002030 gpr_log(GPR_INFO, "chand=%p calld=%p: got on_complete, error=%s, batch=%s",
Mark D. Roth718c8342018-02-28 13:00:04 -08002031 chand, calld, grpc_error_string(error), batch_str);
2032 gpr_free(batch_str);
2033 }
2034 subchannel_call_retry_state* retry_state =
2035 static_cast<subchannel_call_retry_state*>(
2036 grpc_connected_subchannel_call_get_parent_data(
2037 batch_data->subchannel_call));
Mark D. Rothf3715132018-06-08 14:22:12 -07002038 // If we have previously completed recv_trailing_metadata, then the
2039 // call is finished.
2040 bool call_finished = retry_state->completed_recv_trailing_metadata;
2041 // Record whether we were already committed before receiving this callback.
2042 const bool previously_committed = calld->retry_committed;
Mark D. Roth718c8342018-02-28 13:00:04 -08002043 // Update bookkeeping in retry_state.
Mark D. Rothf3715132018-06-08 14:22:12 -07002044 update_retry_state_for_completed_batch(batch_data, retry_state);
2045 if (call_finished) {
2046 if (grpc_client_channel_trace.enabled()) {
2047 gpr_log(GPR_INFO, "chand=%p calld=%p: call already finished", chand,
2048 calld);
2049 }
2050 } else {
2051 // Check if this batch finished the call, and if so, get its status.
2052 // The call is finished if either (a) this callback was invoked with
2053 // an error or (b) we receive status.
2054 grpc_status_code status = GRPC_STATUS_OK;
2055 grpc_mdelem* server_pushback_md = nullptr;
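    // If the server sent a "grpc-retry-pushback-ms" entry in its trailing
    // metadata, it is captured here so that maybe_retry() can honor it.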
2056 if (GPR_UNLIKELY(error != GRPC_ERROR_NONE)) { // Case (a).
2057 call_finished = true;
2058 grpc_error_get_status(error, calld->deadline, &status, nullptr, nullptr,
2059 nullptr);
2060 } else if (batch_data->batch.recv_trailing_metadata) { // Case (b).
2061 call_finished = true;
2062 grpc_metadata_batch* md_batch =
2063 batch_data->batch.payload->recv_trailing_metadata
2064 .recv_trailing_metadata;
2065 GPR_ASSERT(md_batch->idx.named.grpc_status != nullptr);
2066 status = grpc_get_status_code_from_metadata(
2067 md_batch->idx.named.grpc_status->md);
2068 if (md_batch->idx.named.grpc_retry_pushback_ms != nullptr) {
2069 server_pushback_md = &md_batch->idx.named.grpc_retry_pushback_ms->md;
2070 }
2071 }
2072 // If the call just finished, check if we should retry.
2073 if (call_finished) {
2074 if (grpc_client_channel_trace.enabled()) {
2075 gpr_log(GPR_INFO, "chand=%p calld=%p: call finished, status=%s", chand,
2076 calld, grpc_status_code_to_string(status));
2077 }
2078 if (maybe_retry(elem, batch_data, status, server_pushback_md)) {
2079 // Unref batch_data for deferred recv_initial_metadata_ready or
2080 // recv_message_ready callbacks, if any.
2081 if (batch_data->batch.recv_trailing_metadata &&
2082 retry_state->recv_initial_metadata_ready_deferred_batch !=
2083 nullptr) {
2084 batch_data_unref(batch_data);
2085 GRPC_ERROR_UNREF(retry_state->recv_initial_metadata_error);
2086 }
2087 if (batch_data->batch.recv_trailing_metadata &&
2088 retry_state->recv_message_ready_deferred_batch != nullptr) {
2089 batch_data_unref(batch_data);
2090 GRPC_ERROR_UNREF(retry_state->recv_message_error);
2091 }
2092 // Track number of pending subchannel send batches and determine if
2093 // this was the last one.
2094 bool last_callback_complete = false;
2095 if (batch_data->batch.send_initial_metadata ||
2096 batch_data->batch.send_message ||
2097 batch_data->batch.send_trailing_metadata) {
2098 --calld->num_pending_retriable_subchannel_send_batches;
2099 last_callback_complete =
2100 calld->num_pending_retriable_subchannel_send_batches == 0;
2101 }
2102 batch_data_unref(batch_data);
2103 // If we just completed the last subchannel send batch, unref the
2104 // call stack.
2105 if (last_callback_complete) {
2106 GRPC_CALL_STACK_UNREF(calld->owning_call, "subchannel_send_batches");
2107 }
2108 return;
2109 }
2110 // Not retrying, so commit the call.
2111 retry_commit(elem, retry_state);
2112 }
Mark D. Roth718c8342018-02-28 13:00:04 -08002113 }
Mark D. Rothf3715132018-06-08 14:22:12 -07002114 // If we were already committed before receiving this callback, free
2115 // cached data for send ops that we've just completed. (If the call has
2116 // just now finished, the call to retry_commit() above will have freed all
2117 // cached send ops, so we don't need to do it here.)
2118 if (previously_committed) {
Mark D. Roth718c8342018-02-28 13:00:04 -08002119 free_cached_send_op_data_for_completed_batch(elem, batch_data, retry_state);
2120 }
Mark D. Rothf3715132018-06-08 14:22:12 -07002121 // Call not being retried.
Mark D. Roth718c8342018-02-28 13:00:04 -08002122 // Construct list of closures to execute.
Mark D. Rothf3715132018-06-08 14:22:12 -07002123 // Max number of closures is number of pending batches plus one for
2124 // each of:
2125 // - recv_initial_metadata_ready (either deferred or unstarted)
2126 // - recv_message_ready (either deferred or unstarted)
2127 // - starting a new batch for pending send ops
2128 closure_to_execute closures[GPR_ARRAY_SIZE(calld->pending_batches) + 3];
2129 size_t num_closures = 0;
2130 // If there are deferred recv_initial_metadata_ready or recv_message_ready
2131 // callbacks, add them to closures.
2132 add_closures_for_deferred_recv_callbacks(batch_data, retry_state, closures,
2133 &num_closures);
2134 // Find pending batches whose ops are now complete and add their
2135 // on_complete callbacks to closures.
2136 add_closures_for_completed_pending_batches(elem, batch_data, retry_state,
2137 GRPC_ERROR_REF(error), closures,
2138 &num_closures);
2139 // Add closures to handle any pending batches that have not yet been started.
2140 // If the call is finished, we fail these batches; otherwise, we add a
2141 // callback to start_retriable_subchannel_batches() to start them on
2142 // the subchannel call.
2143 if (call_finished) {
2144 add_closures_to_fail_unstarted_pending_batches(
2145 elem, retry_state, GRPC_ERROR_REF(error), closures, &num_closures);
2146 } else {
2147 add_closures_for_replay_or_pending_send_ops(elem, batch_data, retry_state,
2148 closures, &num_closures);
Mark D. Roth718c8342018-02-28 13:00:04 -08002149 }
Mark D. Roth4f9e0032018-05-24 09:30:09 -07002150 // Track number of pending subchannel send batches and determine if this
2151 // was the last one.
Mark D. Rothf3715132018-06-08 14:22:12 -07002152 bool last_callback_complete = false;
2153 if (batch_data->batch.send_initial_metadata ||
2154 batch_data->batch.send_message ||
2155 batch_data->batch.send_trailing_metadata) {
2156 --calld->num_pending_retriable_subchannel_send_batches;
2157 last_callback_complete =
2158 calld->num_pending_retriable_subchannel_send_batches == 0;
2159 }
Mark D. Roth718c8342018-02-28 13:00:04 -08002160 // Don't need batch_data anymore.
2161 batch_data_unref(batch_data);
2162 // Schedule all of the closures identified above.
Mark D. Roth4f9e0032018-05-24 09:30:09 -07002163  // Note: This yields the call combiner.
Mark D. Rothf3715132018-06-08 14:22:12 -07002164 execute_closures_in_call_combiner(elem, "on_complete", closures,
2165 num_closures);
2166 // If we just completed the last subchannel send batch, unref the call stack.
Mark D. Roth4f9e0032018-05-24 09:30:09 -07002167 if (last_callback_complete) {
2168 GRPC_CALL_STACK_UNREF(calld->owning_call, "subchannel_send_batches");
2169 }
Mark D. Roth718c8342018-02-28 13:00:04 -08002170}
2171
2172//
2173// subchannel batch construction
2174//
2175
2176// Helper function used to start a subchannel batch in the call combiner.
2177static void start_batch_in_call_combiner(void* arg, grpc_error* ignored) {
2178 grpc_transport_stream_op_batch* batch =
2179 static_cast<grpc_transport_stream_op_batch*>(arg);
2180 grpc_subchannel_call* subchannel_call =
2181 static_cast<grpc_subchannel_call*>(batch->handler_private.extra_arg);
2182 // Note: This will release the call combiner.
2183 grpc_subchannel_call_process_op(subchannel_call, batch);
2184}
2185
Mark D. Rothde077ac2018-04-12 08:05:44 -07002186// Adds a closure to closures that will execute batch in the call combiner.
2187static void add_closure_for_subchannel_batch(
Mark D. Rothf3715132018-06-08 14:22:12 -07002188 call_data* calld, grpc_transport_stream_op_batch* batch,
2189 closure_to_execute* closures, size_t* num_closures) {
Mark D. Rothde077ac2018-04-12 08:05:44 -07002190 batch->handler_private.extra_arg = calld->subchannel_call;
2191 GRPC_CLOSURE_INIT(&batch->handler_private.closure,
2192 start_batch_in_call_combiner, batch,
2193 grpc_schedule_on_exec_ctx);
Mark D. Rothf3715132018-06-08 14:22:12 -07002194 closure_to_execute* closure = &closures[(*num_closures)++];
2195 closure->closure = &batch->handler_private.closure;
2196 closure->error = GRPC_ERROR_NONE;
2197 // If the tracer is enabled, we log a more detailed message, which
2198  // requires dynamic allocation. This will be freed after the closure
2199  // list is executed, as indicated by the free_reason flag set below.
Mark D. Rothde077ac2018-04-12 08:05:44 -07002200 if (grpc_client_channel_trace.enabled()) {
2201 char* batch_str = grpc_transport_stream_op_batch_string(batch);
Mark D. Rothf3715132018-06-08 14:22:12 -07002202 gpr_asprintf(const_cast<char**>(&closure->reason),
2203 "starting batch in call combiner: %s", batch_str);
Mark D. Rothde077ac2018-04-12 08:05:44 -07002204 gpr_free(batch_str);
Mark D. Rothf3715132018-06-08 14:22:12 -07002205 closure->free_reason = true;
2206 } else {
2207 closure->reason = "start_subchannel_batch";
Mark D. Rothde077ac2018-04-12 08:05:44 -07002208 }
2209}
2210
Mark D. Roth718c8342018-02-28 13:00:04 -08002211// Adds retriable send_initial_metadata op to batch_data.
2212static void add_retriable_send_initial_metadata_op(
2213 call_data* calld, subchannel_call_retry_state* retry_state,
2214 subchannel_batch_data* batch_data) {
2215 // Maps the number of retries to the corresponding metadata value slice.
2216 static const grpc_slice* retry_count_strings[] = {
2217 &GRPC_MDSTR_1, &GRPC_MDSTR_2, &GRPC_MDSTR_3, &GRPC_MDSTR_4};
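  // Four entries suffice here: the retry code caps a call at 5 attempts,
  // so at most 4 previous attempts are ever recorded.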
2218 // We need to make a copy of the metadata batch for each attempt, since
2219 // the filters in the subchannel stack may modify this batch, and we don't
2220 // want those modifications to be passed forward to subsequent attempts.
2221 //
2222 // If we've already completed one or more attempts, add the
2223 // grpc-retry-attempts header.
2224 batch_data->send_initial_metadata_storage =
2225 static_cast<grpc_linked_mdelem*>(gpr_arena_alloc(
2226 calld->arena, sizeof(grpc_linked_mdelem) *
2227 (calld->send_initial_metadata.list.count +
2228 (calld->num_attempts_completed > 0))));
2229 grpc_metadata_batch_copy(&calld->send_initial_metadata,
2230 &batch_data->send_initial_metadata,
2231 batch_data->send_initial_metadata_storage);
Yash Tibrewal8c5b8832018-05-14 11:44:18 -07002232 if (GPR_UNLIKELY(batch_data->send_initial_metadata.idx.named
2233 .grpc_previous_rpc_attempts != nullptr)) {
Mark D. Roth718c8342018-02-28 13:00:04 -08002234 grpc_metadata_batch_remove(
2235 &batch_data->send_initial_metadata,
2236 batch_data->send_initial_metadata.idx.named.grpc_previous_rpc_attempts);
2237 }
Yash Tibrewal8c5b8832018-05-14 11:44:18 -07002238 if (GPR_UNLIKELY(calld->num_attempts_completed > 0)) {
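    // For example, on the third attempt num_attempts_completed is 2, so
    // the batch carries "grpc-previous-rpc-attempts: 2".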
Mark D. Roth718c8342018-02-28 13:00:04 -08002239 grpc_mdelem retry_md = grpc_mdelem_from_slices(
2240 GRPC_MDSTR_GRPC_PREVIOUS_RPC_ATTEMPTS,
2241 *retry_count_strings[calld->num_attempts_completed - 1]);
2242 grpc_error* error = grpc_metadata_batch_add_tail(
2243 &batch_data->send_initial_metadata,
2244 &batch_data->send_initial_metadata_storage[calld->send_initial_metadata
2245 .list.count],
2246 retry_md);
Yash Tibrewal7f51ba82018-04-12 13:21:20 -07002247 if (GPR_UNLIKELY(error != GRPC_ERROR_NONE)) {
Mark D. Roth718c8342018-02-28 13:00:04 -08002248 gpr_log(GPR_ERROR, "error adding retry metadata: %s",
2249 grpc_error_string(error));
2250 GPR_ASSERT(false);
2251 }
2252 }
2253 retry_state->started_send_initial_metadata = true;
2254 batch_data->batch.send_initial_metadata = true;
2255 batch_data->batch.payload->send_initial_metadata.send_initial_metadata =
2256 &batch_data->send_initial_metadata;
2257 batch_data->batch.payload->send_initial_metadata.send_initial_metadata_flags =
2258 calld->send_initial_metadata_flags;
2259 batch_data->batch.payload->send_initial_metadata.peer_string =
2260 calld->peer_string;
2261}
2262
2263// Adds retriable send_message op to batch_data.
2264static void add_retriable_send_message_op(
2265 grpc_call_element* elem, subchannel_call_retry_state* retry_state,
2266 subchannel_batch_data* batch_data) {
2267 channel_data* chand = static_cast<channel_data*>(elem->channel_data);
2268 call_data* calld = static_cast<call_data*>(elem->call_data);
2269 if (grpc_client_channel_trace.enabled()) {
Mark D. Roth48854d22018-04-25 13:05:26 -07002270 gpr_log(GPR_INFO,
Mark D. Roth718c8342018-02-28 13:00:04 -08002271 "chand=%p calld=%p: starting calld->send_messages[%" PRIuPTR "]",
2272 chand, calld, retry_state->started_send_message_count);
2273 }
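  // Fetch the cached message to send, indexed by the number of
  // send_message ops already started on this attempt.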
Mark D. Roth3d8b32d2018-03-09 13:25:40 -08002274 grpc_core::ByteStreamCache* cache =
Mark D. Rothefcd45b2018-03-28 10:49:59 -07002275 (*calld->send_messages)[retry_state->started_send_message_count];
Mark D. Roth718c8342018-02-28 13:00:04 -08002276 ++retry_state->started_send_message_count;
Mark D. Roth3d8b32d2018-03-09 13:25:40 -08002277 batch_data->send_message.Init(cache);
Mark D. Roth718c8342018-02-28 13:00:04 -08002278 batch_data->batch.send_message = true;
Mark D. Roth3d8b32d2018-03-09 13:25:40 -08002279 batch_data->batch.payload->send_message.send_message.reset(
2280 batch_data->send_message.get());
Mark D. Roth718c8342018-02-28 13:00:04 -08002281}
2282
2283// Adds retriable send_trailing_metadata op to batch_data.
2284static void add_retriable_send_trailing_metadata_op(
2285 call_data* calld, subchannel_call_retry_state* retry_state,
2286 subchannel_batch_data* batch_data) {
2287 // We need to make a copy of the metadata batch for each attempt, since
2288 // the filters in the subchannel stack may modify this batch, and we don't
2289 // want those modifications to be passed forward to subsequent attempts.
2290 batch_data->send_trailing_metadata_storage =
2291 static_cast<grpc_linked_mdelem*>(gpr_arena_alloc(
2292 calld->arena, sizeof(grpc_linked_mdelem) *
2293 calld->send_trailing_metadata.list.count));
2294 grpc_metadata_batch_copy(&calld->send_trailing_metadata,
2295 &batch_data->send_trailing_metadata,
2296 batch_data->send_trailing_metadata_storage);
2297 retry_state->started_send_trailing_metadata = true;
2298 batch_data->batch.send_trailing_metadata = true;
2299 batch_data->batch.payload->send_trailing_metadata.send_trailing_metadata =
2300 &batch_data->send_trailing_metadata;
2301}
2302
2303// Adds retriable recv_initial_metadata op to batch_data.
2304static void add_retriable_recv_initial_metadata_op(
2305 call_data* calld, subchannel_call_retry_state* retry_state,
2306 subchannel_batch_data* batch_data) {
2307 retry_state->started_recv_initial_metadata = true;
2308 batch_data->batch.recv_initial_metadata = true;
2309 grpc_metadata_batch_init(&batch_data->recv_initial_metadata);
2310 batch_data->batch.payload->recv_initial_metadata.recv_initial_metadata =
2311 &batch_data->recv_initial_metadata;
2312 batch_data->batch.payload->recv_initial_metadata.trailing_metadata_available =
2313 &batch_data->trailing_metadata_available;
2314 GRPC_CLOSURE_INIT(&batch_data->recv_initial_metadata_ready,
2315 recv_initial_metadata_ready, batch_data,
2316 grpc_schedule_on_exec_ctx);
2317 batch_data->batch.payload->recv_initial_metadata.recv_initial_metadata_ready =
2318 &batch_data->recv_initial_metadata_ready;
2319}
2320
2321// Adds retriable recv_message op to batch_data.
2322static void add_retriable_recv_message_op(
2323 call_data* calld, subchannel_call_retry_state* retry_state,
2324 subchannel_batch_data* batch_data) {
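  // started_recv_message_count is compared against
  // completed_recv_message_count elsewhere to tell whether a recv_message
  // op is still in flight on this attempt.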
2325 ++retry_state->started_recv_message_count;
2326 batch_data->batch.recv_message = true;
2327 batch_data->batch.payload->recv_message.recv_message =
2328 &batch_data->recv_message;
2329 GRPC_CLOSURE_INIT(&batch_data->recv_message_ready, recv_message_ready,
2330 batch_data, grpc_schedule_on_exec_ctx);
2331 batch_data->batch.payload->recv_message.recv_message_ready =
2332 &batch_data->recv_message_ready;
2333}
2334
2335// Adds retriable recv_trailing_metadata op to batch_data.
2336static void add_retriable_recv_trailing_metadata_op(
2337 call_data* calld, subchannel_call_retry_state* retry_state,
2338 subchannel_batch_data* batch_data) {
2339 retry_state->started_recv_trailing_metadata = true;
2340 batch_data->batch.recv_trailing_metadata = true;
2341 grpc_metadata_batch_init(&batch_data->recv_trailing_metadata);
2342 batch_data->batch.payload->recv_trailing_metadata.recv_trailing_metadata =
2343 &batch_data->recv_trailing_metadata;
Mark D. Rothf3715132018-06-08 14:22:12 -07002344 batch_data->batch.collect_stats = true;
2345 batch_data->batch.payload->collect_stats.collect_stats =
Mark D. Roth718c8342018-02-28 13:00:04 -08002346 &batch_data->collect_stats;
2347}
2348
2349// Helper function used to start a recv_trailing_metadata batch. This
2350// is used in the case where a recv_initial_metadata or recv_message
2351// op fails in a way that tells us the call is over but the application
2352// has not yet started its own recv_trailing_metadata op.
2353static void start_internal_recv_trailing_metadata(grpc_call_element* elem) {
2354 channel_data* chand = static_cast<channel_data*>(elem->channel_data);
2355 call_data* calld = static_cast<call_data*>(elem->call_data);
2356 if (grpc_client_channel_trace.enabled()) {
Mark D. Roth48854d22018-04-25 13:05:26 -07002357 gpr_log(GPR_INFO,
Mark D. Roth718c8342018-02-28 13:00:04 -08002358 "chand=%p calld=%p: call failed but recv_trailing_metadata not "
2359 "started; starting it internally",
2360 chand, calld);
2361 }
2362 subchannel_call_retry_state* retry_state =
2363 static_cast<subchannel_call_retry_state*>(
2364 grpc_connected_subchannel_call_get_parent_data(
2365 calld->subchannel_call));
Mark D. Rothde077ac2018-04-12 08:05:44 -07002366 // Create batch_data with 2 refs, since this batch will be unreffed twice:
Mark D. Rothf3715132018-06-08 14:22:12 -07002367 // once when the subchannel batch returns, and again when we actually get
2368 // a recv_trailing_metadata op from the surface.
2369 subchannel_batch_data* batch_data = batch_data_create(elem, 2);
Mark D. Roth718c8342018-02-28 13:00:04 -08002370 add_retriable_recv_trailing_metadata_op(calld, retry_state, batch_data);
Mark D. Rothde077ac2018-04-12 08:05:44 -07002371 retry_state->recv_trailing_metadata_internal_batch = batch_data;
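  // When the surface later starts its own recv_trailing_metadata op,
  // add_subchannel_batches_for_pending_batches() will return this batch's
  // result instead of starting a new op.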
Mark D. Roth718c8342018-02-28 13:00:04 -08002372 // Note: This will release the call combiner.
2373 grpc_subchannel_call_process_op(calld->subchannel_call, &batch_data->batch);
2374}
2375
2376// If there are any cached send ops that need to be replayed on the
2377// current subchannel call, creates and returns a new subchannel batch
2378// to replay those ops. Otherwise, returns nullptr.
2379static subchannel_batch_data* maybe_create_subchannel_batch_for_replay(
2380 grpc_call_element* elem, subchannel_call_retry_state* retry_state) {
2381 channel_data* chand = static_cast<channel_data*>(elem->channel_data);
2382 call_data* calld = static_cast<call_data*>(elem->call_data);
2383 subchannel_batch_data* replay_batch_data = nullptr;
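  // Note that all of the replayed ops are accumulated into a single
  // batch, so one replay batch may carry send_initial_metadata,
  // send_message, and send_trailing_metadata together.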
2384 // send_initial_metadata.
2385 if (calld->seen_send_initial_metadata &&
2386 !retry_state->started_send_initial_metadata &&
2387 !calld->pending_send_initial_metadata) {
2388 if (grpc_client_channel_trace.enabled()) {
Mark D. Roth48854d22018-04-25 13:05:26 -07002389 gpr_log(GPR_INFO,
Mark D. Roth718c8342018-02-28 13:00:04 -08002390 "chand=%p calld=%p: replaying previously completed "
2391 "send_initial_metadata op",
2392 chand, calld);
2393 }
Mark D. Rothf3715132018-06-08 14:22:12 -07002394 replay_batch_data = batch_data_create(elem, 1);
Mark D. Roth718c8342018-02-28 13:00:04 -08002395 add_retriable_send_initial_metadata_op(calld, retry_state,
2396 replay_batch_data);
2397 }
2398 // send_message.
2399 // Note that we can only have one send_message op in flight at a time.
Mark D. Rothefcd45b2018-03-28 10:49:59 -07002400 if (retry_state->started_send_message_count < calld->send_messages->size() &&
Mark D. Roth718c8342018-02-28 13:00:04 -08002401 retry_state->started_send_message_count ==
2402 retry_state->completed_send_message_count &&
2403 !calld->pending_send_message) {
2404 if (grpc_client_channel_trace.enabled()) {
Mark D. Roth48854d22018-04-25 13:05:26 -07002405 gpr_log(GPR_INFO,
Mark D. Roth718c8342018-02-28 13:00:04 -08002406 "chand=%p calld=%p: replaying previously completed "
2407 "send_message op",
2408 chand, calld);
2409 }
2410 if (replay_batch_data == nullptr) {
Mark D. Rothf3715132018-06-08 14:22:12 -07002411 replay_batch_data = batch_data_create(elem, 1);
Mark D. Roth718c8342018-02-28 13:00:04 -08002412 }
2413 add_retriable_send_message_op(elem, retry_state, replay_batch_data);
2414 }
2415 // send_trailing_metadata.
2416 // Note that we only add this op if we have no more send_message ops
2417 // to start, since we can't send down any more send_message ops after
2418 // send_trailing_metadata.
2419 if (calld->seen_send_trailing_metadata &&
Mark D. Rothefcd45b2018-03-28 10:49:59 -07002420 retry_state->started_send_message_count == calld->send_messages->size() &&
Mark D. Roth718c8342018-02-28 13:00:04 -08002421 !retry_state->started_send_trailing_metadata &&
2422 !calld->pending_send_trailing_metadata) {
2423 if (grpc_client_channel_trace.enabled()) {
Mark D. Roth48854d22018-04-25 13:05:26 -07002424 gpr_log(GPR_INFO,
Mark D. Roth718c8342018-02-28 13:00:04 -08002425 "chand=%p calld=%p: replaying previously completed "
2426 "send_trailing_metadata op",
2427 chand, calld);
2428 }
2429 if (replay_batch_data == nullptr) {
Mark D. Rothf3715132018-06-08 14:22:12 -07002430 replay_batch_data = batch_data_create(elem, 1);
Mark D. Roth718c8342018-02-28 13:00:04 -08002431 }
2432 add_retriable_send_trailing_metadata_op(calld, retry_state,
2433 replay_batch_data);
2434 }
2435 return replay_batch_data;
2436}
2437
2438// Adds subchannel batches for pending batches to closures, updating
2439// *num_closures as needed.
2440static void add_subchannel_batches_for_pending_batches(
2441 grpc_call_element* elem, subchannel_call_retry_state* retry_state,
Mark D. Rothf3715132018-06-08 14:22:12 -07002442 closure_to_execute* closures, size_t* num_closures) {
Mark D. Roth718c8342018-02-28 13:00:04 -08002443 call_data* calld = static_cast<call_data*>(elem->call_data);
2444 for (size_t i = 0; i < GPR_ARRAY_SIZE(calld->pending_batches); ++i) {
2445 pending_batch* pending = &calld->pending_batches[i];
2446 grpc_transport_stream_op_batch* batch = pending->batch;
2447 if (batch == nullptr) continue;
2448 // Skip any batch that either (a) has already been started on this
2449 // subchannel call or (b) we can't start yet because we're still
2450 // replaying send ops that need to be completed first.
2451 // TODO(roth): Note that if any one op in the batch can't be sent
2452 // yet due to ops that we're replaying, we don't start any of the ops
2453 // in the batch. This is probably okay, but it could conceivably
2454 // lead to increased latency in some cases -- e.g., we could delay
2455 // starting a recv op due to it being in the same batch with a send
2456 // op. If/when we revamp the callback protocol in
2457 // transport_stream_op_batch, we may be able to fix this.
2458 if (batch->send_initial_metadata &&
2459 retry_state->started_send_initial_metadata) {
2460 continue;
2461 }
2462 if (batch->send_message && retry_state->completed_send_message_count <
2463 retry_state->started_send_message_count) {
2464 continue;
2465 }
2466 // Note that we only start send_trailing_metadata if we have no more
2467 // send_message ops to start, since we can't send down any more
2468 // send_message ops after send_trailing_metadata.
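    // Note: batch->send_message is a bool, so adding it below counts the
    // send_message op contained in this batch, if any.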
2469 if (batch->send_trailing_metadata &&
2470 (retry_state->started_send_message_count + batch->send_message <
Mark D. Rothefcd45b2018-03-28 10:49:59 -07002471 calld->send_messages->size() ||
Mark D. Roth718c8342018-02-28 13:00:04 -08002472 retry_state->started_send_trailing_metadata)) {
2473 continue;
2474 }
2475 if (batch->recv_initial_metadata &&
2476 retry_state->started_recv_initial_metadata) {
2477 continue;
2478 }
2479 if (batch->recv_message && retry_state->completed_recv_message_count <
2480 retry_state->started_recv_message_count) {
2481 continue;
2482 }
2483 if (batch->recv_trailing_metadata &&
2484 retry_state->started_recv_trailing_metadata) {
Mark D. Rothde077ac2018-04-12 08:05:44 -07002485 // If we previously completed a recv_trailing_metadata op
2486 // initiated by start_internal_recv_trailing_metadata(), use the
2487 // result of that instead of trying to re-start this op.
Yash Tibrewal8c5b8832018-05-14 11:44:18 -07002488      if (GPR_UNLIKELY(retry_state->recv_trailing_metadata_internal_batch !=
2489                       nullptr)) {
Mark D. Rothde077ac2018-04-12 08:05:44 -07002490 // If the batch completed, then trigger the completion callback
2491 // directly, so that we return the previously returned results to
2492 // the application. Otherwise, just unref the internally
2493 // started subchannel batch, since we'll propagate the
2494 // completion when it completes.
2495 if (retry_state->completed_recv_trailing_metadata) {
2496 subchannel_batch_data* batch_data =
2497 retry_state->recv_trailing_metadata_internal_batch;
Mark D. Rothf3715132018-06-08 14:22:12 -07002498 closure_to_execute* closure = &closures[(*num_closures)++];
2499 closure->closure = &batch_data->on_complete;
Mark D. Rothde077ac2018-04-12 08:05:44 -07002500 // Batches containing recv_trailing_metadata always succeed.
Mark D. Rothf3715132018-06-08 14:22:12 -07002501 closure->error = GRPC_ERROR_NONE;
2502 closure->reason =
2503 "re-executing on_complete for recv_trailing_metadata "
2504 "to propagate internally triggered result";
Mark D. Rothde077ac2018-04-12 08:05:44 -07002505 } else {
2506 batch_data_unref(retry_state->recv_trailing_metadata_internal_batch);
2507 }
2508 retry_state->recv_trailing_metadata_internal_batch = nullptr;
2509 }
Mark D. Roth718c8342018-02-28 13:00:04 -08002510 continue;
2511 }
2512 // If we're not retrying, just send the batch as-is.
2513 if (calld->method_params == nullptr ||
2514 calld->method_params->retry_policy() == nullptr ||
2515 calld->retry_committed) {
Mark D. Rothf3715132018-06-08 14:22:12 -07002516 add_closure_for_subchannel_batch(calld, batch, closures, num_closures);
Mark D. Roth718c8342018-02-28 13:00:04 -08002517 pending_batch_clear(calld, pending);
2518 continue;
2519 }
2520 // Create batch with the right number of callbacks.
Mark D. Rothf3715132018-06-08 14:22:12 -07002521 const int num_callbacks =
2522 1 + batch->recv_initial_metadata + batch->recv_message;
2523 subchannel_batch_data* batch_data = batch_data_create(elem, num_callbacks);
Mark D. Roth718c8342018-02-28 13:00:04 -08002524 // Cache send ops if needed.
2525 maybe_cache_send_ops_for_batch(calld, pending);
2526 // send_initial_metadata.
2527 if (batch->send_initial_metadata) {
2528 add_retriable_send_initial_metadata_op(calld, retry_state, batch_data);
2529 }
2530 // send_message.
2531 if (batch->send_message) {
2532 add_retriable_send_message_op(elem, retry_state, batch_data);
2533 }
2534 // send_trailing_metadata.
2535 if (batch->send_trailing_metadata) {
2536 add_retriable_send_trailing_metadata_op(calld, retry_state, batch_data);
2537 }
2538 // recv_initial_metadata.
2539 if (batch->recv_initial_metadata) {
2540 // recv_flags is only used on the server side.
2541 GPR_ASSERT(batch->payload->recv_initial_metadata.recv_flags == nullptr);
2542 add_retriable_recv_initial_metadata_op(calld, retry_state, batch_data);
2543 }
2544 // recv_message.
2545 if (batch->recv_message) {
2546 add_retriable_recv_message_op(calld, retry_state, batch_data);
2547 }
2548 // recv_trailing_metadata.
2549 if (batch->recv_trailing_metadata) {
Mark D. Rothf3715132018-06-08 14:22:12 -07002550 GPR_ASSERT(batch->collect_stats);
Mark D. Roth718c8342018-02-28 13:00:04 -08002551 add_retriable_recv_trailing_metadata_op(calld, retry_state, batch_data);
2552 }
Mark D. Rothf3715132018-06-08 14:22:12 -07002553 add_closure_for_subchannel_batch(calld, &batch_data->batch, closures,
2554 num_closures);
Mark D. Roth4f9e0032018-05-24 09:30:09 -07002555 // Track number of pending subchannel send batches.
2556 // If this is the first one, take a ref to the call stack.
2557 if (batch->send_initial_metadata || batch->send_message ||
2558 batch->send_trailing_metadata) {
2559 if (calld->num_pending_retriable_subchannel_send_batches == 0) {
2560 GRPC_CALL_STACK_REF(calld->owning_call, "subchannel_send_batches");
2561 }
2562 ++calld->num_pending_retriable_subchannel_send_batches;
2563 }
Mark D. Roth718c8342018-02-28 13:00:04 -08002564 }
2565}
2566
2567// Constructs and starts whatever subchannel batches are needed on the
2568// subchannel call.
2569static void start_retriable_subchannel_batches(void* arg, grpc_error* ignored) {
2570 grpc_call_element* elem = static_cast<grpc_call_element*>(arg);
2571 channel_data* chand = static_cast<channel_data*>(elem->channel_data);
2572 call_data* calld = static_cast<call_data*>(elem->call_data);
2573 if (grpc_client_channel_trace.enabled()) {
Mark D. Roth48854d22018-04-25 13:05:26 -07002574 gpr_log(GPR_INFO, "chand=%p calld=%p: constructing retriable batches",
Mark D. Roth718c8342018-02-28 13:00:04 -08002575 chand, calld);
2576 }
2577 subchannel_call_retry_state* retry_state =
2578 static_cast<subchannel_call_retry_state*>(
2579 grpc_connected_subchannel_call_get_parent_data(
2580 calld->subchannel_call));
Mark D. Rothde077ac2018-04-12 08:05:44 -07002581 // Construct list of closures to execute, one for each pending batch.
Mark D. Rothf3715132018-06-08 14:22:12 -07002582 // We can start up to 6 batches.
2583 closure_to_execute closures[GPR_ARRAY_SIZE(calld->pending_batches)];
2584 size_t num_closures = 0;
Mark D. Roth718c8342018-02-28 13:00:04 -08002585 // Replay previously-returned send_* ops if needed.
2586 subchannel_batch_data* replay_batch_data =
2587 maybe_create_subchannel_batch_for_replay(elem, retry_state);
2588 if (replay_batch_data != nullptr) {
Mark D. Rothf3715132018-06-08 14:22:12 -07002589 add_closure_for_subchannel_batch(calld, &replay_batch_data->batch, closures,
2590 &num_closures);
Mark D. Roth4f9e0032018-05-24 09:30:09 -07002591 // Track number of pending subchannel send batches.
2592 // If this is the first one, take a ref to the call stack.
2593 if (calld->num_pending_retriable_subchannel_send_batches == 0) {
2594 GRPC_CALL_STACK_REF(calld->owning_call, "subchannel_send_batches");
2595 }
2596 ++calld->num_pending_retriable_subchannel_send_batches;
Mark D. Roth718c8342018-02-28 13:00:04 -08002597 }
2598 // Now add pending batches.
Mark D. Rothf3715132018-06-08 14:22:12 -07002599 add_subchannel_batches_for_pending_batches(elem, retry_state, closures,
2600 &num_closures);
Mark D. Roth718c8342018-02-28 13:00:04 -08002601 // Start batches on subchannel call.
Mark D. Roth718c8342018-02-28 13:00:04 -08002602 if (grpc_client_channel_trace.enabled()) {
Mark D. Roth48854d22018-04-25 13:05:26 -07002603 gpr_log(GPR_INFO,
Mark D. Roth718c8342018-02-28 13:00:04 -08002604 "chand=%p calld=%p: starting %" PRIuPTR
2605 " retriable batches on subchannel_call=%p",
Mark D. Rothf3715132018-06-08 14:22:12 -07002606 chand, calld, num_closures, calld->subchannel_call);
Mark D. Roth718c8342018-02-28 13:00:04 -08002607 }
Mark D. Rothf3715132018-06-08 14:22:12 -07002608 execute_closures_in_call_combiner(elem, "start_retriable_subchannel_batches",
2609 closures, num_closures);
Mark D. Roth718c8342018-02-28 13:00:04 -08002610}
2611
2612//
2613// LB pick
2614//
2615
2616static void create_subchannel_call(grpc_call_element* elem, grpc_error* error) {
2617 channel_data* chand = static_cast<channel_data*>(elem->channel_data);
2618 call_data* calld = static_cast<call_data*>(elem->call_data);
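  // If retries are enabled, reserve space for the retry state in the
  // subchannel call's parent data; it is retrieved later via
  // grpc_connected_subchannel_call_get_parent_data().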
2619 const size_t parent_data_size =
2620 calld->enable_retries ? sizeof(subchannel_call_retry_state) : 0;
David Garcia Quintasbaf1ac72018-01-09 14:24:32 -08002621 const grpc_core::ConnectedSubchannel::CallArgs call_args = {
Mark D. Rothc0febd32018-01-09 10:25:24 -08002622 calld->pollent, // pollent
2623 calld->path, // path
2624 calld->call_start_time, // start_time
2625 calld->deadline, // deadline
2626 calld->arena, // arena
2627 calld->pick.subchannel_call_context, // context
Mark D. Roth718c8342018-02-28 13:00:04 -08002628 calld->call_combiner, // call_combiner
2629 parent_data_size // parent_data_size
Yash Tibrewald8b84a22017-09-25 13:38:03 -07002630 };
David Garcia Quintas70fbe622018-01-09 19:27:46 -08002631 grpc_error* new_error = calld->pick.connected_subchannel->CreateCall(
David Garcia Quintasbaf1ac72018-01-09 14:24:32 -08002632 call_args, &calld->subchannel_call);
Craig Tiller6014e8a2017-10-16 13:50:29 -07002633 if (grpc_client_channel_trace.enabled()) {
Mark D. Roth48854d22018-04-25 13:05:26 -07002634 gpr_log(GPR_INFO, "chand=%p calld=%p: create subchannel_call=%p: error=%s",
Mark D. Roth76e264b2017-08-25 09:03:33 -07002635 chand, calld, calld->subchannel_call, grpc_error_string(new_error));
Mark D. Roth60751fe2017-07-07 12:50:33 -07002636 }
Yash Tibrewal8c5b8832018-05-14 11:44:18 -07002637 if (GPR_UNLIKELY(new_error != GRPC_ERROR_NONE)) {
Mark D. Roth0ca0be82017-06-20 07:49:33 -07002638 new_error = grpc_error_add_child(new_error, error);
Mark D. Roth718c8342018-02-28 13:00:04 -08002639 pending_batches_fail(elem, new_error, true /* yield_call_combiner */);
Mark D. Roth0ca0be82017-06-20 07:49:33 -07002640 } else {
Mark D. Roth718c8342018-02-28 13:00:04 -08002641 if (parent_data_size > 0) {
2642 subchannel_call_retry_state* retry_state =
2643 static_cast<subchannel_call_retry_state*>(
2644 grpc_connected_subchannel_call_get_parent_data(
2645 calld->subchannel_call));
2646 retry_state->batch_payload.context = calld->pick.subchannel_call_context;
2647 }
2648 pending_batches_resume(elem);
Craig Tiller11c17d42017-03-13 13:36:34 -07002649 }
Mark D. Roth0ca0be82017-06-20 07:49:33 -07002650 GRPC_ERROR_UNREF(error);
Craig Tiller11c17d42017-03-13 13:36:34 -07002651}
2652
Mark D. Rothb2929602017-09-11 09:31:11 -07002653// Invoked when a pick is completed, on both success or failure.
Mark D. Roth718c8342018-02-28 13:00:04 -08002654static void pick_done(void* arg, grpc_error* error) {
2655 grpc_call_element* elem = static_cast<grpc_call_element*>(arg);
Noah Eisenbe82e642018-02-09 09:16:55 -08002656 channel_data* chand = static_cast<channel_data*>(elem->channel_data);
Mark D. Roth718c8342018-02-28 13:00:04 -08002657 call_data* calld = static_cast<call_data*>(elem->call_data);
Yash Tibrewal8c5b8832018-05-14 11:44:18 -07002658 if (GPR_UNLIKELY(calld->pick.connected_subchannel == nullptr)) {
Mark D. Roth0ca0be82017-06-20 07:49:33 -07002659 // Failed to create subchannel.
Mark D. Roth718c8342018-02-28 13:00:04 -08002660 // If there was no error, this is an LB policy drop, in which case
2661 // we return an error; otherwise, we may retry.
2662 grpc_status_code status = GRPC_STATUS_OK;
2663 grpc_error_get_status(error, calld->deadline, &status, nullptr, nullptr,
2664 nullptr);
2665 if (error == GRPC_ERROR_NONE || !calld->enable_retries ||
2666 !maybe_retry(elem, nullptr /* batch_data */, status,
2667 nullptr /* server_pushback_md */)) {
2668 grpc_error* new_error =
2669 error == GRPC_ERROR_NONE
2670 ? GRPC_ERROR_CREATE_FROM_STATIC_STRING(
2671 "Call dropped by load balancing policy")
2672 : GRPC_ERROR_CREATE_REFERENCING_FROM_STATIC_STRING(
2673 "Failed to create subchannel", &error, 1);
2674 if (grpc_client_channel_trace.enabled()) {
Mark D. Roth48854d22018-04-25 13:05:26 -07002675 gpr_log(GPR_INFO,
Mark D. Roth718c8342018-02-28 13:00:04 -08002676 "chand=%p calld=%p: failed to create subchannel: error=%s",
2677 chand, calld, grpc_error_string(new_error));
2678 }
2679 pending_batches_fail(elem, new_error, true /* yield_call_combiner */);
Mark D. Roth60751fe2017-07-07 12:50:33 -07002680 }
Mark D. Roth2a5959f2016-09-01 08:20:27 -07002681 } else {
Mark D. Roth9fe284e2016-09-12 11:22:27 -07002682 /* Create call on subchannel. */
Mark D. Roth718c8342018-02-28 13:00:04 -08002683 create_subchannel_call(elem, GRPC_ERROR_REF(error));
Mark D. Roth2a5959f2016-09-01 08:20:27 -07002684 }
Mark D. Roth718c8342018-02-28 13:00:04 -08002685}
2686
Mark D. Roth7e0e2022018-06-01 12:04:16 -07002687static void maybe_add_call_to_channel_interested_parties_locked(
2688 grpc_call_element* elem) {
2689 channel_data* chand = static_cast<channel_data*>(elem->channel_data);
2690 call_data* calld = static_cast<call_data*>(elem->call_data);
2691 if (!calld->pollent_added_to_interested_parties) {
2692 calld->pollent_added_to_interested_parties = true;
2693 grpc_polling_entity_add_to_pollset_set(calld->pollent,
2694 chand->interested_parties);
2695 }
2696}
2697
2698static void maybe_del_call_from_channel_interested_parties_locked(
2699 grpc_call_element* elem) {
2700 channel_data* chand = static_cast<channel_data*>(elem->channel_data);
2701 call_data* calld = static_cast<call_data*>(elem->call_data);
2702 if (calld->pollent_added_to_interested_parties) {
2703 calld->pollent_added_to_interested_parties = false;
2704 grpc_polling_entity_del_from_pollset_set(calld->pollent,
2705 chand->interested_parties);
2706 }
2707}
2708
Mark D. Roth718c8342018-02-28 13:00:04 -08002709// Invoked when a pick is completed to leave the client_channel combiner
2710// and continue processing in the call combiner.
Mark D. Roth7e0e2022018-06-01 12:04:16 -07002711// If needed, removes the call's polling entity from chand->interested_parties.
Mark D. Roth718c8342018-02-28 13:00:04 -08002712static void pick_done_locked(grpc_call_element* elem, grpc_error* error) {
2713 call_data* calld = static_cast<call_data*>(elem->call_data);
Mark D. Roth7e0e2022018-06-01 12:04:16 -07002714 maybe_del_call_from_channel_interested_parties_locked(elem);
Mark D. Roth718c8342018-02-28 13:00:04 -08002715 GRPC_CLOSURE_INIT(&calld->pick_closure, pick_done, elem,
2716 grpc_schedule_on_exec_ctx);
2717 GRPC_CLOSURE_SCHED(&calld->pick_closure, error);
Mark D. Roth2a5959f2016-09-01 08:20:27 -07002718}
2719
Mark D. Roth7e0e2022018-06-01 12:04:16 -07002720namespace grpc_core {
Mark D. Rothb2929602017-09-11 09:31:11 -07002721
Mark D. Roth7e0e2022018-06-01 12:04:16 -07002722// Performs subchannel pick via LB policy.
2723class LbPicker {
2724 public:
2725 // Starts a pick on chand->lb_policy.
2726 static void StartLocked(grpc_call_element* elem) {
2727 channel_data* chand = static_cast<channel_data*>(elem->channel_data);
2728 call_data* calld = static_cast<call_data*>(elem->call_data);
Craig Tiller6014e8a2017-10-16 13:50:29 -07002729 if (grpc_client_channel_trace.enabled()) {
Mark D. Roth7e0e2022018-06-01 12:04:16 -07002730 gpr_log(GPR_INFO, "chand=%p calld=%p: starting pick on lb_policy=%p",
Mark D. Rothc8875492018-02-20 08:33:48 -08002731 chand, calld, chand->lb_policy.get());
Mark D. Rothb2929602017-09-11 09:31:11 -07002732 }
Mark D. Roth7e0e2022018-06-01 12:04:16 -07002733 // If this is a retry, use the send_initial_metadata payload that
2734 // we've cached; otherwise, use the pending batch. The
2735 // send_initial_metadata batch will be the first pending batch in the
2736 // list, as set by get_batch_index() above.
2737 calld->pick.initial_metadata =
2738 calld->seen_send_initial_metadata
2739 ? &calld->send_initial_metadata
2740 : calld->pending_batches[0]
2741 .batch->payload->send_initial_metadata.send_initial_metadata;
2742 calld->pick.initial_metadata_flags =
2743 calld->seen_send_initial_metadata
2744 ? calld->send_initial_metadata_flags
2745 : calld->pending_batches[0]
2746 .batch->payload->send_initial_metadata
2747 .send_initial_metadata_flags;
2748 GRPC_CLOSURE_INIT(&calld->pick_closure, &LbPicker::DoneLocked, elem,
2749 grpc_combiner_scheduler(chand->combiner));
2750 calld->pick.on_complete = &calld->pick_closure;
2751 GRPC_CALL_STACK_REF(calld->owning_call, "pick_callback");
2752 const bool pick_done = chand->lb_policy->PickLocked(&calld->pick);
2753 if (GPR_LIKELY(pick_done)) {
2754 // Pick completed synchronously.
2755 if (grpc_client_channel_trace.enabled()) {
2756 gpr_log(GPR_INFO, "chand=%p calld=%p: pick completed synchronously",
2757 chand, calld);
2758 }
2759 pick_done_locked(elem, GRPC_ERROR_NONE);
2760 GRPC_CALL_STACK_UNREF(calld->owning_call, "pick_callback");
2761 } else {
2762 // Pick will be returned asynchronously.
2763 // Add the polling entity from call_data to the channel_data's
2764 // interested_parties, so that the I/O of the LB policy can be done
2765 // under it. It will be removed in pick_done_locked().
2766 maybe_add_call_to_channel_interested_parties_locked(elem);
2767 // Request notification on call cancellation.
2768 GRPC_CALL_STACK_REF(calld->owning_call, "pick_callback_cancel");
2769 grpc_call_combiner_set_notify_on_cancel(
2770 calld->call_combiner,
2771 GRPC_CLOSURE_INIT(&calld->pick_cancel_closure,
2772 &LbPicker::CancelLocked, elem,
2773 grpc_combiner_scheduler(chand->combiner)));
2774 }
Mark D. Rothb2929602017-09-11 09:31:11 -07002775 }
Mark D. Rothb2929602017-09-11 09:31:11 -07002776
Mark D. Roth7e0e2022018-06-01 12:04:16 -07002777 private:
2778 // Callback invoked by LoadBalancingPolicy::PickLocked() for async picks.
2779 // Unrefs the LB policy and invokes pick_done_locked().
2780 static void DoneLocked(void* arg, grpc_error* error) {
2781 grpc_call_element* elem = static_cast<grpc_call_element*>(arg);
2782 channel_data* chand = static_cast<channel_data*>(elem->channel_data);
2783 call_data* calld = static_cast<call_data*>(elem->call_data);
2784 if (grpc_client_channel_trace.enabled()) {
2785 gpr_log(GPR_INFO, "chand=%p calld=%p: pick completed asynchronously",
2786 chand, calld);
2787 }
2788 pick_done_locked(elem, GRPC_ERROR_REF(error));
2789 GRPC_CALL_STACK_UNREF(calld->owning_call, "pick_callback");
Mark D. Rothb2929602017-09-11 09:31:11 -07002790 }
Mark D. Roth7e0e2022018-06-01 12:04:16 -07002791
2792 // Note: This runs under the client_channel combiner, but will NOT be
2793 // holding the call combiner.
2794 static void CancelLocked(void* arg, grpc_error* error) {
2795 grpc_call_element* elem = static_cast<grpc_call_element*>(arg);
2796 channel_data* chand = static_cast<channel_data*>(elem->channel_data);
2797 call_data* calld = static_cast<call_data*>(elem->call_data);
2798 // Note: chand->lb_policy may have changed since we started our pick,
2799 // in which case we will be cancelling the pick on a policy other than
2800 // the one we started it on. However, this will just be a no-op.
2801 if (GPR_UNLIKELY(error != GRPC_ERROR_NONE && chand->lb_policy != nullptr)) {
2802 if (grpc_client_channel_trace.enabled()) {
2803 gpr_log(GPR_INFO,
2804 "chand=%p calld=%p: cancelling pick from LB policy %p", chand,
2805 calld, chand->lb_policy.get());
2806 }
2807 chand->lb_policy->CancelPickLocked(&calld->pick, GRPC_ERROR_REF(error));
2808 }
2809 GRPC_CALL_STACK_UNREF(calld->owning_call, "pick_callback_cancel");
2810 }
2811};
2812
2813} // namespace grpc_core
Mark D. Rothb2929602017-09-11 09:31:11 -07002814
Mark D. Roth718c8342018-02-28 13:00:04 -08002815// Applies service config to the call. Must be invoked once we know
2816// that the resolver has returned results to the channel.
2817static void apply_service_config_to_call_locked(grpc_call_element* elem) {
2818 channel_data* chand = static_cast<channel_data*>(elem->channel_data);
2819 call_data* calld = static_cast<call_data*>(elem->call_data);
2820 if (grpc_client_channel_trace.enabled()) {
Mark D. Roth48854d22018-04-25 13:05:26 -07002821 gpr_log(GPR_INFO, "chand=%p calld=%p: applying service config to call",
Mark D. Roth718c8342018-02-28 13:00:04 -08002822 chand, calld);
2823 }
2824 if (chand->retry_throttle_data != nullptr) {
Mark D. Roth9db86fc2018-03-28 07:42:20 -07002825 calld->retry_throttle_data = chand->retry_throttle_data->Ref();
Mark D. Roth718c8342018-02-28 13:00:04 -08002826 }
2827 if (chand->method_params_table != nullptr) {
2828 calld->method_params = grpc_core::ServiceConfig::MethodConfigTableLookup(
2829 *chand->method_params_table, calld->path);
2830 if (calld->method_params != nullptr) {
2831 // If the deadline from the service config is shorter than the one
2832 // from the client API, reset the deadline timer.
2833 if (chand->deadline_checking_enabled &&
2834 calld->method_params->timeout() != 0) {
2835 const grpc_millis per_method_deadline =
2836 grpc_timespec_to_millis_round_up(calld->call_start_time) +
2837 calld->method_params->timeout();
2838 if (per_method_deadline < calld->deadline) {
2839 calld->deadline = per_method_deadline;
2840 grpc_deadline_state_reset(elem, calld->deadline);
2841 }
2842 }
Mark D. Roth7e0e2022018-06-01 12:04:16 -07002843 // If the service config set wait_for_ready and the application
2844 // did not explicitly set it, use the value from the service config.
2845 uint32_t* send_initial_metadata_flags =
2846 &calld->pending_batches[0]
2847 .batch->payload->send_initial_metadata
2848 .send_initial_metadata_flags;
2849 if (GPR_UNLIKELY(
2850 calld->method_params->wait_for_ready() !=
2851 ClientChannelMethodParams::WAIT_FOR_READY_UNSET &&
2852 !(*send_initial_metadata_flags &
2853 GRPC_INITIAL_METADATA_WAIT_FOR_READY_EXPLICITLY_SET))) {
2854 if (calld->method_params->wait_for_ready() ==
2855 ClientChannelMethodParams::WAIT_FOR_READY_TRUE) {
2856 *send_initial_metadata_flags |= GRPC_INITIAL_METADATA_WAIT_FOR_READY;
2857 } else {
2858 *send_initial_metadata_flags &= ~GRPC_INITIAL_METADATA_WAIT_FOR_READY;
2859 }
2860 }
Mark D. Roth718c8342018-02-28 13:00:04 -08002861 }
2862 }
2863 // If no retry policy, disable retries.
2864 // TODO(roth): Remove this when adding support for transparent retries.
2865 if (calld->method_params == nullptr ||
2866 calld->method_params->retry_policy() == nullptr) {
2867 calld->enable_retries = false;
2868 }
2869}
2870
Mark D. Roth7e0e2022018-06-01 12:04:16 -07002871// Invoked once resolver results are available.
2872static void process_service_config_and_start_lb_pick_locked(
2873 grpc_call_element* elem) {
Noah Eisenbe82e642018-02-09 09:16:55 -08002874 call_data* calld = static_cast<call_data*>(elem->call_data);
Mark D. Roth718c8342018-02-28 13:00:04 -08002875 // Only get service config data on the first attempt.
Yash Tibrewal8c5b8832018-05-14 11:44:18 -07002876 if (GPR_LIKELY(calld->num_attempts_completed == 0)) {
Mark D. Roth718c8342018-02-28 13:00:04 -08002877 apply_service_config_to_call_locked(elem);
2878 }
Mark D. Roth7e0e2022018-06-01 12:04:16 -07002879 // Start LB pick.
2880 grpc_core::LbPicker::StartLocked(elem);
Mark D. Rothb2929602017-09-11 09:31:11 -07002881}
Mark D. Roth0ca0be82017-06-20 07:49:33 -07002882
Mark D. Roth7e0e2022018-06-01 12:04:16 -07002883namespace grpc_core {
Craig Tiller577c9b22015-11-02 14:11:15 -08002884
Mark D. Roth7e0e2022018-06-01 12:04:16 -07002885// Handles waiting for a resolver result.
2886// Used only for the first call on an idle channel.
2887class ResolverResultWaiter {
2888 public:
2889 explicit ResolverResultWaiter(grpc_call_element* elem) : elem_(elem) {
2890 channel_data* chand = static_cast<channel_data*>(elem->channel_data);
2891 call_data* calld = static_cast<call_data*>(elem->call_data);
Craig Tiller6014e8a2017-10-16 13:50:29 -07002892 if (grpc_client_channel_trace.enabled()) {
Mark D. Roth7e0e2022018-06-01 12:04:16 -07002893 gpr_log(GPR_INFO,
2894 "chand=%p calld=%p: deferring pick pending resolver result",
Mark D. Rothb2b9a0f2017-09-01 09:06:47 -07002895 chand, calld);
2896 }
Mark D. Roth7e0e2022018-06-01 12:04:16 -07002897 // Add closure to be run when a resolver result is available.
2898 GRPC_CLOSURE_INIT(&done_closure_, &ResolverResultWaiter::DoneLocked, this,
2899 grpc_combiner_scheduler(chand->combiner));
2900 AddToWaitingList();
2901 // Set cancellation closure, so that we abort if the call is cancelled.
2902 GRPC_CLOSURE_INIT(&cancel_closure_, &ResolverResultWaiter::CancelLocked,
2903 this, grpc_combiner_scheduler(chand->combiner));
2904 grpc_call_combiner_set_notify_on_cancel(calld->call_combiner,
2905 &cancel_closure_);
2906 }
2907
2908 private:
2909 // Adds closure_ to chand->waiting_for_resolver_result_closures.
2910 void AddToWaitingList() {
2911 channel_data* chand = static_cast<channel_data*>(elem_->channel_data);
2912 grpc_closure_list_append(&chand->waiting_for_resolver_result_closures,
2913 &done_closure_, GRPC_ERROR_NONE);
2914 }
2915
2916 // Invoked when a resolver result is available.
2917 static void DoneLocked(void* arg, grpc_error* error) {
2918 ResolverResultWaiter* self = static_cast<ResolverResultWaiter*>(arg);
2919 // If CancelLocked() has already run, delete ourselves without doing
2920 // anything. Note that the call stack may have already been destroyed,
2921 // so it's not safe to access anything in elem_.
2922 if (GPR_UNLIKELY(self->finished_)) {
Mark D. Rothe63e06d2018-03-23 08:12:11 -07002923 if (grpc_client_channel_trace.enabled()) {
Mark D. Roth7e0e2022018-06-01 12:04:16 -07002924 gpr_log(GPR_INFO, "call cancelled before resolver result");
2925 }
2926 Delete(self);
2927 return;
2928 }
2929 // Otherwise, process the resolver result.
2930 grpc_call_element* elem = self->elem_;
2931 channel_data* chand = static_cast<channel_data*>(elem->channel_data);
2932 call_data* calld = static_cast<call_data*>(elem->call_data);
2933 if (GPR_UNLIKELY(error != GRPC_ERROR_NONE)) {
2934 if (grpc_client_channel_trace.enabled()) {
2935 gpr_log(GPR_INFO, "chand=%p calld=%p: resolver failed to return data",
Mark D. Rothe63e06d2018-03-23 08:12:11 -07002936 chand, calld);
2937 }
Mark D. Roth7e0e2022018-06-01 12:04:16 -07002938 pick_done_locked(elem, GRPC_ERROR_REF(error));
2939 } else if (GPR_UNLIKELY(chand->resolver == nullptr)) {
2940 // Shutting down.
2941 if (grpc_client_channel_trace.enabled()) {
2942 gpr_log(GPR_INFO, "chand=%p calld=%p: resolver disconnected", chand,
2943 calld);
2944 }
2945 pick_done_locked(elem,
                       GRPC_ERROR_CREATE_FROM_STATIC_STRING("Disconnected"));
    } else if (GPR_UNLIKELY(chand->lb_policy == nullptr)) {
      // Transient resolver failure.
      // If call has wait_for_ready=true, try again; otherwise, fail.
      uint32_t send_initial_metadata_flags =
          calld->seen_send_initial_metadata
              ? calld->send_initial_metadata_flags
              : calld->pending_batches[0]
                    .batch->payload->send_initial_metadata
                    .send_initial_metadata_flags;
      if (send_initial_metadata_flags & GRPC_INITIAL_METADATA_WAIT_FOR_READY) {
        if (grpc_client_channel_trace.enabled()) {
          gpr_log(GPR_INFO,
                  "chand=%p calld=%p: resolver returned but no LB policy; "
                  "wait_for_ready=true; trying again",
                  chand, calld);
        }
        // Re-add ourselves to the waiting list.
        self->AddToWaitingList();
        // Return early so that we don't set finished_ to true below.
        return;
      } else {
        if (grpc_client_channel_trace.enabled()) {
          gpr_log(GPR_INFO,
                  "chand=%p calld=%p: resolver returned but no LB policy; "
                  "wait_for_ready=false; failing",
                  chand, calld);
        }
        pick_done_locked(
            elem,
            grpc_error_set_int(
                GRPC_ERROR_CREATE_FROM_STATIC_STRING("Name resolution failure"),
                GRPC_ERROR_INT_GRPC_STATUS, GRPC_STATUS_UNAVAILABLE));
      }
    } else {
      if (grpc_client_channel_trace.enabled()) {
        gpr_log(GPR_INFO, "chand=%p calld=%p: resolver returned, doing LB pick",
                chand, calld);
      }
      process_service_config_and_start_lb_pick_locked(elem);
    }
    self->finished_ = true;
  }

  // Invoked when the call is cancelled.
  // Note: This runs under the client_channel combiner, but will NOT be
  // holding the call combiner.
  static void CancelLocked(void* arg, grpc_error* error) {
    ResolverResultWaiter* self = static_cast<ResolverResultWaiter*>(arg);
    // If DoneLocked() has already run, delete ourselves without doing anything.
    if (GPR_LIKELY(self->finished_)) {
      Delete(self);
      return;
    }
    // If we are being cancelled, immediately invoke pick_done_locked()
    // to propagate the error back to the caller.
    if (GPR_UNLIKELY(error != GRPC_ERROR_NONE)) {
      grpc_call_element* elem = self->elem_;
      channel_data* chand = static_cast<channel_data*>(elem->channel_data);
      call_data* calld = static_cast<call_data*>(elem->call_data);
      if (grpc_client_channel_trace.enabled()) {
        gpr_log(GPR_INFO,
                "chand=%p calld=%p: cancelling call waiting for name "
                "resolution",
                chand, calld);
      }
      // Note: Although we are not in the call combiner here, we are
      // basically stealing the call combiner from the pending pick, so
      // it's safe to call pick_done_locked() here -- we are essentially
      // calling it here instead of calling it in DoneLocked().
      pick_done_locked(elem, GRPC_ERROR_CREATE_REFERENCING_FROM_STATIC_STRING(
                                 "Pick cancelled", &error, 1));
    }
    self->finished_ = true;
  }

  grpc_call_element* elem_;
  grpc_closure done_closure_;
  grpc_closure cancel_closure_;
  bool finished_ = false;
};

}  // namespace grpc_core

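// Entry point for starting an LB pick.  Scheduled on the client_channel
// combiner from cc_start_transport_stream_op_batch() below.  Depending on
// the channel's state, this either starts a pick immediately, fails the
// call, or parks the call on a ResolverResultWaiter until a resolver
// result (and thus an LB policy) becomes available.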
static void start_pick_locked(void* arg, grpc_error* ignored) {
  grpc_call_element* elem = static_cast<grpc_call_element*>(arg);
  call_data* calld = static_cast<call_data*>(elem->call_data);
  channel_data* chand = static_cast<channel_data*>(elem->channel_data);
  GPR_ASSERT(calld->pick.connected_subchannel == nullptr);
  GPR_ASSERT(calld->subchannel_call == nullptr);
  if (GPR_LIKELY(chand->lb_policy != nullptr)) {
    // We already have resolver results, so process the service config
    // and start an LB pick.
    process_service_config_and_start_lb_pick_locked(elem);
  } else if (GPR_UNLIKELY(chand->resolver == nullptr)) {
    pick_done_locked(elem,
                     GRPC_ERROR_CREATE_FROM_STATIC_STRING("Disconnected"));
  } else {
    // We do not yet have an LB policy, so wait for a resolver result.
    if (GPR_UNLIKELY(!chand->started_resolving)) {
      start_resolving_locked(chand);
    }
    // Create a new waiter, which will delete itself when done.
    grpc_core::New<grpc_core::ResolverResultWaiter>(elem);
    // Add the polling entity from call_data to the channel_data's
    // interested_parties, so that the I/O of the resolver can be done
    // under it.  It will be removed in pick_done_locked().
    maybe_add_call_to_channel_interested_parties_locked(elem);
  }
}

//
// filter call vtable functions
//

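// Main batch dispatch for the filter.  Once a subchannel call exists,
// batches are passed down directly under the call combiner; before that,
// they are buffered in pending_batches, and only a batch carrying
// send_initial_metadata hops into the channel combiner to start a pick.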
static void cc_start_transport_stream_op_batch(
    grpc_call_element* elem, grpc_transport_stream_op_batch* batch) {
  GPR_TIMER_SCOPE("cc_start_transport_stream_op_batch", 0);
  call_data* calld = static_cast<call_data*>(elem->call_data);
  channel_data* chand = static_cast<channel_data*>(elem->channel_data);
  if (GPR_LIKELY(chand->deadline_checking_enabled)) {
    grpc_deadline_state_client_start_transport_stream_op_batch(elem, batch);
  }
  // If we've previously been cancelled, immediately fail any new batches.
  if (GPR_UNLIKELY(calld->cancel_error != GRPC_ERROR_NONE)) {
    if (grpc_client_channel_trace.enabled()) {
      gpr_log(GPR_INFO, "chand=%p calld=%p: failing batch with error: %s",
              chand, calld, grpc_error_string(calld->cancel_error));
    }
    // Note: This will release the call combiner.
    grpc_transport_stream_op_batch_finish_with_failure(
        batch, GRPC_ERROR_REF(calld->cancel_error), calld->call_combiner);
    return;
  }
  // Handle cancellation.
  if (GPR_UNLIKELY(batch->cancel_stream)) {
    // Stash a copy of cancel_error in our call data, so that we can use
    // it for subsequent operations.  This ensures that if the call is
    // cancelled before any batches are passed down (e.g., if the deadline
    // is in the past when the call starts), we can return the right
    // error to the caller when the first batch does get passed down.
    GRPC_ERROR_UNREF(calld->cancel_error);
    calld->cancel_error =
        GRPC_ERROR_REF(batch->payload->cancel_stream.cancel_error);
    if (grpc_client_channel_trace.enabled()) {
      gpr_log(GPR_INFO, "chand=%p calld=%p: recording cancel_error=%s", chand,
              calld, grpc_error_string(calld->cancel_error));
    }
    // If we do not have a subchannel call (i.e., a pick has not yet
    // been started), fail all pending batches.  Otherwise, send the
    // cancellation down to the subchannel call.
    if (calld->subchannel_call == nullptr) {
      pending_batches_fail(elem, GRPC_ERROR_REF(calld->cancel_error),
                           false /* yield_call_combiner */);
      // Note: This will release the call combiner.
      grpc_transport_stream_op_batch_finish_with_failure(
          batch, GRPC_ERROR_REF(calld->cancel_error), calld->call_combiner);
    } else {
      // Note: This will release the call combiner.
      grpc_subchannel_call_process_op(calld->subchannel_call, batch);
    }
    return;
  }
  // Add the batch to the pending list.
  pending_batches_add(elem, batch);
  // Check if we've already gotten a subchannel call.
  // Note that once we have completed the pick, we do not need to enter
  // the channel combiner, which is more efficient (especially for
  // streaming calls).
  if (calld->subchannel_call != nullptr) {
    if (grpc_client_channel_trace.enabled()) {
      gpr_log(GPR_INFO,
              "chand=%p calld=%p: starting batch on subchannel_call=%p", chand,
              calld, calld->subchannel_call);
    }
    pending_batches_resume(elem);
    return;
  }
  // We do not yet have a subchannel call.
  // For batches containing a send_initial_metadata op, enter the channel
  // combiner to start a pick.
  if (GPR_LIKELY(batch->send_initial_metadata)) {
    if (grpc_client_channel_trace.enabled()) {
      gpr_log(GPR_INFO, "chand=%p calld=%p: entering client_channel combiner",
              chand, calld);
    }
    GRPC_CLOSURE_SCHED(
        GRPC_CLOSURE_INIT(&batch->handler_private.closure, start_pick_locked,
                          elem, grpc_combiner_scheduler(chand->combiner)),
        GRPC_ERROR_NONE);
  } else {
    // For all other batches, release the call combiner.
    if (grpc_client_channel_trace.enabled()) {
      gpr_log(GPR_INFO,
              "chand=%p calld=%p: saved batch, yielding call combiner", chand,
              calld);
    }
    GRPC_CALL_COMBINER_STOP(calld->call_combiner,
                            "batch does not include send_initial_metadata");
  }
}

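// Per-call constructor/destructor pair, installed in
// grpc_client_channel_filter below.  Note that send_messages is manually
// constructed: it is Init()ed here and Destroy()ed in
// cc_destroy_call_elem().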
/* Constructor for call_data */
static grpc_error* cc_init_call_elem(grpc_call_element* elem,
                                     const grpc_call_element_args* args) {
  call_data* calld = static_cast<call_data*>(elem->call_data);
  channel_data* chand = static_cast<channel_data*>(elem->channel_data);
  // Initialize data members.
  calld->path = grpc_slice_ref_internal(args->path);
  calld->call_start_time = args->start_time;
  calld->deadline = args->deadline;
  calld->arena = args->arena;
  calld->owning_call = args->call_stack;
  calld->call_combiner = args->call_combiner;
  if (GPR_LIKELY(chand->deadline_checking_enabled)) {
    grpc_deadline_state_init(elem, args->call_stack, args->call_combiner,
                             calld->deadline);
  }
  calld->enable_retries = chand->enable_retries;
  calld->send_messages.Init();
  return GRPC_ERROR_NONE;
}

/* Destructor for call_data */
static void cc_destroy_call_elem(grpc_call_element* elem,
                                 const grpc_call_final_info* final_info,
                                 grpc_closure* then_schedule_closure) {
  call_data* calld = static_cast<call_data*>(elem->call_data);
  channel_data* chand = static_cast<channel_data*>(elem->channel_data);
  if (GPR_LIKELY(chand->deadline_checking_enabled)) {
    grpc_deadline_state_destroy(elem);
  }
  grpc_slice_unref_internal(calld->path);
  calld->retry_throttle_data.reset();
  calld->method_params.reset();
  GRPC_ERROR_UNREF(calld->cancel_error);
  if (GPR_LIKELY(calld->subchannel_call != nullptr)) {
    grpc_subchannel_call_set_cleanup_closure(calld->subchannel_call,
                                             then_schedule_closure);
    then_schedule_closure = nullptr;
    GRPC_SUBCHANNEL_CALL_UNREF(calld->subchannel_call,
                               "client_channel_destroy_call");
  }
  for (size_t i = 0; i < GPR_ARRAY_SIZE(calld->pending_batches); ++i) {
    GPR_ASSERT(calld->pending_batches[i].batch == nullptr);
  }
  if (GPR_LIKELY(calld->pick.connected_subchannel != nullptr)) {
    calld->pick.connected_subchannel.reset();
  }
  for (size_t i = 0; i < GRPC_CONTEXT_COUNT; ++i) {
    if (calld->pick.subchannel_call_context[i].value != nullptr) {
      calld->pick.subchannel_call_context[i].destroy(
          calld->pick.subchannel_call_context[i].value);
    }
  }
  calld->send_messages.Destroy();
  GRPC_CLOSURE_SCHED(then_schedule_closure, GRPC_ERROR_NONE);
}

static void cc_set_pollset_or_pollset_set(grpc_call_element* elem,
                                          grpc_polling_entity* pollent) {
  call_data* calld = static_cast<call_data*>(elem->call_data);
  calld->pollent = pollent;
}

/*************************************************************************
 * EXPORTED SYMBOLS
 */

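// The client channel filter vtable.  Positional initializers, in order:
// start_transport_stream_op_batch, start_transport_op, sizeof(call_data),
// init_call_elem, set_pollset_or_pollset_set, destroy_call_elem,
// sizeof(channel_data), init_channel_elem, destroy_channel_elem,
// get_channel_info, and the filter name.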
const grpc_channel_filter grpc_client_channel_filter = {
    cc_start_transport_stream_op_batch,
    cc_start_transport_op,
    sizeof(call_data),
    cc_init_call_elem,
    cc_set_pollset_or_pollset_set,
    cc_destroy_call_elem,
    sizeof(channel_data),
    cc_init_channel_elem,
    cc_destroy_channel_elem,
    cc_get_channel_info,
    "client-channel",
};

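// Runs under the combiner.  If an LB policy already exists, ask it to
// exit IDLE; otherwise, record that it should do so as soon as one
// arrives, kicking off resolution if needed.  Drops the channel stack ref
// taken in grpc_client_channel_check_connectivity_state() below.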
static void try_to_connect_locked(void* arg, grpc_error* error_ignored) {
  channel_data* chand = static_cast<channel_data*>(arg);
  if (chand->lb_policy != nullptr) {
    chand->lb_policy->ExitIdleLocked();
  } else {
    chand->exit_idle_when_lb_policy_arrives = true;
    if (!chand->started_resolving && chand->resolver != nullptr) {
      start_resolving_locked(chand);
    }
  }
  GRPC_CHANNEL_STACK_UNREF(chand->owning_stack, "try_to_connect");
}

grpc_connectivity_state grpc_client_channel_check_connectivity_state(
    grpc_channel_element* elem, int try_to_connect) {
  channel_data* chand = static_cast<channel_data*>(elem->channel_data);
  grpc_connectivity_state out =
      grpc_connectivity_state_check(&chand->state_tracker);
  if (out == GRPC_CHANNEL_IDLE && try_to_connect) {
    GRPC_CHANNEL_STACK_REF(chand->owning_stack, "try_to_connect");
    GRPC_CLOSURE_SCHED(
        GRPC_CLOSURE_CREATE(try_to_connect_locked, chand,
                            grpc_combiner_scheduler(chand->combiner)),
        GRPC_ERROR_NONE);
  }
  return out;
}

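// Machinery for notifying external watchers (e.g., the channel's public
// connectivity-state API) of connectivity state changes.  Watchers are
// kept in a singly-linked list guarded by
// external_connectivity_watcher_list_mu in channel_data.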
typedef struct external_connectivity_watcher {
  channel_data* chand;
  grpc_polling_entity pollent;
  grpc_closure* on_complete;
  grpc_closure* watcher_timer_init;
  grpc_connectivity_state* state;
  grpc_closure my_closure;
  struct external_connectivity_watcher* next;
} external_connectivity_watcher;

static external_connectivity_watcher* lookup_external_connectivity_watcher(
    channel_data* chand, grpc_closure* on_complete) {
  gpr_mu_lock(&chand->external_connectivity_watcher_list_mu);
  external_connectivity_watcher* w =
      chand->external_connectivity_watcher_list_head;
  while (w != nullptr && w->on_complete != on_complete) {
    w = w->next;
  }
  gpr_mu_unlock(&chand->external_connectivity_watcher_list_mu);
  return w;
}

static void external_connectivity_watcher_list_append(
    channel_data* chand, external_connectivity_watcher* w) {
  GPR_ASSERT(!lookup_external_connectivity_watcher(chand, w->on_complete));

  gpr_mu_lock(&w->chand->external_connectivity_watcher_list_mu);
  GPR_ASSERT(!w->next);
  w->next = chand->external_connectivity_watcher_list_head;
  chand->external_connectivity_watcher_list_head = w;
  gpr_mu_unlock(&w->chand->external_connectivity_watcher_list_mu);
}

static void external_connectivity_watcher_list_remove(
    channel_data* chand, external_connectivity_watcher* to_remove) {
  GPR_ASSERT(
      lookup_external_connectivity_watcher(chand, to_remove->on_complete));
  gpr_mu_lock(&chand->external_connectivity_watcher_list_mu);
  if (to_remove == chand->external_connectivity_watcher_list_head) {
    chand->external_connectivity_watcher_list_head = to_remove->next;
    gpr_mu_unlock(&chand->external_connectivity_watcher_list_mu);
    return;
  }
  external_connectivity_watcher* w =
      chand->external_connectivity_watcher_list_head;
  while (w != nullptr) {
    if (w->next == to_remove) {
      w->next = w->next->next;
      gpr_mu_unlock(&chand->external_connectivity_watcher_list_mu);
      return;
    }
    w = w->next;
  }
  GPR_UNREACHABLE_CODE(return);
}

int grpc_client_channel_num_external_connectivity_watchers(
    grpc_channel_element* elem) {
  channel_data* chand = static_cast<channel_data*>(elem->channel_data);
  int count = 0;

  gpr_mu_lock(&chand->external_connectivity_watcher_list_mu);
  external_connectivity_watcher* w =
      chand->external_connectivity_watcher_list_head;
  while (w != nullptr) {
    count++;
    w = w->next;
  }
  gpr_mu_unlock(&chand->external_connectivity_watcher_list_mu);

  return count;
}

static void on_external_watch_complete_locked(void* arg, grpc_error* error) {
  external_connectivity_watcher* w =
      static_cast<external_connectivity_watcher*>(arg);
  grpc_closure* follow_up = w->on_complete;
  grpc_polling_entity_del_from_pollset_set(&w->pollent,
                                           w->chand->interested_parties);
  GRPC_CHANNEL_STACK_UNREF(w->chand->owning_stack,
                           "external_connectivity_watcher");
  external_connectivity_watcher_list_remove(w->chand, w);
  gpr_free(w);
  GRPC_CLOSURE_SCHED(follow_up, GRPC_ERROR_REF(error));
}

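// Runs under the combiner.  Serves double duty based on w->state:
// if state is non-null, register the watcher and subscribe to the next
// connectivity state change; if state is null, cancel a previously
// registered watcher that uses the same on_complete closure, if one is
// still in the list.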
static void watch_connectivity_state_locked(void* arg,
                                            grpc_error* error_ignored) {
  external_connectivity_watcher* w =
      static_cast<external_connectivity_watcher*>(arg);
  external_connectivity_watcher* found = nullptr;
  if (w->state != nullptr) {
    external_connectivity_watcher_list_append(w->chand, w);
    // We assume that the closure is scheduled on the ExecCtx scheduler,
    // so GRPC_CLOSURE_RUN will run it immediately.
    GRPC_CLOSURE_RUN(w->watcher_timer_init, GRPC_ERROR_NONE);
    GRPC_CLOSURE_INIT(&w->my_closure, on_external_watch_complete_locked, w,
                      grpc_combiner_scheduler(w->chand->combiner));
    grpc_connectivity_state_notify_on_state_change(&w->chand->state_tracker,
                                                   w->state, &w->my_closure);
  } else {
    GPR_ASSERT(w->watcher_timer_init == nullptr);
    found = lookup_external_connectivity_watcher(w->chand, w->on_complete);
    if (found) {
      GPR_ASSERT(found->on_complete == w->on_complete);
      grpc_connectivity_state_notify_on_state_change(
          &found->chand->state_tracker, nullptr, &found->my_closure);
    }
    grpc_polling_entity_del_from_pollset_set(&w->pollent,
                                             w->chand->interested_parties);
    GRPC_CHANNEL_STACK_UNREF(w->chand->owning_stack,
                             "external_connectivity_watcher");
    gpr_free(w);
  }
}

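// Registers an external connectivity watcher.  A minimal caller sketch
// (hypothetical names: `ps`, `on_state_changed`, and `on_timer_init` are
// assumed to be a caller-owned pollset and caller-provided grpc_closures;
// in practice the caller is the surface layer's connectivity API):
//
//   grpc_connectivity_state state = GRPC_CHANNEL_IDLE;
//   grpc_client_channel_watch_connectivity_state(
//       elem, grpc_polling_entity_create_from_pollset(ps), &state,
//       &on_state_changed, &on_timer_init);
//
// Passing state == nullptr instead cancels a watch previously registered
// with the same closure (see watch_connectivity_state_locked above).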
void grpc_client_channel_watch_connectivity_state(
    grpc_channel_element* elem, grpc_polling_entity pollent,
    grpc_connectivity_state* state, grpc_closure* closure,
    grpc_closure* watcher_timer_init) {
  channel_data* chand = static_cast<channel_data*>(elem->channel_data);
  external_connectivity_watcher* w =
      static_cast<external_connectivity_watcher*>(gpr_zalloc(sizeof(*w)));
  w->chand = chand;
  w->pollent = pollent;
  w->on_complete = closure;
  w->state = state;
  w->watcher_timer_init = watcher_timer_init;
  grpc_polling_entity_add_to_pollset_set(&w->pollent,
                                         chand->interested_parties);
  GRPC_CHANNEL_STACK_REF(w->chand->owning_stack,
                         "external_connectivity_watcher");
  GRPC_CLOSURE_SCHED(
      GRPC_CLOSURE_INIT(&w->my_closure, watch_connectivity_state_locked, w,
                        grpc_combiner_scheduler(chand->combiner)),
      GRPC_ERROR_NONE);
}

grpc_subchannel_call* grpc_client_channel_get_subchannel_call(
    grpc_call_element* elem) {
  call_data* calld = static_cast<call_data*>(elem->call_data);
  return calld->subchannel_call;
}