Merge pull request #15962 from sreecha/fix-dns-job
Move executor implementation into GrpcExecutor class
diff --git a/include/grpc/impl/codegen/grpc_types.h b/include/grpc/impl/codegen/grpc_types.h
index c32e99e..5fd080c 100644
--- a/include/grpc/impl/codegen/grpc_types.h
+++ b/include/grpc/impl/codegen/grpc_types.h
@@ -196,8 +196,8 @@
data frame, Int valued, milliseconds. */
#define GRPC_ARG_HTTP2_MIN_SENT_PING_INTERVAL_WITHOUT_DATA_MS \
"grpc.http2.min_time_between_pings_ms"
-/** Minimum allowed time between receiving successive ping frames without
- sending any data frame. Int valued, milliseconds */
+/** Minimum allowed time between a server receiving successive ping frames
+ without sending any data frame. Int valued, milliseconds */
#define GRPC_ARG_HTTP2_MIN_RECV_PING_INTERVAL_WITHOUT_DATA_MS \
"grpc.http2.min_ping_interval_without_data_ms"
/** Channel arg to override the http2 :scheme header */
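For context (not part of this change), a minimal sketch of how a server could set the GRPC_ARG_HTTP2_MIN_RECV_PING_INTERVAL_WITHOUT_DATA_MS arg documented above, via the C++ ServerBuilder API; the 5-minute value is an arbitrary example chosen for illustration, not a recommendation:

    // Sketch: configure the minimum interval the server will tolerate between
    // client keepalive pings that arrive without intervening data frames.
    #include <grpc/grpc.h>
    #include <grpcpp/server_builder.h>

    void ConfigureKeepalivePolicy(grpc::ServerBuilder* builder) {
      // 5 minutes, in milliseconds; chosen arbitrarily for this example.
      builder->AddChannelArgument(
          GRPC_ARG_HTTP2_MIN_RECV_PING_INTERVAL_WITHOUT_DATA_MS, 5 * 60 * 1000);
    }
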
diff --git a/src/core/ext/filters/client_channel/client_channel.cc b/src/core/ext/filters/client_channel/client_channel.cc
index 520431e..04f7a2c 100644
--- a/src/core/ext/filters/client_channel/client_channel.cc
+++ b/src/core/ext/filters/client_channel/client_channel.cc
@@ -126,9 +126,9 @@
/* the following properties are guarded by a mutex since APIs require them
to be instantaneously available */
gpr_mu info_mu;
- char* info_lb_policy_name;
+ grpc_core::UniquePtr<char> info_lb_policy_name;
/** service config in JSON form */
- char* info_service_config_json;
+ grpc_core::UniquePtr<char> info_service_config_json;
} channel_data;
typedef struct {
@@ -284,6 +284,78 @@
}
}
+// Invoked from the resolver NextLocked() callback when the resolver
+// is shutting down.
+static void on_resolver_shutdown_locked(channel_data* chand,
+ grpc_error* error) {
+ if (grpc_client_channel_trace.enabled()) {
+ gpr_log(GPR_INFO, "chand=%p: shutting down", chand);
+ }
+ if (chand->lb_policy != nullptr) {
+ if (grpc_client_channel_trace.enabled()) {
+ gpr_log(GPR_INFO, "chand=%p: shutting down lb_policy=%p", chand,
+ chand->lb_policy.get());
+ }
+ grpc_pollset_set_del_pollset_set(chand->lb_policy->interested_parties(),
+ chand->interested_parties);
+ chand->lb_policy.reset();
+ }
+ if (chand->resolver != nullptr) {
+ // This should never happen; it can only be triggered by a resolver
+ // implementation spontaneously deciding to report shutdown without
+ // being orphaned. This code is included just to be defensive.
+ if (grpc_client_channel_trace.enabled()) {
+ gpr_log(GPR_INFO, "chand=%p: spontaneous shutdown from resolver %p",
+ chand, chand->resolver.get());
+ }
+ chand->resolver.reset();
+ set_channel_connectivity_state_locked(
+ chand, GRPC_CHANNEL_SHUTDOWN,
+ GRPC_ERROR_CREATE_REFERENCING_FROM_STATIC_STRING(
+ "Resolver spontaneous shutdown", &error, 1),
+ "resolver_spontaneous_shutdown");
+ }
+ grpc_closure_list_fail_all(&chand->waiting_for_resolver_result_closures,
+ GRPC_ERROR_CREATE_REFERENCING_FROM_STATIC_STRING(
+ "Channel disconnected", &error, 1));
+ GRPC_CLOSURE_LIST_SCHED(&chand->waiting_for_resolver_result_closures);
+ GRPC_CHANNEL_STACK_UNREF(chand->owning_stack, "resolver");
+ grpc_channel_args_destroy(chand->resolver_result);
+ chand->resolver_result = nullptr;
+ GRPC_ERROR_UNREF(error);
+}
+
+// Returns the LB policy name from the resolver result.
+static grpc_core::UniquePtr<char>
+get_lb_policy_name_from_resolver_result_locked(channel_data* chand) {
+ // Find LB policy name in channel args.
+ const grpc_arg* channel_arg =
+ grpc_channel_args_find(chand->resolver_result, GRPC_ARG_LB_POLICY_NAME);
+ const char* lb_policy_name = grpc_channel_arg_get_string(channel_arg);
+ // Special case: If at least one balancer address is present, we use
+ // the grpclb policy, regardless of what the resolver actually specified.
+ channel_arg =
+ grpc_channel_args_find(chand->resolver_result, GRPC_ARG_LB_ADDRESSES);
+ if (channel_arg != nullptr && channel_arg->type == GRPC_ARG_POINTER) {
+ grpc_lb_addresses* addresses =
+ static_cast<grpc_lb_addresses*>(channel_arg->value.pointer.p);
+ if (grpc_lb_addresses_contains_balancer_address(*addresses)) {
+ if (lb_policy_name != nullptr &&
+ gpr_stricmp(lb_policy_name, "grpclb") != 0) {
+ gpr_log(GPR_INFO,
+ "resolver requested LB policy %s but provided at least one "
+ "balancer address -- forcing use of grpclb LB policy",
+ lb_policy_name);
+ }
+ lb_policy_name = "grpclb";
+ }
+ }
+ // Use pick_first if nothing was specified and we didn't select grpclb
+ // above.
+ if (lb_policy_name == nullptr) lb_policy_name = "pick_first";
+ return grpc_core::UniquePtr<char>(gpr_strdup(lb_policy_name));
+}
+
static void request_reresolution_locked(void* arg, grpc_error* error) {
reresolution_request_args* args =
static_cast<reresolution_request_args*>(arg);
@@ -304,234 +376,183 @@
chand->lb_policy->SetReresolutionClosureLocked(&args->closure);
}
-// TODO(roth): The logic in this function is very hard to follow. We
-// should refactor this so that it's easier to understand, perhaps as
-// part of changing the resolver API to more clearly differentiate
-// between transient failures and shutdown.
-static void on_resolver_result_changed_locked(void* arg, grpc_error* error) {
- channel_data* chand = static_cast<channel_data*>(arg);
- if (grpc_client_channel_trace.enabled()) {
- gpr_log(GPR_INFO,
- "chand=%p: got resolver result: resolver_result=%p error=%s", chand,
- chand->resolver_result, grpc_error_string(error));
- }
- // Extract the following fields from the resolver result, if non-nullptr.
- bool lb_policy_updated = false;
- bool lb_policy_created = false;
- char* lb_policy_name_dup = nullptr;
- bool lb_policy_name_changed = false;
- grpc_core::OrphanablePtr<grpc_core::LoadBalancingPolicy> new_lb_policy;
- char* service_config_json = nullptr;
- grpc_core::RefCountedPtr<ServerRetryThrottleData> retry_throttle_data;
- grpc_core::RefCountedPtr<MethodParamsTable> method_params_table;
- if (chand->resolver_result != nullptr) {
- if (chand->resolver != nullptr) {
- // Find LB policy name.
- const grpc_arg* channel_arg = grpc_channel_args_find(
- chand->resolver_result, GRPC_ARG_LB_POLICY_NAME);
- const char* lb_policy_name = grpc_channel_arg_get_string(channel_arg);
- // Special case: If at least one balancer address is present, we use
- // the grpclb policy, regardless of what the resolver actually specified.
- channel_arg =
- grpc_channel_args_find(chand->resolver_result, GRPC_ARG_LB_ADDRESSES);
- if (channel_arg != nullptr && channel_arg->type == GRPC_ARG_POINTER) {
- grpc_lb_addresses* addresses =
- static_cast<grpc_lb_addresses*>(channel_arg->value.pointer.p);
- bool found_balancer_address = false;
- for (size_t i = 0; i < addresses->num_addresses; ++i) {
- if (addresses->addresses[i].is_balancer) {
- found_balancer_address = true;
- break;
- }
- }
- if (found_balancer_address) {
- if (lb_policy_name != nullptr &&
- strcmp(lb_policy_name, "grpclb") != 0) {
- gpr_log(GPR_INFO,
- "resolver requested LB policy %s but provided at least one "
- "balancer address -- forcing use of grpclb LB policy",
- lb_policy_name);
- }
- lb_policy_name = "grpclb";
- }
- }
- // Use pick_first if nothing was specified and we didn't select grpclb
- // above.
- if (lb_policy_name == nullptr) lb_policy_name = "pick_first";
- // Check to see if we're already using the right LB policy.
- // Note: It's safe to use chand->info_lb_policy_name here without
- // taking a lock on chand->info_mu, because this function is the
- // only thing that modifies its value, and it can only be invoked
- // once at any given time.
- lb_policy_name_changed =
- chand->info_lb_policy_name == nullptr ||
- gpr_stricmp(chand->info_lb_policy_name, lb_policy_name) != 0;
- if (chand->lb_policy != nullptr && !lb_policy_name_changed) {
- // Continue using the same LB policy. Update with new addresses.
- lb_policy_updated = true;
- chand->lb_policy->UpdateLocked(*chand->resolver_result);
- } else {
- // Instantiate new LB policy.
- grpc_core::LoadBalancingPolicy::Args lb_policy_args;
- lb_policy_args.combiner = chand->combiner;
- lb_policy_args.client_channel_factory = chand->client_channel_factory;
- lb_policy_args.args = chand->resolver_result;
- new_lb_policy =
- grpc_core::LoadBalancingPolicyRegistry::CreateLoadBalancingPolicy(
- lb_policy_name, lb_policy_args);
- if (GPR_UNLIKELY(new_lb_policy == nullptr)) {
- gpr_log(GPR_ERROR, "could not create LB policy \"%s\"",
- lb_policy_name);
- } else {
- lb_policy_created = true;
- reresolution_request_args* args =
- static_cast<reresolution_request_args*>(
- gpr_zalloc(sizeof(*args)));
- args->chand = chand;
- args->lb_policy = new_lb_policy.get();
- GRPC_CLOSURE_INIT(&args->closure, request_reresolution_locked, args,
- grpc_combiner_scheduler(chand->combiner));
- GRPC_CHANNEL_STACK_REF(chand->owning_stack, "re-resolution");
- new_lb_policy->SetReresolutionClosureLocked(&args->closure);
- }
- }
- // Before we clean up, save a copy of lb_policy_name, since it might
- // be pointing to data inside chand->resolver_result.
- // The copy will be saved in chand->lb_policy_name below.
- lb_policy_name_dup = gpr_strdup(lb_policy_name);
- // Find service config.
- channel_arg = grpc_channel_args_find(chand->resolver_result,
- GRPC_ARG_SERVICE_CONFIG);
- service_config_json =
- gpr_strdup(grpc_channel_arg_get_string(channel_arg));
- if (service_config_json != nullptr) {
- grpc_core::UniquePtr<grpc_core::ServiceConfig> service_config =
- grpc_core::ServiceConfig::Create(service_config_json);
- if (service_config != nullptr) {
- if (chand->enable_retries) {
- channel_arg = grpc_channel_args_find(chand->resolver_result,
- GRPC_ARG_SERVER_URI);
- const char* server_uri = grpc_channel_arg_get_string(channel_arg);
- GPR_ASSERT(server_uri != nullptr);
- grpc_uri* uri = grpc_uri_parse(server_uri, true);
- GPR_ASSERT(uri->path[0] != '\0');
- service_config_parsing_state parsing_state;
- memset(&parsing_state, 0, sizeof(parsing_state));
- parsing_state.server_name =
- uri->path[0] == '/' ? uri->path + 1 : uri->path;
- service_config->ParseGlobalParams(parse_retry_throttle_params,
- &parsing_state);
- grpc_uri_destroy(uri);
- retry_throttle_data = std::move(parsing_state.retry_throttle_data);
- }
- method_params_table = service_config->CreateMethodConfigTable(
- ClientChannelMethodParams::CreateFromJson);
- }
- }
+// Creates a new LB policy, replacing any previous one.
+// If the new policy is created successfully, sets *connectivity_state and
+// *connectivity_error to the new policy's initial connectivity state and
+// error; otherwise, leaves them unchanged.
+static void create_new_lb_policy_locked(
+ channel_data* chand, char* lb_policy_name,
+ grpc_connectivity_state* connectivity_state,
+ grpc_error** connectivity_error) {
+ grpc_core::LoadBalancingPolicy::Args lb_policy_args;
+ lb_policy_args.combiner = chand->combiner;
+ lb_policy_args.client_channel_factory = chand->client_channel_factory;
+ lb_policy_args.args = chand->resolver_result;
+ grpc_core::OrphanablePtr<grpc_core::LoadBalancingPolicy> new_lb_policy =
+ grpc_core::LoadBalancingPolicyRegistry::CreateLoadBalancingPolicy(
+ lb_policy_name, lb_policy_args);
+ if (GPR_UNLIKELY(new_lb_policy == nullptr)) {
+ gpr_log(GPR_ERROR, "could not create LB policy \"%s\"", lb_policy_name);
+ } else {
+ if (grpc_client_channel_trace.enabled()) {
+ gpr_log(GPR_INFO, "chand=%p: created new LB policy \"%s\" (%p)", chand,
+ lb_policy_name, new_lb_policy.get());
}
- }
- if (grpc_client_channel_trace.enabled()) {
- gpr_log(GPR_INFO,
- "chand=%p: resolver result: lb_policy_name=\"%s\"%s, "
- "service_config=\"%s\"",
- chand, lb_policy_name_dup,
- lb_policy_name_changed ? " (changed)" : "", service_config_json);
- }
- // Now swap out fields in chand. Note that the new values may still
- // be nullptr if (e.g.) the resolver failed to return results or the
- // results did not contain the necessary data.
- //
- // First, swap out the data used by cc_get_channel_info().
- gpr_mu_lock(&chand->info_mu);
- if (lb_policy_name_dup != nullptr) {
- gpr_free(chand->info_lb_policy_name);
- chand->info_lb_policy_name = lb_policy_name_dup;
- }
- if (service_config_json != nullptr) {
- gpr_free(chand->info_service_config_json);
- chand->info_service_config_json = service_config_json;
- }
- gpr_mu_unlock(&chand->info_mu);
- // Swap out the retry throttle data.
- chand->retry_throttle_data = std::move(retry_throttle_data);
- // Swap out the method params table.
- chand->method_params_table = std::move(method_params_table);
- // If we have a new LB policy or are shutting down (in which case
- // new_lb_policy will be nullptr), swap out the LB policy, unreffing the
- // old one and removing its fds from chand->interested_parties.
- // Note that we do NOT do this if either (a) we updated the existing
- // LB policy above or (b) we failed to create the new LB policy (in
- // which case we want to continue using the most recent one we had).
- if (new_lb_policy != nullptr || error != GRPC_ERROR_NONE ||
- chand->resolver == nullptr) {
+ // Swap out the LB policy and update the fds in
+ // chand->interested_parties.
if (chand->lb_policy != nullptr) {
if (grpc_client_channel_trace.enabled()) {
- gpr_log(GPR_INFO, "chand=%p: unreffing lb_policy=%p", chand,
+ gpr_log(GPR_INFO, "chand=%p: shutting down lb_policy=%p", chand,
chand->lb_policy.get());
}
grpc_pollset_set_del_pollset_set(chand->lb_policy->interested_parties(),
chand->interested_parties);
chand->lb_policy->HandOffPendingPicksLocked(new_lb_policy.get());
- chand->lb_policy.reset();
}
chand->lb_policy = std::move(new_lb_policy);
+ grpc_pollset_set_add_pollset_set(chand->lb_policy->interested_parties(),
+ chand->interested_parties);
+ // Set up re-resolution callback.
+ reresolution_request_args* args =
+ static_cast<reresolution_request_args*>(gpr_zalloc(sizeof(*args)));
+ args->chand = chand;
+ args->lb_policy = chand->lb_policy.get();
+ GRPC_CLOSURE_INIT(&args->closure, request_reresolution_locked, args,
+ grpc_combiner_scheduler(chand->combiner));
+ GRPC_CHANNEL_STACK_REF(chand->owning_stack, "re-resolution");
+ chand->lb_policy->SetReresolutionClosureLocked(&args->closure);
+ // Get the new LB policy's initial connectivity state and start a
+ // connectivity watch.
+ GRPC_ERROR_UNREF(*connectivity_error);
+ *connectivity_state =
+ chand->lb_policy->CheckConnectivityLocked(connectivity_error);
+ if (chand->exit_idle_when_lb_policy_arrives) {
+ chand->lb_policy->ExitIdleLocked();
+ chand->exit_idle_when_lb_policy_arrives = false;
+ }
+ watch_lb_policy_locked(chand, chand->lb_policy.get(), *connectivity_state);
}
- // Now that we've swapped out the relevant fields of chand, check for
- // error or shutdown.
- if (error != GRPC_ERROR_NONE || chand->resolver == nullptr) {
+}
+
+// Returns the service config (as a JSON string) from the resolver result.
+// Also updates state in chand.
+static grpc_core::UniquePtr<char>
+get_service_config_from_resolver_result_locked(channel_data* chand) {
+ const grpc_arg* channel_arg =
+ grpc_channel_args_find(chand->resolver_result, GRPC_ARG_SERVICE_CONFIG);
+ const char* service_config_json = grpc_channel_arg_get_string(channel_arg);
+ if (service_config_json != nullptr) {
if (grpc_client_channel_trace.enabled()) {
- gpr_log(GPR_INFO, "chand=%p: shutting down", chand);
+ gpr_log(GPR_INFO, "chand=%p: resolver returned service config: \"%s\"",
+ chand, service_config_json);
}
- if (chand->resolver != nullptr) {
- if (grpc_client_channel_trace.enabled()) {
- gpr_log(GPR_INFO, "chand=%p: shutting down resolver", chand);
+ grpc_core::UniquePtr<grpc_core::ServiceConfig> service_config =
+ grpc_core::ServiceConfig::Create(service_config_json);
+ if (service_config != nullptr) {
+ if (chand->enable_retries) {
+ channel_arg =
+ grpc_channel_args_find(chand->resolver_result, GRPC_ARG_SERVER_URI);
+ const char* server_uri = grpc_channel_arg_get_string(channel_arg);
+ GPR_ASSERT(server_uri != nullptr);
+ grpc_uri* uri = grpc_uri_parse(server_uri, true);
+ GPR_ASSERT(uri->path[0] != '\0');
+ service_config_parsing_state parsing_state;
+ memset(&parsing_state, 0, sizeof(parsing_state));
+ parsing_state.server_name =
+ uri->path[0] == '/' ? uri->path + 1 : uri->path;
+ service_config->ParseGlobalParams(parse_retry_throttle_params,
+ &parsing_state);
+ grpc_uri_destroy(uri);
+ chand->retry_throttle_data =
+ std::move(parsing_state.retry_throttle_data);
}
- chand->resolver.reset();
+ chand->method_params_table = service_config->CreateMethodConfigTable(
+ ClientChannelMethodParams::CreateFromJson);
}
- set_channel_connectivity_state_locked(
- chand, GRPC_CHANNEL_SHUTDOWN,
- GRPC_ERROR_CREATE_REFERENCING_FROM_STATIC_STRING(
- "Got resolver result after disconnection", &error, 1),
- "resolver_gone");
- grpc_closure_list_fail_all(&chand->waiting_for_resolver_result_closures,
- GRPC_ERROR_CREATE_REFERENCING_FROM_STATIC_STRING(
- "Channel disconnected", &error, 1));
- GRPC_CLOSURE_LIST_SCHED(&chand->waiting_for_resolver_result_closures);
- GRPC_CHANNEL_STACK_UNREF(chand->owning_stack, "resolver");
- grpc_channel_args_destroy(chand->resolver_result);
- chand->resolver_result = nullptr;
- } else { // Not shutting down.
- grpc_connectivity_state state = GRPC_CHANNEL_TRANSIENT_FAILURE;
- grpc_error* state_error =
- GRPC_ERROR_CREATE_FROM_STATIC_STRING("No load balancing policy");
- if (lb_policy_created) {
- if (grpc_client_channel_trace.enabled()) {
- gpr_log(GPR_INFO, "chand=%p: initializing new LB policy", chand);
- }
- GRPC_ERROR_UNREF(state_error);
- state = chand->lb_policy->CheckConnectivityLocked(&state_error);
- grpc_pollset_set_add_pollset_set(chand->lb_policy->interested_parties(),
- chand->interested_parties);
- GRPC_CLOSURE_LIST_SCHED(&chand->waiting_for_resolver_result_closures);
- if (chand->exit_idle_when_lb_policy_arrives) {
- chand->lb_policy->ExitIdleLocked();
- chand->exit_idle_when_lb_policy_arrives = false;
- }
- watch_lb_policy_locked(chand, chand->lb_policy.get(), state);
- } else if (chand->resolver_result == nullptr) {
- // Transient failure.
- GRPC_CLOSURE_LIST_SCHED(&chand->waiting_for_resolver_result_closures);
- }
- if (!lb_policy_updated) {
- set_channel_connectivity_state_locked(
- chand, state, GRPC_ERROR_REF(state_error), "new_lb+resolver");
- }
- grpc_channel_args_destroy(chand->resolver_result);
- chand->resolver_result = nullptr;
- chand->resolver->NextLocked(&chand->resolver_result,
- &chand->on_resolver_result_changed);
- GRPC_ERROR_UNREF(state_error);
}
+ return grpc_core::UniquePtr<char>(gpr_strdup(service_config_json));
+}
+
+// Callback invoked when a resolver result is available.
+static void on_resolver_result_changed_locked(void* arg, grpc_error* error) {
+ channel_data* chand = static_cast<channel_data*>(arg);
+ if (grpc_client_channel_trace.enabled()) {
+ const char* disposition =
+ chand->resolver_result != nullptr
+ ? ""
+ : (error == GRPC_ERROR_NONE ? " (transient error)"
+ : " (resolver shutdown)");
+ gpr_log(GPR_INFO,
+ "chand=%p: got resolver result: resolver_result=%p error=%s%s",
+ chand, chand->resolver_result, grpc_error_string(error),
+ disposition);
+ }
+ // Handle shutdown.
+ if (error != GRPC_ERROR_NONE || chand->resolver == nullptr) {
+ on_resolver_shutdown_locked(chand, GRPC_ERROR_REF(error));
+ return;
+ }
+ // Data used to set the channel's connectivity state.
+ bool set_connectivity_state = true;
+ grpc_connectivity_state connectivity_state = GRPC_CHANNEL_TRANSIENT_FAILURE;
+ grpc_error* connectivity_error =
+ GRPC_ERROR_CREATE_FROM_STATIC_STRING("No load balancing policy");
+ // chand->resolver_result will be null in the case of a transient
+ // resolution error. In that case, we don't have any new result to
+ // process, which means that we keep using the previous result (if any).
+ if (chand->resolver_result == nullptr) {
+ if (grpc_client_channel_trace.enabled()) {
+ gpr_log(GPR_INFO, "chand=%p: resolver transient failure", chand);
+ }
+ } else {
+ grpc_core::UniquePtr<char> lb_policy_name =
+ get_lb_policy_name_from_resolver_result_locked(chand);
+ // Check to see if we're already using the right LB policy.
+ // Note: It's safe to use chand->info_lb_policy_name here without
+ // taking a lock on chand->info_mu, because this function is the
+ // only thing that modifies its value, and it can only be invoked
+ // once at any given time.
+ bool lb_policy_name_changed = chand->info_lb_policy_name == nullptr ||
+ gpr_stricmp(chand->info_lb_policy_name.get(),
+ lb_policy_name.get()) != 0;
+ if (chand->lb_policy != nullptr && !lb_policy_name_changed) {
+ // Continue using the same LB policy. Update with new addresses.
+ if (grpc_client_channel_trace.enabled()) {
+ gpr_log(GPR_INFO, "chand=%p: updating existing LB policy \"%s\" (%p)",
+ chand, lb_policy_name.get(), chand->lb_policy.get());
+ }
+ chand->lb_policy->UpdateLocked(*chand->resolver_result);
+ // No need to set the channel's connectivity state; the existing
+ // watch on the LB policy will take care of that.
+ set_connectivity_state = false;
+ } else {
+ // Instantiate new LB policy.
+ create_new_lb_policy_locked(chand, lb_policy_name.get(),
+ &connectivity_state, &connectivity_error);
+ }
+ // Find service config.
+ grpc_core::UniquePtr<char> service_config_json =
+ get_service_config_from_resolver_result_locked(chand);
+ // Swap out the data used by cc_get_channel_info().
+ gpr_mu_lock(&chand->info_mu);
+ chand->info_lb_policy_name = std::move(lb_policy_name);
+ chand->info_service_config_json = std::move(service_config_json);
+ gpr_mu_unlock(&chand->info_mu);
+ // Clean up.
+ grpc_channel_args_destroy(chand->resolver_result);
+ chand->resolver_result = nullptr;
+ }
+ // Set the channel's connectivity state if needed.
+ if (set_connectivity_state) {
+ set_channel_connectivity_state_locked(
+ chand, connectivity_state, connectivity_error, "resolver_result");
+ } else {
+ GRPC_ERROR_UNREF(connectivity_error);
+ }
+ // Invoke closures that were waiting for results and renew the watch.
+ GRPC_CLOSURE_LIST_SCHED(&chand->waiting_for_resolver_result_closures);
+ chand->resolver->NextLocked(&chand->resolver_result,
+ &chand->on_resolver_result_changed);
}
static void start_transport_op_locked(void* arg, grpc_error* error_ignored) {
@@ -611,15 +632,11 @@
channel_data* chand = static_cast<channel_data*>(elem->channel_data);
gpr_mu_lock(&chand->info_mu);
if (info->lb_policy_name != nullptr) {
- *info->lb_policy_name = chand->info_lb_policy_name == nullptr
- ? nullptr
- : gpr_strdup(chand->info_lb_policy_name);
+ *info->lb_policy_name = gpr_strdup(chand->info_lb_policy_name.get());
}
if (info->service_config_json != nullptr) {
*info->service_config_json =
- chand->info_service_config_json == nullptr
- ? nullptr
- : gpr_strdup(chand->info_service_config_json);
+ gpr_strdup(chand->info_service_config_json.get());
}
gpr_mu_unlock(&chand->info_mu);
}
@@ -699,19 +716,15 @@
return GRPC_ERROR_NONE;
}
-static void shutdown_resolver_locked(void* arg, grpc_error* error) {
- grpc_core::Resolver* resolver = static_cast<grpc_core::Resolver*>(arg);
- resolver->Orphan();
-}
-
/* Destructor for channel_data */
static void cc_destroy_channel_elem(grpc_channel_element* elem) {
channel_data* chand = static_cast<channel_data*>(elem->channel_data);
if (chand->resolver != nullptr) {
- GRPC_CLOSURE_SCHED(
- GRPC_CLOSURE_CREATE(shutdown_resolver_locked, chand->resolver.release(),
- grpc_combiner_scheduler(chand->combiner)),
- GRPC_ERROR_NONE);
+ // The only way we can get here is if we never started resolving,
+ // because we take a ref to the channel stack when we start
+ // resolving and do not release it until the resolver callback is
+ // invoked after the resolver shuts down.
+ chand->resolver.reset();
}
if (chand->client_channel_factory != nullptr) {
grpc_client_channel_factory_unref(chand->client_channel_factory);
@@ -721,8 +734,10 @@
chand->interested_parties);
chand->lb_policy.reset();
}
- gpr_free(chand->info_lb_policy_name);
- gpr_free(chand->info_service_config_json);
+ // TODO(roth): Once we convert the filter API to C++, there will no
+ // longer be any need to explicitly reset these smart pointer data members.
+ chand->info_lb_policy_name.reset();
+ chand->info_service_config_json.reset();
chand->retry_throttle_data.reset();
chand->method_params_table.reset();
grpc_client_channel_stop_backup_polling(chand->interested_parties);
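
The info_lb_policy_name / info_service_config_json fields now own their strings through grpc_core::UniquePtr<char> instead of a raw char* plus gpr_free. Below is a minimal sketch of that ownership pattern, using only calls that appear in this diff (gpr_strdup, gpr_mu_lock/unlock, UniquePtr<char>); the InfoState struct and the two function names are hypothetical, not the real channel_data members, and the include paths are assumed:

    // Hypothetical illustration of the UniquePtr<char> pattern used for the
    // info_* fields; not the actual channel_data layout.
    #include <grpc/support/string_util.h>   // gpr_strdup
    #include <grpc/support/sync.h>          // gpr_mu
    #include "src/core/lib/gprpp/memory.h"  // grpc_core::UniquePtr

    struct InfoState {
      gpr_mu mu;
      grpc_core::UniquePtr<char> lb_policy_name;
    };

    // Writer side (resolver callback): hand ownership of a fresh copy to the
    // smart pointer; any previous value is freed automatically on assignment.
    void SetLbPolicyName(InfoState* state, const char* name) {
      gpr_mu_lock(&state->mu);
      state->lb_policy_name = grpc_core::UniquePtr<char>(gpr_strdup(name));
      gpr_mu_unlock(&state->mu);
    }

    // Reader side (cf. cc_get_channel_info): copy out under the lock.
    // gpr_strdup(nullptr) returns nullptr, which is what lets the diff drop
    // the explicit nullptr checks in cc_get_channel_info.
    char* CopyLbPolicyName(InfoState* state) {
      gpr_mu_lock(&state->mu);
      char* copy = gpr_strdup(state->lb_policy_name.get());
      gpr_mu_unlock(&state->mu);
      return copy;
    }
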
diff --git a/src/core/ext/filters/client_channel/lb_policy/round_robin/round_robin.cc b/src/core/ext/filters/client_channel/lb_policy/round_robin/round_robin.cc
index b177385..42e8e88 100644
--- a/src/core/ext/filters/client_channel/lb_policy/round_robin/round_robin.cc
+++ b/src/core/ext/filters/client_channel/lb_policy/round_robin/round_robin.cc
@@ -354,11 +354,11 @@
if (DoPickLocked(pick)) return true;
}
/* no pick currently available. Save for later in list of pending picks */
+ pick->next = pending_picks_;
+ pending_picks_ = pick;
if (!started_picking_) {
StartPickingLocked();
}
- pick->next = pending_picks_;
- pending_picks_ = pick;
return false;
}
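
The reordering above ensures the pick is already on pending_picks_ before StartPickingLocked() runs, since starting to pick can process the pending list synchronously when a subchannel is already READY (the situation exercised by the new RoundRobinProcessPending test later in this diff). A self-contained sketch of the same "enqueue before kicking off work" pattern, with hypothetical names rather than the real LB-policy types:

    // Hypothetical illustration: StartWork() may drain the pending queue
    // synchronously, so the pick must already be queued when it runs.
    #include <functional>
    #include <utility>
    #include <vector>

    struct Picker {
      std::vector<std::function<void()>> pending;
      bool started = false;
      bool ready = true;  // e.g. a reused connection is already usable

      void StartWork() {
        started = true;
        if (ready) {
          // Drains whatever is queued *right now*.
          for (auto& on_complete : pending) on_complete();
          pending.clear();
        }
      }

      void Pick(std::function<void()> on_complete) {
        pending.push_back(std::move(on_complete));  // enqueue first...
        if (!started) StartWork();                  // ...then start work
        // With the order reversed, a synchronous drain inside StartWork()
        // would miss this pick, leaving it pending forever.
      }
    };
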
diff --git a/src/core/ext/filters/client_channel/lb_policy_factory.cc b/src/core/ext/filters/client_channel/lb_policy_factory.cc
index 7c8cba5..5c6363d 100644
--- a/src/core/ext/filters/client_channel/lb_policy_factory.cc
+++ b/src/core/ext/filters/client_channel/lb_policy_factory.cc
@@ -153,3 +153,11 @@
return nullptr;
return static_cast<grpc_lb_addresses*>(lb_addresses_arg->value.pointer.p);
}
+
+bool grpc_lb_addresses_contains_balancer_address(
+ const grpc_lb_addresses& addresses) {
+ for (size_t i = 0; i < addresses.num_addresses; ++i) {
+ if (addresses.addresses[i].is_balancer) return true;
+ }
+ return false;
+}
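
A usage sketch for the new helper, mirroring the client_channel.cc call site earlier in this diff; the should_force_grpclb wrapper is hypothetical and exists only to make the fragment self-contained:

    // Sketch: decide whether to force the grpclb policy for a resolver result
    // that contains at least one balancer address.
    static bool should_force_grpclb(const grpc_channel_args* resolver_result) {
      const grpc_arg* arg =
          grpc_channel_args_find(resolver_result, GRPC_ARG_LB_ADDRESSES);
      if (arg == nullptr || arg->type != GRPC_ARG_POINTER) return false;
      grpc_lb_addresses* addresses =
          static_cast<grpc_lb_addresses*>(arg->value.pointer.p);
      return grpc_lb_addresses_contains_balancer_address(*addresses);
    }
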
diff --git a/src/core/ext/filters/client_channel/lb_policy_factory.h b/src/core/ext/filters/client_channel/lb_policy_factory.h
index 6440258..c07792d 100644
--- a/src/core/ext/filters/client_channel/lb_policy_factory.h
+++ b/src/core/ext/filters/client_channel/lb_policy_factory.h
@@ -101,6 +101,10 @@
grpc_lb_addresses* grpc_lb_addresses_find_channel_arg(
const grpc_channel_args* channel_args);
+// Returns true if addresses contains at least one balancer address.
+bool grpc_lb_addresses_contains_balancer_address(
+ const grpc_lb_addresses& addresses);
+
//
// LB policy factory
//
diff --git a/src/csharp/Grpc.Core/GrpcEnvironment.cs b/src/csharp/Grpc.Core/GrpcEnvironment.cs
index 6bb2f6c..a6a1d8a 100644
--- a/src/csharp/Grpc.Core/GrpcEnvironment.cs
+++ b/src/csharp/Grpc.Core/GrpcEnvironment.cs
@@ -422,9 +422,32 @@
{
if (!hooksRegistered)
{
+ // Under normal circumstances, the user is expected to shut down all
+ // the gRPC channels and servers before the application exits. The following
+ // hooks provide some extra handling for cases when that does not happen,
+ // in an effort to achieve reasonable behavior on shutdown.
#if NETSTANDARD1_5
- System.Runtime.Loader.AssemblyLoadContext.Default.Unloading += (assemblyLoadContext) => { HandleShutdown(); };
+ // No action is required at shutdown on .NET Core:
+ // - In-progress P/Invoke calls (such as grpc_completion_queue_next) don't seem
+ // to prevent a .NET Core application from terminating, so no special handling
+ // is needed.
+ // - .NET Core doesn't run finalizers on shutdown, so there's no risk of a crash
+ // caused by the grpc_*_destroy methods for native objects being invoked
+ // in the wrong order.
+ // TODO(jtattermusch): Verify that the shutdown hooks are still not needed
+ // once we add support for new platforms using netstandard (e.g. Xamarin).
#else
+ // On the desktop .NET framework and Mono, we need to register for a shutdown
+ // event to explicitly shut down the GrpcEnvironment.
+ // - On the desktop .NET framework, we need to do a proper shutdown to prevent a crash
+ // when the framework attempts to run the finalizers for the SafeHandle objects representing
+ // the native grpc objects. The finalizers call the native grpc_*_destroy methods
+ // (e.g. grpc_server_destroy) in a random order, which is not supported by gRPC.
+ // - On Mono, the process would hang because the GrpcThreadPool threads are sleeping
+ // in a grpc_completion_queue_next P/Invoke invocation and Mono won't let the
+ // process shut down until the P/Invoke calls return. We avoid this by shutting down
+ // the completion queue(s) associated with the GrpcThreadPool, which causes
+ // the grpc_completion_queue_next calls to return immediately.
AppDomain.CurrentDomain.ProcessExit += (sender, eventArgs) => { HandleShutdown(); };
AppDomain.CurrentDomain.DomainUnload += (sender, eventArgs) => { HandleShutdown(); };
#endif
diff --git a/templates/tools/dockerfile/bazel.include b/templates/tools/dockerfile/bazel.include
index 2c0f382..ee4f955 100644
--- a/templates/tools/dockerfile/bazel.include
+++ b/templates/tools/dockerfile/bazel.include
@@ -2,4 +2,4 @@
# Bazel installation
RUN echo "deb [arch=amd64] http://storage.googleapis.com/bazel-apt stable jdk1.8" > /etc/apt/sources.list.d/bazel.list
RUN curl https://bazel.build/bazel-release.pub.gpg | apt-key add -
-RUN apt-get -y update && apt-get -y install bazel=0.13.1 && apt-get clean
+RUN apt-get -y update && apt-get -y install bazel=0.15.0 && apt-get clean
diff --git a/test/cpp/end2end/client_lb_end2end_test.cc b/test/cpp/end2end/client_lb_end2end_test.cc
index feea7c3..8896fc6 100644
--- a/test/cpp/end2end/client_lb_end2end_test.cc
+++ b/test/cpp/end2end/client_lb_end2end_test.cc
@@ -539,6 +539,23 @@
EXPECT_EQ("round_robin", channel->GetLoadBalancingPolicyName());
}
+TEST_F(ClientLbEnd2endTest, RoundRobinProcessPending) {
+ StartServers(1); // Single server
+ auto channel = BuildChannel("round_robin");
+ auto stub = BuildStub(channel);
+ SetNextResolution({servers_[0]->port_});
+ WaitForServer(stub, 0, DEBUG_LOCATION);
+ // Create a new channel and its corresponding RR LB policy, which will pick
+ // up the subchannels in READY state left by the previous RPC against the same
+ // target (even though it happened over a different channel, because subchannels
+ // are globally reused). Progress should happen without any transition from
+ // this READY state.
+ auto second_channel = BuildChannel("round_robin");
+ auto second_stub = BuildStub(second_channel);
+ SetNextResolution({servers_[0]->port_});
+ CheckRpcSendOk(second_stub, DEBUG_LOCATION);
+}
+
TEST_F(ClientLbEnd2endTest, RoundRobinUpdates) {
// Start servers and send one RPC per server.
const int kNumServers = 3;
diff --git a/tools/dockerfile/test/bazel/Dockerfile b/tools/dockerfile/test/bazel/Dockerfile
index 2cb2c12..1f73311 100644
--- a/tools/dockerfile/test/bazel/Dockerfile
+++ b/tools/dockerfile/test/bazel/Dockerfile
@@ -30,7 +30,7 @@
# Bazel installation
RUN echo "deb [arch=amd64] http://storage.googleapis.com/bazel-apt stable jdk1.8" > /etc/apt/sources.list.d/bazel.list
RUN curl https://bazel.build/bazel-release.pub.gpg | apt-key add -
-RUN apt-get -y update && apt-get -y install bazel=0.13.1 && apt-get clean
+RUN apt-get -y update && apt-get -y install bazel=0.15.0 && apt-get clean
RUN mkdir -p /var/local/jenkins
diff --git a/tools/internal_ci/linux/grpc_msan_on_foundry.sh b/tools/internal_ci/linux/grpc_msan_on_foundry.sh
index e8ef249..aa1e613 100644
--- a/tools/internal_ci/linux/grpc_msan_on_foundry.sh
+++ b/tools/internal_ci/linux/grpc_msan_on_foundry.sh
@@ -50,7 +50,6 @@
--strategy=Closure=remote \
--genrule_strategy=remote \
--experimental_strict_action_env=true \
- --experimental_remote_platform_override='properties:{name:"container-image" value:"docker://gcr.io/cloud-marketplace/google/rbe-ubuntu16-04@sha256:59bf0e191a6b5cc1ab62c2224c810681d1326bad5a27b1d36c9f40113e79da7f" }' \
--define GRPC_PORT_ISOLATED_RUNTIME=1 \
--copt=-gmlt \
--strip=never \
@@ -63,7 +62,9 @@
--crosstool_top=@com_github_bazelbuild_bazeltoolchains//configs/ubuntu16_04_clang/1.0/bazel_0.13.0/msan:toolchain \
--action_env=BAZEL_DO_NOT_DETECT_CPP_TOOLCHAIN=1 \
--extra_toolchains=@com_github_bazelbuild_bazeltoolchains//configs/ubuntu16_04_clang/1.0/bazel_0.13.0/cpp:cc-toolchain-clang-x86_64-default \
- --extra_execution_platforms=@com_github_bazelbuild_bazeltoolchains//configs/ubuntu16_04_clang/1.0:rbe_ubuntu1604 \
+ --extra_execution_platforms=//third_party/toolchains:rbe_ubuntu1604 \
+ --host_platform=//third_party/toolchains:rbe_ubuntu1604 \
+ --platforms=//third_party/toolchains:rbe_ubuntu1604 \
-- //test/... || FAILED="true"
# Sleep to let ResultStore finish writing results before querying
diff --git a/tools/internal_ci/linux/grpc_ubsan_on_foundry.sh b/tools/internal_ci/linux/grpc_ubsan_on_foundry.sh
index c733a0a..d9b039f 100644
--- a/tools/internal_ci/linux/grpc_ubsan_on_foundry.sh
+++ b/tools/internal_ci/linux/grpc_ubsan_on_foundry.sh
@@ -50,7 +50,6 @@
--strategy=Closure=remote \
--genrule_strategy=remote \
--experimental_strict_action_env=true \
- --experimental_remote_platform_override='properties:{name:"container-image" value:"docker://gcr.io/cloud-marketplace/google/rbe-ubuntu16-04@sha256:59bf0e191a6b5cc1ab62c2224c810681d1326bad5a27b1d36c9f40113e79da7f" }' \
--define GRPC_PORT_ISOLATED_RUNTIME=1 \
--copt=-gmlt \
--strip=never \
@@ -59,7 +58,9 @@
--crosstool_top=@com_github_bazelbuild_bazeltoolchains//configs/experimental/ubuntu16_04_clang/1.0/bazel_0.13.0/ubsan:toolchain \
--action_env=BAZEL_DO_NOT_DETECT_CPP_TOOLCHAIN=1 \
--extra_toolchains=@com_github_bazelbuild_bazeltoolchains//configs/ubuntu16_04_clang/1.0/bazel_0.13.0/cpp:cc-toolchain-clang-x86_64-default \
- --extra_execution_platforms=@com_github_bazelbuild_bazeltoolchains//configs/ubuntu16_04_clang/1.0:rbe_ubuntu1604 \
+ --extra_execution_platforms=//third_party/toolchains:rbe_ubuntu1604 \
+ --host_platform=//third_party/toolchains:rbe_ubuntu1604 \
+ --platforms=//third_party/toolchains:rbe_ubuntu1604 \
--runs_per_test_detects_flakes \
--runs_per_test=2 \
-- //test/... || FAILED="true"
diff --git a/tools/internal_ci/linux/pull_request/grpc_ubsan_on_foundry.sh b/tools/internal_ci/linux/pull_request/grpc_ubsan_on_foundry.sh
index e3db4d6..c7ab5b1 100644
--- a/tools/internal_ci/linux/pull_request/grpc_ubsan_on_foundry.sh
+++ b/tools/internal_ci/linux/pull_request/grpc_ubsan_on_foundry.sh
@@ -50,7 +50,6 @@
--strategy=Closure=remote \
--genrule_strategy=remote \
--experimental_strict_action_env=true \
- --experimental_remote_platform_override='properties:{name:"container-image" value:"docker://gcr.io/cloud-marketplace/google/rbe-ubuntu16-04@sha256:59bf0e191a6b5cc1ab62c2224c810681d1326bad5a27b1d36c9f40113e79da7f" }' \
--define GRPC_PORT_ISOLATED_RUNTIME=1 \
--copt=-gmlt \
--strip=never \
@@ -59,7 +58,9 @@
--crosstool_top=@com_github_bazelbuild_bazeltoolchains//configs/experimental/ubuntu16_04_clang/1.0/bazel_0.13.0/ubsan:toolchain \
--action_env=BAZEL_DO_NOT_DETECT_CPP_TOOLCHAIN=1 \
--extra_toolchains=@com_github_bazelbuild_bazeltoolchains//configs/ubuntu16_04_clang/1.0/bazel_0.13.0/cpp:cc-toolchain-clang-x86_64-default \
- --extra_execution_platforms=@com_github_bazelbuild_bazeltoolchains//configs/ubuntu16_04_clang/1.0:rbe_ubuntu1604 \
+ --extra_execution_platforms=//third_party/toolchains:rbe_ubuntu1604 \
+ --host_platform=//third_party/toolchains:rbe_ubuntu1604 \
+ --platforms=//third_party/toolchains:rbe_ubuntu1604 \
-- //test/... || FAILED="true"
if [ "$FAILED" != "" ]
diff --git a/tools/run_tests/python_utils/upload_rbe_results.py b/tools/run_tests/python_utils/upload_rbe_results.py
index cbeb1ad..d29ebc6 100644
--- a/tools/run_tests/python_utils/upload_rbe_results.py
+++ b/tools/run_tests/python_utils/upload_rbe_results.py
@@ -136,7 +136,7 @@
resultstore_actions = _get_resultstore_data(api_key, invocation_id)
bq_rows = []
- for action in resultstore_actions:
+ for index, action in enumerate(resultstore_actions):
# Filter out non-test related data, such as build results.
if 'testAction' not in action:
continue
@@ -157,6 +157,23 @@
'timedOut': True
}
}]
+ # When RBE believes its infrastructure is failing, it will abort and
+ # mark running tests as UNKNOWN. These infrastructure failures may be
+ # related to our tests, so we should investigate whether specific tests are
+ # repeatedly being marked as UNKNOWN.
+ elif action['statusAttributes']['status'] == 'UNKNOWN':
+ test_cases = [{
+ 'testCase': {
+ 'caseName': str(action['id']['actionId']),
+ 'unknown': True
+ }
+ }]
+ # Take the timestamp from the previous action, which should be
+ # a close approximation.
+ action['timing'] = {
+ 'startTime':
+ resultstore_actions[index - 1]['timing']['startTime']
+ }
else:
test_cases = action['testAction']['testSuite']['tests'][0][
'testSuite']['tests']
@@ -165,28 +182,55 @@
result = 'FAILED'
elif 'timedOut' in test_case['testCase']:
result = 'TIMEOUT'
+ elif 'unknown' in test_case['testCase']:
+ result = 'UNKNOWN'
else:
result = 'PASSED'
- bq_rows.append({
- 'insertId': str(uuid.uuid4()),
- 'json': {
- 'job_name':
- os.getenv('KOKORO_JOB_NAME'),
- 'build_id':
- os.getenv('KOKORO_BUILD_NUMBER'),
- 'build_url':
- 'https://source.cloud.google.com/results/invocations/%s' %
- invocation_id,
- 'test_target':
- action['id']['targetId'],
- 'test_case':
- test_case['testCase']['caseName'],
- 'result':
- result,
- 'timestamp':
- action['timing']['startTime'],
- }
- })
+ try:
+ bq_rows.append({
+ 'insertId': str(uuid.uuid4()),
+ 'json': {
+ 'job_name':
+ os.getenv('KOKORO_JOB_NAME'),
+ 'build_id':
+ os.getenv('KOKORO_BUILD_NUMBER'),
+ 'build_url':
+ 'https://source.cloud.google.com/results/invocations/%s'
+ % invocation_id,
+ 'test_target':
+ action['id']['targetId'],
+ 'test_case':
+ test_case['testCase']['caseName'],
+ 'result':
+ result,
+ 'timestamp':
+ action['timing']['startTime'],
+ }
+ })
+ except Exception as e:
+ print('Failed to parse test result. Error: %s' % str(e))
+ print(json.dumps(test_case, indent=4))
+ bq_rows.append({
+ 'insertId': str(uuid.uuid4()),
+ 'json': {
+ 'job_name':
+ os.getenv('KOKORO_JOB_NAME'),
+ 'build_id':
+ os.getenv('KOKORO_BUILD_NUMBER'),
+ 'build_url':
+ 'https://source.cloud.google.com/results/invocations/%s'
+ % invocation_id,
+ 'test_target':
+ action['id']['targetId'],
+ 'test_case':
+ 'N/A',
+ 'result':
+ 'UNPARSEABLE',
+ 'timestamp':
+ 'N/A',
+ }
+ })
+
# BigQuery sometimes fails with large uploads, so batch 1,000 rows at a time.
for i in range((len(bq_rows) / 1000) + 1):
_upload_results_to_bq(bq_rows[i * 1000:(i + 1) * 1000])