blob: 379b8750145250434a53323257d8271067755c57 [file] [log] [blame]
Craig Tillerc67cc992017-04-27 10:15:51 -07001/*
2 *
Craig Tillerd4838a92017-04-27 12:08:18 -07003 * Copyright 2017, Google Inc.
Craig Tillerc67cc992017-04-27 10:15:51 -07004 * All rights reserved.
5 *
6 * Redistribution and use in source and binary forms, with or without
7 * modification, are permitted provided that the following conditions are
8 * met:
9 *
10 * * Redistributions of source code must retain the above copyright
11 * notice, this list of conditions and the following disclaimer.
12 * * Redistributions in binary form must reproduce the above
13 * copyright notice, this list of conditions and the following disclaimer
14 * in the documentation and/or other materials provided with the
15 * distribution.
16 * * Neither the name of Google Inc. nor the names of its
17 * contributors may be used to endorse or promote products derived from
18 * this software without specific prior written permission.
19 *
20 * THIS SOFTWARE IS PROVIDED BY THE COPYRIGHT HOLDERS AND CONTRIBUTORS
21 * "AS IS" AND ANY EXPRESS OR IMPLIED WARRANTIES, INCLUDING, BUT NOT
22 * LIMITED TO, THE IMPLIED WARRANTIES OF MERCHANTABILITY AND FITNESS FOR
23 * A PARTICULAR PURPOSE ARE DISCLAIMED. IN NO EVENT SHALL THE COPYRIGHT
24 * OWNER OR CONTRIBUTORS BE LIABLE FOR ANY DIRECT, INDIRECT, INCIDENTAL,
25 * SPECIAL, EXEMPLARY, OR CONSEQUENTIAL DAMAGES (INCLUDING, BUT NOT
26 * LIMITED TO, PROCUREMENT OF SUBSTITUTE GOODS OR SERVICES; LOSS OF USE,
27 * DATA, OR PROFITS; OR BUSINESS INTERRUPTION) HOWEVER CAUSED AND ON ANY
28 * THEORY OF LIABILITY, WHETHER IN CONTRACT, STRICT LIABILITY, OR TORT
29 * (INCLUDING NEGLIGENCE OR OTHERWISE) ARISING IN ANY WAY OUT OF THE USE
30 * OF THIS SOFTWARE, EVEN IF ADVISED OF THE POSSIBILITY OF SUCH DAMAGE.
31 *
32 */
33
34#include "src/core/lib/iomgr/port.h"
35
36/* This polling engine is only relevant on linux kernels supporting epoll() */
37#ifdef GRPC_LINUX_EPOLL
38
Craig Tiller4509c472017-04-27 19:05:13 +000039#include "src/core/lib/iomgr/ev_epoll1_linux.h"
Craig Tillerc67cc992017-04-27 10:15:51 -070040
41#include <assert.h>
42#include <errno.h>
43#include <poll.h>
44#include <pthread.h>
45#include <string.h>
46#include <sys/epoll.h>
47#include <sys/socket.h>
48#include <unistd.h>
49
50#include <grpc/support/alloc.h>
Craig Tiller6de05932017-04-28 09:17:38 -070051#include <grpc/support/cpu.h>
Craig Tillerc67cc992017-04-27 10:15:51 -070052#include <grpc/support/log.h>
53#include <grpc/support/string_util.h>
54#include <grpc/support/tls.h>
55#include <grpc/support/useful.h>
56
57#include "src/core/lib/iomgr/ev_posix.h"
58#include "src/core/lib/iomgr/iomgr_internal.h"
59#include "src/core/lib/iomgr/lockfree_event.h"
Craig Tillerc67cc992017-04-27 10:15:51 -070060#include "src/core/lib/iomgr/wakeup_fd_posix.h"
61#include "src/core/lib/iomgr/workqueue.h"
62#include "src/core/lib/profiling/timers.h"
63#include "src/core/lib/support/block_annotate.h"
Craig Tillerb89bac02017-05-26 15:20:32 +000064#include "src/core/lib/support/string.h"
Craig Tillerc67cc992017-04-27 10:15:51 -070065
Craig Tillerc67cc992017-04-27 10:15:51 -070066static grpc_wakeup_fd global_wakeup_fd;
67static int g_epfd;
68
69/*******************************************************************************
70 * Fd Declarations
71 */
72
73struct grpc_fd {
74 int fd;
75
Craig Tillerc67cc992017-04-27 10:15:51 -070076 gpr_atm read_closure;
77 gpr_atm write_closure;
78
79 struct grpc_fd *freelist_next;
Craig Tillerc67cc992017-04-27 10:15:51 -070080
81 /* The pollset that last noticed that the fd is readable. The actual type
82 * stored in this is (grpc_pollset *) */
83 gpr_atm read_notifier_pollset;
84
85 grpc_iomgr_object iomgr_object;
86};
87
/* Set-up and tear-down of the process-wide grpc_fd freelist; both are defined
 * further down in this file. */
static void fd_global_shutdown(void);
static void fd_global_init(void);
90
91/*******************************************************************************
92 * Pollset Declarations
93 */
94
Craig Tiller43bf2592017-04-28 23:21:01 +000095typedef enum { UNKICKED, KICKED, DESIGNATED_POLLER } kick_state;
Craig Tillerc67cc992017-04-27 10:15:51 -070096
Craig Tiller830e82a2017-05-31 16:26:27 -070097static const char *kick_state_string(kick_state st) {
98 switch (st) {
99 case UNKICKED:
100 return "UNKICKED";
101 case KICKED:
102 return "KICKED";
103 case DESIGNATED_POLLER:
104 return "DESIGNATED_POLLER";
105 }
106 GPR_UNREACHABLE_CODE(return "UNKNOWN");
107}
108
Craig Tillerc67cc992017-04-27 10:15:51 -0700109struct grpc_pollset_worker {
Craig Tiller32f90ee2017-04-28 12:46:41 -0700110 kick_state kick_state;
Craig Tiller55624a32017-05-26 08:14:44 -0700111 int kick_state_mutator; // which line of code last changed kick state
Craig Tillerc67cc992017-04-27 10:15:51 -0700112 bool initialized_cv;
Craig Tiller32f90ee2017-04-28 12:46:41 -0700113 grpc_pollset_worker *next;
114 grpc_pollset_worker *prev;
Craig Tillerc67cc992017-04-27 10:15:51 -0700115 gpr_cv cv;
Craig Tiller50da5ec2017-05-01 13:51:14 -0700116 grpc_closure_list schedule_on_end_work;
Craig Tillerc67cc992017-04-27 10:15:51 -0700117};
118
/* Updates a worker's kick_state and remembers the source line that did so in
 * kick_state_mutator. `worker` is evaluated twice, so it must be free of side
 * effects. */
#define SET_KICK_STATE(worker, state)        \
  do {                                       \
    (worker)->kick_state = (state);          \
    (worker)->kick_state_mutator = __LINE__; \
  } while (0)
124
Craig Tillerba550da2017-05-01 14:26:31 +0000125#define MAX_NEIGHBOURHOODS 1024
126
Craig Tiller6de05932017-04-28 09:17:38 -0700127typedef struct pollset_neighbourhood {
128 gpr_mu mu;
129 grpc_pollset *active_root;
Craig Tiller6de05932017-04-28 09:17:38 -0700130 char pad[GPR_CACHELINE_SIZE];
131} pollset_neighbourhood;
132
Craig Tillerc67cc992017-04-27 10:15:51 -0700133struct grpc_pollset {
Craig Tiller6de05932017-04-28 09:17:38 -0700134 gpr_mu mu;
135 pollset_neighbourhood *neighbourhood;
Craig Tillere00d7332017-05-01 15:43:51 +0000136 bool reassigning_neighbourhood;
Craig Tiller4509c472017-04-27 19:05:13 +0000137 grpc_pollset_worker *root_worker;
138 bool kicked_without_poller;
Craig Tiller6de05932017-04-28 09:17:38 -0700139 bool seen_inactive;
Craig Tillerc67cc992017-04-27 10:15:51 -0700140 bool shutting_down; /* Is the pollset shutting down ? */
141 bool finish_shutdown_called; /* Is the 'finish_shutdown_locked()' called ? */
Craig Tiller4509c472017-04-27 19:05:13 +0000142 grpc_closure *shutdown_closure; /* Called after after shutdown is complete */
Craig Tillerba550da2017-05-01 14:26:31 +0000143 int begin_refs;
Craig Tiller6de05932017-04-28 09:17:38 -0700144
145 grpc_pollset *next;
146 grpc_pollset *prev;
Craig Tillerc67cc992017-04-27 10:15:51 -0700147};
148
149/*******************************************************************************
150 * Pollset-set Declarations
151 */
Craig Tiller6de05932017-04-28 09:17:38 -0700152
/* This engine keeps no per-pollset-set state. ISO C does not allow a struct
 * without named members (an empty struct is a GNU extension with size 0), so
 * a single placeholder member keeps the type portable. */
struct grpc_pollset_set {
  char unused;
};
154
155/*******************************************************************************
156 * Common helpers
157 */
158
159static bool append_error(grpc_error **composite, grpc_error *error,
160 const char *desc) {
161 if (error == GRPC_ERROR_NONE) return true;
162 if (*composite == GRPC_ERROR_NONE) {
163 *composite = GRPC_ERROR_CREATE_FROM_COPIED_STRING(desc);
164 }
165 *composite = grpc_error_add_child(*composite, error);
166 return false;
167}
168
169/*******************************************************************************
170 * Fd Definitions
171 */
172
173/* We need to keep a freelist not because of any concerns of malloc performance
174 * but instead so that implementations with multiple threads in (for example)
175 * epoll_wait deal with the race between pollset removal and incoming poll
176 * notifications.
177 *
178 * The problem is that the poller ultimately holds a reference to this
179 * object, so it is very difficult to know when is safe to free it, at least
180 * without some expensive synchronization.
181 *
182 * If we keep the object freelisted, in the worst case losing this race just
183 * becomes a spurious read notification on a reused fd.
184 */
185
186/* The alarm system needs to be able to wakeup 'some poller' sometimes
187 * (specifically when a new alarm needs to be triggered earlier than the next
188 * alarm 'epoch'). This wakeup_fd gives us something to alert on when such a
189 * case occurs. */
190
191static grpc_fd *fd_freelist = NULL;
192static gpr_mu fd_freelist_mu;
193
Craig Tillerc67cc992017-04-27 10:15:51 -0700194static void fd_global_init(void) { gpr_mu_init(&fd_freelist_mu); }
195
196static void fd_global_shutdown(void) {
197 gpr_mu_lock(&fd_freelist_mu);
198 gpr_mu_unlock(&fd_freelist_mu);
199 while (fd_freelist != NULL) {
200 grpc_fd *fd = fd_freelist;
201 fd_freelist = fd_freelist->freelist_next;
Craig Tillerc67cc992017-04-27 10:15:51 -0700202 gpr_free(fd);
203 }
204 gpr_mu_destroy(&fd_freelist_mu);
205}
206
207static grpc_fd *fd_create(int fd, const char *name) {
208 grpc_fd *new_fd = NULL;
209
210 gpr_mu_lock(&fd_freelist_mu);
211 if (fd_freelist != NULL) {
212 new_fd = fd_freelist;
213 fd_freelist = fd_freelist->freelist_next;
214 }
215 gpr_mu_unlock(&fd_freelist_mu);
216
217 if (new_fd == NULL) {
218 new_fd = gpr_malloc(sizeof(grpc_fd));
Craig Tillerc67cc992017-04-27 10:15:51 -0700219 }
220
Craig Tillerc67cc992017-04-27 10:15:51 -0700221 new_fd->fd = fd;
Craig Tillerc67cc992017-04-27 10:15:51 -0700222 grpc_lfev_init(&new_fd->read_closure);
223 grpc_lfev_init(&new_fd->write_closure);
224 gpr_atm_no_barrier_store(&new_fd->read_notifier_pollset, (gpr_atm)NULL);
225
226 new_fd->freelist_next = NULL;
Craig Tillerc67cc992017-04-27 10:15:51 -0700227
228 char *fd_name;
229 gpr_asprintf(&fd_name, "%s fd=%d", name, fd);
230 grpc_iomgr_register_object(&new_fd->iomgr_object, fd_name);
231#ifdef GRPC_FD_REF_COUNT_DEBUG
Craig Tiller830e82a2017-05-31 16:26:27 -0700232 gpr_log(GPR_ERROR, "FD %d %p create %s", fd, (void *)new_fd, fd_name);
Craig Tillerc67cc992017-04-27 10:15:51 -0700233#endif
234 gpr_free(fd_name);
Craig Tiller9ddb3152017-04-27 21:32:56 +0000235
236 struct epoll_event ev = {.events = (uint32_t)(EPOLLIN | EPOLLOUT | EPOLLET),
237 .data.ptr = new_fd};
238 if (epoll_ctl(g_epfd, EPOLL_CTL_ADD, fd, &ev) != 0) {
239 gpr_log(GPR_ERROR, "epoll_ctl failed: %s", strerror(errno));
240 }
241
Craig Tillerc67cc992017-04-27 10:15:51 -0700242 return new_fd;
243}
244
Craig Tiller4509c472017-04-27 19:05:13 +0000245static int fd_wrapped_fd(grpc_fd *fd) { return fd->fd; }
Craig Tillerc67cc992017-04-27 10:15:51 -0700246
Craig Tiller9ddb3152017-04-27 21:32:56 +0000247/* Might be called multiple times */
248static void fd_shutdown(grpc_exec_ctx *exec_ctx, grpc_fd *fd, grpc_error *why) {
249 if (grpc_lfev_set_shutdown(exec_ctx, &fd->read_closure,
250 GRPC_ERROR_REF(why))) {
251 shutdown(fd->fd, SHUT_RDWR);
252 grpc_lfev_set_shutdown(exec_ctx, &fd->write_closure, GRPC_ERROR_REF(why));
253 }
254 GRPC_ERROR_UNREF(why);
255}
256
Craig Tillerc67cc992017-04-27 10:15:51 -0700257static void fd_orphan(grpc_exec_ctx *exec_ctx, grpc_fd *fd,
258 grpc_closure *on_done, int *release_fd,
259 const char *reason) {
Craig Tillerc67cc992017-04-27 10:15:51 -0700260 grpc_error *error = GRPC_ERROR_NONE;
Craig Tillerc67cc992017-04-27 10:15:51 -0700261
Craig Tiller9ddb3152017-04-27 21:32:56 +0000262 if (!grpc_lfev_is_shutdown(&fd->read_closure)) {
263 fd_shutdown(exec_ctx, fd, GRPC_ERROR_CREATE_FROM_COPIED_STRING(reason));
264 }
265
Craig Tillerc67cc992017-04-27 10:15:51 -0700266 /* If release_fd is not NULL, we should be relinquishing control of the file
267 descriptor fd->fd (but we still own the grpc_fd structure). */
268 if (release_fd != NULL) {
269 *release_fd = fd->fd;
270 } else {
271 close(fd->fd);
Craig Tillerc67cc992017-04-27 10:15:51 -0700272 }
273
Craig Tiller4509c472017-04-27 19:05:13 +0000274 grpc_closure_sched(exec_ctx, on_done, GRPC_ERROR_REF(error));
Craig Tillerc67cc992017-04-27 10:15:51 -0700275
Craig Tiller4509c472017-04-27 19:05:13 +0000276 grpc_iomgr_unregister_object(&fd->iomgr_object);
277 grpc_lfev_destroy(&fd->read_closure);
278 grpc_lfev_destroy(&fd->write_closure);
Craig Tillerc67cc992017-04-27 10:15:51 -0700279
Craig Tiller4509c472017-04-27 19:05:13 +0000280 gpr_mu_lock(&fd_freelist_mu);
281 fd->freelist_next = fd_freelist;
282 fd_freelist = fd;
283 gpr_mu_unlock(&fd_freelist_mu);
Craig Tillerc67cc992017-04-27 10:15:51 -0700284}
285
286static grpc_pollset *fd_get_read_notifier_pollset(grpc_exec_ctx *exec_ctx,
287 grpc_fd *fd) {
288 gpr_atm notifier = gpr_atm_acq_load(&fd->read_notifier_pollset);
289 return (grpc_pollset *)notifier;
290}
291
292static bool fd_is_shutdown(grpc_fd *fd) {
293 return grpc_lfev_is_shutdown(&fd->read_closure);
294}
295
Craig Tillerc67cc992017-04-27 10:15:51 -0700296static void fd_notify_on_read(grpc_exec_ctx *exec_ctx, grpc_fd *fd,
297 grpc_closure *closure) {
Craig Tiller830e82a2017-05-31 16:26:27 -0700298 grpc_lfev_notify_on(exec_ctx, &fd->read_closure, closure, "read");
Craig Tillerc67cc992017-04-27 10:15:51 -0700299}
300
301static void fd_notify_on_write(grpc_exec_ctx *exec_ctx, grpc_fd *fd,
302 grpc_closure *closure) {
Craig Tiller830e82a2017-05-31 16:26:27 -0700303 grpc_lfev_notify_on(exec_ctx, &fd->write_closure, closure, "write");
Craig Tillerc67cc992017-04-27 10:15:51 -0700304}
305
306static grpc_workqueue *fd_get_workqueue(grpc_fd *fd) {
Craig Tiller50da5ec2017-05-01 13:51:14 -0700307 return (grpc_workqueue *)0xb0b51ed;
Craig Tiller4509c472017-04-27 19:05:13 +0000308}
309
310static void fd_become_readable(grpc_exec_ctx *exec_ctx, grpc_fd *fd,
311 grpc_pollset *notifier) {
Craig Tiller830e82a2017-05-31 16:26:27 -0700312 grpc_lfev_set_ready(exec_ctx, &fd->read_closure, "read");
Craig Tiller4509c472017-04-27 19:05:13 +0000313
314 /* Note, it is possible that fd_become_readable might be called twice with
315 different 'notifier's when an fd becomes readable and it is in two epoll
316 sets (This can happen briefly during polling island merges). In such cases
317 it does not really matter which notifer is set as the read_notifier_pollset
318 (They would both point to the same polling island anyway) */
319 /* Use release store to match with acquire load in fd_get_read_notifier */
320 gpr_atm_rel_store(&fd->read_notifier_pollset, (gpr_atm)notifier);
321}
322
323static void fd_become_writable(grpc_exec_ctx *exec_ctx, grpc_fd *fd) {
Craig Tiller830e82a2017-05-31 16:26:27 -0700324 grpc_lfev_set_ready(exec_ctx, &fd->write_closure, "write");
Craig Tillerc67cc992017-04-27 10:15:51 -0700325}
326
327/*******************************************************************************
328 * Pollset Definitions
329 */
330
Craig Tiller6de05932017-04-28 09:17:38 -0700331GPR_TLS_DECL(g_current_thread_pollset);
332GPR_TLS_DECL(g_current_thread_worker);
333static gpr_atm g_active_poller;
334static pollset_neighbourhood *g_neighbourhoods;
Craig Tiller32f90ee2017-04-28 12:46:41 -0700335static size_t g_num_neighbourhoods;
Craig Tiller67e229e2017-05-01 20:57:59 +0000336static gpr_mu g_wq_mu;
337static grpc_closure_list g_wq_items;
Craig Tiller6de05932017-04-28 09:17:38 -0700338
Craig Tillerc67cc992017-04-27 10:15:51 -0700339/* Return true if first in list */
Craig Tiller32f90ee2017-04-28 12:46:41 -0700340static bool worker_insert(grpc_pollset *pollset, grpc_pollset_worker *worker) {
341 if (pollset->root_worker == NULL) {
342 pollset->root_worker = worker;
343 worker->next = worker->prev = worker;
Craig Tillerc67cc992017-04-27 10:15:51 -0700344 return true;
345 } else {
Craig Tiller32f90ee2017-04-28 12:46:41 -0700346 worker->next = pollset->root_worker;
347 worker->prev = worker->next->prev;
348 worker->next->prev = worker;
349 worker->prev->next = worker;
Craig Tillerc67cc992017-04-27 10:15:51 -0700350 return false;
351 }
352}
353
354/* Return true if last in list */
355typedef enum { EMPTIED, NEW_ROOT, REMOVED } worker_remove_result;
356
Craig Tiller32f90ee2017-04-28 12:46:41 -0700357static worker_remove_result worker_remove(grpc_pollset *pollset,
Craig Tillerc67cc992017-04-27 10:15:51 -0700358 grpc_pollset_worker *worker) {
Craig Tiller32f90ee2017-04-28 12:46:41 -0700359 if (worker == pollset->root_worker) {
360 if (worker == worker->next) {
361 pollset->root_worker = NULL;
Craig Tillerc67cc992017-04-27 10:15:51 -0700362 return EMPTIED;
363 } else {
Craig Tiller32f90ee2017-04-28 12:46:41 -0700364 pollset->root_worker = worker->next;
365 worker->prev->next = worker->next;
366 worker->next->prev = worker->prev;
Craig Tillerc67cc992017-04-27 10:15:51 -0700367 return NEW_ROOT;
368 }
369 } else {
Craig Tiller32f90ee2017-04-28 12:46:41 -0700370 worker->prev->next = worker->next;
371 worker->next->prev = worker->prev;
Craig Tillerc67cc992017-04-27 10:15:51 -0700372 return REMOVED;
373 }
374}
375
Craig Tillerba550da2017-05-01 14:26:31 +0000376static size_t choose_neighbourhood(void) {
377 return (size_t)gpr_cpu_current_cpu() % g_num_neighbourhoods;
378}
379
Craig Tiller4509c472017-04-27 19:05:13 +0000380static grpc_error *pollset_global_init(void) {
Craig Tiller4509c472017-04-27 19:05:13 +0000381 gpr_tls_init(&g_current_thread_pollset);
382 gpr_tls_init(&g_current_thread_worker);
Craig Tiller6de05932017-04-28 09:17:38 -0700383 gpr_atm_no_barrier_store(&g_active_poller, 0);
Craig Tiller375eb252017-04-27 23:29:12 +0000384 global_wakeup_fd.read_fd = -1;
385 grpc_error *err = grpc_wakeup_fd_init(&global_wakeup_fd);
Craig Tiller67e229e2017-05-01 20:57:59 +0000386 gpr_mu_init(&g_wq_mu);
387 g_wq_items = (grpc_closure_list)GRPC_CLOSURE_LIST_INIT;
Craig Tiller375eb252017-04-27 23:29:12 +0000388 if (err != GRPC_ERROR_NONE) return err;
Craig Tiller4509c472017-04-27 19:05:13 +0000389 struct epoll_event ev = {.events = (uint32_t)(EPOLLIN | EPOLLET),
390 .data.ptr = &global_wakeup_fd};
391 if (epoll_ctl(g_epfd, EPOLL_CTL_ADD, global_wakeup_fd.read_fd, &ev) != 0) {
392 return GRPC_OS_ERROR(errno, "epoll_ctl");
393 }
Craig Tillerba550da2017-05-01 14:26:31 +0000394 g_num_neighbourhoods = GPR_CLAMP(gpr_cpu_num_cores(), 1, MAX_NEIGHBOURHOODS);
Craig Tiller32f90ee2017-04-28 12:46:41 -0700395 g_neighbourhoods =
396 gpr_zalloc(sizeof(*g_neighbourhoods) * g_num_neighbourhoods);
397 for (size_t i = 0; i < g_num_neighbourhoods; i++) {
398 gpr_mu_init(&g_neighbourhoods[i].mu);
Craig Tiller32f90ee2017-04-28 12:46:41 -0700399 }
Craig Tiller4509c472017-04-27 19:05:13 +0000400 return GRPC_ERROR_NONE;
401}
402
403static void pollset_global_shutdown(void) {
Craig Tiller4509c472017-04-27 19:05:13 +0000404 gpr_tls_destroy(&g_current_thread_pollset);
405 gpr_tls_destroy(&g_current_thread_worker);
Craig Tiller67e229e2017-05-01 20:57:59 +0000406 gpr_mu_destroy(&g_wq_mu);
Craig Tiller375eb252017-04-27 23:29:12 +0000407 if (global_wakeup_fd.read_fd != -1) grpc_wakeup_fd_destroy(&global_wakeup_fd);
Craig Tiller32f90ee2017-04-28 12:46:41 -0700408 for (size_t i = 0; i < g_num_neighbourhoods; i++) {
409 gpr_mu_destroy(&g_neighbourhoods[i].mu);
410 }
411 gpr_free(g_neighbourhoods);
Craig Tiller4509c472017-04-27 19:05:13 +0000412}
413
414static void pollset_init(grpc_pollset *pollset, gpr_mu **mu) {
Craig Tiller6de05932017-04-28 09:17:38 -0700415 gpr_mu_init(&pollset->mu);
416 *mu = &pollset->mu;
Craig Tillerba550da2017-05-01 14:26:31 +0000417 pollset->neighbourhood = &g_neighbourhoods[choose_neighbourhood()];
Craig Tiller6de05932017-04-28 09:17:38 -0700418 pollset->seen_inactive = true;
Craig Tiller6de05932017-04-28 09:17:38 -0700419}
420
Craig Tillerc6109852017-05-01 14:26:49 -0700421static void pollset_destroy(grpc_exec_ctx *exec_ctx, grpc_pollset *pollset) {
Craig Tillere00d7332017-05-01 15:43:51 +0000422 gpr_mu_lock(&pollset->mu);
Craig Tillerba550da2017-05-01 14:26:31 +0000423 if (!pollset->seen_inactive) {
Craig Tillere00d7332017-05-01 15:43:51 +0000424 pollset_neighbourhood *neighbourhood = pollset->neighbourhood;
425 gpr_mu_unlock(&pollset->mu);
Craig Tillera95bacf2017-05-01 12:51:24 -0700426 retry_lock_neighbourhood:
Craig Tillere00d7332017-05-01 15:43:51 +0000427 gpr_mu_lock(&neighbourhood->mu);
428 gpr_mu_lock(&pollset->mu);
429 if (!pollset->seen_inactive) {
430 if (pollset->neighbourhood != neighbourhood) {
431 gpr_mu_unlock(&neighbourhood->mu);
432 neighbourhood = pollset->neighbourhood;
433 gpr_mu_unlock(&pollset->mu);
434 goto retry_lock_neighbourhood;
435 }
436 pollset->prev->next = pollset->next;
437 pollset->next->prev = pollset->prev;
438 if (pollset == pollset->neighbourhood->active_root) {
439 pollset->neighbourhood->active_root =
440 pollset->next == pollset ? NULL : pollset->next;
441 }
Craig Tillerba550da2017-05-01 14:26:31 +0000442 }
443 gpr_mu_unlock(&pollset->neighbourhood->mu);
Craig Tiller6de05932017-04-28 09:17:38 -0700444 }
Craig Tillere00d7332017-05-01 15:43:51 +0000445 gpr_mu_unlock(&pollset->mu);
Craig Tiller32f90ee2017-04-28 12:46:41 -0700446 gpr_mu_destroy(&pollset->mu);
Craig Tiller4509c472017-04-27 19:05:13 +0000447}
448
449static grpc_error *pollset_kick_all(grpc_pollset *pollset) {
450 grpc_error *error = GRPC_ERROR_NONE;
451 if (pollset->root_worker != NULL) {
452 grpc_pollset_worker *worker = pollset->root_worker;
453 do {
Craig Tiller55624a32017-05-26 08:14:44 -0700454 switch (worker->kick_state) {
455 case KICKED:
456 break;
457 case UNKICKED:
458 SET_KICK_STATE(worker, KICKED);
459 if (worker->initialized_cv) {
460 gpr_cv_signal(&worker->cv);
461 }
462 break;
463 case DESIGNATED_POLLER:
464 SET_KICK_STATE(worker, KICKED);
465 append_error(&error, grpc_wakeup_fd_wakeup(&global_wakeup_fd),
466 "pollset_shutdown");
467 break;
Craig Tiller4509c472017-04-27 19:05:13 +0000468 }
469
Craig Tiller32f90ee2017-04-28 12:46:41 -0700470 worker = worker->next;
Craig Tiller4509c472017-04-27 19:05:13 +0000471 } while (worker != pollset->root_worker);
472 }
473 return error;
474}
475
476static void pollset_maybe_finish_shutdown(grpc_exec_ctx *exec_ctx,
477 grpc_pollset *pollset) {
Craig Tillerba550da2017-05-01 14:26:31 +0000478 if (pollset->shutdown_closure != NULL && pollset->root_worker == NULL &&
479 pollset->begin_refs == 0) {
Craig Tiller4509c472017-04-27 19:05:13 +0000480 grpc_closure_sched(exec_ctx, pollset->shutdown_closure, GRPC_ERROR_NONE);
481 pollset->shutdown_closure = NULL;
482 }
483}
484
485static void pollset_shutdown(grpc_exec_ctx *exec_ctx, grpc_pollset *pollset,
486 grpc_closure *closure) {
487 GPR_ASSERT(pollset->shutdown_closure == NULL);
Craig Tillerc81512a2017-05-26 09:53:58 -0700488 GPR_ASSERT(!pollset->shutting_down);
Craig Tiller4509c472017-04-27 19:05:13 +0000489 pollset->shutdown_closure = closure;
Craig Tillerc81512a2017-05-26 09:53:58 -0700490 pollset->shutting_down = true;
Craig Tiller4509c472017-04-27 19:05:13 +0000491 GRPC_LOG_IF_ERROR("pollset_shutdown", pollset_kick_all(pollset));
492 pollset_maybe_finish_shutdown(exec_ctx, pollset);
493}
494
Craig Tillera95bacf2017-05-01 12:51:24 -0700495#define MAX_EPOLL_EVENTS 100
Craig Tiller4509c472017-04-27 19:05:13 +0000496
497static int poll_deadline_to_millis_timeout(gpr_timespec deadline,
498 gpr_timespec now) {
499 gpr_timespec timeout;
500 if (gpr_time_cmp(deadline, gpr_inf_future(deadline.clock_type)) == 0) {
501 return -1;
502 }
503
504 if (gpr_time_cmp(deadline, now) <= 0) {
505 return 0;
506 }
507
508 static const gpr_timespec round_up = {
509 .clock_type = GPR_TIMESPAN, .tv_sec = 0, .tv_nsec = GPR_NS_PER_MS - 1};
510 timeout = gpr_time_sub(deadline, now);
511 int millis = gpr_time_to_millis(gpr_time_add(timeout, round_up));
512 return millis >= 1 ? millis : 1;
513}
514
515static grpc_error *pollset_epoll(grpc_exec_ctx *exec_ctx, grpc_pollset *pollset,
516 gpr_timespec now, gpr_timespec deadline) {
517 struct epoll_event events[MAX_EPOLL_EVENTS];
518 static const char *err_desc = "pollset_poll";
519
520 int timeout = poll_deadline_to_millis_timeout(deadline, now);
521
522 if (timeout != 0) {
523 GRPC_SCHEDULING_START_BLOCKING_REGION;
524 }
525 int r;
526 do {
527 r = epoll_wait(g_epfd, events, MAX_EPOLL_EVENTS, timeout);
528 } while (r < 0 && errno == EINTR);
529 if (timeout != 0) {
530 GRPC_SCHEDULING_END_BLOCKING_REGION;
531 }
532
533 if (r < 0) return GRPC_OS_ERROR(errno, "epoll_wait");
534
535 grpc_error *error = GRPC_ERROR_NONE;
536 for (int i = 0; i < r; i++) {
537 void *data_ptr = events[i].data.ptr;
538 if (data_ptr == &global_wakeup_fd) {
Craig Tiller67e229e2017-05-01 20:57:59 +0000539 gpr_mu_lock(&g_wq_mu);
540 grpc_closure_list_move(&g_wq_items, &exec_ctx->closure_list);
541 gpr_mu_unlock(&g_wq_mu);
Craig Tiller4509c472017-04-27 19:05:13 +0000542 append_error(&error, grpc_wakeup_fd_consume_wakeup(&global_wakeup_fd),
543 err_desc);
544 } else {
545 grpc_fd *fd = (grpc_fd *)(data_ptr);
546 bool cancel = (events[i].events & (EPOLLERR | EPOLLHUP)) != 0;
547 bool read_ev = (events[i].events & (EPOLLIN | EPOLLPRI)) != 0;
548 bool write_ev = (events[i].events & EPOLLOUT) != 0;
549 if (read_ev || cancel) {
550 fd_become_readable(exec_ctx, fd, pollset);
551 }
552 if (write_ev || cancel) {
553 fd_become_writable(exec_ctx, fd);
554 }
555 }
556 }
557
558 return error;
559}
560
561static bool begin_worker(grpc_pollset *pollset, grpc_pollset_worker *worker,
562 grpc_pollset_worker **worker_hdl, gpr_timespec *now,
563 gpr_timespec deadline) {
Craig Tiller4509c472017-04-27 19:05:13 +0000564 if (worker_hdl != NULL) *worker_hdl = worker;
565 worker->initialized_cv = false;
Craig Tiller55624a32017-05-26 08:14:44 -0700566 SET_KICK_STATE(worker, UNKICKED);
Craig Tiller50da5ec2017-05-01 13:51:14 -0700567 worker->schedule_on_end_work = (grpc_closure_list)GRPC_CLOSURE_LIST_INIT;
Craig Tillerba550da2017-05-01 14:26:31 +0000568 pollset->begin_refs++;
Craig Tiller4509c472017-04-27 19:05:13 +0000569
Craig Tiller830e82a2017-05-31 16:26:27 -0700570 if (GRPC_TRACER_ON(grpc_polling_trace)) {
571 gpr_log(GPR_ERROR, "PS:%p BEGIN_STARTS:%p", pollset, worker);
572 }
573
Craig Tiller32f90ee2017-04-28 12:46:41 -0700574 if (pollset->seen_inactive) {
575 // pollset has been observed to be inactive, we need to move back to the
576 // active list
Craig Tillere00d7332017-05-01 15:43:51 +0000577 bool is_reassigning = false;
578 if (!pollset->reassigning_neighbourhood) {
579 is_reassigning = true;
580 pollset->reassigning_neighbourhood = true;
581 pollset->neighbourhood = &g_neighbourhoods[choose_neighbourhood()];
582 }
583 pollset_neighbourhood *neighbourhood = pollset->neighbourhood;
Craig Tiller32f90ee2017-04-28 12:46:41 -0700584 gpr_mu_unlock(&pollset->mu);
Craig Tillerba550da2017-05-01 14:26:31 +0000585 // pollset unlocked: state may change (even worker->kick_state)
586 retry_lock_neighbourhood:
Craig Tiller32f90ee2017-04-28 12:46:41 -0700587 gpr_mu_lock(&neighbourhood->mu);
588 gpr_mu_lock(&pollset->mu);
Craig Tiller830e82a2017-05-31 16:26:27 -0700589 if (GRPC_TRACER_ON(grpc_polling_trace)) {
590 gpr_log(GPR_ERROR, "PS:%p BEGIN_REORG:%p kick_state=%s is_reassigning=%d",
591 pollset, worker, kick_state_string(worker->kick_state),
592 is_reassigning);
593 }
Craig Tiller32f90ee2017-04-28 12:46:41 -0700594 if (pollset->seen_inactive) {
Craig Tiller2acab6e2017-04-30 23:06:33 +0000595 if (neighbourhood != pollset->neighbourhood) {
596 gpr_mu_unlock(&neighbourhood->mu);
597 neighbourhood = pollset->neighbourhood;
598 gpr_mu_unlock(&pollset->mu);
599 goto retry_lock_neighbourhood;
600 }
Craig Tiller32f90ee2017-04-28 12:46:41 -0700601 pollset->seen_inactive = false;
Craig Tiller2acab6e2017-04-30 23:06:33 +0000602 if (neighbourhood->active_root == NULL) {
603 neighbourhood->active_root = pollset->next = pollset->prev = pollset;
Craig Tiller55624a32017-05-26 08:14:44 -0700604 if (worker->kick_state == UNKICKED &&
605 gpr_atm_no_barrier_cas(&g_active_poller, 0, (gpr_atm)worker)) {
606 SET_KICK_STATE(worker, DESIGNATED_POLLER);
Craig Tiller32f90ee2017-04-28 12:46:41 -0700607 }
Craig Tiller2acab6e2017-04-30 23:06:33 +0000608 } else {
609 pollset->next = neighbourhood->active_root;
610 pollset->prev = pollset->next->prev;
611 pollset->next->prev = pollset->prev->next = pollset;
Craig Tiller4509c472017-04-27 19:05:13 +0000612 }
613 }
Craig Tillere00d7332017-05-01 15:43:51 +0000614 if (is_reassigning) {
615 GPR_ASSERT(pollset->reassigning_neighbourhood);
616 pollset->reassigning_neighbourhood = false;
617 }
Craig Tiller32f90ee2017-04-28 12:46:41 -0700618 gpr_mu_unlock(&neighbourhood->mu);
619 }
620 worker_insert(pollset, worker);
Craig Tillerba550da2017-05-01 14:26:31 +0000621 pollset->begin_refs--;
Craig Tiller32f90ee2017-04-28 12:46:41 -0700622 if (worker->kick_state == UNKICKED) {
Craig Tillera4b8eb02017-04-29 00:13:52 +0000623 GPR_ASSERT(gpr_atm_no_barrier_load(&g_active_poller) != (gpr_atm)worker);
Craig Tiller32f90ee2017-04-28 12:46:41 -0700624 worker->initialized_cv = true;
625 gpr_cv_init(&worker->cv);
Craig Tillerc81512a2017-05-26 09:53:58 -0700626 while (worker->kick_state == UNKICKED && !pollset->shutting_down) {
Craig Tiller830e82a2017-05-31 16:26:27 -0700627 if (GRPC_TRACER_ON(grpc_polling_trace)) {
628 gpr_log(GPR_ERROR, "PS:%p BEGIN_WAIT:%p kick_state=%s shutdown=%d",
629 pollset, worker, kick_state_string(worker->kick_state),
630 pollset->shutting_down);
631 }
Craig Tiller32f90ee2017-04-28 12:46:41 -0700632 if (gpr_cv_wait(&worker->cv, &pollset->mu, deadline) &&
633 worker->kick_state == UNKICKED) {
Craig Tiller55624a32017-05-26 08:14:44 -0700634 SET_KICK_STATE(worker, KICKED);
Craig Tiller32f90ee2017-04-28 12:46:41 -0700635 }
Craig Tillerba550da2017-05-01 14:26:31 +0000636 }
Craig Tiller4509c472017-04-27 19:05:13 +0000637 *now = gpr_now(now->clock_type);
638 }
Craig Tiller830e82a2017-05-31 16:26:27 -0700639 if (GRPC_TRACER_ON(grpc_polling_trace)) {
640 gpr_log(GPR_ERROR, "PS:%p BEGIN_DONE:%p kick_state=%s shutdown=%d", pollset,
641 worker, kick_state_string(worker->kick_state),
642 pollset->shutting_down);
643 }
Craig Tiller4509c472017-04-27 19:05:13 +0000644
Craig Tillerc81512a2017-05-26 09:53:58 -0700645 return worker->kick_state == DESIGNATED_POLLER && !pollset->shutting_down;
Craig Tiller4509c472017-04-27 19:05:13 +0000646}
647
Craig Tillerbbf4c7a2017-04-28 15:12:10 -0700648static bool check_neighbourhood_for_available_poller(
Craig Tillera4b8eb02017-04-29 00:13:52 +0000649 pollset_neighbourhood *neighbourhood) {
Craig Tillerbbf4c7a2017-04-28 15:12:10 -0700650 bool found_worker = false;
651 do {
652 grpc_pollset *inspect = neighbourhood->active_root;
653 if (inspect == NULL) {
654 break;
655 }
656 gpr_mu_lock(&inspect->mu);
657 GPR_ASSERT(!inspect->seen_inactive);
658 grpc_pollset_worker *inspect_worker = inspect->root_worker;
Craig Tillerbbf4c7a2017-04-28 15:12:10 -0700659 if (inspect_worker != NULL) {
Craig Tillera4b8eb02017-04-29 00:13:52 +0000660 do {
Craig Tillerba550da2017-05-01 14:26:31 +0000661 switch (inspect_worker->kick_state) {
662 case UNKICKED:
663 if (gpr_atm_no_barrier_cas(&g_active_poller, 0,
664 (gpr_atm)inspect_worker)) {
Craig Tiller830e82a2017-05-31 16:26:27 -0700665 if (GRPC_TRACER_ON(grpc_polling_trace)) {
666 gpr_log(GPR_DEBUG, " .. choose next poller to be %p",
667 inspect_worker);
668 }
Craig Tiller55624a32017-05-26 08:14:44 -0700669 SET_KICK_STATE(inspect_worker, DESIGNATED_POLLER);
Craig Tillerba550da2017-05-01 14:26:31 +0000670 if (inspect_worker->initialized_cv) {
671 gpr_cv_signal(&inspect_worker->cv);
672 }
Craig Tiller830e82a2017-05-31 16:26:27 -0700673 } else {
674 if (GRPC_TRACER_ON(grpc_polling_trace)) {
675 gpr_log(GPR_DEBUG, " .. beaten to choose next poller");
676 }
Craig Tillera4b8eb02017-04-29 00:13:52 +0000677 }
Craig Tillerba550da2017-05-01 14:26:31 +0000678 // even if we didn't win the cas, there's a worker, we can stop
679 found_worker = true;
680 break;
681 case KICKED:
682 break;
683 case DESIGNATED_POLLER:
684 found_worker = true; // ok, so someone else found the worker, but
685 // we'll accept that
686 break;
Craig Tillerbbf4c7a2017-04-28 15:12:10 -0700687 }
Craig Tillera4b8eb02017-04-29 00:13:52 +0000688 inspect_worker = inspect_worker->next;
Craig Tiller830e82a2017-05-31 16:26:27 -0700689 } while (!found_worker && inspect_worker != inspect->root_worker);
Craig Tillera4b8eb02017-04-29 00:13:52 +0000690 }
691 if (!found_worker) {
Craig Tiller830e82a2017-05-31 16:26:27 -0700692 if (GRPC_TRACER_ON(grpc_polling_trace)) {
693 gpr_log(GPR_DEBUG, " .. mark pollset %p inactive", inspect);
694 }
Craig Tillerbbf4c7a2017-04-28 15:12:10 -0700695 inspect->seen_inactive = true;
Craig Tiller2acab6e2017-04-30 23:06:33 +0000696 if (inspect == neighbourhood->active_root) {
Craig Tillera95bacf2017-05-01 12:51:24 -0700697 neighbourhood->active_root =
698 inspect->next == inspect ? NULL : inspect->next;
Craig Tiller2acab6e2017-04-30 23:06:33 +0000699 }
700 inspect->next->prev = inspect->prev;
701 inspect->prev->next = inspect->next;
Craig Tillere00d7332017-05-01 15:43:51 +0000702 inspect->next = inspect->prev = NULL;
Craig Tillerbbf4c7a2017-04-28 15:12:10 -0700703 }
704 gpr_mu_unlock(&inspect->mu);
705 } while (!found_worker);
Craig Tillerbbf4c7a2017-04-28 15:12:10 -0700706 return found_worker;
707}
708
Craig Tiller4509c472017-04-27 19:05:13 +0000709static void end_worker(grpc_exec_ctx *exec_ctx, grpc_pollset *pollset,
710 grpc_pollset_worker *worker,
711 grpc_pollset_worker **worker_hdl) {
Craig Tiller830e82a2017-05-31 16:26:27 -0700712 if (GRPC_TRACER_ON(grpc_polling_trace)) {
713 gpr_log(GPR_DEBUG, "PS:%p END_WORKER:%p", pollset, worker);
714 }
Craig Tiller8502ecb2017-04-28 14:22:01 -0700715 if (worker_hdl != NULL) *worker_hdl = NULL;
Craig Tiller830e82a2017-05-31 16:26:27 -0700716 /* Make sure we appear kicked */
Craig Tiller55624a32017-05-26 08:14:44 -0700717 SET_KICK_STATE(worker, KICKED);
Craig Tiller50da5ec2017-05-01 13:51:14 -0700718 grpc_closure_list_move(&worker->schedule_on_end_work,
719 &exec_ctx->closure_list);
Craig Tiller8502ecb2017-04-28 14:22:01 -0700720 if (gpr_atm_no_barrier_load(&g_active_poller) == (gpr_atm)worker) {
Craig Tillera4b8eb02017-04-29 00:13:52 +0000721 if (worker->next != worker && worker->next->kick_state == UNKICKED) {
Craig Tiller830e82a2017-05-31 16:26:27 -0700722 if (GRPC_TRACER_ON(grpc_polling_trace)) {
723 gpr_log(GPR_DEBUG, " .. choose next poller to be peer %p", worker);
724 }
Craig Tiller2acab6e2017-04-30 23:06:33 +0000725 GPR_ASSERT(worker->next->initialized_cv);
Craig Tiller32f90ee2017-04-28 12:46:41 -0700726 gpr_atm_no_barrier_store(&g_active_poller, (gpr_atm)worker->next);
Craig Tiller55624a32017-05-26 08:14:44 -0700727 SET_KICK_STATE(worker->next, DESIGNATED_POLLER);
Craig Tiller32f90ee2017-04-28 12:46:41 -0700728 gpr_cv_signal(&worker->next->cv);
Craig Tiller8502ecb2017-04-28 14:22:01 -0700729 if (grpc_exec_ctx_has_work(exec_ctx)) {
730 gpr_mu_unlock(&pollset->mu);
731 grpc_exec_ctx_flush(exec_ctx);
732 gpr_mu_lock(&pollset->mu);
733 }
Craig Tiller32f90ee2017-04-28 12:46:41 -0700734 } else {
735 gpr_atm_no_barrier_store(&g_active_poller, 0);
Craig Tillerbbf4c7a2017-04-28 15:12:10 -0700736 size_t poller_neighbourhood_idx =
737 (size_t)(pollset->neighbourhood - g_neighbourhoods);
Craig Tillerbb742672017-05-17 22:19:05 +0000738 gpr_mu_unlock(&pollset->mu);
Craig Tiller32f90ee2017-04-28 12:46:41 -0700739 bool found_worker = false;
Craig Tillerba550da2017-05-01 14:26:31 +0000740 bool scan_state[MAX_NEIGHBOURHOODS];
Craig Tillerbbf4c7a2017-04-28 15:12:10 -0700741 for (size_t i = 0; !found_worker && i < g_num_neighbourhoods; i++) {
742 pollset_neighbourhood *neighbourhood =
743 &g_neighbourhoods[(poller_neighbourhood_idx + i) %
744 g_num_neighbourhoods];
745 if (gpr_mu_trylock(&neighbourhood->mu)) {
746 found_worker =
Craig Tillera4b8eb02017-04-29 00:13:52 +0000747 check_neighbourhood_for_available_poller(neighbourhood);
Craig Tillerbbf4c7a2017-04-28 15:12:10 -0700748 gpr_mu_unlock(&neighbourhood->mu);
Craig Tillerba550da2017-05-01 14:26:31 +0000749 scan_state[i] = true;
Craig Tillerbbf4c7a2017-04-28 15:12:10 -0700750 } else {
Craig Tillerba550da2017-05-01 14:26:31 +0000751 scan_state[i] = false;
Craig Tiller32f90ee2017-04-28 12:46:41 -0700752 }
Craig Tillerbbf4c7a2017-04-28 15:12:10 -0700753 }
Craig Tiller2acab6e2017-04-30 23:06:33 +0000754 for (size_t i = 0; !found_worker && i < g_num_neighbourhoods; i++) {
Craig Tillerba550da2017-05-01 14:26:31 +0000755 if (scan_state[i]) continue;
Craig Tiller2acab6e2017-04-30 23:06:33 +0000756 pollset_neighbourhood *neighbourhood =
757 &g_neighbourhoods[(poller_neighbourhood_idx + i) %
758 g_num_neighbourhoods];
759 gpr_mu_lock(&neighbourhood->mu);
Craig Tillerba550da2017-05-01 14:26:31 +0000760 found_worker = check_neighbourhood_for_available_poller(neighbourhood);
Craig Tiller2acab6e2017-04-30 23:06:33 +0000761 gpr_mu_unlock(&neighbourhood->mu);
Craig Tillerbbf4c7a2017-04-28 15:12:10 -0700762 }
Craig Tiller8502ecb2017-04-28 14:22:01 -0700763 grpc_exec_ctx_flush(exec_ctx);
Craig Tiller32f90ee2017-04-28 12:46:41 -0700764 gpr_mu_lock(&pollset->mu);
765 }
Craig Tiller50da5ec2017-05-01 13:51:14 -0700766 } else if (grpc_exec_ctx_has_work(exec_ctx)) {
767 gpr_mu_unlock(&pollset->mu);
768 grpc_exec_ctx_flush(exec_ctx);
769 gpr_mu_lock(&pollset->mu);
Craig Tiller4509c472017-04-27 19:05:13 +0000770 }
771 if (worker->initialized_cv) {
772 gpr_cv_destroy(&worker->cv);
773 }
Craig Tiller830e82a2017-05-31 16:26:27 -0700774 if (GRPC_TRACER_ON(grpc_polling_trace)) {
775 gpr_log(GPR_DEBUG, " .. remove worker");
776 }
Craig Tiller32f90ee2017-04-28 12:46:41 -0700777 if (EMPTIED == worker_remove(pollset, worker)) {
Craig Tiller4509c472017-04-27 19:05:13 +0000778 pollset_maybe_finish_shutdown(exec_ctx, pollset);
779 }
Craig Tillera4b8eb02017-04-29 00:13:52 +0000780 GPR_ASSERT(gpr_atm_no_barrier_load(&g_active_poller) != (gpr_atm)worker);
Craig Tiller4509c472017-04-27 19:05:13 +0000781}
782
783/* pollset->po.mu lock must be held by the caller before calling this.
784 The function pollset_work() may temporarily release the lock (pollset->po.mu)
785 during the course of its execution but it will always re-acquire the lock and
786 ensure that it is held by the time the function returns */
787static grpc_error *pollset_work(grpc_exec_ctx *exec_ctx, grpc_pollset *pollset,
788 grpc_pollset_worker **worker_hdl,
789 gpr_timespec now, gpr_timespec deadline) {
790 grpc_pollset_worker worker;
791 grpc_error *error = GRPC_ERROR_NONE;
792 static const char *err_desc = "pollset_work";
793 if (pollset->kicked_without_poller) {
794 pollset->kicked_without_poller = false;
795 return GRPC_ERROR_NONE;
796 }
797 if (begin_worker(pollset, &worker, worker_hdl, &now, deadline)) {
Craig Tiller830e82a2017-05-31 16:26:27 -0700798 gpr_tls_set(&g_current_thread_pollset, (intptr_t)pollset);
Craig Tiller4509c472017-04-27 19:05:13 +0000799 gpr_tls_set(&g_current_thread_worker, (intptr_t)&worker);
Craig Tillerc81512a2017-05-26 09:53:58 -0700800 GPR_ASSERT(!pollset->shutting_down);
Craig Tiller2acab6e2017-04-30 23:06:33 +0000801 GPR_ASSERT(!pollset->seen_inactive);
Craig Tiller32f90ee2017-04-28 12:46:41 -0700802 gpr_mu_unlock(&pollset->mu);
Craig Tiller4509c472017-04-27 19:05:13 +0000803 append_error(&error, pollset_epoll(exec_ctx, pollset, now, deadline),
804 err_desc);
Craig Tiller32f90ee2017-04-28 12:46:41 -0700805 gpr_mu_lock(&pollset->mu);
Craig Tiller4509c472017-04-27 19:05:13 +0000806 gpr_tls_set(&g_current_thread_worker, 0);
Craig Tiller830e82a2017-05-31 16:26:27 -0700807 } else {
808 gpr_tls_set(&g_current_thread_pollset, (intptr_t)pollset);
Craig Tiller4509c472017-04-27 19:05:13 +0000809 }
810 end_worker(exec_ctx, pollset, &worker, worker_hdl);
Craig Tiller8502ecb2017-04-28 14:22:01 -0700811 gpr_tls_set(&g_current_thread_pollset, 0);
Craig Tiller4509c472017-04-27 19:05:13 +0000812 return error;
813}
814
815static grpc_error *pollset_kick(grpc_pollset *pollset,
816 grpc_pollset_worker *specific_worker) {
Craig Tillerb89bac02017-05-26 15:20:32 +0000817 if (GRPC_TRACER_ON(grpc_polling_trace)) {
818 gpr_strvec log;
819 gpr_strvec_init(&log);
820 char *tmp;
Craig Tiller75aef7f2017-05-26 08:26:08 -0700821 gpr_asprintf(
822 &tmp, "PS:%p KICK:%p curps=%p curworker=%p root=%p", pollset,
823 specific_worker, (void *)gpr_tls_get(&g_current_thread_pollset),
824 (void *)gpr_tls_get(&g_current_thread_worker), pollset->root_worker);
Craig Tillerb89bac02017-05-26 15:20:32 +0000825 gpr_strvec_add(&log, tmp);
826 if (pollset->root_worker != NULL) {
Craig Tiller830e82a2017-05-31 16:26:27 -0700827 gpr_asprintf(&tmp, " {kick_state=%s next=%p {kick_state=%s}}",
828 kick_state_string(pollset->root_worker->kick_state),
829 pollset->root_worker->next,
830 kick_state_string(pollset->root_worker->next->kick_state));
Craig Tillerb89bac02017-05-26 15:20:32 +0000831 gpr_strvec_add(&log, tmp);
832 }
833 if (specific_worker != NULL) {
Craig Tiller830e82a2017-05-31 16:26:27 -0700834 gpr_asprintf(&tmp, " worker_kick_state=%s",
835 kick_state_string(specific_worker->kick_state));
Craig Tillerb89bac02017-05-26 15:20:32 +0000836 gpr_strvec_add(&log, tmp);
837 }
838 tmp = gpr_strvec_flatten(&log, NULL);
839 gpr_strvec_destroy(&log);
Craig Tiller830e82a2017-05-31 16:26:27 -0700840 gpr_log(GPR_ERROR, "%s", tmp);
Craig Tillerb89bac02017-05-26 15:20:32 +0000841 gpr_free(tmp);
842 }
Craig Tiller4509c472017-04-27 19:05:13 +0000843 if (specific_worker == NULL) {
844 if (gpr_tls_get(&g_current_thread_pollset) != (intptr_t)pollset) {
Craig Tiller375eb252017-04-27 23:29:12 +0000845 grpc_pollset_worker *root_worker = pollset->root_worker;
846 if (root_worker == NULL) {
Craig Tiller4509c472017-04-27 19:05:13 +0000847 pollset->kicked_without_poller = true;
Craig Tiller75aef7f2017-05-26 08:26:08 -0700848 if (GRPC_TRACER_ON(grpc_polling_trace)) {
Craig Tiller830e82a2017-05-31 16:26:27 -0700849 gpr_log(GPR_ERROR, " .. kicked_without_poller");
Craig Tiller75aef7f2017-05-26 08:26:08 -0700850 }
Craig Tiller4509c472017-04-27 19:05:13 +0000851 return GRPC_ERROR_NONE;
Craig Tiller375eb252017-04-27 23:29:12 +0000852 }
Craig Tiller32f90ee2017-04-28 12:46:41 -0700853 grpc_pollset_worker *next_worker = root_worker->next;
Craig Tiller830e82a2017-05-31 16:26:27 -0700854 if (root_worker->kick_state == KICKED) {
Craig Tiller75aef7f2017-05-26 08:26:08 -0700855 if (GRPC_TRACER_ON(grpc_polling_trace)) {
Craig Tiller830e82a2017-05-31 16:26:27 -0700856 gpr_log(GPR_ERROR, " .. already kicked %p", root_worker);
857 }
858 SET_KICK_STATE(root_worker, KICKED);
859 return GRPC_ERROR_NONE;
860 } else if (next_worker->kick_state == KICKED) {
861 if (GRPC_TRACER_ON(grpc_polling_trace)) {
862 gpr_log(GPR_ERROR, " .. already kicked %p", next_worker);
863 }
864 SET_KICK_STATE(next_worker, KICKED);
865 return GRPC_ERROR_NONE;
866 } else if (root_worker ==
867 next_worker && // only try and wake up a poller if
868 // there is no next worker
869 root_worker == (grpc_pollset_worker *)gpr_atm_no_barrier_load(
870 &g_active_poller)) {
871 if (GRPC_TRACER_ON(grpc_polling_trace)) {
872 gpr_log(GPR_ERROR, " .. kicked %p", root_worker);
Craig Tiller75aef7f2017-05-26 08:26:08 -0700873 }
Craig Tiller55624a32017-05-26 08:14:44 -0700874 SET_KICK_STATE(root_worker, KICKED);
Craig Tiller4509c472017-04-27 19:05:13 +0000875 return grpc_wakeup_fd_wakeup(&global_wakeup_fd);
Craig Tiller8502ecb2017-04-28 14:22:01 -0700876 } else if (next_worker->kick_state == UNKICKED) {
Craig Tiller75aef7f2017-05-26 08:26:08 -0700877 if (GRPC_TRACER_ON(grpc_polling_trace)) {
Craig Tiller830e82a2017-05-31 16:26:27 -0700878 gpr_log(GPR_ERROR, " .. kicked %p", next_worker);
Craig Tiller75aef7f2017-05-26 08:26:08 -0700879 }
Craig Tiller8502ecb2017-04-28 14:22:01 -0700880 GPR_ASSERT(next_worker->initialized_cv);
Craig Tiller55624a32017-05-26 08:14:44 -0700881 SET_KICK_STATE(next_worker, KICKED);
Craig Tiller375eb252017-04-27 23:29:12 +0000882 gpr_cv_signal(&next_worker->cv);
883 return GRPC_ERROR_NONE;
Craig Tiller55624a32017-05-26 08:14:44 -0700884 } else if (next_worker->kick_state == DESIGNATED_POLLER) {
885 if (root_worker->kick_state != DESIGNATED_POLLER) {
Craig Tiller75aef7f2017-05-26 08:26:08 -0700886 if (GRPC_TRACER_ON(grpc_polling_trace)) {
Craig Tiller830e82a2017-05-31 16:26:27 -0700887 gpr_log(
888 GPR_ERROR,
889 " .. kicked root non-poller %p (initialized_cv=%d) (poller=%p)",
890 root_worker, root_worker->initialized_cv, next_worker);
Craig Tiller75aef7f2017-05-26 08:26:08 -0700891 }
Craig Tiller55624a32017-05-26 08:14:44 -0700892 SET_KICK_STATE(root_worker, KICKED);
893 if (root_worker->initialized_cv) {
894 gpr_cv_signal(&root_worker->cv);
895 }
896 return GRPC_ERROR_NONE;
897 } else {
Craig Tiller75aef7f2017-05-26 08:26:08 -0700898 if (GRPC_TRACER_ON(grpc_polling_trace)) {
Craig Tiller830e82a2017-05-31 16:26:27 -0700899 gpr_log(GPR_ERROR, " .. non-root poller %p (root=%p)", next_worker,
Craig Tiller75aef7f2017-05-26 08:26:08 -0700900 root_worker);
901 }
Craig Tiller55624a32017-05-26 08:14:44 -0700902 SET_KICK_STATE(next_worker, KICKED);
903 return grpc_wakeup_fd_wakeup(&global_wakeup_fd);
904 }
Craig Tiller8502ecb2017-04-28 14:22:01 -0700905 } else {
Craig Tiller55624a32017-05-26 08:14:44 -0700906 GPR_ASSERT(next_worker->kick_state == KICKED);
907 SET_KICK_STATE(next_worker, KICKED);
Craig Tiller8502ecb2017-04-28 14:22:01 -0700908 return GRPC_ERROR_NONE;
Craig Tiller4509c472017-04-27 19:05:13 +0000909 }
910 } else {
Craig Tiller830e82a2017-05-31 16:26:27 -0700911 if (GRPC_TRACER_ON(grpc_polling_trace)) {
912 gpr_log(GPR_ERROR, " .. kicked while waking up");
913 }
Craig Tiller4509c472017-04-27 19:05:13 +0000914 return GRPC_ERROR_NONE;
915 }
Craig Tiller43bf2592017-04-28 23:21:01 +0000916 } else if (specific_worker->kick_state == KICKED) {
Craig Tiller75aef7f2017-05-26 08:26:08 -0700917 if (GRPC_TRACER_ON(grpc_polling_trace)) {
Craig Tiller830e82a2017-05-31 16:26:27 -0700918 gpr_log(GPR_ERROR, " .. specific worker already kicked");
Craig Tiller75aef7f2017-05-26 08:26:08 -0700919 }
Craig Tiller4509c472017-04-27 19:05:13 +0000920 return GRPC_ERROR_NONE;
921 } else if (gpr_tls_get(&g_current_thread_worker) ==
922 (intptr_t)specific_worker) {
Craig Tiller75aef7f2017-05-26 08:26:08 -0700923 if (GRPC_TRACER_ON(grpc_polling_trace)) {
Craig Tiller830e82a2017-05-31 16:26:27 -0700924 gpr_log(GPR_ERROR, " .. mark %p kicked", specific_worker);
Craig Tiller75aef7f2017-05-26 08:26:08 -0700925 }
Craig Tiller55624a32017-05-26 08:14:44 -0700926 SET_KICK_STATE(specific_worker, KICKED);
Craig Tiller4509c472017-04-27 19:05:13 +0000927 return GRPC_ERROR_NONE;
Craig Tiller32f90ee2017-04-28 12:46:41 -0700928 } else if (specific_worker ==
929 (grpc_pollset_worker *)gpr_atm_no_barrier_load(&g_active_poller)) {
Craig Tiller75aef7f2017-05-26 08:26:08 -0700930 if (GRPC_TRACER_ON(grpc_polling_trace)) {
Craig Tiller830e82a2017-05-31 16:26:27 -0700931 gpr_log(GPR_ERROR, " .. kick active poller");
Craig Tiller75aef7f2017-05-26 08:26:08 -0700932 }
Craig Tiller55624a32017-05-26 08:14:44 -0700933 SET_KICK_STATE(specific_worker, KICKED);
Craig Tiller4509c472017-04-27 19:05:13 +0000934 return grpc_wakeup_fd_wakeup(&global_wakeup_fd);
Craig Tiller8502ecb2017-04-28 14:22:01 -0700935 } else if (specific_worker->initialized_cv) {
Craig Tiller75aef7f2017-05-26 08:26:08 -0700936 if (GRPC_TRACER_ON(grpc_polling_trace)) {
Craig Tiller830e82a2017-05-31 16:26:27 -0700937 gpr_log(GPR_ERROR, " .. kick waiting worker");
Craig Tiller75aef7f2017-05-26 08:26:08 -0700938 }
Craig Tiller55624a32017-05-26 08:14:44 -0700939 SET_KICK_STATE(specific_worker, KICKED);
Craig Tiller4509c472017-04-27 19:05:13 +0000940 gpr_cv_signal(&specific_worker->cv);
941 return GRPC_ERROR_NONE;
Craig Tiller8502ecb2017-04-28 14:22:01 -0700942 } else {
Craig Tiller75aef7f2017-05-26 08:26:08 -0700943 if (GRPC_TRACER_ON(grpc_polling_trace)) {
Craig Tiller830e82a2017-05-31 16:26:27 -0700944 gpr_log(GPR_ERROR, " .. kick non-waiting worker");
Craig Tiller75aef7f2017-05-26 08:26:08 -0700945 }
Craig Tiller55624a32017-05-26 08:14:44 -0700946 SET_KICK_STATE(specific_worker, KICKED);
Craig Tiller8502ecb2017-04-28 14:22:01 -0700947 return GRPC_ERROR_NONE;
Craig Tiller4509c472017-04-27 19:05:13 +0000948 }
949}
950
951static void pollset_add_fd(grpc_exec_ctx *exec_ctx, grpc_pollset *pollset,
952 grpc_fd *fd) {}
953
Craig Tiller4509c472017-04-27 19:05:13 +0000954/*******************************************************************************
955 * Workqueue Definitions
956 */
957
958#ifdef GRPC_WORKQUEUE_REFCOUNT_DEBUG
959static grpc_workqueue *workqueue_ref(grpc_workqueue *workqueue,
960 const char *file, int line,
961 const char *reason) {
962 return workqueue;
963}
964
965static void workqueue_unref(grpc_exec_ctx *exec_ctx, grpc_workqueue *workqueue,
966 const char *file, int line, const char *reason) {}
967#else
968static grpc_workqueue *workqueue_ref(grpc_workqueue *workqueue) {
969 return workqueue;
970}
971
972static void workqueue_unref(grpc_exec_ctx *exec_ctx,
973 grpc_workqueue *workqueue) {}
974#endif
975
Craig Tiller50da5ec2017-05-01 13:51:14 -0700976static void wq_sched(grpc_exec_ctx *exec_ctx, grpc_closure *closure,
977 grpc_error *error) {
978 // find a neighbourhood to wakeup
979 bool scheduled = false;
980 size_t initial_neighbourhood = choose_neighbourhood();
981 for (size_t i = 0; !scheduled && i < g_num_neighbourhoods; i++) {
982 pollset_neighbourhood *neighbourhood =
983 &g_neighbourhoods[(initial_neighbourhood + i) % g_num_neighbourhoods];
984 if (gpr_mu_trylock(&neighbourhood->mu)) {
985 if (neighbourhood->active_root != NULL) {
986 grpc_pollset *inspect = neighbourhood->active_root;
987 do {
988 if (gpr_mu_trylock(&inspect->mu)) {
989 if (inspect->root_worker != NULL) {
990 grpc_pollset_worker *inspect_worker = inspect->root_worker;
991 do {
992 if (inspect_worker->kick_state == UNKICKED) {
Craig Tiller55624a32017-05-26 08:14:44 -0700993 SET_KICK_STATE(inspect_worker, KICKED);
Craig Tiller50da5ec2017-05-01 13:51:14 -0700994 grpc_closure_list_append(
995 &inspect_worker->schedule_on_end_work, closure, error);
996 if (inspect_worker->initialized_cv) {
997 gpr_cv_signal(&inspect_worker->cv);
998 }
999 scheduled = true;
1000 }
1001 inspect_worker = inspect_worker->next;
1002 } while (!scheduled && inspect_worker != inspect->root_worker);
1003 }
1004 gpr_mu_unlock(&inspect->mu);
1005 }
1006 inspect = inspect->next;
1007 } while (!scheduled && inspect != neighbourhood->active_root);
1008 }
1009 gpr_mu_unlock(&neighbourhood->mu);
1010 }
1011 }
1012 if (!scheduled) {
Craig Tiller67e229e2017-05-01 20:57:59 +00001013 gpr_mu_lock(&g_wq_mu);
1014 grpc_closure_list_append(&g_wq_items, closure, error);
1015 gpr_mu_unlock(&g_wq_mu);
Craig Tiller50da5ec2017-05-01 13:51:14 -07001016 GRPC_LOG_IF_ERROR("workqueue_scheduler",
1017 grpc_wakeup_fd_wakeup(&global_wakeup_fd));
1018 }
1019}
1020
1021static const grpc_closure_scheduler_vtable
1022 singleton_workqueue_scheduler_vtable = {wq_sched, wq_sched,
1023 "epoll1_workqueue"};
1024
1025static grpc_closure_scheduler singleton_workqueue_scheduler = {
1026 &singleton_workqueue_scheduler_vtable};
1027
Craig Tiller4509c472017-04-27 19:05:13 +00001028static grpc_closure_scheduler *workqueue_scheduler(grpc_workqueue *workqueue) {
Craig Tiller50da5ec2017-05-01 13:51:14 -07001029 return &singleton_workqueue_scheduler;
Craig Tiller4509c472017-04-27 19:05:13 +00001030}
Craig Tillerc67cc992017-04-27 10:15:51 -07001031
1032/*******************************************************************************
1033 * Pollset-set Definitions
1034 */
1035
1036static grpc_pollset_set *pollset_set_create(void) {
1037 return (grpc_pollset_set *)((intptr_t)0xdeafbeef);
1038}
1039
1040static void pollset_set_destroy(grpc_exec_ctx *exec_ctx,
1041 grpc_pollset_set *pss) {}
1042
1043static void pollset_set_add_fd(grpc_exec_ctx *exec_ctx, grpc_pollset_set *pss,
1044 grpc_fd *fd) {}
1045
1046static void pollset_set_del_fd(grpc_exec_ctx *exec_ctx, grpc_pollset_set *pss,
1047 grpc_fd *fd) {}
1048
1049static void pollset_set_add_pollset(grpc_exec_ctx *exec_ctx,
1050 grpc_pollset_set *pss, grpc_pollset *ps) {}
1051
1052static void pollset_set_del_pollset(grpc_exec_ctx *exec_ctx,
1053 grpc_pollset_set *pss, grpc_pollset *ps) {}
1054
1055static void pollset_set_add_pollset_set(grpc_exec_ctx *exec_ctx,
1056 grpc_pollset_set *bag,
1057 grpc_pollset_set *item) {}
1058
1059static void pollset_set_del_pollset_set(grpc_exec_ctx *exec_ctx,
1060 grpc_pollset_set *bag,
1061 grpc_pollset_set *item) {}
1062
1063/*******************************************************************************
1064 * Event engine binding
1065 */
1066
/* Engine teardown hook installed in the vtable: shuts down the fd subsystem
 * first and the pollset subsystem second. */
static void shutdown_engine(void) {
  fd_global_shutdown();
  pollset_global_shutdown();
}
1071
1072static const grpc_event_engine_vtable vtable = {
1073 .pollset_size = sizeof(grpc_pollset),
1074
1075 .fd_create = fd_create,
1076 .fd_wrapped_fd = fd_wrapped_fd,
1077 .fd_orphan = fd_orphan,
1078 .fd_shutdown = fd_shutdown,
1079 .fd_is_shutdown = fd_is_shutdown,
1080 .fd_notify_on_read = fd_notify_on_read,
1081 .fd_notify_on_write = fd_notify_on_write,
1082 .fd_get_read_notifier_pollset = fd_get_read_notifier_pollset,
1083 .fd_get_workqueue = fd_get_workqueue,
1084
1085 .pollset_init = pollset_init,
1086 .pollset_shutdown = pollset_shutdown,
1087 .pollset_destroy = pollset_destroy,
1088 .pollset_work = pollset_work,
1089 .pollset_kick = pollset_kick,
1090 .pollset_add_fd = pollset_add_fd,
1091
1092 .pollset_set_create = pollset_set_create,
1093 .pollset_set_destroy = pollset_set_destroy,
1094 .pollset_set_add_pollset = pollset_set_add_pollset,
1095 .pollset_set_del_pollset = pollset_set_del_pollset,
1096 .pollset_set_add_pollset_set = pollset_set_add_pollset_set,
1097 .pollset_set_del_pollset_set = pollset_set_del_pollset_set,
1098 .pollset_set_add_fd = pollset_set_add_fd,
1099 .pollset_set_del_fd = pollset_set_del_fd,
1100
Craig Tillerc67cc992017-04-27 10:15:51 -07001101 .workqueue_ref = workqueue_ref,
1102 .workqueue_unref = workqueue_unref,
1103 .workqueue_scheduler = workqueue_scheduler,
1104
1105 .shutdown_engine = shutdown_engine,
1106};
1107
1108/* It is possible that GLIBC has epoll but the underlying kernel doesn't.
1109 * Create a dummy epoll_fd to make sure epoll support is available */
Craig Tiller6f0af492017-04-27 19:26:16 +00001110const grpc_event_engine_vtable *grpc_init_epoll1_linux(bool explicit_request) {
Craig Tillerc67cc992017-04-27 10:15:51 -07001111 if (!grpc_has_wakeup_fd()) {
1112 return NULL;
1113 }
1114
Craig Tiller4509c472017-04-27 19:05:13 +00001115 g_epfd = epoll_create1(EPOLL_CLOEXEC);
1116 if (g_epfd < 0) {
1117 gpr_log(GPR_ERROR, "epoll unavailable");
Craig Tillerc67cc992017-04-27 10:15:51 -07001118 return NULL;
1119 }
1120
Craig Tillerc67cc992017-04-27 10:15:51 -07001121 fd_global_init();
1122
1123 if (!GRPC_LOG_IF_ERROR("pollset_global_init", pollset_global_init())) {
Craig Tiller4509c472017-04-27 19:05:13 +00001124 close(g_epfd);
1125 fd_global_shutdown();
Craig Tillerc67cc992017-04-27 10:15:51 -07001126 return NULL;
1127 }
1128
Craig Tiller830e82a2017-05-31 16:26:27 -07001129 gpr_log(GPR_ERROR, "grpc epoll fd: %d", g_epfd);
1130
Craig Tillerc67cc992017-04-27 10:15:51 -07001131 return &vtable;
1132}
1133
1134#else /* defined(GRPC_LINUX_EPOLL) */
1135#if defined(GRPC_POSIX_SOCKET)
1136#include "src/core/lib/iomgr/ev_posix.h"
1137/* If GRPC_LINUX_EPOLL is not defined, it means epoll is not available. Return
1138 * NULL */
Craig Tiller9ddb3152017-04-27 21:32:56 +00001139const grpc_event_engine_vtable *grpc_init_epoll1_linux(bool explicit_request) {
1140 return NULL;
1141}
Craig Tillerc67cc992017-04-27 10:15:51 -07001142#endif /* defined(GRPC_POSIX_SOCKET) */
1143#endif /* !defined(GRPC_LINUX_EPOLL) */