/*
 *
 * Copyright 2015-2016, Google Inc.
 * All rights reserved.
 *
 * Redistribution and use in source and binary forms, with or without
 * modification, are permitted provided that the following conditions are
 * met:
 *
 *     * Redistributions of source code must retain the above copyright
 * notice, this list of conditions and the following disclaimer.
 *     * Redistributions in binary form must reproduce the above
 * copyright notice, this list of conditions and the following disclaimer
 * in the documentation and/or other materials provided with the
 * distribution.
 *     * Neither the name of Google Inc. nor the names of its
 * contributors may be used to endorse or promote products derived from
 * this software without specific prior written permission.
 *
 * THIS SOFTWARE IS PROVIDED BY THE COPYRIGHT HOLDERS AND CONTRIBUTORS
 * "AS IS" AND ANY EXPRESS OR IMPLIED WARRANTIES, INCLUDING, BUT NOT
 * LIMITED TO, THE IMPLIED WARRANTIES OF MERCHANTABILITY AND FITNESS FOR
 * A PARTICULAR PURPOSE ARE DISCLAIMED. IN NO EVENT SHALL THE COPYRIGHT
 * OWNER OR CONTRIBUTORS BE LIABLE FOR ANY DIRECT, INDIRECT, INCIDENTAL,
 * SPECIAL, EXEMPLARY, OR CONSEQUENTIAL DAMAGES (INCLUDING, BUT NOT
 * LIMITED TO, PROCUREMENT OF SUBSTITUTE GOODS OR SERVICES; LOSS OF USE,
 * DATA, OR PROFITS; OR BUSINESS INTERRUPTION) HOWEVER CAUSED AND ON ANY
 * THEORY OF LIABILITY, WHETHER IN CONTRACT, STRICT LIABILITY, OR TORT
 * (INCLUDING NEGLIGENCE OR OTHERWISE) ARISING IN ANY WAY OUT OF THE USE
 * OF THIS SOFTWARE, EVEN IF ADVISED OF THE POSSIBILITY OF SUCH DAMAGE.
 *
 */

#include <grpc/support/port_platform.h>

#ifdef GPR_POSIX_SOCKET

#include "src/core/lib/iomgr/ev_poll_posix.h"

#include <assert.h>
#include <errno.h>
#include <poll.h>
#include <string.h>
#include <sys/socket.h>
#include <unistd.h>

#include <grpc/support/alloc.h>
#include <grpc/support/log.h>
#include <grpc/support/string_util.h>
#include <grpc/support/tls.h>
#include <grpc/support/useful.h>

#include "src/core/lib/iomgr/iomgr_internal.h"
#include "src/core/lib/iomgr/wakeup_fd_posix.h"
#include "src/core/lib/profiling/timers.h"
#include "src/core/lib/support/block_annotate.h"

/*******************************************************************************
 * FD declarations
 */
typedef struct grpc_fd_watcher {
  struct grpc_fd_watcher *next;
  struct grpc_fd_watcher *prev;
  grpc_pollset *pollset;
  grpc_pollset_worker *worker;
  grpc_fd *fd;
} grpc_fd_watcher;

struct grpc_fd {
  int fd;
  /* refst format:
       bit 0:    1=active/0=orphaned
       bits 1-n: refcount
     Mostly we ref by two so the orphaned bit is never altered, and unref by
     one when we're ready to flag the object as orphaned. */
  gpr_atm refst;

  gpr_mu mu;
  int shutdown;
  int closed;
  int released;

  /* The watcher list.

     The following watcher-related fields are protected by mu.

     An fd_watcher is an ephemeral object created when a pollset wants to
     begin polling on an fd, and destroyed after the poll.

     It denotes the fd's interest in read polling, write polling, both, or
     neither.

     If a watcher is asked to poll for reads or writes, the read_watcher
     or write_watcher field is set respectively. A watcher may be asked
     to poll for both, in which case both fields will be set.

     read_watcher and write_watcher may be NULL if no watcher has been
     asked to poll for reads or writes.

     If an fd_watcher is not asked to poll for reads or writes, it is added
     to a linked list of inactive watchers, rooted at inactive_watcher_root.
     If a poller is needed later, one of the inactive watchers may be kicked
     out of its poll loop to take on that responsibility. */
  grpc_fd_watcher inactive_watcher_root;
  grpc_fd_watcher *read_watcher;
  grpc_fd_watcher *write_watcher;

  grpc_closure *read_closure;
  grpc_closure *write_closure;

  grpc_closure *on_done_closure;

  grpc_iomgr_object iomgr_object;
};
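
/* A minimal sketch (not part of the build) of the refst encoding described
   above; the transitions mirror ref_by / unref_by / fd_orphan below. */
#if 0
gpr_atm refst = 1; /* fd_create: active bit set, refcount 0 (binary 01) */
refst += 2;        /* GRPC_FD_REF: refcount 1, bit 0 untouched (binary 11) */
refst += 1 - 2;    /* fd_orphan's REF_BY(1) + UNREF_BY(2) clears bit 0 (10);
                      (refst & 1) == 0, so fd_is_orphaned() is now true */
refst -= 2;        /* final GRPC_FD_UNREF reaches 0: the fd is freed */
#endif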

/* Begin polling on an fd.
   Registers that the given pollset is interested in this fd - so that if read
   or writability interest changes, the pollset can be kicked to pick up that
   new interest.
   Return value is:
     (fd_needs_read? read_mask : 0) | (fd_needs_write? write_mask : 0)
   i.e. a combination of read_mask and write_mask determined by the fd's
   current interest in said events.
   Polling strategies that do not need to alter their behavior depending on the
   fd's current interest (such as epoll) do not need to call this function.
   MUST NOT be called with a pollset lock taken. */
static uint32_t fd_begin_poll(grpc_fd *fd, grpc_pollset *pollset,
                              grpc_pollset_worker *worker, uint32_t read_mask,
                              uint32_t write_mask, grpc_fd_watcher *rec);
/* Complete polling previously started with fd_begin_poll.
   MUST NOT be called with a pollset lock taken.
   If got_read or got_write are 1, also does the become_{readable,writable} as
   appropriate. */
static void fd_end_poll(grpc_exec_ctx *exec_ctx, grpc_fd_watcher *rec,
                        int got_read, int got_write);

/* Return 1 if this fd is orphaned, 0 otherwise */
static bool fd_is_orphaned(grpc_fd *fd);
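
/* A minimal sketch (not part of the build) of the fd_begin_poll / fd_end_poll
   contract, mirroring what pollset_work does below; assumes fd, pollset,
   worker, exec_ctx and timeout_ms are already in scope. */
#if 0
grpc_fd_watcher watcher;
struct pollfd pfd;
pfd.fd = fd->fd;
pfd.revents = 0;
/* ask the fd which events it actually wants, and poll only for those */
pfd.events =
    (short)fd_begin_poll(fd, pollset, worker, POLLIN, POLLOUT, &watcher);
poll(&pfd, 1, timeout_ms);
/* report back what fired so closures can be scheduled */
fd_end_poll(exec_ctx, &watcher, pfd.revents & POLLIN, pfd.revents & POLLOUT);
#endif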

/* Reference counting for fds */
/*#define GRPC_FD_REF_COUNT_DEBUG*/
#ifdef GRPC_FD_REF_COUNT_DEBUG
static void fd_ref(grpc_fd *fd, const char *reason, const char *file, int line);
static void fd_unref(grpc_fd *fd, const char *reason, const char *file,
                     int line);
#define GRPC_FD_REF(fd, reason) fd_ref(fd, reason, __FILE__, __LINE__)
#define GRPC_FD_UNREF(fd, reason) fd_unref(fd, reason, __FILE__, __LINE__)
#else
static void fd_ref(grpc_fd *fd);
static void fd_unref(grpc_fd *fd);
#define GRPC_FD_REF(fd, reason) fd_ref(fd)
#define GRPC_FD_UNREF(fd, reason) fd_unref(fd)
#endif

#define CLOSURE_NOT_READY ((grpc_closure *)0)
#define CLOSURE_READY ((grpc_closure *)1)

/*******************************************************************************
 * pollset declarations
 */

typedef struct grpc_cached_wakeup_fd {
  grpc_wakeup_fd fd;
  struct grpc_cached_wakeup_fd *next;
} grpc_cached_wakeup_fd;

struct grpc_pollset_worker {
  grpc_cached_wakeup_fd *wakeup_fd;
  int reevaluate_polling_on_wakeup;
  int kicked_specifically;
  struct grpc_pollset_worker *next;
  struct grpc_pollset_worker *prev;
};

struct grpc_pollset {
  gpr_mu mu;
  grpc_pollset_worker root_worker;
  int in_flight_cbs;
  int shutting_down;
  int called_shutdown;
  int kicked_without_pollers;
  grpc_closure *shutdown_done;
  grpc_closure_list idle_jobs;
  /* all polled fds */
  size_t fd_count;
  size_t fd_capacity;
  grpc_fd **fds;
  /* fds that have been removed from the pollset explicitly */
  size_t del_count;
  size_t del_capacity;
  grpc_fd **dels;
  /* Local cache of eventfds for workers */
  grpc_cached_wakeup_fd *local_wakeup_cache;
};
/* Add an fd to a pollset */
static void pollset_add_fd(grpc_exec_ctx *exec_ctx, grpc_pollset *pollset,
                           struct grpc_fd *fd);

static void pollset_set_add_fd(grpc_exec_ctx *exec_ctx,
                               grpc_pollset_set *pollset_set, grpc_fd *fd);

/* Convert a timespec to milliseconds:
   - very small or negative poll times are clamped to zero to do a
     non-blocking poll (which becomes spin polling)
   - other small values are rounded up to one millisecond
   - polls longer than a millisecond are rounded up to the next nearest
     millisecond to avoid spinning
   - infinite timeouts are converted to -1 */
static int poll_deadline_to_millis_timeout(gpr_timespec deadline,
                                           gpr_timespec now);
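
/* A worked example (not part of the build) of the conversion rules above,
   assuming max_spin_polling_us == 10 as defined in the implementation:
     deadline <= now + 10us  -> 0   (non-blocking / spin poll)
     deadline  = now + 0.3ms -> 1   (rounded up to one millisecond)
     deadline  = now + 3.2ms -> 4   (rounded up to the next millisecond)
     deadline  = +inf        -> -1  (block indefinitely) */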

/* Allow kick to wakeup the currently polling worker */
#define GRPC_POLLSET_CAN_KICK_SELF 1
/* Force the wakee to repoll when awoken */
#define GRPC_POLLSET_REEVALUATE_POLLING_ON_WAKEUP 2
/* As per pollset_kick, with an extended set of flags (defined above)
   -- mostly for fd_posix's use. */
static void pollset_kick_ext(grpc_pollset *p,
                             grpc_pollset_worker *specific_worker,
                             uint32_t flags);

/* Return 1 if the pollset has active threads in pollset_work (pollset must
 * be locked) */
static int pollset_has_workers(grpc_pollset *pollset);

/*******************************************************************************
 * pollset_set definitions
 */

struct grpc_pollset_set {
  gpr_mu mu;

  size_t pollset_count;
  size_t pollset_capacity;
  grpc_pollset **pollsets;

  size_t pollset_set_count;
  size_t pollset_set_capacity;
  struct grpc_pollset_set **pollset_sets;

  size_t fd_count;
  size_t fd_capacity;
  grpc_fd **fds;
};

/*******************************************************************************
 * fd_posix.c
 */
252
Craig Tiller253bd502016-02-25 12:30:23 -0800253#ifdef GRPC_FD_REF_COUNT_DEBUG
254#define REF_BY(fd, n, reason) ref_by(fd, n, reason, __FILE__, __LINE__)
255#define UNREF_BY(fd, n, reason) unref_by(fd, n, reason, __FILE__, __LINE__)
256static void ref_by(grpc_fd *fd, int n, const char *reason, const char *file,
257 int line) {
258 gpr_log(GPR_DEBUG, "FD %d %p ref %d %d -> %d [%s; %s:%d]", fd->fd, fd, n,
259 gpr_atm_no_barrier_load(&fd->refst),
260 gpr_atm_no_barrier_load(&fd->refst) + n, reason, file, line);
261#else
262#define REF_BY(fd, n, reason) ref_by(fd, n)
263#define UNREF_BY(fd, n, reason) unref_by(fd, n)
264static void ref_by(grpc_fd *fd, int n) {
265#endif
266 GPR_ASSERT(gpr_atm_no_barrier_fetch_add(&fd->refst, n) > 0);
267}
268
269#ifdef GRPC_FD_REF_COUNT_DEBUG
270static void unref_by(grpc_fd *fd, int n, const char *reason, const char *file,
271 int line) {
272 gpr_atm old;
273 gpr_log(GPR_DEBUG, "FD %d %p unref %d %d -> %d [%s; %s:%d]", fd->fd, fd, n,
274 gpr_atm_no_barrier_load(&fd->refst),
275 gpr_atm_no_barrier_load(&fd->refst) - n, reason, file, line);
276#else
277static void unref_by(grpc_fd *fd, int n) {
278 gpr_atm old;
279#endif
280 old = gpr_atm_full_fetch_add(&fd->refst, -n);
281 if (old == n) {
Craig Tiller7ac6bf02016-02-25 12:54:59 -0800282 gpr_mu_destroy(&fd->mu);
Craig Tillerb38197e2016-02-26 10:14:54 -0800283 grpc_iomgr_unregister_object(&fd->iomgr_object);
Craig Tiller7ac6bf02016-02-25 12:54:59 -0800284 gpr_free(fd);
Craig Tiller253bd502016-02-25 12:30:23 -0800285 } else {
286 GPR_ASSERT(old > n);
287 }
288}
289
Craig Tiller253bd502016-02-25 12:30:23 -0800290static grpc_fd *fd_create(int fd, const char *name) {
Craig Tiller7ac6bf02016-02-25 12:54:59 -0800291 grpc_fd *r = gpr_malloc(sizeof(*r));
292 gpr_mu_init(&r->mu);
293 gpr_atm_rel_store(&r->refst, 1);
294 r->shutdown = 0;
295 r->read_closure = CLOSURE_NOT_READY;
296 r->write_closure = CLOSURE_NOT_READY;
297 r->fd = fd;
298 r->inactive_watcher_root.next = r->inactive_watcher_root.prev =
299 &r->inactive_watcher_root;
300 r->read_watcher = r->write_watcher = NULL;
301 r->on_done_closure = NULL;
302 r->closed = 0;
303 r->released = 0;
304
Craig Tiller253bd502016-02-25 12:30:23 -0800305 char *name2;
306 gpr_asprintf(&name2, "%s fd=%d", name, fd);
307 grpc_iomgr_register_object(&r->iomgr_object, name2);
308 gpr_free(name2);
309#ifdef GRPC_FD_REF_COUNT_DEBUG
310 gpr_log(GPR_DEBUG, "FD %d %p create %s", fd, r, name);
311#endif
312 return r;
313}
314
315static bool fd_is_orphaned(grpc_fd *fd) {
316 return (gpr_atm_acq_load(&fd->refst) & 1) == 0;
317}

static void pollset_kick_locked(grpc_fd_watcher *watcher) {
  gpr_mu_lock(&watcher->pollset->mu);
  GPR_ASSERT(watcher->worker);
  pollset_kick_ext(watcher->pollset, watcher->worker,
                   GRPC_POLLSET_REEVALUATE_POLLING_ON_WAKEUP);
  gpr_mu_unlock(&watcher->pollset->mu);
}

static void maybe_wake_one_watcher_locked(grpc_fd *fd) {
  if (fd->inactive_watcher_root.next != &fd->inactive_watcher_root) {
    pollset_kick_locked(fd->inactive_watcher_root.next);
  } else if (fd->read_watcher) {
    pollset_kick_locked(fd->read_watcher);
  } else if (fd->write_watcher) {
    pollset_kick_locked(fd->write_watcher);
  }
}

static void wake_all_watchers_locked(grpc_fd *fd) {
  grpc_fd_watcher *watcher;
  for (watcher = fd->inactive_watcher_root.next;
       watcher != &fd->inactive_watcher_root; watcher = watcher->next) {
    pollset_kick_locked(watcher);
  }
  if (fd->read_watcher) {
    pollset_kick_locked(fd->read_watcher);
  }
  if (fd->write_watcher && fd->write_watcher != fd->read_watcher) {
    pollset_kick_locked(fd->write_watcher);
  }
}

static int has_watchers(grpc_fd *fd) {
  return fd->read_watcher != NULL || fd->write_watcher != NULL ||
         fd->inactive_watcher_root.next != &fd->inactive_watcher_root;
}

static void close_fd_locked(grpc_exec_ctx *exec_ctx, grpc_fd *fd) {
  fd->closed = 1;
  if (!fd->released) {
    close(fd->fd);
  }
  grpc_exec_ctx_enqueue(exec_ctx, fd->on_done_closure, true, NULL);
}

static int fd_wrapped_fd(grpc_fd *fd) {
  if (fd->released || fd->closed) {
    return -1;
  } else {
    return fd->fd;
  }
}

static void fd_orphan(grpc_exec_ctx *exec_ctx, grpc_fd *fd,
                      grpc_closure *on_done, int *release_fd,
                      const char *reason) {
  fd->on_done_closure = on_done;
  fd->released = release_fd != NULL;
  if (!fd->released) {
    shutdown(fd->fd, SHUT_RDWR);
  } else {
    *release_fd = fd->fd;
  }
  gpr_mu_lock(&fd->mu);
  REF_BY(fd, 1, reason); /* remove active status, but keep referenced */
  if (!has_watchers(fd)) {
    close_fd_locked(exec_ctx, fd);
  } else {
    wake_all_watchers_locked(fd);
  }
  gpr_mu_unlock(&fd->mu);
  UNREF_BY(fd, 2, reason); /* drop the reference */
}
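
/* A minimal sketch (not part of the build) of the two fd_orphan modes; the
   calls are mutually exclusive alternatives for a given fd. */
#if 0
int raw_fd;
/* hand the raw descriptor back to the caller instead of closing it,
   transferring ownership out of the polling engine: */
fd_orphan(exec_ctx, fd, on_done, &raw_fd, "take ownership");
/* ...or let the engine shut down and close the descriptor itself: */
fd_orphan(exec_ctx, fd, on_done, NULL, "done with fd");
#endif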

/* increment refcount by two to avoid changing the orphan bit */
#ifdef GRPC_FD_REF_COUNT_DEBUG
static void fd_ref(grpc_fd *fd, const char *reason, const char *file,
                   int line) {
  ref_by(fd, 2, reason, file, line);
}

static void fd_unref(grpc_fd *fd, const char *reason, const char *file,
                     int line) {
  unref_by(fd, 2, reason, file, line);
}
#else
static void fd_ref(grpc_fd *fd) { ref_by(fd, 2); }

static void fd_unref(grpc_fd *fd) { unref_by(fd, 2); }
#endif

static void notify_on_locked(grpc_exec_ctx *exec_ctx, grpc_fd *fd,
                             grpc_closure **st, grpc_closure *closure) {
  if (*st == CLOSURE_NOT_READY) {
    /* not ready ==> switch to a waiting state by setting the closure */
    *st = closure;
  } else if (*st == CLOSURE_READY) {
    /* already ready ==> queue the closure to run immediately */
    *st = CLOSURE_NOT_READY;
    grpc_exec_ctx_enqueue(exec_ctx, closure, !fd->shutdown, NULL);
    maybe_wake_one_watcher_locked(fd);
  } else {
    /* the closure slot was already set to a different closure: this is an
       error! */
    gpr_log(GPR_ERROR,
            "User called a notify_on function with a previous callback still "
            "pending");
    abort();
  }
}

/* returns 1 if state becomes not ready */
static int set_ready_locked(grpc_exec_ctx *exec_ctx, grpc_fd *fd,
                            grpc_closure **st) {
  if (*st == CLOSURE_READY) {
    /* duplicate ready ==> ignore */
    return 0;
  } else if (*st == CLOSURE_NOT_READY) {
    /* not ready, and not waiting ==> flag ready */
    *st = CLOSURE_READY;
    return 0;
  } else {
    /* waiting ==> queue closure */
    grpc_exec_ctx_enqueue(exec_ctx, *st, !fd->shutdown, NULL);
    *st = CLOSURE_NOT_READY;
    return 1;
  }
}
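
/* The closure slot is a three-state machine; summarizing the transitions
   implemented by notify_on_locked and set_ready_locked above:

     state              notify_on(closure)           set_ready()
     -----------------  ---------------------------  --------------------------
     CLOSURE_NOT_READY  -> closure (wait)            -> CLOSURE_READY
     CLOSURE_READY      run closure; -> NOT_READY    no-op
     closure (waiting)  abort (double notify_on)     run closure; -> NOT_READY
*/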

static void fd_shutdown(grpc_exec_ctx *exec_ctx, grpc_fd *fd) {
  gpr_mu_lock(&fd->mu);
  GPR_ASSERT(!fd->shutdown);
  fd->shutdown = 1;
  set_ready_locked(exec_ctx, fd, &fd->read_closure);
  set_ready_locked(exec_ctx, fd, &fd->write_closure);
  gpr_mu_unlock(&fd->mu);
}

static void fd_notify_on_read(grpc_exec_ctx *exec_ctx, grpc_fd *fd,
                              grpc_closure *closure) {
  gpr_mu_lock(&fd->mu);
  notify_on_locked(exec_ctx, fd, &fd->read_closure, closure);
  gpr_mu_unlock(&fd->mu);
}

static void fd_notify_on_write(grpc_exec_ctx *exec_ctx, grpc_fd *fd,
                               grpc_closure *closure) {
  gpr_mu_lock(&fd->mu);
  notify_on_locked(exec_ctx, fd, &fd->write_closure, closure);
  gpr_mu_unlock(&fd->mu);
}

static uint32_t fd_begin_poll(grpc_fd *fd, grpc_pollset *pollset,
                              grpc_pollset_worker *worker, uint32_t read_mask,
                              uint32_t write_mask, grpc_fd_watcher *watcher) {
  uint32_t mask = 0;
  grpc_closure *cur;
  int requested;
  /* keep track of pollers that have requested our events, in case they change
   */
  GRPC_FD_REF(fd, "poll");

  gpr_mu_lock(&fd->mu);

  /* if we are shutdown, then don't add to the watcher set */
  if (fd->shutdown) {
    watcher->fd = NULL;
    watcher->pollset = NULL;
    watcher->worker = NULL;
    gpr_mu_unlock(&fd->mu);
    GRPC_FD_UNREF(fd, "poll");
    return 0;
  }

  /* if there is nobody polling for read, but we need to, then start doing so */
  cur = fd->read_closure;
  requested = cur != CLOSURE_READY;
  if (read_mask && fd->read_watcher == NULL && requested) {
    fd->read_watcher = watcher;
    mask |= read_mask;
  }
  /* if there is nobody polling for write, but we need to, then start doing so
   */
  cur = fd->write_closure;
  requested = cur != CLOSURE_READY;
  if (write_mask && fd->write_watcher == NULL && requested) {
    fd->write_watcher = watcher;
    mask |= write_mask;
  }
  /* if not polling, remember this watcher in case we need someone to later */
  if (mask == 0 && worker != NULL) {
    watcher->next = &fd->inactive_watcher_root;
    watcher->prev = watcher->next->prev;
    watcher->next->prev = watcher->prev->next = watcher;
  }
  watcher->pollset = pollset;
  watcher->worker = worker;
  watcher->fd = fd;
  gpr_mu_unlock(&fd->mu);

  return mask;
}

static void fd_end_poll(grpc_exec_ctx *exec_ctx, grpc_fd_watcher *watcher,
                        int got_read, int got_write) {
  int was_polling = 0;
  int kick = 0;
  grpc_fd *fd = watcher->fd;

  if (fd == NULL) {
    return;
  }

  gpr_mu_lock(&fd->mu);

  if (watcher == fd->read_watcher) {
    /* remove read watcher, kick if we still need a read */
    was_polling = 1;
    if (!got_read) {
      kick = 1;
    }
    fd->read_watcher = NULL;
  }
  if (watcher == fd->write_watcher) {
    /* remove write watcher, kick if we still need a write */
    was_polling = 1;
    if (!got_write) {
      kick = 1;
    }
    fd->write_watcher = NULL;
  }
  if (!was_polling && watcher->worker != NULL) {
    /* remove from inactive list */
    watcher->next->prev = watcher->prev;
    watcher->prev->next = watcher->next;
  }
  if (got_read) {
    if (set_ready_locked(exec_ctx, fd, &fd->read_closure)) {
      kick = 1;
    }
  }
  if (got_write) {
    if (set_ready_locked(exec_ctx, fd, &fd->write_closure)) {
      kick = 1;
    }
  }
  if (kick) {
    maybe_wake_one_watcher_locked(fd);
  }
  if (fd_is_orphaned(fd) && !has_watchers(fd) && !fd->closed) {
    close_fd_locked(exec_ctx, fd);
  }
  gpr_mu_unlock(&fd->mu);

  GRPC_FD_UNREF(fd, "poll");
}

/*******************************************************************************
 * pollset_posix.c
 */

GPR_TLS_DECL(g_current_thread_poller);
GPR_TLS_DECL(g_current_thread_worker);

static void remove_worker(grpc_pollset *p, grpc_pollset_worker *worker) {
  worker->prev->next = worker->next;
  worker->next->prev = worker->prev;
}

static int pollset_has_workers(grpc_pollset *p) {
  return p->root_worker.next != &p->root_worker;
}

static grpc_pollset_worker *pop_front_worker(grpc_pollset *p) {
  if (pollset_has_workers(p)) {
    grpc_pollset_worker *w = p->root_worker.next;
    remove_worker(p, w);
    return w;
  } else {
    return NULL;
  }
}

static void push_back_worker(grpc_pollset *p, grpc_pollset_worker *worker) {
  worker->next = &p->root_worker;
  worker->prev = worker->next->prev;
  worker->prev->next = worker->next->prev = worker;
}

static void push_front_worker(grpc_pollset *p, grpc_pollset_worker *worker) {
  worker->prev = &p->root_worker;
  worker->next = worker->prev->next;
  worker->prev->next = worker->next->prev = worker;
}
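
/* Workers form an intrusive circular doubly-linked list rooted at
   root_worker, a sentinel, so an empty list is root <-> root. Workers entering
   pollset_work are pushed at the front and anonymous kicks pop from the front,
   so the most recently added poller is woken first; push_back_worker rotates a
   worker to the back when it should not be chosen again immediately. */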

static void pollset_kick_ext(grpc_pollset *p,
                             grpc_pollset_worker *specific_worker,
                             uint32_t flags) {
  GPR_TIMER_BEGIN("pollset_kick_ext", 0);

  /* pollset->mu already held */
  if (specific_worker != NULL) {
    if (specific_worker == GRPC_POLLSET_KICK_BROADCAST) {
      GPR_TIMER_BEGIN("pollset_kick_ext.broadcast", 0);
      GPR_ASSERT((flags & GRPC_POLLSET_REEVALUATE_POLLING_ON_WAKEUP) == 0);
      for (specific_worker = p->root_worker.next;
           specific_worker != &p->root_worker;
           specific_worker = specific_worker->next) {
        grpc_wakeup_fd_wakeup(&specific_worker->wakeup_fd->fd);
      }
      p->kicked_without_pollers = 1;
      GPR_TIMER_END("pollset_kick_ext.broadcast", 0);
    } else if (gpr_tls_get(&g_current_thread_worker) !=
               (intptr_t)specific_worker) {
      GPR_TIMER_MARK("different_thread_worker", 0);
      if ((flags & GRPC_POLLSET_REEVALUATE_POLLING_ON_WAKEUP) != 0) {
        specific_worker->reevaluate_polling_on_wakeup = 1;
      }
      specific_worker->kicked_specifically = 1;
      grpc_wakeup_fd_wakeup(&specific_worker->wakeup_fd->fd);
    } else if ((flags & GRPC_POLLSET_CAN_KICK_SELF) != 0) {
      GPR_TIMER_MARK("kick_yoself", 0);
      if ((flags & GRPC_POLLSET_REEVALUATE_POLLING_ON_WAKEUP) != 0) {
        specific_worker->reevaluate_polling_on_wakeup = 1;
      }
      specific_worker->kicked_specifically = 1;
      grpc_wakeup_fd_wakeup(&specific_worker->wakeup_fd->fd);
    }
  } else if (gpr_tls_get(&g_current_thread_poller) != (intptr_t)p) {
    GPR_ASSERT((flags & GRPC_POLLSET_REEVALUATE_POLLING_ON_WAKEUP) == 0);
    GPR_TIMER_MARK("kick_anonymous", 0);
    specific_worker = pop_front_worker(p);
    if (specific_worker != NULL) {
      if (gpr_tls_get(&g_current_thread_worker) == (intptr_t)specific_worker) {
        GPR_TIMER_MARK("kick_anonymous_not_self", 0);
        push_back_worker(p, specific_worker);
        specific_worker = pop_front_worker(p);
        if ((flags & GRPC_POLLSET_CAN_KICK_SELF) == 0 &&
            gpr_tls_get(&g_current_thread_worker) ==
                (intptr_t)specific_worker) {
          push_back_worker(p, specific_worker);
          specific_worker = NULL;
        }
      }
      if (specific_worker != NULL) {
        GPR_TIMER_MARK("finally_kick", 0);
        push_back_worker(p, specific_worker);
        grpc_wakeup_fd_wakeup(&specific_worker->wakeup_fd->fd);
      }
    } else {
      GPR_TIMER_MARK("kicked_no_pollers", 0);
      p->kicked_without_pollers = 1;
    }
  }

  GPR_TIMER_END("pollset_kick_ext", 0);
}

static void pollset_kick(grpc_pollset *p,
                         grpc_pollset_worker *specific_worker) {
  pollset_kick_ext(p, specific_worker, 0);
}
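
/* Summarizing the kick cases handled above:
   - specific_worker == GRPC_POLLSET_KICK_BROADCAST: wake every worker;
   - specific_worker on another thread: wake just that worker;
   - specific_worker is the calling thread: wake it only if
     GRPC_POLLSET_CAN_KICK_SELF is set;
   - no specific worker and the caller is not already polling this pollset:
     pick one worker (preferring one that is not the calling thread), or
     record kicked_without_pollers if none exists so the next pollset_work
     returns immediately;
   - no specific worker and the caller is itself polling this pollset: the
     kick is a no-op. */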

/* global state management */

static void pollset_global_init(void) {
  gpr_tls_init(&g_current_thread_poller);
  gpr_tls_init(&g_current_thread_worker);
  grpc_wakeup_fd_init(&grpc_global_wakeup_fd);
}

static void pollset_global_shutdown(void) {
  grpc_wakeup_fd_destroy(&grpc_global_wakeup_fd);
  gpr_tls_destroy(&g_current_thread_poller);
  gpr_tls_destroy(&g_current_thread_worker);
}

static void kick_poller(void) { grpc_wakeup_fd_wakeup(&grpc_global_wakeup_fd); }

/* main interface */

static void pollset_init(grpc_pollset *pollset, gpr_mu **mu) {
  gpr_mu_init(&pollset->mu);
  *mu = &pollset->mu;
  pollset->root_worker.next = pollset->root_worker.prev = &pollset->root_worker;
  pollset->in_flight_cbs = 0;
  pollset->shutting_down = 0;
  pollset->called_shutdown = 0;
  pollset->kicked_without_pollers = 0;
  pollset->idle_jobs.head = pollset->idle_jobs.tail = NULL;
  pollset->local_wakeup_cache = NULL;
  pollset->fd_count = 0;
  pollset->fd_capacity = 0;
  pollset->del_count = 0;
  pollset->del_capacity = 0;
  pollset->fds = NULL;
  pollset->dels = NULL;
}

static void pollset_destroy(grpc_pollset *pollset) {
  GPR_ASSERT(pollset->in_flight_cbs == 0);
  GPR_ASSERT(!pollset_has_workers(pollset));
  GPR_ASSERT(pollset->idle_jobs.head == pollset->idle_jobs.tail);
  while (pollset->local_wakeup_cache) {
    grpc_cached_wakeup_fd *next = pollset->local_wakeup_cache->next;
    grpc_wakeup_fd_destroy(&pollset->local_wakeup_cache->fd);
    gpr_free(pollset->local_wakeup_cache);
    pollset->local_wakeup_cache = next;
  }
  gpr_free(pollset->fds);
  gpr_free(pollset->dels);
  gpr_mu_destroy(&pollset->mu);
}

static void pollset_reset(grpc_pollset *pollset) {
  GPR_ASSERT(pollset->shutting_down);
  GPR_ASSERT(pollset->in_flight_cbs == 0);
  GPR_ASSERT(!pollset_has_workers(pollset));
  GPR_ASSERT(pollset->idle_jobs.head == pollset->idle_jobs.tail);
  GPR_ASSERT(pollset->fd_count == 0);
  GPR_ASSERT(pollset->del_count == 0);
  pollset->shutting_down = 0;
  pollset->called_shutdown = 0;
  pollset->kicked_without_pollers = 0;
}

static void pollset_add_fd(grpc_exec_ctx *exec_ctx, grpc_pollset *pollset,
                           grpc_fd *fd) {
  gpr_mu_lock(&pollset->mu);
  size_t i;
  /* TODO(ctiller): this is O(num_fds^2); maybe switch to a hash set here */
  for (i = 0; i < pollset->fd_count; i++) {
    if (pollset->fds[i] == fd) goto exit;
  }
  if (pollset->fd_count == pollset->fd_capacity) {
    pollset->fd_capacity =
        GPR_MAX(pollset->fd_capacity + 8, pollset->fd_count * 3 / 2);
    pollset->fds =
        gpr_realloc(pollset->fds, sizeof(grpc_fd *) * pollset->fd_capacity);
  }
  pollset->fds[pollset->fd_count++] = fd;
  GRPC_FD_REF(fd, "multipoller");
  pollset_kick(pollset, NULL);
exit:
  gpr_mu_unlock(&pollset->mu);
}

static void finish_shutdown(grpc_exec_ctx *exec_ctx, grpc_pollset *pollset) {
  GPR_ASSERT(grpc_closure_list_empty(pollset->idle_jobs));
  size_t i;
  for (i = 0; i < pollset->fd_count; i++) {
    GRPC_FD_UNREF(pollset->fds[i], "multipoller");
  }
  for (i = 0; i < pollset->del_count; i++) {
    GRPC_FD_UNREF(pollset->dels[i], "multipoller_del");
  }
  pollset->fd_count = 0;
  pollset->del_count = 0;
  grpc_exec_ctx_enqueue(exec_ctx, pollset->shutdown_done, true, NULL);
}

static void pollset_work(grpc_exec_ctx *exec_ctx, grpc_pollset *pollset,
                         grpc_pollset_worker **worker_hdl, gpr_timespec now,
                         gpr_timespec deadline) {
  grpc_pollset_worker worker;
  *worker_hdl = &worker;

  /* pollset->mu already held */
  int added_worker = 0;
  int locked = 1;
  int queued_work = 0;
  int keep_polling = 0;
  GPR_TIMER_BEGIN("pollset_work", 0);
  /* this must happen before we (potentially) drop pollset->mu */
  worker.next = worker.prev = NULL;
  worker.reevaluate_polling_on_wakeup = 0;
  if (pollset->local_wakeup_cache != NULL) {
    worker.wakeup_fd = pollset->local_wakeup_cache;
    pollset->local_wakeup_cache = worker.wakeup_fd->next;
  } else {
    worker.wakeup_fd = gpr_malloc(sizeof(*worker.wakeup_fd));
    grpc_wakeup_fd_init(&worker.wakeup_fd->fd);
  }
  worker.kicked_specifically = 0;
  /* If there's work waiting for the pollset to be idle, and the
     pollset is idle, then do that work */
  if (!pollset_has_workers(pollset) &&
      !grpc_closure_list_empty(pollset->idle_jobs)) {
    GPR_TIMER_MARK("pollset_work.idle_jobs", 0);
    grpc_exec_ctx_enqueue_list(exec_ctx, &pollset->idle_jobs, NULL);
    goto done;
  }
  /* If we're shutting down then we don't execute any extended work */
  if (pollset->shutting_down) {
    GPR_TIMER_MARK("pollset_work.shutting_down", 0);
    goto done;
  }
  /* Give do_promote priority so we don't starve it out */
  if (pollset->in_flight_cbs) {
    GPR_TIMER_MARK("pollset_work.in_flight_cbs", 0);
    gpr_mu_unlock(&pollset->mu);
    locked = 0;
    goto done;
  }
  /* Start polling, and keep doing so while we're being asked to
     re-evaluate our pollers (this allows poll() based pollers to
     ensure they don't miss wakeups) */
  keep_polling = 1;
  gpr_tls_set(&g_current_thread_poller, (intptr_t)pollset);
  while (keep_polling) {
    keep_polling = 0;
    if (!pollset->kicked_without_pollers) {
      if (!added_worker) {
        push_front_worker(pollset, &worker);
        added_worker = 1;
        gpr_tls_set(&g_current_thread_worker, (intptr_t)&worker);
      }
      GPR_TIMER_BEGIN("maybe_work_and_unlock", 0);
#define POLLOUT_CHECK (POLLOUT | POLLHUP | POLLERR)
#define POLLIN_CHECK (POLLIN | POLLHUP | POLLERR)

      int timeout;
      int r;
      size_t i, j, fd_count;
      nfds_t pfd_count;
      /* TODO(ctiller): inline some elements to avoid an allocation */
      grpc_fd_watcher *watchers;
      struct pollfd *pfds;

      timeout = poll_deadline_to_millis_timeout(deadline, now);
      /* TODO(ctiller): perform just one malloc here if we exceed the inline
       * case */
      pfds = gpr_malloc(sizeof(*pfds) * (pollset->fd_count + 2));
      watchers = gpr_malloc(sizeof(*watchers) * (pollset->fd_count + 2));
      fd_count = 0;
      pfd_count = 2;
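      /* Slots 0 and 1 of pfds are reserved for the global wakeup fd and this
         worker's own wakeup fd respectively; the pollset's fds start at
         index 2, which is why the loops below begin at i = 2. */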
      pfds[0].fd = GRPC_WAKEUP_FD_GET_READ_FD(&grpc_global_wakeup_fd);
      pfds[0].events = POLLIN;
      pfds[0].revents = 0;
      pfds[1].fd = GRPC_WAKEUP_FD_GET_READ_FD(&worker.wakeup_fd->fd);
      pfds[1].events = POLLIN;
      pfds[1].revents = 0;
      for (i = 0; i < pollset->fd_count; i++) {
        int remove = fd_is_orphaned(pollset->fds[i]);
        for (j = 0; !remove && j < pollset->del_count; j++) {
          if (pollset->fds[i] == pollset->dels[j]) remove = 1;
        }
        if (remove) {
          GRPC_FD_UNREF(pollset->fds[i], "multipoller");
        } else {
          pollset->fds[fd_count++] = pollset->fds[i];
          watchers[pfd_count].fd = pollset->fds[i];
          GRPC_FD_REF(watchers[pfd_count].fd, "multipoller_start");
          pfds[pfd_count].fd = pollset->fds[i]->fd;
          pfds[pfd_count].revents = 0;
          pfd_count++;
        }
      }
      for (j = 0; j < pollset->del_count; j++) {
        GRPC_FD_UNREF(pollset->dels[j], "multipoller_del");
      }
      pollset->del_count = 0;
      pollset->fd_count = fd_count;
      gpr_mu_unlock(&pollset->mu);

      for (i = 2; i < pfd_count; i++) {
        grpc_fd *fd = watchers[i].fd;
        pfds[i].events = (short)fd_begin_poll(fd, pollset, &worker, POLLIN,
                                              POLLOUT, &watchers[i]);
        GRPC_FD_UNREF(fd, "multipoller_start");
      }

      /* TODO(vpai): Consider first doing a 0 timeout poll here to avoid
         even going into the blocking annotation if possible */
      GRPC_SCHEDULING_START_BLOCKING_REGION;
      r = grpc_poll_function(pfds, pfd_count, timeout);
      GRPC_SCHEDULING_END_BLOCKING_REGION;

      if (r < 0) {
        if (errno != EINTR) {
          gpr_log(GPR_ERROR, "poll() failed: %s", strerror(errno));
        }
        for (i = 2; i < pfd_count; i++) {
          fd_end_poll(exec_ctx, &watchers[i], 0, 0);
        }
      } else if (r == 0) {
        for (i = 2; i < pfd_count; i++) {
          fd_end_poll(exec_ctx, &watchers[i], 0, 0);
        }
      } else {
        if (pfds[0].revents & POLLIN_CHECK) {
          grpc_wakeup_fd_consume_wakeup(&grpc_global_wakeup_fd);
        }
        if (pfds[1].revents & POLLIN_CHECK) {
          grpc_wakeup_fd_consume_wakeup(&worker.wakeup_fd->fd);
        }
        for (i = 2; i < pfd_count; i++) {
          if (watchers[i].fd == NULL) {
            fd_end_poll(exec_ctx, &watchers[i], 0, 0);
          } else {
            fd_end_poll(exec_ctx, &watchers[i], pfds[i].revents & POLLIN_CHECK,
                        pfds[i].revents & POLLOUT_CHECK);
          }
        }
      }

      gpr_free(pfds);
      gpr_free(watchers);
      GPR_TIMER_END("maybe_work_and_unlock", 0);
      locked = 0;
    } else {
      GPR_TIMER_MARK("pollset_work.kicked_without_pollers", 0);
      pollset->kicked_without_pollers = 0;
    }
    /* Finished execution - start cleaning up.
       Note that we may arrive here from outside the enclosing while() loop.
       In that case we won't loop though, as we haven't added worker to the
       worker list, which means nobody could ask us to re-evaluate polling. */
  done:
    if (!locked) {
      queued_work |= grpc_exec_ctx_flush(exec_ctx);
      gpr_mu_lock(&pollset->mu);
      locked = 1;
    }
    /* If we're forced to re-evaluate polling (via pollset_kick with
       GRPC_POLLSET_REEVALUATE_POLLING_ON_WAKEUP) then we land here and force
       a loop */
    if (worker.reevaluate_polling_on_wakeup) {
      worker.reevaluate_polling_on_wakeup = 0;
      pollset->kicked_without_pollers = 0;
      if (queued_work || worker.kicked_specifically) {
        /* If there's queued work on the list, then set the deadline to be
           immediate so we get back out of the polling loop quickly */
        deadline = gpr_inf_past(GPR_CLOCK_MONOTONIC);
      }
      keep_polling = 1;
    }
    if (keep_polling) {
      now = gpr_now(now.clock_type);
    }
  }
  gpr_tls_set(&g_current_thread_poller, 0);
  if (added_worker) {
    remove_worker(pollset, &worker);
    gpr_tls_set(&g_current_thread_worker, 0);
  }
  /* release wakeup fd to the local pool */
  worker.wakeup_fd->next = pollset->local_wakeup_cache;
  pollset->local_wakeup_cache = worker.wakeup_fd;
  /* check shutdown conditions */
  if (pollset->shutting_down) {
    if (pollset_has_workers(pollset)) {
      pollset_kick(pollset, NULL);
    } else if (!pollset->called_shutdown && pollset->in_flight_cbs == 0) {
      pollset->called_shutdown = 1;
      gpr_mu_unlock(&pollset->mu);
      finish_shutdown(exec_ctx, pollset);
      grpc_exec_ctx_flush(exec_ctx);
      /* Continuing to access pollset here is safe -- it is the caller's
       * responsibility to not destroy when it has outstanding calls to
       * pollset_work.
       * TODO(dklempner): Can we refactor the shutdown logic to avoid this? */
      gpr_mu_lock(&pollset->mu);
    } else if (!grpc_closure_list_empty(pollset->idle_jobs)) {
      grpc_exec_ctx_enqueue_list(exec_ctx, &pollset->idle_jobs, NULL);
      gpr_mu_unlock(&pollset->mu);
      grpc_exec_ctx_flush(exec_ctx);
      gpr_mu_lock(&pollset->mu);
    }
  }
  *worker_hdl = NULL;
  GPR_TIMER_END("pollset_work", 0);
}

static void pollset_shutdown(grpc_exec_ctx *exec_ctx, grpc_pollset *pollset,
                             grpc_closure *closure) {
  GPR_ASSERT(!pollset->shutting_down);
  pollset->shutting_down = 1;
  pollset->shutdown_done = closure;
  pollset_kick(pollset, GRPC_POLLSET_KICK_BROADCAST);
  if (!pollset_has_workers(pollset)) {
    grpc_exec_ctx_enqueue_list(exec_ctx, &pollset->idle_jobs, NULL);
  }
  if (!pollset->called_shutdown && pollset->in_flight_cbs == 0 &&
      !pollset_has_workers(pollset)) {
    pollset->called_shutdown = 1;
    finish_shutdown(exec_ctx, pollset);
  }
}

static int poll_deadline_to_millis_timeout(gpr_timespec deadline,
                                           gpr_timespec now) {
  gpr_timespec timeout;
  static const int64_t max_spin_polling_us = 10;
  if (gpr_time_cmp(deadline, gpr_inf_future(deadline.clock_type)) == 0) {
    return -1;
  }
  if (gpr_time_cmp(deadline, gpr_time_add(now, gpr_time_from_micros(
                                                   max_spin_polling_us,
                                                   GPR_TIMESPAN))) <= 0) {
    return 0;
  }
  timeout = gpr_time_sub(deadline, now);
  return gpr_time_to_millis(gpr_time_add(
      timeout, gpr_time_from_nanos(GPR_NS_PER_MS - 1, GPR_TIMESPAN)));
}

/*******************************************************************************
 * pollset_set_posix.c
 */

static grpc_pollset_set *pollset_set_create(void) {
  grpc_pollset_set *pollset_set = gpr_malloc(sizeof(*pollset_set));
  memset(pollset_set, 0, sizeof(*pollset_set));
  gpr_mu_init(&pollset_set->mu);
  return pollset_set;
}

static void pollset_set_destroy(grpc_pollset_set *pollset_set) {
  size_t i;
  gpr_mu_destroy(&pollset_set->mu);
  for (i = 0; i < pollset_set->fd_count; i++) {
    GRPC_FD_UNREF(pollset_set->fds[i], "pollset_set");
  }
  gpr_free(pollset_set->pollsets);
  gpr_free(pollset_set->pollset_sets);
  gpr_free(pollset_set->fds);
  gpr_free(pollset_set);
}

static void pollset_set_add_pollset(grpc_exec_ctx *exec_ctx,
                                    grpc_pollset_set *pollset_set,
                                    grpc_pollset *pollset) {
  size_t i, j;
  gpr_mu_lock(&pollset_set->mu);
  if (pollset_set->pollset_count == pollset_set->pollset_capacity) {
    pollset_set->pollset_capacity =
        GPR_MAX(8, 2 * pollset_set->pollset_capacity);
    pollset_set->pollsets =
        gpr_realloc(pollset_set->pollsets, pollset_set->pollset_capacity *
                                               sizeof(*pollset_set->pollsets));
  }
  pollset_set->pollsets[pollset_set->pollset_count++] = pollset;
  for (i = 0, j = 0; i < pollset_set->fd_count; i++) {
    if (fd_is_orphaned(pollset_set->fds[i])) {
      GRPC_FD_UNREF(pollset_set->fds[i], "pollset_set");
    } else {
      pollset_add_fd(exec_ctx, pollset, pollset_set->fds[i]);
      pollset_set->fds[j++] = pollset_set->fds[i];
    }
  }
  pollset_set->fd_count = j;
  gpr_mu_unlock(&pollset_set->mu);
}

static void pollset_set_del_pollset(grpc_exec_ctx *exec_ctx,
                                    grpc_pollset_set *pollset_set,
                                    grpc_pollset *pollset) {
  size_t i;
  gpr_mu_lock(&pollset_set->mu);
  for (i = 0; i < pollset_set->pollset_count; i++) {
    if (pollset_set->pollsets[i] == pollset) {
      pollset_set->pollset_count--;
      GPR_SWAP(grpc_pollset *, pollset_set->pollsets[i],
               pollset_set->pollsets[pollset_set->pollset_count]);
      break;
    }
  }
  gpr_mu_unlock(&pollset_set->mu);
}

static void pollset_set_add_pollset_set(grpc_exec_ctx *exec_ctx,
                                        grpc_pollset_set *bag,
                                        grpc_pollset_set *item) {
  size_t i, j;
  gpr_mu_lock(&bag->mu);
  if (bag->pollset_set_count == bag->pollset_set_capacity) {
    bag->pollset_set_capacity = GPR_MAX(8, 2 * bag->pollset_set_capacity);
    bag->pollset_sets =
        gpr_realloc(bag->pollset_sets,
                    bag->pollset_set_capacity * sizeof(*bag->pollset_sets));
  }
  bag->pollset_sets[bag->pollset_set_count++] = item;
  for (i = 0, j = 0; i < bag->fd_count; i++) {
    if (fd_is_orphaned(bag->fds[i])) {
      GRPC_FD_UNREF(bag->fds[i], "pollset_set");
    } else {
      pollset_set_add_fd(exec_ctx, item, bag->fds[i]);
      bag->fds[j++] = bag->fds[i];
    }
  }
  bag->fd_count = j;
  gpr_mu_unlock(&bag->mu);
}

static void pollset_set_del_pollset_set(grpc_exec_ctx *exec_ctx,
                                        grpc_pollset_set *bag,
                                        grpc_pollset_set *item) {
  size_t i;
  gpr_mu_lock(&bag->mu);
  for (i = 0; i < bag->pollset_set_count; i++) {
    if (bag->pollset_sets[i] == item) {
      bag->pollset_set_count--;
      GPR_SWAP(grpc_pollset_set *, bag->pollset_sets[i],
               bag->pollset_sets[bag->pollset_set_count]);
      break;
    }
  }
  gpr_mu_unlock(&bag->mu);
}

static void pollset_set_add_fd(grpc_exec_ctx *exec_ctx,
                               grpc_pollset_set *pollset_set, grpc_fd *fd) {
  size_t i;
  gpr_mu_lock(&pollset_set->mu);
  if (pollset_set->fd_count == pollset_set->fd_capacity) {
    pollset_set->fd_capacity = GPR_MAX(8, 2 * pollset_set->fd_capacity);
    pollset_set->fds = gpr_realloc(
        pollset_set->fds, pollset_set->fd_capacity * sizeof(*pollset_set->fds));
  }
  GRPC_FD_REF(fd, "pollset_set");
  pollset_set->fds[pollset_set->fd_count++] = fd;
  for (i = 0; i < pollset_set->pollset_count; i++) {
    pollset_add_fd(exec_ctx, pollset_set->pollsets[i], fd);
  }
  for (i = 0; i < pollset_set->pollset_set_count; i++) {
    pollset_set_add_fd(exec_ctx, pollset_set->pollset_sets[i], fd);
  }
  gpr_mu_unlock(&pollset_set->mu);
}

static void pollset_set_del_fd(grpc_exec_ctx *exec_ctx,
                               grpc_pollset_set *pollset_set, grpc_fd *fd) {
  size_t i;
  gpr_mu_lock(&pollset_set->mu);
  for (i = 0; i < pollset_set->fd_count; i++) {
    if (pollset_set->fds[i] == fd) {
      pollset_set->fd_count--;
      GPR_SWAP(grpc_fd *, pollset_set->fds[i],
               pollset_set->fds[pollset_set->fd_count]);
      GRPC_FD_UNREF(fd, "pollset_set");
      break;
    }
  }
  for (i = 0; i < pollset_set->pollset_set_count; i++) {
    pollset_set_del_fd(exec_ctx, pollset_set->pollset_sets[i], fd);
  }
  gpr_mu_unlock(&pollset_set->mu);
}

/*******************************************************************************
 * event engine binding
 */

static void shutdown_engine(void) { pollset_global_shutdown(); }

static const grpc_event_engine_vtable vtable = {
    .pollset_size = sizeof(grpc_pollset),

    .fd_create = fd_create,
    .fd_wrapped_fd = fd_wrapped_fd,
    .fd_orphan = fd_orphan,
    .fd_shutdown = fd_shutdown,
    .fd_notify_on_read = fd_notify_on_read,
    .fd_notify_on_write = fd_notify_on_write,

    .pollset_init = pollset_init,
    .pollset_shutdown = pollset_shutdown,
    .pollset_reset = pollset_reset,
    .pollset_destroy = pollset_destroy,
    .pollset_work = pollset_work,
    .pollset_kick = pollset_kick,
    .pollset_add_fd = pollset_add_fd,

    .pollset_set_create = pollset_set_create,
    .pollset_set_destroy = pollset_set_destroy,
    .pollset_set_add_pollset = pollset_set_add_pollset,
    .pollset_set_del_pollset = pollset_set_del_pollset,
    .pollset_set_add_pollset_set = pollset_set_add_pollset_set,
    .pollset_set_del_pollset_set = pollset_set_del_pollset_set,
    .pollset_set_add_fd = pollset_set_add_fd,
    .pollset_set_del_fd = pollset_set_del_fd,

    .kick_poller = kick_poller,

    .shutdown_engine = shutdown_engine,
};

const grpc_event_engine_vtable *grpc_init_poll_posix(void) {
  pollset_global_init();
  return &vtable;
}
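
/* A minimal sketch (not part of the build) of driving this engine through its
   vtable -- roughly what the iomgr glue does once grpc_init_poll_posix has
   been selected. The variable names and the 10-second deadline are arbitrary
   illustrations. */
#if 0
const grpc_event_engine_vtable *engine = grpc_init_poll_posix();
grpc_pollset *ps = gpr_malloc(engine->pollset_size);
gpr_mu *mu;
grpc_exec_ctx exec_ctx = GRPC_EXEC_CTX_INIT;
grpc_pollset_worker *worker;
engine->pollset_init(ps, &mu);
gpr_mu_lock(mu); /* pollset_work requires the pollset mutex held on entry */
engine->pollset_work(&exec_ctx, ps, &worker, gpr_now(GPR_CLOCK_MONOTONIC),
                     gpr_time_add(gpr_now(GPR_CLOCK_MONOTONIC),
                                  gpr_time_from_seconds(10, GPR_TIMESPAN)));
gpr_mu_unlock(mu);
#endif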

#endif