/*
 *
 * Copyright 2015 gRPC authors.
 *
 * Licensed under the Apache License, Version 2.0 (the "License");
 * you may not use this file except in compliance with the License.
 * You may obtain a copy of the License at
 *
 *     http://www.apache.org/licenses/LICENSE-2.0
 *
 * Unless required by applicable law or agreed to in writing, software
 * distributed under the License is distributed on an "AS IS" BASIS,
 * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
 * See the License for the specific language governing permissions and
 * limitations under the License.
 *
 */

#include <grpc/support/port_platform.h>

#include "src/core/ext/filters/client_channel/client_channel.h"

#include <inttypes.h>
#include <limits.h>
#include <stdbool.h>
#include <stdio.h>
#include <string.h>

#include <grpc/support/alloc.h>
#include <grpc/support/log.h>
#include <grpc/support/string_util.h>
#include <grpc/support/sync.h>

#include "src/core/ext/filters/client_channel/backup_poller.h"
#include "src/core/ext/filters/client_channel/http_connect_handshaker.h"
#include "src/core/ext/filters/client_channel/lb_policy_registry.h"
#include "src/core/ext/filters/client_channel/method_params.h"
#include "src/core/ext/filters/client_channel/proxy_mapper_registry.h"
#include "src/core/ext/filters/client_channel/resolver_registry.h"
#include "src/core/ext/filters/client_channel/retry_throttle.h"
#include "src/core/ext/filters/client_channel/subchannel.h"
#include "src/core/ext/filters/deadline/deadline_filter.h"
#include "src/core/lib/backoff/backoff.h"
#include "src/core/lib/channel/channel_args.h"
#include "src/core/lib/channel/connected_channel.h"
#include "src/core/lib/channel/status_util.h"
#include "src/core/lib/gpr/string.h"
#include "src/core/lib/gprpp/inlined_vector.h"
#include "src/core/lib/gprpp/manual_constructor.h"
#include "src/core/lib/iomgr/combiner.h"
#include "src/core/lib/iomgr/iomgr.h"
#include "src/core/lib/iomgr/polling_entity.h"
#include "src/core/lib/profiling/timers.h"
#include "src/core/lib/slice/slice_internal.h"
#include "src/core/lib/slice/slice_string_helpers.h"
#include "src/core/lib/surface/channel.h"
#include "src/core/lib/transport/connectivity_state.h"
#include "src/core/lib/transport/error_utils.h"
#include "src/core/lib/transport/metadata.h"
#include "src/core/lib/transport/metadata_batch.h"
#include "src/core/lib/transport/service_config.h"
#include "src/core/lib/transport/static_metadata.h"
#include "src/core/lib/transport/status_metadata.h"

using grpc_core::internal::ClientChannelMethodParams;
using grpc_core::internal::ServerRetryThrottleData;

/* Client channel implementation */

// By default, we buffer 256 KiB per RPC for retries.
// TODO(roth): Do we have any data to suggest a better value?
#define DEFAULT_PER_RPC_RETRY_BUFFER_SIZE (256 << 10)
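
// Illustration (not part of the original file): this default can be
// overridden per channel via the GRPC_ARG_PER_RPC_RETRY_BUFFER_SIZE channel
// arg, which cc_init_channel_elem() reads below. A hedged sketch of what a
// caller might do, assuming the internal channel_args helpers:
//
//   grpc_arg arg = grpc_channel_arg_integer_create(
//       const_cast<char*>(GRPC_ARG_PER_RPC_RETRY_BUFFER_SIZE),
//       1 << 20);  // Buffer up to 1 MiB per RPC.
//   grpc_channel_args args = {1, &arg};
//   // Pass &args when creating the channel.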

// This value was picked arbitrarily.  It can be changed if there is
// any even moderately compelling reason to do so.
#define RETRY_BACKOFF_JITTER 0.2

grpc_core::TraceFlag grpc_client_channel_trace(false, "client_channel");

/*************************************************************************
 * CHANNEL-WIDE FUNCTIONS
 */

struct external_connectivity_watcher;

typedef grpc_core::SliceHashTable<
    grpc_core::RefCountedPtr<ClientChannelMethodParams>>
    MethodParamsTable;
typedef struct client_channel_channel_data {
  grpc_core::OrphanablePtr<grpc_core::Resolver> resolver;
  bool started_resolving;
  bool deadline_checking_enabled;
  grpc_client_channel_factory* client_channel_factory;
  bool enable_retries;
  size_t per_rpc_retry_buffer_size;

  /** combiner protecting all variables below in this data structure */
  grpc_combiner* combiner;
  /** currently active load balancer */
  grpc_core::OrphanablePtr<grpc_core::LoadBalancingPolicy> lb_policy;
  /** retry throttle data */
  grpc_core::RefCountedPtr<ServerRetryThrottleData> retry_throttle_data;
  /** maps method names to method_parameters structs */
  grpc_core::RefCountedPtr<MethodParamsTable> method_params_table;
  /** incoming resolver result - set by resolver.next() */
  grpc_channel_args* resolver_result;
  /** a list of closures that are all waiting for resolver result to come in */
  grpc_closure_list waiting_for_resolver_result_closures;
  /** resolver callback */
  grpc_closure on_resolver_result_changed;
  /** connectivity state being tracked */
  grpc_connectivity_state_tracker state_tracker;
  /** when an lb_policy arrives, should we try to exit idle */
  bool exit_idle_when_lb_policy_arrives;
  /** owning stack */
  grpc_channel_stack* owning_stack;
  /** interested parties (owned) */
  grpc_pollset_set* interested_parties;

  /* external_connectivity_watcher_list head is guarded by its own mutex, since
   * counts need to be grabbed immediately without polling on a cq */
  gpr_mu external_connectivity_watcher_list_mu;
  struct external_connectivity_watcher* external_connectivity_watcher_list_head;

  /* the following properties are guarded by a mutex since APIs require them
     to be instantaneously available */
  gpr_mu info_mu;
  grpc_core::UniquePtr<char> info_lb_policy_name;
  /** service config in JSON form */
  grpc_core::UniquePtr<char> info_service_config_json;
} channel_data;

typedef struct {
  channel_data* chand;
  /** used as an identifier, don't dereference it because the LB policy may be
   * non-existing when the callback is run */
  grpc_core::LoadBalancingPolicy* lb_policy;
  grpc_closure closure;
} reresolution_request_args;

/** We create one watcher for each new lb_policy that is returned from a
    resolver, to watch for state changes from the lb_policy. When a state
    change is seen, we update the channel, and create a new watcher. */
typedef struct {
  channel_data* chand;
  grpc_closure on_changed;
  grpc_connectivity_state state;
  grpc_core::LoadBalancingPolicy* lb_policy;
} lb_policy_connectivity_watcher;

static void watch_lb_policy_locked(channel_data* chand,
                                   grpc_core::LoadBalancingPolicy* lb_policy,
                                   grpc_connectivity_state current_state);

static void set_channel_connectivity_state_locked(channel_data* chand,
                                                  grpc_connectivity_state state,
                                                  grpc_error* error,
                                                  const char* reason) {
  /* TODO: Improve failure handling:
   * - Make it possible for policies to return GRPC_CHANNEL_TRANSIENT_FAILURE.
   * - Hand over pending picks from old policies during the switch that happens
   *   when resolver provides an update. */
  if (chand->lb_policy != nullptr) {
    if (state == GRPC_CHANNEL_TRANSIENT_FAILURE) {
      /* cancel picks with wait_for_ready=false */
      chand->lb_policy->CancelMatchingPicksLocked(
          /* mask= */ GRPC_INITIAL_METADATA_WAIT_FOR_READY,
          /* check= */ 0, GRPC_ERROR_REF(error));
    } else if (state == GRPC_CHANNEL_SHUTDOWN) {
      /* cancel all picks */
      chand->lb_policy->CancelMatchingPicksLocked(/* mask= */ 0, /* check= */ 0,
                                                  GRPC_ERROR_REF(error));
    }
  }
  if (grpc_client_channel_trace.enabled()) {
    gpr_log(GPR_INFO, "chand=%p: setting connectivity state to %s", chand,
            grpc_connectivity_state_name(state));
  }
  grpc_connectivity_state_set(&chand->state_tracker, state, error, reason);
}

static void on_lb_policy_state_changed_locked(void* arg, grpc_error* error) {
  lb_policy_connectivity_watcher* w =
      static_cast<lb_policy_connectivity_watcher*>(arg);
  /* check if the notification is for the latest policy */
  if (w->lb_policy == w->chand->lb_policy.get()) {
    if (grpc_client_channel_trace.enabled()) {
      gpr_log(GPR_INFO, "chand=%p: lb_policy=%p state changed to %s", w->chand,
              w->lb_policy, grpc_connectivity_state_name(w->state));
    }
    set_channel_connectivity_state_locked(w->chand, w->state,
                                          GRPC_ERROR_REF(error), "lb_changed");
    if (w->state != GRPC_CHANNEL_SHUTDOWN) {
      watch_lb_policy_locked(w->chand, w->lb_policy, w->state);
    }
  }
  GRPC_CHANNEL_STACK_UNREF(w->chand->owning_stack, "watch_lb_policy");
  gpr_free(w);
}

static void watch_lb_policy_locked(channel_data* chand,
                                   grpc_core::LoadBalancingPolicy* lb_policy,
                                   grpc_connectivity_state current_state) {
  lb_policy_connectivity_watcher* w =
      static_cast<lb_policy_connectivity_watcher*>(gpr_malloc(sizeof(*w)));
  GRPC_CHANNEL_STACK_REF(chand->owning_stack, "watch_lb_policy");
  w->chand = chand;
  GRPC_CLOSURE_INIT(&w->on_changed, on_lb_policy_state_changed_locked, w,
                    grpc_combiner_scheduler(chand->combiner));
  w->state = current_state;
  w->lb_policy = lb_policy;
  lb_policy->NotifyOnStateChangeLocked(&w->state, &w->on_changed);
}

static void start_resolving_locked(channel_data* chand) {
  if (grpc_client_channel_trace.enabled()) {
    gpr_log(GPR_INFO, "chand=%p: starting name resolution", chand);
  }
  GPR_ASSERT(!chand->started_resolving);
  chand->started_resolving = true;
  GRPC_CHANNEL_STACK_REF(chand->owning_stack, "resolver");
  chand->resolver->NextLocked(&chand->resolver_result,
                              &chand->on_resolver_result_changed);
}

typedef struct {
  char* server_name;
  grpc_core::RefCountedPtr<ServerRetryThrottleData> retry_throttle_data;
} service_config_parsing_state;

static void parse_retry_throttle_params(
    const grpc_json* field, service_config_parsing_state* parsing_state) {
  if (strcmp(field->key, "retryThrottling") == 0) {
    if (parsing_state->retry_throttle_data != nullptr) return;  // Duplicate.
    if (field->type != GRPC_JSON_OBJECT) return;
    int max_milli_tokens = 0;
    int milli_token_ratio = 0;
    for (grpc_json* sub_field = field->child; sub_field != nullptr;
         sub_field = sub_field->next) {
      if (sub_field->key == nullptr) return;
      if (strcmp(sub_field->key, "maxTokens") == 0) {
        if (max_milli_tokens != 0) return;  // Duplicate.
        if (sub_field->type != GRPC_JSON_NUMBER) return;
        max_milli_tokens = gpr_parse_nonnegative_int(sub_field->value);
        if (max_milli_tokens == -1) return;
        max_milli_tokens *= 1000;
      } else if (strcmp(sub_field->key, "tokenRatio") == 0) {
        if (milli_token_ratio != 0) return;  // Duplicate.
        if (sub_field->type != GRPC_JSON_NUMBER) return;
        // We support up to 3 decimal digits.
        size_t whole_len = strlen(sub_field->value);
        uint32_t multiplier = 1;
        uint32_t decimal_value = 0;
        const char* decimal_point = strchr(sub_field->value, '.');
        if (decimal_point != nullptr) {
          whole_len = static_cast<size_t>(decimal_point - sub_field->value);
          multiplier = 1000;
          size_t decimal_len = strlen(decimal_point + 1);
          if (decimal_len > 3) decimal_len = 3;
          if (!gpr_parse_bytes_to_uint32(decimal_point + 1, decimal_len,
                                         &decimal_value)) {
            return;
          }
          uint32_t decimal_multiplier = 1;
          for (size_t i = 0; i < (3 - decimal_len); ++i) {
            decimal_multiplier *= 10;
          }
          decimal_value *= decimal_multiplier;
        }
        uint32_t whole_value;
        if (!gpr_parse_bytes_to_uint32(sub_field->value, whole_len,
                                       &whole_value)) {
          return;
        }
        milli_token_ratio =
            static_cast<int>((whole_value * multiplier) + decimal_value);
        if (milli_token_ratio <= 0) return;
      }
    }
    parsing_state->retry_throttle_data =
        grpc_core::internal::ServerRetryThrottleMap::GetDataForServer(
            parsing_state->server_name, max_milli_tokens, milli_token_ratio);
  }
}
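
// Worked example (added for illustration; the JSON shape follows the gRPC
// service config spec): given a service config containing
//
//   "retryThrottling": { "maxTokens": 10, "tokenRatio": 0.5 }
//
// the parser above computes max_milli_tokens = 10 * 1000 = 10000, and for
// "0.5" it finds whole_value = 0, multiplier = 1000, and
// decimal_value = 5 * 100 = 500, so milli_token_ratio = 500. Working in
// milli-token units lets the throttle use integer arithmetic while still
// honoring up to 3 decimal digits of tokenRatio.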

// Invoked from the resolver NextLocked() callback when the resolver
// is shutting down.
static void on_resolver_shutdown_locked(channel_data* chand,
                                        grpc_error* error) {
  if (grpc_client_channel_trace.enabled()) {
    gpr_log(GPR_INFO, "chand=%p: shutting down", chand);
  }
  if (chand->lb_policy != nullptr) {
    if (grpc_client_channel_trace.enabled()) {
      gpr_log(GPR_INFO, "chand=%p: shutting down lb_policy=%p", chand,
              chand->lb_policy.get());
    }
    grpc_pollset_set_del_pollset_set(chand->lb_policy->interested_parties(),
                                     chand->interested_parties);
    chand->lb_policy.reset();
  }
  if (chand->resolver != nullptr) {
    // This should never happen; it can only be triggered by a resolver
    // implementation spontaneously deciding to report shutdown without
    // being orphaned.  This code is included just to be defensive.
    if (grpc_client_channel_trace.enabled()) {
      gpr_log(GPR_INFO, "chand=%p: spontaneous shutdown from resolver %p",
              chand, chand->resolver.get());
    }
    chand->resolver.reset();
    set_channel_connectivity_state_locked(
        chand, GRPC_CHANNEL_SHUTDOWN,
        GRPC_ERROR_CREATE_REFERENCING_FROM_STATIC_STRING(
            "Resolver spontaneous shutdown", &error, 1),
        "resolver_spontaneous_shutdown");
  }
  grpc_closure_list_fail_all(&chand->waiting_for_resolver_result_closures,
                             GRPC_ERROR_CREATE_REFERENCING_FROM_STATIC_STRING(
                                 "Channel disconnected", &error, 1));
  GRPC_CLOSURE_LIST_SCHED(&chand->waiting_for_resolver_result_closures);
  GRPC_CHANNEL_STACK_UNREF(chand->owning_stack, "resolver");
  grpc_channel_args_destroy(chand->resolver_result);
  chand->resolver_result = nullptr;
  GRPC_ERROR_UNREF(error);
}

// Returns the LB policy name from the resolver result.
static grpc_core::UniquePtr<char>
get_lb_policy_name_from_resolver_result_locked(channel_data* chand) {
  // Find LB policy name in channel args.
  const grpc_arg* channel_arg =
      grpc_channel_args_find(chand->resolver_result, GRPC_ARG_LB_POLICY_NAME);
  const char* lb_policy_name = grpc_channel_arg_get_string(channel_arg);
  // Special case: If at least one balancer address is present, we use
  // the grpclb policy, regardless of what the resolver actually specified.
  channel_arg =
      grpc_channel_args_find(chand->resolver_result, GRPC_ARG_LB_ADDRESSES);
  if (channel_arg != nullptr && channel_arg->type == GRPC_ARG_POINTER) {
    grpc_lb_addresses* addresses =
        static_cast<grpc_lb_addresses*>(channel_arg->value.pointer.p);
    if (grpc_lb_addresses_contains_balancer_address(*addresses)) {
      if (lb_policy_name != nullptr &&
          gpr_stricmp(lb_policy_name, "grpclb") != 0) {
        gpr_log(GPR_INFO,
                "resolver requested LB policy %s but provided at least one "
                "balancer address -- forcing use of grpclb LB policy",
                lb_policy_name);
      }
      lb_policy_name = "grpclb";
    }
  }
  // Use pick_first if nothing was specified and we didn't select grpclb
  // above.
  if (lb_policy_name == nullptr) lb_policy_name = "pick_first";
  return grpc_core::UniquePtr<char>(gpr_strdup(lb_policy_name));
}
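
// Decision table for the helper above (added for illustration):
//
//   resolver-specified policy | balancer address present? | result
//   --------------------------+---------------------------+----------------
//   "round_robin"             | no                        | round_robin
//   "round_robin"             | yes                       | grpclb (logged)
//   (none)                    | yes                       | grpclb
//   (none)                    | no                        | pick_first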

static void request_reresolution_locked(void* arg, grpc_error* error) {
  reresolution_request_args* args =
      static_cast<reresolution_request_args*>(arg);
  channel_data* chand = args->chand;
  // If this invocation is for a stale LB policy, treat it as an LB shutdown
  // signal.
  if (args->lb_policy != chand->lb_policy.get() || error != GRPC_ERROR_NONE ||
      chand->resolver == nullptr) {
    GRPC_CHANNEL_STACK_UNREF(chand->owning_stack, "re-resolution");
    gpr_free(args);
    return;
  }
  if (grpc_client_channel_trace.enabled()) {
    gpr_log(GPR_INFO, "chand=%p: started name re-resolving", chand);
  }
  chand->resolver->RequestReresolutionLocked();
  // Give back the closure to the LB policy.
  chand->lb_policy->SetReresolutionClosureLocked(&args->closure);
}

// Creates a new LB policy, replacing any previous one.
// If the new policy is created successfully, sets *connectivity_state and
// *connectivity_error to its initial connectivity state; otherwise,
// leaves them unchanged.
static void create_new_lb_policy_locked(
    channel_data* chand, char* lb_policy_name,
    grpc_connectivity_state* connectivity_state,
    grpc_error** connectivity_error) {
  grpc_core::LoadBalancingPolicy::Args lb_policy_args;
  lb_policy_args.combiner = chand->combiner;
  lb_policy_args.client_channel_factory = chand->client_channel_factory;
  lb_policy_args.args = chand->resolver_result;
  grpc_core::OrphanablePtr<grpc_core::LoadBalancingPolicy> new_lb_policy =
      grpc_core::LoadBalancingPolicyRegistry::CreateLoadBalancingPolicy(
          lb_policy_name, lb_policy_args);
  if (GPR_UNLIKELY(new_lb_policy == nullptr)) {
    gpr_log(GPR_ERROR, "could not create LB policy \"%s\"", lb_policy_name);
  } else {
    if (grpc_client_channel_trace.enabled()) {
      gpr_log(GPR_INFO, "chand=%p: created new LB policy \"%s\" (%p)", chand,
              lb_policy_name, new_lb_policy.get());
    }
    // Swap out the LB policy and update the fds in
    // chand->interested_parties.
    if (chand->lb_policy != nullptr) {
      if (grpc_client_channel_trace.enabled()) {
        gpr_log(GPR_INFO, "chand=%p: shutting down lb_policy=%p", chand,
                chand->lb_policy.get());
      }
      grpc_pollset_set_del_pollset_set(chand->lb_policy->interested_parties(),
                                       chand->interested_parties);
      chand->lb_policy->HandOffPendingPicksLocked(new_lb_policy.get());
    }
    chand->lb_policy = std::move(new_lb_policy);
    grpc_pollset_set_add_pollset_set(chand->lb_policy->interested_parties(),
                                     chand->interested_parties);
    // Set up re-resolution callback.
    reresolution_request_args* args =
        static_cast<reresolution_request_args*>(gpr_zalloc(sizeof(*args)));
    args->chand = chand;
    args->lb_policy = chand->lb_policy.get();
    GRPC_CLOSURE_INIT(&args->closure, request_reresolution_locked, args,
                      grpc_combiner_scheduler(chand->combiner));
    GRPC_CHANNEL_STACK_REF(chand->owning_stack, "re-resolution");
    chand->lb_policy->SetReresolutionClosureLocked(&args->closure);
    // Get the new LB policy's initial connectivity state and start a
    // connectivity watch.
    GRPC_ERROR_UNREF(*connectivity_error);
    *connectivity_state =
        chand->lb_policy->CheckConnectivityLocked(connectivity_error);
    if (chand->exit_idle_when_lb_policy_arrives) {
      chand->lb_policy->ExitIdleLocked();
      chand->exit_idle_when_lb_policy_arrives = false;
    }
    watch_lb_policy_locked(chand, chand->lb_policy.get(), *connectivity_state);
  }
}

// Returns the service config (as a JSON string) from the resolver result.
// Also updates state in chand.
static grpc_core::UniquePtr<char>
get_service_config_from_resolver_result_locked(channel_data* chand) {
  const grpc_arg* channel_arg =
      grpc_channel_args_find(chand->resolver_result, GRPC_ARG_SERVICE_CONFIG);
  const char* service_config_json = grpc_channel_arg_get_string(channel_arg);
  if (service_config_json != nullptr) {
    if (grpc_client_channel_trace.enabled()) {
      gpr_log(GPR_INFO, "chand=%p: resolver returned service config: \"%s\"",
              chand, service_config_json);
    }
    grpc_core::UniquePtr<grpc_core::ServiceConfig> service_config =
        grpc_core::ServiceConfig::Create(service_config_json);
    if (service_config != nullptr) {
      if (chand->enable_retries) {
        channel_arg =
            grpc_channel_args_find(chand->resolver_result, GRPC_ARG_SERVER_URI);
        const char* server_uri = grpc_channel_arg_get_string(channel_arg);
        GPR_ASSERT(server_uri != nullptr);
        grpc_uri* uri = grpc_uri_parse(server_uri, true);
        GPR_ASSERT(uri->path[0] != '\0');
        service_config_parsing_state parsing_state;
        memset(&parsing_state, 0, sizeof(parsing_state));
        parsing_state.server_name =
            uri->path[0] == '/' ? uri->path + 1 : uri->path;
        service_config->ParseGlobalParams(parse_retry_throttle_params,
                                          &parsing_state);
        grpc_uri_destroy(uri);
        chand->retry_throttle_data =
            std::move(parsing_state.retry_throttle_data);
      }
      chand->method_params_table = service_config->CreateMethodConfigTable(
          ClientChannelMethodParams::CreateFromJson);
    }
  }
  return grpc_core::UniquePtr<char>(gpr_strdup(service_config_json));
}
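
// For reference (illustration only; shape per the gRPC service config spec),
// a resolver-supplied config handled by the helper above might look like:
//
//   {
//     "retryThrottling": { "maxTokens": 10, "tokenRatio": 0.1 },
//     "methodConfig": [ {
//       "name": [ { "service": "MyService", "method": "Foo" } ],
//       "waitForReady": true,
//       "timeout": "1.5s"
//     } ]
//   }
//
// The "retryThrottling" block feeds parse_retry_throttle_params() above,
// while each "methodConfig" entry becomes a MethodParamsTable entry via
// ClientChannelMethodParams::CreateFromJson.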

// Callback invoked when a resolver result is available.
static void on_resolver_result_changed_locked(void* arg, grpc_error* error) {
  channel_data* chand = static_cast<channel_data*>(arg);
  if (grpc_client_channel_trace.enabled()) {
    const char* disposition =
        chand->resolver_result != nullptr
            ? ""
            : (error == GRPC_ERROR_NONE ? " (transient error)"
                                        : " (resolver shutdown)");
    gpr_log(GPR_INFO,
            "chand=%p: got resolver result: resolver_result=%p error=%s%s",
            chand, chand->resolver_result, grpc_error_string(error),
            disposition);
  }
  // Handle shutdown.
  if (error != GRPC_ERROR_NONE || chand->resolver == nullptr) {
    on_resolver_shutdown_locked(chand, GRPC_ERROR_REF(error));
    return;
  }
  // Data used to set the channel's connectivity state.
  bool set_connectivity_state = true;
  grpc_connectivity_state connectivity_state = GRPC_CHANNEL_TRANSIENT_FAILURE;
  grpc_error* connectivity_error =
      GRPC_ERROR_CREATE_FROM_STATIC_STRING("No load balancing policy");
  // chand->resolver_result will be null in the case of a transient
  // resolution error.  In that case, we don't have any new result to
  // process, which means that we keep using the previous result (if any).
  if (chand->resolver_result == nullptr) {
    if (grpc_client_channel_trace.enabled()) {
      gpr_log(GPR_INFO, "chand=%p: resolver transient failure", chand);
    }
  } else {
    grpc_core::UniquePtr<char> lb_policy_name =
        get_lb_policy_name_from_resolver_result_locked(chand);
    // Check to see if we're already using the right LB policy.
    // Note: It's safe to use chand->info_lb_policy_name here without
    // taking a lock on chand->info_mu, because this function is the
    // only thing that modifies its value, and it can only be invoked
    // once at any given time.
    bool lb_policy_name_changed =
        chand->info_lb_policy_name == nullptr ||
        gpr_stricmp(chand->info_lb_policy_name.get(),
                    lb_policy_name.get()) != 0;
    if (chand->lb_policy != nullptr && !lb_policy_name_changed) {
      // Continue using the same LB policy.  Update with new addresses.
      if (grpc_client_channel_trace.enabled()) {
        gpr_log(GPR_INFO, "chand=%p: updating existing LB policy \"%s\" (%p)",
                chand, lb_policy_name.get(), chand->lb_policy.get());
      }
      chand->lb_policy->UpdateLocked(*chand->resolver_result);
      // No need to set the channel's connectivity state; the existing
      // watch on the LB policy will take care of that.
      set_connectivity_state = false;
    } else {
      // Instantiate new LB policy.
      create_new_lb_policy_locked(chand, lb_policy_name.get(),
                                  &connectivity_state, &connectivity_error);
    }
    // Find service config.
    grpc_core::UniquePtr<char> service_config_json =
        get_service_config_from_resolver_result_locked(chand);
    // Swap out the data used by cc_get_channel_info().
    gpr_mu_lock(&chand->info_mu);
    chand->info_lb_policy_name = std::move(lb_policy_name);
    chand->info_service_config_json = std::move(service_config_json);
    gpr_mu_unlock(&chand->info_mu);
    // Clean up.
    grpc_channel_args_destroy(chand->resolver_result);
    chand->resolver_result = nullptr;
  }
  // Set the channel's connectivity state if needed.
  if (set_connectivity_state) {
    set_channel_connectivity_state_locked(
        chand, connectivity_state, connectivity_error, "resolver_result");
  } else {
    GRPC_ERROR_UNREF(connectivity_error);
  }
  // Invoke closures that were waiting for results and renew the watch.
  GRPC_CLOSURE_LIST_SCHED(&chand->waiting_for_resolver_result_closures);
  chand->resolver->NextLocked(&chand->resolver_result,
                              &chand->on_resolver_result_changed);
}

static void start_transport_op_locked(void* arg, grpc_error* error_ignored) {
  grpc_transport_op* op = static_cast<grpc_transport_op*>(arg);
  grpc_channel_element* elem =
      static_cast<grpc_channel_element*>(op->handler_private.extra_arg);
  channel_data* chand = static_cast<channel_data*>(elem->channel_data);

  if (op->on_connectivity_state_change != nullptr) {
    grpc_connectivity_state_notify_on_state_change(
        &chand->state_tracker, op->connectivity_state,
        op->on_connectivity_state_change);
    op->on_connectivity_state_change = nullptr;
    op->connectivity_state = nullptr;
  }

  if (op->send_ping.on_initiate != nullptr || op->send_ping.on_ack != nullptr) {
    if (chand->lb_policy == nullptr) {
      GRPC_CLOSURE_SCHED(
          op->send_ping.on_initiate,
          GRPC_ERROR_CREATE_FROM_STATIC_STRING("Ping with no load balancing"));
      GRPC_CLOSURE_SCHED(
          op->send_ping.on_ack,
          GRPC_ERROR_CREATE_FROM_STATIC_STRING("Ping with no load balancing"));
    } else {
      chand->lb_policy->PingOneLocked(op->send_ping.on_initiate,
                                      op->send_ping.on_ack);
      op->bind_pollset = nullptr;
    }
    op->send_ping.on_initiate = nullptr;
    op->send_ping.on_ack = nullptr;
  }

  if (op->disconnect_with_error != GRPC_ERROR_NONE) {
    if (chand->resolver != nullptr) {
      set_channel_connectivity_state_locked(
          chand, GRPC_CHANNEL_SHUTDOWN,
          GRPC_ERROR_REF(op->disconnect_with_error), "disconnect");
      chand->resolver.reset();
      if (!chand->started_resolving) {
        grpc_closure_list_fail_all(&chand->waiting_for_resolver_result_closures,
                                   GRPC_ERROR_REF(op->disconnect_with_error));
        GRPC_CLOSURE_LIST_SCHED(&chand->waiting_for_resolver_result_closures);
      }
      if (chand->lb_policy != nullptr) {
        grpc_pollset_set_del_pollset_set(chand->lb_policy->interested_parties(),
                                         chand->interested_parties);
        chand->lb_policy.reset();
      }
    }
    GRPC_ERROR_UNREF(op->disconnect_with_error);
  }
  GRPC_CHANNEL_STACK_UNREF(chand->owning_stack, "start_transport_op");

  GRPC_CLOSURE_SCHED(op->on_consumed, GRPC_ERROR_NONE);
}

static void cc_start_transport_op(grpc_channel_element* elem,
                                  grpc_transport_op* op) {
  channel_data* chand = static_cast<channel_data*>(elem->channel_data);

  GPR_ASSERT(op->set_accept_stream == false);
  if (op->bind_pollset != nullptr) {
    grpc_pollset_set_add_pollset(chand->interested_parties, op->bind_pollset);
  }

  op->handler_private.extra_arg = elem;
  GRPC_CHANNEL_STACK_REF(chand->owning_stack, "start_transport_op");
  GRPC_CLOSURE_SCHED(
      GRPC_CLOSURE_INIT(&op->handler_private.closure, start_transport_op_locked,
                        op, grpc_combiner_scheduler(chand->combiner)),
      GRPC_ERROR_NONE);
}

static void cc_get_channel_info(grpc_channel_element* elem,
                                const grpc_channel_info* info) {
  channel_data* chand = static_cast<channel_data*>(elem->channel_data);
  gpr_mu_lock(&chand->info_mu);
  if (info->lb_policy_name != nullptr) {
    *info->lb_policy_name = gpr_strdup(chand->info_lb_policy_name.get());
  }
  if (info->service_config_json != nullptr) {
    *info->service_config_json =
        gpr_strdup(chand->info_service_config_json.get());
  }
  gpr_mu_unlock(&chand->info_mu);
}
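
// Usage sketch (illustration only): this filter method backs the public
// grpc_channel_get_info() API in <grpc/grpc.h>. A caller might query the
// current LB policy name like so:
//
//   char* lb_policy_name = nullptr;
//   grpc_channel_info info;
//   memset(&info, 0, sizeof(info));
//   info.lb_policy_name = &lb_policy_name;
//   grpc_channel_get_info(channel, &info);
//   // ... use lb_policy_name ...
//   gpr_free(lb_policy_name);
//
// The strings are gpr_strdup()ed under info_mu above, so the caller owns
// (and must free) the returned copies.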

/* Constructor for channel_data */
static grpc_error* cc_init_channel_elem(grpc_channel_element* elem,
                                        grpc_channel_element_args* args) {
  channel_data* chand = static_cast<channel_data*>(elem->channel_data);
  GPR_ASSERT(args->is_last);
  GPR_ASSERT(elem->filter == &grpc_client_channel_filter);
  // Initialize data members.
  chand->combiner = grpc_combiner_create();
  gpr_mu_init(&chand->info_mu);
  gpr_mu_init(&chand->external_connectivity_watcher_list_mu);

  gpr_mu_lock(&chand->external_connectivity_watcher_list_mu);
  chand->external_connectivity_watcher_list_head = nullptr;
  gpr_mu_unlock(&chand->external_connectivity_watcher_list_mu);

  chand->owning_stack = args->channel_stack;
  GRPC_CLOSURE_INIT(&chand->on_resolver_result_changed,
                    on_resolver_result_changed_locked, chand,
                    grpc_combiner_scheduler(chand->combiner));
  chand->interested_parties = grpc_pollset_set_create();
  grpc_connectivity_state_init(&chand->state_tracker, GRPC_CHANNEL_IDLE,
                               "client_channel");
  grpc_client_channel_start_backup_polling(chand->interested_parties);
  // Record max per-RPC retry buffer size.
  const grpc_arg* arg = grpc_channel_args_find(
      args->channel_args, GRPC_ARG_PER_RPC_RETRY_BUFFER_SIZE);
  chand->per_rpc_retry_buffer_size =
      static_cast<size_t>(grpc_channel_arg_get_integer(
          arg, {DEFAULT_PER_RPC_RETRY_BUFFER_SIZE, 0, INT_MAX}));
  // Record enable_retries.
  arg = grpc_channel_args_find(args->channel_args, GRPC_ARG_ENABLE_RETRIES);
  chand->enable_retries = grpc_channel_arg_get_bool(arg, true);
  // Record client channel factory.
  arg = grpc_channel_args_find(args->channel_args,
                               GRPC_ARG_CLIENT_CHANNEL_FACTORY);
  if (arg == nullptr) {
    return GRPC_ERROR_CREATE_FROM_STATIC_STRING(
        "Missing client channel factory in args for client channel filter");
  }
  if (arg->type != GRPC_ARG_POINTER) {
    return GRPC_ERROR_CREATE_FROM_STATIC_STRING(
        "client channel factory arg must be a pointer");
  }
  grpc_client_channel_factory_ref(
      static_cast<grpc_client_channel_factory*>(arg->value.pointer.p));
  chand->client_channel_factory =
      static_cast<grpc_client_channel_factory*>(arg->value.pointer.p);
  // Get server name to resolve, using proxy mapper if needed.
  arg = grpc_channel_args_find(args->channel_args, GRPC_ARG_SERVER_URI);
  if (arg == nullptr) {
    return GRPC_ERROR_CREATE_FROM_STATIC_STRING(
        "Missing server uri in args for client channel filter");
  }
  if (arg->type != GRPC_ARG_STRING) {
    return GRPC_ERROR_CREATE_FROM_STATIC_STRING(
        "server uri arg must be a string");
  }
  char* proxy_name = nullptr;
  grpc_channel_args* new_args = nullptr;
  grpc_proxy_mappers_map_name(arg->value.string, args->channel_args,
                              &proxy_name, &new_args);
  // Instantiate resolver.
  chand->resolver = grpc_core::ResolverRegistry::CreateResolver(
      proxy_name != nullptr ? proxy_name : arg->value.string,
      new_args != nullptr ? new_args : args->channel_args,
      chand->interested_parties, chand->combiner);
  if (proxy_name != nullptr) gpr_free(proxy_name);
  if (new_args != nullptr) grpc_channel_args_destroy(new_args);
  if (chand->resolver == nullptr) {
    return GRPC_ERROR_CREATE_FROM_STATIC_STRING("resolver creation failed");
  }
  chand->deadline_checking_enabled =
      grpc_deadline_checking_enabled(args->channel_args);
  return GRPC_ERROR_NONE;
}

/* Destructor for channel_data */
static void cc_destroy_channel_elem(grpc_channel_element* elem) {
  channel_data* chand = static_cast<channel_data*>(elem->channel_data);
  if (chand->resolver != nullptr) {
    // The only way we can get here is if we never started resolving,
    // because we take a ref to the channel stack when we start
    // resolving and do not release it until the resolver callback is
    // invoked after the resolver shuts down.
    chand->resolver.reset();
  }
  if (chand->client_channel_factory != nullptr) {
    grpc_client_channel_factory_unref(chand->client_channel_factory);
  }
  if (chand->lb_policy != nullptr) {
    grpc_pollset_set_del_pollset_set(chand->lb_policy->interested_parties(),
                                     chand->interested_parties);
    chand->lb_policy.reset();
  }
  // TODO(roth): Once we convert the filter API to C++, there will no
  // longer be any need to explicitly reset these smart pointer data members.
  chand->info_lb_policy_name.reset();
  chand->info_service_config_json.reset();
  chand->retry_throttle_data.reset();
  chand->method_params_table.reset();
  grpc_client_channel_stop_backup_polling(chand->interested_parties);
  grpc_connectivity_state_destroy(&chand->state_tracker);
  grpc_pollset_set_destroy(chand->interested_parties);
  GRPC_COMBINER_UNREF(chand->combiner, "client_channel");
  gpr_mu_destroy(&chand->info_mu);
  gpr_mu_destroy(&chand->external_connectivity_watcher_list_mu);
}

/*************************************************************************
 * PER-CALL FUNCTIONS
 */

// Max number of batches that can be pending on a call at any given
// time.  This includes one batch for each of the following ops:
//   recv_initial_metadata
//   send_initial_metadata
//   recv_message
//   send_message
//   recv_trailing_metadata
//   send_trailing_metadata
#define MAX_PENDING_BATCHES 6

// Retry support:
//
// In order to support retries, we act as a proxy for stream op batches.
// When we get a batch from the surface, we add it to our list of pending
// batches, and we then use those batches to construct separate "child"
// batches to be started on the subchannel call.  When the child batches
// return, we then decide which pending batches have been completed and
// schedule their callbacks accordingly.  If a subchannel call fails and
// we want to retry it, we do a new pick and start again, constructing
// new "child" batches for the new subchannel call.
//
// Note that retries are committed when receiving data from the server
// (except for Trailers-Only responses).  However, there may be many
// send ops started before receiving any data, so we may have already
// completed some number of send ops (and returned the completions up to
// the surface) by the time we realize that we need to retry.  To deal
// with this, we cache data for send ops, so that we can replay them on a
// different subchannel call even after we have completed the original
// batches.
//
// There are two sets of data to maintain:
// - In call_data (in the parent channel), we maintain a list of pending
//   ops and cached data for send ops.
// - In the subchannel call, we maintain state to indicate what ops have
//   already been sent down to that call.
//
// When constructing the "child" batches, we compare those two sets of
// data to see which batches need to be sent to the subchannel call.
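//
// Illustrative timeline (added for clarity, not from the original): suppose
// the surface has started send_initial_metadata and send_message, both of
// which complete and have their completions returned to the surface, and
// then recv_message fails with a retryable status.  Because the payloads of
// both send ops were cached in call_data, we can do a new pick, create a
// fresh subchannel call, replay the two send ops on it from the cache, and
// re-issue recv_message, all without involving the surface again.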
793
794// TODO(roth): In subsequent PRs:
795// - add support for transparent retries (including initial metadata)
796// - figure out how to record stats in census for retries
797// (census filter is on top of this one)
798// - add census stats for retries
799
// State used for starting a retryable batch on a subchannel call.
// This provides its own grpc_transport_stream_op_batch and other data
// structures needed to populate the ops in the batch.
// We allocate one struct on the arena for each attempt at starting a
// batch on a given subchannel call.
typedef struct {
  gpr_refcount refs;
  grpc_call_element* elem;
  grpc_subchannel_call* subchannel_call;  // Holds a ref.
  // The batch to use in the subchannel call.
  // Its payload field points to subchannel_call_retry_state.batch_payload.
  grpc_transport_stream_op_batch batch;
  // For intercepting on_complete.
  grpc_closure on_complete;
} subchannel_batch_data;

// Retry state associated with a subchannel call.
// Stored in the parent_data of the subchannel call object.
typedef struct {
  // subchannel_batch_data.batch.payload points to this.
  grpc_transport_stream_op_batch_payload batch_payload;
  // For send_initial_metadata.
  // Note that we need to make a copy of the initial metadata for each
  // subchannel call instead of just referring to the copy in call_data,
  // because filters in the subchannel stack will probably add entries,
  // so we need to start in a pristine state for each attempt of the call.
  grpc_linked_mdelem* send_initial_metadata_storage;
  grpc_metadata_batch send_initial_metadata;
  // For send_message.
  grpc_core::ManualConstructor<grpc_core::ByteStreamCache::CachingByteStream>
      send_message;
  // For send_trailing_metadata.
  grpc_linked_mdelem* send_trailing_metadata_storage;
  grpc_metadata_batch send_trailing_metadata;
  // For intercepting recv_initial_metadata.
  grpc_metadata_batch recv_initial_metadata;
  grpc_closure recv_initial_metadata_ready;
  bool trailing_metadata_available;
  // For intercepting recv_message.
  grpc_closure recv_message_ready;
  grpc_core::OrphanablePtr<grpc_core::ByteStream> recv_message;
  // For intercepting recv_trailing_metadata.
  grpc_metadata_batch recv_trailing_metadata;
  grpc_transport_stream_stats collect_stats;
  grpc_closure recv_trailing_metadata_ready;
  // These fields indicate which ops have been started and completed on
  // this subchannel call.
  size_t started_send_message_count;
  size_t completed_send_message_count;
  size_t started_recv_message_count;
  size_t completed_recv_message_count;
  bool started_send_initial_metadata : 1;
  bool completed_send_initial_metadata : 1;
  bool started_send_trailing_metadata : 1;
  bool completed_send_trailing_metadata : 1;
  bool started_recv_initial_metadata : 1;
  bool completed_recv_initial_metadata : 1;
  bool started_recv_trailing_metadata : 1;
  bool completed_recv_trailing_metadata : 1;
  // State for callback processing.
  bool retry_dispatched : 1;
  subchannel_batch_data* recv_initial_metadata_ready_deferred_batch;
  grpc_error* recv_initial_metadata_error;
  subchannel_batch_data* recv_message_ready_deferred_batch;
  grpc_error* recv_message_error;
  subchannel_batch_data* recv_trailing_metadata_internal_batch;
} subchannel_call_retry_state;

// Pending batches stored in call data.
typedef struct {
  // The pending batch.  If nullptr, this slot is empty.
  grpc_transport_stream_op_batch* batch;
  // Indicates whether payload for send ops has been cached in call data.
  bool send_ops_cached;
} pending_batch;

/** Call data.  Holds a pointer to grpc_subchannel_call and the
    associated machinery to create such a pointer.
    Handles queueing of stream ops until a call object is ready, waiting
    for initial metadata before trying to create a call object,
    and handling cancellation gracefully. */
typedef struct client_channel_call_data {
  // State for handling deadlines.
  // The code in deadline_filter.c requires this to be the first field.
  // TODO(roth): This is slightly sub-optimal in that grpc_deadline_state
  // and this struct both independently store pointers to the call stack
  // and call combiner.  If/when we have time, find a way to avoid this
  // without breaking the grpc_deadline_state abstraction.
  grpc_deadline_state deadline_state;

  grpc_slice path;  // Request path.
  gpr_timespec call_start_time;
  grpc_millis deadline;
  gpr_arena* arena;
  grpc_call_stack* owning_call;
  grpc_call_combiner* call_combiner;

  grpc_core::RefCountedPtr<ServerRetryThrottleData> retry_throttle_data;
  grpc_core::RefCountedPtr<ClientChannelMethodParams> method_params;

  grpc_subchannel_call* subchannel_call;

  // Set when we get a cancel_stream op.
  grpc_error* cancel_error;

  grpc_core::LoadBalancingPolicy::PickState pick;
  grpc_closure pick_closure;
  grpc_closure pick_cancel_closure;

  grpc_polling_entity* pollent;
  bool pollent_added_to_interested_parties;

  // Batches are added to this list when received from above.
  // They are removed when we are done handling the batch (i.e., when
  // either we have invoked all of the batch's callbacks or we have
  // passed the batch down to the subchannel call and are not
  // intercepting any of its callbacks).
  pending_batch pending_batches[MAX_PENDING_BATCHES];
  bool pending_send_initial_metadata : 1;
  bool pending_send_message : 1;
  bool pending_send_trailing_metadata : 1;

  // Retry state.
  bool enable_retries : 1;
  bool retry_committed : 1;
  bool last_attempt_got_server_pushback : 1;
  int num_attempts_completed;
  size_t bytes_buffered_for_retry;
  grpc_core::ManualConstructor<grpc_core::BackOff> retry_backoff;
  grpc_timer retry_timer;

  // The number of pending retriable subchannel batches containing send ops.
  // We hold a ref to the call stack while this is non-zero, since replay
  // batches may not complete until after all callbacks have been returned
  // to the surface, and we need to make sure that the call is not destroyed
  // until all of these batches have completed.
  // Note that we actually only need to track replay batches, but it's
  // easier to track all batches with send ops.
  int num_pending_retriable_subchannel_send_batches;

  // Cached data for retrying send ops.
  // send_initial_metadata
  bool seen_send_initial_metadata;
  grpc_linked_mdelem* send_initial_metadata_storage;
  grpc_metadata_batch send_initial_metadata;
  uint32_t send_initial_metadata_flags;
  gpr_atm* peer_string;
  // send_message
  // When we get a send_message op, we replace the original byte stream
  // with a CachingByteStream that caches the slices to a local buffer for
  // use in retries.
  // Note: We inline the cache for the first 3 send_message ops and use
  // dynamic allocation after that.  This number was essentially picked
  // at random; it could be changed in the future to tune performance.
  grpc_core::ManualConstructor<
      grpc_core::InlinedVector<grpc_core::ByteStreamCache*, 3>>
      send_messages;
  // send_trailing_metadata
  bool seen_send_trailing_metadata;
  grpc_linked_mdelem* send_trailing_metadata_storage;
  grpc_metadata_batch send_trailing_metadata;
} call_data;
962
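// Illustrative sketch of the send_messages cache declared above
// (hypothetical values; InlinedVector semantics assumed from its use in
// this file): the first 3 cache pointers live in the vector's inline
// storage, and only the 4th push_back falls back to heap allocation.
//
//   grpc_core::InlinedVector<grpc_core::ByteStreamCache*, 3> caches;
//   caches.push_back(cache0);  // inline slot 0
//   caches.push_back(cache1);  // inline slot 1
//   caches.push_back(cache2);  // inline slot 2
//   caches.push_back(cache3);  // exceeds inline capacity; heap-allocates
//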
// Forward declarations.
static void retry_commit(grpc_call_element* elem,
                         subchannel_call_retry_state* retry_state);
static void start_internal_recv_trailing_metadata(grpc_call_element* elem);
static void on_complete(void* arg, grpc_error* error);
static void start_retriable_subchannel_batches(void* arg, grpc_error* ignored);
static void start_pick_locked(void* arg, grpc_error* ignored);

//
// send op data caching
//

// Caches data for send ops so that it can be retried later, if not
// already cached.
static void maybe_cache_send_ops_for_batch(call_data* calld,
                                           pending_batch* pending) {
  if (pending->send_ops_cached) return;
  pending->send_ops_cached = true;
  grpc_transport_stream_op_batch* batch = pending->batch;
  // Save a copy of metadata for send_initial_metadata ops.
  if (batch->send_initial_metadata) {
    calld->seen_send_initial_metadata = true;
    GPR_ASSERT(calld->send_initial_metadata_storage == nullptr);
    grpc_metadata_batch* send_initial_metadata =
        batch->payload->send_initial_metadata.send_initial_metadata;
    calld->send_initial_metadata_storage = (grpc_linked_mdelem*)gpr_arena_alloc(
        calld->arena,
        sizeof(grpc_linked_mdelem) * send_initial_metadata->list.count);
    grpc_metadata_batch_copy(send_initial_metadata,
                             &calld->send_initial_metadata,
                             calld->send_initial_metadata_storage);
    calld->send_initial_metadata_flags =
        batch->payload->send_initial_metadata.send_initial_metadata_flags;
    calld->peer_string = batch->payload->send_initial_metadata.peer_string;
  }
  // Set up cache for send_message ops.
  if (batch->send_message) {
    grpc_core::ByteStreamCache* cache =
        static_cast<grpc_core::ByteStreamCache*>(
            gpr_arena_alloc(calld->arena, sizeof(grpc_core::ByteStreamCache)));
    new (cache) grpc_core::ByteStreamCache(
        std::move(batch->payload->send_message.send_message));
    calld->send_messages->push_back(cache);
  }
  // Save metadata batch for send_trailing_metadata ops.
  if (batch->send_trailing_metadata) {
    calld->seen_send_trailing_metadata = true;
    GPR_ASSERT(calld->send_trailing_metadata_storage == nullptr);
    grpc_metadata_batch* send_trailing_metadata =
        batch->payload->send_trailing_metadata.send_trailing_metadata;
    calld->send_trailing_metadata_storage =
        (grpc_linked_mdelem*)gpr_arena_alloc(
            calld->arena,
            sizeof(grpc_linked_mdelem) * send_trailing_metadata->list.count);
    grpc_metadata_batch_copy(send_trailing_metadata,
                             &calld->send_trailing_metadata,
                             calld->send_trailing_metadata_storage);
  }
}

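// The caching above follows one arena pattern throughout: copy the data
// into storage allocated from the call's arena so that it outlives the
// original batch, which the surface may reclaim once the op completes.
// A minimal sketch of that pattern (hypothetical helper, not used in
// this file):
//
//   static void cache_metadata(gpr_arena* arena, grpc_metadata_batch* src,
//                              grpc_metadata_batch* dst) {
//     grpc_linked_mdelem* storage = (grpc_linked_mdelem*)gpr_arena_alloc(
//         arena, sizeof(grpc_linked_mdelem) * src->list.count);
//     grpc_metadata_batch_copy(src, dst, storage);
//   }
//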
// Frees cached send_initial_metadata.
static void free_cached_send_initial_metadata(channel_data* chand,
                                              call_data* calld) {
  if (grpc_client_channel_trace.enabled()) {
    gpr_log(GPR_INFO,
            "chand=%p calld=%p: destroying calld->send_initial_metadata", chand,
            calld);
  }
  grpc_metadata_batch_destroy(&calld->send_initial_metadata);
}

// Frees cached send_message at index idx.
static void free_cached_send_message(channel_data* chand, call_data* calld,
                                     size_t idx) {
  if (grpc_client_channel_trace.enabled()) {
    gpr_log(GPR_INFO,
            "chand=%p calld=%p: destroying calld->send_messages[%" PRIuPTR "]",
            chand, calld, idx);
  }
  (*calld->send_messages)[idx]->Destroy();
}

// Frees cached send_trailing_metadata.
static void free_cached_send_trailing_metadata(channel_data* chand,
                                               call_data* calld) {
  if (grpc_client_channel_trace.enabled()) {
    gpr_log(GPR_INFO,
            "chand=%p calld=%p: destroying calld->send_trailing_metadata",
            chand, calld);
  }
  grpc_metadata_batch_destroy(&calld->send_trailing_metadata);
}

// Frees cached send ops that have already been completed after
// committing the call.
static void free_cached_send_op_data_after_commit(
    grpc_call_element* elem, subchannel_call_retry_state* retry_state) {
  channel_data* chand = static_cast<channel_data*>(elem->channel_data);
  call_data* calld = static_cast<call_data*>(elem->call_data);
  if (retry_state->completed_send_initial_metadata) {
    free_cached_send_initial_metadata(chand, calld);
  }
  for (size_t i = 0; i < retry_state->completed_send_message_count; ++i) {
    free_cached_send_message(chand, calld, i);
  }
  if (retry_state->completed_send_trailing_metadata) {
    free_cached_send_trailing_metadata(chand, calld);
  }
}

// Frees cached send ops that were completed by the completed batch in
// batch_data.  Used when batches are completed after the call is committed.
static void free_cached_send_op_data_for_completed_batch(
    grpc_call_element* elem, subchannel_batch_data* batch_data,
    subchannel_call_retry_state* retry_state) {
  channel_data* chand = static_cast<channel_data*>(elem->channel_data);
  call_data* calld = static_cast<call_data*>(elem->call_data);
  if (batch_data->batch.send_initial_metadata) {
    free_cached_send_initial_metadata(chand, calld);
  }
  if (batch_data->batch.send_message) {
    free_cached_send_message(chand, calld,
                             retry_state->completed_send_message_count - 1);
  }
  if (batch_data->batch.send_trailing_metadata) {
    free_cached_send_trailing_metadata(chand, calld);
  }
}

//
// pending_batches management
//

// Returns the index into calld->pending_batches to be used for batch.
static size_t get_batch_index(grpc_transport_stream_op_batch* batch) {
  // Note: It is important that send_initial_metadata be the first entry
  // here, since the code in pick_subchannel_locked() assumes it will be.
  if (batch->send_initial_metadata) return 0;
  if (batch->send_message) return 1;
  if (batch->send_trailing_metadata) return 2;
  if (batch->recv_initial_metadata) return 3;
  if (batch->recv_message) return 4;
  if (batch->recv_trailing_metadata) return 5;
  GPR_UNREACHABLE_CODE(return (size_t)-1);
}

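// Worked example for get_batch_index() (hypothetical batch): a batch
// carrying both send_initial_metadata and recv_initial_metadata maps to
// index 0, because the first matching op in the order above determines
// the slot.  The GPR_ASSERT in pending_batches_add() relies on the
// surface never having two in-flight batches that map to the same slot.
//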
// This is called via the call combiner, so access to calld is synchronized.
static void pending_batches_add(grpc_call_element* elem,
                                grpc_transport_stream_op_batch* batch) {
  channel_data* chand = static_cast<channel_data*>(elem->channel_data);
  call_data* calld = static_cast<call_data*>(elem->call_data);
  const size_t idx = get_batch_index(batch);
  if (grpc_client_channel_trace.enabled()) {
    gpr_log(GPR_INFO,
            "chand=%p calld=%p: adding pending batch at index %" PRIuPTR, chand,
            calld, idx);
  }
  pending_batch* pending = &calld->pending_batches[idx];
  GPR_ASSERT(pending->batch == nullptr);
  pending->batch = batch;
  pending->send_ops_cached = false;
  if (calld->enable_retries) {
    // Update state in calld about pending batches.
    // Also check if the batch takes us over the retry buffer limit.
    // Note: We don't check the size of trailing metadata here, because
    // gRPC clients do not send trailing metadata.
    if (batch->send_initial_metadata) {
      calld->pending_send_initial_metadata = true;
      calld->bytes_buffered_for_retry += grpc_metadata_batch_size(
          batch->payload->send_initial_metadata.send_initial_metadata);
    }
    if (batch->send_message) {
      calld->pending_send_message = true;
      calld->bytes_buffered_for_retry +=
          batch->payload->send_message.send_message->length();
    }
    if (batch->send_trailing_metadata) {
      calld->pending_send_trailing_metadata = true;
    }
    if (GPR_UNLIKELY(calld->bytes_buffered_for_retry >
                     chand->per_rpc_retry_buffer_size)) {
      if (grpc_client_channel_trace.enabled()) {
        gpr_log(GPR_INFO,
                "chand=%p calld=%p: exceeded retry buffer size, committing",
                chand, calld);
      }
      subchannel_call_retry_state* retry_state =
          calld->subchannel_call == nullptr
              ? nullptr
              : static_cast<subchannel_call_retry_state*>(
                    grpc_connected_subchannel_call_get_parent_data(
                        calld->subchannel_call));
      retry_commit(elem, retry_state);
      // If we are not going to retry and have not yet started, pretend
      // retries are disabled so that we don't bother with retry overhead.
      if (calld->num_attempts_completed == 0) {
        if (grpc_client_channel_trace.enabled()) {
          gpr_log(GPR_INFO,
                  "chand=%p calld=%p: disabling retries before first attempt",
                  chand, calld);
        }
        calld->enable_retries = false;
      }
    }
  }
}

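// Worked example of the buffer-limit check above (hypothetical sizes):
// with chand->per_rpc_retry_buffer_size == 256 KiB and 200 KiB of
// send_message payload already buffered, adding a 100 KiB message pushes
// bytes_buffered_for_retry past the limit, so the call is committed; if
// this happens before the first attempt has even started, retries are
// disabled outright so the call skips the caching overhead entirely.
//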
static void pending_batch_clear(call_data* calld, pending_batch* pending) {
  if (calld->enable_retries) {
    if (pending->batch->send_initial_metadata) {
      calld->pending_send_initial_metadata = false;
    }
    if (pending->batch->send_message) {
      calld->pending_send_message = false;
    }
    if (pending->batch->send_trailing_metadata) {
      calld->pending_send_trailing_metadata = false;
    }
  }
  pending->batch = nullptr;
}

// This is called via the call combiner, so access to calld is synchronized.
static void fail_pending_batch_in_call_combiner(void* arg, grpc_error* error) {
  grpc_transport_stream_op_batch* batch =
      static_cast<grpc_transport_stream_op_batch*>(arg);
  call_data* calld = static_cast<call_data*>(batch->handler_private.extra_arg);
  // Note: This will release the call combiner.
  grpc_transport_stream_op_batch_finish_with_failure(
      batch, GRPC_ERROR_REF(error), calld->call_combiner);
}

// This is called via the call combiner, so access to calld is synchronized.
// If yield_call_combiner is true, assumes responsibility for yielding
// the call combiner.
static void pending_batches_fail(grpc_call_element* elem, grpc_error* error,
                                 bool yield_call_combiner) {
  GPR_ASSERT(error != GRPC_ERROR_NONE);
  call_data* calld = static_cast<call_data*>(elem->call_data);
  if (grpc_client_channel_trace.enabled()) {
    size_t num_batches = 0;
    for (size_t i = 0; i < GPR_ARRAY_SIZE(calld->pending_batches); ++i) {
      if (calld->pending_batches[i].batch != nullptr) ++num_batches;
    }
    gpr_log(GPR_INFO,
            "chand=%p calld=%p: failing %" PRIuPTR " pending batches: %s",
            elem->channel_data, calld, num_batches, grpc_error_string(error));
  }
  grpc_core::CallCombinerClosureList closures;
  for (size_t i = 0; i < GPR_ARRAY_SIZE(calld->pending_batches); ++i) {
    pending_batch* pending = &calld->pending_batches[i];
    grpc_transport_stream_op_batch* batch = pending->batch;
    if (batch != nullptr) {
      batch->handler_private.extra_arg = calld;
      GRPC_CLOSURE_INIT(&batch->handler_private.closure,
                        fail_pending_batch_in_call_combiner, batch,
                        grpc_schedule_on_exec_ctx);
      closures.Add(&batch->handler_private.closure, GRPC_ERROR_REF(error),
                   "pending_batches_fail");
      pending_batch_clear(calld, pending);
    }
  }
  if (yield_call_combiner) {
    closures.RunClosures(calld->call_combiner);
  } else {
    closures.RunClosuresWithoutYielding(calld->call_combiner);
  }
  GRPC_ERROR_UNREF(error);
}

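// A minimal sketch of the CallCombinerClosureList pattern used above
// (semantics assumed from its use in this file): closures are collected
// first and then run under the call combiner, with RunClosures() also
// yielding the combiner, whereas RunClosuresWithoutYielding() leaves the
// combiner held by the caller.
//
//   grpc_core::CallCombinerClosureList closures;
//   closures.Add(&closure_a, GRPC_ERROR_NONE, "reason a");
//   closures.Add(&closure_b, GRPC_ERROR_NONE, "reason b");
//   closures.RunClosures(calld->call_combiner);  // releases the combiner
//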
// This is called via the call combiner, so access to calld is synchronized.
static void resume_pending_batch_in_call_combiner(void* arg,
                                                  grpc_error* ignored) {
  grpc_transport_stream_op_batch* batch =
      static_cast<grpc_transport_stream_op_batch*>(arg);
  grpc_subchannel_call* subchannel_call =
      static_cast<grpc_subchannel_call*>(batch->handler_private.extra_arg);
  // Note: This will release the call combiner.
  grpc_subchannel_call_process_op(subchannel_call, batch);
}

// This is called via the call combiner, so access to calld is synchronized.
static void pending_batches_resume(grpc_call_element* elem) {
  channel_data* chand = static_cast<channel_data*>(elem->channel_data);
  call_data* calld = static_cast<call_data*>(elem->call_data);
  if (calld->enable_retries) {
    start_retriable_subchannel_batches(elem, GRPC_ERROR_NONE);
    return;
  }
  // Retries not enabled; send down batches as-is.
  if (grpc_client_channel_trace.enabled()) {
    size_t num_batches = 0;
    for (size_t i = 0; i < GPR_ARRAY_SIZE(calld->pending_batches); ++i) {
      if (calld->pending_batches[i].batch != nullptr) ++num_batches;
    }
    gpr_log(GPR_INFO,
            "chand=%p calld=%p: starting %" PRIuPTR
            " pending batches on subchannel_call=%p",
            chand, calld, num_batches, calld->subchannel_call);
  }
  grpc_core::CallCombinerClosureList closures;
  for (size_t i = 0; i < GPR_ARRAY_SIZE(calld->pending_batches); ++i) {
    pending_batch* pending = &calld->pending_batches[i];
    grpc_transport_stream_op_batch* batch = pending->batch;
    if (batch != nullptr) {
      batch->handler_private.extra_arg = calld->subchannel_call;
      GRPC_CLOSURE_INIT(&batch->handler_private.closure,
                        resume_pending_batch_in_call_combiner, batch,
                        grpc_schedule_on_exec_ctx);
      closures.Add(&batch->handler_private.closure, GRPC_ERROR_NONE,
                   "pending_batches_resume");
      pending_batch_clear(calld, pending);
    }
  }
  // Note: This will release the call combiner.
  closures.RunClosures(calld->call_combiner);
}

static void maybe_clear_pending_batch(grpc_call_element* elem,
                                      pending_batch* pending) {
  channel_data* chand = static_cast<channel_data*>(elem->channel_data);
  call_data* calld = static_cast<call_data*>(elem->call_data);
  grpc_transport_stream_op_batch* batch = pending->batch;
  // We clear the pending batch once all of its callback pointers have
  // been scheduled and reset to nullptr.
  if (batch->on_complete == nullptr &&
      (!batch->recv_initial_metadata ||
       batch->payload->recv_initial_metadata.recv_initial_metadata_ready ==
           nullptr) &&
      (!batch->recv_message ||
       batch->payload->recv_message.recv_message_ready == nullptr) &&
      (!batch->recv_trailing_metadata ||
       batch->payload->recv_trailing_metadata.recv_trailing_metadata_ready ==
           nullptr)) {
    if (grpc_client_channel_trace.enabled()) {
      gpr_log(GPR_INFO, "chand=%p calld=%p: clearing pending batch", chand,
              calld);
    }
    pending_batch_clear(calld, pending);
  }
}

// Returns a pointer to the first pending batch for which predicate(batch)
// returns true, or null if not found.
template <typename Predicate>
static pending_batch* pending_batch_find(grpc_call_element* elem,
                                         const char* log_message,
                                         Predicate predicate) {
  channel_data* chand = static_cast<channel_data*>(elem->channel_data);
  call_data* calld = static_cast<call_data*>(elem->call_data);
  for (size_t i = 0; i < GPR_ARRAY_SIZE(calld->pending_batches); ++i) {
    pending_batch* pending = &calld->pending_batches[i];
    grpc_transport_stream_op_batch* batch = pending->batch;
    if (batch != nullptr && predicate(batch)) {
      if (grpc_client_channel_trace.enabled()) {
        gpr_log(GPR_INFO,
                "chand=%p calld=%p: %s pending batch at index %" PRIuPTR, chand,
                calld, log_message, i);
      }
      return pending;
    }
  }
  return nullptr;
}

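// Example use of pending_batch_find() (a hypothetical predicate; the
// real callers appear below): locate the pending batch, if any, that
// contains a send_message op.
//
//   pending_batch* pending = pending_batch_find(
//       elem, "checking for", [](grpc_transport_stream_op_batch* batch) {
//         return batch->send_message;
//       });
//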
//
// retry code
//

// Commits the call so that no further retry attempts will be performed.
static void retry_commit(grpc_call_element* elem,
                         subchannel_call_retry_state* retry_state) {
  channel_data* chand = static_cast<channel_data*>(elem->channel_data);
  call_data* calld = static_cast<call_data*>(elem->call_data);
  if (calld->retry_committed) return;
  calld->retry_committed = true;
  if (grpc_client_channel_trace.enabled()) {
    gpr_log(GPR_INFO, "chand=%p calld=%p: committing retries", chand, calld);
  }
  if (retry_state != nullptr) {
    free_cached_send_op_data_after_commit(elem, retry_state);
  }
}

// Starts a retry after appropriate back-off.
static void do_retry(grpc_call_element* elem,
                     subchannel_call_retry_state* retry_state,
                     grpc_millis server_pushback_ms) {
  channel_data* chand = static_cast<channel_data*>(elem->channel_data);
  call_data* calld = static_cast<call_data*>(elem->call_data);
  GPR_ASSERT(calld->method_params != nullptr);
  const ClientChannelMethodParams::RetryPolicy* retry_policy =
      calld->method_params->retry_policy();
  GPR_ASSERT(retry_policy != nullptr);
  // Reset subchannel call and connected subchannel.
  if (calld->subchannel_call != nullptr) {
    GRPC_SUBCHANNEL_CALL_UNREF(calld->subchannel_call,
                               "client_channel_call_retry");
    calld->subchannel_call = nullptr;
  }
  if (calld->pick.connected_subchannel != nullptr) {
    calld->pick.connected_subchannel.reset();
  }
  // Compute backoff delay.
  grpc_millis next_attempt_time;
  if (server_pushback_ms >= 0) {
    next_attempt_time = grpc_core::ExecCtx::Get()->Now() + server_pushback_ms;
    calld->last_attempt_got_server_pushback = true;
  } else {
    if (calld->num_attempts_completed == 1 ||
        calld->last_attempt_got_server_pushback) {
      calld->retry_backoff.Init(
          grpc_core::BackOff::Options()
              .set_initial_backoff(retry_policy->initial_backoff)
              .set_multiplier(retry_policy->backoff_multiplier)
              .set_jitter(RETRY_BACKOFF_JITTER)
              .set_max_backoff(retry_policy->max_backoff));
      calld->last_attempt_got_server_pushback = false;
    }
    next_attempt_time = calld->retry_backoff->NextAttemptTime();
  }
  if (grpc_client_channel_trace.enabled()) {
    gpr_log(GPR_INFO,
            "chand=%p calld=%p: retrying failed call in %" PRId64 " ms", chand,
            calld, next_attempt_time - grpc_core::ExecCtx::Get()->Now());
  }
  // Schedule retry after computed delay.
  GRPC_CLOSURE_INIT(&calld->pick_closure, start_pick_locked, elem,
                    grpc_combiner_scheduler(chand->combiner));
  grpc_timer_init(&calld->retry_timer, next_attempt_time, &calld->pick_closure);
  // Update bookkeeping.
  if (retry_state != nullptr) retry_state->retry_dispatched = true;
}

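// Worked example of the delay computation above (hypothetical policy;
// jitter omitted for clarity): with initial_backoff = 1s,
// backoff_multiplier = 2, and max_backoff = 10s, successive failed
// attempts without server push-back are retried after roughly
// 1s, 2s, 4s, 8s, 10s, 10s, ...  A server push-back value overrides the
// computed delay for that attempt, and the backoff state is
// re-initialized on the next attempt after a push-back.
//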
// Returns true if the call is being retried.
static bool maybe_retry(grpc_call_element* elem,
                        subchannel_batch_data* batch_data,
                        grpc_status_code status,
                        grpc_mdelem* server_pushback_md) {
  channel_data* chand = static_cast<channel_data*>(elem->channel_data);
  call_data* calld = static_cast<call_data*>(elem->call_data);
  // Get retry policy.
  if (calld->method_params == nullptr) return false;
  const ClientChannelMethodParams::RetryPolicy* retry_policy =
      calld->method_params->retry_policy();
  if (retry_policy == nullptr) return false;
  // If we've already dispatched a retry from this call, return true.
  // This catches the case where the batch has multiple callbacks
  // (i.e., it includes either recv_message or recv_initial_metadata).
  subchannel_call_retry_state* retry_state = nullptr;
  if (batch_data != nullptr) {
    retry_state = static_cast<subchannel_call_retry_state*>(
        grpc_connected_subchannel_call_get_parent_data(
            batch_data->subchannel_call));
    if (retry_state->retry_dispatched) {
      if (grpc_client_channel_trace.enabled()) {
        gpr_log(GPR_INFO, "chand=%p calld=%p: retry already dispatched", chand,
                calld);
      }
      return true;
    }
  }
  // Check status.
  if (GPR_LIKELY(status == GRPC_STATUS_OK)) {
    if (calld->retry_throttle_data != nullptr) {
      calld->retry_throttle_data->RecordSuccess();
    }
    if (grpc_client_channel_trace.enabled()) {
      gpr_log(GPR_INFO, "chand=%p calld=%p: call succeeded", chand, calld);
    }
    return false;
  }
  // Status is not OK.  Check whether the status is retryable.
  if (!retry_policy->retryable_status_codes.Contains(status)) {
    if (grpc_client_channel_trace.enabled()) {
      gpr_log(GPR_INFO,
              "chand=%p calld=%p: status %s not configured as retryable", chand,
              calld, grpc_status_code_to_string(status));
    }
    return false;
  }
  // Record the failure and check whether retries are throttled.
  // Note that it's important for this check to come after the status
  // code check above, since we should only record failures whose statuses
  // match the configured retryable status codes, so that we don't count
  // things like failures due to malformed requests (INVALID_ARGUMENT).
  // Conversely, it's important for this to come before the remaining
  // checks, so that we don't fail to record failures due to other factors.
  if (calld->retry_throttle_data != nullptr &&
      !calld->retry_throttle_data->RecordFailure()) {
    if (grpc_client_channel_trace.enabled()) {
      gpr_log(GPR_INFO, "chand=%p calld=%p: retries throttled", chand, calld);
    }
    return false;
  }
  // Check whether the call is committed.
  if (calld->retry_committed) {
    if (grpc_client_channel_trace.enabled()) {
      gpr_log(GPR_INFO, "chand=%p calld=%p: retries already committed", chand,
              calld);
    }
    return false;
  }
  // Check whether we have retries remaining.
  ++calld->num_attempts_completed;
  if (calld->num_attempts_completed >= retry_policy->max_attempts) {
    if (grpc_client_channel_trace.enabled()) {
      gpr_log(GPR_INFO, "chand=%p calld=%p: exceeded %d retry attempts", chand,
              calld, retry_policy->max_attempts);
    }
    return false;
  }
  // If the call was cancelled from the surface, don't retry.
  if (calld->cancel_error != GRPC_ERROR_NONE) {
    if (grpc_client_channel_trace.enabled()) {
      gpr_log(GPR_INFO,
              "chand=%p calld=%p: call cancelled from surface, not retrying",
              chand, calld);
    }
    return false;
  }
  // Check server push-back.
  grpc_millis server_pushback_ms = -1;
  if (server_pushback_md != nullptr) {
    // If the value is "-1" or any other unparseable string, we do not retry.
    uint32_t ms;
    if (!grpc_parse_slice_to_uint32(GRPC_MDVALUE(*server_pushback_md), &ms)) {
      if (grpc_client_channel_trace.enabled()) {
        gpr_log(GPR_INFO,
                "chand=%p calld=%p: not retrying due to server push-back",
                chand, calld);
      }
      return false;
    } else {
      if (grpc_client_channel_trace.enabled()) {
        gpr_log(GPR_INFO, "chand=%p calld=%p: server push-back: retry in %u ms",
                chand, calld, ms);
      }
      server_pushback_ms = (grpc_millis)ms;
    }
  }
  do_retry(elem, retry_state, server_pushback_ms);
  return true;
}

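// Example of the server push-back handling above (hypothetical
// metadata): if the server's trailing metadata contains
// "grpc-retry-pushback-ms: 2000", the retry is scheduled 2000 ms out;
// a value that does not parse as a uint32 (e.g. "-1") suppresses the
// retry entirely.
//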
//
// subchannel_batch_data
//

// Creates a subchannel_batch_data object on the call's arena with the
// specified refcount.  If set_on_complete is true, the batch's
// on_complete callback will be set to point to on_complete();
// otherwise, the batch's on_complete callback will be null.
static subchannel_batch_data* batch_data_create(grpc_call_element* elem,
                                                int refcount,
                                                bool set_on_complete) {
  call_data* calld = static_cast<call_data*>(elem->call_data);
  subchannel_call_retry_state* retry_state =
      static_cast<subchannel_call_retry_state*>(
          grpc_connected_subchannel_call_get_parent_data(
              calld->subchannel_call));
  subchannel_batch_data* batch_data = static_cast<subchannel_batch_data*>(
      gpr_arena_alloc(calld->arena, sizeof(*batch_data)));
  batch_data->elem = elem;
  batch_data->subchannel_call =
      GRPC_SUBCHANNEL_CALL_REF(calld->subchannel_call, "batch_data_create");
  batch_data->batch.payload = &retry_state->batch_payload;
  gpr_ref_init(&batch_data->refs, refcount);
  if (set_on_complete) {
    GRPC_CLOSURE_INIT(&batch_data->on_complete, on_complete, batch_data,
                      grpc_schedule_on_exec_ctx);
    batch_data->batch.on_complete = &batch_data->on_complete;
  }
  GRPC_CALL_STACK_REF(calld->owning_call, "batch_data");
  return batch_data;
}

static void batch_data_unref(subchannel_batch_data* batch_data) {
  if (gpr_unref(&batch_data->refs)) {
    subchannel_call_retry_state* retry_state =
        static_cast<subchannel_call_retry_state*>(
            grpc_connected_subchannel_call_get_parent_data(
                batch_data->subchannel_call));
    if (batch_data->batch.send_initial_metadata) {
      grpc_metadata_batch_destroy(&retry_state->send_initial_metadata);
    }
    if (batch_data->batch.send_trailing_metadata) {
      grpc_metadata_batch_destroy(&retry_state->send_trailing_metadata);
    }
    if (batch_data->batch.recv_initial_metadata) {
      grpc_metadata_batch_destroy(&retry_state->recv_initial_metadata);
    }
    if (batch_data->batch.recv_trailing_metadata) {
      grpc_metadata_batch_destroy(&retry_state->recv_trailing_metadata);
    }
    GRPC_SUBCHANNEL_CALL_UNREF(batch_data->subchannel_call, "batch_data_unref");
    call_data* calld = static_cast<call_data*>(batch_data->elem->call_data);
    GRPC_CALL_STACK_UNREF(calld->owning_call, "batch_data");
  }
}

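// Sketch of the batch_data refcounting convention (inferred from the
// callers in this file): the creator passes a refcount equal to the
// number of callbacks that will hold the batch, and each callback unrefs
// when done, so the batch is destroyed exactly once after the last
// callback fires.
//
//   // Hypothetical: a batch with an intercepted recv callback plus
//   // on_complete holds two refs.
//   subchannel_batch_data* bd =
//       batch_data_create(elem, 2, true /* set_on_complete */);
//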
//
// recv_initial_metadata callback handling
//

// Invokes recv_initial_metadata_ready for a subchannel batch.
static void invoke_recv_initial_metadata_callback(void* arg,
                                                  grpc_error* error) {
  subchannel_batch_data* batch_data = static_cast<subchannel_batch_data*>(arg);
  // Find pending batch.
  pending_batch* pending = pending_batch_find(
      batch_data->elem, "invoking recv_initial_metadata_ready for",
      [](grpc_transport_stream_op_batch* batch) {
        return batch->recv_initial_metadata &&
               batch->payload->recv_initial_metadata
                       .recv_initial_metadata_ready != nullptr;
      });
  GPR_ASSERT(pending != nullptr);
  // Return metadata.
  subchannel_call_retry_state* retry_state =
      static_cast<subchannel_call_retry_state*>(
          grpc_connected_subchannel_call_get_parent_data(
              batch_data->subchannel_call));
  grpc_metadata_batch_move(
      &retry_state->recv_initial_metadata,
      pending->batch->payload->recv_initial_metadata.recv_initial_metadata);
  // Update bookkeeping.
  // Note: Need to do this before invoking the callback, since invoking
  // the callback will result in yielding the call combiner.
  grpc_closure* recv_initial_metadata_ready =
      pending->batch->payload->recv_initial_metadata
          .recv_initial_metadata_ready;
  pending->batch->payload->recv_initial_metadata.recv_initial_metadata_ready =
      nullptr;
  maybe_clear_pending_batch(batch_data->elem, pending);
  batch_data_unref(batch_data);
  // Invoke callback.
  GRPC_CLOSURE_RUN(recv_initial_metadata_ready, GRPC_ERROR_REF(error));
}

// Intercepts recv_initial_metadata_ready callback for retries.
// Commits the call and returns the initial metadata up the stack.
static void recv_initial_metadata_ready(void* arg, grpc_error* error) {
  subchannel_batch_data* batch_data = static_cast<subchannel_batch_data*>(arg);
  grpc_call_element* elem = batch_data->elem;
  channel_data* chand = static_cast<channel_data*>(elem->channel_data);
  call_data* calld = static_cast<call_data*>(elem->call_data);
  if (grpc_client_channel_trace.enabled()) {
    gpr_log(GPR_INFO,
            "chand=%p calld=%p: got recv_initial_metadata_ready, error=%s",
            chand, calld, grpc_error_string(error));
  }
  subchannel_call_retry_state* retry_state =
      static_cast<subchannel_call_retry_state*>(
          grpc_connected_subchannel_call_get_parent_data(
              batch_data->subchannel_call));
  retry_state->completed_recv_initial_metadata = true;
  // If a retry was already dispatched, then we're not going to use the
  // result of this recv_initial_metadata op, so do nothing.
  if (retry_state->retry_dispatched) {
    GRPC_CALL_COMBINER_STOP(
        calld->call_combiner,
        "recv_initial_metadata_ready after retry dispatched");
    return;
  }
  // If we got an error or a Trailers-Only response and have not yet gotten
  // the recv_trailing_metadata_ready callback, then defer propagating this
  // callback back to the surface.  We can evaluate whether to retry when
  // recv_trailing_metadata comes back.
  if (GPR_UNLIKELY((retry_state->trailing_metadata_available ||
                    error != GRPC_ERROR_NONE) &&
                   !retry_state->completed_recv_trailing_metadata)) {
    if (grpc_client_channel_trace.enabled()) {
      gpr_log(GPR_INFO,
              "chand=%p calld=%p: deferring recv_initial_metadata_ready "
              "(Trailers-Only)",
              chand, calld);
    }
    retry_state->recv_initial_metadata_ready_deferred_batch = batch_data;
    retry_state->recv_initial_metadata_error = GRPC_ERROR_REF(error);
    if (!retry_state->started_recv_trailing_metadata) {
      // recv_trailing_metadata not yet started by application; start it
      // ourselves to get status.
      start_internal_recv_trailing_metadata(elem);
    } else {
      GRPC_CALL_COMBINER_STOP(
          calld->call_combiner,
          "recv_initial_metadata_ready trailers-only or error");
    }
    return;
  }
  // Received valid initial metadata, so commit the call.
  retry_commit(elem, retry_state);
  // Invoke the callback to return the result to the surface.
  // Manually invoking a callback function; it does not take ownership of error.
  invoke_recv_initial_metadata_callback(batch_data, error);
}

//
// recv_message callback handling
//

// Invokes recv_message_ready for a subchannel batch.
static void invoke_recv_message_callback(void* arg, grpc_error* error) {
  subchannel_batch_data* batch_data = static_cast<subchannel_batch_data*>(arg);
  // Find pending op.
  pending_batch* pending = pending_batch_find(
      batch_data->elem, "invoking recv_message_ready for",
      [](grpc_transport_stream_op_batch* batch) {
        return batch->recv_message &&
               batch->payload->recv_message.recv_message_ready != nullptr;
      });
  GPR_ASSERT(pending != nullptr);
  // Return payload.
  subchannel_call_retry_state* retry_state =
      static_cast<subchannel_call_retry_state*>(
          grpc_connected_subchannel_call_get_parent_data(
              batch_data->subchannel_call));
  *pending->batch->payload->recv_message.recv_message =
      std::move(retry_state->recv_message);
  // Update bookkeeping.
  // Note: Need to do this before invoking the callback, since invoking
  // the callback will result in yielding the call combiner.
  grpc_closure* recv_message_ready =
      pending->batch->payload->recv_message.recv_message_ready;
  pending->batch->payload->recv_message.recv_message_ready = nullptr;
  maybe_clear_pending_batch(batch_data->elem, pending);
  batch_data_unref(batch_data);
  // Invoke callback.
  GRPC_CLOSURE_RUN(recv_message_ready, GRPC_ERROR_REF(error));
}

// Intercepts recv_message_ready callback for retries.
// Commits the call and returns the message up the stack.
static void recv_message_ready(void* arg, grpc_error* error) {
  subchannel_batch_data* batch_data = static_cast<subchannel_batch_data*>(arg);
  grpc_call_element* elem = batch_data->elem;
  channel_data* chand = static_cast<channel_data*>(elem->channel_data);
  call_data* calld = static_cast<call_data*>(elem->call_data);
  if (grpc_client_channel_trace.enabled()) {
    gpr_log(GPR_INFO, "chand=%p calld=%p: got recv_message_ready, error=%s",
            chand, calld, grpc_error_string(error));
  }
  subchannel_call_retry_state* retry_state =
      static_cast<subchannel_call_retry_state*>(
          grpc_connected_subchannel_call_get_parent_data(
              batch_data->subchannel_call));
  ++retry_state->completed_recv_message_count;
  // If a retry was already dispatched, then we're not going to use the
  // result of this recv_message op, so do nothing.
  if (retry_state->retry_dispatched) {
    GRPC_CALL_COMBINER_STOP(calld->call_combiner,
                            "recv_message_ready after retry dispatched");
    return;
  }
  // If we got an error or the payload was nullptr and we have not yet gotten
  // the recv_trailing_metadata_ready callback, then defer propagating this
  // callback back to the surface.  We can evaluate whether to retry when
  // recv_trailing_metadata comes back.
  if (GPR_UNLIKELY(
          (retry_state->recv_message == nullptr || error != GRPC_ERROR_NONE) &&
          !retry_state->completed_recv_trailing_metadata)) {
    if (grpc_client_channel_trace.enabled()) {
      gpr_log(GPR_INFO,
              "chand=%p calld=%p: deferring recv_message_ready (nullptr "
              "message and recv_trailing_metadata pending)",
              chand, calld);
    }
    retry_state->recv_message_ready_deferred_batch = batch_data;
    retry_state->recv_message_error = GRPC_ERROR_REF(error);
    if (!retry_state->started_recv_trailing_metadata) {
      // recv_trailing_metadata not yet started by application; start it
      // ourselves to get status.
      start_internal_recv_trailing_metadata(elem);
    } else {
      GRPC_CALL_COMBINER_STOP(calld->call_combiner, "recv_message_ready null");
    }
    return;
  }
  // Received a valid message, so commit the call.
  retry_commit(elem, retry_state);
  // Invoke the callback to return the result to the surface.
  // Manually invoking a callback function; it does not take ownership of error.
  invoke_recv_message_callback(batch_data, error);
}

//
// recv_trailing_metadata handling
//

// Sets *status and *server_pushback_md based on batch_data and error.
static void get_call_status(subchannel_batch_data* batch_data,
                            grpc_error* error, grpc_status_code* status,
                            grpc_mdelem** server_pushback_md) {
  grpc_call_element* elem = batch_data->elem;
  call_data* calld = static_cast<call_data*>(elem->call_data);
  if (error != GRPC_ERROR_NONE) {
    grpc_error_get_status(error, calld->deadline, status, nullptr, nullptr,
                          nullptr);
  } else {
    grpc_metadata_batch* md_batch =
        batch_data->batch.payload->recv_trailing_metadata
            .recv_trailing_metadata;
    GPR_ASSERT(md_batch->idx.named.grpc_status != nullptr);
    *status =
        grpc_get_status_code_from_metadata(md_batch->idx.named.grpc_status->md);
    if (md_batch->idx.named.grpc_retry_pushback_ms != nullptr) {
      *server_pushback_md = &md_batch->idx.named.grpc_retry_pushback_ms->md;
    }
  }
  GRPC_ERROR_UNREF(error);
}

// Adds recv_trailing_metadata_ready closure to closures.
static void add_closure_for_recv_trailing_metadata_ready(
    grpc_call_element* elem, subchannel_batch_data* batch_data,
    grpc_error* error, grpc_core::CallCombinerClosureList* closures) {
  // Find pending batch.
  pending_batch* pending = pending_batch_find(
      elem, "invoking recv_trailing_metadata for",
      [](grpc_transport_stream_op_batch* batch) {
        return batch->recv_trailing_metadata &&
               batch->payload->recv_trailing_metadata
                       .recv_trailing_metadata_ready != nullptr;
      });
  // If we generated the recv_trailing_metadata op internally via
  // start_internal_recv_trailing_metadata(), then there will be no
  // pending batch.
  if (pending == nullptr) {
    GRPC_ERROR_UNREF(error);
    return;
  }
  // Return metadata.
  subchannel_call_retry_state* retry_state =
      static_cast<subchannel_call_retry_state*>(
          grpc_connected_subchannel_call_get_parent_data(
              batch_data->subchannel_call));
  grpc_metadata_batch_move(
      &retry_state->recv_trailing_metadata,
      pending->batch->payload->recv_trailing_metadata.recv_trailing_metadata);
  // Add closure.
  closures->Add(pending->batch->payload->recv_trailing_metadata
                    .recv_trailing_metadata_ready,
                error, "recv_trailing_metadata_ready for pending batch");
  // Update bookkeeping.
  pending->batch->payload->recv_trailing_metadata.recv_trailing_metadata_ready =
      nullptr;
  maybe_clear_pending_batch(elem, pending);
}

// Adds any necessary closures for deferred recv_initial_metadata and
// recv_message callbacks to closures.
static void add_closures_for_deferred_recv_callbacks(
    subchannel_batch_data* batch_data, subchannel_call_retry_state* retry_state,
    grpc_core::CallCombinerClosureList* closures) {
  if (batch_data->batch.recv_trailing_metadata) {
    // Add closure for deferred recv_initial_metadata_ready.
    if (GPR_UNLIKELY(retry_state->recv_initial_metadata_ready_deferred_batch !=
                     nullptr)) {
      GRPC_CLOSURE_INIT(&retry_state->recv_initial_metadata_ready,
                        invoke_recv_initial_metadata_callback,
                        retry_state->recv_initial_metadata_ready_deferred_batch,
                        grpc_schedule_on_exec_ctx);
      closures->Add(&retry_state->recv_initial_metadata_ready,
                    retry_state->recv_initial_metadata_error,
                    "resuming recv_initial_metadata_ready");
      retry_state->recv_initial_metadata_ready_deferred_batch = nullptr;
    }
    // Add closure for deferred recv_message_ready.
    if (GPR_UNLIKELY(retry_state->recv_message_ready_deferred_batch !=
                     nullptr)) {
      GRPC_CLOSURE_INIT(&retry_state->recv_message_ready,
                        invoke_recv_message_callback,
                        retry_state->recv_message_ready_deferred_batch,
                        grpc_schedule_on_exec_ctx);
      closures->Add(&retry_state->recv_message_ready,
                    retry_state->recv_message_error,
                    "resuming recv_message_ready");
      retry_state->recv_message_ready_deferred_batch = nullptr;
    }
  }
}

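// Timeline sketch of the deferral logic serviced above (hypothetical
// Trailers-Only response):
//   1. recv_initial_metadata_ready fires with trailing metadata
//      available; the callback is deferred and recv_trailing_metadata is
//      started internally if the application has not started it yet.
//   2. recv_trailing_metadata_ready fires; if maybe_retry() declines,
//      the deferred recv_initial_metadata_ready (and any deferred
//      recv_message_ready) closures are added here and run together
//      under the call combiner.
//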
// Returns true if any op in the batch was not yet started.
// Only looks at send ops, since recv ops are always started immediately.
static bool pending_batch_is_unstarted(
    pending_batch* pending, call_data* calld,
    subchannel_call_retry_state* retry_state) {
  if (pending->batch == nullptr || pending->batch->on_complete == nullptr) {
    return false;
  }
  if (pending->batch->send_initial_metadata &&
      !retry_state->started_send_initial_metadata) {
    return true;
  }
  if (pending->batch->send_message &&
      retry_state->started_send_message_count < calld->send_messages->size()) {
    return true;
  }
  if (pending->batch->send_trailing_metadata &&
      !retry_state->started_send_trailing_metadata) {
    return true;
  }
  return false;
}

// For any pending batch containing an op that has not yet been started,
// adds the pending batch's completion closures to closures.
static void add_closures_to_fail_unstarted_pending_batches(
    grpc_call_element* elem, subchannel_call_retry_state* retry_state,
    grpc_error* error, grpc_core::CallCombinerClosureList* closures) {
  channel_data* chand = static_cast<channel_data*>(elem->channel_data);
  call_data* calld = static_cast<call_data*>(elem->call_data);
  for (size_t i = 0; i < GPR_ARRAY_SIZE(calld->pending_batches); ++i) {
    pending_batch* pending = &calld->pending_batches[i];
    if (pending_batch_is_unstarted(pending, calld, retry_state)) {
      if (grpc_client_channel_trace.enabled()) {
        gpr_log(GPR_INFO,
                "chand=%p calld=%p: failing unstarted pending batch at index "
                "%" PRIuPTR,
                chand, calld, i);
      }
      closures->Add(pending->batch->on_complete, GRPC_ERROR_REF(error),
                    "failing on_complete for pending batch");
      pending->batch->on_complete = nullptr;
      maybe_clear_pending_batch(elem, pending);
    }
  }
  GRPC_ERROR_UNREF(error);
}

// Runs necessary closures upon completion of a call attempt.
static void run_closures_for_completed_call(subchannel_batch_data* batch_data,
                                            grpc_error* error) {
  grpc_call_element* elem = batch_data->elem;
  call_data* calld = static_cast<call_data*>(elem->call_data);
  subchannel_call_retry_state* retry_state =
      static_cast<subchannel_call_retry_state*>(
          grpc_connected_subchannel_call_get_parent_data(
              batch_data->subchannel_call));
  // Construct list of closures to execute.
  grpc_core::CallCombinerClosureList closures;
  // First, add closure for recv_trailing_metadata_ready.
  add_closure_for_recv_trailing_metadata_ready(
      elem, batch_data, GRPC_ERROR_REF(error), &closures);
  // If there are deferred recv_initial_metadata_ready or recv_message_ready
  // callbacks, add them to closures.
  add_closures_for_deferred_recv_callbacks(batch_data, retry_state, &closures);
  // Add closures to fail any pending batches that have not yet been started.
  add_closures_to_fail_unstarted_pending_batches(
      elem, retry_state, GRPC_ERROR_REF(error), &closures);
  // Don't need batch_data anymore.
  batch_data_unref(batch_data);
  // Schedule all of the closures identified above.
  // Note: This will release the call combiner.
  closures.RunClosures(calld->call_combiner);
  GRPC_ERROR_UNREF(error);
}

// Intercepts recv_trailing_metadata_ready callback for retries.
// Commits the call and returns the trailing metadata up the stack.
static void recv_trailing_metadata_ready(void* arg, grpc_error* error) {
  subchannel_batch_data* batch_data = static_cast<subchannel_batch_data*>(arg);
  grpc_call_element* elem = batch_data->elem;
  channel_data* chand = static_cast<channel_data*>(elem->channel_data);
  call_data* calld = static_cast<call_data*>(elem->call_data);
  if (grpc_client_channel_trace.enabled()) {
    gpr_log(GPR_INFO,
            "chand=%p calld=%p: got recv_trailing_metadata_ready, error=%s",
            chand, calld, grpc_error_string(error));
  }
  subchannel_call_retry_state* retry_state =
      static_cast<subchannel_call_retry_state*>(
          grpc_connected_subchannel_call_get_parent_data(
              batch_data->subchannel_call));
  retry_state->completed_recv_trailing_metadata = true;
  // Get the call's status and check for server pushback metadata.
  grpc_status_code status = GRPC_STATUS_OK;
  grpc_mdelem* server_pushback_md = nullptr;
  get_call_status(batch_data, GRPC_ERROR_REF(error), &status,
                  &server_pushback_md);
  if (grpc_client_channel_trace.enabled()) {
    gpr_log(GPR_INFO, "chand=%p calld=%p: call finished, status=%s", chand,
            calld, grpc_status_code_to_string(status));
  }
  // Check if we should retry.
  if (maybe_retry(elem, batch_data, status, server_pushback_md)) {
    // Unref batch_data for deferred recv_initial_metadata_ready or
    // recv_message_ready callbacks, if any.
    if (retry_state->recv_initial_metadata_ready_deferred_batch != nullptr) {
      batch_data_unref(batch_data);
      GRPC_ERROR_UNREF(retry_state->recv_initial_metadata_error);
    }
    if (retry_state->recv_message_ready_deferred_batch != nullptr) {
      batch_data_unref(batch_data);
      GRPC_ERROR_UNREF(retry_state->recv_message_error);
    }
    batch_data_unref(batch_data);
    return;
  }
  // Not retrying, so commit the call.
  retry_commit(elem, retry_state);
  // Run any necessary closures.
  run_closures_for_completed_call(batch_data, GRPC_ERROR_REF(error));
}

//
// on_complete callback handling
//

// Adds the on_complete closure for the pending batch completed in
// batch_data to closures.
static void add_closure_for_completed_pending_batch(
    grpc_call_element* elem, subchannel_batch_data* batch_data,
    subchannel_call_retry_state* retry_state, grpc_error* error,
    grpc_core::CallCombinerClosureList* closures) {
  pending_batch* pending = pending_batch_find(
      elem, "completed", [batch_data](grpc_transport_stream_op_batch* batch) {
        // Match the pending batch with the same set of send ops as the
        // subchannel batch we've just completed.
        return batch->on_complete != nullptr &&
               batch_data->batch.send_initial_metadata ==
                   batch->send_initial_metadata &&
               batch_data->batch.send_message == batch->send_message &&
               batch_data->batch.send_trailing_metadata ==
                   batch->send_trailing_metadata;
      });
  // If batch_data is a replay batch, then there will be no pending
  // batch to complete.
  if (pending == nullptr) {
    GRPC_ERROR_UNREF(error);
    return;
  }
  // Add closure.
  closures->Add(pending->batch->on_complete, error,
                "on_complete for pending batch");
  pending->batch->on_complete = nullptr;
  maybe_clear_pending_batch(elem, pending);
}

2003// If there are any cached ops to replay or pending ops to start on the
2004// subchannel call, adds a closure to closures to invoke
Mark D. Roth817d28f2018-06-14 09:44:58 -07002005// start_retriable_subchannel_batches().
Mark D. Roth718c8342018-02-28 13:00:04 -08002006static void add_closures_for_replay_or_pending_send_ops(
2007 grpc_call_element* elem, subchannel_batch_data* batch_data,
Mark D. Roth817d28f2018-06-14 09:44:58 -07002008 subchannel_call_retry_state* retry_state,
2009 grpc_core::CallCombinerClosureList* closures) {
Mark D. Roth718c8342018-02-28 13:00:04 -08002010 channel_data* chand = static_cast<channel_data*>(elem->channel_data);
2011 call_data* calld = static_cast<call_data*>(elem->call_data);
2012 bool have_pending_send_message_ops =
Mark D. Rothefcd45b2018-03-28 10:49:59 -07002013 retry_state->started_send_message_count < calld->send_messages->size();
Mark D. Roth718c8342018-02-28 13:00:04 -08002014 bool have_pending_send_trailing_metadata_op =
2015 calld->seen_send_trailing_metadata &&
2016 !retry_state->started_send_trailing_metadata;
2017 if (!have_pending_send_message_ops &&
2018 !have_pending_send_trailing_metadata_op) {
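    // No cached send ops need to be replayed, so check the pending
    // batches for send ops that have not yet been started.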
2019 for (size_t i = 0; i < GPR_ARRAY_SIZE(calld->pending_batches); ++i) {
2020 pending_batch* pending = &calld->pending_batches[i];
2021 grpc_transport_stream_op_batch* batch = pending->batch;
2022 if (batch == nullptr || pending->send_ops_cached) continue;
2023 if (batch->send_message) have_pending_send_message_ops = true;
2024 if (batch->send_trailing_metadata) {
2025 have_pending_send_trailing_metadata_op = true;
2026 }
2027 }
2028 }
2029 if (have_pending_send_message_ops || have_pending_send_trailing_metadata_op) {
2030 if (grpc_client_channel_trace.enabled()) {
Mark D. Roth48854d22018-04-25 13:05:26 -07002031 gpr_log(GPR_INFO,
Mark D. Roth718c8342018-02-28 13:00:04 -08002032 "chand=%p calld=%p: starting next batch for pending send op(s)",
2033 chand, calld);
2034 }
Mark D. Roth817d28f2018-06-14 09:44:58 -07002035 GRPC_CLOSURE_INIT(&batch_data->batch.handler_private.closure,
2036 start_retriable_subchannel_batches, elem,
2037 grpc_schedule_on_exec_ctx);
2038 closures->Add(&batch_data->batch.handler_private.closure, GRPC_ERROR_NONE,
2039 "starting next batch for send_* op(s)");
Mark D. Roth718c8342018-02-28 13:00:04 -08002040 }
2041}
2042
Mark D. Roth718c8342018-02-28 13:00:04 -08002043// Callback used to intercept on_complete from subchannel calls.
2044// Called only when retries are enabled.
2045static void on_complete(void* arg, grpc_error* error) {
2046 subchannel_batch_data* batch_data = static_cast<subchannel_batch_data*>(arg);
2047 grpc_call_element* elem = batch_data->elem;
2048 channel_data* chand = static_cast<channel_data*>(elem->channel_data);
2049 call_data* calld = static_cast<call_data*>(elem->call_data);
2050 if (grpc_client_channel_trace.enabled()) {
2051 char* batch_str = grpc_transport_stream_op_batch_string(&batch_data->batch);
Mark D. Roth48854d22018-04-25 13:05:26 -07002052 gpr_log(GPR_INFO, "chand=%p calld=%p: got on_complete, error=%s, batch=%s",
Mark D. Roth718c8342018-02-28 13:00:04 -08002053 chand, calld, grpc_error_string(error), batch_str);
2054 gpr_free(batch_str);
2055 }
2056 subchannel_call_retry_state* retry_state =
2057 static_cast<subchannel_call_retry_state*>(
2058 grpc_connected_subchannel_call_get_parent_data(
2059 batch_data->subchannel_call));
Mark D. Roth718c8342018-02-28 13:00:04 -08002060 // Update bookkeeping in retry_state.
Mark D. Roth817d28f2018-06-14 09:44:58 -07002061 if (batch_data->batch.send_initial_metadata) {
2062 retry_state->completed_send_initial_metadata = true;
Mark D. Roth718c8342018-02-28 13:00:04 -08002063 }
Mark D. Roth817d28f2018-06-14 09:44:58 -07002064 if (batch_data->batch.send_message) {
2065 ++retry_state->completed_send_message_count;
2066 }
2067 if (batch_data->batch.send_trailing_metadata) {
2068 retry_state->completed_send_trailing_metadata = true;
2069 }
2070 // If the call is committed, free cached data for send ops that we've just
2071 // completed.
2072 if (calld->retry_committed) {
Mark D. Roth718c8342018-02-28 13:00:04 -08002073 free_cached_send_op_data_for_completed_batch(elem, batch_data, retry_state);
2074 }
Mark D. Roth718c8342018-02-28 13:00:04 -08002075 // Construct list of closures to execute.
Mark D. Roth817d28f2018-06-14 09:44:58 -07002076 grpc_core::CallCombinerClosureList closures;
2077 // If a retry was already dispatched, that means we saw
2078 // recv_trailing_metadata before this, so we do nothing here.
2079 // Otherwise, invoke the callback to return the result to the surface.
2080 if (!retry_state->retry_dispatched) {
2081 // Add closure for the completed pending batch, if any.
2082 add_closure_for_completed_pending_batch(elem, batch_data, retry_state,
2083 GRPC_ERROR_REF(error), &closures);
2084 // If needed, add a callback to start any replay or pending send ops on
2085 // the subchannel call.
2086 if (!retry_state->completed_recv_trailing_metadata) {
2087 add_closures_for_replay_or_pending_send_ops(elem, batch_data, retry_state,
2088 &closures);
2089 }
Mark D. Roth718c8342018-02-28 13:00:04 -08002090 }
Mark D. Roth4f9e0032018-05-24 09:30:09 -07002091 // Track number of pending subchannel send batches and determine if this
2092 // was the last one.
Mark D. Roth817d28f2018-06-14 09:44:58 -07002093 --calld->num_pending_retriable_subchannel_send_batches;
2094 const bool last_send_batch_complete =
2095 calld->num_pending_retriable_subchannel_send_batches == 0;
Mark D. Roth718c8342018-02-28 13:00:04 -08002096 // Don't need batch_data anymore.
2097 batch_data_unref(batch_data);
2098 // Schedule all of the closures identified above.
Mark D. Roth4f9e0032018-05-24 09:30:09 -07002099 // Note: This yields the call combiner.
Mark D. Roth817d28f2018-06-14 09:44:58 -07002100 closures.RunClosures(calld->call_combiner);
2101 // If this was the last subchannel send batch, unref the call stack.
2102 if (last_send_batch_complete) {
Mark D. Roth4f9e0032018-05-24 09:30:09 -07002103 GRPC_CALL_STACK_UNREF(calld->owning_call, "subchannel_send_batches");
2104 }
Mark D. Roth718c8342018-02-28 13:00:04 -08002105}
2106
2107//
2108// subchannel batch construction
2109//
2110
2111// Helper function used to start a subchannel batch in the call combiner.
2112static void start_batch_in_call_combiner(void* arg, grpc_error* ignored) {
2113 grpc_transport_stream_op_batch* batch =
2114 static_cast<grpc_transport_stream_op_batch*>(arg);
2115 grpc_subchannel_call* subchannel_call =
2116 static_cast<grpc_subchannel_call*>(batch->handler_private.extra_arg);
2117 // Note: This will release the call combiner.
2118 grpc_subchannel_call_process_op(subchannel_call, batch);
2119}
2120
Mark D. Rothde077ac2018-04-12 08:05:44 -07002121// Adds a closure to closures that will execute batch in the call combiner.
2122static void add_closure_for_subchannel_batch(
Mark D. Roth817d28f2018-06-14 09:44:58 -07002123 grpc_call_element* elem, grpc_transport_stream_op_batch* batch,
2124 grpc_core::CallCombinerClosureList* closures) {
2125 channel_data* chand = static_cast<channel_data*>(elem->channel_data);
2126 call_data* calld = static_cast<call_data*>(elem->call_data);
Mark D. Rothde077ac2018-04-12 08:05:44 -07002127 batch->handler_private.extra_arg = calld->subchannel_call;
2128 GRPC_CLOSURE_INIT(&batch->handler_private.closure,
2129 start_batch_in_call_combiner, batch,
2130 grpc_schedule_on_exec_ctx);
Mark D. Rothde077ac2018-04-12 08:05:44 -07002131 if (grpc_client_channel_trace.enabled()) {
2132 char* batch_str = grpc_transport_stream_op_batch_string(batch);
Mark D. Roth817d28f2018-06-14 09:44:58 -07002133 gpr_log(GPR_INFO, "chand=%p calld=%p: starting subchannel batch: %s", chand,
2134 calld, batch_str);
Mark D. Rothde077ac2018-04-12 08:05:44 -07002135 gpr_free(batch_str);
Mark D. Rothde077ac2018-04-12 08:05:44 -07002136 }
Mark D. Roth817d28f2018-06-14 09:44:58 -07002137 closures->Add(&batch->handler_private.closure, GRPC_ERROR_NONE,
2138 "start_subchannel_batch");
Mark D. Rothde077ac2018-04-12 08:05:44 -07002139}
2140
Mark D. Roth718c8342018-02-28 13:00:04 -08002141// Adds retriable send_initial_metadata op to batch_data.
2142static void add_retriable_send_initial_metadata_op(
2143 call_data* calld, subchannel_call_retry_state* retry_state,
2144 subchannel_batch_data* batch_data) {
2145 // Maps the number of retries to the corresponding metadata value slice.
2146 static const grpc_slice* retry_count_strings[] = {
2147 &GRPC_MDSTR_1, &GRPC_MDSTR_2, &GRPC_MDSTR_3, &GRPC_MDSTR_4};
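  // (Attempts are capped at 5 per call, so the grpc-previous-rpc-attempts
  // value ranges from 1 to 4 and four slices suffice; e.g., on the third
  // attempt, the header value sent is "2".)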
2148 // We need to make a copy of the metadata batch for each attempt, since
2149 // the filters in the subchannel stack may modify this batch, and we don't
2150 // want those modifications to be passed forward to subsequent attempts.
2151 //
2152 // If we've already completed one or more attempts, add the
2153 // grpc-previous-rpc-attempts header.
Mark D. Roth5bacf2e2018-06-19 08:19:58 -07002154 retry_state->send_initial_metadata_storage =
Mark D. Roth718c8342018-02-28 13:00:04 -08002155 static_cast<grpc_linked_mdelem*>(gpr_arena_alloc(
2156 calld->arena, sizeof(grpc_linked_mdelem) *
2157 (calld->send_initial_metadata.list.count +
2158 (calld->num_attempts_completed > 0))));
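  // (The (num_attempts_completed > 0) term reserves one extra slot for
  // the grpc-previous-rpc-attempts element added below.)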
2159 grpc_metadata_batch_copy(&calld->send_initial_metadata,
Mark D. Roth5bacf2e2018-06-19 08:19:58 -07002160 &retry_state->send_initial_metadata,
2161 retry_state->send_initial_metadata_storage);
2162 if (GPR_UNLIKELY(retry_state->send_initial_metadata.idx.named
Yash Tibrewal8c5b8832018-05-14 11:44:18 -07002163 .grpc_previous_rpc_attempts != nullptr)) {
Mark D. Roth5bacf2e2018-06-19 08:19:58 -07002164 grpc_metadata_batch_remove(&retry_state->send_initial_metadata,
2165 retry_state->send_initial_metadata.idx.named
2166 .grpc_previous_rpc_attempts);
Mark D. Roth718c8342018-02-28 13:00:04 -08002167 }
Yash Tibrewal8c5b8832018-05-14 11:44:18 -07002168 if (GPR_UNLIKELY(calld->num_attempts_completed > 0)) {
Mark D. Roth718c8342018-02-28 13:00:04 -08002169 grpc_mdelem retry_md = grpc_mdelem_from_slices(
2170 GRPC_MDSTR_GRPC_PREVIOUS_RPC_ATTEMPTS,
2171 *retry_count_strings[calld->num_attempts_completed - 1]);
2172 grpc_error* error = grpc_metadata_batch_add_tail(
Mark D. Roth5bacf2e2018-06-19 08:19:58 -07002173 &retry_state->send_initial_metadata,
2174 &retry_state->send_initial_metadata_storage[calld->send_initial_metadata
2175 .list.count],
Mark D. Roth718c8342018-02-28 13:00:04 -08002176 retry_md);
Yash Tibrewal7f51ba82018-04-12 13:21:20 -07002177 if (GPR_UNLIKELY(error != GRPC_ERROR_NONE)) {
Mark D. Roth718c8342018-02-28 13:00:04 -08002178 gpr_log(GPR_ERROR, "error adding retry metadata: %s",
2179 grpc_error_string(error));
2180 GPR_ASSERT(false);
2181 }
2182 }
2183 retry_state->started_send_initial_metadata = true;
2184 batch_data->batch.send_initial_metadata = true;
2185 batch_data->batch.payload->send_initial_metadata.send_initial_metadata =
Mark D. Roth5bacf2e2018-06-19 08:19:58 -07002186 &retry_state->send_initial_metadata;
Mark D. Roth718c8342018-02-28 13:00:04 -08002187 batch_data->batch.payload->send_initial_metadata.send_initial_metadata_flags =
2188 calld->send_initial_metadata_flags;
2189 batch_data->batch.payload->send_initial_metadata.peer_string =
2190 calld->peer_string;
2191}
2192
2193// Adds retriable send_message op to batch_data.
2194static void add_retriable_send_message_op(
2195 grpc_call_element* elem, subchannel_call_retry_state* retry_state,
2196 subchannel_batch_data* batch_data) {
2197 channel_data* chand = static_cast<channel_data*>(elem->channel_data);
2198 call_data* calld = static_cast<call_data*>(elem->call_data);
2199 if (grpc_client_channel_trace.enabled()) {
Mark D. Roth48854d22018-04-25 13:05:26 -07002200 gpr_log(GPR_INFO,
Mark D. Roth718c8342018-02-28 13:00:04 -08002201 "chand=%p calld=%p: starting calld->send_messages[%" PRIuPTR "]",
2202 chand, calld, retry_state->started_send_message_count);
2203 }
Mark D. Roth3d8b32d2018-03-09 13:25:40 -08002204 grpc_core::ByteStreamCache* cache =
Mark D. Rothefcd45b2018-03-28 10:49:59 -07002205 (*calld->send_messages)[retry_state->started_send_message_count];
Mark D. Roth718c8342018-02-28 13:00:04 -08002206 ++retry_state->started_send_message_count;
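  // Wrap the cached message in a new stream for this attempt, so that
  // the same payload can be replayed if the call is retried again.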
Mark D. Roth5bacf2e2018-06-19 08:19:58 -07002207 retry_state->send_message.Init(cache);
Mark D. Roth718c8342018-02-28 13:00:04 -08002208 batch_data->batch.send_message = true;
Mark D. Roth3d8b32d2018-03-09 13:25:40 -08002209 batch_data->batch.payload->send_message.send_message.reset(
Mark D. Roth5bacf2e2018-06-19 08:19:58 -07002210 retry_state->send_message.get());
Mark D. Roth718c8342018-02-28 13:00:04 -08002211}
2212
2213// Adds retriable send_trailing_metadata op to batch_data.
2214static void add_retriable_send_trailing_metadata_op(
2215 call_data* calld, subchannel_call_retry_state* retry_state,
2216 subchannel_batch_data* batch_data) {
2217 // We need to make a copy of the metadata batch for each attempt, since
2218 // the filters in the subchannel stack may modify this batch, and we don't
2219 // want those modifications to be passed forward to subsequent attempts.
Mark D. Roth5bacf2e2018-06-19 08:19:58 -07002220 retry_state->send_trailing_metadata_storage =
Mark D. Roth718c8342018-02-28 13:00:04 -08002221 static_cast<grpc_linked_mdelem*>(gpr_arena_alloc(
2222 calld->arena, sizeof(grpc_linked_mdelem) *
2223 calld->send_trailing_metadata.list.count));
2224 grpc_metadata_batch_copy(&calld->send_trailing_metadata,
Mark D. Roth5bacf2e2018-06-19 08:19:58 -07002225 &retry_state->send_trailing_metadata,
2226 retry_state->send_trailing_metadata_storage);
Mark D. Roth718c8342018-02-28 13:00:04 -08002227 retry_state->started_send_trailing_metadata = true;
2228 batch_data->batch.send_trailing_metadata = true;
2229 batch_data->batch.payload->send_trailing_metadata.send_trailing_metadata =
Mark D. Roth5bacf2e2018-06-19 08:19:58 -07002230 &retry_state->send_trailing_metadata;
Mark D. Roth718c8342018-02-28 13:00:04 -08002231}
2232
2233// Adds retriable recv_initial_metadata op to batch_data.
2234static void add_retriable_recv_initial_metadata_op(
2235 call_data* calld, subchannel_call_retry_state* retry_state,
2236 subchannel_batch_data* batch_data) {
2237 retry_state->started_recv_initial_metadata = true;
2238 batch_data->batch.recv_initial_metadata = true;
Mark D. Roth5bacf2e2018-06-19 08:19:58 -07002239 grpc_metadata_batch_init(&retry_state->recv_initial_metadata);
Mark D. Roth718c8342018-02-28 13:00:04 -08002240 batch_data->batch.payload->recv_initial_metadata.recv_initial_metadata =
Mark D. Roth5bacf2e2018-06-19 08:19:58 -07002241 &retry_state->recv_initial_metadata;
Mark D. Roth718c8342018-02-28 13:00:04 -08002242 batch_data->batch.payload->recv_initial_metadata.trailing_metadata_available =
Mark D. Roth5bacf2e2018-06-19 08:19:58 -07002243 &retry_state->trailing_metadata_available;
2244 GRPC_CLOSURE_INIT(&retry_state->recv_initial_metadata_ready,
Mark D. Roth718c8342018-02-28 13:00:04 -08002245 recv_initial_metadata_ready, batch_data,
2246 grpc_schedule_on_exec_ctx);
2247 batch_data->batch.payload->recv_initial_metadata.recv_initial_metadata_ready =
Mark D. Roth5bacf2e2018-06-19 08:19:58 -07002248 &retry_state->recv_initial_metadata_ready;
Mark D. Roth718c8342018-02-28 13:00:04 -08002249}
2250
2251// Adds retriable recv_message op to batch_data.
2252static void add_retriable_recv_message_op(
2253 call_data* calld, subchannel_call_retry_state* retry_state,
2254 subchannel_batch_data* batch_data) {
2255 ++retry_state->started_recv_message_count;
2256 batch_data->batch.recv_message = true;
2257 batch_data->batch.payload->recv_message.recv_message =
Mark D. Roth5bacf2e2018-06-19 08:19:58 -07002258 &retry_state->recv_message;
2259 GRPC_CLOSURE_INIT(&retry_state->recv_message_ready, recv_message_ready,
Mark D. Roth718c8342018-02-28 13:00:04 -08002260 batch_data, grpc_schedule_on_exec_ctx);
2261 batch_data->batch.payload->recv_message.recv_message_ready =
Mark D. Roth5bacf2e2018-06-19 08:19:58 -07002262 &retry_state->recv_message_ready;
Mark D. Roth718c8342018-02-28 13:00:04 -08002263}
2264
2265// Adds retriable recv_trailing_metadata op to batch_data.
2266static void add_retriable_recv_trailing_metadata_op(
2267 call_data* calld, subchannel_call_retry_state* retry_state,
2268 subchannel_batch_data* batch_data) {
2269 retry_state->started_recv_trailing_metadata = true;
2270 batch_data->batch.recv_trailing_metadata = true;
Mark D. Roth5bacf2e2018-06-19 08:19:58 -07002271 grpc_metadata_batch_init(&retry_state->recv_trailing_metadata);
Mark D. Roth718c8342018-02-28 13:00:04 -08002272 batch_data->batch.payload->recv_trailing_metadata.recv_trailing_metadata =
Mark D. Roth5bacf2e2018-06-19 08:19:58 -07002273 &retry_state->recv_trailing_metadata;
Mark D. Roth817d28f2018-06-14 09:44:58 -07002274 batch_data->batch.payload->recv_trailing_metadata.collect_stats =
Mark D. Roth5bacf2e2018-06-19 08:19:58 -07002275 &retry_state->collect_stats;
2276 GRPC_CLOSURE_INIT(&retry_state->recv_trailing_metadata_ready,
Mark D. Roth817d28f2018-06-14 09:44:58 -07002277 recv_trailing_metadata_ready, batch_data,
2278 grpc_schedule_on_exec_ctx);
2279 batch_data->batch.payload->recv_trailing_metadata
Mark D. Roth5bacf2e2018-06-19 08:19:58 -07002280 .recv_trailing_metadata_ready =
2281 &retry_state->recv_trailing_metadata_ready;
Mark D. Roth718c8342018-02-28 13:00:04 -08002282}
2283
2284// Helper function used to start a recv_trailing_metadata batch. This
2285// is used in the case where a recv_initial_metadata or recv_message
2286 // op fails in a way that tells us the call is over, but the application
2287// has not yet started its own recv_trailing_metadata op.
2288static void start_internal_recv_trailing_metadata(grpc_call_element* elem) {
2289 channel_data* chand = static_cast<channel_data*>(elem->channel_data);
2290 call_data* calld = static_cast<call_data*>(elem->call_data);
2291 if (grpc_client_channel_trace.enabled()) {
Mark D. Roth48854d22018-04-25 13:05:26 -07002292 gpr_log(GPR_INFO,
Mark D. Roth718c8342018-02-28 13:00:04 -08002293 "chand=%p calld=%p: call failed but recv_trailing_metadata not "
2294 "started; starting it internally",
2295 chand, calld);
2296 }
2297 subchannel_call_retry_state* retry_state =
2298 static_cast<subchannel_call_retry_state*>(
2299 grpc_connected_subchannel_call_get_parent_data(
2300 calld->subchannel_call));
Mark D. Rothde077ac2018-04-12 08:05:44 -07002301 // Create batch_data with 2 refs, since this batch will be unreffed twice:
Mark D. Roth817d28f2018-06-14 09:44:58 -07002302 // once for the recv_trailing_metadata_ready callback when the subchannel
2303 // batch returns, and again when we actually get a recv_trailing_metadata
2304 // op from the surface.
2305 subchannel_batch_data* batch_data =
2306 batch_data_create(elem, 2, false /* set_on_complete */);
Mark D. Roth718c8342018-02-28 13:00:04 -08002307 add_retriable_recv_trailing_metadata_op(calld, retry_state, batch_data);
Mark D. Rothde077ac2018-04-12 08:05:44 -07002308 retry_state->recv_trailing_metadata_internal_batch = batch_data;
Mark D. Roth718c8342018-02-28 13:00:04 -08002309 // Note: This will release the call combiner.
2310 grpc_subchannel_call_process_op(calld->subchannel_call, &batch_data->batch);
2311}
2312
2313// If there are any cached send ops that need to be replayed on the
2314// current subchannel call, creates and returns a new subchannel batch
2315// to replay those ops. Otherwise, returns nullptr.
2316static subchannel_batch_data* maybe_create_subchannel_batch_for_replay(
2317 grpc_call_element* elem, subchannel_call_retry_state* retry_state) {
2318 channel_data* chand = static_cast<channel_data*>(elem->channel_data);
2319 call_data* calld = static_cast<call_data*>(elem->call_data);
2320 subchannel_batch_data* replay_batch_data = nullptr;
2321 // send_initial_metadata.
2322 if (calld->seen_send_initial_metadata &&
2323 !retry_state->started_send_initial_metadata &&
2324 !calld->pending_send_initial_metadata) {
2325 if (grpc_client_channel_trace.enabled()) {
Mark D. Roth48854d22018-04-25 13:05:26 -07002326 gpr_log(GPR_INFO,
Mark D. Roth718c8342018-02-28 13:00:04 -08002327 "chand=%p calld=%p: replaying previously completed "
2328 "send_initial_metadata op",
2329 chand, calld);
2330 }
Mark D. Roth817d28f2018-06-14 09:44:58 -07002331 replay_batch_data = batch_data_create(elem, 1, true /* set_on_complete */);
Mark D. Roth718c8342018-02-28 13:00:04 -08002332 add_retriable_send_initial_metadata_op(calld, retry_state,
2333 replay_batch_data);
2334 }
2335 // send_message.
2336 // Note that we can only have one send_message op in flight at a time.
Mark D. Rothefcd45b2018-03-28 10:49:59 -07002337 if (retry_state->started_send_message_count < calld->send_messages->size() &&
Mark D. Roth718c8342018-02-28 13:00:04 -08002338 retry_state->started_send_message_count ==
2339 retry_state->completed_send_message_count &&
2340 !calld->pending_send_message) {
2341 if (grpc_client_channel_trace.enabled()) {
Mark D. Roth48854d22018-04-25 13:05:26 -07002342 gpr_log(GPR_INFO,
Mark D. Roth718c8342018-02-28 13:00:04 -08002343 "chand=%p calld=%p: replaying previously completed "
2344 "send_message op",
2345 chand, calld);
2346 }
2347 if (replay_batch_data == nullptr) {
Mark D. Roth817d28f2018-06-14 09:44:58 -07002348 replay_batch_data =
2349 batch_data_create(elem, 1, true /* set_on_complete */);
Mark D. Roth718c8342018-02-28 13:00:04 -08002350 }
2351 add_retriable_send_message_op(elem, retry_state, replay_batch_data);
2352 }
2353 // send_trailing_metadata.
2354 // Note that we only add this op if we have no more send_message ops
2355 // to start, since we can't send down any more send_message ops after
2356 // send_trailing_metadata.
2357 if (calld->seen_send_trailing_metadata &&
Mark D. Rothefcd45b2018-03-28 10:49:59 -07002358 retry_state->started_send_message_count == calld->send_messages->size() &&
Mark D. Roth718c8342018-02-28 13:00:04 -08002359 !retry_state->started_send_trailing_metadata &&
2360 !calld->pending_send_trailing_metadata) {
2361 if (grpc_client_channel_trace.enabled()) {
Mark D. Roth48854d22018-04-25 13:05:26 -07002362 gpr_log(GPR_INFO,
Mark D. Roth718c8342018-02-28 13:00:04 -08002363 "chand=%p calld=%p: replaying previously completed "
2364 "send_trailing_metadata op",
2365 chand, calld);
2366 }
2367 if (replay_batch_data == nullptr) {
Mark D. Roth817d28f2018-06-14 09:44:58 -07002368 replay_batch_data =
2369 batch_data_create(elem, 1, true /* set_on_complete */);
Mark D. Roth718c8342018-02-28 13:00:04 -08002370 }
2371 add_retriable_send_trailing_metadata_op(calld, retry_state,
2372 replay_batch_data);
2373 }
2374 return replay_batch_data;
2375}
2376
2377 // Creates subchannel batches for the pending batches that still need
2378 // to be started, adding a closure to start each one to *closures.
2379static void add_subchannel_batches_for_pending_batches(
2380 grpc_call_element* elem, subchannel_call_retry_state* retry_state,
Mark D. Roth817d28f2018-06-14 09:44:58 -07002381 grpc_core::CallCombinerClosureList* closures) {
Mark D. Roth718c8342018-02-28 13:00:04 -08002382 call_data* calld = static_cast<call_data*>(elem->call_data);
2383 for (size_t i = 0; i < GPR_ARRAY_SIZE(calld->pending_batches); ++i) {
2384 pending_batch* pending = &calld->pending_batches[i];
2385 grpc_transport_stream_op_batch* batch = pending->batch;
2386 if (batch == nullptr) continue;
2387 // Skip any batch that either (a) has already been started on this
2388 // subchannel call or (b) we can't start yet because we're still
2389 // replaying send ops that need to be completed first.
2390 // TODO(roth): Note that if any one op in the batch can't be sent
2391 // yet due to ops that we're replaying, we don't start any of the ops
2392 // in the batch. This is probably okay, but it could conceivably
2393 // lead to increased latency in some cases -- e.g., we could delay
2394 // starting a recv op due to it being in the same batch with a send
2395 // op. If/when we revamp the callback protocol in
2396 // transport_stream_op_batch, we may be able to fix this.
2397 if (batch->send_initial_metadata &&
2398 retry_state->started_send_initial_metadata) {
2399 continue;
2400 }
2401 if (batch->send_message && retry_state->completed_send_message_count <
2402 retry_state->started_send_message_count) {
2403 continue;
2404 }
2405 // Note that we only start send_trailing_metadata if we have no more
2406 // send_message ops to start, since we can't send down any more
2407 // send_message ops after send_trailing_metadata.
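    // (batch->send_message is 0 or 1 below, accounting for a send_message
    // op contained in this same batch, if any.)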
2408 if (batch->send_trailing_metadata &&
2409 (retry_state->started_send_message_count + batch->send_message <
Mark D. Rothefcd45b2018-03-28 10:49:59 -07002410 calld->send_messages->size() ||
Mark D. Roth718c8342018-02-28 13:00:04 -08002411 retry_state->started_send_trailing_metadata)) {
2412 continue;
2413 }
2414 if (batch->recv_initial_metadata &&
2415 retry_state->started_recv_initial_metadata) {
2416 continue;
2417 }
2418 if (batch->recv_message && retry_state->completed_recv_message_count <
2419 retry_state->started_recv_message_count) {
2420 continue;
2421 }
2422 if (batch->recv_trailing_metadata &&
2423 retry_state->started_recv_trailing_metadata) {
Mark D. Rothde077ac2018-04-12 08:05:44 -07002424 // If we previously completed a recv_trailing_metadata op
2425 // initiated by start_internal_recv_trailing_metadata(), use the
2426 // result of that instead of trying to re-start this op.
Yash Tibrewal8c5b8832018-05-14 11:44:18 -07002427 if (GPR_UNLIKELY(retry_state->recv_trailing_metadata_internal_batch !=
2428 nullptr)) {
Mark D. Rothde077ac2018-04-12 08:05:44 -07002429 // If the batch completed, then trigger the completion callback
2430 // directly, so that we return the previously returned results to
2431 // the application. Otherwise, just unref the internally
2432 // started subchannel batch, since we'll propagate the
2433 // completion when it completes.
2434 if (retry_state->completed_recv_trailing_metadata) {
Mark D. Rothde077ac2018-04-12 08:05:44 -07002435 // Batches containing recv_trailing_metadata always succeed.
Mark D. Roth817d28f2018-06-14 09:44:58 -07002436 closures->Add(
Mark D. Roth5bacf2e2018-06-19 08:19:58 -07002437 &retry_state->recv_trailing_metadata_ready, GRPC_ERROR_NONE,
Mark D. Roth817d28f2018-06-14 09:44:58 -07002438 "re-executing recv_trailing_metadata_ready to propagate "
2439 "internally triggered result");
Mark D. Rothde077ac2018-04-12 08:05:44 -07002440 } else {
2441 batch_data_unref(retry_state->recv_trailing_metadata_internal_batch);
2442 }
2443 retry_state->recv_trailing_metadata_internal_batch = nullptr;
2444 }
Mark D. Roth718c8342018-02-28 13:00:04 -08002445 continue;
2446 }
2447 // If we're not retrying, just send the batch as-is.
2448 if (calld->method_params == nullptr ||
2449 calld->method_params->retry_policy() == nullptr ||
2450 calld->retry_committed) {
Mark D. Roth817d28f2018-06-14 09:44:58 -07002451 add_closure_for_subchannel_batch(elem, batch, closures);
Mark D. Roth718c8342018-02-28 13:00:04 -08002452 pending_batch_clear(calld, pending);
2453 continue;
2454 }
2455 // Create batch with the right number of callbacks.
Mark D. Roth817d28f2018-06-14 09:44:58 -07002456 const bool has_send_ops = batch->send_initial_metadata ||
2457 batch->send_message ||
2458 batch->send_trailing_metadata;
2459 const int num_callbacks = has_send_ops + batch->recv_initial_metadata +
2460 batch->recv_message +
2461 batch->recv_trailing_metadata;
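    // (Bool-to-int arithmetic: batch_data gets one ref per callback that
    // will eventually be invoked for this batch.)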
2462 subchannel_batch_data* batch_data = batch_data_create(
2463 elem, num_callbacks, has_send_ops /* set_on_complete */);
Mark D. Roth718c8342018-02-28 13:00:04 -08002464 // Cache send ops if needed.
2465 maybe_cache_send_ops_for_batch(calld, pending);
2466 // send_initial_metadata.
2467 if (batch->send_initial_metadata) {
2468 add_retriable_send_initial_metadata_op(calld, retry_state, batch_data);
2469 }
2470 // send_message.
2471 if (batch->send_message) {
2472 add_retriable_send_message_op(elem, retry_state, batch_data);
2473 }
2474 // send_trailing_metadata.
2475 if (batch->send_trailing_metadata) {
2476 add_retriable_send_trailing_metadata_op(calld, retry_state, batch_data);
2477 }
2478 // recv_initial_metadata.
2479 if (batch->recv_initial_metadata) {
2480 // recv_flags is only used on the server side.
2481 GPR_ASSERT(batch->payload->recv_initial_metadata.recv_flags == nullptr);
2482 add_retriable_recv_initial_metadata_op(calld, retry_state, batch_data);
2483 }
2484 // recv_message.
2485 if (batch->recv_message) {
2486 add_retriable_recv_message_op(calld, retry_state, batch_data);
2487 }
2488 // recv_trailing_metadata.
2489 if (batch->recv_trailing_metadata) {
Mark D. Roth718c8342018-02-28 13:00:04 -08002490 add_retriable_recv_trailing_metadata_op(calld, retry_state, batch_data);
2491 }
Mark D. Roth817d28f2018-06-14 09:44:58 -07002492 add_closure_for_subchannel_batch(elem, &batch_data->batch, closures);
Mark D. Roth4f9e0032018-05-24 09:30:09 -07002493 // Track number of pending subchannel send batches.
2494 // If this is the first one, take a ref to the call stack.
2495 if (batch->send_initial_metadata || batch->send_message ||
2496 batch->send_trailing_metadata) {
2497 if (calld->num_pending_retriable_subchannel_send_batches == 0) {
2498 GRPC_CALL_STACK_REF(calld->owning_call, "subchannel_send_batches");
2499 }
2500 ++calld->num_pending_retriable_subchannel_send_batches;
2501 }
Mark D. Roth718c8342018-02-28 13:00:04 -08002502 }
2503}
2504
2505// Constructs and starts whatever subchannel batches are needed on the
2506// subchannel call.
2507static void start_retriable_subchannel_batches(void* arg, grpc_error* ignored) {
2508 grpc_call_element* elem = static_cast<grpc_call_element*>(arg);
2509 channel_data* chand = static_cast<channel_data*>(elem->channel_data);
2510 call_data* calld = static_cast<call_data*>(elem->call_data);
2511 if (grpc_client_channel_trace.enabled()) {
Mark D. Roth48854d22018-04-25 13:05:26 -07002512 gpr_log(GPR_INFO, "chand=%p calld=%p: constructing retriable batches",
Mark D. Roth718c8342018-02-28 13:00:04 -08002513 chand, calld);
2514 }
2515 subchannel_call_retry_state* retry_state =
2516 static_cast<subchannel_call_retry_state*>(
2517 grpc_connected_subchannel_call_get_parent_data(
2518 calld->subchannel_call));
Mark D. Rothde077ac2018-04-12 08:05:44 -07002519 // Construct list of closures to execute, one for each pending batch.
Mark D. Roth817d28f2018-06-14 09:44:58 -07002520 grpc_core::CallCombinerClosureList closures;
Mark D. Roth718c8342018-02-28 13:00:04 -08002521 // Replay previously-returned send_* ops if needed.
2522 subchannel_batch_data* replay_batch_data =
2523 maybe_create_subchannel_batch_for_replay(elem, retry_state);
2524 if (replay_batch_data != nullptr) {
Mark D. Roth817d28f2018-06-14 09:44:58 -07002525 add_closure_for_subchannel_batch(elem, &replay_batch_data->batch,
2526 &closures);
Mark D. Roth4f9e0032018-05-24 09:30:09 -07002527 // Track number of pending subchannel send batches.
2528 // If this is the first one, take a ref to the call stack.
2529 if (calld->num_pending_retriable_subchannel_send_batches == 0) {
2530 GRPC_CALL_STACK_REF(calld->owning_call, "subchannel_send_batches");
2531 }
2532 ++calld->num_pending_retriable_subchannel_send_batches;
Mark D. Roth718c8342018-02-28 13:00:04 -08002533 }
2534 // Now add pending batches.
Mark D. Roth817d28f2018-06-14 09:44:58 -07002535 add_subchannel_batches_for_pending_batches(elem, retry_state, &closures);
Mark D. Roth718c8342018-02-28 13:00:04 -08002536 // Start batches on subchannel call.
Mark D. Roth718c8342018-02-28 13:00:04 -08002537 if (grpc_client_channel_trace.enabled()) {
Mark D. Roth48854d22018-04-25 13:05:26 -07002538 gpr_log(GPR_INFO,
Mark D. Roth718c8342018-02-28 13:00:04 -08002539 "chand=%p calld=%p: starting %" PRIuPTR
2540 " retriable batches on subchannel_call=%p",
Mark D. Roth817d28f2018-06-14 09:44:58 -07002541 chand, calld, closures.size(), calld->subchannel_call);
Mark D. Roth718c8342018-02-28 13:00:04 -08002542 }
Mark D. Roth817d28f2018-06-14 09:44:58 -07002543 // Note: This will yield the call combiner.
2544 closures.RunClosures(calld->call_combiner);
Mark D. Roth718c8342018-02-28 13:00:04 -08002545}
2546
2547//
2548// LB pick
2549//
2550
2551static void create_subchannel_call(grpc_call_element* elem, grpc_error* error) {
2552 channel_data* chand = static_cast<channel_data*>(elem->channel_data);
2553 call_data* calld = static_cast<call_data*>(elem->call_data);
2554 const size_t parent_data_size =
2555 calld->enable_retries ? sizeof(subchannel_call_retry_state) : 0;
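  // When retries are enabled, this reserves space alongside the subchannel
  // call for the retry state later retrieved via
  // grpc_connected_subchannel_call_get_parent_data().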
David Garcia Quintasbaf1ac72018-01-09 14:24:32 -08002556 const grpc_core::ConnectedSubchannel::CallArgs call_args = {
Mark D. Rothc0febd32018-01-09 10:25:24 -08002557 calld->pollent, // pollent
2558 calld->path, // path
2559 calld->call_start_time, // start_time
2560 calld->deadline, // deadline
2561 calld->arena, // arena
2562 calld->pick.subchannel_call_context, // context
Mark D. Roth718c8342018-02-28 13:00:04 -08002563 calld->call_combiner, // call_combiner
2564 parent_data_size // parent_data_size
Yash Tibrewald8b84a22017-09-25 13:38:03 -07002565 };
David Garcia Quintas70fbe622018-01-09 19:27:46 -08002566 grpc_error* new_error = calld->pick.connected_subchannel->CreateCall(
David Garcia Quintasbaf1ac72018-01-09 14:24:32 -08002567 call_args, &calld->subchannel_call);
Craig Tiller6014e8a2017-10-16 13:50:29 -07002568 if (grpc_client_channel_trace.enabled()) {
Mark D. Roth48854d22018-04-25 13:05:26 -07002569 gpr_log(GPR_INFO, "chand=%p calld=%p: create subchannel_call=%p: error=%s",
Mark D. Roth76e264b2017-08-25 09:03:33 -07002570 chand, calld, calld->subchannel_call, grpc_error_string(new_error));
Mark D. Roth60751fe2017-07-07 12:50:33 -07002571 }
Yash Tibrewal8c5b8832018-05-14 11:44:18 -07002572 if (GPR_UNLIKELY(new_error != GRPC_ERROR_NONE)) {
Mark D. Roth0ca0be82017-06-20 07:49:33 -07002573 new_error = grpc_error_add_child(new_error, error);
Mark D. Roth718c8342018-02-28 13:00:04 -08002574 pending_batches_fail(elem, new_error, true /* yield_call_combiner */);
Mark D. Roth0ca0be82017-06-20 07:49:33 -07002575 } else {
Mark D. Roth718c8342018-02-28 13:00:04 -08002576 if (parent_data_size > 0) {
2577 subchannel_call_retry_state* retry_state =
2578 static_cast<subchannel_call_retry_state*>(
2579 grpc_connected_subchannel_call_get_parent_data(
2580 calld->subchannel_call));
2581 retry_state->batch_payload.context = calld->pick.subchannel_call_context;
2582 }
2583 pending_batches_resume(elem);
Craig Tiller11c17d42017-03-13 13:36:34 -07002584 }
Mark D. Roth0ca0be82017-06-20 07:49:33 -07002585 GRPC_ERROR_UNREF(error);
Craig Tiller11c17d42017-03-13 13:36:34 -07002586}
2587
Mark D. Rothb2929602017-09-11 09:31:11 -07002588// Invoked when a pick is completed, on either success or failure.
Mark D. Roth718c8342018-02-28 13:00:04 -08002589static void pick_done(void* arg, grpc_error* error) {
2590 grpc_call_element* elem = static_cast<grpc_call_element*>(arg);
Noah Eisenbe82e642018-02-09 09:16:55 -08002591 channel_data* chand = static_cast<channel_data*>(elem->channel_data);
Mark D. Roth718c8342018-02-28 13:00:04 -08002592 call_data* calld = static_cast<call_data*>(elem->call_data);
Yash Tibrewal8c5b8832018-05-14 11:44:18 -07002593 if (GPR_UNLIKELY(calld->pick.connected_subchannel == nullptr)) {
Mark D. Roth0ca0be82017-06-20 07:49:33 -07002594 // Failed to create subchannel.
Mark D. Roth718c8342018-02-28 13:00:04 -08002595 // If there was no error, this is an LB policy drop, in which case
2596 // we return an error; otherwise, we may retry.
2597 grpc_status_code status = GRPC_STATUS_OK;
2598 grpc_error_get_status(error, calld->deadline, &status, nullptr, nullptr,
2599 nullptr);
2600 if (error == GRPC_ERROR_NONE || !calld->enable_retries ||
2601 !maybe_retry(elem, nullptr /* batch_data */, status,
2602 nullptr /* server_pushback_md */)) {
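      // Not retrying, so fail all pending batches with an appropriate error.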
2603 grpc_error* new_error =
2604 error == GRPC_ERROR_NONE
2605 ? GRPC_ERROR_CREATE_FROM_STATIC_STRING(
2606 "Call dropped by load balancing policy")
2607 : GRPC_ERROR_CREATE_REFERENCING_FROM_STATIC_STRING(
2608 "Failed to create subchannel", &error, 1);
2609 if (grpc_client_channel_trace.enabled()) {
Mark D. Roth48854d22018-04-25 13:05:26 -07002610 gpr_log(GPR_INFO,
Mark D. Roth718c8342018-02-28 13:00:04 -08002611 "chand=%p calld=%p: failed to create subchannel: error=%s",
2612 chand, calld, grpc_error_string(new_error));
2613 }
2614 pending_batches_fail(elem, new_error, true /* yield_call_combiner */);
Mark D. Roth60751fe2017-07-07 12:50:33 -07002615 }
Mark D. Roth2a5959f2016-09-01 08:20:27 -07002616 } else {
Mark D. Roth9fe284e2016-09-12 11:22:27 -07002617 /* Create call on subchannel. */
Mark D. Roth718c8342018-02-28 13:00:04 -08002618 create_subchannel_call(elem, GRPC_ERROR_REF(error));
Mark D. Roth2a5959f2016-09-01 08:20:27 -07002619 }
Mark D. Roth718c8342018-02-28 13:00:04 -08002620}
2621
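// Adds the call's polling entity to chand->interested_parties (if not
// already added), so that I/O for the resolver and LB policy can be done
// under the call's pollers.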
Mark D. Roth7e0e2022018-06-01 12:04:16 -07002622static void maybe_add_call_to_channel_interested_parties_locked(
2623 grpc_call_element* elem) {
2624 channel_data* chand = static_cast<channel_data*>(elem->channel_data);
2625 call_data* calld = static_cast<call_data*>(elem->call_data);
2626 if (!calld->pollent_added_to_interested_parties) {
2627 calld->pollent_added_to_interested_parties = true;
2628 grpc_polling_entity_add_to_pollset_set(calld->pollent,
2629 chand->interested_parties);
2630 }
2631}
2632
2633static void maybe_del_call_from_channel_interested_parties_locked(
2634 grpc_call_element* elem) {
2635 channel_data* chand = static_cast<channel_data*>(elem->channel_data);
2636 call_data* calld = static_cast<call_data*>(elem->call_data);
2637 if (calld->pollent_added_to_interested_parties) {
2638 calld->pollent_added_to_interested_parties = false;
2639 grpc_polling_entity_del_from_pollset_set(calld->pollent,
2640 chand->interested_parties);
2641 }
2642}
2643
Mark D. Roth718c8342018-02-28 13:00:04 -08002644// Invoked when a pick is completed to leave the client_channel combiner
2645// and continue processing in the call combiner.
Mark D. Roth7e0e2022018-06-01 12:04:16 -07002646// If needed, removes the call's polling entity from chand->interested_parties.
Mark D. Roth718c8342018-02-28 13:00:04 -08002647static void pick_done_locked(grpc_call_element* elem, grpc_error* error) {
2648 call_data* calld = static_cast<call_data*>(elem->call_data);
Mark D. Roth7e0e2022018-06-01 12:04:16 -07002649 maybe_del_call_from_channel_interested_parties_locked(elem);
Mark D. Roth718c8342018-02-28 13:00:04 -08002650 GRPC_CLOSURE_INIT(&calld->pick_closure, pick_done, elem,
2651 grpc_schedule_on_exec_ctx);
2652 GRPC_CLOSURE_SCHED(&calld->pick_closure, error);
Mark D. Roth2a5959f2016-09-01 08:20:27 -07002653}
2654
Mark D. Roth7e0e2022018-06-01 12:04:16 -07002655namespace grpc_core {
Mark D. Rothb2929602017-09-11 09:31:11 -07002656
Mark D. Roth7e0e2022018-06-01 12:04:16 -07002657// Performs subchannel pick via LB policy.
2658class LbPicker {
2659 public:
2660 // Starts a pick on chand->lb_policy.
2661 static void StartLocked(grpc_call_element* elem) {
2662 channel_data* chand = static_cast<channel_data*>(elem->channel_data);
2663 call_data* calld = static_cast<call_data*>(elem->call_data);
Craig Tiller6014e8a2017-10-16 13:50:29 -07002664 if (grpc_client_channel_trace.enabled()) {
Mark D. Roth7e0e2022018-06-01 12:04:16 -07002665 gpr_log(GPR_INFO, "chand=%p calld=%p: starting pick on lb_policy=%p",
Mark D. Rothc8875492018-02-20 08:33:48 -08002666 chand, calld, chand->lb_policy.get());
Mark D. Rothb2929602017-09-11 09:31:11 -07002667 }
Mark D. Roth7e0e2022018-06-01 12:04:16 -07002668 // If this is a retry, use the send_initial_metadata payload that
2669 // we've cached; otherwise, use the pending batch. The
2670 // send_initial_metadata batch will be the first pending batch in the
2671 // list, as set by get_batch_index() above.
2672 calld->pick.initial_metadata =
2673 calld->seen_send_initial_metadata
2674 ? &calld->send_initial_metadata
2675 : calld->pending_batches[0]
2676 .batch->payload->send_initial_metadata.send_initial_metadata;
2677 calld->pick.initial_metadata_flags =
2678 calld->seen_send_initial_metadata
2679 ? calld->send_initial_metadata_flags
2680 : calld->pending_batches[0]
2681 .batch->payload->send_initial_metadata
2682 .send_initial_metadata_flags;
2683 GRPC_CLOSURE_INIT(&calld->pick_closure, &LbPicker::DoneLocked, elem,
2684 grpc_combiner_scheduler(chand->combiner));
2685 calld->pick.on_complete = &calld->pick_closure;
2686 GRPC_CALL_STACK_REF(calld->owning_call, "pick_callback");
2687 const bool pick_done = chand->lb_policy->PickLocked(&calld->pick);
2688 if (GPR_LIKELY(pick_done)) {
2689 // Pick completed synchronously.
2690 if (grpc_client_channel_trace.enabled()) {
2691 gpr_log(GPR_INFO, "chand=%p calld=%p: pick completed synchronously",
2692 chand, calld);
2693 }
2694 pick_done_locked(elem, GRPC_ERROR_NONE);
2695 GRPC_CALL_STACK_UNREF(calld->owning_call, "pick_callback");
2696 } else {
2697 // Pick will be returned asynchronously.
2698 // Add the polling entity from call_data to the channel_data's
2699 // interested_parties, so that the I/O of the LB policy can be done
2700 // under it. It will be removed in pick_done_locked().
2701 maybe_add_call_to_channel_interested_parties_locked(elem);
2702 // Request notification on call cancellation.
2703 GRPC_CALL_STACK_REF(calld->owning_call, "pick_callback_cancel");
2704 grpc_call_combiner_set_notify_on_cancel(
2705 calld->call_combiner,
2706 GRPC_CLOSURE_INIT(&calld->pick_cancel_closure,
2707 &LbPicker::CancelLocked, elem,
2708 grpc_combiner_scheduler(chand->combiner)));
2709 }
Mark D. Rothb2929602017-09-11 09:31:11 -07002710 }
Mark D. Rothb2929602017-09-11 09:31:11 -07002711
Mark D. Roth7e0e2022018-06-01 12:04:16 -07002712 private:
2713 // Callback invoked by LoadBalancingPolicy::PickLocked() for async picks.
2714 // Unrefs the LB policy and invokes pick_done_locked().
2715 static void DoneLocked(void* arg, grpc_error* error) {
2716 grpc_call_element* elem = static_cast<grpc_call_element*>(arg);
2717 channel_data* chand = static_cast<channel_data*>(elem->channel_data);
2718 call_data* calld = static_cast<call_data*>(elem->call_data);
2719 if (grpc_client_channel_trace.enabled()) {
2720 gpr_log(GPR_INFO, "chand=%p calld=%p: pick completed asynchronously",
2721 chand, calld);
2722 }
2723 pick_done_locked(elem, GRPC_ERROR_REF(error));
2724 GRPC_CALL_STACK_UNREF(calld->owning_call, "pick_callback");
Mark D. Rothb2929602017-09-11 09:31:11 -07002725 }
Mark D. Roth7e0e2022018-06-01 12:04:16 -07002726
2727 // Note: This runs under the client_channel combiner, but will NOT be
2728 // holding the call combiner.
2729 static void CancelLocked(void* arg, grpc_error* error) {
2730 grpc_call_element* elem = static_cast<grpc_call_element*>(arg);
2731 channel_data* chand = static_cast<channel_data*>(elem->channel_data);
2732 call_data* calld = static_cast<call_data*>(elem->call_data);
2733 // Note: chand->lb_policy may have changed since we started our pick,
2734 // in which case we will be cancelling the pick on a policy other than
2735 // the one we started it on. However, this will just be a no-op.
2736 if (GPR_UNLIKELY(error != GRPC_ERROR_NONE && chand->lb_policy != nullptr)) {
2737 if (grpc_client_channel_trace.enabled()) {
2738 gpr_log(GPR_INFO,
2739 "chand=%p calld=%p: cancelling pick from LB policy %p", chand,
2740 calld, chand->lb_policy.get());
2741 }
2742 chand->lb_policy->CancelPickLocked(&calld->pick, GRPC_ERROR_REF(error));
2743 }
2744 GRPC_CALL_STACK_UNREF(calld->owning_call, "pick_callback_cancel");
2745 }
2746};
2747
2748} // namespace grpc_core
Mark D. Rothb2929602017-09-11 09:31:11 -07002749
Mark D. Roth718c8342018-02-28 13:00:04 -08002750// Applies service config to the call. Must be invoked once we know
2751// that the resolver has returned results to the channel.
2752static void apply_service_config_to_call_locked(grpc_call_element* elem) {
2753 channel_data* chand = static_cast<channel_data*>(elem->channel_data);
2754 call_data* calld = static_cast<call_data*>(elem->call_data);
2755 if (grpc_client_channel_trace.enabled()) {
Mark D. Roth48854d22018-04-25 13:05:26 -07002756 gpr_log(GPR_INFO, "chand=%p calld=%p: applying service config to call",
Mark D. Roth718c8342018-02-28 13:00:04 -08002757 chand, calld);
2758 }
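  // Take a ref to the channel's retry throttle data, so that this call
  // shares retry throttling state with other calls on the channel.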
2759 if (chand->retry_throttle_data != nullptr) {
Mark D. Roth9db86fc2018-03-28 07:42:20 -07002760 calld->retry_throttle_data = chand->retry_throttle_data->Ref();
Mark D. Roth718c8342018-02-28 13:00:04 -08002761 }
2762 if (chand->method_params_table != nullptr) {
2763 calld->method_params = grpc_core::ServiceConfig::MethodConfigTableLookup(
2764 *chand->method_params_table, calld->path);
2765 if (calld->method_params != nullptr) {
2766 // If the deadline from the service config is shorter than the one
2767 // from the client API, reset the deadline timer.
2768 if (chand->deadline_checking_enabled &&
2769 calld->method_params->timeout() != 0) {
2770 const grpc_millis per_method_deadline =
2771 grpc_timespec_to_millis_round_up(calld->call_start_time) +
2772 calld->method_params->timeout();
2773 if (per_method_deadline < calld->deadline) {
2774 calld->deadline = per_method_deadline;
2775 grpc_deadline_state_reset(elem, calld->deadline);
2776 }
2777 }
Mark D. Roth7e0e2022018-06-01 12:04:16 -07002778 // If the service config set wait_for_ready and the application
2779 // did not explicitly set it, use the value from the service config.
2780 uint32_t* send_initial_metadata_flags =
2781 &calld->pending_batches[0]
2782 .batch->payload->send_initial_metadata
2783 .send_initial_metadata_flags;
2784 if (GPR_UNLIKELY(
2785 calld->method_params->wait_for_ready() !=
2786 ClientChannelMethodParams::WAIT_FOR_READY_UNSET &&
2787 !(*send_initial_metadata_flags &
2788 GRPC_INITIAL_METADATA_WAIT_FOR_READY_EXPLICITLY_SET))) {
2789 if (calld->method_params->wait_for_ready() ==
2790 ClientChannelMethodParams::WAIT_FOR_READY_TRUE) {
2791 *send_initial_metadata_flags |= GRPC_INITIAL_METADATA_WAIT_FOR_READY;
2792 } else {
2793 *send_initial_metadata_flags &= ~GRPC_INITIAL_METADATA_WAIT_FOR_READY;
2794 }
2795 }
Mark D. Roth718c8342018-02-28 13:00:04 -08002796 }
2797 }
2798 // If no retry policy, disable retries.
2799 // TODO(roth): Remove this when adding support for transparent retries.
2800 if (calld->method_params == nullptr ||
2801 calld->method_params->retry_policy() == nullptr) {
2802 calld->enable_retries = false;
2803 }
2804}
2805
Mark D. Roth7e0e2022018-06-01 12:04:16 -07002806// Invoked once resolver results are available.
2807static void process_service_config_and_start_lb_pick_locked(
2808 grpc_call_element* elem) {
Noah Eisenbe82e642018-02-09 09:16:55 -08002809 call_data* calld = static_cast<call_data*>(elem->call_data);
Mark D. Roth718c8342018-02-28 13:00:04 -08002810 // Only get service config data on the first attempt.
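  // (Retry attempts reuse the config captured here, so a resolver update
  // mid-call does not change the call's parameters.)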
Yash Tibrewal8c5b8832018-05-14 11:44:18 -07002811 if (GPR_LIKELY(calld->num_attempts_completed == 0)) {
Mark D. Roth718c8342018-02-28 13:00:04 -08002812 apply_service_config_to_call_locked(elem);
2813 }
Mark D. Roth7e0e2022018-06-01 12:04:16 -07002814 // Start LB pick.
2815 grpc_core::LbPicker::StartLocked(elem);
Mark D. Rothb2929602017-09-11 09:31:11 -07002816}
Mark D. Roth0ca0be82017-06-20 07:49:33 -07002817
Mark D. Roth7e0e2022018-06-01 12:04:16 -07002818namespace grpc_core {
Craig Tiller577c9b22015-11-02 14:11:15 -08002819
Mark D. Roth7e0e2022018-06-01 12:04:16 -07002820// Handles waiting for a resolver result.
2821// Used only for the first call on an idle channel.
2822class ResolverResultWaiter {
2823 public:
2824 explicit ResolverResultWaiter(grpc_call_element* elem) : elem_(elem) {
2825 channel_data* chand = static_cast<channel_data*>(elem->channel_data);
2826 call_data* calld = static_cast<call_data*>(elem->call_data);
Craig Tiller6014e8a2017-10-16 13:50:29 -07002827 if (grpc_client_channel_trace.enabled()) {
Mark D. Roth7e0e2022018-06-01 12:04:16 -07002828 gpr_log(GPR_INFO,
2829 "chand=%p calld=%p: deferring pick pending resolver result",
Mark D. Rothb2b9a0f2017-09-01 09:06:47 -07002830 chand, calld);
2831 }
Mark D. Roth7e0e2022018-06-01 12:04:16 -07002832 // Add closure to be run when a resolver result is available.
2833 GRPC_CLOSURE_INIT(&done_closure_, &ResolverResultWaiter::DoneLocked, this,
2834 grpc_combiner_scheduler(chand->combiner));
2835 AddToWaitingList();
2836 // Set cancellation closure, so that we abort if the call is cancelled.
2837 GRPC_CLOSURE_INIT(&cancel_closure_, &ResolverResultWaiter::CancelLocked,
2838 this, grpc_combiner_scheduler(chand->combiner));
2839 grpc_call_combiner_set_notify_on_cancel(calld->call_combiner,
2840 &cancel_closure_);
2841 }
2842
2843 private:
2844 // Adds closure_ to chand->waiting_for_resolver_result_closures.
2845 void AddToWaitingList() {
2846 channel_data* chand = static_cast<channel_data*>(elem_->channel_data);
2847 grpc_closure_list_append(&chand->waiting_for_resolver_result_closures,
2848 &done_closure_, GRPC_ERROR_NONE);
2849 }
2850
2851 // Invoked when a resolver result is available.
2852 static void DoneLocked(void* arg, grpc_error* error) {
2853 ResolverResultWaiter* self = static_cast<ResolverResultWaiter*>(arg);
2854 // If CancelLocked() has already run, delete ourselves without doing
2855 // anything. Note that the call stack may have already been destroyed,
2856 // so it's not safe to access anything in elem_.
2857 if (GPR_UNLIKELY(self->finished_)) {
Mark D. Rothe63e06d2018-03-23 08:12:11 -07002858 if (grpc_client_channel_trace.enabled()) {
Mark D. Roth7e0e2022018-06-01 12:04:16 -07002859 gpr_log(GPR_INFO, "call cancelled before resolver result");
2860 }
2861 Delete(self);
2862 return;
2863 }
2864 // Otherwise, process the resolver result.
2865 grpc_call_element* elem = self->elem_;
2866 channel_data* chand = static_cast<channel_data*>(elem->channel_data);
2867 call_data* calld = static_cast<call_data*>(elem->call_data);
2868 if (GPR_UNLIKELY(error != GRPC_ERROR_NONE)) {
2869 if (grpc_client_channel_trace.enabled()) {
2870 gpr_log(GPR_INFO, "chand=%p calld=%p: resolver failed to return data",
Mark D. Rothe63e06d2018-03-23 08:12:11 -07002871 chand, calld);
2872 }
Mark D. Roth7e0e2022018-06-01 12:04:16 -07002873 pick_done_locked(elem, GRPC_ERROR_REF(error));
2874 } else if (GPR_UNLIKELY(chand->resolver == nullptr)) {
2875 // Shutting down.
2876 if (grpc_client_channel_trace.enabled()) {
2877 gpr_log(GPR_INFO, "chand=%p calld=%p: resolver disconnected", chand,
2878 calld);
2879 }
2880 pick_done_locked(elem,
2881 GRPC_ERROR_CREATE_FROM_STATIC_STRING("Disconnected"));
2882 } else if (GPR_UNLIKELY(chand->lb_policy == nullptr)) {
2883 // Transient resolver failure.
2884 // If call has wait_for_ready=true, try again; otherwise, fail.
2885 uint32_t send_initial_metadata_flags =
2886 calld->seen_send_initial_metadata
2887 ? calld->send_initial_metadata_flags
2888 : calld->pending_batches[0]
2889 .batch->payload->send_initial_metadata
2890 .send_initial_metadata_flags;
2891 if (send_initial_metadata_flags & GRPC_INITIAL_METADATA_WAIT_FOR_READY) {
2892 if (grpc_client_channel_trace.enabled()) {
2893 gpr_log(GPR_INFO,
2894 "chand=%p calld=%p: resolver returned but no LB policy; "
2895 "wait_for_ready=true; trying again",
2896 chand, calld);
2897 }
2898 // Re-add ourselves to the waiting list.
2899 self->AddToWaitingList();
2900 // Return early so that we don't set finished_ to true below.
2901 return;
2902 } else {
2903 if (grpc_client_channel_trace.enabled()) {
2904 gpr_log(GPR_INFO,
2905 "chand=%p calld=%p: resolver returned but no LB policy; "
2906 "wait_for_ready=false; failing",
2907 chand, calld);
2908 }
2909 pick_done_locked(
2910 elem,
2911 grpc_error_set_int(
2912 GRPC_ERROR_CREATE_FROM_STATIC_STRING("Name resolution failure"),
2913 GRPC_ERROR_INT_GRPC_STATUS, GRPC_STATUS_UNAVAILABLE));
2914 }
Mark D. Rothe63e06d2018-03-23 08:12:11 -07002915 } else {
2916 if (grpc_client_channel_trace.enabled()) {
Mark D. Roth7e0e2022018-06-01 12:04:16 -07002917 gpr_log(GPR_INFO, "chand=%p calld=%p: resolver returned, doing LB pick",
Mark D. Rothe63e06d2018-03-23 08:12:11 -07002918 chand, calld);
2919 }
Mark D. Roth7e0e2022018-06-01 12:04:16 -07002920 process_service_config_and_start_lb_pick_locked(elem);
Mark D. Rothe63e06d2018-03-23 08:12:11 -07002921 }
Mark D. Roth7e0e2022018-06-01 12:04:16 -07002922 self->finished_ = true;
Craig Tiller577c9b22015-11-02 14:11:15 -08002923 }
Craig Tiller577c9b22015-11-02 14:11:15 -08002924
Mark D. Roth7e0e2022018-06-01 12:04:16 -07002925 // Invoked when the call is cancelled.
2926 // Note: This runs under the client_channel combiner, but will NOT be
2927 // holding the call combiner.
2928 static void CancelLocked(void* arg, grpc_error* error) {
2929 ResolverResultWaiter* self = static_cast<ResolverResultWaiter*>(arg);
2930 // If DoneLocked() has already run, delete ourselves without doing anything.
2931 if (GPR_LIKELY(self->finished_)) {
2932 Delete(self);
2933 return;
2934 }
2935 // If we are being cancelled, immediately invoke pick_done_locked()
2936 // to propagate the error back to the caller.
2937 if (GPR_UNLIKELY(error != GRPC_ERROR_NONE)) {
2938 grpc_call_element* elem = self->elem_;
2939 channel_data* chand = static_cast<channel_data*>(elem->channel_data);
2940 call_data* calld = static_cast<call_data*>(elem->call_data);
2941 if (grpc_client_channel_trace.enabled()) {
2942 gpr_log(GPR_INFO,
2943 "chand=%p calld=%p: cancelling call waiting for name "
2944 "resolution",
2945 chand, calld);
2946 }
2947 // Note: Although we are not in the call combiner here, we are
2948 // basically stealing the call combiner from the pending pick, so
2949 // it's safe to call pick_done_locked() here -- we are essentially
2950 // calling it here instead of calling it in DoneLocked().
2951 pick_done_locked(elem, GRPC_ERROR_CREATE_REFERENCING_FROM_STATIC_STRING(
2952 "Pick cancelled", &error, 1));
2953 }
2954 self->finished_ = true;
Mark D. Roth64a317c2017-05-02 08:27:08 -07002955 }
Mark D. Roth7e0e2022018-06-01 12:04:16 -07002956
2957 grpc_call_element* elem_;
2958 grpc_closure done_closure_;
2959 grpc_closure cancel_closure_;
2960 bool finished_ = false;
2961};
2962
2963} // namespace grpc_core
Mark D. Roth60751fe2017-07-07 12:50:33 -07002964
Yash Tibrewal8cf14702017-12-06 09:47:54 -08002965static void start_pick_locked(void* arg, grpc_error* ignored) {
Noah Eisenbe82e642018-02-09 09:16:55 -08002966 grpc_call_element* elem = static_cast<grpc_call_element*>(arg);
2967 call_data* calld = static_cast<call_data*>(elem->call_data);
2968 channel_data* chand = static_cast<channel_data*>(elem->channel_data);
David Garcia Quintasbe1b7f92018-01-12 14:01:38 -08002969 GPR_ASSERT(calld->pick.connected_subchannel == nullptr);
Mark D. Roth718c8342018-02-28 13:00:04 -08002970 GPR_ASSERT(calld->subchannel_call == nullptr);
Yash Tibrewal8c5b8832018-05-14 11:44:18 -07002971 if (GPR_LIKELY(chand->lb_policy != nullptr)) {
Mark D. Roth7e0e2022018-06-01 12:04:16 -07002972 // We already have resolver results, so process the service config
2973 // and start an LB pick.
2974 process_service_config_and_start_lb_pick_locked(elem);
2975 } else if (GPR_UNLIKELY(chand->resolver == nullptr)) {
2976 pick_done_locked(elem,
2977 GRPC_ERROR_CREATE_FROM_STATIC_STRING("Disconnected"));
Mark D. Roth76e264b2017-08-25 09:03:33 -07002978 } else {
Mark D. Rothb2929602017-09-11 09:31:11 -07002979 // We do not yet have an LB policy, so wait for a resolver result.
Yash Tibrewal137eb932018-05-23 15:07:39 -07002980 if (GPR_UNLIKELY(!chand->started_resolving)) {
Yash Tibrewal8cf14702017-12-06 09:47:54 -08002981 start_resolving_locked(chand);
Mark D. Rothb2929602017-09-11 09:31:11 -07002982 }
Mark D. Roth7e0e2022018-06-01 12:04:16 -07002983 // Create a new waiter, which will delete itself when done.
2984 grpc_core::New<grpc_core::ResolverResultWaiter>(elem);
2985 // Add the polling entity from call_data to the channel_data's
2986 // interested_parties, so that the I/O of the resolver can be done
2987 // under it. It will be removed in pick_done_locked().
2988 maybe_add_call_to_channel_interested_parties_locked(elem);
Mark D. Roth2a5959f2016-09-01 08:20:27 -07002989 }
Craig Tillera11bfc82017-02-14 09:56:33 -08002990}
2991
Mark D. Roth718c8342018-02-28 13:00:04 -08002992//
2993// filter call vtable functions
2994//
Mark D. Rothd6d192d2017-02-23 08:58:42 -08002995
static void cc_start_transport_stream_op_batch(
    grpc_call_element* elem, grpc_transport_stream_op_batch* batch) {
  GPR_TIMER_SCOPE("cc_start_transport_stream_op_batch", 0);
  call_data* calld = static_cast<call_data*>(elem->call_data);
  channel_data* chand = static_cast<channel_data*>(elem->channel_data);
  if (GPR_LIKELY(chand->deadline_checking_enabled)) {
    grpc_deadline_state_client_start_transport_stream_op_batch(elem, batch);
  }
  // If we've previously been cancelled, immediately fail any new batches.
  if (GPR_UNLIKELY(calld->cancel_error != GRPC_ERROR_NONE)) {
    if (grpc_client_channel_trace.enabled()) {
      gpr_log(GPR_INFO, "chand=%p calld=%p: failing batch with error: %s",
              chand, calld, grpc_error_string(calld->cancel_error));
    }
    // Note: This will release the call combiner.
    grpc_transport_stream_op_batch_finish_with_failure(
        batch, GRPC_ERROR_REF(calld->cancel_error), calld->call_combiner);
    return;
  }
  // Handle cancellation.
  if (GPR_UNLIKELY(batch->cancel_stream)) {
    // Stash a copy of cancel_error in our call data, so that we can use
    // it for subsequent operations.  This ensures that if the call is
    // cancelled before any batches are passed down (e.g., if the deadline
    // is in the past when the call starts), we can return the right
    // error to the caller when the first batch does get passed down.
    GRPC_ERROR_UNREF(calld->cancel_error);
    calld->cancel_error =
        GRPC_ERROR_REF(batch->payload->cancel_stream.cancel_error);
    if (grpc_client_channel_trace.enabled()) {
      gpr_log(GPR_INFO, "chand=%p calld=%p: recording cancel_error=%s", chand,
              calld, grpc_error_string(calld->cancel_error));
    }
    // If we do not have a subchannel call (i.e., a pick has not yet
    // been started), fail all pending batches.  Otherwise, send the
    // cancellation down to the subchannel call.
    if (calld->subchannel_call == nullptr) {
      pending_batches_fail(elem, GRPC_ERROR_REF(calld->cancel_error),
                           false /* yield_call_combiner */);
      // Note: This will release the call combiner.
      grpc_transport_stream_op_batch_finish_with_failure(
          batch, GRPC_ERROR_REF(calld->cancel_error), calld->call_combiner);
    } else {
      // Note: This will release the call combiner.
      grpc_subchannel_call_process_op(calld->subchannel_call, batch);
    }
    return;
  }
  // Add the batch to the pending list.
  pending_batches_add(elem, batch);
  // Check if we've already gotten a subchannel call.
  // Note that once we have completed the pick, we do not need to enter
  // the channel combiner, which is more efficient (especially for
  // streaming calls).
  if (calld->subchannel_call != nullptr) {
    if (grpc_client_channel_trace.enabled()) {
      gpr_log(GPR_INFO,
              "chand=%p calld=%p: starting batch on subchannel_call=%p", chand,
              calld, calld->subchannel_call);
    }
    pending_batches_resume(elem);
    return;
  }
  // We do not yet have a subchannel call.
  // For batches containing a send_initial_metadata op, enter the channel
  // combiner to start a pick.
  if (GPR_LIKELY(batch->send_initial_metadata)) {
    if (grpc_client_channel_trace.enabled()) {
      gpr_log(GPR_INFO, "chand=%p calld=%p: entering client_channel combiner",
              chand, calld);
    }
    GRPC_CLOSURE_SCHED(
        GRPC_CLOSURE_INIT(&batch->handler_private.closure, start_pick_locked,
                          elem, grpc_combiner_scheduler(chand->combiner)),
        GRPC_ERROR_NONE);
  } else {
    // For all other batches, release the call combiner.
    if (grpc_client_channel_trace.enabled()) {
      gpr_log(GPR_INFO,
              "chand=%p calld=%p: saved batch, yielding call combiner", chand,
              calld);
    }
    GRPC_CALL_COMBINER_STOP(calld->call_combiner,
                            "batch does not include send_initial_metadata");
  }
}

/* Constructor for call_data */
static grpc_error* cc_init_call_elem(grpc_call_element* elem,
                                     const grpc_call_element_args* args) {
  call_data* calld = static_cast<call_data*>(elem->call_data);
  channel_data* chand = static_cast<channel_data*>(elem->channel_data);
  // Initialize data members.
  calld->path = grpc_slice_ref_internal(args->path);
  calld->call_start_time = args->start_time;
  calld->deadline = args->deadline;
  calld->arena = args->arena;
  calld->owning_call = args->call_stack;
  calld->call_combiner = args->call_combiner;
  if (GPR_LIKELY(chand->deadline_checking_enabled)) {
    grpc_deadline_state_init(elem, args->call_stack, args->call_combiner,
                             calld->deadline);
  }
  calld->enable_retries = chand->enable_retries;
  calld->send_messages.Init();
  return GRPC_ERROR_NONE;
}

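// Note on the destructor below: if a subchannel call still exists, its
// destruction is asynchronous, so then_schedule_closure is handed off via
// grpc_subchannel_call_set_cleanup_closure() and runs only once the
// subchannel call is destroyed; otherwise it is scheduled at the end of
// the destructor.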
/* Destructor for call_data */
static void cc_destroy_call_elem(grpc_call_element* elem,
                                 const grpc_call_final_info* final_info,
                                 grpc_closure* then_schedule_closure) {
  call_data* calld = static_cast<call_data*>(elem->call_data);
  channel_data* chand = static_cast<channel_data*>(elem->channel_data);
  if (GPR_LIKELY(chand->deadline_checking_enabled)) {
    grpc_deadline_state_destroy(elem);
  }
  grpc_slice_unref_internal(calld->path);
  calld->retry_throttle_data.reset();
  calld->method_params.reset();
  GRPC_ERROR_UNREF(calld->cancel_error);
  if (GPR_LIKELY(calld->subchannel_call != nullptr)) {
    grpc_subchannel_call_set_cleanup_closure(calld->subchannel_call,
                                             then_schedule_closure);
    then_schedule_closure = nullptr;
    GRPC_SUBCHANNEL_CALL_UNREF(calld->subchannel_call,
                               "client_channel_destroy_call");
  }
  for (size_t i = 0; i < GPR_ARRAY_SIZE(calld->pending_batches); ++i) {
    GPR_ASSERT(calld->pending_batches[i].batch == nullptr);
  }
  if (GPR_LIKELY(calld->pick.connected_subchannel != nullptr)) {
    calld->pick.connected_subchannel.reset();
  }
  for (size_t i = 0; i < GRPC_CONTEXT_COUNT; ++i) {
    if (calld->pick.subchannel_call_context[i].value != nullptr) {
      calld->pick.subchannel_call_context[i].destroy(
          calld->pick.subchannel_call_context[i].value);
    }
  }
  calld->send_messages.Destroy();
  GRPC_CLOSURE_SCHED(then_schedule_closure, GRPC_ERROR_NONE);
}

static void cc_set_pollset_or_pollset_set(grpc_call_element* elem,
                                          grpc_polling_entity* pollent) {
  call_data* calld = static_cast<call_data*>(elem->call_data);
  calld->pollent = pollent;
}

/*************************************************************************
 * EXPORTED SYMBOLS
 */

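// The client channel filter vtable, wiring the call-level functions above
// and the channel-level functions defined earlier in this file into the
// channel stack.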
const grpc_channel_filter grpc_client_channel_filter = {
    cc_start_transport_stream_op_batch,
    cc_start_transport_op,
    sizeof(call_data),
    cc_init_call_elem,
    cc_set_pollset_or_pollset_set,
    cc_destroy_call_elem,
    sizeof(channel_data),
    cc_init_channel_elem,
    cc_destroy_channel_elem,
    cc_get_channel_info,
    "client-channel",
};

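// Hypothetical illustration (not part of this file): a filter like the
// one above is typically wired into a channel stack via the channel
// stack builder API from a plugin init function.  The helper name below
// is a placeholder:
//
//   static bool append_client_channel_filter(
//       grpc_channel_stack_builder* builder, void* arg) {
//     return grpc_channel_stack_builder_append_filter(
//         builder, &grpc_client_channel_filter, nullptr, nullptr);
//   }
//
// The actual registration for this filter lives elsewhere in the tree.

// Kicks the channel out of IDLE, in the combiner: if an LB policy
// already exists, tells it to exit idle; otherwise records that the
// channel should exit idle as soon as the LB policy arrives, starting
// resolution first if it has not started yet.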
static void try_to_connect_locked(void* arg, grpc_error* error_ignored) {
  channel_data* chand = static_cast<channel_data*>(arg);
  if (chand->lb_policy != nullptr) {
    chand->lb_policy->ExitIdleLocked();
  } else {
    chand->exit_idle_when_lb_policy_arrives = true;
    if (!chand->started_resolving && chand->resolver != nullptr) {
      start_resolving_locked(chand);
    }
  }
  GRPC_CHANNEL_STACK_UNREF(chand->owning_stack, "try_to_connect");
}

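// Fills in channelz child references (subchannels and channels) from the
// current LB policy, if any.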
void grpc_client_channel_populate_child_refs(
    grpc_channel_element* elem, grpc_core::ChildRefsList* child_subchannels,
    grpc_core::ChildRefsList* child_channels) {
  channel_data* chand = static_cast<channel_data*>(elem->channel_data);
  if (chand->lb_policy != nullptr) {
    chand->lb_policy->FillChildRefsForChannelz(child_subchannels,
                                               child_channels);
  }
}

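// Returns the channel's current connectivity state.  If the channel is
// IDLE and try_to_connect is set, schedules try_to_connect_locked() on
// the combiner to kick off name resolution and connection establishment.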
grpc_connectivity_state grpc_client_channel_check_connectivity_state(
    grpc_channel_element* elem, int try_to_connect) {
  channel_data* chand = static_cast<channel_data*>(elem->channel_data);
  grpc_connectivity_state out =
      grpc_connectivity_state_check(&chand->state_tracker);
  if (out == GRPC_CHANNEL_IDLE && try_to_connect) {
    GRPC_CHANNEL_STACK_REF(chand->owning_stack, "try_to_connect");
    GRPC_CLOSURE_SCHED(
        GRPC_CLOSURE_CREATE(try_to_connect_locked, chand,
                            grpc_combiner_scheduler(chand->combiner)),
        GRPC_ERROR_NONE);
  }
  return out;
}

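// External connectivity state watchers.
//
// Watchers are kept on an intrusive singly-linked list hanging off of
// the channel_data, guarded by external_connectivity_watcher_list_mu.
// The caller's on_complete closure doubles as the key used to find (and
// cancel) an existing watch.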
typedef struct external_connectivity_watcher {
  channel_data* chand;
  grpc_polling_entity pollent;
  grpc_closure* on_complete;
  grpc_closure* watcher_timer_init;
  grpc_connectivity_state* state;
  grpc_closure my_closure;
  struct external_connectivity_watcher* next;
} external_connectivity_watcher;

static external_connectivity_watcher* lookup_external_connectivity_watcher(
    channel_data* chand, grpc_closure* on_complete) {
  gpr_mu_lock(&chand->external_connectivity_watcher_list_mu);
  external_connectivity_watcher* w =
      chand->external_connectivity_watcher_list_head;
  while (w != nullptr && w->on_complete != on_complete) {
    w = w->next;
  }
  gpr_mu_unlock(&chand->external_connectivity_watcher_list_mu);
  return w;
}

static void external_connectivity_watcher_list_append(
    channel_data* chand, external_connectivity_watcher* w) {
  GPR_ASSERT(!lookup_external_connectivity_watcher(chand, w->on_complete));

  gpr_mu_lock(&w->chand->external_connectivity_watcher_list_mu);
  GPR_ASSERT(!w->next);
  w->next = chand->external_connectivity_watcher_list_head;
  chand->external_connectivity_watcher_list_head = w;
  gpr_mu_unlock(&w->chand->external_connectivity_watcher_list_mu);
}

static void external_connectivity_watcher_list_remove(
    channel_data* chand, external_connectivity_watcher* to_remove) {
  GPR_ASSERT(
      lookup_external_connectivity_watcher(chand, to_remove->on_complete));
  gpr_mu_lock(&chand->external_connectivity_watcher_list_mu);
  if (to_remove == chand->external_connectivity_watcher_list_head) {
    chand->external_connectivity_watcher_list_head = to_remove->next;
    gpr_mu_unlock(&chand->external_connectivity_watcher_list_mu);
    return;
  }
  external_connectivity_watcher* w =
      chand->external_connectivity_watcher_list_head;
  while (w != nullptr) {
    if (w->next == to_remove) {
      w->next = w->next->next;
      gpr_mu_unlock(&chand->external_connectivity_watcher_list_mu);
      return;
    }
    w = w->next;
  }
  GPR_UNREACHABLE_CODE(return );
}

int grpc_client_channel_num_external_connectivity_watchers(
    grpc_channel_element* elem) {
  channel_data* chand = static_cast<channel_data*>(elem->channel_data);
  int count = 0;

  gpr_mu_lock(&chand->external_connectivity_watcher_list_mu);
  external_connectivity_watcher* w =
      chand->external_connectivity_watcher_list_head;
  while (w != nullptr) {
    count++;
    w = w->next;
  }
  gpr_mu_unlock(&chand->external_connectivity_watcher_list_mu);

  return count;
}

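// Invoked (in the combiner) when the channel's connectivity state
// changes: unlinks and frees the watcher, then schedules the caller's
// on_complete closure with the resulting error.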
static void on_external_watch_complete_locked(void* arg, grpc_error* error) {
  external_connectivity_watcher* w =
      static_cast<external_connectivity_watcher*>(arg);
  grpc_closure* follow_up = w->on_complete;
  grpc_polling_entity_del_from_pollset_set(&w->pollent,
                                           w->chand->interested_parties);
  GRPC_CHANNEL_STACK_UNREF(w->chand->owning_stack,
                           "external_connectivity_watcher");
  external_connectivity_watcher_list_remove(w->chand, w);
  gpr_free(w);
  GRPC_CLOSURE_SCHED(follow_up, GRPC_ERROR_REF(error));
}

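// Starts or cancels a watch, depending on w->state:
// - non-null: the watcher is appended to the list and registered with
//   the channel's connectivity state tracker;
// - null: this is a cancellation; the original watcher is looked up by
//   its on_complete closure, its pending notification is de-registered,
//   and this temporary watcher is freed.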
static void watch_connectivity_state_locked(void* arg,
                                            grpc_error* error_ignored) {
  external_connectivity_watcher* w =
      static_cast<external_connectivity_watcher*>(arg);
  external_connectivity_watcher* found = nullptr;
  if (w->state != nullptr) {
    external_connectivity_watcher_list_append(w->chand, w);
    // This assumes that the closure is scheduled on the ExecCtx scheduler,
    // so that GRPC_CLOSURE_RUN runs watcher_timer_init immediately.
    GRPC_CLOSURE_RUN(w->watcher_timer_init, GRPC_ERROR_NONE);
    GRPC_CLOSURE_INIT(&w->my_closure, on_external_watch_complete_locked, w,
                      grpc_combiner_scheduler(w->chand->combiner));
    grpc_connectivity_state_notify_on_state_change(&w->chand->state_tracker,
                                                   w->state, &w->my_closure);
  } else {
    GPR_ASSERT(w->watcher_timer_init == nullptr);
    found = lookup_external_connectivity_watcher(w->chand, w->on_complete);
    if (found) {
      GPR_ASSERT(found->on_complete == w->on_complete);
      grpc_connectivity_state_notify_on_state_change(
          &found->chand->state_tracker, nullptr, &found->my_closure);
    }
    grpc_polling_entity_del_from_pollset_set(&w->pollent,
                                             w->chand->interested_parties);
    GRPC_CHANNEL_STACK_UNREF(w->chand->owning_stack,
                             "external_connectivity_watcher");
    gpr_free(w);
  }
}

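// Registers (or, with a null state pointer, cancels) an external watch
// on this channel's connectivity state; the bookkeeping happens in
// watch_connectivity_state_locked() above, on the channel combiner.
//
// Hypothetical usage sketch (not part of this file; the pollset and
// closure names are placeholders):
//
//   grpc_connectivity_state state = GRPC_CHANNEL_IDLE;
//   grpc_client_channel_watch_connectivity_state(
//       elem, grpc_polling_entity_create_from_pollset(pollset), &state,
//       on_state_changed_closure, watcher_timer_init_closure);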
void grpc_client_channel_watch_connectivity_state(
    grpc_channel_element* elem, grpc_polling_entity pollent,
    grpc_connectivity_state* state, grpc_closure* closure,
    grpc_closure* watcher_timer_init) {
  channel_data* chand = static_cast<channel_data*>(elem->channel_data);
  external_connectivity_watcher* w =
      static_cast<external_connectivity_watcher*>(gpr_zalloc(sizeof(*w)));
  w->chand = chand;
  w->pollent = pollent;
  w->on_complete = closure;
  w->state = state;
  w->watcher_timer_init = watcher_timer_init;
  grpc_polling_entity_add_to_pollset_set(&w->pollent,
                                         chand->interested_parties);
  GRPC_CHANNEL_STACK_REF(w->chand->owning_stack,
                         "external_connectivity_watcher");
  GRPC_CLOSURE_SCHED(
      GRPC_CLOSURE_INIT(&w->my_closure, watch_connectivity_state_locked, w,
                        grpc_combiner_scheduler(chand->combiner)),
      GRPC_ERROR_NONE);
}

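// Returns the call's subchannel call, or null if no pick has completed
// yet.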
grpc_subchannel_call* grpc_client_channel_get_subchannel_call(
    grpc_call_element* elem) {
  call_data* calld = static_cast<call_data*>(elem->call_data);
  return calld->subchannel_call;
}