/*
 *
 * Copyright 2015, Google Inc.
 * All rights reserved.
 *
 * Redistribution and use in source and binary forms, with or without
 * modification, are permitted provided that the following conditions are
 * met:
 *
 *     * Redistributions of source code must retain the above copyright
 * notice, this list of conditions and the following disclaimer.
 *     * Redistributions in binary form must reproduce the above
 * copyright notice, this list of conditions and the following disclaimer
 * in the documentation and/or other materials provided with the
 * distribution.
 *     * Neither the name of Google Inc. nor the names of its
 * contributors may be used to endorse or promote products derived from
 * this software without specific prior written permission.
 *
 * THIS SOFTWARE IS PROVIDED BY THE COPYRIGHT HOLDERS AND CONTRIBUTORS
 * "AS IS" AND ANY EXPRESS OR IMPLIED WARRANTIES, INCLUDING, BUT NOT
 * LIMITED TO, THE IMPLIED WARRANTIES OF MERCHANTABILITY AND FITNESS FOR
 * A PARTICULAR PURPOSE ARE DISCLAIMED. IN NO EVENT SHALL THE COPYRIGHT
 * OWNER OR CONTRIBUTORS BE LIABLE FOR ANY DIRECT, INDIRECT, INCIDENTAL,
 * SPECIAL, EXEMPLARY, OR CONSEQUENTIAL DAMAGES (INCLUDING, BUT NOT
 * LIMITED TO, PROCUREMENT OF SUBSTITUTE GOODS OR SERVICES; LOSS OF USE,
 * DATA, OR PROFITS; OR BUSINESS INTERRUPTION) HOWEVER CAUSED AND ON ANY
 * THEORY OF LIABILITY, WHETHER IN CONTRACT, STRICT LIABILITY, OR TORT
 * (INCLUDING NEGLIGENCE OR OTHERWISE) ARISING IN ANY WAY OUT OF THE USE
 * OF THIS SOFTWARE, EVEN IF ADVISED OF THE POSSIBILITY OF SUCH DAMAGE.
 *
 */

/* This file will be removed shortly: it's here to keep refactoring
 * steps simple and auditable.
 * It's the combination of the old files:
 *  - fd_posix.{h,c}
 *  - pollset_posix.{h,c}
 *  - pollset_multipoller_with_{poll,epoll}.{h,c}
 * The new version will be split into:
 *  - ev_poll_posix.{h,c}
 *  - ev_epoll_posix.{h,c}
 */

#include <grpc/support/port_platform.h>

#ifdef GPR_POSIX_SOCKET

#include "src/core/lib/iomgr/ev_poll_and_epoll_posix.h"

#include <assert.h>
#include <errno.h>
#include <poll.h>
#include <string.h>
#include <sys/socket.h>
#include <unistd.h>

#include <grpc/support/alloc.h>
#include <grpc/support/log.h>
#include <grpc/support/string_util.h>
#include <grpc/support/tls.h>
#include <grpc/support/useful.h>

#include "src/core/lib/iomgr/iomgr_internal.h"
#include "src/core/lib/iomgr/wakeup_fd_posix.h"
#include "src/core/lib/profiling/timers.h"
#include "src/core/lib/support/block_annotate.h"

/*******************************************************************************
 * FD declarations
 */

typedef struct grpc_fd_watcher {
  struct grpc_fd_watcher *next;
  struct grpc_fd_watcher *prev;
  grpc_pollset *pollset;
  grpc_pollset_worker *worker;
  grpc_fd *fd;
} grpc_fd_watcher;

struct grpc_fd {
  int fd;
  /* refst format:
     bit0:   1=active/0=orphaned
     bit1-n: refcount
     meaning that mostly we ref by two to avoid altering the orphaned bit,
     and just unref by 1 when we're ready to flag the object as orphaned */
  gpr_atm refst;

  gpr_mu mu;
  int shutdown;
  int closed;
  int released;

  /* The watcher list.

     The following watcher related fields are protected by mu.

     An fd_watcher is an ephemeral object created when an fd wants to
     begin polling, and destroyed after the poll.

     It denotes the fd's interest in read polling, write polling, both,
     or neither.

     If a watcher is asked to poll for reads or writes, the read_watcher
     or write_watcher fields are set respectively. A watcher may be asked
     to poll for both, in which case both fields will be set.

     read_watcher and write_watcher may be NULL if no watcher has been
     asked to poll for reads or writes.

     If an fd_watcher is not asked to poll for reads or writes, it's added
     to a linked list of inactive watchers, rooted at inactive_watcher_root.
     If at a later time a poller is needed, one of the inactive watchers
     may be kicked out of its poll loop to take on that responsibility. */
  grpc_fd_watcher inactive_watcher_root;
  grpc_fd_watcher *read_watcher;
  grpc_fd_watcher *write_watcher;

  grpc_closure *read_closure;
  grpc_closure *write_closure;

  struct grpc_fd *freelist_next;

  grpc_closure *on_done_closure;

  grpc_iomgr_object iomgr_object;
};

/* Begin polling on an fd.
   Registers that the given pollset is interested in this fd - so that if read
   or writability interest changes, the pollset can be kicked to pick up that
   new interest.
   Return value is:
     (fd_needs_read? read_mask : 0) | (fd_needs_write? write_mask : 0)
   i.e. a combination of read_mask and write_mask determined by the fd's current
   interest in said events.
   Polling strategies that do not need to alter their behavior depending on the
   fd's current interest (such as epoll) do not need to call this function.
   MUST NOT be called with a pollset lock taken */
static uint32_t fd_begin_poll(grpc_fd *fd, grpc_pollset *pollset,
                              grpc_pollset_worker *worker, uint32_t read_mask,
                              uint32_t write_mask, grpc_fd_watcher *rec);
/* Complete polling previously started with fd_begin_poll
   MUST NOT be called with a pollset lock taken
   if got_read or got_write are 1, also does the become_{readable,writable} as
   appropriate. */
static void fd_end_poll(grpc_exec_ctx *exec_ctx, grpc_fd_watcher *rec,
                        int got_read, int got_write);

/* Return true if this fd is orphaned, false otherwise */
static bool fd_is_orphaned(grpc_fd *fd);

/* Reference counting for fds */
/*#define GRPC_FD_REF_COUNT_DEBUG*/
#ifdef GRPC_FD_REF_COUNT_DEBUG
static void fd_ref(grpc_fd *fd, const char *reason, const char *file, int line);
static void fd_unref(grpc_fd *fd, const char *reason, const char *file,
                     int line);
#define GRPC_FD_REF(fd, reason) fd_ref(fd, reason, __FILE__, __LINE__)
#define GRPC_FD_UNREF(fd, reason) fd_unref(fd, reason, __FILE__, __LINE__)
#else
static void fd_ref(grpc_fd *fd);
static void fd_unref(grpc_fd *fd);
#define GRPC_FD_REF(fd, reason) fd_ref(fd)
#define GRPC_FD_UNREF(fd, reason) fd_unref(fd)
#endif

static void fd_global_init(void);
static void fd_global_shutdown(void);

#define CLOSURE_NOT_READY ((grpc_closure *)0)
#define CLOSURE_READY ((grpc_closure *)1)

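/* A grpc_fd's read_closure/write_closure fields are tri-state: they hold
   CLOSURE_NOT_READY, CLOSURE_READY, or a pointer to a user closure that is
   waiting on the event. The transitions are implemented by notify_on_locked
   and set_ready_locked below. */
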
/*******************************************************************************
 * pollset declarations
 */

typedef struct grpc_pollset_vtable grpc_pollset_vtable;

typedef struct grpc_cached_wakeup_fd {
  grpc_wakeup_fd fd;
  struct grpc_cached_wakeup_fd *next;
} grpc_cached_wakeup_fd;

struct grpc_pollset_worker {
  grpc_cached_wakeup_fd *wakeup_fd;
  int reevaluate_polling_on_wakeup;
  int kicked_specifically;
  struct grpc_pollset_worker *next;
  struct grpc_pollset_worker *prev;
};

struct grpc_pollset {
  /* pollsets under posix can mutate representation as fds are added and
     removed.
     For example, we may choose a poll() based implementation on linux for
     a few fds, and an epoll() based implementation for many fds */
  const grpc_pollset_vtable *vtable;
  gpr_mu mu;
  grpc_pollset_worker root_worker;
  int in_flight_cbs;
  int shutting_down;
  int called_shutdown;
  int kicked_without_pollers;
  grpc_closure *shutdown_done;
  grpc_closure_list idle_jobs;
  union {
    int fd;
    void *ptr;
  } data;
  /* Local cache of eventfds for workers */
  grpc_cached_wakeup_fd *local_wakeup_cache;
};

struct grpc_pollset_vtable {
  void (*add_fd)(grpc_exec_ctx *exec_ctx, grpc_pollset *pollset,
                 struct grpc_fd *fd, int and_unlock_pollset);
  grpc_error *(*maybe_work_and_unlock)(grpc_exec_ctx *exec_ctx,
                                       grpc_pollset *pollset,
                                       grpc_pollset_worker *worker,
                                       gpr_timespec deadline, gpr_timespec now);
  void (*finish_shutdown)(grpc_pollset *pollset);
  void (*destroy)(grpc_pollset *pollset);
};

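/* Note on locking: maybe_work_and_unlock is invoked by pollset_work with
   pollset->mu held, and must release that lock before blocking in poll();
   pollset_work reacquires the lock when it regains control (see pollset_work
   below). */
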
/* Add an fd to a pollset */
static void pollset_add_fd(grpc_exec_ctx *exec_ctx, grpc_pollset *pollset,
                           struct grpc_fd *fd);

static void pollset_set_add_fd(grpc_exec_ctx *exec_ctx,
                               grpc_pollset_set *pollset_set, grpc_fd *fd);

/* Convert a timespec to milliseconds:
   - very small or negative poll times are clamped to zero to do a
     non-blocking poll (which becomes spin polling)
   - other small values are rounded up to one millisecond
   - longer than a millisecond polls are rounded up to the next nearest
     millisecond to avoid spinning
   - infinite timeouts are converted to -1 */
static int poll_deadline_to_millis_timeout(gpr_timespec deadline,
                                           gpr_timespec now);

/* Allow kick to wakeup the currently polling worker */
#define GRPC_POLLSET_CAN_KICK_SELF 1
/* Force the wakee to repoll when awoken */
#define GRPC_POLLSET_REEVALUATE_POLLING_ON_WAKEUP 2
/* As per pollset_kick, with an extended set of flags (defined above)
   -- mostly for fd_posix's use. */
static grpc_error *pollset_kick_ext(grpc_pollset *p,
                                    grpc_pollset_worker *specific_worker,
                                    uint32_t flags) GRPC_MUST_USE_RESULT;

/* turn a pollset into a multipoller: platform specific */
typedef void (*platform_become_multipoller_type)(grpc_exec_ctx *exec_ctx,
                                                 grpc_pollset *pollset,
                                                 struct grpc_fd **fds,
                                                 size_t fd_count);
static platform_become_multipoller_type platform_become_multipoller;

/* Return 1 if the pollset has active threads in pollset_work (pollset must
 * be locked) */
static int pollset_has_workers(grpc_pollset *pollset);

static void remove_fd_from_all_epoll_sets(int fd);

/*******************************************************************************
 * pollset_set definitions
 */

struct grpc_pollset_set {
  gpr_mu mu;

  size_t pollset_count;
  size_t pollset_capacity;
  grpc_pollset **pollsets;

  size_t pollset_set_count;
  size_t pollset_set_capacity;
  struct grpc_pollset_set **pollset_sets;

  size_t fd_count;
  size_t fd_capacity;
  grpc_fd **fds;
};

/*******************************************************************************
 * fd_posix.c
 */

/* We need to keep a freelist not because of any concerns of malloc performance
 * but instead so that implementations with multiple threads in (for example)
 * epoll_wait deal with the race between pollset removal and incoming poll
 * notifications.
 *
 * The problem is that the poller ultimately holds a reference to this
 * object, so it is very difficult to know when it is safe to free it, at
 * least without some expensive synchronization.
 *
 * If we keep the object freelisted, in the worst case losing this race just
 * becomes a spurious read notification on a reused fd.
 */
/* TODO(klempner): We could use some form of polling generation count to know
 * when these are safe to free. */
/* TODO(klempner): Consider disabling freelisting if we don't have multiple
 * threads in poll on the same fd */
/* TODO(klempner): Batch these allocations to reduce fragmentation */
static grpc_fd *fd_freelist = NULL;
static gpr_mu fd_freelist_mu;

static void freelist_fd(grpc_fd *fd) {
  gpr_mu_lock(&fd_freelist_mu);
  fd->freelist_next = fd_freelist;
  fd_freelist = fd;
  grpc_iomgr_unregister_object(&fd->iomgr_object);
  gpr_mu_unlock(&fd_freelist_mu);
}

static grpc_fd *alloc_fd(int fd) {
  grpc_fd *r = NULL;
  gpr_mu_lock(&fd_freelist_mu);
  if (fd_freelist != NULL) {
    r = fd_freelist;
    fd_freelist = fd_freelist->freelist_next;
  }
  gpr_mu_unlock(&fd_freelist_mu);
  if (r == NULL) {
    r = gpr_malloc(sizeof(grpc_fd));
    gpr_mu_init(&r->mu);
  }

  gpr_mu_lock(&r->mu);
  gpr_atm_rel_store(&r->refst, 1);
  r->shutdown = 0;
  r->read_closure = CLOSURE_NOT_READY;
  r->write_closure = CLOSURE_NOT_READY;
  r->fd = fd;
  r->inactive_watcher_root.next = r->inactive_watcher_root.prev =
      &r->inactive_watcher_root;
  r->freelist_next = NULL;
  r->read_watcher = r->write_watcher = NULL;
  r->on_done_closure = NULL;
  r->closed = 0;
  r->released = 0;
  gpr_mu_unlock(&r->mu);
  return r;
}

static void destroy(grpc_fd *fd) {
  gpr_mu_destroy(&fd->mu);
  gpr_free(fd);
}

#ifdef GRPC_FD_REF_COUNT_DEBUG
#define REF_BY(fd, n, reason) ref_by(fd, n, reason, __FILE__, __LINE__)
#define UNREF_BY(fd, n, reason) unref_by(fd, n, reason, __FILE__, __LINE__)
static void ref_by(grpc_fd *fd, int n, const char *reason, const char *file,
                   int line) {
  gpr_log(GPR_DEBUG, "FD %d %p ref %d %d -> %d [%s; %s:%d]", fd->fd, fd, n,
          gpr_atm_no_barrier_load(&fd->refst),
          gpr_atm_no_barrier_load(&fd->refst) + n, reason, file, line);
#else
#define REF_BY(fd, n, reason) ref_by(fd, n)
#define UNREF_BY(fd, n, reason) unref_by(fd, n)
static void ref_by(grpc_fd *fd, int n) {
#endif
  GPR_ASSERT(gpr_atm_no_barrier_fetch_add(&fd->refst, n) > 0);
}

#ifdef GRPC_FD_REF_COUNT_DEBUG
static void unref_by(grpc_fd *fd, int n, const char *reason, const char *file,
                     int line) {
  gpr_atm old;
  gpr_log(GPR_DEBUG, "FD %d %p unref %d %d -> %d [%s; %s:%d]", fd->fd, fd, n,
          gpr_atm_no_barrier_load(&fd->refst),
          gpr_atm_no_barrier_load(&fd->refst) - n, reason, file, line);
#else
static void unref_by(grpc_fd *fd, int n) {
  gpr_atm old;
#endif
  old = gpr_atm_full_fetch_add(&fd->refst, -n);
  if (old == n) {
    freelist_fd(fd);
  } else {
    GPR_ASSERT(old > n);
  }
}

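/* Worked example of the refst encoding described in the grpc_fd comment
   above: refst = 2*refcount + active_bit. A freshly created fd has refst = 1
   (active, no poll refs); two concurrent pollers raise it to 5 via
   ref_by(fd, 2) each. fd_orphan's REF_BY(fd, 1) followed by UNREF_BY(fd, 2)
   nets -1, clearing the active bit without disturbing the refcount (5 -> 4);
   when the last unref_by brings refst to 0, the fd is returned to the
   freelist above. */
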
static void fd_global_init(void) { gpr_mu_init(&fd_freelist_mu); }

static void fd_global_shutdown(void) {
  gpr_mu_lock(&fd_freelist_mu);
  gpr_mu_unlock(&fd_freelist_mu);
  while (fd_freelist != NULL) {
    grpc_fd *fd = fd_freelist;
    fd_freelist = fd_freelist->freelist_next;
    destroy(fd);
  }
  gpr_mu_destroy(&fd_freelist_mu);
}

static grpc_fd *fd_create(int fd, const char *name) {
  grpc_fd *r = alloc_fd(fd);
  char *name2;
  gpr_asprintf(&name2, "%s fd=%d", name, fd);
  grpc_iomgr_register_object(&r->iomgr_object, name2);
  gpr_free(name2);
#ifdef GRPC_FD_REF_COUNT_DEBUG
  gpr_log(GPR_DEBUG, "FD %d %p create %s", fd, r, name);
#endif
  return r;
}

static bool fd_is_orphaned(grpc_fd *fd) {
  return (gpr_atm_acq_load(&fd->refst) & 1) == 0;
}

static grpc_error *pollset_kick_locked(grpc_fd_watcher *watcher) {
  gpr_mu_lock(&watcher->pollset->mu);
  GPR_ASSERT(watcher->worker);
  grpc_error *err = pollset_kick_ext(watcher->pollset, watcher->worker,
                                     GRPC_POLLSET_REEVALUATE_POLLING_ON_WAKEUP);
  gpr_mu_unlock(&watcher->pollset->mu);
  return err;
}

static void maybe_wake_one_watcher_locked(grpc_fd *fd) {
  if (fd->inactive_watcher_root.next != &fd->inactive_watcher_root) {
    pollset_kick_locked(fd->inactive_watcher_root.next);
  } else if (fd->read_watcher) {
    pollset_kick_locked(fd->read_watcher);
  } else if (fd->write_watcher) {
    pollset_kick_locked(fd->write_watcher);
  }
}

static void wake_all_watchers_locked(grpc_fd *fd) {
  grpc_fd_watcher *watcher;
  for (watcher = fd->inactive_watcher_root.next;
       watcher != &fd->inactive_watcher_root; watcher = watcher->next) {
    pollset_kick_locked(watcher);
  }
  if (fd->read_watcher) {
    pollset_kick_locked(fd->read_watcher);
  }
  if (fd->write_watcher && fd->write_watcher != fd->read_watcher) {
    pollset_kick_locked(fd->write_watcher);
  }
}

static int has_watchers(grpc_fd *fd) {
  return fd->read_watcher != NULL || fd->write_watcher != NULL ||
         fd->inactive_watcher_root.next != &fd->inactive_watcher_root;
}

static void close_fd_locked(grpc_exec_ctx *exec_ctx, grpc_fd *fd) {
  fd->closed = 1;
  if (!fd->released) {
    close(fd->fd);
  } else {
    remove_fd_from_all_epoll_sets(fd->fd);
  }
  grpc_exec_ctx_push(exec_ctx, fd->on_done_closure, GRPC_ERROR_NONE, NULL);
}

static int fd_wrapped_fd(grpc_fd *fd) {
  if (fd->released || fd->closed) {
    return -1;
  } else {
    return fd->fd;
  }
}

static void fd_orphan(grpc_exec_ctx *exec_ctx, grpc_fd *fd,
                      grpc_closure *on_done, int *release_fd,
                      const char *reason) {
  fd->on_done_closure = on_done;
  fd->released = release_fd != NULL;
  if (!fd->released) {
    shutdown(fd->fd, SHUT_RDWR);
  } else {
    *release_fd = fd->fd;
  }
  gpr_mu_lock(&fd->mu);
  REF_BY(fd, 1, reason); /* remove active status, but keep referenced */
  if (!has_watchers(fd)) {
    close_fd_locked(exec_ctx, fd);
  } else {
    wake_all_watchers_locked(fd);
  }
  gpr_mu_unlock(&fd->mu);
  UNREF_BY(fd, 2, reason); /* drop the reference */
}

/* increment refcount by two to avoid changing the orphan bit */
#ifdef GRPC_FD_REF_COUNT_DEBUG
static void fd_ref(grpc_fd *fd, const char *reason, const char *file,
                   int line) {
  ref_by(fd, 2, reason, file, line);
}

static void fd_unref(grpc_fd *fd, const char *reason, const char *file,
                     int line) {
  unref_by(fd, 2, reason, file, line);
}
#else
static void fd_ref(grpc_fd *fd) { ref_by(fd, 2); }

static void fd_unref(grpc_fd *fd) { unref_by(fd, 2); }
#endif

static grpc_error *fd_shutdown_error(bool shutdown) {
  if (!shutdown) {
    return GRPC_ERROR_NONE;
  } else {
    return GRPC_ERROR_CREATE("FD shutdown");
  }
}

static void notify_on_locked(grpc_exec_ctx *exec_ctx, grpc_fd *fd,
                             grpc_closure **st, grpc_closure *closure) {
  if (*st == CLOSURE_NOT_READY) {
    /* not ready ==> switch to a waiting state by setting the closure */
    *st = closure;
  } else if (*st == CLOSURE_READY) {
    /* already ready ==> queue the closure to run immediately */
    *st = CLOSURE_NOT_READY;
    grpc_exec_ctx_push(exec_ctx, closure, fd_shutdown_error(fd->shutdown),
                       NULL);
    maybe_wake_one_watcher_locked(fd);
  } else {
    /* upcallptr was set to a different closure. This is an error! */
    gpr_log(GPR_ERROR,
            "User called a notify_on function with a previous callback still "
            "pending");
    abort();
  }
}

/* returns 1 if state becomes not ready */
static int set_ready_locked(grpc_exec_ctx *exec_ctx, grpc_fd *fd,
                            grpc_closure **st) {
  if (*st == CLOSURE_READY) {
    /* duplicate ready ==> ignore */
    return 0;
  } else if (*st == CLOSURE_NOT_READY) {
    /* not ready, and not waiting ==> flag ready */
    *st = CLOSURE_READY;
    return 0;
  } else {
    /* waiting ==> queue closure */
    grpc_exec_ctx_push(exec_ctx, *st, fd_shutdown_error(fd->shutdown), NULL);
    *st = CLOSURE_NOT_READY;
    return 1;
  }
}

static void fd_shutdown(grpc_exec_ctx *exec_ctx, grpc_fd *fd) {
  gpr_mu_lock(&fd->mu);
  GPR_ASSERT(!fd->shutdown);
  fd->shutdown = 1;
  set_ready_locked(exec_ctx, fd, &fd->read_closure);
  set_ready_locked(exec_ctx, fd, &fd->write_closure);
  gpr_mu_unlock(&fd->mu);
}

static void fd_notify_on_read(grpc_exec_ctx *exec_ctx, grpc_fd *fd,
                              grpc_closure *closure) {
  gpr_mu_lock(&fd->mu);
  notify_on_locked(exec_ctx, fd, &fd->read_closure, closure);
  gpr_mu_unlock(&fd->mu);
}

static void fd_notify_on_write(grpc_exec_ctx *exec_ctx, grpc_fd *fd,
                               grpc_closure *closure) {
  gpr_mu_lock(&fd->mu);
  notify_on_locked(exec_ctx, fd, &fd->write_closure, closure);
  gpr_mu_unlock(&fd->mu);
}

static uint32_t fd_begin_poll(grpc_fd *fd, grpc_pollset *pollset,
                              grpc_pollset_worker *worker, uint32_t read_mask,
                              uint32_t write_mask, grpc_fd_watcher *watcher) {
  uint32_t mask = 0;
  grpc_closure *cur;
  int requested;
  /* keep track of pollers that have requested our events, in case they change
   */
  GRPC_FD_REF(fd, "poll");

  gpr_mu_lock(&fd->mu);

  /* if we are shutdown, then don't add to the watcher set */
  if (fd->shutdown) {
    watcher->fd = NULL;
    watcher->pollset = NULL;
    watcher->worker = NULL;
    gpr_mu_unlock(&fd->mu);
    GRPC_FD_UNREF(fd, "poll");
    return 0;
  }

  /* if there is nobody polling for read, but we need to, then start doing so */
  cur = fd->read_closure;
  requested = cur != CLOSURE_READY;
  if (read_mask && fd->read_watcher == NULL && requested) {
    fd->read_watcher = watcher;
    mask |= read_mask;
  }
  /* if there is nobody polling for write, but we need to, then start doing so
   */
  cur = fd->write_closure;
  requested = cur != CLOSURE_READY;
  if (write_mask && fd->write_watcher == NULL && requested) {
    fd->write_watcher = watcher;
    mask |= write_mask;
  }
  /* if not polling, remember this watcher in case we need someone to later */
  if (mask == 0 && worker != NULL) {
    watcher->next = &fd->inactive_watcher_root;
    watcher->prev = watcher->next->prev;
    watcher->next->prev = watcher->prev->next = watcher;
  }
  watcher->pollset = pollset;
  watcher->worker = worker;
  watcher->fd = fd;
  gpr_mu_unlock(&fd->mu);

  return mask;
}
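
/* A sketch of the intended pairing (compare basic_pollset_maybe_work_and_unlock
   below): the caller translates the returned mask into pollfd events, polls,
   then reports the results:

     pfd.events = (short)fd_begin_poll(fd, pollset, worker,
                                       POLLIN, POLLOUT, &watcher);
     ... poll(...) ...
     fd_end_poll(exec_ctx, &watcher, pfd.revents & POLLIN_CHECK,
                 pfd.revents & POLLOUT_CHECK);

   fd_end_poll must follow every fd_begin_poll, even when the returned mask is
   0, so that the watcher is unlinked and the "poll" ref taken above is dropped
   (fd_end_poll is a no-op only when fd_begin_poll saw a shutdown fd and set
   watcher->fd to NULL). */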

static void fd_end_poll(grpc_exec_ctx *exec_ctx, grpc_fd_watcher *watcher,
                        int got_read, int got_write) {
  int was_polling = 0;
  int kick = 0;
  grpc_fd *fd = watcher->fd;

  if (fd == NULL) {
    return;
  }

  gpr_mu_lock(&fd->mu);

  if (watcher == fd->read_watcher) {
    /* remove read watcher, kick if we still need a read */
    was_polling = 1;
    if (!got_read) {
      kick = 1;
    }
    fd->read_watcher = NULL;
  }
  if (watcher == fd->write_watcher) {
    /* remove write watcher, kick if we still need a write */
    was_polling = 1;
    if (!got_write) {
      kick = 1;
    }
    fd->write_watcher = NULL;
  }
  if (!was_polling && watcher->worker != NULL) {
    /* remove from inactive list */
    watcher->next->prev = watcher->prev;
    watcher->prev->next = watcher->next;
  }
  if (got_read) {
    if (set_ready_locked(exec_ctx, fd, &fd->read_closure)) {
      kick = 1;
    }
  }
  if (got_write) {
    if (set_ready_locked(exec_ctx, fd, &fd->write_closure)) {
      kick = 1;
    }
  }
  if (kick) {
    maybe_wake_one_watcher_locked(fd);
  }
  if (fd_is_orphaned(fd) && !has_watchers(fd) && !fd->closed) {
    close_fd_locked(exec_ctx, fd);
  }
  gpr_mu_unlock(&fd->mu);

  GRPC_FD_UNREF(fd, "poll");
}

/*******************************************************************************
 * pollset_posix.c
 */

GPR_TLS_DECL(g_current_thread_poller);
GPR_TLS_DECL(g_current_thread_worker);

/** The alarm system needs to be able to wakeup 'some poller' sometimes
 * (specifically when a new alarm needs to be triggered earlier than the next
 * alarm 'epoch').
 * This wakeup_fd gives us something to alert on when such a case occurs. */
grpc_wakeup_fd grpc_global_wakeup_fd;

static void remove_worker(grpc_pollset *p, grpc_pollset_worker *worker) {
  worker->prev->next = worker->next;
  worker->next->prev = worker->prev;
}

static int pollset_has_workers(grpc_pollset *p) {
  return p->root_worker.next != &p->root_worker;
}

static grpc_pollset_worker *pop_front_worker(grpc_pollset *p) {
  if (pollset_has_workers(p)) {
    grpc_pollset_worker *w = p->root_worker.next;
    remove_worker(p, w);
    return w;
  } else {
    return NULL;
  }
}

static void push_back_worker(grpc_pollset *p, grpc_pollset_worker *worker) {
  worker->next = &p->root_worker;
  worker->prev = worker->next->prev;
  worker->prev->next = worker->next->prev = worker;
}

static void push_front_worker(grpc_pollset *p, grpc_pollset_worker *worker) {
  worker->prev = &p->root_worker;
  worker->next = worker->prev->next;
  worker->prev->next = worker->next->prev = worker;
}

static void kick_append_error(grpc_error **composite, grpc_error *error) {
  if (error == GRPC_ERROR_NONE) return;
  if (*composite == GRPC_ERROR_NONE) {
    *composite = GRPC_ERROR_CREATE("Kick Failure");
  }
  *composite = grpc_error_add_child(*composite, error);
}

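/* Roughly, pollset_kick_ext covers three situations:
   - specific_worker != NULL: wake that worker (or every worker, for
     GRPC_POLLSET_KICK_BROADCAST) via its wakeup_fd, unless it is the calling
     thread and GRPC_POLLSET_CAN_KICK_SELF is unset;
   - specific_worker == NULL while another thread is polling: pop a worker
     that is not the calling thread and wake it;
   - nobody is polling: set kicked_without_pollers so the next pollset_work
     call returns without blocking.
   Wakeup-fd write failures are accumulated into a single grpc_error via
   kick_append_error above. */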
static grpc_error *pollset_kick_ext(grpc_pollset *p,
                                    grpc_pollset_worker *specific_worker,
                                    uint32_t flags) {
  GPR_TIMER_BEGIN("pollset_kick_ext", 0);
  grpc_error *error = GRPC_ERROR_NONE;

  /* pollset->mu already held */
  if (specific_worker != NULL) {
    if (specific_worker == GRPC_POLLSET_KICK_BROADCAST) {
      GPR_TIMER_BEGIN("pollset_kick_ext.broadcast", 0);
      GPR_ASSERT((flags & GRPC_POLLSET_REEVALUATE_POLLING_ON_WAKEUP) == 0);
      for (specific_worker = p->root_worker.next;
           specific_worker != &p->root_worker;
           specific_worker = specific_worker->next) {
        kick_append_error(
            &error, grpc_wakeup_fd_wakeup(&specific_worker->wakeup_fd->fd));
      }
      p->kicked_without_pollers = 1;
      GPR_TIMER_END("pollset_kick_ext.broadcast", 0);
    } else if (gpr_tls_get(&g_current_thread_worker) !=
               (intptr_t)specific_worker) {
      GPR_TIMER_MARK("different_thread_worker", 0);
      if ((flags & GRPC_POLLSET_REEVALUATE_POLLING_ON_WAKEUP) != 0) {
        specific_worker->reevaluate_polling_on_wakeup = 1;
      }
      specific_worker->kicked_specifically = 1;
      kick_append_error(&error,
                        grpc_wakeup_fd_wakeup(&specific_worker->wakeup_fd->fd));
    } else if ((flags & GRPC_POLLSET_CAN_KICK_SELF) != 0) {
      GPR_TIMER_MARK("kick_yoself", 0);
      if ((flags & GRPC_POLLSET_REEVALUATE_POLLING_ON_WAKEUP) != 0) {
        specific_worker->reevaluate_polling_on_wakeup = 1;
      }
      specific_worker->kicked_specifically = 1;
      kick_append_error(&error,
                        grpc_wakeup_fd_wakeup(&specific_worker->wakeup_fd->fd));
    }
  } else if (gpr_tls_get(&g_current_thread_poller) != (intptr_t)p) {
    GPR_ASSERT((flags & GRPC_POLLSET_REEVALUATE_POLLING_ON_WAKEUP) == 0);
    GPR_TIMER_MARK("kick_anonymous", 0);
    specific_worker = pop_front_worker(p);
    if (specific_worker != NULL) {
      if (gpr_tls_get(&g_current_thread_worker) == (intptr_t)specific_worker) {
        GPR_TIMER_MARK("kick_anonymous_not_self", 0);
        push_back_worker(p, specific_worker);
        specific_worker = pop_front_worker(p);
        if ((flags & GRPC_POLLSET_CAN_KICK_SELF) == 0 &&
            gpr_tls_get(&g_current_thread_worker) ==
                (intptr_t)specific_worker) {
          push_back_worker(p, specific_worker);
          specific_worker = NULL;
        }
      }
      if (specific_worker != NULL) {
        GPR_TIMER_MARK("finally_kick", 0);
        push_back_worker(p, specific_worker);
        kick_append_error(
            &error, grpc_wakeup_fd_wakeup(&specific_worker->wakeup_fd->fd));
      }
    } else {
      GPR_TIMER_MARK("kicked_no_pollers", 0);
      p->kicked_without_pollers = 1;
    }
  }

  GPR_TIMER_END("pollset_kick_ext", 0);
  return error;
}

static grpc_error *pollset_kick(grpc_pollset *p,
                                grpc_pollset_worker *specific_worker) {
  return pollset_kick_ext(p, specific_worker, 0);
}

/* global state management */

static grpc_error *pollset_global_init(void) {
  gpr_tls_init(&g_current_thread_poller);
  gpr_tls_init(&g_current_thread_worker);
  grpc_wakeup_fd_global_init();
  return grpc_wakeup_fd_init(&grpc_global_wakeup_fd);
}

static void pollset_global_shutdown(void) {
  grpc_wakeup_fd_destroy(&grpc_global_wakeup_fd);
  gpr_tls_destroy(&g_current_thread_poller);
  gpr_tls_destroy(&g_current_thread_worker);
  grpc_wakeup_fd_global_destroy();
}

static grpc_error *kick_poller(void) {
  return grpc_wakeup_fd_wakeup(&grpc_global_wakeup_fd);
}

/* main interface */

static void become_basic_pollset(grpc_pollset *pollset, grpc_fd *fd_or_null);

static void pollset_init(grpc_pollset *pollset, gpr_mu **mu) {
  gpr_mu_init(&pollset->mu);
  *mu = &pollset->mu;
  pollset->root_worker.next = pollset->root_worker.prev = &pollset->root_worker;
  pollset->in_flight_cbs = 0;
  pollset->shutting_down = 0;
  pollset->called_shutdown = 0;
  pollset->kicked_without_pollers = 0;
  pollset->idle_jobs.head = pollset->idle_jobs.tail = NULL;
  pollset->local_wakeup_cache = NULL;
  become_basic_pollset(pollset, NULL);
}

static void pollset_destroy(grpc_pollset *pollset) {
  GPR_ASSERT(pollset->in_flight_cbs == 0);
  GPR_ASSERT(!pollset_has_workers(pollset));
  GPR_ASSERT(pollset->idle_jobs.head == pollset->idle_jobs.tail);
  pollset->vtable->destroy(pollset);
  while (pollset->local_wakeup_cache) {
    grpc_cached_wakeup_fd *next = pollset->local_wakeup_cache->next;
    grpc_wakeup_fd_destroy(&pollset->local_wakeup_cache->fd);
    gpr_free(pollset->local_wakeup_cache);
    pollset->local_wakeup_cache = next;
  }
  gpr_mu_destroy(&pollset->mu);
}

static void pollset_reset(grpc_pollset *pollset) {
  GPR_ASSERT(pollset->shutting_down);
  GPR_ASSERT(pollset->in_flight_cbs == 0);
  GPR_ASSERT(!pollset_has_workers(pollset));
  GPR_ASSERT(pollset->idle_jobs.head == pollset->idle_jobs.tail);
  pollset->vtable->destroy(pollset);
  pollset->shutting_down = 0;
  pollset->called_shutdown = 0;
  pollset->kicked_without_pollers = 0;
  become_basic_pollset(pollset, NULL);
}

static void pollset_add_fd(grpc_exec_ctx *exec_ctx, grpc_pollset *pollset,
                           grpc_fd *fd) {
  gpr_mu_lock(&pollset->mu);
  pollset->vtable->add_fd(exec_ctx, pollset, fd, 1);
/* the following (enabled only in debug) will reacquire and then release
   our lock - meaning that if the unlocking flag passed to add_fd above is
   not respected, the code will deadlock (in a way that we have a chance of
   debugging) */
#ifndef NDEBUG
  gpr_mu_lock(&pollset->mu);
  gpr_mu_unlock(&pollset->mu);
#endif
}

static void finish_shutdown(grpc_exec_ctx *exec_ctx, grpc_pollset *pollset) {
  GPR_ASSERT(grpc_closure_list_empty(pollset->idle_jobs));
  pollset->vtable->finish_shutdown(pollset);
  grpc_exec_ctx_push(exec_ctx, pollset->shutdown_done, GRPC_ERROR_NONE, NULL);
}

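/* pollset_work: the core polling loop for one worker thread. It grabs (or
   creates) a wakeup fd for the worker, enqueues itself on the pollset's
   worker list, and delegates the actual poll to the vtable's
   maybe_work_and_unlock. If the worker is woken with
   GRPC_POLLSET_REEVALUATE_POLLING_ON_WAKEUP it loops and polls again (with an
   immediate deadline if work was queued meanwhile); otherwise it cleans up,
   recycles the wakeup fd into local_wakeup_cache, and finishes any pending
   shutdown. */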
static grpc_error *pollset_work(grpc_exec_ctx *exec_ctx, grpc_pollset *pollset,
                                grpc_pollset_worker **worker_hdl,
                                gpr_timespec now, gpr_timespec deadline) {
  grpc_pollset_worker worker;
  *worker_hdl = &worker;
  grpc_error *error = GRPC_ERROR_NONE;

  /* pollset->mu already held */
  int added_worker = 0;
  int locked = 1;
  int queued_work = 0;
  int keep_polling = 0;
  GPR_TIMER_BEGIN("pollset_work", 0);
  /* this must happen before we (potentially) drop pollset->mu */
  worker.next = worker.prev = NULL;
  worker.reevaluate_polling_on_wakeup = 0;
  if (pollset->local_wakeup_cache != NULL) {
    worker.wakeup_fd = pollset->local_wakeup_cache;
    pollset->local_wakeup_cache = worker.wakeup_fd->next;
  } else {
    worker.wakeup_fd = gpr_malloc(sizeof(*worker.wakeup_fd));
    error = grpc_wakeup_fd_init(&worker.wakeup_fd->fd);
    if (error != GRPC_ERROR_NONE) {
      return error;
    }
  }
  worker.kicked_specifically = 0;
  /* If there's work waiting for the pollset to be idle, and the
     pollset is idle, then do that work */
  if (!pollset_has_workers(pollset) &&
      !grpc_closure_list_empty(pollset->idle_jobs)) {
    GPR_TIMER_MARK("pollset_work.idle_jobs", 0);
    grpc_exec_ctx_enqueue_list(exec_ctx, &pollset->idle_jobs, NULL);
    goto done;
  }
  /* If we're shutting down then we don't execute any extended work */
  if (pollset->shutting_down) {
    GPR_TIMER_MARK("pollset_work.shutting_down", 0);
    goto done;
  }
  /* Give do_promote priority so we don't starve it out */
  if (pollset->in_flight_cbs) {
    GPR_TIMER_MARK("pollset_work.in_flight_cbs", 0);
    gpr_mu_unlock(&pollset->mu);
    locked = 0;
    goto done;
  }
  /* Start polling, and keep doing so while we're being asked to
     re-evaluate our pollers (this allows poll() based pollers to
     ensure they don't miss wakeups) */
  keep_polling = 1;
  while (keep_polling) {
    keep_polling = 0;
    if (!pollset->kicked_without_pollers) {
      if (!added_worker) {
        push_front_worker(pollset, &worker);
        added_worker = 1;
        gpr_tls_set(&g_current_thread_worker, (intptr_t)&worker);
      }
      gpr_tls_set(&g_current_thread_poller, (intptr_t)pollset);
      GPR_TIMER_BEGIN("maybe_work_and_unlock", 0);
      error = pollset->vtable->maybe_work_and_unlock(exec_ctx, pollset, &worker,
                                                     deadline, now);
      GPR_TIMER_END("maybe_work_and_unlock", 0);
      locked = 0;
      gpr_tls_set(&g_current_thread_poller, 0);
    } else {
      GPR_TIMER_MARK("pollset_work.kicked_without_pollers", 0);
      pollset->kicked_without_pollers = 0;
    }
    /* Finished execution - start cleaning up.
       Note that we may arrive here from outside the enclosing while() loop.
       In that case we won't loop, though, as we haven't added worker to the
       worker list, which means nobody could ask us to re-evaluate polling. */
  done:
    if (!locked) {
      queued_work |= grpc_exec_ctx_flush(exec_ctx);
      gpr_mu_lock(&pollset->mu);
      locked = 1;
    }
    /* If we're forced to re-evaluate polling (via pollset_kick with
       GRPC_POLLSET_REEVALUATE_POLLING_ON_WAKEUP) then we land here and force
       a loop */
    if (worker.reevaluate_polling_on_wakeup && error == GRPC_ERROR_NONE) {
      worker.reevaluate_polling_on_wakeup = 0;
      pollset->kicked_without_pollers = 0;
      if (queued_work || worker.kicked_specifically) {
        /* If there's queued work on the list, then set the deadline to be
           immediate so we get back out of the polling loop quickly */
        deadline = gpr_inf_past(GPR_CLOCK_MONOTONIC);
      }
      keep_polling = 1;
    }
  }
  if (added_worker) {
    remove_worker(pollset, &worker);
    gpr_tls_set(&g_current_thread_worker, 0);
  }
  /* release wakeup fd to the local pool */
  worker.wakeup_fd->next = pollset->local_wakeup_cache;
  pollset->local_wakeup_cache = worker.wakeup_fd;
  /* check shutdown conditions */
  if (pollset->shutting_down) {
    if (pollset_has_workers(pollset)) {
      pollset_kick(pollset, NULL);
    } else if (!pollset->called_shutdown && pollset->in_flight_cbs == 0) {
      pollset->called_shutdown = 1;
      gpr_mu_unlock(&pollset->mu);
      finish_shutdown(exec_ctx, pollset);
      grpc_exec_ctx_flush(exec_ctx);
      /* Continuing to access pollset here is safe -- it is the caller's
       * responsibility to not destroy when it has outstanding calls to
       * pollset_work.
       * TODO(dklempner): Can we refactor the shutdown logic to avoid this? */
      gpr_mu_lock(&pollset->mu);
    } else if (!grpc_closure_list_empty(pollset->idle_jobs)) {
      grpc_exec_ctx_enqueue_list(exec_ctx, &pollset->idle_jobs, NULL);
      gpr_mu_unlock(&pollset->mu);
      grpc_exec_ctx_flush(exec_ctx);
      gpr_mu_lock(&pollset->mu);
    }
  }
  *worker_hdl = NULL;
  GPR_TIMER_END("pollset_work", 0);
  return error;
}

static void pollset_shutdown(grpc_exec_ctx *exec_ctx, grpc_pollset *pollset,
                             grpc_closure *closure) {
  GPR_ASSERT(!pollset->shutting_down);
  pollset->shutting_down = 1;
  pollset->shutdown_done = closure;
  pollset_kick(pollset, GRPC_POLLSET_KICK_BROADCAST);
  if (!pollset_has_workers(pollset)) {
    grpc_exec_ctx_enqueue_list(exec_ctx, &pollset->idle_jobs, NULL);
  }
  if (!pollset->called_shutdown && pollset->in_flight_cbs == 0 &&
      !pollset_has_workers(pollset)) {
    pollset->called_shutdown = 1;
    finish_shutdown(exec_ctx, pollset);
  }
}

static int poll_deadline_to_millis_timeout(gpr_timespec deadline,
                                           gpr_timespec now) {
  gpr_timespec timeout;
  static const int64_t max_spin_polling_us = 10;
  if (gpr_time_cmp(deadline, gpr_inf_future(deadline.clock_type)) == 0) {
    return -1;
  }
  if (gpr_time_cmp(deadline, gpr_time_add(now, gpr_time_from_micros(
                                                   max_spin_polling_us,
                                                   GPR_TIMESPAN))) <= 0) {
    return 0;
  }
  timeout = gpr_time_sub(deadline, now);
  return gpr_time_to_millis(gpr_time_add(
      timeout, gpr_time_from_nanos(GPR_NS_PER_MS - 1, GPR_TIMESPAN)));
}

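/* Examples of the rounding rules above: with deadline - now = 2.3ms the
   result is 3 (rounded up to the next millisecond); with deadline - now = 5us
   (within max_spin_polling_us) the result is 0, i.e. a non-blocking poll; an
   infinite deadline yields -1, i.e. block in poll() indefinitely. */
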
/*
 * basic_pollset - a vtable that provides polling for zero or one file
 * descriptor via poll()
 */

typedef struct grpc_unary_promote_args {
  const grpc_pollset_vtable *original_vtable;
  grpc_pollset *pollset;
  grpc_fd *fd;
  grpc_closure promotion_closure;
} grpc_unary_promote_args;

static void basic_do_promote(grpc_exec_ctx *exec_ctx, void *args,
                             grpc_error *error) {
  grpc_unary_promote_args *up_args = args;
  const grpc_pollset_vtable *original_vtable = up_args->original_vtable;
  grpc_pollset *pollset = up_args->pollset;
  grpc_fd *fd = up_args->fd;

  /*
   * This is quite tricky. There are a number of cases to keep in mind here:
   * 1. fd may have been orphaned
   * 2. The pollset may no longer be a unary poller (and we can't let case #1
   * leak to other pollset types!)
   * 3. pollset's fd (which may have changed) may have been orphaned
   * 4. The pollset may be shutting down.
   */

  gpr_mu_lock(&pollset->mu);
  /* First we need to ensure that nobody is polling concurrently */
  GPR_ASSERT(!pollset_has_workers(pollset));

  gpr_free(up_args);
  /* At this point the pollset may no longer be a unary poller. In that case
   * we should just call the right add function and be done. */
  /* TODO(klempner): If we're not careful this could cause infinite recursion.
   * That's not a problem for now because empty_pollset has a trivial poller
   * and we don't have any mechanism to unbecome multipoller. */
  pollset->in_flight_cbs--;
  if (pollset->shutting_down) {
    /* We don't care about this pollset anymore. */
    if (pollset->in_flight_cbs == 0 && !pollset->called_shutdown) {
      pollset->called_shutdown = 1;
      finish_shutdown(exec_ctx, pollset);
    }
  } else if (fd_is_orphaned(fd)) {
    /* Don't try to add it to anything, we'll drop our ref on it below */
  } else if (pollset->vtable != original_vtable) {
    pollset->vtable->add_fd(exec_ctx, pollset, fd, 0);
  } else if (fd != pollset->data.ptr) {
    grpc_fd *fds[2];
    fds[0] = pollset->data.ptr;
    fds[1] = fd;

    if (fds[0] && !fd_is_orphaned(fds[0])) {
      platform_become_multipoller(exec_ctx, pollset, fds, GPR_ARRAY_SIZE(fds));
      GRPC_FD_UNREF(fds[0], "basicpoll");
    } else {
      /* old fd is orphaned and we haven't cleaned it up until now, so remain a
       * unary poller */
      /* Note that it is possible that fds[1] is also orphaned at this point.
       * That's okay, we'll correct it at the next add or poll. */
      if (fds[0]) GRPC_FD_UNREF(fds[0], "basicpoll");
      pollset->data.ptr = fd;
      GRPC_FD_REF(fd, "basicpoll");
    }
  }

  gpr_mu_unlock(&pollset->mu);

  /* Matching ref in basic_pollset_add_fd */
  GRPC_FD_UNREF(fd, "basicpoll_add");
}

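/* Adding to a basic pollset: when no worker is inside pollset_work we can
   mutate the pollset representation in place (install the fd, or become a
   multipoller). Otherwise the add is deferred: basic_do_promote above is
   queued as an idle job, every poller is kicked via
   GRPC_POLLSET_KICK_BROADCAST, and the promotion runs once the pollset has no
   active workers. */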
1131static void basic_pollset_add_fd(grpc_exec_ctx *exec_ctx, grpc_pollset *pollset,
1132 grpc_fd *fd, int and_unlock_pollset) {
1133 grpc_unary_promote_args *up_args;
1134 GPR_ASSERT(fd);
1135 if (fd == pollset->data.ptr) goto exit;
1136
Craig Tillera75d18a2016-02-25 08:45:00 -08001137 if (!pollset_has_workers(pollset)) {
Craig Tiller1a969c82016-02-22 12:13:16 -08001138 /* Fast path -- no in flight cbs */
    /* TODO(klempner): Comment this out and fix any test failures or establish
     * they are due to timing issues */
    grpc_fd *fds[2];
    fds[0] = pollset->data.ptr;
    fds[1] = fd;

    if (fds[0] == NULL) {
      pollset->data.ptr = fd;
      GRPC_FD_REF(fd, "basicpoll");
    } else if (!fd_is_orphaned(fds[0])) {
      platform_become_multipoller(exec_ctx, pollset, fds, GPR_ARRAY_SIZE(fds));
      GRPC_FD_UNREF(fds[0], "basicpoll");
    } else {
      /* old fd is orphaned and we haven't cleaned it up until now, so remain a
       * unary poller */
      GRPC_FD_UNREF(fds[0], "basicpoll");
      pollset->data.ptr = fd;
      GRPC_FD_REF(fd, "basicpoll");
    }
    goto exit;
  }

  /* Now we need to promote. This needs to happen when we're not polling. Since
   * this may be called from poll, the wait needs to happen asynchronously. */
  GRPC_FD_REF(fd, "basicpoll_add");
  pollset->in_flight_cbs++;
  up_args = gpr_malloc(sizeof(*up_args));
  up_args->fd = fd;
  up_args->original_vtable = pollset->vtable;
  up_args->pollset = pollset;
  up_args->promotion_closure.cb = basic_do_promote;
  up_args->promotion_closure.cb_arg = up_args;

  grpc_closure_list_append(&pollset->idle_jobs, &up_args->promotion_closure,
                           GRPC_ERROR_NONE);
  pollset_kick(pollset, GRPC_POLLSET_KICK_BROADCAST);

exit:
  if (and_unlock_pollset) {
    gpr_mu_unlock(&pollset->mu);
  }
}

static void work_combine_error(grpc_error **composite, grpc_error *error) {
  if (error == GRPC_ERROR_NONE) return;
  if (*composite == GRPC_ERROR_NONE) {
    *composite = GRPC_ERROR_CREATE("pollset_work");
  }
  *composite = grpc_error_add_child(*composite, error);
}
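
/* Illustrative usage (a sketch, not code from this file): work_combine_error
   lets one poll cycle accumulate several independent failures into a single
   composite "pollset_work" error, while the no-error case stays allocation
   free:

     grpc_error *error = GRPC_ERROR_NONE;
     work_combine_error(&error, grpc_wakeup_fd_consume_wakeup(&some_wakeup_fd));
     work_combine_error(&error, GRPC_OS_ERROR(errno, "poll"));
     return error;  // GRPC_ERROR_NONE only if every step succeeded

   (some_wakeup_fd is a hypothetical grpc_wakeup_fd, used for illustration.) */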

static grpc_error *basic_pollset_maybe_work_and_unlock(
    grpc_exec_ctx *exec_ctx, grpc_pollset *pollset, grpc_pollset_worker *worker,
    gpr_timespec deadline, gpr_timespec now) {
  grpc_error *error = GRPC_ERROR_NONE;

#define POLLOUT_CHECK (POLLOUT | POLLHUP | POLLERR)
#define POLLIN_CHECK (POLLIN | POLLHUP | POLLERR)

  struct pollfd pfd[3];
  grpc_fd *fd;
  grpc_fd_watcher fd_watcher;
  int timeout;
  int r;
  nfds_t nfds;

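  /* Poll slot layout: pfd[0] is the process-wide wakeup fd, pfd[1] is this
     worker's own wakeup fd, and pfd[2] (added only while this unary poller
     still owns a live fd) is the fd being watched. */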
  fd = pollset->data.ptr;
  if (fd && fd_is_orphaned(fd)) {
    GRPC_FD_UNREF(fd, "basicpoll");
    fd = pollset->data.ptr = NULL;
  }
  timeout = poll_deadline_to_millis_timeout(deadline, now);
  pfd[0].fd = GRPC_WAKEUP_FD_GET_READ_FD(&grpc_global_wakeup_fd);
  pfd[0].events = POLLIN;
  pfd[0].revents = 0;
  pfd[1].fd = GRPC_WAKEUP_FD_GET_READ_FD(&worker->wakeup_fd->fd);
  pfd[1].events = POLLIN;
  pfd[1].revents = 0;
  nfds = 2;
  if (fd) {
    pfd[2].fd = fd->fd;
    pfd[2].revents = 0;
    GRPC_FD_REF(fd, "basicpoll_begin");
    gpr_mu_unlock(&pollset->mu);
    pfd[2].events =
        (short)fd_begin_poll(fd, pollset, worker, POLLIN, POLLOUT, &fd_watcher);
    if (pfd[2].events != 0) {
      nfds++;
    }
  } else {
    gpr_mu_unlock(&pollset->mu);
  }

  /* TODO(vpai): Consider first doing a 0 timeout poll here to avoid
     even going into the blocking annotation if possible */
  /* poll fd count (argument 2) is shortened by one if we have no events
     to poll on - such that it only includes the kicker */
  GPR_TIMER_BEGIN("poll", 0);
  GRPC_SCHEDULING_START_BLOCKING_REGION;
  r = grpc_poll_function(pfd, nfds, timeout);
  GRPC_SCHEDULING_END_BLOCKING_REGION;
  GPR_TIMER_END("poll", 0);

  if (r < 0) {
    if (errno != EINTR) {
      work_combine_error(&error, GRPC_OS_ERROR(errno, "poll"));
    }
    if (fd) {
      fd_end_poll(exec_ctx, &fd_watcher, 0, 0);
    }
  } else if (r == 0) {
    if (fd) {
      fd_end_poll(exec_ctx, &fd_watcher, 0, 0);
    }
  } else {
    if (pfd[0].revents & POLLIN_CHECK) {
      work_combine_error(&error,
                         grpc_wakeup_fd_consume_wakeup(&grpc_global_wakeup_fd));
    }
    if (pfd[1].revents & POLLIN_CHECK) {
      work_combine_error(&error,
                         grpc_wakeup_fd_consume_wakeup(&worker->wakeup_fd->fd));
    }
    if (nfds > 2) {
      fd_end_poll(exec_ctx, &fd_watcher, pfd[2].revents & POLLIN_CHECK,
                  pfd[2].revents & POLLOUT_CHECK);
    } else if (fd) {
      fd_end_poll(exec_ctx, &fd_watcher, 0, 0);
    }
  }

  if (fd) {
    GRPC_FD_UNREF(fd, "basicpoll_begin");
  }

  return error;
}

static void basic_pollset_destroy(grpc_pollset *pollset) {
  if (pollset->data.ptr != NULL) {
    GRPC_FD_UNREF(pollset->data.ptr, "basicpoll");
    pollset->data.ptr = NULL;
  }
}

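/* Note: basic_pollset_destroy serves as both the finish_shutdown and the
   destroy entry of the vtable below -- dropping the fd ref is all either step
   needs to do for a unary poller, and the function is idempotent. */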
static const grpc_pollset_vtable basic_pollset = {
    basic_pollset_add_fd, basic_pollset_maybe_work_and_unlock,
    basic_pollset_destroy, basic_pollset_destroy};

static void become_basic_pollset(grpc_pollset *pollset, grpc_fd *fd_or_null) {
  pollset->vtable = &basic_pollset;
  pollset->data.ptr = fd_or_null;
  if (fd_or_null != NULL) {
    GRPC_FD_REF(fd_or_null, "basicpoll");
  }
}

/*******************************************************************************
 * pollset_multipoller_with_poll_posix.c
 */

#ifndef GPR_LINUX_MULTIPOLL_WITH_EPOLL

typedef struct {
  /* all polled fds */
  size_t fd_count;
  size_t fd_capacity;
  grpc_fd **fds;
  /* fds that have been removed from the pollset explicitly */
  size_t del_count;
  size_t del_capacity;
  grpc_fd **dels;
} poll_hdr;
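
/* Removals are deferred: explicitly deleted fds sit in `dels` until the next
   poll cycle, which drops their refs and compacts `fds` before building the
   pollfd array. This keeps add/del cheap while the pollset lock is held. */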

static void multipoll_with_poll_pollset_add_fd(grpc_exec_ctx *exec_ctx,
                                               grpc_pollset *pollset,
                                               grpc_fd *fd,
                                               int and_unlock_pollset) {
  size_t i;
  poll_hdr *h = pollset->data.ptr;
  /* TODO(ctiller): this is O(num_fds^2); maybe switch to a hash set here */
  for (i = 0; i < h->fd_count; i++) {
    if (h->fds[i] == fd) goto exit;
  }
  if (h->fd_count == h->fd_capacity) {
    h->fd_capacity = GPR_MAX(h->fd_capacity + 8, h->fd_count * 3 / 2);
    h->fds = gpr_realloc(h->fds, sizeof(grpc_fd *) * h->fd_capacity);
  }
  h->fds[h->fd_count++] = fd;
  GRPC_FD_REF(fd, "multipoller");
exit:
  if (and_unlock_pollset) {
    gpr_mu_unlock(&pollset->mu);
  }
}

static grpc_error *multipoll_with_poll_pollset_maybe_work_and_unlock(
    grpc_exec_ctx *exec_ctx, grpc_pollset *pollset, grpc_pollset_worker *worker,
    gpr_timespec deadline, gpr_timespec now) {
  grpc_error *error = GRPC_ERROR_NONE;

#define POLLOUT_CHECK (POLLOUT | POLLHUP | POLLERR)
#define POLLIN_CHECK (POLLIN | POLLHUP | POLLERR)

  int timeout;
  int r;
  size_t i, j, fd_count;
  nfds_t pfd_count;
  poll_hdr *h;
  /* TODO(ctiller): inline some elements to avoid an allocation */
  grpc_fd_watcher *watchers;
  struct pollfd *pfds;

  h = pollset->data.ptr;
  timeout = poll_deadline_to_millis_timeout(deadline, now);
  /* TODO(ctiller): perform just one malloc here if we exceed the inline case */
  pfds = gpr_malloc(sizeof(*pfds) * (h->fd_count + 2));
  watchers = gpr_malloc(sizeof(*watchers) * (h->fd_count + 2));
  fd_count = 0;
  pfd_count = 2;
  pfds[0].fd = GRPC_WAKEUP_FD_GET_READ_FD(&grpc_global_wakeup_fd);
  pfds[0].events = POLLIN;
  pfds[0].revents = 0;
  pfds[1].fd = GRPC_WAKEUP_FD_GET_READ_FD(&worker->wakeup_fd->fd);
  pfds[1].events = POLLIN;
  pfds[1].revents = 0;
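  /* Compact h->fds in place: drop fds that are orphaned or explicitly
     deleted, and stage each survivor (under a temporary ref) for polling. */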
  for (i = 0; i < h->fd_count; i++) {
    int remove = fd_is_orphaned(h->fds[i]);
    for (j = 0; !remove && j < h->del_count; j++) {
      if (h->fds[i] == h->dels[j]) remove = 1;
    }
    if (remove) {
      GRPC_FD_UNREF(h->fds[i], "multipoller");
    } else {
      h->fds[fd_count++] = h->fds[i];
      watchers[pfd_count].fd = h->fds[i];
      GRPC_FD_REF(watchers[pfd_count].fd, "multipoller_start");
      pfds[pfd_count].fd = h->fds[i]->fd;
      pfds[pfd_count].revents = 0;
      pfd_count++;
    }
  }
  for (j = 0; j < h->del_count; j++) {
    GRPC_FD_UNREF(h->dels[j], "multipoller_del");
  }
  h->del_count = 0;
  h->fd_count = fd_count;
  gpr_mu_unlock(&pollset->mu);

  for (i = 2; i < pfd_count; i++) {
    grpc_fd *fd = watchers[i].fd;
    pfds[i].events = (short)fd_begin_poll(fd, pollset, worker, POLLIN, POLLOUT,
                                          &watchers[i]);
    GRPC_FD_UNREF(fd, "multipoller_start");
  }

  /* TODO(vpai): Consider first doing a 0 timeout poll here to avoid
     even going into the blocking annotation if possible */
  GRPC_SCHEDULING_START_BLOCKING_REGION;
  r = grpc_poll_function(pfds, pfd_count, timeout);
  GRPC_SCHEDULING_END_BLOCKING_REGION;

  if (r < 0) {
    if (errno != EINTR) {
      work_combine_error(&error, GRPC_OS_ERROR(errno, "poll"));
    }
    for (i = 2; i < pfd_count; i++) {
      fd_end_poll(exec_ctx, &watchers[i], 0, 0);
    }
  } else if (r == 0) {
    for (i = 2; i < pfd_count; i++) {
      fd_end_poll(exec_ctx, &watchers[i], 0, 0);
    }
  } else {
    if (pfds[0].revents & POLLIN_CHECK) {
      work_combine_error(&error,
                         grpc_wakeup_fd_consume_wakeup(&grpc_global_wakeup_fd));
    }
    if (pfds[1].revents & POLLIN_CHECK) {
      work_combine_error(&error,
                         grpc_wakeup_fd_consume_wakeup(&worker->wakeup_fd->fd));
    }
    for (i = 2; i < pfd_count; i++) {
      if (watchers[i].fd == NULL) {
        fd_end_poll(exec_ctx, &watchers[i], 0, 0);
        continue;
      }
      fd_end_poll(exec_ctx, &watchers[i], pfds[i].revents & POLLIN_CHECK,
                  pfds[i].revents & POLLOUT_CHECK);
    }
  }

  gpr_free(pfds);
  gpr_free(watchers);
  return error;
}

static void multipoll_with_poll_pollset_finish_shutdown(grpc_pollset *pollset) {
  size_t i;
  poll_hdr *h = pollset->data.ptr;
  for (i = 0; i < h->fd_count; i++) {
    GRPC_FD_UNREF(h->fds[i], "multipoller");
  }
  for (i = 0; i < h->del_count; i++) {
    GRPC_FD_UNREF(h->dels[i], "multipoller_del");
  }
  h->fd_count = 0;
  h->del_count = 0;
}

static void multipoll_with_poll_pollset_destroy(grpc_pollset *pollset) {
  poll_hdr *h = pollset->data.ptr;
  multipoll_with_poll_pollset_finish_shutdown(pollset);
  gpr_free(h->fds);
  gpr_free(h->dels);
  gpr_free(h);
}

static const grpc_pollset_vtable multipoll_with_poll_pollset = {
    multipoll_with_poll_pollset_add_fd,
    multipoll_with_poll_pollset_maybe_work_and_unlock,
    multipoll_with_poll_pollset_finish_shutdown,
    multipoll_with_poll_pollset_destroy};

static void poll_become_multipoller(grpc_exec_ctx *exec_ctx,
                                    grpc_pollset *pollset, grpc_fd **fds,
                                    size_t nfds) {
  size_t i;
  poll_hdr *h = gpr_malloc(sizeof(poll_hdr));
  pollset->vtable = &multipoll_with_poll_pollset;
  pollset->data.ptr = h;
  h->fd_count = nfds;
  h->fd_capacity = nfds;
  h->fds = gpr_malloc(nfds * sizeof(grpc_fd *));
  h->del_count = 0;
  h->del_capacity = 0;
  h->dels = NULL;
  for (i = 0; i < nfds; i++) {
    h->fds[i] = fds[i];
    GRPC_FD_REF(fds[i], "multipoller");
  }
}

#endif /* !GPR_LINUX_MULTIPOLL_WITH_EPOLL */

/*******************************************************************************
 * pollset_multipoller_with_epoll_posix.c
 */

#ifdef GPR_LINUX_MULTIPOLL_WITH_EPOLL

#include <errno.h>
#include <poll.h>
#include <string.h>
#include <sys/epoll.h>
#include <unistd.h>

#include <grpc/support/alloc.h>
#include <grpc/support/log.h>
#include <grpc/support/useful.h>

#include "src/core/lib/iomgr/ev_posix.h"
#include "src/core/lib/profiling/timers.h"
#include "src/core/lib/support/block_annotate.h"

static void set_ready(grpc_exec_ctx *exec_ctx, grpc_fd *fd, grpc_closure **st) {
  /* only one set_ready can be active at once (but there may be a racing
     notify_on) */
  gpr_mu_lock(&fd->mu);
  set_ready_locked(exec_ctx, fd, st);
  gpr_mu_unlock(&fd->mu);
}

static void fd_become_readable(grpc_exec_ctx *exec_ctx, grpc_fd *fd) {
  set_ready(exec_ctx, fd, &fd->read_closure);
}

static void fd_become_writable(grpc_exec_ctx *exec_ctx, grpc_fd *fd) {
  set_ready(exec_ctx, fd, &fd->write_closure);
}

struct epoll_fd_list {
  int *epoll_fds;
  size_t count;
  size_t capacity;
};
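
/* Global registry of every epoll fd created by this engine. An orphaned
   grpc_fd does not know which epoll sets it was added to, so
   remove_fd_from_all_epoll_sets (below) walks this list and issues an
   EPOLL_CTL_DEL against each one. */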

static struct epoll_fd_list epoll_fd_global_list;
static gpr_once init_epoll_fd_list_mu = GPR_ONCE_INIT;
static gpr_mu epoll_fd_list_mu;

static void init_mu(void) { gpr_mu_init(&epoll_fd_list_mu); }

static void add_epoll_fd_to_global_list(int epoll_fd) {
  gpr_once_init(&init_epoll_fd_list_mu, init_mu);

  gpr_mu_lock(&epoll_fd_list_mu);
  if (epoll_fd_global_list.count == epoll_fd_global_list.capacity) {
    epoll_fd_global_list.capacity =
        GPR_MAX((size_t)8, epoll_fd_global_list.capacity * 2);
    epoll_fd_global_list.epoll_fds =
        gpr_realloc(epoll_fd_global_list.epoll_fds,
                    epoll_fd_global_list.capacity * sizeof(int));
  }
  epoll_fd_global_list.epoll_fds[epoll_fd_global_list.count++] = epoll_fd;
  gpr_mu_unlock(&epoll_fd_list_mu);
}

static void remove_epoll_fd_from_global_list(int epoll_fd) {
  gpr_mu_lock(&epoll_fd_list_mu);
  GPR_ASSERT(epoll_fd_global_list.count > 0);
  for (size_t i = 0; i < epoll_fd_global_list.count; i++) {
    if (epoll_fd == epoll_fd_global_list.epoll_fds[i]) {
      epoll_fd_global_list.epoll_fds[i] =
          epoll_fd_global_list.epoll_fds[--(epoll_fd_global_list.count)];
      break;
    }
  }
  gpr_mu_unlock(&epoll_fd_list_mu);
}

static void remove_fd_from_all_epoll_sets(int fd) {
  int err;
  gpr_once_init(&init_epoll_fd_list_mu, init_mu);
  gpr_mu_lock(&epoll_fd_list_mu);
  if (epoll_fd_global_list.count == 0) {
    gpr_mu_unlock(&epoll_fd_list_mu);
    return;
  }
  for (size_t i = 0; i < epoll_fd_global_list.count; i++) {
    err = epoll_ctl(epoll_fd_global_list.epoll_fds[i], EPOLL_CTL_DEL, fd, NULL);
    if (err < 0 && errno != ENOENT) {
      gpr_log(GPR_ERROR, "epoll_ctl del for %d failed: %s", fd,
              strerror(errno));
    }
  }
  gpr_mu_unlock(&epoll_fd_list_mu);
}

typedef struct {
  grpc_pollset *pollset;
  grpc_fd *fd;
  grpc_closure closure;
} delayed_add;

typedef struct { int epoll_fd; } epoll_hdr;

static void finally_add_fd(grpc_exec_ctx *exec_ctx, grpc_pollset *pollset,
                           grpc_fd *fd) {
  epoll_hdr *h = pollset->data.ptr;
  struct epoll_event ev;
  int err;
  grpc_fd_watcher watcher;

  /* We pretend to be polling whilst adding an fd to keep the fd from being
     closed during the add. This may result in a spurious wakeup being assigned
     to this pollset whilst adding, but that should be benign. */
  GPR_ASSERT(fd_begin_poll(fd, pollset, NULL, 0, 0, &watcher) == 0);
  if (watcher.fd != NULL) {
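    /* Edge-triggered registration for both directions: with EPOLLET the
       kernel reports state transitions rather than levels, so the fd is added
       once and readiness is then drained each time epoll_wait signals it. */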
    ev.events = (uint32_t)(EPOLLIN | EPOLLOUT | EPOLLET);
    ev.data.ptr = fd;
    err = epoll_ctl(h->epoll_fd, EPOLL_CTL_ADD, fd->fd, &ev);
    if (err < 0) {
      /* FDs may be added to a pollset multiple times, so EEXIST is normal. */
      if (errno != EEXIST) {
        gpr_log(GPR_ERROR, "epoll_ctl add for %d failed: %s", fd->fd,
                strerror(errno));
      }
    }
  }
  fd_end_poll(exec_ctx, &watcher, 0, 0);
}

static void perform_delayed_add(grpc_exec_ctx *exec_ctx, void *arg,
                                grpc_error *error) {
  delayed_add *da = arg;

  if (!fd_is_orphaned(da->fd)) {
    finally_add_fd(exec_ctx, da->pollset, da->fd);
  }

  gpr_mu_lock(&da->pollset->mu);
  da->pollset->in_flight_cbs--;
  if (da->pollset->shutting_down) {
    /* We don't care about this pollset anymore. */
    if (da->pollset->in_flight_cbs == 0 && !da->pollset->called_shutdown) {
      da->pollset->called_shutdown = 1;
      grpc_exec_ctx_push(exec_ctx, da->pollset->shutdown_done, GRPC_ERROR_NONE,
                         NULL);
    }
  }
  gpr_mu_unlock(&da->pollset->mu);

  GRPC_FD_UNREF(da->fd, "delayed_add");

  gpr_free(da);
}

static void multipoll_with_epoll_pollset_add_fd(grpc_exec_ctx *exec_ctx,
                                                grpc_pollset *pollset,
                                                grpc_fd *fd,
                                                int and_unlock_pollset) {
  if (and_unlock_pollset) {
    gpr_mu_unlock(&pollset->mu);
    finally_add_fd(exec_ctx, pollset, fd);
  } else {
    delayed_add *da = gpr_malloc(sizeof(*da));
    da->pollset = pollset;
    da->fd = fd;
    GRPC_FD_REF(fd, "delayed_add");
    grpc_closure_init(&da->closure, perform_delayed_add, da);
    pollset->in_flight_cbs++;
    grpc_exec_ctx_push(exec_ctx, &da->closure, GRPC_ERROR_NONE, NULL);
  }
}

/* TODO(klempner): We probably want to turn this down a bit */
#define GRPC_EPOLL_MAX_EVENTS 1000

static grpc_error *multipoll_with_epoll_pollset_maybe_work_and_unlock(
    grpc_exec_ctx *exec_ctx, grpc_pollset *pollset, grpc_pollset_worker *worker,
    gpr_timespec deadline, gpr_timespec now) {
  struct epoll_event ep_ev[GRPC_EPOLL_MAX_EVENTS];
  int ep_rv;
  int poll_rv;
  epoll_hdr *h = pollset->data.ptr;
  int timeout_ms;
  struct pollfd pfds[2];
  grpc_error *error = GRPC_ERROR_NONE;

  /* If you want to ignore epoll's ability to sanely handle parallel pollers,
   * for a more apples-to-apples performance comparison with poll, add a
   * if (pollset->counter != 0) { return 0; }
   * here.
   */

  gpr_mu_unlock(&pollset->mu);

  timeout_ms = poll_deadline_to_millis_timeout(deadline, now);

  pfds[0].fd = GRPC_WAKEUP_FD_GET_READ_FD(&worker->wakeup_fd->fd);
  pfds[0].events = POLLIN;
  pfds[0].revents = 0;
  pfds[1].fd = h->epoll_fd;
  pfds[1].events = POLLIN;
  pfds[1].revents = 0;

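  /* Two-level wait: block in poll() on just this worker's wakeup fd plus the
     epoll fd itself, then drain the epoll set with zero-timeout epoll_wait
     calls below, so many workers can share a single epoll set while kicks
     stay cheap. */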
  /* TODO(vpai): Consider first doing a 0 timeout poll here to avoid
     even going into the blocking annotation if possible */
  GPR_TIMER_BEGIN("poll", 0);
  GRPC_SCHEDULING_START_BLOCKING_REGION;
  poll_rv = grpc_poll_function(pfds, 2, timeout_ms);
  GRPC_SCHEDULING_END_BLOCKING_REGION;
  GPR_TIMER_END("poll", 0);

  if (poll_rv < 0) {
    if (errno != EINTR) {
      work_combine_error(&error, GRPC_OS_ERROR(errno, "poll"));
    }
  } else if (poll_rv == 0) {
    /* do nothing */
  } else {
    if (pfds[0].revents) {
      work_combine_error(&error,
                         grpc_wakeup_fd_consume_wakeup(&worker->wakeup_fd->fd));
    }
    if (pfds[1].revents) {
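      /* Drain the epoll set completely: each epoll_wait returns at most
         GRPC_EPOLL_MAX_EVENTS entries, so keep looping until a short batch
         shows nothing is left. */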
      do {
        /* The following epoll_wait never blocks; it has a timeout of 0 */
        ep_rv = epoll_wait(h->epoll_fd, ep_ev, GRPC_EPOLL_MAX_EVENTS, 0);
        if (ep_rv < 0) {
          if (errno != EINTR) {
            work_combine_error(&error, GRPC_OS_ERROR(errno, "epoll_wait"));
          }
        } else {
          int i;
          for (i = 0; i < ep_rv; ++i) {
            grpc_fd *fd = ep_ev[i].data.ptr;
            /* TODO(klempner): We might want to consider making err and pri
             * separate events */
            int cancel = ep_ev[i].events & (EPOLLERR | EPOLLHUP);
            int read_ev = ep_ev[i].events & (EPOLLIN | EPOLLPRI);
            int write_ev = ep_ev[i].events & EPOLLOUT;
            if (fd == NULL) {
              work_combine_error(&error, grpc_wakeup_fd_consume_wakeup(
                                             &grpc_global_wakeup_fd));
            } else {
              if (read_ev || cancel) {
                fd_become_readable(exec_ctx, fd);
              }
              if (write_ev || cancel) {
                fd_become_writable(exec_ctx, fd);
              }
            }
          }
        }
      } while (ep_rv == GRPC_EPOLL_MAX_EVENTS);
    }
  }
  return error;
}

static void multipoll_with_epoll_pollset_finish_shutdown(
    grpc_pollset *pollset) {}

static void multipoll_with_epoll_pollset_destroy(grpc_pollset *pollset) {
  epoll_hdr *h = pollset->data.ptr;
  close(h->epoll_fd);
  remove_epoll_fd_from_global_list(h->epoll_fd);
  gpr_free(h);
}

static const grpc_pollset_vtable multipoll_with_epoll_pollset = {
    multipoll_with_epoll_pollset_add_fd,
    multipoll_with_epoll_pollset_maybe_work_and_unlock,
    multipoll_with_epoll_pollset_finish_shutdown,
    multipoll_with_epoll_pollset_destroy};

static void epoll_become_multipoller(grpc_exec_ctx *exec_ctx,
                                     grpc_pollset *pollset, grpc_fd **fds,
                                     size_t nfds) {
  size_t i;
  epoll_hdr *h = gpr_malloc(sizeof(epoll_hdr));
  struct epoll_event ev;
  int err;

  pollset->vtable = &multipoll_with_epoll_pollset;
  pollset->data.ptr = h;
  h->epoll_fd = epoll_create1(EPOLL_CLOEXEC);
  if (h->epoll_fd < 0) {
    /* TODO(klempner): Fall back to poll here, especially on ENOSYS */
    gpr_log(GPR_ERROR, "epoll_create1 failed: %s", strerror(errno));
    abort();
  }
  add_epoll_fd_to_global_list(h->epoll_fd);

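  /* The global wakeup fd is registered with data.ptr == NULL; the event loop
     above treats a NULL data pointer as "this is the global wakeup, consume
     it" rather than as a grpc_fd to dispatch. */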
  ev.events = (uint32_t)(EPOLLIN | EPOLLET);
  ev.data.ptr = NULL;
  err = epoll_ctl(h->epoll_fd, EPOLL_CTL_ADD,
                  GRPC_WAKEUP_FD_GET_READ_FD(&grpc_global_wakeup_fd), &ev);
  if (err < 0) {
    gpr_log(GPR_ERROR, "epoll_ctl add for %d failed: %s",
            GRPC_WAKEUP_FD_GET_READ_FD(&grpc_global_wakeup_fd),
            strerror(errno));
  }

  for (i = 0; i < nfds; i++) {
    multipoll_with_epoll_pollset_add_fd(exec_ctx, pollset, fds[i], 0);
  }
}

#else /* GPR_LINUX_MULTIPOLL_WITH_EPOLL */

static void remove_fd_from_all_epoll_sets(int fd) {}

#endif /* GPR_LINUX_MULTIPOLL_WITH_EPOLL */

/*******************************************************************************
 * pollset_set_posix.c
 */
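
/* A pollset_set is a bag of pollsets, child pollset_sets, and fds. Adding an
   fd propagates it to every member pollset and, recursively, to every child
   set; adding a pollset replays all live fds into it. Orphaned fds are pruned
   lazily during these walks. */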

static grpc_pollset_set *pollset_set_create(void) {
  grpc_pollset_set *pollset_set = gpr_malloc(sizeof(*pollset_set));
  memset(pollset_set, 0, sizeof(*pollset_set));
  gpr_mu_init(&pollset_set->mu);
  return pollset_set;
}

static void pollset_set_destroy(grpc_pollset_set *pollset_set) {
  size_t i;
  gpr_mu_destroy(&pollset_set->mu);
  for (i = 0; i < pollset_set->fd_count; i++) {
    GRPC_FD_UNREF(pollset_set->fds[i], "pollset_set");
  }
  gpr_free(pollset_set->pollsets);
  gpr_free(pollset_set->pollset_sets);
  gpr_free(pollset_set->fds);
  gpr_free(pollset_set);
}

static void pollset_set_add_pollset(grpc_exec_ctx *exec_ctx,
                                    grpc_pollset_set *pollset_set,
                                    grpc_pollset *pollset) {
  size_t i, j;
  gpr_mu_lock(&pollset_set->mu);
  if (pollset_set->pollset_count == pollset_set->pollset_capacity) {
    pollset_set->pollset_capacity =
        GPR_MAX(8, 2 * pollset_set->pollset_capacity);
    pollset_set->pollsets =
        gpr_realloc(pollset_set->pollsets, pollset_set->pollset_capacity *
                                               sizeof(*pollset_set->pollsets));
  }
  pollset_set->pollsets[pollset_set->pollset_count++] = pollset;
  for (i = 0, j = 0; i < pollset_set->fd_count; i++) {
    if (fd_is_orphaned(pollset_set->fds[i])) {
      GRPC_FD_UNREF(pollset_set->fds[i], "pollset_set");
    } else {
      pollset_add_fd(exec_ctx, pollset, pollset_set->fds[i]);
      pollset_set->fds[j++] = pollset_set->fds[i];
    }
  }
  pollset_set->fd_count = j;
  gpr_mu_unlock(&pollset_set->mu);
}

static void pollset_set_del_pollset(grpc_exec_ctx *exec_ctx,
                                    grpc_pollset_set *pollset_set,
                                    grpc_pollset *pollset) {
  size_t i;
  gpr_mu_lock(&pollset_set->mu);
  for (i = 0; i < pollset_set->pollset_count; i++) {
    if (pollset_set->pollsets[i] == pollset) {
      pollset_set->pollset_count--;
      GPR_SWAP(grpc_pollset *, pollset_set->pollsets[i],
               pollset_set->pollsets[pollset_set->pollset_count]);
      break;
    }
  }
  gpr_mu_unlock(&pollset_set->mu);
}

static void pollset_set_add_pollset_set(grpc_exec_ctx *exec_ctx,
                                        grpc_pollset_set *bag,
                                        grpc_pollset_set *item) {
  size_t i, j;
  gpr_mu_lock(&bag->mu);
  if (bag->pollset_set_count == bag->pollset_set_capacity) {
    bag->pollset_set_capacity = GPR_MAX(8, 2 * bag->pollset_set_capacity);
    bag->pollset_sets =
        gpr_realloc(bag->pollset_sets,
                    bag->pollset_set_capacity * sizeof(*bag->pollset_sets));
  }
  bag->pollset_sets[bag->pollset_set_count++] = item;
  for (i = 0, j = 0; i < bag->fd_count; i++) {
    if (fd_is_orphaned(bag->fds[i])) {
      GRPC_FD_UNREF(bag->fds[i], "pollset_set");
    } else {
      pollset_set_add_fd(exec_ctx, item, bag->fds[i]);
      bag->fds[j++] = bag->fds[i];
    }
  }
  bag->fd_count = j;
  gpr_mu_unlock(&bag->mu);
}

static void pollset_set_del_pollset_set(grpc_exec_ctx *exec_ctx,
                                        grpc_pollset_set *bag,
                                        grpc_pollset_set *item) {
  size_t i;
  gpr_mu_lock(&bag->mu);
  for (i = 0; i < bag->pollset_set_count; i++) {
    if (bag->pollset_sets[i] == item) {
      bag->pollset_set_count--;
      GPR_SWAP(grpc_pollset_set *, bag->pollset_sets[i],
               bag->pollset_sets[bag->pollset_set_count]);
      break;
    }
  }
  gpr_mu_unlock(&bag->mu);
}

static void pollset_set_add_fd(grpc_exec_ctx *exec_ctx,
                               grpc_pollset_set *pollset_set, grpc_fd *fd) {
  size_t i;
  gpr_mu_lock(&pollset_set->mu);
  if (pollset_set->fd_count == pollset_set->fd_capacity) {
    pollset_set->fd_capacity = GPR_MAX(8, 2 * pollset_set->fd_capacity);
    pollset_set->fds = gpr_realloc(
        pollset_set->fds, pollset_set->fd_capacity * sizeof(*pollset_set->fds));
  }
  GRPC_FD_REF(fd, "pollset_set");
  pollset_set->fds[pollset_set->fd_count++] = fd;
  for (i = 0; i < pollset_set->pollset_count; i++) {
    pollset_add_fd(exec_ctx, pollset_set->pollsets[i], fd);
  }
  for (i = 0; i < pollset_set->pollset_set_count; i++) {
    pollset_set_add_fd(exec_ctx, pollset_set->pollset_sets[i], fd);
  }
  gpr_mu_unlock(&pollset_set->mu);
}

static void pollset_set_del_fd(grpc_exec_ctx *exec_ctx,
                               grpc_pollset_set *pollset_set, grpc_fd *fd) {
  size_t i;
  gpr_mu_lock(&pollset_set->mu);
  for (i = 0; i < pollset_set->fd_count; i++) {
    if (pollset_set->fds[i] == fd) {
      pollset_set->fd_count--;
      GPR_SWAP(grpc_fd *, pollset_set->fds[i],
               pollset_set->fds[pollset_set->fd_count]);
      GRPC_FD_UNREF(fd, "pollset_set");
      break;
    }
  }
  for (i = 0; i < pollset_set->pollset_set_count; i++) {
    pollset_set_del_fd(exec_ctx, pollset_set->pollset_sets[i], fd);
  }
  gpr_mu_unlock(&pollset_set->mu);
}

/*******************************************************************************
 * event engine binding
 */

static void shutdown_engine(void) {
  fd_global_shutdown();
  pollset_global_shutdown();
}

static const grpc_event_engine_vtable vtable = {
    .pollset_size = sizeof(grpc_pollset),

    .fd_create = fd_create,
    .fd_wrapped_fd = fd_wrapped_fd,
    .fd_orphan = fd_orphan,
    .fd_shutdown = fd_shutdown,
    .fd_notify_on_read = fd_notify_on_read,
    .fd_notify_on_write = fd_notify_on_write,

    .pollset_init = pollset_init,
    .pollset_shutdown = pollset_shutdown,
    .pollset_reset = pollset_reset,
    .pollset_destroy = pollset_destroy,
    .pollset_work = pollset_work,
    .pollset_kick = pollset_kick,
    .pollset_add_fd = pollset_add_fd,

    .pollset_set_create = pollset_set_create,
    .pollset_set_destroy = pollset_set_destroy,
    .pollset_set_add_pollset = pollset_set_add_pollset,
    .pollset_set_del_pollset = pollset_set_del_pollset,
    .pollset_set_add_pollset_set = pollset_set_add_pollset_set,
    .pollset_set_del_pollset_set = pollset_set_del_pollset_set,
    .pollset_set_add_fd = pollset_set_add_fd,
    .pollset_set_del_fd = pollset_set_del_fd,

    .kick_poller = kick_poller,

    .shutdown_engine = shutdown_engine,
};

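/* Engine entry point: selects the platform multipoller (epoll when
   GPR_LINUX_MULTIPOLL_WITH_EPOLL is defined, otherwise plain poll), runs the
   global initializers, and returns the vtable -- or NULL, after logging the
   error, if pollset_global_init fails. */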
const grpc_event_engine_vtable *grpc_init_poll_and_epoll_posix(void) {
  const char *msg;
  grpc_error *err = GRPC_ERROR_NONE;
#ifdef GPR_LINUX_MULTIPOLL_WITH_EPOLL
  platform_become_multipoller = epoll_become_multipoller;
#else
  platform_become_multipoller = poll_become_multipoller;
#endif
  fd_global_init();
  err = pollset_global_init();
  if (err != GRPC_ERROR_NONE) goto error;
  return &vtable;

error:
  msg = grpc_error_string(err);
  gpr_log(GPR_ERROR, "%s", msg);
  grpc_error_free_string(msg);
  return NULL;
}

#endif /* GPR_POSIX_SOCKET */