blob: effa9bde8a32861fd74aa8461f5810834c1c2a9d [file] [log] [blame]
Craig Tillerc67cc992017-04-27 10:15:51 -07001/*
2 *
Craig Tillerd4838a92017-04-27 12:08:18 -07003 * Copyright 2017, Google Inc.
Craig Tillerc67cc992017-04-27 10:15:51 -07004 * All rights reserved.
5 *
6 * Redistribution and use in source and binary forms, with or without
7 * modification, are permitted provided that the following conditions are
8 * met:
9 *
10 * * Redistributions of source code must retain the above copyright
11 * notice, this list of conditions and the following disclaimer.
12 * * Redistributions in binary form must reproduce the above
13 * copyright notice, this list of conditions and the following disclaimer
14 * in the documentation and/or other materials provided with the
15 * distribution.
16 * * Neither the name of Google Inc. nor the names of its
17 * contributors may be used to endorse or promote products derived from
18 * this software without specific prior written permission.
19 *
20 * THIS SOFTWARE IS PROVIDED BY THE COPYRIGHT HOLDERS AND CONTRIBUTORS
21 * "AS IS" AND ANY EXPRESS OR IMPLIED WARRANTIES, INCLUDING, BUT NOT
22 * LIMITED TO, THE IMPLIED WARRANTIES OF MERCHANTABILITY AND FITNESS FOR
23 * A PARTICULAR PURPOSE ARE DISCLAIMED. IN NO EVENT SHALL THE COPYRIGHT
24 * OWNER OR CONTRIBUTORS BE LIABLE FOR ANY DIRECT, INDIRECT, INCIDENTAL,
25 * SPECIAL, EXEMPLARY, OR CONSEQUENTIAL DAMAGES (INCLUDING, BUT NOT
26 * LIMITED TO, PROCUREMENT OF SUBSTITUTE GOODS OR SERVICES; LOSS OF USE,
27 * DATA, OR PROFITS; OR BUSINESS INTERRUPTION) HOWEVER CAUSED AND ON ANY
28 * THEORY OF LIABILITY, WHETHER IN CONTRACT, STRICT LIABILITY, OR TORT
29 * (INCLUDING NEGLIGENCE OR OTHERWISE) ARISING IN ANY WAY OUT OF THE USE
30 * OF THIS SOFTWARE, EVEN IF ADVISED OF THE POSSIBILITY OF SUCH DAMAGE.
31 *
32 */
33
34#include "src/core/lib/iomgr/port.h"
35
36/* This polling engine is only relevant on linux kernels supporting epoll() */
37#ifdef GRPC_LINUX_EPOLL
38
Craig Tiller4509c472017-04-27 19:05:13 +000039#include "src/core/lib/iomgr/ev_epoll1_linux.h"
Craig Tillerc67cc992017-04-27 10:15:51 -070040
41#include <assert.h>
42#include <errno.h>
43#include <poll.h>
44#include <pthread.h>
45#include <string.h>
46#include <sys/epoll.h>
47#include <sys/socket.h>
48#include <unistd.h>
49
50#include <grpc/support/alloc.h>
Craig Tiller6de05932017-04-28 09:17:38 -070051#include <grpc/support/cpu.h>
Craig Tillerc67cc992017-04-27 10:15:51 -070052#include <grpc/support/log.h>
53#include <grpc/support/string_util.h>
54#include <grpc/support/tls.h>
55#include <grpc/support/useful.h>
56
57#include "src/core/lib/iomgr/ev_posix.h"
58#include "src/core/lib/iomgr/iomgr_internal.h"
59#include "src/core/lib/iomgr/lockfree_event.h"
Craig Tillerc67cc992017-04-27 10:15:51 -070060#include "src/core/lib/iomgr/wakeup_fd_posix.h"
61#include "src/core/lib/iomgr/workqueue.h"
62#include "src/core/lib/profiling/timers.h"
63#include "src/core/lib/support/block_annotate.h"
Craig Tillerb89bac02017-05-26 15:20:32 +000064#include "src/core/lib/support/string.h"
Craig Tillerc67cc992017-04-27 10:15:51 -070065
Craig Tillerc67cc992017-04-27 10:15:51 -070066static grpc_wakeup_fd global_wakeup_fd;
67static int g_epfd;
68
69/*******************************************************************************
70 * Fd Declarations
71 */
72
73struct grpc_fd {
74 int fd;
75
Craig Tillerc67cc992017-04-27 10:15:51 -070076 gpr_atm read_closure;
77 gpr_atm write_closure;
78
79 struct grpc_fd *freelist_next;
Craig Tillerc67cc992017-04-27 10:15:51 -070080
81 /* The pollset that last noticed that the fd is readable. The actual type
82 * stored in this is (grpc_pollset *) */
83 gpr_atm read_notifier_pollset;
84
85 grpc_iomgr_object iomgr_object;
86};
87
88static void fd_global_init(void);
89static void fd_global_shutdown(void);
90
91/*******************************************************************************
92 * Pollset Declarations
93 */
94
Craig Tiller43bf2592017-04-28 23:21:01 +000095typedef enum { UNKICKED, KICKED, DESIGNATED_POLLER } kick_state;
Craig Tillerc67cc992017-04-27 10:15:51 -070096
97struct grpc_pollset_worker {
Craig Tiller32f90ee2017-04-28 12:46:41 -070098 kick_state kick_state;
Craig Tiller55624a32017-05-26 08:14:44 -070099 int kick_state_mutator; // which line of code last changed kick state
Craig Tillerc67cc992017-04-27 10:15:51 -0700100 bool initialized_cv;
Craig Tiller32f90ee2017-04-28 12:46:41 -0700101 grpc_pollset_worker *next;
102 grpc_pollset_worker *prev;
Craig Tillerc67cc992017-04-27 10:15:51 -0700103 gpr_cv cv;
Craig Tiller50da5ec2017-05-01 13:51:14 -0700104 grpc_closure_list schedule_on_end_work;
Craig Tillerc67cc992017-04-27 10:15:51 -0700105};
106
Craig Tiller55624a32017-05-26 08:14:44 -0700107#define SET_KICK_STATE(worker, state) \
108 do { \
109 (worker)->kick_state = (state); \
110 (worker)->kick_state_mutator = __LINE__; \
111 } while (false)
112
Craig Tillerba550da2017-05-01 14:26:31 +0000113#define MAX_NEIGHBOURHOODS 1024
114
Craig Tiller6de05932017-04-28 09:17:38 -0700115typedef struct pollset_neighbourhood {
116 gpr_mu mu;
117 grpc_pollset *active_root;
Craig Tiller6de05932017-04-28 09:17:38 -0700118 char pad[GPR_CACHELINE_SIZE];
119} pollset_neighbourhood;
120
Craig Tillerc67cc992017-04-27 10:15:51 -0700121struct grpc_pollset {
Craig Tiller6de05932017-04-28 09:17:38 -0700122 gpr_mu mu;
123 pollset_neighbourhood *neighbourhood;
Craig Tillere00d7332017-05-01 15:43:51 +0000124 bool reassigning_neighbourhood;
Craig Tiller4509c472017-04-27 19:05:13 +0000125 grpc_pollset_worker *root_worker;
126 bool kicked_without_poller;
Craig Tiller6de05932017-04-28 09:17:38 -0700127 bool seen_inactive;
Craig Tillerc67cc992017-04-27 10:15:51 -0700128 bool shutting_down; /* Is the pollset shutting down ? */
129 bool finish_shutdown_called; /* Is the 'finish_shutdown_locked()' called ? */
Craig Tiller4509c472017-04-27 19:05:13 +0000130 grpc_closure *shutdown_closure; /* Called after after shutdown is complete */
Craig Tillerba550da2017-05-01 14:26:31 +0000131 int begin_refs;
Craig Tiller6de05932017-04-28 09:17:38 -0700132
133 grpc_pollset *next;
134 grpc_pollset *prev;
Craig Tillerc67cc992017-04-27 10:15:51 -0700135};
136
137/*******************************************************************************
138 * Pollset-set Declarations
139 */
Craig Tiller6de05932017-04-28 09:17:38 -0700140
Craig Tillerc67cc992017-04-27 10:15:51 -0700141struct grpc_pollset_set {};
142
143/*******************************************************************************
144 * Common helpers
145 */
146
147static bool append_error(grpc_error **composite, grpc_error *error,
148 const char *desc) {
149 if (error == GRPC_ERROR_NONE) return true;
150 if (*composite == GRPC_ERROR_NONE) {
151 *composite = GRPC_ERROR_CREATE_FROM_COPIED_STRING(desc);
152 }
153 *composite = grpc_error_add_child(*composite, error);
154 return false;
155}
156
157/*******************************************************************************
158 * Fd Definitions
159 */
160
161/* We need to keep a freelist not because of any concerns of malloc performance
162 * but instead so that implementations with multiple threads in (for example)
163 * epoll_wait deal with the race between pollset removal and incoming poll
164 * notifications.
165 *
166 * The problem is that the poller ultimately holds a reference to this
167 * object, so it is very difficult to know when is safe to free it, at least
168 * without some expensive synchronization.
169 *
170 * If we keep the object freelisted, in the worst case losing this race just
171 * becomes a spurious read notification on a reused fd.
172 */
173
174/* The alarm system needs to be able to wakeup 'some poller' sometimes
175 * (specifically when a new alarm needs to be triggered earlier than the next
176 * alarm 'epoch'). This wakeup_fd gives us something to alert on when such a
177 * case occurs. */
178
179static grpc_fd *fd_freelist = NULL;
180static gpr_mu fd_freelist_mu;
181
Craig Tillerc67cc992017-04-27 10:15:51 -0700182static void fd_global_init(void) { gpr_mu_init(&fd_freelist_mu); }
183
184static void fd_global_shutdown(void) {
185 gpr_mu_lock(&fd_freelist_mu);
186 gpr_mu_unlock(&fd_freelist_mu);
187 while (fd_freelist != NULL) {
188 grpc_fd *fd = fd_freelist;
189 fd_freelist = fd_freelist->freelist_next;
Craig Tillerc67cc992017-04-27 10:15:51 -0700190 gpr_free(fd);
191 }
192 gpr_mu_destroy(&fd_freelist_mu);
193}
194
195static grpc_fd *fd_create(int fd, const char *name) {
196 grpc_fd *new_fd = NULL;
197
198 gpr_mu_lock(&fd_freelist_mu);
199 if (fd_freelist != NULL) {
200 new_fd = fd_freelist;
201 fd_freelist = fd_freelist->freelist_next;
202 }
203 gpr_mu_unlock(&fd_freelist_mu);
204
205 if (new_fd == NULL) {
206 new_fd = gpr_malloc(sizeof(grpc_fd));
Craig Tillerc67cc992017-04-27 10:15:51 -0700207 }
208
Craig Tillerc67cc992017-04-27 10:15:51 -0700209 new_fd->fd = fd;
Craig Tillerc67cc992017-04-27 10:15:51 -0700210 grpc_lfev_init(&new_fd->read_closure);
211 grpc_lfev_init(&new_fd->write_closure);
212 gpr_atm_no_barrier_store(&new_fd->read_notifier_pollset, (gpr_atm)NULL);
213
214 new_fd->freelist_next = NULL;
Craig Tillerc67cc992017-04-27 10:15:51 -0700215
216 char *fd_name;
217 gpr_asprintf(&fd_name, "%s fd=%d", name, fd);
218 grpc_iomgr_register_object(&new_fd->iomgr_object, fd_name);
219#ifdef GRPC_FD_REF_COUNT_DEBUG
220 gpr_log(GPR_DEBUG, "FD %d %p create %s", fd, (void *)new_fd, fd_name);
221#endif
222 gpr_free(fd_name);
Craig Tiller9ddb3152017-04-27 21:32:56 +0000223
224 struct epoll_event ev = {.events = (uint32_t)(EPOLLIN | EPOLLOUT | EPOLLET),
225 .data.ptr = new_fd};
226 if (epoll_ctl(g_epfd, EPOLL_CTL_ADD, fd, &ev) != 0) {
227 gpr_log(GPR_ERROR, "epoll_ctl failed: %s", strerror(errno));
228 }
229
Craig Tillerc67cc992017-04-27 10:15:51 -0700230 return new_fd;
231}
232
Craig Tiller4509c472017-04-27 19:05:13 +0000233static int fd_wrapped_fd(grpc_fd *fd) { return fd->fd; }
Craig Tillerc67cc992017-04-27 10:15:51 -0700234
Craig Tiller9ddb3152017-04-27 21:32:56 +0000235/* Might be called multiple times */
236static void fd_shutdown(grpc_exec_ctx *exec_ctx, grpc_fd *fd, grpc_error *why) {
237 if (grpc_lfev_set_shutdown(exec_ctx, &fd->read_closure,
238 GRPC_ERROR_REF(why))) {
239 shutdown(fd->fd, SHUT_RDWR);
240 grpc_lfev_set_shutdown(exec_ctx, &fd->write_closure, GRPC_ERROR_REF(why));
241 }
242 GRPC_ERROR_UNREF(why);
243}
244
Craig Tillerc67cc992017-04-27 10:15:51 -0700245static void fd_orphan(grpc_exec_ctx *exec_ctx, grpc_fd *fd,
246 grpc_closure *on_done, int *release_fd,
247 const char *reason) {
Craig Tillerc67cc992017-04-27 10:15:51 -0700248 grpc_error *error = GRPC_ERROR_NONE;
Craig Tillerc67cc992017-04-27 10:15:51 -0700249
Craig Tiller9ddb3152017-04-27 21:32:56 +0000250 if (!grpc_lfev_is_shutdown(&fd->read_closure)) {
251 fd_shutdown(exec_ctx, fd, GRPC_ERROR_CREATE_FROM_COPIED_STRING(reason));
252 }
253
Craig Tillerc67cc992017-04-27 10:15:51 -0700254 /* If release_fd is not NULL, we should be relinquishing control of the file
255 descriptor fd->fd (but we still own the grpc_fd structure). */
256 if (release_fd != NULL) {
257 *release_fd = fd->fd;
258 } else {
259 close(fd->fd);
Craig Tillerc67cc992017-04-27 10:15:51 -0700260 }
261
Craig Tiller4509c472017-04-27 19:05:13 +0000262 grpc_closure_sched(exec_ctx, on_done, GRPC_ERROR_REF(error));
Craig Tillerc67cc992017-04-27 10:15:51 -0700263
Craig Tiller4509c472017-04-27 19:05:13 +0000264 grpc_iomgr_unregister_object(&fd->iomgr_object);
265 grpc_lfev_destroy(&fd->read_closure);
266 grpc_lfev_destroy(&fd->write_closure);
Craig Tillerc67cc992017-04-27 10:15:51 -0700267
Craig Tiller4509c472017-04-27 19:05:13 +0000268 gpr_mu_lock(&fd_freelist_mu);
269 fd->freelist_next = fd_freelist;
270 fd_freelist = fd;
271 gpr_mu_unlock(&fd_freelist_mu);
Craig Tillerc67cc992017-04-27 10:15:51 -0700272}
273
274static grpc_pollset *fd_get_read_notifier_pollset(grpc_exec_ctx *exec_ctx,
275 grpc_fd *fd) {
276 gpr_atm notifier = gpr_atm_acq_load(&fd->read_notifier_pollset);
277 return (grpc_pollset *)notifier;
278}
279
280static bool fd_is_shutdown(grpc_fd *fd) {
281 return grpc_lfev_is_shutdown(&fd->read_closure);
282}
283
Craig Tillerc67cc992017-04-27 10:15:51 -0700284static void fd_notify_on_read(grpc_exec_ctx *exec_ctx, grpc_fd *fd,
285 grpc_closure *closure) {
286 grpc_lfev_notify_on(exec_ctx, &fd->read_closure, closure);
287}
288
289static void fd_notify_on_write(grpc_exec_ctx *exec_ctx, grpc_fd *fd,
290 grpc_closure *closure) {
291 grpc_lfev_notify_on(exec_ctx, &fd->write_closure, closure);
292}
293
294static grpc_workqueue *fd_get_workqueue(grpc_fd *fd) {
Craig Tiller50da5ec2017-05-01 13:51:14 -0700295 return (grpc_workqueue *)0xb0b51ed;
Craig Tiller4509c472017-04-27 19:05:13 +0000296}
297
298static void fd_become_readable(grpc_exec_ctx *exec_ctx, grpc_fd *fd,
299 grpc_pollset *notifier) {
300 grpc_lfev_set_ready(exec_ctx, &fd->read_closure);
301
302 /* Note, it is possible that fd_become_readable might be called twice with
303 different 'notifier's when an fd becomes readable and it is in two epoll
304 sets (This can happen briefly during polling island merges). In such cases
305 it does not really matter which notifer is set as the read_notifier_pollset
306 (They would both point to the same polling island anyway) */
307 /* Use release store to match with acquire load in fd_get_read_notifier */
308 gpr_atm_rel_store(&fd->read_notifier_pollset, (gpr_atm)notifier);
309}
310
311static void fd_become_writable(grpc_exec_ctx *exec_ctx, grpc_fd *fd) {
312 grpc_lfev_set_ready(exec_ctx, &fd->write_closure);
Craig Tillerc67cc992017-04-27 10:15:51 -0700313}
314
315/*******************************************************************************
316 * Pollset Definitions
317 */
318
Craig Tiller6de05932017-04-28 09:17:38 -0700319GPR_TLS_DECL(g_current_thread_pollset);
320GPR_TLS_DECL(g_current_thread_worker);
321static gpr_atm g_active_poller;
322static pollset_neighbourhood *g_neighbourhoods;
Craig Tiller32f90ee2017-04-28 12:46:41 -0700323static size_t g_num_neighbourhoods;
Craig Tiller67e229e2017-05-01 20:57:59 +0000324static gpr_mu g_wq_mu;
325static grpc_closure_list g_wq_items;
Craig Tiller6de05932017-04-28 09:17:38 -0700326
Craig Tillerc67cc992017-04-27 10:15:51 -0700327/* Return true if first in list */
Craig Tiller32f90ee2017-04-28 12:46:41 -0700328static bool worker_insert(grpc_pollset *pollset, grpc_pollset_worker *worker) {
329 if (pollset->root_worker == NULL) {
330 pollset->root_worker = worker;
331 worker->next = worker->prev = worker;
Craig Tillerc67cc992017-04-27 10:15:51 -0700332 return true;
333 } else {
Craig Tiller32f90ee2017-04-28 12:46:41 -0700334 worker->next = pollset->root_worker;
335 worker->prev = worker->next->prev;
336 worker->next->prev = worker;
337 worker->prev->next = worker;
Craig Tillerc67cc992017-04-27 10:15:51 -0700338 return false;
339 }
340}
341
342/* Return true if last in list */
343typedef enum { EMPTIED, NEW_ROOT, REMOVED } worker_remove_result;
344
Craig Tiller32f90ee2017-04-28 12:46:41 -0700345static worker_remove_result worker_remove(grpc_pollset *pollset,
Craig Tillerc67cc992017-04-27 10:15:51 -0700346 grpc_pollset_worker *worker) {
Craig Tiller32f90ee2017-04-28 12:46:41 -0700347 if (worker == pollset->root_worker) {
348 if (worker == worker->next) {
349 pollset->root_worker = NULL;
Craig Tillerc67cc992017-04-27 10:15:51 -0700350 return EMPTIED;
351 } else {
Craig Tiller32f90ee2017-04-28 12:46:41 -0700352 pollset->root_worker = worker->next;
353 worker->prev->next = worker->next;
354 worker->next->prev = worker->prev;
Craig Tillerc67cc992017-04-27 10:15:51 -0700355 return NEW_ROOT;
356 }
357 } else {
Craig Tiller32f90ee2017-04-28 12:46:41 -0700358 worker->prev->next = worker->next;
359 worker->next->prev = worker->prev;
Craig Tillerc67cc992017-04-27 10:15:51 -0700360 return REMOVED;
361 }
362}
363
Craig Tillerba550da2017-05-01 14:26:31 +0000364static size_t choose_neighbourhood(void) {
365 return (size_t)gpr_cpu_current_cpu() % g_num_neighbourhoods;
366}
367
Craig Tiller4509c472017-04-27 19:05:13 +0000368static grpc_error *pollset_global_init(void) {
Craig Tiller4509c472017-04-27 19:05:13 +0000369 gpr_tls_init(&g_current_thread_pollset);
370 gpr_tls_init(&g_current_thread_worker);
Craig Tiller6de05932017-04-28 09:17:38 -0700371 gpr_atm_no_barrier_store(&g_active_poller, 0);
Craig Tiller375eb252017-04-27 23:29:12 +0000372 global_wakeup_fd.read_fd = -1;
373 grpc_error *err = grpc_wakeup_fd_init(&global_wakeup_fd);
Craig Tiller67e229e2017-05-01 20:57:59 +0000374 gpr_mu_init(&g_wq_mu);
375 g_wq_items = (grpc_closure_list)GRPC_CLOSURE_LIST_INIT;
Craig Tiller375eb252017-04-27 23:29:12 +0000376 if (err != GRPC_ERROR_NONE) return err;
Craig Tiller4509c472017-04-27 19:05:13 +0000377 struct epoll_event ev = {.events = (uint32_t)(EPOLLIN | EPOLLET),
378 .data.ptr = &global_wakeup_fd};
379 if (epoll_ctl(g_epfd, EPOLL_CTL_ADD, global_wakeup_fd.read_fd, &ev) != 0) {
380 return GRPC_OS_ERROR(errno, "epoll_ctl");
381 }
Craig Tillerba550da2017-05-01 14:26:31 +0000382 g_num_neighbourhoods = GPR_CLAMP(gpr_cpu_num_cores(), 1, MAX_NEIGHBOURHOODS);
Craig Tiller32f90ee2017-04-28 12:46:41 -0700383 g_neighbourhoods =
384 gpr_zalloc(sizeof(*g_neighbourhoods) * g_num_neighbourhoods);
385 for (size_t i = 0; i < g_num_neighbourhoods; i++) {
386 gpr_mu_init(&g_neighbourhoods[i].mu);
Craig Tiller32f90ee2017-04-28 12:46:41 -0700387 }
Craig Tiller4509c472017-04-27 19:05:13 +0000388 return GRPC_ERROR_NONE;
389}
390
391static void pollset_global_shutdown(void) {
Craig Tiller4509c472017-04-27 19:05:13 +0000392 gpr_tls_destroy(&g_current_thread_pollset);
393 gpr_tls_destroy(&g_current_thread_worker);
Craig Tiller67e229e2017-05-01 20:57:59 +0000394 gpr_mu_destroy(&g_wq_mu);
Craig Tiller375eb252017-04-27 23:29:12 +0000395 if (global_wakeup_fd.read_fd != -1) grpc_wakeup_fd_destroy(&global_wakeup_fd);
Craig Tiller32f90ee2017-04-28 12:46:41 -0700396 for (size_t i = 0; i < g_num_neighbourhoods; i++) {
397 gpr_mu_destroy(&g_neighbourhoods[i].mu);
398 }
399 gpr_free(g_neighbourhoods);
Craig Tiller4509c472017-04-27 19:05:13 +0000400}
401
402static void pollset_init(grpc_pollset *pollset, gpr_mu **mu) {
Craig Tiller6de05932017-04-28 09:17:38 -0700403 gpr_mu_init(&pollset->mu);
404 *mu = &pollset->mu;
Craig Tillerba550da2017-05-01 14:26:31 +0000405 pollset->neighbourhood = &g_neighbourhoods[choose_neighbourhood()];
Craig Tiller6de05932017-04-28 09:17:38 -0700406 pollset->seen_inactive = true;
Craig Tiller6de05932017-04-28 09:17:38 -0700407}
408
Craig Tillerc6109852017-05-01 14:26:49 -0700409static void pollset_destroy(grpc_exec_ctx *exec_ctx, grpc_pollset *pollset) {
Craig Tillere00d7332017-05-01 15:43:51 +0000410 gpr_mu_lock(&pollset->mu);
Craig Tillerba550da2017-05-01 14:26:31 +0000411 if (!pollset->seen_inactive) {
Craig Tillere00d7332017-05-01 15:43:51 +0000412 pollset_neighbourhood *neighbourhood = pollset->neighbourhood;
413 gpr_mu_unlock(&pollset->mu);
Craig Tillera95bacf2017-05-01 12:51:24 -0700414 retry_lock_neighbourhood:
Craig Tillere00d7332017-05-01 15:43:51 +0000415 gpr_mu_lock(&neighbourhood->mu);
416 gpr_mu_lock(&pollset->mu);
417 if (!pollset->seen_inactive) {
418 if (pollset->neighbourhood != neighbourhood) {
419 gpr_mu_unlock(&neighbourhood->mu);
420 neighbourhood = pollset->neighbourhood;
421 gpr_mu_unlock(&pollset->mu);
422 goto retry_lock_neighbourhood;
423 }
424 pollset->prev->next = pollset->next;
425 pollset->next->prev = pollset->prev;
426 if (pollset == pollset->neighbourhood->active_root) {
427 pollset->neighbourhood->active_root =
428 pollset->next == pollset ? NULL : pollset->next;
429 }
Craig Tillerba550da2017-05-01 14:26:31 +0000430 }
431 gpr_mu_unlock(&pollset->neighbourhood->mu);
Craig Tiller6de05932017-04-28 09:17:38 -0700432 }
Craig Tillere00d7332017-05-01 15:43:51 +0000433 gpr_mu_unlock(&pollset->mu);
Craig Tiller32f90ee2017-04-28 12:46:41 -0700434 gpr_mu_destroy(&pollset->mu);
Craig Tiller4509c472017-04-27 19:05:13 +0000435}
436
437static grpc_error *pollset_kick_all(grpc_pollset *pollset) {
438 grpc_error *error = GRPC_ERROR_NONE;
439 if (pollset->root_worker != NULL) {
440 grpc_pollset_worker *worker = pollset->root_worker;
441 do {
Craig Tiller55624a32017-05-26 08:14:44 -0700442 switch (worker->kick_state) {
443 case KICKED:
444 break;
445 case UNKICKED:
446 SET_KICK_STATE(worker, KICKED);
447 if (worker->initialized_cv) {
448 gpr_cv_signal(&worker->cv);
449 }
450 break;
451 case DESIGNATED_POLLER:
452 SET_KICK_STATE(worker, KICKED);
453 append_error(&error, grpc_wakeup_fd_wakeup(&global_wakeup_fd),
454 "pollset_shutdown");
455 break;
Craig Tiller4509c472017-04-27 19:05:13 +0000456 }
457
Craig Tiller32f90ee2017-04-28 12:46:41 -0700458 worker = worker->next;
Craig Tiller4509c472017-04-27 19:05:13 +0000459 } while (worker != pollset->root_worker);
460 }
461 return error;
462}
463
464static void pollset_maybe_finish_shutdown(grpc_exec_ctx *exec_ctx,
465 grpc_pollset *pollset) {
Craig Tillerba550da2017-05-01 14:26:31 +0000466 if (pollset->shutdown_closure != NULL && pollset->root_worker == NULL &&
467 pollset->begin_refs == 0) {
Craig Tiller4509c472017-04-27 19:05:13 +0000468 grpc_closure_sched(exec_ctx, pollset->shutdown_closure, GRPC_ERROR_NONE);
469 pollset->shutdown_closure = NULL;
470 }
471}
472
473static void pollset_shutdown(grpc_exec_ctx *exec_ctx, grpc_pollset *pollset,
474 grpc_closure *closure) {
475 GPR_ASSERT(pollset->shutdown_closure == NULL);
476 pollset->shutdown_closure = closure;
477 GRPC_LOG_IF_ERROR("pollset_shutdown", pollset_kick_all(pollset));
478 pollset_maybe_finish_shutdown(exec_ctx, pollset);
479}
480
Craig Tillera95bacf2017-05-01 12:51:24 -0700481#define MAX_EPOLL_EVENTS 100
Craig Tiller4509c472017-04-27 19:05:13 +0000482
483static int poll_deadline_to_millis_timeout(gpr_timespec deadline,
484 gpr_timespec now) {
485 gpr_timespec timeout;
486 if (gpr_time_cmp(deadline, gpr_inf_future(deadline.clock_type)) == 0) {
487 return -1;
488 }
489
490 if (gpr_time_cmp(deadline, now) <= 0) {
491 return 0;
492 }
493
494 static const gpr_timespec round_up = {
495 .clock_type = GPR_TIMESPAN, .tv_sec = 0, .tv_nsec = GPR_NS_PER_MS - 1};
496 timeout = gpr_time_sub(deadline, now);
497 int millis = gpr_time_to_millis(gpr_time_add(timeout, round_up));
498 return millis >= 1 ? millis : 1;
499}
500
501static grpc_error *pollset_epoll(grpc_exec_ctx *exec_ctx, grpc_pollset *pollset,
502 gpr_timespec now, gpr_timespec deadline) {
503 struct epoll_event events[MAX_EPOLL_EVENTS];
504 static const char *err_desc = "pollset_poll";
505
506 int timeout = poll_deadline_to_millis_timeout(deadline, now);
507
508 if (timeout != 0) {
509 GRPC_SCHEDULING_START_BLOCKING_REGION;
510 }
511 int r;
512 do {
513 r = epoll_wait(g_epfd, events, MAX_EPOLL_EVENTS, timeout);
514 } while (r < 0 && errno == EINTR);
515 if (timeout != 0) {
516 GRPC_SCHEDULING_END_BLOCKING_REGION;
517 }
518
519 if (r < 0) return GRPC_OS_ERROR(errno, "epoll_wait");
520
521 grpc_error *error = GRPC_ERROR_NONE;
522 for (int i = 0; i < r; i++) {
523 void *data_ptr = events[i].data.ptr;
524 if (data_ptr == &global_wakeup_fd) {
Craig Tiller67e229e2017-05-01 20:57:59 +0000525 gpr_mu_lock(&g_wq_mu);
526 grpc_closure_list_move(&g_wq_items, &exec_ctx->closure_list);
527 gpr_mu_unlock(&g_wq_mu);
Craig Tiller4509c472017-04-27 19:05:13 +0000528 append_error(&error, grpc_wakeup_fd_consume_wakeup(&global_wakeup_fd),
529 err_desc);
530 } else {
531 grpc_fd *fd = (grpc_fd *)(data_ptr);
532 bool cancel = (events[i].events & (EPOLLERR | EPOLLHUP)) != 0;
533 bool read_ev = (events[i].events & (EPOLLIN | EPOLLPRI)) != 0;
534 bool write_ev = (events[i].events & EPOLLOUT) != 0;
535 if (read_ev || cancel) {
536 fd_become_readable(exec_ctx, fd, pollset);
537 }
538 if (write_ev || cancel) {
539 fd_become_writable(exec_ctx, fd);
540 }
541 }
542 }
543
544 return error;
545}
546
547static bool begin_worker(grpc_pollset *pollset, grpc_pollset_worker *worker,
548 grpc_pollset_worker **worker_hdl, gpr_timespec *now,
549 gpr_timespec deadline) {
Craig Tiller4509c472017-04-27 19:05:13 +0000550 if (worker_hdl != NULL) *worker_hdl = worker;
551 worker->initialized_cv = false;
Craig Tiller55624a32017-05-26 08:14:44 -0700552 SET_KICK_STATE(worker, UNKICKED);
Craig Tiller50da5ec2017-05-01 13:51:14 -0700553 worker->schedule_on_end_work = (grpc_closure_list)GRPC_CLOSURE_LIST_INIT;
Craig Tillerba550da2017-05-01 14:26:31 +0000554 pollset->begin_refs++;
Craig Tiller4509c472017-04-27 19:05:13 +0000555
Craig Tiller32f90ee2017-04-28 12:46:41 -0700556 if (pollset->seen_inactive) {
557 // pollset has been observed to be inactive, we need to move back to the
558 // active list
Craig Tillere00d7332017-05-01 15:43:51 +0000559 bool is_reassigning = false;
560 if (!pollset->reassigning_neighbourhood) {
561 is_reassigning = true;
562 pollset->reassigning_neighbourhood = true;
563 pollset->neighbourhood = &g_neighbourhoods[choose_neighbourhood()];
564 }
565 pollset_neighbourhood *neighbourhood = pollset->neighbourhood;
Craig Tiller32f90ee2017-04-28 12:46:41 -0700566 gpr_mu_unlock(&pollset->mu);
Craig Tillerba550da2017-05-01 14:26:31 +0000567 // pollset unlocked: state may change (even worker->kick_state)
568 retry_lock_neighbourhood:
Craig Tiller32f90ee2017-04-28 12:46:41 -0700569 gpr_mu_lock(&neighbourhood->mu);
570 gpr_mu_lock(&pollset->mu);
571 if (pollset->seen_inactive) {
Craig Tiller2acab6e2017-04-30 23:06:33 +0000572 if (neighbourhood != pollset->neighbourhood) {
573 gpr_mu_unlock(&neighbourhood->mu);
574 neighbourhood = pollset->neighbourhood;
575 gpr_mu_unlock(&pollset->mu);
576 goto retry_lock_neighbourhood;
577 }
Craig Tiller32f90ee2017-04-28 12:46:41 -0700578 pollset->seen_inactive = false;
Craig Tiller2acab6e2017-04-30 23:06:33 +0000579 if (neighbourhood->active_root == NULL) {
580 neighbourhood->active_root = pollset->next = pollset->prev = pollset;
Craig Tiller55624a32017-05-26 08:14:44 -0700581 if (worker->kick_state == UNKICKED &&
582 gpr_atm_no_barrier_cas(&g_active_poller, 0, (gpr_atm)worker)) {
583 SET_KICK_STATE(worker, DESIGNATED_POLLER);
Craig Tiller32f90ee2017-04-28 12:46:41 -0700584 }
Craig Tiller2acab6e2017-04-30 23:06:33 +0000585 } else {
586 pollset->next = neighbourhood->active_root;
587 pollset->prev = pollset->next->prev;
588 pollset->next->prev = pollset->prev->next = pollset;
Craig Tiller4509c472017-04-27 19:05:13 +0000589 }
590 }
Craig Tillere00d7332017-05-01 15:43:51 +0000591 if (is_reassigning) {
592 GPR_ASSERT(pollset->reassigning_neighbourhood);
593 pollset->reassigning_neighbourhood = false;
594 }
Craig Tiller32f90ee2017-04-28 12:46:41 -0700595 gpr_mu_unlock(&neighbourhood->mu);
596 }
597 worker_insert(pollset, worker);
Craig Tillerba550da2017-05-01 14:26:31 +0000598 pollset->begin_refs--;
Craig Tiller32f90ee2017-04-28 12:46:41 -0700599 if (worker->kick_state == UNKICKED) {
Craig Tillera4b8eb02017-04-29 00:13:52 +0000600 GPR_ASSERT(gpr_atm_no_barrier_load(&g_active_poller) != (gpr_atm)worker);
Craig Tiller32f90ee2017-04-28 12:46:41 -0700601 worker->initialized_cv = true;
602 gpr_cv_init(&worker->cv);
Craig Tillerba550da2017-05-01 14:26:31 +0000603 while (worker->kick_state == UNKICKED &&
604 pollset->shutdown_closure == NULL) {
Craig Tiller32f90ee2017-04-28 12:46:41 -0700605 if (gpr_cv_wait(&worker->cv, &pollset->mu, deadline) &&
606 worker->kick_state == UNKICKED) {
Craig Tiller55624a32017-05-26 08:14:44 -0700607 SET_KICK_STATE(worker, KICKED);
Craig Tiller32f90ee2017-04-28 12:46:41 -0700608 }
Craig Tillerba550da2017-05-01 14:26:31 +0000609 }
Craig Tiller4509c472017-04-27 19:05:13 +0000610 *now = gpr_now(now->clock_type);
611 }
612
Craig Tiller43bf2592017-04-28 23:21:01 +0000613 return worker->kick_state == DESIGNATED_POLLER &&
Craig Tiller32f90ee2017-04-28 12:46:41 -0700614 pollset->shutdown_closure == NULL;
Craig Tiller4509c472017-04-27 19:05:13 +0000615}
616
Craig Tillerbbf4c7a2017-04-28 15:12:10 -0700617static bool check_neighbourhood_for_available_poller(
Craig Tillera4b8eb02017-04-29 00:13:52 +0000618 pollset_neighbourhood *neighbourhood) {
Craig Tillerbbf4c7a2017-04-28 15:12:10 -0700619 bool found_worker = false;
620 do {
621 grpc_pollset *inspect = neighbourhood->active_root;
622 if (inspect == NULL) {
623 break;
624 }
625 gpr_mu_lock(&inspect->mu);
626 GPR_ASSERT(!inspect->seen_inactive);
627 grpc_pollset_worker *inspect_worker = inspect->root_worker;
Craig Tillerbbf4c7a2017-04-28 15:12:10 -0700628 if (inspect_worker != NULL) {
Craig Tillera4b8eb02017-04-29 00:13:52 +0000629 do {
Craig Tillerba550da2017-05-01 14:26:31 +0000630 switch (inspect_worker->kick_state) {
631 case UNKICKED:
632 if (gpr_atm_no_barrier_cas(&g_active_poller, 0,
633 (gpr_atm)inspect_worker)) {
Craig Tiller55624a32017-05-26 08:14:44 -0700634 SET_KICK_STATE(inspect_worker, DESIGNATED_POLLER);
Craig Tillerba550da2017-05-01 14:26:31 +0000635 if (inspect_worker->initialized_cv) {
636 gpr_cv_signal(&inspect_worker->cv);
637 }
Craig Tillera4b8eb02017-04-29 00:13:52 +0000638 }
Craig Tillerba550da2017-05-01 14:26:31 +0000639 // even if we didn't win the cas, there's a worker, we can stop
640 found_worker = true;
641 break;
642 case KICKED:
643 break;
644 case DESIGNATED_POLLER:
645 found_worker = true; // ok, so someone else found the worker, but
646 // we'll accept that
647 break;
Craig Tillerbbf4c7a2017-04-28 15:12:10 -0700648 }
Craig Tillera4b8eb02017-04-29 00:13:52 +0000649 inspect_worker = inspect_worker->next;
650 } while (inspect_worker != inspect->root_worker);
651 }
652 if (!found_worker) {
Craig Tillerbbf4c7a2017-04-28 15:12:10 -0700653 inspect->seen_inactive = true;
Craig Tiller2acab6e2017-04-30 23:06:33 +0000654 if (inspect == neighbourhood->active_root) {
Craig Tillera95bacf2017-05-01 12:51:24 -0700655 neighbourhood->active_root =
656 inspect->next == inspect ? NULL : inspect->next;
Craig Tiller2acab6e2017-04-30 23:06:33 +0000657 }
658 inspect->next->prev = inspect->prev;
659 inspect->prev->next = inspect->next;
Craig Tillere00d7332017-05-01 15:43:51 +0000660 inspect->next = inspect->prev = NULL;
Craig Tillerbbf4c7a2017-04-28 15:12:10 -0700661 }
662 gpr_mu_unlock(&inspect->mu);
663 } while (!found_worker);
Craig Tillerbbf4c7a2017-04-28 15:12:10 -0700664 return found_worker;
665}
666
/* Undoes begin_worker() for |worker| at the end of a pollset_work() call.

   Must be called with pollset->mu held. The lock may be released and
   re-acquired inside (while closures are flushed and, when a new poller has to
   be found, while neighbourhoods are scanned); it is held again on return.

   In order, this:
   - clears *worker_hdl (when non-NULL) and marks |worker| KICKED;
   - moves closures queued on worker->schedule_on_end_work into exec_ctx;
   - if |worker| is g_active_poller, hands the polling role to worker->next
     when that worker is distinct and UNKICKED (marking it DESIGNATED_POLLER
     and signalling its cv); otherwise clears g_active_poller and scans the
     neighbourhoods, starting with this pollset's own, for another poller;
   - runs pending closures in exec_ctx;
   - destroys the worker's cv if it was initialized, unlinks the worker, and
     calls pollset_maybe_finish_shutdown() when worker_remove() reports that
     the pollset's worker list is now EMPTIED. */
static void end_worker(grpc_exec_ctx *exec_ctx, grpc_pollset *pollset,
                       grpc_pollset_worker *worker,
                       grpc_pollset_worker **worker_hdl) {
  if (worker_hdl != NULL) *worker_hdl = NULL;
  /* From here on this worker counts as kicked: pollset_kick() treats a KICKED
     specific worker as a no-op, and wq_sched() only hands closures to
     UNKICKED workers. */
  SET_KICK_STATE(worker, KICKED);
  /* Closures handed to this worker by wq_sched() run on this thread. */
  grpc_closure_list_move(&worker->schedule_on_end_work,
                         &exec_ctx->closure_list);
  if (gpr_atm_no_barrier_load(&g_active_poller) == (gpr_atm)worker) {
    if (worker->next != worker && worker->next->kick_state == UNKICKED) {
      /* Promote the next worker on this pollset to be the poller and wake it
         from its cv wait. */
      GPR_ASSERT(worker->next->initialized_cv);
      gpr_atm_no_barrier_store(&g_active_poller, (gpr_atm)worker->next);
      SET_KICK_STATE(worker->next, DESIGNATED_POLLER);
      gpr_cv_signal(&worker->next->cv);
      if (grpc_exec_ctx_has_work(exec_ctx)) {
        gpr_mu_unlock(&pollset->mu);
        grpc_exec_ctx_flush(exec_ctx);
        gpr_mu_lock(&pollset->mu);
      }
    } else {
      /* No suitable successor on this pollset: give up the role and look for
         a poller in the neighbourhoods instead. */
      gpr_atm_no_barrier_store(&g_active_poller, 0);
      size_t poller_neighbourhood_idx =
          (size_t)(pollset->neighbourhood - g_neighbourhoods);
      /* pollset->mu is released before any neighbourhood->mu is taken.
         NOTE(review): presumably this avoids inverting the
         neighbourhood->mu -> pollset->mu nesting used by wq_sched() and
         check_neighbourhood_for_available_poller() — confirm. */
      gpr_mu_unlock(&pollset->mu);
      bool found_worker = false;
      /* scan_state[i]: whether the first pass managed to inspect the i-th
         neighbourhood (counted from poller_neighbourhood_idx).
         NOTE(review): assumes g_num_neighbourhoods <= MAX_NEIGHBOURHOODS;
         confirm where g_num_neighbourhoods is initialized. */
      bool scan_state[MAX_NEIGHBOURHOODS];
      /* First pass: trylock only, so a contended neighbourhood is skipped
         rather than waited on. */
      for (size_t i = 0; !found_worker && i < g_num_neighbourhoods; i++) {
        pollset_neighbourhood *neighbourhood =
            &g_neighbourhoods[(poller_neighbourhood_idx + i) %
                              g_num_neighbourhoods];
        if (gpr_mu_trylock(&neighbourhood->mu)) {
          found_worker =
              check_neighbourhood_for_available_poller(neighbourhood);
          gpr_mu_unlock(&neighbourhood->mu);
          scan_state[i] = true;
        } else {
          scan_state[i] = false;
        }
      }
      /* Second pass: block on the neighbourhoods the first pass skipped.
         If the first pass stopped early, found_worker is already true, so this
         loop never reads an unset scan_state entry. */
      for (size_t i = 0; !found_worker && i < g_num_neighbourhoods; i++) {
        if (scan_state[i]) continue;
        pollset_neighbourhood *neighbourhood =
            &g_neighbourhoods[(poller_neighbourhood_idx + i) %
                              g_num_neighbourhoods];
        gpr_mu_lock(&neighbourhood->mu);
        found_worker = check_neighbourhood_for_available_poller(neighbourhood);
        gpr_mu_unlock(&neighbourhood->mu);
      }
      grpc_exec_ctx_flush(exec_ctx);
      gpr_mu_lock(&pollset->mu);
    }
  } else if (grpc_exec_ctx_has_work(exec_ctx)) {
    /* As in the branches above, closures are flushed with pollset->mu
       released. */
    gpr_mu_unlock(&pollset->mu);
    grpc_exec_ctx_flush(exec_ctx);
    gpr_mu_lock(&pollset->mu);
  }
  if (worker->initialized_cv) {
    gpr_cv_destroy(&worker->cv);
  }
  if (EMPTIED == worker_remove(pollset, worker)) {
    pollset_maybe_finish_shutdown(exec_ctx, pollset);
  }
  /* The departing worker must never remain the active poller. */
  GPR_ASSERT(gpr_atm_no_barrier_load(&g_active_poller) != (gpr_atm)worker);
}
730
731/* pollset->po.mu lock must be held by the caller before calling this.
732 The function pollset_work() may temporarily release the lock (pollset->po.mu)
733 during the course of its execution but it will always re-acquire the lock and
734 ensure that it is held by the time the function returns */
735static grpc_error *pollset_work(grpc_exec_ctx *exec_ctx, grpc_pollset *pollset,
736 grpc_pollset_worker **worker_hdl,
737 gpr_timespec now, gpr_timespec deadline) {
738 grpc_pollset_worker worker;
739 grpc_error *error = GRPC_ERROR_NONE;
740 static const char *err_desc = "pollset_work";
741 if (pollset->kicked_without_poller) {
742 pollset->kicked_without_poller = false;
743 return GRPC_ERROR_NONE;
744 }
Craig Tiller8502ecb2017-04-28 14:22:01 -0700745 gpr_tls_set(&g_current_thread_pollset, (intptr_t)pollset);
Craig Tiller4509c472017-04-27 19:05:13 +0000746 if (begin_worker(pollset, &worker, worker_hdl, &now, deadline)) {
Craig Tiller4509c472017-04-27 19:05:13 +0000747 gpr_tls_set(&g_current_thread_worker, (intptr_t)&worker);
748 GPR_ASSERT(!pollset->shutdown_closure);
Craig Tiller2acab6e2017-04-30 23:06:33 +0000749 GPR_ASSERT(!pollset->seen_inactive);
Craig Tiller32f90ee2017-04-28 12:46:41 -0700750 gpr_mu_unlock(&pollset->mu);
Craig Tiller4509c472017-04-27 19:05:13 +0000751 append_error(&error, pollset_epoll(exec_ctx, pollset, now, deadline),
752 err_desc);
Craig Tiller32f90ee2017-04-28 12:46:41 -0700753 gpr_mu_lock(&pollset->mu);
Craig Tiller4509c472017-04-27 19:05:13 +0000754 gpr_tls_set(&g_current_thread_worker, 0);
Craig Tiller4509c472017-04-27 19:05:13 +0000755 }
756 end_worker(exec_ctx, pollset, &worker, worker_hdl);
Craig Tiller8502ecb2017-04-28 14:22:01 -0700757 gpr_tls_set(&g_current_thread_pollset, 0);
Craig Tiller4509c472017-04-27 19:05:13 +0000758 return error;
759}
760
761static grpc_error *pollset_kick(grpc_pollset *pollset,
762 grpc_pollset_worker *specific_worker) {
Craig Tillerb89bac02017-05-26 15:20:32 +0000763 if (GRPC_TRACER_ON(grpc_polling_trace)) {
764 gpr_strvec log;
765 gpr_strvec_init(&log);
766 char *tmp;
Craig Tiller75aef7f2017-05-26 08:26:08 -0700767 gpr_asprintf(
768 &tmp, "PS:%p KICK:%p curps=%p curworker=%p root=%p", pollset,
769 specific_worker, (void *)gpr_tls_get(&g_current_thread_pollset),
770 (void *)gpr_tls_get(&g_current_thread_worker), pollset->root_worker);
Craig Tillerb89bac02017-05-26 15:20:32 +0000771 gpr_strvec_add(&log, tmp);
772 if (pollset->root_worker != NULL) {
Craig Tiller75aef7f2017-05-26 08:26:08 -0700773 gpr_asprintf(&tmp, " {kicked=%d next=%p {kicked=%d}}",
774 pollset->root_worker->kick_state, pollset->root_worker->next,
775 pollset->root_worker->next->kick_state);
Craig Tillerb89bac02017-05-26 15:20:32 +0000776 gpr_strvec_add(&log, tmp);
777 }
778 if (specific_worker != NULL) {
779 gpr_asprintf(&tmp, " worker_kicked=%d", specific_worker->kick_state);
780 gpr_strvec_add(&log, tmp);
781 }
782 tmp = gpr_strvec_flatten(&log, NULL);
783 gpr_strvec_destroy(&log);
784 gpr_log(GPR_DEBUG, "%s", tmp);
785 gpr_free(tmp);
786 }
Craig Tiller4509c472017-04-27 19:05:13 +0000787 if (specific_worker == NULL) {
788 if (gpr_tls_get(&g_current_thread_pollset) != (intptr_t)pollset) {
Craig Tiller375eb252017-04-27 23:29:12 +0000789 grpc_pollset_worker *root_worker = pollset->root_worker;
790 if (root_worker == NULL) {
Craig Tiller4509c472017-04-27 19:05:13 +0000791 pollset->kicked_without_poller = true;
Craig Tiller75aef7f2017-05-26 08:26:08 -0700792 if (GRPC_TRACER_ON(grpc_polling_trace)) {
793 gpr_log(GPR_DEBUG, " .. kicked_without_poller");
794 }
Craig Tiller4509c472017-04-27 19:05:13 +0000795 return GRPC_ERROR_NONE;
Craig Tiller375eb252017-04-27 23:29:12 +0000796 }
Craig Tiller32f90ee2017-04-28 12:46:41 -0700797 grpc_pollset_worker *next_worker = root_worker->next;
Craig Tiller55624a32017-05-26 08:14:44 -0700798 if (root_worker == next_worker && // only try and wake up a poller if
799 // there is no next worker
Craig Tiller32f90ee2017-04-28 12:46:41 -0700800 root_worker == (grpc_pollset_worker *)gpr_atm_no_barrier_load(
801 &g_active_poller)) {
Craig Tiller75aef7f2017-05-26 08:26:08 -0700802 if (GRPC_TRACER_ON(grpc_polling_trace)) {
803 gpr_log(GPR_DEBUG, " .. kicked %p", root_worker);
804 }
Craig Tiller55624a32017-05-26 08:14:44 -0700805 SET_KICK_STATE(root_worker, KICKED);
Craig Tiller4509c472017-04-27 19:05:13 +0000806 return grpc_wakeup_fd_wakeup(&global_wakeup_fd);
Craig Tiller8502ecb2017-04-28 14:22:01 -0700807 } else if (next_worker->kick_state == UNKICKED) {
Craig Tiller75aef7f2017-05-26 08:26:08 -0700808 if (GRPC_TRACER_ON(grpc_polling_trace)) {
809 gpr_log(GPR_DEBUG, " .. kicked %p", next_worker);
810 }
Craig Tiller8502ecb2017-04-28 14:22:01 -0700811 GPR_ASSERT(next_worker->initialized_cv);
Craig Tiller55624a32017-05-26 08:14:44 -0700812 SET_KICK_STATE(next_worker, KICKED);
Craig Tiller375eb252017-04-27 23:29:12 +0000813 gpr_cv_signal(&next_worker->cv);
814 return GRPC_ERROR_NONE;
Craig Tiller55624a32017-05-26 08:14:44 -0700815 } else if (next_worker->kick_state == DESIGNATED_POLLER) {
816 if (root_worker->kick_state != DESIGNATED_POLLER) {
Craig Tiller75aef7f2017-05-26 08:26:08 -0700817 if (GRPC_TRACER_ON(grpc_polling_trace)) {
818 gpr_log(GPR_DEBUG, " .. kicked root non-poller %p", next_worker);
819 }
Craig Tiller55624a32017-05-26 08:14:44 -0700820 SET_KICK_STATE(root_worker, KICKED);
821 if (root_worker->initialized_cv) {
822 gpr_cv_signal(&root_worker->cv);
823 }
824 return GRPC_ERROR_NONE;
825 } else {
Craig Tiller75aef7f2017-05-26 08:26:08 -0700826 if (GRPC_TRACER_ON(grpc_polling_trace)) {
827 gpr_log(GPR_DEBUG, " .. non-root poller %p (root=%p)", next_worker,
828 root_worker);
829 }
Craig Tiller55624a32017-05-26 08:14:44 -0700830 SET_KICK_STATE(next_worker, KICKED);
831 return grpc_wakeup_fd_wakeup(&global_wakeup_fd);
832 }
Craig Tiller8502ecb2017-04-28 14:22:01 -0700833 } else {
Craig Tiller55624a32017-05-26 08:14:44 -0700834 GPR_ASSERT(next_worker->kick_state == KICKED);
835 SET_KICK_STATE(next_worker, KICKED);
Craig Tiller8502ecb2017-04-28 14:22:01 -0700836 return GRPC_ERROR_NONE;
Craig Tiller4509c472017-04-27 19:05:13 +0000837 }
838 } else {
Craig Tiller55624a32017-05-26 08:14:44 -0700839 GPR_ASSERT(false);
Craig Tiller4509c472017-04-27 19:05:13 +0000840 return GRPC_ERROR_NONE;
841 }
Craig Tiller43bf2592017-04-28 23:21:01 +0000842 } else if (specific_worker->kick_state == KICKED) {
Craig Tiller75aef7f2017-05-26 08:26:08 -0700843 if (GRPC_TRACER_ON(grpc_polling_trace)) {
844 gpr_log(GPR_DEBUG, " .. specific worker already kicked");
845 }
Craig Tiller4509c472017-04-27 19:05:13 +0000846 return GRPC_ERROR_NONE;
847 } else if (gpr_tls_get(&g_current_thread_worker) ==
848 (intptr_t)specific_worker) {
Craig Tiller75aef7f2017-05-26 08:26:08 -0700849 if (GRPC_TRACER_ON(grpc_polling_trace)) {
850 gpr_log(GPR_DEBUG, " .. mark %p kicked", specific_worker);
851 }
Craig Tiller55624a32017-05-26 08:14:44 -0700852 SET_KICK_STATE(specific_worker, KICKED);
Craig Tiller4509c472017-04-27 19:05:13 +0000853 return GRPC_ERROR_NONE;
Craig Tiller32f90ee2017-04-28 12:46:41 -0700854 } else if (specific_worker ==
855 (grpc_pollset_worker *)gpr_atm_no_barrier_load(&g_active_poller)) {
Craig Tiller75aef7f2017-05-26 08:26:08 -0700856 if (GRPC_TRACER_ON(grpc_polling_trace)) {
857 gpr_log(GPR_DEBUG, " .. kick active poller");
858 }
Craig Tiller55624a32017-05-26 08:14:44 -0700859 SET_KICK_STATE(specific_worker, KICKED);
Craig Tiller4509c472017-04-27 19:05:13 +0000860 return grpc_wakeup_fd_wakeup(&global_wakeup_fd);
Craig Tiller8502ecb2017-04-28 14:22:01 -0700861 } else if (specific_worker->initialized_cv) {
Craig Tiller75aef7f2017-05-26 08:26:08 -0700862 if (GRPC_TRACER_ON(grpc_polling_trace)) {
863 gpr_log(GPR_DEBUG, " .. kick waiting worker");
864 }
Craig Tiller55624a32017-05-26 08:14:44 -0700865 SET_KICK_STATE(specific_worker, KICKED);
Craig Tiller4509c472017-04-27 19:05:13 +0000866 gpr_cv_signal(&specific_worker->cv);
867 return GRPC_ERROR_NONE;
Craig Tiller8502ecb2017-04-28 14:22:01 -0700868 } else {
Craig Tiller75aef7f2017-05-26 08:26:08 -0700869 if (GRPC_TRACER_ON(grpc_polling_trace)) {
870 gpr_log(GPR_DEBUG, " .. kick non-waiting worker");
871 }
Craig Tiller55624a32017-05-26 08:14:44 -0700872 SET_KICK_STATE(specific_worker, KICKED);
Craig Tiller8502ecb2017-04-28 14:22:01 -0700873 return GRPC_ERROR_NONE;
Craig Tiller4509c472017-04-27 19:05:13 +0000874 }
875}
876
877static void pollset_add_fd(grpc_exec_ctx *exec_ctx, grpc_pollset *pollset,
878 grpc_fd *fd) {}
879
Craig Tiller4509c472017-04-27 19:05:13 +0000880/*******************************************************************************
881 * Workqueue Definitions
882 */
883
884#ifdef GRPC_WORKQUEUE_REFCOUNT_DEBUG
885static grpc_workqueue *workqueue_ref(grpc_workqueue *workqueue,
886 const char *file, int line,
887 const char *reason) {
888 return workqueue;
889}
890
891static void workqueue_unref(grpc_exec_ctx *exec_ctx, grpc_workqueue *workqueue,
892 const char *file, int line, const char *reason) {}
893#else
894static grpc_workqueue *workqueue_ref(grpc_workqueue *workqueue) {
895 return workqueue;
896}
897
898static void workqueue_unref(grpc_exec_ctx *exec_ctx,
899 grpc_workqueue *workqueue) {}
900#endif
901
/* Schedules |closure| (with |error|) on the engine's shared workqueue.

   First tries to hand the closure to an idle worker. Starting at the
   neighbourhood picked by choose_neighbourhood(), it walks each
   neighbourhood's circular list of active pollsets, and each pollset's
   circular list of workers, using only trylock. The first UNKICKED worker
   found is marked KICKED, gets the closure appended to its
   schedule_on_end_work list (which end_worker() later moves into that
   worker's exec_ctx), and has its cv signalled if the cv is initialized.

   If no such worker is found, including when every trylock fails, the closure
   is appended to the global g_wq_items list under g_wq_mu and the global
   wakeup fd is signalled. NOTE(review): presumably so that the active poller
   wakes and drains g_wq_items — confirm in pollset_epoll(). A wakeup error is
   logged via GRPC_LOG_IF_ERROR, not returned.

   |exec_ctx| is not used. This function is installed in both function slots
   of singleton_workqueue_scheduler_vtable. */
static void wq_sched(grpc_exec_ctx *exec_ctx, grpc_closure *closure,
                     grpc_error *error) {
  // find a neighbourhood to wakeup
  bool scheduled = false;
  size_t initial_neighbourhood = choose_neighbourhood();
  for (size_t i = 0; !scheduled && i < g_num_neighbourhoods; i++) {
    pollset_neighbourhood *neighbourhood =
        &g_neighbourhoods[(initial_neighbourhood + i) % g_num_neighbourhoods];
    /* trylock only: a busy neighbourhood is skipped, never waited on. */
    if (gpr_mu_trylock(&neighbourhood->mu)) {
      if (neighbourhood->active_root != NULL) {
        grpc_pollset *inspect = neighbourhood->active_root;
        do {
          /* Likewise, a busy pollset is skipped. */
          if (gpr_mu_trylock(&inspect->mu)) {
            if (inspect->root_worker != NULL) {
              grpc_pollset_worker *inspect_worker = inspect->root_worker;
              do {
                if (inspect_worker->kick_state == UNKICKED) {
                  /* Claim this worker: once KICKED, no other scheduler or
                     kick will pick it again. */
                  SET_KICK_STATE(inspect_worker, KICKED);
                  grpc_closure_list_append(
                      &inspect_worker->schedule_on_end_work, closure, error);
                  /* NOTE(review): a worker without an initialized cv is only
                     marked; presumably it observes KICKED before blocking —
                     confirm in begin_worker(). */
                  if (inspect_worker->initialized_cv) {
                    gpr_cv_signal(&inspect_worker->cv);
                  }
                  scheduled = true;
                }
                inspect_worker = inspect_worker->next;
              } while (!scheduled && inspect_worker != inspect->root_worker);
            }
            gpr_mu_unlock(&inspect->mu);
          }
          inspect = inspect->next;
        } while (!scheduled && inspect != neighbourhood->active_root);
      }
      gpr_mu_unlock(&neighbourhood->mu);
    }
  }
  if (!scheduled) {
    /* Fallback: park the closure on the global workqueue list. */
    gpr_mu_lock(&g_wq_mu);
    grpc_closure_list_append(&g_wq_items, closure, error);
    gpr_mu_unlock(&g_wq_mu);
    GRPC_LOG_IF_ERROR("workqueue_scheduler",
                      grpc_wakeup_fd_wakeup(&global_wakeup_fd));
  }
}
946
/* Scheduler vtable shared by every workqueue in this engine. Both function
   slots point at wq_sched (presumably the run and sched entries — confirm
   against the grpc_closure_scheduler_vtable definition). */
static const grpc_closure_scheduler_vtable
    singleton_workqueue_scheduler_vtable = {wq_sched, wq_sched,
                                            "epoll1_workqueue"};

/* The single scheduler instance handed out for every workqueue. */
static grpc_closure_scheduler singleton_workqueue_scheduler = {
    &singleton_workqueue_scheduler_vtable};
953
Craig Tiller4509c472017-04-27 19:05:13 +0000954static grpc_closure_scheduler *workqueue_scheduler(grpc_workqueue *workqueue) {
Craig Tiller50da5ec2017-05-01 13:51:14 -0700955 return &singleton_workqueue_scheduler;
Craig Tiller4509c472017-04-27 19:05:13 +0000956}
Craig Tillerc67cc992017-04-27 10:15:51 -0700957
958/*******************************************************************************
959 * Pollset-set Definitions
960 */
961
962static grpc_pollset_set *pollset_set_create(void) {
963 return (grpc_pollset_set *)((intptr_t)0xdeafbeef);
964}
965
966static void pollset_set_destroy(grpc_exec_ctx *exec_ctx,
967 grpc_pollset_set *pss) {}
968
969static void pollset_set_add_fd(grpc_exec_ctx *exec_ctx, grpc_pollset_set *pss,
970 grpc_fd *fd) {}
971
972static void pollset_set_del_fd(grpc_exec_ctx *exec_ctx, grpc_pollset_set *pss,
973 grpc_fd *fd) {}
974
975static void pollset_set_add_pollset(grpc_exec_ctx *exec_ctx,
976 grpc_pollset_set *pss, grpc_pollset *ps) {}
977
978static void pollset_set_del_pollset(grpc_exec_ctx *exec_ctx,
979 grpc_pollset_set *pss, grpc_pollset *ps) {}
980
981static void pollset_set_add_pollset_set(grpc_exec_ctx *exec_ctx,
982 grpc_pollset_set *bag,
983 grpc_pollset_set *item) {}
984
985static void pollset_set_del_pollset_set(grpc_exec_ctx *exec_ctx,
986 grpc_pollset_set *bag,
987 grpc_pollset_set *item) {}
988
/*******************************************************************************
 * Event engine binding
 */

/* Engine teardown hook installed in the vtable: shuts down the fd-level global
   state, then the pollset-level global state.
   NOTE(review): g_epfd (created in grpc_init_epoll1_linux()) is not closed
   here; confirm whether fd_global_shutdown() or pollset_global_shutdown()
   closes it, otherwise the epoll descriptor outlives the engine. */
static void shutdown_engine(void) {
  fd_global_shutdown();
  pollset_global_shutdown();
}
997
/* The epoll1 engine's grpc_event_engine_vtable, returned by
   grpc_init_epoll1_linux() on success. Designated initializers are used, so
   any member not named here is zero-initialized. */
static const grpc_event_engine_vtable vtable = {
    .pollset_size = sizeof(grpc_pollset),

    /* fd operations */
    .fd_create = fd_create,
    .fd_wrapped_fd = fd_wrapped_fd,
    .fd_orphan = fd_orphan,
    .fd_shutdown = fd_shutdown,
    .fd_is_shutdown = fd_is_shutdown,
    .fd_notify_on_read = fd_notify_on_read,
    .fd_notify_on_write = fd_notify_on_write,
    .fd_get_read_notifier_pollset = fd_get_read_notifier_pollset,
    .fd_get_workqueue = fd_get_workqueue,

    /* pollset operations */
    .pollset_init = pollset_init,
    .pollset_shutdown = pollset_shutdown,
    .pollset_destroy = pollset_destroy,
    .pollset_work = pollset_work,
    .pollset_kick = pollset_kick,
    .pollset_add_fd = pollset_add_fd,

    /* pollset_set operations (all no-op stubs in this engine) */
    .pollset_set_create = pollset_set_create,
    .pollset_set_destroy = pollset_set_destroy,
    .pollset_set_add_pollset = pollset_set_add_pollset,
    .pollset_set_del_pollset = pollset_set_del_pollset,
    .pollset_set_add_pollset_set = pollset_set_add_pollset_set,
    .pollset_set_del_pollset_set = pollset_set_del_pollset_set,
    .pollset_set_add_fd = pollset_set_add_fd,
    .pollset_set_del_fd = pollset_set_del_fd,

    /* workqueue operations (every workqueue shares one scheduler) */
    .workqueue_ref = workqueue_ref,
    .workqueue_unref = workqueue_unref,
    .workqueue_scheduler = workqueue_scheduler,

    .shutdown_engine = shutdown_engine,
};
1033
1034/* It is possible that GLIBC has epoll but the underlying kernel doesn't.
1035 * Create a dummy epoll_fd to make sure epoll support is available */
Craig Tiller6f0af492017-04-27 19:26:16 +00001036const grpc_event_engine_vtable *grpc_init_epoll1_linux(bool explicit_request) {
Craig Tillerc67cc992017-04-27 10:15:51 -07001037 if (!grpc_has_wakeup_fd()) {
1038 return NULL;
1039 }
1040
Craig Tiller4509c472017-04-27 19:05:13 +00001041 g_epfd = epoll_create1(EPOLL_CLOEXEC);
1042 if (g_epfd < 0) {
1043 gpr_log(GPR_ERROR, "epoll unavailable");
Craig Tillerc67cc992017-04-27 10:15:51 -07001044 return NULL;
1045 }
1046
Craig Tillerc67cc992017-04-27 10:15:51 -07001047 fd_global_init();
1048
1049 if (!GRPC_LOG_IF_ERROR("pollset_global_init", pollset_global_init())) {
Craig Tiller4509c472017-04-27 19:05:13 +00001050 close(g_epfd);
1051 fd_global_shutdown();
Craig Tillerc67cc992017-04-27 10:15:51 -07001052 return NULL;
1053 }
1054
1055 return &vtable;
1056}
1057
1058#else /* defined(GRPC_LINUX_EPOLL) */
1059#if defined(GRPC_POSIX_SOCKET)
1060#include "src/core/lib/iomgr/ev_posix.h"
1061/* If GRPC_LINUX_EPOLL is not defined, it means epoll is not available. Return
1062 * NULL */
Craig Tiller9ddb3152017-04-27 21:32:56 +00001063const grpc_event_engine_vtable *grpc_init_epoll1_linux(bool explicit_request) {
1064 return NULL;
1065}
Craig Tillerc67cc992017-04-27 10:15:51 -07001066#endif /* defined(GRPC_POSIX_SOCKET) */
1067#endif /* !defined(GRPC_LINUX_EPOLL) */