Merge pull request #11779 from markdroth/client_channel_fix
Don't set the channel's state to TRANSIENT_FAILURE if we updated the LB policy.
diff --git a/.github/CODEOWNERS b/.github/CODEOWNERS
new file mode 100644
index 0000000..5e5c26c
--- /dev/null
+++ b/.github/CODEOWNERS
@@ -0,0 +1,25 @@
+# Auto-generated by the tools/mkowners/mkowners.py tool
+# Uses OWNERS files in different modules throughout the
+# repository as the source of truth for module ownership.
+** @a11r @nicolasnoble @ctiller
+bazel/** @nicolasnoble @dgquintas @ctiller
+cmake/** @jtattermusch @a11r @nicolasnoble @ctiller
+doc/PROTOCOL-HTTP2.md @ejona86 @a11r @nicolasnoble @ctiller
+doc/interop-test-descriptions.md @ejona86 @a11r @nicolasnoble @ctiller
+etc/** @jboeuf @nicolasnoble @a11r @ctiller
+include/** @ctiller @markdroth @dgquintas @a11r @nicolasnoble
+src/core/** @ctiller @markdroth @dgquintas @a11r @nicolasnoble
+src/cpp/** @ctiller @markdroth @dgquintas @a11r @nicolasnoble
+src/csharp/** @jtattermusch @apolcyn @a11r @nicolasnoble @ctiller
+src/node/** @murgatroid99 @a11r @nicolasnoble @ctiller
+src/objective-c/** @muxi @makdharma @a11r @nicolasnoble @ctiller
+src/php/** @stanley-cheung @murgatroid99 @a11r @nicolasnoble @ctiller
+src/python/** @nathanielmanistaatgoogle @kpayson64 @a11r @nicolasnoble @ctiller
+src/ruby/** @apolcyn @murgatroid99 @a11r @nicolasnoble @ctiller
+test/build/** @ctiller @markdroth @dgquintas @a11r @nicolasnoble
+test/core/** @ctiller @markdroth @dgquintas @a11r @nicolasnoble
+test/cpp/** @ctiller @markdroth @dgquintas @a11r @nicolasnoble
+tools/** @matt-kwong @jtattermusch @nicolasnoble @a11r @ctiller
+tools/codegen/core/** @ctiller @dgquintas @markdroth
+tools/dockerfile/** @matt-kwong @jtattermusch @nicolasnoble @a11r @ctiller
+tools/run_tests/** @matt-kwong @jtattermusch @nicolasnoble @a11r @ctiller
diff --git a/CMakeLists.txt b/CMakeLists.txt
index b535d7f..3dd8ec4 100644
--- a/CMakeLists.txt
+++ b/CMakeLists.txt
@@ -11405,6 +11405,7 @@
grpc
gpr_test_util
gpr
+ grpc++_test_config
${_gRPC_GFLAGS_LIBRARIES}
)
@@ -11756,6 +11757,7 @@
grpc
gpr_test_util
gpr
+ grpc++_test_config
${_gRPC_GFLAGS_LIBRARIES}
)
diff --git a/Makefile b/Makefile
index 23f8706..f58a02d 100644
--- a/Makefile
+++ b/Makefile
@@ -15398,16 +15398,16 @@
else
-$(BINDIR)/$(CONFIG)/qps_interarrival_test: $(PROTOBUF_DEP) $(QPS_INTERARRIVAL_TEST_OBJS) $(LIBDIR)/$(CONFIG)/libqps.a $(LIBDIR)/$(CONFIG)/libgrpc++_test_util.a $(LIBDIR)/$(CONFIG)/libgrpc_test_util.a $(LIBDIR)/$(CONFIG)/libgrpc++.a $(LIBDIR)/$(CONFIG)/libgrpc.a $(LIBDIR)/$(CONFIG)/libgpr_test_util.a $(LIBDIR)/$(CONFIG)/libgpr.a
+$(BINDIR)/$(CONFIG)/qps_interarrival_test: $(PROTOBUF_DEP) $(QPS_INTERARRIVAL_TEST_OBJS) $(LIBDIR)/$(CONFIG)/libqps.a $(LIBDIR)/$(CONFIG)/libgrpc++_test_util.a $(LIBDIR)/$(CONFIG)/libgrpc_test_util.a $(LIBDIR)/$(CONFIG)/libgrpc++.a $(LIBDIR)/$(CONFIG)/libgrpc.a $(LIBDIR)/$(CONFIG)/libgpr_test_util.a $(LIBDIR)/$(CONFIG)/libgpr.a $(LIBDIR)/$(CONFIG)/libgrpc++_test_config.a
$(E) "[LD] Linking $@"
$(Q) mkdir -p `dirname $@`
- $(Q) $(LDXX) $(LDFLAGS) $(QPS_INTERARRIVAL_TEST_OBJS) $(LIBDIR)/$(CONFIG)/libqps.a $(LIBDIR)/$(CONFIG)/libgrpc++_test_util.a $(LIBDIR)/$(CONFIG)/libgrpc_test_util.a $(LIBDIR)/$(CONFIG)/libgrpc++.a $(LIBDIR)/$(CONFIG)/libgrpc.a $(LIBDIR)/$(CONFIG)/libgpr_test_util.a $(LIBDIR)/$(CONFIG)/libgpr.a $(LDLIBSXX) $(LDLIBS_PROTOBUF) $(LDLIBS) $(LDLIBS_SECURE) $(GTEST_LIB) -o $(BINDIR)/$(CONFIG)/qps_interarrival_test
+ $(Q) $(LDXX) $(LDFLAGS) $(QPS_INTERARRIVAL_TEST_OBJS) $(LIBDIR)/$(CONFIG)/libqps.a $(LIBDIR)/$(CONFIG)/libgrpc++_test_util.a $(LIBDIR)/$(CONFIG)/libgrpc_test_util.a $(LIBDIR)/$(CONFIG)/libgrpc++.a $(LIBDIR)/$(CONFIG)/libgrpc.a $(LIBDIR)/$(CONFIG)/libgpr_test_util.a $(LIBDIR)/$(CONFIG)/libgpr.a $(LIBDIR)/$(CONFIG)/libgrpc++_test_config.a $(LDLIBSXX) $(LDLIBS_PROTOBUF) $(LDLIBS) $(LDLIBS_SECURE) $(GTEST_LIB) -o $(BINDIR)/$(CONFIG)/qps_interarrival_test
endif
endif
-$(OBJDIR)/$(CONFIG)/test/cpp/qps/qps_interarrival_test.o: $(LIBDIR)/$(CONFIG)/libqps.a $(LIBDIR)/$(CONFIG)/libgrpc++_test_util.a $(LIBDIR)/$(CONFIG)/libgrpc_test_util.a $(LIBDIR)/$(CONFIG)/libgrpc++.a $(LIBDIR)/$(CONFIG)/libgrpc.a $(LIBDIR)/$(CONFIG)/libgpr_test_util.a $(LIBDIR)/$(CONFIG)/libgpr.a
+$(OBJDIR)/$(CONFIG)/test/cpp/qps/qps_interarrival_test.o: $(LIBDIR)/$(CONFIG)/libqps.a $(LIBDIR)/$(CONFIG)/libgrpc++_test_util.a $(LIBDIR)/$(CONFIG)/libgrpc_test_util.a $(LIBDIR)/$(CONFIG)/libgrpc++.a $(LIBDIR)/$(CONFIG)/libgrpc.a $(LIBDIR)/$(CONFIG)/libgpr_test_util.a $(LIBDIR)/$(CONFIG)/libgpr.a $(LIBDIR)/$(CONFIG)/libgrpc++_test_config.a
deps_qps_interarrival_test: $(QPS_INTERARRIVAL_TEST_OBJS:.o=.dep)
@@ -15719,16 +15719,16 @@
else
-$(BINDIR)/$(CONFIG)/secure_sync_unary_ping_pong_test: $(PROTOBUF_DEP) $(SECURE_SYNC_UNARY_PING_PONG_TEST_OBJS) $(LIBDIR)/$(CONFIG)/libqps.a $(LIBDIR)/$(CONFIG)/libgrpc++_test_util.a $(LIBDIR)/$(CONFIG)/libgrpc_test_util.a $(LIBDIR)/$(CONFIG)/libgrpc++.a $(LIBDIR)/$(CONFIG)/libgrpc.a $(LIBDIR)/$(CONFIG)/libgpr_test_util.a $(LIBDIR)/$(CONFIG)/libgpr.a
+$(BINDIR)/$(CONFIG)/secure_sync_unary_ping_pong_test: $(PROTOBUF_DEP) $(SECURE_SYNC_UNARY_PING_PONG_TEST_OBJS) $(LIBDIR)/$(CONFIG)/libqps.a $(LIBDIR)/$(CONFIG)/libgrpc++_test_util.a $(LIBDIR)/$(CONFIG)/libgrpc_test_util.a $(LIBDIR)/$(CONFIG)/libgrpc++.a $(LIBDIR)/$(CONFIG)/libgrpc.a $(LIBDIR)/$(CONFIG)/libgpr_test_util.a $(LIBDIR)/$(CONFIG)/libgpr.a $(LIBDIR)/$(CONFIG)/libgrpc++_test_config.a
$(E) "[LD] Linking $@"
$(Q) mkdir -p `dirname $@`
- $(Q) $(LDXX) $(LDFLAGS) $(SECURE_SYNC_UNARY_PING_PONG_TEST_OBJS) $(LIBDIR)/$(CONFIG)/libqps.a $(LIBDIR)/$(CONFIG)/libgrpc++_test_util.a $(LIBDIR)/$(CONFIG)/libgrpc_test_util.a $(LIBDIR)/$(CONFIG)/libgrpc++.a $(LIBDIR)/$(CONFIG)/libgrpc.a $(LIBDIR)/$(CONFIG)/libgpr_test_util.a $(LIBDIR)/$(CONFIG)/libgpr.a $(LDLIBSXX) $(LDLIBS_PROTOBUF) $(LDLIBS) $(LDLIBS_SECURE) $(GTEST_LIB) -o $(BINDIR)/$(CONFIG)/secure_sync_unary_ping_pong_test
+ $(Q) $(LDXX) $(LDFLAGS) $(SECURE_SYNC_UNARY_PING_PONG_TEST_OBJS) $(LIBDIR)/$(CONFIG)/libqps.a $(LIBDIR)/$(CONFIG)/libgrpc++_test_util.a $(LIBDIR)/$(CONFIG)/libgrpc_test_util.a $(LIBDIR)/$(CONFIG)/libgrpc++.a $(LIBDIR)/$(CONFIG)/libgrpc.a $(LIBDIR)/$(CONFIG)/libgpr_test_util.a $(LIBDIR)/$(CONFIG)/libgpr.a $(LIBDIR)/$(CONFIG)/libgrpc++_test_config.a $(LDLIBSXX) $(LDLIBS_PROTOBUF) $(LDLIBS) $(LDLIBS_SECURE) $(GTEST_LIB) -o $(BINDIR)/$(CONFIG)/secure_sync_unary_ping_pong_test
endif
endif
-$(OBJDIR)/$(CONFIG)/test/cpp/qps/secure_sync_unary_ping_pong_test.o: $(LIBDIR)/$(CONFIG)/libqps.a $(LIBDIR)/$(CONFIG)/libgrpc++_test_util.a $(LIBDIR)/$(CONFIG)/libgrpc_test_util.a $(LIBDIR)/$(CONFIG)/libgrpc++.a $(LIBDIR)/$(CONFIG)/libgrpc.a $(LIBDIR)/$(CONFIG)/libgpr_test_util.a $(LIBDIR)/$(CONFIG)/libgpr.a
+$(OBJDIR)/$(CONFIG)/test/cpp/qps/secure_sync_unary_ping_pong_test.o: $(LIBDIR)/$(CONFIG)/libqps.a $(LIBDIR)/$(CONFIG)/libgrpc++_test_util.a $(LIBDIR)/$(CONFIG)/libgrpc_test_util.a $(LIBDIR)/$(CONFIG)/libgrpc++.a $(LIBDIR)/$(CONFIG)/libgrpc.a $(LIBDIR)/$(CONFIG)/libgpr_test_util.a $(LIBDIR)/$(CONFIG)/libgpr.a $(LIBDIR)/$(CONFIG)/libgrpc++_test_config.a
deps_secure_sync_unary_ping_pong_test: $(SECURE_SYNC_UNARY_PING_PONG_TEST_OBJS:.o=.dep)
diff --git a/OWNERS b/OWNERS
new file mode 100644
index 0000000..78fc251
--- /dev/null
+++ b/OWNERS
@@ -0,0 +1,6 @@
+# Top level ownership
+
+@a11r
+@nicolasnoble
+@ctiller
+
diff --git a/bazel/OWNERS b/bazel/OWNERS
new file mode 100644
index 0000000..8fc7502
--- /dev/null
+++ b/bazel/OWNERS
@@ -0,0 +1,5 @@
+set noparent
+@nicolasnoble
+@dgquintas
+@ctiller
+
diff --git a/binding.gyp b/binding.gyp
index 70dfd10..0b581b3 100644
--- a/binding.gyp
+++ b/binding.gyp
@@ -67,6 +67,14 @@
'ldflags': [
'-g',
],
+ 'cflags_c': [
+ '-Werror',
+ '-std=c99'
+ ],
+ 'cflags_cc': [
+ '-Werror',
+ '-std=c++11'
+ ],
'include_dirs': [
'.',
'include'
@@ -164,6 +172,24 @@
'<(node_root_dir)/deps/zlib',
'<(node_root_dir)/deps/cares/include'
]
+ }],
+ ['OS == "mac"', {
+ 'xcode_settings': {
+ 'MACOSX_DEPLOYMENT_TARGET': '10.9'
+ },
+ 'OTHER_CFLAGS': [
+ '-g',
+ '-Wall',
+ '-Wextra',
+ '-Werror',
+ '-Wno-long-long',
+ '-Wno-unused-parameter',
+ '-DOSATOMIC_USE_INLINED=1',
+ ],
+ 'OTHER_CPLUSPLUSFLAGS': [
+ '-stdlib=libc++',
+ '-std=c++11'
+ ],
}]
]
},
@@ -171,12 +197,6 @@
['OS=="win" or runtime=="electron"', {
'targets': [
{
- 'cflags': [
- '-std=c++11',
- '-std=c99',
- '-Wall',
- '-Werror'
- ],
'target_name': 'boringssl',
'product_prefix': 'lib',
'type': 'static_library',
@@ -488,17 +508,6 @@
'third_party/boringssl/ssl/tls_method.c',
'third_party/boringssl/ssl/tls_record.c',
],
- 'conditions': [
- ['OS=="mac"', {
- 'xcode_settings': {
- 'MACOSX_DEPLOYMENT_TARGET': '10.9',
- 'OTHER_CPLUSPLUSFLAGS': [
- '-stdlib=libc++',
- '-std=c++11'
- ],
- }
- }],
- ],
},
],
}],
@@ -536,11 +545,6 @@
'targets': [
# Only want to compile zlib under Windows
{
- 'cflags': [
- '-std=c99',
- '-Wall',
- '-Werror'
- ],
'target_name': 'z',
'product_prefix': 'lib',
'type': 'static_library',
@@ -569,11 +573,6 @@
],
'targets': [
{
- 'cflags': [
- '-std=c99',
- '-Wall',
- '-Werror'
- ],
'target_name': 'gpr',
'product_prefix': 'lib',
'type': 'static_library',
@@ -627,20 +626,8 @@
'src/core/lib/support/tmpfile_windows.c',
'src/core/lib/support/wrap_memcpy.c',
],
- "conditions": [
- ['OS == "mac"', {
- 'xcode_settings': {
- 'MACOSX_DEPLOYMENT_TARGET': '10.9'
- }
- }]
- ]
},
{
- 'cflags': [
- '-std=c99',
- '-Wall',
- '-Werror'
- ],
'target_name': 'grpc',
'product_prefix': 'lib',
'type': 'static_library',
@@ -898,20 +885,12 @@
'src/core/ext/filters/workarounds/workaround_utils.c',
'src/core/plugin_registry/grpc_plugin_registry.c',
],
- "conditions": [
- ['OS == "mac"', {
- 'xcode_settings': {
- 'MACOSX_DEPLOYMENT_TARGET': '10.9'
- }
- }]
- ]
},
{
'include_dirs': [
"<!(node -e \"require('nan')\")"
],
'cflags': [
- '-std=c++11',
'-pthread',
'-zdefs',
'-Wno-error=deprecated-declarations'
@@ -922,15 +901,6 @@
"boringssl",
]
}],
- ['OS=="mac"', {
- 'xcode_settings': {
- 'MACOSX_DEPLOYMENT_TARGET': '10.9',
- 'OTHER_CFLAGS': [
- '-stdlib=libc++',
- '-std=c++11'
- ]
- }
- }],
['OS=="win"', {
'dependencies': [
"z",
diff --git a/build.yaml b/build.yaml
index e55c4ca..8a2fbe1 100644
--- a/build.yaml
+++ b/build.yaml
@@ -4162,6 +4162,7 @@
- grpc
- gpr_test_util
- gpr
+ - grpc++_test_config
platforms:
- mac
- linux
@@ -4280,6 +4281,7 @@
- grpc
- gpr_test_util
- gpr
+ - grpc++_test_config
platforms:
- mac
- linux
diff --git a/cmake/OWNERS b/cmake/OWNERS
new file mode 100644
index 0000000..c0ae053
--- /dev/null
+++ b/cmake/OWNERS
@@ -0,0 +1,2 @@
+@jtattermusch
+
diff --git a/doc/OWNERS b/doc/OWNERS
new file mode 100644
index 0000000..f058617
--- /dev/null
+++ b/doc/OWNERS
@@ -0,0 +1,2 @@
+@ejona86 PROTOCOL-HTTP2.md interop-test-descriptions.md
+
diff --git a/doc/environment_variables.md b/doc/environment_variables.md
index 339b705..a283a5a 100644
--- a/doc/environment_variables.md
+++ b/doc/environment_variables.md
@@ -54,12 +54,14 @@
- op_failure - traces error information when failure is pushed onto a
completion queue
- round_robin - traces the round_robin load balancing policy
+ - resource_quota - trace resource quota objects internals
- glb - traces the grpclb load balancer
- queue_pluck
- queue_timeout
- server_channel - lightweight trace of significant server channel events
- secure_endpoint - traces bytes flowing through encrypted channels
- timer - timers (alarms) in the grpc internals
+ - timer_check - more detailed trace of timer logic in grpc internals
- transport_security - traces metadata about secure channel establishment
- tcp - traces bytes in and out of a channel
@@ -83,6 +85,11 @@
'all' can additionally be used to turn all traces on.
Individual traces can be disabled by prefixing them with '-'.
+ 'refcount' will turn on all of the tracers for refcount debugging.
+
+ if 'list_tracers' is present, then all of the available tracers will be
+ printed when the program starts up.
+
Example:
export GRPC_TRACE=all,-pending_tags
diff --git a/etc/OWNERS b/etc/OWNERS
new file mode 100644
index 0000000..2dfd7cd
--- /dev/null
+++ b/etc/OWNERS
@@ -0,0 +1,2 @@
+@jboeuf
+@nicolasnoble
diff --git a/include/OWNERS b/include/OWNERS
new file mode 100644
index 0000000..8dca75c
--- /dev/null
+++ b/include/OWNERS
@@ -0,0 +1,4 @@
+@ctiller
+@markdroth
+@dgquintas
+
diff --git a/include/grpc++/impl/codegen/config_protobuf.h b/include/grpc++/impl/codegen/config_protobuf.h
index 7387fa2..c5e5bdf 100644
--- a/include/grpc++/impl/codegen/config_protobuf.h
+++ b/include/grpc++/impl/codegen/config_protobuf.h
@@ -19,8 +19,6 @@
#ifndef GRPCXX_IMPL_CODEGEN_CONFIG_PROTOBUF_H
#define GRPCXX_IMPL_CODEGEN_CONFIG_PROTOBUF_H
-#define GRPC_OPEN_SOURCE_PROTO
-
#ifndef GRPC_CUSTOM_PROTOBUF_INT64
#include <google/protobuf/stubs/common.h>
#define GRPC_CUSTOM_PROTOBUF_INT64 ::google::protobuf::int64
diff --git a/include/grpc++/impl/codegen/proto_utils.h b/include/grpc++/impl/codegen/proto_utils.h
index fcb4453..67e8f71 100644
--- a/include/grpc++/impl/codegen/proto_utils.h
+++ b/include/grpc++/impl/codegen/proto_utils.h
@@ -39,7 +39,8 @@
const int kGrpcBufferWriterMaxBufferLength = 1024 * 1024;
-class GrpcBufferWriter : public ::grpc::protobuf::io::ZeroCopyOutputStream {
+class GrpcBufferWriter final
+ : public ::grpc::protobuf::io::ZeroCopyOutputStream {
public:
explicit GrpcBufferWriter(grpc_byte_buffer** bp, int block_size)
: block_size_(block_size), byte_count_(0), have_backup_(false) {
@@ -87,8 +88,6 @@
grpc::protobuf::int64 ByteCount() const override { return byte_count_; }
- grpc_slice_buffer* SliceBuffer() { return slice_buffer_; }
-
private:
friend class GrpcBufferWriterPeer;
const int block_size_;
@@ -99,7 +98,8 @@
grpc_slice slice_;
};
-class GrpcBufferReader : public ::grpc::protobuf::io::ZeroCopyInputStream {
+class GrpcBufferReader final
+ : public ::grpc::protobuf::io::ZeroCopyInputStream {
public:
explicit GrpcBufferReader(grpc_byte_buffer* buffer)
: byte_count_(0), backup_count_(0), status_() {
@@ -160,7 +160,7 @@
return byte_count_ - backup_count_;
}
- protected:
+ private:
int64_t byte_count_;
int64_t backup_count_;
grpc_byte_buffer_reader reader_;
@@ -168,83 +168,57 @@
Status status_;
};
-template <class BufferWriter, class T>
-Status GenericSerialize(const grpc::protobuf::Message& msg,
- grpc_byte_buffer** bp, bool* own_buffer) {
- static_assert(
- std::is_base_of<protobuf::io::ZeroCopyOutputStream, BufferWriter>::value,
- "BufferWriter must be a subclass of io::ZeroCopyOutputStream");
- *own_buffer = true;
- int byte_size = msg.ByteSize();
- if (byte_size <= internal::kGrpcBufferWriterMaxBufferLength) {
- grpc_slice slice = g_core_codegen_interface->grpc_slice_malloc(byte_size);
- GPR_CODEGEN_ASSERT(
- GRPC_SLICE_END_PTR(slice) ==
- msg.SerializeWithCachedSizesToArray(GRPC_SLICE_START_PTR(slice)));
- *bp = g_core_codegen_interface->grpc_raw_byte_buffer_create(&slice, 1);
- g_core_codegen_interface->grpc_slice_unref(slice);
- return g_core_codegen_interface->ok();
- } else {
- BufferWriter writer(bp, internal::kGrpcBufferWriterMaxBufferLength);
- return msg.SerializeToZeroCopyStream(&writer)
- ? g_core_codegen_interface->ok()
- : Status(StatusCode::INTERNAL, "Failed to serialize message");
- }
-}
-
-template <class BufferReader, class T>
-Status GenericDeserialize(grpc_byte_buffer* buffer,
- grpc::protobuf::Message* msg) {
- static_assert(
- std::is_base_of<protobuf::io::ZeroCopyInputStream, BufferReader>::value,
- "BufferReader must be a subclass of io::ZeroCopyInputStream");
- if (buffer == nullptr) {
- return Status(StatusCode::INTERNAL, "No payload");
- }
- Status result = g_core_codegen_interface->ok();
- {
- BufferReader reader(buffer);
- if (!reader.status().ok()) {
- return reader.status();
- }
- ::grpc::protobuf::io::CodedInputStream decoder(&reader);
- decoder.SetTotalBytesLimit(INT_MAX, INT_MAX);
- if (!msg->ParseFromCodedStream(&decoder)) {
- result = Status(StatusCode::INTERNAL, msg->InitializationErrorString());
- }
- if (!decoder.ConsumedEntireMessage()) {
- result = Status(StatusCode::INTERNAL, "Did not read entire message");
- }
- }
- g_core_codegen_interface->grpc_byte_buffer_destroy(buffer);
- return result;
-}
-
} // namespace internal
-// this is needed so the following class does not conflict with protobuf
-// serializers that utilize internal-only tools.
-#ifdef GRPC_OPEN_SOURCE_PROTO
-// This class provides a protobuf serializer. It translates between protobuf
-// objects and grpc_byte_buffers. More information about SerializationTraits can
-// be found in include/grpc++/impl/codegen/serialization_traits.h.
template <class T>
class SerializationTraits<T, typename std::enable_if<std::is_base_of<
grpc::protobuf::Message, T>::value>::type> {
public:
static Status Serialize(const grpc::protobuf::Message& msg,
grpc_byte_buffer** bp, bool* own_buffer) {
- return internal::GenericSerialize<internal::GrpcBufferWriter, T>(
- msg, bp, own_buffer);
+ *own_buffer = true;
+ int byte_size = msg.ByteSize();
+ if (byte_size <= internal::kGrpcBufferWriterMaxBufferLength) {
+ grpc_slice slice = g_core_codegen_interface->grpc_slice_malloc(byte_size);
+ GPR_CODEGEN_ASSERT(
+ GRPC_SLICE_END_PTR(slice) ==
+ msg.SerializeWithCachedSizesToArray(GRPC_SLICE_START_PTR(slice)));
+ *bp = g_core_codegen_interface->grpc_raw_byte_buffer_create(&slice, 1);
+ g_core_codegen_interface->grpc_slice_unref(slice);
+ return g_core_codegen_interface->ok();
+ } else {
+ internal::GrpcBufferWriter writer(
+ bp, internal::kGrpcBufferWriterMaxBufferLength);
+ return msg.SerializeToZeroCopyStream(&writer)
+ ? g_core_codegen_interface->ok()
+ : Status(StatusCode::INTERNAL, "Failed to serialize message");
+ }
}
static Status Deserialize(grpc_byte_buffer* buffer,
grpc::protobuf::Message* msg) {
- return internal::GenericDeserialize<internal::GrpcBufferReader, T>(buffer,
- msg);
+ if (buffer == nullptr) {
+ return Status(StatusCode::INTERNAL, "No payload");
+ }
+ Status result = g_core_codegen_interface->ok();
+ {
+ internal::GrpcBufferReader reader(buffer);
+ if (!reader.status().ok()) {
+ return reader.status();
+ }
+ ::grpc::protobuf::io::CodedInputStream decoder(&reader);
+ decoder.SetTotalBytesLimit(INT_MAX, INT_MAX);
+ if (!msg->ParseFromCodedStream(&decoder)) {
+ result = Status(StatusCode::INTERNAL, msg->InitializationErrorString());
+ }
+ if (!decoder.ConsumedEntireMessage()) {
+ result = Status(StatusCode::INTERNAL, "Did not read entire message");
+ }
+ }
+ g_core_codegen_interface->grpc_byte_buffer_destroy(buffer);
+ return result;
}
};
-#endif
} // namespace grpc
diff --git a/package.json b/package.json
index d5eec72..b4b1663 100644
--- a/package.json
+++ b/package.json
@@ -56,7 +56,7 @@
},
"binary": {
"module_name": "grpc_node",
- "module_path": "src/node/extension_binary",
+ "module_path": "src/node/extension_binary/{node_abi}-{platform}-{arch}",
"host": "https://storage.googleapis.com/",
"remote_path": "grpc-precompiled-binaries/node/{name}/v{version}",
"package_name": "{node_abi}-{platform}-{arch}.tar.gz"
diff --git a/src/core/OWNERS b/src/core/OWNERS
new file mode 100644
index 0000000..8dca75c
--- /dev/null
+++ b/src/core/OWNERS
@@ -0,0 +1,4 @@
+@ctiller
+@markdroth
+@dgquintas
+
diff --git a/src/core/ext/filters/client_channel/client_channel.c b/src/core/ext/filters/client_channel/client_channel.c
index a383d4f..58e31d7 100644
--- a/src/core/ext/filters/client_channel/client_channel.c
+++ b/src/core/ext/filters/client_channel/client_channel.c
@@ -52,7 +52,8 @@
/* Client channel implementation */
-grpc_tracer_flag grpc_client_channel_trace = GRPC_TRACER_INITIALIZER(false);
+grpc_tracer_flag grpc_client_channel_trace =
+ GRPC_TRACER_INITIALIZER(false, "client_channel");
/*************************************************************************
* METHOD-CONFIG TABLE
diff --git a/src/core/ext/filters/client_channel/client_channel_plugin.c b/src/core/ext/filters/client_channel/client_channel_plugin.c
index 6f133a6..c32e83d 100644
--- a/src/core/ext/filters/client_channel/client_channel_plugin.c
+++ b/src/core/ext/filters/client_channel/client_channel_plugin.c
@@ -78,9 +78,9 @@
GRPC_CLIENT_CHANNEL, GRPC_CHANNEL_INIT_BUILTIN_PRIORITY, append_filter,
(void *)&grpc_client_channel_filter);
grpc_http_connect_register_handshaker_factory();
- grpc_register_tracer("client_channel", &grpc_client_channel_trace);
+ grpc_register_tracer(&grpc_client_channel_trace);
#ifndef NDEBUG
- grpc_register_tracer("resolver_refcount", &grpc_trace_resolver_refcount);
+ grpc_register_tracer(&grpc_trace_resolver_refcount);
#endif
}
diff --git a/src/core/ext/filters/client_channel/lb_policy.c b/src/core/ext/filters/client_channel/lb_policy.c
index 8d69ba6..10b0322 100644
--- a/src/core/ext/filters/client_channel/lb_policy.c
+++ b/src/core/ext/filters/client_channel/lb_policy.c
@@ -22,7 +22,8 @@
#define WEAK_REF_BITS 16
#ifndef NDEBUG
-grpc_tracer_flag grpc_trace_lb_policy_refcount = GRPC_TRACER_INITIALIZER(false);
+grpc_tracer_flag grpc_trace_lb_policy_refcount =
+ GRPC_TRACER_INITIALIZER(false, "lb_policy_refcount");
#endif
void grpc_lb_policy_init(grpc_lb_policy *policy,
diff --git a/src/core/ext/filters/client_channel/lb_policy/grpclb/grpclb.c b/src/core/ext/filters/client_channel/lb_policy/grpclb/grpclb.c
index 5a5ff29..cccc3e8 100644
--- a/src/core/ext/filters/client_channel/lb_policy/grpclb/grpclb.c
+++ b/src/core/ext/filters/client_channel/lb_policy/grpclb/grpclb.c
@@ -123,7 +123,7 @@
#define GRPC_GRPCLB_RECONNECT_MAX_BACKOFF_SECONDS 120
#define GRPC_GRPCLB_RECONNECT_JITTER 0.2
-grpc_tracer_flag grpc_lb_glb_trace = GRPC_TRACER_INITIALIZER(false);
+grpc_tracer_flag grpc_lb_glb_trace = GRPC_TRACER_INITIALIZER(false, "glb");
/* add lb_token of selected subchannel (address) to the call's initial
* metadata */
@@ -1879,9 +1879,9 @@
void grpc_lb_policy_grpclb_init() {
grpc_register_lb_policy(grpc_glb_lb_factory_create());
- grpc_register_tracer("glb", &grpc_lb_glb_trace);
+ grpc_register_tracer(&grpc_lb_glb_trace);
#ifndef NDEBUG
- grpc_register_tracer("lb_policy_refcount", &grpc_trace_lb_policy_refcount);
+ grpc_register_tracer(&grpc_trace_lb_policy_refcount);
#endif
grpc_channel_init_register_stage(GRPC_CLIENT_SUBCHANNEL,
GRPC_CHANNEL_INIT_BUILTIN_PRIORITY,
diff --git a/src/core/ext/filters/client_channel/lb_policy/pick_first/pick_first.c b/src/core/ext/filters/client_channel/lb_policy/pick_first/pick_first.c
index d0acd7a..fd0fb41 100644
--- a/src/core/ext/filters/client_channel/lb_policy/pick_first/pick_first.c
+++ b/src/core/ext/filters/client_channel/lb_policy/pick_first/pick_first.c
@@ -28,7 +28,8 @@
#include "src/core/lib/iomgr/sockaddr_utils.h"
#include "src/core/lib/transport/connectivity_state.h"
-grpc_tracer_flag grpc_lb_pick_first_trace = GRPC_TRACER_INITIALIZER(false);
+grpc_tracer_flag grpc_lb_pick_first_trace =
+ GRPC_TRACER_INITIALIZER(false, "pick_first");
typedef struct pending_pick {
struct pending_pick *next;
@@ -707,7 +708,7 @@
void grpc_lb_policy_pick_first_init() {
grpc_register_lb_policy(pick_first_lb_factory_create());
- grpc_register_tracer("pick_first", &grpc_lb_pick_first_trace);
+ grpc_register_tracer(&grpc_lb_pick_first_trace);
}
void grpc_lb_policy_pick_first_shutdown() {}
diff --git a/src/core/ext/filters/client_channel/lb_policy/round_robin/round_robin.c b/src/core/ext/filters/client_channel/lb_policy/round_robin/round_robin.c
index 56d340b..3758008 100644
--- a/src/core/ext/filters/client_channel/lb_policy/round_robin/round_robin.c
+++ b/src/core/ext/filters/client_channel/lb_policy/round_robin/round_robin.c
@@ -37,7 +37,8 @@
#include "src/core/lib/transport/connectivity_state.h"
#include "src/core/lib/transport/static_metadata.h"
-grpc_tracer_flag grpc_lb_round_robin_trace = GRPC_TRACER_INITIALIZER(false);
+grpc_tracer_flag grpc_lb_round_robin_trace =
+ GRPC_TRACER_INITIALIZER(false, "round_robin");
/** List of entities waiting for a pick.
*
@@ -158,6 +159,7 @@
if (sd->user_data != NULL) {
GPR_ASSERT(sd->user_data_vtable != NULL);
sd->user_data_vtable->destroy(exec_ctx, sd->user_data);
+ sd->user_data = NULL;
}
}
gpr_free(subchannel_list->subchannels);
@@ -578,6 +580,7 @@
if (sd->user_data != NULL) {
GPR_ASSERT(sd->user_data_vtable != NULL);
sd->user_data_vtable->destroy(exec_ctx, sd->user_data);
+ sd->user_data = NULL;
}
if (new_policy_connectivity_state == GRPC_CHANNEL_SHUTDOWN) {
/* the policy is shutting down. Flush all the pending picks... */
@@ -866,7 +869,7 @@
void grpc_lb_policy_round_robin_init() {
grpc_register_lb_policy(round_robin_lb_factory_create());
- grpc_register_tracer("round_robin", &grpc_lb_round_robin_trace);
+ grpc_register_tracer(&grpc_lb_round_robin_trace);
}
void grpc_lb_policy_round_robin_shutdown() {}
diff --git a/src/core/ext/filters/client_channel/resolver.c b/src/core/ext/filters/client_channel/resolver.c
index de9a8ce..8401504 100644
--- a/src/core/ext/filters/client_channel/resolver.c
+++ b/src/core/ext/filters/client_channel/resolver.c
@@ -20,7 +20,8 @@
#include "src/core/lib/iomgr/combiner.h"
#ifndef NDEBUG
-grpc_tracer_flag grpc_trace_resolver_refcount = GRPC_TRACER_INITIALIZER(false);
+grpc_tracer_flag grpc_trace_resolver_refcount =
+ GRPC_TRACER_INITIALIZER(false, "resolver_refcount");
#endif
void grpc_resolver_init(grpc_resolver *resolver,
diff --git a/src/core/ext/filters/http/http_filters_plugin.c b/src/core/ext/filters/http/http_filters_plugin.c
index 3e4ec01..a5c1b92 100644
--- a/src/core/ext/filters/http/http_filters_plugin.c
+++ b/src/core/ext/filters/http/http_filters_plugin.c
@@ -65,7 +65,7 @@
}
void grpc_http_filters_init(void) {
- grpc_register_tracer("compression", &grpc_compression_trace);
+ grpc_register_tracer(&grpc_compression_trace);
grpc_channel_init_register_stage(GRPC_CLIENT_SUBCHANNEL,
GRPC_CHANNEL_INIT_BUILTIN_PRIORITY,
maybe_add_optional_filter, &compress_filter);
diff --git a/src/core/ext/transport/chttp2/transport/chttp2_plugin.c b/src/core/ext/transport/chttp2/transport/chttp2_plugin.c
index 6a8c814..78551df 100644
--- a/src/core/ext/transport/chttp2/transport/chttp2_plugin.c
+++ b/src/core/ext/transport/chttp2/transport/chttp2_plugin.c
@@ -21,10 +21,10 @@
#include "src/core/lib/transport/metadata.h"
void grpc_chttp2_plugin_init(void) {
- grpc_register_tracer("http", &grpc_http_trace);
- grpc_register_tracer("flowctl", &grpc_flowctl_trace);
+ grpc_register_tracer(&grpc_http_trace);
+ grpc_register_tracer(&grpc_flowctl_trace);
#ifndef NDEBUG
- grpc_register_tracer("chttp2_refcount", &grpc_trace_chttp2_refcount);
+ grpc_register_tracer(&grpc_trace_chttp2_refcount);
#endif
}
diff --git a/src/core/ext/transport/chttp2/transport/chttp2_transport.c b/src/core/ext/transport/chttp2/transport/chttp2_transport.c
index f790267..0859ce8 100644
--- a/src/core/ext/transport/chttp2/transport/chttp2_transport.c
+++ b/src/core/ext/transport/chttp2/transport/chttp2_transport.c
@@ -74,11 +74,12 @@
DEFAULT_KEEPALIVE_PERMIT_WITHOUT_CALLS;
#define MAX_CLIENT_STREAM_ID 0x7fffffffu
-grpc_tracer_flag grpc_http_trace = GRPC_TRACER_INITIALIZER(false);
-grpc_tracer_flag grpc_flowctl_trace = GRPC_TRACER_INITIALIZER(false);
+grpc_tracer_flag grpc_http_trace = GRPC_TRACER_INITIALIZER(false, "http");
+grpc_tracer_flag grpc_flowctl_trace = GRPC_TRACER_INITIALIZER(false, "flowctl");
#ifndef NDEBUG
-grpc_tracer_flag grpc_trace_chttp2_refcount = GRPC_TRACER_INITIALIZER(false);
+grpc_tracer_flag grpc_trace_chttp2_refcount =
+ GRPC_TRACER_INITIALIZER(false, "chttp2_refcount");
#endif
static const grpc_transport_vtable vtable;
diff --git a/src/core/lib/channel/channel_stack.c b/src/core/lib/channel/channel_stack.c
index 1066664..0f8e33c 100644
--- a/src/core/lib/channel/channel_stack.c
+++ b/src/core/lib/channel/channel_stack.c
@@ -23,7 +23,7 @@
#include <stdlib.h>
#include <string.h>
-grpc_tracer_flag grpc_trace_channel = GRPC_TRACER_INITIALIZER(false);
+grpc_tracer_flag grpc_trace_channel = GRPC_TRACER_INITIALIZER(false, "channel");
/* Memory layouts.
diff --git a/src/core/lib/channel/channel_stack_builder.c b/src/core/lib/channel/channel_stack_builder.c
index 01529df..c369e33 100644
--- a/src/core/lib/channel/channel_stack_builder.c
+++ b/src/core/lib/channel/channel_stack_builder.c
@@ -24,7 +24,7 @@
#include <grpc/support/string_util.h>
grpc_tracer_flag grpc_trace_channel_stack_builder =
- GRPC_TRACER_INITIALIZER(false);
+ GRPC_TRACER_INITIALIZER(false, "channel_stack_builder");
typedef struct filter_node {
struct filter_node *next;
diff --git a/src/core/lib/debug/trace.c b/src/core/lib/debug/trace.c
index 8249b2e..c6c1853 100644
--- a/src/core/lib/debug/trace.c
+++ b/src/core/lib/debug/trace.c
@@ -27,7 +27,6 @@
int grpc_tracer_set_enabled(const char *name, int enabled);
typedef struct tracer {
- const char *name;
grpc_tracer_flag *flag;
struct tracer *next;
} tracer;
@@ -39,9 +38,8 @@
#define TRACER_SET(flag, on) (flag).value = (on)
#endif
-void grpc_register_tracer(const char *name, grpc_tracer_flag *flag) {
+void grpc_register_tracer(grpc_tracer_flag *flag) {
tracer *t = gpr_malloc(sizeof(*t));
- t->name = name;
t->flag = flag;
t->next = tracers;
TRACER_SET(*flag, false);
@@ -93,6 +91,14 @@
gpr_free(strings);
}
+static void list_tracers() {
+ gpr_log(GPR_DEBUG, "available tracers:");
+ tracer *t;
+ for (t = tracers; t; t = t->next) {
+ gpr_log(GPR_DEBUG, "\t%s", t->flag->name);
+ }
+}
+
void grpc_tracer_init(const char *env_var) {
char *e = gpr_getenv(env_var);
if (e != NULL) {
@@ -115,10 +121,18 @@
for (t = tracers; t; t = t->next) {
TRACER_SET(*t->flag, enabled);
}
+ } else if (0 == strcmp(name, "list_tracers")) {
+ list_tracers();
+ } else if (0 == strcmp(name, "refcount")) {
+ for (t = tracers; t; t = t->next) {
+ if (strstr(t->flag->name, "refcount") != NULL) {
+ TRACER_SET(*t->flag, enabled);
+ }
+ }
} else {
int found = 0;
for (t = tracers; t; t = t->next) {
- if (0 == strcmp(name, t->name)) {
+ if (0 == strcmp(name, t->flag->name)) {
TRACER_SET(*t->flag, enabled);
found = 1;
}
diff --git a/src/core/lib/debug/trace.h b/src/core/lib/debug/trace.h
index 7cc9fb4..dd9e6a3 100644
--- a/src/core/lib/debug/trace.h
+++ b/src/core/lib/debug/trace.h
@@ -35,19 +35,20 @@
#else
bool value;
#endif
+ char *name;
} grpc_tracer_flag;
#ifdef GRPC_THREADSAFE_TRACER
#define GRPC_TRACER_ON(flag) (gpr_atm_no_barrier_load(&(flag).value) != 0)
-#define GRPC_TRACER_INITIALIZER(on) \
- { (gpr_atm)(on) }
+#define GRPC_TRACER_INITIALIZER(on, name) \
+ { (gpr_atm)(on), (name) }
#else
#define GRPC_TRACER_ON(flag) ((flag).value)
-#define GRPC_TRACER_INITIALIZER(on) \
- { (on) }
+#define GRPC_TRACER_INITIALIZER(on, name) \
+ { (on), (name) }
#endif
-void grpc_register_tracer(const char *name, grpc_tracer_flag *flag);
+void grpc_register_tracer(grpc_tracer_flag *flag);
void grpc_tracer_init(const char *env_var_name);
void grpc_tracer_shutdown(void);
diff --git a/src/core/lib/http/parser.c b/src/core/lib/http/parser.c
index 71d697c..9c5e93f 100644
--- a/src/core/lib/http/parser.c
+++ b/src/core/lib/http/parser.c
@@ -25,7 +25,7 @@
#include <grpc/support/log.h>
#include <grpc/support/useful.h>
-grpc_tracer_flag grpc_http1_trace = GRPC_TRACER_INITIALIZER(false);
+grpc_tracer_flag grpc_http1_trace = GRPC_TRACER_INITIALIZER(false, "http1");
static char *buf2str(void *buffer, size_t length) {
char *out = gpr_malloc(length + 1);
diff --git a/src/core/lib/iomgr/closure.c b/src/core/lib/iomgr/closure.c
index e028e72..26f9cbe 100644
--- a/src/core/lib/iomgr/closure.c
+++ b/src/core/lib/iomgr/closure.c
@@ -25,7 +25,7 @@
#include "src/core/lib/profiling/timers.h"
#ifndef NDEBUG
-grpc_tracer_flag grpc_trace_closure = GRPC_TRACER_INITIALIZER(false);
+grpc_tracer_flag grpc_trace_closure = GRPC_TRACER_INITIALIZER(false, "closure");
#endif
#ifndef NDEBUG
diff --git a/src/core/lib/iomgr/combiner.c b/src/core/lib/iomgr/combiner.c
index 7f9c5d8..c72c37e 100644
--- a/src/core/lib/iomgr/combiner.c
+++ b/src/core/lib/iomgr/combiner.c
@@ -27,7 +27,8 @@
#include "src/core/lib/iomgr/executor.h"
#include "src/core/lib/profiling/timers.h"
-grpc_tracer_flag grpc_combiner_trace = GRPC_TRACER_INITIALIZER(false);
+grpc_tracer_flag grpc_combiner_trace =
+ GRPC_TRACER_INITIALIZER(false, "combiner");
#define GRPC_COMBINER_TRACE(fn) \
do { \
diff --git a/src/core/lib/iomgr/error.c b/src/core/lib/iomgr/error.c
index a95929a..3759dda 100644
--- a/src/core/lib/iomgr/error.c
+++ b/src/core/lib/iomgr/error.c
@@ -36,7 +36,8 @@
#include "src/core/lib/slice/slice_internal.h"
#ifndef NDEBUG
-grpc_tracer_flag grpc_trace_error_refcount = GRPC_TRACER_INITIALIZER(false);
+grpc_tracer_flag grpc_trace_error_refcount =
+ GRPC_TRACER_INITIALIZER(false, "error_refcount");
#endif
static const char *error_int_name(grpc_error_ints key) {
diff --git a/src/core/lib/iomgr/ev_epollex_linux.c b/src/core/lib/iomgr/ev_epollex_linux.c
index 5574838..5690431 100644
--- a/src/core/lib/iomgr/ev_epollex_linux.c
+++ b/src/core/lib/iomgr/ev_epollex_linux.c
@@ -103,6 +103,32 @@
grpc_pollset_worker *root_worker;
} pollable;
+static const char *polling_obj_type_string(polling_obj_type t) {
+ switch (t) {
+ case PO_POLLING_GROUP:
+ return "polling_group";
+ case PO_POLLSET_SET:
+ return "pollset_set";
+ case PO_POLLSET:
+ return "pollset";
+ case PO_FD:
+ return "fd";
+ case PO_EMPTY_POLLABLE:
+ return "empty_pollable";
+ case PO_COUNT:
+ return "<invalid:count>";
+ }
+ return "<invalid>";
+}
+
+static char *pollable_desc(pollable *p) {
+ char *out;
+ gpr_asprintf(&out, "type=%s group=%p epfd=%d wakeup=%d",
+ polling_obj_type_string(p->po.type), p->po.group, p->epfd,
+ p->wakeup.read_fd);
+ return out;
+}
+
static pollable g_empty_pollable;
static void pollable_init(pollable *p, polling_obj_type type);
@@ -472,7 +498,7 @@
GPR_ASSERT(epfd != -1);
if (GRPC_TRACER_ON(grpc_polling_trace)) {
- gpr_log(GPR_DEBUG, "add fd %p to pollable %p", fd, p);
+ gpr_log(GPR_DEBUG, "add fd %p (%d) to pollable %p", fd, fd->fd, p);
}
gpr_mu_lock(&fd->orphaned_mu);
@@ -537,10 +563,18 @@
if (worker->pollable != &pollset->pollable) {
gpr_mu_lock(&worker->pollable->po.mu);
}
- if (worker->initialized_cv) {
+ if (worker->initialized_cv && worker != pollset->root_worker) {
+ if (GRPC_TRACER_ON(grpc_polling_trace)) {
+ gpr_log(GPR_DEBUG, "PS:%p kickall_via_cv %p (pollable %p vs %p)",
+ pollset, worker, &pollset->pollable, worker->pollable);
+ }
worker->kicked = true;
gpr_cv_signal(&worker->cv);
} else {
+ if (GRPC_TRACER_ON(grpc_polling_trace)) {
+ gpr_log(GPR_DEBUG, "PS:%p kickall_via_wakeup %p (pollable %p vs %p)",
+ pollset, worker, &pollset->pollable, worker->pollable);
+ }
append_error(&error, grpc_wakeup_fd_wakeup(&worker->pollable->wakeup),
"pollset_shutdown");
}
@@ -770,7 +804,9 @@
int timeout = poll_deadline_to_millis_timeout(deadline, now);
if (GRPC_TRACER_ON(grpc_polling_trace)) {
- gpr_log(GPR_DEBUG, "PS:%p poll %p for %dms", pollset, p, timeout);
+ char *desc = pollable_desc(p);
+ gpr_log(GPR_DEBUG, "PS:%p poll %p[%s] for %dms", pollset, p, desc, timeout);
+ gpr_free(desc);
}
if (timeout != 0) {
@@ -985,10 +1021,11 @@
static const char *err_desc = "pollset_add_fd";
grpc_error *error = GRPC_ERROR_NONE;
if (pollset->current_pollable == &g_empty_pollable) {
- if (GRPC_TRACER_ON(grpc_polling_trace))
+ if (GRPC_TRACER_ON(grpc_polling_trace)) {
gpr_log(GPR_DEBUG,
"PS:%p add fd %p; transition pollable from empty to fd", pollset,
fd);
+ }
/* empty pollable --> single fd pollable */
pollset_kick_all(exec_ctx, pollset);
pollset->current_pollable = &fd->pollable;
@@ -997,16 +1034,23 @@
if (!fd_locked) gpr_mu_unlock(&fd->pollable.po.mu);
REF_BY(fd, 2, "pollset_pollable");
} else if (pollset->current_pollable == &pollset->pollable) {
- if (GRPC_TRACER_ON(grpc_polling_trace))
+ if (GRPC_TRACER_ON(grpc_polling_trace)) {
gpr_log(GPR_DEBUG, "PS:%p add fd %p; already multipolling", pollset, fd);
+ }
append_error(&error, pollable_add_fd(pollset->current_pollable, fd),
err_desc);
} else if (pollset->current_pollable != &fd->pollable) {
grpc_fd *had_fd = (grpc_fd *)pollset->current_pollable;
- if (GRPC_TRACER_ON(grpc_polling_trace))
+ if (GRPC_TRACER_ON(grpc_polling_trace)) {
gpr_log(GPR_DEBUG,
"PS:%p add fd %p; transition pollable from fd %p to multipoller",
pollset, fd, had_fd);
+ }
+ /* Introduce a spurious completion.
+ If we do not, then it may be that the fd-specific epoll set consumed
+ a completion without being polled, leading to a missed edge going up. */
+ grpc_lfev_set_ready(exec_ctx, &had_fd->read_closure);
+ grpc_lfev_set_ready(exec_ctx, &had_fd->write_closure);
pollset_kick_all(exec_ctx, pollset);
pollset->current_pollable = &pollset->pollable;
if (append_error(&error, pollable_materialize(&pollset->pollable),
diff --git a/src/core/lib/iomgr/ev_posix.c b/src/core/lib/iomgr/ev_posix.c
index 2648df3..a5ae04c 100644
--- a/src/core/lib/iomgr/ev_posix.c
+++ b/src/core/lib/iomgr/ev_posix.c
@@ -39,10 +39,11 @@
#include "src/core/lib/support/env.h"
grpc_tracer_flag grpc_polling_trace =
- GRPC_TRACER_INITIALIZER(false); /* Disabled by default */
+ GRPC_TRACER_INITIALIZER(false, "polling"); /* Disabled by default */
#ifndef NDEBUG
-grpc_tracer_flag grpc_trace_fd_refcount = GRPC_TRACER_INITIALIZER(false);
+grpc_tracer_flag grpc_trace_fd_refcount =
+ GRPC_TRACER_INITIALIZER(false, "fd_refcount");
#endif
/** Default poll() function - a pointer so that it can be overridden by some
@@ -124,7 +125,7 @@
const char *grpc_get_poll_strategy_name() { return g_poll_strategy_name; }
void grpc_event_engine_init(void) {
- grpc_register_tracer("polling", &grpc_polling_trace);
+ grpc_register_tracer(&grpc_polling_trace);
char *s = gpr_getenv("GRPC_POLL_STRATEGY");
if (s == NULL) {
diff --git a/src/core/lib/iomgr/ev_windows.c b/src/core/lib/iomgr/ev_windows.c
index 027609c..c24dfae 100644
--- a/src/core/lib/iomgr/ev_windows.c
+++ b/src/core/lib/iomgr/ev_windows.c
@@ -23,6 +23,6 @@
#include "src/core/lib/debug/trace.h"
grpc_tracer_flag grpc_polling_trace =
- GRPC_TRACER_INITIALIZER(false); /* Disabled by default */
+ GRPC_TRACER_INITIALIZER(false, "polling"); /* Disabled by default */
#endif // GRPC_WINSOCK_SOCKET
diff --git a/src/core/lib/iomgr/iomgr_posix.c b/src/core/lib/iomgr/iomgr_posix.c
index 43f5d04..f5875a2 100644
--- a/src/core/lib/iomgr/iomgr_posix.c
+++ b/src/core/lib/iomgr/iomgr_posix.c
@@ -28,7 +28,7 @@
void grpc_iomgr_platform_init(void) {
grpc_wakeup_fd_global_init();
grpc_event_engine_init();
- grpc_register_tracer("tcp", &grpc_tcp_trace);
+ grpc_register_tracer(&grpc_tcp_trace);
}
void grpc_iomgr_platform_flush(void) {}
diff --git a/src/core/lib/iomgr/iomgr_uv.c b/src/core/lib/iomgr/iomgr_uv.c
index 49d1a03..8b1245c 100644
--- a/src/core/lib/iomgr/iomgr_uv.c
+++ b/src/core/lib/iomgr/iomgr_uv.c
@@ -26,7 +26,7 @@
void grpc_iomgr_platform_init(void) {
grpc_pollset_global_init();
- grpc_register_tracer("tcp", &grpc_tcp_trace);
+ grpc_register_tracer(&grpc_tcp_trace);
}
void grpc_iomgr_platform_flush(void) {}
void grpc_iomgr_platform_shutdown(void) { grpc_pollset_global_shutdown(); }
diff --git a/src/core/lib/iomgr/pollset_uv.c b/src/core/lib/iomgr/pollset_uv.c
index 1a54065..946f0c8 100644
--- a/src/core/lib/iomgr/pollset_uv.c
+++ b/src/core/lib/iomgr/pollset_uv.c
@@ -34,7 +34,8 @@
#include "src/core/lib/debug/trace.h"
#ifndef NDEBUG
-grpc_tracer_flag grpc_trace_fd_refcount = GRPC_TRACER_INITIALIZER(false);
+grpc_tracer_flag grpc_trace_fd_refcount =
+ GRPC_TRACER_INITIALIZER(false, "fd_refcount");
#endif
struct grpc_pollset {
diff --git a/src/core/lib/iomgr/pollset_windows.c b/src/core/lib/iomgr/pollset_windows.c
index 1bfc2a2..ea017a6 100644
--- a/src/core/lib/iomgr/pollset_windows.c
+++ b/src/core/lib/iomgr/pollset_windows.c
@@ -31,7 +31,8 @@
#define GRPC_POLLSET_KICK_BROADCAST ((grpc_pollset_worker *)1)
#ifndef NDEBUG
-grpc_tracer_flag grpc_trace_fd_refcount = GRPC_TRACER_INITIALIZER(false);
+grpc_tracer_flag grpc_trace_fd_refcount =
+ GRPC_TRACER_INITIALIZER(false, "fd_refcount");
#endif
gpr_mu grpc_polling_mu;
diff --git a/src/core/lib/iomgr/resource_quota.c b/src/core/lib/iomgr/resource_quota.c
index f2cc1be..a31d9ee 100644
--- a/src/core/lib/iomgr/resource_quota.c
+++ b/src/core/lib/iomgr/resource_quota.c
@@ -29,7 +29,8 @@
#include "src/core/lib/iomgr/combiner.h"
-grpc_tracer_flag grpc_resource_quota_trace = GRPC_TRACER_INITIALIZER(false);
+grpc_tracer_flag grpc_resource_quota_trace =
+ GRPC_TRACER_INITIALIZER(false, "resource_quota");
#define MEMORY_USAGE_ESTIMATION_MAX 65536
diff --git a/src/core/lib/iomgr/tcp_posix.c b/src/core/lib/iomgr/tcp_posix.c
index 5de2b0f..b6dcd15 100644
--- a/src/core/lib/iomgr/tcp_posix.c
+++ b/src/core/lib/iomgr/tcp_posix.c
@@ -59,7 +59,7 @@
typedef size_t msg_iovlen_type;
#endif
-grpc_tracer_flag grpc_tcp_trace = GRPC_TRACER_INITIALIZER(false);
+grpc_tracer_flag grpc_tcp_trace = GRPC_TRACER_INITIALIZER(false, "tcp");
typedef struct {
grpc_endpoint base;
diff --git a/src/core/lib/iomgr/tcp_uv.c b/src/core/lib/iomgr/tcp_uv.c
index ff5fd3e..7c6a9b8 100644
--- a/src/core/lib/iomgr/tcp_uv.c
+++ b/src/core/lib/iomgr/tcp_uv.c
@@ -37,7 +37,7 @@
#include "src/core/lib/slice/slice_string_helpers.h"
#include "src/core/lib/support/string.h"
-grpc_tracer_flag grpc_tcp_trace = GRPC_TRACER_INITIALIZER(false);
+grpc_tracer_flag grpc_tcp_trace = GRPC_TRACER_INITIALIZER(false, "tcp");
typedef struct {
grpc_endpoint base;
diff --git a/src/core/lib/iomgr/tcp_windows.c b/src/core/lib/iomgr/tcp_windows.c
index 6704a15..2cbb974 100644
--- a/src/core/lib/iomgr/tcp_windows.c
+++ b/src/core/lib/iomgr/tcp_windows.c
@@ -48,7 +48,7 @@
#define GRPC_FIONBIO FIONBIO
#endif
-grpc_tracer_flag grpc_tcp_trace = GRPC_TRACER_INITIALIZER(false);
+grpc_tracer_flag grpc_tcp_trace = GRPC_TRACER_INITIALIZER(false, "tcp");
static grpc_error *set_non_block(SOCKET sock) {
int status;
diff --git a/src/core/lib/iomgr/timer_generic.c b/src/core/lib/iomgr/timer_generic.c
index bf73d2c..12efce2 100644
--- a/src/core/lib/iomgr/timer_generic.c
+++ b/src/core/lib/iomgr/timer_generic.c
@@ -41,43 +41,66 @@
#define MIN_QUEUE_WINDOW_DURATION 0.01
#define MAX_QUEUE_WINDOW_DURATION 1
-grpc_tracer_flag grpc_timer_trace = GRPC_TRACER_INITIALIZER(false);
-grpc_tracer_flag grpc_timer_check_trace = GRPC_TRACER_INITIALIZER(false);
+grpc_tracer_flag grpc_timer_trace = GRPC_TRACER_INITIALIZER(false, "timer");
+grpc_tracer_flag grpc_timer_check_trace =
+ GRPC_TRACER_INITIALIZER(false, "timer_check");
+/* A "timer shard". Contains a 'heap' and a 'list' of timers. All timers with
+ * deadlines earlier than 'queue_deadline" cap are maintained in the heap and
+ * others are maintained in the list (unordered). This helps to keep the number
+ * of elements in the heap low.
+ *
+ * The 'queue_deadline_cap' gets recomputed periodically based on the timer
+ * stats maintained in 'stats' and the relevant timers are then moved from the
+ * 'list' to 'heap'
+ */
typedef struct {
gpr_mu mu;
grpc_time_averaged_stats stats;
/* All and only timers with deadlines <= this will be in the heap. */
gpr_atm queue_deadline_cap;
+ /* The deadline of the next timer due in this shard */
gpr_atm min_deadline;
- /* Index in the g_shard_queue */
+ /* Index of this timer_shard in the g_shard_queue */
uint32_t shard_queue_index;
/* This holds all timers with deadlines < queue_deadline_cap. Timers in this
list have the top bit of their deadline set to 0. */
grpc_timer_heap heap;
/* This holds timers whose deadline is >= queue_deadline_cap. */
grpc_timer list;
-} shard_type;
+} timer_shard;
+
+/* Array of timer shards. Whenever a timer (grpc_timer *) is added, its address
+ * is hashed to select the timer shard to add the timer to */
+static timer_shard g_shards[NUM_SHARDS];
+
+/* Maintains a sorted list of timer shards (sorted by their min_deadline, i.e
+ * the deadline of the next timer in each shard).
+ * Access to this is protected by g_shared_mutables.mu */
+static timer_shard *g_shard_queue[NUM_SHARDS];
+
+/* Thread local variable that stores the deadline of the next timer the thread
+ * has last-seen. This is an optimization to prevent the thread from checking
+ * shared_mutables.min_timer (which requires acquiring shared_mutables.mu lock,
+ * an expensive operation) */
+GPR_TLS_DECL(g_last_seen_min_timer);
struct shared_mutables {
+ /* The deadline of the next timer due across all timer shards */
gpr_atm min_timer;
/* Allow only one run_some_expired_timers at once */
gpr_spinlock checker_mu;
bool initialized;
- /* Protects g_shard_queue */
+ /* Protects g_shard_queue (and the shared_mutables struct itself) */
gpr_mu mu;
} GPR_ALIGN_STRUCT(GPR_CACHELINE_SIZE);
static struct shared_mutables g_shared_mutables = {
.checker_mu = GPR_SPINLOCK_STATIC_INITIALIZER, .initialized = false,
};
-static gpr_clock_type g_clock_type;
-static shard_type g_shards[NUM_SHARDS];
-/* Protected by g_shared_mutables.mu */
-static shard_type *g_shard_queue[NUM_SHARDS];
-static gpr_timespec g_start_time;
-GPR_TLS_DECL(g_last_seen_min_timer);
+static gpr_clock_type g_clock_type;
+static gpr_timespec g_start_time;
static gpr_atm saturating_add(gpr_atm a, gpr_atm b) {
if (a > GPR_ATM_MAX - b) {
@@ -122,7 +145,7 @@
return gpr_time_add(g_start_time, dbl_to_ts((double)x / 1000.0));
}
-static gpr_atm compute_min_deadline(shard_type *shard) {
+static gpr_atm compute_min_deadline(timer_shard *shard) {
return grpc_timer_heap_is_empty(&shard->heap)
? saturating_add(shard->queue_deadline_cap, 1)
: grpc_timer_heap_top(&shard->heap)->deadline;
@@ -138,11 +161,11 @@
g_shared_mutables.min_timer = timespec_to_atm_round_down(now);
gpr_tls_init(&g_last_seen_min_timer);
gpr_tls_set(&g_last_seen_min_timer, 0);
- grpc_register_tracer("timer", &grpc_timer_trace);
- grpc_register_tracer("timer_check", &grpc_timer_check_trace);
+ grpc_register_tracer(&grpc_timer_trace);
+ grpc_register_tracer(&grpc_timer_check_trace);
for (i = 0; i < NUM_SHARDS; i++) {
- shard_type *shard = &g_shards[i];
+ timer_shard *shard = &g_shards[i];
gpr_mu_init(&shard->mu);
grpc_time_averaged_stats_init(&shard->stats, 1.0 / ADD_DEADLINE_SCALE, 0.1,
0.5);
@@ -161,7 +184,7 @@
exec_ctx, GPR_ATM_MAX, NULL,
GRPC_ERROR_CREATE_FROM_STATIC_STRING("Timer list shutdown"));
for (i = 0; i < NUM_SHARDS; i++) {
- shard_type *shard = &g_shards[i];
+ timer_shard *shard = &g_shards[i];
gpr_mu_destroy(&shard->mu);
grpc_timer_heap_destroy(&shard->heap);
}
@@ -187,7 +210,7 @@
}
static void swap_adjacent_shards_in_queue(uint32_t first_shard_queue_index) {
- shard_type *temp;
+ timer_shard *temp;
temp = g_shard_queue[first_shard_queue_index];
g_shard_queue[first_shard_queue_index] =
g_shard_queue[first_shard_queue_index + 1];
@@ -198,7 +221,7 @@
first_shard_queue_index + 1;
}
-static void note_deadline_change(shard_type *shard) {
+static void note_deadline_change(timer_shard *shard) {
while (shard->shard_queue_index > 0 &&
shard->min_deadline <
g_shard_queue[shard->shard_queue_index - 1]->min_deadline) {
@@ -215,7 +238,7 @@
gpr_timespec deadline, grpc_closure *closure,
gpr_timespec now) {
int is_first_timer = 0;
- shard_type *shard = &g_shards[GPR_HASH_POINTER(timer, NUM_SHARDS)];
+ timer_shard *shard = &g_shards[GPR_HASH_POINTER(timer, NUM_SHARDS)];
GPR_ASSERT(deadline.clock_type == g_clock_type);
GPR_ASSERT(now.clock_type == g_clock_type);
timer->closure = closure;
@@ -303,7 +326,7 @@
return;
}
- shard_type *shard = &g_shards[GPR_HASH_POINTER(timer, NUM_SHARDS)];
+ timer_shard *shard = &g_shards[GPR_HASH_POINTER(timer, NUM_SHARDS)];
gpr_mu_lock(&shard->mu);
if (GRPC_TRACER_ON(grpc_timer_trace)) {
gpr_log(GPR_DEBUG, "TIMER %p: CANCEL pending=%s", timer,
@@ -321,12 +344,12 @@
gpr_mu_unlock(&shard->mu);
}
-/* This is called when the queue is empty and "now" has reached the
- queue_deadline_cap. We compute a new queue deadline and then scan the map
- for timers that fall at or under it. Returns true if the queue is no
- longer empty.
+/* Rebalances the timer shard by computing a new 'queue_deadline_cap' and moving
+ all relevant timers in shard->list (i.e timers with deadlines earlier than
+ 'queue_deadline_cap') into into shard->heap.
+ Returns 'true' if shard->heap has atleast ONE element
REQUIRES: shard->mu locked */
-static int refill_queue(shard_type *shard, gpr_atm now) {
+static int refill_heap(timer_shard *shard, gpr_atm now) {
/* Compute the new queue window width and bound by the limits: */
double computed_deadline_delta =
grpc_time_averaged_stats_update_average(&shard->stats) *
@@ -363,7 +386,7 @@
/* This pops the next non-cancelled timer with deadline <= now from the
queue, or returns NULL if there isn't one.
REQUIRES: shard->mu locked */
-static grpc_timer *pop_one(shard_type *shard, gpr_atm now) {
+static grpc_timer *pop_one(timer_shard *shard, gpr_atm now) {
grpc_timer *timer;
for (;;) {
if (GRPC_TRACER_ON(grpc_timer_check_trace)) {
@@ -373,7 +396,7 @@
}
if (grpc_timer_heap_is_empty(&shard->heap)) {
if (now < shard->queue_deadline_cap) return NULL;
- if (!refill_queue(shard, now)) return NULL;
+ if (!refill_heap(shard, now)) return NULL;
}
timer = grpc_timer_heap_top(&shard->heap);
if (GRPC_TRACER_ON(grpc_timer_check_trace)) {
@@ -393,7 +416,7 @@
}
/* REQUIRES: shard->mu unlocked */
-static size_t pop_timers(grpc_exec_ctx *exec_ctx, shard_type *shard,
+static size_t pop_timers(grpc_exec_ctx *exec_ctx, timer_shard *shard,
gpr_atm now, gpr_atm *new_min_deadline,
grpc_error *error) {
size_t n = 0;
diff --git a/src/core/lib/iomgr/timer_uv.c b/src/core/lib/iomgr/timer_uv.c
index 4f204cf..1ab82ef 100644
--- a/src/core/lib/iomgr/timer_uv.c
+++ b/src/core/lib/iomgr/timer_uv.c
@@ -28,8 +28,9 @@
#include <uv.h>
-grpc_tracer_flag grpc_timer_trace = GRPC_TRACER_INITIALIZER(false);
-grpc_tracer_flag grpc_timer_check_trace = GRPC_TRACER_INITIALIZER(false);
+grpc_tracer_flag grpc_timer_trace = GRPC_TRACER_INITIALIZER(false, "timer");
+grpc_tracer_flag grpc_timer_check_trace =
+ GRPC_TRACER_INITIALIZER(false, "timer_check");
static void timer_close_callback(uv_handle_t *handle) { gpr_free(handle); }
diff --git a/src/core/lib/security/context/security_context.c b/src/core/lib/security/context/security_context.c
index dffe6d2..8fff2c9 100644
--- a/src/core/lib/security/context/security_context.c
+++ b/src/core/lib/security/context/security_context.c
@@ -31,7 +31,7 @@
#ifndef NDEBUG
grpc_tracer_flag grpc_trace_auth_context_refcount =
- GRPC_TRACER_INITIALIZER(false);
+ GRPC_TRACER_INITIALIZER(false, "auth_context_refcount");
#endif
/* --- grpc_call --- */
diff --git a/src/core/lib/security/transport/secure_endpoint.c b/src/core/lib/security/transport/secure_endpoint.c
index f4ed81d..5e41b94 100644
--- a/src/core/lib/security/transport/secure_endpoint.c
+++ b/src/core/lib/security/transport/secure_endpoint.c
@@ -60,7 +60,8 @@
gpr_refcount ref;
} secure_endpoint;
-grpc_tracer_flag grpc_trace_secure_endpoint = GRPC_TRACER_INITIALIZER(false);
+grpc_tracer_flag grpc_trace_secure_endpoint =
+ GRPC_TRACER_INITIALIZER(false, "secure_endpoint");
static void destroy(grpc_exec_ctx *exec_ctx, secure_endpoint *secure_ep) {
secure_endpoint *ep = secure_ep;
diff --git a/src/core/lib/security/transport/security_connector.c b/src/core/lib/security/transport/security_connector.c
index 3c0c242..6788126 100644
--- a/src/core/lib/security/transport/security_connector.c
+++ b/src/core/lib/security/transport/security_connector.c
@@ -45,7 +45,7 @@
#ifndef NDEBUG
grpc_tracer_flag grpc_trace_security_connector_refcount =
- GRPC_TRACER_INITIALIZER(false);
+ GRPC_TRACER_INITIALIZER(false, "security_connector_refcount");
#endif
/* -- Constants. -- */
@@ -383,8 +383,7 @@
grpc_handshake_manager_add(
handshake_mgr,
grpc_security_handshaker_create(
- exec_ctx, tsi_create_adapter_handshaker(
- tsi_create_fake_handshaker(true /* is_client */)),
+ exec_ctx, tsi_create_fake_handshaker(true /* is_client */),
&sc->base));
}
@@ -394,8 +393,7 @@
grpc_handshake_manager_add(
handshake_mgr,
grpc_security_handshaker_create(
- exec_ctx, tsi_create_adapter_handshaker(
- tsi_create_fake_handshaker(false /* is_client */)),
+ exec_ctx, tsi_create_fake_handshaker(false /* is_client */),
&sc->base));
}
diff --git a/src/core/lib/support/log_linux.c b/src/core/lib/support/log_linux.c
index 5c51266..61d2346 100644
--- a/src/core/lib/support/log_linux.c
+++ b/src/core/lib/support/log_linux.c
@@ -64,6 +64,8 @@
time_t timer;
gpr_timespec now = gpr_now(GPR_CLOCK_REALTIME);
struct tm tm;
+ static __thread long tid = 0;
+ if (tid == 0) tid = gettid();
timer = (time_t)now.tv_sec;
final_slash = strrchr(args->file, '/');
@@ -81,7 +83,7 @@
gpr_asprintf(&prefix, "%s%s.%09" PRId32 " %7ld %s:%d]",
gpr_log_severity_string(args->severity), time_buffer,
- now.tv_nsec, gettid(), display_file, args->line);
+ now.tv_nsec, tid, display_file, args->line);
fprintf(stderr, "%-60s %s\n", prefix, args->message);
gpr_free(prefix);
diff --git a/src/core/lib/surface/api_trace.c b/src/core/lib/surface/api_trace.c
index f88ffd5..5697330 100644
--- a/src/core/lib/surface/api_trace.c
+++ b/src/core/lib/surface/api_trace.c
@@ -19,4 +19,4 @@
#include "src/core/lib/surface/api_trace.h"
#include "src/core/lib/debug/trace.h"
-grpc_tracer_flag grpc_api_trace = GRPC_TRACER_INITIALIZER(false);
+grpc_tracer_flag grpc_api_trace = GRPC_TRACER_INITIALIZER(false, "api");
diff --git a/src/core/lib/surface/call.c b/src/core/lib/surface/call.c
index aac443e..2365d27 100644
--- a/src/core/lib/surface/call.c
+++ b/src/core/lib/surface/call.c
@@ -229,8 +229,10 @@
void *saved_receiving_stream_ready_bctlp;
};
-grpc_tracer_flag grpc_call_error_trace = GRPC_TRACER_INITIALIZER(false);
-grpc_tracer_flag grpc_compression_trace = GRPC_TRACER_INITIALIZER(false);
+grpc_tracer_flag grpc_call_error_trace =
+ GRPC_TRACER_INITIALIZER(false, "call_error");
+grpc_tracer_flag grpc_compression_trace =
+ GRPC_TRACER_INITIALIZER(false, "compression");
#define CALL_STACK_FROM_CALL(call) ((grpc_call_stack *)((call) + 1))
#define CALL_FROM_CALL_STACK(call_stack) (((grpc_call *)(call_stack)) - 1)
diff --git a/src/core/lib/surface/completion_queue.c b/src/core/lib/surface/completion_queue.c
index b04aee6..978d7b4 100644
--- a/src/core/lib/surface/completion_queue.c
+++ b/src/core/lib/surface/completion_queue.c
@@ -35,10 +35,13 @@
#include "src/core/lib/surface/call.h"
#include "src/core/lib/surface/event_string.h"
-grpc_tracer_flag grpc_trace_operation_failures = GRPC_TRACER_INITIALIZER(false);
+grpc_tracer_flag grpc_trace_operation_failures =
+ GRPC_TRACER_INITIALIZER(false, "op_failure");
#ifndef NDEBUG
-grpc_tracer_flag grpc_trace_pending_tags = GRPC_TRACER_INITIALIZER(false);
-grpc_tracer_flag grpc_trace_cq_refcount = GRPC_TRACER_INITIALIZER(false);
+grpc_tracer_flag grpc_trace_pending_tags =
+ GRPC_TRACER_INITIALIZER(false, "pending_tags");
+grpc_tracer_flag grpc_trace_cq_refcount =
+ GRPC_TRACER_INITIALIZER(false, "cq_refcount");
#endif
typedef struct {
@@ -189,16 +192,19 @@
typedef struct cq_vtable {
grpc_cq_completion_type cq_completion_type;
- size_t (*size)();
- void (*begin_op)(grpc_completion_queue *cc, void *tag);
- void (*end_op)(grpc_exec_ctx *exec_ctx, grpc_completion_queue *cc, void *tag,
+ size_t data_size;
+ void (*init)(void *data);
+ void (*shutdown)(grpc_exec_ctx *exec_ctx, grpc_completion_queue *cq);
+ void (*destroy)(void *data);
+ void (*begin_op)(grpc_completion_queue *cq, void *tag);
+ void (*end_op)(grpc_exec_ctx *exec_ctx, grpc_completion_queue *cq, void *tag,
grpc_error *error,
void (*done)(grpc_exec_ctx *exec_ctx, void *done_arg,
grpc_cq_completion *storage),
void *done_arg, grpc_cq_completion *storage);
- grpc_event (*next)(grpc_completion_queue *cc, gpr_timespec deadline,
+ grpc_event (*next)(grpc_completion_queue *cq, gpr_timespec deadline,
void *reserved);
- grpc_event (*pluck)(grpc_completion_queue *cc, void *tag,
+ grpc_event (*pluck)(grpc_completion_queue *cq, void *tag,
gpr_timespec deadline, void *reserved);
} cq_vtable;
@@ -218,25 +224,28 @@
gpr_atm num_queue_items;
} grpc_cq_event_queue;
-/* TODO: sreek Refactor this based on the completion_type. Put completion-type
- * specific data in a different structure (and co-allocate memory for it along
- * with completion queue + pollset )*/
-typedef struct cq_data {
- gpr_mu *mu;
+typedef struct cq_next_data {
+ /** Completed events for completion-queues of type GRPC_CQ_NEXT */
+ grpc_cq_event_queue queue;
+ /** Counter of how many things have ever been queued on this completion queue
+ useful for avoiding locks to check the queue */
+ gpr_atm things_queued_ever;
+
+ /* Number of outstanding events (+1 if not shut down) */
+ gpr_atm pending_events;
+
+ int shutdown_called;
+} cq_next_data;
+
+typedef struct cq_pluck_data {
/** Completed events for completion-queues of type GRPC_CQ_PLUCK */
grpc_cq_completion completed_head;
grpc_cq_completion *completed_tail;
- /** Completed events for completion-queues of type GRPC_CQ_NEXT */
- grpc_cq_event_queue queue;
-
/** Number of pending events (+1 if we're not shutdown) */
gpr_refcount pending_events;
- /** Once owning_refs drops to zero, we will destroy the cq */
- gpr_refcount owning_refs;
-
/** Counter of how many things have ever been queued on this completion queue
useful for avoiding locks to check the queue */
gpr_atm things_queued_ever;
@@ -245,37 +254,45 @@
gpr_atm shutdown;
int shutdown_called;
- int is_server_cq;
-
int num_pluckers;
- int num_polls;
plucker pluckers[GRPC_MAX_COMPLETION_QUEUE_PLUCKERS];
- grpc_closure pollset_shutdown_done;
+} cq_pluck_data;
+
+/* Completion queue structure */
+struct grpc_completion_queue {
+ /** Once owning_refs drops to zero, we will destroy the cq */
+ gpr_refcount owning_refs;
+
+ gpr_mu *mu;
+
+ const cq_vtable *vtable;
+ const cq_poller_vtable *poller_vtable;
#ifndef NDEBUG
void **outstanding_tags;
size_t outstanding_tag_count;
size_t outstanding_tag_capacity;
#endif
-} cq_data;
-/* Completion queue structure */
-struct grpc_completion_queue {
- cq_data data;
- const cq_vtable *vtable;
- const cq_poller_vtable *poller_vtable;
+ grpc_closure pollset_shutdown_done;
+ int num_polls;
};
/* Forward declarations */
-static void cq_finish_shutdown(grpc_exec_ctx *exec_ctx,
- grpc_completion_queue *cc);
+static void cq_finish_shutdown_next(grpc_exec_ctx *exec_ctx,
+ grpc_completion_queue *cq);
+static void cq_finish_shutdown_pluck(grpc_exec_ctx *exec_ctx,
+ grpc_completion_queue *cq);
+static void cq_shutdown_next(grpc_exec_ctx *exec_ctx,
+ grpc_completion_queue *cq);
+static void cq_shutdown_pluck(grpc_exec_ctx *exec_ctx,
+ grpc_completion_queue *cq);
-static size_t cq_size(grpc_completion_queue *cc);
-
-static void cq_begin_op(grpc_completion_queue *cc, void *tag);
+static void cq_begin_op_for_next(grpc_completion_queue *cq, void *tag);
+static void cq_begin_op_for_pluck(grpc_completion_queue *cq, void *tag);
static void cq_end_op_for_next(grpc_exec_ctx *exec_ctx,
- grpc_completion_queue *cc, void *tag,
+ grpc_completion_queue *cq, void *tag,
grpc_error *error,
void (*done)(grpc_exec_ctx *exec_ctx,
void *done_arg,
@@ -283,42 +300,56 @@
void *done_arg, grpc_cq_completion *storage);
static void cq_end_op_for_pluck(grpc_exec_ctx *exec_ctx,
- grpc_completion_queue *cc, void *tag,
+ grpc_completion_queue *cq, void *tag,
grpc_error *error,
void (*done)(grpc_exec_ctx *exec_ctx,
void *done_arg,
grpc_cq_completion *storage),
void *done_arg, grpc_cq_completion *storage);
-static grpc_event cq_next(grpc_completion_queue *cc, gpr_timespec deadline,
+static grpc_event cq_next(grpc_completion_queue *cq, gpr_timespec deadline,
void *reserved);
-static grpc_event cq_pluck(grpc_completion_queue *cc, void *tag,
+static grpc_event cq_pluck(grpc_completion_queue *cq, void *tag,
gpr_timespec deadline, void *reserved);
+static void cq_init_next(void *data);
+static void cq_init_pluck(void *data);
+static void cq_destroy_next(void *data);
+static void cq_destroy_pluck(void *data);
+
/* Completion queue vtables based on the completion-type */
static const cq_vtable g_cq_vtable[] = {
/* GRPC_CQ_NEXT */
- {.cq_completion_type = GRPC_CQ_NEXT,
- .size = cq_size,
- .begin_op = cq_begin_op,
+ {.data_size = sizeof(cq_next_data),
+ .cq_completion_type = GRPC_CQ_NEXT,
+ .init = cq_init_next,
+ .shutdown = cq_shutdown_next,
+ .destroy = cq_destroy_next,
+ .begin_op = cq_begin_op_for_next,
.end_op = cq_end_op_for_next,
.next = cq_next,
.pluck = NULL},
/* GRPC_CQ_PLUCK */
- {.cq_completion_type = GRPC_CQ_PLUCK,
- .size = cq_size,
- .begin_op = cq_begin_op,
+ {.data_size = sizeof(cq_pluck_data),
+ .cq_completion_type = GRPC_CQ_PLUCK,
+ .init = cq_init_pluck,
+ .shutdown = cq_shutdown_pluck,
+ .destroy = cq_destroy_pluck,
+ .begin_op = cq_begin_op_for_pluck,
.end_op = cq_end_op_for_pluck,
.next = NULL,
.pluck = cq_pluck},
};
-#define POLLSET_FROM_CQ(cq) ((grpc_pollset *)(cq + 1))
-#define CQ_FROM_POLLSET(ps) (((grpc_completion_queue *)ps) - 1)
+#define DATA_FROM_CQ(cq) ((void *)(cq + 1))
+#define POLLSET_FROM_CQ(cq) \
+ ((grpc_pollset *)(cq->vtable->data_size + (char *)DATA_FROM_CQ(cq)))
-grpc_tracer_flag grpc_cq_pluck_trace = GRPC_TRACER_INITIALIZER(true);
-grpc_tracer_flag grpc_cq_event_timeout_trace = GRPC_TRACER_INITIALIZER(true);
+grpc_tracer_flag grpc_cq_pluck_trace =
+ GRPC_TRACER_INITIALIZER(true, "queue_pluck");
+grpc_tracer_flag grpc_cq_event_timeout_trace =
+ GRPC_TRACER_INITIALIZER(true, "queue_timeout");
#define GRPC_SURFACE_TRACE_RETURNED_EVENT(cq, event) \
if (GRPC_TRACER_ON(grpc_api_trace) && \
@@ -329,7 +360,7 @@
gpr_free(_ev); \
}
-static void on_pollset_shutdown_done(grpc_exec_ctx *exec_ctx, void *cc,
+static void on_pollset_shutdown_done(grpc_exec_ctx *exec_ctx, void *cq,
grpc_error *error);
static void cq_event_queue_init(grpc_cq_event_queue *q) {
@@ -342,9 +373,9 @@
gpr_mpscq_destroy(&q->queue);
}
-static void cq_event_queue_push(grpc_cq_event_queue *q, grpc_cq_completion *c) {
+static bool cq_event_queue_push(grpc_cq_event_queue *q, grpc_cq_completion *c) {
gpr_mpscq_push(&q->queue, (gpr_mpscq_node *)c);
- gpr_atm_no_barrier_fetch_add(&q->num_queue_items, 1);
+ return gpr_atm_no_barrier_fetch_add(&q->num_queue_items, 1) == 0;
}
static grpc_cq_completion *cq_event_queue_pop(grpc_cq_event_queue *q) {
@@ -367,16 +398,10 @@
return (long)gpr_atm_no_barrier_load(&q->num_queue_items);
}
-static size_t cq_size(grpc_completion_queue *cc) {
- /* Size of the completion queue and the size of the pollset whose memory is
- allocated right after that of completion queue */
- return sizeof(grpc_completion_queue) + cc->poller_vtable->size();
-}
-
grpc_completion_queue *grpc_completion_queue_create_internal(
grpc_cq_completion_type completion_type,
grpc_cq_polling_type polling_type) {
- grpc_completion_queue *cc;
+ grpc_completion_queue *cq;
GPR_TIMER_BEGIN("grpc_completion_queue_create_internal", 0);
@@ -389,158 +414,173 @@
const cq_poller_vtable *poller_vtable =
&g_poller_vtable_by_poller_type[polling_type];
- cc = gpr_zalloc(sizeof(grpc_completion_queue) + poller_vtable->size());
- cq_data *cqd = &cc->data;
+ cq = gpr_zalloc(sizeof(grpc_completion_queue) + vtable->data_size +
+ poller_vtable->size());
- cc->vtable = vtable;
- cc->poller_vtable = poller_vtable;
+ cq->vtable = vtable;
+ cq->poller_vtable = poller_vtable;
- poller_vtable->init(POLLSET_FROM_CQ(cc), &cc->data.mu);
-
-#ifndef NDEBUG
- cqd->outstanding_tags = NULL;
- cqd->outstanding_tag_capacity = 0;
-#endif
-
- /* Initial ref is dropped by grpc_completion_queue_shutdown */
- gpr_ref_init(&cqd->pending_events, 1);
/* One for destroy(), one for pollset_shutdown */
- gpr_ref_init(&cqd->owning_refs, 2);
- cqd->completed_tail = &cqd->completed_head;
- cqd->completed_head.next = (uintptr_t)cqd->completed_tail;
- gpr_atm_no_barrier_store(&cqd->shutdown, 0);
- cqd->shutdown_called = 0;
- cqd->is_server_cq = 0;
- cqd->num_pluckers = 0;
- cqd->num_polls = 0;
- gpr_atm_no_barrier_store(&cqd->things_queued_ever, 0);
-#ifndef NDEBUG
- cqd->outstanding_tag_count = 0;
-#endif
- cq_event_queue_init(&cqd->queue);
- GRPC_CLOSURE_INIT(&cqd->pollset_shutdown_done, on_pollset_shutdown_done, cc,
+ gpr_ref_init(&cq->owning_refs, 2);
+
+ poller_vtable->init(POLLSET_FROM_CQ(cq), &cq->mu);
+ vtable->init(DATA_FROM_CQ(cq));
+
+ GRPC_CLOSURE_INIT(&cq->pollset_shutdown_done, on_pollset_shutdown_done, cq,
grpc_schedule_on_exec_ctx);
GPR_TIMER_END("grpc_completion_queue_create_internal", 0);
- return cc;
+ return cq;
}
-grpc_cq_completion_type grpc_get_cq_completion_type(grpc_completion_queue *cc) {
- return cc->vtable->cq_completion_type;
+static void cq_init_next(void *ptr) {
+ cq_next_data *cqd = ptr;
+ /* Initial ref is dropped by grpc_completion_queue_shutdown */
+ gpr_atm_no_barrier_store(&cqd->pending_events, 1);
+ cqd->shutdown_called = false;
+ gpr_atm_no_barrier_store(&cqd->things_queued_ever, 0);
+ cq_event_queue_init(&cqd->queue);
}
-int grpc_get_cq_poll_num(grpc_completion_queue *cc) {
+static void cq_destroy_next(void *ptr) {
+ cq_next_data *cqd = ptr;
+ GPR_ASSERT(cq_event_queue_num_items(&cqd->queue) == 0);
+ cq_event_queue_destroy(&cqd->queue);
+}
+
+static void cq_init_pluck(void *ptr) {
+ cq_pluck_data *cqd = ptr;
+ /* Initial ref is dropped by grpc_completion_queue_shutdown */
+ gpr_ref_init(&cqd->pending_events, 1);
+ cqd->completed_tail = &cqd->completed_head;
+ cqd->completed_head.next = (uintptr_t)cqd->completed_tail;
+ gpr_atm_no_barrier_store(&cqd->shutdown, 0);
+ cqd->shutdown_called = 0;
+ cqd->num_pluckers = 0;
+ gpr_atm_no_barrier_store(&cqd->things_queued_ever, 0);
+}
+
+static void cq_destroy_pluck(void *ptr) {
+ cq_pluck_data *cqd = ptr;
+ GPR_ASSERT(cqd->completed_head.next == (uintptr_t)&cqd->completed_head);
+}
+
+grpc_cq_completion_type grpc_get_cq_completion_type(grpc_completion_queue *cq) {
+ return cq->vtable->cq_completion_type;
+}
+
+int grpc_get_cq_poll_num(grpc_completion_queue *cq) {
int cur_num_polls;
- gpr_mu_lock(cc->data.mu);
- cur_num_polls = cc->data.num_polls;
- gpr_mu_unlock(cc->data.mu);
+ gpr_mu_lock(cq->mu);
+ cur_num_polls = cq->num_polls;
+ gpr_mu_unlock(cq->mu);
return cur_num_polls;
}
#ifndef NDEBUG
-void grpc_cq_internal_ref(grpc_completion_queue *cc, const char *reason,
+void grpc_cq_internal_ref(grpc_completion_queue *cq, const char *reason,
const char *file, int line) {
- cq_data *cqd = &cc->data;
if (GRPC_TRACER_ON(grpc_trace_cq_refcount)) {
- gpr_atm val = gpr_atm_no_barrier_load(&cqd->owning_refs.count);
+ gpr_atm val = gpr_atm_no_barrier_load(&cq->owning_refs.count);
gpr_log(file, line, GPR_LOG_SEVERITY_DEBUG,
- "CQ:%p ref %" PRIdPTR " -> %" PRIdPTR " %s", cc, val, val + 1,
+ "CQ:%p ref %" PRIdPTR " -> %" PRIdPTR " %s", cq, val, val + 1,
reason);
}
#else
-void grpc_cq_internal_ref(grpc_completion_queue *cc) {
- cq_data *cqd = &cc->data;
+void grpc_cq_internal_ref(grpc_completion_queue *cq) {
#endif
- gpr_ref(&cqd->owning_refs);
+ gpr_ref(&cq->owning_refs);
}
static void on_pollset_shutdown_done(grpc_exec_ctx *exec_ctx, void *arg,
grpc_error *error) {
- grpc_completion_queue *cc = arg;
- GRPC_CQ_INTERNAL_UNREF(exec_ctx, cc, "pollset_destroy");
+ grpc_completion_queue *cq = arg;
+ GRPC_CQ_INTERNAL_UNREF(exec_ctx, cq, "pollset_destroy");
}
#ifndef NDEBUG
-void grpc_cq_internal_unref(grpc_exec_ctx *exec_ctx, grpc_completion_queue *cc,
+void grpc_cq_internal_unref(grpc_exec_ctx *exec_ctx, grpc_completion_queue *cq,
const char *reason, const char *file, int line) {
- cq_data *cqd = &cc->data;
if (GRPC_TRACER_ON(grpc_trace_cq_refcount)) {
- gpr_atm val = gpr_atm_no_barrier_load(&cqd->owning_refs.count);
+ gpr_atm val = gpr_atm_no_barrier_load(&cq->owning_refs.count);
gpr_log(file, line, GPR_LOG_SEVERITY_DEBUG,
- "CQ:%p unref %" PRIdPTR " -> %" PRIdPTR " %s", cc, val, val - 1,
+ "CQ:%p unref %" PRIdPTR " -> %" PRIdPTR " %s", cq, val, val - 1,
reason);
}
#else
void grpc_cq_internal_unref(grpc_exec_ctx *exec_ctx,
- grpc_completion_queue *cc) {
- cq_data *cqd = &cc->data;
+ grpc_completion_queue *cq) {
#endif
- if (gpr_unref(&cqd->owning_refs)) {
- GPR_ASSERT(cqd->completed_head.next == (uintptr_t)&cqd->completed_head);
- cc->poller_vtable->destroy(exec_ctx, POLLSET_FROM_CQ(cc));
- cq_event_queue_destroy(&cqd->queue);
+ if (gpr_unref(&cq->owning_refs)) {
+ cq->vtable->destroy(DATA_FROM_CQ(cq));
+ cq->poller_vtable->destroy(exec_ctx, POLLSET_FROM_CQ(cq));
#ifndef NDEBUG
- gpr_free(cqd->outstanding_tags);
+ gpr_free(cq->outstanding_tags);
#endif
- gpr_free(cc);
+ gpr_free(cq);
}
}
-static void cq_begin_op(grpc_completion_queue *cc, void *tag) {
- cq_data *cqd = &cc->data;
-#ifndef NDEBUG
- gpr_mu_lock(cqd->mu);
+static void cq_begin_op_for_next(grpc_completion_queue *cq, void *tag) {
+ cq_next_data *cqd = DATA_FROM_CQ(cq);
GPR_ASSERT(!cqd->shutdown_called);
- if (cqd->outstanding_tag_count == cqd->outstanding_tag_capacity) {
- cqd->outstanding_tag_capacity =
- GPR_MAX(4, 2 * cqd->outstanding_tag_capacity);
- cqd->outstanding_tags =
- gpr_realloc(cqd->outstanding_tags, sizeof(*cqd->outstanding_tags) *
- cqd->outstanding_tag_capacity);
- }
- cqd->outstanding_tags[cqd->outstanding_tag_count++] = tag;
- gpr_mu_unlock(cqd->mu);
-#endif
+ gpr_atm_no_barrier_fetch_add(&cqd->pending_events, 1);
+}
+
+static void cq_begin_op_for_pluck(grpc_completion_queue *cq, void *tag) {
+ cq_pluck_data *cqd = DATA_FROM_CQ(cq);
+ GPR_ASSERT(!cqd->shutdown_called);
gpr_ref(&cqd->pending_events);
}
-void grpc_cq_begin_op(grpc_completion_queue *cc, void *tag) {
- cc->vtable->begin_op(cc, tag);
+void grpc_cq_begin_op(grpc_completion_queue *cq, void *tag) {
+#ifndef NDEBUG
+ gpr_mu_lock(cq->mu);
+ if (cq->outstanding_tag_count == cq->outstanding_tag_capacity) {
+ cq->outstanding_tag_capacity = GPR_MAX(4, 2 * cq->outstanding_tag_capacity);
+ cq->outstanding_tags =
+ gpr_realloc(cq->outstanding_tags, sizeof(*cq->outstanding_tags) *
+ cq->outstanding_tag_capacity);
+ }
+ cq->outstanding_tags[cq->outstanding_tag_count++] = tag;
+ gpr_mu_unlock(cq->mu);
+#endif
+ cq->vtable->begin_op(cq, tag);
}
#ifndef NDEBUG
-static void cq_check_tag(grpc_completion_queue *cc, void *tag, bool lock_cq) {
- cq_data *cqd = &cc->data;
+static void cq_check_tag(grpc_completion_queue *cq, void *tag, bool lock_cq) {
int found = 0;
if (lock_cq) {
- gpr_mu_lock(cqd->mu);
+ gpr_mu_lock(cq->mu);
}
- for (int i = 0; i < (int)cqd->outstanding_tag_count; i++) {
- if (cqd->outstanding_tags[i] == tag) {
- cqd->outstanding_tag_count--;
- GPR_SWAP(void *, cqd->outstanding_tags[i],
- cqd->outstanding_tags[cqd->outstanding_tag_count]);
+ for (int i = 0; i < (int)cq->outstanding_tag_count; i++) {
+ if (cq->outstanding_tags[i] == tag) {
+ cq->outstanding_tag_count--;
+ GPR_SWAP(void *, cq->outstanding_tags[i],
+ cq->outstanding_tags[cq->outstanding_tag_count]);
found = 1;
break;
}
}
if (lock_cq) {
- gpr_mu_unlock(cqd->mu);
+ gpr_mu_unlock(cq->mu);
}
GPR_ASSERT(found);
}
#else
-static void cq_check_tag(grpc_completion_queue *cc, void *tag, bool lock_cq) {}
+static void cq_check_tag(grpc_completion_queue *cq, void *tag, bool lock_cq) {}
#endif
-/* Queue a GRPC_OP_COMPLETED operation to a completion queue (with a completion
+/* Queue a GRPC_OP_COMPLETED operation to a completion queue (with a
+ * completion
* type of GRPC_CQ_NEXT) */
static void cq_end_op_for_next(grpc_exec_ctx *exec_ctx,
- grpc_completion_queue *cc, void *tag,
+ grpc_completion_queue *cq, void *tag,
grpc_error *error,
void (*done)(grpc_exec_ctx *exec_ctx,
void *done_arg,
@@ -553,16 +593,16 @@
error != GRPC_ERROR_NONE)) {
const char *errmsg = grpc_error_string(error);
GRPC_API_TRACE(
- "cq_end_op_for_next(exec_ctx=%p, cc=%p, tag=%p, error=%s, "
+ "cq_end_op_for_next(exec_ctx=%p, cq=%p, tag=%p, error=%s, "
"done=%p, done_arg=%p, storage=%p)",
- 7, (exec_ctx, cc, tag, errmsg, done, done_arg, storage));
+ 7, (exec_ctx, cq, tag, errmsg, done, done_arg, storage));
if (GRPC_TRACER_ON(grpc_trace_operation_failures) &&
error != GRPC_ERROR_NONE) {
gpr_log(GPR_ERROR, "Operation failed: tag=%p, error=%s", tag, errmsg);
}
}
- cq_data *cqd = &cc->data;
+ cq_next_data *cqd = DATA_FROM_CQ(cq);
int is_success = (error == GRPC_ERROR_NONE);
storage->tag = tag;
@@ -570,28 +610,42 @@
storage->done_arg = done_arg;
storage->next = (uintptr_t)(is_success);
- cq_check_tag(cc, tag, true); /* Used in debug builds only */
+ cq_check_tag(cq, tag, true); /* Used in debug builds only */
/* Add the completion to the queue */
- cq_event_queue_push(&cqd->queue, storage);
+ bool is_first = cq_event_queue_push(&cqd->queue, storage);
gpr_atm_no_barrier_fetch_add(&cqd->things_queued_ever, 1);
+ bool will_definitely_shutdown =
+ gpr_atm_no_barrier_load(&cqd->pending_events) == 1;
- gpr_mu_lock(cqd->mu);
+ if (!will_definitely_shutdown) {
+ /* Only kick if this is the first item queued */
+ if (is_first) {
+ gpr_mu_lock(cq->mu);
+ grpc_error *kick_error =
+ cq->poller_vtable->kick(POLLSET_FROM_CQ(cq), NULL);
+ gpr_mu_unlock(cq->mu);
- int shutdown = gpr_unref(&cqd->pending_events);
- if (!shutdown) {
- grpc_error *kick_error = cc->poller_vtable->kick(POLLSET_FROM_CQ(cc), NULL);
- gpr_mu_unlock(cqd->mu);
-
- if (kick_error != GRPC_ERROR_NONE) {
- const char *msg = grpc_error_string(kick_error);
- gpr_log(GPR_ERROR, "Kick failed: %s", msg);
-
- GRPC_ERROR_UNREF(kick_error);
+ if (kick_error != GRPC_ERROR_NONE) {
+ const char *msg = grpc_error_string(kick_error);
+ gpr_log(GPR_ERROR, "Kick failed: %s", msg);
+ GRPC_ERROR_UNREF(kick_error);
+ }
+ }
+ if (gpr_atm_full_fetch_add(&cqd->pending_events, -1) == 1) {
+ GRPC_CQ_INTERNAL_REF(cq, "shutting_down");
+ gpr_mu_lock(cq->mu);
+ cq_finish_shutdown_next(exec_ctx, cq);
+ gpr_mu_unlock(cq->mu);
+ GRPC_CQ_INTERNAL_UNREF(exec_ctx, cq, "shutting_down");
}
} else {
- cq_finish_shutdown(exec_ctx, cc);
- gpr_mu_unlock(cqd->mu);
+ GRPC_CQ_INTERNAL_REF(cq, "shutting_down");
+ gpr_atm_rel_store(&cqd->pending_events, 0);
+ gpr_mu_lock(cq->mu);
+ cq_finish_shutdown_next(exec_ctx, cq);
+ gpr_mu_unlock(cq->mu);
+ GRPC_CQ_INTERNAL_UNREF(exec_ctx, cq, "shutting_down");
}
GPR_TIMER_END("cq_end_op_for_next", 0);
@@ -599,16 +653,17 @@
GRPC_ERROR_UNREF(error);
}
-/* Queue a GRPC_OP_COMPLETED operation to a completion queue (with a completion
+/* Queue a GRPC_OP_COMPLETED operation to a completion queue (with a
+ * completion
* type of GRPC_CQ_PLUCK) */
static void cq_end_op_for_pluck(grpc_exec_ctx *exec_ctx,
- grpc_completion_queue *cc, void *tag,
+ grpc_completion_queue *cq, void *tag,
grpc_error *error,
void (*done)(grpc_exec_ctx *exec_ctx,
void *done_arg,
grpc_cq_completion *storage),
void *done_arg, grpc_cq_completion *storage) {
- cq_data *cqd = &cc->data;
+ cq_pluck_data *cqd = DATA_FROM_CQ(cq);
int is_success = (error == GRPC_ERROR_NONE);
GPR_TIMER_BEGIN("cq_end_op_for_pluck", 0);
@@ -618,9 +673,9 @@
error != GRPC_ERROR_NONE)) {
const char *errmsg = grpc_error_string(error);
GRPC_API_TRACE(
- "cq_end_op_for_pluck(exec_ctx=%p, cc=%p, tag=%p, error=%s, "
+ "cq_end_op_for_pluck(exec_ctx=%p, cq=%p, tag=%p, error=%s, "
"done=%p, done_arg=%p, storage=%p)",
- 7, (exec_ctx, cc, tag, errmsg, done, done_arg, storage));
+ 7, (exec_ctx, cq, tag, errmsg, done, done_arg, storage));
if (GRPC_TRACER_ON(grpc_trace_operation_failures) &&
error != GRPC_ERROR_NONE) {
gpr_log(GPR_ERROR, "Operation failed: tag=%p, error=%s", tag, errmsg);
@@ -632,8 +687,8 @@
storage->done_arg = done_arg;
storage->next = ((uintptr_t)&cqd->completed_head) | ((uintptr_t)(is_success));
- gpr_mu_lock(cqd->mu);
- cq_check_tag(cc, tag, false); /* Used in debug builds only */
+ gpr_mu_lock(cq->mu);
+ cq_check_tag(cq, tag, false); /* Used in debug builds only */
/* Add to the list of completions */
gpr_atm_no_barrier_fetch_add(&cqd->things_queued_ever, 1);
@@ -652,9 +707,9 @@
}
grpc_error *kick_error =
- cc->poller_vtable->kick(POLLSET_FROM_CQ(cc), pluck_worker);
+ cq->poller_vtable->kick(POLLSET_FROM_CQ(cq), pluck_worker);
- gpr_mu_unlock(cqd->mu);
+ gpr_mu_unlock(cq->mu);
if (kick_error != GRPC_ERROR_NONE) {
const char *msg = grpc_error_string(kick_error);
@@ -663,8 +718,8 @@
GRPC_ERROR_UNREF(kick_error);
}
} else {
- cq_finish_shutdown(exec_ctx, cc);
- gpr_mu_unlock(cqd->mu);
+ cq_finish_shutdown_pluck(exec_ctx, cq);
+ gpr_mu_unlock(cq->mu);
}
GPR_TIMER_END("cq_end_op_for_pluck", 0);
@@ -672,12 +727,12 @@
GRPC_ERROR_UNREF(error);
}
-void grpc_cq_end_op(grpc_exec_ctx *exec_ctx, grpc_completion_queue *cc,
+void grpc_cq_end_op(grpc_exec_ctx *exec_ctx, grpc_completion_queue *cq,
void *tag, grpc_error *error,
void (*done)(grpc_exec_ctx *exec_ctx, void *done_arg,
grpc_cq_completion *storage),
void *done_arg, grpc_cq_completion *storage) {
- cc->vtable->end_op(exec_ctx, cc, tag, error, done, done_arg, storage);
+ cq->vtable->end_op(exec_ctx, cq, tag, error, done, done_arg, storage);
}
typedef struct {
@@ -692,7 +747,7 @@
static bool cq_is_next_finished(grpc_exec_ctx *exec_ctx, void *arg) {
cq_is_finished_arg *a = arg;
grpc_completion_queue *cq = a->cq;
- cq_data *cqd = &cq->data;
+ cq_next_data *cqd = DATA_FROM_CQ(cq);
GPR_ASSERT(a->stolen_completion == NULL);
gpr_atm current_last_seen_things_queued_ever =
@@ -703,7 +758,8 @@
gpr_atm_no_barrier_load(&cqd->things_queued_ever);
/* Pop a cq_completion from the queue. Returns NULL if the queue is empty
- * might return NULL in some cases even if the queue is not empty; but that
+ * might return NULL in some cases even if the queue is not empty; but
+ * that
* is ok and doesn't affect correctness. Might effect the tail latencies a
* bit) */
a->stolen_completion = cq_event_queue_pop(&cqd->queue);
@@ -716,58 +772,56 @@
}
#ifndef NDEBUG
-static void dump_pending_tags(grpc_completion_queue *cc) {
+static void dump_pending_tags(grpc_completion_queue *cq) {
if (!GRPC_TRACER_ON(grpc_trace_pending_tags)) return;
- cq_data *cqd = &cc->data;
-
gpr_strvec v;
gpr_strvec_init(&v);
gpr_strvec_add(&v, gpr_strdup("PENDING TAGS:"));
- gpr_mu_lock(cqd->mu);
- for (size_t i = 0; i < cqd->outstanding_tag_count; i++) {
+ gpr_mu_lock(cq->mu);
+ for (size_t i = 0; i < cq->outstanding_tag_count; i++) {
char *s;
- gpr_asprintf(&s, " %p", cqd->outstanding_tags[i]);
+ gpr_asprintf(&s, " %p", cq->outstanding_tags[i]);
gpr_strvec_add(&v, s);
}
- gpr_mu_unlock(cqd->mu);
+ gpr_mu_unlock(cq->mu);
char *out = gpr_strvec_flatten(&v, NULL);
gpr_strvec_destroy(&v);
gpr_log(GPR_DEBUG, "%s", out);
gpr_free(out);
}
#else
-static void dump_pending_tags(grpc_completion_queue *cc) {}
+static void dump_pending_tags(grpc_completion_queue *cq) {}
#endif
-static grpc_event cq_next(grpc_completion_queue *cc, gpr_timespec deadline,
+static grpc_event cq_next(grpc_completion_queue *cq, gpr_timespec deadline,
void *reserved) {
grpc_event ret;
gpr_timespec now;
- cq_data *cqd = &cc->data;
+ cq_next_data *cqd = DATA_FROM_CQ(cq);
GPR_TIMER_BEGIN("grpc_completion_queue_next", 0);
GRPC_API_TRACE(
"grpc_completion_queue_next("
- "cc=%p, "
+ "cq=%p, "
"deadline=gpr_timespec { tv_sec: %" PRId64
", tv_nsec: %d, clock_type: %d }, "
"reserved=%p)",
- 5, (cc, deadline.tv_sec, deadline.tv_nsec, (int)deadline.clock_type,
+ 5, (cq, deadline.tv_sec, deadline.tv_nsec, (int)deadline.clock_type,
reserved));
GPR_ASSERT(!reserved);
- dump_pending_tags(cc);
+ dump_pending_tags(cq);
deadline = gpr_convert_clock_type(deadline, GPR_CLOCK_MONOTONIC);
- GRPC_CQ_INTERNAL_REF(cc, "next");
+ GRPC_CQ_INTERNAL_REF(cq, "next");
cq_is_finished_arg is_finished_arg = {
.last_seen_things_queued_ever =
gpr_atm_no_barrier_load(&cqd->things_queued_ever),
- .cq = cc,
+ .cq = cq,
.deadline = deadline,
.stolen_completion = NULL,
.tag = NULL,
@@ -800,21 +854,24 @@
/* If c == NULL it means either the queue is empty OR in an transient
inconsistent state. If it is the latter, we shold do a 0-timeout poll
so that the thread comes back quickly from poll to make a second
- attempt at popping. Not doing this can potentially deadlock this thread
+ attempt at popping. Not doing this can potentially deadlock this
+ thread
forever (if the deadline is infinity) */
if (cq_event_queue_num_items(&cqd->queue) > 0) {
iteration_deadline = gpr_time_0(GPR_CLOCK_MONOTONIC);
}
}
- if (gpr_atm_no_barrier_load(&cqd->shutdown)) {
+ if (gpr_atm_no_barrier_load(&cqd->pending_events) == 0) {
/* Before returning, check if the queue has any items left over (since
gpr_mpscq_pop() can sometimes return NULL even if the queue is not
empty. If so, keep retrying but do not return GRPC_QUEUE_SHUTDOWN */
if (cq_event_queue_num_items(&cqd->queue) > 0) {
/* Go to the beginning of the loop. No point doing a poll because
- (cc->shutdown == true) is only possible when there is no pending work
- (i.e cc->pending_events == 0) and any outstanding grpc_cq_completion
+ (cq->shutdown == true) is only possible when there is no pending
+ work
+ (i.e cq->pending_events == 0) and any outstanding
+ grpc_cq_completion
events are already queued on this cq */
continue;
}
@@ -828,16 +885,16 @@
if (!is_finished_arg.first_loop && gpr_time_cmp(now, deadline) >= 0) {
memset(&ret, 0, sizeof(ret));
ret.type = GRPC_QUEUE_TIMEOUT;
- dump_pending_tags(cc);
+ dump_pending_tags(cq);
break;
}
/* The main polling work happens in grpc_pollset_work */
- gpr_mu_lock(cqd->mu);
- cqd->num_polls++;
- grpc_error *err = cc->poller_vtable->work(&exec_ctx, POLLSET_FROM_CQ(cc),
+ gpr_mu_lock(cq->mu);
+ cq->num_polls++;
+ grpc_error *err = cq->poller_vtable->work(&exec_ctx, POLLSET_FROM_CQ(cq),
NULL, now, iteration_deadline);
- gpr_mu_unlock(cqd->mu);
+ gpr_mu_unlock(cq->mu);
if (err != GRPC_ERROR_NONE) {
const char *msg = grpc_error_string(err);
@@ -846,30 +903,74 @@
GRPC_ERROR_UNREF(err);
memset(&ret, 0, sizeof(ret));
ret.type = GRPC_QUEUE_TIMEOUT;
- dump_pending_tags(cc);
+ dump_pending_tags(cq);
break;
}
is_finished_arg.first_loop = false;
}
- GRPC_SURFACE_TRACE_RETURNED_EVENT(cc, &ret);
- GRPC_CQ_INTERNAL_UNREF(&exec_ctx, cc, "next");
+ GRPC_SURFACE_TRACE_RETURNED_EVENT(cq, &ret);
+ GRPC_CQ_INTERNAL_UNREF(&exec_ctx, cq, "next");
grpc_exec_ctx_finish(&exec_ctx);
GPR_ASSERT(is_finished_arg.stolen_completion == NULL);
+ if (cq_event_queue_num_items(&cqd->queue) > 0 &&
+ gpr_atm_no_barrier_load(&cqd->pending_events) > 0) {
+ gpr_mu_lock(cq->mu);
+ cq->poller_vtable->kick(POLLSET_FROM_CQ(cq), NULL);
+ gpr_mu_unlock(cq->mu);
+ }
+
GPR_TIMER_END("grpc_completion_queue_next", 0);
return ret;
}
-grpc_event grpc_completion_queue_next(grpc_completion_queue *cc,
- gpr_timespec deadline, void *reserved) {
- return cc->vtable->next(cc, deadline, reserved);
+/* Finishes the completion queue shutdown. This means that there are no more
+ completion events / tags expected from the completion queue
+ - Must be called under completion queue lock
+ - Must be called only once in completion queue's lifetime
+ - grpc_completion_queue_shutdown() MUST have been called before calling
+ this function */
+static void cq_finish_shutdown_next(grpc_exec_ctx *exec_ctx,
+ grpc_completion_queue *cq) {
+ cq_next_data *cqd = DATA_FROM_CQ(cq);
+
+ GPR_ASSERT(cqd->shutdown_called);
+ GPR_ASSERT(gpr_atm_no_barrier_load(&cqd->pending_events) == 0);
+
+ cq->poller_vtable->shutdown(exec_ctx, POLLSET_FROM_CQ(cq),
+ &cq->pollset_shutdown_done);
}
-static int add_plucker(grpc_completion_queue *cc, void *tag,
+static void cq_shutdown_next(grpc_exec_ctx *exec_ctx,
+ grpc_completion_queue *cq) {
+ cq_next_data *cqd = DATA_FROM_CQ(cq);
+
+ GRPC_CQ_INTERNAL_REF(cq, "shutting_down");
+ gpr_mu_lock(cq->mu);
+ if (cqd->shutdown_called) {
+ gpr_mu_unlock(cq->mu);
+ GRPC_CQ_INTERNAL_UNREF(exec_ctx, cq, "shutting_down");
+ GPR_TIMER_END("grpc_completion_queue_shutdown", 0);
+ return;
+ }
+ cqd->shutdown_called = 1;
+ if (gpr_atm_full_fetch_add(&cqd->pending_events, -1) == 1) {
+ cq_finish_shutdown_next(exec_ctx, cq);
+ }
+ gpr_mu_unlock(cq->mu);
+ GRPC_CQ_INTERNAL_UNREF(exec_ctx, cq, "shutting_down");
+}
+
+grpc_event grpc_completion_queue_next(grpc_completion_queue *cq,
+ gpr_timespec deadline, void *reserved) {
+ return cq->vtable->next(cq, deadline, reserved);
+}
+
+static int add_plucker(grpc_completion_queue *cq, void *tag,
grpc_pollset_worker **worker) {
- cq_data *cqd = &cc->data;
+ cq_pluck_data *cqd = DATA_FROM_CQ(cq);
if (cqd->num_pluckers == GRPC_MAX_COMPLETION_QUEUE_PLUCKERS) {
return 0;
}
@@ -879,9 +980,9 @@
return 1;
}
-static void del_plucker(grpc_completion_queue *cc, void *tag,
+static void del_plucker(grpc_completion_queue *cq, void *tag,
grpc_pollset_worker **worker) {
- cq_data *cqd = &cc->data;
+ cq_pluck_data *cqd = DATA_FROM_CQ(cq);
for (int i = 0; i < cqd->num_pluckers; i++) {
if (cqd->pluckers[i].tag == tag && cqd->pluckers[i].worker == worker) {
cqd->num_pluckers--;
@@ -895,13 +996,13 @@
static bool cq_is_pluck_finished(grpc_exec_ctx *exec_ctx, void *arg) {
cq_is_finished_arg *a = arg;
grpc_completion_queue *cq = a->cq;
- cq_data *cqd = &cq->data;
+ cq_pluck_data *cqd = DATA_FROM_CQ(cq);
GPR_ASSERT(a->stolen_completion == NULL);
gpr_atm current_last_seen_things_queued_ever =
gpr_atm_no_barrier_load(&cqd->things_queued_ever);
if (current_last_seen_things_queued_ever != a->last_seen_things_queued_ever) {
- gpr_mu_lock(cqd->mu);
+ gpr_mu_lock(cq->mu);
a->last_seen_things_queued_ever =
gpr_atm_no_barrier_load(&cqd->things_queued_ever);
grpc_cq_completion *c;
@@ -913,51 +1014,51 @@
if (c == cqd->completed_tail) {
cqd->completed_tail = prev;
}
- gpr_mu_unlock(cqd->mu);
+ gpr_mu_unlock(cq->mu);
a->stolen_completion = c;
return true;
}
prev = c;
}
- gpr_mu_unlock(cqd->mu);
+ gpr_mu_unlock(cq->mu);
}
return !a->first_loop &&
gpr_time_cmp(a->deadline, gpr_now(a->deadline.clock_type)) < 0;
}
-static grpc_event cq_pluck(grpc_completion_queue *cc, void *tag,
+static grpc_event cq_pluck(grpc_completion_queue *cq, void *tag,
gpr_timespec deadline, void *reserved) {
grpc_event ret;
grpc_cq_completion *c;
grpc_cq_completion *prev;
grpc_pollset_worker *worker = NULL;
gpr_timespec now;
- cq_data *cqd = &cc->data;
+ cq_pluck_data *cqd = DATA_FROM_CQ(cq);
GPR_TIMER_BEGIN("grpc_completion_queue_pluck", 0);
if (GRPC_TRACER_ON(grpc_cq_pluck_trace)) {
GRPC_API_TRACE(
"grpc_completion_queue_pluck("
- "cc=%p, tag=%p, "
+ "cq=%p, tag=%p, "
"deadline=gpr_timespec { tv_sec: %" PRId64
", tv_nsec: %d, clock_type: %d }, "
"reserved=%p)",
- 6, (cc, tag, deadline.tv_sec, deadline.tv_nsec,
+ 6, (cq, tag, deadline.tv_sec, deadline.tv_nsec,
(int)deadline.clock_type, reserved));
}
GPR_ASSERT(!reserved);
- dump_pending_tags(cc);
+ dump_pending_tags(cq);
deadline = gpr_convert_clock_type(deadline, GPR_CLOCK_MONOTONIC);
- GRPC_CQ_INTERNAL_REF(cc, "pluck");
- gpr_mu_lock(cqd->mu);
+ GRPC_CQ_INTERNAL_REF(cq, "pluck");
+ gpr_mu_lock(cq->mu);
cq_is_finished_arg is_finished_arg = {
.last_seen_things_queued_ever =
gpr_atm_no_barrier_load(&cqd->things_queued_ever),
- .cq = cc,
+ .cq = cq,
.deadline = deadline,
.stolen_completion = NULL,
.tag = tag,
@@ -966,7 +1067,7 @@
GRPC_EXEC_CTX_INITIALIZER(0, cq_is_pluck_finished, &is_finished_arg);
for (;;) {
if (is_finished_arg.stolen_completion != NULL) {
- gpr_mu_unlock(cqd->mu);
+ gpr_mu_unlock(cq->mu);
c = is_finished_arg.stolen_completion;
is_finished_arg.stolen_completion = NULL;
ret.type = GRPC_OP_COMPLETE;
@@ -983,7 +1084,7 @@
if (c == cqd->completed_tail) {
cqd->completed_tail = prev;
}
- gpr_mu_unlock(cqd->mu);
+ gpr_mu_unlock(cq->mu);
ret.type = GRPC_OP_COMPLETE;
ret.success = c->next & 1u;
ret.tag = c->tag;
@@ -993,54 +1094,54 @@
prev = c;
}
if (gpr_atm_no_barrier_load(&cqd->shutdown)) {
- gpr_mu_unlock(cqd->mu);
+ gpr_mu_unlock(cq->mu);
memset(&ret, 0, sizeof(ret));
ret.type = GRPC_QUEUE_SHUTDOWN;
break;
}
- if (!add_plucker(cc, tag, &worker)) {
+ if (!add_plucker(cq, tag, &worker)) {
gpr_log(GPR_DEBUG,
"Too many outstanding grpc_completion_queue_pluck calls: maximum "
"is %d",
GRPC_MAX_COMPLETION_QUEUE_PLUCKERS);
- gpr_mu_unlock(cqd->mu);
+ gpr_mu_unlock(cq->mu);
memset(&ret, 0, sizeof(ret));
/* TODO(ctiller): should we use a different result here */
ret.type = GRPC_QUEUE_TIMEOUT;
- dump_pending_tags(cc);
+ dump_pending_tags(cq);
break;
}
now = gpr_now(GPR_CLOCK_MONOTONIC);
if (!is_finished_arg.first_loop && gpr_time_cmp(now, deadline) >= 0) {
- del_plucker(cc, tag, &worker);
- gpr_mu_unlock(cqd->mu);
+ del_plucker(cq, tag, &worker);
+ gpr_mu_unlock(cq->mu);
memset(&ret, 0, sizeof(ret));
ret.type = GRPC_QUEUE_TIMEOUT;
- dump_pending_tags(cc);
+ dump_pending_tags(cq);
break;
}
- cqd->num_polls++;
- grpc_error *err = cc->poller_vtable->work(&exec_ctx, POLLSET_FROM_CQ(cc),
+ cq->num_polls++;
+ grpc_error *err = cq->poller_vtable->work(&exec_ctx, POLLSET_FROM_CQ(cq),
&worker, now, deadline);
if (err != GRPC_ERROR_NONE) {
- del_plucker(cc, tag, &worker);
- gpr_mu_unlock(cqd->mu);
+ del_plucker(cq, tag, &worker);
+ gpr_mu_unlock(cq->mu);
const char *msg = grpc_error_string(err);
gpr_log(GPR_ERROR, "Completion queue pluck failed: %s", msg);
GRPC_ERROR_UNREF(err);
memset(&ret, 0, sizeof(ret));
ret.type = GRPC_QUEUE_TIMEOUT;
- dump_pending_tags(cc);
+ dump_pending_tags(cq);
break;
}
is_finished_arg.first_loop = false;
- del_plucker(cc, tag, &worker);
+ del_plucker(cq, tag, &worker);
}
done:
- GRPC_SURFACE_TRACE_RETURNED_EVENT(cc, &ret);
- GRPC_CQ_INTERNAL_UNREF(&exec_ctx, cc, "pluck");
+ GRPC_SURFACE_TRACE_RETURNED_EVENT(cq, &ret);
+ GRPC_CQ_INTERNAL_UNREF(&exec_ctx, cq, "pluck");
grpc_exec_ctx_finish(&exec_ctx);
GPR_ASSERT(is_finished_arg.stolen_completion == NULL);
@@ -1049,85 +1150,66 @@
return ret;
}
-grpc_event grpc_completion_queue_pluck(grpc_completion_queue *cc, void *tag,
+grpc_event grpc_completion_queue_pluck(grpc_completion_queue *cq, void *tag,
gpr_timespec deadline, void *reserved) {
- return cc->vtable->pluck(cc, tag, deadline, reserved);
+ return cq->vtable->pluck(cq, tag, deadline, reserved);
}
-/* Finishes the completion queue shutdown. This means that there are no more
- completion events / tags expected from the completion queue
- - Must be called under completion queue lock
- - Must be called only once in completion queue's lifetime
- - grpc_completion_queue_shutdown() MUST have been called before calling
- this function */
-static void cq_finish_shutdown(grpc_exec_ctx *exec_ctx,
- grpc_completion_queue *cc) {
- cq_data *cqd = &cc->data;
+static void cq_finish_shutdown_pluck(grpc_exec_ctx *exec_ctx,
+ grpc_completion_queue *cq) {
+ cq_pluck_data *cqd = DATA_FROM_CQ(cq);
GPR_ASSERT(cqd->shutdown_called);
GPR_ASSERT(!gpr_atm_no_barrier_load(&cqd->shutdown));
gpr_atm_no_barrier_store(&cqd->shutdown, 1);
- cc->poller_vtable->shutdown(exec_ctx, POLLSET_FROM_CQ(cc),
- &cqd->pollset_shutdown_done);
+ cq->poller_vtable->shutdown(exec_ctx, POLLSET_FROM_CQ(cq),
+ &cq->pollset_shutdown_done);
}
-/* Shutdown simply drops a ref that we reserved at creation time; if we drop
- to zero here, then enter shutdown mode and wake up any waiters */
-void grpc_completion_queue_shutdown(grpc_completion_queue *cc) {
- grpc_exec_ctx exec_ctx = GRPC_EXEC_CTX_INIT;
- GPR_TIMER_BEGIN("grpc_completion_queue_shutdown", 0);
- GRPC_API_TRACE("grpc_completion_queue_shutdown(cc=%p)", 1, (cc));
- cq_data *cqd = &cc->data;
+static void cq_shutdown_pluck(grpc_exec_ctx *exec_ctx,
+ grpc_completion_queue *cq) {
+ cq_pluck_data *cqd = DATA_FROM_CQ(cq);
- gpr_mu_lock(cqd->mu);
+ gpr_mu_lock(cq->mu);
if (cqd->shutdown_called) {
- gpr_mu_unlock(cqd->mu);
+ gpr_mu_unlock(cq->mu);
GPR_TIMER_END("grpc_completion_queue_shutdown", 0);
return;
}
cqd->shutdown_called = 1;
if (gpr_unref(&cqd->pending_events)) {
- cq_finish_shutdown(&exec_ctx, cc);
+ cq_finish_shutdown_pluck(exec_ctx, cq);
}
- gpr_mu_unlock(cqd->mu);
+ gpr_mu_unlock(cq->mu);
+}
+
+/* Shutdown simply drops a ref that we reserved at creation time; if we drop
+ to zero here, then enter shutdown mode and wake up any waiters */
+void grpc_completion_queue_shutdown(grpc_completion_queue *cq) {
+ grpc_exec_ctx exec_ctx = GRPC_EXEC_CTX_INIT;
+ GPR_TIMER_BEGIN("grpc_completion_queue_shutdown", 0);
+ GRPC_API_TRACE("grpc_completion_queue_shutdown(cq=%p)", 1, (cq));
+ cq->vtable->shutdown(&exec_ctx, cq);
grpc_exec_ctx_finish(&exec_ctx);
GPR_TIMER_END("grpc_completion_queue_shutdown", 0);
}
-void grpc_completion_queue_destroy(grpc_completion_queue *cc) {
- GRPC_API_TRACE("grpc_completion_queue_destroy(cc=%p)", 1, (cc));
+void grpc_completion_queue_destroy(grpc_completion_queue *cq) {
+ GRPC_API_TRACE("grpc_completion_queue_destroy(cq=%p)", 1, (cq));
GPR_TIMER_BEGIN("grpc_completion_queue_destroy", 0);
- grpc_completion_queue_shutdown(cc);
-
- /* TODO (sreek): This should not ideally be here. Refactor it into the
- * cq_vtable (perhaps have a create/destroy methods in the cq vtable) */
- if (cc->vtable->cq_completion_type == GRPC_CQ_NEXT) {
- GPR_ASSERT(cq_event_queue_num_items(&cc->data.queue) == 0);
- }
+ grpc_completion_queue_shutdown(cq);
grpc_exec_ctx exec_ctx = GRPC_EXEC_CTX_INIT;
- GRPC_CQ_INTERNAL_UNREF(&exec_ctx, cc, "destroy");
+ GRPC_CQ_INTERNAL_UNREF(&exec_ctx, cq, "destroy");
grpc_exec_ctx_finish(&exec_ctx);
GPR_TIMER_END("grpc_completion_queue_destroy", 0);
}
-grpc_pollset *grpc_cq_pollset(grpc_completion_queue *cc) {
- return cc->poller_vtable->can_get_pollset ? POLLSET_FROM_CQ(cc) : NULL;
+grpc_pollset *grpc_cq_pollset(grpc_completion_queue *cq) {
+ return cq->poller_vtable->can_get_pollset ? POLLSET_FROM_CQ(cq) : NULL;
}
-grpc_completion_queue *grpc_cq_from_pollset(grpc_pollset *ps) {
- return CQ_FROM_POLLSET(ps);
-}
-
-void grpc_cq_mark_server_cq(grpc_completion_queue *cc) {
- cc->data.is_server_cq = 1;
-}
-
-bool grpc_cq_is_server_cq(grpc_completion_queue *cc) {
- return cc->data.is_server_cq;
-}
-
-bool grpc_cq_can_listen(grpc_completion_queue *cc) {
- return cc->poller_vtable->can_listen;
+bool grpc_cq_can_listen(grpc_completion_queue *cq) {
+ return cq->poller_vtable->can_listen;
}
diff --git a/src/core/lib/surface/completion_queue.h b/src/core/lib/surface/completion_queue.h
index 97ea9ca..af44482 100644
--- a/src/core/lib/surface/completion_queue.h
+++ b/src/core/lib/surface/completion_queue.h
@@ -84,10 +84,7 @@
void *done_arg, grpc_cq_completion *storage);
grpc_pollset *grpc_cq_pollset(grpc_completion_queue *cc);
-grpc_completion_queue *grpc_cq_from_pollset(grpc_pollset *ps);
-void grpc_cq_mark_server_cq(grpc_completion_queue *cc);
-bool grpc_cq_is_server_cq(grpc_completion_queue *cc);
bool grpc_cq_can_listen(grpc_completion_queue *cc);
grpc_cq_completion_type grpc_get_cq_completion_type(grpc_completion_queue *cc);
diff --git a/src/core/lib/surface/init.c b/src/core/lib/surface/init.c
index 14a86bf..db111e5 100644
--- a/src/core/lib/surface/init.c
+++ b/src/core/lib/surface/init.c
@@ -120,29 +120,27 @@
grpc_slice_intern_init();
grpc_mdctx_global_init();
grpc_channel_init_init();
- grpc_register_tracer("api", &grpc_api_trace);
- grpc_register_tracer("channel", &grpc_trace_channel);
- grpc_register_tracer("connectivity_state", &grpc_connectivity_state_trace);
- grpc_register_tracer("channel_stack_builder",
- &grpc_trace_channel_stack_builder);
- grpc_register_tracer("http1", &grpc_http1_trace);
- grpc_register_tracer("queue_pluck", &grpc_cq_pluck_trace); // default on
- grpc_register_tracer("combiner", &grpc_combiner_trace);
- grpc_register_tracer("server_channel", &grpc_server_channel_trace);
- grpc_register_tracer("bdp_estimator", &grpc_bdp_estimator_trace);
- grpc_register_tracer("queue_timeout",
- &grpc_cq_event_timeout_trace); // default on
- grpc_register_tracer("op_failure", &grpc_trace_operation_failures);
- grpc_register_tracer("resource_quota", &grpc_resource_quota_trace);
- grpc_register_tracer("call_error", &grpc_call_error_trace);
+ grpc_register_tracer(&grpc_api_trace);
+ grpc_register_tracer(&grpc_trace_channel);
+ grpc_register_tracer(&grpc_connectivity_state_trace);
+ grpc_register_tracer(&grpc_trace_channel_stack_builder);
+ grpc_register_tracer(&grpc_http1_trace);
+ grpc_register_tracer(&grpc_cq_pluck_trace); // default on
+ grpc_register_tracer(&grpc_combiner_trace);
+ grpc_register_tracer(&grpc_server_channel_trace);
+ grpc_register_tracer(&grpc_bdp_estimator_trace);
+ grpc_register_tracer(&grpc_cq_event_timeout_trace); // default on
+ grpc_register_tracer(&grpc_trace_operation_failures);
+ grpc_register_tracer(&grpc_resource_quota_trace);
+ grpc_register_tracer(&grpc_call_error_trace);
#ifndef NDEBUG
- grpc_register_tracer("pending_tags", &grpc_trace_pending_tags);
- grpc_register_tracer("queue_refcount", &grpc_trace_cq_refcount);
- grpc_register_tracer("closure", &grpc_trace_closure);
- grpc_register_tracer("error_refcount", &grpc_trace_error_refcount);
- grpc_register_tracer("stream_refcount", &grpc_trace_stream_refcount);
- grpc_register_tracer("fd_refcount", &grpc_trace_fd_refcount);
- grpc_register_tracer("metadata", &grpc_trace_metadata);
+ grpc_register_tracer(&grpc_trace_pending_tags);
+ grpc_register_tracer(&grpc_trace_cq_refcount);
+ grpc_register_tracer(&grpc_trace_closure);
+ grpc_register_tracer(&grpc_trace_error_refcount);
+ grpc_register_tracer(&grpc_trace_stream_refcount);
+ grpc_register_tracer(&grpc_trace_fd_refcount);
+ grpc_register_tracer(&grpc_trace_metadata);
#endif
grpc_security_pre_init();
grpc_iomgr_init(&exec_ctx);
diff --git a/src/core/lib/surface/init_secure.c b/src/core/lib/surface/init_secure.c
index 7dbea58..2366c24 100644
--- a/src/core/lib/surface/init_secure.c
+++ b/src/core/lib/surface/init_secure.c
@@ -37,13 +37,11 @@
#endif
void grpc_security_pre_init(void) {
- grpc_register_tracer("secure_endpoint", &grpc_trace_secure_endpoint);
- grpc_register_tracer("transport_security", &tsi_tracing_enabled);
+ grpc_register_tracer(&grpc_trace_secure_endpoint);
+ grpc_register_tracer(&tsi_tracing_enabled);
#ifndef NDEBUG
- grpc_register_tracer("auth_context_refcount",
- &grpc_trace_auth_context_refcount);
- grpc_register_tracer("security_connector_refcount",
- &grpc_trace_security_connector_refcount);
+ grpc_register_tracer(&grpc_trace_auth_context_refcount);
+ grpc_register_tracer(&grpc_trace_security_connector_refcount);
#endif
}
diff --git a/src/core/lib/surface/server.c b/src/core/lib/surface/server.c
index 0cd4368..fce7f8d 100644
--- a/src/core/lib/surface/server.c
+++ b/src/core/lib/surface/server.c
@@ -58,7 +58,8 @@
typedef enum { BATCH_CALL, REGISTERED_CALL } requested_call_type;
-grpc_tracer_flag grpc_server_channel_trace = GRPC_TRACER_INITIALIZER(false);
+grpc_tracer_flag grpc_server_channel_trace =
+ GRPC_TRACER_INITIALIZER(false, "server_channel");
typedef struct requested_call {
requested_call_type type;
@@ -975,8 +976,6 @@
if (server->cqs[i] == cq) return;
}
- grpc_cq_mark_server_cq(cq);
-
GRPC_CQ_INTERNAL_REF(cq, "server");
n = server->cq_count++;
server->cqs = gpr_realloc(server->cqs,
@@ -1156,9 +1155,8 @@
chand->channel = channel;
size_t cq_idx;
- grpc_completion_queue *accepting_cq = grpc_cq_from_pollset(accepting_pollset);
for (cq_idx = 0; cq_idx < s->cq_count; cq_idx++) {
- if (s->cqs[cq_idx] == accepting_cq) break;
+ if (grpc_cq_pollset(s->cqs[cq_idx]) == accepting_pollset) break;
}
if (cq_idx == s->cq_count) {
/* completion queue not found: pick a random one to publish new calls to */
diff --git a/src/core/lib/transport/bdp_estimator.c b/src/core/lib/transport/bdp_estimator.c
index d33e3a4..311ae63 100644
--- a/src/core/lib/transport/bdp_estimator.c
+++ b/src/core/lib/transport/bdp_estimator.c
@@ -23,7 +23,8 @@
#include <grpc/support/log.h>
#include <grpc/support/useful.h>
-grpc_tracer_flag grpc_bdp_estimator_trace = GRPC_TRACER_INITIALIZER(false);
+grpc_tracer_flag grpc_bdp_estimator_trace =
+ GRPC_TRACER_INITIALIZER(false, "bdp_estimator");
void grpc_bdp_estimator_init(grpc_bdp_estimator *estimator, const char *name) {
estimator->estimate = 65536;
diff --git a/src/core/lib/transport/connectivity_state.c b/src/core/lib/transport/connectivity_state.c
index 6fe40af..73a9178 100644
--- a/src/core/lib/transport/connectivity_state.c
+++ b/src/core/lib/transport/connectivity_state.c
@@ -24,7 +24,8 @@
#include <grpc/support/log.h>
#include <grpc/support/string_util.h>
-grpc_tracer_flag grpc_connectivity_state_trace = GRPC_TRACER_INITIALIZER(false);
+grpc_tracer_flag grpc_connectivity_state_trace =
+ GRPC_TRACER_INITIALIZER(false, "connectivity_state");
const char *grpc_connectivity_state_name(grpc_connectivity_state state) {
switch (state) {
diff --git a/src/core/lib/transport/metadata.c b/src/core/lib/transport/metadata.c
index 87a2abf..2fea366 100644
--- a/src/core/lib/transport/metadata.c
+++ b/src/core/lib/transport/metadata.c
@@ -48,7 +48,8 @@
*/
#ifndef NDEBUG
-grpc_tracer_flag grpc_trace_metadata = GRPC_TRACER_INITIALIZER(false);
+grpc_tracer_flag grpc_trace_metadata =
+ GRPC_TRACER_INITIALIZER(false, "metadata");
#define DEBUG_ARGS , const char *file, int line
#define FWD_DEBUG_ARGS , file, line
#define REF_MD_LOCKED(shard, s) ref_md_locked((shard), (s), __FILE__, __LINE__)
diff --git a/src/core/lib/transport/transport.c b/src/core/lib/transport/transport.c
index 6a9eba1..7281602 100644
--- a/src/core/lib/transport/transport.c
+++ b/src/core/lib/transport/transport.c
@@ -32,7 +32,8 @@
#include "src/core/lib/transport/transport_impl.h"
#ifndef NDEBUG
-grpc_tracer_flag grpc_trace_stream_refcount = GRPC_TRACER_INITIALIZER(false);
+grpc_tracer_flag grpc_trace_stream_refcount =
+ GRPC_TRACER_INITIALIZER(false, "stream_refcount");
#endif
#ifndef NDEBUG
diff --git a/src/core/tsi/fake_transport_security.c b/src/core/tsi/fake_transport_security.c
index 1e919c4..1280680 100644
--- a/src/core/tsi/fake_transport_security.c
+++ b/src/core/tsi/fake_transport_security.c
@@ -31,6 +31,7 @@
#define TSI_FAKE_FRAME_HEADER_SIZE 4
#define TSI_FAKE_FRAME_INITIAL_ALLOCATED_SIZE 64
#define TSI_FAKE_DEFAULT_FRAME_SIZE 16384
+#define TSI_FAKE_HANDSHAKER_OUTGOING_BUFFER_INITIAL_SIZE 256
/* --- Structure definitions. ---*/
@@ -59,8 +60,10 @@
int is_client;
tsi_fake_handshake_message next_message_to_send;
int needs_incoming_message;
- tsi_fake_frame incoming;
- tsi_fake_frame outgoing;
+ tsi_fake_frame incoming_frame;
+ tsi_fake_frame outgoing_frame;
+ unsigned char *outgoing_bytes_buffer;
+ size_t outgoing_bytes_buffer_size;
tsi_result result;
} tsi_fake_handshaker;
@@ -116,27 +119,23 @@
if (!needs_draining) frame->size = 0;
}
-/* Returns 1 if successful, 0 otherwise. */
-static int tsi_fake_frame_ensure_size(tsi_fake_frame *frame) {
+/* Checks if the frame's allocated size is at least frame->size, and reallocs
+ * more memory if necessary. */
+static void tsi_fake_frame_ensure_size(tsi_fake_frame *frame) {
if (frame->data == NULL) {
frame->allocated_size = frame->size;
frame->data = gpr_malloc(frame->allocated_size);
- if (frame->data == NULL) return 0;
} else if (frame->size > frame->allocated_size) {
unsigned char *new_data = gpr_realloc(frame->data, frame->size);
- if (new_data == NULL) {
- gpr_free(frame->data);
- frame->data = NULL;
- return 0;
- }
frame->data = new_data;
frame->allocated_size = frame->size;
}
- return 1;
}
-/* This method should not be called if frame->needs_framing is not 0. */
-static tsi_result fill_frame_from_bytes(const unsigned char *incoming_bytes,
+/* Decodes the serialized fake frame contained in incoming_bytes, and fills
+ * frame with the contents of the decoded frame.
+ * This method should not be called if frame->needs_framing is not 0. */
+static tsi_result tsi_fake_frame_decode(const unsigned char *incoming_bytes,
size_t *incoming_bytes_size,
tsi_fake_frame *frame) {
size_t available_size = *incoming_bytes_size;
@@ -147,7 +146,6 @@
if (frame->data == NULL) {
frame->allocated_size = TSI_FAKE_FRAME_INITIAL_ALLOCATED_SIZE;
frame->data = gpr_malloc(frame->allocated_size);
- if (frame->data == NULL) return TSI_OUT_OF_RESOURCES;
}
if (frame->offset < TSI_FAKE_FRAME_HEADER_SIZE) {
@@ -165,7 +163,7 @@
frame->offset += to_read_size;
available_size -= to_read_size;
frame->size = load32_little_endian(frame->data);
- if (!tsi_fake_frame_ensure_size(frame)) return TSI_OUT_OF_RESOURCES;
+ tsi_fake_frame_ensure_size(frame);
}
to_read_size = frame->size - frame->offset;
@@ -183,10 +181,12 @@
return TSI_OK;
}
-/* This method should not be called if frame->needs_framing is 0. */
-static tsi_result drain_frame_to_bytes(unsigned char *outgoing_bytes,
- size_t *outgoing_bytes_size,
- tsi_fake_frame *frame) {
+/* Encodes a fake frame into its wire format and places the result in
+ * outgoing_bytes. outgoing_bytes_size indicates the size of the encoded frame.
+ * This method should not be called if frame->needs_framing is 0. */
+static tsi_result tsi_fake_frame_encode(unsigned char *outgoing_bytes,
+ size_t *outgoing_bytes_size,
+ tsi_fake_frame *frame) {
size_t to_write_size = frame->size - frame->offset;
if (!frame->needs_draining) return TSI_INTERNAL_ERROR;
if (*outgoing_bytes_size < to_write_size) {
@@ -200,17 +200,20 @@
return TSI_OK;
}
-static tsi_result bytes_to_frame(unsigned char *bytes, size_t bytes_size,
- tsi_fake_frame *frame) {
+/* Sets the payload of a fake frame to contain the given data blob, where
+ * data_size indicates the size of data. */
+static tsi_result tsi_fake_frame_set_data(unsigned char *data, size_t data_size,
+ tsi_fake_frame *frame) {
frame->offset = 0;
- frame->size = bytes_size + TSI_FAKE_FRAME_HEADER_SIZE;
- if (!tsi_fake_frame_ensure_size(frame)) return TSI_OUT_OF_RESOURCES;
+ frame->size = data_size + TSI_FAKE_FRAME_HEADER_SIZE;
+ tsi_fake_frame_ensure_size(frame);
store32_little_endian((uint32_t)frame->size, frame->data);
- memcpy(frame->data + TSI_FAKE_FRAME_HEADER_SIZE, bytes, bytes_size);
+ memcpy(frame->data + TSI_FAKE_FRAME_HEADER_SIZE, data, data_size);
tsi_fake_frame_reset(frame, 1 /* needs draining */);
return TSI_OK;
}
+/* Destroys the contents of a fake frame. */
static void tsi_fake_frame_destruct(tsi_fake_frame *frame) {
if (frame->data != NULL) gpr_free(frame->data);
}
@@ -235,7 +238,7 @@
if (frame->needs_draining) {
drained_size = saved_output_size - *num_bytes_written;
result =
- drain_frame_to_bytes(protected_output_frames, &drained_size, frame);
+ tsi_fake_frame_encode(protected_output_frames, &drained_size, frame);
*num_bytes_written += drained_size;
protected_output_frames += drained_size;
if (result != TSI_OK) {
@@ -254,15 +257,15 @@
size_t written_in_frame_size = 0;
store32_little_endian((uint32_t)impl->max_frame_size, frame_header);
written_in_frame_size = TSI_FAKE_FRAME_HEADER_SIZE;
- result = fill_frame_from_bytes(frame_header, &written_in_frame_size, frame);
+ result = tsi_fake_frame_decode(frame_header, &written_in_frame_size, frame);
if (result != TSI_INCOMPLETE_DATA) {
- gpr_log(GPR_ERROR, "fill_frame_from_bytes returned %s",
+ gpr_log(GPR_ERROR, "tsi_fake_frame_decode returned %s",
tsi_result_to_string(result));
return result;
}
}
result =
- fill_frame_from_bytes(unprotected_bytes, unprotected_bytes_size, frame);
+ tsi_fake_frame_decode(unprotected_bytes, unprotected_bytes_size, frame);
if (result != TSI_OK) {
if (result == TSI_INCOMPLETE_DATA) result = TSI_OK;
return result;
@@ -272,7 +275,7 @@
if (!frame->needs_draining) return TSI_INTERNAL_ERROR;
if (frame->offset != 0) return TSI_INTERNAL_ERROR;
drained_size = saved_output_size - *num_bytes_written;
- result = drain_frame_to_bytes(protected_output_frames, &drained_size, frame);
+ result = tsi_fake_frame_encode(protected_output_frames, &drained_size, frame);
*num_bytes_written += drained_size;
if (result == TSI_INCOMPLETE_DATA) result = TSI_OK;
return result;
@@ -292,8 +295,8 @@
store32_little_endian((uint32_t)frame->size,
frame->data); /* Overwrite header. */
}
- result = drain_frame_to_bytes(protected_output_frames,
- protected_output_frames_size, frame);
+ result = tsi_fake_frame_encode(protected_output_frames,
+ protected_output_frames_size, frame);
if (result == TSI_INCOMPLETE_DATA) result = TSI_OK;
*still_pending_size = frame->size - frame->offset;
return result;
@@ -316,7 +319,7 @@
/* Go past the header if needed. */
if (frame->offset == 0) frame->offset = TSI_FAKE_FRAME_HEADER_SIZE;
drained_size = saved_output_size - *num_bytes_written;
- result = drain_frame_to_bytes(unprotected_bytes, &drained_size, frame);
+ result = tsi_fake_frame_encode(unprotected_bytes, &drained_size, frame);
unprotected_bytes += drained_size;
*num_bytes_written += drained_size;
if (result != TSI_OK) {
@@ -330,7 +333,7 @@
/* Now process the protected_bytes. */
if (frame->needs_draining) return TSI_INTERNAL_ERROR;
- result = fill_frame_from_bytes(protected_frames_bytes,
+ result = tsi_fake_frame_decode(protected_frames_bytes,
protected_frames_bytes_size, frame);
if (result != TSI_OK) {
if (result == TSI_INCOMPLETE_DATA) result = TSI_OK;
@@ -342,7 +345,7 @@
if (frame->offset != 0) return TSI_INTERNAL_ERROR;
frame->offset = TSI_FAKE_FRAME_HEADER_SIZE; /* Go past the header. */
drained_size = saved_output_size - *num_bytes_written;
- result = drain_frame_to_bytes(unprotected_bytes, &drained_size, frame);
+ result = tsi_fake_frame_encode(unprotected_bytes, &drained_size, frame);
*num_bytes_written += drained_size;
if (result == TSI_INCOMPLETE_DATA) result = TSI_OK;
return result;
@@ -360,6 +363,72 @@
fake_protector_unprotect, fake_protector_destroy,
};
+/* --- tsi_handshaker_result methods implementation. ---*/
+
+typedef struct {
+ tsi_handshaker_result base;
+ unsigned char *unused_bytes;
+ size_t unused_bytes_size;
+} fake_handshaker_result;
+
+static tsi_result fake_handshaker_result_extract_peer(
+ const tsi_handshaker_result *self, tsi_peer *peer) {
+ /* Construct a tsi_peer with 1 property: certificate type. */
+ tsi_result result = tsi_construct_peer(1, peer);
+ if (result != TSI_OK) return result;
+ result = tsi_construct_string_peer_property_from_cstring(
+ TSI_CERTIFICATE_TYPE_PEER_PROPERTY, TSI_FAKE_CERTIFICATE_TYPE,
+ &peer->properties[0]);
+ if (result != TSI_OK) tsi_peer_destruct(peer);
+ return result;
+}
+
+static tsi_result fake_handshaker_result_create_frame_protector(
+ const tsi_handshaker_result *self, size_t *max_output_protected_frame_size,
+ tsi_frame_protector **protector) {
+ *protector = tsi_create_fake_frame_protector(max_output_protected_frame_size);
+ return TSI_OK;
+}
+
+static tsi_result fake_handshaker_result_get_unused_bytes(
+ const tsi_handshaker_result *self, unsigned char **bytes,
+ size_t *bytes_size) {
+ fake_handshaker_result *result = (fake_handshaker_result *)self;
+ *bytes_size = result->unused_bytes_size;
+ *bytes = result->unused_bytes;
+ return TSI_OK;
+}
+
+static void fake_handshaker_result_destroy(tsi_handshaker_result *self) {
+ fake_handshaker_result *result = (fake_handshaker_result *)self;
+ gpr_free(result->unused_bytes);
+ gpr_free(self);
+}
+
+static const tsi_handshaker_result_vtable handshaker_result_vtable = {
+ fake_handshaker_result_extract_peer,
+ fake_handshaker_result_create_frame_protector,
+ fake_handshaker_result_get_unused_bytes, fake_handshaker_result_destroy,
+};
+
+static tsi_result fake_handshaker_result_create(
+ const unsigned char *unused_bytes, size_t unused_bytes_size,
+ tsi_handshaker_result **handshaker_result) {
+ if ((unused_bytes_size > 0 && unused_bytes == NULL) ||
+ handshaker_result == NULL) {
+ return TSI_INVALID_ARGUMENT;
+ }
+ fake_handshaker_result *result = gpr_zalloc(sizeof(*result));
+ result->base.vtable = &handshaker_result_vtable;
+ if (unused_bytes_size > 0) {
+ result->unused_bytes = gpr_malloc(unused_bytes_size);
+ memcpy(result->unused_bytes, unused_bytes, unused_bytes_size);
+ }
+ result->unused_bytes_size = unused_bytes_size;
+ *handshaker_result = &result->base;
+ return TSI_OK;
+}
+
/* --- tsi_handshaker methods implementation. ---*/
static tsi_result fake_handshaker_get_bytes_to_send_to_peer(
@@ -370,13 +439,13 @@
*bytes_size = 0;
return TSI_OK;
}
- if (!impl->outgoing.needs_draining) {
+ if (!impl->outgoing_frame.needs_draining) {
tsi_fake_handshake_message next_message_to_send =
impl->next_message_to_send + 2;
const char *msg_string =
tsi_fake_handshake_message_to_string(impl->next_message_to_send);
- result = bytes_to_frame((unsigned char *)msg_string, strlen(msg_string),
- &impl->outgoing);
+ result = tsi_fake_frame_set_data((unsigned char *)msg_string,
+ strlen(msg_string), &impl->outgoing_frame);
if (result != TSI_OK) return result;
if (next_message_to_send > TSI_FAKE_HANDSHAKE_MESSAGE_MAX) {
next_message_to_send = TSI_FAKE_HANDSHAKE_MESSAGE_MAX;
@@ -388,7 +457,7 @@
}
impl->next_message_to_send = next_message_to_send;
}
- result = drain_frame_to_bytes(bytes, bytes_size, &impl->outgoing);
+ result = tsi_fake_frame_encode(bytes, bytes_size, &impl->outgoing_frame);
if (result != TSI_OK) return result;
if (!impl->is_client &&
impl->next_message_to_send == TSI_FAKE_HANDSHAKE_MESSAGE_MAX) {
@@ -414,12 +483,12 @@
*bytes_size = 0;
return TSI_OK;
}
- result = fill_frame_from_bytes(bytes, bytes_size, &impl->incoming);
+ result = tsi_fake_frame_decode(bytes, bytes_size, &impl->incoming_frame);
if (result != TSI_OK) return result;
/* We now have a complete frame. */
result = tsi_fake_handshake_message_from_string(
- (const char *)impl->incoming.data + TSI_FAKE_FRAME_HEADER_SIZE,
+ (const char *)impl->incoming_frame.data + TSI_FAKE_FRAME_HEADER_SIZE,
&received_msg);
if (result != TSI_OK) {
impl->result = result;
@@ -434,7 +503,7 @@
gpr_log(GPR_INFO, "%s received %s.", impl->is_client ? "Client" : "Server",
tsi_fake_handshake_message_to_string(received_msg));
}
- tsi_fake_frame_reset(&impl->incoming, 0 /* needs_draining */);
+ tsi_fake_frame_reset(&impl->incoming_frame, 0 /* needs_draining */);
impl->needs_incoming_message = 0;
if (impl->next_message_to_send == TSI_FAKE_HANDSHAKE_MESSAGE_MAX) {
/* We're done. */
@@ -451,40 +520,86 @@
return impl->result;
}
-static tsi_result fake_handshaker_extract_peer(tsi_handshaker *self,
- tsi_peer *peer) {
- tsi_result result = tsi_construct_peer(1, peer);
- if (result != TSI_OK) return result;
- result = tsi_construct_string_peer_property_from_cstring(
- TSI_CERTIFICATE_TYPE_PEER_PROPERTY, TSI_FAKE_CERTIFICATE_TYPE,
- &peer->properties[0]);
- if (result != TSI_OK) tsi_peer_destruct(peer);
- return result;
-}
-
-static tsi_result fake_handshaker_create_frame_protector(
- tsi_handshaker *self, size_t *max_protected_frame_size,
- tsi_frame_protector **protector) {
- *protector = tsi_create_fake_protector(max_protected_frame_size);
- if (*protector == NULL) return TSI_OUT_OF_RESOURCES;
- return TSI_OK;
-}
-
static void fake_handshaker_destroy(tsi_handshaker *self) {
tsi_fake_handshaker *impl = (tsi_fake_handshaker *)self;
- tsi_fake_frame_destruct(&impl->incoming);
- tsi_fake_frame_destruct(&impl->outgoing);
+ tsi_fake_frame_destruct(&impl->incoming_frame);
+ tsi_fake_frame_destruct(&impl->outgoing_frame);
+ gpr_free(impl->outgoing_bytes_buffer);
gpr_free(self);
}
+static tsi_result fake_handshaker_next(
+ tsi_handshaker *self, const unsigned char *received_bytes,
+ size_t received_bytes_size, unsigned char **bytes_to_send,
+ size_t *bytes_to_send_size, tsi_handshaker_result **handshaker_result,
+ tsi_handshaker_on_next_done_cb cb, void *user_data) {
+ /* Sanity check the arguments. */
+ if ((received_bytes_size > 0 && received_bytes == NULL) ||
+ bytes_to_send == NULL || bytes_to_send_size == NULL ||
+ handshaker_result == NULL) {
+ return TSI_INVALID_ARGUMENT;
+ }
+ tsi_fake_handshaker *handshaker = (tsi_fake_handshaker *)self;
+ tsi_result result = TSI_OK;
+
+ /* Decode and process a handshake frame from the peer. */
+ size_t consumed_bytes_size = received_bytes_size;
+ if (received_bytes_size > 0) {
+ result = fake_handshaker_process_bytes_from_peer(self, received_bytes,
+ &consumed_bytes_size);
+ if (result != TSI_OK) return result;
+ }
+
+ /* Create a handshake message to send to the peer and encode it as a fake
+ * frame. */
+ size_t offset = 0;
+ do {
+ size_t sent_bytes_size = handshaker->outgoing_bytes_buffer_size - offset;
+ result = fake_handshaker_get_bytes_to_send_to_peer(
+ self, handshaker->outgoing_bytes_buffer + offset, &sent_bytes_size);
+ offset += sent_bytes_size;
+ if (result == TSI_INCOMPLETE_DATA) {
+ handshaker->outgoing_bytes_buffer_size *= 2;
+ handshaker->outgoing_bytes_buffer =
+ gpr_realloc(handshaker->outgoing_bytes_buffer,
+ handshaker->outgoing_bytes_buffer_size);
+ }
+ } while (result == TSI_INCOMPLETE_DATA);
+ if (result != TSI_OK) return result;
+ *bytes_to_send = handshaker->outgoing_bytes_buffer;
+ *bytes_to_send_size = offset;
+
+ /* Check if the handshake was completed. */
+ if (fake_handshaker_get_result(self) == TSI_HANDSHAKE_IN_PROGRESS) {
+ *handshaker_result = NULL;
+ } else {
+ /* Calculate the unused bytes. */
+ const unsigned char *unused_bytes = NULL;
+ size_t unused_bytes_size = received_bytes_size - consumed_bytes_size;
+ if (unused_bytes_size > 0) {
+ unused_bytes = received_bytes + consumed_bytes_size;
+ }
+
+ /* Create a handshaker_result containing the unused bytes. */
+ result = fake_handshaker_result_create(unused_bytes, unused_bytes_size,
+ handshaker_result);
+ if (result == TSI_OK) {
+ /* Indicate that the handshake has completed and that a handshaker_result
+ * has been created. */
+ self->handshaker_result_created = true;
+ }
+ }
+ return result;
+}
+
static const tsi_handshaker_vtable handshaker_vtable = {
- fake_handshaker_get_bytes_to_send_to_peer,
- fake_handshaker_process_bytes_from_peer,
- fake_handshaker_get_result,
- fake_handshaker_extract_peer,
- fake_handshaker_create_frame_protector,
+ NULL, /* get_bytes_to_send_to_peer -- deprecated */
+ NULL, /* process_bytes_from_peer -- deprecated */
+ NULL, /* get_result -- deprecated */
+ NULL, /* extract_peer -- deprecated */
+ NULL, /* create_frame_protector -- deprecated */
fake_handshaker_destroy,
- NULL,
+ fake_handshaker_next,
};
tsi_handshaker *tsi_create_fake_handshaker(int is_client) {
@@ -492,6 +607,9 @@
impl->base.vtable = &handshaker_vtable;
impl->is_client = is_client;
impl->result = TSI_HANDSHAKE_IN_PROGRESS;
+ impl->outgoing_bytes_buffer_size =
+ TSI_FAKE_HANDSHAKER_OUTGOING_BUFFER_INITIAL_SIZE;
+ impl->outgoing_bytes_buffer = gpr_malloc(impl->outgoing_bytes_buffer_size);
if (is_client) {
impl->needs_incoming_message = 0;
impl->next_message_to_send = TSI_FAKE_CLIENT_INIT;
@@ -502,7 +620,7 @@
return &impl->base;
}
-tsi_frame_protector *tsi_create_fake_protector(
+tsi_frame_protector *tsi_create_fake_frame_protector(
size_t *max_protected_frame_size) {
tsi_fake_frame_protector *impl = gpr_zalloc(sizeof(*impl));
impl->max_frame_size = (max_protected_frame_size == NULL)
diff --git a/src/core/tsi/fake_transport_security.h b/src/core/tsi/fake_transport_security.h
index 3d468c4..934b3cb 100644
--- a/src/core/tsi/fake_transport_security.h
+++ b/src/core/tsi/fake_transport_security.h
@@ -36,7 +36,7 @@
tsi_handshaker *tsi_create_fake_handshaker(int is_client);
/* Creates a protector directly without going through the handshake phase. */
-tsi_frame_protector *tsi_create_fake_protector(
+tsi_frame_protector *tsi_create_fake_frame_protector(
size_t *max_protected_frame_size);
#ifdef __cplusplus
diff --git a/src/core/tsi/transport_security.c b/src/core/tsi/transport_security.c
index 08fa431..be11d64 100644
--- a/src/core/tsi/transport_security.c
+++ b/src/core/tsi/transport_security.c
@@ -26,7 +26,7 @@
/* --- Tracing. --- */
-grpc_tracer_flag tsi_tracing_enabled = GRPC_TRACER_INITIALIZER(false);
+grpc_tracer_flag tsi_tracing_enabled = GRPC_TRACER_INITIALIZER(false, "tsi");
/* --- tsi_result common implementation. --- */
diff --git a/src/cpp/OWNERS b/src/cpp/OWNERS
new file mode 100644
index 0000000..8dca75c
--- /dev/null
+++ b/src/cpp/OWNERS
@@ -0,0 +1,4 @@
+@ctiller
+@markdroth
+@dgquintas
+
diff --git a/src/csharp/OWNERS b/src/csharp/OWNERS
new file mode 100644
index 0000000..bde3ed9
--- /dev/null
+++ b/src/csharp/OWNERS
@@ -0,0 +1,3 @@
+@jtattermusch
+@apolcyn
+
diff --git a/src/node/OWNERS b/src/node/OWNERS
new file mode 100644
index 0000000..3a49353
--- /dev/null
+++ b/src/node/OWNERS
@@ -0,0 +1,2 @@
+@murgatroid99
+
diff --git a/src/objective-c/OWNERS b/src/objective-c/OWNERS
new file mode 100644
index 0000000..2bc9118
--- /dev/null
+++ b/src/objective-c/OWNERS
@@ -0,0 +1,3 @@
+@muxi
+@makdharma
+
diff --git a/src/objective-c/RxLibrary/GRXBufferedPipe.h b/src/objective-c/RxLibrary/GRXBufferedPipe.h
index bd7d4ad..a871ea8 100644
--- a/src/objective-c/RxLibrary/GRXBufferedPipe.h
+++ b/src/objective-c/RxLibrary/GRXBufferedPipe.h
@@ -27,8 +27,8 @@
* immediately, unless flow control prevents it.
* If it is throttled and keeps receiving values, as well as if it receives values before being
* started, it will buffer them and propagate them in order as soon as its state becomes Started.
- * If it receives an error (via -writesFinishedWithError:), it will drop any buffered values and
- * propagate the error immediately.
+ * If it receives an end of stream (via -writesFinishedWithError:), it will buffer the EOS after the
+ * last buffered value and issue it to the writeable after all buffered values are issued.
*
* Beware that a pipe of this type can't prevent receiving more values when it is paused (for
* example if used to write data to a congested network connection). Because in such situations the
diff --git a/src/objective-c/RxLibrary/GRXBufferedPipe.m b/src/objective-c/RxLibrary/GRXBufferedPipe.m
index e4a7cc4..99cb0ad 100644
--- a/src/objective-c/RxLibrary/GRXBufferedPipe.m
+++ b/src/objective-c/RxLibrary/GRXBufferedPipe.m
@@ -18,11 +18,13 @@
#import "GRXBufferedPipe.h"
+@interface GRXBufferedPipe ()
+@property(atomic) id<GRXWriteable> writeable;
+@end
+
@implementation GRXBufferedPipe {
- id<GRXWriteable> _writeable;
- NSMutableArray *_queue;
- BOOL _inputIsFinished;
NSError *_errorOrNil;
+ dispatch_queue_t _writeQueue;
}
@synthesize state = _state;
@@ -33,99 +35,79 @@
- (instancetype)init {
if (self = [super init]) {
- _queue = [NSMutableArray array];
_state = GRXWriterStateNotStarted;
+ _writeQueue = dispatch_queue_create(NULL, DISPATCH_QUEUE_SERIAL);
+ dispatch_suspend(_writeQueue);
}
return self;
}
-- (id)popValue {
- id value = _queue[0];
- [_queue removeObjectAtIndex:0];
- return value;
-}
-
-- (void)writeBufferUntilPausedOrStopped {
- while (_state == GRXWriterStateStarted && _queue.count > 0) {
- [_writeable writeValue:[self popValue]];
- }
- if (_inputIsFinished && _queue.count == 0) {
- // Our writer finished normally while we were paused or not-started-yet.
- [self finishWithError:_errorOrNil];
- }
-}
-
#pragma mark GRXWriteable implementation
-// Returns whether events can be simply propagated to the other end of the pipe.
-- (BOOL)shouldFastForward {
- return _state == GRXWriterStateStarted && _queue.count == 0;
-}
-
- (void)writeValue:(id)value {
- if (self.shouldFastForward) {
- // Skip the queue.
- [_writeable writeValue:value];
- } else {
+ if ([value respondsToSelector:@selector(copy)]) {
// Even if we're paused and with enqueued values, we can't excert back-pressure to our writer.
// So just buffer the new value.
// We need a copy, so that it doesn't mutate before it's written at the other end of the pipe.
- if ([value respondsToSelector:@selector(copy)]) {
- value = [value copy];
- }
- [_queue addObject:value];
+ value = [value copy];
}
+ __weak GRXBufferedPipe *weakSelf = self;
+ dispatch_async(_writeQueue, ^(void) {
+ [weakSelf.writeable writeValue:value];
+ });
}
- (void)writesFinishedWithError:(NSError *)errorOrNil {
- _inputIsFinished = YES;
- _errorOrNil = errorOrNil;
- if (errorOrNil || self.shouldFastForward) {
- // No need to write pending values.
- [self finishWithError:_errorOrNil];
- }
+ __weak GRXBufferedPipe *weakSelf = self;
+ dispatch_async(_writeQueue, ^{
+ [weakSelf finishWithError:errorOrNil];
+ });
}
#pragma mark GRXWriter implementation
- (void)setState:(GRXWriterState)newState {
- // Manual transitions are only allowed from the started or paused states.
- if (_state == GRXWriterStateNotStarted || _state == GRXWriterStateFinished) {
- return;
- }
+ @synchronized (self) {
+ // Manual transitions are only allowed from the started or paused states.
+ if (_state == GRXWriterStateNotStarted || _state == GRXWriterStateFinished) {
+ return;
+ }
- switch (newState) {
- case GRXWriterStateFinished:
- _state = newState;
- _queue = nil;
- // Per GRXWriter's contract, setting the state to Finished manually means one doesn't wish the
- // writeable to be messaged anymore.
- _writeable = nil;
- return;
- case GRXWriterStatePaused:
- _state = newState;
- return;
- case GRXWriterStateStarted:
- if (_state == GRXWriterStatePaused) {
+ switch (newState) {
+ case GRXWriterStateFinished:
+ self.writeable = nil;
+ if (_state == GRXWriterStatePaused) {
+ dispatch_resume(_writeQueue);
+ }
_state = newState;
- [self writeBufferUntilPausedOrStopped];
- }
- return;
- case GRXWriterStateNotStarted:
- return;
+ return;
+ case GRXWriterStatePaused:
+ if (_state == GRXWriterStateStarted) {
+ _state = newState;
+ dispatch_suspend(_writeQueue);
+ }
+ return;
+ case GRXWriterStateStarted:
+ if (_state == GRXWriterStatePaused) {
+ _state = newState;
+ dispatch_resume(_writeQueue);
+ }
+ return;
+ case GRXWriterStateNotStarted:
+ return;
+ }
}
}
- (void)startWithWriteable:(id<GRXWriteable>)writeable {
+ self.writeable = writeable;
_state = GRXWriterStateStarted;
- _writeable = writeable;
- [self writeBufferUntilPausedOrStopped];
+ dispatch_resume(_writeQueue);
}
- (void)finishWithError:(NSError *)errorOrNil {
- id<GRXWriteable> writeable = _writeable;
+ [self.writeable writesFinishedWithError:errorOrNil];
self.state = GRXWriterStateFinished;
- [writeable writesFinishedWithError:errorOrNil];
}
@end
diff --git a/src/objective-c/tests/RxLibraryUnitTests.m b/src/objective-c/tests/RxLibraryUnitTests.m
index f152452..fa3ded4 100644
--- a/src/objective-c/tests/RxLibraryUnitTests.m
+++ b/src/objective-c/tests/RxLibraryUnitTests.m
@@ -23,6 +23,8 @@
#import <RxLibrary/GRXWriteable.h>
#import <RxLibrary/GRXWriter.h>
+#define TEST_TIMEOUT 1
+
// A mock of a GRXSingleValueHandler block that can be queried for how many times it was called and
// what were the last values passed to it.
//
@@ -140,26 +142,38 @@
#pragma mark BufferedPipe
- (void)testBufferedPipePropagatesValue {
+ __weak XCTestExpectation *expectation = [self expectationWithDescription:@"Response received"];
// Given:
CapturingSingleValueHandler *handler = [CapturingSingleValueHandler handler];
- id<GRXWriteable> writeable = [GRXWriteable writeableWithSingleHandler:handler.block];
+ id<GRXWriteable> writeable = [GRXWriteable writeableWithSingleHandler:^(id value, NSError *errorOrNil) {
+ handler.block(value, errorOrNil);
+ [expectation fulfill];
+ }];
+
id anyValue = @7;
// If:
GRXBufferedPipe *pipe = [GRXBufferedPipe pipe];
[pipe startWithWriteable:writeable];
[pipe writeValue:anyValue];
+ [pipe writesFinishedWithError:nil];
// Then:
+ [self waitForExpectationsWithTimeout:TEST_TIMEOUT handler:nil];
XCTAssertEqual(handler.timesCalled, 1);
XCTAssertEqualObjects(handler.value, anyValue);
XCTAssertEqualObjects(handler.errorOrNil, nil);
+
}
- (void)testBufferedPipePropagatesError {
+ __weak XCTestExpectation *expectation = [self expectationWithDescription:@"Response received"];
// Given:
CapturingSingleValueHandler *handler = [CapturingSingleValueHandler handler];
- id<GRXWriteable> writeable = [GRXWriteable writeableWithSingleHandler:handler.block];
+ id<GRXWriteable> writeable = [GRXWriteable writeableWithSingleHandler:^(id value, NSError *errorOrNil) {
+ handler.block(value, errorOrNil);
+ [expectation fulfill];
+ }];
NSError *anyError = [NSError errorWithDomain:@"domain" code:7 userInfo:nil];
// If:
@@ -168,15 +182,20 @@
[pipe writesFinishedWithError:anyError];
// Then:
+ [self waitForExpectationsWithTimeout:TEST_TIMEOUT handler:nil];
XCTAssertEqual(handler.timesCalled, 1);
XCTAssertEqualObjects(handler.value, nil);
XCTAssertEqualObjects(handler.errorOrNil, anyError);
}
- (void)testBufferedPipeFinishWriteWhilePaused {
+ __weak XCTestExpectation *expectation = [self expectationWithDescription:@"Response received"];
// Given:
CapturingSingleValueHandler *handler = [CapturingSingleValueHandler handler];
- id<GRXWriteable> writeable = [GRXWriteable writeableWithSingleHandler:handler.block];
+ id<GRXWriteable> writeable = [GRXWriteable writeableWithSingleHandler:^(id value, NSError *errorOrNil) {
+ handler.block(value, errorOrNil);
+ [expectation fulfill];
+ }];
id anyValue = @7;
// If:
@@ -188,6 +207,7 @@
[pipe startWithWriteable:writeable];
// Then:
+ [self waitForExpectationsWithTimeout:TEST_TIMEOUT handler:nil];
XCTAssertEqual(handler.timesCalled, 1);
XCTAssertEqualObjects(handler.value, anyValue);
XCTAssertEqualObjects(handler.errorOrNil, nil);
diff --git a/src/php/OWNERS b/src/php/OWNERS
new file mode 100644
index 0000000..7065b41
--- /dev/null
+++ b/src/php/OWNERS
@@ -0,0 +1,3 @@
+@stanley-cheung
+@murgatroid99
+
diff --git a/src/python/OWNERS b/src/python/OWNERS
new file mode 100644
index 0000000..73a707c
--- /dev/null
+++ b/src/python/OWNERS
@@ -0,0 +1,3 @@
+@nathanielmanistaatgoogle
+@kpayson64
+
diff --git a/src/ruby/OWNERS b/src/ruby/OWNERS
new file mode 100644
index 0000000..b01fd80
--- /dev/null
+++ b/src/ruby/OWNERS
@@ -0,0 +1,3 @@
+@apolcyn
+@murgatroid99
+
diff --git a/templates/binding.gyp.template b/templates/binding.gyp.template
index b304011..b7560fe 100644
--- a/templates/binding.gyp.template
+++ b/templates/binding.gyp.template
@@ -64,6 +64,14 @@
],
% endif
% endfor
+ 'cflags_c': [
+ '-Werror',
+ '-std=c99'
+ ],
+ 'cflags_cc': [
+ '-Werror',
+ '-std=c++11'
+ ],
'include_dirs': [
'.',
'include'
@@ -154,6 +162,22 @@
'<(node_root_dir)/deps/zlib',
'<(node_root_dir)/deps/cares/include'
]
+ }],
+ ['OS == "mac"', {
+ 'xcode_settings': {
+ 'MACOSX_DEPLOYMENT_TARGET': '10.9'
+ },
+ % if defaults['global'].get('CPPFLAGS', None) is not None:
+ 'OTHER_CFLAGS': [
+ % for item in defaults['global'].get('CPPFLAGS').split():
+ '${item}',
+ % endfor
+ ],
+ 'OTHER_CPLUSPLUSFLAGS': [
+ '-stdlib=libc++',
+ '-std=c++11'
+ ],
+ % endif
}]
]
},
@@ -164,12 +188,6 @@
% for lib in libs:
% if lib.name in module.transitive_deps and lib.name == 'boringssl':
{
- 'cflags': [
- '-std=c++11',
- '-std=c99',
- '-Wall',
- '-Werror'
- ],
'target_name': '${lib.name}',
'product_prefix': 'lib',
'type': 'static_library',
@@ -183,17 +201,6 @@
'${source}',
% endfor
],
- 'conditions': [
- ['OS=="mac"', {
- 'xcode_settings': {
- 'MACOSX_DEPLOYMENT_TARGET': '10.9',
- 'OTHER_CPLUSPLUSFLAGS': [
- '-stdlib=libc++',
- '-std=c++11'
- ],
- }
- }],
- ],
},
% endif
% endfor
@@ -237,11 +244,6 @@
% for lib in libs:
% if lib.name in module.transitive_deps and lib.name == 'z':
{
- 'cflags': [
- '-std=c99',
- '-Wall',
- '-Werror'
- ],
'target_name': '${lib.name}',
'product_prefix': 'lib',
'type': 'static_library',
@@ -267,11 +269,6 @@
% for lib in libs:
% if lib.name in module.transitive_deps and lib.name not in ('boringssl', 'z'):
{
- 'cflags': [
- '-std=c99',
- '-Wall',
- '-Werror'
- ],
'target_name': '${lib.name}',
'product_prefix': 'lib',
'type': 'static_library',
@@ -285,13 +282,6 @@
'${source}',
% endfor
],
- "conditions": [
- ['OS == "mac"', {
- 'xcode_settings': {
- 'MACOSX_DEPLOYMENT_TARGET': '10.9'
- }
- }]
- ]
},
% endif
% endfor
@@ -300,7 +290,6 @@
"<!(node -e \"require('nan')\")"
],
'cflags': [
- '-std=c++11',
'-pthread',
'-zdefs',
'-Wno-error=deprecated-declarations'
@@ -315,15 +304,6 @@
% endfor
]
}],
- ['OS=="mac"', {
- 'xcode_settings': {
- 'MACOSX_DEPLOYMENT_TARGET': '10.9',
- 'OTHER_CFLAGS': [
- '-stdlib=libc++',
- '-std=c++11'
- ]
- }
- }],
['OS=="win"', {
'dependencies': [
% for dep in getattr(module, 'deps', []):
diff --git a/templates/package.json.template b/templates/package.json.template
index af13d52..50893d3 100644
--- a/templates/package.json.template
+++ b/templates/package.json.template
@@ -58,7 +58,7 @@
},
"binary": {
"module_name": "grpc_node",
- "module_path": "src/node/extension_binary",
+ "module_path": "src/node/extension_binary/{node_abi}-{platform}-{arch}",
"host": "https://storage.googleapis.com/",
"remote_path": "grpc-precompiled-binaries/node/{name}/v{version}",
"package_name": "{node_abi}-{platform}-{arch}.tar.gz"
diff --git a/test/build/OWNERS b/test/build/OWNERS
new file mode 100644
index 0000000..8dca75c
--- /dev/null
+++ b/test/build/OWNERS
@@ -0,0 +1,4 @@
+@ctiller
+@markdroth
+@dgquintas
+
diff --git a/test/core/OWNERS b/test/core/OWNERS
new file mode 100644
index 0000000..8dca75c
--- /dev/null
+++ b/test/core/OWNERS
@@ -0,0 +1,4 @@
+@ctiller
+@markdroth
+@dgquintas
+
diff --git a/test/core/security/secure_endpoint_test.c b/test/core/security/secure_endpoint_test.c
index fd8af2f..7ecd947 100644
--- a/test/core/security/secure_endpoint_test.c
+++ b/test/core/security/secure_endpoint_test.c
@@ -38,8 +38,10 @@
static grpc_endpoint_test_fixture secure_endpoint_create_fixture_tcp_socketpair(
size_t slice_size, grpc_slice *leftover_slices, size_t leftover_nslices) {
grpc_exec_ctx exec_ctx = GRPC_EXEC_CTX_INIT;
- tsi_frame_protector *fake_read_protector = tsi_create_fake_protector(NULL);
- tsi_frame_protector *fake_write_protector = tsi_create_fake_protector(NULL);
+ tsi_frame_protector *fake_read_protector =
+ tsi_create_fake_frame_protector(NULL);
+ tsi_frame_protector *fake_write_protector =
+ tsi_create_fake_frame_protector(NULL);
grpc_endpoint_test_fixture f;
grpc_endpoint_pair tcp;
diff --git a/test/core/surface/completion_queue_test.c b/test/core/surface/completion_queue_test.c
index c27337a..f9d88d6 100644
--- a/test/core/surface/completion_queue_test.c
+++ b/test/core/surface/completion_queue_test.c
@@ -93,7 +93,7 @@
attr.cq_polling_type = polling_types[j];
cq = grpc_completion_queue_create(
grpc_completion_queue_factory_lookup(&attr), &attr, NULL);
- GPR_ASSERT(grpc_cq_from_pollset(grpc_cq_pollset(cq)) == cq);
+ GPR_ASSERT(grpc_cq_pollset(cq) != NULL);
shutdown_and_destroy(cq);
}
}
diff --git a/test/cpp/OWNERS b/test/cpp/OWNERS
new file mode 100644
index 0000000..8dca75c
--- /dev/null
+++ b/test/cpp/OWNERS
@@ -0,0 +1,4 @@
+@ctiller
+@markdroth
+@dgquintas
+
diff --git a/test/cpp/end2end/async_end2end_test.cc b/test/cpp/end2end/async_end2end_test.cc
index a2a6e36..7b78071 100644
--- a/test/cpp/end2end/async_end2end_test.cc
+++ b/test/cpp/end2end/async_end2end_test.cc
@@ -1752,7 +1752,9 @@
messages.push_back(big_msg);
}
- for (auto health_check_service : {false, true}) {
+ // TODO (sreek) Renable tests with health check service after the issue
+ // https://github.com/grpc/grpc/issues/11223 is resolved
+ for (auto health_check_service : {false}) {
for (auto cred = credentials_types.begin(); cred != credentials_types.end();
++cred) {
for (auto msg = messages.begin(); msg != messages.end(); msg++) {
diff --git a/test/cpp/qps/BUILD b/test/cpp/qps/BUILD
index 4d8b3b4..ed2f9ed 100644
--- a/test/cpp/qps/BUILD
+++ b/test/cpp/qps/BUILD
@@ -121,6 +121,7 @@
"//:gpr",
"//test/core/util:gpr_test_util",
"//test/core/util:grpc_test_util",
+ "//test/cpp/util:test_config",
"//test/cpp/util:test_util",
],
)
@@ -131,6 +132,7 @@
deps = [
":histogram",
":interarrival",
+ "//test/cpp/util:test_config",
],
)
@@ -141,6 +143,7 @@
":benchmark_config",
":driver_impl",
"//:grpc++",
+ "//test/cpp/util:test_config",
],
external_deps = [
"gflags",
@@ -154,6 +157,7 @@
":benchmark_config",
":driver_impl",
":qps_worker_impl",
+ "//test/cpp/util:test_config",
],
)
@@ -164,6 +168,7 @@
":benchmark_config",
":driver_impl",
"//:grpc++",
+ "//test/cpp/util:test_config",
],
)
diff --git a/test/cpp/qps/benchmark_config.cc b/test/cpp/qps/benchmark_config.cc
index 8f53670..fb1e060 100644
--- a/test/cpp/qps/benchmark_config.cc
+++ b/test/cpp/qps/benchmark_config.cc
@@ -54,10 +54,6 @@
namespace grpc {
namespace testing {
-void InitBenchmark(int* argc, char*** argv, bool remove_flags) {
- ParseCommandLineFlags(argc, argv, remove_flags);
-}
-
static std::shared_ptr<Reporter> InitBenchmarkReporters() {
auto* composite_reporter = new CompositeReporter;
if (FLAGS_enable_log_reporter) {
diff --git a/test/cpp/qps/benchmark_config.h b/test/cpp/qps/benchmark_config.h
index 054dbb2..d3d6910 100644
--- a/test/cpp/qps/benchmark_config.h
+++ b/test/cpp/qps/benchmark_config.h
@@ -20,15 +20,12 @@
#define GRPC_TEST_CPP_UTIL_BENCHMARK_CONFIG_H
#include <memory>
-#include <vector>
#include "test/cpp/qps/report.h"
namespace grpc {
namespace testing {
-void InitBenchmark(int* argc, char*** argv, bool remove_flags);
-
/** Returns the benchmark Reporter instance.
*
* The returned instance will take care of generating reports for all the actual
diff --git a/test/cpp/qps/qps_interarrival_test.cc b/test/cpp/qps/qps_interarrival_test.cc
index d19266f..87f09e8 100644
--- a/test/cpp/qps/qps_interarrival_test.cc
+++ b/test/cpp/qps/qps_interarrival_test.cc
@@ -23,6 +23,7 @@
#include <grpc/support/histogram.h>
#include "test/cpp/qps/interarrival.h"
+#include "test/cpp/util/test_config.h"
using grpc::testing::RandomDistInterface;
using grpc::testing::InterarrivalTimer;
@@ -50,6 +51,8 @@
using grpc::testing::ExpDist;
int main(int argc, char **argv) {
+ grpc::testing::InitTest(&argc, &argv, true);
+
RunTest(ExpDist(10.0), 5, std::string("Exponential(10)"));
return 0;
}
diff --git a/test/cpp/qps/qps_json_driver.cc b/test/cpp/qps/qps_json_driver.cc
index 590c22e..e1e5802 100644
--- a/test/cpp/qps/qps_json_driver.cc
+++ b/test/cpp/qps/qps_json_driver.cc
@@ -30,6 +30,7 @@
#include "test/cpp/qps/driver.h"
#include "test/cpp/qps/parse_json.h"
#include "test/cpp/qps/report.h"
+#include "test/cpp/util/test_config.h"
DEFINE_string(scenarios_file, "",
"JSON file containing an array of Scenario objects");
@@ -219,7 +220,7 @@
} // namespace grpc
int main(int argc, char** argv) {
- grpc::testing::InitBenchmark(&argc, &argv, true);
+ grpc::testing::InitTest(&argc, &argv, true);
bool ok = grpc::testing::QpsDriver();
diff --git a/test/cpp/qps/qps_openloop_test.cc b/test/cpp/qps/qps_openloop_test.cc
index 28dec4f..b2503da 100644
--- a/test/cpp/qps/qps_openloop_test.cc
+++ b/test/cpp/qps/qps_openloop_test.cc
@@ -20,10 +20,11 @@
#include <grpc/support/log.h>
-#include "test/core/util/test_config.h"
#include "test/cpp/qps/benchmark_config.h"
#include "test/cpp/qps/driver.h"
#include "test/cpp/qps/report.h"
+#include "test/core/util/test_config.h"
+#include "test/cpp/util/test_config.h"
namespace grpc {
namespace testing {
@@ -58,7 +59,7 @@
} // namespace grpc
int main(int argc, char** argv) {
- grpc::testing::InitBenchmark(&argc, &argv, true);
+ grpc::testing::InitTest(&argc, &argv, true);
grpc::testing::RunQPS();
diff --git a/test/cpp/qps/qps_test.cc b/test/cpp/qps/qps_test.cc
deleted file mode 100644
index 6ab2102..0000000
--- a/test/cpp/qps/qps_test.cc
+++ /dev/null
@@ -1,64 +0,0 @@
-/*
- *
- * Copyright 2015 gRPC authors.
- *
- * Licensed under the Apache License, Version 2.0 (the "License");
- * you may not use this file except in compliance with the License.
- * You may obtain a copy of the License at
- *
- * http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing, software
- * distributed under the License is distributed on an "AS IS" BASIS,
- * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
- * See the License for the specific language governing permissions and
- * limitations under the License.
- *
- */
-
-#include <set>
-
-#include <grpc/support/log.h>
-
-#include "test/cpp/qps/benchmark_config.h"
-#include "test/cpp/qps/driver.h"
-#include "test/cpp/qps/report.h"
-
-namespace grpc {
-namespace testing {
-
-static const int WARMUP = 20;
-static const int BENCHMARK = 20;
-
-static void RunQPS() {
- gpr_log(GPR_INFO, "Running QPS test");
-
- ClientConfig client_config;
- client_config.set_client_type(ASYNC_CLIENT);
- client_config.set_outstanding_rpcs_per_channel(100);
- client_config.set_client_channels(64);
- client_config.set_async_client_threads(8);
- client_config.set_rpc_type(STREAMING);
- client_config.mutable_load_params()->mutable_closed_loop();
-
- ServerConfig server_config;
- server_config.set_server_type(ASYNC_SERVER);
- server_config.set_async_server_threads(8);
-
- const auto result =
- RunScenario(client_config, 1, server_config, 1, WARMUP, BENCHMARK, -2);
-
- GetReporter()->ReportQPSPerCore(*result);
- GetReporter()->ReportLatency(*result);
-}
-
-} // namespace testing
-} // namespace grpc
-
-int main(int argc, char** argv) {
- grpc::testing::InitBenchmark(&argc, &argv, true);
-
- grpc::testing::RunQPS();
-
- return 0;
-}
diff --git a/test/cpp/qps/secure_sync_unary_ping_pong_test.cc b/test/cpp/qps/secure_sync_unary_ping_pong_test.cc
index 5863926..1ee6e37 100644
--- a/test/cpp/qps/secure_sync_unary_ping_pong_test.cc
+++ b/test/cpp/qps/secure_sync_unary_ping_pong_test.cc
@@ -23,6 +23,7 @@
#include "test/cpp/qps/benchmark_config.h"
#include "test/cpp/qps/driver.h"
#include "test/cpp/qps/report.h"
+#include "test/cpp/util/test_config.h"
namespace grpc {
namespace testing {
@@ -61,7 +62,7 @@
} // namespace grpc
int main(int argc, char** argv) {
- grpc::testing::InitBenchmark(&argc, &argv, true);
+ grpc::testing::InitTest(&argc, &argv, true);
grpc::testing::RunSynchronousUnaryPingPong();
diff --git a/tools/OWNERS b/tools/OWNERS
new file mode 100644
index 0000000..8f6de53
--- /dev/null
+++ b/tools/OWNERS
@@ -0,0 +1,4 @@
+@matt-kwong
+@jtattermusch
+@nicolasnoble
+
diff --git a/tools/codegen/core/OWNERS b/tools/codegen/core/OWNERS
new file mode 100644
index 0000000..17e6647
--- /dev/null
+++ b/tools/codegen/core/OWNERS
@@ -0,0 +1,5 @@
+set noparent
+@ctiller
+@dgquintas
+@markdroth
+
diff --git a/tools/dockerfile/OWNERS b/tools/dockerfile/OWNERS
new file mode 100644
index 0000000..20ee808
--- /dev/null
+++ b/tools/dockerfile/OWNERS
@@ -0,0 +1,3 @@
+@matt-kwong
+@jtattermusch
+
diff --git a/tools/internal_ci/helper_scripts/prepare_build_macos_rc b/tools/internal_ci/helper_scripts/prepare_build_macos_rc
index 7d2c522..57463e6 100644
--- a/tools/internal_ci/helper_scripts/prepare_build_macos_rc
+++ b/tools/internal_ci/helper_scripts/prepare_build_macos_rc
@@ -31,7 +31,7 @@
set +ex # rvm script is very verbose and exits with errorcode
source $HOME/.rvm/scripts/rvm
-set -e # rvm commands are very verbose
+set -e # rvm commands are very verbose
rvm install ruby-2.4
rvm osx-ssl-certs status all
rvm osx-ssl-certs update all
diff --git a/tools/internal_ci/linux/grpc_basictests_c_cpp_dbg.cfg b/tools/internal_ci/linux/grpc_basictests_c_cpp_dbg.cfg
new file mode 100644
index 0000000..ca547a0
--- /dev/null
+++ b/tools/internal_ci/linux/grpc_basictests_c_cpp_dbg.cfg
@@ -0,0 +1,29 @@
+# Copyright 2017 gRPC authors.
+#
+# Licensed under the Apache License, Version 2.0 (the "License");
+# you may not use this file except in compliance with the License.
+# You may obtain a copy of the License at
+#
+# http://www.apache.org/licenses/LICENSE-2.0
+#
+# Unless required by applicable law or agreed to in writing, software
+# distributed under the License is distributed on an "AS IS" BASIS,
+# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+# See the License for the specific language governing permissions and
+# limitations under the License.
+
+# Config file for the internal CI (in protobuf text format)
+
+# Location of the continuous shell script in repository.
+build_file: "grpc/tools/internal_ci/linux/grpc_run_tests_matrix.sh"
+timeout_mins: 240
+action {
+ define_artifacts {
+ regex: "**/*sponge_log.xml"
+ }
+}
+
+env_vars {
+ key: "RUN_TESTS_FLAGS"
+ value: "-f basictests linux corelang dbg --inner_jobs 16 -j 1 --internal_ci --bq_result_table aggregate_results"
+}
diff --git a/tools/internal_ci/linux/grpc_basictests_c_cpp_opt.cfg b/tools/internal_ci/linux/grpc_basictests_c_cpp_opt.cfg
new file mode 100644
index 0000000..62f05ce
--- /dev/null
+++ b/tools/internal_ci/linux/grpc_basictests_c_cpp_opt.cfg
@@ -0,0 +1,29 @@
+# Copyright 2017 gRPC authors.
+#
+# Licensed under the Apache License, Version 2.0 (the "License");
+# you may not use this file except in compliance with the License.
+# You may obtain a copy of the License at
+#
+# http://www.apache.org/licenses/LICENSE-2.0
+#
+# Unless required by applicable law or agreed to in writing, software
+# distributed under the License is distributed on an "AS IS" BASIS,
+# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+# See the License for the specific language governing permissions and
+# limitations under the License.
+
+# Config file for the internal CI (in protobuf text format)
+
+# Location of the continuous shell script in repository.
+build_file: "grpc/tools/internal_ci/linux/grpc_run_tests_matrix.sh"
+timeout_mins: 240
+action {
+ define_artifacts {
+ regex: "**/*sponge_log.xml"
+ }
+}
+
+env_vars {
+ key: "RUN_TESTS_FLAGS"
+ value: "-f basictests linux corelang opt --inner_jobs 16 -j 1 --internal_ci --bq_result_table aggregate_results"
+}
diff --git a/tools/internal_ci/linux/grpc_basictests_multilang.cfg b/tools/internal_ci/linux/grpc_basictests_multilang.cfg
new file mode 100644
index 0000000..0a1afa2
--- /dev/null
+++ b/tools/internal_ci/linux/grpc_basictests_multilang.cfg
@@ -0,0 +1,29 @@
+# Copyright 2017 gRPC authors.
+#
+# Licensed under the Apache License, Version 2.0 (the "License");
+# you may not use this file except in compliance with the License.
+# You may obtain a copy of the License at
+#
+# http://www.apache.org/licenses/LICENSE-2.0
+#
+# Unless required by applicable law or agreed to in writing, software
+# distributed under the License is distributed on an "AS IS" BASIS,
+# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+# See the License for the specific language governing permissions and
+# limitations under the License.
+
+# Config file for the internal CI (in protobuf text format)
+
+# Location of the continuous shell script in repository.
+build_file: "grpc/tools/internal_ci/linux/grpc_run_tests_matrix.sh"
+timeout_mins: 240
+action {
+ define_artifacts {
+ regex: "**/*sponge_log.xml"
+ }
+}
+
+env_vars {
+ key: "RUN_TESTS_FLAGS"
+ value: "-f basictests linux multilang --inner_jobs 16 -j 1 --internal_ci --bq_result_table aggregate_results"
+}
diff --git a/tools/internal_ci/linux/grpc_build_artifacts.sh b/tools/internal_ci/linux/grpc_build_artifacts.sh
index 1f438a6..bc29014 100755
--- a/tools/internal_ci/linux/grpc_build_artifacts.sh
+++ b/tools/internal_ci/linux/grpc_build_artifacts.sh
@@ -22,6 +22,7 @@
# TODO(jtattermusch): install ruby on the internal_ci worker
gpg --keyserver hkp://keys.gnupg.net --recv-keys 409B6B1796C275462A1703113804BB82D39DC0E3
-curl -sSL https://get.rvm.io | bash -s stable --ruby
+# TODO(jtattermusch): grep works around https://github.com/rvm/rvm/issues/4068
+curl -sSL https://get.rvm.io | grep -v __rvm_print_headline | bash -s stable --ruby
tools/run_tests/task_runner.py -f artifact linux
diff --git a/tools/internal_ci/linux/grpc_interop_tocloud.cfg b/tools/internal_ci/linux/grpc_interop_tocloud.cfg
index 1f6421c..0c31b49 100644
--- a/tools/internal_ci/linux/grpc_interop_tocloud.cfg
+++ b/tools/internal_ci/linux/grpc_interop_tocloud.cfg
@@ -20,6 +20,6 @@
timeout_mins: 480
action {
define_artifacts {
- regex: "**/report.xml"
+ regex: "**/sponge_log.xml"
}
}
diff --git a/tools/internal_ci/linux/grpc_interop_tocloud.sh b/tools/internal_ci/linux/grpc_interop_tocloud.sh
index fe5c9a5..e3ba25a 100755
--- a/tools/internal_ci/linux/grpc_interop_tocloud.sh
+++ b/tools/internal_ci/linux/grpc_interop_tocloud.sh
@@ -23,4 +23,4 @@
source tools/internal_ci/helper_scripts/prepare_build_linux_rc
source tools/internal_ci/helper_scripts/prepare_build_interop_rc
-tools/run_tests/run_interop_tests.py -l all -s all --use_docker --http2_interop -t -j 12 $@
+tools/run_tests/run_interop_tests.py -l all -s all --use_docker --http2_interop --internal_ci -t -j 12 $@
diff --git a/tools/internal_ci/linux/grpc_interop_toprod.cfg b/tools/internal_ci/linux/grpc_interop_toprod.cfg
new file mode 100644
index 0000000..18978b8
--- /dev/null
+++ b/tools/internal_ci/linux/grpc_interop_toprod.cfg
@@ -0,0 +1,26 @@
+# Copyright 2017 gRPC authors.
+#
+# Licensed under the Apache License, Version 2.0 (the "License");
+# you may not use this file except in compliance with the License.
+# You may obtain a copy of the License at
+#
+# http://www.apache.org/licenses/LICENSE-2.0
+#
+# Unless required by applicable law or agreed to in writing, software
+# distributed under the License is distributed on an "AS IS" BASIS,
+# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+# See the License for the specific language governing permissions and
+# limitations under the License.
+
+# Config file for the internal CI (in protobuf text format)
+
+# Location of the continuous shell script in repository.
+build_file: "grpc/tools/internal_ci/linux/grpc_interop_toprod.sh"
+# grpc_interop tests can take 6+ hours to complete.
+timeout_mins: 60
+action {
+ define_artifacts {
+ regex: "**/sponge_log.xml"
+ }
+}
+
diff --git a/tools/internal_ci/linux/grpc_interop_toprod.sh b/tools/internal_ci/linux/grpc_interop_toprod.sh
new file mode 100755
index 0000000..3d06185
--- /dev/null
+++ b/tools/internal_ci/linux/grpc_interop_toprod.sh
@@ -0,0 +1,32 @@
+#!/usr/bin/env bash
+# Copyright 2017 gRPC authors.
+#
+# Licensed under the Apache License, Version 2.0 (the "License");
+# you may not use this file except in compliance with the License.
+# You may obtain a copy of the License at
+#
+# http://www.apache.org/licenses/LICENSE-2.0
+#
+# Unless required by applicable law or agreed to in writing, software
+# distributed under the License is distributed on an "AS IS" BASIS,
+# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+# See the License for the specific language governing permissions and
+# limitations under the License.
+
+set -ex
+
+export LANG=en_US.UTF-8
+
+# Enter the gRPC repo root
+cd $(dirname $0)/../../..
+
+source tools/internal_ci/helper_scripts/prepare_build_linux_rc
+source tools/internal_ci/helper_scripts/prepare_build_interop_rc
+
+tools/run_tests/run_interop_tests.py \
+ -l all \
+ --cloud_to_prod \
+ --cloud_to_prod_auth \
+ --prod_servers default gateway_v4 \
+ --use_docker --internal_ci -t -j 12 $@
+
diff --git a/tools/jenkins/run_bazel_full.sh b/tools/jenkins/run_bazel_full.sh
index edde1bc..3436a8f 100755
--- a/tools/jenkins/run_bazel_full.sh
+++ b/tools/jenkins/run_bazel_full.sh
@@ -20,6 +20,4 @@
export DOCKERFILE_DIR=tools/dockerfile/test/bazel
export DOCKER_RUN_SCRIPT=tools/jenkins/run_bazel_full_in_docker.sh
-# Warn PR author if they make a change to the bazel directory
-tools/run_tests/python_utils/check_bazel_dir.py
exec tools/run_tests/dockerize/build_and_run_docker.sh
diff --git a/tools/mkowners/mkowners.py b/tools/mkowners/mkowners.py
new file mode 100755
index 0000000..d4a3b56
--- /dev/null
+++ b/tools/mkowners/mkowners.py
@@ -0,0 +1,175 @@
+#!/usr/bin/env python3
+# Copyright 2017 gRPC authors.
+#
+# Licensed under the Apache License, Version 2.0 (the "License");
+# you may not use this file except in compliance with the License.
+# You may obtain a copy of the License at
+#
+# http://www.apache.org/licenses/LICENSE-2.0
+#
+# Unless required by applicable law or agreed to in writing, software
+# distributed under the License is distributed on an "AS IS" BASIS,
+# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+# See the License for the specific language governing permissions and
+# limitations under the License.
+
+import argparse
+import collections
+import operator
+import os
+import re
+import subprocess
+
+#
+# Find the root of the git tree
+#
+
+git_root = (subprocess
+ .check_output(['git', 'rev-parse', '--show-toplevel'])
+ .decode('utf-8')
+ .strip())
+
+#
+# Parse command line arguments
+#
+
+default_out = os.path.join(git_root, '.github', 'CODEOWNERS')
+
+argp = argparse.ArgumentParser('Generate .github/CODEOWNERS file')
+argp.add_argument('--out', '-o',
+ type=str,
+ default=default_out,
+ help='Output file (default %s)' % default_out)
+args = argp.parse_args()
+
+#
+# Walk git tree to locate all OWNERS files
+#
+
+owners_files = [os.path.join(root, 'OWNERS')
+ for root, dirs, files in os.walk(git_root)
+ if 'OWNERS' in files]
+
+#
+# Parse owners files
+#
+
+Owners = collections.namedtuple('Owners', 'parent directives dir')
+Directive = collections.namedtuple('Directive', 'who globs')
+
+def parse_owners(filename):
+ with open(filename) as f:
+ src = f.read().splitlines()
+ parent = True
+ directives = []
+ for line in src:
+ line = line.strip()
+ # line := directive | comment
+ if not line: continue
+ if line[0] == '#': continue
+ # it's a directive
+ directive = None
+ if line == 'set noparent':
+ parent = False
+ elif line == '*':
+ directive = Directive(who='*', globs=[])
+ elif ' ' in line:
+ (who, globs) = line.split(' ', 1)
+ globs_list = [glob
+ for glob in globs.split(' ')
+ if glob]
+ directive = Directive(who=who, globs=globs_list)
+ else:
+ directive = Directive(who=line, globs=[])
+ if directive:
+ directives.append(directive)
+ return Owners(parent=parent,
+ directives=directives,
+ dir=os.path.relpath(os.path.dirname(filename), git_root))
+
+owners_data = sorted([parse_owners(filename)
+ for filename in owners_files],
+ key=operator.attrgetter('dir'))
+
+#
+# Modify owners so that parented OWNERS files point to the actual
+# Owners tuple with their parent field
+#
+
+new_owners_data = []
+for owners in owners_data:
+ if owners.parent == True:
+ best_parent = None
+ best_parent_score = None
+ for possible_parent in owners_data:
+ if possible_parent is owners: continue
+ rel = os.path.relpath(owners.dir, possible_parent.dir)
+ # '..' ==> we had to walk up from possible_parent to get to owners
+ # ==> not a parent
+ if '..' in rel: continue
+ depth = len(rel.split(os.sep))
+ if not best_parent or depth < best_parent_score:
+ best_parent = possible_parent
+ best_parent_score = depth
+ if best_parent:
+ owners = owners._replace(parent = best_parent.dir)
+ else:
+ owners = owners._replace(parent = None)
+ new_owners_data.append(owners)
+owners_data = new_owners_data
+
+#
+# In bottom to top order, process owners data structures to build up
+# a CODEOWNERS file for GitHub
+#
+
+def glob_intersect(g1, g2):
+ if not g2:
+ return all(c == '*' for c in g1)
+ if not g1:
+ return all(c == '*' for c in g2)
+ c1, *t1 = g1
+ c2, *t2 = g2
+ if c1 == '*':
+ return glob_intersect(g1, t2) or glob_intersect(t1, g2)
+ if c2 == '*':
+ return glob_intersect(t1, g2) or glob_intersect(g1, t2)
+ return c1 == c2 and glob_intersect(t1, t2)
+
+def add_parent_to_globs(parent, globs):
+ if not parent: return
+ for owners in owners_data:
+ if owners.dir == parent:
+ for directive in owners.directives:
+ for dglob in directive.globs or ['**']:
+ for gglob, glob in globs.items():
+ if glob_intersect(dglob, gglob):
+ if directive.who not in glob:
+ glob.append(directive.who)
+ add_parent_to_globs(owners.parent, globs)
+ return
+ assert(False)
+
+todo = owners_data.copy()
+done = set()
+with open(args.out, 'w') as out:
+ out.write('# Auto-generated by the tools/mkowners/mkowners.py tool\n')
+ out.write('# Uses OWNERS files in different modules throughout the\n')
+ out.write('# repository as the source of truth for module ownership.\n')
+ while todo:
+ head, *todo = todo
+ if head.parent and not head.parent in done:
+ todo.append(head)
+ continue
+ globs = collections.OrderedDict()
+ for directive in head.directives:
+ for glob in directive.globs or ['**']:
+ if glob not in globs:
+ globs[glob] = []
+ globs[glob].append(directive.who)
+ add_parent_to_globs(head.parent, globs)
+ for glob, owners in globs.items():
+ out.write('%s %s\n' % (
+ os.path.join(head.dir, glob) if head.dir != '.' else glob,
+ ' '.join(owners)))
+ done.add(head.dir)
diff --git a/tools/run_tests/OWNERS b/tools/run_tests/OWNERS
new file mode 100644
index 0000000..8f6de53
--- /dev/null
+++ b/tools/run_tests/OWNERS
@@ -0,0 +1,4 @@
+@matt-kwong
+@jtattermusch
+@nicolasnoble
+
diff --git a/tools/run_tests/artifacts/artifact_targets.py b/tools/run_tests/artifacts/artifact_targets.py
index 9373db2..841bbdf 100644
--- a/tools/run_tests/artifacts/artifact_targets.py
+++ b/tools/run_tests/artifacts/artifact_targets.py
@@ -180,7 +180,8 @@
# We are using a custom workspace instead.
return create_jobspec(self.name,
['tools/run_tests/artifacts/build_artifact_ruby.sh'],
- use_workspace=True)
+ use_workspace=True,
+ timeout_seconds=45*60)
class CSharpExtArtifact:
diff --git a/tools/run_tests/artifacts/build_artifact_node.bat b/tools/run_tests/artifacts/build_artifact_node.bat
index cca6563..7f1d1aa 100644
--- a/tools/run_tests/artifacts/build_artifact_node.bat
+++ b/tools/run_tests/artifacts/build_artifact_node.bat
@@ -28,15 +28,15 @@
call .\node_modules\.bin\node-pre-gyp.cmd configure build --target=%%v --target_arch=%1
@rem Try again after removing openssl headers
- rmdir "%HOMEDRIVE%%HOMEPATH%\.node-gyp\%%v\include\node\openssl" /S /Q
- rmdir "%HOMEDRIVE%%HOMEPATH%\.node-gyp\iojs-%%v\include\node\openssl" /S /Q
+ rmdir "%USERPROFILE%\.node-gyp\%%v\include\node\openssl" /S /Q
+ rmdir "%USERPROFILE%\.node-gyp\iojs-%%v\include\node\openssl" /S /Q
call .\node_modules\.bin\node-pre-gyp.cmd build package --target=%%v --target_arch=%1 || goto :error
xcopy /Y /I /S build\stage\* %ARTIFACTS_OUT%\ || goto :error
)
for %%v in (%electron_versions%) do (
- cmd /V /C "set "HOME=%HOMEDRIVE%%HOMEPATH%\electron-gyp" && call .\node_modules\.bin\node-pre-gyp.cmd configure rebuild package --runtime=electron --target=%%v --target_arch=%1 --disturl=https://atom.io/download/electron" || goto :error
+ cmd /V /C "set "HOME=%USERPROFILE%\electron-gyp" && call .\node_modules\.bin\node-pre-gyp.cmd configure rebuild package --runtime=electron --target=%%v --target_arch=%1 --disturl=https://atom.io/download/electron" || goto :error
xcopy /Y /I /S build\stage\* %ARTIFACTS_OUT%\ || goto :error
)
diff --git a/tools/run_tests/dockerize/build_interop_image.sh b/tools/run_tests/dockerize/build_interop_image.sh
index fa63185..61105b0 100755
--- a/tools/run_tests/dockerize/build_interop_image.sh
+++ b/tools/run_tests/dockerize/build_interop_image.sh
@@ -67,8 +67,14 @@
BASE_IMAGE=${BASE_NAME}_base:`md5 -r tools/dockerfile/interoptest/$BASE_NAME/Dockerfile | cut -f1 -d\ `
fi
-# Make sure base docker image has been built. Should be instantaneous if so.
-docker build -t $BASE_IMAGE --force-rm=true tools/dockerfile/interoptest/$BASE_NAME || exit $?
+if [ "$DOCKERHUB_ORGANIZATION" != "" ]
+then
+ DOCKER_IMAGE_NAME=$DOCKERHUB_ORGANIZATION/$BASE_IMAGE
+ docker pull $DOCKER_IMAGE_NAME
+else
+ # Make sure docker image has been built. Should be instantaneous if so.
+ docker build -t $BASE_IMAGE --force-rm=true tools/dockerfile/interoptest/$BASE_NAME || exit $?
+fi
# Create a local branch so the child Docker script won't complain
git branch -f jenkins-docker
diff --git a/tools/run_tests/generated/sources_and_headers.json b/tools/run_tests/generated/sources_and_headers.json
index 477a258..02562bf 100644
--- a/tools/run_tests/generated/sources_and_headers.json
+++ b/tools/run_tests/generated/sources_and_headers.json
@@ -3646,6 +3646,7 @@
"gpr_test_util",
"grpc",
"grpc++",
+ "grpc++_test_config",
"grpc++_test_util",
"grpc_test_util",
"qps"
@@ -3815,6 +3816,7 @@
"gpr_test_util",
"grpc",
"grpc++",
+ "grpc++_test_config",
"grpc++_test_util",
"grpc_test_util",
"qps"
diff --git a/tools/run_tests/python_utils/check_bazel_dir.py b/tools/run_tests/python_utils/check_bazel_dir.py
deleted file mode 100755
index 1daf6ee..0000000
--- a/tools/run_tests/python_utils/check_bazel_dir.py
+++ /dev/null
@@ -1,41 +0,0 @@
-#!/usr/bin/env python
-# Copyright 2017 gRPC authors.
-#
-# Licensed under the Apache License, Version 2.0 (the "License");
-# you may not use this file except in compliance with the License.
-# You may obtain a copy of the License at
-#
-# http://www.apache.org/licenses/LICENSE-2.0
-#
-# Unless required by applicable law or agreed to in writing, software
-# distributed under the License is distributed on an "AS IS" BASIS,
-# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
-# See the License for the specific language governing permissions and
-# limitations under the License.
-
-"""This sends out a warning if any changes to the bazel dir are made."""
-
-from __future__ import print_function
-from subprocess import check_output
-
-import comment_on_pr
-import os
-
-_WARNING_MESSAGE = 'WARNING: You are making changes in the Bazel subdirectory. ' \
- 'Please get explicit approval from @nicolasnoble before merging.'
-
-
-def _get_changed_files(base_branch):
- """
- Get list of changed files between current branch and base of target merge branch
- """
- # Get file changes between branch and merge-base of specified branch
- base_commit = check_output(["git", "merge-base", base_branch, "HEAD"]).rstrip()
- return check_output(["git", "diff", base_commit, "--name-only"]).splitlines()
-
-
-# ghprbTargetBranch environment variable only available during a Jenkins PR tests
-if 'ghprbTargetBranch' in os.environ:
- changed_files = _get_changed_files('origin/%s' % os.environ['ghprbTargetBranch'])
- if any(file.startswith('bazel/') for file in changed_files):
- comment_on_pr.comment_on_pr(_WARNING_MESSAGE)
diff --git a/tools/run_tests/python_utils/jobset.py b/tools/run_tests/python_utils/jobset.py
index 044c6f3..08d652a 100755
--- a/tools/run_tests/python_utils/jobset.py
+++ b/tools/run_tests/python_utils/jobset.py
@@ -367,9 +367,10 @@
"""Manages one run of jobs."""
def __init__(self, check_cancelled, maxjobs, newline_on_success, travis,
- stop_on_failure, add_env, quiet_success, max_time):
+ stop_on_failure, add_env, quiet_success, max_time, clear_alarms):
self._running = set()
self._check_cancelled = check_cancelled
+ self._clear_alarms = clear_alarms
self._cancelled = False
self._failures = 0
self._completed = 0
@@ -473,7 +474,10 @@
while self._running:
if self.cancelled(): pass # poll cancellation
self.reap()
- if platform_string() != 'windows':
+ # Clear the alarms when finished to avoid a race condition causing job
+ # failures. Don't do this when running multi-VM tests because clearing
+ # the alarms causes the test to stall
+ if platform_string() != 'windows' and self._clear_alarms:
signal.alarm(0)
return not self.cancelled() and self._failures == 0
@@ -503,7 +507,8 @@
add_env={},
skip_jobs=False,
quiet_success=False,
- max_time=-1):
+ max_time=-1,
+ clear_alarms=True):
if skip_jobs:
resultset = {}
skipped_job_result = JobResult()
@@ -515,7 +520,7 @@
js = Jobset(check_cancelled,
maxjobs if maxjobs is not None else _DEFAULT_MAX_JOBS,
newline_on_success, travis, stop_on_failure, add_env,
- quiet_success, max_time)
+ quiet_success, max_time, clear_alarms)
for cmdline, remaining in tag_remaining(cmdlines):
if not js.start(cmdline):
break
diff --git a/tools/run_tests/run_interop_tests.py b/tools/run_tests/run_interop_tests.py
index 80062aa..1e702a8 100755
--- a/tools/run_tests/run_interop_tests.py
+++ b/tools/run_tests/run_interop_tests.py
@@ -63,6 +63,13 @@
# see https://github.com/grpc/grpc/issues/9779
_SKIP_DATA_FRAME_PADDING = ['data_frame_padding']
+# report suffix is important for reports to get picked up by internal CI
+_INTERNAL_CL_XML_REPORT = 'sponge_log.xml'
+
+# report suffix is important for reports to get picked up by internal CI
+_XML_REPORT = 'report.xml'
+
+
class CXXLanguage:
def __init__(self):
@@ -943,7 +950,12 @@
action='store_const',
const=True,
help='Whether to use secure channel.')
-
+argp.add_argument('--internal_ci',
+ default=False,
+ action='store_const',
+ const=True,
+ help=('Put reports into subdirectories to improve '
+ 'presentation of results by Internal CI.'))
args = argp.parse_args()
servers = set(s for s in itertools.chain.from_iterable(_SERVERS
@@ -1201,7 +1213,10 @@
write_cmdlog_maybe(server_manual_cmd_log, 'interop_server_cmds.sh')
write_cmdlog_maybe(client_manual_cmd_log, 'interop_client_cmds.sh')
- report_utils.render_junit_xml_report(resultset, 'report.xml')
+ xml_report_name = _XML_REPORT
+ if args.internal_ci:
+ xml_report_name = _INTERNAL_CL_XML_REPORT
+ report_utils.render_junit_xml_report(resultset, xml_report_name)
for name, job in resultset.items():
if "http2" in name:
diff --git a/tools/run_tests/run_performance_tests.py b/tools/run_tests/run_performance_tests.py
index ad1fb05..78d1079 100755
--- a/tools/run_tests/run_performance_tests.py
+++ b/tools/run_tests/run_performance_tests.py
@@ -183,7 +183,7 @@
jobset.message('START', 'Archiving local repository.', do_newline=True)
num_failures, _ = jobset.run(
- [archive_job], newline_on_success=True, maxjobs=1)
+ [archive_job], newline_on_success=True, maxjobs=1, clear_alarms=False)
if num_failures == 0:
jobset.message('SUCCESS',
'Archive with local repository created successfully.',
@@ -215,7 +215,7 @@
timeout_seconds=prepare_timeout))
jobset.message('START', 'Preparing hosts.', do_newline=True)
num_failures, _ = jobset.run(
- prepare_jobs, newline_on_success=True, maxjobs=10)
+ prepare_jobs, newline_on_success=True, maxjobs=10, clear_alarms=False)
if num_failures == 0:
jobset.message('SUCCESS',
'Prepare step completed successfully.',
@@ -248,7 +248,7 @@
timeout_seconds=build_timeout))
jobset.message('START', 'Building.', do_newline=True)
num_failures, _ = jobset.run(
- build_jobs, newline_on_success=True, maxjobs=10)
+ build_jobs, newline_on_success=True, maxjobs=10, clear_alarms=False)
if num_failures == 0:
jobset.message('SUCCESS',
'Built successfully.',
@@ -414,7 +414,7 @@
perf_report_jobs.append(perf_report_processor_job(host, perf_base_name, output_filename))
jobset.message('START', 'Collecting perf reports from qps workers', do_newline=True)
- failures, _ = jobset.run(perf_report_jobs, newline_on_success=True, maxjobs=1)
+ failures, _ = jobset.run(perf_report_jobs, newline_on_success=True, maxjobs=1, clear_alarms=False)
jobset.message('END', 'Collecting perf reports from qps workers', do_newline=True)
return failures
@@ -556,7 +556,7 @@
jobs = [scenario.jobspec]
if scenario.workers:
jobs.append(create_quit_jobspec(scenario.workers, remote_host=args.remote_driver_host))
- scenario_failures, resultset = jobset.run(jobs, newline_on_success=True, maxjobs=1)
+ scenario_failures, resultset = jobset.run(jobs, newline_on_success=True, maxjobs=1, clear_alarms=False)
total_scenario_failures += scenario_failures
merged_resultset = dict(itertools.chain(six.iteritems(merged_resultset),
six.iteritems(resultset)))
diff --git a/tools/run_tests/run_tests_matrix.py b/tools/run_tests/run_tests_matrix.py
index 635d87f..6fe1609 100755
--- a/tools/run_tests/run_tests_matrix.py
+++ b/tools/run_tests/run_tests_matrix.py
@@ -126,23 +126,37 @@
test_jobs += _generate_jobs(languages=['sanity', 'php7'],
configs=['dbg', 'opt'],
platforms=['linux'],
- labels=['basictests'],
+ labels=['basictests', 'multilang'],
extra_args=extra_args,
inner_jobs=inner_jobs)
# supported on all platforms.
- test_jobs += _generate_jobs(languages=['c', 'csharp', 'node', 'python'],
+ test_jobs += _generate_jobs(languages=['c'],
configs=['dbg', 'opt'],
platforms=['linux', 'macos', 'windows'],
- labels=['basictests'],
+ labels=['basictests', 'corelang'],
+ extra_args=extra_args,
+ inner_jobs=inner_jobs)
+
+ test_jobs += _generate_jobs(languages=['csharp', 'node', 'python'],
+ configs=['dbg', 'opt'],
+ platforms=['linux', 'macos', 'windows'],
+ labels=['basictests', 'multilang'],
extra_args=extra_args,
inner_jobs=inner_jobs)
# supported on linux and mac.
- test_jobs += _generate_jobs(languages=['c++', 'ruby', 'php'],
+ test_jobs += _generate_jobs(languages=['c++'],
configs=['dbg', 'opt'],
platforms=['linux', 'macos'],
- labels=['basictests'],
+ labels=['basictests', 'corelang'],
+ extra_args=extra_args,
+ inner_jobs=inner_jobs)
+
+ test_jobs += _generate_jobs(languages=['ruby', 'php'],
+ configs=['dbg', 'opt'],
+ platforms=['linux', 'macos'],
+ labels=['basictests', 'multilang'],
extra_args=extra_args,
inner_jobs=inner_jobs)
@@ -150,7 +164,7 @@
test_jobs += _generate_jobs(languages=['objc'],
configs=['dbg', 'opt'],
platforms=['macos'],
- labels=['basictests'],
+ labels=['basictests', 'multilang'],
extra_args=extra_args,
inner_jobs=inner_jobs)
@@ -158,13 +172,13 @@
test_jobs += _generate_jobs(languages=['c'],
configs=['msan', 'asan', 'tsan', 'ubsan'],
platforms=['linux'],
- labels=['sanitizers'],
+ labels=['sanitizers', 'corelang'],
extra_args=extra_args,
inner_jobs=inner_jobs)
test_jobs += _generate_jobs(languages=['c++'],
configs=['asan', 'tsan'],
platforms=['linux'],
- labels=['sanitizers'],
+ labels=['sanitizers', 'corelang'],
extra_args=extra_args,
inner_jobs=inner_jobs)
@@ -179,7 +193,7 @@
platforms=['linux'],
arch='x86',
compiler='default',
- labels=['portability'],
+ labels=['portability', 'corelang'],
extra_args=extra_args,
inner_jobs=inner_jobs)
@@ -191,7 +205,7 @@
platforms=['linux'],
arch='x64',
compiler=compiler,
- labels=['portability'],
+ labels=['portability', 'corelang'],
extra_args=extra_args,
inner_jobs=inner_jobs)
@@ -203,14 +217,14 @@
platforms=['windows'],
arch=arch,
compiler=compiler,
- labels=['portability'],
+ labels=['portability', 'corelang'],
extra_args=extra_args,
inner_jobs=inner_jobs)
# C and C++ with the c-ares DNS resolver on Linux
test_jobs += _generate_jobs(languages=['c', 'c++'],
configs=['dbg'], platforms=['linux'],
- labels=['portability'],
+ labels=['portability', 'corelang'],
extra_args=extra_args,
extra_envs={'GRPC_DNS_RESOLVER': 'ares'})
@@ -218,7 +232,7 @@
# C with the c-ares DNS resolver on Windonws
# test_jobs += _generate_jobs(languages=['c'],
# configs=['dbg'], platforms=['windows'],
- # labels=['portability'],
+ # labels=['portability', 'corelang'],
# extra_args=extra_args,
# extra_envs={'GRPC_DNS_RESOLVER': 'ares'})
@@ -230,7 +244,7 @@
platforms=['linux', 'windows'],
arch='default',
compiler='cmake',
- labels=['portability'],
+ labels=['portability', 'corelang'],
extra_args=extra_args + ['--build_only'],
inner_jobs=inner_jobs)
@@ -239,7 +253,7 @@
platforms=['linux'],
arch='default',
compiler='python_alpine',
- labels=['portability'],
+ labels=['portability', 'multilang'],
extra_args=extra_args,
inner_jobs=inner_jobs)
@@ -248,7 +262,7 @@
platforms=['linux'],
arch='default',
compiler='coreclr',
- labels=['portability'],
+ labels=['portability', 'multilang'],
extra_args=extra_args,
inner_jobs=inner_jobs)
@@ -256,7 +270,7 @@
configs=['dbg'],
platforms=['linux'],
iomgr_platform='uv',
- labels=['portability'],
+ labels=['portability', 'corelang'],
extra_args=extra_args,
inner_jobs=inner_jobs)
@@ -265,7 +279,7 @@
platforms=['linux'],
arch='default',
compiler='electron1.6',
- labels=['portability'],
+ labels=['portability', 'multilang'],
extra_args=extra_args,
inner_jobs=inner_jobs)
@@ -274,7 +288,7 @@
platforms=['linux'],
arch='default',
compiler='node4',
- labels=['portability'],
+ labels=['portability', 'multilang'],
extra_args=extra_args,
inner_jobs=inner_jobs)
@@ -283,7 +297,7 @@
platforms=['linux'],
arch='default',
compiler='node6',
- labels=['portability'],
+ labels=['portability', 'multilang'],
extra_args=extra_args,
inner_jobs=inner_jobs)
@@ -292,7 +306,7 @@
platforms=['linux'],
arch='default',
compiler='node7',
- labels=['portability'],
+ labels=['portability', 'multilang'],
extra_args=extra_args,
inner_jobs=inner_jobs)
diff --git a/tools/run_tests/sanity/check_owners.sh b/tools/run_tests/sanity/check_owners.sh
new file mode 100755
index 0000000..b681fed
--- /dev/null
+++ b/tools/run_tests/sanity/check_owners.sh
@@ -0,0 +1,29 @@
+#!/bin/sh
+# Copyright 2017 gRPC authors.
+#
+# Licensed under the Apache License, Version 2.0 (the "License");
+# you may not use this file except in compliance with the License.
+# You may obtain a copy of the License at
+#
+# http://www.apache.org/licenses/LICENSE-2.0
+#
+# Unless required by applicable law or agreed to in writing, software
+# distributed under the License is distributed on an "AS IS" BASIS,
+# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+# See the License for the specific language governing permissions and
+# limitations under the License.
+
+
+set -e
+
+export TEST=true
+
+cd `dirname $0`/../../..
+
+owners=.github/CODEOWNERS
+want_owners=`mktemp /tmp/submXXXXXX`
+
+tools/mkowners/mkowners.py -o $want_owners
+diff -u $owners $want_owners
+
+rm $want_owners
diff --git a/tools/run_tests/sanity/sanity_tests.yaml b/tools/run_tests/sanity/sanity_tests.yaml
index 445f53e..65cb2fe 100644
--- a/tools/run_tests/sanity/sanity_tests.yaml
+++ b/tools/run_tests/sanity/sanity_tests.yaml
@@ -1,5 +1,6 @@
# a set of tests that are run in parallel for sanity tests
- script: tools/run_tests/sanity/check_cache_mk.sh
+- script: tools/run_tests/sanity/check_owners.sh
- script: tools/run_tests/sanity/check_sources_and_headers.py
- script: tools/run_tests/sanity/check_submodules.sh
- script: tools/run_tests/sanity/check_test_filtering.py