PID controller stabilization
diff --git a/src/core/ext/transport/chttp2/transport/chttp2_transport.c b/src/core/ext/transport/chttp2/transport/chttp2_transport.c
index 6d302b0..8cce283 100644
--- a/src/core/ext/transport/chttp2/transport/chttp2_transport.c
+++ b/src/core/ext/transport/chttp2/transport/chttp2_transport.c
@@ -252,8 +252,8 @@
grpc_bdp_estimator_init(&t->bdp_estimator);
t->last_bdp_ping_finished = gpr_now(GPR_CLOCK_MONOTONIC);
t->last_pid_update = t->last_bdp_ping_finished;
- grpc_pid_controller_init(&t->pid_controller, 16, 8, 0);
- t->bdp_guess = DEFAULT_WINDOW;
+ grpc_pid_controller_init(&t->pid_controller, 4, 4, 0);
+ t->log2_bdp_guess = log2(DEFAULT_WINDOW);
grpc_chttp2_goaway_parser_init(&t->goaway_parser);
grpc_chttp2_hpack_parser_init(&t->hpack_parser);
@@ -1897,26 +1897,29 @@
int64_t estimate = -1;
double bdp_error = 0.0;
if (grpc_bdp_estimator_get_estimate(&t->bdp_estimator, &estimate)) {
- bdp_error = 2.0 * (double)estimate - t->bdp_guess;
+ double target = (double)estimate;
+ double memory_pressure = grpc_resource_quota_get_memory_pressure(
+ grpc_resource_user_get_quota(grpc_endpoint_get_resource_user(t->ep)));
+ if (memory_pressure > 0.8) {
+ target *= 1 - GPR_MIN(1, (memory_pressure - 0.8) / 0.1);
+ }
+ bdp_error = target > 0 ? log2(target) - t->log2_bdp_guess
+ : GPR_MIN(0, -t->log2_bdp_guess);
+ gpr_timespec now = gpr_now(GPR_CLOCK_MONOTONIC);
+ gpr_timespec dt_timespec = gpr_time_sub(now, t->last_pid_update);
+ double dt = (double)dt_timespec.tv_sec + dt_timespec.tv_nsec * 1e-9;
+ if (dt > 3) {
+ grpc_pid_controller_reset(&t->pid_controller);
+ }
+ t->log2_bdp_guess +=
+ grpc_pid_controller_update(&t->pid_controller, bdp_error, dt);
+ t->log2_bdp_guess = GPR_CLAMP(t->log2_bdp_guess, -5, 21);
+ gpr_log(GPR_DEBUG, "%s: err=%lf cur=%lf pressure=%lf target=%lf",
+ t->peer_string, bdp_error, t->log2_bdp_guess, memory_pressure,
+ target);
+ update_bdp(exec_ctx, t, pow(2, t->log2_bdp_guess));
+ t->last_pid_update = now;
}
- gpr_timespec now = gpr_now(GPR_CLOCK_MONOTONIC);
- gpr_timespec dt_timespec = gpr_time_sub(now, t->last_pid_update);
- double dt = (double)dt_timespec.tv_sec + dt_timespec.tv_nsec * 1e-9;
- if (dt > 3) {
- grpc_pid_controller_reset(&t->pid_controller);
- }
- double memory_pressure = grpc_resource_quota_get_memory_pressure(
- grpc_resource_user_get_quota(grpc_endpoint_get_resource_user(t->ep)));
- if (memory_pressure > 0.8) {
- bdp_error -= GPR_MAX(0, t->bdp_guess) * (memory_pressure - 0.8) / 0.2;
- }
- if (t->bdp_guess < 1e-6 && bdp_error < 0) {
- bdp_error = 0;
- }
- t->bdp_guess +=
- grpc_pid_controller_update(&t->pid_controller, bdp_error, dt);
- update_bdp(exec_ctx, t, t->bdp_guess);
- t->last_pid_update = now;
GRPC_CHTTP2_UNREF_TRANSPORT(exec_ctx, t, "keep_reading");
} else {
GRPC_CHTTP2_UNREF_TRANSPORT(exec_ctx, t, "reading_action");
@@ -1929,21 +1932,23 @@
GPR_TIMER_END("reading_action_locked", 0);
}
+static void start_bdp_ping_locked(grpc_exec_ctx *exec_ctx, void *tp,
+ grpc_error *error) {
+ grpc_chttp2_transport *t = tp;
+ gpr_log(GPR_DEBUG, "%s: Start BDP ping", t->peer_string);
+ grpc_bdp_estimator_start_ping(&t->bdp_estimator);
+}
+
static void finish_bdp_ping_locked(grpc_exec_ctx *exec_ctx, void *tp,
grpc_error *error) {
grpc_chttp2_transport *t = tp;
+ gpr_log(GPR_DEBUG, "%s: Complete BDP ping", t->peer_string);
grpc_bdp_estimator_complete_ping(&t->bdp_estimator);
t->last_bdp_ping_finished = gpr_now(GPR_CLOCK_MONOTONIC);
GRPC_CHTTP2_UNREF_TRANSPORT(exec_ctx, t, "bdp_ping");
}
-static void start_bdp_ping_locked(grpc_exec_ctx *exec_ctx, void *tp,
- grpc_error *error) {
- grpc_chttp2_transport *t = tp;
- grpc_bdp_estimator_start_ping(&t->bdp_estimator);
-}
-
/*******************************************************************************
* CALLBACK LOOP
*/
diff --git a/src/core/ext/transport/chttp2/transport/internal.h b/src/core/ext/transport/chttp2/transport/internal.h
index 90abf38..e320dd0 100644
--- a/src/core/ext/transport/chttp2/transport/internal.h
+++ b/src/core/ext/transport/chttp2/transport/internal.h
@@ -82,8 +82,8 @@
} grpc_chttp2_ping_type;
typedef enum {
- GRPC_CHTTP2_PCL_NEXT = 0,
- GRPC_CHTTP2_PCL_INITIATE,
+ GRPC_CHTTP2_PCL_INITIATE = 0,
+ GRPC_CHTTP2_PCL_NEXT,
GRPC_CHTTP2_PCL_INFLIGHT,
GRPC_CHTTP2_PCL_COUNT /* must be last */
} grpc_chttp2_ping_closure_list;
@@ -330,7 +330,7 @@
/* bdp estimator */
grpc_bdp_estimator bdp_estimator;
grpc_pid_controller pid_controller;
- double bdp_guess;
+ double log2_bdp_guess;
grpc_closure start_bdp_ping_locked;
grpc_closure finish_bdp_ping_locked;
gpr_timespec last_bdp_ping_finished;
diff --git a/src/core/lib/transport/bdp_estimator.c b/src/core/lib/transport/bdp_estimator.c
index 4c2c1d3..90e4332 100644
--- a/src/core/lib/transport/bdp_estimator.c
+++ b/src/core/lib/transport/bdp_estimator.c
@@ -39,33 +39,16 @@
#include <grpc/support/useful.h>
void grpc_bdp_estimator_init(grpc_bdp_estimator *estimator) {
- estimator->num_samples = 0;
- estimator->first_sample_idx = 0;
+ estimator->estimate = 65536;
estimator->ping_state = GRPC_BDP_PING_UNSCHEDULED;
}
bool grpc_bdp_estimator_get_estimate(grpc_bdp_estimator *estimator,
int64_t *estimate) {
- if (estimator->num_samples < GRPC_BDP_MIN_SAMPLES_FOR_ESTIMATE) {
- return false;
- }
-
- *estimate = -1;
- for (uint8_t i = 0; i < estimator->num_samples; i++) {
- *estimate = GPR_MAX(
- *estimate,
- estimator
- ->samples[(estimator->first_sample_idx + i) % GRPC_BDP_SAMPLES]);
- }
+ *estimate = estimator->estimate;
return true;
}
-static int64_t *sampling(grpc_bdp_estimator *estimator) {
- return &estimator
- ->samples[(estimator->first_sample_idx + estimator->num_samples) %
- GRPC_BDP_SAMPLES];
-}
-
bool grpc_bdp_estimator_add_incoming_bytes(grpc_bdp_estimator *estimator,
int64_t num_bytes) {
switch (estimator->ping_state) {
@@ -74,7 +57,7 @@
case GRPC_BDP_PING_SCHEDULED:
return false;
case GRPC_BDP_PING_STARTED:
- *sampling(estimator) += num_bytes;
+ estimator->accumulator += num_bytes;
return false;
}
GPR_UNREACHABLE_CODE(return false);
@@ -88,15 +71,14 @@
void grpc_bdp_estimator_start_ping(grpc_bdp_estimator *estimator) {
GPR_ASSERT(estimator->ping_state == GRPC_BDP_PING_SCHEDULED);
estimator->ping_state = GRPC_BDP_PING_STARTED;
- if (estimator->num_samples == GRPC_BDP_SAMPLES) {
- estimator->first_sample_idx++;
- estimator->num_samples--;
- }
- *sampling(estimator) = 0;
+ estimator->accumulator = 0;
}
void grpc_bdp_estimator_complete_ping(grpc_bdp_estimator *estimator) {
GPR_ASSERT(estimator->ping_state == GRPC_BDP_PING_STARTED);
- estimator->num_samples++;
+ if (estimator->accumulator > 2 * estimator->estimate / 3) {
+ estimator->estimate *= 2;
+ gpr_log(GPR_DEBUG, "est --> %" PRId64, estimator->estimate);
+ }
estimator->ping_state = GRPC_BDP_PING_UNSCHEDULED;
}
diff --git a/src/core/lib/transport/bdp_estimator.h b/src/core/lib/transport/bdp_estimator.h
index d812f90..ea74f2b 100644
--- a/src/core/lib/transport/bdp_estimator.h
+++ b/src/core/lib/transport/bdp_estimator.h
@@ -47,10 +47,9 @@
} grpc_bdp_estimator_ping_state;
typedef struct grpc_bdp_estimator {
- uint8_t num_samples;
- uint8_t first_sample_idx;
grpc_bdp_estimator_ping_state ping_state;
- int64_t samples[GRPC_BDP_SAMPLES];
+ int64_t accumulator;
+ int64_t estimate;
} grpc_bdp_estimator;
void grpc_bdp_estimator_init(grpc_bdp_estimator *estimator);