/*
 *
 * Copyright 2016, Google Inc.
 * All rights reserved.
 *
 * Redistribution and use in source and binary forms, with or without
 * modification, are permitted provided that the following conditions are
 * met:
 *
 *     * Redistributions of source code must retain the above copyright
 * notice, this list of conditions and the following disclaimer.
 *     * Redistributions in binary form must reproduce the above
 * copyright notice, this list of conditions and the following disclaimer
 * in the documentation and/or other materials provided with the
 * distribution.
 *     * Neither the name of Google Inc. nor the names of its
 * contributors may be used to endorse or promote products derived from
 * this software without specific prior written permission.
 *
 * THIS SOFTWARE IS PROVIDED BY THE COPYRIGHT HOLDERS AND CONTRIBUTORS
 * "AS IS" AND ANY EXPRESS OR IMPLIED WARRANTIES, INCLUDING, BUT NOT
 * LIMITED TO, THE IMPLIED WARRANTIES OF MERCHANTABILITY AND FITNESS FOR
 * A PARTICULAR PURPOSE ARE DISCLAIMED. IN NO EVENT SHALL THE COPYRIGHT
 * OWNER OR CONTRIBUTORS BE LIABLE FOR ANY DIRECT, INDIRECT, INCIDENTAL,
 * SPECIAL, EXEMPLARY, OR CONSEQUENTIAL DAMAGES (INCLUDING, BUT NOT
 * LIMITED TO, PROCUREMENT OF SUBSTITUTE GOODS OR SERVICES; LOSS OF USE,
 * DATA, OR PROFITS; OR BUSINESS INTERRUPTION) HOWEVER CAUSED AND ON ANY
 * THEORY OF LIABILITY, WHETHER IN CONTRACT, STRICT LIABILITY, OR TORT
 * (INCLUDING NEGLIGENCE OR OTHERWISE) ARISING IN ANY WAY OUT OF THE USE
 * OF THIS SOFTWARE, EVEN IF ADVISED OF THE POSSIBILITY OF SUCH DAMAGE.
 *
 */

#include "src/core/lib/iomgr/port.h"

#ifdef GRPC_POSIX_SOCKET

#include "src/core/lib/iomgr/ev_poll_posix.h"

#include <assert.h>
#include <errno.h>
#include <poll.h>
#include <string.h>
#include <sys/socket.h>
#include <unistd.h>

#include <grpc/support/alloc.h>
#include <grpc/support/log.h>
#include <grpc/support/string_util.h>
#include <grpc/support/thd.h>
#include <grpc/support/tls.h>
#include <grpc/support/useful.h>

#include "src/core/lib/iomgr/iomgr_internal.h"
#include "src/core/lib/iomgr/wakeup_fd_cv.h"
#include "src/core/lib/iomgr/wakeup_fd_posix.h"
#include "src/core/lib/profiling/timers.h"
#include "src/core/lib/support/block_annotate.h"

/*******************************************************************************
 * FD declarations
 */

typedef struct grpc_fd_watcher {
  struct grpc_fd_watcher *next;
  struct grpc_fd_watcher *prev;
  grpc_pollset *pollset;
  grpc_pollset_worker *worker;
  grpc_fd *fd;
} grpc_fd_watcher;

struct grpc_fd {
  int fd;
  /* refst format:
       bit0:   1=active/0=orphaned
       bit1-n: refcount
     meaning that mostly we ref by two to avoid altering the orphaned bit,
     and just unref by 1 when we're ready to flag the object as orphaned */
  gpr_atm refst;

  gpr_mu mu;
  int shutdown;
  int closed;
  int released;
  grpc_error *shutdown_error;

  /* The watcher list.

     The following watcher related fields are protected by mu.

     An fd_watcher is an ephemeral object created when an fd wants to
     begin polling, and destroyed after the poll.

     It records whether the fd is interested in read polling, write polling,
     both, or neither.

     If a watcher is asked to poll for reads or writes, the read_watcher
     or write_watcher fields are set respectively. A watcher may be asked
     to poll for both, in which case both fields will be set.

     read_watcher and write_watcher may be NULL if no watcher has been
     asked to poll for reads or writes.

     If an fd_watcher is not asked to poll for reads or writes, it's added
     to a linked list of inactive watchers, rooted at inactive_watcher_root.
     If at a later time a poller is needed, one of the inactive watchers may
     be kicked out of its poll loop to take on that responsibility. */
  grpc_fd_watcher inactive_watcher_root;
  grpc_fd_watcher *read_watcher;
  grpc_fd_watcher *write_watcher;

  grpc_closure *read_closure;
  grpc_closure *write_closure;

  grpc_closure *on_done_closure;

  grpc_iomgr_object iomgr_object;

  /* The pollset that last noticed and notified that the fd is readable */
  grpc_pollset *read_notifier_pollset;
};

static grpc_wakeup_fd global_wakeup_fd;

/* Begin polling on an fd.
   Registers that the given pollset is interested in this fd - so that if read
   or writability interest changes, the pollset can be kicked to pick up that
   new interest.
   Return value is:
     (fd_needs_read? read_mask : 0) | (fd_needs_write? write_mask : 0)
   i.e. a combination of read_mask and write_mask determined by the fd's current
   interest in said events.
   Polling strategies that do not need to alter their behavior depending on the
   fd's current interest (such as epoll) do not need to call this function.
   MUST NOT be called with a pollset lock taken */
static uint32_t fd_begin_poll(grpc_fd *fd, grpc_pollset *pollset,
                              grpc_pollset_worker *worker, uint32_t read_mask,
                              uint32_t write_mask, grpc_fd_watcher *rec);
/* Complete polling previously started with fd_begin_poll
   MUST NOT be called with a pollset lock taken
   if got_read or got_write are 1, also does the become_{readable,writable} as
   appropriate. */
static void fd_end_poll(grpc_exec_ctx *exec_ctx, grpc_fd_watcher *rec,
                        int got_read, int got_write,
                        grpc_pollset *read_notifier_pollset);

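/* Illustrative pairing of the two calls above, a sketch mirroring how
   pollset_work (below) drives them; the variable names come from that
   function and are not additional API. The real loop uses POLLIN_CHECK /
   POLLOUT_CHECK, which also fold in POLLHUP and POLLERR:

     pfds[i].events = (short)fd_begin_poll(fd, pollset, &worker, POLLIN,
                                           POLLOUT, &watchers[i]);
     r = grpc_poll_function(pfds, pfd_count, timeout);
     fd_end_poll(exec_ctx, &watchers[i], pfds[i].revents & POLLIN,
                 pfds[i].revents & POLLOUT, pollset);
*/
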
/* Return 1 if this fd is orphaned, 0 otherwise */
static bool fd_is_orphaned(grpc_fd *fd);

/* Reference counting for fds */
//#define GRPC_FD_REF_COUNT_DEBUG
#ifdef GRPC_FD_REF_COUNT_DEBUG
static void fd_ref(grpc_fd *fd, const char *reason, const char *file, int line);
static void fd_unref(grpc_fd *fd, const char *reason, const char *file,
                     int line);
#define GRPC_FD_REF(fd, reason) fd_ref(fd, reason, __FILE__, __LINE__)
#define GRPC_FD_UNREF(fd, reason) fd_unref(fd, reason, __FILE__, __LINE__)
#else
static void fd_ref(grpc_fd *fd);
static void fd_unref(grpc_fd *fd);
#define GRPC_FD_REF(fd, reason) fd_ref(fd)
#define GRPC_FD_UNREF(fd, reason) fd_unref(fd)
#endif

#define CLOSURE_NOT_READY ((grpc_closure *)0)
#define CLOSURE_READY ((grpc_closure *)1)

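/* A note on the encoding above: read_closure/write_closure hold one of three
   values. CLOSURE_NOT_READY (0) means no event has fired and nobody is
   waiting; CLOSURE_READY (1) means an event fired with nobody waiting; any
   other value is a caller's grpc_closure* registered via notify_on. The
   transitions, implemented by notify_on_locked/set_ready_locked below, are:

     state          notify_on(c)               event fires
     -------------  -------------------------  ------------------------
     NOT_READY      store c (wait)             become READY
     READY          schedule c, -> NOT_READY   stays READY (ignored)
     stored c       abort() (double wait)      schedule c, -> NOT_READY
*/
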
/*******************************************************************************
 * pollset declarations
 */

typedef struct grpc_cached_wakeup_fd {
  grpc_wakeup_fd fd;
  struct grpc_cached_wakeup_fd *next;
} grpc_cached_wakeup_fd;

struct grpc_pollset_worker {
  grpc_cached_wakeup_fd *wakeup_fd;
  int reevaluate_polling_on_wakeup;
  int kicked_specifically;
  struct grpc_pollset_worker *next;
  struct grpc_pollset_worker *prev;
};

struct grpc_pollset {
  gpr_mu mu;
  grpc_pollset_worker root_worker;
  int shutting_down;
  int called_shutdown;
  int kicked_without_pollers;
  grpc_closure *shutdown_done;
  grpc_closure_list idle_jobs;
  int pollset_set_count;
  /* all polled fds */
  size_t fd_count;
  size_t fd_capacity;
  grpc_fd **fds;
  /* Local cache of eventfds for workers */
  grpc_cached_wakeup_fd *local_wakeup_cache;
};

/* Add an fd to a pollset */
static void pollset_add_fd(grpc_exec_ctx *exec_ctx, grpc_pollset *pollset,
                           struct grpc_fd *fd);

static void pollset_set_add_fd(grpc_exec_ctx *exec_ctx,
                               grpc_pollset_set *pollset_set, grpc_fd *fd);

/* Convert a timespec to milliseconds:
   - very small or negative poll times are clamped to zero to do a
     non-blocking poll (which becomes spin polling)
   - other small values are rounded up to one millisecond
   - longer than a millisecond polls are rounded up to the next nearest
     millisecond to avoid spinning
   - infinite timeouts are converted to -1 */
static int poll_deadline_to_millis_timeout(gpr_timespec deadline,
                                           gpr_timespec now);

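/* Worked examples of the conversion above (illustrative values): with
   max_spin_polling_us == 10, a deadline at most 10us past now yields 0 (a
   non-blocking, spinning poll); a deadline 2.3ms out rounds up to 3ms; an
   infinite deadline yields -1, making poll() block indefinitely. */
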
/* Allow kick to wakeup the currently polling worker */
#define GRPC_POLLSET_CAN_KICK_SELF 1
/* Force the wakee to repoll when awoken */
#define GRPC_POLLSET_REEVALUATE_POLLING_ON_WAKEUP 2
/* As per pollset_kick, with an extended set of flags (defined above)
   -- mostly for fd_posix's use. */
static grpc_error *pollset_kick_ext(grpc_pollset *p,
                                    grpc_pollset_worker *specific_worker,
                                    uint32_t flags) GRPC_MUST_USE_RESULT;

/* Return 1 if the pollset has active threads in pollset_work (pollset must
 * be locked) */
static bool pollset_has_workers(grpc_pollset *pollset);

/*******************************************************************************
 * pollset_set definitions
 */

struct grpc_pollset_set {
  gpr_mu mu;

  size_t pollset_count;
  size_t pollset_capacity;
  grpc_pollset **pollsets;

  size_t pollset_set_count;
  size_t pollset_set_capacity;
  struct grpc_pollset_set **pollset_sets;

  size_t fd_count;
  size_t fd_capacity;
  grpc_fd **fds;
};

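/* A pollset_set is a bag of pollsets, child pollset_sets, and fds. Adding an
   fd to a set (pollset_set_add_fd below) fans it out to every contained
   pollset and recurses into every child set, so one registration can follow
   many pollers. */
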
/*******************************************************************************
 * condition variable polling definitions
 */

#define CV_POLL_PERIOD_MS 1000
#define CV_DEFAULT_TABLE_SIZE 16

typedef enum poll_status_t { INPROGRESS, COMPLETED, CANCELLED } poll_status_t;

typedef struct poll_args {
  gpr_refcount refcount;
  gpr_cv *cv;
  struct pollfd *fds;
  nfds_t nfds;
  int timeout;
  int retval;
  int err;
  gpr_atm status;
} poll_args;

cv_fd_table g_cvfds;

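/* How the condition-variable poller below plugs in (see cvfd_poll/run_poll):
   negative fds handed to poll() are not kernel fds but encoded indices into
   g_cvfds, decoded with FD_TO_IDX. cvfd_poll parks the calling thread on a
   gpr_cv, hands any real socket fds to a background thread running run_poll,
   and is woken either by that thread completing or by a wakeup-fd signal. */
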
/*******************************************************************************
 * fd_posix.c
 */

#ifdef GRPC_FD_REF_COUNT_DEBUG
#define REF_BY(fd, n, reason) ref_by(fd, n, reason, __FILE__, __LINE__)
#define UNREF_BY(fd, n, reason) unref_by(fd, n, reason, __FILE__, __LINE__)
static void ref_by(grpc_fd *fd, int n, const char *reason, const char *file,
                   int line) {
  gpr_log(GPR_DEBUG, "FD %d %p ref %d %d -> %d [%s; %s:%d]", fd->fd, fd, n,
          (int)gpr_atm_no_barrier_load(&fd->refst),
          (int)gpr_atm_no_barrier_load(&fd->refst) + n, reason, file, line);
#else
#define REF_BY(fd, n, reason) ref_by(fd, n)
#define UNREF_BY(fd, n, reason) unref_by(fd, n)
static void ref_by(grpc_fd *fd, int n) {
#endif
  GPR_ASSERT(gpr_atm_no_barrier_fetch_add(&fd->refst, n) > 0);
}

#ifdef GRPC_FD_REF_COUNT_DEBUG
static void unref_by(grpc_fd *fd, int n, const char *reason, const char *file,
                     int line) {
  gpr_atm old;
  gpr_log(GPR_DEBUG, "FD %d %p unref %d %d -> %d [%s; %s:%d]", fd->fd, fd, n,
          (int)gpr_atm_no_barrier_load(&fd->refst),
          (int)gpr_atm_no_barrier_load(&fd->refst) - n, reason, file, line);
#else
static void unref_by(grpc_fd *fd, int n) {
  gpr_atm old;
#endif
  old = gpr_atm_full_fetch_add(&fd->refst, -n);
  if (old == n) {
    gpr_mu_destroy(&fd->mu);
    grpc_iomgr_unregister_object(&fd->iomgr_object);
    if (fd->shutdown) GRPC_ERROR_UNREF(fd->shutdown_error);
    gpr_free(fd);
  } else {
    GPR_ASSERT(old > n);
  }
}

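/* A worked example of the refst encoding (illustrative values): fd_create
   stores 1 (refcount 0, active bit set). Two GRPC_FD_REF calls add 2 each,
   giving 5. fd_orphan then does REF_BY(fd, 1) -> 6, clearing the low bit so
   fd_is_orphaned() starts returning true, and UNREF_BY(fd, 2) -> 4. The two
   outstanding GRPC_FD_UNREF calls take the count 4 -> 2 -> 0; on the final
   unref_by, old == n, and the fd is destroyed. */
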
static grpc_fd *fd_create(int fd, const char *name) {
  grpc_fd *r = gpr_malloc(sizeof(*r));
  gpr_mu_init(&r->mu);
  gpr_atm_rel_store(&r->refst, 1);
  r->shutdown = 0;
  r->read_closure = CLOSURE_NOT_READY;
  r->write_closure = CLOSURE_NOT_READY;
  r->fd = fd;
  r->inactive_watcher_root.next = r->inactive_watcher_root.prev =
      &r->inactive_watcher_root;
  r->read_watcher = r->write_watcher = NULL;
  r->on_done_closure = NULL;
  r->closed = 0;
  r->released = 0;
  r->read_notifier_pollset = NULL;

  char *name2;
  gpr_asprintf(&name2, "%s fd=%d", name, fd);
  grpc_iomgr_register_object(&r->iomgr_object, name2);
  gpr_free(name2);
#ifdef GRPC_FD_REF_COUNT_DEBUG
  gpr_log(GPR_DEBUG, "FD %d %p create %s", fd, r, name);
#endif
  return r;
}

static bool fd_is_orphaned(grpc_fd *fd) {
  return (gpr_atm_acq_load(&fd->refst) & 1) == 0;
}

/* Return the read-notifier pollset */
static grpc_pollset *fd_get_read_notifier_pollset(grpc_exec_ctx *exec_ctx,
                                                  grpc_fd *fd) {
  grpc_pollset *notifier = NULL;

  gpr_mu_lock(&fd->mu);
  notifier = fd->read_notifier_pollset;
  gpr_mu_unlock(&fd->mu);

  return notifier;
}

static grpc_error *pollset_kick_locked(grpc_fd_watcher *watcher) {
  gpr_mu_lock(&watcher->pollset->mu);
  GPR_ASSERT(watcher->worker);
  grpc_error *err = pollset_kick_ext(watcher->pollset, watcher->worker,
                                     GRPC_POLLSET_REEVALUATE_POLLING_ON_WAKEUP);
  gpr_mu_unlock(&watcher->pollset->mu);
  return err;
}

static void maybe_wake_one_watcher_locked(grpc_fd *fd) {
  if (fd->inactive_watcher_root.next != &fd->inactive_watcher_root) {
    pollset_kick_locked(fd->inactive_watcher_root.next);
  } else if (fd->read_watcher) {
    pollset_kick_locked(fd->read_watcher);
  } else if (fd->write_watcher) {
    pollset_kick_locked(fd->write_watcher);
  }
}

static void wake_all_watchers_locked(grpc_fd *fd) {
  grpc_fd_watcher *watcher;
  for (watcher = fd->inactive_watcher_root.next;
       watcher != &fd->inactive_watcher_root; watcher = watcher->next) {
    pollset_kick_locked(watcher);
  }
  if (fd->read_watcher) {
    pollset_kick_locked(fd->read_watcher);
  }
  if (fd->write_watcher && fd->write_watcher != fd->read_watcher) {
    pollset_kick_locked(fd->write_watcher);
  }
}

static int has_watchers(grpc_fd *fd) {
  return fd->read_watcher != NULL || fd->write_watcher != NULL ||
         fd->inactive_watcher_root.next != &fd->inactive_watcher_root;
}

static void close_fd_locked(grpc_exec_ctx *exec_ctx, grpc_fd *fd) {
  fd->closed = 1;
  if (!fd->released) {
    close(fd->fd);
  }
  grpc_closure_sched(exec_ctx, fd->on_done_closure, GRPC_ERROR_NONE);
}

static int fd_wrapped_fd(grpc_fd *fd) {
  if (fd->released || fd->closed) {
    return -1;
  } else {
    return fd->fd;
  }
}

static void fd_orphan(grpc_exec_ctx *exec_ctx, grpc_fd *fd,
                      grpc_closure *on_done, int *release_fd,
                      const char *reason) {
  fd->on_done_closure = on_done;
  fd->released = release_fd != NULL;
  if (fd->released) {
    *release_fd = fd->fd;
  }
  gpr_mu_lock(&fd->mu);
  REF_BY(fd, 1, reason); /* remove active status, but keep referenced */
  if (!has_watchers(fd)) {
    close_fd_locked(exec_ctx, fd);
  } else {
    wake_all_watchers_locked(fd);
  }
  gpr_mu_unlock(&fd->mu);
  UNREF_BY(fd, 2, reason); /* drop the reference */
}

/* increment refcount by two to avoid changing the orphan bit */
#ifdef GRPC_FD_REF_COUNT_DEBUG
static void fd_ref(grpc_fd *fd, const char *reason, const char *file,
                   int line) {
  ref_by(fd, 2, reason, file, line);
}

static void fd_unref(grpc_fd *fd, const char *reason, const char *file,
                     int line) {
  unref_by(fd, 2, reason, file, line);
}
#else
static void fd_ref(grpc_fd *fd) { ref_by(fd, 2); }

static void fd_unref(grpc_fd *fd) { unref_by(fd, 2); }
#endif

static grpc_error *fd_shutdown_error(grpc_fd *fd) {
  if (!fd->shutdown) {
    return GRPC_ERROR_NONE;
  } else {
    return GRPC_ERROR_CREATE_REFERENCING("FD shutdown", &fd->shutdown_error, 1);
  }
}

static void notify_on_locked(grpc_exec_ctx *exec_ctx, grpc_fd *fd,
                             grpc_closure **st, grpc_closure *closure) {
  if (fd->shutdown) {
    grpc_closure_sched(exec_ctx, closure, GRPC_ERROR_CREATE("FD shutdown"));
  } else if (*st == CLOSURE_NOT_READY) {
    /* not ready ==> switch to a waiting state by setting the closure */
    *st = closure;
  } else if (*st == CLOSURE_READY) {
    /* already ready ==> queue the closure to run immediately */
    *st = CLOSURE_NOT_READY;
    grpc_closure_sched(exec_ctx, closure, fd_shutdown_error(fd));
    maybe_wake_one_watcher_locked(fd);
  } else {
    /* upcallptr was set to a different closure. This is an error! */
    gpr_log(GPR_ERROR,
            "User called a notify_on function with a previous callback still "
            "pending");
    abort();
  }
}

/* returns 1 if state becomes not ready */
static int set_ready_locked(grpc_exec_ctx *exec_ctx, grpc_fd *fd,
                            grpc_closure **st) {
  if (*st == CLOSURE_READY) {
    /* duplicate ready ==> ignore */
    return 0;
  } else if (*st == CLOSURE_NOT_READY) {
    /* not ready, and not waiting ==> flag ready */
    *st = CLOSURE_READY;
    return 0;
  } else {
    /* waiting ==> queue closure */
    grpc_closure_sched(exec_ctx, *st, fd_shutdown_error(fd));
    *st = CLOSURE_NOT_READY;
    return 1;
  }
}

static void set_read_notifier_pollset_locked(
    grpc_exec_ctx *exec_ctx, grpc_fd *fd, grpc_pollset *read_notifier_pollset) {
  fd->read_notifier_pollset = read_notifier_pollset;
}

static void fd_shutdown(grpc_exec_ctx *exec_ctx, grpc_fd *fd, grpc_error *why) {
  gpr_mu_lock(&fd->mu);
  /* only shutdown once */
  if (!fd->shutdown) {
    fd->shutdown = 1;
    fd->shutdown_error = why;
    /* signal read/write closed to OS so that future operations fail */
    shutdown(fd->fd, SHUT_RDWR);
    set_ready_locked(exec_ctx, fd, &fd->read_closure);
    set_ready_locked(exec_ctx, fd, &fd->write_closure);
  } else {
    GRPC_ERROR_UNREF(why);
  }
  gpr_mu_unlock(&fd->mu);
}

static bool fd_is_shutdown(grpc_fd *fd) {
  gpr_mu_lock(&fd->mu);
  bool r = fd->shutdown;
  gpr_mu_unlock(&fd->mu);
  return r;
}

static void fd_notify_on_read(grpc_exec_ctx *exec_ctx, grpc_fd *fd,
                              grpc_closure *closure) {
  gpr_mu_lock(&fd->mu);
  notify_on_locked(exec_ctx, fd, &fd->read_closure, closure);
  gpr_mu_unlock(&fd->mu);
}

static void fd_notify_on_write(grpc_exec_ctx *exec_ctx, grpc_fd *fd,
                               grpc_closure *closure) {
  gpr_mu_lock(&fd->mu);
  notify_on_locked(exec_ctx, fd, &fd->write_closure, closure);
  gpr_mu_unlock(&fd->mu);
}

static uint32_t fd_begin_poll(grpc_fd *fd, grpc_pollset *pollset,
                              grpc_pollset_worker *worker, uint32_t read_mask,
                              uint32_t write_mask, grpc_fd_watcher *watcher) {
  uint32_t mask = 0;
  grpc_closure *cur;
  int requested;
  /* keep track of pollers that have requested our events, in case they change
   */
  GRPC_FD_REF(fd, "poll");

  gpr_mu_lock(&fd->mu);

  /* if we are shutdown, then don't add to the watcher set */
  if (fd->shutdown) {
    watcher->fd = NULL;
    watcher->pollset = NULL;
    watcher->worker = NULL;
    gpr_mu_unlock(&fd->mu);
    GRPC_FD_UNREF(fd, "poll");
    return 0;
  }

  /* if there is nobody polling for read, but we need to, then start doing so */
  cur = fd->read_closure;
  requested = cur != CLOSURE_READY;
  if (read_mask && fd->read_watcher == NULL && requested) {
    fd->read_watcher = watcher;
    mask |= read_mask;
  }
  /* if there is nobody polling for write, but we need to, then start doing so
   */
  cur = fd->write_closure;
  requested = cur != CLOSURE_READY;
  if (write_mask && fd->write_watcher == NULL && requested) {
    fd->write_watcher = watcher;
    mask |= write_mask;
  }
  /* if not polling, remember this watcher in case we need someone to later */
  if (mask == 0 && worker != NULL) {
    watcher->next = &fd->inactive_watcher_root;
    watcher->prev = watcher->next->prev;
    watcher->next->prev = watcher->prev->next = watcher;
  }
  watcher->pollset = pollset;
  watcher->worker = worker;
  watcher->fd = fd;
  gpr_mu_unlock(&fd->mu);

  return mask;
}

static void fd_end_poll(grpc_exec_ctx *exec_ctx, grpc_fd_watcher *watcher,
                        int got_read, int got_write,
                        grpc_pollset *read_notifier_pollset) {
  int was_polling = 0;
  int kick = 0;
  grpc_fd *fd = watcher->fd;

  if (fd == NULL) {
    return;
  }

  gpr_mu_lock(&fd->mu);

  if (watcher == fd->read_watcher) {
    /* remove read watcher, kick if we still need a read */
    was_polling = 1;
    if (!got_read) {
      kick = 1;
    }
    fd->read_watcher = NULL;
  }
  if (watcher == fd->write_watcher) {
    /* remove write watcher, kick if we still need a write */
    was_polling = 1;
    if (!got_write) {
      kick = 1;
    }
    fd->write_watcher = NULL;
  }
  if (!was_polling && watcher->worker != NULL) {
    /* remove from inactive list */
    watcher->next->prev = watcher->prev;
    watcher->prev->next = watcher->next;
  }
  if (got_read) {
    if (set_ready_locked(exec_ctx, fd, &fd->read_closure)) {
      kick = 1;
    }
    if (read_notifier_pollset != NULL) {
      set_read_notifier_pollset_locked(exec_ctx, fd, read_notifier_pollset);
    }
  }
  if (got_write) {
    if (set_ready_locked(exec_ctx, fd, &fd->write_closure)) {
      kick = 1;
    }
  }
  if (kick) {
    maybe_wake_one_watcher_locked(fd);
  }
  if (fd_is_orphaned(fd) && !has_watchers(fd) && !fd->closed) {
    close_fd_locked(exec_ctx, fd);
  }
  gpr_mu_unlock(&fd->mu);

  GRPC_FD_UNREF(fd, "poll");
}

static grpc_workqueue *fd_get_workqueue(grpc_fd *fd) { return NULL; }

/*******************************************************************************
 * pollset_posix.c
 */

GPR_TLS_DECL(g_current_thread_poller);
GPR_TLS_DECL(g_current_thread_worker);

static void remove_worker(grpc_pollset *p, grpc_pollset_worker *worker) {
  worker->prev->next = worker->next;
  worker->next->prev = worker->prev;
}

static bool pollset_has_workers(grpc_pollset *p) {
  return p->root_worker.next != &p->root_worker;
}

static bool pollset_in_pollset_sets(grpc_pollset *p) {
  return p->pollset_set_count;
}

static bool pollset_has_observers(grpc_pollset *p) {
  return pollset_has_workers(p) || pollset_in_pollset_sets(p);
}

static grpc_pollset_worker *pop_front_worker(grpc_pollset *p) {
  if (pollset_has_workers(p)) {
    grpc_pollset_worker *w = p->root_worker.next;
    remove_worker(p, w);
    return w;
  } else {
    return NULL;
  }
}

static void push_back_worker(grpc_pollset *p, grpc_pollset_worker *worker) {
  worker->next = &p->root_worker;
  worker->prev = worker->next->prev;
  worker->prev->next = worker->next->prev = worker;
}

static void push_front_worker(grpc_pollset *p, grpc_pollset_worker *worker) {
  worker->prev = &p->root_worker;
  worker->next = worker->prev->next;
  worker->prev->next = worker->next->prev = worker;
}

static void kick_append_error(grpc_error **composite, grpc_error *error) {
  if (error == GRPC_ERROR_NONE) return;
  if (*composite == GRPC_ERROR_NONE) {
    *composite = GRPC_ERROR_CREATE("Kick Failure");
  }
  *composite = grpc_error_add_child(*composite, error);
}

static grpc_error *pollset_kick_ext(grpc_pollset *p,
                                    grpc_pollset_worker *specific_worker,
                                    uint32_t flags) {
  GPR_TIMER_BEGIN("pollset_kick_ext", 0);
  grpc_error *error = GRPC_ERROR_NONE;

  /* pollset->mu already held */
  if (specific_worker != NULL) {
    if (specific_worker == GRPC_POLLSET_KICK_BROADCAST) {
      GPR_TIMER_BEGIN("pollset_kick_ext.broadcast", 0);
      GPR_ASSERT((flags & GRPC_POLLSET_REEVALUATE_POLLING_ON_WAKEUP) == 0);
      for (specific_worker = p->root_worker.next;
           specific_worker != &p->root_worker;
           specific_worker = specific_worker->next) {
        kick_append_error(
            &error, grpc_wakeup_fd_wakeup(&specific_worker->wakeup_fd->fd));
      }
      p->kicked_without_pollers = true;
      GPR_TIMER_END("pollset_kick_ext.broadcast", 0);
    } else if (gpr_tls_get(&g_current_thread_worker) !=
               (intptr_t)specific_worker) {
      GPR_TIMER_MARK("different_thread_worker", 0);
      if ((flags & GRPC_POLLSET_REEVALUATE_POLLING_ON_WAKEUP) != 0) {
        specific_worker->reevaluate_polling_on_wakeup = true;
      }
      specific_worker->kicked_specifically = true;
      kick_append_error(&error,
                        grpc_wakeup_fd_wakeup(&specific_worker->wakeup_fd->fd));
    } else if ((flags & GRPC_POLLSET_CAN_KICK_SELF) != 0) {
      GPR_TIMER_MARK("kick_yoself", 0);
      if ((flags & GRPC_POLLSET_REEVALUATE_POLLING_ON_WAKEUP) != 0) {
        specific_worker->reevaluate_polling_on_wakeup = true;
      }
      specific_worker->kicked_specifically = true;
      kick_append_error(&error,
                        grpc_wakeup_fd_wakeup(&specific_worker->wakeup_fd->fd));
    }
  } else if (gpr_tls_get(&g_current_thread_poller) != (intptr_t)p) {
    GPR_ASSERT((flags & GRPC_POLLSET_REEVALUATE_POLLING_ON_WAKEUP) == 0);
    GPR_TIMER_MARK("kick_anonymous", 0);
    specific_worker = pop_front_worker(p);
    if (specific_worker != NULL) {
      if (gpr_tls_get(&g_current_thread_worker) == (intptr_t)specific_worker) {
        GPR_TIMER_MARK("kick_anonymous_not_self", 0);
        push_back_worker(p, specific_worker);
        specific_worker = pop_front_worker(p);
        if ((flags & GRPC_POLLSET_CAN_KICK_SELF) == 0 &&
            gpr_tls_get(&g_current_thread_worker) ==
                (intptr_t)specific_worker) {
          push_back_worker(p, specific_worker);
          specific_worker = NULL;
        }
      }
      if (specific_worker != NULL) {
        GPR_TIMER_MARK("finally_kick", 0);
        push_back_worker(p, specific_worker);
        kick_append_error(
            &error, grpc_wakeup_fd_wakeup(&specific_worker->wakeup_fd->fd));
      }
    } else {
      GPR_TIMER_MARK("kicked_no_pollers", 0);
      p->kicked_without_pollers = true;
    }
  }

  GPR_TIMER_END("pollset_kick_ext", 0);
  GRPC_LOG_IF_ERROR("pollset_kick_ext", GRPC_ERROR_REF(error));
  return error;
}

static grpc_error *pollset_kick(grpc_pollset *p,
                                grpc_pollset_worker *specific_worker) {
  return pollset_kick_ext(p, specific_worker, 0);
}

/* global state management */

static grpc_error *pollset_global_init(void) {
  gpr_tls_init(&g_current_thread_poller);
  gpr_tls_init(&g_current_thread_worker);
  return grpc_wakeup_fd_init(&global_wakeup_fd);
}

static void pollset_global_shutdown(void) {
  grpc_wakeup_fd_destroy(&global_wakeup_fd);
  gpr_tls_destroy(&g_current_thread_poller);
  gpr_tls_destroy(&g_current_thread_worker);
}

static grpc_error *kick_poller(void) {
  return grpc_wakeup_fd_wakeup(&global_wakeup_fd);
}

/* main interface */

static void pollset_init(grpc_pollset *pollset, gpr_mu **mu) {
  gpr_mu_init(&pollset->mu);
  *mu = &pollset->mu;
  pollset->root_worker.next = pollset->root_worker.prev = &pollset->root_worker;
  pollset->shutting_down = 0;
  pollset->called_shutdown = 0;
  pollset->kicked_without_pollers = 0;
  pollset->idle_jobs.head = pollset->idle_jobs.tail = NULL;
  pollset->local_wakeup_cache = NULL;
  pollset->kicked_without_pollers = 0;
  pollset->fd_count = 0;
  pollset->fd_capacity = 0;
  pollset->fds = NULL;
  pollset->pollset_set_count = 0;
}

static void pollset_destroy(grpc_pollset *pollset) {
  GPR_ASSERT(!pollset_has_workers(pollset));
  GPR_ASSERT(pollset->idle_jobs.head == pollset->idle_jobs.tail);
  while (pollset->local_wakeup_cache) {
    grpc_cached_wakeup_fd *next = pollset->local_wakeup_cache->next;
    grpc_wakeup_fd_destroy(&pollset->local_wakeup_cache->fd);
    gpr_free(pollset->local_wakeup_cache);
    pollset->local_wakeup_cache = next;
  }
  gpr_free(pollset->fds);
  gpr_mu_destroy(&pollset->mu);
}

static void pollset_add_fd(grpc_exec_ctx *exec_ctx, grpc_pollset *pollset,
                           grpc_fd *fd) {
  gpr_mu_lock(&pollset->mu);
  size_t i;
  /* TODO(ctiller): this is O(num_fds^2); maybe switch to a hash set here */
  for (i = 0; i < pollset->fd_count; i++) {
    if (pollset->fds[i] == fd) goto exit;
  }
  if (pollset->fd_count == pollset->fd_capacity) {
    pollset->fd_capacity =
        GPR_MAX(pollset->fd_capacity + 8, pollset->fd_count * 3 / 2);
    pollset->fds =
        gpr_realloc(pollset->fds, sizeof(grpc_fd *) * pollset->fd_capacity);
  }
  pollset->fds[pollset->fd_count++] = fd;
  GRPC_FD_REF(fd, "multipoller");
  pollset_kick(pollset, NULL);
exit:
  gpr_mu_unlock(&pollset->mu);
}

static void finish_shutdown(grpc_exec_ctx *exec_ctx, grpc_pollset *pollset) {
  GPR_ASSERT(grpc_closure_list_empty(pollset->idle_jobs));
  size_t i;
  for (i = 0; i < pollset->fd_count; i++) {
    GRPC_FD_UNREF(pollset->fds[i], "multipoller");
  }
  pollset->fd_count = 0;
  grpc_closure_sched(exec_ctx, pollset->shutdown_done, GRPC_ERROR_NONE);
}

static void work_combine_error(grpc_error **composite, grpc_error *error) {
  if (error == GRPC_ERROR_NONE) return;
  if (*composite == GRPC_ERROR_NONE) {
    *composite = GRPC_ERROR_CREATE("pollset_work");
  }
  *composite = grpc_error_add_child(*composite, error);
}

static grpc_error *pollset_work(grpc_exec_ctx *exec_ctx, grpc_pollset *pollset,
                                grpc_pollset_worker **worker_hdl,
                                gpr_timespec now, gpr_timespec deadline) {
  grpc_pollset_worker worker;
  *worker_hdl = &worker;
  grpc_error *error = GRPC_ERROR_NONE;

  /* Avoid malloc for small number of elements. */
  enum { inline_elements = 96 };
  struct pollfd pollfd_space[inline_elements];
  struct grpc_fd_watcher watcher_space[inline_elements];

  /* pollset->mu already held */
  int added_worker = 0;
  int locked = 1;
  int queued_work = 0;
  int keep_polling = 0;
  GPR_TIMER_BEGIN("pollset_work", 0);
  /* this must happen before we (potentially) drop pollset->mu */
  worker.next = worker.prev = NULL;
  worker.reevaluate_polling_on_wakeup = 0;
  if (pollset->local_wakeup_cache != NULL) {
    worker.wakeup_fd = pollset->local_wakeup_cache;
    pollset->local_wakeup_cache = worker.wakeup_fd->next;
  } else {
    worker.wakeup_fd = gpr_malloc(sizeof(*worker.wakeup_fd));
    error = grpc_wakeup_fd_init(&worker.wakeup_fd->fd);
    if (error != GRPC_ERROR_NONE) {
      GRPC_LOG_IF_ERROR("pollset_work", GRPC_ERROR_REF(error));
      return error;
    }
  }
  worker.kicked_specifically = 0;
  /* If there's work waiting for the pollset to be idle, and the
     pollset is idle, then do that work */
  if (!pollset_has_workers(pollset) &&
      !grpc_closure_list_empty(pollset->idle_jobs)) {
    GPR_TIMER_MARK("pollset_work.idle_jobs", 0);
    grpc_closure_list_sched(exec_ctx, &pollset->idle_jobs);
    goto done;
  }
  /* If we're shutting down then we don't execute any extended work */
  if (pollset->shutting_down) {
    GPR_TIMER_MARK("pollset_work.shutting_down", 0);
    goto done;
  }
  /* Start polling, and keep doing so while we're being asked to
     re-evaluate our pollers (this allows poll() based pollers to
     ensure they don't miss wakeups) */
  keep_polling = 1;
  gpr_tls_set(&g_current_thread_poller, (intptr_t)pollset);
  while (keep_polling) {
    keep_polling = 0;
    if (!pollset->kicked_without_pollers) {
      if (!added_worker) {
        push_front_worker(pollset, &worker);
        added_worker = 1;
        gpr_tls_set(&g_current_thread_worker, (intptr_t)&worker);
      }
      GPR_TIMER_BEGIN("maybe_work_and_unlock", 0);
#define POLLOUT_CHECK (POLLOUT | POLLHUP | POLLERR)
#define POLLIN_CHECK (POLLIN | POLLHUP | POLLERR)

      int timeout;
      int r;
      size_t i, fd_count;
      nfds_t pfd_count;
      grpc_fd_watcher *watchers;
      struct pollfd *pfds;

      timeout = poll_deadline_to_millis_timeout(deadline, now);

      if (pollset->fd_count + 2 <= inline_elements) {
        pfds = pollfd_space;
        watchers = watcher_space;
      } else {
        /* Allocate one buffer to hold both pfds and watchers arrays */
        const size_t pfd_size = sizeof(*pfds) * (pollset->fd_count + 2);
        const size_t watch_size = sizeof(*watchers) * (pollset->fd_count + 2);
        void *buf = gpr_malloc(pfd_size + watch_size);
        pfds = buf;
        watchers = (void *)((char *)buf + pfd_size);
      }

      fd_count = 0;
      pfd_count = 2;
      pfds[0].fd = GRPC_WAKEUP_FD_GET_READ_FD(&global_wakeup_fd);
      pfds[0].events = POLLIN;
      pfds[0].revents = 0;
      pfds[1].fd = GRPC_WAKEUP_FD_GET_READ_FD(&worker.wakeup_fd->fd);
      pfds[1].events = POLLIN;
      pfds[1].revents = 0;
      for (i = 0; i < pollset->fd_count; i++) {
        if (fd_is_orphaned(pollset->fds[i])) {
          GRPC_FD_UNREF(pollset->fds[i], "multipoller");
        } else {
          pollset->fds[fd_count++] = pollset->fds[i];
          watchers[pfd_count].fd = pollset->fds[i];
          GRPC_FD_REF(watchers[pfd_count].fd, "multipoller_start");
          pfds[pfd_count].fd = pollset->fds[i]->fd;
          pfds[pfd_count].revents = 0;
          pfd_count++;
        }
      }
      pollset->fd_count = fd_count;
      gpr_mu_unlock(&pollset->mu);

      for (i = 2; i < pfd_count; i++) {
        grpc_fd *fd = watchers[i].fd;
        pfds[i].events = (short)fd_begin_poll(fd, pollset, &worker, POLLIN,
                                              POLLOUT, &watchers[i]);
        GRPC_FD_UNREF(fd, "multipoller_start");
      }

      /* TODO(vpai): Consider first doing a 0 timeout poll here to avoid
         even going into the blocking annotation if possible */
      GRPC_SCHEDULING_START_BLOCKING_REGION;
      r = grpc_poll_function(pfds, pfd_count, timeout);
      GRPC_SCHEDULING_END_BLOCKING_REGION;

      if (r < 0) {
        if (errno != EINTR) {
          work_combine_error(&error, GRPC_OS_ERROR(errno, "poll"));
        }

        for (i = 2; i < pfd_count; i++) {
          if (watchers[i].fd == NULL) {
            fd_end_poll(exec_ctx, &watchers[i], 0, 0, NULL);
          } else {
            // Wake up all the file descriptors, if we have an invalid one
            // we can identify it on the next pollset_work()
            fd_end_poll(exec_ctx, &watchers[i], 1, 1, pollset);
          }
        }
      } else if (r == 0) {
        for (i = 2; i < pfd_count; i++) {
          fd_end_poll(exec_ctx, &watchers[i], 0, 0, NULL);
        }
      } else {
        if (pfds[0].revents & POLLIN_CHECK) {
          work_combine_error(&error,
                             grpc_wakeup_fd_consume_wakeup(&global_wakeup_fd));
        }
        if (pfds[1].revents & POLLIN_CHECK) {
          work_combine_error(
              &error, grpc_wakeup_fd_consume_wakeup(&worker.wakeup_fd->fd));
        }
        for (i = 2; i < pfd_count; i++) {
          if (watchers[i].fd == NULL) {
            fd_end_poll(exec_ctx, &watchers[i], 0, 0, NULL);
          } else {
            fd_end_poll(exec_ctx, &watchers[i], pfds[i].revents & POLLIN_CHECK,
                        pfds[i].revents & POLLOUT_CHECK, pollset);
          }
        }
      }

      if (pfds != pollfd_space) {
        /* pfds and watchers are in the same memory block pointed to by pfds */
        gpr_free(pfds);
      }

      GPR_TIMER_END("maybe_work_and_unlock", 0);
      locked = 0;
    } else {
      GPR_TIMER_MARK("pollset_work.kicked_without_pollers", 0);
      pollset->kicked_without_pollers = 0;
    }
  /* Finished execution - start cleaning up.
     Note that we may arrive here from outside the enclosing while() loop.
     In that case we won't loop, though, as we haven't added worker to the
     worker list, which means nobody could ask us to re-evaluate polling. */
  done:
    if (!locked) {
      queued_work |= grpc_exec_ctx_flush(exec_ctx);
      gpr_mu_lock(&pollset->mu);
      locked = 1;
    }
    /* If we're forced to re-evaluate polling (via pollset_kick with
       GRPC_POLLSET_REEVALUATE_POLLING_ON_WAKEUP) then we land here and force
       a loop */
    if (worker.reevaluate_polling_on_wakeup && error == GRPC_ERROR_NONE) {
      worker.reevaluate_polling_on_wakeup = 0;
      pollset->kicked_without_pollers = 0;
      if (queued_work || worker.kicked_specifically) {
        /* If there's queued work on the list, then set the deadline to be
           immediate so we get back out of the polling loop quickly */
        deadline = gpr_inf_past(GPR_CLOCK_MONOTONIC);
      }
      keep_polling = 1;
    }
    if (keep_polling) {
      now = gpr_now(now.clock_type);
    }
  }
  gpr_tls_set(&g_current_thread_poller, 0);
  if (added_worker) {
    remove_worker(pollset, &worker);
    gpr_tls_set(&g_current_thread_worker, 0);
  }
  /* release wakeup fd to the local pool */
  worker.wakeup_fd->next = pollset->local_wakeup_cache;
  pollset->local_wakeup_cache = worker.wakeup_fd;
  /* check shutdown conditions */
  if (pollset->shutting_down) {
    if (pollset_has_workers(pollset)) {
      pollset_kick(pollset, NULL);
    } else if (!pollset->called_shutdown && !pollset_has_observers(pollset)) {
      pollset->called_shutdown = 1;
      gpr_mu_unlock(&pollset->mu);
      finish_shutdown(exec_ctx, pollset);
      grpc_exec_ctx_flush(exec_ctx);
      /* Continuing to access pollset here is safe -- it is the caller's
       * responsibility to not destroy when it has outstanding calls to
       * pollset_work.
       * TODO(dklempner): Can we refactor the shutdown logic to avoid this? */
      gpr_mu_lock(&pollset->mu);
    } else if (!grpc_closure_list_empty(pollset->idle_jobs)) {
      grpc_closure_list_sched(exec_ctx, &pollset->idle_jobs);
      gpr_mu_unlock(&pollset->mu);
      grpc_exec_ctx_flush(exec_ctx);
      gpr_mu_lock(&pollset->mu);
    }
  }
  *worker_hdl = NULL;
  GPR_TIMER_END("pollset_work", 0);
  GRPC_LOG_IF_ERROR("pollset_work", GRPC_ERROR_REF(error));
  return error;
}

static void pollset_shutdown(grpc_exec_ctx *exec_ctx, grpc_pollset *pollset,
                             grpc_closure *closure) {
  GPR_ASSERT(!pollset->shutting_down);
  pollset->shutting_down = 1;
  pollset->shutdown_done = closure;
  pollset_kick(pollset, GRPC_POLLSET_KICK_BROADCAST);
  if (!pollset_has_workers(pollset)) {
    grpc_closure_list_sched(exec_ctx, &pollset->idle_jobs);
  }
  if (!pollset->called_shutdown && !pollset_has_observers(pollset)) {
    pollset->called_shutdown = 1;
    finish_shutdown(exec_ctx, pollset);
  }
}

static int poll_deadline_to_millis_timeout(gpr_timespec deadline,
                                           gpr_timespec now) {
  gpr_timespec timeout;
  static const int64_t max_spin_polling_us = 10;
  if (gpr_time_cmp(deadline, gpr_inf_future(deadline.clock_type)) == 0) {
    return -1;
  }
  if (gpr_time_cmp(deadline, gpr_time_add(now, gpr_time_from_micros(
                                                   max_spin_polling_us,
                                                   GPR_TIMESPAN))) <= 0) {
    return 0;
  }
  timeout = gpr_time_sub(deadline, now);
  return gpr_time_to_millis(gpr_time_add(
      timeout, gpr_time_from_nanos(GPR_NS_PER_MS - 1, GPR_TIMESPAN)));
}

/*******************************************************************************
 * pollset_set_posix.c
 */

static grpc_pollset_set *pollset_set_create(void) {
  grpc_pollset_set *pollset_set = gpr_malloc(sizeof(*pollset_set));
  memset(pollset_set, 0, sizeof(*pollset_set));
  gpr_mu_init(&pollset_set->mu);
  return pollset_set;
}

static void pollset_set_destroy(grpc_exec_ctx *exec_ctx,
                                grpc_pollset_set *pollset_set) {
  size_t i;
  gpr_mu_destroy(&pollset_set->mu);
  for (i = 0; i < pollset_set->fd_count; i++) {
    GRPC_FD_UNREF(pollset_set->fds[i], "pollset_set");
  }
  for (i = 0; i < pollset_set->pollset_count; i++) {
    grpc_pollset *pollset = pollset_set->pollsets[i];
    gpr_mu_lock(&pollset->mu);
    pollset->pollset_set_count--;
    /* check shutdown */
    if (pollset->shutting_down && !pollset->called_shutdown &&
        !pollset_has_observers(pollset)) {
      pollset->called_shutdown = 1;
      gpr_mu_unlock(&pollset->mu);
      finish_shutdown(exec_ctx, pollset);
    } else {
      gpr_mu_unlock(&pollset->mu);
    }
  }
  gpr_free(pollset_set->pollsets);
  gpr_free(pollset_set->pollset_sets);
  gpr_free(pollset_set->fds);
  gpr_free(pollset_set);
}

static void pollset_set_add_pollset(grpc_exec_ctx *exec_ctx,
                                    grpc_pollset_set *pollset_set,
                                    grpc_pollset *pollset) {
  size_t i, j;
  gpr_mu_lock(&pollset->mu);
  pollset->pollset_set_count++;
  gpr_mu_unlock(&pollset->mu);
  gpr_mu_lock(&pollset_set->mu);
  if (pollset_set->pollset_count == pollset_set->pollset_capacity) {
    pollset_set->pollset_capacity =
        GPR_MAX(8, 2 * pollset_set->pollset_capacity);
    pollset_set->pollsets =
        gpr_realloc(pollset_set->pollsets, pollset_set->pollset_capacity *
                                               sizeof(*pollset_set->pollsets));
  }
  pollset_set->pollsets[pollset_set->pollset_count++] = pollset;
  for (i = 0, j = 0; i < pollset_set->fd_count; i++) {
    if (fd_is_orphaned(pollset_set->fds[i])) {
      GRPC_FD_UNREF(pollset_set->fds[i], "pollset_set");
    } else {
      pollset_add_fd(exec_ctx, pollset, pollset_set->fds[i]);
      pollset_set->fds[j++] = pollset_set->fds[i];
    }
  }
  pollset_set->fd_count = j;
  gpr_mu_unlock(&pollset_set->mu);
}

static void pollset_set_del_pollset(grpc_exec_ctx *exec_ctx,
                                    grpc_pollset_set *pollset_set,
                                    grpc_pollset *pollset) {
  size_t i;
  gpr_mu_lock(&pollset_set->mu);
  for (i = 0; i < pollset_set->pollset_count; i++) {
    if (pollset_set->pollsets[i] == pollset) {
      pollset_set->pollset_count--;
      GPR_SWAP(grpc_pollset *, pollset_set->pollsets[i],
               pollset_set->pollsets[pollset_set->pollset_count]);
      break;
    }
  }
  gpr_mu_unlock(&pollset_set->mu);
  gpr_mu_lock(&pollset->mu);
  pollset->pollset_set_count--;
  /* check shutdown */
  if (pollset->shutting_down && !pollset->called_shutdown &&
      !pollset_has_observers(pollset)) {
    pollset->called_shutdown = 1;
    gpr_mu_unlock(&pollset->mu);
    finish_shutdown(exec_ctx, pollset);
  } else {
    gpr_mu_unlock(&pollset->mu);
  }
}

static void pollset_set_add_pollset_set(grpc_exec_ctx *exec_ctx,
                                        grpc_pollset_set *bag,
                                        grpc_pollset_set *item) {
  size_t i, j;
  gpr_mu_lock(&bag->mu);
  if (bag->pollset_set_count == bag->pollset_set_capacity) {
    bag->pollset_set_capacity = GPR_MAX(8, 2 * bag->pollset_set_capacity);
    bag->pollset_sets =
        gpr_realloc(bag->pollset_sets,
                    bag->pollset_set_capacity * sizeof(*bag->pollset_sets));
  }
  bag->pollset_sets[bag->pollset_set_count++] = item;
  for (i = 0, j = 0; i < bag->fd_count; i++) {
    if (fd_is_orphaned(bag->fds[i])) {
      GRPC_FD_UNREF(bag->fds[i], "pollset_set");
    } else {
      pollset_set_add_fd(exec_ctx, item, bag->fds[i]);
      bag->fds[j++] = bag->fds[i];
    }
  }
  bag->fd_count = j;
  gpr_mu_unlock(&bag->mu);
}

static void pollset_set_del_pollset_set(grpc_exec_ctx *exec_ctx,
                                        grpc_pollset_set *bag,
                                        grpc_pollset_set *item) {
  size_t i;
  gpr_mu_lock(&bag->mu);
  for (i = 0; i < bag->pollset_set_count; i++) {
    if (bag->pollset_sets[i] == item) {
      bag->pollset_set_count--;
      GPR_SWAP(grpc_pollset_set *, bag->pollset_sets[i],
               bag->pollset_sets[bag->pollset_set_count]);
      break;
    }
  }
  gpr_mu_unlock(&bag->mu);
}

static void pollset_set_add_fd(grpc_exec_ctx *exec_ctx,
                               grpc_pollset_set *pollset_set, grpc_fd *fd) {
  size_t i;
  gpr_mu_lock(&pollset_set->mu);
  if (pollset_set->fd_count == pollset_set->fd_capacity) {
    pollset_set->fd_capacity = GPR_MAX(8, 2 * pollset_set->fd_capacity);
    pollset_set->fds = gpr_realloc(
        pollset_set->fds, pollset_set->fd_capacity * sizeof(*pollset_set->fds));
  }
  GRPC_FD_REF(fd, "pollset_set");
  pollset_set->fds[pollset_set->fd_count++] = fd;
  for (i = 0; i < pollset_set->pollset_count; i++) {
    pollset_add_fd(exec_ctx, pollset_set->pollsets[i], fd);
  }
  for (i = 0; i < pollset_set->pollset_set_count; i++) {
    pollset_set_add_fd(exec_ctx, pollset_set->pollset_sets[i], fd);
  }
  gpr_mu_unlock(&pollset_set->mu);
}

static void pollset_set_del_fd(grpc_exec_ctx *exec_ctx,
                               grpc_pollset_set *pollset_set, grpc_fd *fd) {
  size_t i;
  gpr_mu_lock(&pollset_set->mu);
  for (i = 0; i < pollset_set->fd_count; i++) {
    if (pollset_set->fds[i] == fd) {
      pollset_set->fd_count--;
      GPR_SWAP(grpc_fd *, pollset_set->fds[i],
               pollset_set->fds[pollset_set->fd_count]);
      GRPC_FD_UNREF(fd, "pollset_set");
      break;
    }
  }
  for (i = 0; i < pollset_set->pollset_set_count; i++) {
    pollset_set_del_fd(exec_ctx, pollset_set->pollset_sets[i], fd);
  }
  gpr_mu_unlock(&pollset_set->mu);
}

/*******************************************************************************
 * workqueue stubs
 */

#ifdef GRPC_WORKQUEUE_REFCOUNT_DEBUG
static grpc_workqueue *workqueue_ref(grpc_workqueue *workqueue,
                                     const char *file, int line,
                                     const char *reason) {
  return workqueue;
}
static void workqueue_unref(grpc_exec_ctx *exec_ctx, grpc_workqueue *workqueue,
                            const char *file, int line, const char *reason) {}
#else
static grpc_workqueue *workqueue_ref(grpc_workqueue *workqueue) {
  return workqueue;
}
static void workqueue_unref(grpc_exec_ctx *exec_ctx,
                            grpc_workqueue *workqueue) {}
#endif

static grpc_closure_scheduler *workqueue_scheduler(grpc_workqueue *workqueue) {
  return grpc_schedule_on_exec_ctx;
}

1325/*******************************************************************************
Ken Payson82e4ec72016-10-13 12:26:01 -07001326 * Condition Variable polling extensions
1327 */
1328
1329static void decref_poll_args(poll_args *args) {
1330 if (gpr_unref(&args->refcount)) {
1331 gpr_free(args->fds);
1332 gpr_cv_destroy(args->cv);
1333 gpr_free(args->cv);
1334 gpr_free(args);
1335 }
1336}
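
// poll_args are shared between the caller of cvfd_poll() and the detached
// poller thread; whichever side drops the last reference frees the pollfd
// array, the condition variable, and the args struct itself.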

// Poll in a background thread
static void run_poll(void *arg) {
  int timeout, retval;
  poll_args *pargs = (poll_args *)arg;
  while (gpr_atm_no_barrier_load(&pargs->status) == INPROGRESS) {
    if (pargs->timeout < 0) {
      timeout = CV_POLL_PERIOD_MS;
    } else {
      timeout = GPR_MIN(CV_POLL_PERIOD_MS, pargs->timeout);
      pargs->timeout -= timeout;
    }
    retval = g_cvfds.poll(pargs->fds, pargs->nfds, timeout);
    if (retval != 0 || pargs->timeout == 0) {
      pargs->retval = retval;
      pargs->err = errno;
      break;
    }
  }
  gpr_mu_lock(&g_cvfds.mu);
  if (gpr_atm_no_barrier_load(&pargs->status) == INPROGRESS) {
    // Signal main thread that the poll completed
    gpr_atm_no_barrier_store(&pargs->status, COMPLETED);
    gpr_cv_signal(pargs->cv);
  }
  decref_poll_args(pargs);
  g_cvfds.pollcount--;
  if (g_cvfds.shutdown && g_cvfds.pollcount == 0) {
    gpr_cv_signal(&g_cvfds.shutdown_complete);
  }
  gpr_mu_unlock(&g_cvfds.mu);
}
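
// run_poll slices a possibly unbounded timeout into CV_POLL_PERIOD_MS
// chunks, so a cancelled poll (status moved off INPROGRESS) is noticed
// within one period rather than blocking until an fd fires.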

// This function overrides poll() to handle condition variable wakeup fds
static int cvfd_poll(struct pollfd *fds, nfds_t nfds, int timeout) {
  unsigned int i;
  int res, idx;
  gpr_cv *pollcv;
  cv_node *cvn, *prev;
  int skip_poll = 0;
  nfds_t nsockfds = 0;
  gpr_thd_id t_id;
  gpr_thd_options opt;
  poll_args *pargs = NULL;
  gpr_mu_lock(&g_cvfds.mu);
  pollcv = gpr_malloc(sizeof(gpr_cv));
  gpr_cv_init(pollcv);
  for (i = 0; i < nfds; i++) {
    fds[i].revents = 0;
    if (fds[i].fd < 0 && (fds[i].events & POLLIN)) {
      idx = FD_TO_IDX(fds[i].fd);
      cvn = gpr_malloc(sizeof(cv_node));
      cvn->cv = pollcv;
      cvn->next = g_cvfds.cvfds[idx].cvs;
      g_cvfds.cvfds[idx].cvs = cvn;
      // Don't bother polling if a wakeup fd is ready
      if (g_cvfds.cvfds[idx].is_set) {
        skip_poll = 1;
      }
    } else if (fds[i].fd >= 0) {
      nsockfds++;
    }
  }

  res = 0;
  if (!skip_poll && nsockfds > 0) {
    pargs = gpr_malloc(sizeof(struct poll_args));
    // Both the main thread and calling thread get a reference
    gpr_ref_init(&pargs->refcount, 2);
    pargs->cv = pollcv;
    pargs->fds = gpr_malloc(sizeof(struct pollfd) * nsockfds);
    pargs->nfds = nsockfds;
    pargs->timeout = timeout;
    pargs->retval = 0;
    pargs->err = 0;
    gpr_atm_no_barrier_store(&pargs->status, INPROGRESS);
    idx = 0;
    for (i = 0; i < nfds; i++) {
      if (fds[i].fd >= 0) {
        pargs->fds[idx].fd = fds[i].fd;
        pargs->fds[idx].events = fds[i].events;
        pargs->fds[idx].revents = 0;
        idx++;
      }
    }
    g_cvfds.pollcount++;
    opt = gpr_thd_options_default();
    gpr_thd_options_set_detached(&opt);
    gpr_thd_new(&t_id, &run_poll, pargs, &opt);
    // We want the poll() thread to trigger the deadline, so wait forever here
    gpr_cv_wait(pollcv, &g_cvfds.mu, gpr_inf_future(GPR_CLOCK_MONOTONIC));
    if (gpr_atm_no_barrier_load(&pargs->status) == COMPLETED) {
      res = pargs->retval;
      errno = pargs->err;
    } else {
      errno = 0;
      gpr_atm_no_barrier_store(&pargs->status, CANCELLED);
    }
  } else if (!skip_poll) {
    gpr_timespec deadline = gpr_now(GPR_CLOCK_REALTIME);
    deadline =
        gpr_time_add(deadline, gpr_time_from_millis(timeout, GPR_TIMESPAN));
    gpr_cv_wait(pollcv, &g_cvfds.mu, deadline);
  }

  idx = 0;
  for (i = 0; i < nfds; i++) {
    if (fds[i].fd < 0 && (fds[i].events & POLLIN)) {
      cvn = g_cvfds.cvfds[FD_TO_IDX(fds[i].fd)].cvs;
      prev = NULL;
      while (cvn->cv != pollcv) {
        prev = cvn;
        cvn = cvn->next;
        GPR_ASSERT(cvn);
      }
      if (!prev) {
        g_cvfds.cvfds[FD_TO_IDX(fds[i].fd)].cvs = cvn->next;
      } else {
        prev->next = cvn->next;
      }
      gpr_free(cvn);

      if (g_cvfds.cvfds[FD_TO_IDX(fds[i].fd)].is_set) {
        fds[i].revents = POLLIN;
        if (res >= 0) res++;
      }
    } else if (!skip_poll && fds[i].fd >= 0 &&
               gpr_atm_no_barrier_load(&pargs->status) == COMPLETED) {
      fds[i].revents = pargs->fds[idx].revents;
      idx++;
    }
  }

  if (pargs) {
    decref_poll_args(pargs);
  } else {
    gpr_cv_destroy(pollcv);
    gpr_free(pollcv);
  }
  gpr_mu_unlock(&g_cvfds.mu);

  return res;
}
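
// Illustrative sketch (not part of the build): cv wakeup fds are encoded as
// negative descriptors, assuming the IDX_TO_FD/FD_TO_IDX mapping from
// wakeup_fd_cv.h. A mixed call routed here through grpc_poll_function might
// look like:
//
//   struct pollfd pfds[2];
//   pfds[0].fd = IDX_TO_FD(3);  // hypothetical cv-based wakeup fd (fd < 0)
//   pfds[0].events = POLLIN;
//   pfds[1].fd = sockfd;        // a real socket, forwarded to run_poll()
//   pfds[1].events = POLLIN;
//   int n = grpc_poll_function(pfds, 2, 100);  // 100ms timeout
//
// Real sockets are handed to a detached background poll() thread while this
// thread sleeps on pollcv; whichever side fires first wakes the caller.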

static void global_cv_fd_table_init(void) {
  gpr_mu_init(&g_cvfds.mu);
  gpr_mu_lock(&g_cvfds.mu);
  gpr_cv_init(&g_cvfds.shutdown_complete);
  g_cvfds.shutdown = 0;
  g_cvfds.pollcount = 0;
  g_cvfds.size = CV_DEFAULT_TABLE_SIZE;
  g_cvfds.cvfds = gpr_malloc(sizeof(fd_node) * CV_DEFAULT_TABLE_SIZE);
  g_cvfds.free_fds = NULL;
  for (int i = 0; i < CV_DEFAULT_TABLE_SIZE; i++) {
    g_cvfds.cvfds[i].is_set = 0;
    g_cvfds.cvfds[i].cvs = NULL;
    g_cvfds.cvfds[i].next_free = g_cvfds.free_fds;
    g_cvfds.free_fds = &g_cvfds.cvfds[i];
  }
  // Override the poll function with one that supports cvfds
  g_cvfds.poll = grpc_poll_function;
  grpc_poll_function = &cvfd_poll;
  gpr_mu_unlock(&g_cvfds.mu);
}
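
// The table starts with CV_DEFAULT_TABLE_SIZE fd_nodes threaded onto a free
// list, and the process-wide grpc_poll_function pointer is swapped for
// cvfd_poll, so every subsequent poll() in the process flows through the
// table; the original function is stashed in g_cvfds.poll for real sockets.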

static void global_cv_fd_table_shutdown(void) {
  gpr_mu_lock(&g_cvfds.mu);
  g_cvfds.shutdown = 1;
  // Attempt to wait for all abandoned poll() threads to terminate;
  // not doing so results in reported memory leaks
  if (g_cvfds.pollcount > 0) {
    int res = gpr_cv_wait(&g_cvfds.shutdown_complete, &g_cvfds.mu,
                          gpr_time_add(gpr_now(GPR_CLOCK_REALTIME),
                                       gpr_time_from_seconds(3, GPR_TIMESPAN)));
    GPR_ASSERT(res == 0);
  }
  gpr_cv_destroy(&g_cvfds.shutdown_complete);
  grpc_poll_function = g_cvfds.poll;
  gpr_free(g_cvfds.cvfds);
  gpr_mu_unlock(&g_cvfds.mu);
  gpr_mu_destroy(&g_cvfds.mu);
}
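
// Shutdown gives abandoned poller threads up to three seconds to drain;
// gpr_cv_wait() returns 0 when signalled before the deadline, so the
// assertion fires only if some poll() thread is still running past the
// grace period.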

/*******************************************************************************
 * event engine binding
 */

static void shutdown_engine(void) {
  pollset_global_shutdown();
  if (grpc_cv_wakeup_fds_enabled()) {
    global_cv_fd_table_shutdown();
  }
}

static const grpc_event_engine_vtable vtable = {
    .pollset_size = sizeof(grpc_pollset),

    .fd_create = fd_create,
    .fd_wrapped_fd = fd_wrapped_fd,
    .fd_orphan = fd_orphan,
    .fd_shutdown = fd_shutdown,
    .fd_is_shutdown = fd_is_shutdown,
    .fd_notify_on_read = fd_notify_on_read,
    .fd_notify_on_write = fd_notify_on_write,
    .fd_get_read_notifier_pollset = fd_get_read_notifier_pollset,
    .fd_get_workqueue = fd_get_workqueue,

    .pollset_init = pollset_init,
    .pollset_shutdown = pollset_shutdown,
    .pollset_destroy = pollset_destroy,
    .pollset_work = pollset_work,
    .pollset_kick = pollset_kick,
    .pollset_add_fd = pollset_add_fd,

    .pollset_set_create = pollset_set_create,
    .pollset_set_destroy = pollset_set_destroy,
    .pollset_set_add_pollset = pollset_set_add_pollset,
    .pollset_set_del_pollset = pollset_set_del_pollset,
    .pollset_set_add_pollset_set = pollset_set_add_pollset_set,
    .pollset_set_del_pollset_set = pollset_set_del_pollset_set,
    .pollset_set_add_fd = pollset_set_add_fd,
    .pollset_set_del_fd = pollset_set_del_fd,

    .kick_poller = kick_poller,

    .workqueue_ref = workqueue_ref,
    .workqueue_unref = workqueue_unref,
    .workqueue_scheduler = workqueue_scheduler,

    .shutdown_engine = shutdown_engine,
};

const grpc_event_engine_vtable *grpc_init_poll_posix(void) {
  if (!grpc_has_wakeup_fd()) {
    return NULL;
  }
  if (!GRPC_LOG_IF_ERROR("pollset_global_init", pollset_global_init())) {
    return NULL;
  }
  return &vtable;
}

const grpc_event_engine_vtable *grpc_init_poll_cv_posix(void) {
  global_cv_fd_table_init();
  grpc_enable_cv_wakeup_fds(1);
  if (!GRPC_LOG_IF_ERROR("pollset_global_init", pollset_global_init())) {
    global_cv_fd_table_shutdown();
    grpc_enable_cv_wakeup_fds(0);
    return NULL;
  }
  return &vtable;
}
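
// Selection note: which of these init functions becomes the active engine is
// decided by the event-engine registry in ev_posix.c (typically via the
// GRPC_POLL_STRATEGY environment variable, e.g. "poll" or "poll-cv"). Each
// init function returns NULL when its prerequisites are missing so the
// registry can fall through to the next strategy.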

#endif