Merge pull request #12789 from ctiller/epexinf
EPOLLEXCLUSIVE polling engine (redux)
diff --git a/BUILD b/BUILD
index 7a05172..99411f9 100644
--- a/BUILD
+++ b/BUILD
@@ -1261,6 +1261,7 @@
"src/core/ext/transport/chttp2/transport/bin_encoder.h",
"src/core/ext/transport/chttp2/transport/chttp2_transport.h",
"src/core/ext/transport/chttp2/transport/frame.h",
+ "src/core/ext/transport/chttp2/transport/flow_control.h",
"src/core/ext/transport/chttp2/transport/frame_data.h",
"src/core/ext/transport/chttp2/transport/frame_goaway.h",
"src/core/ext/transport/chttp2/transport/frame_ping.h",
diff --git a/CMakeLists.txt b/CMakeLists.txt
index 8e65310..2c978de 100644
--- a/CMakeLists.txt
+++ b/CMakeLists.txt
@@ -543,7 +543,6 @@
add_dependencies(buildtests_c timer_list_test)
add_dependencies(buildtests_c transport_connectivity_state_test)
add_dependencies(buildtests_c transport_metadata_test)
-add_dependencies(buildtests_c transport_pid_controller_test)
if(_gRPC_PLATFORM_LINUX OR _gRPC_PLATFORM_MAC OR _gRPC_PLATFORM_POSIX)
add_dependencies(buildtests_c transport_security_test)
endif()
@@ -560,7 +559,6 @@
add_dependencies(buildtests_c head_of_line_blocking_bad_client_test)
add_dependencies(buildtests_c headers_bad_client_test)
add_dependencies(buildtests_c initial_settings_frame_bad_client_test)
-add_dependencies(buildtests_c large_metadata_bad_client_test)
add_dependencies(buildtests_c server_registered_method_bad_client_test)
add_dependencies(buildtests_c simple_request_bad_client_test)
add_dependencies(buildtests_c unknown_frame_bad_client_test)
@@ -758,6 +756,7 @@
add_dependencies(buildtests_cxx stress_test)
add_dependencies(buildtests_cxx thread_manager_test)
add_dependencies(buildtests_cxx thread_stress_test)
+add_dependencies(buildtests_cxx transport_pid_controller_test)
add_dependencies(buildtests_cxx vector_test)
if(_gRPC_PLATFORM_LINUX OR _gRPC_PLATFORM_MAC OR _gRPC_PLATFORM_POSIX)
add_dependencies(buildtests_cxx writes_per_rpc_test)
@@ -9147,36 +9146,6 @@
endif (gRPC_BUILD_TESTS)
if (gRPC_BUILD_TESTS)
-
-add_executable(transport_pid_controller_test
- test/core/transport/pid_controller_test.c
-)
-
-
-target_include_directories(transport_pid_controller_test
- PRIVATE ${CMAKE_CURRENT_SOURCE_DIR}
- PRIVATE ${CMAKE_CURRENT_SOURCE_DIR}/include
- PRIVATE ${BORINGSSL_ROOT_DIR}/include
- PRIVATE ${PROTOBUF_ROOT_DIR}/src
- PRIVATE ${BENCHMARK_ROOT_DIR}/include
- PRIVATE ${ZLIB_ROOT_DIR}
- PRIVATE ${CMAKE_CURRENT_BINARY_DIR}/third_party/zlib
- PRIVATE ${CARES_INCLUDE_DIR}
- PRIVATE ${CMAKE_CURRENT_BINARY_DIR}/third_party/cares/cares
- PRIVATE ${CMAKE_CURRENT_BINARY_DIR}/third_party/gflags/include
- PRIVATE ${CMAKE_CURRENT_SOURCE_DIR}/third_party/abseil-cpp
-)
-
-target_link_libraries(transport_pid_controller_test
- ${_gRPC_ALLTARGETS_LIBRARIES}
- grpc_test_util
- grpc
- gpr_test_util
- gpr
-)
-
-endif (gRPC_BUILD_TESTS)
-if (gRPC_BUILD_TESTS)
if(_gRPC_PLATFORM_LINUX OR _gRPC_PLATFORM_MAC OR _gRPC_PLATFORM_POSIX)
add_executable(transport_security_test
@@ -12967,6 +12936,47 @@
endif (gRPC_BUILD_TESTS)
if (gRPC_BUILD_TESTS)
+add_executable(transport_pid_controller_test
+ test/core/transport/pid_controller_test.cc
+ third_party/googletest/googletest/src/gtest-all.cc
+ third_party/googletest/googlemock/src/gmock-all.cc
+)
+
+
+target_include_directories(transport_pid_controller_test
+ PRIVATE ${CMAKE_CURRENT_SOURCE_DIR}
+ PRIVATE ${CMAKE_CURRENT_SOURCE_DIR}/include
+ PRIVATE ${BORINGSSL_ROOT_DIR}/include
+ PRIVATE ${PROTOBUF_ROOT_DIR}/src
+ PRIVATE ${BENCHMARK_ROOT_DIR}/include
+ PRIVATE ${ZLIB_ROOT_DIR}
+ PRIVATE ${CMAKE_CURRENT_BINARY_DIR}/third_party/zlib
+ PRIVATE ${CARES_INCLUDE_DIR}
+ PRIVATE ${CMAKE_CURRENT_BINARY_DIR}/third_party/cares/cares
+ PRIVATE ${CMAKE_CURRENT_BINARY_DIR}/third_party/gflags/include
+ PRIVATE ${CMAKE_CURRENT_SOURCE_DIR}/third_party/abseil-cpp
+ PRIVATE third_party/googletest/googletest/include
+ PRIVATE third_party/googletest/googletest
+ PRIVATE third_party/googletest/googlemock/include
+ PRIVATE third_party/googletest/googlemock
+ PRIVATE ${_gRPC_PROTO_GENS_DIR}
+)
+
+target_link_libraries(transport_pid_controller_test
+ ${_gRPC_PROTOBUF_LIBRARIES}
+ ${_gRPC_ALLTARGETS_LIBRARIES}
+ grpc++_test_util
+ grpc++
+ grpc_test_util
+ grpc
+ gpr_test_util
+ gpr
+ ${_gRPC_GFLAGS_LIBRARIES}
+)
+
+endif (gRPC_BUILD_TESTS)
+if (gRPC_BUILD_TESTS)
+
add_executable(vector_test
test/core/support/vector_test.cc
third_party/googletest/googletest/src/gtest-all.cc
@@ -13238,38 +13248,6 @@
endif (gRPC_BUILD_TESTS)
if (gRPC_BUILD_TESTS)
-add_executable(large_metadata_bad_client_test
- test/core/bad_client/tests/large_metadata.c
-)
-
-
-target_include_directories(large_metadata_bad_client_test
- PRIVATE ${CMAKE_CURRENT_SOURCE_DIR}
- PRIVATE ${CMAKE_CURRENT_SOURCE_DIR}/include
- PRIVATE ${BORINGSSL_ROOT_DIR}/include
- PRIVATE ${PROTOBUF_ROOT_DIR}/src
- PRIVATE ${BENCHMARK_ROOT_DIR}/include
- PRIVATE ${ZLIB_ROOT_DIR}
- PRIVATE ${CMAKE_CURRENT_BINARY_DIR}/third_party/zlib
- PRIVATE ${CARES_INCLUDE_DIR}
- PRIVATE ${CMAKE_CURRENT_BINARY_DIR}/third_party/cares/cares
- PRIVATE ${CMAKE_CURRENT_BINARY_DIR}/third_party/gflags/include
- PRIVATE ${CMAKE_CURRENT_SOURCE_DIR}/third_party/abseil-cpp
-)
-
-target_link_libraries(large_metadata_bad_client_test
- ${_gRPC_SSL_LIBRARIES}
- ${_gRPC_ALLTARGETS_LIBRARIES}
- bad_client_test
- grpc_test_util_unsecure
- grpc_unsecure
- gpr_test_util
- gpr
-)
-
-endif (gRPC_BUILD_TESTS)
-if (gRPC_BUILD_TESTS)
-
add_executable(server_registered_method_bad_client_test
test/core/bad_client/tests/server_registered_method.c
)
diff --git a/Makefile b/Makefile
index cab0a95..53fe911 100644
--- a/Makefile
+++ b/Makefile
@@ -1091,7 +1091,6 @@
timer_list_test: $(BINDIR)/$(CONFIG)/timer_list_test
transport_connectivity_state_test: $(BINDIR)/$(CONFIG)/transport_connectivity_state_test
transport_metadata_test: $(BINDIR)/$(CONFIG)/transport_metadata_test
-transport_pid_controller_test: $(BINDIR)/$(CONFIG)/transport_pid_controller_test
transport_security_test: $(BINDIR)/$(CONFIG)/transport_security_test
udp_server_test: $(BINDIR)/$(CONFIG)/udp_server_test
uri_fuzzer_test: $(BINDIR)/$(CONFIG)/uri_fuzzer_test
@@ -1180,6 +1179,7 @@
stress_test: $(BINDIR)/$(CONFIG)/stress_test
thread_manager_test: $(BINDIR)/$(CONFIG)/thread_manager_test
thread_stress_test: $(BINDIR)/$(CONFIG)/thread_stress_test
+transport_pid_controller_test: $(BINDIR)/$(CONFIG)/transport_pid_controller_test
vector_test: $(BINDIR)/$(CONFIG)/vector_test
writes_per_rpc_test: $(BINDIR)/$(CONFIG)/writes_per_rpc_test
public_headers_must_be_c89: $(BINDIR)/$(CONFIG)/public_headers_must_be_c89
@@ -1226,7 +1226,6 @@
head_of_line_blocking_bad_client_test: $(BINDIR)/$(CONFIG)/head_of_line_blocking_bad_client_test
headers_bad_client_test: $(BINDIR)/$(CONFIG)/headers_bad_client_test
initial_settings_frame_bad_client_test: $(BINDIR)/$(CONFIG)/initial_settings_frame_bad_client_test
-large_metadata_bad_client_test: $(BINDIR)/$(CONFIG)/large_metadata_bad_client_test
server_registered_method_bad_client_test: $(BINDIR)/$(CONFIG)/server_registered_method_bad_client_test
simple_request_bad_client_test: $(BINDIR)/$(CONFIG)/simple_request_bad_client_test
unknown_frame_bad_client_test: $(BINDIR)/$(CONFIG)/unknown_frame_bad_client_test
@@ -1473,7 +1472,6 @@
$(BINDIR)/$(CONFIG)/timer_list_test \
$(BINDIR)/$(CONFIG)/transport_connectivity_state_test \
$(BINDIR)/$(CONFIG)/transport_metadata_test \
- $(BINDIR)/$(CONFIG)/transport_pid_controller_test \
$(BINDIR)/$(CONFIG)/transport_security_test \
$(BINDIR)/$(CONFIG)/udp_server_test \
$(BINDIR)/$(CONFIG)/uri_parser_test \
@@ -1484,7 +1482,6 @@
$(BINDIR)/$(CONFIG)/head_of_line_blocking_bad_client_test \
$(BINDIR)/$(CONFIG)/headers_bad_client_test \
$(BINDIR)/$(CONFIG)/initial_settings_frame_bad_client_test \
- $(BINDIR)/$(CONFIG)/large_metadata_bad_client_test \
$(BINDIR)/$(CONFIG)/server_registered_method_bad_client_test \
$(BINDIR)/$(CONFIG)/simple_request_bad_client_test \
$(BINDIR)/$(CONFIG)/unknown_frame_bad_client_test \
@@ -1618,6 +1615,7 @@
$(BINDIR)/$(CONFIG)/stress_test \
$(BINDIR)/$(CONFIG)/thread_manager_test \
$(BINDIR)/$(CONFIG)/thread_stress_test \
+ $(BINDIR)/$(CONFIG)/transport_pid_controller_test \
$(BINDIR)/$(CONFIG)/vector_test \
$(BINDIR)/$(CONFIG)/writes_per_rpc_test \
$(BINDIR)/$(CONFIG)/boringssl_aes_test \
@@ -1741,6 +1739,7 @@
$(BINDIR)/$(CONFIG)/stress_test \
$(BINDIR)/$(CONFIG)/thread_manager_test \
$(BINDIR)/$(CONFIG)/thread_stress_test \
+ $(BINDIR)/$(CONFIG)/transport_pid_controller_test \
$(BINDIR)/$(CONFIG)/vector_test \
$(BINDIR)/$(CONFIG)/writes_per_rpc_test \
$(BINDIR)/$(CONFIG)/resolver_component_test_unsecure \
@@ -1994,8 +1993,6 @@
$(Q) $(BINDIR)/$(CONFIG)/transport_connectivity_state_test || ( echo test transport_connectivity_state_test failed ; exit 1 )
$(E) "[RUN] Testing transport_metadata_test"
$(Q) $(BINDIR)/$(CONFIG)/transport_metadata_test || ( echo test transport_metadata_test failed ; exit 1 )
- $(E) "[RUN] Testing transport_pid_controller_test"
- $(Q) $(BINDIR)/$(CONFIG)/transport_pid_controller_test || ( echo test transport_pid_controller_test failed ; exit 1 )
$(E) "[RUN] Testing transport_security_test"
$(Q) $(BINDIR)/$(CONFIG)/transport_security_test || ( echo test transport_security_test failed ; exit 1 )
$(E) "[RUN] Testing udp_server_test"
@@ -2016,8 +2013,6 @@
$(Q) $(BINDIR)/$(CONFIG)/headers_bad_client_test || ( echo test headers_bad_client_test failed ; exit 1 )
$(E) "[RUN] Testing initial_settings_frame_bad_client_test"
$(Q) $(BINDIR)/$(CONFIG)/initial_settings_frame_bad_client_test || ( echo test initial_settings_frame_bad_client_test failed ; exit 1 )
- $(E) "[RUN] Testing large_metadata_bad_client_test"
- $(Q) $(BINDIR)/$(CONFIG)/large_metadata_bad_client_test || ( echo test large_metadata_bad_client_test failed ; exit 1 )
$(E) "[RUN] Testing server_registered_method_bad_client_test"
$(Q) $(BINDIR)/$(CONFIG)/server_registered_method_bad_client_test || ( echo test server_registered_method_bad_client_test failed ; exit 1 )
$(E) "[RUN] Testing simple_request_bad_client_test"
@@ -2158,6 +2153,8 @@
$(Q) $(BINDIR)/$(CONFIG)/thread_manager_test || ( echo test thread_manager_test failed ; exit 1 )
$(E) "[RUN] Testing thread_stress_test"
$(Q) $(BINDIR)/$(CONFIG)/thread_stress_test || ( echo test thread_stress_test failed ; exit 1 )
+ $(E) "[RUN] Testing transport_pid_controller_test"
+ $(Q) $(BINDIR)/$(CONFIG)/transport_pid_controller_test || ( echo test transport_pid_controller_test failed ; exit 1 )
$(E) "[RUN] Testing vector_test"
$(Q) $(BINDIR)/$(CONFIG)/vector_test || ( echo test vector_test failed ; exit 1 )
$(E) "[RUN] Testing writes_per_rpc_test"
@@ -13424,38 +13421,6 @@
endif
-TRANSPORT_PID_CONTROLLER_TEST_SRC = \
- test/core/transport/pid_controller_test.c \
-
-TRANSPORT_PID_CONTROLLER_TEST_OBJS = $(addprefix $(OBJDIR)/$(CONFIG)/, $(addsuffix .o, $(basename $(TRANSPORT_PID_CONTROLLER_TEST_SRC))))
-ifeq ($(NO_SECURE),true)
-
-# You can't build secure targets if you don't have OpenSSL.
-
-$(BINDIR)/$(CONFIG)/transport_pid_controller_test: openssl_dep_error
-
-else
-
-
-
-$(BINDIR)/$(CONFIG)/transport_pid_controller_test: $(TRANSPORT_PID_CONTROLLER_TEST_OBJS) $(LIBDIR)/$(CONFIG)/libgrpc_test_util.a $(LIBDIR)/$(CONFIG)/libgrpc.a $(LIBDIR)/$(CONFIG)/libgpr_test_util.a $(LIBDIR)/$(CONFIG)/libgpr.a
- $(E) "[LD] Linking $@"
- $(Q) mkdir -p `dirname $@`
- $(Q) $(LD) $(LDFLAGS) $(TRANSPORT_PID_CONTROLLER_TEST_OBJS) $(LIBDIR)/$(CONFIG)/libgrpc_test_util.a $(LIBDIR)/$(CONFIG)/libgrpc.a $(LIBDIR)/$(CONFIG)/libgpr_test_util.a $(LIBDIR)/$(CONFIG)/libgpr.a $(LDLIBS) $(LDLIBS_SECURE) -o $(BINDIR)/$(CONFIG)/transport_pid_controller_test
-
-endif
-
-$(OBJDIR)/$(CONFIG)/test/core/transport/pid_controller_test.o: $(LIBDIR)/$(CONFIG)/libgrpc_test_util.a $(LIBDIR)/$(CONFIG)/libgrpc.a $(LIBDIR)/$(CONFIG)/libgpr_test_util.a $(LIBDIR)/$(CONFIG)/libgpr.a
-
-deps_transport_pid_controller_test: $(TRANSPORT_PID_CONTROLLER_TEST_OBJS:.o=.dep)
-
-ifneq ($(NO_SECURE),true)
-ifneq ($(NO_DEPS),true)
--include $(TRANSPORT_PID_CONTROLLER_TEST_OBJS:.o=.dep)
-endif
-endif
-
-
TRANSPORT_SECURITY_TEST_SRC = \
test/core/tsi/transport_security_test.c \
@@ -17206,6 +17171,49 @@
endif
+TRANSPORT_PID_CONTROLLER_TEST_SRC = \
+ test/core/transport/pid_controller_test.cc \
+
+TRANSPORT_PID_CONTROLLER_TEST_OBJS = $(addprefix $(OBJDIR)/$(CONFIG)/, $(addsuffix .o, $(basename $(TRANSPORT_PID_CONTROLLER_TEST_SRC))))
+ifeq ($(NO_SECURE),true)
+
+# You can't build secure targets if you don't have OpenSSL.
+
+$(BINDIR)/$(CONFIG)/transport_pid_controller_test: openssl_dep_error
+
+else
+
+
+
+
+ifeq ($(NO_PROTOBUF),true)
+
+# You can't build the protoc plugins or protobuf-enabled targets if you don't have protobuf 3.0.0+.
+
+$(BINDIR)/$(CONFIG)/transport_pid_controller_test: protobuf_dep_error
+
+else
+
+$(BINDIR)/$(CONFIG)/transport_pid_controller_test: $(PROTOBUF_DEP) $(TRANSPORT_PID_CONTROLLER_TEST_OBJS) $(LIBDIR)/$(CONFIG)/libgrpc++_test_util.a $(LIBDIR)/$(CONFIG)/libgrpc++.a $(LIBDIR)/$(CONFIG)/libgrpc_test_util.a $(LIBDIR)/$(CONFIG)/libgrpc.a $(LIBDIR)/$(CONFIG)/libgpr_test_util.a $(LIBDIR)/$(CONFIG)/libgpr.a
+ $(E) "[LD] Linking $@"
+ $(Q) mkdir -p `dirname $@`
+ $(Q) $(LDXX) $(LDFLAGS) $(TRANSPORT_PID_CONTROLLER_TEST_OBJS) $(LIBDIR)/$(CONFIG)/libgrpc++_test_util.a $(LIBDIR)/$(CONFIG)/libgrpc++.a $(LIBDIR)/$(CONFIG)/libgrpc_test_util.a $(LIBDIR)/$(CONFIG)/libgrpc.a $(LIBDIR)/$(CONFIG)/libgpr_test_util.a $(LIBDIR)/$(CONFIG)/libgpr.a $(LDLIBSXX) $(LDLIBS_PROTOBUF) $(LDLIBS) $(LDLIBS_SECURE) $(GTEST_LIB) -o $(BINDIR)/$(CONFIG)/transport_pid_controller_test
+
+endif
+
+endif
+
+$(OBJDIR)/$(CONFIG)/test/core/transport/pid_controller_test.o: $(LIBDIR)/$(CONFIG)/libgrpc++_test_util.a $(LIBDIR)/$(CONFIG)/libgrpc++.a $(LIBDIR)/$(CONFIG)/libgrpc_test_util.a $(LIBDIR)/$(CONFIG)/libgrpc.a $(LIBDIR)/$(CONFIG)/libgpr_test_util.a $(LIBDIR)/$(CONFIG)/libgpr.a
+
+deps_transport_pid_controller_test: $(TRANSPORT_PID_CONTROLLER_TEST_OBJS:.o=.dep)
+
+ifneq ($(NO_SECURE),true)
+ifneq ($(NO_DEPS),true)
+-include $(TRANSPORT_PID_CONTROLLER_TEST_OBJS:.o=.dep)
+endif
+endif
+
+
VECTOR_TEST_SRC = \
test/core/support/vector_test.cc \
@@ -18530,26 +18538,6 @@
endif
-LARGE_METADATA_BAD_CLIENT_TEST_SRC = \
- test/core/bad_client/tests/large_metadata.c \
-
-LARGE_METADATA_BAD_CLIENT_TEST_OBJS = $(addprefix $(OBJDIR)/$(CONFIG)/, $(addsuffix .o, $(basename $(LARGE_METADATA_BAD_CLIENT_TEST_SRC))))
-
-
-$(BINDIR)/$(CONFIG)/large_metadata_bad_client_test: $(LARGE_METADATA_BAD_CLIENT_TEST_OBJS) $(LIBDIR)/$(CONFIG)/libbad_client_test.a $(LIBDIR)/$(CONFIG)/libgrpc_test_util_unsecure.a $(LIBDIR)/$(CONFIG)/libgrpc_unsecure.a $(LIBDIR)/$(CONFIG)/libgpr_test_util.a $(LIBDIR)/$(CONFIG)/libgpr.a
- $(E) "[LD] Linking $@"
- $(Q) mkdir -p `dirname $@`
- $(Q) $(LD) $(LDFLAGS) $(LARGE_METADATA_BAD_CLIENT_TEST_OBJS) $(LIBDIR)/$(CONFIG)/libbad_client_test.a $(LIBDIR)/$(CONFIG)/libgrpc_test_util_unsecure.a $(LIBDIR)/$(CONFIG)/libgrpc_unsecure.a $(LIBDIR)/$(CONFIG)/libgpr_test_util.a $(LIBDIR)/$(CONFIG)/libgpr.a $(LDLIBS) -o $(BINDIR)/$(CONFIG)/large_metadata_bad_client_test
-
-$(OBJDIR)/$(CONFIG)/test/core/bad_client/tests/large_metadata.o: $(LIBDIR)/$(CONFIG)/libbad_client_test.a $(LIBDIR)/$(CONFIG)/libgrpc_test_util_unsecure.a $(LIBDIR)/$(CONFIG)/libgrpc_unsecure.a $(LIBDIR)/$(CONFIG)/libgpr_test_util.a $(LIBDIR)/$(CONFIG)/libgpr.a
-
-deps_large_metadata_bad_client_test: $(LARGE_METADATA_BAD_CLIENT_TEST_OBJS:.o=.dep)
-
-ifneq ($(NO_DEPS),true)
--include $(LARGE_METADATA_BAD_CLIENT_TEST_OBJS:.o=.dep)
-endif
-
-
SERVER_REGISTERED_METHOD_BAD_CLIENT_TEST_SRC = \
test/core/bad_client/tests/server_registered_method.c \
diff --git a/build.yaml b/build.yaml
index 557025d..78b5a1b 100644
--- a/build.yaml
+++ b/build.yaml
@@ -778,6 +778,7 @@
- src/core/ext/transport/chttp2/transport/bin_decoder.h
- src/core/ext/transport/chttp2/transport/bin_encoder.h
- src/core/ext/transport/chttp2/transport/chttp2_transport.h
+ - src/core/ext/transport/chttp2/transport/flow_control.h
- src/core/ext/transport/chttp2/transport/frame.h
- src/core/ext/transport/chttp2/transport/frame_data.h
- src/core/ext/transport/chttp2/transport/frame_goaway.h
@@ -3406,18 +3407,6 @@
- grpc
- gpr_test_util
- gpr
- uses_polling: false
-- name: transport_pid_controller_test
- build: test
- language: c
- src:
- - test/core/transport/pid_controller_test.c
- deps:
- - grpc_test_util
- - grpc
- - gpr_test_util
- - gpr
- uses_polling: false
- name: transport_security_test
build: test
language: c
@@ -4819,6 +4808,18 @@
- gpr_test_util
- gpr
timeout_seconds: 1200
+- name: transport_pid_controller_test
+ build: test
+ language: c++
+ src:
+ - test/core/transport/pid_controller_test.cc
+ deps:
+ - grpc++_test_util
+ - grpc++
+ - grpc_test_util
+ - grpc
+ - gpr_test_util
+ - gpr
- name: vector_test
gtest: true
build: test
diff --git a/gRPC-Core.podspec b/gRPC-Core.podspec
index 448dfad..a9b451e 100644
--- a/gRPC-Core.podspec
+++ b/gRPC-Core.podspec
@@ -249,6 +249,7 @@
'src/core/ext/transport/chttp2/transport/bin_decoder.h',
'src/core/ext/transport/chttp2/transport/bin_encoder.h',
'src/core/ext/transport/chttp2/transport/chttp2_transport.h',
+ 'src/core/ext/transport/chttp2/transport/flow_control.h',
'src/core/ext/transport/chttp2/transport/frame.h',
'src/core/ext/transport/chttp2/transport/frame_data.h',
'src/core/ext/transport/chttp2/transport/frame_goaway.h',
@@ -751,6 +752,7 @@
'src/core/ext/transport/chttp2/transport/bin_decoder.h',
'src/core/ext/transport/chttp2/transport/bin_encoder.h',
'src/core/ext/transport/chttp2/transport/chttp2_transport.h',
+ 'src/core/ext/transport/chttp2/transport/flow_control.h',
'src/core/ext/transport/chttp2/transport/frame.h',
'src/core/ext/transport/chttp2/transport/frame_data.h',
'src/core/ext/transport/chttp2/transport/frame_goaway.h',
diff --git a/grpc.gemspec b/grpc.gemspec
index cbbc48a..4567058 100644
--- a/grpc.gemspec
+++ b/grpc.gemspec
@@ -181,6 +181,7 @@
s.files += %w( src/core/ext/transport/chttp2/transport/bin_decoder.h )
s.files += %w( src/core/ext/transport/chttp2/transport/bin_encoder.h )
s.files += %w( src/core/ext/transport/chttp2/transport/chttp2_transport.h )
+ s.files += %w( src/core/ext/transport/chttp2/transport/flow_control.h )
s.files += %w( src/core/ext/transport/chttp2/transport/frame.h )
s.files += %w( src/core/ext/transport/chttp2/transport/frame_data.h )
s.files += %w( src/core/ext/transport/chttp2/transport/frame_goaway.h )
diff --git a/package.xml b/package.xml
index dae18db..d08b803 100644
--- a/package.xml
+++ b/package.xml
@@ -193,6 +193,7 @@
<file baseinstalldir="/" name="src/core/ext/transport/chttp2/transport/bin_decoder.h" role="src" />
<file baseinstalldir="/" name="src/core/ext/transport/chttp2/transport/bin_encoder.h" role="src" />
<file baseinstalldir="/" name="src/core/ext/transport/chttp2/transport/chttp2_transport.h" role="src" />
+ <file baseinstalldir="/" name="src/core/ext/transport/chttp2/transport/flow_control.h" role="src" />
<file baseinstalldir="/" name="src/core/ext/transport/chttp2/transport/frame.h" role="src" />
<file baseinstalldir="/" name="src/core/ext/transport/chttp2/transport/frame_data.h" role="src" />
<file baseinstalldir="/" name="src/core/ext/transport/chttp2/transport/frame_goaway.h" role="src" />
diff --git a/src/core/ext/transport/chttp2/transport/chttp2_transport.cc b/src/core/ext/transport/chttp2/transport/chttp2_transport.cc
index 9462d10..02fc531 100644
--- a/src/core/ext/transport/chttp2/transport/chttp2_transport.cc
+++ b/src/core/ext/transport/chttp2/transport/chttp2_transport.cc
@@ -54,7 +54,6 @@
#include "src/core/lib/transport/transport.h"
#include "src/core/lib/transport/transport_impl.h"
-#define DEFAULT_WINDOW 65535
#define DEFAULT_CONNECTION_WINDOW_TARGET (1024 * 1024)
#define MAX_WINDOW 0x7fffffffu
#define MAX_WRITE_BUFFER_SIZE (64 * 1024 * 1024)
@@ -222,7 +221,7 @@
t->write_cb_pool = next;
}
- t->flow_control.bdp_estimator.Destroy();
+ t->flow_control.Destroy();
GRPC_ERROR_UNREF(t->closed_with_error);
gpr_free(t->ping_acks);
@@ -282,10 +281,6 @@
t->endpoint_reading = 1;
t->next_stream_id = is_client ? 1 : 2;
t->is_client = is_client;
- t->flow_control.remote_window = DEFAULT_WINDOW;
- t->flow_control.announced_window = DEFAULT_WINDOW;
- t->flow_control.target_initial_window_size = DEFAULT_WINDOW;
- t->flow_control.t = t;
t->deframe_state = is_client ? GRPC_DTS_FH_0 : GRPC_DTS_CLIENT_PREFIX_0;
t->is_first_frame = true;
grpc_connectivity_state_init(
@@ -325,8 +320,6 @@
keepalive_watchdog_fired_locked, t,
grpc_combiner_scheduler(t->combiner));
- t->flow_control.bdp_estimator.Init(t->peer_string);
-
grpc_chttp2_goaway_parser_init(&t->goaway_parser);
grpc_chttp2_hpack_parser_init(exec_ctx, &t->hpack_parser);
@@ -350,8 +343,7 @@
window -- this should by rights be 0 */
t->force_send_settings = 1 << GRPC_CHTTP2_SETTINGS_INITIAL_WINDOW_SIZE;
t->sent_local_settings = 0;
- t->write_buffer_size = DEFAULT_WINDOW;
- t->flow_control.enable_bdp_probe = true;
+ t->write_buffer_size = grpc_core::chttp2::kDefaultWindow;
if (is_client) {
grpc_slice_buffer_add(&t->outbuf, grpc_slice_from_copied_string(
@@ -396,6 +388,8 @@
t->opt_target = GRPC_CHTTP2_OPTIMIZE_FOR_LATENCY;
+ bool enable_bdp = true;
+
if (channel_args) {
for (i = 0; i < channel_args->num_args; i++) {
if (0 == strcmp(channel_args->args[i].key,
@@ -456,8 +450,7 @@
&channel_args->args[i], {0, 0, MAX_WRITE_BUFFER_SIZE});
} else if (0 ==
strcmp(channel_args->args[i].key, GRPC_ARG_HTTP2_BDP_PROBE)) {
- t->flow_control.enable_bdp_probe =
- grpc_channel_arg_get_integer(&channel_args->args[i], {1, 0, 1});
+ enable_bdp = grpc_channel_arg_get_bool(&channel_args->args[i], true);
} else if (0 == strcmp(channel_args->args[i].key,
GRPC_ARG_KEEPALIVE_TIME_MS)) {
const int value = grpc_channel_arg_get_integer(
@@ -552,6 +545,8 @@
}
}
+ t->flow_control.Init(exec_ctx, t, enable_bdp);
+
/* No pings allowed before receiving a header or data frame. */
t->ping_state.pings_before_data_required = 0;
t->ping_state.is_delayed_ping_timer_set = false;
@@ -572,15 +567,13 @@
t->keepalive_state = GRPC_CHTTP2_KEEPALIVE_STATE_DISABLED;
}
- if (t->flow_control.enable_bdp_probe) {
+ if (enable_bdp) {
GRPC_CHTTP2_REF_TRANSPORT(t, "bdp_ping");
schedule_bdp_ping_locked(exec_ctx, t);
- }
- grpc_chttp2_act_on_flowctl_action(
- exec_ctx,
- grpc_chttp2_flowctl_get_action(exec_ctx, &t->flow_control, NULL), t,
- NULL);
+ grpc_chttp2_act_on_flowctl_action(
+ exec_ctx, t->flow_control->PeriodicUpdate(exec_ctx), t, NULL);
+ }
grpc_chttp2_initiate_write(exec_ctx, t,
GRPC_CHTTP2_INITIATE_WRITE_INITIAL_WRITE);
@@ -718,7 +711,7 @@
post_destructive_reclaimer(exec_ctx, t);
}
- s->flow_control.s = s;
+ s->flow_control.Init(t->flow_control.get(), s);
GPR_TIMER_END("init_stream", 0);
return 0;
@@ -769,7 +762,7 @@
GRPC_ERROR_UNREF(s->write_closed_error);
GRPC_ERROR_UNREF(s->byte_stream_error);
- grpc_chttp2_flowctl_destroy_stream(&t->flow_control, &s->flow_control);
+ s->flow_control.Destroy();
GRPC_CHTTP2_UNREF_TRANSPORT(exec_ctx, t, "stream");
@@ -1638,13 +1631,10 @@
if (s->id != 0) {
if (!s->read_closed) {
already_received = s->frame_storage.length;
- grpc_chttp2_flowctl_incoming_bs_update(
- &t->flow_control, &s->flow_control, GRPC_HEADER_SIZE_IN_BYTES,
- already_received);
- grpc_chttp2_act_on_flowctl_action(
- exec_ctx, grpc_chttp2_flowctl_get_action(exec_ctx, &t->flow_control,
- &s->flow_control),
- t, s);
+ s->flow_control->IncomingByteStreamUpdate(GRPC_HEADER_SIZE_IN_BYTES,
+ already_received);
+ grpc_chttp2_act_on_flowctl_action(exec_ctx,
+ s->flow_control->MakeAction(), t, s);
}
}
grpc_chttp2_maybe_complete_recv_message(exec_ctx, t, s);
@@ -2420,49 +2410,44 @@
* INPUT PROCESSING - PARSING
*/
-void grpc_chttp2_act_on_flowctl_action(grpc_exec_ctx *exec_ctx,
- grpc_chttp2_flowctl_action action,
- grpc_chttp2_transport *t,
- grpc_chttp2_stream *s) {
- switch (action.send_stream_update) {
- case GRPC_CHTTP2_FLOWCTL_NO_ACTION_NEEDED:
+template <class F>
+static void WithUrgency(grpc_exec_ctx *exec_ctx, grpc_chttp2_transport *t,
+ grpc_core::chttp2::FlowControlAction::Urgency urgency,
+ grpc_chttp2_initiate_write_reason reason, F action) {
+ switch (urgency) {
+ case grpc_core::chttp2::FlowControlAction::Urgency::NO_ACTION_NEEDED:
break;
- case GRPC_CHTTP2_FLOWCTL_UPDATE_IMMEDIATELY:
- grpc_chttp2_mark_stream_writable(exec_ctx, t, s);
- grpc_chttp2_initiate_write(
- exec_ctx, t, GRPC_CHTTP2_INITIATE_WRITE_STREAM_FLOW_CONTROL);
- break;
- case GRPC_CHTTP2_FLOWCTL_QUEUE_UPDATE:
- grpc_chttp2_mark_stream_writable(exec_ctx, t, s);
+ case grpc_core::chttp2::FlowControlAction::Urgency::UPDATE_IMMEDIATELY:
+ grpc_chttp2_initiate_write(exec_ctx, t, reason);
+ // fallthrough
+ case grpc_core::chttp2::FlowControlAction::Urgency::QUEUE_UPDATE:
+ action();
break;
}
- switch (action.send_transport_update) {
- case GRPC_CHTTP2_FLOWCTL_NO_ACTION_NEEDED:
- break;
- case GRPC_CHTTP2_FLOWCTL_UPDATE_IMMEDIATELY:
- grpc_chttp2_initiate_write(
- exec_ctx, t, GRPC_CHTTP2_INITIATE_WRITE_TRANSPORT_FLOW_CONTROL);
- break;
- // this is the same as no action b/c every time the transport enters the
- // writing path it will maybe do an update
- case GRPC_CHTTP2_FLOWCTL_QUEUE_UPDATE:
- break;
- }
- if (action.send_setting_update != GRPC_CHTTP2_FLOWCTL_NO_ACTION_NEEDED) {
- if (action.initial_window_size > 0) {
- queue_setting_update(exec_ctx, t,
- GRPC_CHTTP2_SETTINGS_INITIAL_WINDOW_SIZE,
- (uint32_t)action.initial_window_size);
- }
- if (action.max_frame_size > 0) {
- queue_setting_update(exec_ctx, t, GRPC_CHTTP2_SETTINGS_MAX_FRAME_SIZE,
- (uint32_t)action.max_frame_size);
- }
- if (action.send_setting_update == GRPC_CHTTP2_FLOWCTL_UPDATE_IMMEDIATELY) {
- grpc_chttp2_initiate_write(exec_ctx, t,
- GRPC_CHTTP2_INITIATE_WRITE_SEND_SETTINGS);
- }
- }
+}
+
+void grpc_chttp2_act_on_flowctl_action(
+ grpc_exec_ctx *exec_ctx, const grpc_core::chttp2::FlowControlAction &action,
+ grpc_chttp2_transport *t, grpc_chttp2_stream *s) {
+ WithUrgency(
+ exec_ctx, t, action.send_stream_update(),
+ GRPC_CHTTP2_INITIATE_WRITE_STREAM_FLOW_CONTROL,
+ [exec_ctx, t, s]() { grpc_chttp2_mark_stream_writable(exec_ctx, t, s); });
+ WithUrgency(exec_ctx, t, action.send_transport_update(),
+ GRPC_CHTTP2_INITIATE_WRITE_TRANSPORT_FLOW_CONTROL, []() {});
+ WithUrgency(exec_ctx, t, action.send_initial_window_update(),
+ GRPC_CHTTP2_INITIATE_WRITE_SEND_SETTINGS,
+ [exec_ctx, t, &action]() {
+ queue_setting_update(exec_ctx, t,
+ GRPC_CHTTP2_SETTINGS_INITIAL_WINDOW_SIZE,
+ action.initial_window_size());
+ });
+ WithUrgency(
+ exec_ctx, t, action.send_max_frame_size_update(),
+ GRPC_CHTTP2_INITIATE_WRITE_SEND_SETTINGS, [exec_ctx, t, &action]() {
+ queue_setting_update(exec_ctx, t, GRPC_CHTTP2_SETTINGS_MAX_FRAME_SIZE,
+ action.max_frame_size());
+ });
}
static grpc_error *try_http_parsing(grpc_exec_ctx *exec_ctx,
@@ -2518,7 +2503,7 @@
grpc_error *errors[3] = {GRPC_ERROR_REF(error), GRPC_ERROR_NONE,
GRPC_ERROR_NONE};
for (; i < t->read_buffer.count && errors[1] == GRPC_ERROR_NONE; i++) {
- t->flow_control.bdp_estimator->AddIncomingBytes(
+ t->flow_control->bdp_estimator()->AddIncomingBytes(
(int64_t)GRPC_SLICE_LENGTH(t->read_buffer.slices[i]));
errors[1] =
grpc_chttp2_perform_read(exec_ctx, t, t->read_buffer.slices[i]);
@@ -2535,8 +2520,8 @@
GPR_TIMER_END("reading_action.parse", 0);
GPR_TIMER_BEGIN("post_parse_locked", 0);
- if (t->flow_control.initial_window_update != 0) {
- if (t->flow_control.initial_window_update > 0) {
+ if (t->initial_window_update != 0) {
+ if (t->initial_window_update > 0) {
grpc_chttp2_stream *s;
while (grpc_chttp2_list_pop_stalled_by_stream(t, &s)) {
grpc_chttp2_mark_stream_writable(exec_ctx, t, s);
@@ -2545,7 +2530,7 @@
GRPC_CHTTP2_INITIATE_WRITE_FLOW_CONTROL_UNSTALLED_BY_SETTING);
}
}
- t->flow_control.initial_window_update = 0;
+ t->initial_window_update = 0;
}
GPR_TIMER_END("post_parse_locked", 0);
}
@@ -2568,10 +2553,8 @@
if (keep_reading) {
grpc_endpoint_read(exec_ctx, t->ep, &t->read_buffer,
&t->read_action_locked);
- grpc_chttp2_act_on_flowctl_action(
- exec_ctx,
- grpc_chttp2_flowctl_get_action(exec_ctx, &t->flow_control, NULL), t,
- NULL);
+ grpc_chttp2_act_on_flowctl_action(exec_ctx, t->flow_control->MakeAction(),
+ t, NULL);
GRPC_CHTTP2_UNREF_TRANSPORT(exec_ctx, t, "keep_reading");
} else {
GRPC_CHTTP2_UNREF_TRANSPORT(exec_ctx, t, "reading_action");
@@ -2588,7 +2571,7 @@
// that kicks off finishes, it's unreffed
static void schedule_bdp_ping_locked(grpc_exec_ctx *exec_ctx,
grpc_chttp2_transport *t) {
- t->flow_control.bdp_estimator->SchedulePing();
+ t->flow_control->bdp_estimator()->SchedulePing();
send_ping_locked(exec_ctx, t, &t->start_bdp_ping_locked,
&t->finish_bdp_ping_locked);
}
@@ -2604,7 +2587,7 @@
if (t->keepalive_state == GRPC_CHTTP2_KEEPALIVE_STATE_WAITING) {
grpc_timer_cancel(exec_ctx, &t->keepalive_ping_timer);
}
- t->flow_control.bdp_estimator->StartPing();
+ t->flow_control->bdp_estimator()->StartPing();
}
static void finish_bdp_ping_locked(grpc_exec_ctx *exec_ctx, void *tp,
@@ -2618,7 +2601,10 @@
GRPC_CHTTP2_UNREF_TRANSPORT(exec_ctx, t, "bdp_ping");
return;
}
- grpc_millis next_ping = t->flow_control.bdp_estimator->CompletePing(exec_ctx);
+ grpc_millis next_ping =
+ t->flow_control->bdp_estimator()->CompletePing(exec_ctx);
+ grpc_chttp2_act_on_flowctl_action(
+ exec_ctx, t->flow_control->PeriodicUpdate(exec_ctx), t, nullptr);
GPR_ASSERT(!t->have_next_bdp_ping_timer);
t->have_next_bdp_ping_timer = true;
grpc_timer_init(exec_ctx, &t->next_bdp_ping_timer, next_ping,
@@ -2844,13 +2830,10 @@
size_t cur_length = s->frame_storage.length;
if (!s->read_closed) {
- grpc_chttp2_flowctl_incoming_bs_update(&t->flow_control, &s->flow_control,
- bs->next_action.max_size_hint,
- cur_length);
- grpc_chttp2_act_on_flowctl_action(
- exec_ctx, grpc_chttp2_flowctl_get_action(exec_ctx, &t->flow_control,
- &s->flow_control),
- t, s);
+ s->flow_control->IncomingByteStreamUpdate(bs->next_action.max_size_hint,
+ cur_length);
+ grpc_chttp2_act_on_flowctl_action(exec_ctx, s->flow_control->MakeAction(),
+ t, s);
}
GPR_ASSERT(s->unprocessed_incoming_frames_buffer.length == 0);
if (s->frame_storage.length > 0) {
diff --git a/src/core/ext/transport/chttp2/transport/flow_control.cc b/src/core/ext/transport/chttp2/transport/flow_control.cc
index d0e80c4..dd80036 100644
--- a/src/core/ext/transport/chttp2/transport/flow_control.cc
+++ b/src/core/ext/transport/chttp2/transport/flow_control.cc
@@ -16,7 +16,7 @@
*
*/
-#include "src/core/ext/transport/chttp2/transport/internal.h"
+#include "src/core/ext/transport/chttp2/transport/flow_control.h"
#include <inttypes.h>
#include <limits.h>
@@ -28,38 +28,15 @@
#include <grpc/support/string_util.h>
#include <grpc/support/useful.h>
+#include "src/core/ext/transport/chttp2/transport/internal.h"
#include "src/core/lib/support/string.h"
-static uint32_t grpc_chttp2_target_announced_window(
- const grpc_chttp2_transport_flowctl* tfc);
+namespace grpc_core {
+namespace chttp2 {
-#ifndef NDEBUG
+namespace {
-typedef struct {
- int64_t remote_window;
- int64_t target_window;
- int64_t announced_window;
- int64_t remote_window_delta;
- int64_t local_window_delta;
- int64_t announced_window_delta;
- uint32_t local_init_window;
- uint32_t local_max_frame;
-} shadow_flow_control;
-
-static void pretrace(shadow_flow_control* shadow_fc,
- grpc_chttp2_transport_flowctl* tfc,
- grpc_chttp2_stream_flowctl* sfc) {
- shadow_fc->remote_window = tfc->remote_window;
- shadow_fc->target_window = grpc_chttp2_target_announced_window(tfc);
- shadow_fc->announced_window = tfc->announced_window;
- if (sfc != NULL) {
- shadow_fc->remote_window_delta = sfc->remote_window_delta;
- shadow_fc->local_window_delta = sfc->local_window_delta;
- shadow_fc->announced_window_delta = sfc->announced_window_delta;
- }
-}
-
-#define TRACE_PADDING 30
+static constexpr const int kTracePadding = 30;
static char* fmt_int64_diff_str(int64_t old_val, int64_t new_val) {
char* str;
@@ -68,7 +45,7 @@
} else {
gpr_asprintf(&str, "%" PRId64 "", old_val);
}
- char* str_lp = gpr_leftpad(str, ' ', TRACE_PADDING);
+ char* str_lp = gpr_leftpad(str, ' ', kTracePadding);
gpr_free(str);
return str_lp;
}
@@ -80,47 +57,58 @@
} else {
gpr_asprintf(&str, "%" PRIu32 "", old_val);
}
- char* str_lp = gpr_leftpad(str, ' ', TRACE_PADDING);
+ char* str_lp = gpr_leftpad(str, ' ', kTracePadding);
gpr_free(str);
return str_lp;
}
+} // namespace
-static void posttrace(shadow_flow_control* shadow_fc,
- grpc_chttp2_transport_flowctl* tfc,
- grpc_chttp2_stream_flowctl* sfc, const char* reason) {
+void FlowControlTrace::Init(const char* reason, TransportFlowControl* tfc,
+ StreamFlowControl* sfc) {
+ tfc_ = tfc;
+ sfc_ = sfc;
+ reason_ = reason;
+ remote_window_ = tfc->remote_window();
+ target_window_ = tfc->target_window();
+ announced_window_ = tfc->announced_window();
+ if (sfc != nullptr) {
+ remote_window_delta_ = sfc->remote_window_delta();
+ local_window_delta_ = sfc->local_window_delta();
+ announced_window_delta_ = sfc->announced_window_delta();
+ }
+}
+
+void FlowControlTrace::Finish() {
uint32_t acked_local_window =
- tfc->t->settings[GRPC_SENT_SETTINGS]
- [GRPC_CHTTP2_SETTINGS_INITIAL_WINDOW_SIZE];
+ tfc_->transport()->settings[GRPC_SENT_SETTINGS]
+ [GRPC_CHTTP2_SETTINGS_INITIAL_WINDOW_SIZE];
uint32_t remote_window =
- tfc->t->settings[GRPC_PEER_SETTINGS]
- [GRPC_CHTTP2_SETTINGS_INITIAL_WINDOW_SIZE];
- char* trw_str =
- fmt_int64_diff_str(shadow_fc->remote_window, tfc->remote_window);
- char* tlw_str = fmt_int64_diff_str(shadow_fc->target_window,
- grpc_chttp2_target_announced_window(tfc));
+ tfc_->transport()->settings[GRPC_PEER_SETTINGS]
+ [GRPC_CHTTP2_SETTINGS_INITIAL_WINDOW_SIZE];
+ char* trw_str = fmt_int64_diff_str(remote_window_, tfc_->remote_window());
+ char* tlw_str = fmt_int64_diff_str(target_window_, tfc_->target_window());
char* taw_str =
- fmt_int64_diff_str(shadow_fc->announced_window, tfc->announced_window);
+ fmt_int64_diff_str(announced_window_, tfc_->announced_window());
char* srw_str;
char* slw_str;
char* saw_str;
- if (sfc != NULL) {
- srw_str = fmt_int64_diff_str(shadow_fc->remote_window_delta + remote_window,
- sfc->remote_window_delta + remote_window);
- slw_str =
- fmt_int64_diff_str(shadow_fc->local_window_delta + acked_local_window,
- sfc->local_window_delta + acked_local_window);
- saw_str = fmt_int64_diff_str(
- shadow_fc->announced_window_delta + acked_local_window,
- sfc->announced_window_delta + acked_local_window);
+ if (sfc_ != nullptr) {
+ srw_str = fmt_int64_diff_str(remote_window_delta_ + remote_window,
+ sfc_->remote_window_delta() + remote_window);
+ slw_str = fmt_int64_diff_str(local_window_delta_ + acked_local_window,
+ local_window_delta_ + acked_local_window);
+ saw_str = fmt_int64_diff_str(announced_window_delta_ + acked_local_window,
+ announced_window_delta_ + acked_local_window);
} else {
- srw_str = gpr_leftpad("", ' ', TRACE_PADDING);
- slw_str = gpr_leftpad("", ' ', TRACE_PADDING);
- saw_str = gpr_leftpad("", ' ', TRACE_PADDING);
+ srw_str = gpr_leftpad("", ' ', kTracePadding);
+ slw_str = gpr_leftpad("", ' ', kTracePadding);
+ saw_str = gpr_leftpad("", ' ', kTracePadding);
}
gpr_log(GPR_DEBUG,
"%p[%u][%s] | %s | trw:%s, ttw:%s, taw:%s, srw:%s, slw:%s, saw:%s",
- tfc, sfc != NULL ? sfc->s->id : 0, tfc->t->is_client ? "cli" : "svr",
- reason, trw_str, tlw_str, taw_str, srw_str, slw_str, saw_str);
+ tfc_, sfc_ != nullptr ? sfc_->stream()->id : 0,
+ tfc_->transport()->is_client ? "cli" : "svr", reason_, trw_str,
+ tlw_str, taw_str, srw_str, slw_str, saw_str);
gpr_free(trw_str);
gpr_free(tlw_str);
gpr_free(taw_str);
@@ -129,13 +117,13 @@
gpr_free(saw_str);
}
-static const char* urgency_to_string(grpc_chttp2_flowctl_urgency urgency) {
- switch (urgency) {
- case GRPC_CHTTP2_FLOWCTL_NO_ACTION_NEEDED:
+const char* FlowControlAction::UrgencyString(Urgency u) {
+ switch (u) {
+ case Urgency::NO_ACTION_NEEDED:
return "no action";
- case GRPC_CHTTP2_FLOWCTL_UPDATE_IMMEDIATELY:
+ case Urgency::UPDATE_IMMEDIATELY:
return "update immediately";
- case GRPC_CHTTP2_FLOWCTL_QUEUE_UPDATE:
+ case Urgency::QUEUE_UPDATE:
return "queue update";
default:
GPR_UNREACHABLE_CODE(return "unknown");
@@ -143,209 +131,132 @@
GPR_UNREACHABLE_CODE(return "unknown");
}
-static void trace_action(grpc_chttp2_transport_flowctl* tfc,
- grpc_chttp2_flowctl_action action) {
+void FlowControlAction::Trace(grpc_chttp2_transport* t) const {
char* iw_str = fmt_uint32_diff_str(
- tfc->t->settings[GRPC_SENT_SETTINGS]
- [GRPC_CHTTP2_SETTINGS_INITIAL_WINDOW_SIZE],
- action.initial_window_size);
+ t->settings[GRPC_SENT_SETTINGS][GRPC_CHTTP2_SETTINGS_INITIAL_WINDOW_SIZE],
+ initial_window_size_);
char* mf_str = fmt_uint32_diff_str(
- tfc->t->settings[GRPC_SENT_SETTINGS][GRPC_CHTTP2_SETTINGS_MAX_FRAME_SIZE],
- action.max_frame_size);
- gpr_log(GPR_DEBUG, "t[%s], s[%s], settings[%s] iw:%s mf:%s",
- urgency_to_string(action.send_transport_update),
- urgency_to_string(action.send_stream_update),
- urgency_to_string(action.send_setting_update), iw_str, mf_str);
+ t->settings[GRPC_SENT_SETTINGS][GRPC_CHTTP2_SETTINGS_MAX_FRAME_SIZE],
+ max_frame_size_);
+ gpr_log(GPR_DEBUG, "t[%s], s[%s], iw:%s:%s mf:%s:%s",
+ UrgencyString(send_transport_update_),
+ UrgencyString(send_stream_update_),
+ UrgencyString(send_initial_window_update_), iw_str,
+ UrgencyString(send_max_frame_size_update_), mf_str);
gpr_free(iw_str);
gpr_free(mf_str);
}
-#define PRETRACE(tfc, sfc) \
- shadow_flow_control shadow_fc; \
- GRPC_FLOW_CONTROL_IF_TRACING(pretrace(&shadow_fc, tfc, sfc))
-#define POSTTRACE(tfc, sfc, reason) \
- GRPC_FLOW_CONTROL_IF_TRACING(posttrace(&shadow_fc, tfc, sfc, reason))
-#define TRACEACTION(tfc, action) \
- GRPC_FLOW_CONTROL_IF_TRACING(trace_action(tfc, action))
-#else
-#define PRETRACE(tfc, sfc)
-#define POSTTRACE(tfc, sfc, reason)
-#define TRACEACTION(tfc, action)
-#endif
+TransportFlowControl::TransportFlowControl(grpc_exec_ctx* exec_ctx,
+ const grpc_chttp2_transport* t,
+ bool enable_bdp_probe)
+ : t_(t),
+ enable_bdp_probe_(enable_bdp_probe),
+ bdp_estimator_(t->peer_string),
+ pid_controller_(grpc_core::PidController::Args()
+ .set_gain_p(4)
+ .set_gain_i(8)
+ .set_gain_d(0)
+ .set_initial_control_value(TargetLogBdp())
+ .set_min_control_value(-1)
+ .set_max_control_value(25)
+ .set_integral_range(10)),
+ last_pid_update_(grpc_exec_ctx_now(exec_ctx)) {}
-/* How many bytes of incoming flow control would we like to advertise */
-static uint32_t grpc_chttp2_target_announced_window(
- const grpc_chttp2_transport_flowctl* tfc) {
- return (uint32_t)GPR_MIN((int64_t)((1u << 31) - 1),
- tfc->announced_stream_total_over_incoming_window +
- tfc->target_initial_window_size);
-}
-
-// we have sent data on the wire, we must track this in our bookkeeping for the
-// remote peer's flow control.
-void grpc_chttp2_flowctl_sent_data(grpc_chttp2_transport_flowctl* tfc,
- grpc_chttp2_stream_flowctl* sfc,
- int64_t size) {
- PRETRACE(tfc, sfc);
- tfc->remote_window -= size;
- sfc->remote_window_delta -= size;
- POSTTRACE(tfc, sfc, " data sent");
-}
-
-static void announced_window_delta_preupdate(grpc_chttp2_transport_flowctl* tfc,
- grpc_chttp2_stream_flowctl* sfc) {
- if (sfc->announced_window_delta > 0) {
- tfc->announced_stream_total_over_incoming_window -=
- sfc->announced_window_delta;
- } else {
- tfc->announced_stream_total_under_incoming_window +=
- -sfc->announced_window_delta;
+uint32_t TransportFlowControl::MaybeSendUpdate(bool writing_anyway) {
+ FlowControlTrace trace("t updt sent", this, nullptr);
+ const uint32_t target_announced_window = target_window();
+ if ((writing_anyway || announced_window_ <= target_announced_window / 2) &&
+ announced_window_ != target_announced_window) {
+ const uint32_t announce = (uint32_t)GPR_CLAMP(
+ target_announced_window - announced_window_, 0, UINT32_MAX);
+ announced_window_ += announce;
+ return announce;
}
+ return 0;
}
-static void announced_window_delta_postupdate(
- grpc_chttp2_transport_flowctl* tfc, grpc_chttp2_stream_flowctl* sfc) {
- if (sfc->announced_window_delta > 0) {
- tfc->announced_stream_total_over_incoming_window +=
- sfc->announced_window_delta;
- } else {
- tfc->announced_stream_total_under_incoming_window -=
- -sfc->announced_window_delta;
- }
-}
-
-// We have received data from the wire. We must track this in our own flow
-// control bookkeeping.
-// Returns an error if the incoming frame violates our flow control.
-grpc_error* grpc_chttp2_flowctl_recv_data(grpc_chttp2_transport_flowctl* tfc,
- grpc_chttp2_stream_flowctl* sfc,
- int64_t incoming_frame_size) {
- uint32_t sent_init_window =
- tfc->t->settings[GRPC_SENT_SETTINGS]
- [GRPC_CHTTP2_SETTINGS_INITIAL_WINDOW_SIZE];
- uint32_t acked_init_window =
- tfc->t->settings[GRPC_ACKED_SETTINGS]
- [GRPC_CHTTP2_SETTINGS_INITIAL_WINDOW_SIZE];
- PRETRACE(tfc, sfc);
- if (incoming_frame_size > tfc->announced_window) {
+grpc_error* TransportFlowControl::ValidateRecvData(
+ int64_t incoming_frame_size) {
+ if (incoming_frame_size > announced_window_) {
char* msg;
gpr_asprintf(&msg,
"frame of size %" PRId64 " overflows local window of %" PRId64,
- incoming_frame_size, tfc->announced_window);
+ incoming_frame_size, announced_window_);
grpc_error* err = GRPC_ERROR_CREATE_FROM_COPIED_STRING(msg);
gpr_free(msg);
return err;
}
-
- if (sfc != NULL) {
- int64_t acked_stream_window =
- sfc->announced_window_delta + acked_init_window;
- int64_t sent_stream_window = sfc->announced_window_delta + sent_init_window;
- if (incoming_frame_size > acked_stream_window) {
- if (incoming_frame_size <= sent_stream_window) {
- gpr_log(
- GPR_ERROR,
- "Incoming frame of size %" PRId64
- " exceeds local window size of %" PRId64
- ".\n"
- "The (un-acked, future) window size would be %" PRId64
- " which is not exceeded.\n"
- "This would usually cause a disconnection, but allowing it due to"
- "broken HTTP2 implementations in the wild.\n"
- "See (for example) https://github.com/netty/netty/issues/6520.",
- incoming_frame_size, acked_stream_window, sent_stream_window);
- } else {
- char* msg;
- gpr_asprintf(&msg, "frame of size %" PRId64
- " overflows local window of %" PRId64,
- incoming_frame_size, acked_stream_window);
- grpc_error* err = GRPC_ERROR_CREATE_FROM_COPIED_STRING(msg);
- gpr_free(msg);
- return err;
- }
- }
-
- announced_window_delta_preupdate(tfc, sfc);
- sfc->announced_window_delta -= incoming_frame_size;
- announced_window_delta_postupdate(tfc, sfc);
- sfc->local_window_delta -= incoming_frame_size;
- }
-
- tfc->announced_window -= incoming_frame_size;
-
- POSTTRACE(tfc, sfc, " data recv");
return GRPC_ERROR_NONE;
}
-// Returns a non zero announce integer if we should send a transport window
-// update
-uint32_t grpc_chttp2_flowctl_maybe_send_transport_update(
- grpc_chttp2_transport_flowctl* tfc, bool writing_anyway) {
- PRETRACE(tfc, NULL);
- uint32_t target_announced_window = grpc_chttp2_target_announced_window(tfc);
- uint32_t threshold_to_send_transport_window_update =
- tfc->t->outbuf.count > 0 ? 3 * target_announced_window / 4
- : target_announced_window / 2;
- if ((writing_anyway ||
- tfc->announced_window <= threshold_to_send_transport_window_update) &&
- tfc->announced_window != target_announced_window) {
+StreamFlowControl::StreamFlowControl(TransportFlowControl* tfc,
+ const grpc_chttp2_stream* s)
+ : tfc_(tfc), s_(s) {}
+
+grpc_error* StreamFlowControl::RecvData(int64_t incoming_frame_size) {
+ FlowControlTrace trace(" data recv", tfc_, this);
+
+ grpc_error* error = GRPC_ERROR_NONE;
+ error = tfc_->ValidateRecvData(incoming_frame_size);
+ if (error != GRPC_ERROR_NONE) return error;
+
+ uint32_t sent_init_window =
+ tfc_->transport()->settings[GRPC_SENT_SETTINGS]
+ [GRPC_CHTTP2_SETTINGS_INITIAL_WINDOW_SIZE];
+ uint32_t acked_init_window =
+ tfc_->transport()->settings[GRPC_ACKED_SETTINGS]
+ [GRPC_CHTTP2_SETTINGS_INITIAL_WINDOW_SIZE];
+
+ int64_t acked_stream_window = announced_window_delta_ + acked_init_window;
+ int64_t sent_stream_window = announced_window_delta_ + sent_init_window;
+ if (incoming_frame_size > acked_stream_window) {
+ if (incoming_frame_size <= sent_stream_window) {
+ gpr_log(GPR_ERROR,
+ "Incoming frame of size %" PRId64
+ " exceeds local window size of %" PRId64
+ ".\n"
+ "The (un-acked, future) window size would be %" PRId64
+ " which is not exceeded.\n"
+ "This would usually cause a disconnection, but allowing it due to"
+ "broken HTTP2 implementations in the wild.\n"
+ "See (for example) https://github.com/netty/netty/issues/6520.",
+ incoming_frame_size, acked_stream_window, sent_stream_window);
+ } else {
+ char* msg;
+ gpr_asprintf(&msg, "frame of size %" PRId64
+ " overflows local window of %" PRId64,
+ incoming_frame_size, acked_stream_window);
+ grpc_error* err = GRPC_ERROR_CREATE_FROM_COPIED_STRING(msg);
+ gpr_free(msg);
+ return err;
+ }
+ }
+
+ UpdateAnnouncedWindowDelta(tfc_, -incoming_frame_size);
+ local_window_delta_ -= incoming_frame_size;
+ tfc_->CommitRecvData(incoming_frame_size);
+ return GRPC_ERROR_NONE;
+}
+
+uint32_t StreamFlowControl::MaybeSendUpdate() {
+ FlowControlTrace trace("s updt sent", tfc_, this);
+ if (local_window_delta_ > announced_window_delta_) {
uint32_t announce = (uint32_t)GPR_CLAMP(
- target_announced_window - tfc->announced_window, 0, UINT32_MAX);
- tfc->announced_window += announce;
- POSTTRACE(tfc, NULL, "t updt sent");
+ local_window_delta_ - announced_window_delta_, 0, UINT32_MAX);
+ UpdateAnnouncedWindowDelta(tfc_, announce);
return announce;
}
- GRPC_FLOW_CONTROL_IF_TRACING(
- gpr_log(GPR_DEBUG, "%p[0][%s] will not send transport update", tfc,
- tfc->t->is_client ? "cli" : "svr"));
return 0;
}
-// Returns a non zero announce integer if we should send a stream window update
-uint32_t grpc_chttp2_flowctl_maybe_send_stream_update(
- grpc_chttp2_transport_flowctl* tfc, grpc_chttp2_stream_flowctl* sfc) {
- PRETRACE(tfc, sfc);
- if (sfc->local_window_delta > sfc->announced_window_delta) {
- uint32_t announce = (uint32_t)GPR_CLAMP(
- sfc->local_window_delta - sfc->announced_window_delta, 0, UINT32_MAX);
- announced_window_delta_preupdate(tfc, sfc);
- sfc->announced_window_delta += announce;
- announced_window_delta_postupdate(tfc, sfc);
- POSTTRACE(tfc, sfc, "s updt sent");
- return announce;
- }
- GRPC_FLOW_CONTROL_IF_TRACING(
- gpr_log(GPR_DEBUG, "%p[%u][%s] will not send stream update", tfc,
- sfc->s->id, tfc->t->is_client ? "cli" : "svr"));
- return 0;
-}
-
-// we have received a WINDOW_UPDATE frame for a transport
-void grpc_chttp2_flowctl_recv_transport_update(
- grpc_chttp2_transport_flowctl* tfc, uint32_t size) {
- PRETRACE(tfc, NULL);
- tfc->remote_window += size;
- POSTTRACE(tfc, NULL, "t updt recv");
-}
-
-// we have received a WINDOW_UPDATE frame for a stream
-void grpc_chttp2_flowctl_recv_stream_update(grpc_chttp2_transport_flowctl* tfc,
- grpc_chttp2_stream_flowctl* sfc,
- uint32_t size) {
- PRETRACE(tfc, sfc);
- sfc->remote_window_delta += size;
- POSTTRACE(tfc, sfc, "s updt recv");
-}
-
-void grpc_chttp2_flowctl_incoming_bs_update(grpc_chttp2_transport_flowctl* tfc,
- grpc_chttp2_stream_flowctl* sfc,
- size_t max_size_hint,
- size_t have_already) {
- PRETRACE(tfc, sfc);
+void StreamFlowControl::IncomingByteStreamUpdate(size_t max_size_hint,
+ size_t have_already) {
+ FlowControlTrace trace("app st recv", tfc_, this);
uint32_t max_recv_bytes;
uint32_t sent_init_window =
- tfc->t->settings[GRPC_SENT_SETTINGS]
- [GRPC_CHTTP2_SETTINGS_INITIAL_WINDOW_SIZE];
+ tfc_->transport()->settings[GRPC_SENT_SETTINGS]
+ [GRPC_CHTTP2_SETTINGS_INITIAL_WINDOW_SIZE];
/* clamp max recv hint to an allowable size */
if (max_size_hint >= UINT32_MAX - sent_init_window) {
@@ -363,68 +274,18 @@
/* add some small lookahead to keep pipelines flowing */
GPR_ASSERT(max_recv_bytes <= UINT32_MAX - sent_init_window);
- if (sfc->local_window_delta < max_recv_bytes) {
+ if (local_window_delta_ < max_recv_bytes) {
uint32_t add_max_recv_bytes =
- (uint32_t)(max_recv_bytes - sfc->local_window_delta);
- sfc->local_window_delta += add_max_recv_bytes;
+ (uint32_t)(max_recv_bytes - local_window_delta_);
+ local_window_delta_ += add_max_recv_bytes;
}
- POSTTRACE(tfc, sfc, "app st recv");
-}
-
-void grpc_chttp2_flowctl_destroy_stream(grpc_chttp2_transport_flowctl* tfc,
- grpc_chttp2_stream_flowctl* sfc) {
- announced_window_delta_preupdate(tfc, sfc);
-}
-
-// Returns an urgency with which to make an update
-static grpc_chttp2_flowctl_urgency delta_is_significant(
- const grpc_chttp2_transport_flowctl* tfc, int32_t value,
- grpc_chttp2_setting_id setting_id) {
- int64_t delta = (int64_t)value -
- (int64_t)tfc->t->settings[GRPC_LOCAL_SETTINGS][setting_id];
- // TODO(ncteisen): tune this
- if (delta != 0 && (delta <= -value / 5 || delta >= value / 5)) {
- return GRPC_CHTTP2_FLOWCTL_QUEUE_UPDATE;
- } else {
- return GRPC_CHTTP2_FLOWCTL_NO_ACTION_NEEDED;
- }
-}
-
-// Takes in a target and uses the pid controller to return a stabilized
-// guess at the new bdp.
-static double get_pid_controller_guess(grpc_exec_ctx* exec_ctx,
- grpc_chttp2_transport_flowctl* tfc,
- double target) {
- grpc_millis now = grpc_exec_ctx_now(exec_ctx);
- if (!tfc->pid_controller_initialized) {
- tfc->last_pid_update = now;
- tfc->pid_controller_initialized = true;
- grpc_pid_controller_args args;
- memset(&args, 0, sizeof(args));
- args.gain_p = 4;
- args.gain_i = 8;
- args.gain_d = 0;
- args.initial_control_value = target;
- args.min_control_value = -1;
- args.max_control_value = 25;
- args.integral_range = 10;
- grpc_pid_controller_init(&tfc->pid_controller, args);
- return pow(2, target);
- }
- double bdp_error = target - grpc_pid_controller_last(&tfc->pid_controller);
- double dt = (double)(now - tfc->last_pid_update) * 1e-3;
- double log2_bdp_guess =
- grpc_pid_controller_update(&tfc->pid_controller, bdp_error, dt);
- tfc->last_pid_update = now;
- return pow(2, log2_bdp_guess);
}
// Take in a target and modifies it based on the memory pressure of the system
-static double get_target_under_memory_pressure(
- grpc_chttp2_transport_flowctl* tfc, double target) {
+static double AdjustForMemoryPressure(grpc_resource_quota* quota,
+ double target) {
// do not increase window under heavy memory pressure.
- double memory_pressure = grpc_resource_quota_get_memory_pressure(
- grpc_resource_user_quota(grpc_endpoint_get_resource_user(tfc->t->ep)));
+ double memory_pressure = grpc_resource_quota_get_memory_pressure(quota);
static const double kLowMemPressure = 0.1;
static const double kZeroTarget = 22;
static const double kHighMemPressure = 0.8;
@@ -439,75 +300,82 @@
return target;
}
-grpc_chttp2_flowctl_action grpc_chttp2_flowctl_get_action(
- grpc_exec_ctx* exec_ctx, grpc_chttp2_transport_flowctl* tfc,
- grpc_chttp2_stream_flowctl* sfc) {
- grpc_chttp2_flowctl_action action;
- memset(&action, 0, sizeof(action));
+double TransportFlowControl::TargetLogBdp() {
+ return AdjustForMemoryPressure(
+ grpc_resource_user_quota(grpc_endpoint_get_resource_user(t_->ep)),
+ 1 + log2(bdp_estimator_.EstimateBdp()));
+}
+
+double TransportFlowControl::SmoothLogBdp(grpc_exec_ctx* exec_ctx,
+ double value) {
+ grpc_millis now = grpc_exec_ctx_now(exec_ctx);
+ double bdp_error = value - pid_controller_.last_control_value();
+ const double dt = (double)(now - last_pid_update_) * 1e-3;
+ last_pid_update_ = now;
+ return pid_controller_.Update(bdp_error, dt);
+}
+
+FlowControlAction::Urgency TransportFlowControl::DeltaUrgency(
+ int32_t value, grpc_chttp2_setting_id setting_id) {
+ int64_t delta =
+ (int64_t)value - (int64_t)t_->settings[GRPC_LOCAL_SETTINGS][setting_id];
// TODO(ncteisen): tune this
- if (sfc != NULL && !sfc->s->read_closed) {
- uint32_t sent_init_window =
- tfc->t->settings[GRPC_SENT_SETTINGS]
- [GRPC_CHTTP2_SETTINGS_INITIAL_WINDOW_SIZE];
- if ((int64_t)sfc->local_window_delta >
- (int64_t)sfc->announced_window_delta &&
- (int64_t)sfc->announced_window_delta + sent_init_window <=
- sent_init_window / 2) {
- action.send_stream_update = GRPC_CHTTP2_FLOWCTL_UPDATE_IMMEDIATELY;
- } else if (sfc->local_window_delta > sfc->announced_window_delta) {
- action.send_stream_update = GRPC_CHTTP2_FLOWCTL_QUEUE_UPDATE;
- }
+ if (delta != 0 && (delta <= -value / 5 || delta >= value / 5)) {
+ return FlowControlAction::Urgency::QUEUE_UPDATE;
+ } else {
+ return FlowControlAction::Urgency::NO_ACTION_NEEDED;
}
- if (tfc->enable_bdp_probe) {
+}
+
+FlowControlAction TransportFlowControl::PeriodicUpdate(
+ grpc_exec_ctx* exec_ctx) {
+ FlowControlAction action;
+ if (enable_bdp_probe_) {
// get bdp estimate and update initial_window accordingly.
- int64_t estimate = -1;
- if (tfc->bdp_estimator->EstimateBdp(&estimate)) {
- double target = 1 + log2((double)estimate);
+ // target might change based on how much memory pressure we are under
+ // TODO(ncteisen): experiment with setting target to be huge under low
+ // memory pressure.
+ const double target = pow(2, SmoothLogBdp(exec_ctx, TargetLogBdp()));
- // target might change based on how much memory pressure we are under
- // TODO(ncteisen): experiment with setting target to be huge under low
- // memory pressure.
- target = get_target_under_memory_pressure(tfc, target);
+ // Though initial window 'could' drop to 0, we keep the floor at 128
+ target_initial_window_size_ = (int32_t)GPR_CLAMP(target, 128, INT32_MAX);
- // run our target through the pid controller to stabilize change.
- // TODO(ncteisen): experiment with other controllers here.
- double bdp_guess = get_pid_controller_guess(exec_ctx, tfc, target);
-
- // Though initial window 'could' drop to 0, we keep the floor at 128
- tfc->target_initial_window_size =
- (int32_t)GPR_CLAMP(bdp_guess, 128, INT32_MAX);
-
- grpc_chttp2_flowctl_urgency init_window_update_urgency =
- delta_is_significant(tfc, tfc->target_initial_window_size,
- GRPC_CHTTP2_SETTINGS_INITIAL_WINDOW_SIZE);
- if (init_window_update_urgency != GRPC_CHTTP2_FLOWCTL_NO_ACTION_NEEDED) {
- action.send_setting_update = init_window_update_urgency;
- action.initial_window_size = (uint32_t)tfc->target_initial_window_size;
- }
- }
+ action.set_send_initial_window_update(
+ DeltaUrgency(target_initial_window_size_,
+ GRPC_CHTTP2_SETTINGS_INITIAL_WINDOW_SIZE),
+ target_initial_window_size_);
// get bandwidth estimate and update max_frame accordingly.
- double bw_dbl = -1;
- if (tfc->bdp_estimator->EstimateBandwidth(&bw_dbl)) {
- // we target the max of BDP or bandwidth in microseconds.
- int32_t frame_size = (int32_t)GPR_CLAMP(
- GPR_MAX((int32_t)GPR_CLAMP(bw_dbl, 0, INT_MAX) / 1000,
- tfc->target_initial_window_size),
- 16384, 16777215);
- grpc_chttp2_flowctl_urgency frame_size_urgency = delta_is_significant(
- tfc, frame_size, GRPC_CHTTP2_SETTINGS_MAX_FRAME_SIZE);
- if (frame_size_urgency != GRPC_CHTTP2_FLOWCTL_NO_ACTION_NEEDED) {
- if (frame_size_urgency > action.send_setting_update) {
- action.send_setting_update = frame_size_urgency;
- }
- action.max_frame_size = (uint32_t)frame_size;
- }
+ double bw_dbl = bdp_estimator_.EstimateBandwidth();
+ // we target the max of BDP or bandwidth in microseconds.
+ int32_t frame_size = (int32_t)GPR_CLAMP(
+ GPR_MAX((int32_t)GPR_CLAMP(bw_dbl, 0, INT_MAX) / 1000,
+ target_initial_window_size_),
+ 16384, 16777215);
+ action.set_send_max_frame_size_update(
+ DeltaUrgency(frame_size, GRPC_CHTTP2_SETTINGS_MAX_FRAME_SIZE),
+ frame_size);
+ }
+ return UpdateAction(action);
+}
+
+FlowControlAction StreamFlowControl::UpdateAction(FlowControlAction action) {
+ // TODO(ncteisen): tune this
+ if (!s_->read_closed) {
+ uint32_t sent_init_window =
+ tfc_->transport()->settings[GRPC_SENT_SETTINGS]
+ [GRPC_CHTTP2_SETTINGS_INITIAL_WINDOW_SIZE];
+ if (local_window_delta_ > announced_window_delta_ &&
+ announced_window_delta_ + sent_init_window <= sent_init_window / 2) {
+ action.set_send_stream_update(
+ FlowControlAction::Urgency::UPDATE_IMMEDIATELY);
+ } else if (local_window_delta_ > announced_window_delta_) {
+ action.set_send_stream_update(FlowControlAction::Urgency::QUEUE_UPDATE);
}
}
- uint32_t target_announced_window = grpc_chttp2_target_announced_window(tfc);
- if (tfc->announced_window < target_announced_window / 2) {
- action.send_transport_update = GRPC_CHTTP2_FLOWCTL_UPDATE_IMMEDIATELY;
- }
- TRACEACTION(tfc, action);
+
return action;
}
+
+} // namespace chttp2
+} // namespace grpc_core
diff --git a/src/core/ext/transport/chttp2/transport/flow_control.h b/src/core/ext/transport/chttp2/transport/flow_control.h
new file mode 100644
index 0000000..d5107d4
--- /dev/null
+++ b/src/core/ext/transport/chttp2/transport/flow_control.h
@@ -0,0 +1,328 @@
+/*
+ *
+ * Copyright 2017 gRPC authors.
+ *
+ * Licensed under the Apache License, Version 2.0 (the "License");
+ * you may not use this file except in compliance with the License.
+ * You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ *
+ */
+
+#ifndef GRPC_CORE_EXT_TRANSPORT_CHTTP2_TRANSPORT_FLOW_CONTROL_H
+#define GRPC_CORE_EXT_TRANSPORT_CHTTP2_TRANSPORT_FLOW_CONTROL_H
+
+#include <stdint.h>
+
+#include <grpc/support/useful.h>
+#include "src/core/ext/transport/chttp2/transport/http2_settings.h"
+#include "src/core/lib/support/manual_constructor.h"
+#include "src/core/lib/transport/bdp_estimator.h"
+#include "src/core/lib/transport/pid_controller.h"
+
+struct grpc_chttp2_transport;
+struct grpc_chttp2_stream;
+
+extern "C" grpc_tracer_flag grpc_flowctl_trace;
+
+namespace grpc_core {
+namespace chttp2 {
+
+static constexpr uint32_t kDefaultWindow = 65535;
+
+class TransportFlowControl;
+class StreamFlowControl;
+
+class FlowControlAction {
+ public:
+ enum class Urgency : uint8_t {
+ // Nothing to be done.
+ NO_ACTION_NEEDED = 0,
+ // Initiate a write to update the initial window immediately.
+ UPDATE_IMMEDIATELY,
+ // Push the flow control update into a send buffer, to be sent
+ // out the next time a write is initiated.
+ QUEUE_UPDATE,
+ };
+
+ Urgency send_stream_update() const { return send_stream_update_; }
+ Urgency send_transport_update() const { return send_transport_update_; }
+ Urgency send_initial_window_update() const {
+ return send_initial_window_update_;
+ }
+ Urgency send_max_frame_size_update() const {
+ return send_max_frame_size_update_;
+ }
+ uint32_t initial_window_size() const { return initial_window_size_; }
+ uint32_t max_frame_size() const { return max_frame_size_; }
+
+ FlowControlAction& set_send_stream_update(Urgency u) {
+ send_stream_update_ = u;
+ return *this;
+ }
+ FlowControlAction& set_send_transport_update(Urgency u) {
+ send_transport_update_ = u;
+ return *this;
+ }
+ FlowControlAction& set_send_initial_window_update(Urgency u,
+ uint32_t update) {
+ send_initial_window_update_ = u;
+ initial_window_size_ = update;
+ return *this;
+ }
+ FlowControlAction& set_send_max_frame_size_update(Urgency u,
+ uint32_t update) {
+ send_max_frame_size_update_ = u;
+ max_frame_size_ = update;
+ return *this;
+ }
+
+ static const char* UrgencyString(Urgency u);
+ void Trace(grpc_chttp2_transport* t) const;
+
+ private:
+ Urgency send_stream_update_ = Urgency::NO_ACTION_NEEDED;
+ Urgency send_transport_update_ = Urgency::NO_ACTION_NEEDED;
+ Urgency send_initial_window_update_ = Urgency::NO_ACTION_NEEDED;
+ Urgency send_max_frame_size_update_ = Urgency::NO_ACTION_NEEDED;
+ uint32_t initial_window_size_ = 0;
+ uint32_t max_frame_size_ = 0;
+};
+
+class FlowControlTrace {
+ public:
+ FlowControlTrace(const char* reason, TransportFlowControl* tfc,
+ StreamFlowControl* sfc) {
+ if (enabled_) Init(reason, tfc, sfc);
+ }
+
+ ~FlowControlTrace() {
+ if (enabled_) Finish();
+ }
+
+ private:
+ void Init(const char* reason, TransportFlowControl* tfc,
+ StreamFlowControl* sfc);
+ void Finish();
+
+ const bool enabled_ = GRPC_TRACER_ON(grpc_flowctl_trace);
+
+ TransportFlowControl* tfc_;
+ StreamFlowControl* sfc_;
+ const char* reason_;
+ int64_t remote_window_;
+ int64_t target_window_;
+ int64_t announced_window_;
+ int64_t remote_window_delta_;
+ int64_t local_window_delta_;
+ int64_t announced_window_delta_;
+};
+
+class TransportFlowControl {
+ public:
+ TransportFlowControl(grpc_exec_ctx* exec_ctx, const grpc_chttp2_transport* t,
+ bool enable_bdp_probe);
+ ~TransportFlowControl() {}
+
+ bool bdp_probe() const { return enable_bdp_probe_; }
+
+ // returns an announce if we should send a transport update to our peer,
+ // else returns zero; writing_anyway indicates if a write would happen
+ // regardless of the send - if it is false and this function returns non-zero,
+ // this announce will cause a write to occur
+ uint32_t MaybeSendUpdate(bool writing_anyway);
+
+ // Reads the flow control data and returns and actionable struct that will
+ // tell chttp2 exactly what it needs to do
+ FlowControlAction MakeAction() { return UpdateAction(FlowControlAction()); }
+
+ // Call periodically (at a low-ish rate, 100ms - 10s makes sense)
+ // to perform more complex flow control calculations and return an action
+ // to let chttp2 change its parameters
+ FlowControlAction PeriodicUpdate(grpc_exec_ctx* exec_ctx);
+
+ void StreamSentData(int64_t size) { remote_window_ -= size; }
+
+ grpc_error* ValidateRecvData(int64_t incoming_frame_size);
+ void CommitRecvData(int64_t incoming_frame_size) {
+ announced_window_ -= incoming_frame_size;
+ }
+
+ grpc_error* RecvData(int64_t incoming_frame_size) {
+ FlowControlTrace trace(" data recv", this, nullptr);
+ grpc_error* error = ValidateRecvData(incoming_frame_size);
+ if (error != GRPC_ERROR_NONE) return error;
+ CommitRecvData(incoming_frame_size);
+ return GRPC_ERROR_NONE;
+ }
+
+ // we have received a WINDOW_UPDATE frame for a transport
+ void RecvUpdate(uint32_t size) {
+ FlowControlTrace trace("t updt recv", this, nullptr);
+ remote_window_ += size;
+ }
+
+ int64_t remote_window() const { return remote_window_; }
+ int64_t target_window() const {
+ return (uint32_t)GPR_MIN((int64_t)((1u << 31) - 1),
+ announced_stream_total_over_incoming_window_ +
+ target_initial_window_size_);
+ }
+ int64_t announced_window() const { return announced_window_; }
+
+ const grpc_chttp2_transport* transport() const { return t_; }
+
+ void PreUpdateAnnouncedWindowOverIncomingWindow(int64_t delta) {
+ if (delta > 0) {
+ announced_stream_total_over_incoming_window_ -= delta;
+ } else {
+ announced_stream_total_under_incoming_window_ += -delta;
+ }
+ }
+
+ void PostUpdateAnnouncedWindowOverIncomingWindow(int64_t delta) {
+ if (delta > 0) {
+ announced_stream_total_over_incoming_window_ += delta;
+ } else {
+ announced_stream_total_under_incoming_window_ -= -delta;
+ }
+ }
+
+ BdpEstimator* bdp_estimator() { return &bdp_estimator_; }
+
+ void TestOnlyForceHugeWindow() {
+ announced_window_ = 1024 * 1024 * 1024;
+ remote_window_ = 1024 * 1024 * 1024;
+ }
+
+ private:
+ double TargetLogBdp();
+ double SmoothLogBdp(grpc_exec_ctx* exec_ctx, double value);
+ FlowControlAction::Urgency DeltaUrgency(int32_t value,
+ grpc_chttp2_setting_id setting_id);
+
+ FlowControlAction UpdateAction(FlowControlAction action) {
+ if (announced_window_ < target_window() / 2) {
+ action.set_send_transport_update(
+ FlowControlAction::Urgency::UPDATE_IMMEDIATELY);
+ }
+ return action;
+ }
+
+ const grpc_chttp2_transport* const t_;
+
+ /** Our bookkeeping for the remote peer's available window */
+ int64_t remote_window_ = kDefaultWindow;
+
+ /** calculating what we should give for local window:
+ we track the total amount of flow control over initial window size
+ across all streams: this is data that we want to receive right now (it
+ has an outstanding read)
+ and the total amount of flow control under initial window size across all
+ streams: this is data we've read early
+ we want to adjust incoming_window such that:
+ incoming_window = total_over - max(bdp - total_under, 0) */
+ int64_t announced_stream_total_over_incoming_window_ = 0;
+ int64_t announced_stream_total_under_incoming_window_ = 0;
+
+ /** This is out window according to what we have sent to our remote peer. The
+ * difference between this and target window is what we use to decide when
+ * to send WINDOW_UPDATE frames. */
+ int64_t announced_window_ = kDefaultWindow;
+
+ int32_t target_initial_window_size_ = kDefaultWindow;
+
+ /** should we probe bdp? */
+ const bool enable_bdp_probe_;
+
+ /* bdp estimation */
+ grpc_core::BdpEstimator bdp_estimator_;
+
+ /* pid controller */
+ grpc_core::PidController pid_controller_;
+ grpc_millis last_pid_update_ = 0;
+};
+
+class StreamFlowControl {
+ public:
+ StreamFlowControl(TransportFlowControl* tfc, const grpc_chttp2_stream* s);
+ ~StreamFlowControl() {
+ tfc_->PreUpdateAnnouncedWindowOverIncomingWindow(announced_window_delta_);
+ }
+
+ FlowControlAction UpdateAction(FlowControlAction action);
+ FlowControlAction MakeAction() { return UpdateAction(tfc_->MakeAction()); }
+
+ // we have sent data on the wire, we must track this in our bookkeeping for
+ // the remote peer's flow control.
+ void SentData(int64_t outgoing_frame_size) {
+ FlowControlTrace tracer(" data sent", tfc_, this);
+ tfc_->StreamSentData(outgoing_frame_size);
+ remote_window_delta_ -= outgoing_frame_size;
+ }
+
+ // we have received data from the wire
+ grpc_error* RecvData(int64_t incoming_frame_size);
+
+ // returns an announce if we should send a stream update to our peer, else
+ // returns zero
+ uint32_t MaybeSendUpdate();
+
+ // we have received a WINDOW_UPDATE frame for a stream
+ void RecvUpdate(uint32_t size) {
+ FlowControlTrace trace("s updt recv", tfc_, this);
+ remote_window_delta_ += size;
+ }
+
+ // the application is asking for a certain amount of bytes
+ void IncomingByteStreamUpdate(size_t max_size_hint, size_t have_already);
+
+ int64_t remote_window_delta() const { return remote_window_delta_; }
+ int64_t local_window_delta() const { return local_window_delta_; }
+ int64_t announced_window_delta() const { return announced_window_delta_; }
+
+ const grpc_chttp2_stream* stream() const { return s_; }
+
+ void TestOnlyForceHugeWindow() {
+ announced_window_delta_ = 1024 * 1024 * 1024;
+ local_window_delta_ = 1024 * 1024 * 1024;
+ remote_window_delta_ = 1024 * 1024 * 1024;
+ }
+
+ private:
+ TransportFlowControl* const tfc_;
+ const grpc_chttp2_stream* const s_;
+
+ void UpdateAnnouncedWindowDelta(TransportFlowControl* tfc, int64_t change) {
+ tfc->PreUpdateAnnouncedWindowOverIncomingWindow(announced_window_delta_);
+ announced_window_delta_ += change;
+ tfc->PostUpdateAnnouncedWindowOverIncomingWindow(announced_window_delta_);
+ }
+
+ /** window available for us to send to peer, over or under the initial
+ * window
+ * size of the transport... ie:
+ * remote_window = remote_window_delta + transport.initial_window_size */
+ int64_t remote_window_delta_ = 0;
+
+ /** window available for peer to send to us (as a delta on
+ * transport.initial_window_size)
+ * local_window = local_window_delta + transport.initial_window_size */
+ int64_t local_window_delta_ = 0;
+
+ /** window available for peer to send to us over this stream that we have
+ * announced to the peer */
+ int64_t announced_window_delta_ = 0;
+};
+
+} // namespace chttp2
+} // namespace grpc_core
+
+#endif
diff --git a/src/core/ext/transport/chttp2/transport/frame_settings.cc b/src/core/ext/transport/chttp2/transport/frame_settings.cc
index 2995bf7..db0245b 100644
--- a/src/core/ext/transport/chttp2/transport/frame_settings.cc
+++ b/src/core/ext/transport/chttp2/transport/frame_settings.cc
@@ -202,13 +202,13 @@
}
if (id == GRPC_CHTTP2_SETTINGS_INITIAL_WINDOW_SIZE &&
parser->incoming_settings[id] != parser->value) {
- t->flow_control.initial_window_update +=
+ t->initial_window_update +=
(int64_t)parser->value - parser->incoming_settings[id];
if (GRPC_TRACER_ON(grpc_http_trace) ||
GRPC_TRACER_ON(grpc_flowctl_trace)) {
gpr_log(GPR_DEBUG, "%p[%s] adding %d for initial_window change",
t, t->is_client ? "cli" : "svr",
- (int)t->flow_control.initial_window_update);
+ (int)t->initial_window_update);
}
}
parser->incoming_settings[id] = parser->value;
diff --git a/src/core/ext/transport/chttp2/transport/frame_window_update.cc b/src/core/ext/transport/chttp2/transport/frame_window_update.cc
index c9ab8d1..15eaf59 100644
--- a/src/core/ext/transport/chttp2/transport/frame_window_update.cc
+++ b/src/core/ext/transport/chttp2/transport/frame_window_update.cc
@@ -96,8 +96,7 @@
if (t->incoming_stream_id != 0) {
if (s != NULL) {
- grpc_chttp2_flowctl_recv_stream_update(
- &t->flow_control, &s->flow_control, received_update);
+ s->flow_control->RecvUpdate(received_update);
if (grpc_chttp2_list_remove_stalled_by_stream(t, s)) {
grpc_chttp2_mark_stream_writable(exec_ctx, t, s);
grpc_chttp2_initiate_write(
@@ -106,10 +105,9 @@
}
}
} else {
- bool was_zero = t->flow_control.remote_window <= 0;
- grpc_chttp2_flowctl_recv_transport_update(&t->flow_control,
- received_update);
- bool is_zero = t->flow_control.remote_window <= 0;
+ bool was_zero = t->flow_control->remote_window() <= 0;
+ t->flow_control->RecvUpdate(received_update);
+ bool is_zero = t->flow_control->remote_window() <= 0;
if (was_zero && !is_zero) {
grpc_chttp2_initiate_write(
exec_ctx, t,
diff --git a/src/core/ext/transport/chttp2/transport/internal.h b/src/core/ext/transport/chttp2/transport/internal.h
index 703f3ba..9e0796e 100644
--- a/src/core/ext/transport/chttp2/transport/internal.h
+++ b/src/core/ext/transport/chttp2/transport/internal.h
@@ -22,6 +22,7 @@
#include <assert.h>
#include <stdbool.h>
+#include "src/core/ext/transport/chttp2/transport/flow_control.h"
#include "src/core/ext/transport/chttp2/transport/frame.h"
#include "src/core/ext/transport/chttp2/transport/frame_data.h"
#include "src/core/ext/transport/chttp2/transport/frame_goaway.h"
@@ -38,9 +39,7 @@
#include "src/core/lib/iomgr/endpoint.h"
#include "src/core/lib/iomgr/timer.h"
#include "src/core/lib/support/manual_constructor.h"
-#include "src/core/lib/transport/bdp_estimator.h"
#include "src/core/lib/transport/connectivity_state.h"
-#include "src/core/lib/transport/pid_controller.h"
#include "src/core/lib/transport/transport_impl.h"
#ifdef __cplusplus
@@ -238,48 +237,6 @@
GRPC_CHTTP2_KEEPALIVE_STATE_DISABLED,
} grpc_chttp2_keepalive_state;
-typedef struct {
- /** initial window change. This is tracked as we parse settings frames from
- * the remote peer. If there is a positive delta, then we will make all
- * streams readable since they may have become unstalled */
- int64_t initial_window_update;
-
- /** Our bookkeeping for the remote peer's available window */
- int64_t remote_window;
-
- /** calculating what we should give for local window:
- we track the total amount of flow control over initial window size
- across all streams: this is data that we want to receive right now (it
- has an outstanding read)
- and the total amount of flow control under initial window size across all
- streams: this is data we've read early
- we want to adjust incoming_window such that:
- incoming_window = total_over - max(bdp - total_under, 0) */
- int64_t announced_stream_total_over_incoming_window;
- int64_t announced_stream_total_under_incoming_window;
-
- /** This is out window according to what we have sent to our remote peer. The
- * difference between this and target window is what we use to decide when
- * to send WINDOW_UPDATE frames. */
- int64_t announced_window;
-
- int32_t target_initial_window_size;
-
- /** should we probe bdp? */
- bool enable_bdp_probe;
-
- /* bdp estimation */
- grpc_core::ManualConstructor<grpc_core::BdpEstimator> bdp_estimator;
-
- /* pid controller */
- bool pid_controller_initialized;
- grpc_pid_controller pid_controller;
- grpc_millis last_pid_update;
-
- // pointer back to transport for tracing
- const grpc_chttp2_transport *t;
-} grpc_chttp2_transport_flowctl;
-
struct grpc_chttp2_transport {
grpc_transport base; /* must be first */
gpr_refcount refs;
@@ -395,7 +352,12 @@
/** parser for goaway frames */
grpc_chttp2_goaway_parser goaway_parser;
- grpc_chttp2_transport_flowctl flow_control;
+ grpc_core::ManualConstructor<grpc_core::chttp2::TransportFlowControl>
+ flow_control;
+ /** initial window change. This is tracked as we parse settings frames from
+ * the remote peer. If there is a positive delta, then we will make all
+ * streams readable since they may have become unstalled */
+ int64_t initial_window_update = 0;
/* deframing */
grpc_chttp2_deframe_transport_state deframe_state;
@@ -477,25 +439,6 @@
GPRC_METADATA_PUBLISHED_AT_CLOSE
} grpc_published_metadata_method;
-typedef struct {
- /** window available for us to send to peer, over or under the initial window
- * size of the transport... ie:
- * remote_window = remote_window_delta + transport.initial_window_size */
- int64_t remote_window_delta;
-
- /** window available for peer to send to us (as a delta on
- * transport.initial_window_size)
- * local_window = local_window_delta + transport.initial_window_size */
- int64_t local_window_delta;
-
- /** window available for peer to send to us over this stream that we have
- * announced to the peer */
- int64_t announced_window_delta;
-
- // read only pointer back to stream for data
- const grpc_chttp2_stream *s;
-} grpc_chttp2_stream_flowctl;
-
struct grpc_chttp2_stream {
grpc_chttp2_transport *t;
grpc_stream_refcount *refcount;
@@ -589,7 +532,8 @@
bool sent_initial_metadata;
bool sent_trailing_metadata;
- grpc_chttp2_stream_flowctl flow_control;
+ grpc_core::ManualConstructor<grpc_core::chttp2::StreamFlowControl>
+ flow_control;
grpc_slice_buffer flow_controlled_buffer;
@@ -700,73 +644,10 @@
/********* Flow Control ***************/
-// we have sent data on the wire
-void grpc_chttp2_flowctl_sent_data(grpc_chttp2_transport_flowctl *tfc,
- grpc_chttp2_stream_flowctl *sfc,
- int64_t size);
-
-// we have received data from the wire
-grpc_error *grpc_chttp2_flowctl_recv_data(grpc_chttp2_transport_flowctl *tfc,
- grpc_chttp2_stream_flowctl *sfc,
- int64_t incoming_frame_size);
-
-// returns an announce if we should send a transport update to our peer,
-// else returns zero
-uint32_t grpc_chttp2_flowctl_maybe_send_transport_update(
- grpc_chttp2_transport_flowctl *tfc, bool writing_anyway);
-
-// returns an announce if we should send a stream update to our peer, else
-// returns zero
-uint32_t grpc_chttp2_flowctl_maybe_send_stream_update(
- grpc_chttp2_transport_flowctl *tfc, grpc_chttp2_stream_flowctl *sfc);
-
-// we have received a WINDOW_UPDATE frame for a transport
-void grpc_chttp2_flowctl_recv_transport_update(
- grpc_chttp2_transport_flowctl *tfc, uint32_t size);
-
-// we have received a WINDOW_UPDATE frame for a stream
-void grpc_chttp2_flowctl_recv_stream_update(grpc_chttp2_transport_flowctl *tfc,
- grpc_chttp2_stream_flowctl *sfc,
- uint32_t size);
-
-// the application is asking for a certain amount of bytes
-void grpc_chttp2_flowctl_incoming_bs_update(grpc_chttp2_transport_flowctl *tfc,
- grpc_chttp2_stream_flowctl *sfc,
- size_t max_size_hint,
- size_t have_already);
-
-void grpc_chttp2_flowctl_destroy_stream(grpc_chttp2_transport_flowctl *tfc,
- grpc_chttp2_stream_flowctl *sfc);
-
-typedef enum {
- // Nothing to be done.
- GRPC_CHTTP2_FLOWCTL_NO_ACTION_NEEDED = 0,
- // Initiate a write to update the initial window immediately.
- GRPC_CHTTP2_FLOWCTL_UPDATE_IMMEDIATELY,
- // Push the flow control update into a send buffer, to be sent
- // out the next time a write is initiated.
- GRPC_CHTTP2_FLOWCTL_QUEUE_UPDATE,
-} grpc_chttp2_flowctl_urgency;
-
-typedef struct {
- grpc_chttp2_flowctl_urgency send_stream_update;
- grpc_chttp2_flowctl_urgency send_transport_update;
- grpc_chttp2_flowctl_urgency send_setting_update;
- uint32_t initial_window_size;
- uint32_t max_frame_size;
-} grpc_chttp2_flowctl_action;
-
-// Reads the flow control data and returns and actionable struct that will tell
-// chttp2 exactly what it needs to do
-grpc_chttp2_flowctl_action grpc_chttp2_flowctl_get_action(
- grpc_exec_ctx *exec_ctx, grpc_chttp2_transport_flowctl *tfc,
- grpc_chttp2_stream_flowctl *sfc);
-
// Takes in a flow control action and performs all the needed operations.
-void grpc_chttp2_act_on_flowctl_action(grpc_exec_ctx *exec_ctx,
- grpc_chttp2_flowctl_action action,
- grpc_chttp2_transport *t,
- grpc_chttp2_stream *s);
+void grpc_chttp2_act_on_flowctl_action(
+ grpc_exec_ctx *exec_ctx, const grpc_core::chttp2::FlowControlAction &action,
+ grpc_chttp2_transport *t, grpc_chttp2_stream *s);
/********* End of Flow Control ***************/
@@ -800,16 +681,6 @@
extern grpc_tracer_flag grpc_http_trace;
extern grpc_tracer_flag grpc_flowctl_trace;
-#ifndef NDEBUG
-#define GRPC_FLOW_CONTROL_IF_TRACING(stmt) \
- if (!(GRPC_TRACER_ON(grpc_flowctl_trace))) \
- ; \
- else \
- stmt
-#else
-#define GRPC_FLOW_CONTROL_IF_TRACING(stmt)
-#endif
-
#define GRPC_CHTTP2_IF_TRACING(stmt) \
if (!(GRPC_TRACER_ON(grpc_http_trace))) \
; \
diff --git a/src/core/ext/transport/chttp2/transport/parsing.cc b/src/core/ext/transport/chttp2/transport/parsing.cc
index 78886b4..ace71f8 100644
--- a/src/core/ext/transport/chttp2/transport/parsing.cc
+++ b/src/core/ext/transport/chttp2/transport/parsing.cc
@@ -355,14 +355,15 @@
grpc_chttp2_stream *s =
grpc_chttp2_parsing_lookup_stream(t, t->incoming_stream_id);
grpc_error *err = GRPC_ERROR_NONE;
- err = grpc_chttp2_flowctl_recv_data(&t->flow_control,
- s == NULL ? NULL : &s->flow_control,
- t->incoming_frame_size);
- grpc_chttp2_act_on_flowctl_action(
- exec_ctx,
- grpc_chttp2_flowctl_get_action(exec_ctx, &t->flow_control,
- s == NULL ? NULL : &s->flow_control),
- t, s);
+ grpc_core::chttp2::FlowControlAction action;
+ if (s == nullptr) {
+ err = t->flow_control->RecvData(t->incoming_frame_size);
+ action = t->flow_control->MakeAction();
+ } else {
+ err = s->flow_control->RecvData(t->incoming_frame_size);
+ action = s->flow_control->MakeAction();
+ }
+ grpc_chttp2_act_on_flowctl_action(exec_ctx, action, t, s);
if (err != GRPC_ERROR_NONE) {
goto error_handler;
}
diff --git a/src/core/ext/transport/chttp2/transport/writing.cc b/src/core/ext/transport/chttp2/transport/writing.cc
index c6fecf2..ff76a5f 100644
--- a/src/core/ext/transport/chttp2/transport/writing.cc
+++ b/src/core/ext/transport/chttp2/transport/writing.cc
@@ -146,13 +146,13 @@
s->flow_controlled_bytes_flowed,
t->settings[GRPC_ACKED_SETTINGS]
[GRPC_CHTTP2_SETTINGS_INITIAL_WINDOW_SIZE],
- t->flow_control.remote_window,
+ t->flow_control->remote_window(),
(uint32_t)GPR_MAX(
0,
- s->flow_control.remote_window_delta +
+ s->flow_control->remote_window_delta() +
(int64_t)t->settings[GRPC_PEER_SETTINGS]
[GRPC_CHTTP2_SETTINGS_INITIAL_WINDOW_SIZE]),
- s->flow_control.remote_window_delta);
+ s->flow_control->remote_window_delta());
}
static bool stream_ref_if_not_destroyed(gpr_refcount *r) {
@@ -216,8 +216,7 @@
void FlushWindowUpdates(grpc_exec_ctx *exec_ctx) {
uint32_t transport_announce =
- grpc_chttp2_flowctl_maybe_send_transport_update(&t_->flow_control,
- t_->outbuf.count > 0);
+ t_->flow_control->MaybeSendUpdate(t_->outbuf.count > 0);
if (transport_announce) {
grpc_transport_one_way_stats throwaway_stats;
grpc_slice_buffer_add(
@@ -312,7 +311,7 @@
uint32_t stream_remote_window() const {
return (uint32_t)GPR_MAX(
- 0, s_->flow_control.remote_window_delta +
+ 0, s_->flow_control->remote_window_delta() +
(int64_t)t_->settings[GRPC_PEER_SETTINGS]
[GRPC_CHTTP2_SETTINGS_INITIAL_WINDOW_SIZE]);
}
@@ -320,7 +319,7 @@
uint32_t max_outgoing() const {
return (uint32_t)GPR_MIN(
t_->settings[GRPC_PEER_SETTINGS][GRPC_CHTTP2_SETTINGS_MAX_FRAME_SIZE],
- GPR_MIN(stream_remote_window(), t_->flow_control.remote_window));
+ GPR_MIN(stream_remote_window(), t_->flow_control->remote_window()));
}
bool AnyOutgoing() const { return max_outgoing() != 0; }
@@ -352,8 +351,7 @@
grpc_metadata_batch_is_empty(s_->send_trailing_metadata);
grpc_chttp2_encode_data(s_->id, &s_->compressed_data_buffer, send_bytes,
is_last_frame_, &s_->stats.outgoing, &t_->outbuf);
- grpc_chttp2_flowctl_sent_data(&t_->flow_control, &s_->flow_control,
- send_bytes);
+ s_->flow_control->SentData(send_bytes);
if (s_->compressed_data_buffer.length == 0) {
s_->sending_bytes += s_->uncompressed_data_size;
}
@@ -400,8 +398,8 @@
gpr_log(GPR_DEBUG, "W:%p %s[%d] im-(sent,send)=(%d,%d) announce=%d", t_,
t_->is_client ? "CLIENT" : "SERVER", s->id,
s->sent_initial_metadata, s->send_initial_metadata != NULL,
- (int)(s->flow_control.local_window_delta -
- s->flow_control.announced_window_delta)));
+ (int)(s->flow_control->local_window_delta() -
+ s->flow_control->announced_window_delta())));
}
void FlushInitialMetadata(grpc_exec_ctx *exec_ctx) {
@@ -447,8 +445,7 @@
void FlushWindowUpdates(grpc_exec_ctx *exec_ctx) {
/* send any window updates */
- uint32_t stream_announce = grpc_chttp2_flowctl_maybe_send_stream_update(
- &t_->flow_control, &s_->flow_control);
+ const uint32_t stream_announce = s_->flow_control->MaybeSendUpdate();
if (stream_announce == 0) return;
grpc_slice_buffer_add(
@@ -469,10 +466,10 @@
DataSendContext data_send_context(write_context_, t_, s_);
if (!data_send_context.AnyOutgoing()) {
- if (t_->flow_control.remote_window == 0) {
+ if (t_->flow_control->remote_window() <= 0) {
report_stall(t_, s_, "transport");
grpc_chttp2_list_add_stalled_by_transport(t_, s_);
- } else if (data_send_context.stream_remote_window() == 0) {
+ } else if (data_send_context.stream_remote_window() <= 0) {
report_stall(t_, s_, "stream");
grpc_chttp2_list_add_stalled_by_stream(t_, s_);
}
@@ -588,7 +585,7 @@
ctx.FlushQueuedBuffers(exec_ctx);
ctx.EnactHpackSettings(exec_ctx);
- if (t->flow_control.remote_window > 0) {
+ if (t->flow_control->remote_window() > 0) {
ctx.UpdateStreamsNoLongerStalled();
}
diff --git a/src/core/lib/transport/bdp_estimator.h b/src/core/lib/transport/bdp_estimator.h
index 470c127..750da39 100644
--- a/src/core/lib/transport/bdp_estimator.h
+++ b/src/core/lib/transport/bdp_estimator.h
@@ -40,15 +40,8 @@
explicit BdpEstimator(const char *name);
~BdpEstimator() {}
- // Returns true if a reasonable estimate could be obtained
- bool EstimateBdp(int64_t *estimate_out) const {
- *estimate_out = estimate_;
- return true;
- }
- bool EstimateBandwidth(double *bw_out) const {
- *bw_out = bw_est_;
- return true;
- }
+ int64_t EstimateBdp() const { return estimate_; }
+ double EstimateBandwidth() const { return bw_est_; }
void AddIncomingBytes(int64_t num_bytes) { accumulator_ += num_bytes; }
diff --git a/src/core/lib/transport/pid_controller.cc b/src/core/lib/transport/pid_controller.cc
index 4b304f1..9f7750d 100644
--- a/src/core/lib/transport/pid_controller.cc
+++ b/src/core/lib/transport/pid_controller.cc
@@ -19,45 +19,30 @@
#include "src/core/lib/transport/pid_controller.h"
#include <grpc/support/useful.h>
-void grpc_pid_controller_init(grpc_pid_controller *pid_controller,
- grpc_pid_controller_args args) {
- pid_controller->args = args;
- pid_controller->last_control_value = args.initial_control_value;
- grpc_pid_controller_reset(pid_controller);
-}
+namespace grpc_core {
-void grpc_pid_controller_reset(grpc_pid_controller *pid_controller) {
- pid_controller->last_error = 0.0;
- pid_controller->last_dc_dt = 0.0;
- pid_controller->error_integral = 0.0;
-}
+PidController::PidController(const Args &args)
+ : last_control_value_(args.initial_control_value()), args_(args) {}
-double grpc_pid_controller_update(grpc_pid_controller *pid_controller,
- double error, double dt) {
- if (dt == 0) return pid_controller->last_control_value;
+double PidController::Update(double error, double dt) {
+ if (dt <= 0) return last_control_value_;
/* integrate error using the trapezoid rule */
- pid_controller->error_integral +=
- dt * (pid_controller->last_error + error) * 0.5;
- pid_controller->error_integral = GPR_CLAMP(
- pid_controller->error_integral, -pid_controller->args.integral_range,
- pid_controller->args.integral_range);
- double diff_error = (error - pid_controller->last_error) / dt;
+ error_integral_ += dt * (last_error_ + error) * 0.5;
+ error_integral_ = GPR_CLAMP(error_integral_, -args_.integral_range(),
+ args_.integral_range());
+ double diff_error = (error - last_error_) / dt;
/* calculate derivative of control value vs time */
- double dc_dt = pid_controller->args.gain_p * error +
- pid_controller->args.gain_i * pid_controller->error_integral +
- pid_controller->args.gain_d * diff_error;
+ double dc_dt = args_.gain_p() * error + args_.gain_i() * error_integral_ +
+ args_.gain_d() * diff_error;
/* and perform trapezoidal integration */
- double new_control_value = pid_controller->last_control_value +
- dt * (pid_controller->last_dc_dt + dc_dt) * 0.5;
- new_control_value =
- GPR_CLAMP(new_control_value, pid_controller->args.min_control_value,
- pid_controller->args.max_control_value);
- pid_controller->last_error = error;
- pid_controller->last_dc_dt = dc_dt;
- pid_controller->last_control_value = new_control_value;
+ double new_control_value =
+ last_control_value_ + dt * (last_dc_dt_ + dc_dt) * 0.5;
+ new_control_value = GPR_CLAMP(new_control_value, args_.min_control_value(),
+ args_.max_control_value());
+ last_error_ = error;
+ last_dc_dt_ = dc_dt;
+ last_control_value_ = new_control_value;
return new_control_value;
}
-double grpc_pid_controller_last(grpc_pid_controller *pid_controller) {
- return pid_controller->last_control_value;
-}
+} // namespace grpc_core
diff --git a/src/core/lib/transport/pid_controller.h b/src/core/lib/transport/pid_controller.h
index 80899e9..87e59a1 100644
--- a/src/core/lib/transport/pid_controller.h
+++ b/src/core/lib/transport/pid_controller.h
@@ -19,9 +19,7 @@
#ifndef GRPC_CORE_LIB_TRANSPORT_PID_CONTROLLER_H
#define GRPC_CORE_LIB_TRANSPORT_PID_CONTROLLER_H
-#ifdef __cplusplus
-extern "C" {
-#endif
+#include <limits>
/* \file Simple PID controller.
Implements a proportional-integral-derivative controller.
@@ -30,41 +28,87 @@
Gains can be set to adjust sensitivity to current error (p), the integral
of error (i), and the derivative of error (d). */
-typedef struct {
- double gain_p;
- double gain_i;
- double gain_d;
- double initial_control_value;
- double min_control_value;
- double max_control_value;
- double integral_range;
-} grpc_pid_controller_args;
+namespace grpc_core {
-typedef struct {
- double last_error;
- double error_integral;
- double last_control_value;
- double last_dc_dt;
- grpc_pid_controller_args args;
-} grpc_pid_controller;
+class PidController {
+ public:
+ class Args {
+ public:
+ double gain_p() const { return gain_p_; }
+ double gain_i() const { return gain_i_; }
+ double gain_d() const { return gain_d_; }
+ double initial_control_value() const { return initial_control_value_; }
+ double min_control_value() const { return min_control_value_; }
+ double max_control_value() const { return max_control_value_; }
+ double integral_range() const { return integral_range_; }
-/** Initialize the controller */
-void grpc_pid_controller_init(grpc_pid_controller *pid_controller,
- grpc_pid_controller_args args);
+ Args& set_gain_p(double gain_p) {
+ gain_p_ = gain_p;
+ return *this;
+ }
+ Args& set_gain_i(double gain_i) {
+ gain_i_ = gain_i;
+ return *this;
+ }
+ Args& set_gain_d(double gain_d) {
+ gain_d_ = gain_d;
+ return *this;
+ }
+ Args& set_initial_control_value(double initial_control_value) {
+ initial_control_value_ = initial_control_value;
+ return *this;
+ }
+ Args& set_min_control_value(double min_control_value) {
+ min_control_value_ = min_control_value;
+ return *this;
+ }
+ Args& set_max_control_value(double max_control_value) {
+ max_control_value_ = max_control_value;
+ return *this;
+ }
+ Args& set_integral_range(double integral_range) {
+ integral_range_ = integral_range;
+ return *this;
+ }
-/** Reset the controller: useful when things have changed significantly */
-void grpc_pid_controller_reset(grpc_pid_controller *pid_controller);
+ private:
+ double gain_p_ = 0.0;
+ double gain_i_ = 0.0;
+ double gain_d_ = 0.0;
+ double initial_control_value_ = 0.0;
+ double min_control_value_ = std::numeric_limits<double>::min();
+ double max_control_value_ = std::numeric_limits<double>::max();
+ double integral_range_ = std::numeric_limits<double>::max();
+ };
-/** Update the controller: given a current error estimate, and the time since
- the last update, returns a new control value */
-double grpc_pid_controller_update(grpc_pid_controller *pid_controller,
- double error, double dt);
+ explicit PidController(const Args& args);
-/** Returns the last control value calculated */
-double grpc_pid_controller_last(grpc_pid_controller *pid_controller);
+ /// Reset the controller internal state: useful when the environment has
+ /// changed significantly
+ void Reset() {
+ last_error_ = 0.0;
+ last_dc_dt_ = 0.0;
+ error_integral_ = 0.0;
+ }
-#ifdef __cplusplus
-}
-#endif
+ /// Update the controller: given a current error estimate, and the time since
+ /// the last update, returns a new control value
+ double Update(double error, double dt);
+
+ /// Returns the last control value calculated
+ double last_control_value() const { return last_control_value_; }
+
+ /// Returns the current error integral (mostly for testing)
+ double error_integral() const { return error_integral_; }
+
+ private:
+ double last_error_ = 0.0;
+ double error_integral_ = 0.0;
+ double last_control_value_;
+ double last_dc_dt_ = 0.0;
+ const Args args_;
+};
+
+} // namespace grpc_core
#endif /* GRPC_CORE_LIB_TRANSPORT_PID_CONTROLLER_H */
diff --git a/src/php/ext/grpc/server.c b/src/php/ext/grpc/server.c
index e460377..a65d233 100644
--- a/src/php/ext/grpc/server.c
+++ b/src/php/ext/grpc/server.c
@@ -169,7 +169,7 @@
/**
* Add a http2 over tcp listener.
* @param string $addr The address to add
- * @return bool True on success, false on failure
+ * @return int Port on success, 0 on failure
*/
PHP_METHOD(Server, addHttp2Port) {
const char *addr;
@@ -190,7 +190,7 @@
* Add a secure http2 over tcp listener.
* @param string $addr The address to add
* @param ServerCredentials The ServerCredentials object
- * @return bool True on success, false on failure
+ * @return int Port on success, 0 on failure
*/
PHP_METHOD(Server, addSecureHttp2Port) {
const char *addr;
diff --git a/test/core/bad_client/gen_build_yaml.py b/test/core/bad_client/gen_build_yaml.py
index dbd5277..61cf1f7 100755
--- a/test/core/bad_client/gen_build_yaml.py
+++ b/test/core/bad_client/gen_build_yaml.py
@@ -30,7 +30,7 @@
'headers': default_test_options._replace(cpu_cost=0.2),
'initial_settings_frame': default_test_options._replace(cpu_cost=0.2),
'head_of_line_blocking': default_test_options,
- 'large_metadata': default_test_options,
+ # 'large_metadata': default_test_options, #disabling as per issue #11745
'server_registered_method': default_test_options,
'simple_request': default_test_options,
'window_overflow': default_test_options,
diff --git a/test/core/bad_client/generate_tests.bzl b/test/core/bad_client/generate_tests.bzl
index 1aeb81c..58b48d6 100755
--- a/test/core/bad_client/generate_tests.bzl
+++ b/test/core/bad_client/generate_tests.bzl
@@ -28,7 +28,7 @@
'headers': test_options(),
'initial_settings_frame': test_options(),
'head_of_line_blocking': test_options(),
- 'large_metadata': test_options(),
+ # 'large_metadata': test_options(), # disabling as per issue #11745
'server_registered_method': test_options(),
'simple_request': test_options(),
'window_overflow': test_options(),
diff --git a/test/core/transport/BUILD b/test/core/transport/BUILD
index ea5e577..edd27b8 100644
--- a/test/core/transport/BUILD
+++ b/test/core/transport/BUILD
@@ -71,14 +71,17 @@
grpc_cc_test(
name = "pid_controller_test",
- srcs = ["pid_controller_test.c"],
- language = "C",
+ srcs = ["pid_controller_test.cc"],
+ language = "C++",
deps = [
"//:gpr",
"//:grpc",
"//test/core/util:gpr_test_util",
"//test/core/util:grpc_test_util",
],
+ external_deps = [
+ "gtest",
+ ],
)
grpc_cc_test(
diff --git a/test/core/transport/bdp_estimator_test.cc b/test/core/transport/bdp_estimator_test.cc
index 7ac3501..2c4fc45 100644
--- a/test/core/transport/bdp_estimator_test.cc
+++ b/test/core/transport/bdp_estimator_test.cc
@@ -51,8 +51,7 @@
TEST(BdpEstimatorTest, EstimateBdpNoSamples) {
BdpEstimator est("test");
- int64_t estimate;
- est.EstimateBdp(&estimate);
+ est.EstimateBdp();
}
namespace {
@@ -80,16 +79,14 @@
TEST(BdpEstimatorTest, GetEstimate1Sample) {
BdpEstimator est("test");
AddSample(&est, 100);
- int64_t estimate;
- est.EstimateBdp(&estimate);
+ est.EstimateBdp();
}
TEST(BdpEstimatorTest, GetEstimate2Samples) {
BdpEstimator est("test");
AddSample(&est, 100);
AddSample(&est, 100);
- int64_t estimate;
- est.EstimateBdp(&estimate);
+ est.EstimateBdp();
}
TEST(BdpEstimatorTest, GetEstimate3Samples) {
@@ -97,17 +94,10 @@
AddSample(&est, 100);
AddSample(&est, 100);
AddSample(&est, 100);
- int64_t estimate;
- est.EstimateBdp(&estimate);
+ est.EstimateBdp();
}
namespace {
-static int64_t GetEstimate(const BdpEstimator &estimator) {
- int64_t out;
- EXPECT_TRUE(estimator.EstimateBdp(&out));
- return out;
-}
-
int64_t NextPow2(int64_t v) {
v--;
v |= v >> 1;
@@ -134,7 +124,7 @@
if (sample > max) max = sample;
AddSample(&est, sample);
if (i >= 3) {
- EXPECT_LE(GetEstimate(est), GPR_MAX(65536, 2 * NextPow2(max)))
+ EXPECT_LE(est.EstimateBdp(), GPR_MAX(65536, 2 * NextPow2(max)))
<< " min:" << min << " max:" << max << " sample:" << sample;
}
}
@@ -143,6 +133,7 @@
INSTANTIATE_TEST_CASE_P(TooManyNames, BdpEstimatorRandomTest,
::testing::Values(3, 4, 6, 9, 13, 19, 28, 42, 63, 94,
141, 211, 316, 474, 711));
+
} // namespace testing
} // namespace grpc_core
diff --git a/test/core/transport/pid_controller_test.c b/test/core/transport/pid_controller_test.c
deleted file mode 100644
index 831c4b4..0000000
--- a/test/core/transport/pid_controller_test.c
+++ /dev/null
@@ -1,78 +0,0 @@
-/*
- *
- * Copyright 2016 gRPC authors.
- *
- * Licensed under the Apache License, Version 2.0 (the "License");
- * you may not use this file except in compliance with the License.
- * You may obtain a copy of the License at
- *
- * http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing, software
- * distributed under the License is distributed on an "AS IS" BASIS,
- * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
- * See the License for the specific language governing permissions and
- * limitations under the License.
- *
- */
-
-#include "src/core/lib/transport/pid_controller.h"
-
-#include <float.h>
-#include <math.h>
-
-#include <grpc/support/alloc.h>
-#include <grpc/support/log.h>
-#include <grpc/support/string_util.h>
-#include <grpc/support/useful.h>
-#include "src/core/lib/support/string.h"
-#include "test/core/util/test_config.h"
-
-static void test_noop(void) {
- gpr_log(GPR_INFO, "test_noop");
- grpc_pid_controller pid;
- grpc_pid_controller_init(
- &pid, (grpc_pid_controller_args){.gain_p = 1,
- .gain_i = 1,
- .gain_d = 1,
- .initial_control_value = 1,
- .min_control_value = DBL_MIN,
- .max_control_value = DBL_MAX,
- .integral_range = DBL_MAX});
-}
-
-static void test_simple_convergence(double gain_p, double gain_i, double gain_d,
- double dt, double set_point, double start) {
- gpr_log(GPR_INFO,
- "test_simple_convergence(p=%lf, i=%lf, d=%lf); dt=%lf set_point=%lf "
- "start=%lf",
- gain_p, gain_i, gain_d, dt, set_point, start);
- grpc_pid_controller pid;
- grpc_pid_controller_init(
- &pid, (grpc_pid_controller_args){.gain_p = gain_p,
- .gain_i = gain_i,
- .gain_d = gain_d,
- .initial_control_value = start,
- .min_control_value = DBL_MIN,
- .max_control_value = DBL_MAX,
- .integral_range = DBL_MAX});
-
- for (int i = 0; i < 100000; i++) {
- grpc_pid_controller_update(&pid, set_point - grpc_pid_controller_last(&pid),
- 1);
- }
-
- GPR_ASSERT(fabs(set_point - grpc_pid_controller_last(&pid)) < 0.1);
- if (gain_i > 0) {
- GPR_ASSERT(fabs(pid.error_integral) < 0.1);
- }
-}
-
-int main(int argc, char **argv) {
- grpc_test_init(argc, argv);
- test_noop();
- test_simple_convergence(0.2, 0, 0, 1, 100, 0);
- test_simple_convergence(0.2, 0.1, 0, 1, 100, 0);
- test_simple_convergence(0.2, 0.1, 0.1, 1, 100, 0);
- return 0;
-}
diff --git a/test/core/transport/pid_controller_test.cc b/test/core/transport/pid_controller_test.cc
new file mode 100644
index 0000000..081d034
--- /dev/null
+++ b/test/core/transport/pid_controller_test.cc
@@ -0,0 +1,91 @@
+/*
+ *
+ * Copyright 2016 gRPC authors.
+ *
+ * Licensed under the Apache License, Version 2.0 (the "License");
+ * you may not use this file except in compliance with the License.
+ * You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ *
+ */
+
+#include "src/core/lib/transport/pid_controller.h"
+
+#include <float.h>
+#include <math.h>
+
+#include <grpc/support/alloc.h>
+#include <grpc/support/log.h>
+#include <grpc/support/string_util.h>
+#include <grpc/support/useful.h>
+#include <gtest/gtest.h>
+#include "src/core/lib/support/string.h"
+#include "test/core/util/test_config.h"
+
+namespace grpc_core {
+namespace testing {
+
+TEST(PidController, NoOp) {
+ PidController pid(PidController::Args()
+ .set_gain_p(1)
+ .set_gain_i(1)
+ .set_gain_d(1)
+ .set_initial_control_value(1));
+}
+
+struct SimpleConvergenceTestArgs {
+ double gain_p;
+ double gain_i;
+ double gain_d;
+ double dt;
+ double set_point;
+ double start;
+};
+
+std::ostream& operator<<(std::ostream& out, SimpleConvergenceTestArgs args) {
+ return out << "gain_p:" << args.gain_p << " gain_i:" << args.gain_i
+ << " gain_d:" << args.gain_d << " dt:" << args.dt
+ << " set_point:" << args.set_point << " start:" << args.start;
+}
+
+class SimpleConvergenceTest
+ : public ::testing::TestWithParam<SimpleConvergenceTestArgs> {};
+
+TEST_P(SimpleConvergenceTest, Converges) {
+ PidController pid(PidController::Args()
+ .set_gain_p(GetParam().gain_p)
+ .set_gain_i(GetParam().gain_i)
+ .set_gain_d(GetParam().gain_d)
+ .set_initial_control_value(GetParam().start));
+
+ for (int i = 0; i < 100000; i++) {
+ pid.Update(GetParam().set_point - pid.last_control_value(), GetParam().dt);
+ }
+
+ EXPECT_LT(fabs(GetParam().set_point - pid.last_control_value()), 0.1);
+ if (GetParam().gain_i > 0) {
+ EXPECT_LT(fabs(pid.error_integral()), 0.1);
+ }
+}
+
+INSTANTIATE_TEST_CASE_P(
+ X, SimpleConvergenceTest,
+ ::testing::Values(SimpleConvergenceTestArgs{0.2, 0, 0, 1, 100, 0},
+ SimpleConvergenceTestArgs{0.2, 0.1, 0, 1, 100, 0},
+ SimpleConvergenceTestArgs{0.2, 0.1, 0.1, 1, 100, 0}));
+
+} // namespace testing
+} // namespace grpc_core
+
+int main(int argc, char** argv) {
+ grpc_test_init(argc, argv);
+ ::testing::InitGoogleTest(&argc, argv);
+ return RUN_ALL_TESTS();
+}
diff --git a/test/cpp/microbenchmarks/bm_chttp2_transport.cc b/test/cpp/microbenchmarks/bm_chttp2_transport.cc
index 8ee3ae7..3a484bb 100644
--- a/test/cpp/microbenchmarks/bm_chttp2_transport.cc
+++ b/test/cpp/microbenchmarks/bm_chttp2_transport.cc
@@ -428,9 +428,8 @@
return;
}
// force outgoing window to be yuge
- s->chttp2_stream()->flow_control.remote_window_delta =
- 1024 * 1024 * 1024;
- f.chttp2_transport()->flow_control.remote_window = 1024 * 1024 * 1024;
+ s->chttp2_stream()->flow_control->TestOnlyForceHugeWindow();
+ f.chttp2_transport()->flow_control->TestOnlyForceHugeWindow();
grpc_slice_buffer_stream_init(&send_stream, &send_buffer, 0);
reset_op();
op.on_complete = c.get();
@@ -560,22 +559,21 @@
std::unique_ptr<Closure> drain_continue;
grpc_slice recv_slice;
- std::unique_ptr<Closure> c = MakeClosure([&](grpc_exec_ctx *exec_ctx,
- grpc_error *error) {
- if (!state.KeepRunning()) return;
- // force outgoing window to be yuge
- s.chttp2_stream()->flow_control.local_window_delta = 1024 * 1024 * 1024;
- s.chttp2_stream()->flow_control.announced_window_delta = 1024 * 1024 * 1024;
- f.chttp2_transport()->flow_control.announced_window = 1024 * 1024 * 1024;
- received = 0;
- reset_op();
- op.on_complete = do_nothing.get();
- op.recv_message = true;
- op.payload->recv_message.recv_message = &recv_stream;
- op.payload->recv_message.recv_message_ready = drain_start.get();
- s.Op(exec_ctx, &op);
- f.PushInput(grpc_slice_ref(incoming_data));
- });
+ std::unique_ptr<Closure> c =
+ MakeClosure([&](grpc_exec_ctx *exec_ctx, grpc_error *error) {
+ if (!state.KeepRunning()) return;
+ // force outgoing window to be yuge
+ s.chttp2_stream()->flow_control->TestOnlyForceHugeWindow();
+ f.chttp2_transport()->flow_control->TestOnlyForceHugeWindow();
+ received = 0;
+ reset_op();
+ op.on_complete = do_nothing.get();
+ op.recv_message = true;
+ op.payload->recv_message.recv_message = &recv_stream;
+ op.payload->recv_message.recv_message_ready = drain_start.get();
+ s.Op(exec_ctx, &op);
+ f.PushInput(grpc_slice_ref(incoming_data));
+ });
drain_start = MakeClosure([&](grpc_exec_ctx *exec_ctx, grpc_error *error) {
if (recv_stream == NULL) {
diff --git a/test/cpp/microbenchmarks/bm_fullstack_trickle.cc b/test/cpp/microbenchmarks/bm_fullstack_trickle.cc
index 06ae342..389b8c9 100644
--- a/test/cpp/microbenchmarks/bm_fullstack_trickle.cc
+++ b/test/cpp/microbenchmarks/bm_fullstack_trickle.cc
@@ -142,15 +142,18 @@
client->lists[GRPC_CHTTP2_LIST_STALLED_BY_STREAM].head != nullptr,
server->lists[GRPC_CHTTP2_LIST_STALLED_BY_TRANSPORT].head != nullptr,
server->lists[GRPC_CHTTP2_LIST_STALLED_BY_STREAM].head != nullptr,
- client->flow_control.remote_window, server->flow_control.remote_window,
- client->flow_control.announced_window,
- server->flow_control.announced_window,
- client_stream ? client_stream->flow_control.remote_window_delta : -1,
- server_stream ? server_stream->flow_control.remote_window_delta : -1,
- client_stream ? client_stream->flow_control.local_window_delta : -1,
- server_stream ? server_stream->flow_control.local_window_delta : -1,
- client_stream ? client_stream->flow_control.announced_window_delta : -1,
- server_stream ? server_stream->flow_control.announced_window_delta : -1,
+ client->flow_control->remote_window(),
+ server->flow_control->remote_window(),
+ client->flow_control->announced_window(),
+ server->flow_control->announced_window(),
+ client_stream ? client_stream->flow_control->remote_window_delta() : -1,
+ server_stream ? server_stream->flow_control->remote_window_delta() : -1,
+ client_stream ? client_stream->flow_control->local_window_delta() : -1,
+ server_stream ? server_stream->flow_control->local_window_delta() : -1,
+ client_stream ? client_stream->flow_control->announced_window_delta()
+ : -1,
+ server_stream ? server_stream->flow_control->announced_window_delta()
+ : -1,
client->settings[GRPC_PEER_SETTINGS]
[GRPC_CHTTP2_SETTINGS_INITIAL_WINDOW_SIZE],
client->settings[GRPC_LOCAL_SETTINGS]
diff --git a/tools/doxygen/Doxyfile.core.internal b/tools/doxygen/Doxyfile.core.internal
index 1b1a7e7..e053642 100644
--- a/tools/doxygen/Doxyfile.core.internal
+++ b/tools/doxygen/Doxyfile.core.internal
@@ -1020,6 +1020,7 @@
src/core/ext/transport/chttp2/transport/chttp2_transport.cc \
src/core/ext/transport/chttp2/transport/chttp2_transport.h \
src/core/ext/transport/chttp2/transport/flow_control.cc \
+src/core/ext/transport/chttp2/transport/flow_control.h \
src/core/ext/transport/chttp2/transport/frame.h \
src/core/ext/transport/chttp2/transport/frame_data.cc \
src/core/ext/transport/chttp2/transport/frame_data.h \
diff --git a/tools/run_tests/generated/sources_and_headers.json b/tools/run_tests/generated/sources_and_headers.json
index dca2e83..66d91e3 100644
--- a/tools/run_tests/generated/sources_and_headers.json
+++ b/tools/run_tests/generated/sources_and_headers.json
@@ -2447,23 +2447,6 @@
"headers": [],
"is_filegroup": false,
"language": "c",
- "name": "transport_pid_controller_test",
- "src": [
- "test/core/transport/pid_controller_test.c"
- ],
- "third_party": false,
- "type": "target"
- },
- {
- "deps": [
- "gpr",
- "gpr_test_util",
- "grpc",
- "grpc_test_util"
- ],
- "headers": [],
- "is_filegroup": false,
- "language": "c",
"name": "transport_security_test",
"src": [
"test/core/tsi/transport_security_test.c"
@@ -4248,6 +4231,25 @@
"gpr_test_util",
"grpc",
"grpc++",
+ "grpc++_test_util",
+ "grpc_test_util"
+ ],
+ "headers": [],
+ "is_filegroup": false,
+ "language": "c++",
+ "name": "transport_pid_controller_test",
+ "src": [
+ "test/core/transport/pid_controller_test.cc"
+ ],
+ "third_party": false,
+ "type": "target"
+ },
+ {
+ "deps": [
+ "gpr",
+ "gpr_test_util",
+ "grpc",
+ "grpc++",
"grpc++_test",
"grpc_test_util"
],
@@ -4928,24 +4930,6 @@
"headers": [],
"is_filegroup": false,
"language": "c",
- "name": "large_metadata_bad_client_test",
- "src": [
- "test/core/bad_client/tests/large_metadata.c"
- ],
- "third_party": false,
- "type": "target"
- },
- {
- "deps": [
- "bad_client_test",
- "gpr",
- "gpr_test_util",
- "grpc_test_util_unsecure",
- "grpc_unsecure"
- ],
- "headers": [],
- "is_filegroup": false,
- "language": "c",
"name": "server_registered_method_bad_client_test",
"src": [
"test/core/bad_client/tests/server_registered_method.c"
@@ -9026,6 +9010,7 @@
"src/core/ext/transport/chttp2/transport/bin_decoder.h",
"src/core/ext/transport/chttp2/transport/bin_encoder.h",
"src/core/ext/transport/chttp2/transport/chttp2_transport.h",
+ "src/core/ext/transport/chttp2/transport/flow_control.h",
"src/core/ext/transport/chttp2/transport/frame.h",
"src/core/ext/transport/chttp2/transport/frame_data.h",
"src/core/ext/transport/chttp2/transport/frame_goaway.h",
@@ -9055,6 +9040,7 @@
"src/core/ext/transport/chttp2/transport/chttp2_transport.cc",
"src/core/ext/transport/chttp2/transport/chttp2_transport.h",
"src/core/ext/transport/chttp2/transport/flow_control.cc",
+ "src/core/ext/transport/chttp2/transport/flow_control.h",
"src/core/ext/transport/chttp2/transport/frame.h",
"src/core/ext/transport/chttp2/transport/frame_data.cc",
"src/core/ext/transport/chttp2/transport/frame_data.h",
diff --git a/tools/run_tests/generated/tests.json b/tools/run_tests/generated/tests.json
index 4794b14..fe466a0 100644
--- a/tools/run_tests/generated/tests.json
+++ b/tools/run_tests/generated/tests.json
@@ -2867,31 +2867,7 @@
"posix",
"windows"
],
- "uses_polling": false
- },
- {
- "args": [],
- "benchmark": false,
- "ci_platforms": [
- "linux",
- "mac",
- "posix",
- "windows"
- ],
- "cpu_cost": 1.0,
- "exclude_configs": [],
- "exclude_iomgrs": [],
- "flaky": false,
- "gtest": false,
- "language": "c",
- "name": "transport_pid_controller_test",
- "platforms": [
- "linux",
- "mac",
- "posix",
- "windows"
- ],
- "uses_polling": false
+ "uses_polling": true
},
{
"args": [],
@@ -4514,6 +4490,30 @@
"exclude_configs": [],
"exclude_iomgrs": [],
"flaky": false,
+ "gtest": false,
+ "language": "c++",
+ "name": "transport_pid_controller_test",
+ "platforms": [
+ "linux",
+ "mac",
+ "posix",
+ "windows"
+ ],
+ "uses_polling": true
+ },
+ {
+ "args": [],
+ "benchmark": false,
+ "ci_platforms": [
+ "linux",
+ "mac",
+ "posix",
+ "windows"
+ ],
+ "cpu_cost": 1.0,
+ "exclude_configs": [],
+ "exclude_iomgrs": [],
+ "flaky": false,
"gtest": true,
"language": "c++",
"name": "vector_test",
@@ -4718,32 +4718,6 @@
"flaky": false,
"gtest": false,
"language": "c",
- "name": "large_metadata_bad_client_test",
- "platforms": [
- "linux",
- "mac",
- "posix",
- "windows"
- ],
- "uses_polling": true
- },
- {
- "args": [],
- "benchmark": false,
- "ci_platforms": [
- "linux",
- "mac",
- "posix",
- "windows"
- ],
- "cpu_cost": 1.0,
- "exclude_configs": [],
- "exclude_iomgrs": [
- "uv"
- ],
- "flaky": false,
- "gtest": false,
- "language": "c",
"name": "server_registered_method_bad_client_test",
"platforms": [
"linux",