Merge github.com:grpc/grpc into clangf
diff --git a/src/core/iomgr/pollset_windows.c b/src/core/iomgr/pollset_windows.c
index 8d91ef4..96abaea 100644
--- a/src/core/iomgr/pollset_windows.c
+++ b/src/core/iomgr/pollset_windows.c
@@ -43,19 +43,19 @@
#include "src/core/iomgr/pollset.h"
#include "src/core/iomgr/pollset_windows.h"
-static gpr_mu g_polling_mu;
+gpr_mu grpc_polling_mu;
static grpc_pollset_worker *g_active_poller;
static grpc_pollset_worker g_global_root_worker;
void grpc_pollset_global_init() {
- gpr_mu_init(&g_polling_mu);
+ gpr_mu_init(&grpc_polling_mu);
g_active_poller = NULL;
g_global_root_worker.links[GRPC_POLLSET_WORKER_LINK_GLOBAL].next =
g_global_root_worker.links[GRPC_POLLSET_WORKER_LINK_GLOBAL].prev =
&g_global_root_worker;
}
-void grpc_pollset_global_shutdown() { gpr_mu_destroy(&g_polling_mu); }
+void grpc_pollset_global_shutdown() { gpr_mu_destroy(&grpc_polling_mu); }
static void remove_worker(grpc_pollset_worker *worker,
grpc_pollset_worker_link_type type) {
@@ -105,7 +105,6 @@
void grpc_pollset_init(grpc_pollset *pollset) {
memset(pollset, 0, sizeof(*pollset));
- gpr_mu_init(&pollset->mu);
pollset->root_worker.links[GRPC_POLLSET_WORKER_LINK_POLLSET].next =
pollset->root_worker.links[GRPC_POLLSET_WORKER_LINK_POLLSET].prev =
&pollset->root_worker;
@@ -113,7 +112,7 @@
void grpc_pollset_shutdown(grpc_exec_ctx *exec_ctx, grpc_pollset *pollset,
grpc_closure *closure) {
- gpr_mu_lock(&pollset->mu);
+ gpr_mu_lock(&grpc_polling_mu);
pollset->shutting_down = 1;
grpc_pollset_kick(pollset, GRPC_POLLSET_KICK_BROADCAST);
if (!pollset->is_iocp_worker) {
@@ -121,43 +120,49 @@
} else {
pollset->on_shutdown = closure;
}
- gpr_mu_unlock(&pollset->mu);
+ gpr_mu_unlock(&grpc_polling_mu);
}
-void grpc_pollset_destroy(grpc_pollset *pollset) {
- gpr_mu_destroy(&pollset->mu);
-}
+void grpc_pollset_destroy(grpc_pollset *pollset) {}
void grpc_pollset_work(grpc_exec_ctx *exec_ctx, grpc_pollset *pollset,
grpc_pollset_worker *worker, gpr_timespec now,
gpr_timespec deadline) {
int added_worker = 0;
worker->links[GRPC_POLLSET_WORKER_LINK_POLLSET].next =
- worker->links[GRPC_POLLSET_WORKER_LINK_POLLSET].prev = NULL;
+ worker->links[GRPC_POLLSET_WORKER_LINK_POLLSET].prev =
+ worker->links[GRPC_POLLSET_WORKER_LINK_GLOBAL].next =
+ worker->links[GRPC_POLLSET_WORKER_LINK_GLOBAL].prev = NULL;
+ worker->kicked = 0;
+ worker->pollset = pollset;
gpr_cv_init(&worker->cv);
if (grpc_alarm_check(exec_ctx, now, &deadline)) {
goto done;
}
if (!pollset->kicked_without_pollers && !pollset->shutting_down) {
- gpr_mu_lock(&g_polling_mu);
if (g_active_poller == NULL) {
grpc_pollset_worker *next_worker;
/* become poller */
pollset->is_iocp_worker = 1;
g_active_poller = worker;
- gpr_mu_unlock(&g_polling_mu);
- gpr_mu_unlock(&pollset->mu);
+ gpr_mu_unlock(&grpc_polling_mu);
grpc_iocp_work(exec_ctx, deadline);
- gpr_mu_lock(&pollset->mu);
- gpr_mu_lock(&g_polling_mu);
+ grpc_exec_ctx_flush(exec_ctx);
+ gpr_mu_lock(&grpc_polling_mu);
pollset->is_iocp_worker = 0;
g_active_poller = NULL;
- next_worker = pop_front_worker(&g_global_root_worker,
- GRPC_POLLSET_WORKER_LINK_GLOBAL);
+ /* try to get a worker from this pollsets worker list */
+ next_worker = pop_front_worker(&pollset->root_worker,
+ GRPC_POLLSET_WORKER_LINK_POLLSET);
+ if (next_worker == NULL) {
+ /* try to get a worker from the global list */
+ next_worker = pop_front_worker(&g_global_root_worker,
+ GRPC_POLLSET_WORKER_LINK_GLOBAL);
+ }
if (next_worker != NULL) {
+ next_worker->kicked = 1;
gpr_cv_signal(&next_worker->cv);
}
- gpr_mu_unlock(&g_polling_mu);
if (pollset->shutting_down && pollset->on_shutdown != NULL) {
grpc_exec_ctx_enqueue(exec_ctx, pollset->on_shutdown, 1);
@@ -167,25 +172,28 @@
}
push_front_worker(&g_global_root_worker, GRPC_POLLSET_WORKER_LINK_GLOBAL,
worker);
- gpr_mu_unlock(&g_polling_mu);
push_front_worker(&pollset->root_worker, GRPC_POLLSET_WORKER_LINK_POLLSET,
worker);
added_worker = 1;
- gpr_cv_wait(&worker->cv, &pollset->mu, deadline);
+ while (!worker->kicked) {
+ if (gpr_cv_wait(&worker->cv, &grpc_polling_mu, deadline)) {
+ break;
+ }
+ }
} else {
pollset->kicked_without_pollers = 0;
}
done:
if (!grpc_closure_list_empty(exec_ctx->closure_list)) {
- gpr_mu_unlock(&pollset->mu);
+ gpr_mu_unlock(&grpc_polling_mu);
grpc_exec_ctx_flush(exec_ctx);
- gpr_mu_lock(&pollset->mu);
+ gpr_mu_lock(&grpc_polling_mu);
}
- gpr_cv_destroy(&worker->cv);
if (added_worker) {
remove_worker(worker, GRPC_POLLSET_WORKER_LINK_GLOBAL);
remove_worker(worker, GRPC_POLLSET_WORKER_LINK_POLLSET);
}
+ gpr_cv_destroy(&worker->cv);
}
void grpc_pollset_kick(grpc_pollset *p, grpc_pollset_worker *specific_worker) {
@@ -196,6 +204,7 @@
specific_worker != &p->root_worker;
specific_worker =
specific_worker->links[GRPC_POLLSET_WORKER_LINK_POLLSET].next) {
+ specific_worker->kicked = 1;
gpr_cv_signal(&specific_worker->cv);
}
p->kicked_without_pollers = 1;
@@ -204,12 +213,11 @@
}
} else {
if (p->is_iocp_worker) {
- gpr_mu_lock(&g_polling_mu);
if (g_active_poller == specific_worker) {
grpc_iocp_kick();
}
- gpr_mu_unlock(&g_polling_mu);
} else {
+ specific_worker->kicked = 1;
gpr_cv_signal(&specific_worker->cv);
}
}