/*
 *
 * Copyright 2015 gRPC authors.
 *
 * Licensed under the Apache License, Version 2.0 (the "License");
 * you may not use this file except in compliance with the License.
 * You may obtain a copy of the License at
 *
 *     http://www.apache.org/licenses/LICENSE-2.0
 *
 * Unless required by applicable law or agreed to in writing, software
 * distributed under the License is distributed on an "AS IS" BASIS,
 * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
 * See the License for the specific language governing permissions and
 * limitations under the License.
 *
 */
#include <grpc/support/port_platform.h>

#include "src/core/ext/filters/client_channel/client_channel.h"

#include <inttypes.h>
#include <limits.h>
#include <stdbool.h>
#include <stdio.h>
#include <string.h>

#include <grpc/support/alloc.h>
#include <grpc/support/log.h>
#include <grpc/support/string_util.h>
#include <grpc/support/sync.h>

#include "src/core/ext/filters/client_channel/backup_poller.h"
#include "src/core/ext/filters/client_channel/http_connect_handshaker.h"
#include "src/core/ext/filters/client_channel/lb_policy_registry.h"
#include "src/core/ext/filters/client_channel/method_params.h"
#include "src/core/ext/filters/client_channel/proxy_mapper_registry.h"
#include "src/core/ext/filters/client_channel/resolver_registry.h"
#include "src/core/ext/filters/client_channel/retry_throttle.h"
#include "src/core/ext/filters/client_channel/subchannel.h"
#include "src/core/ext/filters/deadline/deadline_filter.h"
#include "src/core/lib/backoff/backoff.h"
#include "src/core/lib/channel/channel_args.h"
#include "src/core/lib/channel/connected_channel.h"
#include "src/core/lib/channel/status_util.h"
#include "src/core/lib/gpr/string.h"
#include "src/core/lib/gprpp/inlined_vector.h"
#include "src/core/lib/gprpp/manual_constructor.h"
#include "src/core/lib/iomgr/combiner.h"
#include "src/core/lib/iomgr/iomgr.h"
#include "src/core/lib/iomgr/polling_entity.h"
#include "src/core/lib/profiling/timers.h"
#include "src/core/lib/slice/slice_internal.h"
#include "src/core/lib/slice/slice_string_helpers.h"
#include "src/core/lib/surface/channel.h"
#include "src/core/lib/transport/connectivity_state.h"
#include "src/core/lib/transport/error_utils.h"
#include "src/core/lib/transport/metadata.h"
#include "src/core/lib/transport/metadata_batch.h"
#include "src/core/lib/transport/service_config.h"
#include "src/core/lib/transport/static_metadata.h"
#include "src/core/lib/transport/status_metadata.h"

using grpc_core::internal::ClientChannelMethodParams;
using grpc_core::internal::ServerRetryThrottleData;
/* Client channel implementation */

// By default, we buffer 256 KiB per RPC for retries.
// TODO(roth): Do we have any data to suggest a better value?
#define DEFAULT_PER_RPC_RETRY_BUFFER_SIZE (256 << 10)
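// Illustrative only (not part of this filter): callers can override the
// default above at channel-creation time via the
// GRPC_ARG_PER_RPC_RETRY_BUFFER_SIZE channel arg, which is read in
// cc_init_channel_elem() below.  A minimal sketch, assuming the standard
// channel-args helpers:
//
//   grpc_arg arg = grpc_channel_arg_integer_create(
//       const_cast<char*>(GRPC_ARG_PER_RPC_RETRY_BUFFER_SIZE), 1 << 20);
//   grpc_channel_args channel_args = {1, &arg};
//   // ...pass &channel_args when creating the channel...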

// This value was picked arbitrarily. It can be changed if there is
// any even moderately compelling reason to do so.
#define RETRY_BACKOFF_JITTER 0.2
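// With the backoff code included above (src/core/lib/backoff/backoff.h), a
// jitter of 0.2 means each computed backoff interval is randomized to
// roughly [0.8 * interval, 1.2 * interval].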
77
Craig Tiller694580f2017-10-18 14:48:14 -070078grpc_core::TraceFlag grpc_client_channel_trace(false, "client_channel");
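// This trace output can be enabled at run time by setting the
// GRPC_TRACE=client_channel environment variable.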

/*************************************************************************
 * CHANNEL-WIDE FUNCTIONS
 */

struct external_connectivity_watcher;

typedef grpc_core::SliceHashTable<
    grpc_core::RefCountedPtr<ClientChannelMethodParams>>
    MethodParamsTable;

typedef struct client_channel_channel_data {
  grpc_core::OrphanablePtr<grpc_core::Resolver> resolver;
  bool started_resolving;
  bool deadline_checking_enabled;
  grpc_client_channel_factory* client_channel_factory;
  bool enable_retries;
  size_t per_rpc_retry_buffer_size;

  /** combiner protecting all variables below in this data structure */
  grpc_combiner* combiner;
  /** currently active load balancer */
  grpc_core::OrphanablePtr<grpc_core::LoadBalancingPolicy> lb_policy;
  /** retry throttle data */
  grpc_core::RefCountedPtr<ServerRetryThrottleData> retry_throttle_data;
  /** maps method names to method_parameters structs */
  grpc_core::RefCountedPtr<MethodParamsTable> method_params_table;
  /** incoming resolver result - set by resolver.next() */
  grpc_channel_args* resolver_result;
  /** a list of closures that are all waiting for resolver result to come in */
  grpc_closure_list waiting_for_resolver_result_closures;
  /** resolver callback */
  grpc_closure on_resolver_result_changed;
  /** connectivity state being tracked */
  grpc_connectivity_state_tracker state_tracker;
  /** when an lb_policy arrives, should we try to exit idle */
  bool exit_idle_when_lb_policy_arrives;
  /** owning stack */
  grpc_channel_stack* owning_stack;
  /** interested parties (owned) */
  grpc_pollset_set* interested_parties;

  /* external_connectivity_watcher_list head is guarded by its own mutex, since
   * counts need to be grabbed immediately without polling on a cq */
  gpr_mu external_connectivity_watcher_list_mu;
  struct external_connectivity_watcher* external_connectivity_watcher_list_head;

  /* the following properties are guarded by a mutex since APIs require them
     to be instantaneously available */
  gpr_mu info_mu;
  char* info_lb_policy_name;
  /** service config in JSON form */
  char* info_service_config_json;
} channel_data;

typedef struct {
  channel_data* chand;
  /** used only as an identifier; don't dereference it, because the LB policy
   * may no longer exist by the time the callback runs */
  grpc_core::LoadBalancingPolicy* lb_policy;
  grpc_closure closure;
} reresolution_request_args;
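// Lifecycle note: an instance is gpr_zalloc'd in
// on_resolver_result_changed_locked() when a new LB policy is created, and
// freed in request_reresolution_locked() once the policy is stale or the
// channel is shutting down.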

/** We create one watcher for each new lb_policy that is returned from a
    resolver, to watch for state changes from the lb_policy.  When a state
    change is seen, we update the channel, and create a new watcher. */
typedef struct {
  channel_data* chand;
  grpc_closure on_changed;
  grpc_connectivity_state state;
  grpc_core::LoadBalancingPolicy* lb_policy;
} lb_policy_connectivity_watcher;

static void watch_lb_policy_locked(channel_data* chand,
                                   grpc_core::LoadBalancingPolicy* lb_policy,
                                   grpc_connectivity_state current_state);

static void set_channel_connectivity_state_locked(channel_data* chand,
                                                  grpc_connectivity_state state,
                                                  grpc_error* error,
                                                  const char* reason) {
  /* TODO: Improve failure handling:
   * - Make it possible for policies to return GRPC_CHANNEL_TRANSIENT_FAILURE.
   * - Hand over pending picks from old policies during the switch that happens
   *   when resolver provides an update. */
  if (chand->lb_policy != nullptr) {
    if (state == GRPC_CHANNEL_TRANSIENT_FAILURE) {
      /* cancel picks with wait_for_ready=false */
      chand->lb_policy->CancelMatchingPicksLocked(
          /* mask= */ GRPC_INITIAL_METADATA_WAIT_FOR_READY,
          /* check= */ 0, GRPC_ERROR_REF(error));
    } else if (state == GRPC_CHANNEL_SHUTDOWN) {
      /* cancel all picks */
      chand->lb_policy->CancelMatchingPicksLocked(/* mask= */ 0, /* check= */ 0,
                                                  GRPC_ERROR_REF(error));
    }
  }
  if (grpc_client_channel_trace.enabled()) {
    gpr_log(GPR_INFO, "chand=%p: setting connectivity state to %s", chand,
            grpc_connectivity_state_name(state));
  }
  grpc_connectivity_state_set(&chand->state_tracker, state, error, reason);
}

static void on_lb_policy_state_changed_locked(void* arg, grpc_error* error) {
  lb_policy_connectivity_watcher* w =
      static_cast<lb_policy_connectivity_watcher*>(arg);
  /* check if the notification is for the latest policy */
  if (w->lb_policy == w->chand->lb_policy.get()) {
    if (grpc_client_channel_trace.enabled()) {
      gpr_log(GPR_INFO, "chand=%p: lb_policy=%p state changed to %s", w->chand,
              w->lb_policy, grpc_connectivity_state_name(w->state));
    }
    set_channel_connectivity_state_locked(w->chand, w->state,
                                          GRPC_ERROR_REF(error), "lb_changed");
    if (w->state != GRPC_CHANNEL_SHUTDOWN) {
      watch_lb_policy_locked(w->chand, w->lb_policy, w->state);
    }
  }
  GRPC_CHANNEL_STACK_UNREF(w->chand->owning_stack, "watch_lb_policy");
  gpr_free(w);
}

static void watch_lb_policy_locked(channel_data* chand,
                                   grpc_core::LoadBalancingPolicy* lb_policy,
                                   grpc_connectivity_state current_state) {
  lb_policy_connectivity_watcher* w =
      static_cast<lb_policy_connectivity_watcher*>(gpr_malloc(sizeof(*w)));
  GRPC_CHANNEL_STACK_REF(chand->owning_stack, "watch_lb_policy");
  w->chand = chand;
  GRPC_CLOSURE_INIT(&w->on_changed, on_lb_policy_state_changed_locked, w,
                    grpc_combiner_scheduler(chand->combiner));
  w->state = current_state;
  w->lb_policy = lb_policy;
  lb_policy->NotifyOnStateChangeLocked(&w->state, &w->on_changed);
}

static void start_resolving_locked(channel_data* chand) {
  if (grpc_client_channel_trace.enabled()) {
    gpr_log(GPR_INFO, "chand=%p: starting name resolution", chand);
  }
  GPR_ASSERT(!chand->started_resolving);
  chand->started_resolving = true;
  GRPC_CHANNEL_STACK_REF(chand->owning_stack, "resolver");
  chand->resolver->NextLocked(&chand->resolver_result,
                              &chand->on_resolver_result_changed);
}

typedef struct {
  char* server_name;
  grpc_core::RefCountedPtr<ServerRetryThrottleData> retry_throttle_data;
} service_config_parsing_state;

static void parse_retry_throttle_params(
    const grpc_json* field, service_config_parsing_state* parsing_state) {
  if (strcmp(field->key, "retryThrottling") == 0) {
    if (parsing_state->retry_throttle_data != nullptr) return;  // Duplicate.
    if (field->type != GRPC_JSON_OBJECT) return;
    int max_milli_tokens = 0;
    int milli_token_ratio = 0;
    for (grpc_json* sub_field = field->child; sub_field != nullptr;
         sub_field = sub_field->next) {
      if (sub_field->key == nullptr) return;
      if (strcmp(sub_field->key, "maxTokens") == 0) {
        if (max_milli_tokens != 0) return;  // Duplicate.
        if (sub_field->type != GRPC_JSON_NUMBER) return;
        max_milli_tokens = gpr_parse_nonnegative_int(sub_field->value);
        if (max_milli_tokens == -1) return;
        max_milli_tokens *= 1000;
      } else if (strcmp(sub_field->key, "tokenRatio") == 0) {
        if (milli_token_ratio != 0) return;  // Duplicate.
        if (sub_field->type != GRPC_JSON_NUMBER) return;
        // We support up to 3 decimal digits.
        size_t whole_len = strlen(sub_field->value);
        uint32_t multiplier = 1;
        uint32_t decimal_value = 0;
        const char* decimal_point = strchr(sub_field->value, '.');
        if (decimal_point != nullptr) {
          whole_len = static_cast<size_t>(decimal_point - sub_field->value);
          multiplier = 1000;
          size_t decimal_len = strlen(decimal_point + 1);
          if (decimal_len > 3) decimal_len = 3;
          if (!gpr_parse_bytes_to_uint32(decimal_point + 1, decimal_len,
                                         &decimal_value)) {
            return;
          }
          uint32_t decimal_multiplier = 1;
          for (size_t i = 0; i < (3 - decimal_len); ++i) {
            decimal_multiplier *= 10;
          }
          decimal_value *= decimal_multiplier;
        }
        uint32_t whole_value;
        if (!gpr_parse_bytes_to_uint32(sub_field->value, whole_len,
                                       &whole_value)) {
          return;
        }
        milli_token_ratio =
            static_cast<int>((whole_value * multiplier) + decimal_value);
        if (milli_token_ratio <= 0) return;
      }
    }
    parsing_state->retry_throttle_data =
        grpc_core::internal::ServerRetryThrottleMap::GetDataForServer(
            parsing_state->server_name, max_milli_tokens, milli_token_ratio);
  }
}
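// For reference, an illustrative service config fragment that the parser
// above accepts (values made up for the example):
//
//   "retryThrottling": {
//     "maxTokens": 10,
//     "tokenRatio": 0.5
//   }
//
// maxTokens is scaled to milli-tokens (x1000) and tokenRatio keeps at most
// three decimal digits, so both can be tracked as integers.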

static void request_reresolution_locked(void* arg, grpc_error* error) {
  reresolution_request_args* args =
      static_cast<reresolution_request_args*>(arg);
  channel_data* chand = args->chand;
  // If this invocation is for a stale LB policy, treat it as an LB shutdown
  // signal.
  if (args->lb_policy != chand->lb_policy.get() || error != GRPC_ERROR_NONE ||
      chand->resolver == nullptr) {
    GRPC_CHANNEL_STACK_UNREF(chand->owning_stack, "re-resolution");
    gpr_free(args);
    return;
  }
  if (grpc_client_channel_trace.enabled()) {
    gpr_log(GPR_INFO, "chand=%p: started name re-resolving", chand);
  }
  chand->resolver->RequestReresolutionLocked();
  // Give back the closure to the LB policy.
  chand->lb_policy->SetReresolutionClosureLocked(&args->closure);
}

// TODO(roth): The logic in this function is very hard to follow.  We
// should refactor this so that it's easier to understand, perhaps as
// part of changing the resolver API to more clearly differentiate
// between transient failures and shutdown.
static void on_resolver_result_changed_locked(void* arg, grpc_error* error) {
  channel_data* chand = static_cast<channel_data*>(arg);
  if (grpc_client_channel_trace.enabled()) {
    gpr_log(GPR_INFO,
            "chand=%p: got resolver result: resolver_result=%p error=%s", chand,
            chand->resolver_result, grpc_error_string(error));
  }
  // Extract the following fields from the resolver result, if non-nullptr.
  bool lb_policy_updated = false;
  bool lb_policy_created = false;
  char* lb_policy_name_dup = nullptr;
  bool lb_policy_name_changed = false;
  grpc_core::OrphanablePtr<grpc_core::LoadBalancingPolicy> new_lb_policy;
  char* service_config_json = nullptr;
  grpc_core::RefCountedPtr<ServerRetryThrottleData> retry_throttle_data;
  grpc_core::RefCountedPtr<MethodParamsTable> method_params_table;
  if (chand->resolver_result != nullptr) {
    if (chand->resolver != nullptr) {
      // Find LB policy name.
      const grpc_arg* channel_arg = grpc_channel_args_find(
          chand->resolver_result, GRPC_ARG_LB_POLICY_NAME);
      const char* lb_policy_name = grpc_channel_arg_get_string(channel_arg);
      // Special case: If at least one balancer address is present, we use
      // the grpclb policy, regardless of what the resolver actually specified.
      channel_arg =
          grpc_channel_args_find(chand->resolver_result, GRPC_ARG_LB_ADDRESSES);
      if (channel_arg != nullptr && channel_arg->type == GRPC_ARG_POINTER) {
        grpc_lb_addresses* addresses =
            static_cast<grpc_lb_addresses*>(channel_arg->value.pointer.p);
        bool found_balancer_address = false;
        for (size_t i = 0; i < addresses->num_addresses; ++i) {
          if (addresses->addresses[i].is_balancer) {
            found_balancer_address = true;
            break;
          }
        }
        if (found_balancer_address) {
          if (lb_policy_name != nullptr &&
              strcmp(lb_policy_name, "grpclb") != 0) {
            gpr_log(GPR_INFO,
                    "resolver requested LB policy %s but provided at least one "
                    "balancer address -- forcing use of grpclb LB policy",
                    lb_policy_name);
          }
          lb_policy_name = "grpclb";
        }
      }
      // Use pick_first if nothing was specified and we didn't select grpclb
      // above.
      if (lb_policy_name == nullptr) lb_policy_name = "pick_first";
      // Check to see if we're already using the right LB policy.
      // Note: It's safe to use chand->info_lb_policy_name here without
      // taking a lock on chand->info_mu, because this function is the
      // only thing that modifies its value, and it can only be invoked
      // once at any given time.
      lb_policy_name_changed =
          chand->info_lb_policy_name == nullptr ||
          gpr_stricmp(chand->info_lb_policy_name, lb_policy_name) != 0;
      if (chand->lb_policy != nullptr && !lb_policy_name_changed) {
        // Continue using the same LB policy.  Update with new addresses.
        lb_policy_updated = true;
        chand->lb_policy->UpdateLocked(*chand->resolver_result);
      } else {
        // Instantiate new LB policy.
        grpc_core::LoadBalancingPolicy::Args lb_policy_args;
        lb_policy_args.combiner = chand->combiner;
        lb_policy_args.client_channel_factory = chand->client_channel_factory;
        lb_policy_args.args = chand->resolver_result;
        new_lb_policy =
            grpc_core::LoadBalancingPolicyRegistry::CreateLoadBalancingPolicy(
                lb_policy_name, lb_policy_args);
        if (GPR_UNLIKELY(new_lb_policy == nullptr)) {
          gpr_log(GPR_ERROR, "could not create LB policy \"%s\"",
                  lb_policy_name);
        } else {
          lb_policy_created = true;
          reresolution_request_args* args =
              static_cast<reresolution_request_args*>(
                  gpr_zalloc(sizeof(*args)));
          args->chand = chand;
          args->lb_policy = new_lb_policy.get();
          GRPC_CLOSURE_INIT(&args->closure, request_reresolution_locked, args,
                            grpc_combiner_scheduler(chand->combiner));
          GRPC_CHANNEL_STACK_REF(chand->owning_stack, "re-resolution");
          new_lb_policy->SetReresolutionClosureLocked(&args->closure);
        }
      }
      // Before we clean up, save a copy of lb_policy_name, since it might
      // be pointing to data inside chand->resolver_result.
      // The copy will be saved in chand->lb_policy_name below.
      lb_policy_name_dup = gpr_strdup(lb_policy_name);
      // Find service config.
      channel_arg = grpc_channel_args_find(chand->resolver_result,
                                           GRPC_ARG_SERVICE_CONFIG);
      service_config_json =
          gpr_strdup(grpc_channel_arg_get_string(channel_arg));
      if (service_config_json != nullptr) {
        grpc_core::UniquePtr<grpc_core::ServiceConfig> service_config =
            grpc_core::ServiceConfig::Create(service_config_json);
        if (service_config != nullptr) {
          if (chand->enable_retries) {
            channel_arg = grpc_channel_args_find(chand->resolver_result,
                                                 GRPC_ARG_SERVER_URI);
            const char* server_uri = grpc_channel_arg_get_string(channel_arg);
            GPR_ASSERT(server_uri != nullptr);
            grpc_uri* uri = grpc_uri_parse(server_uri, true);
            GPR_ASSERT(uri->path[0] != '\0');
            service_config_parsing_state parsing_state;
            memset(&parsing_state, 0, sizeof(parsing_state));
            parsing_state.server_name =
                uri->path[0] == '/' ? uri->path + 1 : uri->path;
            service_config->ParseGlobalParams(parse_retry_throttle_params,
                                              &parsing_state);
            grpc_uri_destroy(uri);
            retry_throttle_data = std::move(parsing_state.retry_throttle_data);
          }
          method_params_table = service_config->CreateMethodConfigTable(
              ClientChannelMethodParams::CreateFromJson);
        }
      }
    }
  }
  if (grpc_client_channel_trace.enabled()) {
    gpr_log(GPR_INFO,
            "chand=%p: resolver result: lb_policy_name=\"%s\"%s, "
            "service_config=\"%s\"",
            chand, lb_policy_name_dup,
            lb_policy_name_changed ? " (changed)" : "", service_config_json);
  }
  // Now swap out fields in chand.  Note that the new values may still
  // be nullptr if (e.g.) the resolver failed to return results or the
  // results did not contain the necessary data.
  //
  // First, swap out the data used by cc_get_channel_info().
  gpr_mu_lock(&chand->info_mu);
  if (lb_policy_name_dup != nullptr) {
    gpr_free(chand->info_lb_policy_name);
    chand->info_lb_policy_name = lb_policy_name_dup;
  }
  if (service_config_json != nullptr) {
    gpr_free(chand->info_service_config_json);
    chand->info_service_config_json = service_config_json;
  }
  gpr_mu_unlock(&chand->info_mu);
  // Swap out the retry throttle data.
  chand->retry_throttle_data = std::move(retry_throttle_data);
  // Swap out the method params table.
  chand->method_params_table = std::move(method_params_table);
  // If we have a new LB policy or are shutting down (in which case
  // new_lb_policy will be nullptr), swap out the LB policy, unreffing the
  // old one and removing its fds from chand->interested_parties.
  // Note that we do NOT do this if either (a) we updated the existing
  // LB policy above or (b) we failed to create the new LB policy (in
  // which case we want to continue using the most recent one we had).
  if (new_lb_policy != nullptr || error != GRPC_ERROR_NONE ||
      chand->resolver == nullptr) {
    if (chand->lb_policy != nullptr) {
      if (grpc_client_channel_trace.enabled()) {
        gpr_log(GPR_INFO, "chand=%p: unreffing lb_policy=%p", chand,
                chand->lb_policy.get());
      }
      grpc_pollset_set_del_pollset_set(chand->lb_policy->interested_parties(),
                                       chand->interested_parties);
      chand->lb_policy->HandOffPendingPicksLocked(new_lb_policy.get());
      chand->lb_policy.reset();
    }
    chand->lb_policy = std::move(new_lb_policy);
  }
  // Now that we've swapped out the relevant fields of chand, check for
  // error or shutdown.
  if (error != GRPC_ERROR_NONE || chand->resolver == nullptr) {
    if (grpc_client_channel_trace.enabled()) {
      gpr_log(GPR_INFO, "chand=%p: shutting down", chand);
    }
    if (chand->resolver != nullptr) {
      if (grpc_client_channel_trace.enabled()) {
        gpr_log(GPR_INFO, "chand=%p: shutting down resolver", chand);
      }
      chand->resolver.reset();
    }
    set_channel_connectivity_state_locked(
        chand, GRPC_CHANNEL_SHUTDOWN,
        GRPC_ERROR_CREATE_REFERENCING_FROM_STATIC_STRING(
            "Got resolver result after disconnection", &error, 1),
        "resolver_gone");
    grpc_closure_list_fail_all(&chand->waiting_for_resolver_result_closures,
                               GRPC_ERROR_CREATE_REFERENCING_FROM_STATIC_STRING(
                                   "Channel disconnected", &error, 1));
    GRPC_CLOSURE_LIST_SCHED(&chand->waiting_for_resolver_result_closures);
    GRPC_CHANNEL_STACK_UNREF(chand->owning_stack, "resolver");
    grpc_channel_args_destroy(chand->resolver_result);
    chand->resolver_result = nullptr;
  } else {  // Not shutting down.
    grpc_connectivity_state state = GRPC_CHANNEL_TRANSIENT_FAILURE;
    grpc_error* state_error =
        GRPC_ERROR_CREATE_FROM_STATIC_STRING("No load balancing policy");
    if (lb_policy_created) {
      if (grpc_client_channel_trace.enabled()) {
        gpr_log(GPR_INFO, "chand=%p: initializing new LB policy", chand);
      }
      GRPC_ERROR_UNREF(state_error);
      state = chand->lb_policy->CheckConnectivityLocked(&state_error);
      grpc_pollset_set_add_pollset_set(chand->lb_policy->interested_parties(),
                                       chand->interested_parties);
      GRPC_CLOSURE_LIST_SCHED(&chand->waiting_for_resolver_result_closures);
      if (chand->exit_idle_when_lb_policy_arrives) {
        chand->lb_policy->ExitIdleLocked();
        chand->exit_idle_when_lb_policy_arrives = false;
      }
      watch_lb_policy_locked(chand, chand->lb_policy.get(), state);
    } else if (chand->resolver_result == nullptr) {
      // Transient failure.
      GRPC_CLOSURE_LIST_SCHED(&chand->waiting_for_resolver_result_closures);
    }
    if (!lb_policy_updated) {
      set_channel_connectivity_state_locked(
          chand, state, GRPC_ERROR_REF(state_error), "new_lb+resolver");
    }
    grpc_channel_args_destroy(chand->resolver_result);
    chand->resolver_result = nullptr;
    chand->resolver->NextLocked(&chand->resolver_result,
                                &chand->on_resolver_result_changed);
    GRPC_ERROR_UNREF(state_error);
  }
}

static void start_transport_op_locked(void* arg, grpc_error* error_ignored) {
  grpc_transport_op* op = static_cast<grpc_transport_op*>(arg);
  grpc_channel_element* elem =
      static_cast<grpc_channel_element*>(op->handler_private.extra_arg);
  channel_data* chand = static_cast<channel_data*>(elem->channel_data);

  if (op->on_connectivity_state_change != nullptr) {
    grpc_connectivity_state_notify_on_state_change(
        &chand->state_tracker, op->connectivity_state,
        op->on_connectivity_state_change);
    op->on_connectivity_state_change = nullptr;
    op->connectivity_state = nullptr;
  }

  if (op->send_ping.on_initiate != nullptr || op->send_ping.on_ack != nullptr) {
    if (chand->lb_policy == nullptr) {
      GRPC_CLOSURE_SCHED(
          op->send_ping.on_initiate,
          GRPC_ERROR_CREATE_FROM_STATIC_STRING("Ping with no load balancing"));
      GRPC_CLOSURE_SCHED(
          op->send_ping.on_ack,
          GRPC_ERROR_CREATE_FROM_STATIC_STRING("Ping with no load balancing"));
    } else {
      chand->lb_policy->PingOneLocked(op->send_ping.on_initiate,
                                      op->send_ping.on_ack);
      op->bind_pollset = nullptr;
    }
    op->send_ping.on_initiate = nullptr;
    op->send_ping.on_ack = nullptr;
  }

  if (op->disconnect_with_error != GRPC_ERROR_NONE) {
    if (chand->resolver != nullptr) {
      set_channel_connectivity_state_locked(
          chand, GRPC_CHANNEL_SHUTDOWN,
          GRPC_ERROR_REF(op->disconnect_with_error), "disconnect");
      chand->resolver.reset();
      if (!chand->started_resolving) {
        grpc_closure_list_fail_all(&chand->waiting_for_resolver_result_closures,
                                   GRPC_ERROR_REF(op->disconnect_with_error));
        GRPC_CLOSURE_LIST_SCHED(&chand->waiting_for_resolver_result_closures);
      }
      if (chand->lb_policy != nullptr) {
        grpc_pollset_set_del_pollset_set(chand->lb_policy->interested_parties(),
                                         chand->interested_parties);
        chand->lb_policy.reset();
      }
    }
    GRPC_ERROR_UNREF(op->disconnect_with_error);
  }
  GRPC_CHANNEL_STACK_UNREF(chand->owning_stack, "start_transport_op");

  GRPC_CLOSURE_SCHED(op->on_consumed, GRPC_ERROR_NONE);
}

static void cc_start_transport_op(grpc_channel_element* elem,
                                  grpc_transport_op* op) {
  channel_data* chand = static_cast<channel_data*>(elem->channel_data);

  GPR_ASSERT(op->set_accept_stream == false);
  if (op->bind_pollset != nullptr) {
    grpc_pollset_set_add_pollset(chand->interested_parties, op->bind_pollset);
  }

  op->handler_private.extra_arg = elem;
  GRPC_CHANNEL_STACK_REF(chand->owning_stack, "start_transport_op");
  GRPC_CLOSURE_SCHED(
      GRPC_CLOSURE_INIT(&op->handler_private.closure, start_transport_op_locked,
                        op, grpc_combiner_scheduler(chand->combiner)),
      GRPC_ERROR_NONE);
}

static void cc_get_channel_info(grpc_channel_element* elem,
                                const grpc_channel_info* info) {
  channel_data* chand = static_cast<channel_data*>(elem->channel_data);
  gpr_mu_lock(&chand->info_mu);
  if (info->lb_policy_name != nullptr) {
    *info->lb_policy_name = chand->info_lb_policy_name == nullptr
                                ? nullptr
                                : gpr_strdup(chand->info_lb_policy_name);
  }
  if (info->service_config_json != nullptr) {
    *info->service_config_json =
        chand->info_service_config_json == nullptr
            ? nullptr
            : gpr_strdup(chand->info_service_config_json);
  }
  gpr_mu_unlock(&chand->info_mu);
}
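// Note: cc_get_channel_info() backs the public grpc_channel_get_info() API;
// the strings returned through *info are gpr_strdup'd copies that the
// caller is expected to gpr_free().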

/* Constructor for channel_data */
static grpc_error* cc_init_channel_elem(grpc_channel_element* elem,
                                        grpc_channel_element_args* args) {
  channel_data* chand = static_cast<channel_data*>(elem->channel_data);
  GPR_ASSERT(args->is_last);
  GPR_ASSERT(elem->filter == &grpc_client_channel_filter);
  // Initialize data members.
  chand->combiner = grpc_combiner_create();
  gpr_mu_init(&chand->info_mu);
  gpr_mu_init(&chand->external_connectivity_watcher_list_mu);

  gpr_mu_lock(&chand->external_connectivity_watcher_list_mu);
  chand->external_connectivity_watcher_list_head = nullptr;
  gpr_mu_unlock(&chand->external_connectivity_watcher_list_mu);

  chand->owning_stack = args->channel_stack;
  GRPC_CLOSURE_INIT(&chand->on_resolver_result_changed,
                    on_resolver_result_changed_locked, chand,
                    grpc_combiner_scheduler(chand->combiner));
  chand->interested_parties = grpc_pollset_set_create();
  grpc_connectivity_state_init(&chand->state_tracker, GRPC_CHANNEL_IDLE,
                               "client_channel");
  grpc_client_channel_start_backup_polling(chand->interested_parties);
  // Record max per-RPC retry buffer size.
  const grpc_arg* arg = grpc_channel_args_find(
      args->channel_args, GRPC_ARG_PER_RPC_RETRY_BUFFER_SIZE);
  chand->per_rpc_retry_buffer_size = (size_t)grpc_channel_arg_get_integer(
      arg, {DEFAULT_PER_RPC_RETRY_BUFFER_SIZE, 0, INT_MAX});
  // Record enable_retries.
  arg = grpc_channel_args_find(args->channel_args, GRPC_ARG_ENABLE_RETRIES);
  chand->enable_retries = grpc_channel_arg_get_bool(arg, true);
  // Record client channel factory.
  arg = grpc_channel_args_find(args->channel_args,
                               GRPC_ARG_CLIENT_CHANNEL_FACTORY);
  if (arg == nullptr) {
    return GRPC_ERROR_CREATE_FROM_STATIC_STRING(
        "Missing client channel factory in args for client channel filter");
  }
  if (arg->type != GRPC_ARG_POINTER) {
    return GRPC_ERROR_CREATE_FROM_STATIC_STRING(
        "client channel factory arg must be a pointer");
  }
  grpc_client_channel_factory_ref(
      static_cast<grpc_client_channel_factory*>(arg->value.pointer.p));
  chand->client_channel_factory =
      static_cast<grpc_client_channel_factory*>(arg->value.pointer.p);
  // Get server name to resolve, using proxy mapper if needed.
  arg = grpc_channel_args_find(args->channel_args, GRPC_ARG_SERVER_URI);
  if (arg == nullptr) {
    return GRPC_ERROR_CREATE_FROM_STATIC_STRING(
        "Missing server uri in args for client channel filter");
  }
  if (arg->type != GRPC_ARG_STRING) {
    return GRPC_ERROR_CREATE_FROM_STATIC_STRING(
        "server uri arg must be a string");
  }
  char* proxy_name = nullptr;
  grpc_channel_args* new_args = nullptr;
  grpc_proxy_mappers_map_name(arg->value.string, args->channel_args,
                              &proxy_name, &new_args);
  // Instantiate resolver.
  chand->resolver = grpc_core::ResolverRegistry::CreateResolver(
      proxy_name != nullptr ? proxy_name : arg->value.string,
      new_args != nullptr ? new_args : args->channel_args,
      chand->interested_parties, chand->combiner);
  if (proxy_name != nullptr) gpr_free(proxy_name);
  if (new_args != nullptr) grpc_channel_args_destroy(new_args);
  if (chand->resolver == nullptr) {
    return GRPC_ERROR_CREATE_FROM_STATIC_STRING("resolver creation failed");
  }
  chand->deadline_checking_enabled =
      grpc_deadline_checking_enabled(args->channel_args);
  return GRPC_ERROR_NONE;
}

static void shutdown_resolver_locked(void* arg, grpc_error* error) {
  grpc_core::Resolver* resolver = static_cast<grpc_core::Resolver*>(arg);
  resolver->Orphan();
}

/* Destructor for channel_data */
static void cc_destroy_channel_elem(grpc_channel_element* elem) {
  channel_data* chand = static_cast<channel_data*>(elem->channel_data);
  if (chand->resolver != nullptr) {
    GRPC_CLOSURE_SCHED(
        GRPC_CLOSURE_CREATE(shutdown_resolver_locked, chand->resolver.release(),
                            grpc_combiner_scheduler(chand->combiner)),
        GRPC_ERROR_NONE);
  }
  if (chand->client_channel_factory != nullptr) {
    grpc_client_channel_factory_unref(chand->client_channel_factory);
  }
  if (chand->lb_policy != nullptr) {
    grpc_pollset_set_del_pollset_set(chand->lb_policy->interested_parties(),
                                     chand->interested_parties);
    chand->lb_policy.reset();
  }
  gpr_free(chand->info_lb_policy_name);
  gpr_free(chand->info_service_config_json);
  chand->retry_throttle_data.reset();
  chand->method_params_table.reset();
  grpc_client_channel_stop_backup_polling(chand->interested_parties);
  grpc_connectivity_state_destroy(&chand->state_tracker);
  grpc_pollset_set_destroy(chand->interested_parties);
  GRPC_COMBINER_UNREF(chand->combiner, "client_channel");
  gpr_mu_destroy(&chand->info_mu);
  gpr_mu_destroy(&chand->external_connectivity_watcher_list_mu);
}

/*************************************************************************
 * PER-CALL FUNCTIONS
 */

// Max number of batches that can be pending on a call at any given
// time.  This includes one batch for each of the following ops:
//   recv_initial_metadata
//   send_initial_metadata
//   recv_message
//   send_message
//   recv_trailing_metadata
//   send_trailing_metadata
#define MAX_PENDING_BATCHES 6
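// Note: a single batch from the surface may carry several of these ops at
// once; it still occupies only one pending slot, so 6 is an upper bound on
// the number of batches pending at a time.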

// Retry support:
//
// In order to support retries, we act as a proxy for stream op batches.
// When we get a batch from the surface, we add it to our list of pending
// batches, and we then use those batches to construct separate "child"
// batches to be started on the subchannel call.  When the child batches
// return, we then decide which pending batches have been completed and
// schedule their callbacks accordingly.  If a subchannel call fails and
// we want to retry it, we do a new pick and start again, constructing
// new "child" batches for the new subchannel call.
//
// Note that retries are committed when receiving data from the server
// (except for Trailers-Only responses).  However, there may be many
// send ops started before receiving any data, so we may have already
// completed some number of send ops (and returned the completions up to
// the surface) by the time we realize that we need to retry.  To deal
// with this, we cache data for send ops, so that we can replay them on a
// different subchannel call even after we have completed the original
// batches.
//
// There are two sets of data to maintain:
// - In call_data (in the parent channel), we maintain a list of pending
//   ops and cached data for send ops.
// - In the subchannel call, we maintain state to indicate what ops have
//   already been sent down to that call.
//
// When constructing the "child" batches, we compare those two sets of
// data to see which batches need to be sent to the subchannel call.
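//
// Illustrative sequence (assuming the server returns a retryable status
// before any response data is seen):
//   1. The surface starts batches carrying send_initial_metadata and
//      send_message; we cache the send ops and start "child" batches on
//      subchannel call A.
//   2. Call A fails with a retryable status.
//   3. We do a new pick, obtaining subchannel call B, and construct new
//      child batches from the cached send ops.
//   4. Each pending batch's completion is surfaced only once, based on
//      which ops actually completed.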
778
// TODO(roth): In subsequent PRs:
// - add support for transparent retries (including initial metadata)
// - figure out how to record stats in census for retries
//   (census filter is on top of this one)
// - add census stats for retries

// State used for starting a retryable batch on a subchannel call.
// This provides its own grpc_transport_stream_op_batch and other data
// structures needed to populate the ops in the batch.
// We allocate one struct on the arena for each attempt at starting a
// batch on a given subchannel call.
typedef struct {
  gpr_refcount refs;
  grpc_call_element* elem;
  grpc_subchannel_call* subchannel_call;  // Holds a ref.
  // The batch to use in the subchannel call.
  // Its payload field points to subchannel_call_retry_state.batch_payload.
  grpc_transport_stream_op_batch batch;
  // For intercepting on_complete.
  grpc_closure on_complete;
} subchannel_batch_data;

// Retry state associated with a subchannel call.
// Stored in the parent_data of the subchannel call object.
typedef struct {
  // subchannel_batch_data.batch.payload points to this.
  grpc_transport_stream_op_batch_payload batch_payload;
  // For send_initial_metadata.
  // Note that we need to make a copy of the initial metadata for each
  // subchannel call instead of just referring to the copy in call_data,
  // because filters in the subchannel stack will probably add entries,
  // so we need to start in a pristine state for each attempt of the call.
  grpc_linked_mdelem* send_initial_metadata_storage;
  grpc_metadata_batch send_initial_metadata;
  // For send_message.
  grpc_core::ManualConstructor<grpc_core::ByteStreamCache::CachingByteStream>
      send_message;
  // For send_trailing_metadata.
  grpc_linked_mdelem* send_trailing_metadata_storage;
  grpc_metadata_batch send_trailing_metadata;
  // For intercepting recv_initial_metadata.
  grpc_metadata_batch recv_initial_metadata;
  grpc_closure recv_initial_metadata_ready;
  bool trailing_metadata_available;
  // For intercepting recv_message.
  grpc_closure recv_message_ready;
  grpc_core::OrphanablePtr<grpc_core::ByteStream> recv_message;
  // For intercepting recv_trailing_metadata.
  grpc_metadata_batch recv_trailing_metadata;
  grpc_transport_stream_stats collect_stats;
  grpc_closure recv_trailing_metadata_ready;
  // These fields indicate which ops have been started and completed on
  // this subchannel call.
  size_t started_send_message_count;
  size_t completed_send_message_count;
  size_t started_recv_message_count;
  size_t completed_recv_message_count;
  bool started_send_initial_metadata : 1;
  bool completed_send_initial_metadata : 1;
  bool started_send_trailing_metadata : 1;
  bool completed_send_trailing_metadata : 1;
  bool started_recv_initial_metadata : 1;
  bool completed_recv_initial_metadata : 1;
  bool started_recv_trailing_metadata : 1;
  bool completed_recv_trailing_metadata : 1;
  // State for callback processing.
  bool retry_dispatched : 1;
  subchannel_batch_data* recv_initial_metadata_ready_deferred_batch;
  grpc_error* recv_initial_metadata_error;
  subchannel_batch_data* recv_message_ready_deferred_batch;
  grpc_error* recv_message_error;
  subchannel_batch_data* recv_trailing_metadata_internal_batch;
} subchannel_call_retry_state;

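// For illustration (not part of the original file): the per-attempt retry
// state above is recovered from the subchannel call's parent data, a
// pattern repeated throughout the functions below:
//
//   subchannel_call_retry_state* retry_state =
//       static_cast<subchannel_call_retry_state*>(
//           grpc_connected_subchannel_call_get_parent_data(
//               calld->subchannel_call));
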
// Pending batches stored in call data.
typedef struct {
  // The pending batch. If nullptr, this slot is empty.
  grpc_transport_stream_op_batch* batch;
  // Indicates whether payload for send ops has been cached in call data.
  bool send_ops_cached;
} pending_batch;

/** Call data. Holds a pointer to grpc_subchannel_call and the
    associated machinery to create such a pointer.
    Handles queueing of stream ops until a call object is ready, waiting
    for initial metadata before trying to create a call object,
    and handling cancellation gracefully. */
typedef struct client_channel_call_data {
  // State for handling deadlines.
  // The code in deadline_filter.c requires this to be the first field.
  // TODO(roth): This is slightly sub-optimal in that grpc_deadline_state
  // and this struct both independently store pointers to the call stack
  // and call combiner. If/when we have time, find a way to avoid this
  // without breaking the grpc_deadline_state abstraction.
  grpc_deadline_state deadline_state;

  grpc_slice path;  // Request path.
  gpr_timespec call_start_time;
  grpc_millis deadline;
  gpr_arena* arena;
  grpc_call_stack* owning_call;
  grpc_call_combiner* call_combiner;

  grpc_core::RefCountedPtr<ServerRetryThrottleData> retry_throttle_data;
  grpc_core::RefCountedPtr<ClientChannelMethodParams> method_params;

  grpc_subchannel_call* subchannel_call;

  // Set when we get a cancel_stream op.
  grpc_error* cancel_error;

  grpc_core::LoadBalancingPolicy::PickState pick;
  grpc_closure pick_closure;
  grpc_closure pick_cancel_closure;

  grpc_polling_entity* pollent;
  bool pollent_added_to_interested_parties;

  // Batches are added to this list when received from above.
  // They are removed when we are done handling the batch (i.e., when
  // either we have invoked all of the batch's callbacks or we have
  // passed the batch down to the subchannel call and are not
  // intercepting any of its callbacks).
  pending_batch pending_batches[MAX_PENDING_BATCHES];
  bool pending_send_initial_metadata : 1;
  bool pending_send_message : 1;
  bool pending_send_trailing_metadata : 1;

  // Retry state.
  bool enable_retries : 1;
  bool retry_committed : 1;
  bool last_attempt_got_server_pushback : 1;
  int num_attempts_completed;
  size_t bytes_buffered_for_retry;
  grpc_core::ManualConstructor<grpc_core::BackOff> retry_backoff;
  grpc_timer retry_timer;

  // The number of pending retriable subchannel batches containing send ops.
  // We hold a ref to the call stack while this is non-zero, since replay
  // batches may not complete until after all callbacks have been returned
  // to the surface, and we need to make sure that the call is not destroyed
  // until all of these batches have completed.
  // Note that we actually only need to track replay batches, but it's
  // easier to track all batches with send ops.
  int num_pending_retriable_subchannel_send_batches;

  // Cached data for retrying send ops.
  // send_initial_metadata
  bool seen_send_initial_metadata;
  grpc_linked_mdelem* send_initial_metadata_storage;
  grpc_metadata_batch send_initial_metadata;
  uint32_t send_initial_metadata_flags;
  gpr_atm* peer_string;
  // send_message
  // When we get a send_message op, we replace the original byte stream
  // with a CachingByteStream that caches the slices to a local buffer for
  // use in retries.
  // Note: We inline the cache for the first 3 send_message ops and use
  // dynamic allocation after that. This number was essentially picked
  // at random; it could be changed in the future to tune performance.
  grpc_core::ManualConstructor<
      grpc_core::InlinedVector<grpc_core::ByteStreamCache*, 3>>
      send_messages;
  // send_trailing_metadata
  bool seen_send_trailing_metadata;
  grpc_linked_mdelem* send_trailing_metadata_storage;
  grpc_metadata_batch send_trailing_metadata;
} call_data;

// Forward declarations.
static void retry_commit(grpc_call_element* elem,
                         subchannel_call_retry_state* retry_state);
static void start_internal_recv_trailing_metadata(grpc_call_element* elem);
static void on_complete(void* arg, grpc_error* error);
static void start_retriable_subchannel_batches(void* arg, grpc_error* ignored);
static void start_pick_locked(void* arg, grpc_error* ignored);

//
// send op data caching
//

// Caches data for send ops so that it can be retried later, if not
// already cached.
static void maybe_cache_send_ops_for_batch(call_data* calld,
                                           pending_batch* pending) {
  if (pending->send_ops_cached) return;
  pending->send_ops_cached = true;
  grpc_transport_stream_op_batch* batch = pending->batch;
  // Save a copy of metadata for send_initial_metadata ops.
  if (batch->send_initial_metadata) {
    calld->seen_send_initial_metadata = true;
    GPR_ASSERT(calld->send_initial_metadata_storage == nullptr);
    grpc_metadata_batch* send_initial_metadata =
        batch->payload->send_initial_metadata.send_initial_metadata;
    calld->send_initial_metadata_storage = (grpc_linked_mdelem*)gpr_arena_alloc(
        calld->arena,
        sizeof(grpc_linked_mdelem) * send_initial_metadata->list.count);
    grpc_metadata_batch_copy(send_initial_metadata,
                             &calld->send_initial_metadata,
                             calld->send_initial_metadata_storage);
    calld->send_initial_metadata_flags =
        batch->payload->send_initial_metadata.send_initial_metadata_flags;
    calld->peer_string = batch->payload->send_initial_metadata.peer_string;
  }
  // Set up cache for send_message ops.
  if (batch->send_message) {
    grpc_core::ByteStreamCache* cache =
        static_cast<grpc_core::ByteStreamCache*>(
            gpr_arena_alloc(calld->arena, sizeof(grpc_core::ByteStreamCache)));
    new (cache) grpc_core::ByteStreamCache(
        std::move(batch->payload->send_message.send_message));
    calld->send_messages->push_back(cache);
  }
  // Save metadata batch for send_trailing_metadata ops.
  if (batch->send_trailing_metadata) {
    calld->seen_send_trailing_metadata = true;
    GPR_ASSERT(calld->send_trailing_metadata_storage == nullptr);
    grpc_metadata_batch* send_trailing_metadata =
        batch->payload->send_trailing_metadata.send_trailing_metadata;
    calld->send_trailing_metadata_storage =
        (grpc_linked_mdelem*)gpr_arena_alloc(
            calld->arena,
            sizeof(grpc_linked_mdelem) * send_trailing_metadata->list.count);
    grpc_metadata_batch_copy(send_trailing_metadata,
                             &calld->send_trailing_metadata,
                             calld->send_trailing_metadata_storage);
  }
}

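// For illustration (not part of the original file): on a subsequent
// attempt, a cached message is replayed by pointing a CachingByteStream at
// the cache, so the new subchannel call reads the same slices from the
// beginning. A sketch, using the fields defined above:
//
//   grpc_core::ByteStreamCache* cache =
//       (*calld->send_messages)[retry_state->started_send_message_count];
//   retry_state->send_message.Init(cache);
//   retry_state->batch_payload.send_message.send_message.reset(
//       retry_state->send_message.get());
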
// Frees cached send_initial_metadata.
static void free_cached_send_initial_metadata(channel_data* chand,
                                              call_data* calld) {
  if (grpc_client_channel_trace.enabled()) {
    gpr_log(GPR_INFO,
            "chand=%p calld=%p: destroying calld->send_initial_metadata", chand,
            calld);
  }
  grpc_metadata_batch_destroy(&calld->send_initial_metadata);
}

// Frees cached send_message at index idx.
static void free_cached_send_message(channel_data* chand, call_data* calld,
                                     size_t idx) {
  if (grpc_client_channel_trace.enabled()) {
    gpr_log(GPR_INFO,
            "chand=%p calld=%p: destroying calld->send_messages[%" PRIuPTR "]",
            chand, calld, idx);
  }
  (*calld->send_messages)[idx]->Destroy();
}

// Frees cached send_trailing_metadata.
static void free_cached_send_trailing_metadata(channel_data* chand,
                                               call_data* calld) {
  if (grpc_client_channel_trace.enabled()) {
    gpr_log(GPR_INFO,
            "chand=%p calld=%p: destroying calld->send_trailing_metadata",
            chand, calld);
  }
  grpc_metadata_batch_destroy(&calld->send_trailing_metadata);
}

// Frees cached send ops that have already been completed after
// committing the call.
static void free_cached_send_op_data_after_commit(
    grpc_call_element* elem, subchannel_call_retry_state* retry_state) {
  channel_data* chand = static_cast<channel_data*>(elem->channel_data);
  call_data* calld = static_cast<call_data*>(elem->call_data);
  if (retry_state->completed_send_initial_metadata) {
    free_cached_send_initial_metadata(chand, calld);
  }
  for (size_t i = 0; i < retry_state->completed_send_message_count; ++i) {
    free_cached_send_message(chand, calld, i);
  }
  if (retry_state->completed_send_trailing_metadata) {
    free_cached_send_trailing_metadata(chand, calld);
  }
}

// Frees cached send ops that were completed by the completed batch in
// batch_data. Used when batches are completed after the call is committed.
static void free_cached_send_op_data_for_completed_batch(
    grpc_call_element* elem, subchannel_batch_data* batch_data,
    subchannel_call_retry_state* retry_state) {
  channel_data* chand = static_cast<channel_data*>(elem->channel_data);
  call_data* calld = static_cast<call_data*>(elem->call_data);
  if (batch_data->batch.send_initial_metadata) {
    free_cached_send_initial_metadata(chand, calld);
  }
  if (batch_data->batch.send_message) {
    free_cached_send_message(chand, calld,
                             retry_state->completed_send_message_count - 1);
  }
  if (batch_data->batch.send_trailing_metadata) {
    free_cached_send_trailing_metadata(chand, calld);
  }
}

//
// pending_batches management
//

// Returns the index into calld->pending_batches to be used for batch.
static size_t get_batch_index(grpc_transport_stream_op_batch* batch) {
  // Note: It is important that send_initial_metadata be the first entry
  // here, since the code in pick_subchannel_locked() assumes it will be.
  if (batch->send_initial_metadata) return 0;
  if (batch->send_message) return 1;
  if (batch->send_trailing_metadata) return 2;
  if (batch->recv_initial_metadata) return 3;
  if (batch->recv_message) return 4;
  if (batch->recv_trailing_metadata) return 5;
  GPR_UNREACHABLE_CODE(return (size_t)-1);
}

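// For illustration (not part of the original file): a batch may contain
// several ops, and the first matching op determines its slot. The first
// batch from the surface typically carries send_initial_metadata together
// with recv_initial_metadata, and so lands at index 0:
//
//   batch->send_initial_metadata = true;
//   batch->recv_initial_metadata = true;
//   size_t idx = get_batch_index(batch);  // idx == 0
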
// This is called via the call combiner, so access to calld is synchronized.
static void pending_batches_add(grpc_call_element* elem,
                                grpc_transport_stream_op_batch* batch) {
  channel_data* chand = static_cast<channel_data*>(elem->channel_data);
  call_data* calld = static_cast<call_data*>(elem->call_data);
  const size_t idx = get_batch_index(batch);
  if (grpc_client_channel_trace.enabled()) {
    gpr_log(GPR_INFO,
            "chand=%p calld=%p: adding pending batch at index %" PRIuPTR, chand,
            calld, idx);
  }
  pending_batch* pending = &calld->pending_batches[idx];
  GPR_ASSERT(pending->batch == nullptr);
  pending->batch = batch;
  pending->send_ops_cached = false;
  if (calld->enable_retries) {
    // Update state in calld about pending batches.
    // Also check if the batch takes us over the retry buffer limit.
    // Note: We don't check the size of trailing metadata here, because
    // gRPC clients do not send trailing metadata.
    if (batch->send_initial_metadata) {
      calld->pending_send_initial_metadata = true;
      calld->bytes_buffered_for_retry += grpc_metadata_batch_size(
          batch->payload->send_initial_metadata.send_initial_metadata);
    }
    if (batch->send_message) {
      calld->pending_send_message = true;
      calld->bytes_buffered_for_retry +=
          batch->payload->send_message.send_message->length();
    }
    if (batch->send_trailing_metadata) {
      calld->pending_send_trailing_metadata = true;
    }
    if (GPR_UNLIKELY(calld->bytes_buffered_for_retry >
                     chand->per_rpc_retry_buffer_size)) {
      if (grpc_client_channel_trace.enabled()) {
        gpr_log(GPR_INFO,
                "chand=%p calld=%p: exceeded retry buffer size, committing",
                chand, calld);
      }
      subchannel_call_retry_state* retry_state =
          calld->subchannel_call == nullptr
              ? nullptr
              : static_cast<subchannel_call_retry_state*>(
                    grpc_connected_subchannel_call_get_parent_data(
                        calld->subchannel_call));
      retry_commit(elem, retry_state);
      // If we are not going to retry and have not yet started, pretend
      // retries are disabled so that we don't bother with retry overhead.
      if (calld->num_attempts_completed == 0) {
        if (grpc_client_channel_trace.enabled()) {
          gpr_log(GPR_INFO,
                  "chand=%p calld=%p: disabling retries before first attempt",
                  chand, calld);
        }
        calld->enable_retries = false;
      }
    }
  }
}

static void pending_batch_clear(call_data* calld, pending_batch* pending) {
  if (calld->enable_retries) {
    if (pending->batch->send_initial_metadata) {
      calld->pending_send_initial_metadata = false;
    }
    if (pending->batch->send_message) {
      calld->pending_send_message = false;
    }
    if (pending->batch->send_trailing_metadata) {
      calld->pending_send_trailing_metadata = false;
    }
  }
  pending->batch = nullptr;
}

// This is called via the call combiner, so access to calld is synchronized.
static void fail_pending_batch_in_call_combiner(void* arg, grpc_error* error) {
  grpc_transport_stream_op_batch* batch =
      static_cast<grpc_transport_stream_op_batch*>(arg);
  call_data* calld = static_cast<call_data*>(batch->handler_private.extra_arg);
  // Note: This will release the call combiner.
  grpc_transport_stream_op_batch_finish_with_failure(
      batch, GRPC_ERROR_REF(error), calld->call_combiner);
}

// This is called via the call combiner, so access to calld is synchronized.
// If yield_call_combiner is true, assumes responsibility for yielding
// the call combiner.
static void pending_batches_fail(grpc_call_element* elem, grpc_error* error,
                                 bool yield_call_combiner) {
  GPR_ASSERT(error != GRPC_ERROR_NONE);
  call_data* calld = static_cast<call_data*>(elem->call_data);
  if (grpc_client_channel_trace.enabled()) {
    size_t num_batches = 0;
    for (size_t i = 0; i < GPR_ARRAY_SIZE(calld->pending_batches); ++i) {
      if (calld->pending_batches[i].batch != nullptr) ++num_batches;
    }
    gpr_log(GPR_INFO,
            "chand=%p calld=%p: failing %" PRIuPTR " pending batches: %s",
            elem->channel_data, calld, num_batches, grpc_error_string(error));
  }
  grpc_core::CallCombinerClosureList closures;
  for (size_t i = 0; i < GPR_ARRAY_SIZE(calld->pending_batches); ++i) {
    pending_batch* pending = &calld->pending_batches[i];
    grpc_transport_stream_op_batch* batch = pending->batch;
    if (batch != nullptr) {
      batch->handler_private.extra_arg = calld;
      GRPC_CLOSURE_INIT(&batch->handler_private.closure,
                        fail_pending_batch_in_call_combiner, batch,
                        grpc_schedule_on_exec_ctx);
      closures.Add(&batch->handler_private.closure, GRPC_ERROR_REF(error),
                   "pending_batches_fail");
      pending_batch_clear(calld, pending);
    }
  }
  if (yield_call_combiner) {
    closures.RunClosures(calld->call_combiner);
  } else {
    closures.RunClosuresWithoutYielding(calld->call_combiner);
  }
  GRPC_ERROR_UNREF(error);
}

// This is called via the call combiner, so access to calld is synchronized.
static void resume_pending_batch_in_call_combiner(void* arg,
                                                  grpc_error* ignored) {
  grpc_transport_stream_op_batch* batch =
      static_cast<grpc_transport_stream_op_batch*>(arg);
  grpc_subchannel_call* subchannel_call =
      static_cast<grpc_subchannel_call*>(batch->handler_private.extra_arg);
  // Note: This will release the call combiner.
  grpc_subchannel_call_process_op(subchannel_call, batch);
}

// This is called via the call combiner, so access to calld is synchronized.
static void pending_batches_resume(grpc_call_element* elem) {
  channel_data* chand = static_cast<channel_data*>(elem->channel_data);
  call_data* calld = static_cast<call_data*>(elem->call_data);
  if (calld->enable_retries) {
    start_retriable_subchannel_batches(elem, GRPC_ERROR_NONE);
    return;
  }
  // Retries not enabled; send down batches as-is.
  if (grpc_client_channel_trace.enabled()) {
    size_t num_batches = 0;
    for (size_t i = 0; i < GPR_ARRAY_SIZE(calld->pending_batches); ++i) {
      if (calld->pending_batches[i].batch != nullptr) ++num_batches;
    }
    gpr_log(GPR_INFO,
            "chand=%p calld=%p: starting %" PRIuPTR
            " pending batches on subchannel_call=%p",
            chand, calld, num_batches, calld->subchannel_call);
  }
  grpc_core::CallCombinerClosureList closures;
  for (size_t i = 0; i < GPR_ARRAY_SIZE(calld->pending_batches); ++i) {
    pending_batch* pending = &calld->pending_batches[i];
    grpc_transport_stream_op_batch* batch = pending->batch;
    if (batch != nullptr) {
      batch->handler_private.extra_arg = calld->subchannel_call;
      GRPC_CLOSURE_INIT(&batch->handler_private.closure,
                        resume_pending_batch_in_call_combiner, batch,
                        grpc_schedule_on_exec_ctx);
      closures.Add(&batch->handler_private.closure, GRPC_ERROR_NONE,
                   "pending_batches_resume");
      pending_batch_clear(calld, pending);
    }
  }
  // Note: This will release the call combiner.
  closures.RunClosures(calld->call_combiner);
}

static void maybe_clear_pending_batch(grpc_call_element* elem,
                                      pending_batch* pending) {
  channel_data* chand = static_cast<channel_data*>(elem->channel_data);
  call_data* calld = static_cast<call_data*>(elem->call_data);
  grpc_transport_stream_op_batch* batch = pending->batch;
  // We clear the pending batch once all of its callbacks have been
  // scheduled and their pointers have been reset to nullptr.
  if (batch->on_complete == nullptr &&
      (!batch->recv_initial_metadata ||
       batch->payload->recv_initial_metadata.recv_initial_metadata_ready ==
           nullptr) &&
      (!batch->recv_message ||
       batch->payload->recv_message.recv_message_ready == nullptr) &&
      (!batch->recv_trailing_metadata ||
       batch->payload->recv_trailing_metadata.recv_trailing_metadata_ready ==
           nullptr)) {
    if (grpc_client_channel_trace.enabled()) {
      gpr_log(GPR_INFO, "chand=%p calld=%p: clearing pending batch", chand,
              calld);
    }
    pending_batch_clear(calld, pending);
  }
}

// Returns a pointer to the first pending batch for which predicate(batch)
// returns true, or null if not found.
template <typename Predicate>
static pending_batch* pending_batch_find(grpc_call_element* elem,
                                         const char* log_message,
                                         Predicate predicate) {
  channel_data* chand = static_cast<channel_data*>(elem->channel_data);
  call_data* calld = static_cast<call_data*>(elem->call_data);
  for (size_t i = 0; i < GPR_ARRAY_SIZE(calld->pending_batches); ++i) {
    pending_batch* pending = &calld->pending_batches[i];
    grpc_transport_stream_op_batch* batch = pending->batch;
    if (batch != nullptr && predicate(batch)) {
      if (grpc_client_channel_trace.enabled()) {
        gpr_log(GPR_INFO,
                "chand=%p calld=%p: %s pending batch at index %" PRIuPTR, chand,
                calld, log_message, i);
      }
      return pending;
    }
  }
  return nullptr;
}

//
// retry code
//

// Commits the call so that no further retry attempts will be performed.
static void retry_commit(grpc_call_element* elem,
                         subchannel_call_retry_state* retry_state) {
  channel_data* chand = static_cast<channel_data*>(elem->channel_data);
  call_data* calld = static_cast<call_data*>(elem->call_data);
  if (calld->retry_committed) return;
  calld->retry_committed = true;
  if (grpc_client_channel_trace.enabled()) {
    gpr_log(GPR_INFO, "chand=%p calld=%p: committing retries", chand, calld);
  }
  if (retry_state != nullptr) {
    free_cached_send_op_data_after_commit(elem, retry_state);
  }
}

// Starts a retry after appropriate back-off.
static void do_retry(grpc_call_element* elem,
                     subchannel_call_retry_state* retry_state,
                     grpc_millis server_pushback_ms) {
  channel_data* chand = static_cast<channel_data*>(elem->channel_data);
  call_data* calld = static_cast<call_data*>(elem->call_data);
  GPR_ASSERT(calld->method_params != nullptr);
  const ClientChannelMethodParams::RetryPolicy* retry_policy =
      calld->method_params->retry_policy();
  GPR_ASSERT(retry_policy != nullptr);
  // Reset subchannel call and connected subchannel.
  if (calld->subchannel_call != nullptr) {
    GRPC_SUBCHANNEL_CALL_UNREF(calld->subchannel_call,
                               "client_channel_call_retry");
    calld->subchannel_call = nullptr;
  }
  if (calld->pick.connected_subchannel != nullptr) {
    calld->pick.connected_subchannel.reset();
  }
  // Compute backoff delay.
  grpc_millis next_attempt_time;
  if (server_pushback_ms >= 0) {
    next_attempt_time = grpc_core::ExecCtx::Get()->Now() + server_pushback_ms;
    calld->last_attempt_got_server_pushback = true;
  } else {
    if (calld->num_attempts_completed == 1 ||
        calld->last_attempt_got_server_pushback) {
      calld->retry_backoff.Init(
          grpc_core::BackOff::Options()
              .set_initial_backoff(retry_policy->initial_backoff)
              .set_multiplier(retry_policy->backoff_multiplier)
              .set_jitter(RETRY_BACKOFF_JITTER)
              .set_max_backoff(retry_policy->max_backoff));
      calld->last_attempt_got_server_pushback = false;
    }
    next_attempt_time = calld->retry_backoff->NextAttemptTime();
  }
  if (grpc_client_channel_trace.enabled()) {
    gpr_log(GPR_INFO,
            "chand=%p calld=%p: retrying failed call in %" PRId64 " ms", chand,
            calld, next_attempt_time - grpc_core::ExecCtx::Get()->Now());
  }
  // Schedule retry after computed delay.
  GRPC_CLOSURE_INIT(&calld->pick_closure, start_pick_locked, elem,
                    grpc_combiner_scheduler(chand->combiner));
  grpc_timer_init(&calld->retry_timer, next_attempt_time, &calld->pick_closure);
  // Update bookkeeping.
  if (retry_state != nullptr) retry_state->retry_dispatched = true;
}

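// Worked example (not part of the original file): with
// initial_backoff = 100 ms, backoff_multiplier = 2, and max_backoff = 1 s,
// and ignoring jitter, the nominal delays before attempts 2, 3, 4, 5, ...
// are 100 ms, 200 ms, 400 ms, 800 ms, 1 s, 1 s, ...
// RETRY_BACKOFF_JITTER then randomizes each delay around its nominal
// value, and a valid server push-back value, when present, replaces the
// computed delay entirely (see the server_pushback_ms branch above).
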
// Returns true if the call is being retried.
static bool maybe_retry(grpc_call_element* elem,
                        subchannel_batch_data* batch_data,
                        grpc_status_code status,
                        grpc_mdelem* server_pushback_md) {
  channel_data* chand = static_cast<channel_data*>(elem->channel_data);
  call_data* calld = static_cast<call_data*>(elem->call_data);
  // Get retry policy.
  if (calld->method_params == nullptr) return false;
  const ClientChannelMethodParams::RetryPolicy* retry_policy =
      calld->method_params->retry_policy();
  if (retry_policy == nullptr) return false;
  // If we've already dispatched a retry from this call, return true.
  // This catches the case where the batch has multiple callbacks
  // (i.e., it includes either recv_message or recv_initial_metadata).
  subchannel_call_retry_state* retry_state = nullptr;
  if (batch_data != nullptr) {
    retry_state = static_cast<subchannel_call_retry_state*>(
        grpc_connected_subchannel_call_get_parent_data(
            batch_data->subchannel_call));
    if (retry_state->retry_dispatched) {
      if (grpc_client_channel_trace.enabled()) {
        gpr_log(GPR_INFO, "chand=%p calld=%p: retry already dispatched", chand,
                calld);
      }
      return true;
    }
  }
  // Check status.
  if (GPR_LIKELY(status == GRPC_STATUS_OK)) {
    if (calld->retry_throttle_data != nullptr) {
      calld->retry_throttle_data->RecordSuccess();
    }
    if (grpc_client_channel_trace.enabled()) {
      gpr_log(GPR_INFO, "chand=%p calld=%p: call succeeded", chand, calld);
    }
    return false;
  }
  // Status is not OK. Check whether the status is retryable.
  if (!retry_policy->retryable_status_codes.Contains(status)) {
    if (grpc_client_channel_trace.enabled()) {
      gpr_log(GPR_INFO,
              "chand=%p calld=%p: status %s not configured as retryable", chand,
              calld, grpc_status_code_to_string(status));
    }
    return false;
  }
  // Record the failure and check whether retries are throttled.
  // Note that it's important for this check to come after the status
  // code check above, since we should only record failures whose statuses
  // match the configured retryable status codes, so that we don't count
  // things like failures due to malformed requests (INVALID_ARGUMENT).
  // Conversely, it's important for this to come before the remaining
  // checks, so that we don't fail to record failures due to other factors.
  if (calld->retry_throttle_data != nullptr &&
      !calld->retry_throttle_data->RecordFailure()) {
    if (grpc_client_channel_trace.enabled()) {
      gpr_log(GPR_INFO, "chand=%p calld=%p: retries throttled", chand, calld);
    }
    return false;
  }
  // Check whether the call is committed.
  if (calld->retry_committed) {
    if (grpc_client_channel_trace.enabled()) {
      gpr_log(GPR_INFO, "chand=%p calld=%p: retries already committed", chand,
              calld);
    }
    return false;
  }
  // Check whether we have retries remaining.
  ++calld->num_attempts_completed;
  if (calld->num_attempts_completed >= retry_policy->max_attempts) {
    if (grpc_client_channel_trace.enabled()) {
      gpr_log(GPR_INFO, "chand=%p calld=%p: exceeded %d retry attempts", chand,
              calld, retry_policy->max_attempts);
    }
    return false;
  }
  // If the call was cancelled from the surface, don't retry.
  if (calld->cancel_error != GRPC_ERROR_NONE) {
    if (grpc_client_channel_trace.enabled()) {
      gpr_log(GPR_INFO,
              "chand=%p calld=%p: call cancelled from surface, not retrying",
              chand, calld);
    }
    return false;
  }
  // Check server push-back.
  grpc_millis server_pushback_ms = -1;
  if (server_pushback_md != nullptr) {
    // If the value is "-1" or any other unparseable string, we do not retry.
    uint32_t ms;
    if (!grpc_parse_slice_to_uint32(GRPC_MDVALUE(*server_pushback_md), &ms)) {
      if (grpc_client_channel_trace.enabled()) {
        gpr_log(GPR_INFO,
                "chand=%p calld=%p: not retrying due to server push-back",
                chand, calld);
      }
      return false;
    } else {
      if (grpc_client_channel_trace.enabled()) {
        gpr_log(GPR_INFO, "chand=%p calld=%p: server push-back: retry in %u ms",
                chand, calld, ms);
      }
      server_pushback_ms = (grpc_millis)ms;
    }
  }
  do_retry(elem, retry_state, server_pushback_ms);
  return true;
}

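// For reference (not part of the original file): the RetryPolicy consulted
// in maybe_retry() is normally populated from the service config. A
// hypothetical per-method config, following the gRPC retry design:
//
//   "retryPolicy": {
//     "maxAttempts": 4,
//     "initialBackoff": "0.1s",
//     "maxBackoff": "1s",
//     "backoffMultiplier": 2,
//     "retryableStatusCodes": ["UNAVAILABLE"]
//   }
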
//
// subchannel_batch_data
//

// Creates a subchannel_batch_data object on the call's arena with the
// specified refcount. If set_on_complete is true, the batch's
// on_complete callback will be set to point to on_complete();
// otherwise, the batch's on_complete callback will be null.
static subchannel_batch_data* batch_data_create(grpc_call_element* elem,
                                                int refcount,
                                                bool set_on_complete) {
  call_data* calld = static_cast<call_data*>(elem->call_data);
  subchannel_call_retry_state* retry_state =
      static_cast<subchannel_call_retry_state*>(
          grpc_connected_subchannel_call_get_parent_data(
              calld->subchannel_call));
  subchannel_batch_data* batch_data = static_cast<subchannel_batch_data*>(
      gpr_arena_alloc(calld->arena, sizeof(*batch_data)));
  batch_data->elem = elem;
  batch_data->subchannel_call =
      GRPC_SUBCHANNEL_CALL_REF(calld->subchannel_call, "batch_data_create");
  batch_data->batch.payload = &retry_state->batch_payload;
  gpr_ref_init(&batch_data->refs, refcount);
  if (set_on_complete) {
    GRPC_CLOSURE_INIT(&batch_data->on_complete, on_complete, batch_data,
                      grpc_schedule_on_exec_ctx);
    batch_data->batch.on_complete = &batch_data->on_complete;
  }
  GRPC_CALL_STACK_REF(calld->owning_call, "batch_data");
  return batch_data;
}

static void batch_data_unref(subchannel_batch_data* batch_data) {
  if (gpr_unref(&batch_data->refs)) {
    subchannel_call_retry_state* retry_state =
        static_cast<subchannel_call_retry_state*>(
            grpc_connected_subchannel_call_get_parent_data(
                batch_data->subchannel_call));
    if (batch_data->batch.send_initial_metadata) {
      grpc_metadata_batch_destroy(&retry_state->send_initial_metadata);
    }
    if (batch_data->batch.send_trailing_metadata) {
      grpc_metadata_batch_destroy(&retry_state->send_trailing_metadata);
    }
    if (batch_data->batch.recv_initial_metadata) {
      grpc_metadata_batch_destroy(&retry_state->recv_initial_metadata);
    }
    if (batch_data->batch.recv_trailing_metadata) {
      grpc_metadata_batch_destroy(&retry_state->recv_trailing_metadata);
    }
    GRPC_SUBCHANNEL_CALL_UNREF(batch_data->subchannel_call, "batch_data_unref");
    call_data* calld = static_cast<call_data*>(batch_data->elem->call_data);
    GRPC_CALL_STACK_UNREF(calld->owning_call, "batch_data");
  }
}

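// For illustration (not part of the original file): the refcount passed to
// batch_data_create() corresponds to the number of intercepted callbacks
// expected for the batch, since each callback handler unrefs the batch
// once. A hypothetical batch intercepting both on_complete and
// recv_initial_metadata_ready would be created as:
//
//   subchannel_batch_data* batch_data =
//       batch_data_create(elem, 2 /* refcount */, true /* set_on_complete */);
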
//
// recv_initial_metadata callback handling
//

// Invokes recv_initial_metadata_ready for a subchannel batch.
static void invoke_recv_initial_metadata_callback(void* arg,
                                                  grpc_error* error) {
  subchannel_batch_data* batch_data = static_cast<subchannel_batch_data*>(arg);
  // Find pending batch.
  pending_batch* pending = pending_batch_find(
      batch_data->elem, "invoking recv_initial_metadata_ready for",
      [](grpc_transport_stream_op_batch* batch) {
        return batch->recv_initial_metadata &&
               batch->payload->recv_initial_metadata
                       .recv_initial_metadata_ready != nullptr;
      });
  GPR_ASSERT(pending != nullptr);
  // Return metadata.
  subchannel_call_retry_state* retry_state =
      static_cast<subchannel_call_retry_state*>(
          grpc_connected_subchannel_call_get_parent_data(
              batch_data->subchannel_call));
  grpc_metadata_batch_move(
      &retry_state->recv_initial_metadata,
      pending->batch->payload->recv_initial_metadata.recv_initial_metadata);
  // Update bookkeeping.
  // Note: Need to do this before invoking the callback, since invoking
  // the callback will result in yielding the call combiner.
  grpc_closure* recv_initial_metadata_ready =
      pending->batch->payload->recv_initial_metadata
          .recv_initial_metadata_ready;
  pending->batch->payload->recv_initial_metadata.recv_initial_metadata_ready =
      nullptr;
  maybe_clear_pending_batch(batch_data->elem, pending);
  batch_data_unref(batch_data);
  // Invoke callback.
  GRPC_CLOSURE_RUN(recv_initial_metadata_ready, GRPC_ERROR_REF(error));
}

// Intercepts recv_initial_metadata_ready callback for retries.
// Commits the call and returns the initial metadata up the stack.
static void recv_initial_metadata_ready(void* arg, grpc_error* error) {
  subchannel_batch_data* batch_data = static_cast<subchannel_batch_data*>(arg);
  grpc_call_element* elem = batch_data->elem;
  channel_data* chand = static_cast<channel_data*>(elem->channel_data);
  call_data* calld = static_cast<call_data*>(elem->call_data);
  if (grpc_client_channel_trace.enabled()) {
    gpr_log(GPR_INFO,
            "chand=%p calld=%p: got recv_initial_metadata_ready, error=%s",
            chand, calld, grpc_error_string(error));
  }
  subchannel_call_retry_state* retry_state =
      static_cast<subchannel_call_retry_state*>(
          grpc_connected_subchannel_call_get_parent_data(
              batch_data->subchannel_call));
  retry_state->completed_recv_initial_metadata = true;
  // If a retry was already dispatched, then we're not going to use the
  // result of this recv_initial_metadata op, so do nothing.
  if (retry_state->retry_dispatched) {
    GRPC_CALL_COMBINER_STOP(
        calld->call_combiner,
        "recv_initial_metadata_ready after retry dispatched");
    return;
  }
  // If we got an error or a Trailers-Only response and have not yet gotten
  // the recv_trailing_metadata_ready callback, then defer propagating this
  // callback back to the surface. We can evaluate whether to retry when
  // recv_trailing_metadata comes back.
  if (GPR_UNLIKELY((retry_state->trailing_metadata_available ||
                    error != GRPC_ERROR_NONE) &&
                   !retry_state->completed_recv_trailing_metadata)) {
    if (grpc_client_channel_trace.enabled()) {
      gpr_log(GPR_INFO,
              "chand=%p calld=%p: deferring recv_initial_metadata_ready "
              "(Trailers-Only)",
              chand, calld);
    }
    retry_state->recv_initial_metadata_ready_deferred_batch = batch_data;
    retry_state->recv_initial_metadata_error = GRPC_ERROR_REF(error);
    if (!retry_state->started_recv_trailing_metadata) {
      // recv_trailing_metadata not yet started by application; start it
      // ourselves to get status.
      start_internal_recv_trailing_metadata(elem);
    } else {
      GRPC_CALL_COMBINER_STOP(
          calld->call_combiner,
          "recv_initial_metadata_ready trailers-only or error");
    }
    return;
  }
  // Received valid initial metadata, so commit the call.
  retry_commit(elem, retry_state);
  // Invoke the callback to return the result to the surface.
  // Manually invoking a callback function; it does not take ownership of error.
  invoke_recv_initial_metadata_callback(batch_data, error);
}

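// For illustration (not part of the original file): in a Trailers-Only
// response, the transport signals trailing_metadata_available on the
// recv_initial_metadata path, so the handler above parks batch_data in
// retry_state->recv_initial_metadata_ready_deferred_batch. Once
// recv_trailing_metadata_ready fires, maybe_retry() decides whether the
// deferred callback is replayed toward the surface (via
// add_closures_for_deferred_recv_callbacks() below) or dropped in favor
// of a new attempt.
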
//
// recv_message callback handling
//

// Invokes recv_message_ready for a subchannel batch.
static void invoke_recv_message_callback(void* arg, grpc_error* error) {
  subchannel_batch_data* batch_data = static_cast<subchannel_batch_data*>(arg);
  // Find pending op.
  pending_batch* pending = pending_batch_find(
      batch_data->elem, "invoking recv_message_ready for",
      [](grpc_transport_stream_op_batch* batch) {
        return batch->recv_message &&
               batch->payload->recv_message.recv_message_ready != nullptr;
      });
  GPR_ASSERT(pending != nullptr);
  // Return payload.
  subchannel_call_retry_state* retry_state =
      static_cast<subchannel_call_retry_state*>(
          grpc_connected_subchannel_call_get_parent_data(
              batch_data->subchannel_call));
  *pending->batch->payload->recv_message.recv_message =
      std::move(retry_state->recv_message);
  // Update bookkeeping.
  // Note: Need to do this before invoking the callback, since invoking
  // the callback will result in yielding the call combiner.
  grpc_closure* recv_message_ready =
      pending->batch->payload->recv_message.recv_message_ready;
  pending->batch->payload->recv_message.recv_message_ready = nullptr;
  maybe_clear_pending_batch(batch_data->elem, pending);
  batch_data_unref(batch_data);
  // Invoke callback.
  GRPC_CLOSURE_RUN(recv_message_ready, GRPC_ERROR_REF(error));
}

// Intercepts recv_message_ready callback for retries.
// Commits the call and returns the message up the stack.
static void recv_message_ready(void* arg, grpc_error* error) {
  subchannel_batch_data* batch_data = static_cast<subchannel_batch_data*>(arg);
  grpc_call_element* elem = batch_data->elem;
  channel_data* chand = static_cast<channel_data*>(elem->channel_data);
  call_data* calld = static_cast<call_data*>(elem->call_data);
  if (grpc_client_channel_trace.enabled()) {
    gpr_log(GPR_INFO, "chand=%p calld=%p: got recv_message_ready, error=%s",
            chand, calld, grpc_error_string(error));
  }
  subchannel_call_retry_state* retry_state =
      static_cast<subchannel_call_retry_state*>(
          grpc_connected_subchannel_call_get_parent_data(
              batch_data->subchannel_call));
  ++retry_state->completed_recv_message_count;
  // If a retry was already dispatched, then we're not going to use the
  // result of this recv_message op, so do nothing.
  if (retry_state->retry_dispatched) {
    GRPC_CALL_COMBINER_STOP(calld->call_combiner,
                            "recv_message_ready after retry dispatched");
    return;
  }
  // If we got an error or the payload was nullptr and we have not yet gotten
  // the recv_trailing_metadata_ready callback, then defer propagating this
  // callback back to the surface. We can evaluate whether to retry when
  // recv_trailing_metadata comes back.
  if (GPR_UNLIKELY(
          (retry_state->recv_message == nullptr || error != GRPC_ERROR_NONE) &&
          !retry_state->completed_recv_trailing_metadata)) {
    if (grpc_client_channel_trace.enabled()) {
      gpr_log(GPR_INFO,
              "chand=%p calld=%p: deferring recv_message_ready (nullptr "
              "message and recv_trailing_metadata pending)",
              chand, calld);
    }
    retry_state->recv_message_ready_deferred_batch = batch_data;
    retry_state->recv_message_error = GRPC_ERROR_REF(error);
    if (!retry_state->started_recv_trailing_metadata) {
      // recv_trailing_metadata not yet started by application; start it
      // ourselves to get status.
      start_internal_recv_trailing_metadata(elem);
    } else {
      GRPC_CALL_COMBINER_STOP(calld->call_combiner, "recv_message_ready null");
    }
    return;
  }
  // Received a valid message, so commit the call.
  retry_commit(elem, retry_state);
  // Invoke the callback to return the result to the surface.
  // Manually invoking a callback function; it does not take ownership of error.
  invoke_recv_message_callback(batch_data, error);
}

//
// recv_trailing_metadata handling
//

// Sets *status and *server_pushback_md based on batch_data and error.
static void get_call_status(subchannel_batch_data* batch_data,
                            grpc_error* error, grpc_status_code* status,
                            grpc_mdelem** server_pushback_md) {
  grpc_call_element* elem = batch_data->elem;
  call_data* calld = static_cast<call_data*>(elem->call_data);
  if (error != GRPC_ERROR_NONE) {
    grpc_error_get_status(error, calld->deadline, status, nullptr, nullptr,
                          nullptr);
  } else {
    grpc_metadata_batch* md_batch =
        batch_data->batch.payload->recv_trailing_metadata
            .recv_trailing_metadata;
    GPR_ASSERT(md_batch->idx.named.grpc_status != nullptr);
    *status =
        grpc_get_status_code_from_metadata(md_batch->idx.named.grpc_status->md);
    if (md_batch->idx.named.grpc_retry_pushback_ms != nullptr) {
      *server_pushback_md = &md_batch->idx.named.grpc_retry_pushback_ms->md;
    }
  }
  GRPC_ERROR_UNREF(error);
}

// Adds recv_trailing_metadata_ready closure to closures.
static void add_closure_for_recv_trailing_metadata_ready(
    grpc_call_element* elem, subchannel_batch_data* batch_data,
    grpc_error* error, grpc_core::CallCombinerClosureList* closures) {
  // Find pending batch.
  pending_batch* pending = pending_batch_find(
      elem, "invoking recv_trailing_metadata for",
      [](grpc_transport_stream_op_batch* batch) {
        return batch->recv_trailing_metadata &&
               batch->payload->recv_trailing_metadata
                       .recv_trailing_metadata_ready != nullptr;
      });
  // If we generated the recv_trailing_metadata op internally via
  // start_internal_recv_trailing_metadata(), then there will be no
  // pending batch.
  if (pending == nullptr) {
    GRPC_ERROR_UNREF(error);
    return;
  }
  // Return metadata.
  subchannel_call_retry_state* retry_state =
      static_cast<subchannel_call_retry_state*>(
          grpc_connected_subchannel_call_get_parent_data(
              batch_data->subchannel_call));
  grpc_metadata_batch_move(
      &retry_state->recv_trailing_metadata,
      pending->batch->payload->recv_trailing_metadata.recv_trailing_metadata);
  // Add closure.
  closures->Add(pending->batch->payload->recv_trailing_metadata
                    .recv_trailing_metadata_ready,
                error, "recv_trailing_metadata_ready for pending batch");
  // Update bookkeeping.
  pending->batch->payload->recv_trailing_metadata.recv_trailing_metadata_ready =
      nullptr;
  maybe_clear_pending_batch(elem, pending);
}

// Adds any necessary closures for deferred recv_initial_metadata and
// recv_message callbacks to closures.
static void add_closures_for_deferred_recv_callbacks(
    subchannel_batch_data* batch_data, subchannel_call_retry_state* retry_state,
    grpc_core::CallCombinerClosureList* closures) {
  if (batch_data->batch.recv_trailing_metadata) {
    // Add closure for deferred recv_initial_metadata_ready.
    if (GPR_UNLIKELY(retry_state->recv_initial_metadata_ready_deferred_batch !=
                     nullptr)) {
      GRPC_CLOSURE_INIT(&retry_state->recv_initial_metadata_ready,
                        invoke_recv_initial_metadata_callback,
                        retry_state->recv_initial_metadata_ready_deferred_batch,
                        grpc_schedule_on_exec_ctx);
      closures->Add(&retry_state->recv_initial_metadata_ready,
                    retry_state->recv_initial_metadata_error,
                    "resuming recv_initial_metadata_ready");
      retry_state->recv_initial_metadata_ready_deferred_batch = nullptr;
    }
    // Add closure for deferred recv_message_ready.
    if (GPR_UNLIKELY(retry_state->recv_message_ready_deferred_batch !=
                     nullptr)) {
      GRPC_CLOSURE_INIT(&retry_state->recv_message_ready,
                        invoke_recv_message_callback,
                        retry_state->recv_message_ready_deferred_batch,
                        grpc_schedule_on_exec_ctx);
      closures->Add(&retry_state->recv_message_ready,
                    retry_state->recv_message_error,
                    "resuming recv_message_ready");
      retry_state->recv_message_ready_deferred_batch = nullptr;
    }
  }
}

// Returns true if any op in the batch was not yet started.
// Only looks at send ops, since recv ops are always started immediately.
static bool pending_batch_is_unstarted(
    pending_batch* pending, call_data* calld,
    subchannel_call_retry_state* retry_state) {
  if (pending->batch == nullptr || pending->batch->on_complete == nullptr) {
    return false;
  }
  if (pending->batch->send_initial_metadata &&
      !retry_state->started_send_initial_metadata) {
    return true;
  }
  if (pending->batch->send_message &&
      retry_state->started_send_message_count < calld->send_messages->size()) {
    return true;
  }
  if (pending->batch->send_trailing_metadata &&
      !retry_state->started_send_trailing_metadata) {
    return true;
  }
  return false;
}

// For any pending batch containing an op that has not yet been started,
// adds the pending batch's completion closures to closures.
static void add_closures_to_fail_unstarted_pending_batches(
    grpc_call_element* elem, subchannel_call_retry_state* retry_state,
    grpc_error* error, grpc_core::CallCombinerClosureList* closures) {
  channel_data* chand = static_cast<channel_data*>(elem->channel_data);
  call_data* calld = static_cast<call_data*>(elem->call_data);
  for (size_t i = 0; i < GPR_ARRAY_SIZE(calld->pending_batches); ++i) {
    pending_batch* pending = &calld->pending_batches[i];
    if (pending_batch_is_unstarted(pending, calld, retry_state)) {
      if (grpc_client_channel_trace.enabled()) {
        gpr_log(GPR_INFO,
                "chand=%p calld=%p: failing unstarted pending batch at index "
                "%" PRIuPTR,
                chand, calld, i);
      }
      closures->Add(pending->batch->on_complete, GRPC_ERROR_REF(error),
                    "failing on_complete for pending batch");
      pending->batch->on_complete = nullptr;
      maybe_clear_pending_batch(elem, pending);
    }
  }
  GRPC_ERROR_UNREF(error);
}

// Runs necessary closures upon completion of a call attempt.
static void run_closures_for_completed_call(subchannel_batch_data* batch_data,
                                            grpc_error* error) {
  grpc_call_element* elem = batch_data->elem;
  call_data* calld = static_cast<call_data*>(elem->call_data);
  subchannel_call_retry_state* retry_state =
      static_cast<subchannel_call_retry_state*>(
          grpc_connected_subchannel_call_get_parent_data(
              batch_data->subchannel_call));
  // Construct list of closures to execute.
  grpc_core::CallCombinerClosureList closures;
  // First, add closure for recv_trailing_metadata_ready.
  add_closure_for_recv_trailing_metadata_ready(
      elem, batch_data, GRPC_ERROR_REF(error), &closures);
  // If there are deferred recv_initial_metadata_ready or recv_message_ready
  // callbacks, add them to closures.
  add_closures_for_deferred_recv_callbacks(batch_data, retry_state, &closures);
  // Add closures to fail any pending batches that have not yet been started.
  add_closures_to_fail_unstarted_pending_batches(
      elem, retry_state, GRPC_ERROR_REF(error), &closures);
  // Don't need batch_data anymore.
  batch_data_unref(batch_data);
  // Schedule all of the closures identified above.
  // Note: This will release the call combiner.
  closures.RunClosures(calld->call_combiner);
  GRPC_ERROR_UNREF(error);
}

// Intercepts recv_trailing_metadata_ready callback for retries.
// Commits the call and returns the trailing metadata up the stack.
static void recv_trailing_metadata_ready(void* arg, grpc_error* error) {
  subchannel_batch_data* batch_data = static_cast<subchannel_batch_data*>(arg);
  grpc_call_element* elem = batch_data->elem;
  channel_data* chand = static_cast<channel_data*>(elem->channel_data);
  call_data* calld = static_cast<call_data*>(elem->call_data);
  if (grpc_client_channel_trace.enabled()) {
    gpr_log(GPR_INFO,
            "chand=%p calld=%p: got recv_trailing_metadata_ready, error=%s",
            chand, calld, grpc_error_string(error));
  }
  subchannel_call_retry_state* retry_state =
      static_cast<subchannel_call_retry_state*>(
          grpc_connected_subchannel_call_get_parent_data(
              batch_data->subchannel_call));
  retry_state->completed_recv_trailing_metadata = true;
  // Get the call's status and check for server pushback metadata.
  grpc_status_code status = GRPC_STATUS_OK;
  grpc_mdelem* server_pushback_md = nullptr;
  get_call_status(batch_data, GRPC_ERROR_REF(error), &status,
                  &server_pushback_md);
  if (grpc_client_channel_trace.enabled()) {
    gpr_log(GPR_INFO, "chand=%p calld=%p: call finished, status=%s", chand,
            calld, grpc_status_code_to_string(status));
  }
  // Check if we should retry.
  if (maybe_retry(elem, batch_data, status, server_pushback_md)) {
    // Unref batch_data for deferred recv_initial_metadata_ready or
    // recv_message_ready callbacks, if any.
    if (retry_state->recv_initial_metadata_ready_deferred_batch != nullptr) {
      batch_data_unref(batch_data);
      GRPC_ERROR_UNREF(retry_state->recv_initial_metadata_error);
    }
    if (retry_state->recv_message_ready_deferred_batch != nullptr) {
      batch_data_unref(batch_data);
      GRPC_ERROR_UNREF(retry_state->recv_message_error);
    }
    batch_data_unref(batch_data);
    return;
  }
  // Not retrying, so commit the call.
  retry_commit(elem, retry_state);
  // Run any necessary closures.
  run_closures_for_completed_call(batch_data, GRPC_ERROR_REF(error));
}

//
// on_complete callback handling
//

// Adds the on_complete closure for the pending batch completed in
// batch_data to closures.
static void add_closure_for_completed_pending_batch(
    grpc_call_element* elem, subchannel_batch_data* batch_data,
    subchannel_call_retry_state* retry_state, grpc_error* error,
    grpc_core::CallCombinerClosureList* closures) {
  pending_batch* pending = pending_batch_find(
      elem, "completed", [batch_data](grpc_transport_stream_op_batch* batch) {
        // Match the pending batch with the same set of send ops as the
        // subchannel batch we've just completed.
        return batch->on_complete != nullptr &&
               batch_data->batch.send_initial_metadata ==
                   batch->send_initial_metadata &&
               batch_data->batch.send_message == batch->send_message &&
               batch_data->batch.send_trailing_metadata ==
                   batch->send_trailing_metadata;
      });
  // If batch_data is a replay batch, then there will be no pending
  // batch to complete.
  if (pending == nullptr) {
    GRPC_ERROR_UNREF(error);
    return;
  }
  // Add closure.
  closures->Add(pending->batch->on_complete, error,
                "on_complete for pending batch");
  pending->batch->on_complete = nullptr;
  maybe_clear_pending_batch(elem, pending);
}

// If there are any cached ops to replay or pending ops to start on the
1989// subchannel call, adds a closure to closures to invoke
Mark D. Roth817d28f2018-06-14 09:44:58 -07001990// start_retriable_subchannel_batches().
Mark D. Roth718c8342018-02-28 13:00:04 -08001991static void add_closures_for_replay_or_pending_send_ops(
1992 grpc_call_element* elem, subchannel_batch_data* batch_data,
Mark D. Roth817d28f2018-06-14 09:44:58 -07001993 subchannel_call_retry_state* retry_state,
1994 grpc_core::CallCombinerClosureList* closures) {
Mark D. Roth718c8342018-02-28 13:00:04 -08001995 channel_data* chand = static_cast<channel_data*>(elem->channel_data);
1996 call_data* calld = static_cast<call_data*>(elem->call_data);
1997 bool have_pending_send_message_ops =
Mark D. Rothefcd45b2018-03-28 10:49:59 -07001998 retry_state->started_send_message_count < calld->send_messages->size();
Mark D. Roth718c8342018-02-28 13:00:04 -08001999 bool have_pending_send_trailing_metadata_op =
2000 calld->seen_send_trailing_metadata &&
2001 !retry_state->started_send_trailing_metadata;
2002 if (!have_pending_send_message_ops &&
2003 !have_pending_send_trailing_metadata_op) {
2004 for (size_t i = 0; i < GPR_ARRAY_SIZE(calld->pending_batches); ++i) {
2005 pending_batch* pending = &calld->pending_batches[i];
2006 grpc_transport_stream_op_batch* batch = pending->batch;
2007 if (batch == nullptr || pending->send_ops_cached) continue;
2008 if (batch->send_message) have_pending_send_message_ops = true;
2009 if (batch->send_trailing_metadata) {
2010 have_pending_send_trailing_metadata_op = true;
2011 }
2012 }
2013 }
2014 if (have_pending_send_message_ops || have_pending_send_trailing_metadata_op) {
2015 if (grpc_client_channel_trace.enabled()) {
Mark D. Roth48854d22018-04-25 13:05:26 -07002016 gpr_log(GPR_INFO,
Mark D. Roth718c8342018-02-28 13:00:04 -08002017 "chand=%p calld=%p: starting next batch for pending send op(s)",
2018 chand, calld);
2019 }
Mark D. Roth817d28f2018-06-14 09:44:58 -07002020 GRPC_CLOSURE_INIT(&batch_data->batch.handler_private.closure,
2021 start_retriable_subchannel_batches, elem,
2022 grpc_schedule_on_exec_ctx);
2023 closures->Add(&batch_data->batch.handler_private.closure, GRPC_ERROR_NONE,
2024 "starting next batch for send_* op(s)");
Mark D. Roth718c8342018-02-28 13:00:04 -08002025 }
2026}
2027
Mark D. Roth718c8342018-02-28 13:00:04 -08002028// Callback used to intercept on_complete from subchannel calls.
2029// Called only when retries are enabled.
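// Updates send-op bookkeeping in retry_state and, unless a retry has already
// been dispatched, surfaces completion for the matching pending batch and
// schedules any replay or still-pending send ops. Drops the call stack ref
// taken for subchannel send batches once the last one completes.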
2030static void on_complete(void* arg, grpc_error* error) {
2031 subchannel_batch_data* batch_data = static_cast<subchannel_batch_data*>(arg);
2032 grpc_call_element* elem = batch_data->elem;
2033 channel_data* chand = static_cast<channel_data*>(elem->channel_data);
2034 call_data* calld = static_cast<call_data*>(elem->call_data);
2035 if (grpc_client_channel_trace.enabled()) {
2036 char* batch_str = grpc_transport_stream_op_batch_string(&batch_data->batch);
Mark D. Roth48854d22018-04-25 13:05:26 -07002037 gpr_log(GPR_INFO, "chand=%p calld=%p: got on_complete, error=%s, batch=%s",
Mark D. Roth718c8342018-02-28 13:00:04 -08002038 chand, calld, grpc_error_string(error), batch_str);
2039 gpr_free(batch_str);
2040 }
2041 subchannel_call_retry_state* retry_state =
2042 static_cast<subchannel_call_retry_state*>(
2043 grpc_connected_subchannel_call_get_parent_data(
2044 batch_data->subchannel_call));
Mark D. Roth718c8342018-02-28 13:00:04 -08002045 // Update bookkeeping in retry_state.
Mark D. Roth817d28f2018-06-14 09:44:58 -07002046 if (batch_data->batch.send_initial_metadata) {
2047 retry_state->completed_send_initial_metadata = true;
Mark D. Roth718c8342018-02-28 13:00:04 -08002048 }
Mark D. Roth817d28f2018-06-14 09:44:58 -07002049 if (batch_data->batch.send_message) {
2050 ++retry_state->completed_send_message_count;
2051 }
2052 if (batch_data->batch.send_trailing_metadata) {
2053 retry_state->completed_send_trailing_metadata = true;
2054 }
2055 // If the call is committed, free cached data for send ops that we've just
2056 // completed.
2057 if (calld->retry_committed) {
Mark D. Roth718c8342018-02-28 13:00:04 -08002058 free_cached_send_op_data_for_completed_batch(elem, batch_data, retry_state);
2059 }
Mark D. Roth718c8342018-02-28 13:00:04 -08002060 // Construct list of closures to execute.
Mark D. Roth817d28f2018-06-14 09:44:58 -07002061 grpc_core::CallCombinerClosureList closures;
2062 // If a retry was already dispatched, that means we saw
2063 // recv_trailing_metadata before this, so we do nothing here.
2064 // Otherwise, invoke the callback to return the result to the surface.
2065 if (!retry_state->retry_dispatched) {
2066 // Add closure for the completed pending batch, if any.
2067 add_closure_for_completed_pending_batch(elem, batch_data, retry_state,
2068 GRPC_ERROR_REF(error), &closures);
2069 // If needed, add a callback to start any replay or pending send ops on
2070 // the subchannel call.
2071 if (!retry_state->completed_recv_trailing_metadata) {
2072 add_closures_for_replay_or_pending_send_ops(elem, batch_data, retry_state,
2073 &closures);
2074 }
Mark D. Roth718c8342018-02-28 13:00:04 -08002075 }
Mark D. Roth4f9e0032018-05-24 09:30:09 -07002076 // Track number of pending subchannel send batches and determine if this
2077 // was the last one.
Mark D. Roth817d28f2018-06-14 09:44:58 -07002078 --calld->num_pending_retriable_subchannel_send_batches;
2079 const bool last_send_batch_complete =
2080 calld->num_pending_retriable_subchannel_send_batches == 0;
Mark D. Roth718c8342018-02-28 13:00:04 -08002081 // Don't need batch_data anymore.
2082 batch_data_unref(batch_data);
2083 // Schedule all of the closures identified above.
Mark D. Roth4f9e0032018-05-24 09:30:09 -07002084 // Note: This yields the call combiner.
Mark D. Roth817d28f2018-06-14 09:44:58 -07002085 closures.RunClosures(calld->call_combiner);
2086 // If this was the last subchannel send batch, unref the call stack.
2087 if (last_send_batch_complete) {
Mark D. Roth4f9e0032018-05-24 09:30:09 -07002088 GRPC_CALL_STACK_UNREF(calld->owning_call, "subchannel_send_batches");
2089 }
Mark D. Roth718c8342018-02-28 13:00:04 -08002090}
2091
2092//
2093// subchannel batch construction
2094//
2095
2096// Helper function used to start a subchannel batch in the call combiner.
2097static void start_batch_in_call_combiner(void* arg, grpc_error* ignored) {
2098 grpc_transport_stream_op_batch* batch =
2099 static_cast<grpc_transport_stream_op_batch*>(arg);
2100 grpc_subchannel_call* subchannel_call =
2101 static_cast<grpc_subchannel_call*>(batch->handler_private.extra_arg);
2102 // Note: This will release the call combiner.
2103 grpc_subchannel_call_process_op(subchannel_call, batch);
2104}
2105
Mark D. Rothde077ac2018-04-12 08:05:44 -07002106// Adds a closure to closures that will execute batch in the call combiner.
2107static void add_closure_for_subchannel_batch(
Mark D. Roth817d28f2018-06-14 09:44:58 -07002108 grpc_call_element* elem, grpc_transport_stream_op_batch* batch,
2109 grpc_core::CallCombinerClosureList* closures) {
2110 channel_data* chand = static_cast<channel_data*>(elem->channel_data);
2111 call_data* calld = static_cast<call_data*>(elem->call_data);
Mark D. Rothde077ac2018-04-12 08:05:44 -07002112 batch->handler_private.extra_arg = calld->subchannel_call;
2113 GRPC_CLOSURE_INIT(&batch->handler_private.closure,
2114 start_batch_in_call_combiner, batch,
2115 grpc_schedule_on_exec_ctx);
Mark D. Rothde077ac2018-04-12 08:05:44 -07002116 if (grpc_client_channel_trace.enabled()) {
2117 char* batch_str = grpc_transport_stream_op_batch_string(batch);
Mark D. Roth817d28f2018-06-14 09:44:58 -07002118 gpr_log(GPR_INFO, "chand=%p calld=%p: starting subchannel batch: %s", chand,
2119 calld, batch_str);
Mark D. Rothde077ac2018-04-12 08:05:44 -07002120 gpr_free(batch_str);
Mark D. Rothde077ac2018-04-12 08:05:44 -07002121 }
Mark D. Roth817d28f2018-06-14 09:44:58 -07002122 closures->Add(&batch->handler_private.closure, GRPC_ERROR_NONE,
2123 "start_subchannel_batch");
Mark D. Rothde077ac2018-04-12 08:05:44 -07002124}
2125
Mark D. Roth718c8342018-02-28 13:00:04 -08002126// Adds retriable send_initial_metadata op to batch_data.
2127static void add_retriable_send_initial_metadata_op(
2128 call_data* calld, subchannel_call_retry_state* retry_state,
2129 subchannel_batch_data* batch_data) {
2130 // Maps the number of retries to the corresponding metadata value slice.
2131 static const grpc_slice* retry_count_strings[] = {
2132 &GRPC_MDSTR_1, &GRPC_MDSTR_2, &GRPC_MDSTR_3, &GRPC_MDSTR_4};
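  // Only four entries are needed: the retry policy caps maxAttempts at five,
  // so num_attempts_completed - 1 is always in [0, 3] when this array is
  // read below.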
2133 // We need to make a copy of the metadata batch for each attempt, since
2134 // the filters in the subchannel stack may modify this batch, and we don't
2135 // want those modifications to be passed forward to subsequent attempts.
2136 //
2137 // If we've already completed one or more attempts, add the
2138 // grpc-previous-rpc-attempts header.
Mark D. Roth5bacf2e2018-06-19 08:19:58 -07002139 retry_state->send_initial_metadata_storage =
Mark D. Roth718c8342018-02-28 13:00:04 -08002140 static_cast<grpc_linked_mdelem*>(gpr_arena_alloc(
2141 calld->arena, sizeof(grpc_linked_mdelem) *
2142 (calld->send_initial_metadata.list.count +
2143 (calld->num_attempts_completed > 0))));
2144 grpc_metadata_batch_copy(&calld->send_initial_metadata,
Mark D. Roth5bacf2e2018-06-19 08:19:58 -07002145 &retry_state->send_initial_metadata,
2146 retry_state->send_initial_metadata_storage);
2147 if (GPR_UNLIKELY(retry_state->send_initial_metadata.idx.named
Yash Tibrewal8c5b8832018-05-14 11:44:18 -07002148 .grpc_previous_rpc_attempts != nullptr)) {
Mark D. Roth5bacf2e2018-06-19 08:19:58 -07002149 grpc_metadata_batch_remove(&retry_state->send_initial_metadata,
2150 retry_state->send_initial_metadata.idx.named
2151 .grpc_previous_rpc_attempts);
Mark D. Roth718c8342018-02-28 13:00:04 -08002152 }
Yash Tibrewal8c5b8832018-05-14 11:44:18 -07002153 if (GPR_UNLIKELY(calld->num_attempts_completed > 0)) {
Mark D. Roth718c8342018-02-28 13:00:04 -08002154 grpc_mdelem retry_md = grpc_mdelem_from_slices(
2155 GRPC_MDSTR_GRPC_PREVIOUS_RPC_ATTEMPTS,
2156 *retry_count_strings[calld->num_attempts_completed - 1]);
2157 grpc_error* error = grpc_metadata_batch_add_tail(
Mark D. Roth5bacf2e2018-06-19 08:19:58 -07002158 &retry_state->send_initial_metadata,
2159 &retry_state->send_initial_metadata_storage[calld->send_initial_metadata
2160 .list.count],
Mark D. Roth718c8342018-02-28 13:00:04 -08002161 retry_md);
Yash Tibrewal7f51ba82018-04-12 13:21:20 -07002162 if (GPR_UNLIKELY(error != GRPC_ERROR_NONE)) {
Mark D. Roth718c8342018-02-28 13:00:04 -08002163 gpr_log(GPR_ERROR, "error adding retry metadata: %s",
2164 grpc_error_string(error));
2165 GPR_ASSERT(false);
2166 }
2167 }
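  // For example: on the third attempt, num_attempts_completed is 2, so the
  // copied metadata will carry "grpc-previous-rpc-attempts: 2".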
2168 retry_state->started_send_initial_metadata = true;
2169 batch_data->batch.send_initial_metadata = true;
2170 batch_data->batch.payload->send_initial_metadata.send_initial_metadata =
Mark D. Roth5bacf2e2018-06-19 08:19:58 -07002171 &retry_state->send_initial_metadata;
Mark D. Roth718c8342018-02-28 13:00:04 -08002172 batch_data->batch.payload->send_initial_metadata.send_initial_metadata_flags =
2173 calld->send_initial_metadata_flags;
2174 batch_data->batch.payload->send_initial_metadata.peer_string =
2175 calld->peer_string;
2176}
2177
2178// Adds retriable send_message op to batch_data.
2179static void add_retriable_send_message_op(
2180 grpc_call_element* elem, subchannel_call_retry_state* retry_state,
2181 subchannel_batch_data* batch_data) {
2182 channel_data* chand = static_cast<channel_data*>(elem->channel_data);
2183 call_data* calld = static_cast<call_data*>(elem->call_data);
2184 if (grpc_client_channel_trace.enabled()) {
Mark D. Roth48854d22018-04-25 13:05:26 -07002185 gpr_log(GPR_INFO,
Mark D. Roth718c8342018-02-28 13:00:04 -08002186 "chand=%p calld=%p: starting calld->send_messages[%" PRIuPTR "]",
2187 chand, calld, retry_state->started_send_message_count);
2188 }
Mark D. Roth3d8b32d2018-03-09 13:25:40 -08002189 grpc_core::ByteStreamCache* cache =
Mark D. Rothefcd45b2018-03-28 10:49:59 -07002190 (*calld->send_messages)[retry_state->started_send_message_count];
Mark D. Roth718c8342018-02-28 13:00:04 -08002191 ++retry_state->started_send_message_count;
Mark D. Roth5bacf2e2018-06-19 08:19:58 -07002192 retry_state->send_message.Init(cache);
Mark D. Roth718c8342018-02-28 13:00:04 -08002193 batch_data->batch.send_message = true;
Mark D. Roth3d8b32d2018-03-09 13:25:40 -08002194 batch_data->batch.payload->send_message.send_message.reset(
Mark D. Roth5bacf2e2018-06-19 08:19:58 -07002195 retry_state->send_message.get());
Mark D. Roth718c8342018-02-28 13:00:04 -08002196}
2197
2198// Adds retriable send_trailing_metadata op to batch_data.
2199static void add_retriable_send_trailing_metadata_op(
2200 call_data* calld, subchannel_call_retry_state* retry_state,
2201 subchannel_batch_data* batch_data) {
2202 // We need to make a copy of the metadata batch for each attempt, since
2203 // the filters in the subchannel stack may modify this batch, and we don't
2204 // want those modifications to be passed forward to subsequent attempts.
Mark D. Roth5bacf2e2018-06-19 08:19:58 -07002205 retry_state->send_trailing_metadata_storage =
Mark D. Roth718c8342018-02-28 13:00:04 -08002206 static_cast<grpc_linked_mdelem*>(gpr_arena_alloc(
2207 calld->arena, sizeof(grpc_linked_mdelem) *
2208 calld->send_trailing_metadata.list.count));
2209 grpc_metadata_batch_copy(&calld->send_trailing_metadata,
Mark D. Roth5bacf2e2018-06-19 08:19:58 -07002210 &retry_state->send_trailing_metadata,
2211 retry_state->send_trailing_metadata_storage);
Mark D. Roth718c8342018-02-28 13:00:04 -08002212 retry_state->started_send_trailing_metadata = true;
2213 batch_data->batch.send_trailing_metadata = true;
2214 batch_data->batch.payload->send_trailing_metadata.send_trailing_metadata =
Mark D. Roth5bacf2e2018-06-19 08:19:58 -07002215 &retry_state->send_trailing_metadata;
Mark D. Roth718c8342018-02-28 13:00:04 -08002216}
2217
2218// Adds retriable recv_initial_metadata op to batch_data.
2219static void add_retriable_recv_initial_metadata_op(
2220 call_data* calld, subchannel_call_retry_state* retry_state,
2221 subchannel_batch_data* batch_data) {
2222 retry_state->started_recv_initial_metadata = true;
2223 batch_data->batch.recv_initial_metadata = true;
Mark D. Roth5bacf2e2018-06-19 08:19:58 -07002224 grpc_metadata_batch_init(&retry_state->recv_initial_metadata);
Mark D. Roth718c8342018-02-28 13:00:04 -08002225 batch_data->batch.payload->recv_initial_metadata.recv_initial_metadata =
Mark D. Roth5bacf2e2018-06-19 08:19:58 -07002226 &retry_state->recv_initial_metadata;
Mark D. Roth718c8342018-02-28 13:00:04 -08002227 batch_data->batch.payload->recv_initial_metadata.trailing_metadata_available =
Mark D. Roth5bacf2e2018-06-19 08:19:58 -07002228 &retry_state->trailing_metadata_available;
2229 GRPC_CLOSURE_INIT(&retry_state->recv_initial_metadata_ready,
Mark D. Roth718c8342018-02-28 13:00:04 -08002230 recv_initial_metadata_ready, batch_data,
2231 grpc_schedule_on_exec_ctx);
2232 batch_data->batch.payload->recv_initial_metadata.recv_initial_metadata_ready =
Mark D. Roth5bacf2e2018-06-19 08:19:58 -07002233 &retry_state->recv_initial_metadata_ready;
Mark D. Roth718c8342018-02-28 13:00:04 -08002234}
2235
2236// Adds retriable recv_message op to batch_data.
2237static void add_retriable_recv_message_op(
2238 call_data* calld, subchannel_call_retry_state* retry_state,
2239 subchannel_batch_data* batch_data) {
2240 ++retry_state->started_recv_message_count;
2241 batch_data->batch.recv_message = true;
2242 batch_data->batch.payload->recv_message.recv_message =
Mark D. Roth5bacf2e2018-06-19 08:19:58 -07002243 &retry_state->recv_message;
2244 GRPC_CLOSURE_INIT(&retry_state->recv_message_ready, recv_message_ready,
Mark D. Roth718c8342018-02-28 13:00:04 -08002245 batch_data, grpc_schedule_on_exec_ctx);
2246 batch_data->batch.payload->recv_message.recv_message_ready =
Mark D. Roth5bacf2e2018-06-19 08:19:58 -07002247 &retry_state->recv_message_ready;
Mark D. Roth718c8342018-02-28 13:00:04 -08002248}
2249
2250// Adds retriable recv_trailing_metadata op to batch_data.
2251static void add_retriable_recv_trailing_metadata_op(
2252 call_data* calld, subchannel_call_retry_state* retry_state,
2253 subchannel_batch_data* batch_data) {
2254 retry_state->started_recv_trailing_metadata = true;
2255 batch_data->batch.recv_trailing_metadata = true;
Mark D. Roth5bacf2e2018-06-19 08:19:58 -07002256 grpc_metadata_batch_init(&retry_state->recv_trailing_metadata);
Mark D. Roth718c8342018-02-28 13:00:04 -08002257 batch_data->batch.payload->recv_trailing_metadata.recv_trailing_metadata =
Mark D. Roth5bacf2e2018-06-19 08:19:58 -07002258 &retry_state->recv_trailing_metadata;
Mark D. Roth817d28f2018-06-14 09:44:58 -07002259 batch_data->batch.payload->recv_trailing_metadata.collect_stats =
Mark D. Roth5bacf2e2018-06-19 08:19:58 -07002260 &retry_state->collect_stats;
2261 GRPC_CLOSURE_INIT(&retry_state->recv_trailing_metadata_ready,
Mark D. Roth817d28f2018-06-14 09:44:58 -07002262 recv_trailing_metadata_ready, batch_data,
2263 grpc_schedule_on_exec_ctx);
2264 batch_data->batch.payload->recv_trailing_metadata
Mark D. Roth5bacf2e2018-06-19 08:19:58 -07002265 .recv_trailing_metadata_ready =
2266 &retry_state->recv_trailing_metadata_ready;
Mark D. Roth718c8342018-02-28 13:00:04 -08002267}
2268
2269// Helper function used to start a recv_trailing_metadata batch. This
2270// is used in the case where a recv_initial_metadata or recv_message
2271// op fails in a way that we know the call is over, but the application
2272// has not yet started its own recv_trailing_metadata op.
2273static void start_internal_recv_trailing_metadata(grpc_call_element* elem) {
2274 channel_data* chand = static_cast<channel_data*>(elem->channel_data);
2275 call_data* calld = static_cast<call_data*>(elem->call_data);
2276 if (grpc_client_channel_trace.enabled()) {
Mark D. Roth48854d22018-04-25 13:05:26 -07002277 gpr_log(GPR_INFO,
Mark D. Roth718c8342018-02-28 13:00:04 -08002278 "chand=%p calld=%p: call failed but recv_trailing_metadata not "
2279 "started; starting it internally",
2280 chand, calld);
2281 }
2282 subchannel_call_retry_state* retry_state =
2283 static_cast<subchannel_call_retry_state*>(
2284 grpc_connected_subchannel_call_get_parent_data(
2285 calld->subchannel_call));
Mark D. Rothde077ac2018-04-12 08:05:44 -07002286 // Create batch_data with 2 refs, since this batch will be unreffed twice:
Mark D. Roth817d28f2018-06-14 09:44:58 -07002287 // once for the recv_trailing_metadata_ready callback when the subchannel
2288 // batch returns, and again when we actually get a recv_trailing_metadata
2289 // op from the surface.
2290 subchannel_batch_data* batch_data =
2291 batch_data_create(elem, 2, false /* set_on_complete */);
Mark D. Roth718c8342018-02-28 13:00:04 -08002292 add_retriable_recv_trailing_metadata_op(calld, retry_state, batch_data);
Mark D. Rothde077ac2018-04-12 08:05:44 -07002293 retry_state->recv_trailing_metadata_internal_batch = batch_data;
Mark D. Roth718c8342018-02-28 13:00:04 -08002294 // Note: This will release the call combiner.
2295 grpc_subchannel_call_process_op(calld->subchannel_call, &batch_data->batch);
2296}
2297
2298// If there are any cached send ops that need to be replayed on the
2299// current subchannel call, creates and returns a new subchannel batch
2300// to replay those ops. Otherwise, returns nullptr.
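// Example: if a previous attempt had already completed send_initial_metadata
// and a send_message op whose pending batches have since been cleared, a new
// attempt must replay both from the cached copies before any still-pending
// send ops can be started.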
2301static subchannel_batch_data* maybe_create_subchannel_batch_for_replay(
2302 grpc_call_element* elem, subchannel_call_retry_state* retry_state) {
2303 channel_data* chand = static_cast<channel_data*>(elem->channel_data);
2304 call_data* calld = static_cast<call_data*>(elem->call_data);
2305 subchannel_batch_data* replay_batch_data = nullptr;
2306 // send_initial_metadata.
2307 if (calld->seen_send_initial_metadata &&
2308 !retry_state->started_send_initial_metadata &&
2309 !calld->pending_send_initial_metadata) {
2310 if (grpc_client_channel_trace.enabled()) {
Mark D. Roth48854d22018-04-25 13:05:26 -07002311 gpr_log(GPR_INFO,
Mark D. Roth718c8342018-02-28 13:00:04 -08002312 "chand=%p calld=%p: replaying previously completed "
2313 "send_initial_metadata op",
2314 chand, calld);
2315 }
Mark D. Roth817d28f2018-06-14 09:44:58 -07002316 replay_batch_data = batch_data_create(elem, 1, true /* set_on_complete */);
Mark D. Roth718c8342018-02-28 13:00:04 -08002317 add_retriable_send_initial_metadata_op(calld, retry_state,
2318 replay_batch_data);
2319 }
2320 // send_message.
2321 // Note that we can only have one send_message op in flight at a time.
Mark D. Rothefcd45b2018-03-28 10:49:59 -07002322 if (retry_state->started_send_message_count < calld->send_messages->size() &&
Mark D. Roth718c8342018-02-28 13:00:04 -08002323 retry_state->started_send_message_count ==
2324 retry_state->completed_send_message_count &&
2325 !calld->pending_send_message) {
2326 if (grpc_client_channel_trace.enabled()) {
Mark D. Roth48854d22018-04-25 13:05:26 -07002327 gpr_log(GPR_INFO,
Mark D. Roth718c8342018-02-28 13:00:04 -08002328 "chand=%p calld=%p: replaying previously completed "
2329 "send_message op",
2330 chand, calld);
2331 }
2332 if (replay_batch_data == nullptr) {
Mark D. Roth817d28f2018-06-14 09:44:58 -07002333 replay_batch_data =
2334 batch_data_create(elem, 1, true /* set_on_complete */);
Mark D. Roth718c8342018-02-28 13:00:04 -08002335 }
2336 add_retriable_send_message_op(elem, retry_state, replay_batch_data);
2337 }
2338 // send_trailing_metadata.
2339 // Note that we only add this op if we have no more send_message ops
2340 // to start, since we can't send down any more send_message ops after
2341 // send_trailing_metadata.
2342 if (calld->seen_send_trailing_metadata &&
Mark D. Rothefcd45b2018-03-28 10:49:59 -07002343 retry_state->started_send_message_count == calld->send_messages->size() &&
Mark D. Roth718c8342018-02-28 13:00:04 -08002344 !retry_state->started_send_trailing_metadata &&
2345 !calld->pending_send_trailing_metadata) {
2346 if (grpc_client_channel_trace.enabled()) {
Mark D. Roth48854d22018-04-25 13:05:26 -07002347 gpr_log(GPR_INFO,
Mark D. Roth718c8342018-02-28 13:00:04 -08002348 "chand=%p calld=%p: replaying previously completed "
2349 "send_trailing_metadata op",
2350 chand, calld);
2351 }
2352 if (replay_batch_data == nullptr) {
Mark D. Roth817d28f2018-06-14 09:44:58 -07002353 replay_batch_data =
2354 batch_data_create(elem, 1, true /* set_on_complete */);
Mark D. Roth718c8342018-02-28 13:00:04 -08002355 }
2356 add_retriable_send_trailing_metadata_op(calld, retry_state,
2357 replay_batch_data);
2358 }
2359 return replay_batch_data;
2360}
2361
2362// Creates subchannel batches for any pending batches that can be started
2363// on the current subchannel call, adding closures to start them to closures.
2364static void add_subchannel_batches_for_pending_batches(
2365 grpc_call_element* elem, subchannel_call_retry_state* retry_state,
Mark D. Roth817d28f2018-06-14 09:44:58 -07002366 grpc_core::CallCombinerClosureList* closures) {
Mark D. Roth718c8342018-02-28 13:00:04 -08002367 call_data* calld = static_cast<call_data*>(elem->call_data);
2368 for (size_t i = 0; i < GPR_ARRAY_SIZE(calld->pending_batches); ++i) {
2369 pending_batch* pending = &calld->pending_batches[i];
2370 grpc_transport_stream_op_batch* batch = pending->batch;
2371 if (batch == nullptr) continue;
2372 // Skip any batch that either (a) has already been started on this
2373 // subchannel call or (b) we can't start yet because we're still
2374 // replaying send ops that need to be completed first.
2375 // TODO(roth): Note that if any one op in the batch can't be sent
2376 // yet due to ops that we're replaying, we don't start any of the ops
2377 // in the batch. This is probably okay, but it could conceivably
2378 // lead to increased latency in some cases -- e.g., we could delay
2379 // starting a recv op due to it being in the same batch with a send
2380 // op. If/when we revamp the callback protocol in
2381 // transport_stream_op_batch, we may be able to fix this.
2382 if (batch->send_initial_metadata &&
2383 retry_state->started_send_initial_metadata) {
2384 continue;
2385 }
2386 if (batch->send_message && retry_state->completed_send_message_count <
2387 retry_state->started_send_message_count) {
2388 continue;
2389 }
2390 // Note that we only start send_trailing_metadata if we have no more
2391 // send_message ops to start, since we can't send down any more
2392 // send_message ops after send_trailing_metadata.
2393 if (batch->send_trailing_metadata &&
2394 (retry_state->started_send_message_count + batch->send_message <
Mark D. Rothefcd45b2018-03-28 10:49:59 -07002395 calld->send_messages->size() ||
Mark D. Roth718c8342018-02-28 13:00:04 -08002396 retry_state->started_send_trailing_metadata)) {
2397 continue;
2398 }
2399 if (batch->recv_initial_metadata &&
2400 retry_state->started_recv_initial_metadata) {
2401 continue;
2402 }
2403 if (batch->recv_message && retry_state->completed_recv_message_count <
2404 retry_state->started_recv_message_count) {
2405 continue;
2406 }
2407 if (batch->recv_trailing_metadata &&
2408 retry_state->started_recv_trailing_metadata) {
Mark D. Rothde077ac2018-04-12 08:05:44 -07002409 // If we previously completed a recv_trailing_metadata op
2410 // initiated by start_internal_recv_trailing_metadata(), use the
2411 // result of that instead of trying to re-start this op.
Yash Tibrewal8c5b8832018-05-14 11:44:18 -07002412      if (GPR_UNLIKELY(retry_state->recv_trailing_metadata_internal_batch !=
2413                       nullptr)) {
Mark D. Rothde077ac2018-04-12 08:05:44 -07002414 // If the batch completed, then trigger the completion callback
2415 // directly, so that we return the previously returned results to
2416 // the application. Otherwise, just unref the internally
2417 // started subchannel batch, since we'll propagate the
2418 // completion when it completes.
2419 if (retry_state->completed_recv_trailing_metadata) {
Mark D. Rothde077ac2018-04-12 08:05:44 -07002420 // Batches containing recv_trailing_metadata always succeed.
Mark D. Roth817d28f2018-06-14 09:44:58 -07002421 closures->Add(
Mark D. Roth5bacf2e2018-06-19 08:19:58 -07002422 &retry_state->recv_trailing_metadata_ready, GRPC_ERROR_NONE,
Mark D. Roth817d28f2018-06-14 09:44:58 -07002423 "re-executing recv_trailing_metadata_ready to propagate "
2424 "internally triggered result");
Mark D. Rothde077ac2018-04-12 08:05:44 -07002425 } else {
2426 batch_data_unref(retry_state->recv_trailing_metadata_internal_batch);
2427 }
2428 retry_state->recv_trailing_metadata_internal_batch = nullptr;
2429 }
Mark D. Roth718c8342018-02-28 13:00:04 -08002430 continue;
2431 }
2432 // If we're not retrying, just send the batch as-is.
2433 if (calld->method_params == nullptr ||
2434 calld->method_params->retry_policy() == nullptr ||
2435 calld->retry_committed) {
Mark D. Roth817d28f2018-06-14 09:44:58 -07002436 add_closure_for_subchannel_batch(elem, batch, closures);
Mark D. Roth718c8342018-02-28 13:00:04 -08002437 pending_batch_clear(calld, pending);
2438 continue;
2439 }
2440 // Create batch with the right number of callbacks.
Mark D. Roth817d28f2018-06-14 09:44:58 -07002441 const bool has_send_ops = batch->send_initial_metadata ||
2442 batch->send_message ||
2443 batch->send_trailing_metadata;
2444 const int num_callbacks = has_send_ops + batch->recv_initial_metadata +
2445 batch->recv_message +
2446 batch->recv_trailing_metadata;
2447 subchannel_batch_data* batch_data = batch_data_create(
2448 elem, num_callbacks, has_send_ops /* set_on_complete */);
Mark D. Roth718c8342018-02-28 13:00:04 -08002449 // Cache send ops if needed.
2450 maybe_cache_send_ops_for_batch(calld, pending);
2451 // send_initial_metadata.
2452 if (batch->send_initial_metadata) {
2453 add_retriable_send_initial_metadata_op(calld, retry_state, batch_data);
2454 }
2455 // send_message.
2456 if (batch->send_message) {
2457 add_retriable_send_message_op(elem, retry_state, batch_data);
2458 }
2459 // send_trailing_metadata.
2460 if (batch->send_trailing_metadata) {
2461 add_retriable_send_trailing_metadata_op(calld, retry_state, batch_data);
2462 }
2463 // recv_initial_metadata.
2464 if (batch->recv_initial_metadata) {
2465 // recv_flags is only used on the server side.
2466 GPR_ASSERT(batch->payload->recv_initial_metadata.recv_flags == nullptr);
2467 add_retriable_recv_initial_metadata_op(calld, retry_state, batch_data);
2468 }
2469 // recv_message.
2470 if (batch->recv_message) {
2471 add_retriable_recv_message_op(calld, retry_state, batch_data);
2472 }
2473 // recv_trailing_metadata.
2474 if (batch->recv_trailing_metadata) {
Mark D. Roth718c8342018-02-28 13:00:04 -08002475 add_retriable_recv_trailing_metadata_op(calld, retry_state, batch_data);
2476 }
Mark D. Roth817d28f2018-06-14 09:44:58 -07002477 add_closure_for_subchannel_batch(elem, &batch_data->batch, closures);
Mark D. Roth4f9e0032018-05-24 09:30:09 -07002478 // Track number of pending subchannel send batches.
2479 // If this is the first one, take a ref to the call stack.
2480 if (batch->send_initial_metadata || batch->send_message ||
2481 batch->send_trailing_metadata) {
2482 if (calld->num_pending_retriable_subchannel_send_batches == 0) {
2483 GRPC_CALL_STACK_REF(calld->owning_call, "subchannel_send_batches");
2484 }
2485 ++calld->num_pending_retriable_subchannel_send_batches;
2486 }
Mark D. Roth718c8342018-02-28 13:00:04 -08002487 }
2488}
2489
2490// Constructs and starts whatever subchannel batches are needed on the
2491// subchannel call.
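// Builds a CallCombinerClosureList containing a replay batch for any cached
// send ops (if needed), followed by one batch per startable pending batch,
// then runs them all in one pass, yielding the call combiner exactly once.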
2492static void start_retriable_subchannel_batches(void* arg, grpc_error* ignored) {
2493 grpc_call_element* elem = static_cast<grpc_call_element*>(arg);
2494 channel_data* chand = static_cast<channel_data*>(elem->channel_data);
2495 call_data* calld = static_cast<call_data*>(elem->call_data);
2496 if (grpc_client_channel_trace.enabled()) {
Mark D. Roth48854d22018-04-25 13:05:26 -07002497 gpr_log(GPR_INFO, "chand=%p calld=%p: constructing retriable batches",
Mark D. Roth718c8342018-02-28 13:00:04 -08002498 chand, calld);
2499 }
2500 subchannel_call_retry_state* retry_state =
2501 static_cast<subchannel_call_retry_state*>(
2502 grpc_connected_subchannel_call_get_parent_data(
2503 calld->subchannel_call));
Mark D. Rothde077ac2018-04-12 08:05:44 -07002504 // Construct list of closures to execute, one for each pending batch.
Mark D. Roth817d28f2018-06-14 09:44:58 -07002505 grpc_core::CallCombinerClosureList closures;
Mark D. Roth718c8342018-02-28 13:00:04 -08002506 // Replay previously-returned send_* ops if needed.
2507 subchannel_batch_data* replay_batch_data =
2508 maybe_create_subchannel_batch_for_replay(elem, retry_state);
2509 if (replay_batch_data != nullptr) {
Mark D. Roth817d28f2018-06-14 09:44:58 -07002510 add_closure_for_subchannel_batch(elem, &replay_batch_data->batch,
2511 &closures);
Mark D. Roth4f9e0032018-05-24 09:30:09 -07002512 // Track number of pending subchannel send batches.
2513 // If this is the first one, take a ref to the call stack.
2514 if (calld->num_pending_retriable_subchannel_send_batches == 0) {
2515 GRPC_CALL_STACK_REF(calld->owning_call, "subchannel_send_batches");
2516 }
2517 ++calld->num_pending_retriable_subchannel_send_batches;
Mark D. Roth718c8342018-02-28 13:00:04 -08002518 }
2519 // Now add pending batches.
Mark D. Roth817d28f2018-06-14 09:44:58 -07002520 add_subchannel_batches_for_pending_batches(elem, retry_state, &closures);
Mark D. Roth718c8342018-02-28 13:00:04 -08002521 // Start batches on subchannel call.
Mark D. Roth718c8342018-02-28 13:00:04 -08002522 if (grpc_client_channel_trace.enabled()) {
Mark D. Roth48854d22018-04-25 13:05:26 -07002523 gpr_log(GPR_INFO,
Mark D. Roth718c8342018-02-28 13:00:04 -08002524 "chand=%p calld=%p: starting %" PRIuPTR
2525 " retriable batches on subchannel_call=%p",
Mark D. Roth817d28f2018-06-14 09:44:58 -07002526 chand, calld, closures.size(), calld->subchannel_call);
Mark D. Roth718c8342018-02-28 13:00:04 -08002527 }
Mark D. Roth817d28f2018-06-14 09:44:58 -07002528 // Note: This will yield the call combiner.
2529 closures.RunClosures(calld->call_combiner);
Mark D. Roth718c8342018-02-28 13:00:04 -08002530}
2531
2532//
2533// LB pick
2534//
2535
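// Creates the subchannel call on the connected subchannel chosen by the pick.
// When retries are enabled, space for the retry state is co-allocated with
// the subchannel call (parent_data_size) and later retrieved via
// grpc_connected_subchannel_call_get_parent_data().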
2536static void create_subchannel_call(grpc_call_element* elem, grpc_error* error) {
2537 channel_data* chand = static_cast<channel_data*>(elem->channel_data);
2538 call_data* calld = static_cast<call_data*>(elem->call_data);
2539 const size_t parent_data_size =
2540 calld->enable_retries ? sizeof(subchannel_call_retry_state) : 0;
David Garcia Quintasbaf1ac72018-01-09 14:24:32 -08002541 const grpc_core::ConnectedSubchannel::CallArgs call_args = {
Mark D. Rothc0febd32018-01-09 10:25:24 -08002542 calld->pollent, // pollent
2543 calld->path, // path
2544 calld->call_start_time, // start_time
2545 calld->deadline, // deadline
2546 calld->arena, // arena
2547 calld->pick.subchannel_call_context, // context
Mark D. Roth718c8342018-02-28 13:00:04 -08002548 calld->call_combiner, // call_combiner
2549 parent_data_size // parent_data_size
Yash Tibrewald8b84a22017-09-25 13:38:03 -07002550 };
David Garcia Quintas70fbe622018-01-09 19:27:46 -08002551 grpc_error* new_error = calld->pick.connected_subchannel->CreateCall(
David Garcia Quintasbaf1ac72018-01-09 14:24:32 -08002552 call_args, &calld->subchannel_call);
Craig Tiller6014e8a2017-10-16 13:50:29 -07002553 if (grpc_client_channel_trace.enabled()) {
Mark D. Roth48854d22018-04-25 13:05:26 -07002554 gpr_log(GPR_INFO, "chand=%p calld=%p: create subchannel_call=%p: error=%s",
Mark D. Roth76e264b2017-08-25 09:03:33 -07002555 chand, calld, calld->subchannel_call, grpc_error_string(new_error));
Mark D. Roth60751fe2017-07-07 12:50:33 -07002556 }
Yash Tibrewal8c5b8832018-05-14 11:44:18 -07002557 if (GPR_UNLIKELY(new_error != GRPC_ERROR_NONE)) {
Mark D. Roth0ca0be82017-06-20 07:49:33 -07002558 new_error = grpc_error_add_child(new_error, error);
Mark D. Roth718c8342018-02-28 13:00:04 -08002559 pending_batches_fail(elem, new_error, true /* yield_call_combiner */);
Mark D. Roth0ca0be82017-06-20 07:49:33 -07002560 } else {
Mark D. Roth718c8342018-02-28 13:00:04 -08002561 if (parent_data_size > 0) {
2562 subchannel_call_retry_state* retry_state =
2563 static_cast<subchannel_call_retry_state*>(
2564 grpc_connected_subchannel_call_get_parent_data(
2565 calld->subchannel_call));
2566 retry_state->batch_payload.context = calld->pick.subchannel_call_context;
2567 }
2568 pending_batches_resume(elem);
Craig Tiller11c17d42017-03-13 13:36:34 -07002569 }
Mark D. Roth0ca0be82017-06-20 07:49:33 -07002570 GRPC_ERROR_UNREF(error);
Craig Tiller11c17d42017-03-13 13:36:34 -07002571}
2572
Mark D. Rothb2929602017-09-11 09:31:11 -07002573// Invoked when a pick is completed, on both success and failure.
Mark D. Roth718c8342018-02-28 13:00:04 -08002574static void pick_done(void* arg, grpc_error* error) {
2575 grpc_call_element* elem = static_cast<grpc_call_element*>(arg);
Noah Eisenbe82e642018-02-09 09:16:55 -08002576 channel_data* chand = static_cast<channel_data*>(elem->channel_data);
Mark D. Roth718c8342018-02-28 13:00:04 -08002577 call_data* calld = static_cast<call_data*>(elem->call_data);
Yash Tibrewal8c5b8832018-05-14 11:44:18 -07002578 if (GPR_UNLIKELY(calld->pick.connected_subchannel == nullptr)) {
Mark D. Roth0ca0be82017-06-20 07:49:33 -07002579 // Failed to create subchannel.
Mark D. Roth718c8342018-02-28 13:00:04 -08002580 // If there was no error, this is an LB policy drop, in which case
2581 // we return an error; otherwise, we may retry.
2582 grpc_status_code status = GRPC_STATUS_OK;
2583 grpc_error_get_status(error, calld->deadline, &status, nullptr, nullptr,
2584 nullptr);
2585 if (error == GRPC_ERROR_NONE || !calld->enable_retries ||
2586 !maybe_retry(elem, nullptr /* batch_data */, status,
2587 nullptr /* server_pushback_md */)) {
2588 grpc_error* new_error =
2589 error == GRPC_ERROR_NONE
2590 ? GRPC_ERROR_CREATE_FROM_STATIC_STRING(
2591 "Call dropped by load balancing policy")
2592 : GRPC_ERROR_CREATE_REFERENCING_FROM_STATIC_STRING(
2593 "Failed to create subchannel", &error, 1);
2594 if (grpc_client_channel_trace.enabled()) {
Mark D. Roth48854d22018-04-25 13:05:26 -07002595 gpr_log(GPR_INFO,
Mark D. Roth718c8342018-02-28 13:00:04 -08002596 "chand=%p calld=%p: failed to create subchannel: error=%s",
2597 chand, calld, grpc_error_string(new_error));
2598 }
2599 pending_batches_fail(elem, new_error, true /* yield_call_combiner */);
Mark D. Roth60751fe2017-07-07 12:50:33 -07002600 }
Mark D. Roth2a5959f2016-09-01 08:20:27 -07002601 } else {
Mark D. Roth9fe284e2016-09-12 11:22:27 -07002602 /* Create call on subchannel. */
Mark D. Roth718c8342018-02-28 13:00:04 -08002603 create_subchannel_call(elem, GRPC_ERROR_REF(error));
Mark D. Roth2a5959f2016-09-01 08:20:27 -07002604 }
Mark D. Roth718c8342018-02-28 13:00:04 -08002605}
2606
Mark D. Roth7e0e2022018-06-01 12:04:16 -07002607static void maybe_add_call_to_channel_interested_parties_locked(
2608 grpc_call_element* elem) {
2609 channel_data* chand = static_cast<channel_data*>(elem->channel_data);
2610 call_data* calld = static_cast<call_data*>(elem->call_data);
2611 if (!calld->pollent_added_to_interested_parties) {
2612 calld->pollent_added_to_interested_parties = true;
2613 grpc_polling_entity_add_to_pollset_set(calld->pollent,
2614 chand->interested_parties);
2615 }
2616}
2617
2618static void maybe_del_call_from_channel_interested_parties_locked(
2619 grpc_call_element* elem) {
2620 channel_data* chand = static_cast<channel_data*>(elem->channel_data);
2621 call_data* calld = static_cast<call_data*>(elem->call_data);
2622 if (calld->pollent_added_to_interested_parties) {
2623 calld->pollent_added_to_interested_parties = false;
2624 grpc_polling_entity_del_from_pollset_set(calld->pollent,
2625 chand->interested_parties);
2626 }
2627}
2628
Mark D. Roth718c8342018-02-28 13:00:04 -08002629// Invoked when a pick is completed to leave the client_channel combiner
2630// and continue processing in the call combiner.
Mark D. Roth7e0e2022018-06-01 12:04:16 -07002631// If needed, removes the call's polling entity from chand->interested_parties.
Mark D. Roth718c8342018-02-28 13:00:04 -08002632static void pick_done_locked(grpc_call_element* elem, grpc_error* error) {
2633 call_data* calld = static_cast<call_data*>(elem->call_data);
Mark D. Roth7e0e2022018-06-01 12:04:16 -07002634 maybe_del_call_from_channel_interested_parties_locked(elem);
Mark D. Roth718c8342018-02-28 13:00:04 -08002635 GRPC_CLOSURE_INIT(&calld->pick_closure, pick_done, elem,
2636 grpc_schedule_on_exec_ctx);
2637 GRPC_CLOSURE_SCHED(&calld->pick_closure, error);
Mark D. Roth2a5959f2016-09-01 08:20:27 -07002638}
2639
Mark D. Roth7e0e2022018-06-01 12:04:16 -07002640namespace grpc_core {
Mark D. Rothb2929602017-09-11 09:31:11 -07002641
Mark D. Roth7e0e2022018-06-01 12:04:16 -07002642// Performs subchannel pick via LB policy.
2643class LbPicker {
2644 public:
2645 // Starts a pick on chand->lb_policy.
2646 static void StartLocked(grpc_call_element* elem) {
2647 channel_data* chand = static_cast<channel_data*>(elem->channel_data);
2648 call_data* calld = static_cast<call_data*>(elem->call_data);
Craig Tiller6014e8a2017-10-16 13:50:29 -07002649 if (grpc_client_channel_trace.enabled()) {
Mark D. Roth7e0e2022018-06-01 12:04:16 -07002650 gpr_log(GPR_INFO, "chand=%p calld=%p: starting pick on lb_policy=%p",
Mark D. Rothc8875492018-02-20 08:33:48 -08002651 chand, calld, chand->lb_policy.get());
Mark D. Rothb2929602017-09-11 09:31:11 -07002652 }
Mark D. Roth7e0e2022018-06-01 12:04:16 -07002653 // If this is a retry, use the send_initial_metadata payload that
2654 // we've cached; otherwise, use the pending batch. The
2655 // send_initial_metadata batch will be the first pending batch in the
2656 // list, as set by get_batch_index() above.
2657 calld->pick.initial_metadata =
2658 calld->seen_send_initial_metadata
2659 ? &calld->send_initial_metadata
2660 : calld->pending_batches[0]
2661 .batch->payload->send_initial_metadata.send_initial_metadata;
2662 calld->pick.initial_metadata_flags =
2663 calld->seen_send_initial_metadata
2664 ? calld->send_initial_metadata_flags
2665 : calld->pending_batches[0]
2666 .batch->payload->send_initial_metadata
2667 .send_initial_metadata_flags;
2668 GRPC_CLOSURE_INIT(&calld->pick_closure, &LbPicker::DoneLocked, elem,
2669 grpc_combiner_scheduler(chand->combiner));
2670 calld->pick.on_complete = &calld->pick_closure;
2671 GRPC_CALL_STACK_REF(calld->owning_call, "pick_callback");
2672 const bool pick_done = chand->lb_policy->PickLocked(&calld->pick);
2673 if (GPR_LIKELY(pick_done)) {
2674 // Pick completed synchronously.
2675 if (grpc_client_channel_trace.enabled()) {
2676 gpr_log(GPR_INFO, "chand=%p calld=%p: pick completed synchronously",
2677 chand, calld);
2678 }
2679 pick_done_locked(elem, GRPC_ERROR_NONE);
2680 GRPC_CALL_STACK_UNREF(calld->owning_call, "pick_callback");
2681 } else {
2682 // Pick will be returned asynchronously.
2683 // Add the polling entity from call_data to the channel_data's
2684 // interested_parties, so that the I/O of the LB policy can be done
2685 // under it. It will be removed in pick_done_locked().
2686 maybe_add_call_to_channel_interested_parties_locked(elem);
2687 // Request notification on call cancellation.
2688 GRPC_CALL_STACK_REF(calld->owning_call, "pick_callback_cancel");
2689 grpc_call_combiner_set_notify_on_cancel(
2690 calld->call_combiner,
2691 GRPC_CLOSURE_INIT(&calld->pick_cancel_closure,
2692 &LbPicker::CancelLocked, elem,
2693 grpc_combiner_scheduler(chand->combiner)));
2694 }
Mark D. Rothb2929602017-09-11 09:31:11 -07002695 }
Mark D. Rothb2929602017-09-11 09:31:11 -07002696
Mark D. Roth7e0e2022018-06-01 12:04:16 -07002697 private:
2698 // Callback invoked by LoadBalancingPolicy::PickLocked() for async picks.
2699  // Invokes pick_done_locked() and drops the call stack ref for the pick.
2700 static void DoneLocked(void* arg, grpc_error* error) {
2701 grpc_call_element* elem = static_cast<grpc_call_element*>(arg);
2702 channel_data* chand = static_cast<channel_data*>(elem->channel_data);
2703 call_data* calld = static_cast<call_data*>(elem->call_data);
2704 if (grpc_client_channel_trace.enabled()) {
2705 gpr_log(GPR_INFO, "chand=%p calld=%p: pick completed asynchronously",
2706 chand, calld);
2707 }
2708 pick_done_locked(elem, GRPC_ERROR_REF(error));
2709 GRPC_CALL_STACK_UNREF(calld->owning_call, "pick_callback");
Mark D. Rothb2929602017-09-11 09:31:11 -07002710 }
Mark D. Roth7e0e2022018-06-01 12:04:16 -07002711
2712 // Note: This runs under the client_channel combiner, but will NOT be
2713 // holding the call combiner.
2714 static void CancelLocked(void* arg, grpc_error* error) {
2715 grpc_call_element* elem = static_cast<grpc_call_element*>(arg);
2716 channel_data* chand = static_cast<channel_data*>(elem->channel_data);
2717 call_data* calld = static_cast<call_data*>(elem->call_data);
2718 // Note: chand->lb_policy may have changed since we started our pick,
2719 // in which case we will be cancelling the pick on a policy other than
2720 // the one we started it on. However, this will just be a no-op.
2721 if (GPR_UNLIKELY(error != GRPC_ERROR_NONE && chand->lb_policy != nullptr)) {
2722 if (grpc_client_channel_trace.enabled()) {
2723 gpr_log(GPR_INFO,
2724 "chand=%p calld=%p: cancelling pick from LB policy %p", chand,
2725 calld, chand->lb_policy.get());
2726 }
2727 chand->lb_policy->CancelPickLocked(&calld->pick, GRPC_ERROR_REF(error));
2728 }
2729 GRPC_CALL_STACK_UNREF(calld->owning_call, "pick_callback_cancel");
2730 }
2731};
2732
2733} // namespace grpc_core
Mark D. Rothb2929602017-09-11 09:31:11 -07002734
Mark D. Roth718c8342018-02-28 13:00:04 -08002735// Applies service config to the call. Must be invoked once we know
2736// that the resolver has returned results to the channel.
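// For illustration, a method config shaped roughly as follows (a sketch
// assuming the usual service config JSON schema) would exercise both the
// timeout and wait_for_ready handling below:
//   { "methodConfig": [ {
//       "name": [ { "service": "MyService", "method": "MyMethod" } ],
//       "timeout": "5s",
//       "waitForReady": true,
//       "retryPolicy": { "maxAttempts": 3, "initialBackoff": "0.1s",
//                        "maxBackoff": "1s", "backoffMultiplier": 2,
//                        "retryableStatusCodes": [ "UNAVAILABLE" ] }
//   } ] }
// ("MyService" and "MyMethod" are placeholders.)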
2737static void apply_service_config_to_call_locked(grpc_call_element* elem) {
2738 channel_data* chand = static_cast<channel_data*>(elem->channel_data);
2739 call_data* calld = static_cast<call_data*>(elem->call_data);
2740 if (grpc_client_channel_trace.enabled()) {
Mark D. Roth48854d22018-04-25 13:05:26 -07002741 gpr_log(GPR_INFO, "chand=%p calld=%p: applying service config to call",
Mark D. Roth718c8342018-02-28 13:00:04 -08002742 chand, calld);
2743 }
2744 if (chand->retry_throttle_data != nullptr) {
Mark D. Roth9db86fc2018-03-28 07:42:20 -07002745 calld->retry_throttle_data = chand->retry_throttle_data->Ref();
Mark D. Roth718c8342018-02-28 13:00:04 -08002746 }
2747 if (chand->method_params_table != nullptr) {
2748 calld->method_params = grpc_core::ServiceConfig::MethodConfigTableLookup(
2749 *chand->method_params_table, calld->path);
2750 if (calld->method_params != nullptr) {
2751 // If the deadline from the service config is shorter than the one
2752 // from the client API, reset the deadline timer.
2753 if (chand->deadline_checking_enabled &&
2754 calld->method_params->timeout() != 0) {
2755 const grpc_millis per_method_deadline =
2756 grpc_timespec_to_millis_round_up(calld->call_start_time) +
2757 calld->method_params->timeout();
2758 if (per_method_deadline < calld->deadline) {
2759 calld->deadline = per_method_deadline;
2760 grpc_deadline_state_reset(elem, calld->deadline);
2761 }
2762 }
Mark D. Roth7e0e2022018-06-01 12:04:16 -07002763 // If the service config set wait_for_ready and the application
2764 // did not explicitly set it, use the value from the service config.
2765 uint32_t* send_initial_metadata_flags =
2766 &calld->pending_batches[0]
2767 .batch->payload->send_initial_metadata
2768 .send_initial_metadata_flags;
2769 if (GPR_UNLIKELY(
2770 calld->method_params->wait_for_ready() !=
2771 ClientChannelMethodParams::WAIT_FOR_READY_UNSET &&
2772 !(*send_initial_metadata_flags &
2773 GRPC_INITIAL_METADATA_WAIT_FOR_READY_EXPLICITLY_SET))) {
2774 if (calld->method_params->wait_for_ready() ==
2775 ClientChannelMethodParams::WAIT_FOR_READY_TRUE) {
2776 *send_initial_metadata_flags |= GRPC_INITIAL_METADATA_WAIT_FOR_READY;
2777 } else {
2778 *send_initial_metadata_flags &= ~GRPC_INITIAL_METADATA_WAIT_FOR_READY;
2779 }
2780 }
Mark D. Roth718c8342018-02-28 13:00:04 -08002781 }
2782 }
2783 // If no retry policy, disable retries.
2784 // TODO(roth): Remove this when adding support for transparent retries.
2785 if (calld->method_params == nullptr ||
2786 calld->method_params->retry_policy() == nullptr) {
2787 calld->enable_retries = false;
2788 }
2789}
2790
Mark D. Roth7e0e2022018-06-01 12:04:16 -07002791// Invoked once resolver results are available.
2792static void process_service_config_and_start_lb_pick_locked(
2793 grpc_call_element* elem) {
Noah Eisenbe82e642018-02-09 09:16:55 -08002794 call_data* calld = static_cast<call_data*>(elem->call_data);
Mark D. Roth718c8342018-02-28 13:00:04 -08002795 // Only get service config data on the first attempt.
Yash Tibrewal8c5b8832018-05-14 11:44:18 -07002796 if (GPR_LIKELY(calld->num_attempts_completed == 0)) {
Mark D. Roth718c8342018-02-28 13:00:04 -08002797 apply_service_config_to_call_locked(elem);
2798 }
Mark D. Roth7e0e2022018-06-01 12:04:16 -07002799 // Start LB pick.
2800 grpc_core::LbPicker::StartLocked(elem);
Mark D. Rothb2929602017-09-11 09:31:11 -07002801}
Mark D. Roth0ca0be82017-06-20 07:49:33 -07002802
Mark D. Roth7e0e2022018-06-01 12:04:16 -07002803namespace grpc_core {
Craig Tiller577c9b22015-11-02 14:11:15 -08002804
Mark D. Roth7e0e2022018-06-01 12:04:16 -07002805// Handles waiting for a resolver result.
2806// Used only for the first call on an idle channel.
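// Self-deleting: whichever of DoneLocked() or CancelLocked() runs second
// (tracked via the finished_ flag) calls Delete(this).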
2807class ResolverResultWaiter {
2808 public:
2809 explicit ResolverResultWaiter(grpc_call_element* elem) : elem_(elem) {
2810 channel_data* chand = static_cast<channel_data*>(elem->channel_data);
2811 call_data* calld = static_cast<call_data*>(elem->call_data);
Craig Tiller6014e8a2017-10-16 13:50:29 -07002812 if (grpc_client_channel_trace.enabled()) {
Mark D. Roth7e0e2022018-06-01 12:04:16 -07002813 gpr_log(GPR_INFO,
2814 "chand=%p calld=%p: deferring pick pending resolver result",
Mark D. Rothb2b9a0f2017-09-01 09:06:47 -07002815 chand, calld);
2816 }
Mark D. Roth7e0e2022018-06-01 12:04:16 -07002817 // Add closure to be run when a resolver result is available.
2818 GRPC_CLOSURE_INIT(&done_closure_, &ResolverResultWaiter::DoneLocked, this,
2819 grpc_combiner_scheduler(chand->combiner));
2820 AddToWaitingList();
2821 // Set cancellation closure, so that we abort if the call is cancelled.
2822 GRPC_CLOSURE_INIT(&cancel_closure_, &ResolverResultWaiter::CancelLocked,
2823 this, grpc_combiner_scheduler(chand->combiner));
2824 grpc_call_combiner_set_notify_on_cancel(calld->call_combiner,
2825 &cancel_closure_);
2826 }
2827
2828 private:
2829 // Adds closure_ to chand->waiting_for_resolver_result_closures.
2830 void AddToWaitingList() {
2831 channel_data* chand = static_cast<channel_data*>(elem_->channel_data);
2832 grpc_closure_list_append(&chand->waiting_for_resolver_result_closures,
2833 &done_closure_, GRPC_ERROR_NONE);
2834 }
2835
2836 // Invoked when a resolver result is available.
2837 static void DoneLocked(void* arg, grpc_error* error) {
2838 ResolverResultWaiter* self = static_cast<ResolverResultWaiter*>(arg);
2839 // If CancelLocked() has already run, delete ourselves without doing
2840 // anything. Note that the call stack may have already been destroyed,
2841 // so it's not safe to access anything in elem_.
2842 if (GPR_UNLIKELY(self->finished_)) {
Mark D. Rothe63e06d2018-03-23 08:12:11 -07002843 if (grpc_client_channel_trace.enabled()) {
Mark D. Roth7e0e2022018-06-01 12:04:16 -07002844 gpr_log(GPR_INFO, "call cancelled before resolver result");
2845 }
2846 Delete(self);
2847 return;
2848 }
2849 // Otherwise, process the resolver result.
2850 grpc_call_element* elem = self->elem_;
2851 channel_data* chand = static_cast<channel_data*>(elem->channel_data);
2852 call_data* calld = static_cast<call_data*>(elem->call_data);
2853 if (GPR_UNLIKELY(error != GRPC_ERROR_NONE)) {
2854 if (grpc_client_channel_trace.enabled()) {
2855 gpr_log(GPR_INFO, "chand=%p calld=%p: resolver failed to return data",
Mark D. Rothe63e06d2018-03-23 08:12:11 -07002856 chand, calld);
2857 }
Mark D. Roth7e0e2022018-06-01 12:04:16 -07002858 pick_done_locked(elem, GRPC_ERROR_REF(error));
2859 } else if (GPR_UNLIKELY(chand->resolver == nullptr)) {
2860 // Shutting down.
2861 if (grpc_client_channel_trace.enabled()) {
2862 gpr_log(GPR_INFO, "chand=%p calld=%p: resolver disconnected", chand,
2863 calld);
2864 }
2865 pick_done_locked(elem,
2866 GRPC_ERROR_CREATE_FROM_STATIC_STRING("Disconnected"));
2867 } else if (GPR_UNLIKELY(chand->lb_policy == nullptr)) {
2868 // Transient resolver failure.
2869 // If call has wait_for_ready=true, try again; otherwise, fail.
2870 uint32_t send_initial_metadata_flags =
2871 calld->seen_send_initial_metadata
2872 ? calld->send_initial_metadata_flags
2873 : calld->pending_batches[0]
2874 .batch->payload->send_initial_metadata
2875 .send_initial_metadata_flags;
2876 if (send_initial_metadata_flags & GRPC_INITIAL_METADATA_WAIT_FOR_READY) {
2877 if (grpc_client_channel_trace.enabled()) {
2878 gpr_log(GPR_INFO,
2879 "chand=%p calld=%p: resolver returned but no LB policy; "
2880 "wait_for_ready=true; trying again",
2881 chand, calld);
2882 }
2883 // Re-add ourselves to the waiting list.
2884 self->AddToWaitingList();
2885 // Return early so that we don't set finished_ to true below.
2886 return;
2887 } else {
2888 if (grpc_client_channel_trace.enabled()) {
2889 gpr_log(GPR_INFO,
2890 "chand=%p calld=%p: resolver returned but no LB policy; "
2891 "wait_for_ready=false; failing",
2892 chand, calld);
2893 }
2894 pick_done_locked(
2895 elem,
2896 grpc_error_set_int(
2897 GRPC_ERROR_CREATE_FROM_STATIC_STRING("Name resolution failure"),
2898 GRPC_ERROR_INT_GRPC_STATUS, GRPC_STATUS_UNAVAILABLE));
2899 }
Mark D. Rothe63e06d2018-03-23 08:12:11 -07002900 } else {
2901 if (grpc_client_channel_trace.enabled()) {
Mark D. Roth7e0e2022018-06-01 12:04:16 -07002902 gpr_log(GPR_INFO, "chand=%p calld=%p: resolver returned, doing LB pick",
Mark D. Rothe63e06d2018-03-23 08:12:11 -07002903 chand, calld);
2904 }
Mark D. Roth7e0e2022018-06-01 12:04:16 -07002905 process_service_config_and_start_lb_pick_locked(elem);
Mark D. Rothe63e06d2018-03-23 08:12:11 -07002906 }
Mark D. Roth7e0e2022018-06-01 12:04:16 -07002907 self->finished_ = true;
Craig Tiller577c9b22015-11-02 14:11:15 -08002908 }

  // Invoked when the call is cancelled.
  // Note: This runs under the client_channel combiner, but will NOT be
  // holding the call combiner.
  static void CancelLocked(void* arg, grpc_error* error) {
    ResolverResultWaiter* self = static_cast<ResolverResultWaiter*>(arg);
    // If DoneLocked() has already run, delete ourselves without doing
    // anything.
    if (GPR_LIKELY(self->finished_)) {
      Delete(self);
      return;
    }
    // If we are being cancelled, immediately invoke pick_done_locked()
    // to propagate the error back to the caller.
    if (GPR_UNLIKELY(error != GRPC_ERROR_NONE)) {
      grpc_call_element* elem = self->elem_;
      channel_data* chand = static_cast<channel_data*>(elem->channel_data);
      call_data* calld = static_cast<call_data*>(elem->call_data);
      if (grpc_client_channel_trace.enabled()) {
        gpr_log(GPR_INFO,
                "chand=%p calld=%p: cancelling call waiting for name "
                "resolution",
                chand, calld);
      }
      // Note: Although we are not in the call combiner here, we are
      // basically stealing the call combiner from the pending pick, so
      // it's safe to call pick_done_locked() here -- we are essentially
      // calling it here instead of calling it in DoneLocked().
      pick_done_locked(elem, GRPC_ERROR_CREATE_REFERENCING_FROM_STATIC_STRING(
                                 "Pick cancelled", &error, 1));
    }
    self->finished_ = true;
  }

  grpc_call_element* elem_;
  grpc_closure done_closure_;
  grpc_closure cancel_closure_;
  bool finished_ = false;
};

}  // namespace grpc_core

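// Starts a pick for the call.  Invoked in the client_channel combiner.
// Three cases, all handled in the body below:
//   1. We already have an LB policy: process the service config and start
//      an LB pick immediately.
//   2. The resolver has shut down: fail the pick.
//   3. Resolution is still pending: start resolving if we have not yet
//      done so, then park the call on a ResolverResultWaiter, which
//      restarts this flow once a resolver result arrives.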
static void start_pick_locked(void* arg, grpc_error* ignored) {
  grpc_call_element* elem = static_cast<grpc_call_element*>(arg);
  call_data* calld = static_cast<call_data*>(elem->call_data);
  channel_data* chand = static_cast<channel_data*>(elem->channel_data);
  GPR_ASSERT(calld->pick.connected_subchannel == nullptr);
  GPR_ASSERT(calld->subchannel_call == nullptr);
  if (GPR_LIKELY(chand->lb_policy != nullptr)) {
    // We already have resolver results, so process the service config
    // and start an LB pick.
    process_service_config_and_start_lb_pick_locked(elem);
  } else if (GPR_UNLIKELY(chand->resolver == nullptr)) {
    pick_done_locked(elem,
                     GRPC_ERROR_CREATE_FROM_STATIC_STRING("Disconnected"));
  } else {
    // We do not yet have an LB policy, so wait for a resolver result.
    if (GPR_UNLIKELY(!chand->started_resolving)) {
      start_resolving_locked(chand);
    }
    // Create a new waiter, which will delete itself when done.
    grpc_core::New<grpc_core::ResolverResultWaiter>(elem);
    // Add the polling entity from call_data to the channel_data's
    // interested_parties, so that the I/O of the resolver can be done
    // under it. It will be removed in pick_done_locked().
    maybe_add_call_to_channel_interested_parties_locked(elem);
  }
}

//
// filter call vtable functions
//

static void cc_start_transport_stream_op_batch(
    grpc_call_element* elem, grpc_transport_stream_op_batch* batch) {
  GPR_TIMER_SCOPE("cc_start_transport_stream_op_batch", 0);
  call_data* calld = static_cast<call_data*>(elem->call_data);
  channel_data* chand = static_cast<channel_data*>(elem->channel_data);
  if (GPR_LIKELY(chand->deadline_checking_enabled)) {
    grpc_deadline_state_client_start_transport_stream_op_batch(elem, batch);
  }
  // If we've previously been cancelled, immediately fail any new batches.
  if (GPR_UNLIKELY(calld->cancel_error != GRPC_ERROR_NONE)) {
    if (grpc_client_channel_trace.enabled()) {
      gpr_log(GPR_INFO, "chand=%p calld=%p: failing batch with error: %s",
              chand, calld, grpc_error_string(calld->cancel_error));
    }
    // Note: This will release the call combiner.
    grpc_transport_stream_op_batch_finish_with_failure(
        batch, GRPC_ERROR_REF(calld->cancel_error), calld->call_combiner);
    return;
  }
  // Handle cancellation.
  if (GPR_UNLIKELY(batch->cancel_stream)) {
    // Stash a copy of cancel_error in our call data, so that we can use
    // it for subsequent operations. This ensures that if the call is
    // cancelled before any batches are passed down (e.g., if the deadline
    // is in the past when the call starts), we can return the right
    // error to the caller when the first batch does get passed down.
    GRPC_ERROR_UNREF(calld->cancel_error);
    calld->cancel_error =
        GRPC_ERROR_REF(batch->payload->cancel_stream.cancel_error);
    if (grpc_client_channel_trace.enabled()) {
      gpr_log(GPR_INFO, "chand=%p calld=%p: recording cancel_error=%s", chand,
              calld, grpc_error_string(calld->cancel_error));
    }
    // If we do not have a subchannel call (i.e., a pick has not yet
    // been started), fail all pending batches. Otherwise, send the
    // cancellation down to the subchannel call.
    if (calld->subchannel_call == nullptr) {
      pending_batches_fail(elem, GRPC_ERROR_REF(calld->cancel_error),
                           false /* yield_call_combiner */);
      // Note: This will release the call combiner.
      grpc_transport_stream_op_batch_finish_with_failure(
          batch, GRPC_ERROR_REF(calld->cancel_error), calld->call_combiner);
    } else {
      // Note: This will release the call combiner.
      grpc_subchannel_call_process_op(calld->subchannel_call, batch);
    }
    return;
  }
  // Add the batch to the pending list.
  pending_batches_add(elem, batch);
  // Check if we've already gotten a subchannel call.
  // Note that once we have completed the pick, we do not need to enter
  // the channel combiner, which is more efficient (especially for
  // streaming calls).
  if (calld->subchannel_call != nullptr) {
    if (grpc_client_channel_trace.enabled()) {
      gpr_log(GPR_INFO,
              "chand=%p calld=%p: starting batch on subchannel_call=%p", chand,
              calld, calld->subchannel_call);
    }
    pending_batches_resume(elem);
    return;
  }
  // We do not yet have a subchannel call.
  // For batches containing a send_initial_metadata op, enter the channel
  // combiner to start a pick.
  if (GPR_LIKELY(batch->send_initial_metadata)) {
    if (grpc_client_channel_trace.enabled()) {
      gpr_log(GPR_INFO, "chand=%p calld=%p: entering client_channel combiner",
              chand, calld);
    }
    GRPC_CLOSURE_SCHED(
        GRPC_CLOSURE_INIT(&batch->handler_private.closure, start_pick_locked,
                          elem, grpc_combiner_scheduler(chand->combiner)),
        GRPC_ERROR_NONE);
  } else {
    // For all other batches, release the call combiner.
    if (grpc_client_channel_trace.enabled()) {
      gpr_log(GPR_INFO,
              "chand=%p calld=%p: saved batch, yielding call combiner", chand,
              calld);
    }
    GRPC_CALL_COMBINER_STOP(calld->call_combiner,
                            "batch does not include send_initial_metadata");
  }
}
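
// For reference, the combiner hand-off idiom used above: binding a closure
// to grpc_combiner_scheduler(chand->combiner) before scheduling it ensures
// the callback runs serialized with all other work on the channel combiner.
// A minimal sketch (callback_fn and closure_storage are placeholder names,
// not part of this file):
//
//   GRPC_CLOSURE_SCHED(
//       GRPC_CLOSURE_INIT(&closure_storage, callback_fn, arg,
//                         grpc_combiner_scheduler(chand->combiner)),
//       GRPC_ERROR_NONE);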

/* Constructor for call_data */
static grpc_error* cc_init_call_elem(grpc_call_element* elem,
                                     const grpc_call_element_args* args) {
  call_data* calld = static_cast<call_data*>(elem->call_data);
  channel_data* chand = static_cast<channel_data*>(elem->channel_data);
  // Initialize data members.
  calld->path = grpc_slice_ref_internal(args->path);
  calld->call_start_time = args->start_time;
  calld->deadline = args->deadline;
  calld->arena = args->arena;
  calld->owning_call = args->call_stack;
  calld->call_combiner = args->call_combiner;
  if (GPR_LIKELY(chand->deadline_checking_enabled)) {
    grpc_deadline_state_init(elem, args->call_stack, args->call_combiner,
                             calld->deadline);
  }
  calld->enable_retries = chand->enable_retries;
  calld->send_messages.Init();
  return GRPC_ERROR_NONE;
}

/* Destructor for call_data */
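/* Note: If a subchannel call exists, then_schedule_closure is handed off
   to grpc_subchannel_call_set_cleanup_closure() below, so it is not
   invoked until the subchannel call itself is destroyed; otherwise it is
   scheduled directly at the end of this function. */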
static void cc_destroy_call_elem(grpc_call_element* elem,
                                 const grpc_call_final_info* final_info,
                                 grpc_closure* then_schedule_closure) {
  call_data* calld = static_cast<call_data*>(elem->call_data);
  channel_data* chand = static_cast<channel_data*>(elem->channel_data);
  if (GPR_LIKELY(chand->deadline_checking_enabled)) {
    grpc_deadline_state_destroy(elem);
  }
  grpc_slice_unref_internal(calld->path);
  calld->retry_throttle_data.reset();
  calld->method_params.reset();
  GRPC_ERROR_UNREF(calld->cancel_error);
  if (GPR_LIKELY(calld->subchannel_call != nullptr)) {
    grpc_subchannel_call_set_cleanup_closure(calld->subchannel_call,
                                             then_schedule_closure);
    then_schedule_closure = nullptr;
    GRPC_SUBCHANNEL_CALL_UNREF(calld->subchannel_call,
                               "client_channel_destroy_call");
  }
  for (size_t i = 0; i < GPR_ARRAY_SIZE(calld->pending_batches); ++i) {
    GPR_ASSERT(calld->pending_batches[i].batch == nullptr);
  }
  if (GPR_LIKELY(calld->pick.connected_subchannel != nullptr)) {
    calld->pick.connected_subchannel.reset();
  }
  for (size_t i = 0; i < GRPC_CONTEXT_COUNT; ++i) {
    if (calld->pick.subchannel_call_context[i].value != nullptr) {
      calld->pick.subchannel_call_context[i].destroy(
          calld->pick.subchannel_call_context[i].value);
    }
  }
  calld->send_messages.Destroy();
  GRPC_CLOSURE_SCHED(then_schedule_closure, GRPC_ERROR_NONE);
}

static void cc_set_pollset_or_pollset_set(grpc_call_element* elem,
                                          grpc_polling_entity* pollent) {
  call_data* calld = static_cast<call_data*>(elem->call_data);
  calld->pollent = pollent;
}

/*************************************************************************
 * EXPORTED SYMBOLS
 */

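// The initializers below are positional and follow the member order of
// grpc_channel_filter (declared in src/core/lib/channel/channel_stack.h):
// start_transport_stream_op_batch, start_transport_op, sizeof_call_data,
// init_call_elem, set_pollset_or_pollset_set, destroy_call_elem,
// sizeof_channel_data, init_channel_elem, destroy_channel_elem,
// get_channel_info, name.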
const grpc_channel_filter grpc_client_channel_filter = {
    cc_start_transport_stream_op_batch,
    cc_start_transport_op,
    sizeof(call_data),
    cc_init_call_elem,
    cc_set_pollset_or_pollset_set,
    cc_destroy_call_elem,
    sizeof(channel_data),
    cc_init_channel_elem,
    cc_destroy_channel_elem,
    cc_get_channel_info,
    "client-channel",
};

static void try_to_connect_locked(void* arg, grpc_error* error_ignored) {
  channel_data* chand = static_cast<channel_data*>(arg);
  if (chand->lb_policy != nullptr) {
    chand->lb_policy->ExitIdleLocked();
  } else {
    chand->exit_idle_when_lb_policy_arrives = true;
    if (!chand->started_resolving && chand->resolver != nullptr) {
      start_resolving_locked(chand);
    }
  }
  GRPC_CHANNEL_STACK_UNREF(chand->owning_stack, "try_to_connect");
}

void grpc_client_channel_populate_child_refs(
    grpc_channel_element* elem, grpc_core::ChildRefsList* child_subchannels,
    grpc_core::ChildRefsList* child_channels) {
  channel_data* chand = static_cast<channel_data*>(elem->channel_data);
  if (chand->lb_policy) {
    chand->lb_policy->FillChildRefsForChannelz(child_subchannels,
                                               child_channels);
  }
}

grpc_connectivity_state grpc_client_channel_check_connectivity_state(
    grpc_channel_element* elem, int try_to_connect) {
  channel_data* chand = static_cast<channel_data*>(elem->channel_data);
  grpc_connectivity_state out =
      grpc_connectivity_state_check(&chand->state_tracker);
  if (out == GRPC_CHANNEL_IDLE && try_to_connect) {
    GRPC_CHANNEL_STACK_REF(chand->owning_stack, "try_to_connect");
    GRPC_CLOSURE_SCHED(
        GRPC_CLOSURE_CREATE(try_to_connect_locked, chand,
                            grpc_combiner_scheduler(chand->combiner)),
        GRPC_ERROR_NONE);
  }
  return out;
}
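
// Illustrative usage (hypothetical caller; not part of this file): the
// channel surface can poll the channel state and simultaneously kick an
// idle channel into connecting:
//
//   grpc_connectivity_state state =
//       grpc_client_channel_check_connectivity_state(
//           elem, 1 /* try_to_connect */);
//
// When the channel is IDLE and try_to_connect is nonzero, a closure is
// scheduled on the channel combiner that either tells the LB policy to
// exit idle or starts name resolution (see try_to_connect_locked above).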
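// State for an external connectivity watcher.  Watchers form a
// singly-linked list headed at
// chand->external_connectivity_watcher_list_head, guarded by
// chand->external_connectivity_watcher_list_mu, and are keyed by the
// caller's on_complete closure.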
typedef struct external_connectivity_watcher {
  channel_data* chand;
  grpc_polling_entity pollent;
  grpc_closure* on_complete;
  grpc_closure* watcher_timer_init;
  grpc_connectivity_state* state;
  grpc_closure my_closure;
  struct external_connectivity_watcher* next;
} external_connectivity_watcher;

static external_connectivity_watcher* lookup_external_connectivity_watcher(
    channel_data* chand, grpc_closure* on_complete) {
  gpr_mu_lock(&chand->external_connectivity_watcher_list_mu);
  external_connectivity_watcher* w =
      chand->external_connectivity_watcher_list_head;
  while (w != nullptr && w->on_complete != on_complete) {
    w = w->next;
  }
  gpr_mu_unlock(&chand->external_connectivity_watcher_list_mu);
  return w;
}

static void external_connectivity_watcher_list_append(
    channel_data* chand, external_connectivity_watcher* w) {
  GPR_ASSERT(!lookup_external_connectivity_watcher(chand, w->on_complete));

  gpr_mu_lock(&w->chand->external_connectivity_watcher_list_mu);
  GPR_ASSERT(!w->next);
  w->next = chand->external_connectivity_watcher_list_head;
  chand->external_connectivity_watcher_list_head = w;
  gpr_mu_unlock(&w->chand->external_connectivity_watcher_list_mu);
}

static void external_connectivity_watcher_list_remove(
    channel_data* chand, external_connectivity_watcher* to_remove) {
  GPR_ASSERT(
      lookup_external_connectivity_watcher(chand, to_remove->on_complete));
  gpr_mu_lock(&chand->external_connectivity_watcher_list_mu);
  if (to_remove == chand->external_connectivity_watcher_list_head) {
    chand->external_connectivity_watcher_list_head = to_remove->next;
    gpr_mu_unlock(&chand->external_connectivity_watcher_list_mu);
    return;
  }
  external_connectivity_watcher* w =
      chand->external_connectivity_watcher_list_head;
  while (w != nullptr) {
    if (w->next == to_remove) {
      w->next = w->next->next;
      gpr_mu_unlock(&chand->external_connectivity_watcher_list_mu);
      return;
    }
    w = w->next;
  }
  GPR_UNREACHABLE_CODE(return);
}
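
// Note: lookup, append, and remove above are all O(n) scans under
// external_connectivity_watcher_list_mu.  This is presumably acceptable
// because the number of concurrent external watchers per channel is
// expected to be small.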

int grpc_client_channel_num_external_connectivity_watchers(
    grpc_channel_element* elem) {
  channel_data* chand = static_cast<channel_data*>(elem->channel_data);
  int count = 0;

  gpr_mu_lock(&chand->external_connectivity_watcher_list_mu);
  external_connectivity_watcher* w =
      chand->external_connectivity_watcher_list_head;
  while (w != nullptr) {
    count++;
    w = w->next;
  }
  gpr_mu_unlock(&chand->external_connectivity_watcher_list_mu);

  return count;
}

static void on_external_watch_complete_locked(void* arg, grpc_error* error) {
  external_connectivity_watcher* w =
      static_cast<external_connectivity_watcher*>(arg);
  grpc_closure* follow_up = w->on_complete;
  grpc_polling_entity_del_from_pollset_set(&w->pollent,
                                           w->chand->interested_parties);
  GRPC_CHANNEL_STACK_UNREF(w->chand->owning_stack,
                           "external_connectivity_watcher");
  external_connectivity_watcher_list_remove(w->chand, w);
  gpr_free(w);
  GRPC_CLOSURE_SCHED(follow_up, GRPC_ERROR_REF(error));
}

static void watch_connectivity_state_locked(void* arg,
                                            grpc_error* error_ignored) {
  external_connectivity_watcher* w =
      static_cast<external_connectivity_watcher*>(arg);
  external_connectivity_watcher* found = nullptr;
  if (w->state != nullptr) {
    external_connectivity_watcher_list_append(w->chand, w);
    // We assume that this closure is scheduled on the ExecCtx scheduler
    // and that GRPC_CLOSURE_RUN will therefore run it immediately.
    GRPC_CLOSURE_RUN(w->watcher_timer_init, GRPC_ERROR_NONE);
    GRPC_CLOSURE_INIT(&w->my_closure, on_external_watch_complete_locked, w,
                      grpc_combiner_scheduler(w->chand->combiner));
    grpc_connectivity_state_notify_on_state_change(&w->chand->state_tracker,
                                                   w->state, &w->my_closure);
  } else {
    GPR_ASSERT(w->watcher_timer_init == nullptr);
    found = lookup_external_connectivity_watcher(w->chand, w->on_complete);
    if (found) {
      GPR_ASSERT(found->on_complete == w->on_complete);
      grpc_connectivity_state_notify_on_state_change(
          &found->chand->state_tracker, nullptr, &found->my_closure);
    }
    grpc_polling_entity_del_from_pollset_set(&w->pollent,
                                             w->chand->interested_parties);
    GRPC_CHANNEL_STACK_UNREF(w->chand->owning_stack,
                             "external_connectivity_watcher");
    gpr_free(w);
  }
}

void grpc_client_channel_watch_connectivity_state(
    grpc_channel_element* elem, grpc_polling_entity pollent,
    grpc_connectivity_state* state, grpc_closure* closure,
    grpc_closure* watcher_timer_init) {
  channel_data* chand = static_cast<channel_data*>(elem->channel_data);
  external_connectivity_watcher* w =
      static_cast<external_connectivity_watcher*>(gpr_zalloc(sizeof(*w)));
  w->chand = chand;
  w->pollent = pollent;
  w->on_complete = closure;
  w->state = state;
  w->watcher_timer_init = watcher_timer_init;
  grpc_polling_entity_add_to_pollset_set(&w->pollent,
                                         chand->interested_parties);
  GRPC_CHANNEL_STACK_REF(w->chand->owning_stack,
                         "external_connectivity_watcher");
  GRPC_CLOSURE_SCHED(
      GRPC_CLOSURE_INIT(&w->my_closure, watch_connectivity_state_locked, w,
                        grpc_combiner_scheduler(chand->combiner)),
      GRPC_ERROR_NONE);
}
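
// Illustrative contract (hypothetical caller; state_storage,
// on_state_changed, and timer_init_closure are placeholder names):
//
//   // Register a watcher: state != nullptr, watcher_timer_init supplied.
//   grpc_client_channel_watch_connectivity_state(
//       elem, pollent, &state_storage, on_state_changed,
//       timer_init_closure);
//
//   // Cancel a previously registered watcher: state == nullptr and
//   // watcher_timer_init == nullptr (enforced by the GPR_ASSERT in
//   // watch_connectivity_state_locked above).
//   grpc_client_channel_watch_connectivity_state(elem, pollent, nullptr,
//                                                on_state_changed, nullptr);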

grpc_subchannel_call* grpc_client_channel_get_subchannel_call(
    grpc_call_element* elem) {
  call_data* calld = static_cast<call_data*>(elem->call_data);
  return calld->subchannel_call;
}