/*
 *
 * Copyright 2017, Google Inc.
 * All rights reserved.
 *
 * Redistribution and use in source and binary forms, with or without
 * modification, are permitted provided that the following conditions are
 * met:
 *
 *     * Redistributions of source code must retain the above copyright
 * notice, this list of conditions and the following disclaimer.
 *     * Redistributions in binary form must reproduce the above
 * copyright notice, this list of conditions and the following disclaimer
 * in the documentation and/or other materials provided with the
 * distribution.
 *     * Neither the name of Google Inc. nor the names of its
 * contributors may be used to endorse or promote products derived from
 * this software without specific prior written permission.
 *
 * THIS SOFTWARE IS PROVIDED BY THE COPYRIGHT HOLDERS AND CONTRIBUTORS
 * "AS IS" AND ANY EXPRESS OR IMPLIED WARRANTIES, INCLUDING, BUT NOT
 * LIMITED TO, THE IMPLIED WARRANTIES OF MERCHANTABILITY AND FITNESS FOR
 * A PARTICULAR PURPOSE ARE DISCLAIMED. IN NO EVENT SHALL THE COPYRIGHT
 * OWNER OR CONTRIBUTORS BE LIABLE FOR ANY DIRECT, INDIRECT, INCIDENTAL,
 * SPECIAL, EXEMPLARY, OR CONSEQUENTIAL DAMAGES (INCLUDING, BUT NOT
 * LIMITED TO, PROCUREMENT OF SUBSTITUTE GOODS OR SERVICES; LOSS OF USE,
 * DATA, OR PROFITS; OR BUSINESS INTERRUPTION) HOWEVER CAUSED AND ON ANY
 * THEORY OF LIABILITY, WHETHER IN CONTRACT, STRICT LIABILITY, OR TORT
 * (INCLUDING NEGLIGENCE OR OTHERWISE) ARISING IN ANY WAY OUT OF THE USE
 * OF THIS SOFTWARE, EVEN IF ADVISED OF THE POSSIBILITY OF SUCH DAMAGE.
 *
 */

#include "src/core/lib/iomgr/port.h"

/* This polling engine is only relevant on linux kernels supporting epoll() */
#ifdef GRPC_LINUX_EPOLL

#include "src/core/lib/iomgr/ev_epoll1_linux.h"

#include <assert.h>
#include <errno.h>
#include <poll.h>
#include <pthread.h>
#include <string.h>
#include <sys/epoll.h>
#include <sys/socket.h>
#include <unistd.h>

#include <grpc/support/alloc.h>
#include <grpc/support/cpu.h>
#include <grpc/support/log.h>
#include <grpc/support/string_util.h>
#include <grpc/support/tls.h>
#include <grpc/support/useful.h>

#include "src/core/lib/iomgr/ev_posix.h"
#include "src/core/lib/iomgr/iomgr_internal.h"
#include "src/core/lib/iomgr/lockfree_event.h"
#include "src/core/lib/iomgr/timer.h"
#include "src/core/lib/iomgr/wakeup_fd_posix.h"
#include "src/core/lib/iomgr/workqueue.h"
#include "src/core/lib/profiling/timers.h"
#include "src/core/lib/support/block_annotate.h"

/* TODO: sreek: Right now, this wakes up all pollers. In future we should make
 * sure to wake up one polling thread (which can wake up other threads if
 * needed) */
static grpc_wakeup_fd global_wakeup_fd;
static int g_epfd;
static gpr_atm g_timer_kick;

/*******************************************************************************
 * Fd Declarations
 */

struct grpc_fd {
  int fd;

  gpr_atm read_closure;
  gpr_atm write_closure;

  struct grpc_fd *freelist_next;

  /* The pollset that last noticed that the fd is readable. The actual type
   * stored in this is (grpc_pollset *) */
  gpr_atm read_notifier_pollset;

  grpc_iomgr_object iomgr_object;
};

static void fd_global_init(void);
static void fd_global_shutdown(void);

/*******************************************************************************
 * Pollset Declarations
 */

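/* Kick state of a worker: UNKICKED workers are parked on their condition
   variable, the single DESIGNATED_POLLER is the worker expected to call
   epoll_wait on the shared epoll set, and KICKED workers have been told to
   return control to their caller. */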
typedef enum { UNKICKED, KICKED, DESIGNATED_POLLER } kick_state;

struct grpc_pollset_worker {
  kick_state kick_state;
  bool initialized_cv;
  grpc_pollset_worker *next;
  grpc_pollset_worker *prev;
  gpr_cv cv;
  grpc_closure_list schedule_on_end_work;
};

#define MAX_NEIGHBOURHOODS 1024

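/* Pollsets are sharded into "neighbourhoods" (one per CPU core, capped at
   MAX_NEIGHBOURHOODS) so that pollers mostly contend on a per-neighbourhood
   mutex rather than a single global list; the padding is presumably there to
   keep neighbourhoods from sharing a cache line. */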
typedef struct pollset_neighbourhood {
  gpr_mu mu;
  grpc_pollset *active_root;
  char pad[GPR_CACHELINE_SIZE];
} pollset_neighbourhood;

struct grpc_pollset {
  gpr_mu mu;
  pollset_neighbourhood *neighbourhood;
  bool reassigning_neighbourhood;
  grpc_pollset_worker *root_worker;
  bool kicked_without_poller;
  bool seen_inactive;
  bool shutting_down;          /* Is the pollset shutting down? */
  bool finish_shutdown_called; /* Has 'finish_shutdown_locked()' been called? */
  grpc_closure *shutdown_closure; /* Called after shutdown is complete */
  int begin_refs;

  grpc_pollset *next;
  grpc_pollset *prev;
};

/*******************************************************************************
 * Pollset-set Declarations
 */

struct grpc_pollset_set {};

/*******************************************************************************
 * Common helpers
 */

static bool append_error(grpc_error **composite, grpc_error *error,
                         const char *desc) {
  if (error == GRPC_ERROR_NONE) return true;
  if (*composite == GRPC_ERROR_NONE) {
    *composite = GRPC_ERROR_CREATE_FROM_COPIED_STRING(desc);
  }
  *composite = grpc_error_add_child(*composite, error);
  return false;
}

/*******************************************************************************
 * Fd Definitions
 */

/* We need to keep a freelist not because of any concern about malloc
 * performance but so that implementations with multiple threads in (for
 * example) epoll_wait can deal with the race between pollset removal and
 * incoming poll notifications.
 *
 * The problem is that the poller ultimately holds a reference to this
 * object, so it is very difficult to know when it is safe to free it, at
 * least without some expensive synchronization.
 *
 * If we keep the object freelisted, in the worst case losing this race just
 * becomes a spurious read notification on a reused fd.
 */

/* The alarm system needs to be able to wake up 'some poller' sometimes
 * (specifically when a new alarm needs to be triggered earlier than the next
 * alarm 'epoch'). This wakeup_fd gives us something to alert on when such a
 * case occurs. */

static grpc_fd *fd_freelist = NULL;
static gpr_mu fd_freelist_mu;

static void fd_global_init(void) { gpr_mu_init(&fd_freelist_mu); }

static void fd_global_shutdown(void) {
  gpr_mu_lock(&fd_freelist_mu);
  gpr_mu_unlock(&fd_freelist_mu);
  while (fd_freelist != NULL) {
    grpc_fd *fd = fd_freelist;
    fd_freelist = fd_freelist->freelist_next;
    gpr_free(fd);
  }
  gpr_mu_destroy(&fd_freelist_mu);
}

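/* Create a grpc_fd for 'fd' (reusing a freelisted wrapper when available) and
   register it with the shared epoll set using edge-triggered read/write
   interest (EPOLLIN | EPOLLOUT | EPOLLET). */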
static grpc_fd *fd_create(int fd, const char *name) {
  grpc_fd *new_fd = NULL;

  gpr_mu_lock(&fd_freelist_mu);
  if (fd_freelist != NULL) {
    new_fd = fd_freelist;
    fd_freelist = fd_freelist->freelist_next;
  }
  gpr_mu_unlock(&fd_freelist_mu);

  if (new_fd == NULL) {
    new_fd = gpr_malloc(sizeof(grpc_fd));
  }

  new_fd->fd = fd;
  grpc_lfev_init(&new_fd->read_closure);
  grpc_lfev_init(&new_fd->write_closure);
  gpr_atm_no_barrier_store(&new_fd->read_notifier_pollset, (gpr_atm)NULL);

  new_fd->freelist_next = NULL;

  char *fd_name;
  gpr_asprintf(&fd_name, "%s fd=%d", name, fd);
  grpc_iomgr_register_object(&new_fd->iomgr_object, fd_name);
#ifdef GRPC_FD_REF_COUNT_DEBUG
  gpr_log(GPR_DEBUG, "FD %d %p create %s", fd, (void *)new_fd, fd_name);
#endif
  gpr_free(fd_name);

  struct epoll_event ev = {.events = (uint32_t)(EPOLLIN | EPOLLOUT | EPOLLET),
                           .data.ptr = new_fd};
  if (epoll_ctl(g_epfd, EPOLL_CTL_ADD, fd, &ev) != 0) {
    gpr_log(GPR_ERROR, "epoll_ctl failed: %s", strerror(errno));
  }

  return new_fd;
}

static int fd_wrapped_fd(grpc_fd *fd) { return fd->fd; }

/* Might be called multiple times */
static void fd_shutdown(grpc_exec_ctx *exec_ctx, grpc_fd *fd, grpc_error *why) {
  if (grpc_lfev_set_shutdown(exec_ctx, &fd->read_closure,
                             GRPC_ERROR_REF(why))) {
    shutdown(fd->fd, SHUT_RDWR);
    grpc_lfev_set_shutdown(exec_ctx, &fd->write_closure, GRPC_ERROR_REF(why));
  }
  GRPC_ERROR_UNREF(why);
}

static void fd_orphan(grpc_exec_ctx *exec_ctx, grpc_fd *fd,
                      grpc_closure *on_done, int *release_fd,
                      const char *reason) {
  grpc_error *error = GRPC_ERROR_NONE;

  if (!grpc_lfev_is_shutdown(&fd->read_closure)) {
    fd_shutdown(exec_ctx, fd, GRPC_ERROR_CREATE_FROM_COPIED_STRING(reason));
  }

  /* If release_fd is not NULL, we should be relinquishing control of the file
     descriptor fd->fd (but we still own the grpc_fd structure). */
  if (release_fd != NULL) {
    *release_fd = fd->fd;
  } else {
    close(fd->fd);
  }

  grpc_closure_sched(exec_ctx, on_done, GRPC_ERROR_REF(error));

  grpc_iomgr_unregister_object(&fd->iomgr_object);
  grpc_lfev_destroy(&fd->read_closure);
  grpc_lfev_destroy(&fd->write_closure);

  gpr_mu_lock(&fd_freelist_mu);
  fd->freelist_next = fd_freelist;
  fd_freelist = fd;
  gpr_mu_unlock(&fd_freelist_mu);
}

static grpc_pollset *fd_get_read_notifier_pollset(grpc_exec_ctx *exec_ctx,
                                                  grpc_fd *fd) {
  gpr_atm notifier = gpr_atm_acq_load(&fd->read_notifier_pollset);
  return (grpc_pollset *)notifier;
}

static bool fd_is_shutdown(grpc_fd *fd) {
  return grpc_lfev_is_shutdown(&fd->read_closure);
}

static void fd_notify_on_read(grpc_exec_ctx *exec_ctx, grpc_fd *fd,
                              grpc_closure *closure) {
  grpc_lfev_notify_on(exec_ctx, &fd->read_closure, closure);
}

static void fd_notify_on_write(grpc_exec_ctx *exec_ctx, grpc_fd *fd,
                               grpc_closure *closure) {
  grpc_lfev_notify_on(exec_ctx, &fd->write_closure, closure);
}

static grpc_workqueue *fd_get_workqueue(grpc_fd *fd) {
  return (grpc_workqueue *)0xb0b51ed;
}

static void fd_become_readable(grpc_exec_ctx *exec_ctx, grpc_fd *fd,
                               grpc_pollset *notifier) {
  grpc_lfev_set_ready(exec_ctx, &fd->read_closure);

  /* Note that fd_become_readable might be called twice with different
     'notifier's when an fd becomes readable and it is in two epoll sets (this
     can happen briefly during polling island merges). In such cases it does
     not really matter which notifier is set as the read_notifier_pollset
     (they would both point to the same polling island anyway) */
  /* Use release store to match with acquire load in fd_get_read_notifier */
  gpr_atm_rel_store(&fd->read_notifier_pollset, (gpr_atm)notifier);
}

static void fd_become_writable(grpc_exec_ctx *exec_ctx, grpc_fd *fd) {
  grpc_lfev_set_ready(exec_ctx, &fd->write_closure);
}

/*******************************************************************************
 * Pollset Definitions
 */

GPR_TLS_DECL(g_current_thread_pollset);
GPR_TLS_DECL(g_current_thread_worker);
static gpr_atm g_active_poller;
static pollset_neighbourhood *g_neighbourhoods;
static size_t g_num_neighbourhoods;
static gpr_mpscq g_workqueue_items;

/* Return true if first in list */
static bool worker_insert(grpc_pollset *pollset, grpc_pollset_worker *worker) {
  if (pollset->root_worker == NULL) {
    pollset->root_worker = worker;
    worker->next = worker->prev = worker;
    return true;
  } else {
    worker->next = pollset->root_worker;
    worker->prev = worker->next->prev;
    worker->next->prev = worker;
    worker->prev->next = worker;
    return false;
  }
}

/* Return true if last in list */
typedef enum { EMPTIED, NEW_ROOT, REMOVED } worker_remove_result;

static worker_remove_result worker_remove(grpc_pollset *pollset,
                                          grpc_pollset_worker *worker) {
  if (worker == pollset->root_worker) {
    if (worker == worker->next) {
      pollset->root_worker = NULL;
      return EMPTIED;
    } else {
      pollset->root_worker = worker->next;
      worker->prev->next = worker->next;
      worker->next->prev = worker->prev;
      return NEW_ROOT;
    }
  } else {
    worker->prev->next = worker->next;
    worker->next->prev = worker->prev;
    return REMOVED;
  }
}

static size_t choose_neighbourhood(void) {
  return (size_t)gpr_cpu_current_cpu() % g_num_neighbourhoods;
}

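/* One-time pollset setup: initialize thread-local state, create the global
   wakeup fd and add it to the shared epoll set, and allocate one
   neighbourhood per CPU core (capped at MAX_NEIGHBOURHOODS). */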
static grpc_error *pollset_global_init(void) {
  gpr_tls_init(&g_current_thread_pollset);
  gpr_tls_init(&g_current_thread_worker);
  gpr_atm_no_barrier_store(&g_active_poller, 0);
  global_wakeup_fd.read_fd = -1;
  grpc_error *err = grpc_wakeup_fd_init(&global_wakeup_fd);
  gpr_mpscq_init(&g_workqueue_items);
  if (err != GRPC_ERROR_NONE) return err;
  struct epoll_event ev = {.events = (uint32_t)(EPOLLIN | EPOLLET),
                           .data.ptr = &global_wakeup_fd};
  if (epoll_ctl(g_epfd, EPOLL_CTL_ADD, global_wakeup_fd.read_fd, &ev) != 0) {
    return GRPC_OS_ERROR(errno, "epoll_ctl");
  }
  g_num_neighbourhoods = GPR_CLAMP(gpr_cpu_num_cores(), 1, MAX_NEIGHBOURHOODS);
  g_neighbourhoods =
      gpr_zalloc(sizeof(*g_neighbourhoods) * g_num_neighbourhoods);
  for (size_t i = 0; i < g_num_neighbourhoods; i++) {
    gpr_mu_init(&g_neighbourhoods[i].mu);
  }
  return GRPC_ERROR_NONE;
}

static void pollset_global_shutdown(void) {
  gpr_tls_destroy(&g_current_thread_pollset);
  gpr_tls_destroy(&g_current_thread_worker);
  gpr_mpscq_destroy(&g_workqueue_items);
  if (global_wakeup_fd.read_fd != -1) grpc_wakeup_fd_destroy(&global_wakeup_fd);
  for (size_t i = 0; i < g_num_neighbourhoods; i++) {
    gpr_mu_destroy(&g_neighbourhoods[i].mu);
  }
  gpr_free(g_neighbourhoods);
}

static void pollset_init(grpc_pollset *pollset, gpr_mu **mu) {
  gpr_mu_init(&pollset->mu);
  *mu = &pollset->mu;
  pollset->neighbourhood = &g_neighbourhoods[choose_neighbourhood()];
  pollset->seen_inactive = true;
}

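/* Unlink the pollset from its neighbourhood's active list (re-acquiring locks
   if the pollset is concurrently moved to another neighbourhood) before
   destroying its mutex. */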
static void pollset_destroy(grpc_pollset *pollset) {
  gpr_mu_lock(&pollset->mu);
  if (!pollset->seen_inactive) {
    pollset_neighbourhood *neighbourhood = pollset->neighbourhood;
    gpr_mu_unlock(&pollset->mu);
  retry_lock_neighbourhood:
    gpr_mu_lock(&neighbourhood->mu);
    gpr_mu_lock(&pollset->mu);
    if (!pollset->seen_inactive) {
      if (pollset->neighbourhood != neighbourhood) {
        gpr_mu_unlock(&neighbourhood->mu);
        neighbourhood = pollset->neighbourhood;
        gpr_mu_unlock(&pollset->mu);
        goto retry_lock_neighbourhood;
      }
      pollset->prev->next = pollset->next;
      pollset->next->prev = pollset->prev;
      if (pollset == pollset->neighbourhood->active_root) {
        pollset->neighbourhood->active_root =
            pollset->next == pollset ? NULL : pollset->next;
      }
    }
    gpr_mu_unlock(&pollset->neighbourhood->mu);
  }
  gpr_mu_unlock(&pollset->mu);
  gpr_mu_destroy(&pollset->mu);
}

static grpc_error *pollset_kick_all(grpc_pollset *pollset) {
  grpc_error *error = GRPC_ERROR_NONE;
  if (pollset->root_worker != NULL) {
    grpc_pollset_worker *worker = pollset->root_worker;
    do {
      if (worker->initialized_cv) {
        worker->kick_state = KICKED;
        gpr_cv_signal(&worker->cv);
      } else {
        worker->kick_state = KICKED;
        append_error(&error, grpc_wakeup_fd_wakeup(&global_wakeup_fd),
                     "pollset_shutdown");
      }

      worker = worker->next;
    } while (worker != pollset->root_worker);
  }
  return error;
}

static void pollset_maybe_finish_shutdown(grpc_exec_ctx *exec_ctx,
                                          grpc_pollset *pollset) {
  if (pollset->shutdown_closure != NULL && pollset->root_worker == NULL &&
      pollset->begin_refs == 0) {
    grpc_closure_sched(exec_ctx, pollset->shutdown_closure, GRPC_ERROR_NONE);
    pollset->shutdown_closure = NULL;
  }
}

static void pollset_shutdown(grpc_exec_ctx *exec_ctx, grpc_pollset *pollset,
                             grpc_closure *closure) {
  GPR_ASSERT(pollset->shutdown_closure == NULL);
  pollset->shutdown_closure = closure;
  GRPC_LOG_IF_ERROR("pollset_shutdown", pollset_kick_all(pollset));
  pollset_maybe_finish_shutdown(exec_ctx, pollset);
}

#define MAX_EPOLL_EVENTS 100

static int poll_deadline_to_millis_timeout(gpr_timespec deadline,
                                           gpr_timespec now) {
  gpr_timespec timeout;
  if (gpr_time_cmp(deadline, gpr_inf_future(deadline.clock_type)) == 0) {
    return -1;
  }

  if (gpr_time_cmp(deadline, now) <= 0) {
    return 0;
  }

  static const gpr_timespec round_up = {
      .clock_type = GPR_TIMESPAN, .tv_sec = 0, .tv_nsec = GPR_NS_PER_MS - 1};
  timeout = gpr_time_sub(deadline, now);
  int millis = gpr_time_to_millis(gpr_time_add(timeout, round_up));
  return millis >= 1 ? millis : 1;
}

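/* Block in epoll_wait on the shared epoll set and mark the returned fds
   readable/writable. Wakeups on the global wakeup fd are consumed here and,
   when g_timer_kick is set, forwarded to the timer system. */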
static grpc_error *pollset_epoll(grpc_exec_ctx *exec_ctx, grpc_pollset *pollset,
                                 gpr_timespec now, gpr_timespec deadline) {
  struct epoll_event events[MAX_EPOLL_EVENTS];
  static const char *err_desc = "pollset_poll";

  int timeout = poll_deadline_to_millis_timeout(deadline, now);

  if (timeout != 0) {
    GRPC_SCHEDULING_START_BLOCKING_REGION;
  }
  int r;
  do {
    r = epoll_wait(g_epfd, events, MAX_EPOLL_EVENTS, timeout);
  } while (r < 0 && errno == EINTR);
  if (timeout != 0) {
    GRPC_SCHEDULING_END_BLOCKING_REGION;
  }

  if (r < 0) return GRPC_OS_ERROR(errno, "epoll_wait");

  grpc_error *error = GRPC_ERROR_NONE;
  for (int i = 0; i < r; i++) {
    void *data_ptr = events[i].data.ptr;
    if (data_ptr == &global_wakeup_fd) {
      if (gpr_atm_no_barrier_cas(&g_timer_kick, 1, 0)) {
        grpc_timer_consume_kick();
      }
      append_error(&error, grpc_wakeup_fd_consume_wakeup(&global_wakeup_fd),
                   err_desc);
    } else {
      grpc_fd *fd = (grpc_fd *)(data_ptr);
      bool cancel = (events[i].events & (EPOLLERR | EPOLLHUP)) != 0;
      bool read_ev = (events[i].events & (EPOLLIN | EPOLLPRI)) != 0;
      bool write_ev = (events[i].events & EPOLLOUT) != 0;
      if (read_ev || cancel) {
        fd_become_readable(exec_ctx, fd, pollset);
      }
      if (write_ev || cancel) {
        fd_become_writable(exec_ctx, fd);
      }
    }
  }

  return error;
}

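/* Register 'worker' with the pollset: if the pollset had been marked inactive
   it is moved back onto a neighbourhood's active list, and the worker either
   becomes the designated poller or blocks on its condition variable until it
   is kicked. Returns true if this worker should go on to call epoll_wait. */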
static bool begin_worker(grpc_pollset *pollset, grpc_pollset_worker *worker,
                         grpc_pollset_worker **worker_hdl, gpr_timespec *now,
                         gpr_timespec deadline) {
  if (worker_hdl != NULL) *worker_hdl = worker;
  worker->initialized_cv = false;
  worker->kick_state = UNKICKED;
  worker->schedule_on_end_work = (grpc_closure_list)GRPC_CLOSURE_LIST_INIT;
  pollset->begin_refs++;

  if (pollset->seen_inactive) {
    // pollset has been observed to be inactive, we need to move back to the
    // active list
    bool is_reassigning = false;
    if (!pollset->reassigning_neighbourhood) {
      is_reassigning = true;
      pollset->reassigning_neighbourhood = true;
      pollset->neighbourhood = &g_neighbourhoods[choose_neighbourhood()];
    }
    pollset_neighbourhood *neighbourhood = pollset->neighbourhood;
    gpr_mu_unlock(&pollset->mu);
  // pollset unlocked: state may change (even worker->kick_state)
  retry_lock_neighbourhood:
    gpr_mu_lock(&neighbourhood->mu);
    gpr_mu_lock(&pollset->mu);
    if (pollset->seen_inactive) {
      if (neighbourhood != pollset->neighbourhood) {
        gpr_mu_unlock(&neighbourhood->mu);
        neighbourhood = pollset->neighbourhood;
        gpr_mu_unlock(&pollset->mu);
        goto retry_lock_neighbourhood;
      }
      pollset->seen_inactive = false;
      if (neighbourhood->active_root == NULL) {
        neighbourhood->active_root = pollset->next = pollset->prev = pollset;
        if (gpr_atm_no_barrier_cas(&g_active_poller, 0, (gpr_atm)worker)) {
          worker->kick_state = DESIGNATED_POLLER;
        }
      } else {
        pollset->next = neighbourhood->active_root;
        pollset->prev = pollset->next->prev;
        pollset->next->prev = pollset->prev->next = pollset;
      }
    }
    if (is_reassigning) {
      GPR_ASSERT(pollset->reassigning_neighbourhood);
      pollset->reassigning_neighbourhood = false;
    }
    gpr_mu_unlock(&neighbourhood->mu);
  }
  worker_insert(pollset, worker);
  pollset->begin_refs--;
  if (worker->kick_state == UNKICKED) {
    GPR_ASSERT(gpr_atm_no_barrier_load(&g_active_poller) != (gpr_atm)worker);
    worker->initialized_cv = true;
    gpr_cv_init(&worker->cv);
    while (worker->kick_state == UNKICKED &&
           pollset->shutdown_closure == NULL) {
      if (gpr_cv_wait(&worker->cv, &pollset->mu, deadline) &&
          worker->kick_state == UNKICKED) {
        worker->kick_state = KICKED;
      }
    }
    *now = gpr_now(now->clock_type);
  }

  return worker->kick_state == DESIGNATED_POLLER &&
         pollset->shutdown_closure == NULL;
}

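/* Called with the neighbourhood lock held: scan its active pollsets for an
   UNKICKED worker to promote to DESIGNATED_POLLER. Pollsets with no usable
   worker are marked seen_inactive and removed from the active list. */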
static bool check_neighbourhood_for_available_poller(
    pollset_neighbourhood *neighbourhood) {
  bool found_worker = false;
  do {
    grpc_pollset *inspect = neighbourhood->active_root;
    if (inspect == NULL) {
      break;
    }
    gpr_mu_lock(&inspect->mu);
    GPR_ASSERT(!inspect->seen_inactive);
    grpc_pollset_worker *inspect_worker = inspect->root_worker;
    if (inspect_worker != NULL) {
      do {
        switch (inspect_worker->kick_state) {
          case UNKICKED:
            if (gpr_atm_no_barrier_cas(&g_active_poller, 0,
                                       (gpr_atm)inspect_worker)) {
              inspect_worker->kick_state = DESIGNATED_POLLER;
              if (inspect_worker->initialized_cv) {
                gpr_cv_signal(&inspect_worker->cv);
              }
            }
            // even if we didn't win the cas, there's a worker, we can stop
            found_worker = true;
            break;
          case KICKED:
            break;
          case DESIGNATED_POLLER:
            found_worker = true;  // ok, so someone else found the worker, but
                                  // we'll accept that
            break;
        }
        inspect_worker = inspect_worker->next;
      } while (inspect_worker != inspect->root_worker);
    }
    if (!found_worker) {
      inspect->seen_inactive = true;
      if (inspect == neighbourhood->active_root) {
        neighbourhood->active_root =
            inspect->next == inspect ? NULL : inspect->next;
      }
      inspect->next->prev = inspect->prev;
      inspect->prev->next = inspect->next;
      inspect->next = inspect->prev = NULL;
    }
    gpr_mu_unlock(&inspect->mu);
  } while (!found_worker);
  return found_worker;
}

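/* Finish a pollset_work call: if this worker was the designated poller, hand
   the role to the next UNKICKED worker on the pollset, or else scan the
   neighbourhoods for another pollset whose worker can take over polling. */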
static void end_worker(grpc_exec_ctx *exec_ctx, grpc_pollset *pollset,
                       grpc_pollset_worker *worker,
                       grpc_pollset_worker **worker_hdl) {
  if (worker_hdl != NULL) *worker_hdl = NULL;
  worker->kick_state = KICKED;
  grpc_closure_list_move(&worker->schedule_on_end_work,
                         &exec_ctx->closure_list);
  if (gpr_atm_no_barrier_load(&g_active_poller) == (gpr_atm)worker) {
    if (worker->next != worker && worker->next->kick_state == UNKICKED) {
      GPR_ASSERT(worker->next->initialized_cv);
      gpr_atm_no_barrier_store(&g_active_poller, (gpr_atm)worker->next);
      worker->next->kick_state = DESIGNATED_POLLER;
      gpr_cv_signal(&worker->next->cv);
      if (grpc_exec_ctx_has_work(exec_ctx)) {
        gpr_mu_unlock(&pollset->mu);
        grpc_exec_ctx_flush(exec_ctx);
        gpr_mu_lock(&pollset->mu);
      }
    } else {
      gpr_atm_no_barrier_store(&g_active_poller, 0);
      gpr_mu_unlock(&pollset->mu);
      size_t poller_neighbourhood_idx =
          (size_t)(pollset->neighbourhood - g_neighbourhoods);
      bool found_worker = false;
      bool scan_state[MAX_NEIGHBOURHOODS];
      for (size_t i = 0; !found_worker && i < g_num_neighbourhoods; i++) {
        pollset_neighbourhood *neighbourhood =
            &g_neighbourhoods[(poller_neighbourhood_idx + i) %
                              g_num_neighbourhoods];
        if (gpr_mu_trylock(&neighbourhood->mu)) {
          found_worker =
              check_neighbourhood_for_available_poller(neighbourhood);
          gpr_mu_unlock(&neighbourhood->mu);
          scan_state[i] = true;
        } else {
          scan_state[i] = false;
        }
      }
      for (size_t i = 0; !found_worker && i < g_num_neighbourhoods; i++) {
        if (scan_state[i]) continue;
        pollset_neighbourhood *neighbourhood =
            &g_neighbourhoods[(poller_neighbourhood_idx + i) %
                              g_num_neighbourhoods];
        gpr_mu_lock(&neighbourhood->mu);
        found_worker = check_neighbourhood_for_available_poller(neighbourhood);
        gpr_mu_unlock(&neighbourhood->mu);
      }
      grpc_exec_ctx_flush(exec_ctx);
      gpr_mu_lock(&pollset->mu);
    }
  } else if (grpc_exec_ctx_has_work(exec_ctx)) {
    gpr_mu_unlock(&pollset->mu);
    grpc_exec_ctx_flush(exec_ctx);
    gpr_mu_lock(&pollset->mu);
  }
  if (worker->initialized_cv) {
    gpr_cv_destroy(&worker->cv);
  }
  if (EMPTIED == worker_remove(pollset, worker)) {
    pollset_maybe_finish_shutdown(exec_ctx, pollset);
  }
  GPR_ASSERT(gpr_atm_no_barrier_load(&g_active_poller) != (gpr_atm)worker);
}

/* pollset->mu lock must be held by the caller before calling this.
   The function pollset_work() may temporarily release the lock (pollset->mu)
   during the course of its execution but it will always re-acquire the lock
   and ensure that it is held by the time the function returns */
static grpc_error *pollset_work(grpc_exec_ctx *exec_ctx, grpc_pollset *pollset,
                                grpc_pollset_worker **worker_hdl,
                                gpr_timespec now, gpr_timespec deadline) {
  grpc_pollset_worker worker;
  grpc_error *error = GRPC_ERROR_NONE;
  static const char *err_desc = "pollset_work";
  if (pollset->kicked_without_poller) {
    pollset->kicked_without_poller = false;
    return GRPC_ERROR_NONE;
  }
  gpr_tls_set(&g_current_thread_pollset, (intptr_t)pollset);
  if (begin_worker(pollset, &worker, worker_hdl, &now, deadline)) {
    gpr_tls_set(&g_current_thread_worker, (intptr_t)&worker);
    GPR_ASSERT(!pollset->shutdown_closure);
    GPR_ASSERT(!pollset->seen_inactive);
    gpr_mu_unlock(&pollset->mu);
    append_error(&error, pollset_epoll(exec_ctx, pollset, now, deadline),
                 err_desc);
    gpr_mu_lock(&pollset->mu);
    gpr_tls_set(&g_current_thread_worker, 0);
  }
  end_worker(exec_ctx, pollset, &worker, worker_hdl);
  gpr_tls_set(&g_current_thread_pollset, 0);
  return error;
}

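/* Kick 'specific_worker', or some worker of the pollset when it is NULL: the
   designated poller is woken via the global wakeup fd, workers parked on a
   condition variable are signalled, and a pollset with no workers just records
   kicked_without_poller. */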
static grpc_error *pollset_kick(grpc_pollset *pollset,
                                grpc_pollset_worker *specific_worker) {
  if (specific_worker == NULL) {
    if (gpr_tls_get(&g_current_thread_pollset) != (intptr_t)pollset) {
      grpc_pollset_worker *root_worker = pollset->root_worker;
      if (root_worker == NULL) {
        pollset->kicked_without_poller = true;
        return GRPC_ERROR_NONE;
      }
      grpc_pollset_worker *next_worker = root_worker->next;
      if (root_worker == next_worker &&
          root_worker == (grpc_pollset_worker *)gpr_atm_no_barrier_load(
                             &g_active_poller)) {
        root_worker->kick_state = KICKED;
        return grpc_wakeup_fd_wakeup(&global_wakeup_fd);
      } else if (next_worker->kick_state == UNKICKED) {
        GPR_ASSERT(next_worker->initialized_cv);
        next_worker->kick_state = KICKED;
        gpr_cv_signal(&next_worker->cv);
        return GRPC_ERROR_NONE;
      } else {
        return GRPC_ERROR_NONE;
      }
    } else {
      return GRPC_ERROR_NONE;
    }
  } else if (specific_worker->kick_state == KICKED) {
    return GRPC_ERROR_NONE;
  } else if (gpr_tls_get(&g_current_thread_worker) ==
             (intptr_t)specific_worker) {
    specific_worker->kick_state = KICKED;
    return GRPC_ERROR_NONE;
  } else if (specific_worker ==
             (grpc_pollset_worker *)gpr_atm_no_barrier_load(&g_active_poller)) {
    specific_worker->kick_state = KICKED;
    return grpc_wakeup_fd_wakeup(&global_wakeup_fd);
  } else if (specific_worker->initialized_cv) {
    specific_worker->kick_state = KICKED;
    gpr_cv_signal(&specific_worker->cv);
    return GRPC_ERROR_NONE;
  } else {
    specific_worker->kick_state = KICKED;
    return GRPC_ERROR_NONE;
  }
}

static void pollset_add_fd(grpc_exec_ctx *exec_ctx, grpc_pollset *pollset,
                           grpc_fd *fd) {}

static grpc_error *kick_poller(void) {
  gpr_atm_no_barrier_store(&g_timer_kick, 1);
  return grpc_wakeup_fd_wakeup(&global_wakeup_fd);
}

/*******************************************************************************
 * Workqueue Definitions
 */

#ifdef GRPC_WORKQUEUE_REFCOUNT_DEBUG
static grpc_workqueue *workqueue_ref(grpc_workqueue *workqueue,
                                     const char *file, int line,
                                     const char *reason) {
  return workqueue;
}

static void workqueue_unref(grpc_exec_ctx *exec_ctx, grpc_workqueue *workqueue,
                            const char *file, int line, const char *reason) {}
#else
static grpc_workqueue *workqueue_ref(grpc_workqueue *workqueue) {
  return workqueue;
}

static void workqueue_unref(grpc_exec_ctx *exec_ctx,
                            grpc_workqueue *workqueue) {}
#endif

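/* Workqueue scheduler: try to hand the closure to an UNKICKED worker in some
   neighbourhood (end_worker later moves its schedule_on_end_work list onto the
   exec_ctx); if no such worker is found, push the closure onto the global MPSC
   queue and wake the global wakeup fd. */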
static void wq_sched(grpc_exec_ctx *exec_ctx, grpc_closure *closure,
                     grpc_error *error) {
  // find a neighbourhood to wakeup
  bool scheduled = false;
  size_t initial_neighbourhood = choose_neighbourhood();
  for (size_t i = 0; !scheduled && i < g_num_neighbourhoods; i++) {
    pollset_neighbourhood *neighbourhood =
        &g_neighbourhoods[(initial_neighbourhood + i) % g_num_neighbourhoods];
    if (gpr_mu_trylock(&neighbourhood->mu)) {
      if (neighbourhood->active_root != NULL) {
        grpc_pollset *inspect = neighbourhood->active_root;
        do {
          if (gpr_mu_trylock(&inspect->mu)) {
            if (inspect->root_worker != NULL) {
              grpc_pollset_worker *inspect_worker = inspect->root_worker;
              do {
                if (inspect_worker->kick_state == UNKICKED) {
                  inspect_worker->kick_state = KICKED;
                  grpc_closure_list_append(
                      &inspect_worker->schedule_on_end_work, closure, error);
                  if (inspect_worker->initialized_cv) {
                    gpr_cv_signal(&inspect_worker->cv);
                  }
                  scheduled = true;
                }
                inspect_worker = inspect_worker->next;
              } while (!scheduled && inspect_worker != inspect->root_worker);
            }
            gpr_mu_unlock(&inspect->mu);
          }
          inspect = inspect->next;
        } while (!scheduled && inspect != neighbourhood->active_root);
      }
      gpr_mu_unlock(&neighbourhood->mu);
    }
  }
  if (!scheduled) {
    closure->error_data.error = error;
    gpr_mpscq_push(&g_workqueue_items, &closure->next_data.atm_next);
    GRPC_LOG_IF_ERROR("workqueue_scheduler",
                      grpc_wakeup_fd_wakeup(&global_wakeup_fd));
  }
}

static const grpc_closure_scheduler_vtable
    singleton_workqueue_scheduler_vtable = {wq_sched, wq_sched,
                                            "epoll1_workqueue"};

static grpc_closure_scheduler singleton_workqueue_scheduler = {
    &singleton_workqueue_scheduler_vtable};

static grpc_closure_scheduler *workqueue_scheduler(grpc_workqueue *workqueue) {
  return &singleton_workqueue_scheduler;
}

/*******************************************************************************
 * Pollset-set Definitions
 */

static grpc_pollset_set *pollset_set_create(void) {
  return (grpc_pollset_set *)((intptr_t)0xdeafbeef);
}

static void pollset_set_destroy(grpc_exec_ctx *exec_ctx,
                                grpc_pollset_set *pss) {}

static void pollset_set_add_fd(grpc_exec_ctx *exec_ctx, grpc_pollset_set *pss,
                               grpc_fd *fd) {}

static void pollset_set_del_fd(grpc_exec_ctx *exec_ctx, grpc_pollset_set *pss,
                               grpc_fd *fd) {}

static void pollset_set_add_pollset(grpc_exec_ctx *exec_ctx,
                                    grpc_pollset_set *pss, grpc_pollset *ps) {}

static void pollset_set_del_pollset(grpc_exec_ctx *exec_ctx,
                                    grpc_pollset_set *pss, grpc_pollset *ps) {}

static void pollset_set_add_pollset_set(grpc_exec_ctx *exec_ctx,
                                        grpc_pollset_set *bag,
                                        grpc_pollset_set *item) {}

static void pollset_set_del_pollset_set(grpc_exec_ctx *exec_ctx,
                                        grpc_pollset_set *bag,
                                        grpc_pollset_set *item) {}

/*******************************************************************************
 * Event engine binding
 */

static void shutdown_engine(void) {
  fd_global_shutdown();
  pollset_global_shutdown();
}

static const grpc_event_engine_vtable vtable = {
    .pollset_size = sizeof(grpc_pollset),

    .fd_create = fd_create,
    .fd_wrapped_fd = fd_wrapped_fd,
    .fd_orphan = fd_orphan,
    .fd_shutdown = fd_shutdown,
    .fd_is_shutdown = fd_is_shutdown,
    .fd_notify_on_read = fd_notify_on_read,
    .fd_notify_on_write = fd_notify_on_write,
    .fd_get_read_notifier_pollset = fd_get_read_notifier_pollset,
    .fd_get_workqueue = fd_get_workqueue,

    .pollset_init = pollset_init,
    .pollset_shutdown = pollset_shutdown,
    .pollset_destroy = pollset_destroy,
    .pollset_work = pollset_work,
    .pollset_kick = pollset_kick,
    .pollset_add_fd = pollset_add_fd,

    .pollset_set_create = pollset_set_create,
    .pollset_set_destroy = pollset_set_destroy,
    .pollset_set_add_pollset = pollset_set_add_pollset,
    .pollset_set_del_pollset = pollset_set_del_pollset,
    .pollset_set_add_pollset_set = pollset_set_add_pollset_set,
    .pollset_set_del_pollset_set = pollset_set_del_pollset_set,
    .pollset_set_add_fd = pollset_set_add_fd,
    .pollset_set_del_fd = pollset_set_del_fd,

    .kick_poller = kick_poller,

    .workqueue_ref = workqueue_ref,
    .workqueue_unref = workqueue_unref,
    .workqueue_scheduler = workqueue_scheduler,

    .shutdown_engine = shutdown_engine,
};

/* It is possible that GLIBC has epoll but the underlying kernel doesn't.
 * Create a dummy epoll_fd to make sure epoll support is available */
const grpc_event_engine_vtable *grpc_init_epoll1_linux(bool explicit_request) {
  if (!grpc_has_wakeup_fd()) {
    return NULL;
  }

  g_epfd = epoll_create1(EPOLL_CLOEXEC);
  if (g_epfd < 0) {
    gpr_log(GPR_ERROR, "epoll unavailable");
    return NULL;
  }

  fd_global_init();

  if (!GRPC_LOG_IF_ERROR("pollset_global_init", pollset_global_init())) {
    close(g_epfd);
    fd_global_shutdown();
    return NULL;
  }

  return &vtable;
}

#else /* defined(GRPC_LINUX_EPOLL) */
#if defined(GRPC_POSIX_SOCKET)
#include "src/core/lib/iomgr/ev_posix.h"
/* If GRPC_LINUX_EPOLL is not defined, it means epoll is not available. Return
 * NULL */
const grpc_event_engine_vtable *grpc_init_epoll1_linux(bool explicit_request) {
  return NULL;
}
#endif /* defined(GRPC_POSIX_SOCKET) */
#endif /* !defined(GRPC_LINUX_EPOLL) */