Merge branch 'master' of https://github.com/grpc/grpc into connect-auth-doc
diff --git a/CMakeLists.txt b/CMakeLists.txt
index 266f2c0..f71563a 100644
--- a/CMakeLists.txt
+++ b/CMakeLists.txt
@@ -394,6 +394,7 @@
add_dependencies(buildtests_c bdp_estimator_test)
add_dependencies(buildtests_c bin_decoder_test)
add_dependencies(buildtests_c bin_encoder_test)
+add_dependencies(buildtests_c byte_stream_test)
add_dependencies(buildtests_c census_context_test)
add_dependencies(buildtests_c census_intrusive_hash_map_test)
add_dependencies(buildtests_c census_resource_test)
@@ -4785,6 +4786,37 @@
endif (gRPC_BUILD_TESTS)
if (gRPC_BUILD_TESTS)
+add_executable(byte_stream_test
+ test/core/transport/byte_stream_test.c
+)
+
+
+target_include_directories(byte_stream_test
+ PRIVATE ${CMAKE_CURRENT_SOURCE_DIR}
+ PRIVATE ${CMAKE_CURRENT_SOURCE_DIR}/include
+ PRIVATE ${BORINGSSL_ROOT_DIR}/include
+ PRIVATE ${PROTOBUF_ROOT_DIR}/src
+ PRIVATE ${BENCHMARK_ROOT_DIR}/include
+ PRIVATE ${ZLIB_ROOT_DIR}
+ PRIVATE ${CMAKE_CURRENT_BINARY_DIR}/third_party/zlib
+ PRIVATE ${CARES_BUILD_INCLUDE_DIR}
+ PRIVATE ${CARES_INCLUDE_DIR}
+ PRIVATE ${CARES_PLATFORM_INCLUDE_DIR}
+ PRIVATE ${CMAKE_CURRENT_BINARY_DIR}/third_party/cares/cares
+ PRIVATE ${CMAKE_CURRENT_BINARY_DIR}/third_party/gflags/include
+)
+
+target_link_libraries(byte_stream_test
+ ${_gRPC_ALLTARGETS_LIBRARIES}
+ grpc_test_util
+ grpc
+ gpr_test_util
+ gpr
+)
+
+endif (gRPC_BUILD_TESTS)
+if (gRPC_BUILD_TESTS)
+
add_executable(census_context_test
test/core/census/context_test.c
)
diff --git a/Makefile b/Makefile
index 7b53024..98cfb04 100644
--- a/Makefile
+++ b/Makefile
@@ -954,6 +954,7 @@
bdp_estimator_test: $(BINDIR)/$(CONFIG)/bdp_estimator_test
bin_decoder_test: $(BINDIR)/$(CONFIG)/bin_decoder_test
bin_encoder_test: $(BINDIR)/$(CONFIG)/bin_encoder_test
+byte_stream_test: $(BINDIR)/$(CONFIG)/byte_stream_test
census_context_test: $(BINDIR)/$(CONFIG)/census_context_test
census_intrusive_hash_map_test: $(BINDIR)/$(CONFIG)/census_intrusive_hash_map_test
census_resource_test: $(BINDIR)/$(CONFIG)/census_resource_test
@@ -1345,6 +1346,7 @@
$(BINDIR)/$(CONFIG)/bdp_estimator_test \
$(BINDIR)/$(CONFIG)/bin_decoder_test \
$(BINDIR)/$(CONFIG)/bin_encoder_test \
+ $(BINDIR)/$(CONFIG)/byte_stream_test \
$(BINDIR)/$(CONFIG)/census_context_test \
$(BINDIR)/$(CONFIG)/census_intrusive_hash_map_test \
$(BINDIR)/$(CONFIG)/census_resource_test \
@@ -1746,6 +1748,8 @@
$(Q) $(BINDIR)/$(CONFIG)/bin_decoder_test || ( echo test bin_decoder_test failed ; exit 1 )
$(E) "[RUN] Testing bin_encoder_test"
$(Q) $(BINDIR)/$(CONFIG)/bin_encoder_test || ( echo test bin_encoder_test failed ; exit 1 )
+ $(E) "[RUN] Testing byte_stream_test"
+ $(Q) $(BINDIR)/$(CONFIG)/byte_stream_test || ( echo test byte_stream_test failed ; exit 1 )
$(E) "[RUN] Testing census_context_test"
$(Q) $(BINDIR)/$(CONFIG)/census_context_test || ( echo test census_context_test failed ; exit 1 )
$(E) "[RUN] Testing census_intrusive_hash_map_test"
@@ -8411,6 +8415,38 @@
endif
+BYTE_STREAM_TEST_SRC = \
+ test/core/transport/byte_stream_test.c \
+
+BYTE_STREAM_TEST_OBJS = $(addprefix $(OBJDIR)/$(CONFIG)/, $(addsuffix .o, $(basename $(BYTE_STREAM_TEST_SRC))))
+ifeq ($(NO_SECURE),true)
+
+# You can't build secure targets if you don't have OpenSSL.
+
+$(BINDIR)/$(CONFIG)/byte_stream_test: openssl_dep_error
+
+else
+
+
+
+$(BINDIR)/$(CONFIG)/byte_stream_test: $(BYTE_STREAM_TEST_OBJS) $(LIBDIR)/$(CONFIG)/libgrpc_test_util.a $(LIBDIR)/$(CONFIG)/libgrpc.a $(LIBDIR)/$(CONFIG)/libgpr_test_util.a $(LIBDIR)/$(CONFIG)/libgpr.a
+ $(E) "[LD] Linking $@"
+ $(Q) mkdir -p `dirname $@`
+ $(Q) $(LD) $(LDFLAGS) $(BYTE_STREAM_TEST_OBJS) $(LIBDIR)/$(CONFIG)/libgrpc_test_util.a $(LIBDIR)/$(CONFIG)/libgrpc.a $(LIBDIR)/$(CONFIG)/libgpr_test_util.a $(LIBDIR)/$(CONFIG)/libgpr.a $(LDLIBS) $(LDLIBS_SECURE) -o $(BINDIR)/$(CONFIG)/byte_stream_test
+
+endif
+
+$(OBJDIR)/$(CONFIG)/test/core/transport/byte_stream_test.o: $(LIBDIR)/$(CONFIG)/libgrpc_test_util.a $(LIBDIR)/$(CONFIG)/libgrpc.a $(LIBDIR)/$(CONFIG)/libgpr_test_util.a $(LIBDIR)/$(CONFIG)/libgpr.a
+
+deps_byte_stream_test: $(BYTE_STREAM_TEST_OBJS:.o=.dep)
+
+ifneq ($(NO_SECURE),true)
+ifneq ($(NO_DEPS),true)
+-include $(BYTE_STREAM_TEST_OBJS:.o=.dep)
+endif
+endif
+
+
CENSUS_CONTEXT_TEST_SRC = \
test/core/census/context_test.c \
diff --git a/build.yaml b/build.yaml
index 198467b..9c2504a 100644
--- a/build.yaml
+++ b/build.yaml
@@ -1702,6 +1702,16 @@
deps:
- grpc_test_util
- grpc
+- name: byte_stream_test
+ build: test
+ language: c
+ src:
+ - test/core/transport/byte_stream_test.c
+ deps:
+ - grpc_test_util
+ - grpc
+ - gpr_test_util
+ - gpr
- name: census_context_test
build: test
language: c
diff --git a/gRPC-Core.podspec b/gRPC-Core.podspec
index 90580c5..202af58 100644
--- a/gRPC-Core.podspec
+++ b/gRPC-Core.podspec
@@ -176,7 +176,7 @@
ss.header_mappings_dir = '.'
ss.libraries = 'z'
ss.dependency "#{s.name}/Interface", version
- ss.dependency 'BoringSSL', '~> 8.0'
+ ss.dependency 'BoringSSL', '~> 9.0'
ss.dependency 'nanopb', '~> 0.3'
# To save you from scrolling, this is the last part of the podspec.
diff --git a/src/core/ext/filters/client_channel/lb_policy/grpclb/client_load_reporting_filter.c b/src/core/ext/filters/client_channel/lb_policy/grpclb/client_load_reporting_filter.c
index 52c6e38..568bb2b 100644
--- a/src/core/ext/filters/client_channel/lb_policy/grpclb/client_load_reporting_filter.c
+++ b/src/core/ext/filters/client_channel/lb_policy/grpclb/client_load_reporting_filter.c
@@ -88,7 +88,6 @@
// Record call finished, optionally setting client_failed_to_send and
// received.
grpc_grpclb_client_stats_add_call_finished(
- false /* drop_for_rate_limiting */, false /* drop_for_load_balancing */,
!calld->send_initial_metadata_succeeded /* client_failed_to_send */,
calld->recv_initial_metadata_succeeded /* known_received */,
calld->client_stats);
diff --git a/src/core/ext/filters/client_channel/lb_policy/grpclb/grpclb.c b/src/core/ext/filters/client_channel/lb_policy/grpclb/grpclb.c
index ebce801..bb9217d 100644
--- a/src/core/ext/filters/client_channel/lb_policy/grpclb/grpclb.c
+++ b/src/core/ext/filters/client_channel/lb_policy/grpclb/grpclb.c
@@ -416,9 +416,7 @@
static bool is_server_valid(const grpc_grpclb_server *server, size_t idx,
bool log) {
- if (server->drop_for_rate_limiting || server->drop_for_load_balancing) {
- return false;
- }
+ if (server->drop) return false;
const grpc_grpclb_ip_address *ip = &server->ip_address;
if (server->port >> 16 != 0) {
if (log) {
@@ -462,7 +460,7 @@
static void parse_server(const grpc_grpclb_server *server,
grpc_resolved_address *addr) {
memset(addr, 0, sizeof(*addr));
- if (server->drop_for_rate_limiting || server->drop_for_load_balancing) return;
+ if (server->drop) return;
const uint16_t netorder_port = htons((uint16_t)server->port);
/* the addresses are given in binary format (a in(6)_addr struct) in
* server->ip_address.bytes. */
@@ -610,7 +608,7 @@
if (glb_policy->serverlist_index == glb_policy->serverlist->num_servers) {
glb_policy->serverlist_index = 0; // Wrap-around.
}
- if (server->drop_for_rate_limiting || server->drop_for_load_balancing) {
+ if (server->drop) {
// Not using the RR policy, so unref it.
if (GRPC_TRACER_ON(grpc_lb_glb_trace)) {
gpr_log(GPR_INFO, "Unreffing RR for drop (0x%" PRIxPTR ")",
@@ -622,11 +620,8 @@
// the client_load_reporting filter, because we do not create a
// subchannel call (and therefore no client_load_reporting filter)
// for dropped calls.
- grpc_grpclb_client_stats_add_call_started(wc_arg->client_stats);
- grpc_grpclb_client_stats_add_call_finished(
- server->drop_for_rate_limiting, server->drop_for_load_balancing,
- false /* failed_to_send */, false /* known_received */,
- wc_arg->client_stats);
+ grpc_grpclb_client_stats_add_call_dropped_locked(server->load_balance_token,
+ wc_arg->client_stats);
grpc_grpclb_client_stats_unref(wc_arg->client_stats);
if (force_async) {
GPR_ASSERT(wc_arg->wrapped_closure != NULL);
@@ -715,7 +710,6 @@
return;
}
glb_policy->rr_policy = new_rr_policy;
-
grpc_error *rr_state_error = NULL;
const grpc_connectivity_state rr_state =
grpc_lb_policy_check_connectivity_locked(exec_ctx, glb_policy->rr_policy,
@@ -741,7 +735,7 @@
rr_connectivity->state = rr_state;
/* Subscribe to changes to the connectivity of the new RR */
- GRPC_LB_POLICY_WEAK_REF(&glb_policy->base, "rr_connectivity_sched");
+ GRPC_LB_POLICY_WEAK_REF(&glb_policy->base, "glb_rr_connectivity_cb");
grpc_lb_policy_notify_on_state_change_locked(exec_ctx, glb_policy->rr_policy,
&rr_connectivity->state,
&rr_connectivity->on_change);
@@ -806,32 +800,31 @@
void *arg, grpc_error *error) {
rr_connectivity_data *rr_connectivity = arg;
glb_lb_policy *glb_policy = rr_connectivity->glb_policy;
-
- const bool shutting_down = glb_policy->shutting_down;
- bool unref_needed = false;
- GRPC_ERROR_REF(error);
-
- if (rr_connectivity->state == GRPC_CHANNEL_SHUTDOWN || shutting_down) {
- /* RR policy shutting down. Don't renew subscription and free the arg of
- * this callback. In addition we need to stash away the current policy to
- * be UNREF'd after releasing the lock. Otherwise, if the UNREF is the last
- * one, the policy would be destroyed, alongside the lock, which would
- * result in a use-after-free */
- unref_needed = true;
- gpr_free(rr_connectivity);
- } else { /* rr state != SHUTDOWN && !shutting down: biz as usual */
- update_lb_connectivity_status_locked(
- exec_ctx, glb_policy, rr_connectivity->state, GRPC_ERROR_REF(error));
- /* Resubscribe. Reuse the "rr_connectivity_cb" weak ref. */
- grpc_lb_policy_notify_on_state_change_locked(
- exec_ctx, glb_policy->rr_policy, &rr_connectivity->state,
- &rr_connectivity->on_change);
- }
- if (unref_needed) {
+ if (glb_policy->shutting_down) {
GRPC_LB_POLICY_WEAK_UNREF(exec_ctx, &glb_policy->base,
- "rr_connectivity_cb");
+ "glb_rr_connectivity_cb");
+ gpr_free(rr_connectivity);
+ return;
}
- GRPC_ERROR_UNREF(error);
+ if (rr_connectivity->state == GRPC_CHANNEL_SHUTDOWN) {
+ /* An RR policy that has transitioned into the SHUTDOWN connectivity state
+ * should not be considered for picks or updates: the SHUTDOWN state is a
+ * sink, policies can't transition back from it. .*/
+ GRPC_LB_POLICY_UNREF(exec_ctx, glb_policy->rr_policy,
+ "rr_connectivity_shutdown");
+ glb_policy->rr_policy = NULL;
+ GRPC_LB_POLICY_WEAK_UNREF(exec_ctx, &glb_policy->base,
+ "glb_rr_connectivity_cb");
+ gpr_free(rr_connectivity);
+ return;
+ }
+ /* rr state != SHUTDOWN && !glb_policy->shutting down: biz as usual */
+ update_lb_connectivity_status_locked(
+ exec_ctx, glb_policy, rr_connectivity->state, GRPC_ERROR_REF(error));
+ /* Resubscribe. Reuse the "glb_rr_connectivity_cb" weak ref. */
+ grpc_lb_policy_notify_on_state_change_locked(exec_ctx, glb_policy->rr_policy,
+ &rr_connectivity->state,
+ &rr_connectivity->on_change);
}
static void destroy_balancer_name(grpc_exec_ctx *exec_ctx,
@@ -995,7 +988,6 @@
gpr_free(glb_policy);
return NULL;
}
-
GRPC_CLOSURE_INIT(&glb_policy->lb_channel_on_connectivity_changed,
glb_lb_channel_on_connectivity_changed_cb, glb_policy,
grpc_combiner_scheduler(args->combiner));
@@ -1052,7 +1044,7 @@
glb_policy->pending_picks = NULL;
pending_ping *pping = glb_policy->pending_pings;
glb_policy->pending_pings = NULL;
- if (glb_policy->rr_policy) {
+ if (glb_policy->rr_policy != NULL) {
GRPC_LB_POLICY_UNREF(exec_ctx, glb_policy->rr_policy, "glb_shutdown");
}
// We destroy the LB channel here because
@@ -1309,15 +1301,14 @@
}
static bool load_report_counters_are_zero(grpc_grpclb_request *request) {
+ grpc_grpclb_dropped_call_counts *drop_entries =
+ request->client_stats.calls_finished_with_drop.arg;
return request->client_stats.num_calls_started == 0 &&
request->client_stats.num_calls_finished == 0 &&
- request->client_stats.num_calls_finished_with_drop_for_rate_limiting ==
- 0 &&
- request->client_stats
- .num_calls_finished_with_drop_for_load_balancing == 0 &&
request->client_stats.num_calls_finished_with_client_failed_to_send ==
0 &&
- request->client_stats.num_calls_finished_known_received == 0;
+ request->client_stats.num_calls_finished_known_received == 0 &&
+ (drop_entries == NULL || drop_entries->num_entries == 0);
}
static void send_client_load_report_locked(grpc_exec_ctx *exec_ctx, void *arg,
@@ -1332,7 +1323,7 @@
// Construct message payload.
GPR_ASSERT(glb_policy->client_load_report_payload == NULL);
grpc_grpclb_request *request =
- grpc_grpclb_load_report_request_create(glb_policy->client_stats);
+ grpc_grpclb_load_report_request_create_locked(glb_policy->client_stats);
// Skip client load report if the counters were all zero in the last
// report and they are still zero in this one.
if (load_report_counters_are_zero(request)) {
@@ -1778,7 +1769,8 @@
if (!glb_policy->watching_lb_channel) {
// Watch the LB channel connectivity for connection.
- glb_policy->lb_channel_connectivity = GRPC_CHANNEL_INIT;
+ glb_policy->lb_channel_connectivity = grpc_channel_check_connectivity_state(
+ glb_policy->lb_channel, true /* try to connect */);
grpc_channel_element *client_channel_elem = grpc_channel_stack_last_element(
grpc_channel_get_channel_stack(glb_policy->lb_channel));
GPR_ASSERT(client_channel_elem->filter == &grpc_client_channel_filter);
diff --git a/src/core/ext/filters/client_channel/lb_policy/grpclb/grpclb_client_stats.c b/src/core/ext/filters/client_channel/lb_policy/grpclb/grpclb_client_stats.c
index c762443..5b62623 100644
--- a/src/core/ext/filters/client_channel/lb_policy/grpclb/grpclb_client_stats.c
+++ b/src/core/ext/filters/client_channel/lb_policy/grpclb/grpclb_client_stats.c
@@ -18,8 +18,11 @@
#include "src/core/ext/filters/client_channel/lb_policy/grpclb/grpclb_client_stats.h"
+#include <string.h>
+
#include <grpc/support/alloc.h>
#include <grpc/support/atm.h>
+#include <grpc/support/string_util.h>
#include <grpc/support/sync.h>
#include <grpc/support/useful.h>
@@ -29,10 +32,11 @@
struct grpc_grpclb_client_stats {
gpr_refcount refs;
+ // This field must only be accessed via *_locked() methods.
+ grpc_grpclb_dropped_call_counts* drop_token_counts;
+ // These fields may be accessed from multiple threads at a time.
gpr_atm num_calls_started;
gpr_atm num_calls_finished;
- gpr_atm num_calls_finished_with_drop_for_rate_limiting;
- gpr_atm num_calls_finished_with_drop_for_load_balancing;
gpr_atm num_calls_finished_with_client_failed_to_send;
gpr_atm num_calls_finished_known_received;
};
@@ -51,6 +55,7 @@
void grpc_grpclb_client_stats_unref(grpc_grpclb_client_stats* client_stats) {
if (gpr_unref(&client_stats->refs)) {
+ grpc_grpclb_dropped_call_counts_destroy(client_stats->drop_token_counts);
gpr_free(client_stats);
}
}
@@ -61,21 +66,9 @@
}
void grpc_grpclb_client_stats_add_call_finished(
- bool finished_with_drop_for_rate_limiting,
- bool finished_with_drop_for_load_balancing,
bool finished_with_client_failed_to_send, bool finished_known_received,
grpc_grpclb_client_stats* client_stats) {
gpr_atm_full_fetch_add(&client_stats->num_calls_finished, (gpr_atm)1);
- if (finished_with_drop_for_rate_limiting) {
- gpr_atm_full_fetch_add(
- &client_stats->num_calls_finished_with_drop_for_rate_limiting,
- (gpr_atm)1);
- }
- if (finished_with_drop_for_load_balancing) {
- gpr_atm_full_fetch_add(
- &client_stats->num_calls_finished_with_drop_for_load_balancing,
- (gpr_atm)1);
- }
if (finished_with_client_failed_to_send) {
gpr_atm_full_fetch_add(
&client_stats->num_calls_finished_with_client_failed_to_send,
@@ -87,32 +80,70 @@
}
}
+void grpc_grpclb_client_stats_add_call_dropped_locked(
+ char* token, grpc_grpclb_client_stats* client_stats) {
+ // Increment num_calls_started and num_calls_finished.
+ gpr_atm_full_fetch_add(&client_stats->num_calls_started, (gpr_atm)1);
+ gpr_atm_full_fetch_add(&client_stats->num_calls_finished, (gpr_atm)1);
+ // Record the drop.
+ if (client_stats->drop_token_counts == NULL) {
+ client_stats->drop_token_counts =
+ gpr_zalloc(sizeof(grpc_grpclb_dropped_call_counts));
+ }
+ grpc_grpclb_dropped_call_counts* drop_token_counts =
+ client_stats->drop_token_counts;
+ for (size_t i = 0; i < drop_token_counts->num_entries; ++i) {
+ if (strcmp(drop_token_counts->token_counts[i].token, token) == 0) {
+ ++drop_token_counts->token_counts[i].count;
+ return;
+ }
+ }
+ // Not found, so add a new entry. We double the size of the array each time.
+ size_t new_num_entries = 2;
+ while (new_num_entries < drop_token_counts->num_entries + 1) {
+ new_num_entries *= 2;
+ }
+ drop_token_counts->token_counts =
+ gpr_realloc(drop_token_counts->token_counts,
+ new_num_entries * sizeof(grpc_grpclb_drop_token_count));
+ grpc_grpclb_drop_token_count* new_entry =
+ &drop_token_counts->token_counts[drop_token_counts->num_entries++];
+ new_entry->token = gpr_strdup(token);
+ new_entry->count = 1;
+}
+
static void atomic_get_and_reset_counter(int64_t* value, gpr_atm* counter) {
*value = (int64_t)gpr_atm_acq_load(counter);
gpr_atm_full_fetch_add(counter, (gpr_atm)(-*value));
}
-void grpc_grpclb_client_stats_get(
+void grpc_grpclb_client_stats_get_locked(
grpc_grpclb_client_stats* client_stats, int64_t* num_calls_started,
int64_t* num_calls_finished,
- int64_t* num_calls_finished_with_drop_for_rate_limiting,
- int64_t* num_calls_finished_with_drop_for_load_balancing,
int64_t* num_calls_finished_with_client_failed_to_send,
- int64_t* num_calls_finished_known_received) {
+ int64_t* num_calls_finished_known_received,
+ grpc_grpclb_dropped_call_counts** drop_token_counts) {
atomic_get_and_reset_counter(num_calls_started,
&client_stats->num_calls_started);
atomic_get_and_reset_counter(num_calls_finished,
&client_stats->num_calls_finished);
atomic_get_and_reset_counter(
- num_calls_finished_with_drop_for_rate_limiting,
- &client_stats->num_calls_finished_with_drop_for_rate_limiting);
- atomic_get_and_reset_counter(
- num_calls_finished_with_drop_for_load_balancing,
- &client_stats->num_calls_finished_with_drop_for_load_balancing);
- atomic_get_and_reset_counter(
num_calls_finished_with_client_failed_to_send,
&client_stats->num_calls_finished_with_client_failed_to_send);
atomic_get_and_reset_counter(
num_calls_finished_known_received,
&client_stats->num_calls_finished_known_received);
+ *drop_token_counts = client_stats->drop_token_counts;
+ client_stats->drop_token_counts = NULL;
+}
+
+void grpc_grpclb_dropped_call_counts_destroy(
+ grpc_grpclb_dropped_call_counts* drop_entries) {
+ if (drop_entries != NULL) {
+ for (size_t i = 0; i < drop_entries->num_entries; ++i) {
+ gpr_free(drop_entries->token_counts[i].token);
+ }
+ gpr_free(drop_entries->token_counts);
+ gpr_free(drop_entries);
+ }
}
diff --git a/src/core/ext/filters/client_channel/lb_policy/grpclb/grpclb_client_stats.h b/src/core/ext/filters/client_channel/lb_policy/grpclb/grpclb_client_stats.h
index 4bb47d5..c51e2a4 100644
--- a/src/core/ext/filters/client_channel/lb_policy/grpclb/grpclb_client_stats.h
+++ b/src/core/ext/filters/client_channel/lb_policy/grpclb/grpclb_client_stats.h
@@ -25,6 +25,16 @@
typedef struct grpc_grpclb_client_stats grpc_grpclb_client_stats;
+typedef struct {
+ char* token;
+ int64_t count;
+} grpc_grpclb_drop_token_count;
+
+typedef struct {
+ grpc_grpclb_drop_token_count* token_counts;
+ size_t num_entries;
+} grpc_grpclb_dropped_call_counts;
+
grpc_grpclb_client_stats* grpc_grpclb_client_stats_create();
grpc_grpclb_client_stats* grpc_grpclb_client_stats_ref(
grpc_grpclb_client_stats* client_stats);
@@ -33,18 +43,23 @@
void grpc_grpclb_client_stats_add_call_started(
grpc_grpclb_client_stats* client_stats);
void grpc_grpclb_client_stats_add_call_finished(
- bool finished_with_drop_for_rate_limiting,
- bool finished_with_drop_for_load_balancing,
bool finished_with_client_failed_to_send, bool finished_known_received,
grpc_grpclb_client_stats* client_stats);
-void grpc_grpclb_client_stats_get(
+// This method is not thread-safe; caller must synchronize.
+void grpc_grpclb_client_stats_add_call_dropped_locked(
+ char* token, grpc_grpclb_client_stats* client_stats);
+
+// This method is not thread-safe; caller must synchronize.
+void grpc_grpclb_client_stats_get_locked(
grpc_grpclb_client_stats* client_stats, int64_t* num_calls_started,
int64_t* num_calls_finished,
- int64_t* num_calls_finished_with_drop_for_rate_limiting,
- int64_t* num_calls_finished_with_drop_for_load_balancing,
int64_t* num_calls_finished_with_client_failed_to_send,
- int64_t* num_calls_finished_known_received);
+ int64_t* num_calls_finished_known_received,
+ grpc_grpclb_dropped_call_counts** drop_token_counts);
+
+void grpc_grpclb_dropped_call_counts_destroy(
+ grpc_grpclb_dropped_call_counts* drop_entries);
#endif /* GRPC_CORE_EXT_FILTERS_CLIENT_CHANNEL_LB_POLICY_GRPCLB_GRPCLB_CLIENT_STATS_H \
*/
diff --git a/src/core/ext/filters/client_channel/lb_policy/grpclb/load_balancer_api.c b/src/core/ext/filters/client_channel/lb_policy/grpclb/load_balancer_api.c
index bec7c97..6fa29f3 100644
--- a/src/core/ext/filters/client_channel/lb_policy/grpclb/load_balancer_api.c
+++ b/src/core/ext/filters/client_channel/lb_policy/grpclb/load_balancer_api.c
@@ -76,7 +76,33 @@
timestamp_pb->nanos = timestamp.tv_nsec;
}
-grpc_grpclb_request *grpc_grpclb_load_report_request_create(
+static bool encode_string(pb_ostream_t *stream, const pb_field_t *field,
+ void *const *arg) {
+ char *str = *arg;
+ if (!pb_encode_tag_for_field(stream, field)) return false;
+ return pb_encode_string(stream, (uint8_t *)str, strlen(str));
+}
+
+static bool encode_drops(pb_ostream_t *stream, const pb_field_t *field,
+ void *const *arg) {
+ grpc_grpclb_dropped_call_counts *drop_entries = *arg;
+ if (drop_entries == NULL) return true;
+ for (size_t i = 0; i < drop_entries->num_entries; ++i) {
+ if (!pb_encode_tag_for_field(stream, field)) return false;
+ grpc_lb_v1_ClientStatsPerToken drop_message;
+ drop_message.load_balance_token.funcs.encode = encode_string;
+ drop_message.load_balance_token.arg = drop_entries->token_counts[i].token;
+ drop_message.has_num_calls = true;
+ drop_message.num_calls = drop_entries->token_counts[i].count;
+ if (!pb_encode_submessage(stream, grpc_lb_v1_ClientStatsPerToken_fields,
+ &drop_message)) {
+ return false;
+ }
+ }
+ return true;
+}
+
+grpc_grpclb_request *grpc_grpclb_load_report_request_create_locked(
grpc_grpclb_client_stats *client_stats) {
grpc_grpclb_request *req = gpr_zalloc(sizeof(grpc_grpclb_request));
req->has_client_stats = true;
@@ -84,18 +110,17 @@
populate_timestamp(gpr_now(GPR_CLOCK_REALTIME), &req->client_stats.timestamp);
req->client_stats.has_num_calls_started = true;
req->client_stats.has_num_calls_finished = true;
- req->client_stats.has_num_calls_finished_with_drop_for_rate_limiting = true;
- req->client_stats.has_num_calls_finished_with_drop_for_load_balancing = true;
req->client_stats.has_num_calls_finished_with_client_failed_to_send = true;
req->client_stats.has_num_calls_finished_with_client_failed_to_send = true;
req->client_stats.has_num_calls_finished_known_received = true;
- grpc_grpclb_client_stats_get(
+ req->client_stats.calls_finished_with_drop.funcs.encode = encode_drops;
+ grpc_grpclb_client_stats_get_locked(
client_stats, &req->client_stats.num_calls_started,
&req->client_stats.num_calls_finished,
- &req->client_stats.num_calls_finished_with_drop_for_rate_limiting,
- &req->client_stats.num_calls_finished_with_drop_for_load_balancing,
&req->client_stats.num_calls_finished_with_client_failed_to_send,
- &req->client_stats.num_calls_finished_known_received);
+ &req->client_stats.num_calls_finished_known_received,
+ (grpc_grpclb_dropped_call_counts **)&req->client_stats
+ .calls_finished_with_drop.arg);
return req;
}
@@ -117,6 +142,11 @@
}
void grpc_grpclb_request_destroy(grpc_grpclb_request *request) {
+ if (request->has_client_stats) {
+ grpc_grpclb_dropped_call_counts *drop_entries =
+ request->client_stats.calls_finished_with_drop.arg;
+ grpc_grpclb_dropped_call_counts_destroy(drop_entries);
+ }
gpr_free(request);
}
diff --git a/src/core/ext/filters/client_channel/lb_policy/grpclb/load_balancer_api.h b/src/core/ext/filters/client_channel/lb_policy/grpclb/load_balancer_api.h
index ef8d563..c4a9849 100644
--- a/src/core/ext/filters/client_channel/lb_policy/grpclb/load_balancer_api.h
+++ b/src/core/ext/filters/client_channel/lb_policy/grpclb/load_balancer_api.h
@@ -44,7 +44,7 @@
/** Create a request for a gRPC LB service under \a lb_service_name */
grpc_grpclb_request *grpc_grpclb_request_create(const char *lb_service_name);
-grpc_grpclb_request *grpc_grpclb_load_report_request_create(
+grpc_grpclb_request *grpc_grpclb_load_report_request_create_locked(
grpc_grpclb_client_stats *client_stats);
/** Protocol Buffers v3-encode \a request */
diff --git a/src/core/ext/filters/client_channel/lb_policy/grpclb/proto/grpc/lb/v1/load_balancer.pb.c b/src/core/ext/filters/client_channel/lb_policy/grpclb/proto/grpc/lb/v1/load_balancer.pb.c
index fb119c7..6a5d54c 100644
--- a/src/core/ext/filters/client_channel/lb_policy/grpclb/proto/grpc/lb/v1/load_balancer.pb.c
+++ b/src/core/ext/filters/client_channel/lb_policy/grpclb/proto/grpc/lb/v1/load_balancer.pb.c
@@ -33,14 +33,19 @@
PB_LAST_FIELD
};
-const pb_field_t grpc_lb_v1_ClientStats_fields[8] = {
+const pb_field_t grpc_lb_v1_ClientStatsPerToken_fields[3] = {
+ PB_FIELD( 1, STRING , OPTIONAL, CALLBACK, FIRST, grpc_lb_v1_ClientStatsPerToken, load_balance_token, load_balance_token, 0),
+ PB_FIELD( 2, INT64 , OPTIONAL, STATIC , OTHER, grpc_lb_v1_ClientStatsPerToken, num_calls, load_balance_token, 0),
+ PB_LAST_FIELD
+};
+
+const pb_field_t grpc_lb_v1_ClientStats_fields[7] = {
PB_FIELD( 1, MESSAGE , OPTIONAL, STATIC , FIRST, grpc_lb_v1_ClientStats, timestamp, timestamp, &grpc_lb_v1_Timestamp_fields),
PB_FIELD( 2, INT64 , OPTIONAL, STATIC , OTHER, grpc_lb_v1_ClientStats, num_calls_started, timestamp, 0),
PB_FIELD( 3, INT64 , OPTIONAL, STATIC , OTHER, grpc_lb_v1_ClientStats, num_calls_finished, num_calls_started, 0),
- PB_FIELD( 4, INT64 , OPTIONAL, STATIC , OTHER, grpc_lb_v1_ClientStats, num_calls_finished_with_drop_for_rate_limiting, num_calls_finished, 0),
- PB_FIELD( 5, INT64 , OPTIONAL, STATIC , OTHER, grpc_lb_v1_ClientStats, num_calls_finished_with_drop_for_load_balancing, num_calls_finished_with_drop_for_rate_limiting, 0),
- PB_FIELD( 6, INT64 , OPTIONAL, STATIC , OTHER, grpc_lb_v1_ClientStats, num_calls_finished_with_client_failed_to_send, num_calls_finished_with_drop_for_load_balancing, 0),
+ PB_FIELD( 6, INT64 , OPTIONAL, STATIC , OTHER, grpc_lb_v1_ClientStats, num_calls_finished_with_client_failed_to_send, num_calls_finished, 0),
PB_FIELD( 7, INT64 , OPTIONAL, STATIC , OTHER, grpc_lb_v1_ClientStats, num_calls_finished_known_received, num_calls_finished_with_client_failed_to_send, 0),
+ PB_FIELD( 8, MESSAGE , REPEATED, CALLBACK, OTHER, grpc_lb_v1_ClientStats, calls_finished_with_drop, num_calls_finished_known_received, &grpc_lb_v1_ClientStatsPerToken_fields),
PB_LAST_FIELD
};
@@ -62,12 +67,11 @@
PB_LAST_FIELD
};
-const pb_field_t grpc_lb_v1_Server_fields[6] = {
+const pb_field_t grpc_lb_v1_Server_fields[5] = {
PB_FIELD( 1, BYTES , OPTIONAL, STATIC , FIRST, grpc_lb_v1_Server, ip_address, ip_address, 0),
PB_FIELD( 2, INT32 , OPTIONAL, STATIC , OTHER, grpc_lb_v1_Server, port, ip_address, 0),
PB_FIELD( 3, STRING , OPTIONAL, STATIC , OTHER, grpc_lb_v1_Server, load_balance_token, port, 0),
- PB_FIELD( 4, BOOL , OPTIONAL, STATIC , OTHER, grpc_lb_v1_Server, drop_for_rate_limiting, load_balance_token, 0),
- PB_FIELD( 5, BOOL , OPTIONAL, STATIC , OTHER, grpc_lb_v1_Server, drop_for_load_balancing, drop_for_rate_limiting, 0),
+ PB_FIELD( 4, BOOL , OPTIONAL, STATIC , OTHER, grpc_lb_v1_Server, drop, load_balance_token, 0),
PB_LAST_FIELD
};
@@ -81,7 +85,7 @@
* numbers or field sizes that are larger than what can fit in 8 or 16 bit
* field descriptors.
*/
-PB_STATIC_ASSERT((pb_membersize(grpc_lb_v1_LoadBalanceRequest, initial_request) < 65536 && pb_membersize(grpc_lb_v1_LoadBalanceRequest, client_stats) < 65536 && pb_membersize(grpc_lb_v1_ClientStats, timestamp) < 65536 && pb_membersize(grpc_lb_v1_LoadBalanceResponse, initial_response) < 65536 && pb_membersize(grpc_lb_v1_LoadBalanceResponse, server_list) < 65536 && pb_membersize(grpc_lb_v1_InitialLoadBalanceResponse, client_stats_report_interval) < 65536 && pb_membersize(grpc_lb_v1_ServerList, servers) < 65536 && pb_membersize(grpc_lb_v1_ServerList, expiration_interval) < 65536), YOU_MUST_DEFINE_PB_FIELD_32BIT_FOR_MESSAGES_grpc_lb_v1_Duration_grpc_lb_v1_Timestamp_grpc_lb_v1_LoadBalanceRequest_grpc_lb_v1_InitialLoadBalanceRequest_grpc_lb_v1_ClientStats_grpc_lb_v1_LoadBalanceResponse_grpc_lb_v1_InitialLoadBalanceResponse_grpc_lb_v1_ServerList_grpc_lb_v1_Server)
+PB_STATIC_ASSERT((pb_membersize(grpc_lb_v1_LoadBalanceRequest, initial_request) < 65536 && pb_membersize(grpc_lb_v1_LoadBalanceRequest, client_stats) < 65536 && pb_membersize(grpc_lb_v1_ClientStats, timestamp) < 65536 && pb_membersize(grpc_lb_v1_ClientStats, calls_finished_with_drop) < 65536 && pb_membersize(grpc_lb_v1_LoadBalanceResponse, initial_response) < 65536 && pb_membersize(grpc_lb_v1_LoadBalanceResponse, server_list) < 65536 && pb_membersize(grpc_lb_v1_InitialLoadBalanceResponse, client_stats_report_interval) < 65536 && pb_membersize(grpc_lb_v1_ServerList, servers) < 65536 && pb_membersize(grpc_lb_v1_ServerList, expiration_interval) < 65536), YOU_MUST_DEFINE_PB_FIELD_32BIT_FOR_MESSAGES_grpc_lb_v1_Duration_grpc_lb_v1_Timestamp_grpc_lb_v1_LoadBalanceRequest_grpc_lb_v1_InitialLoadBalanceRequest_grpc_lb_v1_ClientStatsPerToken_grpc_lb_v1_ClientStats_grpc_lb_v1_LoadBalanceResponse_grpc_lb_v1_InitialLoadBalanceResponse_grpc_lb_v1_ServerList_grpc_lb_v1_Server)
#endif
#if !defined(PB_FIELD_16BIT) && !defined(PB_FIELD_32BIT)
@@ -92,7 +96,7 @@
* numbers or field sizes that are larger than what can fit in the default
* 8 bit descriptors.
*/
-PB_STATIC_ASSERT((pb_membersize(grpc_lb_v1_LoadBalanceRequest, initial_request) < 256 && pb_membersize(grpc_lb_v1_LoadBalanceRequest, client_stats) < 256 && pb_membersize(grpc_lb_v1_ClientStats, timestamp) < 256 && pb_membersize(grpc_lb_v1_LoadBalanceResponse, initial_response) < 256 && pb_membersize(grpc_lb_v1_LoadBalanceResponse, server_list) < 256 && pb_membersize(grpc_lb_v1_InitialLoadBalanceResponse, client_stats_report_interval) < 256 && pb_membersize(grpc_lb_v1_ServerList, servers) < 256 && pb_membersize(grpc_lb_v1_ServerList, expiration_interval) < 256), YOU_MUST_DEFINE_PB_FIELD_16BIT_FOR_MESSAGES_grpc_lb_v1_Duration_grpc_lb_v1_Timestamp_grpc_lb_v1_LoadBalanceRequest_grpc_lb_v1_InitialLoadBalanceRequest_grpc_lb_v1_ClientStats_grpc_lb_v1_LoadBalanceResponse_grpc_lb_v1_InitialLoadBalanceResponse_grpc_lb_v1_ServerList_grpc_lb_v1_Server)
+PB_STATIC_ASSERT((pb_membersize(grpc_lb_v1_LoadBalanceRequest, initial_request) < 256 && pb_membersize(grpc_lb_v1_LoadBalanceRequest, client_stats) < 256 && pb_membersize(grpc_lb_v1_ClientStats, timestamp) < 256 && pb_membersize(grpc_lb_v1_ClientStats, calls_finished_with_drop) < 256 && pb_membersize(grpc_lb_v1_LoadBalanceResponse, initial_response) < 256 && pb_membersize(grpc_lb_v1_LoadBalanceResponse, server_list) < 256 && pb_membersize(grpc_lb_v1_InitialLoadBalanceResponse, client_stats_report_interval) < 256 && pb_membersize(grpc_lb_v1_ServerList, servers) < 256 && pb_membersize(grpc_lb_v1_ServerList, expiration_interval) < 256), YOU_MUST_DEFINE_PB_FIELD_16BIT_FOR_MESSAGES_grpc_lb_v1_Duration_grpc_lb_v1_Timestamp_grpc_lb_v1_LoadBalanceRequest_grpc_lb_v1_InitialLoadBalanceRequest_grpc_lb_v1_ClientStatsPerToken_grpc_lb_v1_ClientStats_grpc_lb_v1_LoadBalanceResponse_grpc_lb_v1_InitialLoadBalanceResponse_grpc_lb_v1_ServerList_grpc_lb_v1_Server)
#endif
diff --git a/src/core/ext/filters/client_channel/lb_policy/grpclb/proto/grpc/lb/v1/load_balancer.pb.h b/src/core/ext/filters/client_channel/lb_policy/grpclb/proto/grpc/lb/v1/load_balancer.pb.h
index d3ae919..93333d1 100644
--- a/src/core/ext/filters/client_channel/lb_policy/grpclb/proto/grpc/lb/v1/load_balancer.pb.h
+++ b/src/core/ext/filters/client_channel/lb_policy/grpclb/proto/grpc/lb/v1/load_balancer.pb.h
@@ -14,6 +14,13 @@
#endif
/* Struct definitions */
+typedef struct _grpc_lb_v1_ClientStatsPerToken {
+ pb_callback_t load_balance_token;
+ bool has_num_calls;
+ int64_t num_calls;
+/* @@protoc_insertion_point(struct:grpc_lb_v1_ClientStatsPerToken) */
+} grpc_lb_v1_ClientStatsPerToken;
+
typedef struct _grpc_lb_v1_Duration {
bool has_seconds;
int64_t seconds;
@@ -36,10 +43,8 @@
int32_t port;
bool has_load_balance_token;
char load_balance_token[50];
- bool has_drop_for_rate_limiting;
- bool drop_for_rate_limiting;
- bool has_drop_for_load_balancing;
- bool drop_for_load_balancing;
+ bool has_drop;
+ bool drop;
/* @@protoc_insertion_point(struct:grpc_lb_v1_Server) */
} grpc_lb_v1_Server;
@@ -58,14 +63,11 @@
int64_t num_calls_started;
bool has_num_calls_finished;
int64_t num_calls_finished;
- bool has_num_calls_finished_with_drop_for_rate_limiting;
- int64_t num_calls_finished_with_drop_for_rate_limiting;
- bool has_num_calls_finished_with_drop_for_load_balancing;
- int64_t num_calls_finished_with_drop_for_load_balancing;
bool has_num_calls_finished_with_client_failed_to_send;
int64_t num_calls_finished_with_client_failed_to_send;
bool has_num_calls_finished_known_received;
int64_t num_calls_finished_known_received;
+ pb_callback_t calls_finished_with_drop;
/* @@protoc_insertion_point(struct:grpc_lb_v1_ClientStats) */
} grpc_lb_v1_ClientStats;
@@ -107,39 +109,41 @@
#define grpc_lb_v1_Timestamp_init_default {false, 0, false, 0}
#define grpc_lb_v1_LoadBalanceRequest_init_default {false, grpc_lb_v1_InitialLoadBalanceRequest_init_default, false, grpc_lb_v1_ClientStats_init_default}
#define grpc_lb_v1_InitialLoadBalanceRequest_init_default {false, ""}
-#define grpc_lb_v1_ClientStats_init_default {false, grpc_lb_v1_Timestamp_init_default, false, 0, false, 0, false, 0, false, 0, false, 0, false, 0}
+#define grpc_lb_v1_ClientStatsPerToken_init_default {{{NULL}, NULL}, false, 0}
+#define grpc_lb_v1_ClientStats_init_default {false, grpc_lb_v1_Timestamp_init_default, false, 0, false, 0, false, 0, false, 0, {{NULL}, NULL}}
#define grpc_lb_v1_LoadBalanceResponse_init_default {false, grpc_lb_v1_InitialLoadBalanceResponse_init_default, false, grpc_lb_v1_ServerList_init_default}
#define grpc_lb_v1_InitialLoadBalanceResponse_init_default {false, "", false, grpc_lb_v1_Duration_init_default}
#define grpc_lb_v1_ServerList_init_default {{{NULL}, NULL}, false, grpc_lb_v1_Duration_init_default}
-#define grpc_lb_v1_Server_init_default {false, {0, {0}}, false, 0, false, "", false, 0, false, 0}
+#define grpc_lb_v1_Server_init_default {false, {0, {0}}, false, 0, false, "", false, 0}
#define grpc_lb_v1_Duration_init_zero {false, 0, false, 0}
#define grpc_lb_v1_Timestamp_init_zero {false, 0, false, 0}
#define grpc_lb_v1_LoadBalanceRequest_init_zero {false, grpc_lb_v1_InitialLoadBalanceRequest_init_zero, false, grpc_lb_v1_ClientStats_init_zero}
#define grpc_lb_v1_InitialLoadBalanceRequest_init_zero {false, ""}
-#define grpc_lb_v1_ClientStats_init_zero {false, grpc_lb_v1_Timestamp_init_zero, false, 0, false, 0, false, 0, false, 0, false, 0, false, 0}
+#define grpc_lb_v1_ClientStatsPerToken_init_zero {{{NULL}, NULL}, false, 0}
+#define grpc_lb_v1_ClientStats_init_zero {false, grpc_lb_v1_Timestamp_init_zero, false, 0, false, 0, false, 0, false, 0, {{NULL}, NULL}}
#define grpc_lb_v1_LoadBalanceResponse_init_zero {false, grpc_lb_v1_InitialLoadBalanceResponse_init_zero, false, grpc_lb_v1_ServerList_init_zero}
#define grpc_lb_v1_InitialLoadBalanceResponse_init_zero {false, "", false, grpc_lb_v1_Duration_init_zero}
#define grpc_lb_v1_ServerList_init_zero {{{NULL}, NULL}, false, grpc_lb_v1_Duration_init_zero}
-#define grpc_lb_v1_Server_init_zero {false, {0, {0}}, false, 0, false, "", false, 0, false, 0}
+#define grpc_lb_v1_Server_init_zero {false, {0, {0}}, false, 0, false, "", false, 0}
/* Field tags (for use in manual encoding/decoding) */
+#define grpc_lb_v1_ClientStatsPerToken_load_balance_token_tag 1
+#define grpc_lb_v1_ClientStatsPerToken_num_calls_tag 2
#define grpc_lb_v1_Duration_seconds_tag 1
#define grpc_lb_v1_Duration_nanos_tag 2
#define grpc_lb_v1_InitialLoadBalanceRequest_name_tag 1
#define grpc_lb_v1_Server_ip_address_tag 1
#define grpc_lb_v1_Server_port_tag 2
#define grpc_lb_v1_Server_load_balance_token_tag 3
-#define grpc_lb_v1_Server_drop_for_rate_limiting_tag 4
-#define grpc_lb_v1_Server_drop_for_load_balancing_tag 5
+#define grpc_lb_v1_Server_drop_tag 4
#define grpc_lb_v1_Timestamp_seconds_tag 1
#define grpc_lb_v1_Timestamp_nanos_tag 2
#define grpc_lb_v1_ClientStats_timestamp_tag 1
#define grpc_lb_v1_ClientStats_num_calls_started_tag 2
#define grpc_lb_v1_ClientStats_num_calls_finished_tag 3
-#define grpc_lb_v1_ClientStats_num_calls_finished_with_drop_for_rate_limiting_tag 4
-#define grpc_lb_v1_ClientStats_num_calls_finished_with_drop_for_load_balancing_tag 5
#define grpc_lb_v1_ClientStats_num_calls_finished_with_client_failed_to_send_tag 6
#define grpc_lb_v1_ClientStats_num_calls_finished_known_received_tag 7
+#define grpc_lb_v1_ClientStats_calls_finished_with_drop_tag 8
#define grpc_lb_v1_InitialLoadBalanceResponse_load_balancer_delegate_tag 1
#define grpc_lb_v1_InitialLoadBalanceResponse_client_stats_report_interval_tag 2
#define grpc_lb_v1_ServerList_servers_tag 1
@@ -154,22 +158,24 @@
extern const pb_field_t grpc_lb_v1_Timestamp_fields[3];
extern const pb_field_t grpc_lb_v1_LoadBalanceRequest_fields[3];
extern const pb_field_t grpc_lb_v1_InitialLoadBalanceRequest_fields[2];
-extern const pb_field_t grpc_lb_v1_ClientStats_fields[8];
+extern const pb_field_t grpc_lb_v1_ClientStatsPerToken_fields[3];
+extern const pb_field_t grpc_lb_v1_ClientStats_fields[7];
extern const pb_field_t grpc_lb_v1_LoadBalanceResponse_fields[3];
extern const pb_field_t grpc_lb_v1_InitialLoadBalanceResponse_fields[3];
extern const pb_field_t grpc_lb_v1_ServerList_fields[3];
-extern const pb_field_t grpc_lb_v1_Server_fields[6];
+extern const pb_field_t grpc_lb_v1_Server_fields[5];
/* Maximum encoded size of messages (where known) */
#define grpc_lb_v1_Duration_size 22
#define grpc_lb_v1_Timestamp_size 22
-#define grpc_lb_v1_LoadBalanceRequest_size 226
+#define grpc_lb_v1_LoadBalanceRequest_size (140 + grpc_lb_v1_ClientStats_size)
#define grpc_lb_v1_InitialLoadBalanceRequest_size 131
-#define grpc_lb_v1_ClientStats_size 90
+/* grpc_lb_v1_ClientStatsPerToken_size depends on runtime parameters */
+/* grpc_lb_v1_ClientStats_size depends on runtime parameters */
#define grpc_lb_v1_LoadBalanceResponse_size (98 + grpc_lb_v1_ServerList_size)
#define grpc_lb_v1_InitialLoadBalanceResponse_size 90
/* grpc_lb_v1_ServerList_size depends on runtime parameters */
-#define grpc_lb_v1_Server_size 85
+#define grpc_lb_v1_Server_size 83
/* Message IDs (where set with "msgid" option) */
#ifdef PB_MSGID
diff --git a/src/core/ext/filters/client_channel/lb_policy/round_robin/round_robin.c b/src/core/ext/filters/client_channel/lb_policy/round_robin/round_robin.c
index bc40165..a7f7e95 100644
--- a/src/core/ext/filters/client_channel/lb_policy/round_robin/round_robin.c
+++ b/src/core/ext/filters/client_channel/lb_policy/round_robin/round_robin.c
@@ -74,6 +74,9 @@
bool started_picking;
/** are we shutting down? */
bool shutdown;
+ /** has the policy gotten into the GRPC_CHANNEL_SHUTDOWN? No picks can be
+ * service after this point, the policy will never transition out. */
+ bool in_connectivity_shutdown;
/** List of picks that are waiting on connectivity */
pending_pick *pending_picks;
@@ -420,6 +423,8 @@
grpc_call_context_element *context, void **user_data,
grpc_closure *on_complete) {
round_robin_lb_policy *p = (round_robin_lb_policy *)pol;
+ GPR_ASSERT(!p->shutdown);
+ GPR_ASSERT(!p->in_connectivity_shutdown);
if (GRPC_TRACER_ON(grpc_lb_round_robin_trace)) {
gpr_log(GPR_INFO, "[RR %p] Trying to pick", (void *)pol);
}
@@ -532,6 +537,7 @@
grpc_connectivity_state_set(exec_ctx, &p->state_tracker,
GRPC_CHANNEL_SHUTDOWN, GRPC_ERROR_REF(error),
"rr_shutdown");
+ p->in_connectivity_shutdown = true;
new_state = GRPC_CHANNEL_SHUTDOWN;
} else if (subchannel_list->num_transient_failures ==
p->subchannel_list->num_subchannels) { /* 4) TRANSIENT_FAILURE */
diff --git a/src/core/ext/filters/http/client/http_client_filter.c b/src/core/ext/filters/http/client/http_client_filter.c
index 90f0aed..3ca01a4 100644
--- a/src/core/ext/filters/http/client/http_client_filter.c
+++ b/src/core/ext/filters/http/client/http_client_filter.c
@@ -36,41 +36,29 @@
static const size_t kMaxPayloadSizeForGet = 2048;
typedef struct call_data {
+ // State for handling send_initial_metadata ops.
grpc_linked_mdelem method;
grpc_linked_mdelem scheme;
grpc_linked_mdelem authority;
grpc_linked_mdelem te_trailers;
grpc_linked_mdelem content_type;
grpc_linked_mdelem user_agent;
-
+ // State for handling recv_initial_metadata ops.
grpc_metadata_batch *recv_initial_metadata;
+ grpc_closure *original_recv_initial_metadata_ready;
+ grpc_closure recv_initial_metadata_ready;
+ // State for handling recv_trailing_metadata ops.
grpc_metadata_batch *recv_trailing_metadata;
- uint8_t *payload_bytes;
-
- /* Vars to read data off of send_message */
- grpc_transport_stream_op_batch *send_op;
- uint32_t send_length;
- uint32_t send_flags;
- grpc_slice incoming_slice;
- grpc_slice_buffer_stream replacement_stream;
- grpc_slice_buffer slices;
- /* flag that indicates that all slices of send_messages aren't availble */
- bool send_message_blocked;
-
- /** Closure to call when finished with the hc_on_recv hook */
- grpc_closure *on_done_recv_initial_metadata;
- grpc_closure *on_done_recv_trailing_metadata;
- grpc_closure *on_complete;
- grpc_closure *post_send;
-
- /** Receive closures are chained: we inject this closure as the on_done_recv
- up-call on transport_op, and remember to call our on_done_recv member
- after handling it. */
- grpc_closure hc_on_recv_initial_metadata;
- grpc_closure hc_on_recv_trailing_metadata;
- grpc_closure hc_on_complete;
- grpc_closure got_slice;
- grpc_closure send_done;
+ grpc_closure *original_recv_trailing_metadata_on_complete;
+ grpc_closure recv_trailing_metadata_on_complete;
+ // State for handling send_message ops.
+ grpc_transport_stream_op_batch *send_message_batch;
+ size_t send_message_bytes_read;
+ grpc_byte_stream_cache send_message_cache;
+ grpc_caching_byte_stream send_message_caching_stream;
+ grpc_closure on_send_message_next_done;
+ grpc_closure *original_send_message_on_complete;
+ grpc_closure send_message_on_complete;
} call_data;
typedef struct channel_data {
@@ -148,7 +136,7 @@
return GRPC_ERROR_NONE;
}
-static void hc_on_recv_initial_metadata(grpc_exec_ctx *exec_ctx,
+static void recv_initial_metadata_ready(grpc_exec_ctx *exec_ctx,
void *user_data, grpc_error *error) {
grpc_call_element *elem = user_data;
call_data *calld = elem->call_data;
@@ -158,11 +146,13 @@
} else {
GRPC_ERROR_REF(error);
}
- GRPC_CLOSURE_RUN(exec_ctx, calld->on_done_recv_initial_metadata, error);
+ GRPC_CLOSURE_RUN(exec_ctx, calld->original_recv_initial_metadata_ready,
+ error);
}
-static void hc_on_recv_trailing_metadata(grpc_exec_ctx *exec_ctx,
- void *user_data, grpc_error *error) {
+static void recv_trailing_metadata_on_complete(grpc_exec_ctx *exec_ctx,
+ void *user_data,
+ grpc_error *error) {
grpc_call_element *elem = user_data;
call_data *calld = elem->call_data;
if (error == GRPC_ERROR_NONE) {
@@ -171,25 +161,131 @@
} else {
GRPC_ERROR_REF(error);
}
- GRPC_CLOSURE_RUN(exec_ctx, calld->on_done_recv_trailing_metadata, error);
+ GRPC_CLOSURE_RUN(exec_ctx, calld->original_recv_trailing_metadata_on_complete,
+ error);
}
-static void hc_on_complete(grpc_exec_ctx *exec_ctx, void *user_data,
- grpc_error *error) {
- grpc_call_element *elem = user_data;
- call_data *calld = elem->call_data;
- if (calld->payload_bytes) {
- gpr_free(calld->payload_bytes);
- calld->payload_bytes = NULL;
+static void send_message_on_complete(grpc_exec_ctx *exec_ctx, void *arg,
+ grpc_error *error) {
+ grpc_call_element *elem = (grpc_call_element *)arg;
+ call_data *calld = (call_data *)elem->call_data;
+ grpc_byte_stream_cache_destroy(exec_ctx, &calld->send_message_cache);
+ GRPC_CLOSURE_RUN(exec_ctx, calld->original_send_message_on_complete,
+ GRPC_ERROR_REF(error));
+}
+
+// Pulls a slice from the send_message byte stream, updating
+// calld->send_message_bytes_read.
+static grpc_error *pull_slice_from_send_message(grpc_exec_ctx *exec_ctx,
+ call_data *calld) {
+ grpc_slice incoming_slice;
+ grpc_error *error = grpc_byte_stream_pull(
+ exec_ctx, &calld->send_message_caching_stream.base, &incoming_slice);
+ if (error == GRPC_ERROR_NONE) {
+ calld->send_message_bytes_read += GRPC_SLICE_LENGTH(incoming_slice);
+ grpc_slice_unref_internal(exec_ctx, incoming_slice);
}
- calld->on_complete->cb(exec_ctx, calld->on_complete->cb_arg, error);
+ return error;
}
-static void send_done(grpc_exec_ctx *exec_ctx, void *elemp, grpc_error *error) {
- grpc_call_element *elem = elemp;
- call_data *calld = elem->call_data;
- grpc_slice_buffer_reset_and_unref_internal(exec_ctx, &calld->slices);
- calld->post_send->cb(exec_ctx, calld->post_send->cb_arg, error);
+// Reads as many slices as possible from the send_message byte stream.
+// Upon successful return, if calld->send_message_bytes_read ==
+// calld->send_message_caching_stream.base.length, then we have completed
+// reading from the byte stream; otherwise, an async read has been dispatched
+// and on_send_message_next_done() will be invoked when it is complete.
+static grpc_error *read_all_available_send_message_data(grpc_exec_ctx *exec_ctx,
+ call_data *calld) {
+ while (grpc_byte_stream_next(exec_ctx,
+ &calld->send_message_caching_stream.base,
+ ~(size_t)0, &calld->on_send_message_next_done)) {
+ grpc_error *error = pull_slice_from_send_message(exec_ctx, calld);
+ if (error != GRPC_ERROR_NONE) return error;
+ if (calld->send_message_bytes_read ==
+ calld->send_message_caching_stream.base.length) {
+ break;
+ }
+ }
+ return GRPC_ERROR_NONE;
+}
+
+// Async callback for grpc_byte_stream_next().
+static void on_send_message_next_done(grpc_exec_ctx *exec_ctx, void *arg,
+ grpc_error *error) {
+ grpc_call_element *elem = (grpc_call_element *)arg;
+ call_data *calld = (call_data *)elem->call_data;
+ if (error != GRPC_ERROR_NONE) {
+ grpc_transport_stream_op_batch_finish_with_failure(
+ exec_ctx, calld->send_message_batch, error);
+ return;
+ }
+ error = pull_slice_from_send_message(exec_ctx, calld);
+ if (error != GRPC_ERROR_NONE) {
+ grpc_transport_stream_op_batch_finish_with_failure(
+ exec_ctx, calld->send_message_batch, error);
+ return;
+ }
+ // There may or may not be more to read, but we don't care. If we got
+ // here, then we know that all of the data was not available
+ // synchronously, so we were not able to do a cached call. Instead,
+ // we just reset the byte stream and then send down the batch as-is.
+ grpc_caching_byte_stream_reset(&calld->send_message_caching_stream);
+ grpc_call_next_op(exec_ctx, elem, calld->send_message_batch);
+}
+
+static char *slice_buffer_to_string(grpc_slice_buffer *slice_buffer) {
+ char *payload_bytes = gpr_malloc(slice_buffer->length + 1);
+ size_t offset = 0;
+ for (size_t i = 0; i < slice_buffer->count; ++i) {
+ memcpy(payload_bytes + offset,
+ GRPC_SLICE_START_PTR(slice_buffer->slices[i]),
+ GRPC_SLICE_LENGTH(slice_buffer->slices[i]));
+ offset += GRPC_SLICE_LENGTH(slice_buffer->slices[i]);
+ }
+ *(payload_bytes + offset) = '\0';
+ return payload_bytes;
+}
+
+// Modifies the path entry in the batch's send_initial_metadata to
+// append the base64-encoded query for a GET request.
+static grpc_error *update_path_for_get(grpc_exec_ctx *exec_ctx,
+ grpc_call_element *elem,
+ grpc_transport_stream_op_batch *batch) {
+ call_data *calld = (call_data *)elem->call_data;
+ grpc_slice path_slice =
+ GRPC_MDVALUE(batch->payload->send_initial_metadata.send_initial_metadata
+ ->idx.named.path->md);
+ /* sum up individual component's lengths and allocate enough memory to
+ * hold combined path+query */
+ size_t estimated_len = GRPC_SLICE_LENGTH(path_slice);
+ estimated_len++; /* for the '?' */
+ estimated_len += grpc_base64_estimate_encoded_size(
+ batch->payload->send_message.send_message->length, true /* url_safe */,
+ false /* multi_line */);
+ grpc_slice path_with_query_slice = GRPC_SLICE_MALLOC(estimated_len);
+ /* memcopy individual pieces into this slice */
+ char *write_ptr = (char *)GRPC_SLICE_START_PTR(path_with_query_slice);
+ char *original_path = (char *)GRPC_SLICE_START_PTR(path_slice);
+ memcpy(write_ptr, original_path, GRPC_SLICE_LENGTH(path_slice));
+ write_ptr += GRPC_SLICE_LENGTH(path_slice);
+ *write_ptr++ = '?';
+ char *payload_bytes =
+ slice_buffer_to_string(&calld->send_message_cache.cache_buffer);
+ grpc_base64_encode_core((char *)write_ptr, payload_bytes,
+ batch->payload->send_message.send_message->length,
+ true /* url_safe */, false /* multi_line */);
+ gpr_free(payload_bytes);
+ /* remove trailing unused memory and add trailing 0 to terminate string */
+ char *t = (char *)GRPC_SLICE_START_PTR(path_with_query_slice);
+ /* safe to use strlen since base64_encode will always add '\0' */
+ path_with_query_slice =
+ grpc_slice_sub_no_ref(path_with_query_slice, 0, strlen(t));
+ /* substitute previous path with the new path+query */
+ grpc_mdelem mdelem_path_and_query =
+ grpc_mdelem_from_slices(exec_ctx, GRPC_MDSTR_PATH, path_with_query_slice);
+ grpc_metadata_batch *b =
+ batch->payload->send_initial_metadata.send_initial_metadata;
+ return grpc_metadata_batch_substitute(exec_ctx, b, b->idx.named.path,
+ mdelem_path_and_query);
}
static void remove_if_present(grpc_exec_ctx *exec_ctx,
@@ -200,273 +296,153 @@
}
}
-static void continue_send_message(grpc_exec_ctx *exec_ctx,
- grpc_call_element *elem) {
- call_data *calld = elem->call_data;
- uint8_t *wrptr = calld->payload_bytes;
- while (grpc_byte_stream_next(
- exec_ctx, calld->send_op->payload->send_message.send_message, ~(size_t)0,
- &calld->got_slice)) {
- grpc_byte_stream_pull(exec_ctx,
- calld->send_op->payload->send_message.send_message,
- &calld->incoming_slice);
- if (GRPC_SLICE_LENGTH(calld->incoming_slice) > 0) {
- memcpy(wrptr, GRPC_SLICE_START_PTR(calld->incoming_slice),
- GRPC_SLICE_LENGTH(calld->incoming_slice));
- }
- wrptr += GRPC_SLICE_LENGTH(calld->incoming_slice);
- grpc_slice_buffer_add(&calld->slices, calld->incoming_slice);
- if (calld->send_length == calld->slices.length) {
- calld->send_message_blocked = false;
- break;
- }
- }
-}
-
-static void got_slice(grpc_exec_ctx *exec_ctx, void *elemp, grpc_error *error) {
- grpc_call_element *elem = elemp;
- call_data *calld = elem->call_data;
- calld->send_message_blocked = false;
- if (GRPC_ERROR_NONE !=
- grpc_byte_stream_pull(exec_ctx,
- calld->send_op->payload->send_message.send_message,
- &calld->incoming_slice)) {
- /* Should never reach here */
- abort();
- }
- grpc_slice_buffer_add(&calld->slices, calld->incoming_slice);
- if (calld->send_length == calld->slices.length) {
- /* Pass down the original send_message op that was blocked.*/
- grpc_slice_buffer_stream_init(&calld->replacement_stream, &calld->slices,
- calld->send_flags);
- calld->send_op->payload->send_message.send_message =
- &calld->replacement_stream.base;
- calld->post_send = calld->send_op->on_complete;
- calld->send_op->on_complete = &calld->send_done;
- grpc_call_next_op(exec_ctx, elem, calld->send_op);
- } else {
- continue_send_message(exec_ctx, elem);
- }
-}
-
-static grpc_error *hc_mutate_op(grpc_exec_ctx *exec_ctx,
- grpc_call_element *elem,
- grpc_transport_stream_op_batch *op) {
- /* grab pointers to our data from the call element */
+static void hc_start_transport_stream_op_batch(
+ grpc_exec_ctx *exec_ctx, grpc_call_element *elem,
+ grpc_transport_stream_op_batch *batch) {
call_data *calld = elem->call_data;
channel_data *channeld = elem->channel_data;
- grpc_error *error;
+ GPR_TIMER_BEGIN("hc_start_transport_stream_op_batch", 0);
+ GRPC_CALL_LOG_OP(GPR_INFO, elem, batch);
- if (op->send_initial_metadata) {
- /* Decide which HTTP VERB to use. We use GET if the request is marked
- cacheable, and the operation contains both initial metadata and send
- message, and the payload is below the size threshold, and all the data
- for this request is immediately available. */
+ if (batch->recv_initial_metadata) {
+ /* substitute our callback for the higher callback */
+ calld->recv_initial_metadata =
+ batch->payload->recv_initial_metadata.recv_initial_metadata;
+ calld->original_recv_initial_metadata_ready =
+ batch->payload->recv_initial_metadata.recv_initial_metadata_ready;
+ batch->payload->recv_initial_metadata.recv_initial_metadata_ready =
+ &calld->recv_initial_metadata_ready;
+ }
+
+ if (batch->recv_trailing_metadata) {
+ /* substitute our callback for the higher callback */
+ calld->recv_trailing_metadata =
+ batch->payload->recv_trailing_metadata.recv_trailing_metadata;
+ calld->original_recv_trailing_metadata_on_complete = batch->on_complete;
+ batch->on_complete = &calld->recv_trailing_metadata_on_complete;
+ }
+
+ grpc_error *error = GRPC_ERROR_NONE;
+ bool batch_will_be_handled_asynchronously = false;
+ if (batch->send_initial_metadata) {
+ // Decide which HTTP VERB to use. We use GET if the request is marked
+ // cacheable, and the operation contains both initial metadata and send
+ // message, and the payload is below the size threshold, and all the data
+ // for this request is immediately available.
grpc_mdelem method = GRPC_MDELEM_METHOD_POST;
- if (op->send_message &&
- (op->payload->send_initial_metadata.send_initial_metadata_flags &
+ if (batch->send_message &&
+ (batch->payload->send_initial_metadata.send_initial_metadata_flags &
GRPC_INITIAL_METADATA_CACHEABLE_REQUEST) &&
- op->payload->send_message.send_message->length <
+ batch->payload->send_message.send_message->length <
channeld->max_payload_size_for_get) {
- method = GRPC_MDELEM_METHOD_GET;
- /* The following write to calld->send_message_blocked isn't racy with
- reads in hc_start_transport_op (which deals with SEND_MESSAGE ops) because
- being here means ops->send_message is not NULL, which is primarily
- guarding the read there. */
- calld->send_message_blocked = true;
- } else if (op->payload->send_initial_metadata.send_initial_metadata_flags &
+ calld->send_message_bytes_read = 0;
+ grpc_byte_stream_cache_init(&calld->send_message_cache,
+ batch->payload->send_message.send_message);
+ grpc_caching_byte_stream_init(&calld->send_message_caching_stream,
+ &calld->send_message_cache);
+ batch->payload->send_message.send_message =
+ &calld->send_message_caching_stream.base;
+ calld->original_send_message_on_complete = batch->on_complete;
+ batch->on_complete = &calld->send_message_on_complete;
+ calld->send_message_batch = batch;
+ error = read_all_available_send_message_data(exec_ctx, calld);
+ if (error != GRPC_ERROR_NONE) goto done;
+ // If all the data has been read, then we can use GET.
+ if (calld->send_message_bytes_read ==
+ calld->send_message_caching_stream.base.length) {
+ method = GRPC_MDELEM_METHOD_GET;
+ error = update_path_for_get(exec_ctx, elem, batch);
+ if (error != GRPC_ERROR_NONE) goto done;
+ batch->send_message = false;
+ grpc_byte_stream_destroy(exec_ctx,
+ &calld->send_message_caching_stream.base);
+ } else {
+ // Not all data is available. The batch will be sent down
+ // asynchronously in on_send_message_next_done().
+ batch_will_be_handled_asynchronously = true;
+ // Fall back to POST.
+ gpr_log(GPR_DEBUG,
+ "Request is marked Cacheable but not all data is available. "
+ "Falling back to POST");
+ }
+ } else if (batch->payload->send_initial_metadata
+ .send_initial_metadata_flags &
GRPC_INITIAL_METADATA_IDEMPOTENT_REQUEST) {
method = GRPC_MDELEM_METHOD_PUT;
}
- /* Attempt to read the data from send_message and create a header field. */
- if (grpc_mdelem_eq(method, GRPC_MDELEM_METHOD_GET)) {
- /* allocate memory to hold the entire payload */
- calld->payload_bytes =
- gpr_malloc(op->payload->send_message.send_message->length);
-
- /* read slices of send_message and copy into payload_bytes */
- calld->send_op = op;
- calld->send_length = op->payload->send_message.send_message->length;
- calld->send_flags = op->payload->send_message.send_message->flags;
- continue_send_message(exec_ctx, elem);
-
- if (calld->send_message_blocked == false) {
- /* when all the send_message data is available, then modify the path
- * MDELEM by appending base64 encoded query to the path */
- const int k_url_safe = 1;
- const int k_multi_line = 0;
- const unsigned char k_query_separator = '?';
-
- grpc_slice path_slice =
- GRPC_MDVALUE(op->payload->send_initial_metadata
- .send_initial_metadata->idx.named.path->md);
- /* sum up individual component's lengths and allocate enough memory to
- * hold combined path+query */
- size_t estimated_len = GRPC_SLICE_LENGTH(path_slice);
- estimated_len++; /* for the '?' */
- estimated_len += grpc_base64_estimate_encoded_size(
- op->payload->send_message.send_message->length, k_url_safe,
- k_multi_line);
- grpc_slice path_with_query_slice = GRPC_SLICE_MALLOC(estimated_len);
-
- /* memcopy individual pieces into this slice */
- uint8_t *write_ptr =
- (uint8_t *)GRPC_SLICE_START_PTR(path_with_query_slice);
- uint8_t *original_path = (uint8_t *)GRPC_SLICE_START_PTR(path_slice);
- memcpy(write_ptr, original_path, GRPC_SLICE_LENGTH(path_slice));
- write_ptr += GRPC_SLICE_LENGTH(path_slice);
-
- *write_ptr = k_query_separator;
- write_ptr++; /* for the '?' */
-
- grpc_base64_encode_core((char *)write_ptr, calld->payload_bytes,
- op->payload->send_message.send_message->length,
- k_url_safe, k_multi_line);
-
- /* remove trailing unused memory and add trailing 0 to terminate string
- */
- char *t = (char *)GRPC_SLICE_START_PTR(path_with_query_slice);
- /* safe to use strlen since base64_encode will always add '\0' */
- path_with_query_slice =
- grpc_slice_sub_no_ref(path_with_query_slice, 0, strlen(t));
-
- /* substitute previous path with the new path+query */
- grpc_mdelem mdelem_path_and_query = grpc_mdelem_from_slices(
- exec_ctx, GRPC_MDSTR_PATH, path_with_query_slice);
- grpc_metadata_batch *b =
- op->payload->send_initial_metadata.send_initial_metadata;
- error = grpc_metadata_batch_substitute(exec_ctx, b, b->idx.named.path,
- mdelem_path_and_query);
- if (error != GRPC_ERROR_NONE) return error;
-
- calld->on_complete = op->on_complete;
- op->on_complete = &calld->hc_on_complete;
- op->send_message = false;
- } else {
- /* Not all data is available. Fall back to POST. */
- gpr_log(GPR_DEBUG,
- "Request is marked Cacheable but not all data is available.\
- Falling back to POST");
- method = GRPC_MDELEM_METHOD_POST;
- }
- }
-
- remove_if_present(exec_ctx,
- op->payload->send_initial_metadata.send_initial_metadata,
- GRPC_BATCH_METHOD);
- remove_if_present(exec_ctx,
- op->payload->send_initial_metadata.send_initial_metadata,
- GRPC_BATCH_SCHEME);
- remove_if_present(exec_ctx,
- op->payload->send_initial_metadata.send_initial_metadata,
- GRPC_BATCH_TE);
- remove_if_present(exec_ctx,
- op->payload->send_initial_metadata.send_initial_metadata,
- GRPC_BATCH_CONTENT_TYPE);
- remove_if_present(exec_ctx,
- op->payload->send_initial_metadata.send_initial_metadata,
- GRPC_BATCH_USER_AGENT);
+ remove_if_present(
+ exec_ctx, batch->payload->send_initial_metadata.send_initial_metadata,
+ GRPC_BATCH_METHOD);
+ remove_if_present(
+ exec_ctx, batch->payload->send_initial_metadata.send_initial_metadata,
+ GRPC_BATCH_SCHEME);
+ remove_if_present(
+ exec_ctx, batch->payload->send_initial_metadata.send_initial_metadata,
+ GRPC_BATCH_TE);
+ remove_if_present(
+ exec_ctx, batch->payload->send_initial_metadata.send_initial_metadata,
+ GRPC_BATCH_CONTENT_TYPE);
+ remove_if_present(
+ exec_ctx, batch->payload->send_initial_metadata.send_initial_metadata,
+ GRPC_BATCH_USER_AGENT);
/* Send : prefixed headers, which have to be before any application
layer headers. */
error = grpc_metadata_batch_add_head(
- exec_ctx, op->payload->send_initial_metadata.send_initial_metadata,
+ exec_ctx, batch->payload->send_initial_metadata.send_initial_metadata,
&calld->method, method);
- if (error != GRPC_ERROR_NONE) return error;
+ if (error != GRPC_ERROR_NONE) goto done;
error = grpc_metadata_batch_add_head(
- exec_ctx, op->payload->send_initial_metadata.send_initial_metadata,
+ exec_ctx, batch->payload->send_initial_metadata.send_initial_metadata,
&calld->scheme, channeld->static_scheme);
- if (error != GRPC_ERROR_NONE) return error;
+ if (error != GRPC_ERROR_NONE) goto done;
error = grpc_metadata_batch_add_tail(
- exec_ctx, op->payload->send_initial_metadata.send_initial_metadata,
+ exec_ctx, batch->payload->send_initial_metadata.send_initial_metadata,
&calld->te_trailers, GRPC_MDELEM_TE_TRAILERS);
- if (error != GRPC_ERROR_NONE) return error;
+ if (error != GRPC_ERROR_NONE) goto done;
error = grpc_metadata_batch_add_tail(
- exec_ctx, op->payload->send_initial_metadata.send_initial_metadata,
+ exec_ctx, batch->payload->send_initial_metadata.send_initial_metadata,
&calld->content_type, GRPC_MDELEM_CONTENT_TYPE_APPLICATION_SLASH_GRPC);
- if (error != GRPC_ERROR_NONE) return error;
+ if (error != GRPC_ERROR_NONE) goto done;
error = grpc_metadata_batch_add_tail(
- exec_ctx, op->payload->send_initial_metadata.send_initial_metadata,
+ exec_ctx, batch->payload->send_initial_metadata.send_initial_metadata,
&calld->user_agent, GRPC_MDELEM_REF(channeld->user_agent));
- if (error != GRPC_ERROR_NONE) return error;
+ if (error != GRPC_ERROR_NONE) goto done;
}
- if (op->recv_initial_metadata) {
- /* substitute our callback for the higher callback */
- calld->recv_initial_metadata =
- op->payload->recv_initial_metadata.recv_initial_metadata;
- calld->on_done_recv_initial_metadata =
- op->payload->recv_initial_metadata.recv_initial_metadata_ready;
- op->payload->recv_initial_metadata.recv_initial_metadata_ready =
- &calld->hc_on_recv_initial_metadata;
- }
-
- if (op->recv_trailing_metadata) {
- /* substitute our callback for the higher callback */
- calld->recv_trailing_metadata =
- op->payload->recv_trailing_metadata.recv_trailing_metadata;
- calld->on_done_recv_trailing_metadata = op->on_complete;
- op->on_complete = &calld->hc_on_recv_trailing_metadata;
- }
-
- return GRPC_ERROR_NONE;
-}
-
-static void hc_start_transport_op(grpc_exec_ctx *exec_ctx,
- grpc_call_element *elem,
- grpc_transport_stream_op_batch *op) {
- GPR_TIMER_BEGIN("hc_start_transport_op", 0);
- GRPC_CALL_LOG_OP(GPR_INFO, elem, op);
- grpc_error *error = hc_mutate_op(exec_ctx, elem, op);
+done:
if (error != GRPC_ERROR_NONE) {
- grpc_transport_stream_op_batch_finish_with_failure(exec_ctx, op, error);
- } else {
- call_data *calld = elem->call_data;
- if (op->send_message && calld->send_message_blocked) {
- /* Don't forward the op. send_message contains slices that aren't ready
- yet. The call will be forwarded by the op_complete of slice read call.
- */
- } else {
- grpc_call_next_op(exec_ctx, elem, op);
- }
+ grpc_transport_stream_op_batch_finish_with_failure(
+ exec_ctx, calld->send_message_batch, error);
+ } else if (!batch_will_be_handled_asynchronously) {
+ grpc_call_next_op(exec_ctx, elem, batch);
}
- GPR_TIMER_END("hc_start_transport_op", 0);
+ GPR_TIMER_END("hc_start_transport_stream_op_batch", 0);
}
/* Constructor for call_data */
static grpc_error *init_call_elem(grpc_exec_ctx *exec_ctx,
grpc_call_element *elem,
const grpc_call_element_args *args) {
- call_data *calld = elem->call_data;
- calld->on_done_recv_initial_metadata = NULL;
- calld->on_done_recv_trailing_metadata = NULL;
- calld->on_complete = NULL;
- calld->payload_bytes = NULL;
- calld->send_message_blocked = false;
- grpc_slice_buffer_init(&calld->slices);
- GRPC_CLOSURE_INIT(&calld->hc_on_recv_initial_metadata,
- hc_on_recv_initial_metadata, elem,
+ call_data *calld = (call_data *)elem->call_data;
+ GRPC_CLOSURE_INIT(&calld->recv_initial_metadata_ready,
+ recv_initial_metadata_ready, elem,
grpc_schedule_on_exec_ctx);
- GRPC_CLOSURE_INIT(&calld->hc_on_recv_trailing_metadata,
- hc_on_recv_trailing_metadata, elem,
+ GRPC_CLOSURE_INIT(&calld->recv_trailing_metadata_on_complete,
+ recv_trailing_metadata_on_complete, elem,
grpc_schedule_on_exec_ctx);
- GRPC_CLOSURE_INIT(&calld->hc_on_complete, hc_on_complete, elem,
- grpc_schedule_on_exec_ctx);
- GRPC_CLOSURE_INIT(&calld->got_slice, got_slice, elem,
- grpc_schedule_on_exec_ctx);
- GRPC_CLOSURE_INIT(&calld->send_done, send_done, elem,
- grpc_schedule_on_exec_ctx);
+ GRPC_CLOSURE_INIT(&calld->send_message_on_complete, send_message_on_complete,
+ elem, grpc_schedule_on_exec_ctx);
+ GRPC_CLOSURE_INIT(&calld->on_send_message_next_done,
+ on_send_message_next_done, elem, grpc_schedule_on_exec_ctx);
return GRPC_ERROR_NONE;
}
/* Destructor for call_data */
static void destroy_call_elem(grpc_exec_ctx *exec_ctx, grpc_call_element *elem,
const grpc_call_final_info *final_info,
- grpc_closure *ignored) {
- call_data *calld = elem->call_data;
- grpc_slice_buffer_destroy_internal(exec_ctx, &calld->slices);
-}
+ grpc_closure *ignored) {}
static grpc_mdelem scheme_from_args(const grpc_channel_args *args) {
unsigned i;
@@ -580,7 +556,7 @@
}
const grpc_channel_filter grpc_http_client_filter = {
- hc_start_transport_op,
+ hc_start_transport_stream_op_batch,
grpc_channel_next_op,
sizeof(call_data),
init_call_elem,
diff --git a/src/core/ext/filters/http/message_compress/message_compress_filter.c b/src/core/ext/filters/http/message_compress/message_compress_filter.c
index 71a8bc5..20a3488 100644
--- a/src/core/ext/filters/http/message_compress/message_compress_filter.c
+++ b/src/core/ext/filters/http/message_compress/message_compress_filter.c
@@ -61,14 +61,11 @@
pointer | CANCELLED_BIT - request was cancelled with error pointed to */
gpr_atm send_initial_metadata_state;
- grpc_transport_stream_op_batch *send_op;
- uint32_t send_length;
- uint32_t send_flags;
- grpc_slice incoming_slice;
+ grpc_transport_stream_op_batch *send_message_batch;
grpc_slice_buffer_stream replacement_stream;
- grpc_closure *post_send;
- grpc_closure send_done;
- grpc_closure got_slice;
+ grpc_closure *original_send_message_on_complete;
+ grpc_closure send_message_on_complete;
+ grpc_closure on_send_message_next_done;
} call_data;
typedef struct channel_data {
@@ -164,24 +161,25 @@
return error;
}
-static void continue_send_message(grpc_exec_ctx *exec_ctx,
- grpc_call_element *elem);
-
-static void send_done(grpc_exec_ctx *exec_ctx, void *elemp, grpc_error *error) {
- grpc_call_element *elem = elemp;
- call_data *calld = elem->call_data;
+static void send_message_on_complete(grpc_exec_ctx *exec_ctx, void *arg,
+ grpc_error *error) {
+ grpc_call_element *elem = (grpc_call_element *)arg;
+ call_data *calld = (call_data *)elem->call_data;
grpc_slice_buffer_reset_and_unref_internal(exec_ctx, &calld->slices);
- calld->post_send->cb(exec_ctx, calld->post_send->cb_arg, error);
+ GRPC_CLOSURE_RUN(exec_ctx, calld->original_send_message_on_complete,
+ GRPC_ERROR_REF(error));
}
static void finish_send_message(grpc_exec_ctx *exec_ctx,
grpc_call_element *elem) {
- call_data *calld = elem->call_data;
- int did_compress;
+ call_data *calld = (call_data *)elem->call_data;
+ // Compress the data if appropriate.
grpc_slice_buffer tmp;
grpc_slice_buffer_init(&tmp);
- did_compress = grpc_msg_compress(exec_ctx, calld->compression_algorithm,
- &calld->slices, &tmp);
+ uint32_t send_flags =
+ calld->send_message_batch->payload->send_message.send_message->flags;
+ const bool did_compress = grpc_msg_compress(
+ exec_ctx, calld->compression_algorithm, &calld->slices, &tmp);
if (did_compress) {
if (GRPC_TRACER_ON(grpc_compression_trace)) {
char *algo_name;
@@ -195,7 +193,7 @@
algo_name, before_size, after_size, 100 * savings_ratio);
}
grpc_slice_buffer_swap(&calld->slices, &tmp);
- calld->send_flags |= GRPC_WRITE_INTERNAL_COMPRESS;
+ send_flags |= GRPC_WRITE_INTERNAL_COMPRESS;
} else {
if (GRPC_TRACER_ON(grpc_compression_trace)) {
char *algo_name;
@@ -207,83 +205,118 @@
algo_name, calld->slices.length);
}
}
-
grpc_slice_buffer_destroy_internal(exec_ctx, &tmp);
-
+ // Swap out the original byte stream with our new one and send the
+ // batch down.
+ grpc_byte_stream_destroy(
+ exec_ctx, calld->send_message_batch->payload->send_message.send_message);
grpc_slice_buffer_stream_init(&calld->replacement_stream, &calld->slices,
- calld->send_flags);
- calld->send_op->payload->send_message.send_message =
+ send_flags);
+ calld->send_message_batch->payload->send_message.send_message =
&calld->replacement_stream.base;
- calld->post_send = calld->send_op->on_complete;
- calld->send_op->on_complete = &calld->send_done;
-
- grpc_call_next_op(exec_ctx, elem, calld->send_op);
+ calld->original_send_message_on_complete =
+ calld->send_message_batch->on_complete;
+ calld->send_message_batch->on_complete = &calld->send_message_on_complete;
+ grpc_call_next_op(exec_ctx, elem, calld->send_message_batch);
}
-static void got_slice(grpc_exec_ctx *exec_ctx, void *elemp, grpc_error *error) {
- grpc_call_element *elem = elemp;
- call_data *calld = elem->call_data;
- if (GRPC_ERROR_NONE !=
- grpc_byte_stream_pull(exec_ctx,
- calld->send_op->payload->send_message.send_message,
- &calld->incoming_slice)) {
- /* Should never reach here */
- abort();
+// Pulls a slice from the send_message byte stream and adds it to calld->slices.
+static grpc_error *pull_slice_from_send_message(grpc_exec_ctx *exec_ctx,
+ call_data *calld) {
+ grpc_slice incoming_slice;
+ grpc_error *error = grpc_byte_stream_pull(
+ exec_ctx, calld->send_message_batch->payload->send_message.send_message,
+ &incoming_slice);
+ if (error == GRPC_ERROR_NONE) {
+ grpc_slice_buffer_add(&calld->slices, incoming_slice);
}
- grpc_slice_buffer_add(&calld->slices, calld->incoming_slice);
- if (calld->send_length == calld->slices.length) {
- finish_send_message(exec_ctx, elem);
- } else {
- continue_send_message(exec_ctx, elem);
- }
+ return error;
}
-static void continue_send_message(grpc_exec_ctx *exec_ctx,
- grpc_call_element *elem) {
- call_data *calld = elem->call_data;
+// Reads as many slices as possible from the send_message byte stream.
+// If all data has been read, invokes finish_send_message(). Otherwise,
+// an async call to grpc_byte_stream_next() has been started, which will
+// eventually result in calling on_send_message_next_done().
+static grpc_error *continue_reading_send_message(grpc_exec_ctx *exec_ctx,
+ grpc_call_element *elem) {
+ call_data *calld = (call_data *)elem->call_data;
while (grpc_byte_stream_next(
- exec_ctx, calld->send_op->payload->send_message.send_message, ~(size_t)0,
- &calld->got_slice)) {
- grpc_byte_stream_pull(exec_ctx,
- calld->send_op->payload->send_message.send_message,
- &calld->incoming_slice);
- grpc_slice_buffer_add(&calld->slices, calld->incoming_slice);
- if (calld->send_length == calld->slices.length) {
+ exec_ctx, calld->send_message_batch->payload->send_message.send_message,
+ ~(size_t)0, &calld->on_send_message_next_done)) {
+ grpc_error *error = pull_slice_from_send_message(exec_ctx, calld);
+ if (error != GRPC_ERROR_NONE) return error;
+ if (calld->slices.length ==
+ calld->send_message_batch->payload->send_message.send_message->length) {
finish_send_message(exec_ctx, elem);
break;
}
}
+ return GRPC_ERROR_NONE;
}
-static void handle_send_message_batch(grpc_exec_ctx *exec_ctx,
- grpc_call_element *elem,
- grpc_transport_stream_op_batch *op,
- bool has_compression_algorithm) {
- call_data *calld = elem->call_data;
- if (!skip_compression(elem, op->payload->send_message.send_message->flags,
+// Async callback for grpc_byte_stream_next().
+static void on_send_message_next_done(grpc_exec_ctx *exec_ctx, void *arg,
+ grpc_error *error) {
+ grpc_call_element *elem = (grpc_call_element *)arg;
+ call_data *calld = (call_data *)elem->call_data;
+ if (error != GRPC_ERROR_NONE) goto fail;
+ error = pull_slice_from_send_message(exec_ctx, calld);
+ if (error != GRPC_ERROR_NONE) goto fail;
+ if (calld->slices.length ==
+ calld->send_message_batch->payload->send_message.send_message->length) {
+ finish_send_message(exec_ctx, elem);
+ } else {
+ // This will either finish reading all of the data and invoke
+ // finish_send_message(), or else it will make an async call to
+ // grpc_byte_stream_next(), which will eventually result in calling
+ // this function again.
+ error = continue_reading_send_message(exec_ctx, elem);
+ if (error != GRPC_ERROR_NONE) goto fail;
+ }
+ return;
+fail:
+ grpc_transport_stream_op_batch_finish_with_failure(
+ exec_ctx, calld->send_message_batch, error);
+}
+
+static void start_send_message_batch(grpc_exec_ctx *exec_ctx,
+ grpc_call_element *elem,
+ grpc_transport_stream_op_batch *batch,
+ bool has_compression_algorithm) {
+ call_data *calld = (call_data *)elem->call_data;
+ if (!skip_compression(elem, batch->payload->send_message.send_message->flags,
has_compression_algorithm)) {
- calld->send_op = op;
- calld->send_length = op->payload->send_message.send_message->length;
- calld->send_flags = op->payload->send_message.send_message->flags;
- continue_send_message(exec_ctx, elem);
+ calld->send_message_batch = batch;
+ // This will either finish reading all of the data and invoke
+ // finish_send_message(), or else it will make an async call to
+ // grpc_byte_stream_next(), which will eventually result in calling
+ // on_send_message_next_done().
+ grpc_error *error = continue_reading_send_message(exec_ctx, elem);
+ if (error != GRPC_ERROR_NONE) {
+ grpc_transport_stream_op_batch_finish_with_failure(
+ exec_ctx, calld->send_message_batch, error);
+ }
} else {
/* pass control down the stack */
- grpc_call_next_op(exec_ctx, elem, op);
+ grpc_call_next_op(exec_ctx, elem, batch);
}
}
static void compress_start_transport_stream_op_batch(
grpc_exec_ctx *exec_ctx, grpc_call_element *elem,
- grpc_transport_stream_op_batch *op) {
+ grpc_transport_stream_op_batch *batch) {
call_data *calld = elem->call_data;
GPR_TIMER_BEGIN("compress_start_transport_stream_op_batch", 0);
- if (op->cancel_stream) {
- GRPC_ERROR_REF(op->payload->cancel_stream.cancel_error);
+ if (batch->cancel_stream) {
+ // TODO(roth): As part of the upcoming call combiner work, change
+ // this to call grpc_byte_stream_shutdown() on the incoming byte
+ // stream, to cancel any in-flight calls to grpc_byte_stream_next().
+ GRPC_ERROR_REF(batch->payload->cancel_stream.cancel_error);
gpr_atm cur = gpr_atm_full_xchg(
&calld->send_initial_metadata_state,
- CANCELLED_BIT | (gpr_atm)op->payload->cancel_stream.cancel_error);
+ CANCELLED_BIT | (gpr_atm)batch->payload->cancel_stream.cancel_error);
switch (cur) {
case HAS_COMPRESSION_ALGORITHM:
case NO_COMPRESSION_ALGORITHM:
@@ -293,7 +326,7 @@
if ((cur & CANCELLED_BIT) == 0) {
grpc_transport_stream_op_batch_finish_with_failure(
exec_ctx, (grpc_transport_stream_op_batch *)cur,
- GRPC_ERROR_REF(op->payload->cancel_stream.cancel_error));
+ GRPC_ERROR_REF(batch->payload->cancel_stream.cancel_error));
} else {
GRPC_ERROR_UNREF((grpc_error *)(cur & ~CANCELLED_BIT));
}
@@ -301,14 +334,15 @@
}
}
- if (op->send_initial_metadata) {
+ if (batch->send_initial_metadata) {
bool has_compression_algorithm;
grpc_error *error = process_send_initial_metadata(
exec_ctx, elem,
- op->payload->send_initial_metadata.send_initial_metadata,
+ batch->payload->send_initial_metadata.send_initial_metadata,
&has_compression_algorithm);
if (error != GRPC_ERROR_NONE) {
- grpc_transport_stream_op_batch_finish_with_failure(exec_ctx, op, error);
+ grpc_transport_stream_op_batch_finish_with_failure(exec_ctx, batch,
+ error);
return;
}
gpr_atm cur;
@@ -324,32 +358,32 @@
goto retry_send_im;
}
if (cur != INITIAL_METADATA_UNSEEN) {
- handle_send_message_batch(exec_ctx, elem,
- (grpc_transport_stream_op_batch *)cur,
- has_compression_algorithm);
+ start_send_message_batch(exec_ctx, elem,
+ (grpc_transport_stream_op_batch *)cur,
+ has_compression_algorithm);
}
}
}
- if (op->send_message) {
+ if (batch->send_message) {
gpr_atm cur;
retry_send:
cur = gpr_atm_acq_load(&calld->send_initial_metadata_state);
switch (cur) {
case INITIAL_METADATA_UNSEEN:
if (!gpr_atm_rel_cas(&calld->send_initial_metadata_state, cur,
- (gpr_atm)op)) {
+ (gpr_atm)batch)) {
goto retry_send;
}
break;
case HAS_COMPRESSION_ALGORITHM:
case NO_COMPRESSION_ALGORITHM:
- handle_send_message_batch(exec_ctx, elem, op,
- cur == HAS_COMPRESSION_ALGORITHM);
+ start_send_message_batch(exec_ctx, elem, batch,
+ cur == HAS_COMPRESSION_ALGORITHM);
break;
default:
if (cur & CANCELLED_BIT) {
grpc_transport_stream_op_batch_finish_with_failure(
- exec_ctx, op,
+ exec_ctx, batch,
GRPC_ERROR_REF((grpc_error *)(cur & ~CANCELLED_BIT)));
} else {
/* >1 send_message concurrently */
@@ -358,7 +392,7 @@
}
} else {
/* pass control down the stack */
- grpc_call_next_op(exec_ctx, elem, op);
+ grpc_call_next_op(exec_ctx, elem, batch);
}
GPR_TIMER_END("compress_start_transport_stream_op_batch", 0);
@@ -373,10 +407,10 @@
/* initialize members */
grpc_slice_buffer_init(&calld->slices);
- GRPC_CLOSURE_INIT(&calld->got_slice, got_slice, elem,
- grpc_schedule_on_exec_ctx);
- GRPC_CLOSURE_INIT(&calld->send_done, send_done, elem,
- grpc_schedule_on_exec_ctx);
+ GRPC_CLOSURE_INIT(&calld->on_send_message_next_done,
+ on_send_message_next_done, elem, grpc_schedule_on_exec_ctx);
+ GRPC_CLOSURE_INIT(&calld->send_message_on_complete, send_message_on_complete,
+ elem, grpc_schedule_on_exec_ctx);
return GRPC_ERROR_NONE;
}
diff --git a/src/core/ext/transport/chttp2/transport/chttp2_transport.c b/src/core/ext/transport/chttp2/transport/chttp2_transport.c
index 731ebf4..dc35f48 100644
--- a/src/core/ext/transport/chttp2/transport/chttp2_transport.c
+++ b/src/core/ext/transport/chttp2/transport/chttp2_transport.c
@@ -730,6 +730,14 @@
grpc_slice_buffer_destroy_internal(exec_ctx,
&s->unprocessed_incoming_frames_buffer);
grpc_slice_buffer_destroy_internal(exec_ctx, &s->frame_storage);
+ if (s->compressed_data_buffer) {
+ grpc_slice_buffer_destroy_internal(exec_ctx, s->compressed_data_buffer);
+ gpr_free(s->compressed_data_buffer);
+ }
+ if (s->decompressed_data_buffer) {
+ grpc_slice_buffer_destroy_internal(exec_ctx, s->decompressed_data_buffer);
+ gpr_free(s->decompressed_data_buffer);
+ }
grpc_chttp2_list_remove_stalled_by_transport(t, s);
grpc_chttp2_list_remove_stalled_by_stream(t, s);
@@ -780,6 +788,15 @@
grpc_chttp2_transport *t = (grpc_chttp2_transport *)gt;
grpc_chttp2_stream *s = (grpc_chttp2_stream *)gs;
+ if (s->stream_compression_ctx != NULL) {
+ grpc_stream_compression_context_destroy(s->stream_compression_ctx);
+ s->stream_compression_ctx = NULL;
+ }
+ if (s->stream_decompression_ctx != NULL) {
+ grpc_stream_compression_context_destroy(s->stream_decompression_ctx);
+ s->stream_decompression_ctx = NULL;
+ }
+
s->destroy_stream_arg = then_schedule_closure;
GRPC_CLOSURE_SCHED(
exec_ctx, GRPC_CLOSURE_INIT(&s->destroy_stream, destroy_stream_locked, s,
@@ -1173,6 +1190,7 @@
return; /* early out */
}
if (s->fetched_send_message_length == s->fetching_send_message->length) {
+ grpc_byte_stream_destroy(exec_ctx, s->fetching_send_message);
int64_t notify_offset = s->next_message_end_offset;
if (notify_offset <= s->flow_controlled_bytes_written) {
grpc_chttp2_complete_closure_step(
@@ -1195,9 +1213,14 @@
return; /* early out */
} else if (grpc_byte_stream_next(exec_ctx, s->fetching_send_message,
UINT32_MAX, &s->complete_fetch_locked)) {
- grpc_byte_stream_pull(exec_ctx, s->fetching_send_message,
- &s->fetching_slice);
- add_fetched_slice_locked(exec_ctx, t, s);
+ grpc_error *error = grpc_byte_stream_pull(
+ exec_ctx, s->fetching_send_message, &s->fetching_slice);
+ if (error != GRPC_ERROR_NONE) {
+ grpc_byte_stream_destroy(exec_ctx, s->fetching_send_message);
+ grpc_chttp2_cancel_stream(exec_ctx, t, s, error);
+ } else {
+ add_fetched_slice_locked(exec_ctx, t, s);
+ }
}
}
}
@@ -1214,10 +1237,9 @@
continue_fetching_send_locked(exec_ctx, t, s);
}
}
-
if (error != GRPC_ERROR_NONE) {
- /* TODO(ctiller): what to do here */
- abort();
+ grpc_byte_stream_destroy(exec_ctx, s->fetching_send_message);
+ grpc_chttp2_cancel_stream(exec_ctx, t, s, error);
}
}
@@ -1362,8 +1384,8 @@
"fetching_send_message_finished");
} else {
GPR_ASSERT(s->fetching_send_message == NULL);
- uint8_t *frame_hdr =
- grpc_slice_buffer_tiny_add(&s->flow_controlled_buffer, 5);
+ uint8_t *frame_hdr = grpc_slice_buffer_tiny_add(
+ &s->flow_controlled_buffer, GRPC_HEADER_SIZE_IN_BYTES);
uint32_t flags = op_payload->send_message.send_message->flags;
frame_hdr[0] = (flags & GRPC_WRITE_INTERNAL_COMPRESS) != 0;
size_t len = op_payload->send_message.send_message->length;
@@ -1454,14 +1476,9 @@
s->recv_message_ready = op_payload->recv_message.recv_message_ready;
s->recv_message = op_payload->recv_message.recv_message;
if (s->id != 0) {
- if (s->pending_byte_stream) {
- already_received = s->frame_storage.length;
- } else {
- already_received = s->frame_storage.length +
- s->unprocessed_incoming_frames_buffer.length;
- }
- incoming_byte_stream_update_flow_control(exec_ctx, t, s, 5,
- already_received);
+ already_received = s->frame_storage.length;
+ incoming_byte_stream_update_flow_control(
+ exec_ctx, t, s, GRPC_HEADER_SIZE_IN_BYTES, already_received);
}
grpc_chttp2_maybe_complete_recv_message(exec_ctx, t, s);
}
@@ -1698,10 +1715,43 @@
if (s->unprocessed_incoming_frames_buffer.length == 0) {
grpc_slice_buffer_swap(&s->unprocessed_incoming_frames_buffer,
&s->frame_storage);
+ s->unprocessed_incoming_frames_decompressed = false;
}
- error = grpc_deframe_unprocessed_incoming_frames(
- exec_ctx, &s->data_parser, s,
- &s->unprocessed_incoming_frames_buffer, NULL, s->recv_message);
+ if (s->stream_compression_recv_enabled &&
+ !s->unprocessed_incoming_frames_decompressed) {
+ GPR_ASSERT(s->decompressed_data_buffer->length == 0);
+ bool end_of_context;
+ if (!s->stream_decompression_ctx) {
+ s->stream_decompression_ctx =
+ grpc_stream_compression_context_create(
+ GRPC_STREAM_COMPRESSION_DECOMPRESS);
+ }
+ if (!grpc_stream_decompress(s->stream_decompression_ctx,
+ &s->unprocessed_incoming_frames_buffer,
+ s->decompressed_data_buffer, NULL,
+ GRPC_HEADER_SIZE_IN_BYTES,
+ &end_of_context)) {
+ grpc_slice_buffer_reset_and_unref_internal(exec_ctx,
+ &s->frame_storage);
+ grpc_slice_buffer_reset_and_unref_internal(
+ exec_ctx, &s->unprocessed_incoming_frames_buffer);
+ error = GRPC_ERROR_CREATE_FROM_STATIC_STRING(
+ "Stream decompression error.");
+ } else {
+ error = grpc_deframe_unprocessed_incoming_frames(
+ exec_ctx, &s->data_parser, s, s->decompressed_data_buffer, NULL,
+ s->recv_message);
+ if (end_of_context) {
+ grpc_stream_compression_context_destroy(
+ s->stream_decompression_ctx);
+ s->stream_decompression_ctx = NULL;
+ }
+ }
+ } else {
+ error = grpc_deframe_unprocessed_incoming_frames(
+ exec_ctx, &s->data_parser, s,
+ &s->unprocessed_incoming_frames_buffer, NULL, s->recv_message);
+ }
if (error != GRPC_ERROR_NONE) {
s->seen_error = true;
grpc_slice_buffer_reset_and_unref_internal(exec_ctx,
@@ -1739,7 +1789,37 @@
}
bool pending_data = s->pending_byte_stream ||
s->unprocessed_incoming_frames_buffer.length > 0;
+ if (s->stream_compression_recv_enabled && s->read_closed &&
+ s->frame_storage.length > 0 &&
+ s->unprocessed_incoming_frames_buffer.length == 0 && !pending_data &&
+ !s->seen_error && s->recv_trailing_metadata_finished != NULL) {
+ /* Maybe some SYNC_FLUSH data is left in frame_storage. Consume them and
+ * maybe decompress the next 5 bytes in the stream. */
+ bool end_of_context;
+ if (!s->stream_decompression_ctx) {
+ s->stream_decompression_ctx = grpc_stream_compression_context_create(
+ GRPC_STREAM_COMPRESSION_DECOMPRESS);
+ }
+ if (!grpc_stream_decompress(s->stream_decompression_ctx,
+ &s->frame_storage,
+ &s->unprocessed_incoming_frames_buffer, NULL,
+ GRPC_HEADER_SIZE_IN_BYTES, &end_of_context)) {
+ grpc_slice_buffer_reset_and_unref_internal(exec_ctx, &s->frame_storage);
+ grpc_slice_buffer_reset_and_unref_internal(
+ exec_ctx, &s->unprocessed_incoming_frames_buffer);
+ s->seen_error = true;
+ } else {
+ if (s->unprocessed_incoming_frames_buffer.length > 0) {
+ s->unprocessed_incoming_frames_decompressed = true;
+ }
+ if (end_of_context) {
+ grpc_stream_compression_context_destroy(s->stream_decompression_ctx);
+ s->stream_decompression_ctx = NULL;
+ }
+ }
+ }
if (s->read_closed && s->frame_storage.length == 0 &&
+ s->unprocessed_incoming_frames_buffer.length == 0 &&
(!pending_data || s->seen_error) &&
s->recv_trailing_metadata_finished != NULL) {
grpc_chttp2_incoming_metadata_buffer_publish(
@@ -2607,6 +2687,7 @@
if (s->frame_storage.length > 0) {
grpc_slice_buffer_swap(&s->frame_storage,
&s->unprocessed_incoming_frames_buffer);
+ s->unprocessed_incoming_frames_decompressed = false;
GRPC_CLOSURE_SCHED(exec_ctx, bs->next_action.on_complete, GRPC_ERROR_NONE);
} else if (s->byte_stream_error != GRPC_ERROR_NONE) {
GRPC_CLOSURE_SCHED(exec_ctx, bs->next_action.on_complete,
@@ -2668,17 +2749,41 @@
grpc_chttp2_incoming_byte_stream *bs =
(grpc_chttp2_incoming_byte_stream *)byte_stream;
grpc_chttp2_stream *s = bs->stream;
+ grpc_error *error;
if (s->unprocessed_incoming_frames_buffer.length > 0) {
- grpc_error *error = grpc_deframe_unprocessed_incoming_frames(
+ if (s->stream_compression_recv_enabled &&
+ !s->unprocessed_incoming_frames_decompressed) {
+ bool end_of_context;
+ if (!s->stream_decompression_ctx) {
+ s->stream_decompression_ctx = grpc_stream_compression_context_create(
+ GRPC_STREAM_COMPRESSION_DECOMPRESS);
+ }
+ if (!grpc_stream_decompress(s->stream_decompression_ctx,
+ &s->unprocessed_incoming_frames_buffer,
+ s->decompressed_data_buffer, NULL, MAX_SIZE_T,
+ &end_of_context)) {
+ error =
+ GRPC_ERROR_CREATE_FROM_STATIC_STRING("Stream decompression error.");
+ return error;
+ }
+ GPR_ASSERT(s->unprocessed_incoming_frames_buffer.length == 0);
+ grpc_slice_buffer_swap(&s->unprocessed_incoming_frames_buffer,
+ s->decompressed_data_buffer);
+ s->unprocessed_incoming_frames_decompressed = true;
+ if (end_of_context) {
+ grpc_stream_compression_context_destroy(s->stream_decompression_ctx);
+ s->stream_decompression_ctx = NULL;
+ }
+ }
+ error = grpc_deframe_unprocessed_incoming_frames(
exec_ctx, &s->data_parser, s, &s->unprocessed_incoming_frames_buffer,
slice, NULL);
if (error != GRPC_ERROR_NONE) {
return error;
}
} else {
- grpc_error *error =
- GRPC_ERROR_CREATE_FROM_STATIC_STRING("Truncated message");
+ error = GRPC_ERROR_CREATE_FROM_STATIC_STRING("Truncated message");
GRPC_CLOSURE_SCHED(exec_ctx, &s->reset_byte_stream, GRPC_ERROR_REF(error));
return error;
}
@@ -2686,22 +2791,9 @@
return GRPC_ERROR_NONE;
}
-static void incoming_byte_stream_destroy(grpc_exec_ctx *exec_ctx,
- grpc_byte_stream *byte_stream);
-
static void incoming_byte_stream_destroy_locked(grpc_exec_ctx *exec_ctx,
void *byte_stream,
- grpc_error *error_ignored) {
- grpc_chttp2_incoming_byte_stream *bs = byte_stream;
- grpc_chttp2_stream *s = bs->stream;
- grpc_chttp2_transport *t = s->t;
-
- GPR_ASSERT(bs->base.destroy == incoming_byte_stream_destroy);
- incoming_byte_stream_unref(exec_ctx, bs);
- s->pending_byte_stream = false;
- grpc_chttp2_maybe_complete_recv_message(exec_ctx, t, s);
- grpc_chttp2_maybe_complete_recv_trailing_metadata(exec_ctx, t, s);
-}
+ grpc_error *error_ignored);
static void incoming_byte_stream_destroy(grpc_exec_ctx *exec_ctx,
grpc_byte_stream *byte_stream) {
@@ -2768,6 +2860,33 @@
return error;
}
+static void incoming_byte_stream_shutdown(grpc_exec_ctx *exec_ctx,
+ grpc_byte_stream *byte_stream,
+ grpc_error *error) {
+ grpc_chttp2_incoming_byte_stream *bs =
+ (grpc_chttp2_incoming_byte_stream *)byte_stream;
+ GRPC_ERROR_UNREF(grpc_chttp2_incoming_byte_stream_finished(
+ exec_ctx, bs, error, true /* reset_on_error */));
+}
+
+static const grpc_byte_stream_vtable grpc_chttp2_incoming_byte_stream_vtable = {
+ incoming_byte_stream_next, incoming_byte_stream_pull,
+ incoming_byte_stream_shutdown, incoming_byte_stream_destroy};
+
+static void incoming_byte_stream_destroy_locked(grpc_exec_ctx *exec_ctx,
+ void *byte_stream,
+ grpc_error *error_ignored) {
+ grpc_chttp2_incoming_byte_stream *bs = byte_stream;
+ grpc_chttp2_stream *s = bs->stream;
+ grpc_chttp2_transport *t = s->t;
+
+ GPR_ASSERT(bs->base.vtable == &grpc_chttp2_incoming_byte_stream_vtable);
+ incoming_byte_stream_unref(exec_ctx, bs);
+ s->pending_byte_stream = false;
+ grpc_chttp2_maybe_complete_recv_message(exec_ctx, t, s);
+ grpc_chttp2_maybe_complete_recv_trailing_metadata(exec_ctx, t, s);
+}
+
grpc_chttp2_incoming_byte_stream *grpc_chttp2_incoming_byte_stream_create(
grpc_exec_ctx *exec_ctx, grpc_chttp2_transport *t, grpc_chttp2_stream *s,
uint32_t frame_size, uint32_t flags) {
@@ -2776,9 +2895,7 @@
incoming_byte_stream->base.length = frame_size;
incoming_byte_stream->remaining_bytes = frame_size;
incoming_byte_stream->base.flags = flags;
- incoming_byte_stream->base.next = incoming_byte_stream_next;
- incoming_byte_stream->base.pull = incoming_byte_stream_pull;
- incoming_byte_stream->base.destroy = incoming_byte_stream_destroy;
+ incoming_byte_stream->base.vtable = &grpc_chttp2_incoming_byte_stream_vtable;
gpr_ref_init(&incoming_byte_stream->refs, 2);
incoming_byte_stream->transport = t;
incoming_byte_stream->stream = s;
diff --git a/src/core/ext/transport/chttp2/transport/frame_data.c b/src/core/ext/transport/chttp2/transport/frame_data.c
index dead6be..222d217 100644
--- a/src/core/ext/transport/chttp2/transport/frame_data.c
+++ b/src/core/ext/transport/chttp2/transport/frame_data.c
@@ -293,7 +293,6 @@
grpc_chttp2_transport *t,
grpc_chttp2_stream *s,
grpc_slice slice, int is_last) {
- /* grpc_error *error = parse_inner_buffer(exec_ctx, p, t, s, slice); */
if (!s->pending_byte_stream) {
grpc_slice_ref_internal(slice);
grpc_slice_buffer_add(&s->frame_storage, slice);
@@ -304,6 +303,7 @@
grpc_slice_buffer_add(&s->unprocessed_incoming_frames_buffer, slice);
GRPC_CLOSURE_SCHED(exec_ctx, s->on_next, GRPC_ERROR_NONE);
s->on_next = NULL;
+ s->unprocessed_incoming_frames_decompressed = false;
} else {
grpc_slice_ref_internal(slice);
grpc_slice_buffer_add(&s->frame_storage, slice);
diff --git a/src/core/ext/transport/chttp2/transport/internal.h b/src/core/ext/transport/chttp2/transport/internal.h
index 4563b78..b538d1d 100644
--- a/src/core/ext/transport/chttp2/transport/internal.h
+++ b/src/core/ext/transport/chttp2/transport/internal.h
@@ -526,6 +526,26 @@
grpc_chttp2_write_cb *on_write_finished_cbs;
grpc_chttp2_write_cb *finish_after_write;
size_t sending_bytes;
+
+ /** Whether stream compression send is enabled */
+ bool stream_compression_recv_enabled;
+ /** Whether stream compression recv is enabled */
+ bool stream_compression_send_enabled;
+ /** Whether bytes stored in unprocessed_incoming_byte_stream is decompressed
+ */
+ bool unprocessed_incoming_frames_decompressed;
+ /** Stream compression decompress context */
+ grpc_stream_compression_context *stream_decompression_ctx;
+ /** Stream compression compress context */
+ grpc_stream_compression_context *stream_compression_ctx;
+
+ /** Buffer storing data that is compressed but not sent */
+ grpc_slice_buffer *compressed_data_buffer;
+ /** Amount of uncompressed bytes sent out when compressed_data_buffer is
+ * emptied */
+ size_t uncompressed_data_size;
+ /** Temporary buffer storing decompressed data */
+ grpc_slice_buffer *decompressed_data_buffer;
};
/** Transport writing call flow:
@@ -621,6 +641,9 @@
grpc_closure **pclosure,
grpc_error *error, const char *desc);
+#define GRPC_HEADER_SIZE_IN_BYTES 5
+#define MAX_SIZE_T (~(size_t)0)
+
#define GRPC_CHTTP2_CLIENT_CONNECT_STRING "PRI * HTTP/2.0\r\n\r\nSM\r\n\r\n"
#define GRPC_CHTTP2_CLIENT_CONNECT_STRLEN \
(sizeof(GRPC_CHTTP2_CLIENT_CONNECT_STRING) - 1)
diff --git a/src/core/ext/transport/chttp2/transport/writing.c b/src/core/ext/transport/chttp2/transport/writing.c
index 315f2a6..c3ede08 100644
--- a/src/core/ext/transport/chttp2/transport/writing.c
+++ b/src/core/ext/transport/chttp2/transport/writing.c
@@ -303,7 +303,9 @@
}
if (sent_initial_metadata) {
/* send any body bytes, if allowed by flow control */
- if (s->flow_controlled_buffer.length > 0) {
+ if (s->flow_controlled_buffer.length > 0 ||
+ (s->stream_compression_send_enabled &&
+ s->compressed_data_buffer->length > 0)) {
uint32_t stream_outgoing_window = (uint32_t)GPR_MAX(
0,
s->outgoing_window_delta +
@@ -314,21 +316,63 @@
[GRPC_CHTTP2_SETTINGS_MAX_FRAME_SIZE],
GPR_MIN(stream_outgoing_window, t->outgoing_window));
if (max_outgoing > 0) {
- uint32_t send_bytes =
- (uint32_t)GPR_MIN(max_outgoing, s->flow_controlled_buffer.length);
- bool is_last_data_frame =
- s->fetching_send_message == NULL &&
- send_bytes == s->flow_controlled_buffer.length;
- bool is_last_frame =
- is_last_data_frame && s->send_trailing_metadata != NULL &&
- grpc_metadata_batch_is_empty(s->send_trailing_metadata);
- grpc_chttp2_encode_data(s->id, &s->flow_controlled_buffer, send_bytes,
- is_last_frame, &s->stats.outgoing,
- &t->outbuf);
- GRPC_CHTTP2_FLOW_DEBIT_STREAM("write", t, s, outgoing_window_delta,
- send_bytes);
- GRPC_CHTTP2_FLOW_DEBIT_TRANSPORT("write", t, outgoing_window,
- send_bytes);
+ bool is_last_data_frame = false;
+ bool is_last_frame = false;
+ if (s->stream_compression_send_enabled) {
+ while ((s->flow_controlled_buffer.length > 0 ||
+ s->compressed_data_buffer->length > 0) &&
+ max_outgoing > 0) {
+ if (s->compressed_data_buffer->length > 0) {
+ uint32_t send_bytes = (uint32_t)GPR_MIN(
+ max_outgoing, s->compressed_data_buffer->length);
+ is_last_data_frame =
+ (send_bytes == s->compressed_data_buffer->length &&
+ s->flow_controlled_buffer.length == 0 &&
+ s->fetching_send_message == NULL);
+ is_last_frame =
+ is_last_data_frame && s->send_trailing_metadata != NULL &&
+ grpc_metadata_batch_is_empty(s->send_trailing_metadata);
+ grpc_chttp2_encode_data(s->id, s->compressed_data_buffer,
+ send_bytes, is_last_frame,
+ &s->stats.outgoing, &t->outbuf);
+ GRPC_CHTTP2_FLOW_DEBIT_STREAM(
+ "write", t, s, outgoing_window_delta, send_bytes);
+ GRPC_CHTTP2_FLOW_DEBIT_TRANSPORT("write", t, outgoing_window,
+ send_bytes);
+ max_outgoing -= send_bytes;
+ if (s->compressed_data_buffer->length == 0) {
+ s->sending_bytes += s->uncompressed_data_size;
+ }
+ } else {
+ if (s->stream_compression_ctx == NULL) {
+ s->stream_compression_ctx =
+ grpc_stream_compression_context_create(
+ GRPC_STREAM_COMPRESSION_COMPRESS);
+ }
+ s->uncompressed_data_size = s->flow_controlled_buffer.length;
+ GPR_ASSERT(grpc_stream_compress(
+ s->stream_compression_ctx, &s->flow_controlled_buffer,
+ s->compressed_data_buffer, NULL, MAX_SIZE_T,
+ GRPC_STREAM_COMPRESSION_FLUSH_SYNC));
+ }
+ }
+ } else {
+ uint32_t send_bytes = (uint32_t)GPR_MIN(
+ max_outgoing, s->flow_controlled_buffer.length);
+ is_last_data_frame = s->fetching_send_message == NULL &&
+ send_bytes == s->flow_controlled_buffer.length;
+ is_last_frame =
+ is_last_data_frame && s->send_trailing_metadata != NULL &&
+ grpc_metadata_batch_is_empty(s->send_trailing_metadata);
+ grpc_chttp2_encode_data(s->id, &s->flow_controlled_buffer,
+ send_bytes, is_last_frame,
+ &s->stats.outgoing, &t->outbuf);
+ GRPC_CHTTP2_FLOW_DEBIT_STREAM("write", t, s, outgoing_window_delta,
+ send_bytes);
+ GRPC_CHTTP2_FLOW_DEBIT_TRANSPORT("write", t, outgoing_window,
+ send_bytes);
+ s->sending_bytes += send_bytes;
+ }
t->ping_state.pings_before_data_required =
t->ping_policy.max_pings_without_data;
if (!t->is_client) {
@@ -345,9 +389,10 @@
&s->stats.outgoing));
}
}
- s->sending_bytes += send_bytes;
now_writing = true;
- if (s->flow_controlled_buffer.length > 0) {
+ if (s->flow_controlled_buffer.length > 0 ||
+ (s->stream_compression_send_enabled &&
+ s->compressed_data_buffer->length > 0)) {
GRPC_CHTTP2_STREAM_REF(s, "chttp2_writing:fork");
grpc_chttp2_list_add_writable_stream(t, s);
}
@@ -361,7 +406,9 @@
}
if (s->send_trailing_metadata != NULL &&
s->fetching_send_message == NULL &&
- s->flow_controlled_buffer.length == 0) {
+ s->flow_controlled_buffer.length == 0 &&
+ (!s->stream_compression_send_enabled ||
+ s->compressed_data_buffer->length == 0)) {
GRPC_CHTTP2_IF_TRACING(gpr_log(GPR_INFO, "sending trailing_metadata"));
if (grpc_metadata_batch_is_empty(s->send_trailing_metadata)) {
grpc_chttp2_encode_data(s->id, &s->flow_controlled_buffer, 0, true,
diff --git a/src/core/ext/transport/inproc/inproc_transport.c b/src/core/ext/transport/inproc/inproc_transport.c
index 1449802..6f4b429 100644
--- a/src/core/ext/transport/inproc/inproc_transport.c
+++ b/src/core/ext/transport/inproc/inproc_transport.c
@@ -72,6 +72,7 @@
typedef struct {
grpc_byte_stream base;
sb_list_entry *le;
+ grpc_error *shutdown_error;
} inproc_slice_byte_stream;
typedef struct {
@@ -201,24 +202,39 @@
grpc_byte_stream *bs,
grpc_slice *slice) {
inproc_slice_byte_stream *stream = (inproc_slice_byte_stream *)bs;
+ if (stream->shutdown_error != GRPC_ERROR_NONE) {
+ return GRPC_ERROR_REF(stream->shutdown_error);
+ }
*slice = grpc_slice_buffer_take_first(&stream->le->sb);
return GRPC_ERROR_NONE;
}
+static void inproc_slice_byte_stream_shutdown(grpc_exec_ctx *exec_ctx,
+ grpc_byte_stream *bs,
+ grpc_error *error) {
+ inproc_slice_byte_stream *stream = (inproc_slice_byte_stream *)bs;
+ GRPC_ERROR_UNREF(stream->shutdown_error);
+ stream->shutdown_error = error;
+}
+
static void inproc_slice_byte_stream_destroy(grpc_exec_ctx *exec_ctx,
grpc_byte_stream *bs) {
inproc_slice_byte_stream *stream = (inproc_slice_byte_stream *)bs;
sb_list_entry_destroy(exec_ctx, stream->le);
+ GRPC_ERROR_UNREF(stream->shutdown_error);
}
+static const grpc_byte_stream_vtable inproc_slice_byte_stream_vtable = {
+ inproc_slice_byte_stream_next, inproc_slice_byte_stream_pull,
+ inproc_slice_byte_stream_shutdown, inproc_slice_byte_stream_destroy};
+
void inproc_slice_byte_stream_init(inproc_slice_byte_stream *s,
sb_list_entry *le) {
s->base.length = (uint32_t)le->sb.length;
s->base.flags = 0;
- s->base.next = inproc_slice_byte_stream_next;
- s->base.pull = inproc_slice_byte_stream_pull;
- s->base.destroy = inproc_slice_byte_stream_destroy;
+ s->base.vtable = &inproc_slice_byte_stream_vtable;
s->le = le;
+ s->shutdown_error = GRPC_ERROR_NONE;
}
static void ref_transport(inproc_transport *t) {
@@ -956,11 +972,18 @@
GPR_ASSERT(grpc_byte_stream_next(exec_ctx,
op->payload->send_message.send_message,
SIZE_MAX, &unused));
- grpc_byte_stream_pull(exec_ctx, op->payload->send_message.send_message,
- &message_slice);
+ error = grpc_byte_stream_pull(
+ exec_ctx, op->payload->send_message.send_message, &message_slice);
+ if (error != GRPC_ERROR_NONE) {
+ cancel_stream_locked(exec_ctx, s, GRPC_ERROR_REF(error));
+ break;
+ }
+ GPR_ASSERT(error == GRPC_ERROR_NONE);
remaining -= GRPC_SLICE_LENGTH(message_slice);
grpc_slice_buffer_add(dest, message_slice);
} while (remaining != 0);
+ grpc_byte_stream_destroy(exec_ctx,
+ op->payload->send_message.send_message);
}
if (error == GRPC_ERROR_NONE && op->send_trailing_metadata) {
grpc_metadata_batch *dest = (other == NULL) ? &s->write_buffer_trailing_md
diff --git a/src/core/lib/surface/completion_queue.c b/src/core/lib/surface/completion_queue.c
index 978d7b4..3851993 100644
--- a/src/core/lib/surface/completion_queue.c
+++ b/src/core/lib/surface/completion_queue.c
@@ -855,8 +855,7 @@
inconsistent state. If it is the latter, we shold do a 0-timeout poll
so that the thread comes back quickly from poll to make a second
attempt at popping. Not doing this can potentially deadlock this
- thread
- forever (if the deadline is infinity) */
+ thread forever (if the deadline is infinity) */
if (cq_event_queue_num_items(&cqd->queue) > 0) {
iteration_deadline = gpr_time_0(GPR_CLOCK_MONOTONIC);
}
@@ -869,10 +868,8 @@
if (cq_event_queue_num_items(&cqd->queue) > 0) {
/* Go to the beginning of the loop. No point doing a poll because
(cq->shutdown == true) is only possible when there is no pending
- work
- (i.e cq->pending_events == 0) and any outstanding
- grpc_cq_completion
- events are already queued on this cq */
+ work (i.e cq->pending_events == 0) and any outstanding completion
+ events should have already been queued on this cq */
continue;
}
@@ -909,11 +906,6 @@
is_finished_arg.first_loop = false;
}
- GRPC_SURFACE_TRACE_RETURNED_EVENT(cq, &ret);
- GRPC_CQ_INTERNAL_UNREF(&exec_ctx, cq, "next");
- grpc_exec_ctx_finish(&exec_ctx);
- GPR_ASSERT(is_finished_arg.stolen_completion == NULL);
-
if (cq_event_queue_num_items(&cqd->queue) > 0 &&
gpr_atm_no_barrier_load(&cqd->pending_events) > 0) {
gpr_mu_lock(cq->mu);
@@ -921,6 +913,11 @@
gpr_mu_unlock(cq->mu);
}
+ GRPC_SURFACE_TRACE_RETURNED_EVENT(cq, &ret);
+ GRPC_CQ_INTERNAL_UNREF(&exec_ctx, cq, "next");
+ grpc_exec_ctx_finish(&exec_ctx);
+ GPR_ASSERT(is_finished_arg.stolen_completion == NULL);
+
GPR_TIMER_END("grpc_completion_queue_next", 0);
return ret;
diff --git a/src/core/lib/transport/byte_stream.c b/src/core/lib/transport/byte_stream.c
index 3355814..fb03a10 100644
--- a/src/core/lib/transport/byte_stream.c
+++ b/src/core/lib/transport/byte_stream.c
@@ -19,29 +19,37 @@
#include "src/core/lib/transport/byte_stream.h"
#include <stdlib.h>
+#include <string.h>
#include <grpc/support/log.h>
#include "src/core/lib/slice/slice_internal.h"
-int grpc_byte_stream_next(grpc_exec_ctx *exec_ctx,
- grpc_byte_stream *byte_stream, size_t max_size_hint,
- grpc_closure *on_complete) {
- return byte_stream->next(exec_ctx, byte_stream, max_size_hint, on_complete);
+bool grpc_byte_stream_next(grpc_exec_ctx *exec_ctx,
+ grpc_byte_stream *byte_stream, size_t max_size_hint,
+ grpc_closure *on_complete) {
+ return byte_stream->vtable->next(exec_ctx, byte_stream, max_size_hint,
+ on_complete);
}
grpc_error *grpc_byte_stream_pull(grpc_exec_ctx *exec_ctx,
grpc_byte_stream *byte_stream,
grpc_slice *slice) {
- return byte_stream->pull(exec_ctx, byte_stream, slice);
+ return byte_stream->vtable->pull(exec_ctx, byte_stream, slice);
+}
+
+void grpc_byte_stream_shutdown(grpc_exec_ctx *exec_ctx,
+ grpc_byte_stream *byte_stream,
+ grpc_error *error) {
+ byte_stream->vtable->shutdown(exec_ctx, byte_stream, error);
}
void grpc_byte_stream_destroy(grpc_exec_ctx *exec_ctx,
grpc_byte_stream *byte_stream) {
- byte_stream->destroy(exec_ctx, byte_stream);
+ byte_stream->vtable->destroy(exec_ctx, byte_stream);
}
-/* slice_buffer_stream */
+// grpc_slice_buffer_stream
static bool slice_buffer_stream_next(grpc_exec_ctx *exec_ctx,
grpc_byte_stream *byte_stream,
@@ -56,6 +64,9 @@
grpc_byte_stream *byte_stream,
grpc_slice *slice) {
grpc_slice_buffer_stream *stream = (grpc_slice_buffer_stream *)byte_stream;
+ if (stream->shutdown_error != GRPC_ERROR_NONE) {
+ return GRPC_ERROR_REF(stream->shutdown_error);
+ }
GPR_ASSERT(stream->cursor < stream->backing_buffer->count);
*slice =
grpc_slice_ref_internal(stream->backing_buffer->slices[stream->cursor]);
@@ -63,8 +74,23 @@
return GRPC_ERROR_NONE;
}
+static void slice_buffer_stream_shutdown(grpc_exec_ctx *exec_ctx,
+ grpc_byte_stream *byte_stream,
+ grpc_error *error) {
+ grpc_slice_buffer_stream *stream = (grpc_slice_buffer_stream *)byte_stream;
+ GRPC_ERROR_UNREF(stream->shutdown_error);
+ stream->shutdown_error = error;
+}
+
static void slice_buffer_stream_destroy(grpc_exec_ctx *exec_ctx,
- grpc_byte_stream *byte_stream) {}
+ grpc_byte_stream *byte_stream) {
+ grpc_slice_buffer_stream *stream = (grpc_slice_buffer_stream *)byte_stream;
+ GRPC_ERROR_UNREF(stream->shutdown_error);
+}
+
+static const grpc_byte_stream_vtable slice_buffer_stream_vtable = {
+ slice_buffer_stream_next, slice_buffer_stream_pull,
+ slice_buffer_stream_shutdown, slice_buffer_stream_destroy};
void grpc_slice_buffer_stream_init(grpc_slice_buffer_stream *stream,
grpc_slice_buffer *slice_buffer,
@@ -72,9 +98,89 @@
GPR_ASSERT(slice_buffer->length <= UINT32_MAX);
stream->base.length = (uint32_t)slice_buffer->length;
stream->base.flags = flags;
- stream->base.next = slice_buffer_stream_next;
- stream->base.pull = slice_buffer_stream_pull;
- stream->base.destroy = slice_buffer_stream_destroy;
+ stream->base.vtable = &slice_buffer_stream_vtable;
stream->backing_buffer = slice_buffer;
stream->cursor = 0;
+ stream->shutdown_error = GRPC_ERROR_NONE;
+}
+
+// grpc_caching_byte_stream
+
+void grpc_byte_stream_cache_init(grpc_byte_stream_cache *cache,
+ grpc_byte_stream *underlying_stream) {
+ cache->underlying_stream = underlying_stream;
+ grpc_slice_buffer_init(&cache->cache_buffer);
+}
+
+void grpc_byte_stream_cache_destroy(grpc_exec_ctx *exec_ctx,
+ grpc_byte_stream_cache *cache) {
+ grpc_byte_stream_destroy(exec_ctx, cache->underlying_stream);
+ grpc_slice_buffer_destroy_internal(exec_ctx, &cache->cache_buffer);
+}
+
+static bool caching_byte_stream_next(grpc_exec_ctx *exec_ctx,
+ grpc_byte_stream *byte_stream,
+ size_t max_size_hint,
+ grpc_closure *on_complete) {
+ grpc_caching_byte_stream *stream = (grpc_caching_byte_stream *)byte_stream;
+ if (stream->shutdown_error != GRPC_ERROR_NONE) return true;
+ if (stream->cursor < stream->cache->cache_buffer.count) return true;
+ return grpc_byte_stream_next(exec_ctx, stream->cache->underlying_stream,
+ max_size_hint, on_complete);
+}
+
+static grpc_error *caching_byte_stream_pull(grpc_exec_ctx *exec_ctx,
+ grpc_byte_stream *byte_stream,
+ grpc_slice *slice) {
+ grpc_caching_byte_stream *stream = (grpc_caching_byte_stream *)byte_stream;
+ if (stream->shutdown_error != GRPC_ERROR_NONE) {
+ return GRPC_ERROR_REF(stream->shutdown_error);
+ }
+ if (stream->cursor < stream->cache->cache_buffer.count) {
+ *slice = grpc_slice_ref_internal(
+ stream->cache->cache_buffer.slices[stream->cursor]);
+ ++stream->cursor;
+ return GRPC_ERROR_NONE;
+ }
+ grpc_error *error =
+ grpc_byte_stream_pull(exec_ctx, stream->cache->underlying_stream, slice);
+ if (error == GRPC_ERROR_NONE) {
+ ++stream->cursor;
+ grpc_slice_buffer_add(&stream->cache->cache_buffer,
+ grpc_slice_ref_internal(*slice));
+ }
+ return error;
+}
+
+static void caching_byte_stream_shutdown(grpc_exec_ctx *exec_ctx,
+ grpc_byte_stream *byte_stream,
+ grpc_error *error) {
+ grpc_caching_byte_stream *stream = (grpc_caching_byte_stream *)byte_stream;
+ GRPC_ERROR_UNREF(stream->shutdown_error);
+ stream->shutdown_error = GRPC_ERROR_REF(error);
+ grpc_byte_stream_shutdown(exec_ctx, stream->cache->underlying_stream, error);
+}
+
+static void caching_byte_stream_destroy(grpc_exec_ctx *exec_ctx,
+ grpc_byte_stream *byte_stream) {
+ grpc_caching_byte_stream *stream = (grpc_caching_byte_stream *)byte_stream;
+ GRPC_ERROR_UNREF(stream->shutdown_error);
+}
+
+static const grpc_byte_stream_vtable caching_byte_stream_vtable = {
+ caching_byte_stream_next, caching_byte_stream_pull,
+ caching_byte_stream_shutdown, caching_byte_stream_destroy};
+
+void grpc_caching_byte_stream_init(grpc_caching_byte_stream *stream,
+ grpc_byte_stream_cache *cache) {
+ memset(stream, 0, sizeof(*stream));
+ stream->base.length = cache->underlying_stream->length;
+ stream->base.flags = cache->underlying_stream->flags;
+ stream->base.vtable = &caching_byte_stream_vtable;
+ stream->cache = cache;
+ stream->shutdown_error = GRPC_ERROR_NONE;
+}
+
+void grpc_caching_byte_stream_reset(grpc_caching_byte_stream *stream) {
+ stream->cursor = 0;
}
diff --git a/src/core/lib/transport/byte_stream.h b/src/core/lib/transport/byte_stream.h
index f172296..1e1e831 100644
--- a/src/core/lib/transport/byte_stream.h
+++ b/src/core/lib/transport/byte_stream.h
@@ -28,52 +28,109 @@
/** Mask of all valid internal flags. */
#define GRPC_WRITE_INTERNAL_USED_MASK (GRPC_WRITE_INTERNAL_COMPRESS)
-struct grpc_byte_stream;
typedef struct grpc_byte_stream grpc_byte_stream;
-struct grpc_byte_stream {
- uint32_t length;
- uint32_t flags;
+typedef struct {
bool (*next)(grpc_exec_ctx *exec_ctx, grpc_byte_stream *byte_stream,
size_t max_size_hint, grpc_closure *on_complete);
grpc_error *(*pull)(grpc_exec_ctx *exec_ctx, grpc_byte_stream *byte_stream,
grpc_slice *slice);
+ void (*shutdown)(grpc_exec_ctx *exec_ctx, grpc_byte_stream *byte_stream,
+ grpc_error *error);
void (*destroy)(grpc_exec_ctx *exec_ctx, grpc_byte_stream *byte_stream);
+} grpc_byte_stream_vtable;
+
+struct grpc_byte_stream {
+ uint32_t length;
+ uint32_t flags;
+ const grpc_byte_stream_vtable *vtable;
};
-/* returns 1 if the bytes are available immediately (in which case
- * on_complete will not be called), 0 if the bytes will be available
- * asynchronously.
- *
- * max_size_hint can be set as a hint as to the maximum number
- * of bytes that would be acceptable to read.
- */
-int grpc_byte_stream_next(grpc_exec_ctx *exec_ctx,
- grpc_byte_stream *byte_stream, size_t max_size_hint,
- grpc_closure *on_complete);
+// Returns true if the bytes are available immediately (in which case
+// on_complete will not be called), false if the bytes will be available
+// asynchronously.
+//
+// max_size_hint can be set as a hint as to the maximum number
+// of bytes that would be acceptable to read.
+bool grpc_byte_stream_next(grpc_exec_ctx *exec_ctx,
+ grpc_byte_stream *byte_stream, size_t max_size_hint,
+ grpc_closure *on_complete);
-/* returns the next slice in the byte stream when it is ready (indicated by
- * either grpc_byte_stream_next returning 1 or on_complete passed to
- * grpc_byte_stream_next is called).
- *
- * once a slice is returned into *slice, it is owned by the caller.
- */
+// Returns the next slice in the byte stream when it is ready (indicated by
+// either grpc_byte_stream_next returning true or on_complete passed to
+// grpc_byte_stream_next is called).
+//
+// Once a slice is returned into *slice, it is owned by the caller.
grpc_error *grpc_byte_stream_pull(grpc_exec_ctx *exec_ctx,
grpc_byte_stream *byte_stream,
grpc_slice *slice);
+// Shuts down the byte stream.
+//
+// If there is a pending call to on_complete from grpc_byte_stream_next(),
+// it will be invoked with the error passed to grpc_byte_stream_shutdown().
+//
+// The next call to grpc_byte_stream_pull() (if any) will return the error
+// passed to grpc_byte_stream_shutdown().
+void grpc_byte_stream_shutdown(grpc_exec_ctx *exec_ctx,
+ grpc_byte_stream *byte_stream,
+ grpc_error *error);
+
void grpc_byte_stream_destroy(grpc_exec_ctx *exec_ctx,
grpc_byte_stream *byte_stream);
-/* grpc_byte_stream that wraps a slice buffer */
+// grpc_slice_buffer_stream
+//
+// A grpc_byte_stream that wraps a slice buffer.
+
typedef struct grpc_slice_buffer_stream {
grpc_byte_stream base;
grpc_slice_buffer *backing_buffer;
size_t cursor;
+ grpc_error *shutdown_error;
} grpc_slice_buffer_stream;
void grpc_slice_buffer_stream_init(grpc_slice_buffer_stream *stream,
grpc_slice_buffer *slice_buffer,
uint32_t flags);
+// grpc_caching_byte_stream
+//
+// A grpc_byte_stream that that wraps an underlying byte stream but caches
+// the resulting slices in a slice buffer. If an initial attempt fails
+// without fully draining the underlying stream, a new caching stream
+// can be created from the same underlying cache, in which case it will
+// return whatever is in the backing buffer before continuing to read the
+// underlying stream.
+//
+// NOTE: No synchronization is done, so it is not safe to have multiple
+// grpc_caching_byte_streams simultaneously drawing from the same underlying
+// grpc_byte_stream_cache at the same time.
+
+typedef struct {
+ grpc_byte_stream *underlying_stream;
+ grpc_slice_buffer cache_buffer;
+} grpc_byte_stream_cache;
+
+// Takes ownership of underlying_stream.
+void grpc_byte_stream_cache_init(grpc_byte_stream_cache *cache,
+ grpc_byte_stream *underlying_stream);
+
+// Must not be called while still in use by a grpc_caching_byte_stream.
+void grpc_byte_stream_cache_destroy(grpc_exec_ctx *exec_ctx,
+ grpc_byte_stream_cache *cache);
+
+typedef struct {
+ grpc_byte_stream base;
+ grpc_byte_stream_cache *cache;
+ size_t cursor;
+ grpc_error *shutdown_error;
+} grpc_caching_byte_stream;
+
+void grpc_caching_byte_stream_init(grpc_caching_byte_stream *stream,
+ grpc_byte_stream_cache *cache);
+
+// Resets the byte stream to the start of the underlying stream.
+void grpc_caching_byte_stream_reset(grpc_caching_byte_stream *stream);
+
#endif /* GRPC_CORE_LIB_TRANSPORT_BYTE_STREAM_H */
diff --git a/src/core/lib/transport/transport.c b/src/core/lib/transport/transport.c
index 7281602..6c61f4b 100644
--- a/src/core/lib/transport/transport.c
+++ b/src/core/lib/transport/transport.c
@@ -207,27 +207,35 @@
return transport->vtable->get_endpoint(exec_ctx, transport);
}
+// This comment should be sung to the tune of
+// "Supercalifragilisticexpialidocious":
+//
// grpc_transport_stream_op_batch_finish_with_failure
// is a function that must always unref cancel_error
// though it lives in lib, it handles transport stream ops sure
// it's grpc_transport_stream_op_batch_finish_with_failure
void grpc_transport_stream_op_batch_finish_with_failure(
- grpc_exec_ctx *exec_ctx, grpc_transport_stream_op_batch *op,
+ grpc_exec_ctx *exec_ctx, grpc_transport_stream_op_batch *batch,
grpc_error *error) {
- if (op->recv_message) {
- GRPC_CLOSURE_SCHED(exec_ctx, op->payload->recv_message.recv_message_ready,
+ if (batch->send_message) {
+ grpc_byte_stream_destroy(exec_ctx,
+ batch->payload->send_message.send_message);
+ }
+ if (batch->recv_message) {
+ GRPC_CLOSURE_SCHED(exec_ctx,
+ batch->payload->recv_message.recv_message_ready,
GRPC_ERROR_REF(error));
}
- if (op->recv_initial_metadata) {
+ if (batch->recv_initial_metadata) {
GRPC_CLOSURE_SCHED(
exec_ctx,
- op->payload->recv_initial_metadata.recv_initial_metadata_ready,
+ batch->payload->recv_initial_metadata.recv_initial_metadata_ready,
GRPC_ERROR_REF(error));
}
- GRPC_CLOSURE_SCHED(exec_ctx, op->on_complete, error);
- if (op->cancel_stream) {
- GRPC_ERROR_UNREF(op->payload->cancel_stream.cancel_error);
+ GRPC_CLOSURE_SCHED(exec_ctx, batch->on_complete, error);
+ if (batch->cancel_stream) {
+ GRPC_ERROR_UNREF(batch->payload->cancel_stream.cancel_error);
}
}
diff --git a/src/core/lib/transport/transport.h b/src/core/lib/transport/transport.h
index 84e53e6..099138e 100644
--- a/src/core/lib/transport/transport.h
+++ b/src/core/lib/transport/transport.h
@@ -159,6 +159,11 @@
} send_trailing_metadata;
struct {
+ // The transport (or a filter that decides to return a failure before
+ // the op gets down to the transport) is responsible for calling
+ // grpc_byte_stream_destroy() on this.
+ // The batch's on_complete will not be called until after the byte
+ // stream is destroyed.
grpc_byte_stream *send_message;
} send_message;
@@ -174,6 +179,10 @@
} recv_initial_metadata;
struct {
+ // Will be set by the transport to point to the byte stream
+ // containing a received message.
+ // The caller is responsible for calling grpc_byte_stream_destroy()
+ // on this byte stream.
grpc_byte_stream **recv_message;
/** Should be enqueued when one message is ready to be processed. */
grpc_closure *recv_message_ready;
diff --git a/src/csharp/Grpc.Core.Tests/ThreadingModelTest.cs b/src/csharp/Grpc.Core.Tests/ThreadingModelTest.cs
new file mode 100644
index 0000000..fb18198
--- /dev/null
+++ b/src/csharp/Grpc.Core.Tests/ThreadingModelTest.cs
@@ -0,0 +1,98 @@
+#region Copyright notice and license
+
+// Copyright 2015 gRPC authors.
+//
+// Licensed under the Apache License, Version 2.0 (the "License");
+// you may not use this file except in compliance with the License.
+// You may obtain a copy of the License at
+//
+// http://www.apache.org/licenses/LICENSE-2.0
+//
+// Unless required by applicable law or agreed to in writing, software
+// distributed under the License is distributed on an "AS IS" BASIS,
+// WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+// See the License for the specific language governing permissions and
+// limitations under the License.
+
+#endregion
+
+using System;
+using NUnit.Framework;
+using System.Threading;
+using System.Threading.Tasks;
+
+namespace Grpc.Core.Tests
+{
+ public class ThreadingModelTest
+ {
+ const string Host = "127.0.0.1";
+
+ MockServiceHelper helper;
+ Server server;
+ Channel channel;
+
+ [SetUp]
+ public void Init()
+ {
+ helper = new MockServiceHelper(Host);
+ server = helper.GetServer();
+ server.Start();
+ channel = helper.GetChannel();
+ }
+
+ [TearDown]
+ public void Cleanup()
+ {
+ channel.ShutdownAsync().Wait();
+ server.ShutdownAsync().Wait();
+ }
+
+ [Test]
+ public void BlockingCallInServerHandlerDoesNotDeadlock()
+ {
+ helper.UnaryHandler = new UnaryServerMethod<string, string>(async (request, context) =>
+ {
+ int recursionDepth = int.Parse(request);
+ if (recursionDepth <= 0) {
+ return "SUCCESS";
+ }
+ return Calls.BlockingUnaryCall(helper.CreateUnaryCall(), (recursionDepth - 1).ToString());
+ });
+
+ int maxRecursionDepth = Environment.ProcessorCount * 2; // make sure we have more pending blocking calls than threads in GrpcThreadPool
+ Assert.AreEqual("SUCCESS", Calls.BlockingUnaryCall(helper.CreateUnaryCall(), maxRecursionDepth.ToString()));
+ }
+
+ [Test]
+ public void HandlerDoesNotRunOnGrpcThread()
+ {
+ helper.UnaryHandler = new UnaryServerMethod<string, string>(async (request, context) =>
+ {
+ if (IsRunningOnGrpcThreadPool()) {
+ return "Server handler should not run on gRPC threadpool thread.";
+ }
+ return request;
+ });
+
+ Assert.AreEqual("ABC", Calls.BlockingUnaryCall(helper.CreateUnaryCall(), "ABC"));
+ }
+
+ [Test]
+ public async Task ContinuationDoesNotRunOnGrpcThread()
+ {
+ helper.UnaryHandler = new UnaryServerMethod<string, string>(async (request, context) =>
+ {
+ return request;
+ });
+
+ await Calls.AsyncUnaryCall(helper.CreateUnaryCall(), "ABC");
+ Assert.IsFalse(IsRunningOnGrpcThreadPool());
+ }
+
+ private static bool IsRunningOnGrpcThreadPool()
+ {
+ var threadName = Thread.CurrentThread.Name ?? "";
+ return threadName.Contains("grpc");
+ }
+ }
+}
diff --git a/src/csharp/Grpc.Core/Grpc.Core.csproj b/src/csharp/Grpc.Core/Grpc.Core.csproj
index 5035829..e32711c 100755
--- a/src/csharp/Grpc.Core/Grpc.Core.csproj
+++ b/src/csharp/Grpc.Core/Grpc.Core.csproj
@@ -64,6 +64,7 @@
<ItemGroup Condition=" '$(TargetFramework)' == 'netstandard1.5' ">
<PackageReference Include="System.Runtime.Loader" Version="4.0.0" />
<PackageReference Include="System.Threading.Thread" Version="4.0.0" />
+ <PackageReference Include="System.Threading.ThreadPool" Version="4.0.0" />
</ItemGroup>
<Import Project="NativeDeps.csproj.include" />
diff --git a/src/csharp/Grpc.Core/GrpcEnvironment.cs b/src/csharp/Grpc.Core/GrpcEnvironment.cs
index 8d0c66a..0663ee9 100644
--- a/src/csharp/Grpc.Core/GrpcEnvironment.cs
+++ b/src/csharp/Grpc.Core/GrpcEnvironment.cs
@@ -39,6 +39,7 @@
static int refCount;
static int? customThreadPoolSize;
static int? customCompletionQueueCount;
+ static bool inlineHandlers;
static readonly HashSet<Channel> registeredChannels = new HashSet<Channel>();
static readonly HashSet<Server> registeredServers = new HashSet<Server>();
@@ -218,12 +219,31 @@
}
/// <summary>
+ /// By default, gRPC's internal event handlers get offloaded to .NET default thread pool thread (<c>inlineHandlers=false</c>).
+ /// Setting <c>inlineHandlers</c> to <c>true</c> will allow scheduling the event handlers directly to
+ /// <c>GrpcThreadPool</c> internal threads. That can lead to significant performance gains in some situations,
+ /// but requires user to never block in async code (incorrectly written code can easily lead to deadlocks).
+ /// Inlining handlers is an advanced setting and you should only use it if you know what you are doing.
+ /// Most users should rely on the default value provided by gRPC library.
+ /// Note: this method is part of an experimental API that can change or be removed without any prior notice.
+ /// Note: <c>inlineHandlers=true</c> was the default in gRPC C# v1.4.x and earlier.
+ /// </summary>
+ public static void SetHandlerInlining(bool inlineHandlers)
+ {
+ lock (staticLock)
+ {
+ GrpcPreconditions.CheckState(instance == null, "Can only be set before GrpcEnvironment is initialized");
+ GrpcEnvironment.inlineHandlers = inlineHandlers;
+ }
+ }
+
+ /// <summary>
/// Creates gRPC environment.
/// </summary>
private GrpcEnvironment()
{
GrpcNativeInit();
- threadPool = new GrpcThreadPool(this, GetThreadPoolSizeOrDefault(), GetCompletionQueueCountOrDefault());
+ threadPool = new GrpcThreadPool(this, GetThreadPoolSizeOrDefault(), GetCompletionQueueCountOrDefault(), inlineHandlers);
threadPool.Start();
}
diff --git a/src/csharp/Grpc.Core/Internal/GrpcThreadPool.cs b/src/csharp/Grpc.Core/Internal/GrpcThreadPool.cs
index f9ae77c..19b44c2 100644
--- a/src/csharp/Grpc.Core/Internal/GrpcThreadPool.cs
+++ b/src/csharp/Grpc.Core/Internal/GrpcThreadPool.cs
@@ -33,12 +33,15 @@
internal class GrpcThreadPool
{
static readonly ILogger Logger = GrpcEnvironment.Logger.ForType<GrpcThreadPool>();
+ static readonly WaitCallback RunCompletionQueueEventCallbackSuccess = new WaitCallback((callback) => RunCompletionQueueEventCallback((OpCompletionDelegate) callback, true));
+ static readonly WaitCallback RunCompletionQueueEventCallbackFailure = new WaitCallback((callback) => RunCompletionQueueEventCallback((OpCompletionDelegate) callback, false));
readonly GrpcEnvironment environment;
readonly object myLock = new object();
readonly List<Thread> threads = new List<Thread>();
readonly int poolSize;
readonly int completionQueueCount;
+ readonly bool inlineHandlers;
readonly List<BasicProfiler> threadProfilers = new List<BasicProfiler>(); // profilers assigned to threadpool threads
@@ -52,11 +55,13 @@
/// <param name="environment">Environment.</param>
/// <param name="poolSize">Pool size.</param>
/// <param name="completionQueueCount">Completion queue count.</param>
- public GrpcThreadPool(GrpcEnvironment environment, int poolSize, int completionQueueCount)
+ /// <param name="inlineHandlers">Handler inlining.</param>
+ public GrpcThreadPool(GrpcEnvironment environment, int poolSize, int completionQueueCount, bool inlineHandlers)
{
this.environment = environment;
this.poolSize = poolSize;
this.completionQueueCount = completionQueueCount;
+ this.inlineHandlers = inlineHandlers;
GrpcPreconditions.CheckArgument(poolSize >= completionQueueCount,
"Thread pool size cannot be smaller than the number of completion queues used.");
}
@@ -165,11 +170,19 @@
try
{
var callback = cq.CompletionRegistry.Extract(tag);
- callback(success);
+ // Use cached delegates to avoid unnecessary allocations
+ if (!inlineHandlers)
+ {
+ ThreadPool.QueueUserWorkItem(success ? RunCompletionQueueEventCallbackSuccess : RunCompletionQueueEventCallbackFailure, callback);
+ }
+ else
+ {
+ RunCompletionQueueEventCallback(callback, success);
+ }
}
catch (Exception e)
{
- Logger.Error(e, "Exception occured while invoking completion delegate");
+ Logger.Error(e, "Exception occured while extracting event from completion registry.");
}
}
}
@@ -186,5 +199,17 @@
}
return list.AsReadOnly();
}
+
+ private static void RunCompletionQueueEventCallback(OpCompletionDelegate callback, bool success)
+ {
+ try
+ {
+ callback(success);
+ }
+ catch (Exception e)
+ {
+ Logger.Error(e, "Exception occured while invoking completion delegate");
+ }
+ }
}
}
diff --git a/src/csharp/Grpc.IntegrationTesting/QpsWorker.cs b/src/csharp/Grpc.IntegrationTesting/QpsWorker.cs
index 7009a93..a579fb8 100644
--- a/src/csharp/Grpc.IntegrationTesting/QpsWorker.cs
+++ b/src/csharp/Grpc.IntegrationTesting/QpsWorker.cs
@@ -63,11 +63,6 @@
private async Task RunAsync()
{
- // (ThreadPoolSize == ProcessorCount) gives best throughput in benchmarks
- // and doesn't seem to harm performance even when server and client
- // are running on the same machine.
- GrpcEnvironment.SetThreadPoolSize(Environment.ProcessorCount);
-
string host = "0.0.0.0";
int port = options.DriverPort;
diff --git a/src/csharp/tests.json b/src/csharp/tests.json
index bc6adbb..7841051 100644
--- a/src/csharp/tests.json
+++ b/src/csharp/tests.json
@@ -31,6 +31,7 @@
"Grpc.Core.Tests.ShutdownHookPendingCallTest",
"Grpc.Core.Tests.ShutdownHookServerTest",
"Grpc.Core.Tests.ShutdownTest",
+ "Grpc.Core.Tests.ThreadingModelTest",
"Grpc.Core.Tests.TimeoutsTest",
"Grpc.Core.Tests.UserAgentStringTest"
],
diff --git "a/src/objective-c/\041ProtoCompiler-gRPCPlugin.podspec" "b/src/objective-c/\041ProtoCompiler-gRPCPlugin.podspec"
index 22527d1..345eecc 100644
--- "a/src/objective-c/\041ProtoCompiler-gRPCPlugin.podspec"
+++ "b/src/objective-c/\041ProtoCompiler-gRPCPlugin.podspec"
@@ -101,7 +101,7 @@
s.preserve_paths = plugin
# Restrict the protoc version to the one supported by this plugin.
- s.dependency '!ProtoCompiler', '3.2.0'
+ s.dependency '!ProtoCompiler', '3.3.0'
# For the Protobuf dependency not to complain:
s.ios.deployment_target = '7.0'
s.osx.deployment_target = '10.9'
diff --git "a/src/objective-c/\041ProtoCompiler.podspec" "b/src/objective-c/\041ProtoCompiler.podspec"
index 2e9b944..c3f95f9 100644
--- "a/src/objective-c/\041ProtoCompiler.podspec"
+++ "b/src/objective-c/\041ProtoCompiler.podspec"
@@ -36,7 +36,7 @@
# exclamation mark ensures that other "regular" pods will be able to find it as it'll be installed
# before them.
s.name = '!ProtoCompiler'
- v = '3.2.0'
+ v = '3.3.0'
s.version = v
s.summary = 'The Protobuf Compiler (protoc) generates Objective-C files from .proto files'
s.description = <<-DESC
diff --git a/src/objective-c/BoringSSL.podspec b/src/objective-c/BoringSSL.podspec
index 651bd49..37798ec 100644
--- a/src/objective-c/BoringSSL.podspec
+++ b/src/objective-c/BoringSSL.podspec
@@ -31,7 +31,7 @@
Pod::Spec.new do |s|
s.name = 'BoringSSL'
- version = '8.2'
+ version = '9.0'
s.version = version
s.summary = 'BoringSSL is a fork of OpenSSL that is designed to meet Google’s needs.'
# Adapted from the homepage:
@@ -69,9 +69,7 @@
s.source = {
:git => 'https://boringssl.googlesource.com/boringssl',
- # Restore this version name hack in the next version!!
- # :tag => "version_for_cocoapods_#{version}",
- :tag => "version_for_cocoapods_8.0",
+ :tag => "version_for_cocoapods_#{version}",
}
name = 'openssl'
@@ -169,7 +167,6 @@
#include "hkdf.h"
#include "md4.h"
#include "md5.h"
- #include "newhope.h"
#include "obj_mac.h"
#include "objects.h"
#include "opensslv.h"
@@ -183,7 +180,6 @@
#include "ripemd.h"
#include "safestack.h"
#include "srtp.h"
- #include "time_support.h"
#include "x509.h"
#include "x509v3.h"
EOF
@@ -389,42 +385,42 @@
0x28340c19,
0x283480ac,
0x283500ea,
- 0x2c3228ca,
- 0x2c32a8d8,
- 0x2c3328ea,
- 0x2c33a8fc,
- 0x2c342910,
- 0x2c34a922,
- 0x2c35293d,
- 0x2c35a94f,
- 0x2c362962,
+ 0x2c3229b1,
+ 0x2c32a9bf,
+ 0x2c3329d1,
+ 0x2c33a9e3,
+ 0x2c3429f7,
+ 0x2c34aa09,
+ 0x2c352a24,
+ 0x2c35aa36,
+ 0x2c362a49,
0x2c36832d,
- 0x2c37296f,
- 0x2c37a981,
- 0x2c382994,
- 0x2c38a9ab,
- 0x2c3929b9,
- 0x2c39a9c9,
- 0x2c3a29db,
- 0x2c3aa9ef,
- 0x2c3b2a00,
- 0x2c3baa1f,
- 0x2c3c2a33,
- 0x2c3caa49,
- 0x2c3d2a62,
- 0x2c3daa7f,
- 0x2c3e2a90,
- 0x2c3eaa9e,
- 0x2c3f2ab6,
- 0x2c3faace,
- 0x2c402adb,
+ 0x2c372a56,
+ 0x2c37aa68,
+ 0x2c382a7b,
+ 0x2c38aa92,
+ 0x2c392aa0,
+ 0x2c39aab0,
+ 0x2c3a2ac2,
+ 0x2c3aaad6,
+ 0x2c3b2ae7,
+ 0x2c3bab06,
+ 0x2c3c2b1a,
+ 0x2c3cab30,
+ 0x2c3d2b49,
+ 0x2c3dab66,
+ 0x2c3e2b77,
+ 0x2c3eab85,
+ 0x2c3f2b9d,
+ 0x2c3fabb5,
+ 0x2c402bc2,
0x2c4090e7,
- 0x2c412aec,
- 0x2c41aaff,
+ 0x2c412bd3,
+ 0x2c41abe6,
0x2c4210c0,
- 0x2c42ab10,
+ 0x2c42abf7,
0x2c430720,
- 0x2c43aa11,
+ 0x2c43aaf8,
0x30320000,
0x30328015,
0x3033001f,
@@ -577,180 +573,189 @@
0x403b9861,
0x403c0064,
0x403c8083,
- 0x403d18aa,
- 0x403d98c0,
- 0x403e18cf,
- 0x403e98e2,
- 0x403f18fc,
- 0x403f990a,
- 0x4040191f,
- 0x40409933,
- 0x40411950,
- 0x4041996b,
- 0x40421984,
- 0x40429997,
- 0x404319ab,
- 0x404399c3,
- 0x404419da,
+ 0x403d18c1,
+ 0x403d98d7,
+ 0x403e18e6,
+ 0x403e98f9,
+ 0x403f1913,
+ 0x403f9921,
+ 0x40401936,
+ 0x4040994a,
+ 0x40411967,
+ 0x40419982,
+ 0x4042199b,
+ 0x404299ae,
+ 0x404319c2,
+ 0x404399da,
+ 0x404419f1,
0x404480ac,
- 0x404519ef,
- 0x40459a01,
- 0x40461a25,
- 0x40469a45,
- 0x40471a53,
- 0x40479a7a,
- 0x40481ab7,
- 0x40489ad0,
- 0x40491ae7,
- 0x40499b01,
- 0x404a1b18,
- 0x404a9b36,
- 0x404b1b4e,
- 0x404b9b65,
- 0x404c1b7b,
- 0x404c9b8d,
- 0x404d1bae,
- 0x404d9bd0,
- 0x404e1be4,
- 0x404e9bf1,
- 0x404f1c1e,
- 0x404f9c47,
- 0x40501c71,
- 0x40509c85,
- 0x40511ca0,
- 0x40519cb0,
- 0x40521cc7,
- 0x40529ceb,
- 0x40531d03,
- 0x40539d16,
- 0x40541d2b,
- 0x40549d4e,
- 0x40551d5c,
- 0x40559d79,
- 0x40561d86,
- 0x40569d9f,
- 0x40571db7,
- 0x40579dca,
- 0x40581ddf,
- 0x40589e06,
- 0x40591e35,
- 0x40599e62,
- 0x405a1e76,
- 0x405a9e86,
- 0x405b1e9e,
- 0x405b9eaf,
- 0x405c1ec2,
- 0x405c9ed3,
- 0x405d1ee0,
- 0x405d9ef7,
- 0x405e1f17,
+ 0x40451a06,
+ 0x40459a18,
+ 0x40461a3c,
+ 0x40469a5c,
+ 0x40471a6a,
+ 0x40479a91,
+ 0x40481ace,
+ 0x40489ae7,
+ 0x40491afe,
+ 0x40499b18,
+ 0x404a1b2f,
+ 0x404a9b4d,
+ 0x404b1b65,
+ 0x404b9b7c,
+ 0x404c1b92,
+ 0x404c9ba4,
+ 0x404d1bc5,
+ 0x404d9be7,
+ 0x404e1bfb,
+ 0x404e9c08,
+ 0x404f1c35,
+ 0x404f9c5e,
+ 0x40501c99,
+ 0x40509cad,
+ 0x40511cc8,
+ 0x40519cd8,
+ 0x40521cef,
+ 0x40529d13,
+ 0x40531d2b,
+ 0x40539d3e,
+ 0x40541d53,
+ 0x40549d76,
+ 0x40551d84,
+ 0x40559da1,
+ 0x40561dae,
+ 0x40569dc7,
+ 0x40571ddf,
+ 0x40579df2,
+ 0x40581e07,
+ 0x40589e2e,
+ 0x40591e5d,
+ 0x40599e8a,
+ 0x405a1e9e,
+ 0x405a9eae,
+ 0x405b1ec6,
+ 0x405b9ed7,
+ 0x405c1eea,
+ 0x405c9f0b,
+ 0x405d1f18,
+ 0x405d9f2f,
+ 0x405e1f6d,
0x405e8a95,
- 0x405f1f38,
- 0x405f9f45,
- 0x40601f53,
- 0x40609f75,
- 0x40611f9d,
- 0x40619fb2,
- 0x40621fc9,
- 0x40629fda,
- 0x40631feb,
- 0x4063a000,
- 0x40642017,
- 0x4064a043,
- 0x4065205e,
- 0x4065a075,
- 0x4066208d,
- 0x4066a0b7,
- 0x406720e2,
- 0x4067a103,
- 0x40682116,
- 0x4068a137,
- 0x40692169,
- 0x4069a197,
- 0x406a21b8,
- 0x406aa1d8,
- 0x406b2360,
- 0x406ba383,
- 0x406c2399,
- 0x406ca5c5,
- 0x406d25f4,
- 0x406da61c,
- 0x406e264a,
- 0x406ea662,
- 0x406f2681,
- 0x406fa696,
- 0x407026a9,
- 0x4070a6c6,
+ 0x405f1f8e,
+ 0x405f9f9b,
+ 0x40601fa9,
+ 0x40609fcb,
+ 0x4061200f,
+ 0x4061a047,
+ 0x4062205e,
+ 0x4062a06f,
+ 0x40632080,
+ 0x4063a095,
+ 0x406420ac,
+ 0x4064a0d8,
+ 0x406520f3,
+ 0x4065a10a,
+ 0x40662122,
+ 0x4066a14c,
+ 0x40672177,
+ 0x4067a198,
+ 0x406821ab,
+ 0x4068a1cc,
+ 0x406921fe,
+ 0x4069a22c,
+ 0x406a224d,
+ 0x406aa26d,
+ 0x406b23f5,
+ 0x406ba418,
+ 0x406c242e,
+ 0x406ca690,
+ 0x406d26bf,
+ 0x406da6e7,
+ 0x406e2715,
+ 0x406ea749,
+ 0x406f2768,
+ 0x406fa77d,
+ 0x40702790,
+ 0x4070a7ad,
0x40710800,
- 0x4071a6d8,
- 0x407226eb,
- 0x4072a704,
- 0x4073271c,
+ 0x4071a7bf,
+ 0x407227d2,
+ 0x4072a7eb,
+ 0x40732803,
0x4073936d,
- 0x40742730,
- 0x4074a74a,
- 0x4075275b,
- 0x4075a76f,
- 0x4076277d,
+ 0x40742817,
+ 0x4074a831,
+ 0x40752842,
+ 0x4075a856,
+ 0x40762864,
0x407691aa,
- 0x407727a2,
- 0x4077a7c4,
- 0x407827df,
- 0x4078a818,
- 0x4079282f,
- 0x4079a845,
- 0x407a2851,
- 0x407aa864,
- 0x407b2879,
- 0x407ba88b,
- 0x407c28a0,
- 0x407ca8a9,
- 0x407d2152,
- 0x407d9c57,
- 0x407e27f4,
- 0x407e9e16,
- 0x407f1a67,
+ 0x40772889,
+ 0x4077a8ab,
+ 0x407828c6,
+ 0x4078a8ff,
+ 0x40792916,
+ 0x4079a92c,
+ 0x407a2938,
+ 0x407aa94b,
+ 0x407b2960,
+ 0x407ba972,
+ 0x407c2987,
+ 0x407ca990,
+ 0x407d21e7,
+ 0x407d9c6e,
+ 0x407e28db,
+ 0x407e9e3e,
+ 0x407f1a7e,
0x407f9887,
- 0x40801c2e,
- 0x40809a8f,
- 0x40811cd9,
- 0x40819c08,
- 0x40822635,
+ 0x40801c45,
+ 0x40809aa6,
+ 0x40811d01,
+ 0x40819c1f,
+ 0x40822700,
0x4082986d,
- 0x40831df1,
- 0x4083a028,
- 0x40841aa3,
- 0x40849e4e,
- 0x41f4228b,
- 0x41f9231d,
- 0x41fe2210,
- 0x41fea3ec,
- 0x41ff24dd,
- 0x420322a4,
- 0x420822c6,
- 0x4208a302,
- 0x420921f4,
- 0x4209a33c,
- 0x420a224b,
- 0x420aa22b,
- 0x420b226b,
- 0x420ba2e4,
- 0x420c24f9,
- 0x420ca3b9,
- 0x420d23d3,
- 0x420da40a,
- 0x42122424,
- 0x421724c0,
- 0x4217a466,
- 0x421c2488,
- 0x421f2443,
- 0x42212510,
- 0x422624a3,
- 0x422b25a9,
- 0x422ba572,
- 0x422c2591,
- 0x422ca54c,
- 0x422d252b,
+ 0x40831e19,
+ 0x4083a0bd,
+ 0x40841aba,
+ 0x40849e76,
+ 0x40851efb,
+ 0x40859ff3,
+ 0x40861f4f,
+ 0x40869c88,
+ 0x4087272d,
+ 0x4087a024,
+ 0x408818aa,
+ 0x41f42320,
+ 0x41f923b2,
+ 0x41fe22a5,
+ 0x41fea481,
+ 0x41ff2572,
+ 0x42032339,
+ 0x4208235b,
+ 0x4208a397,
+ 0x42092289,
+ 0x4209a3d1,
+ 0x420a22e0,
+ 0x420aa2c0,
+ 0x420b2300,
+ 0x420ba379,
+ 0x420c258e,
+ 0x420ca44e,
+ 0x420d2468,
+ 0x420da49f,
+ 0x421224b9,
+ 0x42172555,
+ 0x4217a4fb,
+ 0x421c251d,
+ 0x421f24d8,
+ 0x422125a5,
+ 0x42262538,
+ 0x422b2674,
+ 0x422ba622,
+ 0x422c265c,
+ 0x422ca5e1,
+ 0x422d25c0,
+ 0x422da641,
+ 0x422e2607,
0x4432072b,
0x4432873a,
0x44330746,
@@ -793,69 +798,69 @@
0x4c3d136d,
0x4c3d937c,
0x4c3e1389,
- 0x50322b22,
- 0x5032ab31,
- 0x50332b3c,
- 0x5033ab4c,
- 0x50342b65,
- 0x5034ab7f,
- 0x50352b8d,
- 0x5035aba3,
- 0x50362bb5,
- 0x5036abcb,
- 0x50372be4,
- 0x5037abf7,
- 0x50382c0f,
- 0x5038ac20,
- 0x50392c35,
- 0x5039ac49,
- 0x503a2c69,
- 0x503aac7f,
- 0x503b2c97,
- 0x503baca9,
- 0x503c2cc5,
- 0x503cacdc,
- 0x503d2cf5,
- 0x503dad0b,
- 0x503e2d18,
- 0x503ead2e,
- 0x503f2d40,
+ 0x50322c09,
+ 0x5032ac18,
+ 0x50332c23,
+ 0x5033ac33,
+ 0x50342c4c,
+ 0x5034ac66,
+ 0x50352c74,
+ 0x5035ac8a,
+ 0x50362c9c,
+ 0x5036acb2,
+ 0x50372ccb,
+ 0x5037acde,
+ 0x50382cf6,
+ 0x5038ad07,
+ 0x50392d1c,
+ 0x5039ad30,
+ 0x503a2d50,
+ 0x503aad66,
+ 0x503b2d7e,
+ 0x503bad90,
+ 0x503c2dac,
+ 0x503cadc3,
+ 0x503d2ddc,
+ 0x503dadf2,
+ 0x503e2dff,
+ 0x503eae15,
+ 0x503f2e27,
0x503f8382,
- 0x50402d53,
- 0x5040ad63,
- 0x50412d7d,
- 0x5041ad8c,
- 0x50422da6,
- 0x5042adc3,
- 0x50432dd3,
- 0x5043ade3,
- 0x50442df2,
+ 0x50402e3a,
+ 0x5040ae4a,
+ 0x50412e64,
+ 0x5041ae73,
+ 0x50422e8d,
+ 0x5042aeaa,
+ 0x50432eba,
+ 0x5043aeca,
+ 0x50442ed9,
0x5044843f,
- 0x50452e06,
- 0x5045ae24,
- 0x50462e37,
- 0x5046ae4d,
- 0x50472e5f,
- 0x5047ae74,
- 0x50482e9a,
- 0x5048aea8,
- 0x50492ebb,
- 0x5049aed0,
- 0x504a2ee6,
- 0x504aaef6,
- 0x504b2f16,
- 0x504baf29,
- 0x504c2f4c,
- 0x504caf7a,
- 0x504d2f8c,
- 0x504dafa9,
- 0x504e2fc4,
- 0x504eafe0,
- 0x504f2ff2,
- 0x504fb009,
- 0x50503018,
+ 0x50452eed,
+ 0x5045af0b,
+ 0x50462f1e,
+ 0x5046af34,
+ 0x50472f46,
+ 0x5047af5b,
+ 0x50482f81,
+ 0x5048af8f,
+ 0x50492fa2,
+ 0x5049afb7,
+ 0x504a2fcd,
+ 0x504aafdd,
+ 0x504b2ffd,
+ 0x504bb010,
+ 0x504c3033,
+ 0x504cb061,
+ 0x504d3073,
+ 0x504db090,
+ 0x504e30ab,
+ 0x504eb0c7,
+ 0x504f30d9,
+ 0x504fb0f0,
+ 0x505030ff,
0x505086ef,
- 0x5051302b,
+ 0x50513112,
0x58320ec9,
0x68320e8b,
0x68328c25,
@@ -1218,6 +1223,7 @@
"BIO_NOT_SET\\0"
"BLOCK_CIPHER_PAD_IS_WRONG\\0"
"BUFFERED_MESSAGES_ON_CIPHER_CHANGE\\0"
+ "CANNOT_PARSE_LEAF_CERT\\0"
"CA_DN_LENGTH_MISMATCH\\0"
"CA_DN_TOO_LONG\\0"
"CCS_RECEIVED_EARLY\\0"
@@ -1261,6 +1267,7 @@
"INVALID_COMPRESSION_LIST\\0"
"INVALID_MESSAGE\\0"
"INVALID_OUTER_RECORD_TYPE\\0"
+ "INVALID_SCT_LIST\\0"
"INVALID_SSL_SESSION\\0"
"INVALID_TICKET_KEYS_LENGTH\\0"
"LENGTH_MISMATCH\\0"
@@ -1290,15 +1297,19 @@
"NO_RENEGOTIATION\\0"
"NO_REQUIRED_DIGEST\\0"
"NO_SHARED_CIPHER\\0"
+ "NO_SHARED_GROUP\\0"
"NULL_SSL_CTX\\0"
"NULL_SSL_METHOD_PASSED\\0"
"OLD_SESSION_CIPHER_NOT_RETURNED\\0"
+ "OLD_SESSION_PRF_HASH_MISMATCH\\0"
"OLD_SESSION_VERSION_NOT_RETURNED\\0"
"PARSE_TLSEXT\\0"
"PATH_TOO_LONG\\0"
"PEER_DID_NOT_RETURN_A_CERTIFICATE\\0"
"PEER_ERROR_UNSUPPORTED_CERTIFICATE_TYPE\\0"
+ "PRE_SHARED_KEY_MUST_BE_LAST\\0"
"PROTOCOL_IS_SHUTDOWN\\0"
+ "PSK_IDENTITY_BINDER_COUNT_MISMATCH\\0"
"PSK_IDENTITY_NOT_FOUND\\0"
"PSK_NO_CLIENT_CB\\0"
"PSK_NO_SERVER_CB\\0"
@@ -1350,7 +1361,9 @@
"TLSV1_ALERT_USER_CANCELLED\\0"
"TLSV1_BAD_CERTIFICATE_HASH_VALUE\\0"
"TLSV1_BAD_CERTIFICATE_STATUS_RESPONSE\\0"
+ "TLSV1_CERTIFICATE_REQUIRED\\0"
"TLSV1_CERTIFICATE_UNOBTAINABLE\\0"
+ "TLSV1_UNKNOWN_PSK_IDENTITY\\0"
"TLSV1_UNRECOGNIZED_NAME\\0"
"TLSV1_UNSUPPORTED_EXTENSION\\0"
"TLS_PEER_DID_NOT_RESPOND_WITH_CERTIFICATE_LIST\\0"
@@ -1358,6 +1371,7 @@
"TOO_MANY_EMPTY_FRAGMENTS\\0"
"TOO_MANY_KEY_UPDATES\\0"
"TOO_MANY_WARNING_ALERTS\\0"
+ "TOO_MUCH_SKIPPED_EARLY_DATA\\0"
"UNABLE_TO_FIND_ECDH_PARAMETERS\\0"
"UNEXPECTED_EXTENSION\\0"
"UNEXPECTED_MESSAGE\\0"
diff --git a/src/proto/grpc/lb/v1/load_balancer.proto b/src/proto/grpc/lb/v1/load_balancer.proto
index b13b343..0a33568 100644
--- a/src/proto/grpc/lb/v1/load_balancer.proto
+++ b/src/proto/grpc/lb/v1/load_balancer.proto
@@ -67,6 +67,15 @@
string name = 1;
}
+// Contains the number of calls finished for a particular load balance token.
+message ClientStatsPerToken {
+ // See Server.load_balance_token.
+ string load_balance_token = 1;
+
+ // The total number of RPCs that finished associated with the token.
+ int64 num_calls = 2;
+}
+
// Contains client level statistics that are useful to load balancing. Each
// count except the timestamp should be reset to zero after reporting the stats.
message ClientStats {
@@ -79,20 +88,17 @@
// The total number of RPCs that finished.
int64 num_calls_finished = 3;
- // The total number of RPCs that were dropped by the client because of rate
- // limiting.
- int64 num_calls_finished_with_drop_for_rate_limiting = 4;
-
- // The total number of RPCs that were dropped by the client because of load
- // balancing.
- int64 num_calls_finished_with_drop_for_load_balancing = 5;
-
// The total number of RPCs that failed to reach a server except dropped RPCs.
int64 num_calls_finished_with_client_failed_to_send = 6;
// The total number of RPCs that finished and are known to have been received
// by a server.
int64 num_calls_finished_known_received = 7;
+
+ // The list of dropped calls.
+ repeated ClientStatsPerToken calls_finished_with_drop = 8;
+
+ reserved 4, 5;
}
message LoadBalanceResponse {
@@ -134,10 +140,8 @@
Duration expiration_interval = 3;
}
-// Contains server information. When none of the [drop_for_*] fields are true,
-// use the other fields. When drop_for_rate_limiting is true, ignore all other
-// fields. Use drop_for_load_balancing only when it is true and
-// drop_for_rate_limiting is false.
+// Contains server information. When the drop field is not true, use the other
+// fields.
message Server {
// A resolved address for the server, serialized in network-byte-order. It may
// either be an IPv4 or IPv6 address.
@@ -149,16 +153,16 @@
// An opaque but printable token given to the frontend for each pick. All
// frontend requests for that pick must include the token in its initial
// metadata. The token is used by the backend to verify the request and to
- // allow the backend to report load to the gRPC LB system.
+ // allow the backend to report load to the gRPC LB system. The token is also
+ // used in client stats for reporting dropped calls.
//
// Its length is variable but less than 50 bytes.
string load_balance_token = 3;
- // Indicates whether this particular request should be dropped by the client
- // for rate limiting.
- bool drop_for_rate_limiting = 4;
+ // Indicates whether this particular request should be dropped by the client.
+ // If the request is dropped, there will be a corresponding entry in
+ // ClientStats.calls_finished_with_drop.
+ bool drop = 4;
- // Indicates whether this particular request should be dropped by the client
- // for load balancing.
- bool drop_for_load_balancing = 5;
+ reserved 5;
}
diff --git a/src/ruby/lib/grpc/generic/active_call.rb b/src/ruby/lib/grpc/generic/active_call.rb
index 87b29c2..10eb70b 100644
--- a/src/ruby/lib/grpc/generic/active_call.rb
+++ b/src/ruby/lib/grpc/generic/active_call.rb
@@ -480,7 +480,20 @@
def bidi_streamer(requests, metadata: {}, &blk)
raise_error_if_already_executed
# Metadata might have already been sent if this is an operation view
- merge_metadata_and_send_if_not_already_sent(metadata)
+ begin
+ merge_metadata_and_send_if_not_already_sent(metadata)
+ rescue GRPC::Core::CallError => e
+ batch_result = @call.run_batch(RECV_STATUS_ON_CLIENT => nil)
+ set_input_stream_done
+ set_output_stream_done
+ attach_status_results_and_complete_call(batch_result)
+ raise e
+ rescue => e
+ set_input_stream_done
+ set_output_stream_done
+ raise e
+ end
+
bd = BidiCall.new(@call,
@marshal,
@unmarshal,
diff --git a/src/ruby/spec/generic/client_stub_spec.rb b/src/ruby/spec/generic/client_stub_spec.rb
index 42cff40..e1e7a53 100644
--- a/src/ruby/spec/generic/client_stub_spec.rb
+++ b/src/ruby/spec/generic/client_stub_spec.rb
@@ -616,8 +616,22 @@
th.join
end
- # TODO: add test for metadata-related ArgumentError in a bidi call once
- # issue mentioned in https://github.com/grpc/grpc/issues/10526 is fixed
+ it 'should raise ArgumentError if metadata contains invalid values' do
+ @metadata.merge!(k3: 3)
+ stub = GRPC::ClientStub.new(@host, :this_channel_is_insecure)
+ expect do
+ get_responses(stub).collect { |r| r }
+ end.to raise_error(ArgumentError,
+ /Header values must be of type string or array/)
+ end
+
+ it 'terminates if the call fails to start' do
+ # don't start the server
+ stub = GRPC::ClientStub.new(@host, :this_channel_is_insecure)
+ expect do
+ get_responses(stub, deadline: from_relative_time(0)).collect { |r| r }
+ end.to raise_error(GRPC::BadStatus)
+ end
it 'should send metadata to the server ok' do
th = run_bidi_streamer_echo_ping_pong(@sent_msgs, @pass, true,
@@ -630,9 +644,9 @@
end
describe 'without a call operation' do
- def get_responses(stub)
+ def get_responses(stub, deadline: nil)
e = stub.bidi_streamer(@method, @sent_msgs, noop, noop,
- metadata: @metadata)
+ metadata: @metadata, deadline: deadline)
expect(e).to be_a(Enumerator)
e
end
@@ -644,10 +658,10 @@
after(:each) do
@op.wait # make sure wait doesn't hang
end
- def get_responses(stub, run_start_call_first: false)
+ def get_responses(stub, run_start_call_first: false, deadline: nil)
@op = stub.bidi_streamer(@method, @sent_msgs, noop, noop,
return_op: true,
- metadata: @metadata)
+ metadata: @metadata, deadline: deadline)
expect(@op).to be_a(GRPC::ActiveCall::Operation)
@op.start_call if run_start_call_first
e = @op.execute
diff --git a/templates/gRPC-Core.podspec.template b/templates/gRPC-Core.podspec.template
index 3d54534..cfc13cf 100644
--- a/templates/gRPC-Core.podspec.template
+++ b/templates/gRPC-Core.podspec.template
@@ -135,7 +135,7 @@
ss.header_mappings_dir = '.'
ss.libraries = 'z'
ss.dependency "#{s.name}/Interface", version
- ss.dependency 'BoringSSL', '~> 8.0'
+ ss.dependency 'BoringSSL', '~> 9.0'
ss.dependency 'nanopb', '~> 0.3'
# To save you from scrolling, this is the last part of the podspec.
diff --git "a/templates/src/objective-c/\041ProtoCompiler-gRPCPlugin.podspec.template" "b/templates/src/objective-c/\041ProtoCompiler-gRPCPlugin.podspec.template"
index 24c167d..b822341 100644
--- "a/templates/src/objective-c/\041ProtoCompiler-gRPCPlugin.podspec.template"
+++ "b/templates/src/objective-c/\041ProtoCompiler-gRPCPlugin.podspec.template"
@@ -103,7 +103,7 @@
s.preserve_paths = plugin
# Restrict the protoc version to the one supported by this plugin.
- s.dependency '!ProtoCompiler', '3.2.0'
+ s.dependency '!ProtoCompiler', '3.3.0'
# For the Protobuf dependency not to complain:
s.ios.deployment_target = '7.0'
s.osx.deployment_target = '10.9'
diff --git a/test/core/transport/BUILD b/test/core/transport/BUILD
index 8091cf9..040c0c3 100644
--- a/test/core/transport/BUILD
+++ b/test/core/transport/BUILD
@@ -36,6 +36,18 @@
)
grpc_cc_test(
+ name = "byte_stream_test",
+ srcs = ["byte_stream_test.c"],
+ language = "C",
+ deps = [
+ "//:gpr",
+ "//:grpc",
+ "//test/core/util:gpr_test_util",
+ "//test/core/util:grpc_test_util",
+ ],
+)
+
+grpc_cc_test(
name = "connectivity_state_test",
srcs = ["connectivity_state_test.c"],
language = "C",
diff --git a/test/core/transport/byte_stream_test.c b/test/core/transport/byte_stream_test.c
new file mode 100644
index 0000000..a0c5f96
--- /dev/null
+++ b/test/core/transport/byte_stream_test.c
@@ -0,0 +1,279 @@
+/*
+ *
+ * Copyright 2017 gRPC authors.
+ *
+ * Licensed under the Apache License, Version 2.0 (the "License");
+ * you may not use this file except in compliance with the License.
+ * You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ *
+ */
+
+#include "src/core/lib/transport/byte_stream.h"
+
+#include <grpc/support/alloc.h>
+#include <grpc/support/log.h>
+#include <grpc/support/useful.h>
+
+#include "src/core/lib/slice/slice_internal.h"
+
+#include "test/core/util/test_config.h"
+
+//
+// grpc_slice_buffer_stream tests
+//
+
+static void not_called_closure(grpc_exec_ctx *exec_ctx, void *arg,
+ grpc_error *error) {
+ GPR_ASSERT(false);
+}
+
+static void test_slice_buffer_stream_basic(void) {
+ gpr_log(GPR_DEBUG, "test_slice_buffer_stream_basic");
+ grpc_exec_ctx exec_ctx = GRPC_EXEC_CTX_INIT;
+ // Create and populate slice buffer.
+ grpc_slice_buffer buffer;
+ grpc_slice_buffer_init(&buffer);
+ grpc_slice input[] = {
+ grpc_slice_from_static_string("foo"),
+ grpc_slice_from_static_string("bar"),
+ };
+ for (size_t i = 0; i < GPR_ARRAY_SIZE(input); ++i) {
+ grpc_slice_buffer_add(&buffer, input[i]);
+ }
+ // Create byte stream.
+ grpc_slice_buffer_stream stream;
+ grpc_slice_buffer_stream_init(&stream, &buffer, 0);
+ GPR_ASSERT(stream.base.length == 6);
+ grpc_closure closure;
+ GRPC_CLOSURE_INIT(&closure, not_called_closure, NULL,
+ grpc_schedule_on_exec_ctx);
+ // Read each slice. Note that next() always returns synchronously.
+ for (size_t i = 0; i < GPR_ARRAY_SIZE(input); ++i) {
+ GPR_ASSERT(
+ grpc_byte_stream_next(&exec_ctx, &stream.base, ~(size_t)0, &closure));
+ grpc_slice output;
+ grpc_error *error = grpc_byte_stream_pull(&exec_ctx, &stream.base, &output);
+ GPR_ASSERT(error == GRPC_ERROR_NONE);
+ GPR_ASSERT(grpc_slice_eq(input[i], output));
+ grpc_slice_unref_internal(&exec_ctx, output);
+ }
+ // Clean up.
+ grpc_byte_stream_destroy(&exec_ctx, &stream.base);
+ grpc_slice_buffer_destroy_internal(&exec_ctx, &buffer);
+ grpc_exec_ctx_finish(&exec_ctx);
+}
+
+static void test_slice_buffer_stream_shutdown(void) {
+ gpr_log(GPR_DEBUG, "test_slice_buffer_stream_shutdown");
+ grpc_exec_ctx exec_ctx = GRPC_EXEC_CTX_INIT;
+ // Create and populate slice buffer.
+ grpc_slice_buffer buffer;
+ grpc_slice_buffer_init(&buffer);
+ grpc_slice input[] = {
+ grpc_slice_from_static_string("foo"),
+ grpc_slice_from_static_string("bar"),
+ };
+ for (size_t i = 0; i < GPR_ARRAY_SIZE(input); ++i) {
+ grpc_slice_buffer_add(&buffer, input[i]);
+ }
+ // Create byte stream.
+ grpc_slice_buffer_stream stream;
+ grpc_slice_buffer_stream_init(&stream, &buffer, 0);
+ GPR_ASSERT(stream.base.length == 6);
+ grpc_closure closure;
+ GRPC_CLOSURE_INIT(&closure, not_called_closure, NULL,
+ grpc_schedule_on_exec_ctx);
+ // Read the first slice.
+ GPR_ASSERT(
+ grpc_byte_stream_next(&exec_ctx, &stream.base, ~(size_t)0, &closure));
+ grpc_slice output;
+ grpc_error *error = grpc_byte_stream_pull(&exec_ctx, &stream.base, &output);
+ GPR_ASSERT(error == GRPC_ERROR_NONE);
+ GPR_ASSERT(grpc_slice_eq(input[0], output));
+ grpc_slice_unref_internal(&exec_ctx, output);
+ // Now shutdown.
+ grpc_error *shutdown_error =
+ GRPC_ERROR_CREATE_FROM_STATIC_STRING("shutdown error");
+ grpc_byte_stream_shutdown(&exec_ctx, &stream.base,
+ GRPC_ERROR_REF(shutdown_error));
+ // After shutdown, the next pull() should return the error.
+ GPR_ASSERT(
+ grpc_byte_stream_next(&exec_ctx, &stream.base, ~(size_t)0, &closure));
+ error = grpc_byte_stream_pull(&exec_ctx, &stream.base, &output);
+ GPR_ASSERT(error == shutdown_error);
+ GRPC_ERROR_UNREF(error);
+ GRPC_ERROR_UNREF(shutdown_error);
+ // Clean up.
+ grpc_byte_stream_destroy(&exec_ctx, &stream.base);
+ grpc_slice_buffer_destroy_internal(&exec_ctx, &buffer);
+ grpc_exec_ctx_finish(&exec_ctx);
+}
+
+//
+// grpc_caching_byte_stream tests
+//
+
+static void test_caching_byte_stream_basic(void) {
+ gpr_log(GPR_DEBUG, "test_caching_byte_stream_basic");
+ grpc_exec_ctx exec_ctx = GRPC_EXEC_CTX_INIT;
+ // Create and populate slice buffer byte stream.
+ grpc_slice_buffer buffer;
+ grpc_slice_buffer_init(&buffer);
+ grpc_slice input[] = {
+ grpc_slice_from_static_string("foo"),
+ grpc_slice_from_static_string("bar"),
+ };
+ for (size_t i = 0; i < GPR_ARRAY_SIZE(input); ++i) {
+ grpc_slice_buffer_add(&buffer, input[i]);
+ }
+ grpc_slice_buffer_stream underlying_stream;
+ grpc_slice_buffer_stream_init(&underlying_stream, &buffer, 0);
+ // Create cache and caching stream.
+ grpc_byte_stream_cache cache;
+ grpc_byte_stream_cache_init(&cache, &underlying_stream.base);
+ grpc_caching_byte_stream stream;
+ grpc_caching_byte_stream_init(&stream, &cache);
+ grpc_closure closure;
+ GRPC_CLOSURE_INIT(&closure, not_called_closure, NULL,
+ grpc_schedule_on_exec_ctx);
+ // Read each slice. Note that next() always returns synchronously,
+ // because the underlying byte stream always does.
+ for (size_t i = 0; i < GPR_ARRAY_SIZE(input); ++i) {
+ GPR_ASSERT(
+ grpc_byte_stream_next(&exec_ctx, &stream.base, ~(size_t)0, &closure));
+ grpc_slice output;
+ grpc_error *error = grpc_byte_stream_pull(&exec_ctx, &stream.base, &output);
+ GPR_ASSERT(error == GRPC_ERROR_NONE);
+ GPR_ASSERT(grpc_slice_eq(input[i], output));
+ grpc_slice_unref_internal(&exec_ctx, output);
+ }
+ // Clean up.
+ grpc_byte_stream_destroy(&exec_ctx, &stream.base);
+ grpc_byte_stream_cache_destroy(&exec_ctx, &cache);
+ grpc_slice_buffer_destroy_internal(&exec_ctx, &buffer);
+ grpc_exec_ctx_finish(&exec_ctx);
+}
+
+static void test_caching_byte_stream_reset(void) {
+ gpr_log(GPR_DEBUG, "test_caching_byte_stream_reset");
+ grpc_exec_ctx exec_ctx = GRPC_EXEC_CTX_INIT;
+ // Create and populate slice buffer byte stream.
+ grpc_slice_buffer buffer;
+ grpc_slice_buffer_init(&buffer);
+ grpc_slice input[] = {
+ grpc_slice_from_static_string("foo"),
+ grpc_slice_from_static_string("bar"),
+ };
+ for (size_t i = 0; i < GPR_ARRAY_SIZE(input); ++i) {
+ grpc_slice_buffer_add(&buffer, input[i]);
+ }
+ grpc_slice_buffer_stream underlying_stream;
+ grpc_slice_buffer_stream_init(&underlying_stream, &buffer, 0);
+ // Create cache and caching stream.
+ grpc_byte_stream_cache cache;
+ grpc_byte_stream_cache_init(&cache, &underlying_stream.base);
+ grpc_caching_byte_stream stream;
+ grpc_caching_byte_stream_init(&stream, &cache);
+ grpc_closure closure;
+ GRPC_CLOSURE_INIT(&closure, not_called_closure, NULL,
+ grpc_schedule_on_exec_ctx);
+ // Read one slice.
+ GPR_ASSERT(
+ grpc_byte_stream_next(&exec_ctx, &stream.base, ~(size_t)0, &closure));
+ grpc_slice output;
+ grpc_error *error = grpc_byte_stream_pull(&exec_ctx, &stream.base, &output);
+ GPR_ASSERT(error == GRPC_ERROR_NONE);
+ GPR_ASSERT(grpc_slice_eq(input[0], output));
+ grpc_slice_unref_internal(&exec_ctx, output);
+ // Reset the caching stream. The reads should start over from the
+ // first slice.
+ grpc_caching_byte_stream_reset(&stream);
+ for (size_t i = 0; i < GPR_ARRAY_SIZE(input); ++i) {
+ GPR_ASSERT(
+ grpc_byte_stream_next(&exec_ctx, &stream.base, ~(size_t)0, &closure));
+ error = grpc_byte_stream_pull(&exec_ctx, &stream.base, &output);
+ GPR_ASSERT(error == GRPC_ERROR_NONE);
+ GPR_ASSERT(grpc_slice_eq(input[i], output));
+ grpc_slice_unref_internal(&exec_ctx, output);
+ }
+ // Clean up.
+ grpc_byte_stream_destroy(&exec_ctx, &stream.base);
+ grpc_byte_stream_cache_destroy(&exec_ctx, &cache);
+ grpc_slice_buffer_destroy_internal(&exec_ctx, &buffer);
+ grpc_exec_ctx_finish(&exec_ctx);
+}
+
+static void test_caching_byte_stream_shared_cache(void) {
+ gpr_log(GPR_DEBUG, "test_caching_byte_stream_shared_cache");
+ grpc_exec_ctx exec_ctx = GRPC_EXEC_CTX_INIT;
+ // Create and populate slice buffer byte stream.
+ grpc_slice_buffer buffer;
+ grpc_slice_buffer_init(&buffer);
+ grpc_slice input[] = {
+ grpc_slice_from_static_string("foo"),
+ grpc_slice_from_static_string("bar"),
+ };
+ for (size_t i = 0; i < GPR_ARRAY_SIZE(input); ++i) {
+ grpc_slice_buffer_add(&buffer, input[i]);
+ }
+ grpc_slice_buffer_stream underlying_stream;
+ grpc_slice_buffer_stream_init(&underlying_stream, &buffer, 0);
+ // Create cache and two caching streams.
+ grpc_byte_stream_cache cache;
+ grpc_byte_stream_cache_init(&cache, &underlying_stream.base);
+ grpc_caching_byte_stream stream1;
+ grpc_caching_byte_stream_init(&stream1, &cache);
+ grpc_caching_byte_stream stream2;
+ grpc_caching_byte_stream_init(&stream2, &cache);
+ grpc_closure closure;
+ GRPC_CLOSURE_INIT(&closure, not_called_closure, NULL,
+ grpc_schedule_on_exec_ctx);
+ // Read one slice from stream1.
+ GPR_ASSERT(
+ grpc_byte_stream_next(&exec_ctx, &stream1.base, ~(size_t)0, &closure));
+ grpc_slice output;
+ grpc_error *error = grpc_byte_stream_pull(&exec_ctx, &stream1.base, &output);
+ GPR_ASSERT(error == GRPC_ERROR_NONE);
+ GPR_ASSERT(grpc_slice_eq(input[0], output));
+ grpc_slice_unref_internal(&exec_ctx, output);
+ // Read all slices from stream2.
+ for (size_t i = 0; i < GPR_ARRAY_SIZE(input); ++i) {
+ GPR_ASSERT(
+ grpc_byte_stream_next(&exec_ctx, &stream2.base, ~(size_t)0, &closure));
+ error = grpc_byte_stream_pull(&exec_ctx, &stream2.base, &output);
+ GPR_ASSERT(error == GRPC_ERROR_NONE);
+ GPR_ASSERT(grpc_slice_eq(input[i], output));
+ grpc_slice_unref_internal(&exec_ctx, output);
+ }
+ // Now read the second slice from stream1.
+ GPR_ASSERT(
+ grpc_byte_stream_next(&exec_ctx, &stream1.base, ~(size_t)0, &closure));
+ error = grpc_byte_stream_pull(&exec_ctx, &stream1.base, &output);
+ GPR_ASSERT(error == GRPC_ERROR_NONE);
+ GPR_ASSERT(grpc_slice_eq(input[1], output));
+ grpc_slice_unref_internal(&exec_ctx, output);
+ // Clean up.
+ grpc_byte_stream_destroy(&exec_ctx, &stream1.base);
+ grpc_byte_stream_destroy(&exec_ctx, &stream2.base);
+ grpc_byte_stream_cache_destroy(&exec_ctx, &cache);
+ grpc_slice_buffer_destroy_internal(&exec_ctx, &buffer);
+ grpc_exec_ctx_finish(&exec_ctx);
+}
+
+int main(int argc, char **argv) {
+ grpc_test_init(argc, argv);
+ test_slice_buffer_stream_basic();
+ test_slice_buffer_stream_shutdown();
+ test_caching_byte_stream_basic();
+ test_caching_byte_stream_reset();
+ test_caching_byte_stream_shared_cache();
+ return 0;
+}
diff --git a/test/cpp/end2end/grpclb_end2end_test.cc b/test/cpp/end2end/grpclb_end2end_test.cc
index 1f3255d..4fef535 100644
--- a/test/cpp/end2end/grpclb_end2end_test.cc
+++ b/test/cpp/end2end/grpclb_end2end_test.cc
@@ -45,6 +45,7 @@
#include "src/proto/grpc/lb/v1/load_balancer.grpc.pb.h"
#include "src/proto/grpc/testing/echo.grpc.pb.h"
+#include <gmock/gmock.h>
#include <gtest/gtest.h>
// TODO(dgq): Other scenarios in need of testing:
@@ -131,6 +132,19 @@
IncreaseResponseCount();
return status;
}
+
+ // Returns true on its first invocation, false otherwise.
+ bool Shutdown() {
+ std::unique_lock<std::mutex> lock(mu_);
+ const bool prev = !shutdown_;
+ shutdown_ = true;
+ gpr_log(GPR_INFO, "Backend: shut down");
+ return prev;
+ }
+
+ private:
+ std::mutex mu_;
+ bool shutdown_ = false;
};
grpc::string Ip4ToPackedString(const char* ip_str) {
@@ -142,22 +156,20 @@
struct ClientStats {
size_t num_calls_started = 0;
size_t num_calls_finished = 0;
- size_t num_calls_finished_with_drop_for_rate_limiting = 0;
- size_t num_calls_finished_with_drop_for_load_balancing = 0;
size_t num_calls_finished_with_client_failed_to_send = 0;
size_t num_calls_finished_known_received = 0;
+ std::map<grpc::string, size_t> drop_token_counts;
ClientStats& operator+=(const ClientStats& other) {
num_calls_started += other.num_calls_started;
num_calls_finished += other.num_calls_finished;
- num_calls_finished_with_drop_for_rate_limiting +=
- other.num_calls_finished_with_drop_for_rate_limiting;
- num_calls_finished_with_drop_for_load_balancing +=
- other.num_calls_finished_with_drop_for_load_balancing;
num_calls_finished_with_client_failed_to_send +=
other.num_calls_finished_with_client_failed_to_send;
num_calls_finished_known_received +=
other.num_calls_finished_known_received;
+ for (const auto& p : other.drop_token_counts) {
+ drop_token_counts[p.first] += p.second;
+ }
return *this;
}
};
@@ -173,11 +185,12 @@
shutdown_(false) {}
Status BalanceLoad(ServerContext* context, Stream* stream) override {
- gpr_log(GPR_INFO, "LB: BalanceLoad");
+ gpr_log(GPR_INFO, "LB[%p]: BalanceLoad", this);
LoadBalanceRequest request;
stream->Read(&request);
IncreaseRequestCount();
- gpr_log(GPR_INFO, "LB: recv msg '%s'", request.DebugString().c_str());
+ gpr_log(GPR_INFO, "LB[%p]: recv msg '%s'", this,
+ request.DebugString().c_str());
if (client_load_reporting_interval_seconds_ > 0) {
LoadBalanceResponse initial_response;
@@ -208,7 +221,7 @@
if (client_load_reporting_interval_seconds_ > 0) {
request.Clear();
stream->Read(&request);
- gpr_log(GPR_INFO, "LB: recv client load report msg: '%s'",
+ gpr_log(GPR_INFO, "LB[%p]: recv client load report msg: '%s'", this,
request.DebugString().c_str());
GPR_ASSERT(request.has_client_stats());
// We need to acquire the lock here in order to prevent the notify_one
@@ -218,21 +231,21 @@
request.client_stats().num_calls_started();
client_stats_.num_calls_finished +=
request.client_stats().num_calls_finished();
- client_stats_.num_calls_finished_with_drop_for_rate_limiting +=
- request.client_stats()
- .num_calls_finished_with_drop_for_rate_limiting();
- client_stats_.num_calls_finished_with_drop_for_load_balancing +=
- request.client_stats()
- .num_calls_finished_with_drop_for_load_balancing();
client_stats_.num_calls_finished_with_client_failed_to_send +=
request.client_stats()
.num_calls_finished_with_client_failed_to_send();
client_stats_.num_calls_finished_known_received +=
request.client_stats().num_calls_finished_known_received();
+ for (const auto& drop_token_count :
+ request.client_stats().calls_finished_with_drop()) {
+ client_stats_
+ .drop_token_counts[drop_token_count.load_balance_token()] +=
+ drop_token_count.num_calls();
+ }
load_report_cond_.notify_one();
}
done:
- gpr_log(GPR_INFO, "LB: done");
+ gpr_log(GPR_INFO, "LB[%p]: done", this);
return Status::OK;
}
@@ -247,21 +260,20 @@
std::unique_lock<std::mutex> lock(mu_);
const bool prev = !shutdown_;
shutdown_ = true;
- gpr_log(GPR_INFO, "LB: shut down");
+ gpr_log(GPR_INFO, "LB[%p]: shut down", this);
return prev;
}
static LoadBalanceResponse BuildResponseForBackends(
- const std::vector<int>& backend_ports, int num_drops_for_rate_limiting,
- int num_drops_for_load_balancing) {
+ const std::vector<int>& backend_ports,
+ const std::map<grpc::string, size_t>& drop_token_counts) {
LoadBalanceResponse response;
- for (int i = 0; i < num_drops_for_rate_limiting; ++i) {
- auto* server = response.mutable_server_list()->add_servers();
- server->set_drop_for_rate_limiting(true);
- }
- for (int i = 0; i < num_drops_for_load_balancing; ++i) {
- auto* server = response.mutable_server_list()->add_servers();
- server->set_drop_for_load_balancing(true);
+ for (const auto& drop_token_count : drop_token_counts) {
+ for (size_t i = 0; i < drop_token_count.second; ++i) {
+ auto* server = response.mutable_server_list()->add_servers();
+ server->set_drop(true);
+ server->set_load_balance_token(drop_token_count.first);
+ }
}
for (const int& backend_port : backend_ports) {
auto* server = response.mutable_server_list()->add_servers();
@@ -285,13 +297,13 @@
private:
void SendResponse(Stream* stream, const LoadBalanceResponse& response,
int delay_ms) {
- gpr_log(GPR_INFO, "LB: sleeping for %d ms...", delay_ms);
+ gpr_log(GPR_INFO, "LB[%p]: sleeping for %d ms...", this, delay_ms);
if (delay_ms > 0) {
gpr_sleep_until(
gpr_time_add(gpr_now(GPR_CLOCK_REALTIME),
gpr_time_from_millis(delay_ms, GPR_TIMESPAN)));
}
- gpr_log(GPR_INFO, "LB: Woke up! Sending response '%s'",
+ gpr_log(GPR_INFO, "LB[%p]: Woke up! Sending response '%s'", this,
response.DebugString().c_str());
IncreaseResponseCount();
stream->Write(response);
@@ -341,7 +353,7 @@
void TearDown() override {
for (size_t i = 0; i < backends_.size(); ++i) {
- backend_servers_[i].Shutdown();
+ if (backends_[i]->Shutdown()) backend_servers_[i].Shutdown();
}
for (size_t i = 0; i < balancers_.size(); ++i) {
if (balancers_[i]->Shutdown()) balancer_servers_[i].Shutdown();
@@ -499,7 +511,7 @@
TEST_F(SingleBalancerTest, Vanilla) {
const size_t kNumRpcsPerAddress = 100;
ScheduleResponseForBalancer(
- 0, BalancerServiceImpl::BuildResponseForBackends(GetBackendPorts(), 0, 0),
+ 0, BalancerServiceImpl::BuildResponseForBackends(GetBackendPorts(), {}),
0);
// Make sure that trying to connect works without a call.
channel_->GetState(true /* try_to_connect */);
@@ -538,7 +550,7 @@
ScheduleResponseForBalancer(0, LoadBalanceResponse(), 0);
// Send non-empty serverlist only after kServerlistDelayMs
ScheduleResponseForBalancer(
- 0, BalancerServiceImpl::BuildResponseForBackends(GetBackendPorts(), 0, 0),
+ 0, BalancerServiceImpl::BuildResponseForBackends(GetBackendPorts(), {}),
kServerlistDelayMs);
const auto t0 = system_clock::now();
@@ -580,11 +592,11 @@
// Send a serverlist right away.
ScheduleResponseForBalancer(
- 0, BalancerServiceImpl::BuildResponseForBackends(GetBackendPorts(), 0, 0),
+ 0, BalancerServiceImpl::BuildResponseForBackends(GetBackendPorts(), {}),
0);
// ... and the same one a bit later.
ScheduleResponseForBalancer(
- 0, BalancerServiceImpl::BuildResponseForBackends(GetBackendPorts(), 0, 0),
+ 0, BalancerServiceImpl::BuildResponseForBackends(GetBackendPorts(), {}),
kServerlistDelayMs);
// Send num_backends/2 requests.
@@ -639,6 +651,61 @@
EXPECT_EQ("grpclb", channel_->GetLoadBalancingPolicyName());
}
+TEST_F(SingleBalancerTest, BackendsRestart) {
+ const size_t kNumRpcsPerAddress = 100;
+ ScheduleResponseForBalancer(
+ 0, BalancerServiceImpl::BuildResponseForBackends(GetBackendPorts(), {}),
+ 0);
+ // Make sure that trying to connect works without a call.
+ channel_->GetState(true /* try_to_connect */);
+ // Send 100 RPCs per server.
+ auto statuses_and_responses =
+ SendRpc(kMessage_, kNumRpcsPerAddress * num_backends_);
+ for (const auto& status_and_response : statuses_and_responses) {
+ const Status& status = status_and_response.first;
+ const EchoResponse& response = status_and_response.second;
+ EXPECT_TRUE(status.ok()) << "code=" << status.error_code()
+ << " message=" << status.error_message();
+ EXPECT_EQ(response.message(), kMessage_);
+ }
+ // Each backend should have gotten 100 requests.
+ for (size_t i = 0; i < backends_.size(); ++i) {
+ EXPECT_EQ(kNumRpcsPerAddress,
+ backend_servers_[i].service_->request_count());
+ }
+ balancers_[0]->NotifyDoneWithServerlists();
+ // The balancer got a single request.
+ EXPECT_EQ(1U, balancer_servers_[0].service_->request_count());
+ // and sent a single response.
+ EXPECT_EQ(1U, balancer_servers_[0].service_->response_count());
+ for (size_t i = 0; i < backends_.size(); ++i) {
+ if (backends_[i]->Shutdown()) backend_servers_[i].Shutdown();
+ }
+ statuses_and_responses = SendRpc(kMessage_, 1);
+ for (const auto& status_and_response : statuses_and_responses) {
+ const Status& status = status_and_response.first;
+ EXPECT_FALSE(status.ok());
+ }
+ for (size_t i = 0; i < num_backends_; ++i) {
+ backends_.emplace_back(new BackendServiceImpl());
+ backend_servers_.emplace_back(ServerThread<BackendService>(
+ "backend", server_host_, backends_.back().get()));
+ }
+ // The following RPC will fail due to the backend ports having changed. It
+ // will nonetheless exercise the grpclb-roundrobin handling of the RR policy
+ // having gone into shutdown.
+ // TODO(dgq): implement the "backend restart" component as well. We need extra
+ // machinery to either update the LB responses "on the fly" or instruct
+ // backends which ports to restart on.
+ statuses_and_responses = SendRpc(kMessage_, 1);
+ for (const auto& status_and_response : statuses_and_responses) {
+ const Status& status = status_and_response.first;
+ EXPECT_FALSE(status.ok());
+ }
+ // Check LB policy name for the channel.
+ EXPECT_EQ("grpclb", channel_->GetLoadBalancingPolicyName());
+}
+
class UpdatesTest : public GrpclbEnd2endTest {
public:
UpdatesTest() : GrpclbEnd2endTest(4, 3, 0) {}
@@ -648,10 +715,9 @@
const std::vector<int> first_backend{GetBackendPorts()[0]};
const std::vector<int> second_backend{GetBackendPorts()[1]};
ScheduleResponseForBalancer(
- 0, BalancerServiceImpl::BuildResponseForBackends(first_backend, 0, 0), 0);
+ 0, BalancerServiceImpl::BuildResponseForBackends(first_backend, {}), 0);
ScheduleResponseForBalancer(
- 1, BalancerServiceImpl::BuildResponseForBackends(second_backend, 0, 0),
- 0);
+ 1, BalancerServiceImpl::BuildResponseForBackends(second_backend, {}), 0);
// Start servers and send 10 RPCs per server.
gpr_log(GPR_INFO, "========= BEFORE FIRST BATCH ==========");
@@ -726,10 +792,9 @@
const std::vector<int> second_backend{GetBackendPorts()[0]};
ScheduleResponseForBalancer(
- 0, BalancerServiceImpl::BuildResponseForBackends(first_backend, 0, 0), 0);
+ 0, BalancerServiceImpl::BuildResponseForBackends(first_backend, {}), 0);
ScheduleResponseForBalancer(
- 1, BalancerServiceImpl::BuildResponseForBackends(second_backend, 0, 0),
- 0);
+ 1, BalancerServiceImpl::BuildResponseForBackends(second_backend, {}), 0);
// Start servers and send 10 RPCs per server.
gpr_log(GPR_INFO, "========= BEFORE FIRST BATCH ==========");
@@ -809,10 +874,9 @@
const std::vector<int> second_backend{GetBackendPorts()[1]};
ScheduleResponseForBalancer(
- 0, BalancerServiceImpl::BuildResponseForBackends(first_backend, 0, 0), 0);
+ 0, BalancerServiceImpl::BuildResponseForBackends(first_backend, {}), 0);
ScheduleResponseForBalancer(
- 1, BalancerServiceImpl::BuildResponseForBackends(second_backend, 0, 0),
- 0);
+ 1, BalancerServiceImpl::BuildResponseForBackends(second_backend, {}), 0);
// Start servers and send 10 RPCs per server.
gpr_log(GPR_INFO, "========= BEFORE FIRST BATCH ==========");
@@ -901,7 +965,8 @@
TEST_F(SingleBalancerTest, Drop) {
const size_t kNumRpcsPerAddress = 100;
ScheduleResponseForBalancer(
- 0, BalancerServiceImpl::BuildResponseForBackends(GetBackendPorts(), 1, 2),
+ 0, BalancerServiceImpl::BuildResponseForBackends(
+ GetBackendPorts(), {{"rate_limiting", 1}, {"load_balancing", 2}}),
0);
// Send 100 RPCs for each server and drop address.
const auto& statuses_and_responses =
@@ -936,7 +1001,9 @@
TEST_F(SingleBalancerTest, DropAllFirst) {
// All registered addresses are marked as "drop".
ScheduleResponseForBalancer(
- 0, BalancerServiceImpl::BuildResponseForBackends({}, 1, 1), 0);
+ 0, BalancerServiceImpl::BuildResponseForBackends(
+ {}, {{"rate_limiting", 1}, {"load_balancing", 1}}),
+ 0);
const auto& statuses_and_responses = SendRpc(kMessage_, 1);
for (const auto& status_and_response : statuses_and_responses) {
const Status& status = status_and_response.first;
@@ -947,10 +1014,12 @@
TEST_F(SingleBalancerTest, DropAll) {
ScheduleResponseForBalancer(
- 0, BalancerServiceImpl::BuildResponseForBackends(GetBackendPorts(), 0, 0),
+ 0, BalancerServiceImpl::BuildResponseForBackends(GetBackendPorts(), {}),
0);
ScheduleResponseForBalancer(
- 0, BalancerServiceImpl::BuildResponseForBackends({}, 1, 1), 1000);
+ 0, BalancerServiceImpl::BuildResponseForBackends(
+ {}, {{"rate_limiting", 1}, {"load_balancing", 1}}),
+ 1000);
// First call succeeds.
auto statuses_and_responses = SendRpc(kMessage_, 1);
@@ -980,7 +1049,7 @@
TEST_F(SingleBalancerWithClientLoadReportingTest, Vanilla) {
const size_t kNumRpcsPerAddress = 100;
ScheduleResponseForBalancer(
- 0, BalancerServiceImpl::BuildResponseForBackends(GetBackendPorts(), 0, 0),
+ 0, BalancerServiceImpl::BuildResponseForBackends(GetBackendPorts(), {}),
0);
// Send 100 RPCs per server.
const auto& statuses_and_responses =
@@ -1009,17 +1078,17 @@
EXPECT_EQ(kNumRpcsPerAddress * num_backends_, client_stats.num_calls_started);
EXPECT_EQ(kNumRpcsPerAddress * num_backends_,
client_stats.num_calls_finished);
- EXPECT_EQ(0U, client_stats.num_calls_finished_with_drop_for_rate_limiting);
- EXPECT_EQ(0U, client_stats.num_calls_finished_with_drop_for_load_balancing);
EXPECT_EQ(0U, client_stats.num_calls_finished_with_client_failed_to_send);
EXPECT_EQ(kNumRpcsPerAddress * num_backends_,
client_stats.num_calls_finished_known_received);
+ EXPECT_THAT(client_stats.drop_token_counts, ::testing::ElementsAre());
}
TEST_F(SingleBalancerWithClientLoadReportingTest, Drop) {
const size_t kNumRpcsPerAddress = 3;
ScheduleResponseForBalancer(
- 0, BalancerServiceImpl::BuildResponseForBackends(GetBackendPorts(), 2, 1),
+ 0, BalancerServiceImpl::BuildResponseForBackends(
+ GetBackendPorts(), {{"rate_limiting", 2}, {"load_balancing", 1}}),
0);
// Send 100 RPCs for each server and drop address.
const auto& statuses_and_responses =
@@ -1056,13 +1125,13 @@
client_stats.num_calls_started);
EXPECT_EQ(kNumRpcsPerAddress * (num_backends_ + 3),
client_stats.num_calls_finished);
- EXPECT_EQ(kNumRpcsPerAddress * 2,
- client_stats.num_calls_finished_with_drop_for_rate_limiting);
- EXPECT_EQ(kNumRpcsPerAddress,
- client_stats.num_calls_finished_with_drop_for_load_balancing);
EXPECT_EQ(0U, client_stats.num_calls_finished_with_client_failed_to_send);
EXPECT_EQ(kNumRpcsPerAddress * num_backends_,
client_stats.num_calls_finished_known_received);
+ EXPECT_THAT(client_stats.drop_token_counts,
+ ::testing::ElementsAre(
+ ::testing::Pair("load_balancing", kNumRpcsPerAddress),
+ ::testing::Pair("rate_limiting", kNumRpcsPerAddress * 2)));
}
} // namespace
diff --git a/test/cpp/grpclb/grpclb_api_test.cc b/test/cpp/grpclb/grpclb_api_test.cc
index cec9666..6b0350e 100644
--- a/test/cpp/grpclb/grpclb_api_test.cc
+++ b/test/cpp/grpclb/grpclb_api_test.cc
@@ -91,13 +91,13 @@
auto* server = serverlist->add_servers();
server->set_ip_address(Ip4ToPackedString("127.0.0.1"));
server->set_port(12345);
- server->set_drop_for_rate_limiting(true);
- server->set_drop_for_load_balancing(false);
+ server->set_load_balance_token("rate_limting");
+ server->set_drop(true);
server = response.mutable_server_list()->add_servers();
server->set_ip_address(Ip4ToPackedString("10.0.0.1"));
server->set_port(54321);
- server->set_drop_for_rate_limiting(false);
- server->set_drop_for_load_balancing(true);
+ server->set_load_balance_token("load_balancing");
+ server->set_drop(true);
auto* expiration_interval = serverlist->mutable_expiration_interval();
expiration_interval->set_seconds(888);
expiration_interval->set_nanos(999);
@@ -112,14 +112,14 @@
EXPECT_EQ(PackedStringToIp(c_serverlist->servers[0]->ip_address),
"127.0.0.1");
EXPECT_EQ(c_serverlist->servers[0]->port, 12345);
- EXPECT_TRUE(c_serverlist->servers[0]->drop_for_rate_limiting);
- EXPECT_FALSE(c_serverlist->servers[0]->drop_for_load_balancing);
+ EXPECT_STREQ(c_serverlist->servers[0]->load_balance_token, "rate_limting");
+ EXPECT_TRUE(c_serverlist->servers[0]->drop);
EXPECT_TRUE(c_serverlist->servers[1]->has_ip_address);
EXPECT_EQ(PackedStringToIp(c_serverlist->servers[1]->ip_address), "10.0.0.1");
EXPECT_EQ(c_serverlist->servers[1]->port, 54321);
- EXPECT_FALSE(c_serverlist->servers[1]->drop_for_rate_limiting);
- EXPECT_TRUE(c_serverlist->servers[1]->drop_for_load_balancing);
+ EXPECT_STREQ(c_serverlist->servers[1]->load_balance_token, "load_balancing");
+ EXPECT_TRUE(c_serverlist->servers[1]->drop);
EXPECT_TRUE(c_serverlist->expiration_interval.has_seconds);
EXPECT_EQ(c_serverlist->expiration_interval.seconds, 888);
diff --git a/test/cpp/qps/server.h b/test/cpp/qps/server.h
index c0dac96..df27a43 100644
--- a/test/cpp/qps/server.h
+++ b/test/cpp/qps/server.h
@@ -80,8 +80,11 @@
return false;
}
payload->set_type(type);
- std::unique_ptr<char[]> body(new char[size]());
- payload->set_body(body.get(), size);
+ // Don't waste time creating a new payload of identical size.
+ if (payload->body().length() != (size_t)size) {
+ std::unique_ptr<char[]> body(new char[size]());
+ payload->set_body(body.get(), size);
+ }
return true;
}
diff --git a/tools/internal_ci/helper_scripts/prepare_build_macos_rc b/tools/internal_ci/helper_scripts/prepare_build_macos_rc
index 89e8e24..00105d4 100644
--- a/tools/internal_ci/helper_scripts/prepare_build_macos_rc
+++ b/tools/internal_ci/helper_scripts/prepare_build_macos_rc
@@ -27,6 +27,11 @@
# show current limits
ulimit -a
+# Add GCP credentials for BQ access
+# pip does not install google-api-python-client properly, so use easy_install
+sudo easy_install --upgrade google-api-python-client
+export GOOGLE_APPLICATION_CREDENTIALS=${KOKORO_GFILE_DIR}/GrpcTesting-d0eeee2db331.json
+gcloud auth activate-service-account --key-file=$GOOGLE_APPLICATION_CREDENTIALS
# required to build protobuf
brew install gflags
diff --git a/tools/internal_ci/helper_scripts/prepare_build_windows.bat b/tools/internal_ci/helper_scripts/prepare_build_windows.bat
index 1634acd..69e087e 100644
--- a/tools/internal_ci/helper_scripts/prepare_build_windows.bat
+++ b/tools/internal_ci/helper_scripts/prepare_build_windows.bat
@@ -18,4 +18,14 @@
bash tools/internal_ci/helper_scripts/gen_report_index.sh
+@rem Update DNS settings to:
+@rem 1. allow resolving metadata.google.internal hostname
+@rem 2. make fetching default GCE credential by oauth2client work
+netsh interface ip set dns "Local Area Connection 8" static 169.254.169.254 primary
+netsh interface ip add dnsservers "Local Area Connection 8" 8.8.8.8 index=2
+netsh interface ip add dnsservers "Local Area Connection 8" 8.8.4.4 index=3
+
+@rem Needed for big_query_utils
+python -m pip install google-api-python-client
+
git submodule update --init
diff --git a/tools/internal_ci/linux/grpc_build_artifacts.sh b/tools/internal_ci/linux/grpc_build_artifacts.sh
index 3997a13..985243e 100755
--- a/tools/internal_ci/linux/grpc_build_artifacts.sh
+++ b/tools/internal_ci/linux/grpc_build_artifacts.sh
@@ -26,4 +26,4 @@
rvm --default use ruby-2.4.1
set -ex
-tools/run_tests/task_runner.py -f artifact linux
+tools/run_tests/task_runner.py -f artifact linux -j 6
diff --git a/tools/internal_ci/linux/grpc_master.cfg b/tools/internal_ci/linux/pull_request/grpc_basictests_c_cpp_dbg.cfg
similarity index 89%
copy from tools/internal_ci/linux/grpc_master.cfg
copy to tools/internal_ci/linux/pull_request/grpc_basictests_c_cpp_dbg.cfg
index a2784df..dcc6265 100644
--- a/tools/internal_ci/linux/grpc_master.cfg
+++ b/tools/internal_ci/linux/pull_request/grpc_basictests_c_cpp_dbg.cfg
@@ -26,5 +26,5 @@
env_vars {
key: "RUN_TESTS_FLAGS"
- value: "-f basictests linux --inner_jobs 16 -j 1 --internal_ci --bq_result_table aggregate_results"
+ value: "-f basictests linux corelang dbg --inner_jobs 16 -j 1 --internal_ci"
}
diff --git a/tools/internal_ci/linux/grpc_master.cfg b/tools/internal_ci/linux/pull_request/grpc_basictests_c_cpp_opt.cfg
similarity index 89%
rename from tools/internal_ci/linux/grpc_master.cfg
rename to tools/internal_ci/linux/pull_request/grpc_basictests_c_cpp_opt.cfg
index a2784df..f60beaf 100644
--- a/tools/internal_ci/linux/grpc_master.cfg
+++ b/tools/internal_ci/linux/pull_request/grpc_basictests_c_cpp_opt.cfg
@@ -26,5 +26,5 @@
env_vars {
key: "RUN_TESTS_FLAGS"
- value: "-f basictests linux --inner_jobs 16 -j 1 --internal_ci --bq_result_table aggregate_results"
+ value: "-f basictests linux corelang opt --inner_jobs 16 -j 1 --internal_ci"
}
diff --git a/tools/internal_ci/linux/grpc_master.cfg b/tools/internal_ci/linux/pull_request/grpc_basictests_multilang.cfg
similarity index 89%
copy from tools/internal_ci/linux/grpc_master.cfg
copy to tools/internal_ci/linux/pull_request/grpc_basictests_multilang.cfg
index a2784df..7c16cf6 100644
--- a/tools/internal_ci/linux/grpc_master.cfg
+++ b/tools/internal_ci/linux/pull_request/grpc_basictests_multilang.cfg
@@ -26,5 +26,5 @@
env_vars {
key: "RUN_TESTS_FLAGS"
- value: "-f basictests linux --inner_jobs 16 -j 1 --internal_ci --bq_result_table aggregate_results"
+ value: "-f basictests linux multilang --inner_jobs 16 -j 2 --internal_ci"
}
diff --git a/tools/internal_ci/linux/grpc_master.cfg b/tools/internal_ci/macos/grpc_basictests.cfg
similarity index 76%
copy from tools/internal_ci/linux/grpc_master.cfg
copy to tools/internal_ci/macos/grpc_basictests.cfg
index a2784df..e10c2e3 100644
--- a/tools/internal_ci/linux/grpc_master.cfg
+++ b/tools/internal_ci/macos/grpc_basictests.cfg
@@ -15,7 +15,8 @@
# Config file for the internal CI (in protobuf text format)
# Location of the continuous shell script in repository.
-build_file: "grpc/tools/internal_ci/linux/grpc_run_tests_matrix.sh"
+build_file: "grpc/tools/internal_ci/macos/grpc_run_tests_matrix.sh"
+gfile_resources: "/bigstore/grpc-testing-secrets/gcp_credentials/GrpcTesting-d0eeee2db331.json"
timeout_mins: 240
action {
define_artifacts {
@@ -26,5 +27,5 @@
env_vars {
key: "RUN_TESTS_FLAGS"
- value: "-f basictests linux --inner_jobs 16 -j 1 --internal_ci --bq_result_table aggregate_results"
+ value: "-f basictests macos --internal_ci -j 2 --inner_jobs 4 --bq_result_table aggregate_results"
}
diff --git a/tools/internal_ci/macos/grpc_master.cfg b/tools/internal_ci/macos/grpc_master.cfg
deleted file mode 100644
index 24c021d..0000000
--- a/tools/internal_ci/macos/grpc_master.cfg
+++ /dev/null
@@ -1,25 +0,0 @@
-# Copyright 2017 gRPC authors.
-#
-# Licensed under the Apache License, Version 2.0 (the "License");
-# you may not use this file except in compliance with the License.
-# You may obtain a copy of the License at
-#
-# http://www.apache.org/licenses/LICENSE-2.0
-#
-# Unless required by applicable law or agreed to in writing, software
-# distributed under the License is distributed on an "AS IS" BASIS,
-# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
-# See the License for the specific language governing permissions and
-# limitations under the License.
-
-# Config file for the internal CI (in protobuf text format)
-
-# Location of the continuous shell script in repository.
-build_file: "grpc/tools/internal_ci/macos/grpc_master.sh"
-timeout_mins: 240
-action {
- define_artifacts {
- regex: "**/*sponge_log.xml"
- regex: "github/grpc/reports/**"
- }
-}
diff --git a/tools/internal_ci/macos/grpc_master.sh b/tools/internal_ci/macos/grpc_run_tests_matrix.sh
similarity index 88%
rename from tools/internal_ci/macos/grpc_master.sh
rename to tools/internal_ci/macos/grpc_run_tests_matrix.sh
index c64666b..9a43e48 100755
--- a/tools/internal_ci/macos/grpc_master.sh
+++ b/tools/internal_ci/macos/grpc_run_tests_matrix.sh
@@ -20,7 +20,7 @@
source tools/internal_ci/helper_scripts/prepare_build_macos_rc
-tools/run_tests/run_tests_matrix.py -f basictests macos --internal_ci -j 2 --inner_jobs 4 || FAILED="true"
+tools/run_tests/run_tests_matrix.py $RUN_TESTS_FLAGS || FAILED="true"
# kill port_server.py to prevent the build from hanging
ps aux | grep port_server\\.py | awk '{print $2}' | xargs kill -9
diff --git a/tools/internal_ci/linux/grpc_master.cfg b/tools/internal_ci/windows/grpc_basictests.cfg
similarity index 80%
copy from tools/internal_ci/linux/grpc_master.cfg
copy to tools/internal_ci/windows/grpc_basictests.cfg
index a2784df..396d29e 100644
--- a/tools/internal_ci/linux/grpc_master.cfg
+++ b/tools/internal_ci/windows/grpc_basictests.cfg
@@ -15,8 +15,8 @@
# Config file for the internal CI (in protobuf text format)
# Location of the continuous shell script in repository.
-build_file: "grpc/tools/internal_ci/linux/grpc_run_tests_matrix.sh"
-timeout_mins: 240
+build_file: "grpc/tools/internal_ci/windows/grpc_run_tests_matrix.bat"
+timeout_mins: 360
action {
define_artifacts {
regex: "**/*sponge_log.xml"
@@ -26,5 +26,5 @@
env_vars {
key: "RUN_TESTS_FLAGS"
- value: "-f basictests linux --inner_jobs 16 -j 1 --internal_ci --bq_result_table aggregate_results"
+ value: "-f basictests windows -j 1 --inner_jobs 8 --internal_ci --bq_result_table aggregate_results"
}
diff --git a/tools/internal_ci/windows/grpc_master.bat b/tools/internal_ci/windows/grpc_master.bat
deleted file mode 100644
index 58eefc3..0000000
--- a/tools/internal_ci/windows/grpc_master.bat
+++ /dev/null
@@ -1,24 +0,0 @@
-@rem Copyright 2017 gRPC authors.
-@rem
-@rem Licensed under the Apache License, Version 2.0 (the "License");
-@rem you may not use this file except in compliance with the License.
-@rem You may obtain a copy of the License at
-@rem
-@rem http://www.apache.org/licenses/LICENSE-2.0
-@rem
-@rem Unless required by applicable law or agreed to in writing, software
-@rem distributed under the License is distributed on an "AS IS" BASIS,
-@rem WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
-@rem See the License for the specific language governing permissions and
-@rem limitations under the License.
-
-@rem enter repo root
-cd /d %~dp0\..\..\..
-
-call tools/internal_ci/helper_scripts/prepare_build_windows.bat
-
-python tools/run_tests/run_tests_matrix.py -f basictests windows -j 1 --inner_jobs 8 --internal_ci || goto :error
-goto :EOF
-
-:error
-exit /b %errorlevel%
diff --git a/tools/internal_ci/windows/grpc_master.cfg b/tools/internal_ci/windows/grpc_master.cfg
deleted file mode 100644
index 786ebc0..0000000
--- a/tools/internal_ci/windows/grpc_master.cfg
+++ /dev/null
@@ -1,25 +0,0 @@
-# Copyright 2017 gRPC authors.
-#
-# Licensed under the Apache License, Version 2.0 (the "License");
-# you may not use this file except in compliance with the License.
-# You may obtain a copy of the License at
-#
-# http://www.apache.org/licenses/LICENSE-2.0
-#
-# Unless required by applicable law or agreed to in writing, software
-# distributed under the License is distributed on an "AS IS" BASIS,
-# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
-# See the License for the specific language governing permissions and
-# limitations under the License.
-
-# Config file for the internal CI (in protobuf text format)
-
-# Location of the continuous shell script in repository.
-build_file: "grpc/tools/internal_ci/windows/grpc_master.bat"
-timeout_mins: 360
-action {
- define_artifacts {
- regex: "**/*sponge_log.xml"
- regex: "github/grpc/reports/**"
- }
-}
diff --git a/tools/internal_ci/linux/grpc_master.cfg b/tools/internal_ci/windows/grpc_portability.cfg
similarity index 81%
copy from tools/internal_ci/linux/grpc_master.cfg
copy to tools/internal_ci/windows/grpc_portability.cfg
index a2784df..c395cb4 100644
--- a/tools/internal_ci/linux/grpc_master.cfg
+++ b/tools/internal_ci/windows/grpc_portability.cfg
@@ -15,8 +15,8 @@
# Config file for the internal CI (in protobuf text format)
# Location of the continuous shell script in repository.
-build_file: "grpc/tools/internal_ci/linux/grpc_run_tests_matrix.sh"
-timeout_mins: 240
+build_file: "grpc/tools/internal_ci/windows/grpc_run_tests_matrix.bat"
+timeout_mins: 360
action {
define_artifacts {
regex: "**/*sponge_log.xml"
@@ -26,5 +26,5 @@
env_vars {
key: "RUN_TESTS_FLAGS"
- value: "-f basictests linux --inner_jobs 16 -j 1 --internal_ci --bq_result_table aggregate_results"
+ value: "-f portability windows -j 1 --inner_jobs 8 --internal_ci"
}
diff --git a/tools/internal_ci/windows/grpc_portability_master.cfg b/tools/internal_ci/windows/grpc_portability_master.cfg
index 85f6927..c395cb4 100644
--- a/tools/internal_ci/windows/grpc_portability_master.cfg
+++ b/tools/internal_ci/windows/grpc_portability_master.cfg
@@ -15,7 +15,7 @@
# Config file for the internal CI (in protobuf text format)
# Location of the continuous shell script in repository.
-build_file: "grpc/tools/internal_ci/windows/grpc_portability_master.bat"
+build_file: "grpc/tools/internal_ci/windows/grpc_run_tests_matrix.bat"
timeout_mins: 360
action {
define_artifacts {
@@ -23,3 +23,8 @@
regex: "github/grpc/reports/**"
}
}
+
+env_vars {
+ key: "RUN_TESTS_FLAGS"
+ value: "-f portability windows -j 1 --inner_jobs 8 --internal_ci"
+}
diff --git a/tools/internal_ci/windows/grpc_portability_master.bat b/tools/internal_ci/windows/grpc_run_tests_matrix.bat
similarity index 86%
rename from tools/internal_ci/windows/grpc_portability_master.bat
rename to tools/internal_ci/windows/grpc_run_tests_matrix.bat
index c93d56c..08d834f 100644
--- a/tools/internal_ci/windows/grpc_portability_master.bat
+++ b/tools/internal_ci/windows/grpc_run_tests_matrix.bat
@@ -17,7 +17,7 @@
call tools/internal_ci/helper_scripts/prepare_build_windows.bat
-python tools/run_tests/run_tests_matrix.py -f portability windows -j 1 --inner_jobs 8 --internal_ci || goto :error
+python tools/run_tests/run_tests_matrix.py %RUN_TESTS_FLAGS% || goto :error
goto :EOF
:error
diff --git a/tools/internal_ci/linux/grpc_master.cfg b/tools/internal_ci/windows/pull_request/grpc_basictests.cfg
similarity index 81%
copy from tools/internal_ci/linux/grpc_master.cfg
copy to tools/internal_ci/windows/pull_request/grpc_basictests.cfg
index a2784df..a116738 100644
--- a/tools/internal_ci/linux/grpc_master.cfg
+++ b/tools/internal_ci/windows/pull_request/grpc_basictests.cfg
@@ -15,8 +15,8 @@
# Config file for the internal CI (in protobuf text format)
# Location of the continuous shell script in repository.
-build_file: "grpc/tools/internal_ci/linux/grpc_run_tests_matrix.sh"
-timeout_mins: 240
+build_file: "grpc/tools/internal_ci/windows/grpc_run_tests_matrix.bat"
+timeout_mins: 360
action {
define_artifacts {
regex: "**/*sponge_log.xml"
@@ -26,5 +26,5 @@
env_vars {
key: "RUN_TESTS_FLAGS"
- value: "-f basictests linux --inner_jobs 16 -j 1 --internal_ci --bq_result_table aggregate_results"
+ value: "-f basictests windows -j 1 --inner_jobs 8 --internal_ci"
}
diff --git a/tools/internal_ci/linux/grpc_master.cfg b/tools/internal_ci/windows/pull_request/grpc_portability.cfg
similarity index 81%
copy from tools/internal_ci/linux/grpc_master.cfg
copy to tools/internal_ci/windows/pull_request/grpc_portability.cfg
index a2784df..c395cb4 100644
--- a/tools/internal_ci/linux/grpc_master.cfg
+++ b/tools/internal_ci/windows/pull_request/grpc_portability.cfg
@@ -15,8 +15,8 @@
# Config file for the internal CI (in protobuf text format)
# Location of the continuous shell script in repository.
-build_file: "grpc/tools/internal_ci/linux/grpc_run_tests_matrix.sh"
-timeout_mins: 240
+build_file: "grpc/tools/internal_ci/windows/grpc_run_tests_matrix.bat"
+timeout_mins: 360
action {
define_artifacts {
regex: "**/*sponge_log.xml"
@@ -26,5 +26,5 @@
env_vars {
key: "RUN_TESTS_FLAGS"
- value: "-f basictests linux --inner_jobs 16 -j 1 --internal_ci --bq_result_table aggregate_results"
+ value: "-f portability windows -j 1 --inner_jobs 8 --internal_ci"
}
diff --git a/tools/run_tests/generated/sources_and_headers.json b/tools/run_tests/generated/sources_and_headers.json
index 24e7ff2..aa76fc0 100644
--- a/tools/run_tests/generated/sources_and_headers.json
+++ b/tools/run_tests/generated/sources_and_headers.json
@@ -174,6 +174,23 @@
"headers": [],
"is_filegroup": false,
"language": "c",
+ "name": "byte_stream_test",
+ "src": [
+ "test/core/transport/byte_stream_test.c"
+ ],
+ "third_party": false,
+ "type": "target"
+ },
+ {
+ "deps": [
+ "gpr",
+ "gpr_test_util",
+ "grpc",
+ "grpc_test_util"
+ ],
+ "headers": [],
+ "is_filegroup": false,
+ "language": "c",
"name": "census_context_test",
"src": [
"test/core/census/context_test.c"
diff --git a/tools/run_tests/generated/tests.json b/tools/run_tests/generated/tests.json
index ebf3071..83781f9 100644
--- a/tools/run_tests/generated/tests.json
+++ b/tools/run_tests/generated/tests.json
@@ -215,6 +215,28 @@
"flaky": false,
"gtest": false,
"language": "c",
+ "name": "byte_stream_test",
+ "platforms": [
+ "linux",
+ "mac",
+ "posix",
+ "windows"
+ ]
+ },
+ {
+ "args": [],
+ "ci_platforms": [
+ "linux",
+ "mac",
+ "posix",
+ "windows"
+ ],
+ "cpu_cost": 1.0,
+ "exclude_configs": [],
+ "exclude_iomgrs": [],
+ "flaky": false,
+ "gtest": false,
+ "language": "c",
"name": "census_context_test",
"platforms": [
"linux",
diff --git a/vsprojects/buildtests_c.sln b/vsprojects/buildtests_c.sln
index 55de734..6f13039 100644
--- a/vsprojects/buildtests_c.sln
+++ b/vsprojects/buildtests_c.sln
@@ -118,6 +118,17 @@
{29D16885-7228-4C31-81ED-5F9187C7F2A9} = {29D16885-7228-4C31-81ED-5F9187C7F2A9}
EndProjectSection
EndProject
+Project("{8BC9CEB8-8B4A-11D0-8D11-00A0C91BC942}") = "byte_stream_test", "vcxproj\test\byte_stream_test\byte_stream_test.vcxproj", "{9AEDA345-E3E8-BFE9-11BF-64949EF41C9C}"
+ ProjectSection(myProperties) = preProject
+ lib = "False"
+ EndProjectSection
+ ProjectSection(ProjectDependencies) = postProject
+ {17BCAFC0-5FDC-4C94-AEB9-95F3E220614B} = {17BCAFC0-5FDC-4C94-AEB9-95F3E220614B}
+ {29D16885-7228-4C31-81ED-5F9187C7F2A9} = {29D16885-7228-4C31-81ED-5F9187C7F2A9}
+ {EAB0A629-17A9-44DB-B5FF-E91A721FE037} = {EAB0A629-17A9-44DB-B5FF-E91A721FE037}
+ {B23D3D1A-9438-4EDA-BEB6-9A0A03D17792} = {B23D3D1A-9438-4EDA-BEB6-9A0A03D17792}
+ EndProjectSection
+EndProject
Project("{8BC9CEB8-8B4A-11D0-8D11-00A0C91BC942}") = "census_context_test", "vcxproj\test\census_context_test\census_context_test.vcxproj", "{5C1CFC2D-AF3C-D7CB-BA74-D267E91CBC73}"
ProjectSection(myProperties) = preProject
lib = "False"
@@ -1940,6 +1951,22 @@
{D5C70922-D68E-0E9D-9988-995E0F9A79AE}.Release-DLL|Win32.Build.0 = Release|Win32
{D5C70922-D68E-0E9D-9988-995E0F9A79AE}.Release-DLL|x64.ActiveCfg = Release|x64
{D5C70922-D68E-0E9D-9988-995E0F9A79AE}.Release-DLL|x64.Build.0 = Release|x64
+ {9AEDA345-E3E8-BFE9-11BF-64949EF41C9C}.Debug|Win32.ActiveCfg = Debug|Win32
+ {9AEDA345-E3E8-BFE9-11BF-64949EF41C9C}.Debug|x64.ActiveCfg = Debug|x64
+ {9AEDA345-E3E8-BFE9-11BF-64949EF41C9C}.Release|Win32.ActiveCfg = Release|Win32
+ {9AEDA345-E3E8-BFE9-11BF-64949EF41C9C}.Release|x64.ActiveCfg = Release|x64
+ {9AEDA345-E3E8-BFE9-11BF-64949EF41C9C}.Debug|Win32.Build.0 = Debug|Win32
+ {9AEDA345-E3E8-BFE9-11BF-64949EF41C9C}.Debug|x64.Build.0 = Debug|x64
+ {9AEDA345-E3E8-BFE9-11BF-64949EF41C9C}.Release|Win32.Build.0 = Release|Win32
+ {9AEDA345-E3E8-BFE9-11BF-64949EF41C9C}.Release|x64.Build.0 = Release|x64
+ {9AEDA345-E3E8-BFE9-11BF-64949EF41C9C}.Debug-DLL|Win32.ActiveCfg = Debug|Win32
+ {9AEDA345-E3E8-BFE9-11BF-64949EF41C9C}.Debug-DLL|Win32.Build.0 = Debug|Win32
+ {9AEDA345-E3E8-BFE9-11BF-64949EF41C9C}.Debug-DLL|x64.ActiveCfg = Debug|x64
+ {9AEDA345-E3E8-BFE9-11BF-64949EF41C9C}.Debug-DLL|x64.Build.0 = Debug|x64
+ {9AEDA345-E3E8-BFE9-11BF-64949EF41C9C}.Release-DLL|Win32.ActiveCfg = Release|Win32
+ {9AEDA345-E3E8-BFE9-11BF-64949EF41C9C}.Release-DLL|Win32.Build.0 = Release|Win32
+ {9AEDA345-E3E8-BFE9-11BF-64949EF41C9C}.Release-DLL|x64.ActiveCfg = Release|x64
+ {9AEDA345-E3E8-BFE9-11BF-64949EF41C9C}.Release-DLL|x64.Build.0 = Release|x64
{5C1CFC2D-AF3C-D7CB-BA74-D267E91CBC73}.Debug|Win32.ActiveCfg = Debug|Win32
{5C1CFC2D-AF3C-D7CB-BA74-D267E91CBC73}.Debug|x64.ActiveCfg = Debug|x64
{5C1CFC2D-AF3C-D7CB-BA74-D267E91CBC73}.Release|Win32.ActiveCfg = Release|Win32
diff --git a/vsprojects/vcxproj/test/byte_stream_test/byte_stream_test.vcxproj b/vsprojects/vcxproj/test/byte_stream_test/byte_stream_test.vcxproj
new file mode 100644
index 0000000..5d65647
--- /dev/null
+++ b/vsprojects/vcxproj/test/byte_stream_test/byte_stream_test.vcxproj
@@ -0,0 +1,199 @@
+<?xml version="1.0" encoding="utf-8"?>
+<Project DefaultTargets="Build" ToolsVersion="12.0" xmlns="http://schemas.microsoft.com/developer/msbuild/2003">
+ <Import Project="$(SolutionDir)\..\vsprojects\packages\grpc.dependencies.openssl.1.0.204.1\build\native\grpc.dependencies.openssl.props" Condition="Exists('$(SolutionDir)\..\vsprojects\packages\grpc.dependencies.openssl.1.0.204.1\build\native\1.0.204.1.props')" />
+ <ItemGroup Label="ProjectConfigurations">
+ <ProjectConfiguration Include="Debug|Win32">
+ <Configuration>Debug</Configuration>
+ <Platform>Win32</Platform>
+ </ProjectConfiguration>
+ <ProjectConfiguration Include="Debug|x64">
+ <Configuration>Debug</Configuration>
+ <Platform>x64</Platform>
+ </ProjectConfiguration>
+ <ProjectConfiguration Include="Release|Win32">
+ <Configuration>Release</Configuration>
+ <Platform>Win32</Platform>
+ </ProjectConfiguration>
+ <ProjectConfiguration Include="Release|x64">
+ <Configuration>Release</Configuration>
+ <Platform>x64</Platform>
+ </ProjectConfiguration>
+ </ItemGroup>
+ <PropertyGroup Label="Globals">
+ <ProjectGuid>{9AEDA345-E3E8-BFE9-11BF-64949EF41C9C}</ProjectGuid>
+ <IgnoreWarnIntDirInTempDetected>true</IgnoreWarnIntDirInTempDetected>
+ <IntDir>$(SolutionDir)IntDir\$(MSBuildProjectName)\</IntDir>
+ </PropertyGroup>
+ <Import Project="$(VCTargetsPath)\Microsoft.Cpp.Default.props" />
+ <PropertyGroup Condition="'$(VisualStudioVersion)' == '10.0'" Label="Configuration">
+ <PlatformToolset>v100</PlatformToolset>
+ </PropertyGroup>
+ <PropertyGroup Condition="'$(VisualStudioVersion)' == '11.0'" Label="Configuration">
+ <PlatformToolset>v110</PlatformToolset>
+ </PropertyGroup>
+ <PropertyGroup Condition="'$(VisualStudioVersion)' == '12.0'" Label="Configuration">
+ <PlatformToolset>v120</PlatformToolset>
+ </PropertyGroup>
+ <PropertyGroup Condition="'$(VisualStudioVersion)' == '14.0'" Label="Configuration">
+ <PlatformToolset>v140</PlatformToolset>
+ </PropertyGroup>
+ <PropertyGroup Condition="'$(Configuration)'=='Debug'" Label="Configuration">
+ <ConfigurationType>Application</ConfigurationType>
+ <UseDebugLibraries>true</UseDebugLibraries>
+ <CharacterSet>Unicode</CharacterSet>
+ </PropertyGroup>
+ <PropertyGroup Condition="'$(Configuration)'=='Release'" Label="Configuration">
+ <ConfigurationType>Application</ConfigurationType>
+ <UseDebugLibraries>false</UseDebugLibraries>
+ <WholeProgramOptimization>true</WholeProgramOptimization>
+ <CharacterSet>Unicode</CharacterSet>
+ </PropertyGroup>
+ <Import Project="$(VCTargetsPath)\Microsoft.Cpp.props" />
+ <ImportGroup Label="ExtensionSettings">
+ </ImportGroup>
+ <ImportGroup Label="PropertySheets">
+ <Import Project="$(UserRootDir)\Microsoft.Cpp.$(Platform).user.props" Condition="exists('$(UserRootDir)\Microsoft.Cpp.$(Platform).user.props')" Label="LocalAppDataPlatform" />
+ <Import Project="$(SolutionDir)\..\vsprojects\global.props" />
+ <Import Project="$(SolutionDir)\..\vsprojects\openssl.props" />
+ <Import Project="$(SolutionDir)\..\vsprojects\winsock.props" />
+ <Import Project="$(SolutionDir)\..\vsprojects\zlib.props" />
+ </ImportGroup>
+ <PropertyGroup Label="UserMacros" />
+ <PropertyGroup Condition="'$(Configuration)'=='Debug'">
+ <TargetName>byte_stream_test</TargetName>
+ <Linkage-grpc_dependencies_zlib>static</Linkage-grpc_dependencies_zlib>
+ <Configuration-grpc_dependencies_zlib>Debug</Configuration-grpc_dependencies_zlib>
+ <Linkage-grpc_dependencies_openssl>static</Linkage-grpc_dependencies_openssl>
+ <Configuration-grpc_dependencies_openssl>Debug</Configuration-grpc_dependencies_openssl>
+ </PropertyGroup>
+ <PropertyGroup Condition="'$(Configuration)'=='Release'">
+ <TargetName>byte_stream_test</TargetName>
+ <Linkage-grpc_dependencies_zlib>static</Linkage-grpc_dependencies_zlib>
+ <Configuration-grpc_dependencies_zlib>Release</Configuration-grpc_dependencies_zlib>
+ <Linkage-grpc_dependencies_openssl>static</Linkage-grpc_dependencies_openssl>
+ <Configuration-grpc_dependencies_openssl>Release</Configuration-grpc_dependencies_openssl>
+ </PropertyGroup>
+ <ItemDefinitionGroup Condition="'$(Configuration)|$(Platform)'=='Debug|Win32'">
+ <ClCompile>
+ <PrecompiledHeader>NotUsing</PrecompiledHeader>
+ <WarningLevel>Level3</WarningLevel>
+ <Optimization>Disabled</Optimization>
+ <PreprocessorDefinitions>WIN32;_DEBUG;_LIB;%(PreprocessorDefinitions)</PreprocessorDefinitions>
+ <SDLCheck>true</SDLCheck>
+ <RuntimeLibrary>MultiThreadedDebug</RuntimeLibrary>
+ <TreatWarningAsError>true</TreatWarningAsError>
+ <DebugInformationFormat Condition="$(Jenkins)">None</DebugInformationFormat>
+ <MinimalRebuild Condition="$(Jenkins)">false</MinimalRebuild>
+ </ClCompile>
+ <Link>
+ <SubSystem>Console</SubSystem>
+ <GenerateDebugInformation Condition="!$(Jenkins)">true</GenerateDebugInformation>
+ <GenerateDebugInformation Condition="$(Jenkins)">false</GenerateDebugInformation>
+ </Link>
+ </ItemDefinitionGroup>
+
+ <ItemDefinitionGroup Condition="'$(Configuration)|$(Platform)'=='Debug|x64'">
+ <ClCompile>
+ <PrecompiledHeader>NotUsing</PrecompiledHeader>
+ <WarningLevel>Level3</WarningLevel>
+ <Optimization>Disabled</Optimization>
+ <PreprocessorDefinitions>WIN32;_DEBUG;_LIB;%(PreprocessorDefinitions)</PreprocessorDefinitions>
+ <SDLCheck>true</SDLCheck>
+ <RuntimeLibrary>MultiThreadedDebug</RuntimeLibrary>
+ <TreatWarningAsError>true</TreatWarningAsError>
+ <DebugInformationFormat Condition="$(Jenkins)">None</DebugInformationFormat>
+ <MinimalRebuild Condition="$(Jenkins)">false</MinimalRebuild>
+ </ClCompile>
+ <Link>
+ <SubSystem>Console</SubSystem>
+ <GenerateDebugInformation Condition="!$(Jenkins)">true</GenerateDebugInformation>
+ <GenerateDebugInformation Condition="$(Jenkins)">false</GenerateDebugInformation>
+ </Link>
+ </ItemDefinitionGroup>
+
+ <ItemDefinitionGroup Condition="'$(Configuration)|$(Platform)'=='Release|Win32'">
+ <ClCompile>
+ <PrecompiledHeader>NotUsing</PrecompiledHeader>
+ <WarningLevel>Level3</WarningLevel>
+ <Optimization>MaxSpeed</Optimization>
+ <PreprocessorDefinitions>WIN32;NDEBUG;_LIB;%(PreprocessorDefinitions)</PreprocessorDefinitions>
+ <FunctionLevelLinking>true</FunctionLevelLinking>
+ <IntrinsicFunctions>true</IntrinsicFunctions>
+ <SDLCheck>true</SDLCheck>
+ <RuntimeLibrary>MultiThreaded</RuntimeLibrary>
+ <TreatWarningAsError>true</TreatWarningAsError>
+ <DebugInformationFormat Condition="$(Jenkins)">None</DebugInformationFormat>
+ <MinimalRebuild Condition="$(Jenkins)">false</MinimalRebuild>
+ </ClCompile>
+ <Link>
+ <SubSystem>Console</SubSystem>
+ <GenerateDebugInformation Condition="!$(Jenkins)">true</GenerateDebugInformation>
+ <GenerateDebugInformation Condition="$(Jenkins)">false</GenerateDebugInformation>
+ <EnableCOMDATFolding>true</EnableCOMDATFolding>
+ <OptimizeReferences>true</OptimizeReferences>
+ </Link>
+ </ItemDefinitionGroup>
+
+ <ItemDefinitionGroup Condition="'$(Configuration)|$(Platform)'=='Release|x64'">
+ <ClCompile>
+ <PrecompiledHeader>NotUsing</PrecompiledHeader>
+ <WarningLevel>Level3</WarningLevel>
+ <Optimization>MaxSpeed</Optimization>
+ <PreprocessorDefinitions>WIN32;NDEBUG;_LIB;%(PreprocessorDefinitions)</PreprocessorDefinitions>
+ <FunctionLevelLinking>true</FunctionLevelLinking>
+ <IntrinsicFunctions>true</IntrinsicFunctions>
+ <SDLCheck>true</SDLCheck>
+ <RuntimeLibrary>MultiThreaded</RuntimeLibrary>
+ <TreatWarningAsError>true</TreatWarningAsError>
+ <DebugInformationFormat Condition="$(Jenkins)">None</DebugInformationFormat>
+ <MinimalRebuild Condition="$(Jenkins)">false</MinimalRebuild>
+ </ClCompile>
+ <Link>
+ <SubSystem>Console</SubSystem>
+ <GenerateDebugInformation Condition="!$(Jenkins)">true</GenerateDebugInformation>
+ <GenerateDebugInformation Condition="$(Jenkins)">false</GenerateDebugInformation>
+ <EnableCOMDATFolding>true</EnableCOMDATFolding>
+ <OptimizeReferences>true</OptimizeReferences>
+ </Link>
+ </ItemDefinitionGroup>
+
+ <ItemGroup>
+ <ClCompile Include="$(SolutionDir)\..\test\core\transport\byte_stream_test.c">
+ </ClCompile>
+ </ItemGroup>
+ <ItemGroup>
+ <ProjectReference Include="$(SolutionDir)\..\vsprojects\vcxproj\.\grpc_test_util\grpc_test_util.vcxproj">
+ <Project>{17BCAFC0-5FDC-4C94-AEB9-95F3E220614B}</Project>
+ </ProjectReference>
+ <ProjectReference Include="$(SolutionDir)\..\vsprojects\vcxproj\.\grpc\grpc.vcxproj">
+ <Project>{29D16885-7228-4C31-81ED-5F9187C7F2A9}</Project>
+ </ProjectReference>
+ <ProjectReference Include="$(SolutionDir)\..\vsprojects\vcxproj\.\gpr_test_util\gpr_test_util.vcxproj">
+ <Project>{EAB0A629-17A9-44DB-B5FF-E91A721FE037}</Project>
+ </ProjectReference>
+ <ProjectReference Include="$(SolutionDir)\..\vsprojects\vcxproj\.\gpr\gpr.vcxproj">
+ <Project>{B23D3D1A-9438-4EDA-BEB6-9A0A03D17792}</Project>
+ </ProjectReference>
+ </ItemGroup>
+ <ItemGroup>
+ <None Include="packages.config" />
+ </ItemGroup>
+ <Import Project="$(VCTargetsPath)\Microsoft.Cpp.targets" />
+ <ImportGroup Label="ExtensionTargets">
+ <Import Project="$(SolutionDir)\..\vsprojects\packages\grpc.dependencies.zlib.redist.1.2.8.10\build\native\grpc.dependencies.zlib.redist.targets" Condition="Exists('$(SolutionDir)\..\vsprojects\packages\grpc.dependencies.zlib.redist.1.2.8.10\build\native\grpc.dependencies\grpc.dependencies.zlib.targets')" />
+ <Import Project="$(SolutionDir)\..\vsprojects\packages\grpc.dependencies.zlib.1.2.8.10\build\native\grpc.dependencies.zlib.targets" Condition="Exists('$(SolutionDir)\..\vsprojects\packages\grpc.dependencies.zlib.1.2.8.10\build\native\grpc.dependencies\grpc.dependencies.zlib.targets')" />
+ <Import Project="$(SolutionDir)\..\vsprojects\packages\grpc.dependencies.openssl.redist.1.0.204.1\build\native\grpc.dependencies.openssl.redist.targets" Condition="Exists('$(SolutionDir)\..\vsprojects\packages\grpc.dependencies.openssl.redist.1.0.204.1\build\native\grpc.dependencies\grpc.dependencies.openssl.targets')" />
+ <Import Project="$(SolutionDir)\..\vsprojects\packages\grpc.dependencies.openssl.1.0.204.1\build\native\grpc.dependencies.openssl.targets" Condition="Exists('$(SolutionDir)\..\vsprojects\packages\grpc.dependencies.openssl.1.0.204.1\build\native\grpc.dependencies\grpc.dependencies.openssl.targets')" />
+ </ImportGroup>
+ <Target Name="EnsureNuGetPackageBuildImports" BeforeTargets="PrepareForBuild">
+ <PropertyGroup>
+ <ErrorText>This project references NuGet package(s) that are missing on this computer. Enable NuGet Package Restore to download them. For more information, see http://go.microsoft.com/fwlink/?LinkID=322105. The missing file is {0}.</ErrorText>
+ </PropertyGroup>
+ <Error Condition="!Exists('$(SolutionDir)\..\vsprojects\packages\grpc.dependencies.zlib.redist.1.2.8.10\build\native\grpc.dependencies.zlib.redist.targets')" Text="$([System.String]::Format('$(ErrorText)', '$(SolutionDir)\..\vsprojects\packages\grpc.dependencies.zlib.redist.1.2.8.10\build\native\grpc.dependencies.zlib.redist.targets')" />
+ <Error Condition="!Exists('$(SolutionDir)\..\vsprojects\packages\grpc.dependencies.zlib.1.2.8.10\build\native\grpc.dependencies.zlib.targets')" Text="$([System.String]::Format('$(ErrorText)', '$(SolutionDir)\..\vsprojects\packages\grpc.dependencies.zlib.1.2.8.10\build\native\grpc.dependencies.zlib.targets')" />
+ <Error Condition="!Exists('$(SolutionDir)\..\vsprojects\packages\grpc.dependencies.openssl.redist.1.0.204.1\build\native\grpc.dependencies.openssl.redist.targets')" Text="$([System.String]::Format('$(ErrorText)', '$(SolutionDir)\..\vsprojects\packages\grpc.dependencies.openssl.redist.1.0.204.1\build\native\grpc.dependencies.openssl.redist.targets')" />
+ <Error Condition="!Exists('$(SolutionDir)\..\vsprojects\packages\grpc.dependencies.openssl.1.0.204.1\build\native\grpc.dependencies.openssl.props')" Text="$([System.String]::Format('$(ErrorText)', '$(SolutionDir)\..\vsprojects\packages\grpc.dependencies.openssl.1.0.204.1\build\native\grpc.dependencies.openssl.props')" />
+ <Error Condition="!Exists('$(SolutionDir)\..\vsprojects\packages\grpc.dependencies.openssl.1.0.204.1\build\native\grpc.dependencies.openssl.targets')" Text="$([System.String]::Format('$(ErrorText)', '$(SolutionDir)\..\vsprojects\packages\grpc.dependencies.openssl.1.0.204.1\build\native\grpc.dependencies.openssl.targets')" />
+ </Target>
+</Project>
+
diff --git a/vsprojects/vcxproj/test/byte_stream_test/byte_stream_test.vcxproj.filters b/vsprojects/vcxproj/test/byte_stream_test/byte_stream_test.vcxproj.filters
new file mode 100644
index 0000000..65e35b7
--- /dev/null
+++ b/vsprojects/vcxproj/test/byte_stream_test/byte_stream_test.vcxproj.filters
@@ -0,0 +1,21 @@
+<?xml version="1.0" encoding="utf-8"?>
+<Project ToolsVersion="4.0" xmlns="http://schemas.microsoft.com/developer/msbuild/2003">
+ <ItemGroup>
+ <ClCompile Include="$(SolutionDir)\..\test\core\transport\byte_stream_test.c">
+ <Filter>test\core\transport</Filter>
+ </ClCompile>
+ </ItemGroup>
+
+ <ItemGroup>
+ <Filter Include="test">
+ <UniqueIdentifier>{f172d292-4ad6-342a-f27a-096c06d43a31}</UniqueIdentifier>
+ </Filter>
+ <Filter Include="test\core">
+ <UniqueIdentifier>{d7f690de-dfe0-56fc-ff3b-38eec3931699}</UniqueIdentifier>
+ </Filter>
+ <Filter Include="test\core\transport">
+ <UniqueIdentifier>{f78f56ef-47df-c99d-18f0-86277f7013f3}</UniqueIdentifier>
+ </Filter>
+ </ItemGroup>
+</Project>
+