/*
 *
 * Copyright 2017, Google Inc.
 * All rights reserved.
 *
 * Redistribution and use in source and binary forms, with or without
 * modification, are permitted provided that the following conditions are
 * met:
 *
 *     * Redistributions of source code must retain the above copyright
 * notice, this list of conditions and the following disclaimer.
 *     * Redistributions in binary form must reproduce the above
 * copyright notice, this list of conditions and the following disclaimer
 * in the documentation and/or other materials provided with the
 * distribution.
 *     * Neither the name of Google Inc. nor the names of its
 * contributors may be used to endorse or promote products derived from
 * this software without specific prior written permission.
 *
 * THIS SOFTWARE IS PROVIDED BY THE COPYRIGHT HOLDERS AND CONTRIBUTORS
 * "AS IS" AND ANY EXPRESS OR IMPLIED WARRANTIES, INCLUDING, BUT NOT
 * LIMITED TO, THE IMPLIED WARRANTIES OF MERCHANTABILITY AND FITNESS FOR
 * A PARTICULAR PURPOSE ARE DISCLAIMED. IN NO EVENT SHALL THE COPYRIGHT
 * OWNER OR CONTRIBUTORS BE LIABLE FOR ANY DIRECT, INDIRECT, INCIDENTAL,
 * SPECIAL, EXEMPLARY, OR CONSEQUENTIAL DAMAGES (INCLUDING, BUT NOT
 * LIMITED TO, PROCUREMENT OF SUBSTITUTE GOODS OR SERVICES; LOSS OF USE,
 * DATA, OR PROFITS; OR BUSINESS INTERRUPTION) HOWEVER CAUSED AND ON ANY
 * THEORY OF LIABILITY, WHETHER IN CONTRACT, STRICT LIABILITY, OR TORT
 * (INCLUDING NEGLIGENCE OR OTHERWISE) ARISING IN ANY WAY OUT OF THE USE
 * OF THIS SOFTWARE, EVEN IF ADVISED OF THE POSSIBILITY OF SUCH DAMAGE.
 *
 */

#include "src/core/lib/iomgr/port.h"

/* This polling engine is only relevant on linux kernels supporting epoll() */
#ifdef GRPC_LINUX_EPOLL

#include "src/core/lib/iomgr/ev_epoll1_linux.h"

#include <assert.h>
#include <errno.h>
#include <poll.h>
#include <pthread.h>
#include <string.h>
#include <sys/epoll.h>
#include <sys/socket.h>
#include <unistd.h>

#include <grpc/support/alloc.h>
#include <grpc/support/cpu.h>
#include <grpc/support/log.h>
#include <grpc/support/string_util.h>
#include <grpc/support/tls.h>
#include <grpc/support/useful.h>

#include "src/core/lib/iomgr/ev_posix.h"
#include "src/core/lib/iomgr/iomgr_internal.h"
#include "src/core/lib/iomgr/lockfree_event.h"
#include "src/core/lib/iomgr/wakeup_fd_posix.h"
#include "src/core/lib/iomgr/workqueue.h"
#include "src/core/lib/profiling/timers.h"
#include "src/core/lib/support/block_annotate.h"

static grpc_wakeup_fd global_wakeup_fd;
static int g_epfd;

/*******************************************************************************
 * Fd Declarations
 */

struct grpc_fd {
  int fd;

  gpr_atm read_closure;
  gpr_atm write_closure;

  struct grpc_fd *freelist_next;

  /* The pollset that last noticed that the fd is readable. The actual type
   * stored in this is (grpc_pollset *) */
  gpr_atm read_notifier_pollset;

  grpc_iomgr_object iomgr_object;
};

static void fd_global_init(void);
static void fd_global_shutdown(void);

/*******************************************************************************
 * Pollset Declarations
 */

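/* A worker is in one of three states: UNKICKED (parked, waiting to be woken
 * or promoted), KICKED (told to return to its caller), or DESIGNATED_POLLER
 * (selected to run epoll_wait on the single global epoll set). */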
typedef enum { UNKICKED, KICKED, DESIGNATED_POLLER } kick_state;

struct grpc_pollset_worker {
  kick_state kick_state;
  bool initialized_cv;
  grpc_pollset_worker *next;
  grpc_pollset_worker *prev;
  gpr_cv cv;
  grpc_closure_list schedule_on_end_work;
};

#define MAX_NEIGHBOURHOODS 1024

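/* Pollsets are sharded into 'neighbourhoods' (one per CPU core by default,
 * capped at MAX_NEIGHBOURHOODS) so that searches for a pollset needing a
 * poller do not all contend on a single global lock. Each neighbourhood keeps
 * a doubly-linked ring of its currently active pollsets rooted at
 * active_root. */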
typedef struct pollset_neighbourhood {
  gpr_mu mu;
  grpc_pollset *active_root;
  char pad[GPR_CACHELINE_SIZE];
} pollset_neighbourhood;

struct grpc_pollset {
  gpr_mu mu;
  pollset_neighbourhood *neighbourhood;
  bool reassigning_neighbourhood;
  grpc_pollset_worker *root_worker;
  bool kicked_without_poller;
  bool seen_inactive;
  bool shutting_down;          /* Is the pollset shutting down? */
  bool finish_shutdown_called; /* Has 'finish_shutdown_locked()' been called? */
  grpc_closure *shutdown_closure; /* Called after shutdown is complete */
  int begin_refs;

  grpc_pollset *next;
  grpc_pollset *prev;
};

/*******************************************************************************
 * Pollset-set Declarations
 */

struct grpc_pollset_set {};

/*******************************************************************************
 * Common helpers
 */

static bool append_error(grpc_error **composite, grpc_error *error,
                         const char *desc) {
  if (error == GRPC_ERROR_NONE) return true;
  if (*composite == GRPC_ERROR_NONE) {
    *composite = GRPC_ERROR_CREATE_FROM_COPIED_STRING(desc);
  }
  *composite = grpc_error_add_child(*composite, error);
  return false;
}

/*******************************************************************************
 * Fd Definitions
 */

/* We need to keep a freelist not because of any concerns of malloc performance
 * but instead so that implementations with multiple threads in (for example)
 * epoll_wait deal with the race between pollset removal and incoming poll
 * notifications.
 *
 * The problem is that the poller ultimately holds a reference to this
 * object, so it is very difficult to know when it is safe to free it, at least
 * without some expensive synchronization.
 *
 * If we keep the object freelisted, in the worst case losing this race just
 * becomes a spurious read notification on a reused fd.
 */

/* The alarm system needs to be able to wake up 'some poller' sometimes
 * (specifically when a new alarm needs to be triggered earlier than the next
 * alarm 'epoch'). This wakeup_fd gives us something to alert on when such a
 * case occurs. */

static grpc_fd *fd_freelist = NULL;
static gpr_mu fd_freelist_mu;

static void fd_global_init(void) { gpr_mu_init(&fd_freelist_mu); }

static void fd_global_shutdown(void) {
  gpr_mu_lock(&fd_freelist_mu);
  gpr_mu_unlock(&fd_freelist_mu);
  while (fd_freelist != NULL) {
    grpc_fd *fd = fd_freelist;
    fd_freelist = fd_freelist->freelist_next;
    gpr_free(fd);
  }
  gpr_mu_destroy(&fd_freelist_mu);
}

static grpc_fd *fd_create(int fd, const char *name) {
  grpc_fd *new_fd = NULL;

  gpr_mu_lock(&fd_freelist_mu);
  if (fd_freelist != NULL) {
    new_fd = fd_freelist;
    fd_freelist = fd_freelist->freelist_next;
  }
  gpr_mu_unlock(&fd_freelist_mu);

  if (new_fd == NULL) {
    new_fd = gpr_malloc(sizeof(grpc_fd));
  }

  new_fd->fd = fd;
  grpc_lfev_init(&new_fd->read_closure);
  grpc_lfev_init(&new_fd->write_closure);
  gpr_atm_no_barrier_store(&new_fd->read_notifier_pollset, (gpr_atm)NULL);

  new_fd->freelist_next = NULL;

  char *fd_name;
  gpr_asprintf(&fd_name, "%s fd=%d", name, fd);
  grpc_iomgr_register_object(&new_fd->iomgr_object, fd_name);
#ifdef GRPC_FD_REF_COUNT_DEBUG
  gpr_log(GPR_DEBUG, "FD %d %p create %s", fd, (void *)new_fd, fd_name);
#endif
  gpr_free(fd_name);

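  /* Every fd is registered edge-triggered (EPOLLET) with the single global
   * epoll set g_epfd as soon as it is created; there is no per-pollset epoll
   * set in this engine (pollset_add_fd below is a no-op). */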
  struct epoll_event ev = {.events = (uint32_t)(EPOLLIN | EPOLLOUT | EPOLLET),
                           .data.ptr = new_fd};
  if (epoll_ctl(g_epfd, EPOLL_CTL_ADD, fd, &ev) != 0) {
    gpr_log(GPR_ERROR, "epoll_ctl failed: %s", strerror(errno));
  }

  return new_fd;
}

static int fd_wrapped_fd(grpc_fd *fd) { return fd->fd; }

/* Might be called multiple times */
static void fd_shutdown(grpc_exec_ctx *exec_ctx, grpc_fd *fd, grpc_error *why) {
  if (grpc_lfev_set_shutdown(exec_ctx, &fd->read_closure,
                             GRPC_ERROR_REF(why))) {
    shutdown(fd->fd, SHUT_RDWR);
    grpc_lfev_set_shutdown(exec_ctx, &fd->write_closure, GRPC_ERROR_REF(why));
  }
  GRPC_ERROR_UNREF(why);
}

static void fd_orphan(grpc_exec_ctx *exec_ctx, grpc_fd *fd,
                      grpc_closure *on_done, int *release_fd,
                      const char *reason) {
  grpc_error *error = GRPC_ERROR_NONE;

  if (!grpc_lfev_is_shutdown(&fd->read_closure)) {
    fd_shutdown(exec_ctx, fd, GRPC_ERROR_CREATE_FROM_COPIED_STRING(reason));
  }

  /* If release_fd is not NULL, we should be relinquishing control of the file
     descriptor fd->fd (but we still own the grpc_fd structure). */
  if (release_fd != NULL) {
    *release_fd = fd->fd;
  } else {
    close(fd->fd);
  }

  grpc_closure_sched(exec_ctx, on_done, GRPC_ERROR_REF(error));

  grpc_iomgr_unregister_object(&fd->iomgr_object);
  grpc_lfev_destroy(&fd->read_closure);
  grpc_lfev_destroy(&fd->write_closure);

  gpr_mu_lock(&fd_freelist_mu);
  fd->freelist_next = fd_freelist;
  fd_freelist = fd;
  gpr_mu_unlock(&fd_freelist_mu);
}

static grpc_pollset *fd_get_read_notifier_pollset(grpc_exec_ctx *exec_ctx,
                                                  grpc_fd *fd) {
  gpr_atm notifier = gpr_atm_acq_load(&fd->read_notifier_pollset);
  return (grpc_pollset *)notifier;
}

static bool fd_is_shutdown(grpc_fd *fd) {
  return grpc_lfev_is_shutdown(&fd->read_closure);
}

static void fd_notify_on_read(grpc_exec_ctx *exec_ctx, grpc_fd *fd,
                              grpc_closure *closure) {
  grpc_lfev_notify_on(exec_ctx, &fd->read_closure, closure);
}

static void fd_notify_on_write(grpc_exec_ctx *exec_ctx, grpc_fd *fd,
                               grpc_closure *closure) {
  grpc_lfev_notify_on(exec_ctx, &fd->write_closure, closure);
}

static grpc_workqueue *fd_get_workqueue(grpc_fd *fd) {
  return (grpc_workqueue *)0xb0b51ed;
}

static void fd_become_readable(grpc_exec_ctx *exec_ctx, grpc_fd *fd,
                               grpc_pollset *notifier) {
  grpc_lfev_set_ready(exec_ctx, &fd->read_closure);

  /* Note, it is possible that fd_become_readable might be called twice with
     different 'notifier's when an fd becomes readable and it is in two epoll
     sets (This can happen briefly during polling island merges). In such cases
     it does not really matter which notifier is set as the
     read_notifier_pollset (They would both point to the same polling island
     anyway) */
  /* Use release store to match with acquire load in fd_get_read_notifier */
  gpr_atm_rel_store(&fd->read_notifier_pollset, (gpr_atm)notifier);
}

static void fd_become_writable(grpc_exec_ctx *exec_ctx, grpc_fd *fd) {
  grpc_lfev_set_ready(exec_ctx, &fd->write_closure);
}

/*******************************************************************************
 * Pollset Definitions
 */

GPR_TLS_DECL(g_current_thread_pollset);
GPR_TLS_DECL(g_current_thread_worker);
static gpr_atm g_active_poller;
static pollset_neighbourhood *g_neighbourhoods;
static size_t g_num_neighbourhoods;
static gpr_mu g_wq_mu;
static grpc_closure_list g_wq_items;

/* Return true if first in list */
static bool worker_insert(grpc_pollset *pollset, grpc_pollset_worker *worker) {
  if (pollset->root_worker == NULL) {
    pollset->root_worker = worker;
    worker->next = worker->prev = worker;
    return true;
  } else {
    worker->next = pollset->root_worker;
    worker->prev = worker->next->prev;
    worker->next->prev = worker;
    worker->prev->next = worker;
    return false;
  }
}

/* Return true if last in list */
typedef enum { EMPTIED, NEW_ROOT, REMOVED } worker_remove_result;

static worker_remove_result worker_remove(grpc_pollset *pollset,
                                          grpc_pollset_worker *worker) {
  if (worker == pollset->root_worker) {
    if (worker == worker->next) {
      pollset->root_worker = NULL;
      return EMPTIED;
    } else {
      pollset->root_worker = worker->next;
      worker->prev->next = worker->next;
      worker->next->prev = worker->prev;
      return NEW_ROOT;
    }
  } else {
    worker->prev->next = worker->next;
    worker->next->prev = worker->prev;
    return REMOVED;
  }
}

static size_t choose_neighbourhood(void) {
  return (size_t)gpr_cpu_current_cpu() % g_num_neighbourhoods;
}

static grpc_error *pollset_global_init(void) {
  gpr_tls_init(&g_current_thread_pollset);
  gpr_tls_init(&g_current_thread_worker);
  gpr_atm_no_barrier_store(&g_active_poller, 0);
  global_wakeup_fd.read_fd = -1;
  grpc_error *err = grpc_wakeup_fd_init(&global_wakeup_fd);
  gpr_mu_init(&g_wq_mu);
  g_wq_items = (grpc_closure_list)GRPC_CLOSURE_LIST_INIT;
  if (err != GRPC_ERROR_NONE) return err;
  struct epoll_event ev = {.events = (uint32_t)(EPOLLIN | EPOLLET),
                           .data.ptr = &global_wakeup_fd};
  if (epoll_ctl(g_epfd, EPOLL_CTL_ADD, global_wakeup_fd.read_fd, &ev) != 0) {
    return GRPC_OS_ERROR(errno, "epoll_ctl");
  }
  g_num_neighbourhoods = GPR_CLAMP(gpr_cpu_num_cores(), 1, MAX_NEIGHBOURHOODS);
  g_neighbourhoods =
      gpr_zalloc(sizeof(*g_neighbourhoods) * g_num_neighbourhoods);
  for (size_t i = 0; i < g_num_neighbourhoods; i++) {
    gpr_mu_init(&g_neighbourhoods[i].mu);
  }
  return GRPC_ERROR_NONE;
}

static void pollset_global_shutdown(void) {
  gpr_tls_destroy(&g_current_thread_pollset);
  gpr_tls_destroy(&g_current_thread_worker);
  gpr_mu_destroy(&g_wq_mu);
  if (global_wakeup_fd.read_fd != -1) grpc_wakeup_fd_destroy(&global_wakeup_fd);
  for (size_t i = 0; i < g_num_neighbourhoods; i++) {
    gpr_mu_destroy(&g_neighbourhoods[i].mu);
  }
  gpr_free(g_neighbourhoods);
}

static void pollset_init(grpc_pollset *pollset, gpr_mu **mu) {
  gpr_mu_init(&pollset->mu);
  *mu = &pollset->mu;
  pollset->neighbourhood = &g_neighbourhoods[choose_neighbourhood()];
  pollset->seen_inactive = true;
}

static void pollset_destroy(grpc_exec_ctx *exec_ctx, grpc_pollset *pollset) {
  gpr_mu_lock(&pollset->mu);
  if (!pollset->seen_inactive) {
    pollset_neighbourhood *neighbourhood = pollset->neighbourhood;
    gpr_mu_unlock(&pollset->mu);
  retry_lock_neighbourhood:
    gpr_mu_lock(&neighbourhood->mu);
    gpr_mu_lock(&pollset->mu);
    if (!pollset->seen_inactive) {
      if (pollset->neighbourhood != neighbourhood) {
        gpr_mu_unlock(&neighbourhood->mu);
        neighbourhood = pollset->neighbourhood;
        gpr_mu_unlock(&pollset->mu);
        goto retry_lock_neighbourhood;
      }
      pollset->prev->next = pollset->next;
      pollset->next->prev = pollset->prev;
      if (pollset == pollset->neighbourhood->active_root) {
        pollset->neighbourhood->active_root =
            pollset->next == pollset ? NULL : pollset->next;
      }
    }
    gpr_mu_unlock(&pollset->neighbourhood->mu);
  }
  gpr_mu_unlock(&pollset->mu);
  gpr_mu_destroy(&pollset->mu);
}

static grpc_error *pollset_kick_all(grpc_pollset *pollset) {
  grpc_error *error = GRPC_ERROR_NONE;
  if (pollset->root_worker != NULL) {
    grpc_pollset_worker *worker = pollset->root_worker;
    do {
      if (worker->initialized_cv) {
        worker->kick_state = KICKED;
        gpr_cv_signal(&worker->cv);
      } else {
        worker->kick_state = KICKED;
        append_error(&error, grpc_wakeup_fd_wakeup(&global_wakeup_fd),
                     "pollset_shutdown");
      }

      worker = worker->next;
    } while (worker != pollset->root_worker);
  }
  return error;
}

static void pollset_maybe_finish_shutdown(grpc_exec_ctx *exec_ctx,
                                          grpc_pollset *pollset) {
  if (pollset->shutdown_closure != NULL && pollset->root_worker == NULL &&
      pollset->begin_refs == 0) {
    grpc_closure_sched(exec_ctx, pollset->shutdown_closure, GRPC_ERROR_NONE);
    pollset->shutdown_closure = NULL;
  }
}

static void pollset_shutdown(grpc_exec_ctx *exec_ctx, grpc_pollset *pollset,
                             grpc_closure *closure) {
  GPR_ASSERT(pollset->shutdown_closure == NULL);
  pollset->shutdown_closure = closure;
  GRPC_LOG_IF_ERROR("pollset_shutdown", pollset_kick_all(pollset));
  pollset_maybe_finish_shutdown(exec_ctx, pollset);
}

#define MAX_EPOLL_EVENTS 100

static int poll_deadline_to_millis_timeout(gpr_timespec deadline,
                                           gpr_timespec now) {
  gpr_timespec timeout;
  if (gpr_time_cmp(deadline, gpr_inf_future(deadline.clock_type)) == 0) {
    return -1;
  }

  if (gpr_time_cmp(deadline, now) <= 0) {
    return 0;
  }

  static const gpr_timespec round_up = {
      .clock_type = GPR_TIMESPAN, .tv_sec = 0, .tv_nsec = GPR_NS_PER_MS - 1};
  timeout = gpr_time_sub(deadline, now);
  int millis = gpr_time_to_millis(gpr_time_add(timeout, round_up));
  return millis >= 1 ? millis : 1;
}

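/* Run a single epoll_wait on the global epoll set and dispatch the results:
   readiness events are forwarded to the corresponding grpc_fd, and a wakeup
   on global_wakeup_fd also drains any queued workqueue items (g_wq_items)
   into the current exec_ctx. */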
static grpc_error *pollset_epoll(grpc_exec_ctx *exec_ctx, grpc_pollset *pollset,
                                 gpr_timespec now, gpr_timespec deadline) {
  struct epoll_event events[MAX_EPOLL_EVENTS];
  static const char *err_desc = "pollset_poll";

  int timeout = poll_deadline_to_millis_timeout(deadline, now);

  if (timeout != 0) {
    GRPC_SCHEDULING_START_BLOCKING_REGION;
  }
  int r;
  do {
    r = epoll_wait(g_epfd, events, MAX_EPOLL_EVENTS, timeout);
  } while (r < 0 && errno == EINTR);
  if (timeout != 0) {
    GRPC_SCHEDULING_END_BLOCKING_REGION;
  }

  if (r < 0) return GRPC_OS_ERROR(errno, "epoll_wait");

  grpc_error *error = GRPC_ERROR_NONE;
  for (int i = 0; i < r; i++) {
    void *data_ptr = events[i].data.ptr;
    if (data_ptr == &global_wakeup_fd) {
      gpr_mu_lock(&g_wq_mu);
      grpc_closure_list_move(&g_wq_items, &exec_ctx->closure_list);
      gpr_mu_unlock(&g_wq_mu);
      append_error(&error, grpc_wakeup_fd_consume_wakeup(&global_wakeup_fd),
                   err_desc);
    } else {
      grpc_fd *fd = (grpc_fd *)(data_ptr);
      bool cancel = (events[i].events & (EPOLLERR | EPOLLHUP)) != 0;
      bool read_ev = (events[i].events & (EPOLLIN | EPOLLPRI)) != 0;
      bool write_ev = (events[i].events & EPOLLOUT) != 0;
      if (read_ev || cancel) {
        fd_become_readable(exec_ctx, fd, pollset);
      }
      if (write_ev || cancel) {
        fd_become_writable(exec_ctx, fd);
      }
    }
  }

  return error;
}

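/* Register 'worker' with the pollset and decide whether it should poll. If
   the pollset was marked inactive it is re-linked into an active
   neighbourhood (possibly a freshly chosen one). A worker that does not
   become the DESIGNATED_POLLER blocks on its condition variable until it is
   kicked or promoted. Returns true if the caller should go on to run
   pollset_epoll. */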
static bool begin_worker(grpc_pollset *pollset, grpc_pollset_worker *worker,
                         grpc_pollset_worker **worker_hdl, gpr_timespec *now,
                         gpr_timespec deadline) {
  if (worker_hdl != NULL) *worker_hdl = worker;
  worker->initialized_cv = false;
  worker->kick_state = UNKICKED;
  worker->schedule_on_end_work = (grpc_closure_list)GRPC_CLOSURE_LIST_INIT;
  pollset->begin_refs++;

  if (pollset->seen_inactive) {
    // pollset has been observed to be inactive, we need to move back to the
    // active list
    bool is_reassigning = false;
    if (!pollset->reassigning_neighbourhood) {
      is_reassigning = true;
      pollset->reassigning_neighbourhood = true;
      pollset->neighbourhood = &g_neighbourhoods[choose_neighbourhood()];
    }
    pollset_neighbourhood *neighbourhood = pollset->neighbourhood;
    gpr_mu_unlock(&pollset->mu);
    // pollset unlocked: state may change (even worker->kick_state)
  retry_lock_neighbourhood:
    gpr_mu_lock(&neighbourhood->mu);
    gpr_mu_lock(&pollset->mu);
    if (pollset->seen_inactive) {
      if (neighbourhood != pollset->neighbourhood) {
        gpr_mu_unlock(&neighbourhood->mu);
        neighbourhood = pollset->neighbourhood;
        gpr_mu_unlock(&pollset->mu);
        goto retry_lock_neighbourhood;
      }
      pollset->seen_inactive = false;
      if (neighbourhood->active_root == NULL) {
        neighbourhood->active_root = pollset->next = pollset->prev = pollset;
        if (gpr_atm_no_barrier_cas(&g_active_poller, 0, (gpr_atm)worker)) {
          worker->kick_state = DESIGNATED_POLLER;
        }
      } else {
        pollset->next = neighbourhood->active_root;
        pollset->prev = pollset->next->prev;
        pollset->next->prev = pollset->prev->next = pollset;
      }
    }
    if (is_reassigning) {
      GPR_ASSERT(pollset->reassigning_neighbourhood);
      pollset->reassigning_neighbourhood = false;
    }
    gpr_mu_unlock(&neighbourhood->mu);
  }
  worker_insert(pollset, worker);
  pollset->begin_refs--;
  if (worker->kick_state == UNKICKED) {
    GPR_ASSERT(gpr_atm_no_barrier_load(&g_active_poller) != (gpr_atm)worker);
    worker->initialized_cv = true;
    gpr_cv_init(&worker->cv);
    while (worker->kick_state == UNKICKED &&
           pollset->shutdown_closure == NULL) {
      if (gpr_cv_wait(&worker->cv, &pollset->mu, deadline) &&
          worker->kick_state == UNKICKED) {
        worker->kick_state = KICKED;
      }
    }
    *now = gpr_now(now->clock_type);
  }

  return worker->kick_state == DESIGNATED_POLLER &&
         pollset->shutdown_closure == NULL;
}

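/* With the neighbourhood lock held, walk its active pollsets looking for an
   UNKICKED worker to promote to DESIGNATED_POLLER. Pollsets found to have no
   promotable worker are marked seen_inactive and unlinked from the active
   ring. Returns true once a poller has been found (or one was already
   designated). */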
static bool check_neighbourhood_for_available_poller(
    pollset_neighbourhood *neighbourhood) {
  bool found_worker = false;
  do {
    grpc_pollset *inspect = neighbourhood->active_root;
    if (inspect == NULL) {
      break;
    }
    gpr_mu_lock(&inspect->mu);
    GPR_ASSERT(!inspect->seen_inactive);
    grpc_pollset_worker *inspect_worker = inspect->root_worker;
    if (inspect_worker != NULL) {
      do {
        switch (inspect_worker->kick_state) {
          case UNKICKED:
            if (gpr_atm_no_barrier_cas(&g_active_poller, 0,
                                       (gpr_atm)inspect_worker)) {
              inspect_worker->kick_state = DESIGNATED_POLLER;
              if (inspect_worker->initialized_cv) {
                gpr_cv_signal(&inspect_worker->cv);
              }
            }
            // even if we didn't win the cas, there's a worker, we can stop
            found_worker = true;
            break;
          case KICKED:
            break;
          case DESIGNATED_POLLER:
            found_worker = true;  // ok, so someone else found the worker, but
                                  // we'll accept that
            break;
        }
        inspect_worker = inspect_worker->next;
      } while (inspect_worker != inspect->root_worker);
    }
    if (!found_worker) {
      inspect->seen_inactive = true;
      if (inspect == neighbourhood->active_root) {
        neighbourhood->active_root =
            inspect->next == inspect ? NULL : inspect->next;
      }
      inspect->next->prev = inspect->prev;
      inspect->prev->next = inspect->next;
      inspect->next = inspect->prev = NULL;
    }
    gpr_mu_unlock(&inspect->mu);
  } while (!found_worker);
  return found_worker;
}

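/* Tear down 'worker'. If it was the designated poller it first tries to hand
   that role to the next unkicked worker on the same pollset; failing that it
   scans the neighbourhoods (its own first, trylock before lock) for another
   pollset with a promotable worker. Work accumulated on the exec_ctx is
   flushed while the pollset lock is dropped. */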
static void end_worker(grpc_exec_ctx *exec_ctx, grpc_pollset *pollset,
                       grpc_pollset_worker *worker,
                       grpc_pollset_worker **worker_hdl) {
  if (worker_hdl != NULL) *worker_hdl = NULL;
  worker->kick_state = KICKED;
  grpc_closure_list_move(&worker->schedule_on_end_work,
                         &exec_ctx->closure_list);
  if (gpr_atm_no_barrier_load(&g_active_poller) == (gpr_atm)worker) {
    if (worker->next != worker && worker->next->kick_state == UNKICKED) {
      GPR_ASSERT(worker->next->initialized_cv);
      gpr_atm_no_barrier_store(&g_active_poller, (gpr_atm)worker->next);
      worker->next->kick_state = DESIGNATED_POLLER;
      gpr_cv_signal(&worker->next->cv);
      if (grpc_exec_ctx_has_work(exec_ctx)) {
        gpr_mu_unlock(&pollset->mu);
        grpc_exec_ctx_flush(exec_ctx);
        gpr_mu_lock(&pollset->mu);
      }
    } else {
      gpr_atm_no_barrier_store(&g_active_poller, 0);
      size_t poller_neighbourhood_idx =
          (size_t)(pollset->neighbourhood - g_neighbourhoods);
      gpr_mu_unlock(&pollset->mu);
      bool found_worker = false;
      bool scan_state[MAX_NEIGHBOURHOODS];
      for (size_t i = 0; !found_worker && i < g_num_neighbourhoods; i++) {
        pollset_neighbourhood *neighbourhood =
            &g_neighbourhoods[(poller_neighbourhood_idx + i) %
                              g_num_neighbourhoods];
        if (gpr_mu_trylock(&neighbourhood->mu)) {
          found_worker =
              check_neighbourhood_for_available_poller(neighbourhood);
          gpr_mu_unlock(&neighbourhood->mu);
          scan_state[i] = true;
        } else {
          scan_state[i] = false;
        }
      }
      for (size_t i = 0; !found_worker && i < g_num_neighbourhoods; i++) {
        if (scan_state[i]) continue;
        pollset_neighbourhood *neighbourhood =
            &g_neighbourhoods[(poller_neighbourhood_idx + i) %
                              g_num_neighbourhoods];
        gpr_mu_lock(&neighbourhood->mu);
        found_worker = check_neighbourhood_for_available_poller(neighbourhood);
        gpr_mu_unlock(&neighbourhood->mu);
      }
      grpc_exec_ctx_flush(exec_ctx);
      gpr_mu_lock(&pollset->mu);
    }
  } else if (grpc_exec_ctx_has_work(exec_ctx)) {
    gpr_mu_unlock(&pollset->mu);
    grpc_exec_ctx_flush(exec_ctx);
    gpr_mu_lock(&pollset->mu);
  }
  if (worker->initialized_cv) {
    gpr_cv_destroy(&worker->cv);
  }
  if (EMPTIED == worker_remove(pollset, worker)) {
    pollset_maybe_finish_shutdown(exec_ctx, pollset);
  }
  GPR_ASSERT(gpr_atm_no_barrier_load(&g_active_poller) != (gpr_atm)worker);
}

/* pollset->mu lock must be held by the caller before calling this.
   The function pollset_work() may temporarily release the lock (pollset->mu)
   during the course of its execution but it will always re-acquire the lock
   and ensure that it is held by the time the function returns */
static grpc_error *pollset_work(grpc_exec_ctx *exec_ctx, grpc_pollset *pollset,
                                grpc_pollset_worker **worker_hdl,
                                gpr_timespec now, gpr_timespec deadline) {
  grpc_pollset_worker worker;
  grpc_error *error = GRPC_ERROR_NONE;
  static const char *err_desc = "pollset_work";
  if (pollset->kicked_without_poller) {
    pollset->kicked_without_poller = false;
    return GRPC_ERROR_NONE;
  }
  gpr_tls_set(&g_current_thread_pollset, (intptr_t)pollset);
  if (begin_worker(pollset, &worker, worker_hdl, &now, deadline)) {
    gpr_tls_set(&g_current_thread_worker, (intptr_t)&worker);
    GPR_ASSERT(!pollset->shutdown_closure);
    GPR_ASSERT(!pollset->seen_inactive);
    gpr_mu_unlock(&pollset->mu);
    append_error(&error, pollset_epoll(exec_ctx, pollset, now, deadline),
                 err_desc);
    gpr_mu_lock(&pollset->mu);
    gpr_tls_set(&g_current_thread_worker, 0);
  }
  end_worker(exec_ctx, pollset, &worker, worker_hdl);
  gpr_tls_set(&g_current_thread_pollset, 0);
  return error;
}

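/* Wake a worker so that it notices pending state changes. With no specific
   worker given, the kick targets the designated poller (via global_wakeup_fd)
   or the next sleeping worker; kicking a pollset with no workers just sets
   kicked_without_poller so the next pollset_work call returns immediately. */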
static grpc_error *pollset_kick(grpc_pollset *pollset,
                                grpc_pollset_worker *specific_worker) {
  if (specific_worker == NULL) {
    if (gpr_tls_get(&g_current_thread_pollset) != (intptr_t)pollset) {
      grpc_pollset_worker *root_worker = pollset->root_worker;
      if (root_worker == NULL) {
        pollset->kicked_without_poller = true;
        return GRPC_ERROR_NONE;
      }
      grpc_pollset_worker *next_worker = root_worker->next;
      if (root_worker == next_worker &&
          root_worker == (grpc_pollset_worker *)gpr_atm_no_barrier_load(
                             &g_active_poller)) {
        root_worker->kick_state = KICKED;
        return grpc_wakeup_fd_wakeup(&global_wakeup_fd);
      } else if (next_worker->kick_state == UNKICKED) {
        GPR_ASSERT(next_worker->initialized_cv);
        next_worker->kick_state = KICKED;
        gpr_cv_signal(&next_worker->cv);
        return GRPC_ERROR_NONE;
      } else {
        return GRPC_ERROR_NONE;
      }
    } else {
      return GRPC_ERROR_NONE;
    }
  } else if (specific_worker->kick_state == KICKED) {
    return GRPC_ERROR_NONE;
  } else if (gpr_tls_get(&g_current_thread_worker) ==
             (intptr_t)specific_worker) {
    specific_worker->kick_state = KICKED;
    return GRPC_ERROR_NONE;
  } else if (specific_worker ==
             (grpc_pollset_worker *)gpr_atm_no_barrier_load(&g_active_poller)) {
    specific_worker->kick_state = KICKED;
    return grpc_wakeup_fd_wakeup(&global_wakeup_fd);
  } else if (specific_worker->initialized_cv) {
    specific_worker->kick_state = KICKED;
    gpr_cv_signal(&specific_worker->cv);
    return GRPC_ERROR_NONE;
  } else {
    specific_worker->kick_state = KICKED;
    return GRPC_ERROR_NONE;
  }
}

static void pollset_add_fd(grpc_exec_ctx *exec_ctx, grpc_pollset *pollset,
                           grpc_fd *fd) {}

/*******************************************************************************
 * Workqueue Definitions
 */

#ifdef GRPC_WORKQUEUE_REFCOUNT_DEBUG
static grpc_workqueue *workqueue_ref(grpc_workqueue *workqueue,
                                     const char *file, int line,
                                     const char *reason) {
  return workqueue;
}

static void workqueue_unref(grpc_exec_ctx *exec_ctx, grpc_workqueue *workqueue,
                            const char *file, int line, const char *reason) {}
#else
static grpc_workqueue *workqueue_ref(grpc_workqueue *workqueue) {
  return workqueue;
}

static void workqueue_unref(grpc_exec_ctx *exec_ctx,
                            grpc_workqueue *workqueue) {}
#endif

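/* Schedule a workqueue closure. Prefer handing it directly to an UNKICKED
   worker (which runs it, via schedule_on_end_work, when it ends its poll); if
   no such worker can be found without blocking, append the closure to the
   global g_wq_items list and wake the designated poller through
   global_wakeup_fd. */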
static void wq_sched(grpc_exec_ctx *exec_ctx, grpc_closure *closure,
                     grpc_error *error) {
  // find a neighbourhood to wake up
  bool scheduled = false;
  size_t initial_neighbourhood = choose_neighbourhood();
  for (size_t i = 0; !scheduled && i < g_num_neighbourhoods; i++) {
    pollset_neighbourhood *neighbourhood =
        &g_neighbourhoods[(initial_neighbourhood + i) % g_num_neighbourhoods];
    if (gpr_mu_trylock(&neighbourhood->mu)) {
      if (neighbourhood->active_root != NULL) {
        grpc_pollset *inspect = neighbourhood->active_root;
        do {
          if (gpr_mu_trylock(&inspect->mu)) {
            if (inspect->root_worker != NULL) {
              grpc_pollset_worker *inspect_worker = inspect->root_worker;
              do {
                if (inspect_worker->kick_state == UNKICKED) {
                  inspect_worker->kick_state = KICKED;
                  grpc_closure_list_append(
                      &inspect_worker->schedule_on_end_work, closure, error);
                  if (inspect_worker->initialized_cv) {
                    gpr_cv_signal(&inspect_worker->cv);
                  }
                  scheduled = true;
                }
                inspect_worker = inspect_worker->next;
              } while (!scheduled && inspect_worker != inspect->root_worker);
            }
            gpr_mu_unlock(&inspect->mu);
          }
          inspect = inspect->next;
        } while (!scheduled && inspect != neighbourhood->active_root);
      }
      gpr_mu_unlock(&neighbourhood->mu);
    }
  }
  if (!scheduled) {
    gpr_mu_lock(&g_wq_mu);
    grpc_closure_list_append(&g_wq_items, closure, error);
    gpr_mu_unlock(&g_wq_mu);
    GRPC_LOG_IF_ERROR("workqueue_scheduler",
                      grpc_wakeup_fd_wakeup(&global_wakeup_fd));
  }
}

static const grpc_closure_scheduler_vtable
    singleton_workqueue_scheduler_vtable = {wq_sched, wq_sched,
                                            "epoll1_workqueue"};

static grpc_closure_scheduler singleton_workqueue_scheduler = {
    &singleton_workqueue_scheduler_vtable};

static grpc_closure_scheduler *workqueue_scheduler(grpc_workqueue *workqueue) {
  return &singleton_workqueue_scheduler;
}

/*******************************************************************************
 * Pollset-set Definitions
 */

static grpc_pollset_set *pollset_set_create(void) {
  return (grpc_pollset_set *)((intptr_t)0xdeafbeef);
}

static void pollset_set_destroy(grpc_exec_ctx *exec_ctx,
                                grpc_pollset_set *pss) {}

static void pollset_set_add_fd(grpc_exec_ctx *exec_ctx, grpc_pollset_set *pss,
                               grpc_fd *fd) {}

static void pollset_set_del_fd(grpc_exec_ctx *exec_ctx, grpc_pollset_set *pss,
                               grpc_fd *fd) {}

static void pollset_set_add_pollset(grpc_exec_ctx *exec_ctx,
                                    grpc_pollset_set *pss, grpc_pollset *ps) {}

static void pollset_set_del_pollset(grpc_exec_ctx *exec_ctx,
                                    grpc_pollset_set *pss, grpc_pollset *ps) {}

static void pollset_set_add_pollset_set(grpc_exec_ctx *exec_ctx,
                                        grpc_pollset_set *bag,
                                        grpc_pollset_set *item) {}

static void pollset_set_del_pollset_set(grpc_exec_ctx *exec_ctx,
                                        grpc_pollset_set *bag,
                                        grpc_pollset_set *item) {}

/*******************************************************************************
 * Event engine binding
 */

static void shutdown_engine(void) {
  fd_global_shutdown();
  pollset_global_shutdown();
}

static const grpc_event_engine_vtable vtable = {
    .pollset_size = sizeof(grpc_pollset),

    .fd_create = fd_create,
    .fd_wrapped_fd = fd_wrapped_fd,
    .fd_orphan = fd_orphan,
    .fd_shutdown = fd_shutdown,
    .fd_is_shutdown = fd_is_shutdown,
    .fd_notify_on_read = fd_notify_on_read,
    .fd_notify_on_write = fd_notify_on_write,
    .fd_get_read_notifier_pollset = fd_get_read_notifier_pollset,
    .fd_get_workqueue = fd_get_workqueue,

    .pollset_init = pollset_init,
    .pollset_shutdown = pollset_shutdown,
    .pollset_destroy = pollset_destroy,
    .pollset_work = pollset_work,
    .pollset_kick = pollset_kick,
    .pollset_add_fd = pollset_add_fd,

    .pollset_set_create = pollset_set_create,
    .pollset_set_destroy = pollset_set_destroy,
    .pollset_set_add_pollset = pollset_set_add_pollset,
    .pollset_set_del_pollset = pollset_set_del_pollset,
    .pollset_set_add_pollset_set = pollset_set_add_pollset_set,
    .pollset_set_del_pollset_set = pollset_set_del_pollset_set,
    .pollset_set_add_fd = pollset_set_add_fd,
    .pollset_set_del_fd = pollset_set_del_fd,

    .workqueue_ref = workqueue_ref,
    .workqueue_unref = workqueue_unref,
    .workqueue_scheduler = workqueue_scheduler,

    .shutdown_engine = shutdown_engine,
};

/* It is possible that GLIBC has epoll but the underlying kernel doesn't.
 * Create a dummy epoll_fd to make sure epoll support is available */
const grpc_event_engine_vtable *grpc_init_epoll1_linux(bool explicit_request) {
  if (!grpc_has_wakeup_fd()) {
    return NULL;
  }

  g_epfd = epoll_create1(EPOLL_CLOEXEC);
  if (g_epfd < 0) {
    gpr_log(GPR_ERROR, "epoll unavailable");
    return NULL;
  }

  fd_global_init();

  if (!GRPC_LOG_IF_ERROR("pollset_global_init", pollset_global_init())) {
    close(g_epfd);
    fd_global_shutdown();
    return NULL;
  }

  return &vtable;
}

#else /* defined(GRPC_LINUX_EPOLL) */
#if defined(GRPC_POSIX_SOCKET)
#include "src/core/lib/iomgr/ev_posix.h"
/* If GRPC_LINUX_EPOLL is not defined, it means epoll is not available. Return
 * NULL */
const grpc_event_engine_vtable *grpc_init_epoll1_linux(bool explicit_request) {
  return NULL;
}
#endif /* defined(GRPC_POSIX_SOCKET) */
#endif /* !defined(GRPC_LINUX_EPOLL) */