/*
 *
 * Copyright 2017, Google Inc.
 * All rights reserved.
 *
 * Redistribution and use in source and binary forms, with or without
 * modification, are permitted provided that the following conditions are
 * met:
 *
 *     * Redistributions of source code must retain the above copyright
 * notice, this list of conditions and the following disclaimer.
 *     * Redistributions in binary form must reproduce the above
 * copyright notice, this list of conditions and the following disclaimer
 * in the documentation and/or other materials provided with the
 * distribution.
 *     * Neither the name of Google Inc. nor the names of its
 * contributors may be used to endorse or promote products derived from
 * this software without specific prior written permission.
 *
 * THIS SOFTWARE IS PROVIDED BY THE COPYRIGHT HOLDERS AND CONTRIBUTORS
 * "AS IS" AND ANY EXPRESS OR IMPLIED WARRANTIES, INCLUDING, BUT NOT
 * LIMITED TO, THE IMPLIED WARRANTIES OF MERCHANTABILITY AND FITNESS FOR
 * A PARTICULAR PURPOSE ARE DISCLAIMED. IN NO EVENT SHALL THE COPYRIGHT
 * OWNER OR CONTRIBUTORS BE LIABLE FOR ANY DIRECT, INDIRECT, INCIDENTAL,
 * SPECIAL, EXEMPLARY, OR CONSEQUENTIAL DAMAGES (INCLUDING, BUT NOT
 * LIMITED TO, PROCUREMENT OF SUBSTITUTE GOODS OR SERVICES; LOSS OF USE,
 * DATA, OR PROFITS; OR BUSINESS INTERRUPTION) HOWEVER CAUSED AND ON ANY
 * THEORY OF LIABILITY, WHETHER IN CONTRACT, STRICT LIABILITY, OR TORT
 * (INCLUDING NEGLIGENCE OR OTHERWISE) ARISING IN ANY WAY OUT OF THE USE
 * OF THIS SOFTWARE, EVEN IF ADVISED OF THE POSSIBILITY OF SUCH DAMAGE.
 *
 */

#include "src/core/lib/iomgr/port.h"

/* This polling engine is only relevant on linux kernels supporting epoll() */
#ifdef GRPC_LINUX_EPOLL

#include "src/core/lib/iomgr/ev_epoll1_linux.h"

#include <assert.h>
#include <errno.h>
#include <poll.h>
#include <pthread.h>
#include <string.h>
#include <sys/epoll.h>
#include <sys/socket.h>
#include <unistd.h>

#include <grpc/support/alloc.h>
#include <grpc/support/cpu.h>
#include <grpc/support/log.h>
#include <grpc/support/string_util.h>
#include <grpc/support/tls.h>
#include <grpc/support/useful.h>

#include "src/core/lib/iomgr/ev_posix.h"
#include "src/core/lib/iomgr/iomgr_internal.h"
#include "src/core/lib/iomgr/lockfree_event.h"
#include "src/core/lib/iomgr/timer.h"
#include "src/core/lib/iomgr/wakeup_fd_posix.h"
#include "src/core/lib/iomgr/workqueue.h"
#include "src/core/lib/profiling/timers.h"
#include "src/core/lib/support/block_annotate.h"

/* TODO: sreek: Right now, this wakes up all pollers. In the future we should
 * make sure to wake up only one polling thread (which can wake up other
 * threads if needed). */
static grpc_wakeup_fd global_wakeup_fd;
static int g_epfd;
static gpr_atm g_timer_kick;

/*******************************************************************************
 * Fd Declarations
 */

struct grpc_fd {
  int fd;

  gpr_atm read_closure;
  gpr_atm write_closure;

  struct grpc_fd *freelist_next;

  /* The pollset that last noticed that the fd is readable. The actual type
   * stored in this is (grpc_pollset *) */
  gpr_atm read_notifier_pollset;

  grpc_iomgr_object iomgr_object;
};

static void fd_global_init(void);
static void fd_global_shutdown(void);

/*******************************************************************************
 * Pollset Declarations
 */

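/* Worker kick states, as used below (in outline): UNKICKED means the worker
   is (or is about to be) blocked in gpr_cv_wait; KICKED means somebody asked
   it to stop waiting (or to skip polling); KICKED_FOR_POLL means the worker
   has been designated the single active poller and should run epoll_wait when
   it resumes. */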
typedef enum { UNKICKED, KICKED, KICKED_FOR_POLL } kick_state;

struct grpc_pollset_worker {
  kick_state kick_state;
  bool initialized_cv;
  grpc_pollset_worker *next;
  grpc_pollset_worker *prev;
  gpr_cv cv;
};

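/* Pollsets are sharded into "neighbourhoods" (one per CPU core, allocated in
   pollset_global_init) to reduce lock contention. Each neighbourhood keeps two
   circular doubly-linked lists of pollsets: those recently seen with workers
   ("active") and those without ("inactive"). The pad keeps separate
   neighbourhoods off the same cache line. */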
typedef struct pollset_neighbourhood {
  gpr_mu mu;
  grpc_pollset *active_root;
  grpc_pollset *inactive_root;
  bool seen_inactive;
  char pad[GPR_CACHELINE_SIZE];
} pollset_neighbourhood;

struct grpc_pollset {
  gpr_mu mu;
  pollset_neighbourhood *neighbourhood;
  grpc_pollset_worker *root_worker;
  bool kicked_without_poller;
  bool seen_inactive;
  bool shutting_down;          /* Is the pollset shutting down ? */
  bool finish_shutdown_called; /* Is the 'finish_shutdown_locked()' called ? */
  grpc_closure *shutdown_closure; /* Called after shutdown is complete */

  grpc_pollset *next;
  grpc_pollset *prev;
};

/*******************************************************************************
 * Pollset-set Declarations
 */

struct grpc_pollset_set {};

/*******************************************************************************
 * Common helpers
 */

static bool append_error(grpc_error **composite, grpc_error *error,
                         const char *desc) {
  if (error == GRPC_ERROR_NONE) return true;
  if (*composite == GRPC_ERROR_NONE) {
    *composite = GRPC_ERROR_CREATE_FROM_COPIED_STRING(desc);
  }
  *composite = grpc_error_add_child(*composite, error);
  return false;
}

/*******************************************************************************
 * Fd Definitions
 */

/* We need to keep a freelist not because of any concerns of malloc performance
 * but instead so that implementations with multiple threads in (for example)
 * epoll_wait deal with the race between pollset removal and incoming poll
 * notifications.
 *
 * The problem is that the poller ultimately holds a reference to this
 * object, so it is very difficult to know when it is safe to free it, at least
 * without some expensive synchronization.
 *
 * If we keep the object freelisted, in the worst case losing this race just
 * becomes a spurious read notification on a reused fd.
 */

/* The alarm system needs to be able to wake up 'some poller' sometimes
 * (specifically when a new alarm needs to be triggered earlier than the next
 * alarm 'epoch'). This wakeup_fd gives us something to alert on when such a
 * case occurs. */

static grpc_fd *fd_freelist = NULL;
static gpr_mu fd_freelist_mu;

static void fd_global_init(void) { gpr_mu_init(&fd_freelist_mu); }

static void fd_global_shutdown(void) {
  gpr_mu_lock(&fd_freelist_mu);
  gpr_mu_unlock(&fd_freelist_mu);
  while (fd_freelist != NULL) {
    grpc_fd *fd = fd_freelist;
    fd_freelist = fd_freelist->freelist_next;
    gpr_free(fd);
  }
  gpr_mu_destroy(&fd_freelist_mu);
}

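/* Create a grpc_fd (reusing one from the freelist when possible) and register
   it with the single global epoll set. Registration is edge-triggered
   (EPOLLET) for both read and write, so the fd never has to be re-armed with
   epoll_ctl afterwards. */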
static grpc_fd *fd_create(int fd, const char *name) {
  grpc_fd *new_fd = NULL;

  gpr_mu_lock(&fd_freelist_mu);
  if (fd_freelist != NULL) {
    new_fd = fd_freelist;
    fd_freelist = fd_freelist->freelist_next;
  }
  gpr_mu_unlock(&fd_freelist_mu);

  if (new_fd == NULL) {
    new_fd = gpr_malloc(sizeof(grpc_fd));
  }

  new_fd->fd = fd;
  grpc_lfev_init(&new_fd->read_closure);
  grpc_lfev_init(&new_fd->write_closure);
  gpr_atm_no_barrier_store(&new_fd->read_notifier_pollset, (gpr_atm)NULL);

  new_fd->freelist_next = NULL;

  char *fd_name;
  gpr_asprintf(&fd_name, "%s fd=%d", name, fd);
  grpc_iomgr_register_object(&new_fd->iomgr_object, fd_name);
#ifdef GRPC_FD_REF_COUNT_DEBUG
  gpr_log(GPR_DEBUG, "FD %d %p create %s", fd, (void *)new_fd, fd_name);
#endif
  gpr_free(fd_name);

  struct epoll_event ev = {.events = (uint32_t)(EPOLLIN | EPOLLOUT | EPOLLET),
                           .data.ptr = new_fd};
  if (epoll_ctl(g_epfd, EPOLL_CTL_ADD, fd, &ev) != 0) {
    gpr_log(GPR_ERROR, "epoll_ctl failed: %s", strerror(errno));
  }

  return new_fd;
}

static int fd_wrapped_fd(grpc_fd *fd) { return fd->fd; }

/* Might be called multiple times */
static void fd_shutdown(grpc_exec_ctx *exec_ctx, grpc_fd *fd, grpc_error *why) {
  if (grpc_lfev_set_shutdown(exec_ctx, &fd->read_closure,
                             GRPC_ERROR_REF(why))) {
    shutdown(fd->fd, SHUT_RDWR);
    grpc_lfev_set_shutdown(exec_ctx, &fd->write_closure, GRPC_ERROR_REF(why));
  }
  GRPC_ERROR_UNREF(why);
}

static void fd_orphan(grpc_exec_ctx *exec_ctx, grpc_fd *fd,
                      grpc_closure *on_done, int *release_fd,
                      const char *reason) {
  grpc_error *error = GRPC_ERROR_NONE;

  if (!grpc_lfev_is_shutdown(&fd->read_closure)) {
    fd_shutdown(exec_ctx, fd, GRPC_ERROR_CREATE_FROM_COPIED_STRING(reason));
  }

  /* If release_fd is not NULL, we should be relinquishing control of the file
     descriptor fd->fd (but we still own the grpc_fd structure). */
  if (release_fd != NULL) {
    *release_fd = fd->fd;
  } else {
    close(fd->fd);
  }

  grpc_closure_sched(exec_ctx, on_done, GRPC_ERROR_REF(error));

  grpc_iomgr_unregister_object(&fd->iomgr_object);
  grpc_lfev_destroy(&fd->read_closure);
  grpc_lfev_destroy(&fd->write_closure);

  gpr_mu_lock(&fd_freelist_mu);
  fd->freelist_next = fd_freelist;
  fd_freelist = fd;
  gpr_mu_unlock(&fd_freelist_mu);
}

static grpc_pollset *fd_get_read_notifier_pollset(grpc_exec_ctx *exec_ctx,
                                                  grpc_fd *fd) {
  gpr_atm notifier = gpr_atm_acq_load(&fd->read_notifier_pollset);
  return (grpc_pollset *)notifier;
}

static bool fd_is_shutdown(grpc_fd *fd) {
  return grpc_lfev_is_shutdown(&fd->read_closure);
}

static void fd_notify_on_read(grpc_exec_ctx *exec_ctx, grpc_fd *fd,
                              grpc_closure *closure) {
  grpc_lfev_notify_on(exec_ctx, &fd->read_closure, closure);
}

static void fd_notify_on_write(grpc_exec_ctx *exec_ctx, grpc_fd *fd,
                               grpc_closure *closure) {
  grpc_lfev_notify_on(exec_ctx, &fd->write_closure, closure);
}

static grpc_workqueue *fd_get_workqueue(grpc_fd *fd) {
  return NULL; /* TODO(ctiller): add a global workqueue */
}

static void fd_become_readable(grpc_exec_ctx *exec_ctx, grpc_fd *fd,
                               grpc_pollset *notifier) {
  grpc_lfev_set_ready(exec_ctx, &fd->read_closure);

  /* Note, it is possible that fd_become_readable might be called twice with
     different 'notifier's when an fd becomes readable and it is in two epoll
     sets (This can happen briefly during polling island merges). In such
     cases it does not really matter which notifier is set as the
     read_notifier_pollset (They would both point to the same polling island
     anyway) */
  /* Use release store to match with acquire load in fd_get_read_notifier */
  gpr_atm_rel_store(&fd->read_notifier_pollset, (gpr_atm)notifier);
}

static void fd_become_writable(grpc_exec_ctx *exec_ctx, grpc_fd *fd) {
  grpc_lfev_set_ready(exec_ctx, &fd->write_closure);
}

/*******************************************************************************
 * Pollset Definitions
 */

GPR_TLS_DECL(g_current_thread_pollset);
GPR_TLS_DECL(g_current_thread_worker);
static gpr_atm g_active_poller;
static pollset_neighbourhood *g_neighbourhoods;
static size_t g_num_neighbourhoods;

/* Return true if first in list */
static bool worker_insert(grpc_pollset *pollset, grpc_pollset_worker *worker) {
  if (pollset->root_worker == NULL) {
    pollset->root_worker = worker;
    worker->next = worker->prev = worker;
    return true;
  } else {
    worker->next = pollset->root_worker;
    worker->prev = worker->next->prev;
    worker->next->prev = worker;
    worker->prev->next = worker;
    return false;
  }
}

/* Report what became of the worker list after removal: it emptied, the root
 * changed, or a non-root worker was simply removed */
typedef enum { EMPTIED, NEW_ROOT, REMOVED } worker_remove_result;

static worker_remove_result worker_remove(grpc_pollset *pollset,
                                          grpc_pollset_worker *worker) {
  if (worker == pollset->root_worker) {
    if (worker == worker->next) {
      pollset->root_worker = NULL;
      return EMPTIED;
    } else {
      pollset->root_worker = worker->next;
      worker->prev->next = worker->next;
      worker->next->prev = worker->prev;
      return NEW_ROOT;
    }
  } else {
    worker->prev->next = worker->next;
    worker->next->prev = worker->prev;
    return REMOVED;
  }
}

static grpc_error *pollset_global_init(void) {
  gpr_tls_init(&g_current_thread_pollset);
  gpr_tls_init(&g_current_thread_worker);
  gpr_atm_no_barrier_store(&g_active_poller, 0);
  global_wakeup_fd.read_fd = -1;
  grpc_error *err = grpc_wakeup_fd_init(&global_wakeup_fd);
  if (err != GRPC_ERROR_NONE) return err;
  struct epoll_event ev = {.events = (uint32_t)(EPOLLIN | EPOLLET),
                           .data.ptr = &global_wakeup_fd};
  if (epoll_ctl(g_epfd, EPOLL_CTL_ADD, global_wakeup_fd.read_fd, &ev) != 0) {
    return GRPC_OS_ERROR(errno, "epoll_ctl");
  }
  g_num_neighbourhoods = GPR_MAX(1, gpr_cpu_num_cores());
  g_neighbourhoods =
      gpr_zalloc(sizeof(*g_neighbourhoods) * g_num_neighbourhoods);
  for (size_t i = 0; i < g_num_neighbourhoods; i++) {
    gpr_mu_init(&g_neighbourhoods[i].mu);
    g_neighbourhoods[i].seen_inactive = true;
  }
  return GRPC_ERROR_NONE;
}

static void pollset_global_shutdown(void) {
  gpr_tls_destroy(&g_current_thread_pollset);
  gpr_tls_destroy(&g_current_thread_worker);
  if (global_wakeup_fd.read_fd != -1) grpc_wakeup_fd_destroy(&global_wakeup_fd);
  for (size_t i = 0; i < g_num_neighbourhoods; i++) {
    gpr_mu_destroy(&g_neighbourhoods[i].mu);
  }
  gpr_free(g_neighbourhoods);
}

static void pollset_init(grpc_pollset *pollset, gpr_mu **mu) {
  gpr_mu_init(&pollset->mu);
  *mu = &pollset->mu;
  pollset->neighbourhood = &g_neighbourhoods[gpr_cpu_current_cpu()];
  pollset->seen_inactive = true;
  pollset->next = pollset->prev = pollset;
}

static void pollset_destroy(grpc_pollset *pollset) {
  gpr_mu_lock(&pollset->neighbourhood->mu);
  pollset->prev->next = pollset->next;
  pollset->next->prev = pollset->prev;
  if (pollset == pollset->neighbourhood->active_root) {
    pollset->neighbourhood->active_root =
        pollset->next == pollset ? NULL : pollset->next;
  } else if (pollset == pollset->neighbourhood->inactive_root) {
    pollset->neighbourhood->inactive_root =
        pollset->next == pollset ? NULL : pollset->next;
  }
  gpr_mu_unlock(&pollset->neighbourhood->mu);
  gpr_mu_destroy(&pollset->mu);
}

static grpc_error *pollset_kick_all(grpc_pollset *pollset) {
  grpc_error *error = GRPC_ERROR_NONE;
  if (pollset->root_worker != NULL) {
    grpc_pollset_worker *worker = pollset->root_worker;
    do {
      if (worker->initialized_cv) {
        worker->kick_state = KICKED;
        gpr_cv_signal(&worker->cv);
      } else {
        worker->kick_state = KICKED;
        append_error(&error, grpc_wakeup_fd_wakeup(&global_wakeup_fd),
                     "pollset_shutdown");
      }

      worker = worker->next;
    } while (worker != pollset->root_worker);
  }
  return error;
}

static void pollset_maybe_finish_shutdown(grpc_exec_ctx *exec_ctx,
                                          grpc_pollset *pollset) {
  if (pollset->shutdown_closure != NULL && pollset->root_worker == NULL) {
    grpc_closure_sched(exec_ctx, pollset->shutdown_closure, GRPC_ERROR_NONE);
    pollset->shutdown_closure = NULL;
  }
}

static void pollset_shutdown(grpc_exec_ctx *exec_ctx, grpc_pollset *pollset,
                             grpc_closure *closure) {
  GPR_ASSERT(pollset->shutdown_closure == NULL);
  pollset->shutdown_closure = closure;
  GRPC_LOG_IF_ERROR("pollset_shutdown", pollset_kick_all(pollset));
  pollset_maybe_finish_shutdown(exec_ctx, pollset);
}

#define MAX_EPOLL_EVENTS 100

static int poll_deadline_to_millis_timeout(gpr_timespec deadline,
                                           gpr_timespec now) {
  gpr_timespec timeout;
  if (gpr_time_cmp(deadline, gpr_inf_future(deadline.clock_type)) == 0) {
    return -1;
  }

  if (gpr_time_cmp(deadline, now) <= 0) {
    return 0;
  }

  static const gpr_timespec round_up = {
      .clock_type = GPR_TIMESPAN, .tv_sec = 0, .tv_nsec = GPR_NS_PER_MS - 1};
  timeout = gpr_time_sub(deadline, now);
  int millis = gpr_time_to_millis(gpr_time_add(timeout, round_up));
  return millis >= 1 ? millis : 1;
}

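/* Run one epoll_wait pass (up to MAX_EPOLL_EVENTS events) and translate the
   results: an event on the global wakeup fd may carry a timer kick, while
   every other event marks the corresponding grpc_fd readable and/or
   writable. */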
static grpc_error *pollset_epoll(grpc_exec_ctx *exec_ctx, grpc_pollset *pollset,
                                 gpr_timespec now, gpr_timespec deadline) {
  struct epoll_event events[MAX_EPOLL_EVENTS];
  static const char *err_desc = "pollset_poll";

  int timeout = poll_deadline_to_millis_timeout(deadline, now);

  if (timeout != 0) {
    GRPC_SCHEDULING_START_BLOCKING_REGION;
  }
  int r;
  do {
    r = epoll_wait(g_epfd, events, MAX_EPOLL_EVENTS, timeout);
  } while (r < 0 && errno == EINTR);
  if (timeout != 0) {
    GRPC_SCHEDULING_END_BLOCKING_REGION;
  }

  if (r < 0) return GRPC_OS_ERROR(errno, "epoll_wait");

  grpc_error *error = GRPC_ERROR_NONE;
  for (int i = 0; i < r; i++) {
    void *data_ptr = events[i].data.ptr;
    if (data_ptr == &global_wakeup_fd) {
      if (gpr_atm_no_barrier_cas(&g_timer_kick, 1, 0)) {
        grpc_timer_consume_kick();
      }
      append_error(&error, grpc_wakeup_fd_consume_wakeup(&global_wakeup_fd),
                   err_desc);
    } else {
      grpc_fd *fd = (grpc_fd *)(data_ptr);
      bool cancel = (events[i].events & (EPOLLERR | EPOLLHUP)) != 0;
      bool read_ev = (events[i].events & (EPOLLIN | EPOLLPRI)) != 0;
      bool write_ev = (events[i].events & EPOLLOUT) != 0;
      if (read_ev || cancel) {
        fd_become_readable(exec_ctx, fd, pollset);
      }
      if (write_ev || cancel) {
        fd_become_writable(exec_ctx, fd);
      }
    }
  }

  return error;
}

#if 0
static void verify_all_entries_in_neighbourhood_list(
    grpc_pollset *root, bool should_be_seen_inactive) {
  if (root == NULL) return;
  grpc_pollset *p = root;
  do {
    GPR_ASSERT(p->seen_inactive == should_be_seen_inactive);
    p = p->next;
  } while (p != root);
}

static void verify_neighbourhood_lists(pollset_neighbourhood *neighbourhood) {
  // assumes neighbourhood->mu locked
  verify_all_entries_in_neighbourhood_list(neighbourhood->active_root, false);
  verify_all_entries_in_neighbourhood_list(neighbourhood->inactive_root, true);
}
#endif

static void move_pollset_to_neighbourhood_list(grpc_pollset *pollset,
                                               grpc_pollset **from_root,
                                               grpc_pollset **to_root) {
  // remove from old list
  pollset->prev->next = pollset->next;
  pollset->next->prev = pollset->prev;
  if (*from_root == pollset) {
    *from_root = pollset->next == pollset ? NULL : pollset->next;
  }
  // add to new list
  if (*to_root == NULL) {
    *to_root = pollset->next = pollset->prev = pollset;
  } else {
    pollset->next = *to_root;
    pollset->prev = pollset->next->prev;
    pollset->next->prev = pollset->prev->next = pollset;
  }
}

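/* begin_worker: add 'worker' to 'pollset' and decide whether this thread
   should become the poller. If the pollset was sitting on its neighbourhood's
   inactive list it is first moved back to the active list. Returns true iff
   the caller should go on to call epoll_wait, i.e. the worker was designated
   the active poller and the pollset is not shutting down; otherwise the
   worker blocks on its condition variable until it is kicked or the deadline
   expires. */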
static bool begin_worker(grpc_pollset *pollset, grpc_pollset_worker *worker,
                         grpc_pollset_worker **worker_hdl, gpr_timespec *now,
                         gpr_timespec deadline) {
  if (worker_hdl != NULL) *worker_hdl = worker;
  worker->initialized_cv = false;
  worker->kick_state = UNKICKED;

  if (pollset->seen_inactive) {
    // pollset has been observed to be inactive, we need to move back to the
    // active list
    pollset_neighbourhood *neighbourhood = pollset->neighbourhood;
    gpr_mu_unlock(&pollset->mu);
    gpr_mu_lock(&neighbourhood->mu);
    gpr_mu_lock(&pollset->mu);
    if (pollset->seen_inactive) {
      pollset->seen_inactive = false;
      move_pollset_to_neighbourhood_list(pollset, &neighbourhood->inactive_root,
                                         &neighbourhood->active_root);
      if (neighbourhood->seen_inactive) {
        neighbourhood->seen_inactive = false;
        if (gpr_atm_no_barrier_cas(&g_active_poller, 0, (gpr_atm)worker)) {
          worker->kick_state = KICKED_FOR_POLL;
        }
      }
    }
    gpr_mu_unlock(&neighbourhood->mu);
  }
  worker_insert(pollset, worker);
  if (worker->kick_state == UNKICKED) {
    worker->initialized_cv = true;
    gpr_cv_init(&worker->cv);
    do {
      if (gpr_cv_wait(&worker->cv, &pollset->mu, deadline) &&
          worker->kick_state == UNKICKED) {
        worker->kick_state = KICKED;
      }
    } while (worker->kick_state == UNKICKED);
    *now = gpr_now(now->clock_type);
  }

  return worker->kick_state == KICKED_FOR_POLL &&
         pollset->shutdown_closure == NULL;
}

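/* end_worker: remove 'worker' from 'pollset'. If this worker was the active
   poller it must hand that role off before leaving: preferably to a sibling
   worker on the same pollset, otherwise to a worker found by scanning the
   active pollsets of each neighbourhood in turn. Pollsets (and
   neighbourhoods) on which no worker is found are marked inactive along the
   way. */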
static void end_worker(grpc_exec_ctx *exec_ctx, grpc_pollset *pollset,
                       grpc_pollset_worker *worker,
                       grpc_pollset_worker **worker_hdl) {
  if (worker_hdl != NULL) *worker_hdl = NULL;
  if (gpr_atm_no_barrier_load(&g_active_poller) == (gpr_atm)worker) {
    GPR_ASSERT(!pollset->seen_inactive);
    GPR_ASSERT(gpr_atm_no_barrier_load(&g_active_poller) == (gpr_atm)worker);
    if (worker->next != worker) {
      assert(worker->next->initialized_cv);
      gpr_atm_no_barrier_store(&g_active_poller, (gpr_atm)worker->next);
      gpr_log(GPR_DEBUG, "Picked sibling worker %p for poller",
              (void *)worker->next);
      worker->next->kick_state = KICKED_FOR_POLL;
      gpr_cv_signal(&worker->next->cv);
      if (grpc_exec_ctx_has_work(exec_ctx)) {
        gpr_mu_unlock(&pollset->mu);
        grpc_exec_ctx_flush(exec_ctx);
        gpr_mu_lock(&pollset->mu);
      }
    } else {
      gpr_atm_no_barrier_store(&g_active_poller, 0);
      pollset_neighbourhood *neighbourhood = pollset->neighbourhood;
      gpr_mu_unlock(&pollset->mu);
      bool found_worker = false;
      do {
        gpr_mu_lock(&neighbourhood->mu);
        do {
          grpc_pollset *inspect = neighbourhood->active_root;
          if (inspect == NULL) {
            break;
          }
          gpr_mu_lock(&inspect->mu);
          GPR_ASSERT(!inspect->seen_inactive);
          grpc_pollset_worker *inspect_worker = inspect->root_worker;
          if (inspect_worker == worker) inspect_worker = worker->next;
          if (inspect_worker == worker) inspect_worker = NULL;
          if (inspect_worker != NULL) {
            if (gpr_atm_no_barrier_cas(&g_active_poller, 0,
                                       (gpr_atm)inspect_worker)) {
              GPR_ASSERT(inspect_worker->initialized_cv);
              inspect_worker->kick_state = KICKED_FOR_POLL;
              gpr_cv_signal(&inspect_worker->cv);
            }
            // even if we didn't win the cas, there's a worker, we can stop
            found_worker = true;
          } else {
            inspect->seen_inactive = true;
            move_pollset_to_neighbourhood_list(inspect,
                                               &neighbourhood->active_root,
                                               &neighbourhood->inactive_root);
          }
          gpr_mu_unlock(&inspect->mu);
        } while (!found_worker);
        if (!found_worker) {
          neighbourhood->seen_inactive = true;
        }
        gpr_mu_unlock(&neighbourhood->mu);
        ssize_t cur_neighbourhood_idx = neighbourhood - g_neighbourhoods;
        GPR_ASSERT(cur_neighbourhood_idx >= 0);
        GPR_ASSERT(g_num_neighbourhoods < INTPTR_MAX);
        GPR_ASSERT(cur_neighbourhood_idx < (ssize_t)g_num_neighbourhoods);
        size_t new_neighbourhood_idx =
            ((size_t)cur_neighbourhood_idx + 1) % g_num_neighbourhoods;
        neighbourhood = &g_neighbourhoods[new_neighbourhood_idx];
      } while (!found_worker && neighbourhood != pollset->neighbourhood);
      grpc_exec_ctx_flush(exec_ctx);
      gpr_mu_lock(&pollset->mu);
    }
  }
  if (worker->initialized_cv) {
    gpr_cv_destroy(&worker->cv);
  }
  if (EMPTIED == worker_remove(pollset, worker)) {
    pollset_maybe_finish_shutdown(exec_ctx, pollset);
  }
}

/* pollset->mu must be held by the caller before calling this.
   The function pollset_work() may temporarily release the lock (pollset->mu)
   during the course of its execution but it will always re-acquire the lock
   and ensure that it is held by the time the function returns */
static grpc_error *pollset_work(grpc_exec_ctx *exec_ctx, grpc_pollset *pollset,
                                grpc_pollset_worker **worker_hdl,
                                gpr_timespec now, gpr_timespec deadline) {
  grpc_pollset_worker worker;
  grpc_error *error = GRPC_ERROR_NONE;
  static const char *err_desc = "pollset_work";
  if (pollset->kicked_without_poller) {
    pollset->kicked_without_poller = false;
    return GRPC_ERROR_NONE;
  }
  gpr_tls_set(&g_current_thread_pollset, (intptr_t)pollset);
  if (begin_worker(pollset, &worker, worker_hdl, &now, deadline)) {
    gpr_tls_set(&g_current_thread_worker, (intptr_t)&worker);
    GPR_ASSERT(!pollset->shutdown_closure);
    gpr_mu_unlock(&pollset->mu);
    append_error(&error, pollset_epoll(exec_ctx, pollset, now, deadline),
                 err_desc);
    gpr_mu_lock(&pollset->mu);
    gpr_tls_set(&g_current_thread_worker, 0);
  }
  end_worker(exec_ctx, pollset, &worker, worker_hdl);
  gpr_tls_set(&g_current_thread_pollset, 0);
  return error;
}

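/* Kick a pollset, i.e. wake up an appropriate worker. Roughly: with no
   specific worker requested, prefer a condition-variable wakeup of a
   non-polling worker and fall back to the global wakeup fd when the only
   worker is the active poller; kicking a specific worker that is the calling
   thread (or is already kicked) just marks it KICKED. */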
static grpc_error *pollset_kick(grpc_pollset *pollset,
                                grpc_pollset_worker *specific_worker) {
  if (specific_worker == NULL) {
    if (gpr_tls_get(&g_current_thread_pollset) != (intptr_t)pollset) {
      grpc_pollset_worker *root_worker = pollset->root_worker;
      if (root_worker == NULL) {
        pollset->kicked_without_poller = true;
        return GRPC_ERROR_NONE;
      }
      grpc_pollset_worker *next_worker = root_worker->next;
      if (root_worker == next_worker &&
          root_worker == (grpc_pollset_worker *)gpr_atm_no_barrier_load(
                             &g_active_poller)) {
        root_worker->kick_state = KICKED;
        return grpc_wakeup_fd_wakeup(&global_wakeup_fd);
      } else if (next_worker->kick_state == UNKICKED) {
        GPR_ASSERT(next_worker->initialized_cv);
        next_worker->kick_state = KICKED;
        gpr_cv_signal(&next_worker->cv);
        return GRPC_ERROR_NONE;
      } else {
        return GRPC_ERROR_NONE;
      }
    } else {
      return GRPC_ERROR_NONE;
    }
  } else if (specific_worker->kick_state != UNKICKED) {
    return GRPC_ERROR_NONE;
  } else if (gpr_tls_get(&g_current_thread_worker) ==
             (intptr_t)specific_worker) {
    specific_worker->kick_state = KICKED;
    return GRPC_ERROR_NONE;
  } else if (specific_worker ==
             (grpc_pollset_worker *)gpr_atm_no_barrier_load(&g_active_poller)) {
    specific_worker->kick_state = KICKED;
    return grpc_wakeup_fd_wakeup(&global_wakeup_fd);
  } else if (specific_worker->initialized_cv) {
    specific_worker->kick_state = KICKED;
    gpr_cv_signal(&specific_worker->cv);
    return GRPC_ERROR_NONE;
  } else {
    specific_worker->kick_state = KICKED;
    return GRPC_ERROR_NONE;
  }
}

static void pollset_add_fd(grpc_exec_ctx *exec_ctx, grpc_pollset *pollset,
                           grpc_fd *fd) {}

static grpc_error *kick_poller(void) {
  gpr_atm_no_barrier_store(&g_timer_kick, 1);
  return grpc_wakeup_fd_wakeup(&global_wakeup_fd);
}

/*******************************************************************************
 * Workqueue Definitions
 */

#ifdef GRPC_WORKQUEUE_REFCOUNT_DEBUG
static grpc_workqueue *workqueue_ref(grpc_workqueue *workqueue,
                                     const char *file, int line,
                                     const char *reason) {
  return workqueue;
}

static void workqueue_unref(grpc_exec_ctx *exec_ctx, grpc_workqueue *workqueue,
                            const char *file, int line, const char *reason) {}
#else
static grpc_workqueue *workqueue_ref(grpc_workqueue *workqueue) {
  return workqueue;
}

static void workqueue_unref(grpc_exec_ctx *exec_ctx,
                            grpc_workqueue *workqueue) {}
#endif

static grpc_closure_scheduler *workqueue_scheduler(grpc_workqueue *workqueue) {
  return grpc_schedule_on_exec_ctx;
}

/*******************************************************************************
 * Pollset-set Definitions
 */

static grpc_pollset_set *pollset_set_create(void) {
  return (grpc_pollset_set *)((intptr_t)0xdeafbeef);
}

static void pollset_set_destroy(grpc_exec_ctx *exec_ctx,
                                grpc_pollset_set *pss) {}

static void pollset_set_add_fd(grpc_exec_ctx *exec_ctx, grpc_pollset_set *pss,
                               grpc_fd *fd) {}

static void pollset_set_del_fd(grpc_exec_ctx *exec_ctx, grpc_pollset_set *pss,
                               grpc_fd *fd) {}

static void pollset_set_add_pollset(grpc_exec_ctx *exec_ctx,
                                    grpc_pollset_set *pss, grpc_pollset *ps) {}

static void pollset_set_del_pollset(grpc_exec_ctx *exec_ctx,
                                    grpc_pollset_set *pss, grpc_pollset *ps) {}

static void pollset_set_add_pollset_set(grpc_exec_ctx *exec_ctx,
                                        grpc_pollset_set *bag,
                                        grpc_pollset_set *item) {}

static void pollset_set_del_pollset_set(grpc_exec_ctx *exec_ctx,
                                        grpc_pollset_set *bag,
                                        grpc_pollset_set *item) {}

/*******************************************************************************
 * Event engine binding
 */

static void shutdown_engine(void) {
  fd_global_shutdown();
  pollset_global_shutdown();
}

static const grpc_event_engine_vtable vtable = {
    .pollset_size = sizeof(grpc_pollset),

    .fd_create = fd_create,
    .fd_wrapped_fd = fd_wrapped_fd,
    .fd_orphan = fd_orphan,
    .fd_shutdown = fd_shutdown,
    .fd_is_shutdown = fd_is_shutdown,
    .fd_notify_on_read = fd_notify_on_read,
    .fd_notify_on_write = fd_notify_on_write,
    .fd_get_read_notifier_pollset = fd_get_read_notifier_pollset,
    .fd_get_workqueue = fd_get_workqueue,

    .pollset_init = pollset_init,
    .pollset_shutdown = pollset_shutdown,
    .pollset_destroy = pollset_destroy,
    .pollset_work = pollset_work,
    .pollset_kick = pollset_kick,
    .pollset_add_fd = pollset_add_fd,

    .pollset_set_create = pollset_set_create,
    .pollset_set_destroy = pollset_set_destroy,
    .pollset_set_add_pollset = pollset_set_add_pollset,
    .pollset_set_del_pollset = pollset_set_del_pollset,
    .pollset_set_add_pollset_set = pollset_set_add_pollset_set,
    .pollset_set_del_pollset_set = pollset_set_del_pollset_set,
    .pollset_set_add_fd = pollset_set_add_fd,
    .pollset_set_del_fd = pollset_set_del_fd,

    .kick_poller = kick_poller,

    .workqueue_ref = workqueue_ref,
    .workqueue_unref = workqueue_unref,
    .workqueue_scheduler = workqueue_scheduler,

    .shutdown_engine = shutdown_engine,
};

/* It is possible that GLIBC has epoll but the underlying kernel doesn't.
 * Create a dummy epoll_fd to make sure epoll support is available */
const grpc_event_engine_vtable *grpc_init_epoll1_linux(bool explicit_request) {
  if (!grpc_has_wakeup_fd()) {
    return NULL;
  }

  g_epfd = epoll_create1(EPOLL_CLOEXEC);
  if (g_epfd < 0) {
    gpr_log(GPR_ERROR, "epoll unavailable");
    return NULL;
  }

  fd_global_init();

  if (!GRPC_LOG_IF_ERROR("pollset_global_init", pollset_global_init())) {
    close(g_epfd);
    fd_global_shutdown();
    return NULL;
  }

  return &vtable;
}

#else /* defined(GRPC_LINUX_EPOLL) */
#if defined(GRPC_POSIX_SOCKET)
#include "src/core/lib/iomgr/ev_posix.h"
/* If GRPC_LINUX_EPOLL is not defined, it means epoll is not available. Return
 * NULL */
const grpc_event_engine_vtable *grpc_init_epoll1_linux(bool explicit_request) {
  return NULL;
}
#endif /* defined(GRPC_POSIX_SOCKET) */
#endif /* !defined(GRPC_LINUX_EPOLL) */