Fixes and debug logging for the epoll1 poller (kick handling, next-poller selection, kick-state tracing)
diff --git a/src/core/lib/iomgr/ev_epoll1_linux.c b/src/core/lib/iomgr/ev_epoll1_linux.c
index 5ac2d4d..379b875 100644
--- a/src/core/lib/iomgr/ev_epoll1_linux.c
+++ b/src/core/lib/iomgr/ev_epoll1_linux.c
@@ -94,6 +94,18 @@
typedef enum { UNKICKED, KICKED, DESIGNATED_POLLER } kick_state;
+static const char *kick_state_string(kick_state st) { /* enumerator name of st as a string literal, for log messages */
+ switch (st) { /* one case per kick_state enumerator; every case returns */
+ case UNKICKED:
+ return "UNKICKED";
+ case KICKED:
+ return "KICKED";
+ case DESIGNATED_POLLER:
+ return "DESIGNATED_POLLER";
+ }
+ GPR_UNREACHABLE_CODE(return "UNKNOWN"); /* only reached if st matched none of the cases above */
+}
+
struct grpc_pollset_worker {
kick_state kick_state;
int kick_state_mutator; // which line of code last changed kick state
@@ -217,7 +229,7 @@
gpr_asprintf(&fd_name, "%s fd=%d", name, fd);
grpc_iomgr_register_object(&new_fd->iomgr_object, fd_name);
#ifdef GRPC_FD_REF_COUNT_DEBUG
- gpr_log(GPR_DEBUG, "FD %d %p create %s", fd, (void *)new_fd, fd_name);
+ gpr_log(GPR_ERROR, "FD %d %p create %s", fd, (void *)new_fd, fd_name);
#endif
gpr_free(fd_name);
@@ -283,12 +295,12 @@
static void fd_notify_on_read(grpc_exec_ctx *exec_ctx, grpc_fd *fd,
grpc_closure *closure) {
- grpc_lfev_notify_on(exec_ctx, &fd->read_closure, closure);
+ grpc_lfev_notify_on(exec_ctx, &fd->read_closure, closure, "read");
}
static void fd_notify_on_write(grpc_exec_ctx *exec_ctx, grpc_fd *fd,
grpc_closure *closure) {
- grpc_lfev_notify_on(exec_ctx, &fd->write_closure, closure);
+ grpc_lfev_notify_on(exec_ctx, &fd->write_closure, closure, "write");
}
static grpc_workqueue *fd_get_workqueue(grpc_fd *fd) {
@@ -297,7 +309,7 @@
static void fd_become_readable(grpc_exec_ctx *exec_ctx, grpc_fd *fd,
grpc_pollset *notifier) {
- grpc_lfev_set_ready(exec_ctx, &fd->read_closure);
+ grpc_lfev_set_ready(exec_ctx, &fd->read_closure, "read");
/* Note, it is possible that fd_become_readable might be called twice with
different 'notifier's when an fd becomes readable and it is in two epoll
@@ -309,7 +321,7 @@
}
static void fd_become_writable(grpc_exec_ctx *exec_ctx, grpc_fd *fd) {
- grpc_lfev_set_ready(exec_ctx, &fd->write_closure);
+ grpc_lfev_set_ready(exec_ctx, &fd->write_closure, "write");
}
/*******************************************************************************
@@ -555,6 +567,10 @@
worker->schedule_on_end_work = (grpc_closure_list)GRPC_CLOSURE_LIST_INIT;
pollset->begin_refs++;
+ if (GRPC_TRACER_ON(grpc_polling_trace)) {
+ gpr_log(GPR_ERROR, "PS:%p BEGIN_STARTS:%p", pollset, worker);
+ }
+
if (pollset->seen_inactive) {
// pollset has been observed to be inactive, we need to move back to the
// active list
@@ -570,6 +586,11 @@
retry_lock_neighbourhood:
gpr_mu_lock(&neighbourhood->mu);
gpr_mu_lock(&pollset->mu);
+ if (GRPC_TRACER_ON(grpc_polling_trace)) {
+ gpr_log(GPR_ERROR, "PS:%p BEGIN_REORG:%p kick_state=%s is_reassigning=%d",
+ pollset, worker, kick_state_string(worker->kick_state),
+ is_reassigning);
+ }
if (pollset->seen_inactive) {
if (neighbourhood != pollset->neighbourhood) {
gpr_mu_unlock(&neighbourhood->mu);
@@ -603,6 +624,11 @@
worker->initialized_cv = true;
gpr_cv_init(&worker->cv);
while (worker->kick_state == UNKICKED && !pollset->shutting_down) {
+ if (GRPC_TRACER_ON(grpc_polling_trace)) {
+ gpr_log(GPR_ERROR, "PS:%p BEGIN_WAIT:%p kick_state=%s shutdown=%d",
+ pollset, worker, kick_state_string(worker->kick_state),
+ pollset->shutting_down);
+ }
if (gpr_cv_wait(&worker->cv, &pollset->mu, deadline) &&
worker->kick_state == UNKICKED) {
SET_KICK_STATE(worker, KICKED);
@@ -610,6 +636,11 @@
}
*now = gpr_now(now->clock_type);
}
+ if (GRPC_TRACER_ON(grpc_polling_trace)) {
+ gpr_log(GPR_ERROR, "PS:%p BEGIN_DONE:%p kick_state=%s shutdown=%d", pollset,
+ worker, kick_state_string(worker->kick_state),
+ pollset->shutting_down);
+ }
return worker->kick_state == DESIGNATED_POLLER && !pollset->shutting_down;
}
@@ -631,10 +662,18 @@
case UNKICKED:
if (gpr_atm_no_barrier_cas(&g_active_poller, 0,
(gpr_atm)inspect_worker)) {
+ if (GRPC_TRACER_ON(grpc_polling_trace)) {
+ gpr_log(GPR_DEBUG, " .. choose next poller to be %p",
+ inspect_worker);
+ }
SET_KICK_STATE(inspect_worker, DESIGNATED_POLLER);
if (inspect_worker->initialized_cv) {
gpr_cv_signal(&inspect_worker->cv);
}
+ } else {
+ if (GRPC_TRACER_ON(grpc_polling_trace)) {
+ gpr_log(GPR_DEBUG, " .. beaten to choose next poller");
+ }
}
// even if we didn't win the cas, there's a worker, we can stop
found_worker = true;
@@ -647,9 +686,12 @@
break;
}
inspect_worker = inspect_worker->next;
- } while (inspect_worker != inspect->root_worker);
+ } while (!found_worker && inspect_worker != inspect->root_worker);
}
if (!found_worker) {
+ if (GRPC_TRACER_ON(grpc_polling_trace)) {
+ gpr_log(GPR_DEBUG, " .. mark pollset %p inactive", inspect);
+ }
inspect->seen_inactive = true;
if (inspect == neighbourhood->active_root) {
neighbourhood->active_root =
@@ -667,12 +709,19 @@
static void end_worker(grpc_exec_ctx *exec_ctx, grpc_pollset *pollset,
grpc_pollset_worker *worker,
grpc_pollset_worker **worker_hdl) {
+ if (GRPC_TRACER_ON(grpc_polling_trace)) {
+ gpr_log(GPR_DEBUG, "PS:%p END_WORKER:%p", pollset, worker);
+ }
if (worker_hdl != NULL) *worker_hdl = NULL;
+ /* Make sure we appear kicked */
SET_KICK_STATE(worker, KICKED);
grpc_closure_list_move(&worker->schedule_on_end_work,
&exec_ctx->closure_list);
if (gpr_atm_no_barrier_load(&g_active_poller) == (gpr_atm)worker) {
if (worker->next != worker && worker->next->kick_state == UNKICKED) {
+ if (GRPC_TRACER_ON(grpc_polling_trace)) {
+ gpr_log(GPR_DEBUG, " .. choose next poller to be peer %p", worker->next);
+ }
GPR_ASSERT(worker->next->initialized_cv);
gpr_atm_no_barrier_store(&g_active_poller, (gpr_atm)worker->next);
SET_KICK_STATE(worker->next, DESIGNATED_POLLER);
@@ -722,6 +771,9 @@
if (worker->initialized_cv) {
gpr_cv_destroy(&worker->cv);
}
+ if (GRPC_TRACER_ON(grpc_polling_trace)) {
+ gpr_log(GPR_DEBUG, " .. remove worker");
+ }
if (EMPTIED == worker_remove(pollset, worker)) {
pollset_maybe_finish_shutdown(exec_ctx, pollset);
}
@@ -742,8 +794,8 @@
pollset->kicked_without_poller = false;
return GRPC_ERROR_NONE;
}
- gpr_tls_set(&g_current_thread_pollset, (intptr_t)pollset);
if (begin_worker(pollset, &worker, worker_hdl, &now, deadline)) {
+ gpr_tls_set(&g_current_thread_pollset, (intptr_t)pollset);
gpr_tls_set(&g_current_thread_worker, (intptr_t)&worker);
GPR_ASSERT(!pollset->shutting_down);
GPR_ASSERT(!pollset->seen_inactive);
@@ -752,6 +804,8 @@
err_desc);
gpr_mu_lock(&pollset->mu);
gpr_tls_set(&g_current_thread_worker, 0);
+ } else {
+ gpr_tls_set(&g_current_thread_pollset, (intptr_t)pollset);
}
end_worker(exec_ctx, pollset, &worker, worker_hdl);
gpr_tls_set(&g_current_thread_pollset, 0);
@@ -770,18 +824,20 @@
(void *)gpr_tls_get(&g_current_thread_worker), pollset->root_worker);
gpr_strvec_add(&log, tmp);
if (pollset->root_worker != NULL) {
- gpr_asprintf(&tmp, " {kicked=%d next=%p {kicked=%d}}",
- pollset->root_worker->kick_state, pollset->root_worker->next,
- pollset->root_worker->next->kick_state);
+ gpr_asprintf(&tmp, " {kick_state=%s next=%p {kick_state=%s}}",
+ kick_state_string(pollset->root_worker->kick_state),
+ pollset->root_worker->next,
+ kick_state_string(pollset->root_worker->next->kick_state));
gpr_strvec_add(&log, tmp);
}
if (specific_worker != NULL) {
- gpr_asprintf(&tmp, " worker_kicked=%d", specific_worker->kick_state);
+ gpr_asprintf(&tmp, " worker_kick_state=%s",
+ kick_state_string(specific_worker->kick_state));
gpr_strvec_add(&log, tmp);
}
tmp = gpr_strvec_flatten(&log, NULL);
gpr_strvec_destroy(&log);
- gpr_log(GPR_DEBUG, "%s", tmp);
+ gpr_log(GPR_ERROR, "%s", tmp);
gpr_free(tmp);
}
if (specific_worker == NULL) {
@@ -790,23 +846,36 @@
if (root_worker == NULL) {
pollset->kicked_without_poller = true;
if (GRPC_TRACER_ON(grpc_polling_trace)) {
- gpr_log(GPR_DEBUG, " .. kicked_without_poller");
+ gpr_log(GPR_ERROR, " .. kicked_without_poller");
}
return GRPC_ERROR_NONE;
}
grpc_pollset_worker *next_worker = root_worker->next;
- if (root_worker == next_worker && // only try and wake up a poller if
- // there is no next worker
- root_worker == (grpc_pollset_worker *)gpr_atm_no_barrier_load(
- &g_active_poller)) {
+ if (root_worker->kick_state == KICKED) {
if (GRPC_TRACER_ON(grpc_polling_trace)) {
- gpr_log(GPR_DEBUG, " .. kicked %p", root_worker);
+ gpr_log(GPR_ERROR, " .. already kicked %p", root_worker);
+ }
+ SET_KICK_STATE(root_worker, KICKED);
+ return GRPC_ERROR_NONE;
+ } else if (next_worker->kick_state == KICKED) {
+ if (GRPC_TRACER_ON(grpc_polling_trace)) {
+ gpr_log(GPR_ERROR, " .. already kicked %p", next_worker);
+ }
+ SET_KICK_STATE(next_worker, KICKED);
+ return GRPC_ERROR_NONE;
+ } else if (root_worker ==
+ next_worker && // only try and wake up a poller if
+ // there is no next worker
+ root_worker == (grpc_pollset_worker *)gpr_atm_no_barrier_load(
+ &g_active_poller)) {
+ if (GRPC_TRACER_ON(grpc_polling_trace)) {
+ gpr_log(GPR_ERROR, " .. kicked %p", root_worker);
}
SET_KICK_STATE(root_worker, KICKED);
return grpc_wakeup_fd_wakeup(&global_wakeup_fd);
} else if (next_worker->kick_state == UNKICKED) {
if (GRPC_TRACER_ON(grpc_polling_trace)) {
- gpr_log(GPR_DEBUG, " .. kicked %p", next_worker);
+ gpr_log(GPR_ERROR, " .. kicked %p", next_worker);
}
GPR_ASSERT(next_worker->initialized_cv);
SET_KICK_STATE(next_worker, KICKED);
@@ -815,7 +884,10 @@
} else if (next_worker->kick_state == DESIGNATED_POLLER) {
if (root_worker->kick_state != DESIGNATED_POLLER) {
if (GRPC_TRACER_ON(grpc_polling_trace)) {
- gpr_log(GPR_DEBUG, " .. kicked root non-poller %p", next_worker);
+ gpr_log(
+ GPR_ERROR,
+ " .. kicked root non-poller %p (initialized_cv=%d) (poller=%p)",
+ root_worker, root_worker->initialized_cv, next_worker);
}
SET_KICK_STATE(root_worker, KICKED);
if (root_worker->initialized_cv) {
@@ -824,7 +896,7 @@
return GRPC_ERROR_NONE;
} else {
if (GRPC_TRACER_ON(grpc_polling_trace)) {
- gpr_log(GPR_DEBUG, " .. non-root poller %p (root=%p)", next_worker,
+ gpr_log(GPR_ERROR, " .. non-root poller %p (root=%p)", next_worker,
root_worker);
}
SET_KICK_STATE(next_worker, KICKED);
@@ -836,37 +908,40 @@
return GRPC_ERROR_NONE;
}
} else {
+ if (GRPC_TRACER_ON(grpc_polling_trace)) {
+ gpr_log(GPR_ERROR, " .. kicked while waking up");
+ }
return GRPC_ERROR_NONE;
}
} else if (specific_worker->kick_state == KICKED) {
if (GRPC_TRACER_ON(grpc_polling_trace)) {
- gpr_log(GPR_DEBUG, " .. specific worker already kicked");
+ gpr_log(GPR_ERROR, " .. specific worker already kicked");
}
return GRPC_ERROR_NONE;
} else if (gpr_tls_get(&g_current_thread_worker) ==
(intptr_t)specific_worker) {
if (GRPC_TRACER_ON(grpc_polling_trace)) {
- gpr_log(GPR_DEBUG, " .. mark %p kicked", specific_worker);
+ gpr_log(GPR_ERROR, " .. mark %p kicked", specific_worker);
}
SET_KICK_STATE(specific_worker, KICKED);
return GRPC_ERROR_NONE;
} else if (specific_worker ==
(grpc_pollset_worker *)gpr_atm_no_barrier_load(&g_active_poller)) {
if (GRPC_TRACER_ON(grpc_polling_trace)) {
- gpr_log(GPR_DEBUG, " .. kick active poller");
+ gpr_log(GPR_ERROR, " .. kick active poller");
}
SET_KICK_STATE(specific_worker, KICKED);
return grpc_wakeup_fd_wakeup(&global_wakeup_fd);
} else if (specific_worker->initialized_cv) {
if (GRPC_TRACER_ON(grpc_polling_trace)) {
- gpr_log(GPR_DEBUG, " .. kick waiting worker");
+ gpr_log(GPR_ERROR, " .. kick waiting worker");
}
SET_KICK_STATE(specific_worker, KICKED);
gpr_cv_signal(&specific_worker->cv);
return GRPC_ERROR_NONE;
} else {
if (GRPC_TRACER_ON(grpc_polling_trace)) {
- gpr_log(GPR_DEBUG, " .. kick non-waiting worker");
+ gpr_log(GPR_ERROR, " .. kick non-waiting worker");
}
SET_KICK_STATE(specific_worker, KICKED);
return GRPC_ERROR_NONE;
@@ -1051,6 +1126,8 @@
return NULL;
}
+ gpr_log(GPR_ERROR, "grpc epoll fd: %d", g_epfd);
+
return &vtable;
}