/*
 *
 * Copyright 2017, Google Inc.
 * All rights reserved.
 *
 * Redistribution and use in source and binary forms, with or without
 * modification, are permitted provided that the following conditions are
 * met:
 *
 *     * Redistributions of source code must retain the above copyright
 * notice, this list of conditions and the following disclaimer.
 *     * Redistributions in binary form must reproduce the above
 * copyright notice, this list of conditions and the following disclaimer
 * in the documentation and/or other materials provided with the
 * distribution.
 *     * Neither the name of Google Inc. nor the names of its
 * contributors may be used to endorse or promote products derived from
 * this software without specific prior written permission.
 *
 * THIS SOFTWARE IS PROVIDED BY THE COPYRIGHT HOLDERS AND CONTRIBUTORS
 * "AS IS" AND ANY EXPRESS OR IMPLIED WARRANTIES, INCLUDING, BUT NOT
 * LIMITED TO, THE IMPLIED WARRANTIES OF MERCHANTABILITY AND FITNESS FOR
 * A PARTICULAR PURPOSE ARE DISCLAIMED. IN NO EVENT SHALL THE COPYRIGHT
 * OWNER OR CONTRIBUTORS BE LIABLE FOR ANY DIRECT, INDIRECT, INCIDENTAL,
 * SPECIAL, EXEMPLARY, OR CONSEQUENTIAL DAMAGES (INCLUDING, BUT NOT
 * LIMITED TO, PROCUREMENT OF SUBSTITUTE GOODS OR SERVICES; LOSS OF USE,
 * DATA, OR PROFITS; OR BUSINESS INTERRUPTION) HOWEVER CAUSED AND ON ANY
 * THEORY OF LIABILITY, WHETHER IN CONTRACT, STRICT LIABILITY, OR TORT
 * (INCLUDING NEGLIGENCE OR OTHERWISE) ARISING IN ANY WAY OUT OF THE USE
 * OF THIS SOFTWARE, EVEN IF ADVISED OF THE POSSIBILITY OF SUCH DAMAGE.
 *
 */

#include "src/core/lib/iomgr/port.h"

/* This polling engine is only relevant on linux kernels supporting epoll() */
#ifdef GRPC_LINUX_EPOLL

#include "src/core/lib/iomgr/ev_epoll1_linux.h"

#include <assert.h>
#include <errno.h>
#include <poll.h>
#include <pthread.h>
#include <string.h>
#include <sys/epoll.h>
#include <sys/socket.h>
#include <unistd.h>

#include <grpc/support/alloc.h>
#include <grpc/support/log.h>
#include <grpc/support/string_util.h>
#include <grpc/support/tls.h>
#include <grpc/support/useful.h>

#include "src/core/lib/iomgr/ev_posix.h"
#include "src/core/lib/iomgr/iomgr_internal.h"
#include "src/core/lib/iomgr/lockfree_event.h"
#include "src/core/lib/iomgr/timer.h"
#include "src/core/lib/iomgr/wakeup_fd_posix.h"
#include "src/core/lib/iomgr/workqueue.h"
#include "src/core/lib/profiling/timers.h"
#include "src/core/lib/support/block_annotate.h"

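/* Overview (inferred from the code in this file): this engine uses a single,
 * process-wide epoll set (g_epfd below). Every grpc_fd is registered with
 * g_epfd once, at creation time, so pollset_add_fd() needs no per-pollset
 * bookkeeping; a grpc_pollset mainly tracks the worker threads that take
 * turns calling epoll_wait() on the shared set. */
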
/* TODO: sreek: Right now, this wakes up all pollers. In the future we should
 * make sure to wake up one polling thread (which can wake up other threads if
 * needed) */
static grpc_wakeup_fd global_wakeup_fd;
static int g_epfd;
static bool g_timer_kick = false;

/*******************************************************************************
 * Fd Declarations
 */

struct grpc_fd {
  int fd;

  gpr_atm read_closure;
  gpr_atm write_closure;

  struct grpc_fd *freelist_next;

  /* The pollset that last noticed that the fd is readable. The actual type
   * stored in this is (grpc_pollset *) */
  gpr_atm read_notifier_pollset;

  grpc_iomgr_object iomgr_object;
};

static void fd_global_init(void);
static void fd_global_shutdown(void);

/*******************************************************************************
 * Pollset Declarations
 */

typedef struct pollset_worker_link {
  grpc_pollset_worker *next;
  grpc_pollset_worker *prev;
} pollset_worker_link;

typedef enum {
  PWL_POLLSET,
  PWL_POLLABLE,
  POLLSET_WORKER_LINK_COUNT
} pollset_worker_links;

struct grpc_pollset_worker {
  bool kicked;
  bool initialized_cv;
  pollset_worker_link links[POLLSET_WORKER_LINK_COUNT];
  gpr_cv cv;
};

struct grpc_pollset {
  grpc_pollset_worker *root_worker;
  bool kicked_without_poller;

  bool shutting_down;          /* Is the pollset shutting down? */
  bool finish_shutdown_called; /* Has 'finish_shutdown_locked()' been called? */
  grpc_closure *shutdown_closure; /* Called after shutdown is complete */
};

/*******************************************************************************
 * Pollset-set Declarations
 */
struct grpc_pollset_set {};

/*******************************************************************************
 * Common helpers
 */

static bool append_error(grpc_error **composite, grpc_error *error,
                         const char *desc) {
  if (error == GRPC_ERROR_NONE) return true;
  if (*composite == GRPC_ERROR_NONE) {
    *composite = GRPC_ERROR_CREATE_FROM_COPIED_STRING(desc);
  }
  *composite = grpc_error_add_child(*composite, error);
  return false;
}

/*******************************************************************************
 * Fd Definitions
 */

/* We need to keep a freelist not because of any concerns of malloc performance
 * but instead so that implementations with multiple threads in (for example)
 * epoll_wait deal with the race between pollset removal and incoming poll
 * notifications.
 *
 * The problem is that the poller ultimately holds a reference to this
 * object, so it is very difficult to know when it is safe to free it, at least
 * without some expensive synchronization.
 *
 * If we keep the object freelisted, in the worst case losing this race just
 * becomes a spurious read notification on a reused fd.
 */

/* The alarm system needs to be able to wake up 'some poller' sometimes
 * (specifically when a new alarm needs to be triggered earlier than the next
 * alarm 'epoch'). This wakeup_fd gives us something to alert on when such a
 * case occurs. */

static grpc_fd *fd_freelist = NULL;
static gpr_mu fd_freelist_mu;

static void fd_global_init(void) { gpr_mu_init(&fd_freelist_mu); }

static void fd_global_shutdown(void) {
  gpr_mu_lock(&fd_freelist_mu);
  gpr_mu_unlock(&fd_freelist_mu);
  while (fd_freelist != NULL) {
    grpc_fd *fd = fd_freelist;
    fd_freelist = fd_freelist->freelist_next;
    gpr_free(fd);
  }
  gpr_mu_destroy(&fd_freelist_mu);
}

static grpc_fd *fd_create(int fd, const char *name) {
  grpc_fd *new_fd = NULL;

  gpr_mu_lock(&fd_freelist_mu);
  if (fd_freelist != NULL) {
    new_fd = fd_freelist;
    fd_freelist = fd_freelist->freelist_next;
  }
  gpr_mu_unlock(&fd_freelist_mu);

  if (new_fd == NULL) {
    new_fd = gpr_malloc(sizeof(grpc_fd));
  }

  new_fd->fd = fd;
  grpc_lfev_init(&new_fd->read_closure);
  grpc_lfev_init(&new_fd->write_closure);
  gpr_atm_no_barrier_store(&new_fd->read_notifier_pollset, (gpr_atm)NULL);

  new_fd->freelist_next = NULL;

  char *fd_name;
  gpr_asprintf(&fd_name, "%s fd=%d", name, fd);
  grpc_iomgr_register_object(&new_fd->iomgr_object, fd_name);
#ifdef GRPC_FD_REF_COUNT_DEBUG
  gpr_log(GPR_DEBUG, "FD %d %p create %s", fd, (void *)new_fd, fd_name);
#endif
  gpr_free(fd_name);

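  /* Register the fd with the single global epoll set, edge-triggered for both
   * reads and writes. This happens exactly once, here; there is no later
   * per-pollset epoll_ctl, which is why pollset_add_fd() below is a no-op. */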
  struct epoll_event ev = {.events = (uint32_t)(EPOLLIN | EPOLLOUT | EPOLLET),
                           .data.ptr = new_fd};
  if (epoll_ctl(g_epfd, EPOLL_CTL_ADD, fd, &ev) != 0) {
    gpr_log(GPR_ERROR, "epoll_ctl failed: %s", strerror(errno));
  }

  return new_fd;
}

static int fd_wrapped_fd(grpc_fd *fd) { return fd->fd; }

/* Might be called multiple times */
static void fd_shutdown(grpc_exec_ctx *exec_ctx, grpc_fd *fd, grpc_error *why) {
  if (grpc_lfev_set_shutdown(exec_ctx, &fd->read_closure,
                             GRPC_ERROR_REF(why))) {
    shutdown(fd->fd, SHUT_RDWR);
    grpc_lfev_set_shutdown(exec_ctx, &fd->write_closure, GRPC_ERROR_REF(why));
  }
  GRPC_ERROR_UNREF(why);
}

static void fd_orphan(grpc_exec_ctx *exec_ctx, grpc_fd *fd,
                      grpc_closure *on_done, int *release_fd,
                      const char *reason) {
  grpc_error *error = GRPC_ERROR_NONE;

  if (!grpc_lfev_is_shutdown(&fd->read_closure)) {
    fd_shutdown(exec_ctx, fd, GRPC_ERROR_CREATE_FROM_COPIED_STRING(reason));
  }

  /* If release_fd is not NULL, we should be relinquishing control of the file
     descriptor fd->fd (but we still own the grpc_fd structure). */
  if (release_fd != NULL) {
    *release_fd = fd->fd;
  } else {
    close(fd->fd);
  }

  grpc_closure_sched(exec_ctx, on_done, GRPC_ERROR_REF(error));

  grpc_iomgr_unregister_object(&fd->iomgr_object);
  grpc_lfev_destroy(&fd->read_closure);
  grpc_lfev_destroy(&fd->write_closure);

  gpr_mu_lock(&fd_freelist_mu);
  fd->freelist_next = fd_freelist;
  fd_freelist = fd;
  gpr_mu_unlock(&fd_freelist_mu);
}

static grpc_pollset *fd_get_read_notifier_pollset(grpc_exec_ctx *exec_ctx,
                                                  grpc_fd *fd) {
  gpr_atm notifier = gpr_atm_acq_load(&fd->read_notifier_pollset);
  return (grpc_pollset *)notifier;
}

static bool fd_is_shutdown(grpc_fd *fd) {
  return grpc_lfev_is_shutdown(&fd->read_closure);
}

static void fd_notify_on_read(grpc_exec_ctx *exec_ctx, grpc_fd *fd,
                              grpc_closure *closure) {
  grpc_lfev_notify_on(exec_ctx, &fd->read_closure, closure);
}

static void fd_notify_on_write(grpc_exec_ctx *exec_ctx, grpc_fd *fd,
                               grpc_closure *closure) {
  grpc_lfev_notify_on(exec_ctx, &fd->write_closure, closure);
}

static grpc_workqueue *fd_get_workqueue(grpc_fd *fd) {
  return NULL; /* TODO(ctiller): add a global workqueue */
}

static void fd_become_readable(grpc_exec_ctx *exec_ctx, grpc_fd *fd,
                               grpc_pollset *notifier) {
  grpc_lfev_set_ready(exec_ctx, &fd->read_closure);

  /* Note, it is possible that fd_become_readable might be called twice with
     different 'notifier's when an fd becomes readable and it is in two epoll
     sets (This can happen briefly during polling island merges). In such cases
     it does not really matter which notifier is set as the
     read_notifier_pollset (they would both point to the same polling island
     anyway) */
  /* Use release store to match with acquire load in fd_get_read_notifier */
  gpr_atm_rel_store(&fd->read_notifier_pollset, (gpr_atm)notifier);
}

static void fd_become_writable(grpc_exec_ctx *exec_ctx, grpc_fd *fd) {
  grpc_lfev_set_ready(exec_ctx, &fd->write_closure);
}

/*******************************************************************************
 * Pollset Definitions
 */

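/* Workers sit on two intrusive circular doubly-linked lists at once: the
 * per-pollset list (PWL_POLLSET, rooted at pollset->root_worker) and the
 * global list of threads eligible to poll the epoll set (PWL_POLLABLE, rooted
 * at g_root_worker). worker_insert()/worker_remove() below maintain one of
 * those lists, selected by the 'link' argument. */
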
/* Return true if first in list */
static bool worker_insert(grpc_pollset_worker **root, pollset_worker_links link,
                          grpc_pollset_worker *worker) {
  if (*root == NULL) {
    *root = worker;
    worker->links[link].next = worker->links[link].prev = worker;
    return true;
  } else {
    worker->links[link].next = *root;
    worker->links[link].prev = worker->links[link].next->links[link].prev;
    worker->links[link].next->links[link].prev = worker;
    worker->links[link].prev->links[link].next = worker;
    return false;
  }
}

/* worker_remove() returns EMPTIED if the removed worker was the last one in
   the list, NEW_ROOT if it was the root and another worker was promoted to
   root, and REMOVED otherwise */
typedef enum { EMPTIED, NEW_ROOT, REMOVED } worker_remove_result;

static worker_remove_result worker_remove(grpc_pollset_worker **root,
                                          pollset_worker_links link,
                                          grpc_pollset_worker *worker) {
  if (worker == *root) {
    if (worker == worker->links[link].next) {
      *root = NULL;
      return EMPTIED;
    } else {
      *root = worker->links[link].next;
      worker->links[link].prev->links[link].next = worker->links[link].next;
      worker->links[link].next->links[link].prev = worker->links[link].prev;
      return NEW_ROOT;
    }
  } else {
    worker->links[link].prev->links[link].next = worker->links[link].next;
    worker->links[link].next->links[link].prev = worker->links[link].prev;
    return REMOVED;
  }
}

GPR_TLS_DECL(g_current_thread_pollset);
GPR_TLS_DECL(g_current_thread_worker);
static gpr_mu g_pollset_mu;
static grpc_pollset_worker *g_root_worker;

static grpc_error *pollset_global_init(void) {
  gpr_mu_init(&g_pollset_mu);
  gpr_tls_init(&g_current_thread_pollset);
  gpr_tls_init(&g_current_thread_worker);
  global_wakeup_fd.read_fd = -1;
  grpc_error *err = grpc_wakeup_fd_init(&global_wakeup_fd);
  if (err != GRPC_ERROR_NONE) return err;
  struct epoll_event ev = {.events = (uint32_t)(EPOLLIN | EPOLLET),
                           .data.ptr = &global_wakeup_fd};
  if (epoll_ctl(g_epfd, EPOLL_CTL_ADD, global_wakeup_fd.read_fd, &ev) != 0) {
    return GRPC_OS_ERROR(errno, "epoll_ctl");
  }
  return GRPC_ERROR_NONE;
}

static void pollset_global_shutdown(void) {
  gpr_mu_destroy(&g_pollset_mu);
  gpr_tls_destroy(&g_current_thread_pollset);
  gpr_tls_destroy(&g_current_thread_worker);
  if (global_wakeup_fd.read_fd != -1) grpc_wakeup_fd_destroy(&global_wakeup_fd);
}

static void pollset_init(grpc_pollset *pollset, gpr_mu **mu) {
  *mu = &g_pollset_mu;
}

static grpc_error *pollset_kick_all(grpc_pollset *pollset) {
  grpc_error *error = GRPC_ERROR_NONE;
  if (pollset->root_worker != NULL) {
    grpc_pollset_worker *worker = pollset->root_worker;
    do {
      if (worker->initialized_cv) {
        worker->kicked = true;
        gpr_cv_signal(&worker->cv);
      } else {
        append_error(&error, grpc_wakeup_fd_wakeup(&global_wakeup_fd),
                     "pollset_shutdown");
      }

      worker = worker->links[PWL_POLLSET].next;
    } while (worker != pollset->root_worker);
  }
  return error;
}

static void pollset_maybe_finish_shutdown(grpc_exec_ctx *exec_ctx,
                                          grpc_pollset *pollset) {
  if (pollset->shutdown_closure != NULL && pollset->root_worker == NULL) {
    grpc_closure_sched(exec_ctx, pollset->shutdown_closure, GRPC_ERROR_NONE);
    pollset->shutdown_closure = NULL;
  }
}

static void pollset_shutdown(grpc_exec_ctx *exec_ctx, grpc_pollset *pollset,
                             grpc_closure *closure) {
  GPR_ASSERT(pollset->shutdown_closure == NULL);
  pollset->shutdown_closure = closure;
  GRPC_LOG_IF_ERROR("pollset_shutdown", pollset_kick_all(pollset));
  pollset_maybe_finish_shutdown(exec_ctx, pollset);
}

static void pollset_destroy(grpc_pollset *pollset) {}

#define MAX_EPOLL_EVENTS 100

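/* Convert a deadline to an epoll_wait() timeout in milliseconds: -1 means
 * "wait forever", 0 means the deadline has already passed, and otherwise the
 * remaining time is rounded up to at least 1ms so a near-future deadline
 * never turns into a busy-looping zero timeout. */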
static int poll_deadline_to_millis_timeout(gpr_timespec deadline,
                                           gpr_timespec now) {
  gpr_timespec timeout;
  if (gpr_time_cmp(deadline, gpr_inf_future(deadline.clock_type)) == 0) {
    return -1;
  }

  if (gpr_time_cmp(deadline, now) <= 0) {
    return 0;
  }

  static const gpr_timespec round_up = {
      .clock_type = GPR_TIMESPAN, .tv_sec = 0, .tv_nsec = GPR_NS_PER_MS - 1};
  timeout = gpr_time_sub(deadline, now);
  int millis = gpr_time_to_millis(gpr_time_add(timeout, round_up));
  return millis >= 1 ? millis : 1;
}

static grpc_error *pollset_epoll(grpc_exec_ctx *exec_ctx, grpc_pollset *pollset,
                                 gpr_timespec now, gpr_timespec deadline) {
  struct epoll_event events[MAX_EPOLL_EVENTS];
  static const char *err_desc = "pollset_poll";

  int timeout = poll_deadline_to_millis_timeout(deadline, now);

  if (timeout != 0) {
    GRPC_SCHEDULING_START_BLOCKING_REGION;
  }
  int r;
  do {
    r = epoll_wait(g_epfd, events, MAX_EPOLL_EVENTS, timeout);
  } while (r < 0 && errno == EINTR);
  if (timeout != 0) {
    GRPC_SCHEDULING_END_BLOCKING_REGION;
  }

  if (r < 0) return GRPC_OS_ERROR(errno, "epoll_wait");

  grpc_error *error = GRPC_ERROR_NONE;
  for (int i = 0; i < r; i++) {
    void *data_ptr = events[i].data.ptr;
    if (data_ptr == &global_wakeup_fd) {
      if (g_timer_kick) {
        g_timer_kick = false;
        grpc_timer_consume_kick();
      }
      append_error(&error, grpc_wakeup_fd_consume_wakeup(&global_wakeup_fd),
                   err_desc);
    } else {
      grpc_fd *fd = (grpc_fd *)(data_ptr);
      bool cancel = (events[i].events & (EPOLLERR | EPOLLHUP)) != 0;
      bool read_ev = (events[i].events & (EPOLLIN | EPOLLPRI)) != 0;
      bool write_ev = (events[i].events & EPOLLOUT) != 0;
      if (read_ev || cancel) {
        fd_become_readable(exec_ctx, fd, pollset);
      }
      if (write_ev || cancel) {
        fd_become_writable(exec_ctx, fd);
      }
    }
  }

  return error;
}

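/* Only one worker at a time (the head of the global PWL_POLLABLE list,
 * g_root_worker) actually calls epoll_wait(); every other worker blocks on
 * its per-worker condition variable in begin_worker() until it is promoted to
 * root, is kicked, or its deadline expires. */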
static bool begin_worker(grpc_pollset *pollset, grpc_pollset_worker *worker,
                         grpc_pollset_worker **worker_hdl, gpr_timespec *now,
                         gpr_timespec deadline) {
  bool do_poll = true;
  if (worker_hdl != NULL) *worker_hdl = worker;
  worker->initialized_cv = false;
  worker->kicked = false;

  worker_insert(&pollset->root_worker, PWL_POLLSET, worker);
  if (!worker_insert(&g_root_worker, PWL_POLLABLE, worker)) {
    worker->initialized_cv = true;
    gpr_cv_init(&worker->cv);
    while (do_poll && g_root_worker != worker) {
      if (gpr_cv_wait(&worker->cv, &g_pollset_mu, deadline)) {
        do_poll = false;
      } else if (worker->kicked) {
        do_poll = false;
      }
    }
    *now = gpr_now(now->clock_type);
  }

  return do_poll && pollset->shutdown_closure == NULL;
}

static void end_worker(grpc_exec_ctx *exec_ctx, grpc_pollset *pollset,
                       grpc_pollset_worker *worker,
                       grpc_pollset_worker **worker_hdl) {
  if (NEW_ROOT == worker_remove(&g_root_worker, PWL_POLLABLE, worker)) {
    gpr_cv_signal(&g_root_worker->cv);
  }
  if (worker->initialized_cv) {
    gpr_cv_destroy(&worker->cv);
  }
  if (EMPTIED == worker_remove(&pollset->root_worker, PWL_POLLSET, worker)) {
    pollset_maybe_finish_shutdown(exec_ctx, pollset);
  }
}

/* The pollset's mutex (g_pollset_mu in this engine) must be held by the caller
   before calling this. pollset_work() may temporarily release the lock during
   the course of its execution but it will always re-acquire it and ensure that
   it is held by the time the function returns */
static grpc_error *pollset_work(grpc_exec_ctx *exec_ctx, grpc_pollset *pollset,
                                grpc_pollset_worker **worker_hdl,
                                gpr_timespec now, gpr_timespec deadline) {
  grpc_pollset_worker worker;
  grpc_error *error = GRPC_ERROR_NONE;
  static const char *err_desc = "pollset_work";
  if (pollset->kicked_without_poller) {
    pollset->kicked_without_poller = false;
    return GRPC_ERROR_NONE;
  }
  if (begin_worker(pollset, &worker, worker_hdl, &now, deadline)) {
    gpr_tls_set(&g_current_thread_pollset, (intptr_t)pollset);
    gpr_tls_set(&g_current_thread_worker, (intptr_t)&worker);
    GPR_ASSERT(!pollset->shutdown_closure);
    gpr_mu_unlock(&g_pollset_mu);
    append_error(&error, pollset_epoll(exec_ctx, pollset, now, deadline),
                 err_desc);
    grpc_exec_ctx_flush(exec_ctx);
    gpr_mu_lock(&g_pollset_mu);
    gpr_tls_set(&g_current_thread_pollset, 0);
    gpr_tls_set(&g_current_thread_worker, 0);
    pollset_maybe_finish_shutdown(exec_ctx, pollset);
  }
  end_worker(exec_ctx, pollset, &worker, worker_hdl);
  return error;
}

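/* Wake a worker so it can notice new work or shutdown. When no specific worker
 * is given: if the pollset has no worker at all, just record
 * kicked_without_poller so the next pollset_work() call returns immediately;
 * if its only worker is the one currently in epoll_wait(), poke the global
 * wakeup fd; otherwise signal the next waiting worker's condition variable. */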
static grpc_error *pollset_kick(grpc_pollset *pollset,
                                grpc_pollset_worker *specific_worker) {
  if (specific_worker == NULL) {
    if (gpr_tls_get(&g_current_thread_pollset) != (intptr_t)pollset) {
      grpc_pollset_worker *root_worker = pollset->root_worker;
      if (root_worker == NULL) {
        pollset->kicked_without_poller = true;
        return GRPC_ERROR_NONE;
      }
      grpc_pollset_worker *next_worker = root_worker->links[PWL_POLLSET].next;
      if (root_worker == next_worker && root_worker == g_root_worker) {
        root_worker->kicked = true;
        return grpc_wakeup_fd_wakeup(&global_wakeup_fd);
      } else {
        next_worker->kicked = true;
        gpr_cv_signal(&next_worker->cv);
        return GRPC_ERROR_NONE;
      }
    } else {
      return GRPC_ERROR_NONE;
    }
  } else if (specific_worker->kicked) {
    return GRPC_ERROR_NONE;
  } else if (gpr_tls_get(&g_current_thread_worker) ==
             (intptr_t)specific_worker) {
    specific_worker->kicked = true;
    return GRPC_ERROR_NONE;
  } else if (specific_worker == g_root_worker) {
    specific_worker->kicked = true;
    return grpc_wakeup_fd_wakeup(&global_wakeup_fd);
  } else {
    specific_worker->kicked = true;
    gpr_cv_signal(&specific_worker->cv);
    return GRPC_ERROR_NONE;
  }
}

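/* No-op: every fd is already registered with the single global epoll set in
 * fd_create(), so there is nothing per-pollset to add here. */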
static void pollset_add_fd(grpc_exec_ctx *exec_ctx, grpc_pollset *pollset,
                           grpc_fd *fd) {}

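/* Wake whichever thread is blocked in epoll_wait() so the timer system gets a
 * chance to run: set g_timer_kick (consumed in pollset_epoll(), which then
 * calls grpc_timer_consume_kick()) and poke the global wakeup fd. */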
static grpc_error *kick_poller(void) {
  gpr_mu_lock(&g_pollset_mu);
  g_timer_kick = true;
  gpr_mu_unlock(&g_pollset_mu);
  return grpc_wakeup_fd_wakeup(&global_wakeup_fd);
}

/*******************************************************************************
 * Workqueue Definitions
 */

#ifdef GRPC_WORKQUEUE_REFCOUNT_DEBUG
static grpc_workqueue *workqueue_ref(grpc_workqueue *workqueue,
                                     const char *file, int line,
                                     const char *reason) {
  return workqueue;
}

static void workqueue_unref(grpc_exec_ctx *exec_ctx, grpc_workqueue *workqueue,
                            const char *file, int line, const char *reason) {}
#else
static grpc_workqueue *workqueue_ref(grpc_workqueue *workqueue) {
  return workqueue;
}

static void workqueue_unref(grpc_exec_ctx *exec_ctx,
                            grpc_workqueue *workqueue) {}
#endif

static grpc_closure_scheduler *workqueue_scheduler(grpc_workqueue *workqueue) {
  return grpc_schedule_on_exec_ctx;
}

/*******************************************************************************
 * Pollset-set Definitions
 */

static grpc_pollset_set *pollset_set_create(void) {
  return (grpc_pollset_set *)((intptr_t)0xdeafbeef);
}

static void pollset_set_destroy(grpc_exec_ctx *exec_ctx,
                                grpc_pollset_set *pss) {}

static void pollset_set_add_fd(grpc_exec_ctx *exec_ctx, grpc_pollset_set *pss,
                               grpc_fd *fd) {}

static void pollset_set_del_fd(grpc_exec_ctx *exec_ctx, grpc_pollset_set *pss,
                               grpc_fd *fd) {}

static void pollset_set_add_pollset(grpc_exec_ctx *exec_ctx,
                                    grpc_pollset_set *pss, grpc_pollset *ps) {}

static void pollset_set_del_pollset(grpc_exec_ctx *exec_ctx,
                                    grpc_pollset_set *pss, grpc_pollset *ps) {}

static void pollset_set_add_pollset_set(grpc_exec_ctx *exec_ctx,
                                        grpc_pollset_set *bag,
                                        grpc_pollset_set *item) {}

static void pollset_set_del_pollset_set(grpc_exec_ctx *exec_ctx,
                                        grpc_pollset_set *bag,
                                        grpc_pollset_set *item) {}

/*******************************************************************************
 * Event engine binding
 */

static void shutdown_engine(void) {
  fd_global_shutdown();
  pollset_global_shutdown();
}

static const grpc_event_engine_vtable vtable = {
    .pollset_size = sizeof(grpc_pollset),

    .fd_create = fd_create,
    .fd_wrapped_fd = fd_wrapped_fd,
    .fd_orphan = fd_orphan,
    .fd_shutdown = fd_shutdown,
    .fd_is_shutdown = fd_is_shutdown,
    .fd_notify_on_read = fd_notify_on_read,
    .fd_notify_on_write = fd_notify_on_write,
    .fd_get_read_notifier_pollset = fd_get_read_notifier_pollset,
    .fd_get_workqueue = fd_get_workqueue,

    .pollset_init = pollset_init,
    .pollset_shutdown = pollset_shutdown,
    .pollset_destroy = pollset_destroy,
    .pollset_work = pollset_work,
    .pollset_kick = pollset_kick,
    .pollset_add_fd = pollset_add_fd,

    .pollset_set_create = pollset_set_create,
    .pollset_set_destroy = pollset_set_destroy,
    .pollset_set_add_pollset = pollset_set_add_pollset,
    .pollset_set_del_pollset = pollset_set_del_pollset,
    .pollset_set_add_pollset_set = pollset_set_add_pollset_set,
    .pollset_set_del_pollset_set = pollset_set_del_pollset_set,
    .pollset_set_add_fd = pollset_set_add_fd,
    .pollset_set_del_fd = pollset_set_del_fd,

    .kick_poller = kick_poller,

    .workqueue_ref = workqueue_ref,
    .workqueue_unref = workqueue_unref,
    .workqueue_scheduler = workqueue_scheduler,

    .shutdown_engine = shutdown_engine,
};

/* It is possible that GLIBC has epoll but the underlying kernel doesn't.
 * Create a dummy epoll_fd to make sure epoll support is available */
const grpc_event_engine_vtable *grpc_init_epoll1_linux(bool explicit_request) {
  if (!grpc_has_wakeup_fd()) {
    return NULL;
  }

  g_epfd = epoll_create1(EPOLL_CLOEXEC);
  if (g_epfd < 0) {
    gpr_log(GPR_ERROR, "epoll unavailable");
    return NULL;
  }

  fd_global_init();

  if (!GRPC_LOG_IF_ERROR("pollset_global_init", pollset_global_init())) {
    close(g_epfd);
    fd_global_shutdown();
    return NULL;
  }

  return &vtable;
}

#else /* defined(GRPC_LINUX_EPOLL) */
#if defined(GRPC_POSIX_SOCKET)
#include "src/core/lib/iomgr/ev_posix.h"
/* If GRPC_LINUX_EPOLL is not defined, it means epoll is not available. Return
 * NULL */
const grpc_event_engine_vtable *grpc_init_epoll1_linux(bool explicit_request) {
  return NULL;
}
#endif /* defined(GRPC_POSIX_SOCKET) */
#endif /* !defined(GRPC_LINUX_EPOLL) */