blob: 5ddd5313e2b0e4ce7db05a688c32b2bf3ce5248d [file] [log] [blame]
Craig Tiller253bd502016-02-25 12:30:23 -08001/*
2 *
Craig Tillerd6a5b802016-05-13 20:17:38 -07003 * Copyright 2016, Google Inc.
Craig Tiller253bd502016-02-25 12:30:23 -08004 * All rights reserved.
5 *
6 * Redistribution and use in source and binary forms, with or without
7 * modification, are permitted provided that the following conditions are
8 * met:
9 *
10 * * Redistributions of source code must retain the above copyright
11 * notice, this list of conditions and the following disclaimer.
12 * * Redistributions in binary form must reproduce the above
13 * copyright notice, this list of conditions and the following disclaimer
14 * in the documentation and/or other materials provided with the
15 * distribution.
16 * * Neither the name of Google Inc. nor the names of its
17 * contributors may be used to endorse or promote products derived from
18 * this software without specific prior written permission.
19 *
20 * THIS SOFTWARE IS PROVIDED BY THE COPYRIGHT HOLDERS AND CONTRIBUTORS
21 * "AS IS" AND ANY EXPRESS OR IMPLIED WARRANTIES, INCLUDING, BUT NOT
22 * LIMITED TO, THE IMPLIED WARRANTIES OF MERCHANTABILITY AND FITNESS FOR
23 * A PARTICULAR PURPOSE ARE DISCLAIMED. IN NO EVENT SHALL THE COPYRIGHT
24 * OWNER OR CONTRIBUTORS BE LIABLE FOR ANY DIRECT, INDIRECT, INCIDENTAL,
25 * SPECIAL, EXEMPLARY, OR CONSEQUENTIAL DAMAGES (INCLUDING, BUT NOT
26 * LIMITED TO, PROCUREMENT OF SUBSTITUTE GOODS OR SERVICES; LOSS OF USE,
27 * DATA, OR PROFITS; OR BUSINESS INTERRUPTION) HOWEVER CAUSED AND ON ANY
28 * THEORY OF LIABILITY, WHETHER IN CONTRACT, STRICT LIABILITY, OR TORT
29 * (INCLUDING NEGLIGENCE OR OTHERWISE) ARISING IN ANY WAY OUT OF THE USE
30 * OF THIS SOFTWARE, EVEN IF ADVISED OF THE POSSIBILITY OF SUCH DAMAGE.
31 *
32 */
33
murgatroid9954070892016-08-08 17:01:18 -070034#include "src/core/lib/iomgr/port.h"
Craig Tiller253bd502016-02-25 12:30:23 -080035
murgatroid99623dd4f2016-08-08 17:31:27 -070036#ifdef GRPC_POSIX_SOCKET
Craig Tiller253bd502016-02-25 12:30:23 -080037
Craig Tillerd9a60bb2016-03-28 23:13:19 -070038#include "src/core/lib/iomgr/ev_poll_posix.h"
Craig Tiller253bd502016-02-25 12:30:23 -080039
40#include <assert.h>
41#include <errno.h>
42#include <poll.h>
43#include <string.h>
44#include <sys/socket.h>
45#include <unistd.h>
46
47#include <grpc/support/alloc.h>
48#include <grpc/support/log.h>
49#include <grpc/support/string_util.h>
Ken Payson82e4ec72016-10-13 12:26:01 -070050#include <grpc/support/thd.h>
Craig Tiller253bd502016-02-25 12:30:23 -080051#include <grpc/support/tls.h>
52#include <grpc/support/useful.h>
53
Craig Tillerd9a60bb2016-03-28 23:13:19 -070054#include "src/core/lib/iomgr/iomgr_internal.h"
Ken Payson82e4ec72016-10-13 12:26:01 -070055#include "src/core/lib/iomgr/wakeup_fd_cv.h"
Craig Tillerd9a60bb2016-03-28 23:13:19 -070056#include "src/core/lib/iomgr/wakeup_fd_posix.h"
57#include "src/core/lib/profiling/timers.h"
58#include "src/core/lib/support/block_annotate.h"
Craig Tiller253bd502016-02-25 12:30:23 -080059
60/*******************************************************************************
61 * FD declarations
62 */
63
/* Records one pollset/worker pair's interest in polling a given fd.
   Created in fd_begin_poll and torn down in fd_end_poll; may also sit on
   the fd's inactive_watcher_root list when not actively polling. */
typedef struct grpc_fd_watcher {
  struct grpc_fd_watcher *next; /* links for the fd's inactive watcher list */
  struct grpc_fd_watcher *prev;
  grpc_pollset *pollset;       /* pollset polling on behalf of this watcher */
  grpc_pollset_worker *worker; /* worker to kick when interest changes */
  grpc_fd *fd; /* fd being watched; NULL if the fd was already shut down */
} grpc_fd_watcher;
71
struct grpc_fd {
  int fd; /* the underlying OS file descriptor */
  /* refst format:
       bit0: 1=active/0=orphaned
       bit1-n: refcount
     meaning that mostly we ref by two to avoid altering the orphaned bit,
     and just unref by 1 when we're ready to flag the object as orphaned */
  gpr_atm refst;

  gpr_mu mu;    /* guards all the mutable state below */
  int shutdown; /* 1 once fd_shutdown has run; shutdown_error then valid */
  int closed;   /* 1 once close_fd_locked has run */
  int released; /* 1 if the raw fd is handed back to the caller, not closed */
  grpc_error *shutdown_error; /* reason for shutdown; owned iff shutdown==1 */

  /* The watcher list.

     The following watcher related fields are protected by watcher_mu.

     An fd_watcher is an ephemeral object created when an fd wants to
     begin polling, and destroyed after the poll.

     It denotes the fd's interest in whether to read poll or write poll
     or both or neither on this fd.

     If a watcher is asked to poll for reads or writes, the read_watcher
     or write_watcher fields are set respectively. A watcher may be asked
     to poll for both, in which case both fields will be set.

     read_watcher and write_watcher may be NULL if no watcher has been
     asked to poll for reads or writes.

     If an fd_watcher is not asked to poll for reads or writes, it's added
     to a linked list of inactive watchers, rooted at inactive_watcher_root.
     If at a later time there becomes need of a poller to poll, one of
     the inactive pollers may be kicked out of their poll loops to take
     that responsibility. */
  grpc_fd_watcher inactive_watcher_root;
  grpc_fd_watcher *read_watcher;
  grpc_fd_watcher *write_watcher;

  /* Either CLOSURE_NOT_READY, CLOSURE_READY, or a pending user closure */
  grpc_closure *read_closure;
  grpc_closure *write_closure;

  grpc_closure *on_done_closure; /* scheduled by close_fd_locked */

  grpc_iomgr_object iomgr_object; /* registration for iomgr bookkeeping */

  /* The pollset that last noticed and notified that the fd is readable */
  grpc_pollset *read_notifier_pollset;
};
123
Craig Tillerb4b8e1e2016-11-28 07:33:13 -0800124static grpc_wakeup_fd global_wakeup_fd;
125
Craig Tiller253bd502016-02-25 12:30:23 -0800126/* Begin polling on an fd.
127 Registers that the given pollset is interested in this fd - so that if read
128 or writability interest changes, the pollset can be kicked to pick up that
129 new interest.
130 Return value is:
131 (fd_needs_read? read_mask : 0) | (fd_needs_write? write_mask : 0)
132 i.e. a combination of read_mask and write_mask determined by the fd's current
133 interest in said events.
134 Polling strategies that do not need to alter their behavior depending on the
135 fd's current interest (such as epoll) do not need to call this function.
136 MUST NOT be called with a pollset lock taken */
137static uint32_t fd_begin_poll(grpc_fd *fd, grpc_pollset *pollset,
138 grpc_pollset_worker *worker, uint32_t read_mask,
139 uint32_t write_mask, grpc_fd_watcher *rec);
140/* Complete polling previously started with fd_begin_poll
141 MUST NOT be called with a pollset lock taken
142 if got_read or got_write are 1, also does the become_{readable,writable} as
143 appropriate. */
144static void fd_end_poll(grpc_exec_ctx *exec_ctx, grpc_fd_watcher *rec,
Craig Tillerb1d3b362016-05-14 13:20:21 -0700145 int got_read, int got_write,
146 grpc_pollset *read_notifier_pollset);
Craig Tiller253bd502016-02-25 12:30:23 -0800147
148/* Return 1 if this fd is orphaned, 0 otherwise */
149static bool fd_is_orphaned(grpc_fd *fd);
150
Craig Tiller253bd502016-02-25 12:30:23 -0800151/* Reference counting for fds */
Craig Tiller9e5ac1b2017-02-14 22:25:50 -0800152//#define GRPC_FD_REF_COUNT_DEBUG
Craig Tiller253bd502016-02-25 12:30:23 -0800153#ifdef GRPC_FD_REF_COUNT_DEBUG
154static void fd_ref(grpc_fd *fd, const char *reason, const char *file, int line);
155static void fd_unref(grpc_fd *fd, const char *reason, const char *file,
156 int line);
157#define GRPC_FD_REF(fd, reason) fd_ref(fd, reason, __FILE__, __LINE__)
158#define GRPC_FD_UNREF(fd, reason) fd_unref(fd, reason, __FILE__, __LINE__)
159#else
160static void fd_ref(grpc_fd *fd);
161static void fd_unref(grpc_fd *fd);
162#define GRPC_FD_REF(fd, reason) fd_ref(fd)
163#define GRPC_FD_UNREF(fd, reason) fd_unref(fd)
164#endif
165
Craig Tiller253bd502016-02-25 12:30:23 -0800166#define CLOSURE_NOT_READY ((grpc_closure *)0)
167#define CLOSURE_READY ((grpc_closure *)1)
168
/*******************************************************************************
 * pollset declarations
 */

/* A wakeup fd cached on a pollset so workers can reuse them across polls. */
typedef struct grpc_cached_wakeup_fd {
  grpc_wakeup_fd fd;
  struct grpc_cached_wakeup_fd *next; /* singly-linked free list */
} grpc_cached_wakeup_fd;

struct grpc_pollset_worker {
  grpc_cached_wakeup_fd *wakeup_fd; /* this worker's wakeup fd */
  int reevaluate_polling_on_wakeup; /* see GRPC_POLLSET_REEVALUATE_... */
  int kicked_specifically;          /* kicked by a targeted pollset_kick */
  struct grpc_pollset_worker *next; /* links in the pollset's worker ring */
  struct grpc_pollset_worker *prev;
};

struct grpc_pollset {
  gpr_mu mu; /* protects everything below */
  grpc_pollset_worker root_worker; /* sentinel of the worker ring */
  int shutting_down;
  int called_shutdown;
  int kicked_without_pollers; /* a kick arrived while nobody was polling */
  grpc_closure *shutdown_done;
  grpc_closure_list idle_jobs;
  int pollset_set_count; /* number of pollset_sets containing this pollset */
  /* all polled fds */
  size_t fd_count;
  size_t fd_capacity;
  grpc_fd **fds;
  /* Local cache of eventfds for workers */
  grpc_cached_wakeup_fd *local_wakeup_cache;
};
202
Craig Tiller253bd502016-02-25 12:30:23 -0800203/* Add an fd to a pollset */
204static void pollset_add_fd(grpc_exec_ctx *exec_ctx, grpc_pollset *pollset,
205 struct grpc_fd *fd);
206
207static void pollset_set_add_fd(grpc_exec_ctx *exec_ctx,
208 grpc_pollset_set *pollset_set, grpc_fd *fd);
209
210/* Convert a timespec to milliseconds:
211 - very small or negative poll times are clamped to zero to do a
212 non-blocking poll (which becomes spin polling)
213 - other small values are rounded up to one millisecond
214 - longer than a millisecond polls are rounded up to the next nearest
215 millisecond to avoid spinning
216 - infinite timeouts are converted to -1 */
217static int poll_deadline_to_millis_timeout(gpr_timespec deadline,
218 gpr_timespec now);
219
220/* Allow kick to wakeup the currently polling worker */
221#define GRPC_POLLSET_CAN_KICK_SELF 1
222/* Force the wakee to repoll when awoken */
223#define GRPC_POLLSET_REEVALUATE_POLLING_ON_WAKEUP 2
224/* As per pollset_kick, with an extended set of flags (defined above)
225 -- mostly for fd_posix's use. */
Craig Tillerd6a5b802016-05-13 20:17:38 -0700226static grpc_error *pollset_kick_ext(grpc_pollset *p,
227 grpc_pollset_worker *specific_worker,
228 uint32_t flags) GRPC_MUST_USE_RESULT;
Craig Tiller253bd502016-02-25 12:30:23 -0800229
Craig Tiller253bd502016-02-25 12:30:23 -0800230/* Return 1 if the pollset has active threads in pollset_work (pollset must
231 * be locked) */
Craig Tiller22543592017-02-14 10:29:36 -0800232static bool pollset_has_workers(grpc_pollset *pollset);
Craig Tiller253bd502016-02-25 12:30:23 -0800233
Craig Tiller253bd502016-02-25 12:30:23 -0800234/*******************************************************************************
235 * pollset_set definitions
236 */
237
238struct grpc_pollset_set {
239 gpr_mu mu;
240
241 size_t pollset_count;
242 size_t pollset_capacity;
243 grpc_pollset **pollsets;
244
245 size_t pollset_set_count;
246 size_t pollset_set_capacity;
247 struct grpc_pollset_set **pollset_sets;
248
249 size_t fd_count;
250 size_t fd_capacity;
251 grpc_fd **fds;
252};
253
/*******************************************************************************
 * condition variable polling definitions
 */

#define CV_POLL_PERIOD_MS 1000
#define CV_DEFAULT_TABLE_SIZE 16

/* Lifecycle states of one backgrounded poll() call. */
typedef enum poll_status_t { INPROGRESS, COMPLETED, CANCELLED } poll_status_t;

/* Arguments/results for a poll() call; fields mirror poll(2)'s parameters.
   NOTE(review): appears to be shared with a helper thread (refcount, cv,
   atomic status) — confirm threading model against wakeup_fd_cv.c. */
typedef struct poll_args {
  gpr_refcount refcount;
  gpr_cv *cv;
  struct pollfd *fds;
  nfds_t nfds;
  int timeout;    /* milliseconds, as for poll() */
  int retval;     /* poll() return value */
  int err;        /* errno captured after poll() */
  gpr_atm status; /* holds a poll_status_t value */
} poll_args;

/* Global table of condition-variable based wakeup fds. */
cv_fd_table g_cvfds;
275
/*******************************************************************************
 * fd_posix.c
 */

/* ref_by/unref_by adjust fd->refst by n. The function headers are swapped by
   the preprocessor so the debug build logs the old->new counts and call site;
   the body is shared between both variants. */
#ifdef GRPC_FD_REF_COUNT_DEBUG
#define REF_BY(fd, n, reason) ref_by(fd, n, reason, __FILE__, __LINE__)
#define UNREF_BY(fd, n, reason) unref_by(fd, n, reason, __FILE__, __LINE__)
static void ref_by(grpc_fd *fd, int n, const char *reason, const char *file,
                   int line) {
  gpr_log(GPR_DEBUG, "FD %d %p ref %d %d -> %d [%s; %s:%d]", fd->fd, fd, n,
          (int)gpr_atm_no_barrier_load(&fd->refst),
          (int)gpr_atm_no_barrier_load(&fd->refst) + n, reason, file, line);
#else
#define REF_BY(fd, n, reason) ref_by(fd, n)
#define UNREF_BY(fd, n, reason) unref_by(fd, n)
static void ref_by(grpc_fd *fd, int n) {
#endif
  /* refst must already be non-zero: taking a ref on a dead fd is a bug */
  GPR_ASSERT(gpr_atm_no_barrier_fetch_add(&fd->refst, n) > 0);
}

#ifdef GRPC_FD_REF_COUNT_DEBUG
static void unref_by(grpc_fd *fd, int n, const char *reason, const char *file,
                     int line) {
  gpr_atm old;
  gpr_log(GPR_DEBUG, "FD %d %p unref %d %d -> %d [%s; %s:%d]", fd->fd, fd, n,
          (int)gpr_atm_no_barrier_load(&fd->refst),
          (int)gpr_atm_no_barrier_load(&fd->refst) - n, reason, file, line);
#else
static void unref_by(grpc_fd *fd, int n) {
  gpr_atm old;
#endif
  old = gpr_atm_full_fetch_add(&fd->refst, -n);
  if (old == n) {
    /* last reference: destroy the fd object. shutdown_error is only owned
       when the shutdown flag was set (see fd_shutdown). */
    gpr_mu_destroy(&fd->mu);
    grpc_iomgr_unregister_object(&fd->iomgr_object);
    if (fd->shutdown) GRPC_ERROR_UNREF(fd->shutdown_error);
    gpr_free(fd);
  } else {
    GPR_ASSERT(old > n);
  }
}
317
Craig Tiller253bd502016-02-25 12:30:23 -0800318static grpc_fd *fd_create(int fd, const char *name) {
Craig Tiller7ac6bf02016-02-25 12:54:59 -0800319 grpc_fd *r = gpr_malloc(sizeof(*r));
320 gpr_mu_init(&r->mu);
321 gpr_atm_rel_store(&r->refst, 1);
322 r->shutdown = 0;
323 r->read_closure = CLOSURE_NOT_READY;
324 r->write_closure = CLOSURE_NOT_READY;
325 r->fd = fd;
326 r->inactive_watcher_root.next = r->inactive_watcher_root.prev =
327 &r->inactive_watcher_root;
328 r->read_watcher = r->write_watcher = NULL;
329 r->on_done_closure = NULL;
330 r->closed = 0;
331 r->released = 0;
Craig Tillerb1d3b362016-05-14 13:20:21 -0700332 r->read_notifier_pollset = NULL;
Craig Tiller7ac6bf02016-02-25 12:54:59 -0800333
Craig Tiller253bd502016-02-25 12:30:23 -0800334 char *name2;
335 gpr_asprintf(&name2, "%s fd=%d", name, fd);
336 grpc_iomgr_register_object(&r->iomgr_object, name2);
337 gpr_free(name2);
338#ifdef GRPC_FD_REF_COUNT_DEBUG
339 gpr_log(GPR_DEBUG, "FD %d %p create %s", fd, r, name);
340#endif
341 return r;
342}
343
344static bool fd_is_orphaned(grpc_fd *fd) {
345 return (gpr_atm_acq_load(&fd->refst) & 1) == 0;
346}
347
Craig Tillerb1d3b362016-05-14 13:20:21 -0700348/* Return the read-notifier pollset */
349static grpc_pollset *fd_get_read_notifier_pollset(grpc_exec_ctx *exec_ctx,
350 grpc_fd *fd) {
351 grpc_pollset *notifier = NULL;
352
353 gpr_mu_lock(&fd->mu);
354 notifier = fd->read_notifier_pollset;
355 gpr_mu_unlock(&fd->mu);
356
357 return notifier;
358}
359
Craig Tillerd6a5b802016-05-13 20:17:38 -0700360static grpc_error *pollset_kick_locked(grpc_fd_watcher *watcher) {
Craig Tillerac04b7f2016-02-26 08:36:44 -0800361 gpr_mu_lock(&watcher->pollset->mu);
Craig Tiller253bd502016-02-25 12:30:23 -0800362 GPR_ASSERT(watcher->worker);
Craig Tillerd6a5b802016-05-13 20:17:38 -0700363 grpc_error *err = pollset_kick_ext(watcher->pollset, watcher->worker,
364 GRPC_POLLSET_REEVALUATE_POLLING_ON_WAKEUP);
Craig Tillerac04b7f2016-02-26 08:36:44 -0800365 gpr_mu_unlock(&watcher->pollset->mu);
Craig Tillerd6a5b802016-05-13 20:17:38 -0700366 return err;
Craig Tiller253bd502016-02-25 12:30:23 -0800367}
368
369static void maybe_wake_one_watcher_locked(grpc_fd *fd) {
370 if (fd->inactive_watcher_root.next != &fd->inactive_watcher_root) {
371 pollset_kick_locked(fd->inactive_watcher_root.next);
372 } else if (fd->read_watcher) {
373 pollset_kick_locked(fd->read_watcher);
374 } else if (fd->write_watcher) {
375 pollset_kick_locked(fd->write_watcher);
376 }
377}
378
379static void wake_all_watchers_locked(grpc_fd *fd) {
380 grpc_fd_watcher *watcher;
381 for (watcher = fd->inactive_watcher_root.next;
382 watcher != &fd->inactive_watcher_root; watcher = watcher->next) {
383 pollset_kick_locked(watcher);
384 }
385 if (fd->read_watcher) {
386 pollset_kick_locked(fd->read_watcher);
387 }
388 if (fd->write_watcher && fd->write_watcher != fd->read_watcher) {
389 pollset_kick_locked(fd->write_watcher);
390 }
391}
392
393static int has_watchers(grpc_fd *fd) {
394 return fd->read_watcher != NULL || fd->write_watcher != NULL ||
395 fd->inactive_watcher_root.next != &fd->inactive_watcher_root;
396}
397
/* Mark the fd closed, close() the descriptor unless ownership was released
   back to the caller, and schedule the orphan's on_done callback.
   Requires fd->mu held; called once all watchers are gone. */
static void close_fd_locked(grpc_exec_ctx *exec_ctx, grpc_fd *fd) {
  fd->closed = 1;
  if (!fd->released) {
    close(fd->fd);
  }
  grpc_closure_sched(exec_ctx, fd->on_done_closure, GRPC_ERROR_NONE);
}
405
406static int fd_wrapped_fd(grpc_fd *fd) {
407 if (fd->released || fd->closed) {
408 return -1;
409 } else {
410 return fd->fd;
411 }
412}
413
/* Drop the fd's "active" status. If release_fd is non-NULL, the raw
   descriptor is handed back via *release_fd instead of being close()d.
   If nobody is polling the fd it is closed immediately; otherwise all
   watchers are kicked and the last fd_end_poll closes it. on_done is
   scheduled once the close actually happens. */
static void fd_orphan(grpc_exec_ctx *exec_ctx, grpc_fd *fd,
                      grpc_closure *on_done, int *release_fd,
                      const char *reason) {
  fd->on_done_closure = on_done;
  fd->released = release_fd != NULL;
  /* NOTE(review): released and the release_fd out-param are written before
     taking fd->mu — presumably callers never orphan concurrently; confirm. */
  if (fd->released) {
    *release_fd = fd->fd;
  }
  gpr_mu_lock(&fd->mu);
  REF_BY(fd, 1, reason); /* remove active status, but keep referenced */
  if (!has_watchers(fd)) {
    close_fd_locked(exec_ctx, fd);
  } else {
    wake_all_watchers_locked(fd);
  }
  gpr_mu_unlock(&fd->mu);
  UNREF_BY(fd, 2, reason); /* drop the reference */
}
432
/* Public ref/unref: increment/decrement the refcount by two to avoid
   changing the orphan bit (bit 0 of refst). */
#ifdef GRPC_FD_REF_COUNT_DEBUG
static void fd_ref(grpc_fd *fd, const char *reason, const char *file,
                   int line) {
  ref_by(fd, 2, reason, file, line);
}

static void fd_unref(grpc_fd *fd, const char *reason, const char *file,
                     int line) {
  unref_by(fd, 2, reason, file, line);
}
#else
static void fd_ref(grpc_fd *fd) { ref_by(fd, 2); }

static void fd_unref(grpc_fd *fd) { unref_by(fd, 2); }
#endif
449
Craig Tillercda759d2017-01-27 11:37:37 -0800450static grpc_error *fd_shutdown_error(grpc_fd *fd) {
451 if (!fd->shutdown) {
Craig Tillerd6a5b802016-05-13 20:17:38 -0700452 return GRPC_ERROR_NONE;
453 } else {
Craig Tillercda759d2017-01-27 11:37:37 -0800454 return GRPC_ERROR_CREATE_REFERENCING("FD shutdown", &fd->shutdown_error, 1);
Craig Tillerd6a5b802016-05-13 20:17:38 -0700455 }
456}
457
Craig Tiller253bd502016-02-25 12:30:23 -0800458static void notify_on_locked(grpc_exec_ctx *exec_ctx, grpc_fd *fd,
459 grpc_closure **st, grpc_closure *closure) {
Craig Tiller52f23122016-06-15 09:34:14 -0700460 if (fd->shutdown) {
Craig Tiller91031da2016-12-28 15:44:25 -0800461 grpc_closure_sched(exec_ctx, closure, GRPC_ERROR_CREATE("FD shutdown"));
Craig Tiller52f23122016-06-15 09:34:14 -0700462 } else if (*st == CLOSURE_NOT_READY) {
Craig Tiller253bd502016-02-25 12:30:23 -0800463 /* not ready ==> switch to a waiting state by setting the closure */
464 *st = closure;
465 } else if (*st == CLOSURE_READY) {
466 /* already ready ==> queue the closure to run immediately */
467 *st = CLOSURE_NOT_READY;
Craig Tillercda759d2017-01-27 11:37:37 -0800468 grpc_closure_sched(exec_ctx, closure, fd_shutdown_error(fd));
Craig Tiller253bd502016-02-25 12:30:23 -0800469 maybe_wake_one_watcher_locked(fd);
470 } else {
471 /* upcallptr was set to a different closure. This is an error! */
472 gpr_log(GPR_ERROR,
473 "User called a notify_on function with a previous callback still "
474 "pending");
475 abort();
476 }
477}
478
/* Flag a notification slot as ready, firing any waiting closure.
   Returns 1 if state becomes not ready (i.e. a closure was consumed and the
   caller may need to re-arm polling). Requires fd->mu held. */
static int set_ready_locked(grpc_exec_ctx *exec_ctx, grpc_fd *fd,
                            grpc_closure **st) {
  if (*st == CLOSURE_READY) {
    /* duplicate ready ==> ignore */
    return 0;
  } else if (*st == CLOSURE_NOT_READY) {
    /* not ready, and not waiting ==> flag ready */
    *st = CLOSURE_READY;
    return 0;
  } else {
    /* waiting ==> queue closure (with the shutdown error if applicable) */
    grpc_closure_sched(exec_ctx, *st, fd_shutdown_error(fd));
    *st = CLOSURE_NOT_READY;
    return 1;
  }
}
496
Craig Tillerb1d3b362016-05-14 13:20:21 -0700497static void set_read_notifier_pollset_locked(
498 grpc_exec_ctx *exec_ctx, grpc_fd *fd, grpc_pollset *read_notifier_pollset) {
499 fd->read_notifier_pollset = read_notifier_pollset;
500}
501
Craig Tillercda759d2017-01-27 11:37:37 -0800502static void fd_shutdown(grpc_exec_ctx *exec_ctx, grpc_fd *fd, grpc_error *why) {
Craig Tiller253bd502016-02-25 12:30:23 -0800503 gpr_mu_lock(&fd->mu);
Craig Tiller52f23122016-06-15 09:34:14 -0700504 /* only shutdown once */
505 if (!fd->shutdown) {
506 fd->shutdown = 1;
Craig Tillercda759d2017-01-27 11:37:37 -0800507 fd->shutdown_error = why;
Craig Tiller52f23122016-06-15 09:34:14 -0700508 /* signal read/write closed to OS so that future operations fail */
509 shutdown(fd->fd, SHUT_RDWR);
510 set_ready_locked(exec_ctx, fd, &fd->read_closure);
511 set_ready_locked(exec_ctx, fd, &fd->write_closure);
Craig Tillercda759d2017-01-27 11:37:37 -0800512 } else {
513 GRPC_ERROR_UNREF(why);
Craig Tiller52f23122016-06-15 09:34:14 -0700514 }
Craig Tiller253bd502016-02-25 12:30:23 -0800515 gpr_mu_unlock(&fd->mu);
516}
517
Craig Tiller52f23122016-06-15 09:34:14 -0700518static bool fd_is_shutdown(grpc_fd *fd) {
519 gpr_mu_lock(&fd->mu);
520 bool r = fd->shutdown;
521 gpr_mu_unlock(&fd->mu);
522 return r;
523}
524
Craig Tiller253bd502016-02-25 12:30:23 -0800525static void fd_notify_on_read(grpc_exec_ctx *exec_ctx, grpc_fd *fd,
526 grpc_closure *closure) {
527 gpr_mu_lock(&fd->mu);
528 notify_on_locked(exec_ctx, fd, &fd->read_closure, closure);
529 gpr_mu_unlock(&fd->mu);
530}
531
/* Arrange for `closure` to run when the fd becomes writable (or is shut
   down). See notify_on_locked for the slot state machine. */
static void fd_notify_on_write(grpc_exec_ctx *exec_ctx, grpc_fd *fd,
                               grpc_closure *closure) {
  gpr_mu_lock(&fd->mu);
  notify_on_locked(exec_ctx, fd, &fd->write_closure, closure);
  gpr_mu_unlock(&fd->mu);
}
538
/* See the declaration comment near the top of the file: registers `watcher`
   as interested in this fd and returns the subset of read_mask/write_mask
   this poller should actually poll for. Returns 0 (and NULLs out the
   watcher) if the fd is already shut down. */
static uint32_t fd_begin_poll(grpc_fd *fd, grpc_pollset *pollset,
                              grpc_pollset_worker *worker, uint32_t read_mask,
                              uint32_t write_mask, grpc_fd_watcher *watcher) {
  uint32_t mask = 0;
  grpc_closure *cur;
  int requested;
  /* keep track of pollers that have requested our events, in case they change
   */
  GRPC_FD_REF(fd, "poll"); /* balanced by fd_end_poll */

  gpr_mu_lock(&fd->mu);

  /* if we are shutdown, then don't add to the watcher set */
  if (fd->shutdown) {
    watcher->fd = NULL;
    watcher->pollset = NULL;
    watcher->worker = NULL;
    gpr_mu_unlock(&fd->mu);
    GRPC_FD_UNREF(fd, "poll");
    return 0;
  }

  /* if there is nobody polling for read, but we need to, then start doing so */
  cur = fd->read_closure;
  requested = cur != CLOSURE_READY; /* a pending/armed closure wants events */
  if (read_mask && fd->read_watcher == NULL && requested) {
    fd->read_watcher = watcher;
    mask |= read_mask;
  }
  /* if there is nobody polling for write, but we need to, then start doing so
   */
  cur = fd->write_closure;
  requested = cur != CLOSURE_READY;
  if (write_mask && fd->write_watcher == NULL && requested) {
    fd->write_watcher = watcher;
    mask |= write_mask;
  }
  /* if not polling, remember this watcher in case we need someone to later */
  if (mask == 0 && worker != NULL) {
    watcher->next = &fd->inactive_watcher_root;
    watcher->prev = watcher->next->prev;
    watcher->next->prev = watcher->prev->next = watcher;
  }
  watcher->pollset = pollset;
  watcher->worker = worker;
  watcher->fd = fd;
  gpr_mu_unlock(&fd->mu);

  return mask;
}
589
/* Finish the poll started by fd_begin_poll: detach the watcher, deliver any
   observed readiness, and if needed kick another watcher to keep polling.
   Also closes the fd here when it was orphaned while we were polling. */
static void fd_end_poll(grpc_exec_ctx *exec_ctx, grpc_fd_watcher *watcher,
                        int got_read, int got_write,
                        grpc_pollset *read_notifier_pollset) {
  int was_polling = 0;
  int kick = 0;
  grpc_fd *fd = watcher->fd;

  /* fd == NULL means fd_begin_poll bailed out (fd was shut down) */
  if (fd == NULL) {
    return;
  }

  gpr_mu_lock(&fd->mu);

  if (watcher == fd->read_watcher) {
    /* remove read watcher, kick if we still need a read */
    was_polling = 1;
    if (!got_read) {
      kick = 1;
    }
    fd->read_watcher = NULL;
  }
  if (watcher == fd->write_watcher) {
    /* remove write watcher, kick if we still need a write */
    was_polling = 1;
    if (!got_write) {
      kick = 1;
    }
    fd->write_watcher = NULL;
  }
  if (!was_polling && watcher->worker != NULL) {
    /* remove from inactive list */
    watcher->next->prev = watcher->prev;
    watcher->prev->next = watcher->next;
  }
  if (got_read) {
    if (set_ready_locked(exec_ctx, fd, &fd->read_closure)) {
      kick = 1;
    }
    if (read_notifier_pollset != NULL) {
      set_read_notifier_pollset_locked(exec_ctx, fd, read_notifier_pollset);
    }
  }
  if (got_write) {
    if (set_ready_locked(exec_ctx, fd, &fd->write_closure)) {
      kick = 1;
    }
  }
  if (kick) {
    maybe_wake_one_watcher_locked(fd);
  }
  /* if the fd was orphaned while we polled and we were the last watcher,
     we are responsible for closing it */
  if (fd_is_orphaned(fd) && !has_watchers(fd) && !fd->closed) {
    close_fd_locked(exec_ctx, fd);
  }
  gpr_mu_unlock(&fd->mu);

  GRPC_FD_UNREF(fd, "poll"); /* balances GRPC_FD_REF in fd_begin_poll */
}
647
Craig Tiller70bd4832016-06-30 14:20:46 -0700648static grpc_workqueue *fd_get_workqueue(grpc_fd *fd) { return NULL; }
649
Craig Tiller253bd502016-02-25 12:30:23 -0800650/*******************************************************************************
651 * pollset_posix.c
652 */
653
654GPR_TLS_DECL(g_current_thread_poller);
655GPR_TLS_DECL(g_current_thread_worker);
656
Craig Tiller253bd502016-02-25 12:30:23 -0800657static void remove_worker(grpc_pollset *p, grpc_pollset_worker *worker) {
658 worker->prev->next = worker->next;
659 worker->next->prev = worker->prev;
660}
661
Craig Tiller22543592017-02-14 10:29:36 -0800662static bool pollset_has_workers(grpc_pollset *p) {
Craig Tiller253bd502016-02-25 12:30:23 -0800663 return p->root_worker.next != &p->root_worker;
664}
665
Craig Tiller22543592017-02-14 10:29:36 -0800666static bool pollset_in_pollset_sets(grpc_pollset *p) {
667 return p->pollset_set_count;
668}
669
670static bool pollset_has_observers(grpc_pollset *p) {
671 return pollset_has_workers(p) || pollset_in_pollset_sets(p);
672}
673
Craig Tiller253bd502016-02-25 12:30:23 -0800674static grpc_pollset_worker *pop_front_worker(grpc_pollset *p) {
675 if (pollset_has_workers(p)) {
676 grpc_pollset_worker *w = p->root_worker.next;
677 remove_worker(p, w);
678 return w;
679 } else {
680 return NULL;
681 }
682}
683
/* Insert `worker` at the tail of the ring (just before the sentinel). */
static void push_back_worker(grpc_pollset *p, grpc_pollset_worker *worker) {
  worker->next = &p->root_worker;
  worker->prev = worker->next->prev;
  worker->prev->next = worker->next->prev = worker;
}

/* Insert `worker` at the head of the ring (just after the sentinel). */
static void push_front_worker(grpc_pollset *p, grpc_pollset_worker *worker) {
  worker->prev = &p->root_worker;
  worker->next = worker->prev->next;
  worker->prev->next = worker->next->prev = worker;
}
695
Craig Tillerd6a5b802016-05-13 20:17:38 -0700696static void kick_append_error(grpc_error **composite, grpc_error *error) {
697 if (error == GRPC_ERROR_NONE) return;
698 if (*composite == GRPC_ERROR_NONE) {
699 *composite = GRPC_ERROR_CREATE("Kick Failure");
700 }
701 *composite = grpc_error_add_child(*composite, error);
702}
703
/* Wake one or more workers of pollset `p`. specific_worker may be:
   - GRPC_POLLSET_KICK_BROADCAST: wake every worker,
   - a particular worker: wake exactly that worker,
   - NULL: wake some worker, preferring one that is not this thread.
   flags is a combination of GRPC_POLLSET_CAN_KICK_SELF and
   GRPC_POLLSET_REEVALUATE_POLLING_ON_WAKEUP. p->mu must be held. */
static grpc_error *pollset_kick_ext(grpc_pollset *p,
                                    grpc_pollset_worker *specific_worker,
                                    uint32_t flags) {
  GPR_TIMER_BEGIN("pollset_kick_ext", 0);
  grpc_error *error = GRPC_ERROR_NONE;

  /* pollset->mu already held */
  if (specific_worker != NULL) {
    if (specific_worker == GRPC_POLLSET_KICK_BROADCAST) {
      /* broadcast: wake every worker via its wakeup fd */
      GPR_TIMER_BEGIN("pollset_kick_ext.broadcast", 0);
      GPR_ASSERT((flags & GRPC_POLLSET_REEVALUATE_POLLING_ON_WAKEUP) == 0);
      for (specific_worker = p->root_worker.next;
           specific_worker != &p->root_worker;
           specific_worker = specific_worker->next) {
        kick_append_error(
            &error, grpc_wakeup_fd_wakeup(&specific_worker->wakeup_fd->fd));
      }
      p->kicked_without_pollers = true;
      GPR_TIMER_END("pollset_kick_ext.broadcast", 0);
    } else if (gpr_tls_get(&g_current_thread_worker) !=
               (intptr_t)specific_worker) {
      /* target worker runs on a different thread: safe to wake directly */
      GPR_TIMER_MARK("different_thread_worker", 0);
      if ((flags & GRPC_POLLSET_REEVALUATE_POLLING_ON_WAKEUP) != 0) {
        specific_worker->reevaluate_polling_on_wakeup = true;
      }
      specific_worker->kicked_specifically = true;
      kick_append_error(&error,
                        grpc_wakeup_fd_wakeup(&specific_worker->wakeup_fd->fd));
    } else if ((flags & GRPC_POLLSET_CAN_KICK_SELF) != 0) {
      /* target is this thread: only wake if the caller allows self-kick */
      GPR_TIMER_MARK("kick_yoself", 0);
      if ((flags & GRPC_POLLSET_REEVALUATE_POLLING_ON_WAKEUP) != 0) {
        specific_worker->reevaluate_polling_on_wakeup = true;
      }
      specific_worker->kicked_specifically = true;
      kick_append_error(&error,
                        grpc_wakeup_fd_wakeup(&specific_worker->wakeup_fd->fd));
    }
  } else if (gpr_tls_get(&g_current_thread_poller) != (intptr_t)p) {
    /* anonymous kick from outside this pollset's polling thread: pick a
       worker, rotating past ourselves unless self-kick is permitted */
    GPR_ASSERT((flags & GRPC_POLLSET_REEVALUATE_POLLING_ON_WAKEUP) == 0);
    GPR_TIMER_MARK("kick_anonymous", 0);
    specific_worker = pop_front_worker(p);
    if (specific_worker != NULL) {
      if (gpr_tls_get(&g_current_thread_worker) == (intptr_t)specific_worker) {
        /* front worker is us: rotate and try the next one */
        GPR_TIMER_MARK("kick_anonymous_not_self", 0);
        push_back_worker(p, specific_worker);
        specific_worker = pop_front_worker(p);
        if ((flags & GRPC_POLLSET_CAN_KICK_SELF) == 0 &&
            gpr_tls_get(&g_current_thread_worker) ==
                (intptr_t)specific_worker) {
          push_back_worker(p, specific_worker);
          specific_worker = NULL;
        }
      }
      if (specific_worker != NULL) {
        GPR_TIMER_MARK("finally_kick", 0);
        push_back_worker(p, specific_worker);
        kick_append_error(
            &error, grpc_wakeup_fd_wakeup(&specific_worker->wakeup_fd->fd));
      }
    } else {
      /* nobody polling: remember the kick so the next poller returns fast */
      GPR_TIMER_MARK("kicked_no_pollers", 0);
      p->kicked_without_pollers = true;
    }
  }

  GPR_TIMER_END("pollset_kick_ext", 0);
  GRPC_LOG_IF_ERROR("pollset_kick_ext", GRPC_ERROR_REF(error));
  return error;
}
773
/* Plain kick: pollset_kick_ext with no extra flags. */
static grpc_error *pollset_kick(grpc_pollset *p,
                                grpc_pollset_worker *specific_worker) {
  return pollset_kick_ext(p, specific_worker, 0);
}
778
779/* global state management */
780
Craig Tillerd6a5b802016-05-13 20:17:38 -0700781static grpc_error *pollset_global_init(void) {
Craig Tiller253bd502016-02-25 12:30:23 -0800782 gpr_tls_init(&g_current_thread_poller);
783 gpr_tls_init(&g_current_thread_worker);
Craig Tillerb4b8e1e2016-11-28 07:33:13 -0800784 return grpc_wakeup_fd_init(&global_wakeup_fd);
Craig Tiller253bd502016-02-25 12:30:23 -0800785}
786
787static void pollset_global_shutdown(void) {
Craig Tillerb4b8e1e2016-11-28 07:33:13 -0800788 grpc_wakeup_fd_destroy(&global_wakeup_fd);
Craig Tiller253bd502016-02-25 12:30:23 -0800789 gpr_tls_destroy(&g_current_thread_poller);
790 gpr_tls_destroy(&g_current_thread_worker);
Craig Tiller253bd502016-02-25 12:30:23 -0800791}
792
/* Wake whichever thread is currently blocked in poll() on the global
   wakeup fd (pfds[0] in pollset_work). */
static grpc_error *kick_poller(void) {
  return grpc_wakeup_fd_wakeup(&global_wakeup_fd);
}
Craig Tiller253bd502016-02-25 12:30:23 -0800796
797/* main interface */
798
Craig Tillerac04b7f2016-02-26 08:36:44 -0800799static void pollset_init(grpc_pollset *pollset, gpr_mu **mu) {
800 gpr_mu_init(&pollset->mu);
801 *mu = &pollset->mu;
Craig Tiller253bd502016-02-25 12:30:23 -0800802 pollset->root_worker.next = pollset->root_worker.prev = &pollset->root_worker;
Craig Tiller253bd502016-02-25 12:30:23 -0800803 pollset->shutting_down = 0;
804 pollset->called_shutdown = 0;
805 pollset->kicked_without_pollers = 0;
806 pollset->idle_jobs.head = pollset->idle_jobs.tail = NULL;
807 pollset->local_wakeup_cache = NULL;
808 pollset->kicked_without_pollers = 0;
Craig Tiller7ac6bf02016-02-25 12:54:59 -0800809 pollset->fd_count = 0;
Craig Tillerb38197e2016-02-26 10:14:54 -0800810 pollset->fd_capacity = 0;
Craig Tiller7ac6bf02016-02-25 12:54:59 -0800811 pollset->fds = NULL;
Craig Tiller22543592017-02-14 10:29:36 -0800812 pollset->pollset_set_count = 0;
Craig Tiller253bd502016-02-25 12:30:23 -0800813}
814
815static void pollset_destroy(grpc_pollset *pollset) {
Craig Tiller253bd502016-02-25 12:30:23 -0800816 GPR_ASSERT(!pollset_has_workers(pollset));
817 GPR_ASSERT(pollset->idle_jobs.head == pollset->idle_jobs.tail);
Craig Tiller253bd502016-02-25 12:30:23 -0800818 while (pollset->local_wakeup_cache) {
819 grpc_cached_wakeup_fd *next = pollset->local_wakeup_cache->next;
820 grpc_wakeup_fd_destroy(&pollset->local_wakeup_cache->fd);
821 gpr_free(pollset->local_wakeup_cache);
822 pollset->local_wakeup_cache = next;
823 }
Craig Tiller7ac6bf02016-02-25 12:54:59 -0800824 gpr_free(pollset->fds);
Craig Tillerac04b7f2016-02-26 08:36:44 -0800825 gpr_mu_destroy(&pollset->mu);
Craig Tiller253bd502016-02-25 12:30:23 -0800826}
827
Craig Tiller253bd502016-02-25 12:30:23 -0800828static void pollset_add_fd(grpc_exec_ctx *exec_ctx, grpc_pollset *pollset,
829 grpc_fd *fd) {
Craig Tillerac04b7f2016-02-26 08:36:44 -0800830 gpr_mu_lock(&pollset->mu);
Craig Tiller7ac6bf02016-02-25 12:54:59 -0800831 size_t i;
832 /* TODO(ctiller): this is O(num_fds^2); maybe switch to a hash set here */
833 for (i = 0; i < pollset->fd_count; i++) {
834 if (pollset->fds[i] == fd) goto exit;
835 }
836 if (pollset->fd_count == pollset->fd_capacity) {
837 pollset->fd_capacity =
838 GPR_MAX(pollset->fd_capacity + 8, pollset->fd_count * 3 / 2);
839 pollset->fds =
840 gpr_realloc(pollset->fds, sizeof(grpc_fd *) * pollset->fd_capacity);
841 }
842 pollset->fds[pollset->fd_count++] = fd;
843 GRPC_FD_REF(fd, "multipoller");
Craig Tiller4c2218e2016-05-11 10:27:08 -0700844 pollset_kick(pollset, NULL);
Craig Tiller7ac6bf02016-02-25 12:54:59 -0800845exit:
Craig Tillerac04b7f2016-02-26 08:36:44 -0800846 gpr_mu_unlock(&pollset->mu);
Craig Tiller253bd502016-02-25 12:30:23 -0800847}
848
849static void finish_shutdown(grpc_exec_ctx *exec_ctx, grpc_pollset *pollset) {
850 GPR_ASSERT(grpc_closure_list_empty(pollset->idle_jobs));
Craig Tiller7ac6bf02016-02-25 12:54:59 -0800851 size_t i;
852 for (i = 0; i < pollset->fd_count; i++) {
853 GRPC_FD_UNREF(pollset->fds[i], "multipoller");
854 }
Craig Tiller7ac6bf02016-02-25 12:54:59 -0800855 pollset->fd_count = 0;
Craig Tiller91031da2016-12-28 15:44:25 -0800856 grpc_closure_sched(exec_ctx, pollset->shutdown_done, GRPC_ERROR_NONE);
Craig Tiller253bd502016-02-25 12:30:23 -0800857}
858
Craig Tillerd6a5b802016-05-13 20:17:38 -0700859static void work_combine_error(grpc_error **composite, grpc_error *error) {
860 if (error == GRPC_ERROR_NONE) return;
861 if (*composite == GRPC_ERROR_NONE) {
862 *composite = GRPC_ERROR_CREATE("pollset_work");
863 }
864 *composite = grpc_error_add_child(*composite, error);
865}
866
/* Run one pass of polling work for `pollset` on the calling thread.
   Called with pollset->mu held; the lock is dropped around the blocking
   poll() call and re-acquired afterwards.  *worker_hdl is published so
   other threads can kick this specific worker, and is cleared before
   return.  Returns an accumulated grpc_error (GRPC_ERROR_NONE on clean
   runs); errors are also logged here. */
static grpc_error *pollset_work(grpc_exec_ctx *exec_ctx, grpc_pollset *pollset,
                                grpc_pollset_worker **worker_hdl,
                                gpr_timespec now, gpr_timespec deadline) {
  grpc_pollset_worker worker;
  *worker_hdl = &worker;
  grpc_error *error = GRPC_ERROR_NONE;

  /* Avoid malloc for small number of elements. */
  enum { inline_elements = 96 };
  struct pollfd pollfd_space[inline_elements];
  struct grpc_fd_watcher watcher_space[inline_elements];

  /* pollset->mu already held */
  int added_worker = 0;
  int locked = 1;
  int queued_work = 0;
  int keep_polling = 0;
  GPR_TIMER_BEGIN("pollset_work", 0);
  /* this must happen before we (potentially) drop pollset->mu */
  worker.next = worker.prev = NULL;
  worker.reevaluate_polling_on_wakeup = 0;
  /* Take a wakeup fd from the pollset's local cache, or create a fresh one.
     This fd is how pollset_kick wakes this particular worker. */
  if (pollset->local_wakeup_cache != NULL) {
    worker.wakeup_fd = pollset->local_wakeup_cache;
    pollset->local_wakeup_cache = worker.wakeup_fd->next;
  } else {
    worker.wakeup_fd = gpr_malloc(sizeof(*worker.wakeup_fd));
    error = grpc_wakeup_fd_init(&worker.wakeup_fd->fd);
    if (error != GRPC_ERROR_NONE) {
      GRPC_LOG_IF_ERROR("pollset_work", GRPC_ERROR_REF(error));
      /* NOTE(review): early return here leaks worker.wakeup_fd's malloc'd
         wrapper and leaves *worker_hdl pointing at a dead stack frame —
         presumably acceptable because wakeup-fd init failure is fatal in
         practice; confirm upstream. */
      return error;
    }
  }
  worker.kicked_specifically = 0;
  /* If there's work waiting for the pollset to be idle, and the
     pollset is idle, then do that work */
  if (!pollset_has_workers(pollset) &&
      !grpc_closure_list_empty(pollset->idle_jobs)) {
    GPR_TIMER_MARK("pollset_work.idle_jobs", 0);
    grpc_closure_list_sched(exec_ctx, &pollset->idle_jobs);
    goto done;
  }
  /* If we're shutting down then we don't execute any extended work */
  if (pollset->shutting_down) {
    GPR_TIMER_MARK("pollset_work.shutting_down", 0);
    goto done;
  }
  /* Start polling, and keep doing so while we're being asked to
     re-evaluate our pollers (this allows poll() based pollers to
     ensure they don't miss wakeups) */
  keep_polling = 1;
  gpr_tls_set(&g_current_thread_poller, (intptr_t)pollset);
  while (keep_polling) {
    keep_polling = 0;
    if (!pollset->kicked_without_pollers) {
      if (!added_worker) {
        push_front_worker(pollset, &worker);
        added_worker = 1;
        gpr_tls_set(&g_current_thread_worker, (intptr_t)&worker);
      }
      GPR_TIMER_BEGIN("maybe_work_and_unlock", 0);
#define POLLOUT_CHECK (POLLOUT | POLLHUP | POLLERR)
#define POLLIN_CHECK (POLLIN | POLLHUP | POLLERR)

      int timeout;
      int r;
      size_t i, fd_count;
      nfds_t pfd_count;
      grpc_fd_watcher *watchers;
      struct pollfd *pfds;

      timeout = poll_deadline_to_millis_timeout(deadline, now);

      /* Slots 0 and 1 are reserved for the global and per-worker wakeup
         fds, hence the "+ 2" throughout. */
      if (pollset->fd_count + 2 <= inline_elements) {
        pfds = pollfd_space;
        watchers = watcher_space;
      } else {
        /* Allocate one buffer to hold both pfds and watchers arrays */
        const size_t pfd_size = sizeof(*pfds) * (pollset->fd_count + 2);
        const size_t watch_size = sizeof(*watchers) * (pollset->fd_count + 2);
        void *buf = gpr_malloc(pfd_size + watch_size);
        pfds = buf;
        watchers = (void *)((char *)buf + pfd_size);
      }

      fd_count = 0;
      pfd_count = 2;
      pfds[0].fd = GRPC_WAKEUP_FD_GET_READ_FD(&global_wakeup_fd);
      pfds[0].events = POLLIN;
      pfds[0].revents = 0;
      pfds[1].fd = GRPC_WAKEUP_FD_GET_READ_FD(&worker.wakeup_fd->fd);
      pfds[1].events = POLLIN;
      pfds[1].revents = 0;
      /* Build the pollfd array, compacting out orphaned fds in place. */
      for (i = 0; i < pollset->fd_count; i++) {
        if (fd_is_orphaned(pollset->fds[i])) {
          GRPC_FD_UNREF(pollset->fds[i], "multipoller");
        } else {
          pollset->fds[fd_count++] = pollset->fds[i];
          watchers[pfd_count].fd = pollset->fds[i];
          GRPC_FD_REF(watchers[pfd_count].fd, "multipoller_start");
          pfds[pfd_count].fd = pollset->fds[i]->fd;
          pfds[pfd_count].revents = 0;
          pfd_count++;
        }
      }
      pollset->fd_count = fd_count;
      gpr_mu_unlock(&pollset->mu);

      /* fd_begin_poll decides the events to poll for; the ref taken above
         only protects the fd across this loop. */
      for (i = 2; i < pfd_count; i++) {
        grpc_fd *fd = watchers[i].fd;
        pfds[i].events = (short)fd_begin_poll(fd, pollset, &worker, POLLIN,
                                              POLLOUT, &watchers[i]);
        GRPC_FD_UNREF(fd, "multipoller_start");
      }

      /* TODO(vpai): Consider first doing a 0 timeout poll here to avoid
         even going into the blocking annotation if possible */
      GRPC_SCHEDULING_START_BLOCKING_REGION;
      r = grpc_poll_function(pfds, pfd_count, timeout);
      GRPC_SCHEDULING_END_BLOCKING_REGION;

      if (r < 0) {
        /* poll() failed; EINTR is benign, anything else is recorded. */
        if (errno != EINTR) {
          work_combine_error(&error, GRPC_OS_ERROR(errno, "poll"));
        }

        for (i = 2; i < pfd_count; i++) {
          if (watchers[i].fd == NULL) {
            fd_end_poll(exec_ctx, &watchers[i], 0, 0, NULL);
          } else {
            // Wake up all the file descriptors, if we have an invalid one
            // we can identify it on the next pollset_work()
            fd_end_poll(exec_ctx, &watchers[i], 1, 1, pollset);
          }
        }
      } else if (r == 0) {
        /* Timeout: nothing became ready. */
        for (i = 2; i < pfd_count; i++) {
          fd_end_poll(exec_ctx, &watchers[i], 0, 0, NULL);
        }
      } else {
        /* Some fds are ready; drain wakeup fds first, then report
           readiness of the real fds via fd_end_poll. */
        if (pfds[0].revents & POLLIN_CHECK) {
          work_combine_error(&error,
                             grpc_wakeup_fd_consume_wakeup(&global_wakeup_fd));
        }
        if (pfds[1].revents & POLLIN_CHECK) {
          work_combine_error(
              &error, grpc_wakeup_fd_consume_wakeup(&worker.wakeup_fd->fd));
        }
        for (i = 2; i < pfd_count; i++) {
          if (watchers[i].fd == NULL) {
            fd_end_poll(exec_ctx, &watchers[i], 0, 0, NULL);
          } else {
            fd_end_poll(exec_ctx, &watchers[i], pfds[i].revents & POLLIN_CHECK,
                        pfds[i].revents & POLLOUT_CHECK, pollset);
          }
        }
      }

      if (pfds != pollfd_space) {
        /* pfds and watchers are in the same memory block pointed to by pfds */
        gpr_free(pfds);
      }

      GPR_TIMER_END("maybe_work_and_unlock", 0);
      locked = 0;
    } else {
      /* A kick arrived while no worker was polling: consume it without
         blocking in poll() at all. */
      GPR_TIMER_MARK("pollset_work.kicked_without_pollers", 0);
      pollset->kicked_without_pollers = 0;
    }
  /* Finished execution - start cleaning up.
     Note that we may arrive here from outside the enclosing while() loop.
     In that case we won't loop though as we haven't added worker to the
     worker list, which means nobody could ask us to re-evaluate polling). */
  done:
    if (!locked) {
      queued_work |= grpc_exec_ctx_flush(exec_ctx);
      gpr_mu_lock(&pollset->mu);
      locked = 1;
    }
    /* If we're forced to re-evaluate polling (via pollset_kick with
       GRPC_POLLSET_REEVALUATE_POLLING_ON_WAKEUP) then we land here and force
       a loop */
    if (worker.reevaluate_polling_on_wakeup && error == GRPC_ERROR_NONE) {
      worker.reevaluate_polling_on_wakeup = 0;
      pollset->kicked_without_pollers = 0;
      if (queued_work || worker.kicked_specifically) {
        /* If there's queued work on the list, then set the deadline to be
           immediate so we get back out of the polling loop quickly */
        deadline = gpr_inf_past(GPR_CLOCK_MONOTONIC);
      }
      keep_polling = 1;
    }
    if (keep_polling) {
      now = gpr_now(now.clock_type);
    }
  }
  gpr_tls_set(&g_current_thread_poller, 0);
  if (added_worker) {
    remove_worker(pollset, &worker);
    gpr_tls_set(&g_current_thread_worker, 0);
  }
  /* release wakeup fd to the local pool */
  worker.wakeup_fd->next = pollset->local_wakeup_cache;
  pollset->local_wakeup_cache = worker.wakeup_fd;
  /* check shutdown conditions */
  if (pollset->shutting_down) {
    if (pollset_has_workers(pollset)) {
      /* Other workers must also observe the shutdown; kick them out. */
      pollset_kick(pollset, NULL);
    } else if (!pollset->called_shutdown && !pollset_has_observers(pollset)) {
      pollset->called_shutdown = 1;
      gpr_mu_unlock(&pollset->mu);
      finish_shutdown(exec_ctx, pollset);
      grpc_exec_ctx_flush(exec_ctx);
      /* Continuing to access pollset here is safe -- it is the caller's
       * responsibility to not destroy when it has outstanding calls to
       * pollset_work.
       * TODO(dklempner): Can we refactor the shutdown logic to avoid this? */
      gpr_mu_lock(&pollset->mu);
    } else if (!grpc_closure_list_empty(pollset->idle_jobs)) {
      grpc_closure_list_sched(exec_ctx, &pollset->idle_jobs);
      gpr_mu_unlock(&pollset->mu);
      grpc_exec_ctx_flush(exec_ctx);
      gpr_mu_lock(&pollset->mu);
    }
  }
  *worker_hdl = NULL;
  GPR_TIMER_END("pollset_work", 0);
  GRPC_LOG_IF_ERROR("pollset_work", GRPC_ERROR_REF(error));
  return error;
}
1096
/* Begin shutting down a pollset (must not already be shutting down).
   Marks the pollset, broadcasts a kick so every active worker notices,
   and — if no workers/observers remain — completes shutdown immediately.
   Otherwise the last departing worker (or pollset_set) finishes it.
   Called with pollset->mu held (consistent with the other mu-protected
   field accesses here — TODO confirm against callers). */
static void pollset_shutdown(grpc_exec_ctx *exec_ctx, grpc_pollset *pollset,
                             grpc_closure *closure) {
  GPR_ASSERT(!pollset->shutting_down);
  pollset->shutting_down = 1;
  pollset->shutdown_done = closure;
  /* Wake every worker so none stays blocked in poll() past shutdown. */
  pollset_kick(pollset, GRPC_POLLSET_KICK_BROADCAST);
  if (!pollset_has_workers(pollset)) {
    /* No worker will ever run these; schedule them now. */
    grpc_closure_list_sched(exec_ctx, &pollset->idle_jobs);
  }
  if (!pollset->called_shutdown && !pollset_has_observers(pollset)) {
    pollset->called_shutdown = 1;
    finish_shutdown(exec_ctx, pollset);
  }
}
1111
1112static int poll_deadline_to_millis_timeout(gpr_timespec deadline,
1113 gpr_timespec now) {
1114 gpr_timespec timeout;
1115 static const int64_t max_spin_polling_us = 10;
1116 if (gpr_time_cmp(deadline, gpr_inf_future(deadline.clock_type)) == 0) {
1117 return -1;
1118 }
1119 if (gpr_time_cmp(deadline, gpr_time_add(now, gpr_time_from_micros(
1120 max_spin_polling_us,
1121 GPR_TIMESPAN))) <= 0) {
1122 return 0;
1123 }
1124 timeout = gpr_time_sub(deadline, now);
1125 return gpr_time_to_millis(gpr_time_add(
1126 timeout, gpr_time_from_nanos(GPR_NS_PER_MS - 1, GPR_TIMESPAN)));
1127}
1128
Craig Tiller253bd502016-02-25 12:30:23 -08001129/*******************************************************************************
1130 * pollset_set_posix.c
1131 */
1132
1133static grpc_pollset_set *pollset_set_create(void) {
Craig Tiller6f417882017-02-16 14:09:39 -08001134 grpc_pollset_set *pollset_set = gpr_zalloc(sizeof(*pollset_set));
Craig Tiller253bd502016-02-25 12:30:23 -08001135 gpr_mu_init(&pollset_set->mu);
1136 return pollset_set;
1137}
1138
/* Destroy a pollset_set: drop all fd refs, detach from every member
   pollset (possibly completing a pending pollset shutdown if this set was
   the last observer), then free the arrays and the set itself.
   NOTE(review): the set's own mutex is destroyed before the loops —
   presumably no concurrent access is possible at destroy time; confirm
   with callers. */
static void pollset_set_destroy(grpc_exec_ctx *exec_ctx,
                                grpc_pollset_set *pollset_set) {
  size_t i;
  gpr_mu_destroy(&pollset_set->mu);
  for (i = 0; i < pollset_set->fd_count; i++) {
    GRPC_FD_UNREF(pollset_set->fds[i], "pollset_set");
  }
  for (i = 0; i < pollset_set->pollset_count; i++) {
    grpc_pollset *pollset = pollset_set->pollsets[i];
    gpr_mu_lock(&pollset->mu);
    /* This set no longer observes the pollset. */
    pollset->pollset_set_count--;
    /* check shutdown */
    if (pollset->shutting_down && !pollset->called_shutdown &&
        !pollset_has_observers(pollset)) {
      pollset->called_shutdown = 1;
      /* finish_shutdown must run without pollset->mu held. */
      gpr_mu_unlock(&pollset->mu);
      finish_shutdown(exec_ctx, pollset);
    } else {
      gpr_mu_unlock(&pollset->mu);
    }
  }
  gpr_free(pollset_set->pollsets);
  gpr_free(pollset_set->pollset_sets);
  gpr_free(pollset_set->fds);
  gpr_free(pollset_set);
}
1165
1166static void pollset_set_add_pollset(grpc_exec_ctx *exec_ctx,
1167 grpc_pollset_set *pollset_set,
1168 grpc_pollset *pollset) {
1169 size_t i, j;
Craig Tiller22543592017-02-14 10:29:36 -08001170 gpr_mu_lock(&pollset->mu);
1171 pollset->pollset_set_count++;
1172 gpr_mu_unlock(&pollset->mu);
Craig Tiller253bd502016-02-25 12:30:23 -08001173 gpr_mu_lock(&pollset_set->mu);
1174 if (pollset_set->pollset_count == pollset_set->pollset_capacity) {
1175 pollset_set->pollset_capacity =
1176 GPR_MAX(8, 2 * pollset_set->pollset_capacity);
Craig Tillercea72a02017-02-17 07:04:21 -08001177 pollset_set->pollsets =
1178 gpr_realloc(pollset_set->pollsets, pollset_set->pollset_capacity *
1179 sizeof(*pollset_set->pollsets));
Craig Tiller253bd502016-02-25 12:30:23 -08001180 }
1181 pollset_set->pollsets[pollset_set->pollset_count++] = pollset;
1182 for (i = 0, j = 0; i < pollset_set->fd_count; i++) {
1183 if (fd_is_orphaned(pollset_set->fds[i])) {
1184 GRPC_FD_UNREF(pollset_set->fds[i], "pollset_set");
1185 } else {
1186 pollset_add_fd(exec_ctx, pollset, pollset_set->fds[i]);
1187 pollset_set->fds[j++] = pollset_set->fds[i];
1188 }
1189 }
1190 pollset_set->fd_count = j;
1191 gpr_mu_unlock(&pollset_set->mu);
1192}
1193
/* Remove a pollset from this set (swap-with-last removal) and drop the
   set's observer count on the pollset.  If the pollset was waiting on
   this last observer to finish shutting down, complete its shutdown. */
static void pollset_set_del_pollset(grpc_exec_ctx *exec_ctx,
                                    grpc_pollset_set *pollset_set,
                                    grpc_pollset *pollset) {
  size_t i;
  gpr_mu_lock(&pollset_set->mu);
  for (i = 0; i < pollset_set->pollset_count; i++) {
    if (pollset_set->pollsets[i] == pollset) {
      /* Swap-remove: order of the pollsets array is not significant. */
      pollset_set->pollset_count--;
      GPR_SWAP(grpc_pollset *, pollset_set->pollsets[i],
               pollset_set->pollsets[pollset_set->pollset_count]);
      break;
    }
  }
  gpr_mu_unlock(&pollset_set->mu);
  gpr_mu_lock(&pollset->mu);
  pollset->pollset_set_count--;
  /* check shutdown */
  if (pollset->shutting_down && !pollset->called_shutdown &&
      !pollset_has_observers(pollset)) {
    pollset->called_shutdown = 1;
    /* finish_shutdown must run without pollset->mu held. */
    gpr_mu_unlock(&pollset->mu);
    finish_shutdown(exec_ctx, pollset);
  } else {
    gpr_mu_unlock(&pollset->mu);
  }
}
1220
1221static void pollset_set_add_pollset_set(grpc_exec_ctx *exec_ctx,
1222 grpc_pollset_set *bag,
1223 grpc_pollset_set *item) {
1224 size_t i, j;
1225 gpr_mu_lock(&bag->mu);
1226 if (bag->pollset_set_count == bag->pollset_set_capacity) {
1227 bag->pollset_set_capacity = GPR_MAX(8, 2 * bag->pollset_set_capacity);
1228 bag->pollset_sets =
1229 gpr_realloc(bag->pollset_sets,
1230 bag->pollset_set_capacity * sizeof(*bag->pollset_sets));
1231 }
1232 bag->pollset_sets[bag->pollset_set_count++] = item;
1233 for (i = 0, j = 0; i < bag->fd_count; i++) {
1234 if (fd_is_orphaned(bag->fds[i])) {
1235 GRPC_FD_UNREF(bag->fds[i], "pollset_set");
1236 } else {
1237 pollset_set_add_fd(exec_ctx, item, bag->fds[i]);
1238 bag->fds[j++] = bag->fds[i];
1239 }
1240 }
1241 bag->fd_count = j;
1242 gpr_mu_unlock(&bag->mu);
1243}
1244
1245static void pollset_set_del_pollset_set(grpc_exec_ctx *exec_ctx,
1246 grpc_pollset_set *bag,
1247 grpc_pollset_set *item) {
1248 size_t i;
1249 gpr_mu_lock(&bag->mu);
1250 for (i = 0; i < bag->pollset_set_count; i++) {
1251 if (bag->pollset_sets[i] == item) {
1252 bag->pollset_set_count--;
1253 GPR_SWAP(grpc_pollset_set *, bag->pollset_sets[i],
1254 bag->pollset_sets[bag->pollset_set_count]);
1255 break;
1256 }
1257 }
1258 gpr_mu_unlock(&bag->mu);
1259}
1260
1261static void pollset_set_add_fd(grpc_exec_ctx *exec_ctx,
1262 grpc_pollset_set *pollset_set, grpc_fd *fd) {
1263 size_t i;
1264 gpr_mu_lock(&pollset_set->mu);
1265 if (pollset_set->fd_count == pollset_set->fd_capacity) {
1266 pollset_set->fd_capacity = GPR_MAX(8, 2 * pollset_set->fd_capacity);
1267 pollset_set->fds = gpr_realloc(
1268 pollset_set->fds, pollset_set->fd_capacity * sizeof(*pollset_set->fds));
1269 }
1270 GRPC_FD_REF(fd, "pollset_set");
1271 pollset_set->fds[pollset_set->fd_count++] = fd;
1272 for (i = 0; i < pollset_set->pollset_count; i++) {
1273 pollset_add_fd(exec_ctx, pollset_set->pollsets[i], fd);
1274 }
1275 for (i = 0; i < pollset_set->pollset_set_count; i++) {
1276 pollset_set_add_fd(exec_ctx, pollset_set->pollset_sets[i], fd);
1277 }
1278 gpr_mu_unlock(&pollset_set->mu);
1279}
1280
1281static void pollset_set_del_fd(grpc_exec_ctx *exec_ctx,
1282 grpc_pollset_set *pollset_set, grpc_fd *fd) {
1283 size_t i;
1284 gpr_mu_lock(&pollset_set->mu);
1285 for (i = 0; i < pollset_set->fd_count; i++) {
1286 if (pollset_set->fds[i] == fd) {
1287 pollset_set->fd_count--;
1288 GPR_SWAP(grpc_fd *, pollset_set->fds[i],
1289 pollset_set->fds[pollset_set->fd_count]);
1290 GRPC_FD_UNREF(fd, "pollset_set");
1291 break;
1292 }
1293 }
1294 for (i = 0; i < pollset_set->pollset_set_count; i++) {
1295 pollset_set_del_fd(exec_ctx, pollset_set->pollset_sets[i], fd);
1296 }
1297 gpr_mu_unlock(&pollset_set->mu);
1298}
1299
1300/*******************************************************************************
Craig Tillerd8a3c042016-09-09 12:42:37 -07001301 * workqueue stubs
1302 */
1303
/* Workqueue ref/unref stubs: this engine has no real workqueue, so both
   operations are no-ops (ref returns its argument unchanged).  Two
   signature variants are provided to match the debug-refcounting build. */
#ifdef GRPC_WORKQUEUE_REFCOUNT_DEBUG
static grpc_workqueue *workqueue_ref(grpc_workqueue *workqueue,
                                     const char *file, int line,
                                     const char *reason) {
  return workqueue;
}
static void workqueue_unref(grpc_exec_ctx *exec_ctx, grpc_workqueue *workqueue,
                            const char *file, int line, const char *reason) {}
#else
static grpc_workqueue *workqueue_ref(grpc_workqueue *workqueue) {
  return workqueue;
}
static void workqueue_unref(grpc_exec_ctx *exec_ctx,
                            grpc_workqueue *workqueue) {}
#endif
1319
/* With no real workqueue, closures are simply scheduled on the current
   exec_ctx regardless of the (ignored) workqueue argument. */
static grpc_closure_scheduler *workqueue_scheduler(grpc_workqueue *workqueue) {
  return grpc_schedule_on_exec_ctx;
}
1323
1324/*******************************************************************************
Ken Payson82e4ec72016-10-13 12:26:01 -07001325 * Condition Variable polling extensions
1326 */
1327
1328static void decref_poll_args(poll_args *args) {
1329 if (gpr_unref(&args->refcount)) {
1330 gpr_free(args->fds);
1331 gpr_cv_destroy(args->cv);
1332 gpr_free(args->cv);
1333 gpr_free(args);
1334 }
1335}
1336
// Poll in a background thread
/* Thread body for the detached poll() helper spawned by cvfd_poll.
   Repeatedly calls the real poll() in CV_POLL_PERIOD_MS slices (so it can
   notice cancellation between slices) until something is ready, the
   caller's timeout is exhausted, or the caller marks the args CANCELLED.
   On completion it stores retval/errno into pargs, signals the waiting
   caller, and drops its reference. */
static void run_poll(void *arg) {
  int timeout, retval;
  poll_args *pargs = (poll_args *)arg;
  while (gpr_atm_no_barrier_load(&pargs->status) == INPROGRESS) {
    if (pargs->timeout < 0) {
      /* Negative timeout = infinite: poll in fixed slices forever. */
      timeout = CV_POLL_PERIOD_MS;
    } else {
      /* Consume the caller's budget one slice at a time. */
      timeout = GPR_MIN(CV_POLL_PERIOD_MS, pargs->timeout);
      pargs->timeout -= timeout;
    }
    retval = g_cvfds.poll(pargs->fds, pargs->nfds, timeout);
    if (retval != 0 || pargs->timeout == 0) {
      pargs->retval = retval;
      pargs->err = errno;
      break;
    }
  }
  gpr_mu_lock(&g_cvfds.mu);
  if (gpr_atm_no_barrier_load(&pargs->status) == INPROGRESS) {
    // Signal main thread that the poll completed
    gpr_atm_no_barrier_store(&pargs->status, COMPLETED);
    gpr_cv_signal(pargs->cv);
  }
  decref_poll_args(pargs);
  /* Last poller out during shutdown wakes global_cv_fd_table_shutdown. */
  g_cvfds.pollcount--;
  if (g_cvfds.shutdown && g_cvfds.pollcount == 0) {
    gpr_cv_signal(&g_cvfds.shutdown_complete);
  }
  gpr_mu_unlock(&g_cvfds.mu);
}
1368
// This function overrides poll() to handle condition variable wakeup fds
/* Installed in place of grpc_poll_function by global_cv_fd_table_init.
   Negative fds with POLLIN set are condition-variable wakeup fds (decoded
   via FD_TO_IDX); real (non-negative) fds are handed to a background
   run_poll thread while this thread waits on a condition variable that
   either the background poll or a cv-fd wakeup can signal.  Returns a
   poll()-style ready count and fills in revents. */
static int cvfd_poll(struct pollfd *fds, nfds_t nfds, int timeout) {
  unsigned int i;
  int res, idx;
  gpr_cv *pollcv;
  cv_node *cvn, *prev;
  int skip_poll = 0;
  nfds_t nsockfds = 0;
  gpr_thd_id t_id;
  gpr_thd_options opt;
  poll_args *pargs = NULL;
  gpr_mu_lock(&g_cvfds.mu);
  pollcv = gpr_malloc(sizeof(gpr_cv));
  gpr_cv_init(pollcv);
  /* Pass 1: register our cv on every cv-wakeup fd; count real fds. */
  for (i = 0; i < nfds; i++) {
    fds[i].revents = 0;
    if (fds[i].fd < 0 && (fds[i].events & POLLIN)) {
      idx = FD_TO_IDX(fds[i].fd);
      cvn = gpr_malloc(sizeof(cv_node));
      cvn->cv = pollcv;
      cvn->next = g_cvfds.cvfds[idx].cvs;
      g_cvfds.cvfds[idx].cvs = cvn;
      // Don't bother polling if a wakeup fd is ready
      if (g_cvfds.cvfds[idx].is_set) {
        skip_poll = 1;
      }
    } else if (fds[i].fd >= 0) {
      nsockfds++;
    }
  }

  res = 0;
  if (!skip_poll && nsockfds > 0) {
    /* Real fds present: poll them in a detached background thread while
       we wait on pollcv (woken by either run_poll or a cv-fd wakeup). */
    pargs = gpr_malloc(sizeof(struct poll_args));
    // Both the main thread and calling thread get a reference
    gpr_ref_init(&pargs->refcount, 2);
    pargs->cv = pollcv;
    pargs->fds = gpr_malloc(sizeof(struct pollfd) * nsockfds);
    pargs->nfds = nsockfds;
    pargs->timeout = timeout;
    pargs->retval = 0;
    pargs->err = 0;
    gpr_atm_no_barrier_store(&pargs->status, INPROGRESS);
    idx = 0;
    for (i = 0; i < nfds; i++) {
      if (fds[i].fd >= 0) {
        pargs->fds[idx].fd = fds[i].fd;
        pargs->fds[idx].events = fds[i].events;
        pargs->fds[idx].revents = 0;
        idx++;
      }
    }
    g_cvfds.pollcount++;
    opt = gpr_thd_options_default();
    gpr_thd_options_set_detached(&opt);
    gpr_thd_new(&t_id, &run_poll, pargs, &opt);
    // We want the poll() thread to trigger the deadline, so wait forever here
    gpr_cv_wait(pollcv, &g_cvfds.mu, gpr_inf_future(GPR_CLOCK_MONOTONIC));
    if (gpr_atm_no_barrier_load(&pargs->status) == COMPLETED) {
      res = pargs->retval;
      errno = pargs->err;
    } else {
      /* Woken by a cv-fd instead: abandon the background poll. */
      errno = 0;
      gpr_atm_no_barrier_store(&pargs->status, CANCELLED);
    }
  } else if (!skip_poll) {
    /* Only cv-wakeup fds: just wait for a signal or the timeout.
       NOTE(review): a negative `timeout` here is passed straight to
       gpr_time_from_millis — presumably callers never mix an infinite
       timeout with zero real fds; confirm. */
    gpr_timespec deadline = gpr_now(GPR_CLOCK_REALTIME);
    deadline =
        gpr_time_add(deadline, gpr_time_from_millis(timeout, GPR_TIMESPAN));
    gpr_cv_wait(pollcv, &g_cvfds.mu, deadline);
  }

  /* Pass 2: unregister our cv from each cv-fd, report cv-fd readiness,
     and copy back revents from the background poll (if it completed). */
  idx = 0;
  for (i = 0; i < nfds; i++) {
    if (fds[i].fd < 0 && (fds[i].events & POLLIN)) {
      cvn = g_cvfds.cvfds[FD_TO_IDX(fds[i].fd)].cvs;
      prev = NULL;
      while (cvn->cv != pollcv) {
        prev = cvn;
        cvn = cvn->next;
        GPR_ASSERT(cvn);
      }
      if (!prev) {
        g_cvfds.cvfds[FD_TO_IDX(fds[i].fd)].cvs = cvn->next;
      } else {
        prev->next = cvn->next;
      }
      gpr_free(cvn);

      if (g_cvfds.cvfds[FD_TO_IDX(fds[i].fd)].is_set) {
        fds[i].revents = POLLIN;
        if (res >= 0) res++;
      }
    } else if (!skip_poll && fds[i].fd >= 0 &&
               gpr_atm_no_barrier_load(&pargs->status) == COMPLETED) {
      fds[i].revents = pargs->fds[idx].revents;
      idx++;
    }
  }

  /* If a background poll was started it co-owns pollcv (freed via
     decref_poll_args); otherwise we own and free it here. */
  if (pargs) {
    decref_poll_args(pargs);
  } else {
    gpr_cv_destroy(pollcv);
    gpr_free(pollcv);
  }
  gpr_mu_unlock(&g_cvfds.mu);

  return res;
}
1479
1480static void global_cv_fd_table_init() {
1481 gpr_mu_init(&g_cvfds.mu);
1482 gpr_mu_lock(&g_cvfds.mu);
1483 gpr_cv_init(&g_cvfds.shutdown_complete);
1484 g_cvfds.shutdown = 0;
1485 g_cvfds.pollcount = 0;
1486 g_cvfds.size = CV_DEFAULT_TABLE_SIZE;
1487 g_cvfds.cvfds = gpr_malloc(sizeof(fd_node) * CV_DEFAULT_TABLE_SIZE);
1488 g_cvfds.free_fds = NULL;
1489 for (int i = 0; i < CV_DEFAULT_TABLE_SIZE; i++) {
1490 g_cvfds.cvfds[i].is_set = 0;
1491 g_cvfds.cvfds[i].cvs = NULL;
1492 g_cvfds.cvfds[i].next_free = g_cvfds.free_fds;
1493 g_cvfds.free_fds = &g_cvfds.cvfds[i];
1494 }
1495 // Override the poll function with one that supports cvfds
1496 g_cvfds.poll = grpc_poll_function;
1497 grpc_poll_function = &cvfd_poll;
1498 gpr_mu_unlock(&g_cvfds.mu);
1499}
1500
/* Tear down the condition-variable wakeup-fd machinery: wait (bounded to
   3 seconds) for all background run_poll threads to exit, restore the
   original poll() function, and free the table. */
static void global_cv_fd_table_shutdown() {
  gpr_mu_lock(&g_cvfds.mu);
  g_cvfds.shutdown = 1;
  // Attempt to wait for all abandoned poll() threads to terminate
  // Not doing so will result in reported memory leaks
  if (g_cvfds.pollcount > 0) {
    /* run_poll signals shutdown_complete when the last poller exits. */
    int res = gpr_cv_wait(&g_cvfds.shutdown_complete, &g_cvfds.mu,
                          gpr_time_add(gpr_now(GPR_CLOCK_REALTIME),
                                       gpr_time_from_seconds(3, GPR_TIMESPAN)));
    /* Asserts the wait was signalled (0), i.e. did not time out. */
    GPR_ASSERT(res == 0);
  }
  gpr_cv_destroy(&g_cvfds.shutdown_complete);
  /* Restore the poll function saved by global_cv_fd_table_init. */
  grpc_poll_function = g_cvfds.poll;
  gpr_free(g_cvfds.cvfds);
  gpr_mu_unlock(&g_cvfds.mu);
  gpr_mu_destroy(&g_cvfds.mu);
}
1518
1519/*******************************************************************************
Craig Tiller253bd502016-02-25 12:30:23 -08001520 * event engine binding
1521 */
1522
Ken Payson82e4ec72016-10-13 12:26:01 -07001523static void shutdown_engine(void) {
1524 pollset_global_shutdown();
1525 if (grpc_cv_wakeup_fds_enabled()) {
1526 global_cv_fd_table_shutdown();
1527 }
1528}
Craig Tiller253bd502016-02-25 12:30:23 -08001529
/* poll()-backed event engine implementation; all entry points are wired
   up via designated initializers. */
static const grpc_event_engine_vtable vtable = {
    .pollset_size = sizeof(grpc_pollset),

    /* fd lifecycle and readiness notification */
    .fd_create = fd_create,
    .fd_wrapped_fd = fd_wrapped_fd,
    .fd_orphan = fd_orphan,
    .fd_shutdown = fd_shutdown,
    .fd_is_shutdown = fd_is_shutdown,
    .fd_notify_on_read = fd_notify_on_read,
    .fd_notify_on_write = fd_notify_on_write,
    .fd_get_read_notifier_pollset = fd_get_read_notifier_pollset,
    .fd_get_workqueue = fd_get_workqueue,

    /* pollset operations */
    .pollset_init = pollset_init,
    .pollset_shutdown = pollset_shutdown,
    .pollset_destroy = pollset_destroy,
    .pollset_work = pollset_work,
    .pollset_kick = pollset_kick,
    .pollset_add_fd = pollset_add_fd,

    /* pollset_set operations */
    .pollset_set_create = pollset_set_create,
    .pollset_set_destroy = pollset_set_destroy,
    .pollset_set_add_pollset = pollset_set_add_pollset,
    .pollset_set_del_pollset = pollset_set_del_pollset,
    .pollset_set_add_pollset_set = pollset_set_add_pollset_set,
    .pollset_set_del_pollset_set = pollset_set_del_pollset_set,
    .pollset_set_add_fd = pollset_set_add_fd,
    .pollset_set_del_fd = pollset_set_del_fd,

    .kick_poller = kick_poller,

    /* workqueue support */
    .workqueue_ref = workqueue_ref,
    .workqueue_unref = workqueue_unref,
    .workqueue_scheduler = workqueue_scheduler,

    .shutdown_engine = shutdown_engine,
};
1567
1568const grpc_event_engine_vtable *grpc_init_poll_posix(void) {
Ken Paysoncd7d0472016-10-11 12:24:20 -07001569 if (!grpc_has_wakeup_fd()) {
Ken Paysonbc544be2016-10-06 19:23:47 -07001570 return NULL;
1571 }
Craig Tiller5e3a0ef2016-06-01 10:28:15 -07001572 if (!GRPC_LOG_IF_ERROR("pollset_global_init", pollset_global_init())) {
1573 return NULL;
1574 }
Craig Tiller253bd502016-02-25 12:30:23 -08001575 return &vtable;
1576}
1577
Ken Payson82e4ec72016-10-13 12:26:01 -07001578const grpc_event_engine_vtable *grpc_init_poll_cv_posix(void) {
1579 global_cv_fd_table_init();
1580 grpc_enable_cv_wakeup_fds(1);
1581 if (!GRPC_LOG_IF_ERROR("pollset_global_init", pollset_global_init())) {
1582 global_cv_fd_table_shutdown();
1583 grpc_enable_cv_wakeup_fds(0);
1584 return NULL;
1585 }
1586 return &vtable;
1587}
1588
Craig Tiller253bd502016-02-25 12:30:23 -08001589#endif