/*
 *
 * Copyright 2017, Google Inc.
 * All rights reserved.
 *
 * Redistribution and use in source and binary forms, with or without
 * modification, are permitted provided that the following conditions are
 * met:
 *
 *     * Redistributions of source code must retain the above copyright
 * notice, this list of conditions and the following disclaimer.
 *     * Redistributions in binary form must reproduce the above
 * copyright notice, this list of conditions and the following disclaimer
 * in the documentation and/or other materials provided with the
 * distribution.
 *     * Neither the name of Google Inc. nor the names of its
 * contributors may be used to endorse or promote products derived from
 * this software without specific prior written permission.
 *
 * THIS SOFTWARE IS PROVIDED BY THE COPYRIGHT HOLDERS AND CONTRIBUTORS
 * "AS IS" AND ANY EXPRESS OR IMPLIED WARRANTIES, INCLUDING, BUT NOT
 * LIMITED TO, THE IMPLIED WARRANTIES OF MERCHANTABILITY AND FITNESS FOR
 * A PARTICULAR PURPOSE ARE DISCLAIMED. IN NO EVENT SHALL THE COPYRIGHT
 * OWNER OR CONTRIBUTORS BE LIABLE FOR ANY DIRECT, INDIRECT, INCIDENTAL,
 * SPECIAL, EXEMPLARY, OR CONSEQUENTIAL DAMAGES (INCLUDING, BUT NOT
 * LIMITED TO, PROCUREMENT OF SUBSTITUTE GOODS OR SERVICES; LOSS OF USE,
 * DATA, OR PROFITS; OR BUSINESS INTERRUPTION) HOWEVER CAUSED AND ON ANY
 * THEORY OF LIABILITY, WHETHER IN CONTRACT, STRICT LIABILITY, OR TORT
 * (INCLUDING NEGLIGENCE OR OTHERWISE) ARISING IN ANY WAY OUT OF THE USE
 * OF THIS SOFTWARE, EVEN IF ADVISED OF THE POSSIBILITY OF SUCH DAMAGE.
 *
 */

#include "src/core/lib/iomgr/port.h"

/* This polling engine is only relevant on linux kernels supporting epoll() */
#ifdef GRPC_LINUX_EPOLL

#include "src/core/lib/iomgr/ev_epoll1_linux.h"

#include <assert.h>
#include <errno.h>
#include <poll.h>
#include <pthread.h>
#include <string.h>
#include <sys/epoll.h>
#include <sys/socket.h>
#include <unistd.h>

#include <grpc/support/alloc.h>
#include <grpc/support/cpu.h>
#include <grpc/support/log.h>
#include <grpc/support/string_util.h>
#include <grpc/support/tls.h>
#include <grpc/support/useful.h>

#include "src/core/lib/iomgr/ev_posix.h"
#include "src/core/lib/iomgr/iomgr_internal.h"
#include "src/core/lib/iomgr/lockfree_event.h"
#include "src/core/lib/iomgr/wakeup_fd_posix.h"
#include "src/core/lib/iomgr/workqueue.h"
#include "src/core/lib/profiling/timers.h"
#include "src/core/lib/support/block_annotate.h"

static grpc_wakeup_fd global_wakeup_fd;
static int g_epfd;

/*******************************************************************************
 * Fd Declarations
 */

struct grpc_fd {
  int fd;

  gpr_atm read_closure;
  gpr_atm write_closure;

  struct grpc_fd *freelist_next;

  /* The pollset that last noticed that the fd is readable. The actual type
   * stored in this is (grpc_pollset *) */
  gpr_atm read_notifier_pollset;

  grpc_iomgr_object iomgr_object;
};

static void fd_global_init(void);
static void fd_global_shutdown(void);

/*******************************************************************************
 * Pollset Declarations
 */

typedef enum { UNKICKED, KICKED, DESIGNATED_POLLER } kick_state;
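
/* Note on worker kick states (as used throughout this file): a worker starts
 * out UNKICKED, may be promoted to DESIGNATED_POLLER (the single worker that
 * is allowed to sit in epoll_wait on g_epfd), and ends up KICKED once it has
 * been woken or told to return. SET_KICK_STATE below also records the source
 * line of the last transition, purely as a debugging aid. */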

struct grpc_pollset_worker {
  kick_state kick_state;
  int kick_state_mutator;  // which line of code last changed kick state
  bool initialized_cv;
  grpc_pollset_worker *next;
  grpc_pollset_worker *prev;
  gpr_cv cv;
  grpc_closure_list schedule_on_end_work;
};

#define SET_KICK_STATE(worker, state)        \
  do {                                       \
    (worker)->kick_state = (state);          \
    (worker)->kick_state_mutator = __LINE__; \
  } while (false)

#define MAX_NEIGHBOURHOODS 1024

typedef struct pollset_neighbourhood {
  gpr_mu mu;
  grpc_pollset *active_root;
  char pad[GPR_CACHELINE_SIZE];
} pollset_neighbourhood;
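
/* Pollsets are sharded into "neighbourhoods" (roughly one per CPU core; see
 * pollset_global_init), so that pollers looking for work contend on a
 * neighbourhood mutex rather than on a single global lock. active_root heads a
 * doubly linked ring of pollsets believed to have active workers, and the
 * padding is there to keep each neighbourhood on its own cache line. */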

struct grpc_pollset {
  gpr_mu mu;
  pollset_neighbourhood *neighbourhood;
  bool reassigning_neighbourhood;
  grpc_pollset_worker *root_worker;
  bool kicked_without_poller;
  bool seen_inactive;
  bool shutting_down;          /* Is the pollset shutting down ? */
  bool finish_shutdown_called; /* Is the 'finish_shutdown_locked()' called ? */
  grpc_closure *shutdown_closure; /* Called after shutdown is complete */
  int begin_refs;

  grpc_pollset *next;
  grpc_pollset *prev;
};

/*******************************************************************************
 * Pollset-set Declarations
 */

struct grpc_pollset_set {};

/*******************************************************************************
 * Common helpers
 */

static bool append_error(grpc_error **composite, grpc_error *error,
                         const char *desc) {
  if (error == GRPC_ERROR_NONE) return true;
  if (*composite == GRPC_ERROR_NONE) {
    *composite = GRPC_ERROR_CREATE_FROM_COPIED_STRING(desc);
  }
  *composite = grpc_error_add_child(*composite, error);
  return false;
}
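
/* Typical usage (this mirrors pollset_kick_all and pollset_epoll below):
 *
 *   grpc_error *error = GRPC_ERROR_NONE;
 *   append_error(&error, grpc_wakeup_fd_wakeup(&global_wakeup_fd), "kick");
 *   return error;  // GRPC_ERROR_NONE if nothing failed, else a composite
 */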

/*******************************************************************************
 * Fd Definitions
 */

/* We need to keep a freelist not because of any concerns of malloc performance
 * but instead so that implementations with multiple threads in (for example)
 * epoll_wait deal with the race between pollset removal and incoming poll
 * notifications.
 *
 * The problem is that the poller ultimately holds a reference to this
 * object, so it is very difficult to know when it is safe to free it, at least
 * without some expensive synchronization.
 *
 * If we keep the object freelisted, in the worst case losing this race just
 * becomes a spurious read notification on a reused fd.
 */

/* The alarm system needs to be able to wake up 'some poller' sometimes
 * (specifically when a new alarm needs to be triggered earlier than the next
 * alarm 'epoch'). This wakeup_fd gives us something to alert on when such a
 * case occurs. */

static grpc_fd *fd_freelist = NULL;
static gpr_mu fd_freelist_mu;

static void fd_global_init(void) { gpr_mu_init(&fd_freelist_mu); }

static void fd_global_shutdown(void) {
  gpr_mu_lock(&fd_freelist_mu);
  gpr_mu_unlock(&fd_freelist_mu);
  while (fd_freelist != NULL) {
    grpc_fd *fd = fd_freelist;
    fd_freelist = fd_freelist->freelist_next;
    gpr_free(fd);
  }
  gpr_mu_destroy(&fd_freelist_mu);
}

static grpc_fd *fd_create(int fd, const char *name) {
  grpc_fd *new_fd = NULL;

  gpr_mu_lock(&fd_freelist_mu);
  if (fd_freelist != NULL) {
    new_fd = fd_freelist;
    fd_freelist = fd_freelist->freelist_next;
  }
  gpr_mu_unlock(&fd_freelist_mu);

  if (new_fd == NULL) {
    new_fd = gpr_malloc(sizeof(grpc_fd));
  }

  new_fd->fd = fd;
  grpc_lfev_init(&new_fd->read_closure);
  grpc_lfev_init(&new_fd->write_closure);
  gpr_atm_no_barrier_store(&new_fd->read_notifier_pollset, (gpr_atm)NULL);

  new_fd->freelist_next = NULL;

  char *fd_name;
  gpr_asprintf(&fd_name, "%s fd=%d", name, fd);
  grpc_iomgr_register_object(&new_fd->iomgr_object, fd_name);
#ifdef GRPC_FD_REF_COUNT_DEBUG
  gpr_log(GPR_DEBUG, "FD %d %p create %s", fd, (void *)new_fd, fd_name);
#endif
  gpr_free(fd_name);

  struct epoll_event ev = {.events = (uint32_t)(EPOLLIN | EPOLLOUT | EPOLLET),
                           .data.ptr = new_fd};
  if (epoll_ctl(g_epfd, EPOLL_CTL_ADD, fd, &ev) != 0) {
    gpr_log(GPR_ERROR, "epoll_ctl failed: %s", strerror(errno));
  }

  return new_fd;
}

static int fd_wrapped_fd(grpc_fd *fd) { return fd->fd; }

/* Might be called multiple times */
static void fd_shutdown(grpc_exec_ctx *exec_ctx, grpc_fd *fd, grpc_error *why) {
  if (grpc_lfev_set_shutdown(exec_ctx, &fd->read_closure,
                             GRPC_ERROR_REF(why))) {
    shutdown(fd->fd, SHUT_RDWR);
    grpc_lfev_set_shutdown(exec_ctx, &fd->write_closure, GRPC_ERROR_REF(why));
  }
  GRPC_ERROR_UNREF(why);
}

static void fd_orphan(grpc_exec_ctx *exec_ctx, grpc_fd *fd,
                      grpc_closure *on_done, int *release_fd,
                      const char *reason) {
  grpc_error *error = GRPC_ERROR_NONE;

  if (!grpc_lfev_is_shutdown(&fd->read_closure)) {
    fd_shutdown(exec_ctx, fd, GRPC_ERROR_CREATE_FROM_COPIED_STRING(reason));
  }

  /* If release_fd is not NULL, we should be relinquishing control of the file
     descriptor fd->fd (but we still own the grpc_fd structure). */
  if (release_fd != NULL) {
    *release_fd = fd->fd;
  } else {
    close(fd->fd);
  }

  grpc_closure_sched(exec_ctx, on_done, GRPC_ERROR_REF(error));

  grpc_iomgr_unregister_object(&fd->iomgr_object);
  grpc_lfev_destroy(&fd->read_closure);
  grpc_lfev_destroy(&fd->write_closure);

  gpr_mu_lock(&fd_freelist_mu);
  fd->freelist_next = fd_freelist;
  fd_freelist = fd;
  gpr_mu_unlock(&fd_freelist_mu);
}

static grpc_pollset *fd_get_read_notifier_pollset(grpc_exec_ctx *exec_ctx,
                                                  grpc_fd *fd) {
  gpr_atm notifier = gpr_atm_acq_load(&fd->read_notifier_pollset);
  return (grpc_pollset *)notifier;
}

static bool fd_is_shutdown(grpc_fd *fd) {
  return grpc_lfev_is_shutdown(&fd->read_closure);
}

static void fd_notify_on_read(grpc_exec_ctx *exec_ctx, grpc_fd *fd,
                              grpc_closure *closure) {
  grpc_lfev_notify_on(exec_ctx, &fd->read_closure, closure);
}

static void fd_notify_on_write(grpc_exec_ctx *exec_ctx, grpc_fd *fd,
                               grpc_closure *closure) {
  grpc_lfev_notify_on(exec_ctx, &fd->write_closure, closure);
}

static grpc_workqueue *fd_get_workqueue(grpc_fd *fd) {
  return (grpc_workqueue *)0xb0b51ed;
}

static void fd_become_readable(grpc_exec_ctx *exec_ctx, grpc_fd *fd,
                               grpc_pollset *notifier) {
  grpc_lfev_set_ready(exec_ctx, &fd->read_closure);

  /* Note, it is possible that fd_become_readable might be called twice with
     different 'notifier's when an fd becomes readable and it is in two epoll
     sets (This can happen briefly during polling island merges). In such cases
     it does not really matter which notifier is set as the
     read_notifier_pollset (They would both point to the same polling island
     anyway) */
  /* Use release store to match with acquire load in fd_get_read_notifier */
  gpr_atm_rel_store(&fd->read_notifier_pollset, (gpr_atm)notifier);
}

static void fd_become_writable(grpc_exec_ctx *exec_ctx, grpc_fd *fd) {
  grpc_lfev_set_ready(exec_ctx, &fd->write_closure);
}

/*******************************************************************************
 * Pollset Definitions
 */

GPR_TLS_DECL(g_current_thread_pollset);
GPR_TLS_DECL(g_current_thread_worker);
static gpr_atm g_active_poller;
static pollset_neighbourhood *g_neighbourhoods;
static size_t g_num_neighbourhoods;
static gpr_mu g_wq_mu;
static grpc_closure_list g_wq_items;

/* Return true if first in list */
static bool worker_insert(grpc_pollset *pollset, grpc_pollset_worker *worker) {
  if (pollset->root_worker == NULL) {
    pollset->root_worker = worker;
    worker->next = worker->prev = worker;
    return true;
  } else {
    worker->next = pollset->root_worker;
    worker->prev = worker->next->prev;
    worker->next->prev = worker;
    worker->prev->next = worker;
    return false;
  }
}

/* Return EMPTIED if the removed worker was the last in the list */
typedef enum { EMPTIED, NEW_ROOT, REMOVED } worker_remove_result;

static worker_remove_result worker_remove(grpc_pollset *pollset,
                                          grpc_pollset_worker *worker) {
  if (worker == pollset->root_worker) {
    if (worker == worker->next) {
      pollset->root_worker = NULL;
      return EMPTIED;
    } else {
      pollset->root_worker = worker->next;
      worker->prev->next = worker->next;
      worker->next->prev = worker->prev;
      return NEW_ROOT;
    }
  } else {
    worker->prev->next = worker->next;
    worker->next->prev = worker->prev;
    return REMOVED;
  }
}

static size_t choose_neighbourhood(void) {
  return (size_t)gpr_cpu_current_cpu() % g_num_neighbourhoods;
}

static grpc_error *pollset_global_init(void) {
  gpr_tls_init(&g_current_thread_pollset);
  gpr_tls_init(&g_current_thread_worker);
  gpr_atm_no_barrier_store(&g_active_poller, 0);
  global_wakeup_fd.read_fd = -1;
  grpc_error *err = grpc_wakeup_fd_init(&global_wakeup_fd);
  gpr_mu_init(&g_wq_mu);
  g_wq_items = (grpc_closure_list)GRPC_CLOSURE_LIST_INIT;
  if (err != GRPC_ERROR_NONE) return err;
  struct epoll_event ev = {.events = (uint32_t)(EPOLLIN | EPOLLET),
                           .data.ptr = &global_wakeup_fd};
  if (epoll_ctl(g_epfd, EPOLL_CTL_ADD, global_wakeup_fd.read_fd, &ev) != 0) {
    return GRPC_OS_ERROR(errno, "epoll_ctl");
  }
  g_num_neighbourhoods = GPR_CLAMP(gpr_cpu_num_cores(), 1, MAX_NEIGHBOURHOODS);
  g_neighbourhoods =
      gpr_zalloc(sizeof(*g_neighbourhoods) * g_num_neighbourhoods);
  for (size_t i = 0; i < g_num_neighbourhoods; i++) {
    gpr_mu_init(&g_neighbourhoods[i].mu);
  }
  return GRPC_ERROR_NONE;
}

static void pollset_global_shutdown(void) {
  gpr_tls_destroy(&g_current_thread_pollset);
  gpr_tls_destroy(&g_current_thread_worker);
  gpr_mu_destroy(&g_wq_mu);
  if (global_wakeup_fd.read_fd != -1) grpc_wakeup_fd_destroy(&global_wakeup_fd);
  for (size_t i = 0; i < g_num_neighbourhoods; i++) {
    gpr_mu_destroy(&g_neighbourhoods[i].mu);
  }
  gpr_free(g_neighbourhoods);
}

static void pollset_init(grpc_pollset *pollset, gpr_mu **mu) {
  gpr_mu_init(&pollset->mu);
  *mu = &pollset->mu;
  pollset->neighbourhood = &g_neighbourhoods[choose_neighbourhood()];
  pollset->seen_inactive = true;
}

static void pollset_destroy(grpc_exec_ctx *exec_ctx, grpc_pollset *pollset) {
  gpr_mu_lock(&pollset->mu);
  if (!pollset->seen_inactive) {
    pollset_neighbourhood *neighbourhood = pollset->neighbourhood;
    gpr_mu_unlock(&pollset->mu);
  retry_lock_neighbourhood:
    gpr_mu_lock(&neighbourhood->mu);
    gpr_mu_lock(&pollset->mu);
    if (!pollset->seen_inactive) {
      if (pollset->neighbourhood != neighbourhood) {
        gpr_mu_unlock(&neighbourhood->mu);
        neighbourhood = pollset->neighbourhood;
        gpr_mu_unlock(&pollset->mu);
        goto retry_lock_neighbourhood;
      }
      pollset->prev->next = pollset->next;
      pollset->next->prev = pollset->prev;
      if (pollset == pollset->neighbourhood->active_root) {
        pollset->neighbourhood->active_root =
            pollset->next == pollset ? NULL : pollset->next;
      }
    }
    gpr_mu_unlock(&pollset->neighbourhood->mu);
  }
  gpr_mu_unlock(&pollset->mu);
  gpr_mu_destroy(&pollset->mu);
}

static grpc_error *pollset_kick_all(grpc_pollset *pollset) {
  grpc_error *error = GRPC_ERROR_NONE;
  if (pollset->root_worker != NULL) {
    grpc_pollset_worker *worker = pollset->root_worker;
    do {
      switch (worker->kick_state) {
        case KICKED:
          break;
        case UNKICKED:
          SET_KICK_STATE(worker, KICKED);
          if (worker->initialized_cv) {
            gpr_cv_signal(&worker->cv);
          }
          break;
        case DESIGNATED_POLLER:
          SET_KICK_STATE(worker, KICKED);
          append_error(&error, grpc_wakeup_fd_wakeup(&global_wakeup_fd),
                       "pollset_shutdown");
          break;
      }

      worker = worker->next;
    } while (worker != pollset->root_worker);
  }
  return error;
}

static void pollset_maybe_finish_shutdown(grpc_exec_ctx *exec_ctx,
                                          grpc_pollset *pollset) {
  if (pollset->shutdown_closure != NULL && pollset->root_worker == NULL &&
      pollset->begin_refs == 0) {
    grpc_closure_sched(exec_ctx, pollset->shutdown_closure, GRPC_ERROR_NONE);
    pollset->shutdown_closure = NULL;
  }
}

static void pollset_shutdown(grpc_exec_ctx *exec_ctx, grpc_pollset *pollset,
                             grpc_closure *closure) {
  GPR_ASSERT(pollset->shutdown_closure == NULL);
  pollset->shutdown_closure = closure;
  GRPC_LOG_IF_ERROR("pollset_shutdown", pollset_kick_all(pollset));
  pollset_maybe_finish_shutdown(exec_ctx, pollset);
}

#define MAX_EPOLL_EVENTS 100

static int poll_deadline_to_millis_timeout(gpr_timespec deadline,
                                           gpr_timespec now) {
  gpr_timespec timeout;
  if (gpr_time_cmp(deadline, gpr_inf_future(deadline.clock_type)) == 0) {
    return -1;
  }

  if (gpr_time_cmp(deadline, now) <= 0) {
    return 0;
  }

  static const gpr_timespec round_up = {
      .clock_type = GPR_TIMESPAN, .tv_sec = 0, .tv_nsec = GPR_NS_PER_MS - 1};
  timeout = gpr_time_sub(deadline, now);
  int millis = gpr_time_to_millis(gpr_time_add(timeout, round_up));
  return millis >= 1 ? millis : 1;
}
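
/* For example: a deadline 2.5ms in the future yields
 * gpr_time_to_millis(2.5ms + 0.999999ms) == 3, while any positive deadline
 * shorter than 1ms still returns 1, so a not-yet-expired deadline never
 * degenerates into a busy 0ms epoll_wait. */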

static grpc_error *pollset_epoll(grpc_exec_ctx *exec_ctx, grpc_pollset *pollset,
                                 gpr_timespec now, gpr_timespec deadline) {
  struct epoll_event events[MAX_EPOLL_EVENTS];
  static const char *err_desc = "pollset_poll";

  int timeout = poll_deadline_to_millis_timeout(deadline, now);

  if (timeout != 0) {
    GRPC_SCHEDULING_START_BLOCKING_REGION;
  }
  int r;
  do {
    r = epoll_wait(g_epfd, events, MAX_EPOLL_EVENTS, timeout);
  } while (r < 0 && errno == EINTR);
  if (timeout != 0) {
    GRPC_SCHEDULING_END_BLOCKING_REGION;
  }

  if (r < 0) return GRPC_OS_ERROR(errno, "epoll_wait");

  grpc_error *error = GRPC_ERROR_NONE;
  for (int i = 0; i < r; i++) {
    void *data_ptr = events[i].data.ptr;
    if (data_ptr == &global_wakeup_fd) {
      gpr_mu_lock(&g_wq_mu);
      grpc_closure_list_move(&g_wq_items, &exec_ctx->closure_list);
      gpr_mu_unlock(&g_wq_mu);
      append_error(&error, grpc_wakeup_fd_consume_wakeup(&global_wakeup_fd),
                   err_desc);
    } else {
      grpc_fd *fd = (grpc_fd *)(data_ptr);
      bool cancel = (events[i].events & (EPOLLERR | EPOLLHUP)) != 0;
      bool read_ev = (events[i].events & (EPOLLIN | EPOLLPRI)) != 0;
      bool write_ev = (events[i].events & EPOLLOUT) != 0;
      if (read_ev || cancel) {
        fd_become_readable(exec_ctx, fd, pollset);
      }
      if (write_ev || cancel) {
        fd_become_writable(exec_ctx, fd);
      }
    }
  }

  return error;
}

static bool begin_worker(grpc_pollset *pollset, grpc_pollset_worker *worker,
                         grpc_pollset_worker **worker_hdl, gpr_timespec *now,
                         gpr_timespec deadline) {
  if (worker_hdl != NULL) *worker_hdl = worker;
  worker->initialized_cv = false;
  SET_KICK_STATE(worker, UNKICKED);
  worker->schedule_on_end_work = (grpc_closure_list)GRPC_CLOSURE_LIST_INIT;
  pollset->begin_refs++;

  if (pollset->seen_inactive) {
    // pollset has been observed to be inactive, we need to move back to the
    // active list
    bool is_reassigning = false;
    if (!pollset->reassigning_neighbourhood) {
      is_reassigning = true;
      pollset->reassigning_neighbourhood = true;
      pollset->neighbourhood = &g_neighbourhoods[choose_neighbourhood()];
    }
    pollset_neighbourhood *neighbourhood = pollset->neighbourhood;
    gpr_mu_unlock(&pollset->mu);
  // pollset unlocked: state may change (even worker->kick_state)
  retry_lock_neighbourhood:
    gpr_mu_lock(&neighbourhood->mu);
    gpr_mu_lock(&pollset->mu);
    if (pollset->seen_inactive) {
      if (neighbourhood != pollset->neighbourhood) {
        gpr_mu_unlock(&neighbourhood->mu);
        neighbourhood = pollset->neighbourhood;
        gpr_mu_unlock(&pollset->mu);
        goto retry_lock_neighbourhood;
      }
      pollset->seen_inactive = false;
      if (neighbourhood->active_root == NULL) {
        neighbourhood->active_root = pollset->next = pollset->prev = pollset;
        if (worker->kick_state == UNKICKED &&
            gpr_atm_no_barrier_cas(&g_active_poller, 0, (gpr_atm)worker)) {
          SET_KICK_STATE(worker, DESIGNATED_POLLER);
        }
      } else {
        pollset->next = neighbourhood->active_root;
        pollset->prev = pollset->next->prev;
        pollset->next->prev = pollset->prev->next = pollset;
      }
    }
    if (is_reassigning) {
      GPR_ASSERT(pollset->reassigning_neighbourhood);
      pollset->reassigning_neighbourhood = false;
    }
    gpr_mu_unlock(&neighbourhood->mu);
  }
  worker_insert(pollset, worker);
  pollset->begin_refs--;
  if (worker->kick_state == UNKICKED) {
    GPR_ASSERT(gpr_atm_no_barrier_load(&g_active_poller) != (gpr_atm)worker);
    worker->initialized_cv = true;
    gpr_cv_init(&worker->cv);
    while (worker->kick_state == UNKICKED &&
           pollset->shutdown_closure == NULL) {
      if (gpr_cv_wait(&worker->cv, &pollset->mu, deadline) &&
          worker->kick_state == UNKICKED) {
        SET_KICK_STATE(worker, KICKED);
      }
    }
    *now = gpr_now(now->clock_type);
  }

  return worker->kick_state == DESIGNATED_POLLER &&
         pollset->shutdown_closure == NULL;
}

static bool check_neighbourhood_for_available_poller(
    pollset_neighbourhood *neighbourhood) {
  bool found_worker = false;
  do {
    grpc_pollset *inspect = neighbourhood->active_root;
    if (inspect == NULL) {
      break;
    }
    gpr_mu_lock(&inspect->mu);
    GPR_ASSERT(!inspect->seen_inactive);
    grpc_pollset_worker *inspect_worker = inspect->root_worker;
    if (inspect_worker != NULL) {
      do {
        switch (inspect_worker->kick_state) {
          case UNKICKED:
            if (gpr_atm_no_barrier_cas(&g_active_poller, 0,
                                       (gpr_atm)inspect_worker)) {
              SET_KICK_STATE(inspect_worker, DESIGNATED_POLLER);
              if (inspect_worker->initialized_cv) {
                gpr_cv_signal(&inspect_worker->cv);
              }
            }
            // even if we didn't win the cas, there's a worker, we can stop
            found_worker = true;
            break;
          case KICKED:
            break;
          case DESIGNATED_POLLER:
            found_worker = true;  // ok, so someone else found the worker, but
                                  // we'll accept that
            break;
        }
        inspect_worker = inspect_worker->next;
      } while (inspect_worker != inspect->root_worker);
    }
    if (!found_worker) {
      inspect->seen_inactive = true;
      if (inspect == neighbourhood->active_root) {
        neighbourhood->active_root =
            inspect->next == inspect ? NULL : inspect->next;
      }
      inspect->next->prev = inspect->prev;
      inspect->prev->next = inspect->next;
      inspect->next = inspect->prev = NULL;
    }
    gpr_mu_unlock(&inspect->mu);
  } while (!found_worker);
  return found_worker;
}

static void end_worker(grpc_exec_ctx *exec_ctx, grpc_pollset *pollset,
                       grpc_pollset_worker *worker,
                       grpc_pollset_worker **worker_hdl) {
  if (worker_hdl != NULL) *worker_hdl = NULL;
  SET_KICK_STATE(worker, KICKED);
  grpc_closure_list_move(&worker->schedule_on_end_work,
                         &exec_ctx->closure_list);
  if (gpr_atm_no_barrier_load(&g_active_poller) == (gpr_atm)worker) {
    if (worker->next != worker && worker->next->kick_state == UNKICKED) {
      GPR_ASSERT(worker->next->initialized_cv);
      gpr_atm_no_barrier_store(&g_active_poller, (gpr_atm)worker->next);
      SET_KICK_STATE(worker->next, DESIGNATED_POLLER);
      gpr_cv_signal(&worker->next->cv);
      if (grpc_exec_ctx_has_work(exec_ctx)) {
        gpr_mu_unlock(&pollset->mu);
        grpc_exec_ctx_flush(exec_ctx);
        gpr_mu_lock(&pollset->mu);
      }
    } else {
      gpr_atm_no_barrier_store(&g_active_poller, 0);
      size_t poller_neighbourhood_idx =
          (size_t)(pollset->neighbourhood - g_neighbourhoods);
      gpr_mu_unlock(&pollset->mu);
      bool found_worker = false;
      bool scan_state[MAX_NEIGHBOURHOODS];
      for (size_t i = 0; !found_worker && i < g_num_neighbourhoods; i++) {
        pollset_neighbourhood *neighbourhood =
            &g_neighbourhoods[(poller_neighbourhood_idx + i) %
                              g_num_neighbourhoods];
        if (gpr_mu_trylock(&neighbourhood->mu)) {
          found_worker =
              check_neighbourhood_for_available_poller(neighbourhood);
          gpr_mu_unlock(&neighbourhood->mu);
          scan_state[i] = true;
        } else {
          scan_state[i] = false;
        }
      }
      for (size_t i = 0; !found_worker && i < g_num_neighbourhoods; i++) {
        if (scan_state[i]) continue;
        pollset_neighbourhood *neighbourhood =
            &g_neighbourhoods[(poller_neighbourhood_idx + i) %
                              g_num_neighbourhoods];
        gpr_mu_lock(&neighbourhood->mu);
        found_worker = check_neighbourhood_for_available_poller(neighbourhood);
        gpr_mu_unlock(&neighbourhood->mu);
      }
      grpc_exec_ctx_flush(exec_ctx);
      gpr_mu_lock(&pollset->mu);
    }
  } else if (grpc_exec_ctx_has_work(exec_ctx)) {
    gpr_mu_unlock(&pollset->mu);
    grpc_exec_ctx_flush(exec_ctx);
    gpr_mu_lock(&pollset->mu);
  }
  if (worker->initialized_cv) {
    gpr_cv_destroy(&worker->cv);
  }
  if (EMPTIED == worker_remove(pollset, worker)) {
    pollset_maybe_finish_shutdown(exec_ctx, pollset);
  }
  GPR_ASSERT(gpr_atm_no_barrier_load(&g_active_poller) != (gpr_atm)worker);
}
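
/* Note: when the current designated poller finishes, end_worker above first
 * tries to hand the DESIGNATED_POLLER role to the next UNKICKED worker on the
 * same pollset; failing that, it scans the neighbourhoods (a trylock pass
 * first, then a blocking pass) looking for some other worker to promote, so
 * that at most one thread at a time sits in epoll_wait on g_epfd. */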

/* pollset->mu lock must be held by the caller before calling this.
   The function pollset_work() may temporarily release the lock (pollset->mu)
   during the course of its execution but it will always re-acquire the lock
   and ensure that it is held by the time the function returns */
static grpc_error *pollset_work(grpc_exec_ctx *exec_ctx, grpc_pollset *pollset,
                                grpc_pollset_worker **worker_hdl,
                                gpr_timespec now, gpr_timespec deadline) {
  grpc_pollset_worker worker;
  grpc_error *error = GRPC_ERROR_NONE;
  static const char *err_desc = "pollset_work";
  if (pollset->kicked_without_poller) {
    pollset->kicked_without_poller = false;
    return GRPC_ERROR_NONE;
  }
  gpr_tls_set(&g_current_thread_pollset, (intptr_t)pollset);
  if (begin_worker(pollset, &worker, worker_hdl, &now, deadline)) {
    gpr_tls_set(&g_current_thread_worker, (intptr_t)&worker);
    GPR_ASSERT(!pollset->shutdown_closure);
    GPR_ASSERT(!pollset->seen_inactive);
    gpr_mu_unlock(&pollset->mu);
    append_error(&error, pollset_epoll(exec_ctx, pollset, now, deadline),
                 err_desc);
    gpr_mu_lock(&pollset->mu);
    gpr_tls_set(&g_current_thread_worker, 0);
  }
  end_worker(exec_ctx, pollset, &worker, worker_hdl);
  gpr_tls_set(&g_current_thread_pollset, 0);
  return error;
}

static grpc_error *pollset_kick(grpc_pollset *pollset,
                                grpc_pollset_worker *specific_worker) {
  if (specific_worker == NULL) {
    if (gpr_tls_get(&g_current_thread_pollset) != (intptr_t)pollset) {
      grpc_pollset_worker *root_worker = pollset->root_worker;
      if (root_worker == NULL) {
        pollset->kicked_without_poller = true;
        return GRPC_ERROR_NONE;
      }
      grpc_pollset_worker *next_worker = root_worker->next;
      if (root_worker == next_worker &&  // only try and wake up a poller if
                                         // there is no next worker
          root_worker == (grpc_pollset_worker *)gpr_atm_no_barrier_load(
                             &g_active_poller)) {
        SET_KICK_STATE(root_worker, KICKED);
        return grpc_wakeup_fd_wakeup(&global_wakeup_fd);
      } else if (next_worker->kick_state == UNKICKED) {
        GPR_ASSERT(next_worker->initialized_cv);
        SET_KICK_STATE(next_worker, KICKED);
        gpr_cv_signal(&next_worker->cv);
        return GRPC_ERROR_NONE;
      } else if (next_worker->kick_state == DESIGNATED_POLLER) {
        if (root_worker->kick_state != DESIGNATED_POLLER) {
          SET_KICK_STATE(root_worker, KICKED);
          if (root_worker->initialized_cv) {
            gpr_cv_signal(&root_worker->cv);
          }
          return GRPC_ERROR_NONE;
        } else {
          SET_KICK_STATE(next_worker, KICKED);
          return grpc_wakeup_fd_wakeup(&global_wakeup_fd);
        }
      } else {
        GPR_ASSERT(next_worker->kick_state == KICKED);
        SET_KICK_STATE(next_worker, KICKED);
        return GRPC_ERROR_NONE;
      }
    } else {
      GPR_ASSERT(false);
      return GRPC_ERROR_NONE;
    }
  } else if (specific_worker->kick_state == KICKED) {
    return GRPC_ERROR_NONE;
  } else if (gpr_tls_get(&g_current_thread_worker) ==
             (intptr_t)specific_worker) {
    SET_KICK_STATE(specific_worker, KICKED);
    return GRPC_ERROR_NONE;
  } else if (specific_worker ==
             (grpc_pollset_worker *)gpr_atm_no_barrier_load(&g_active_poller)) {
    SET_KICK_STATE(specific_worker, KICKED);
    return grpc_wakeup_fd_wakeup(&global_wakeup_fd);
  } else if (specific_worker->initialized_cv) {
    SET_KICK_STATE(specific_worker, KICKED);
    gpr_cv_signal(&specific_worker->cv);
    return GRPC_ERROR_NONE;
  } else {
    SET_KICK_STATE(specific_worker, KICKED);
    return GRPC_ERROR_NONE;
  }
}

static void pollset_add_fd(grpc_exec_ctx *exec_ctx, grpc_pollset *pollset,
                           grpc_fd *fd) {}
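
/* pollset_add_fd is intentionally a no-op in this engine: every fd is added
 * once to the single global epoll set (g_epfd) at fd_create time, with
 * EPOLLIN | EPOLLOUT | EPOLLET, so there is no per-pollset registration left
 * to do here. */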

/*******************************************************************************
 * Workqueue Definitions
 */

#ifdef GRPC_WORKQUEUE_REFCOUNT_DEBUG
static grpc_workqueue *workqueue_ref(grpc_workqueue *workqueue,
                                     const char *file, int line,
                                     const char *reason) {
  return workqueue;
}

static void workqueue_unref(grpc_exec_ctx *exec_ctx, grpc_workqueue *workqueue,
                            const char *file, int line, const char *reason) {}
#else
static grpc_workqueue *workqueue_ref(grpc_workqueue *workqueue) {
  return workqueue;
}

static void workqueue_unref(grpc_exec_ctx *exec_ctx,
                            grpc_workqueue *workqueue) {}
#endif

static void wq_sched(grpc_exec_ctx *exec_ctx, grpc_closure *closure,
                     grpc_error *error) {
  // find a neighbourhood to wakeup
  bool scheduled = false;
  size_t initial_neighbourhood = choose_neighbourhood();
  for (size_t i = 0; !scheduled && i < g_num_neighbourhoods; i++) {
    pollset_neighbourhood *neighbourhood =
        &g_neighbourhoods[(initial_neighbourhood + i) % g_num_neighbourhoods];
    if (gpr_mu_trylock(&neighbourhood->mu)) {
      if (neighbourhood->active_root != NULL) {
        grpc_pollset *inspect = neighbourhood->active_root;
        do {
          if (gpr_mu_trylock(&inspect->mu)) {
            if (inspect->root_worker != NULL) {
              grpc_pollset_worker *inspect_worker = inspect->root_worker;
              do {
                if (inspect_worker->kick_state == UNKICKED) {
                  SET_KICK_STATE(inspect_worker, KICKED);
                  grpc_closure_list_append(
                      &inspect_worker->schedule_on_end_work, closure, error);
                  if (inspect_worker->initialized_cv) {
                    gpr_cv_signal(&inspect_worker->cv);
                  }
                  scheduled = true;
                }
                inspect_worker = inspect_worker->next;
              } while (!scheduled && inspect_worker != inspect->root_worker);
            }
            gpr_mu_unlock(&inspect->mu);
          }
          inspect = inspect->next;
        } while (!scheduled && inspect != neighbourhood->active_root);
      }
      gpr_mu_unlock(&neighbourhood->mu);
    }
  }
  if (!scheduled) {
    gpr_mu_lock(&g_wq_mu);
    grpc_closure_list_append(&g_wq_items, closure, error);
    gpr_mu_unlock(&g_wq_mu);
    GRPC_LOG_IF_ERROR("workqueue_scheduler",
                      grpc_wakeup_fd_wakeup(&global_wakeup_fd));
  }
}

static const grpc_closure_scheduler_vtable
    singleton_workqueue_scheduler_vtable = {wq_sched, wq_sched,
                                            "epoll1_workqueue"};

static grpc_closure_scheduler singleton_workqueue_scheduler = {
    &singleton_workqueue_scheduler_vtable};

static grpc_closure_scheduler *workqueue_scheduler(grpc_workqueue *workqueue) {
  return &singleton_workqueue_scheduler;
}

/*******************************************************************************
 * Pollset-set Definitions
 */

static grpc_pollset_set *pollset_set_create(void) {
  return (grpc_pollset_set *)((intptr_t)0xdeafbeef);
}

static void pollset_set_destroy(grpc_exec_ctx *exec_ctx,
                                grpc_pollset_set *pss) {}

static void pollset_set_add_fd(grpc_exec_ctx *exec_ctx, grpc_pollset_set *pss,
                               grpc_fd *fd) {}

static void pollset_set_del_fd(grpc_exec_ctx *exec_ctx, grpc_pollset_set *pss,
                               grpc_fd *fd) {}

static void pollset_set_add_pollset(grpc_exec_ctx *exec_ctx,
                                    grpc_pollset_set *pss, grpc_pollset *ps) {}

static void pollset_set_del_pollset(grpc_exec_ctx *exec_ctx,
                                    grpc_pollset_set *pss, grpc_pollset *ps) {}

static void pollset_set_add_pollset_set(grpc_exec_ctx *exec_ctx,
                                        grpc_pollset_set *bag,
                                        grpc_pollset_set *item) {}

static void pollset_set_del_pollset_set(grpc_exec_ctx *exec_ctx,
                                        grpc_pollset_set *bag,
                                        grpc_pollset_set *item) {}

/*******************************************************************************
 * Event engine binding
 */

static void shutdown_engine(void) {
  fd_global_shutdown();
  pollset_global_shutdown();
}

static const grpc_event_engine_vtable vtable = {
    .pollset_size = sizeof(grpc_pollset),

    .fd_create = fd_create,
    .fd_wrapped_fd = fd_wrapped_fd,
    .fd_orphan = fd_orphan,
    .fd_shutdown = fd_shutdown,
    .fd_is_shutdown = fd_is_shutdown,
    .fd_notify_on_read = fd_notify_on_read,
    .fd_notify_on_write = fd_notify_on_write,
    .fd_get_read_notifier_pollset = fd_get_read_notifier_pollset,
    .fd_get_workqueue = fd_get_workqueue,

    .pollset_init = pollset_init,
    .pollset_shutdown = pollset_shutdown,
    .pollset_destroy = pollset_destroy,
    .pollset_work = pollset_work,
    .pollset_kick = pollset_kick,
    .pollset_add_fd = pollset_add_fd,

    .pollset_set_create = pollset_set_create,
    .pollset_set_destroy = pollset_set_destroy,
    .pollset_set_add_pollset = pollset_set_add_pollset,
    .pollset_set_del_pollset = pollset_set_del_pollset,
    .pollset_set_add_pollset_set = pollset_set_add_pollset_set,
    .pollset_set_del_pollset_set = pollset_set_del_pollset_set,
    .pollset_set_add_fd = pollset_set_add_fd,
    .pollset_set_del_fd = pollset_set_del_fd,

    .workqueue_ref = workqueue_ref,
    .workqueue_unref = workqueue_unref,
    .workqueue_scheduler = workqueue_scheduler,

    .shutdown_engine = shutdown_engine,
};

/* It is possible that GLIBC has epoll but the underlying kernel doesn't.
 * Create a dummy epoll_fd to make sure epoll support is available */
const grpc_event_engine_vtable *grpc_init_epoll1_linux(bool explicit_request) {
  if (!grpc_has_wakeup_fd()) {
    return NULL;
  }

  g_epfd = epoll_create1(EPOLL_CLOEXEC);
  if (g_epfd < 0) {
    gpr_log(GPR_ERROR, "epoll unavailable");
    return NULL;
  }

  fd_global_init();

  if (!GRPC_LOG_IF_ERROR("pollset_global_init", pollset_global_init())) {
    close(g_epfd);
    fd_global_shutdown();
    return NULL;
  }

  return &vtable;
}
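
/* A sketch of how this engine is typically picked up, assuming the usual
 * ev_posix.c polling-strategy selection (e.g. via the GRPC_POLL_STRATEGY
 * environment variable): ev_posix.c calls grpc_init_epoll1_linux() at iomgr
 * startup; a NULL return (no wakeup fd support, or epoll_create1 failing)
 * simply makes the selector fall through to the next available engine. */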

#else /* defined(GRPC_LINUX_EPOLL) */
#if defined(GRPC_POSIX_SOCKET)
#include "src/core/lib/iomgr/ev_posix.h"
/* If GRPC_LINUX_EPOLL is not defined, it means epoll is not available. Return
 * NULL */
const grpc_event_engine_vtable *grpc_init_epoll1_linux(bool explicit_request) {
  return NULL;
}
#endif /* defined(GRPC_POSIX_SOCKET) */
#endif /* !defined(GRPC_LINUX_EPOLL) */