/*
 *
 * Copyright 2016 gRPC authors.
 *
 * Licensed under the Apache License, Version 2.0 (the "License");
 * you may not use this file except in compliance with the License.
 * You may obtain a copy of the License at
 *
 *     http://www.apache.org/licenses/LICENSE-2.0
 *
 * Unless required by applicable law or agreed to in writing, software
 * distributed under the License is distributed on an "AS IS" BASIS,
 * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
 * See the License for the specific language governing permissions and
 * limitations under the License.
 *
 */

#include "src/core/lib/iomgr/port.h"

#ifdef GRPC_POSIX_SOCKET

#include "src/core/lib/iomgr/ev_poll_posix.h"

#include <assert.h>
#include <errno.h>
#include <poll.h>
#include <string.h>
#include <sys/socket.h>
#include <unistd.h>

#include <grpc/support/alloc.h>
#include <grpc/support/log.h>
#include <grpc/support/string_util.h>
#include <grpc/support/thd.h>
#include <grpc/support/tls.h>
#include <grpc/support/useful.h>

#include "src/core/lib/debug/stats.h"
#include "src/core/lib/iomgr/iomgr_internal.h"
#include "src/core/lib/iomgr/timer.h"
#include "src/core/lib/iomgr/wakeup_fd_cv.h"
#include "src/core/lib/iomgr/wakeup_fd_posix.h"
#include "src/core/lib/profiling/timers.h"
#include "src/core/lib/support/block_annotate.h"
#include "src/core/lib/support/murmur_hash.h"

#define GRPC_POLLSET_KICK_BROADCAST ((grpc_pollset_worker *)1)

/*******************************************************************************
 * FD declarations
 */

typedef struct grpc_fd_watcher {
  struct grpc_fd_watcher *next;
  struct grpc_fd_watcher *prev;
  grpc_pollset *pollset;
  grpc_pollset_worker *worker;
  grpc_fd *fd;
} grpc_fd_watcher;

struct grpc_fd {
  int fd;
  /* refst format:
       bit0: 1=active/0=orphaned
       bit1-n: refcount
     Meaning that mostly we ref by two to avoid altering the orphaned bit,
     and just unref by 1 when we're ready to flag the object as orphaned. */
  gpr_atm refst;

  gpr_mu mu;
  int shutdown;
  int closed;
  int released;
  grpc_error *shutdown_error;

  /* The watcher list.

     The following watcher related fields are protected by watcher_mu.

     An fd_watcher is an ephemeral object created when an fd wants to
     begin polling, and destroyed after the poll.

     It records the fd's interest in polling for reads, for writes, for
     both, or for neither.

     If a watcher is asked to poll for reads or writes, the read_watcher
     or write_watcher field is set respectively. A watcher may be asked
     to poll for both, in which case both fields will be set.

     read_watcher and write_watcher may be NULL if no watcher has been
     asked to poll for reads or writes.

     If an fd_watcher is not asked to poll for reads or writes, it's added
     to a linked list of inactive watchers, rooted at inactive_watcher_root.
     If a poller is needed later, one of the inactive watchers may be kicked
     out of its poll loop to take on that responsibility. */
  grpc_fd_watcher inactive_watcher_root;
  grpc_fd_watcher *read_watcher;
  grpc_fd_watcher *write_watcher;

  grpc_closure *read_closure;
  grpc_closure *write_closure;

  grpc_closure *on_done_closure;

  grpc_iomgr_object iomgr_object;

  /* The pollset that last noticed and notified that the fd is readable */
  grpc_pollset *read_notifier_pollset;
};
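
/* Illustrative sketch of the refst encoding above (a worked example, not
   part of the implementation): an active fd holding two references stores
   (2 << 1) | 1 == 5. Hypothetical decoding helpers might look like:

     static bool example_fd_active(gpr_atm refst) { return (refst & 1) != 0; }
     static gpr_atm example_fd_refs(gpr_atm refst) { return refst >> 1; }

   Taking or dropping an ordinary reference changes refst by 2, leaving
   bit 0 untouched; orphaning subtracts 1 overall, clearing the active bit
   without disturbing the count (see fd_orphan below). */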

/* Begin polling on an fd.
   Registers that the given pollset is interested in this fd - so that if read
   or writability interest changes, the pollset can be kicked to pick up that
   new interest.
   Return value is:
     (fd_needs_read? read_mask : 0) | (fd_needs_write? write_mask : 0)
   i.e. a combination of read_mask and write_mask determined by the fd's
   current interest in said events.
   Polling strategies that do not need to alter their behavior depending on
   the fd's current interest (such as epoll) do not need to call this function.
   MUST NOT be called with a pollset lock taken. */
static uint32_t fd_begin_poll(grpc_fd *fd, grpc_pollset *pollset,
                              grpc_pollset_worker *worker, uint32_t read_mask,
                              uint32_t write_mask, grpc_fd_watcher *rec);
/* Complete polling previously started with fd_begin_poll.
   MUST NOT be called with a pollset lock taken.
   If got_read or got_write are 1, also does the become_{readable,writable}
   as appropriate. */
static void fd_end_poll(grpc_exec_ctx *exec_ctx, grpc_fd_watcher *rec,
                        int got_read, int got_write,
                        grpc_pollset *read_notifier_pollset);
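
/* Illustrative call sequence (a sketch; this is essentially what
   pollset_work does further down for each polled fd):

     grpc_fd_watcher w;
     struct pollfd pfd;
     pfd.fd = fd->fd;
     pfd.revents = 0;
     pfd.events =
         (short)fd_begin_poll(fd, pollset, worker, POLLIN, POLLOUT, &w);
     poll(&pfd, 1, timeout_ms);
     fd_end_poll(exec_ctx, &w, pfd.revents & POLLIN, pfd.revents & POLLOUT,
                 pollset);

   A zero return from fd_begin_poll means the fd needed no polling; the
   watcher was either parked on the inactive list or (if the fd is shut
   down) not registered at all. */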

/* Return 1 if this fd is orphaned, 0 otherwise */
static bool fd_is_orphaned(grpc_fd *fd);

#ifndef NDEBUG
static void fd_ref(grpc_fd *fd, const char *reason, const char *file, int line);
static void fd_unref(grpc_fd *fd, const char *reason, const char *file,
                     int line);
#define GRPC_FD_REF(fd, reason) fd_ref(fd, reason, __FILE__, __LINE__)
#define GRPC_FD_UNREF(fd, reason) fd_unref(fd, reason, __FILE__, __LINE__)
#else
static void fd_ref(grpc_fd *fd);
static void fd_unref(grpc_fd *fd);
#define GRPC_FD_REF(fd, reason) fd_ref(fd)
#define GRPC_FD_UNREF(fd, reason) fd_unref(fd)
#endif

#define CLOSURE_NOT_READY ((grpc_closure *)0)
#define CLOSURE_READY ((grpc_closure *)1)

/*******************************************************************************
 * pollset declarations
 */

typedef struct grpc_cached_wakeup_fd {
  grpc_wakeup_fd fd;
  struct grpc_cached_wakeup_fd *next;
} grpc_cached_wakeup_fd;
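
/* Lifecycle note (descriptive, based on pollset_work/pollset_destroy below):
   each pollset keeps a freelist of wakeup fds in local_wakeup_cache. A
   worker pops one on entry to pollset_work (allocating and calling
   grpc_wakeup_fd_init only on a cache miss), polls on it alongside the
   pollset's fds so it can be kicked, and pushes it back on exit;
   pollset_destroy finally destroys every cached fd. */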

struct grpc_pollset_worker {
  grpc_cached_wakeup_fd *wakeup_fd;
  int reevaluate_polling_on_wakeup;
  int kicked_specifically;
  struct grpc_pollset_worker *next;
  struct grpc_pollset_worker *prev;
};

struct grpc_pollset {
  gpr_mu mu;
  grpc_pollset_worker root_worker;
  int shutting_down;
  int called_shutdown;
  int kicked_without_pollers;
  grpc_closure *shutdown_done;
  grpc_closure_list idle_jobs;
  int pollset_set_count;
  /* all polled fds */
  size_t fd_count;
  size_t fd_capacity;
  grpc_fd **fds;
  /* Local cache of eventfds for workers */
  grpc_cached_wakeup_fd *local_wakeup_cache;
};

/* Add an fd to a pollset */
static void pollset_add_fd(grpc_exec_ctx *exec_ctx, grpc_pollset *pollset,
                           struct grpc_fd *fd);

static void pollset_set_add_fd(grpc_exec_ctx *exec_ctx,
                               grpc_pollset_set *pollset_set, grpc_fd *fd);

/* Convert a timespec to milliseconds:
   - very small or negative poll times are clamped to zero to do a
     non-blocking poll (which becomes spin polling)
   - other small values are rounded up to one millisecond
   - longer than a millisecond polls are rounded up to the next nearest
     millisecond to avoid spinning
   - infinite timeouts are converted to -1 */
static int poll_deadline_to_millis_timeout(gpr_timespec deadline,
                                           gpr_timespec now);
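
/* Worked examples of the conversion above (assuming the implementation
   later in this file, with its 10us spin-polling threshold):
   - deadline already passed or <= ~10us away -> 0  (non-blocking poll)
   - deadline 10.5ms away                     -> 11 (rounded up, never 10)
   - deadline == gpr_inf_future               -> -1 (block indefinitely) */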

/* Allow kick to wakeup the currently polling worker */
#define GRPC_POLLSET_CAN_KICK_SELF 1
/* Force the wakee to repoll when awoken */
#define GRPC_POLLSET_REEVALUATE_POLLING_ON_WAKEUP 2
/* As per pollset_kick, with an extended set of flags (defined above)
   -- mostly for fd_posix's use. */
static grpc_error *pollset_kick_ext(grpc_pollset *p,
                                    grpc_pollset_worker *specific_worker,
                                    uint32_t flags) GRPC_MUST_USE_RESULT;

/* Return 1 if the pollset has active threads in pollset_work (pollset must
 * be locked) */
static bool pollset_has_workers(grpc_pollset *pollset);

/*******************************************************************************
 * pollset_set definitions
 */

struct grpc_pollset_set {
  gpr_mu mu;

  size_t pollset_count;
  size_t pollset_capacity;
  grpc_pollset **pollsets;

  size_t pollset_set_count;
  size_t pollset_set_capacity;
  struct grpc_pollset_set **pollset_sets;

  size_t fd_count;
  size_t fd_capacity;
  grpc_fd **fds;
};

/*******************************************************************************
 * condition variable polling definitions
 */

#define POLLCV_THREAD_GRACE_MS 1000
#define CV_POLL_PERIOD_MS 1000
#define CV_DEFAULT_TABLE_SIZE 16

typedef struct poll_result {
  gpr_refcount refcount;
  cv_node *watchers;
  int watchcount;
  struct pollfd *fds;
  nfds_t nfds;
  int retval;
  int err;
  int completed;
} poll_result;

typedef struct poll_args {
  gpr_cv trigger;
  int trigger_set;
  struct pollfd *fds;
  nfds_t nfds;
  poll_result *result;
  struct poll_args *next;
  struct poll_args *prev;
} poll_args;

// This is a 2-tiered cache: we maintain a hash table of active poll calls,
// so we can wait on the result of an identical in-flight call. We also
// maintain a freelist of inactive poll threads.
typedef struct poll_hash_table {
  poll_args *free_pollers;
  poll_args **active_pollers;
  unsigned int size;
  unsigned int count;
} poll_hash_table;
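
/* Illustrative lookup flow for the cache below (a sketch; see
   cache_insert_locked/get_poller_locked at the end of this file):

     1. key = gpr_murmur_hash3(fds, nfds * sizeof(struct pollfd), seed)
              % poll_cache.size;
     2. walk the chain at active_pollers[key]; an entry matches only if it
        has the same nfds and memcmp-identical pollfd contents;
     3. on a hit, share that entry's poll_result instead of issuing a
        second identical poll(); on a miss, reuse a thread from
        free_pollers and insert a new entry at the head of the chain. */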

poll_hash_table poll_cache;
cv_fd_table g_cvfds;

/*******************************************************************************
 * fd_posix.c
 */

#ifndef NDEBUG
#define REF_BY(fd, n, reason) ref_by(fd, n, reason, __FILE__, __LINE__)
#define UNREF_BY(fd, n, reason) unref_by(fd, n, reason, __FILE__, __LINE__)
static void ref_by(grpc_fd *fd, int n, const char *reason, const char *file,
                   int line) {
  if (GRPC_TRACER_ON(grpc_trace_fd_refcount)) {
    gpr_log(GPR_DEBUG,
            "FD %d %p ref %d %" PRIdPTR " -> %" PRIdPTR " [%s; %s:%d]",
            fd->fd, fd, n, gpr_atm_no_barrier_load(&fd->refst),
            gpr_atm_no_barrier_load(&fd->refst) + n, reason, file, line);
  }
#else
#define REF_BY(fd, n, reason) ref_by(fd, n)
#define UNREF_BY(fd, n, reason) unref_by(fd, n)
static void ref_by(grpc_fd *fd, int n) {
#endif
  GPR_ASSERT(gpr_atm_no_barrier_fetch_add(&fd->refst, n) > 0);
}

#ifndef NDEBUG
static void unref_by(grpc_fd *fd, int n, const char *reason, const char *file,
                     int line) {
  if (GRPC_TRACER_ON(grpc_trace_fd_refcount)) {
    gpr_log(GPR_DEBUG,
            "FD %d %p unref %d %" PRIdPTR " -> %" PRIdPTR " [%s; %s:%d]",
            fd->fd, fd, n, gpr_atm_no_barrier_load(&fd->refst),
            gpr_atm_no_barrier_load(&fd->refst) - n, reason, file, line);
  }
#else
static void unref_by(grpc_fd *fd, int n) {
#endif
  gpr_atm old = gpr_atm_full_fetch_add(&fd->refst, -n);
  if (old == n) {
    gpr_mu_destroy(&fd->mu);
    grpc_iomgr_unregister_object(&fd->iomgr_object);
    if (fd->shutdown) GRPC_ERROR_UNREF(fd->shutdown_error);
    gpr_free(fd);
  } else {
    GPR_ASSERT(old > n);
  }
}

static grpc_fd *fd_create(int fd, const char *name) {
  grpc_fd *r = (grpc_fd *)gpr_malloc(sizeof(*r));
  gpr_mu_init(&r->mu);
  gpr_atm_rel_store(&r->refst, 1);
  r->shutdown = 0;
  r->read_closure = CLOSURE_NOT_READY;
  r->write_closure = CLOSURE_NOT_READY;
  r->fd = fd;
  r->inactive_watcher_root.next = r->inactive_watcher_root.prev =
      &r->inactive_watcher_root;
  r->read_watcher = r->write_watcher = NULL;
  r->on_done_closure = NULL;
  r->closed = 0;
  r->released = 0;
  r->read_notifier_pollset = NULL;

  char *name2;
  gpr_asprintf(&name2, "%s fd=%d", name, fd);
  grpc_iomgr_register_object(&r->iomgr_object, name2);
  gpr_free(name2);
  return r;
}

static bool fd_is_orphaned(grpc_fd *fd) {
  return (gpr_atm_acq_load(&fd->refst) & 1) == 0;
}

/* Return the read-notifier pollset */
static grpc_pollset *fd_get_read_notifier_pollset(grpc_exec_ctx *exec_ctx,
                                                  grpc_fd *fd) {
  grpc_pollset *notifier = NULL;

  gpr_mu_lock(&fd->mu);
  notifier = fd->read_notifier_pollset;
  gpr_mu_unlock(&fd->mu);

  return notifier;
}

static grpc_error *pollset_kick_locked(grpc_fd_watcher *watcher) {
  gpr_mu_lock(&watcher->pollset->mu);
  GPR_ASSERT(watcher->worker);
  grpc_error *err = pollset_kick_ext(watcher->pollset, watcher->worker,
                                     GRPC_POLLSET_REEVALUATE_POLLING_ON_WAKEUP);
  gpr_mu_unlock(&watcher->pollset->mu);
  return err;
}

static void maybe_wake_one_watcher_locked(grpc_fd *fd) {
  if (fd->inactive_watcher_root.next != &fd->inactive_watcher_root) {
    pollset_kick_locked(fd->inactive_watcher_root.next);
  } else if (fd->read_watcher) {
    pollset_kick_locked(fd->read_watcher);
  } else if (fd->write_watcher) {
    pollset_kick_locked(fd->write_watcher);
  }
}

static void wake_all_watchers_locked(grpc_fd *fd) {
  grpc_fd_watcher *watcher;
  for (watcher = fd->inactive_watcher_root.next;
       watcher != &fd->inactive_watcher_root; watcher = watcher->next) {
    pollset_kick_locked(watcher);
  }
  if (fd->read_watcher) {
    pollset_kick_locked(fd->read_watcher);
  }
  if (fd->write_watcher && fd->write_watcher != fd->read_watcher) {
    pollset_kick_locked(fd->write_watcher);
  }
}

static int has_watchers(grpc_fd *fd) {
  return fd->read_watcher != NULL || fd->write_watcher != NULL ||
         fd->inactive_watcher_root.next != &fd->inactive_watcher_root;
}

static void close_fd_locked(grpc_exec_ctx *exec_ctx, grpc_fd *fd) {
  fd->closed = 1;
  if (!fd->released) {
    close(fd->fd);
  }
  GRPC_CLOSURE_SCHED(exec_ctx, fd->on_done_closure, GRPC_ERROR_NONE);
}

static int fd_wrapped_fd(grpc_fd *fd) {
  if (fd->released || fd->closed) {
    return -1;
  } else {
    return fd->fd;
  }
}

static void fd_orphan(grpc_exec_ctx *exec_ctx, grpc_fd *fd,
                      grpc_closure *on_done, int *release_fd,
                      bool already_closed, const char *reason) {
  fd->on_done_closure = on_done;
  fd->released = release_fd != NULL;
  if (release_fd != NULL) {
    *release_fd = fd->fd;
    fd->released = true;
  } else if (already_closed) {
    fd->released = true;
  }
  gpr_mu_lock(&fd->mu);
  REF_BY(fd, 1, reason); /* remove active status, but keep referenced */
  if (!has_watchers(fd)) {
    close_fd_locked(exec_ctx, fd);
  } else {
    wake_all_watchers_locked(fd);
  }
  gpr_mu_unlock(&fd->mu);
  UNREF_BY(fd, 2, reason); /* drop the reference */
}
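
/* Worked example of the refst arithmetic in fd_orphan above: starting from
   refst == 3 (active, one GRPC_FD_REF outstanding), REF_BY(fd, 1) takes it
   to 4, so fd_is_orphaned now returns true while the orphan's reference is
   still counted; UNREF_BY(fd, 2) then takes it to 2. When the last holder
   calls GRPC_FD_UNREF, unref_by sees old == n == 2 and frees the fd. */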

/* increment refcount by two to avoid changing the orphan bit */
#ifndef NDEBUG
static void fd_ref(grpc_fd *fd, const char *reason, const char *file,
                   int line) {
  ref_by(fd, 2, reason, file, line);
}

static void fd_unref(grpc_fd *fd, const char *reason, const char *file,
                     int line) {
  unref_by(fd, 2, reason, file, line);
}
#else
static void fd_ref(grpc_fd *fd) { ref_by(fd, 2); }

static void fd_unref(grpc_fd *fd) { unref_by(fd, 2); }
#endif

static grpc_error *fd_shutdown_error(grpc_fd *fd) {
  if (!fd->shutdown) {
    return GRPC_ERROR_NONE;
  } else {
    return GRPC_ERROR_CREATE_REFERENCING_FROM_STATIC_STRING(
        "FD shutdown", &fd->shutdown_error, 1);
  }
}

static void notify_on_locked(grpc_exec_ctx *exec_ctx, grpc_fd *fd,
                             grpc_closure **st, grpc_closure *closure) {
  if (fd->shutdown) {
    GRPC_CLOSURE_SCHED(exec_ctx, closure,
                       GRPC_ERROR_CREATE_FROM_STATIC_STRING("FD shutdown"));
  } else if (*st == CLOSURE_NOT_READY) {
    /* not ready ==> switch to a waiting state by setting the closure */
    *st = closure;
  } else if (*st == CLOSURE_READY) {
    /* already ready ==> queue the closure to run immediately */
    *st = CLOSURE_NOT_READY;
    GRPC_CLOSURE_SCHED(exec_ctx, closure, fd_shutdown_error(fd));
    maybe_wake_one_watcher_locked(fd);
  } else {
    /* upcallptr was set to a different closure. This is an error! */
    gpr_log(GPR_ERROR,
            "User called a notify_on function with a previous callback still "
            "pending");
    abort();
  }
}

/* returns 1 if state becomes not ready */
static int set_ready_locked(grpc_exec_ctx *exec_ctx, grpc_fd *fd,
                            grpc_closure **st) {
  if (*st == CLOSURE_READY) {
    /* duplicate ready ==> ignore */
    return 0;
  } else if (*st == CLOSURE_NOT_READY) {
    /* not ready, and not waiting ==> flag ready */
    *st = CLOSURE_READY;
    return 0;
  } else {
    /* waiting ==> queue closure */
    GRPC_CLOSURE_SCHED(exec_ctx, *st, fd_shutdown_error(fd));
    *st = CLOSURE_NOT_READY;
    return 1;
  }
}
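
/* Sketch of the three-state protocol implemented by notify_on_locked and
   set_ready_locked above; *st encodes both state and callback:

     CLOSURE_NOT_READY --set_ready--> CLOSURE_READY
     CLOSURE_NOT_READY --notify_on--> <closure ptr>  (waiting)
     CLOSURE_READY     --notify_on--> schedule closure; back to NOT_READY
     <closure ptr>     --set_ready--> schedule closure; back to NOT_READY

   Example: if fd_notify_on_read runs before the fd is readable, the closure
   pointer is parked in fd->read_closure; when poll() later reports POLLIN,
   fd_end_poll calls set_ready_locked, which schedules the parked closure
   and resets the state to CLOSURE_NOT_READY. */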

static void set_read_notifier_pollset_locked(
    grpc_exec_ctx *exec_ctx, grpc_fd *fd, grpc_pollset *read_notifier_pollset) {
  fd->read_notifier_pollset = read_notifier_pollset;
}

static void fd_shutdown(grpc_exec_ctx *exec_ctx, grpc_fd *fd, grpc_error *why) {
  gpr_mu_lock(&fd->mu);
  /* only shutdown once */
  if (!fd->shutdown) {
    fd->shutdown = 1;
    fd->shutdown_error = why;
    /* signal read/write closed to OS so that future operations fail */
    shutdown(fd->fd, SHUT_RDWR);
    set_ready_locked(exec_ctx, fd, &fd->read_closure);
    set_ready_locked(exec_ctx, fd, &fd->write_closure);
  } else {
    GRPC_ERROR_UNREF(why);
  }
  gpr_mu_unlock(&fd->mu);
}

static bool fd_is_shutdown(grpc_fd *fd) {
  gpr_mu_lock(&fd->mu);
  bool r = fd->shutdown;
  gpr_mu_unlock(&fd->mu);
  return r;
}

static void fd_notify_on_read(grpc_exec_ctx *exec_ctx, grpc_fd *fd,
                              grpc_closure *closure) {
  gpr_mu_lock(&fd->mu);
  notify_on_locked(exec_ctx, fd, &fd->read_closure, closure);
  gpr_mu_unlock(&fd->mu);
}

static void fd_notify_on_write(grpc_exec_ctx *exec_ctx, grpc_fd *fd,
                               grpc_closure *closure) {
  gpr_mu_lock(&fd->mu);
  notify_on_locked(exec_ctx, fd, &fd->write_closure, closure);
  gpr_mu_unlock(&fd->mu);
}

static uint32_t fd_begin_poll(grpc_fd *fd, grpc_pollset *pollset,
                              grpc_pollset_worker *worker, uint32_t read_mask,
                              uint32_t write_mask, grpc_fd_watcher *watcher) {
  uint32_t mask = 0;
  grpc_closure *cur;
  int requested;
  /* keep track of pollers that have requested our events, in case they change
   */
  GRPC_FD_REF(fd, "poll");

  gpr_mu_lock(&fd->mu);

  /* if we are shutdown, then don't add to the watcher set */
  if (fd->shutdown) {
    watcher->fd = NULL;
    watcher->pollset = NULL;
    watcher->worker = NULL;
    gpr_mu_unlock(&fd->mu);
    GRPC_FD_UNREF(fd, "poll");
    return 0;
  }

  /* if there is nobody polling for read, but we need to, then start doing so */
  cur = fd->read_closure;
  requested = cur != CLOSURE_READY;
  if (read_mask && fd->read_watcher == NULL && requested) {
    fd->read_watcher = watcher;
    mask |= read_mask;
  }
  /* if there is nobody polling for write, but we need to, then start doing so
   */
  cur = fd->write_closure;
  requested = cur != CLOSURE_READY;
  if (write_mask && fd->write_watcher == NULL && requested) {
    fd->write_watcher = watcher;
    mask |= write_mask;
  }
  /* if not polling, remember this watcher in case we need someone to later */
  if (mask == 0 && worker != NULL) {
    watcher->next = &fd->inactive_watcher_root;
    watcher->prev = watcher->next->prev;
    watcher->next->prev = watcher->prev->next = watcher;
  }
  watcher->pollset = pollset;
  watcher->worker = worker;
  watcher->fd = fd;
  gpr_mu_unlock(&fd->mu);

  return mask;
}

static void fd_end_poll(grpc_exec_ctx *exec_ctx, grpc_fd_watcher *watcher,
                        int got_read, int got_write,
                        grpc_pollset *read_notifier_pollset) {
  int was_polling = 0;
  int kick = 0;
  grpc_fd *fd = watcher->fd;

  if (fd == NULL) {
    return;
  }

  gpr_mu_lock(&fd->mu);

  if (watcher == fd->read_watcher) {
    /* remove read watcher, kick if we still need a read */
    was_polling = 1;
    if (!got_read) {
      kick = 1;
    }
    fd->read_watcher = NULL;
  }
  if (watcher == fd->write_watcher) {
    /* remove write watcher, kick if we still need a write */
    was_polling = 1;
    if (!got_write) {
      kick = 1;
    }
    fd->write_watcher = NULL;
  }
  if (!was_polling && watcher->worker != NULL) {
    /* remove from inactive list */
    watcher->next->prev = watcher->prev;
    watcher->prev->next = watcher->next;
  }
  if (got_read) {
    if (set_ready_locked(exec_ctx, fd, &fd->read_closure)) {
      kick = 1;
    }
    if (read_notifier_pollset != NULL) {
      set_read_notifier_pollset_locked(exec_ctx, fd, read_notifier_pollset);
    }
  }
  if (got_write) {
    if (set_ready_locked(exec_ctx, fd, &fd->write_closure)) {
      kick = 1;
    }
  }
  if (kick) {
    maybe_wake_one_watcher_locked(fd);
  }
  if (fd_is_orphaned(fd) && !has_watchers(fd) && !fd->closed) {
    close_fd_locked(exec_ctx, fd);
  }
  gpr_mu_unlock(&fd->mu);

  GRPC_FD_UNREF(fd, "poll");
}

/*******************************************************************************
 * pollset_posix.c
 */

GPR_TLS_DECL(g_current_thread_poller);
GPR_TLS_DECL(g_current_thread_worker);

static void remove_worker(grpc_pollset *p, grpc_pollset_worker *worker) {
  worker->prev->next = worker->next;
  worker->next->prev = worker->prev;
}

static bool pollset_has_workers(grpc_pollset *p) {
  return p->root_worker.next != &p->root_worker;
}

static bool pollset_in_pollset_sets(grpc_pollset *p) {
  return p->pollset_set_count;
}

static bool pollset_has_observers(grpc_pollset *p) {
  return pollset_has_workers(p) || pollset_in_pollset_sets(p);
}

static grpc_pollset_worker *pop_front_worker(grpc_pollset *p) {
  if (pollset_has_workers(p)) {
    grpc_pollset_worker *w = p->root_worker.next;
    remove_worker(p, w);
    return w;
  } else {
    return NULL;
  }
}

static void push_back_worker(grpc_pollset *p, grpc_pollset_worker *worker) {
  worker->next = &p->root_worker;
  worker->prev = worker->next->prev;
  worker->prev->next = worker->next->prev = worker;
}

static void push_front_worker(grpc_pollset *p, grpc_pollset_worker *worker) {
  worker->prev = &p->root_worker;
  worker->next = worker->prev->next;
  worker->prev->next = worker->next->prev = worker;
}

static void kick_append_error(grpc_error **composite, grpc_error *error) {
  if (error == GRPC_ERROR_NONE) return;
  if (*composite == GRPC_ERROR_NONE) {
    *composite = GRPC_ERROR_CREATE_FROM_STATIC_STRING("Kick Failure");
  }
  *composite = grpc_error_add_child(*composite, error);
}

static grpc_error *pollset_kick_ext(grpc_pollset *p,
                                    grpc_pollset_worker *specific_worker,
                                    uint32_t flags) {
  GPR_TIMER_BEGIN("pollset_kick_ext", 0);
  grpc_error *error = GRPC_ERROR_NONE;

  /* pollset->mu already held */
  if (specific_worker != NULL) {
    if (specific_worker == GRPC_POLLSET_KICK_BROADCAST) {
      GPR_TIMER_BEGIN("pollset_kick_ext.broadcast", 0);
      GPR_ASSERT((flags & GRPC_POLLSET_REEVALUATE_POLLING_ON_WAKEUP) == 0);
      for (specific_worker = p->root_worker.next;
           specific_worker != &p->root_worker;
           specific_worker = specific_worker->next) {
        kick_append_error(
            &error, grpc_wakeup_fd_wakeup(&specific_worker->wakeup_fd->fd));
      }
      p->kicked_without_pollers = true;
      GPR_TIMER_END("pollset_kick_ext.broadcast", 0);
    } else if (gpr_tls_get(&g_current_thread_worker) !=
               (intptr_t)specific_worker) {
      GPR_TIMER_MARK("different_thread_worker", 0);
      if ((flags & GRPC_POLLSET_REEVALUATE_POLLING_ON_WAKEUP) != 0) {
        specific_worker->reevaluate_polling_on_wakeup = true;
      }
      specific_worker->kicked_specifically = true;
      kick_append_error(&error,
                        grpc_wakeup_fd_wakeup(&specific_worker->wakeup_fd->fd));
    } else if ((flags & GRPC_POLLSET_CAN_KICK_SELF) != 0) {
      GPR_TIMER_MARK("kick_yoself", 0);
      if ((flags & GRPC_POLLSET_REEVALUATE_POLLING_ON_WAKEUP) != 0) {
        specific_worker->reevaluate_polling_on_wakeup = true;
      }
      specific_worker->kicked_specifically = true;
      kick_append_error(&error,
                        grpc_wakeup_fd_wakeup(&specific_worker->wakeup_fd->fd));
    }
  } else if (gpr_tls_get(&g_current_thread_poller) != (intptr_t)p) {
    GPR_ASSERT((flags & GRPC_POLLSET_REEVALUATE_POLLING_ON_WAKEUP) == 0);
    GPR_TIMER_MARK("kick_anonymous", 0);
    specific_worker = pop_front_worker(p);
    if (specific_worker != NULL) {
      if (gpr_tls_get(&g_current_thread_worker) == (intptr_t)specific_worker) {
        GPR_TIMER_MARK("kick_anonymous_not_self", 0);
        push_back_worker(p, specific_worker);
        specific_worker = pop_front_worker(p);
        if ((flags & GRPC_POLLSET_CAN_KICK_SELF) == 0 &&
            gpr_tls_get(&g_current_thread_worker) ==
                (intptr_t)specific_worker) {
          push_back_worker(p, specific_worker);
          specific_worker = NULL;
        }
      }
      if (specific_worker != NULL) {
        GPR_TIMER_MARK("finally_kick", 0);
        push_back_worker(p, specific_worker);
        kick_append_error(
            &error, grpc_wakeup_fd_wakeup(&specific_worker->wakeup_fd->fd));
      }
    } else {
      GPR_TIMER_MARK("kicked_no_pollers", 0);
      p->kicked_without_pollers = true;
    }
  }

  GPR_TIMER_END("pollset_kick_ext", 0);
  GRPC_LOG_IF_ERROR("pollset_kick_ext", GRPC_ERROR_REF(error));
  return error;
}
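
/* Summary of the kick cases handled above (descriptive only):
   - specific_worker == GRPC_POLLSET_KICK_BROADCAST: wake every worker's
     wakeup fd and set kicked_without_pollers.
   - specific worker on another thread: optionally set
     reevaluate_polling_on_wakeup, then wake its wakeup fd.
   - specific worker == the calling thread: wake it only if
     GRPC_POLLSET_CAN_KICK_SELF was passed.
   - no specific worker: wake some worker other than ourselves (unless
     self-kick is allowed); if none exists, record the kick in
     kicked_without_pollers so the next pollset_work returns promptly. */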

static grpc_error *pollset_kick(grpc_pollset *p,
                                grpc_pollset_worker *specific_worker) {
  return pollset_kick_ext(p, specific_worker, 0);
}

/* global state management */

static grpc_error *pollset_global_init(void) {
  gpr_tls_init(&g_current_thread_poller);
  gpr_tls_init(&g_current_thread_worker);
  return GRPC_ERROR_NONE;
}

static void pollset_global_shutdown(void) {
  gpr_tls_destroy(&g_current_thread_poller);
  gpr_tls_destroy(&g_current_thread_worker);
}

/* main interface */

static void pollset_init(grpc_pollset *pollset, gpr_mu **mu) {
  gpr_mu_init(&pollset->mu);
  *mu = &pollset->mu;
  pollset->root_worker.next = pollset->root_worker.prev = &pollset->root_worker;
  pollset->shutting_down = 0;
  pollset->called_shutdown = 0;
  pollset->kicked_without_pollers = 0;
  pollset->idle_jobs.head = pollset->idle_jobs.tail = NULL;
  pollset->local_wakeup_cache = NULL;
  pollset->kicked_without_pollers = 0;
  pollset->fd_count = 0;
  pollset->fd_capacity = 0;
  pollset->fds = NULL;
  pollset->pollset_set_count = 0;
}

static void pollset_destroy(grpc_exec_ctx *exec_ctx, grpc_pollset *pollset) {
  GPR_ASSERT(!pollset_has_workers(pollset));
  GPR_ASSERT(pollset->idle_jobs.head == pollset->idle_jobs.tail);
  while (pollset->local_wakeup_cache) {
    grpc_cached_wakeup_fd *next = pollset->local_wakeup_cache->next;
    grpc_wakeup_fd_destroy(&pollset->local_wakeup_cache->fd);
    gpr_free(pollset->local_wakeup_cache);
    pollset->local_wakeup_cache = next;
  }
  gpr_free(pollset->fds);
  gpr_mu_destroy(&pollset->mu);
}

static void pollset_add_fd(grpc_exec_ctx *exec_ctx, grpc_pollset *pollset,
                           grpc_fd *fd) {
  gpr_mu_lock(&pollset->mu);
  size_t i;
  /* TODO(ctiller): this is O(num_fds^2); maybe switch to a hash set here */
  for (i = 0; i < pollset->fd_count; i++) {
    if (pollset->fds[i] == fd) goto exit;
  }
  if (pollset->fd_count == pollset->fd_capacity) {
    pollset->fd_capacity =
        GPR_MAX(pollset->fd_capacity + 8, pollset->fd_count * 3 / 2);
    pollset->fds = (grpc_fd **)gpr_realloc(
        pollset->fds, sizeof(grpc_fd *) * pollset->fd_capacity);
  }
  pollset->fds[pollset->fd_count++] = fd;
  GRPC_FD_REF(fd, "multipoller");
  pollset_kick(pollset, NULL);
exit:
  gpr_mu_unlock(&pollset->mu);
}

static void finish_shutdown(grpc_exec_ctx *exec_ctx, grpc_pollset *pollset) {
  GPR_ASSERT(grpc_closure_list_empty(pollset->idle_jobs));
  size_t i;
  for (i = 0; i < pollset->fd_count; i++) {
    GRPC_FD_UNREF(pollset->fds[i], "multipoller");
  }
  pollset->fd_count = 0;
  GRPC_CLOSURE_SCHED(exec_ctx, pollset->shutdown_done, GRPC_ERROR_NONE);
}

static void work_combine_error(grpc_error **composite, grpc_error *error) {
  if (error == GRPC_ERROR_NONE) return;
  if (*composite == GRPC_ERROR_NONE) {
    *composite = GRPC_ERROR_CREATE_FROM_STATIC_STRING("pollset_work");
  }
  *composite = grpc_error_add_child(*composite, error);
}

static grpc_error *pollset_work(grpc_exec_ctx *exec_ctx, grpc_pollset *pollset,
                                grpc_pollset_worker **worker_hdl,
                                gpr_timespec now, gpr_timespec deadline) {
  grpc_pollset_worker worker;
  if (worker_hdl) *worker_hdl = &worker;
  grpc_error *error = GRPC_ERROR_NONE;

  /* Avoid malloc for small number of elements. */
  enum { inline_elements = 96 };
  struct pollfd pollfd_space[inline_elements];
  struct grpc_fd_watcher watcher_space[inline_elements];

  /* pollset->mu already held */
  int added_worker = 0;
  int locked = 1;
  int queued_work = 0;
  int keep_polling = 0;
  GPR_TIMER_BEGIN("pollset_work", 0);
  /* this must happen before we (potentially) drop pollset->mu */
  worker.next = worker.prev = NULL;
  worker.reevaluate_polling_on_wakeup = 0;
  if (pollset->local_wakeup_cache != NULL) {
    worker.wakeup_fd = pollset->local_wakeup_cache;
    pollset->local_wakeup_cache = worker.wakeup_fd->next;
  } else {
    worker.wakeup_fd =
        (grpc_cached_wakeup_fd *)gpr_malloc(sizeof(*worker.wakeup_fd));
    error = grpc_wakeup_fd_init(&worker.wakeup_fd->fd);
    if (error != GRPC_ERROR_NONE) {
      GRPC_LOG_IF_ERROR("pollset_work", GRPC_ERROR_REF(error));
      return error;
    }
  }
  worker.kicked_specifically = 0;
  /* If there's work waiting for the pollset to be idle, and the
     pollset is idle, then do that work */
  if (!pollset_has_workers(pollset) &&
      !grpc_closure_list_empty(pollset->idle_jobs)) {
    GPR_TIMER_MARK("pollset_work.idle_jobs", 0);
    GRPC_CLOSURE_LIST_SCHED(exec_ctx, &pollset->idle_jobs);
    goto done;
  }
  /* If we're shutting down then we don't execute any extended work */
  if (pollset->shutting_down) {
    GPR_TIMER_MARK("pollset_work.shutting_down", 0);
    goto done;
  }
  /* Start polling, and keep doing so while we're being asked to
     re-evaluate our pollers (this allows poll() based pollers to
     ensure they don't miss wakeups) */
  keep_polling = 1;
  gpr_tls_set(&g_current_thread_poller, (intptr_t)pollset);
  while (keep_polling) {
    keep_polling = 0;
    if (!pollset->kicked_without_pollers) {
      if (!added_worker) {
        push_front_worker(pollset, &worker);
        added_worker = 1;
        gpr_tls_set(&g_current_thread_worker, (intptr_t)&worker);
      }
      GPR_TIMER_BEGIN("maybe_work_and_unlock", 0);
#define POLLOUT_CHECK (POLLOUT | POLLHUP | POLLERR)
#define POLLIN_CHECK (POLLIN | POLLHUP | POLLERR)

      int timeout;
      int r;
      size_t i, fd_count;
      nfds_t pfd_count;
      grpc_fd_watcher *watchers;
      struct pollfd *pfds;

      timeout = poll_deadline_to_millis_timeout(deadline, now);

      if (pollset->fd_count + 2 <= inline_elements) {
        pfds = pollfd_space;
        watchers = watcher_space;
      } else {
        /* Allocate one buffer to hold both pfds and watchers arrays */
        const size_t pfd_size = sizeof(*pfds) * (pollset->fd_count + 2);
        const size_t watch_size = sizeof(*watchers) * (pollset->fd_count + 2);
        void *buf = gpr_malloc(pfd_size + watch_size);
        pfds = (struct pollfd *)buf;
        watchers = (grpc_fd_watcher *)(void *)((char *)buf + pfd_size);
      }

      fd_count = 0;
      pfd_count = 1;
      pfds[0].fd = GRPC_WAKEUP_FD_GET_READ_FD(&worker.wakeup_fd->fd);
      pfds[0].events = POLLIN;
      pfds[0].revents = 0;
      for (i = 0; i < pollset->fd_count; i++) {
        if (fd_is_orphaned(pollset->fds[i])) {
          GRPC_FD_UNREF(pollset->fds[i], "multipoller");
        } else {
          pollset->fds[fd_count++] = pollset->fds[i];
          watchers[pfd_count].fd = pollset->fds[i];
          GRPC_FD_REF(watchers[pfd_count].fd, "multipoller_start");
          pfds[pfd_count].fd = pollset->fds[i]->fd;
          pfds[pfd_count].revents = 0;
          pfd_count++;
        }
      }
      pollset->fd_count = fd_count;
      gpr_mu_unlock(&pollset->mu);

      for (i = 1; i < pfd_count; i++) {
        grpc_fd *fd = watchers[i].fd;
        pfds[i].events = (short)fd_begin_poll(fd, pollset, &worker, POLLIN,
                                              POLLOUT, &watchers[i]);
        GRPC_FD_UNREF(fd, "multipoller_start");
      }

      /* TODO(vpai): Consider first doing a 0 timeout poll here to avoid
         even going into the blocking annotation if possible */
      GRPC_SCHEDULING_START_BLOCKING_REGION;
      GRPC_STATS_INC_SYSCALL_POLL(exec_ctx);
      r = grpc_poll_function(pfds, pfd_count, timeout);
      GRPC_SCHEDULING_END_BLOCKING_REGION;

      if (GRPC_TRACER_ON(grpc_polling_trace)) {
        gpr_log(GPR_DEBUG, "%p poll=%d", pollset, r);
      }

      if (r < 0) {
        if (errno != EINTR) {
          work_combine_error(&error, GRPC_OS_ERROR(errno, "poll"));
        }

        for (i = 1; i < pfd_count; i++) {
          if (watchers[i].fd == NULL) {
            fd_end_poll(exec_ctx, &watchers[i], 0, 0, NULL);
          } else {
            // Wake up all the file descriptors; if we have an invalid one,
            // we can identify it on the next pollset_work().
            fd_end_poll(exec_ctx, &watchers[i], 1, 1, pollset);
          }
        }
      } else if (r == 0) {
        for (i = 1; i < pfd_count; i++) {
          fd_end_poll(exec_ctx, &watchers[i], 0, 0, NULL);
        }
      } else {
        if (pfds[0].revents & POLLIN_CHECK) {
          if (GRPC_TRACER_ON(grpc_polling_trace)) {
            gpr_log(GPR_DEBUG, "%p: got_wakeup", pollset);
          }
          work_combine_error(
              &error, grpc_wakeup_fd_consume_wakeup(&worker.wakeup_fd->fd));
        }
        for (i = 1; i < pfd_count; i++) {
          if (watchers[i].fd == NULL) {
            fd_end_poll(exec_ctx, &watchers[i], 0, 0, NULL);
          } else {
            if (GRPC_TRACER_ON(grpc_polling_trace)) {
              gpr_log(GPR_DEBUG, "%p got_event: %d r:%d w:%d [%d]", pollset,
                      pfds[i].fd, (pfds[i].revents & POLLIN_CHECK) != 0,
                      (pfds[i].revents & POLLOUT_CHECK) != 0, pfds[i].revents);
            }
            fd_end_poll(exec_ctx, &watchers[i], pfds[i].revents & POLLIN_CHECK,
                        pfds[i].revents & POLLOUT_CHECK, pollset);
          }
        }
      }

      if (pfds != pollfd_space) {
        /* pfds and watchers are in the same memory block pointed to by pfds */
        gpr_free(pfds);
      }

      GPR_TIMER_END("maybe_work_and_unlock", 0);
      locked = 0;
    } else {
      GPR_TIMER_MARK("pollset_work.kicked_without_pollers", 0);
      pollset->kicked_without_pollers = 0;
    }
    /* Finished execution - start cleaning up.
       Note that we may arrive here from outside the enclosing while() loop:
       in that case we won't loop, though, since we haven't added the worker
       to the worker list, which means nobody could ask us to re-evaluate
       polling. */
  done:
    if (!locked) {
      queued_work |= grpc_exec_ctx_flush(exec_ctx);
      gpr_mu_lock(&pollset->mu);
      locked = 1;
    }
    /* If we're forced to re-evaluate polling (via pollset_kick with
       GRPC_POLLSET_REEVALUATE_POLLING_ON_WAKEUP) then we land here and force
       a loop */
    if (worker.reevaluate_polling_on_wakeup && error == GRPC_ERROR_NONE) {
      worker.reevaluate_polling_on_wakeup = 0;
      pollset->kicked_without_pollers = 0;
      if (queued_work || worker.kicked_specifically) {
        /* If there's queued work on the list, then set the deadline to be
           immediate so we get back out of the polling loop quickly */
        deadline = gpr_inf_past(GPR_CLOCK_MONOTONIC);
      }
      keep_polling = 1;
    }
    if (keep_polling) {
      now = gpr_now(now.clock_type);
    }
  }
  gpr_tls_set(&g_current_thread_poller, 0);
  if (added_worker) {
    remove_worker(pollset, &worker);
    gpr_tls_set(&g_current_thread_worker, 0);
  }
  /* release wakeup fd to the local pool */
  worker.wakeup_fd->next = pollset->local_wakeup_cache;
  pollset->local_wakeup_cache = worker.wakeup_fd;
  /* check shutdown conditions */
  if (pollset->shutting_down) {
    if (pollset_has_workers(pollset)) {
      pollset_kick(pollset, NULL);
    } else if (!pollset->called_shutdown && !pollset_has_observers(pollset)) {
      pollset->called_shutdown = 1;
      gpr_mu_unlock(&pollset->mu);
      finish_shutdown(exec_ctx, pollset);
      grpc_exec_ctx_flush(exec_ctx);
      /* Continuing to access pollset here is safe -- it is the caller's
       * responsibility to not destroy when it has outstanding calls to
       * pollset_work.
       * TODO(dklempner): Can we refactor the shutdown logic to avoid this? */
      gpr_mu_lock(&pollset->mu);
    } else if (!grpc_closure_list_empty(pollset->idle_jobs)) {
      GRPC_CLOSURE_LIST_SCHED(exec_ctx, &pollset->idle_jobs);
      gpr_mu_unlock(&pollset->mu);
      grpc_exec_ctx_flush(exec_ctx);
      gpr_mu_lock(&pollset->mu);
    }
  }
  if (worker_hdl) *worker_hdl = NULL;
  GPR_TIMER_END("pollset_work", 0);
  GRPC_LOG_IF_ERROR("pollset_work", GRPC_ERROR_REF(error));
  return error;
}

static void pollset_shutdown(grpc_exec_ctx *exec_ctx, grpc_pollset *pollset,
                             grpc_closure *closure) {
  GPR_ASSERT(!pollset->shutting_down);
  pollset->shutting_down = 1;
  pollset->shutdown_done = closure;
  pollset_kick(pollset, GRPC_POLLSET_KICK_BROADCAST);
  if (!pollset_has_workers(pollset)) {
    GRPC_CLOSURE_LIST_SCHED(exec_ctx, &pollset->idle_jobs);
  }
  if (!pollset->called_shutdown && !pollset_has_observers(pollset)) {
    pollset->called_shutdown = 1;
    finish_shutdown(exec_ctx, pollset);
  }
}

static int poll_deadline_to_millis_timeout(gpr_timespec deadline,
                                           gpr_timespec now) {
  gpr_timespec timeout;
  static const int64_t max_spin_polling_us = 10;
  if (gpr_time_cmp(deadline, gpr_inf_future(deadline.clock_type)) == 0) {
    return -1;
  }
  if (gpr_time_cmp(deadline, gpr_time_add(now, gpr_time_from_micros(
                                                   max_spin_polling_us,
                                                   GPR_TIMESPAN))) <= 0) {
    return 0;
  }
  timeout = gpr_time_sub(deadline, now);
  return gpr_time_to_millis(gpr_time_add(
      timeout, gpr_time_from_nanos(GPR_NS_PER_MS - 1, GPR_TIMESPAN)));
}
1141
Craig Tiller253bd502016-02-25 12:30:23 -08001142/*******************************************************************************
1143 * pollset_set_posix.c
1144 */
1145
1146static grpc_pollset_set *pollset_set_create(void) {
Yash Tibrewalca3c1c02017-09-07 22:47:16 -07001147 grpc_pollset_set *pollset_set =
1148 (grpc_pollset_set *)gpr_zalloc(sizeof(*pollset_set));
Craig Tiller253bd502016-02-25 12:30:23 -08001149 gpr_mu_init(&pollset_set->mu);
1150 return pollset_set;
1151}
1152
Craig Tiller9e5ac1b2017-02-14 22:25:50 -08001153static void pollset_set_destroy(grpc_exec_ctx *exec_ctx,
1154 grpc_pollset_set *pollset_set) {
Craig Tiller253bd502016-02-25 12:30:23 -08001155 size_t i;
1156 gpr_mu_destroy(&pollset_set->mu);
1157 for (i = 0; i < pollset_set->fd_count; i++) {
1158 GRPC_FD_UNREF(pollset_set->fds[i], "pollset_set");
1159 }
Craig Tiller9e5ac1b2017-02-14 22:25:50 -08001160 for (i = 0; i < pollset_set->pollset_count; i++) {
1161 grpc_pollset *pollset = pollset_set->pollsets[i];
1162 gpr_mu_lock(&pollset->mu);
1163 pollset->pollset_set_count--;
1164 /* check shutdown */
1165 if (pollset->shutting_down && !pollset->called_shutdown &&
1166 !pollset_has_observers(pollset)) {
1167 pollset->called_shutdown = 1;
1168 gpr_mu_unlock(&pollset->mu);
1169 finish_shutdown(exec_ctx, pollset);
1170 } else {
1171 gpr_mu_unlock(&pollset->mu);
1172 }
1173 }
Craig Tiller253bd502016-02-25 12:30:23 -08001174 gpr_free(pollset_set->pollsets);
1175 gpr_free(pollset_set->pollset_sets);
1176 gpr_free(pollset_set->fds);
1177 gpr_free(pollset_set);
1178}
1179
1180static void pollset_set_add_pollset(grpc_exec_ctx *exec_ctx,
1181 grpc_pollset_set *pollset_set,
1182 grpc_pollset *pollset) {
1183 size_t i, j;
Craig Tiller22543592017-02-14 10:29:36 -08001184 gpr_mu_lock(&pollset->mu);
1185 pollset->pollset_set_count++;
1186 gpr_mu_unlock(&pollset->mu);
Craig Tiller253bd502016-02-25 12:30:23 -08001187 gpr_mu_lock(&pollset_set->mu);
1188 if (pollset_set->pollset_count == pollset_set->pollset_capacity) {
1189 pollset_set->pollset_capacity =
1190 GPR_MAX(8, 2 * pollset_set->pollset_capacity);
Yash Tibrewalca3c1c02017-09-07 22:47:16 -07001191 pollset_set->pollsets = (grpc_pollset **)gpr_realloc(
1192 pollset_set->pollsets,
1193 pollset_set->pollset_capacity * sizeof(*pollset_set->pollsets));
Craig Tiller253bd502016-02-25 12:30:23 -08001194 }
1195 pollset_set->pollsets[pollset_set->pollset_count++] = pollset;
1196 for (i = 0, j = 0; i < pollset_set->fd_count; i++) {
1197 if (fd_is_orphaned(pollset_set->fds[i])) {
1198 GRPC_FD_UNREF(pollset_set->fds[i], "pollset_set");
1199 } else {
1200 pollset_add_fd(exec_ctx, pollset, pollset_set->fds[i]);
1201 pollset_set->fds[j++] = pollset_set->fds[i];
1202 }
1203 }
1204 pollset_set->fd_count = j;
1205 gpr_mu_unlock(&pollset_set->mu);
1206}
1207
1208static void pollset_set_del_pollset(grpc_exec_ctx *exec_ctx,
1209 grpc_pollset_set *pollset_set,
1210 grpc_pollset *pollset) {
1211 size_t i;
1212 gpr_mu_lock(&pollset_set->mu);
1213 for (i = 0; i < pollset_set->pollset_count; i++) {
1214 if (pollset_set->pollsets[i] == pollset) {
1215 pollset_set->pollset_count--;
1216 GPR_SWAP(grpc_pollset *, pollset_set->pollsets[i],
1217 pollset_set->pollsets[pollset_set->pollset_count]);
1218 break;
1219 }
1220 }
1221 gpr_mu_unlock(&pollset_set->mu);
Craig Tiller22543592017-02-14 10:29:36 -08001222 gpr_mu_lock(&pollset->mu);
1223 pollset->pollset_set_count--;
1224 /* check shutdown */
1225 if (pollset->shutting_down && !pollset->called_shutdown &&
1226 !pollset_has_observers(pollset)) {
1227 pollset->called_shutdown = 1;
1228 gpr_mu_unlock(&pollset->mu);
1229 finish_shutdown(exec_ctx, pollset);
1230 } else {
1231 gpr_mu_unlock(&pollset->mu);
1232 }
Craig Tiller253bd502016-02-25 12:30:23 -08001233}
1234
1235static void pollset_set_add_pollset_set(grpc_exec_ctx *exec_ctx,
1236 grpc_pollset_set *bag,
1237 grpc_pollset_set *item) {
1238 size_t i, j;
1239 gpr_mu_lock(&bag->mu);
1240 if (bag->pollset_set_count == bag->pollset_set_capacity) {
1241 bag->pollset_set_capacity = GPR_MAX(8, 2 * bag->pollset_set_capacity);
Yash Tibrewalca3c1c02017-09-07 22:47:16 -07001242 bag->pollset_sets = (grpc_pollset_set **)gpr_realloc(
1243 bag->pollset_sets,
1244 bag->pollset_set_capacity * sizeof(*bag->pollset_sets));
Craig Tiller253bd502016-02-25 12:30:23 -08001245 }
1246 bag->pollset_sets[bag->pollset_set_count++] = item;
1247 for (i = 0, j = 0; i < bag->fd_count; i++) {
1248 if (fd_is_orphaned(bag->fds[i])) {
1249 GRPC_FD_UNREF(bag->fds[i], "pollset_set");
1250 } else {
1251 pollset_set_add_fd(exec_ctx, item, bag->fds[i]);
1252 bag->fds[j++] = bag->fds[i];
1253 }
1254 }
1255 bag->fd_count = j;
1256 gpr_mu_unlock(&bag->mu);
1257}
1258
1259static void pollset_set_del_pollset_set(grpc_exec_ctx *exec_ctx,
1260 grpc_pollset_set *bag,
1261 grpc_pollset_set *item) {
1262 size_t i;
1263 gpr_mu_lock(&bag->mu);
1264 for (i = 0; i < bag->pollset_set_count; i++) {
1265 if (bag->pollset_sets[i] == item) {
1266 bag->pollset_set_count--;
1267 GPR_SWAP(grpc_pollset_set *, bag->pollset_sets[i],
1268 bag->pollset_sets[bag->pollset_set_count]);
1269 break;
1270 }
1271 }
1272 gpr_mu_unlock(&bag->mu);
1273}
1274
static void pollset_set_add_fd(grpc_exec_ctx *exec_ctx,
                               grpc_pollset_set *pollset_set, grpc_fd *fd) {
  size_t i;
  gpr_mu_lock(&pollset_set->mu);
  if (pollset_set->fd_count == pollset_set->fd_capacity) {
    pollset_set->fd_capacity = GPR_MAX(8, 2 * pollset_set->fd_capacity);
    pollset_set->fds = (grpc_fd **)gpr_realloc(
        pollset_set->fds, pollset_set->fd_capacity * sizeof(*pollset_set->fds));
  }
  GRPC_FD_REF(fd, "pollset_set");
  pollset_set->fds[pollset_set->fd_count++] = fd;
  for (i = 0; i < pollset_set->pollset_count; i++) {
    pollset_add_fd(exec_ctx, pollset_set->pollsets[i], fd);
  }
  for (i = 0; i < pollset_set->pollset_set_count; i++) {
    pollset_set_add_fd(exec_ctx, pollset_set->pollset_sets[i], fd);
  }
  gpr_mu_unlock(&pollset_set->mu);
}

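/* Drops this set's ref on fd and recursively removes it from any nested
   pollset_sets. */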
static void pollset_set_del_fd(grpc_exec_ctx *exec_ctx,
                               grpc_pollset_set *pollset_set, grpc_fd *fd) {
  size_t i;
  gpr_mu_lock(&pollset_set->mu);
  for (i = 0; i < pollset_set->fd_count; i++) {
    if (pollset_set->fds[i] == fd) {
      pollset_set->fd_count--;
      GPR_SWAP(grpc_fd *, pollset_set->fds[i],
               pollset_set->fds[pollset_set->fd_count]);
      GRPC_FD_UNREF(fd, "pollset_set");
      break;
    }
  }
  for (i = 0; i < pollset_set->pollset_set_count; i++) {
    pollset_set_del_fd(exec_ctx, pollset_set->pollset_sets[i], fd);
  }
  gpr_mu_unlock(&pollset_set->mu);
}

/*******************************************************************************
 * Condition Variable polling extensions
 */

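/* Overview (descriptive summary of the code below): when no real wakeup fd
   mechanism (eventfd/pipe) is available, wakeup fds are simulated with
   condition variables. Simulated fds are encoded as negative descriptors
   that index into g_cvfds (decoded with GRPC_FD_TO_IDX), and cvfd_poll()
   below is installed as grpc_poll_function so a single poll() call can wait
   on both real sockets and simulated fds. Callers polling identical pollfd
   arrays share one background poller thread, deduplicated through the
   murmur-hashed cache that follows. */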
static void run_poll(void *args);
static void cache_poller_locked(poll_args *args);

static void cache_insert_locked(poll_args *args) {
  uint32_t key = gpr_murmur_hash3(args->fds, args->nfds * sizeof(struct pollfd),
                                  0xDEADBEEF);
  key = key % poll_cache.size;
  if (poll_cache.active_pollers[key]) {
    poll_cache.active_pollers[key]->prev = args;
  }
  args->next = poll_cache.active_pollers[key];
  args->prev = NULL;
  poll_cache.active_pollers[key] = args;
  poll_cache.count++;
}

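/* Allocates a fresh poll_result for pargs, snapshotting its pollfd array
   and starting the refcount at 1. */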
static void init_result(poll_args *pargs) {
  pargs->result = (poll_result *)gpr_malloc(sizeof(poll_result));
  gpr_ref_init(&pargs->result->refcount, 1);
  pargs->result->watchers = NULL;
  pargs->result->watchcount = 0;
  pargs->result->fds =
      (struct pollfd *)gpr_malloc(sizeof(struct pollfd) * pargs->nfds);
  memcpy(pargs->result->fds, pargs->fds, sizeof(struct pollfd) * pargs->nfds);
  pargs->result->nfds = pargs->nfds;
  pargs->result->retval = 0;
  pargs->result->err = 0;
  pargs->result->completed = 0;
}

// Returns a poll_args object for the given arguments to poll(). This may be
// an existing object found in the cache (in which case the passed-in fds
// array is freed), a recycled object from the free list, or a newly
// allocated one backed by a fresh poller thread.
static poll_args *get_poller_locked(struct pollfd *fds, nfds_t count) {
  uint32_t key =
      gpr_murmur_hash3(fds, count * sizeof(struct pollfd), 0xDEADBEEF);
  key = key % poll_cache.size;
  poll_args *curr = poll_cache.active_pollers[key];
  while (curr) {
    if (curr->nfds == count &&
        memcmp(curr->fds, fds, count * sizeof(struct pollfd)) == 0) {
      gpr_free(fds);
      return curr;
    }
    curr = curr->next;
  }

  if (poll_cache.free_pollers) {
    poll_args *pargs = poll_cache.free_pollers;
    poll_cache.free_pollers = pargs->next;
    if (poll_cache.free_pollers) {
      poll_cache.free_pollers->prev = NULL;
    }
    pargs->fds = fds;
    pargs->nfds = count;
    pargs->next = NULL;
    pargs->prev = NULL;
    init_result(pargs);
    cache_poller_locked(pargs);
    return pargs;
  }

  poll_args *pargs = (poll_args *)gpr_malloc(sizeof(struct poll_args));
  gpr_cv_init(&pargs->trigger);
  pargs->fds = fds;
  pargs->nfds = count;
  pargs->next = NULL;
  pargs->prev = NULL;
  pargs->trigger_set = 0;
  init_result(pargs);
  cache_poller_locked(pargs);
  gpr_thd_id t_id;
  gpr_thd_options opt = gpr_thd_options_default();
  gpr_ref(&g_cvfds.pollcount);
  gpr_thd_options_set_detached(&opt);
  GPR_ASSERT(gpr_thd_new(&t_id, &run_poll, pargs, &opt));
  return pargs;
}

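/* Unlinks args from the active-poller hash table, frees its pollfd array,
   and pushes it onto the free list for reuse. */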
static void cache_delete_locked(poll_args *args) {
  if (!args->prev) {
    uint32_t key = gpr_murmur_hash3(
        args->fds, args->nfds * sizeof(struct pollfd), 0xDEADBEEF);
    key = key % poll_cache.size;
    GPR_ASSERT(poll_cache.active_pollers[key] == args);
    poll_cache.active_pollers[key] = args->next;
  } else {
    args->prev->next = args->next;
  }

  if (args->next) {
    args->next->prev = args->prev;
  }

  poll_cache.count--;
  if (poll_cache.free_pollers) {
    poll_cache.free_pollers->prev = args;
  }
  args->prev = NULL;
  args->next = poll_cache.free_pollers;
  gpr_free(args->fds);
  poll_cache.free_pollers = args;
}

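/* Inserts args into the active-poller table, first doubling and rehashing
   the table whenever its load factor would exceed 1/2. */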
static void cache_poller_locked(poll_args *args) {
  if (poll_cache.count + 1 > poll_cache.size / 2) {
    poll_args **old_active_pollers = poll_cache.active_pollers;
    poll_cache.size = poll_cache.size * 2;
    poll_cache.count = 0;
    poll_cache.active_pollers =
        (poll_args **)gpr_malloc(sizeof(void *) * poll_cache.size);
    for (unsigned int i = 0; i < poll_cache.size; i++) {
      poll_cache.active_pollers[i] = NULL;
    }
    for (unsigned int i = 0; i < poll_cache.size / 2; i++) {
      poll_args *curr = old_active_pollers[i];
      poll_args *next = NULL;
      while (curr) {
        next = curr->next;
        cache_insert_locked(curr);
        curr = next;
      }
    }
    gpr_free(old_active_pollers);
  }

  cache_insert_locked(args);
}

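/* Unlinks args from the free list and frees it outright; used when a
   poller thread's grace period expires without new work arriving. */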
static void cache_destroy_locked(poll_args *args) {
  if (args->next) {
    args->next->prev = args->prev;
  }

  if (args->prev) {
    args->prev->next = args->next;
  } else {
    poll_cache.free_pollers = args->next;
  }

  gpr_free(args);
}

static void decref_poll_result(poll_result *res) {
  if (gpr_unref(&res->refcount)) {
    GPR_ASSERT(!res->watchers);
    gpr_free(res->fds);
    gpr_free(res);
  }
}

void remove_cvn(cv_node **head, cv_node *target) {
  if (target->next) {
    target->next->prev = target->prev;
  }

  if (target->prev) {
    target->prev->next = target->next;
  } else {
    *head = target->next;
  }
}

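/* How long an idle poller thread lingers, waiting for more work, before
   exiting (see run_poll below). */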
gpr_timespec thread_grace;

// Poll in a background thread
static void run_poll(void *args) {
  poll_args *pargs = (poll_args *)args;
  while (1) {
    poll_result *result = pargs->result;
    int retval = g_cvfds.poll(result->fds, result->nfds, CV_POLL_PERIOD_MS);
    gpr_mu_lock(&g_cvfds.mu);
    if (retval != 0) {
      result->completed = 1;
      result->retval = retval;
      result->err = errno;
      cv_node *watcher = result->watchers;
      while (watcher) {
        gpr_cv_signal(watcher->cv);
        watcher = watcher->next;
      }
    }
    if (result->watchcount == 0 || result->completed) {
      cache_delete_locked(pargs);
      decref_poll_result(result);
      // Leave this polling thread alive for a grace period in case another
      // poll() op arrives for it
      gpr_timespec deadline = gpr_now(GPR_CLOCK_REALTIME);
      deadline = gpr_time_add(deadline, thread_grace);
      pargs->trigger_set = 0;
      gpr_cv_wait(&pargs->trigger, &g_cvfds.mu, deadline);
      if (!pargs->trigger_set) {
        cache_destroy_locked(pargs);
        break;
      }
    }
    gpr_mu_unlock(&g_cvfds.mu);
  }

  // We still hold the lock here: the loop above breaks before unlocking
  if (gpr_unref(&g_cvfds.pollcount)) {
    gpr_cv_signal(&g_cvfds.shutdown_cv);
  }
  gpr_mu_unlock(&g_cvfds.mu);
}

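/* Rough flow of cvfd_poll (a sketch of the logic below, not a spec):
   1. Register this caller's condition variable with every simulated
      (negative) fd in fds, noting whether any is already set.
   2. Hand the real sockets to a background poller obtained from
      get_poller_locked(), which may be shared with identical callers.
   3. Wait on the condition variable until the background poll() finishes,
      a simulated fd is signalled, or the deadline expires.
   4. Merge revents from the simulated fds and the shared poll_result back
      into the caller's fds array. */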
// This function overrides poll() to handle condition variable wakeup fds
static int cvfd_poll(struct pollfd *fds, nfds_t nfds, int timeout) {
  unsigned int i;
  int res, idx;
  cv_node *pollcv;
  int skip_poll = 0;
  nfds_t nsockfds = 0;
  poll_result *result = NULL;
  gpr_mu_lock(&g_cvfds.mu);
  pollcv = (cv_node *)gpr_malloc(sizeof(cv_node));
  pollcv->next = NULL;
  gpr_cv pollcv_cv;
  gpr_cv_init(&pollcv_cv);
  pollcv->cv = &pollcv_cv;
  cv_node *fd_cvs = (cv_node *)gpr_malloc(nfds * sizeof(cv_node));

  for (i = 0; i < nfds; i++) {
    fds[i].revents = 0;
    if (fds[i].fd < 0 && (fds[i].events & POLLIN)) {
      idx = GRPC_FD_TO_IDX(fds[i].fd);
      fd_cvs[i].cv = &pollcv_cv;
      fd_cvs[i].prev = NULL;
      fd_cvs[i].next = g_cvfds.cvfds[idx].cvs;
      if (g_cvfds.cvfds[idx].cvs) {
        g_cvfds.cvfds[idx].cvs->prev = &(fd_cvs[i]);
      }
      g_cvfds.cvfds[idx].cvs = &(fd_cvs[i]);
      // Don't bother polling if a wakeup fd is ready
      if (g_cvfds.cvfds[idx].is_set) {
        skip_poll = 1;
      }
    } else if (fds[i].fd >= 0) {
      nsockfds++;
    }
  }

  gpr_timespec deadline = gpr_now(GPR_CLOCK_REALTIME);
  if (timeout < 0) {
    deadline = gpr_inf_future(GPR_CLOCK_REALTIME);
  } else {
    deadline =
        gpr_time_add(deadline, gpr_time_from_millis(timeout, GPR_TIMESPAN));
  }

  res = 0;
  if (!skip_poll && nsockfds > 0) {
    struct pollfd *pollfds =
        (struct pollfd *)gpr_malloc(sizeof(struct pollfd) * nsockfds);
    idx = 0;
    for (i = 0; i < nfds; i++) {
      if (fds[i].fd >= 0) {
        pollfds[idx].fd = fds[i].fd;
        pollfds[idx].events = fds[i].events;
        pollfds[idx].revents = 0;
        idx++;
      }
    }
    poll_args *pargs = get_poller_locked(pollfds, nsockfds);
    result = pargs->result;
    pollcv->next = result->watchers;
    pollcv->prev = NULL;
    if (result->watchers) {
      result->watchers->prev = pollcv;
    }
    result->watchers = pollcv;
    result->watchcount++;
    gpr_ref(&result->refcount);

    pargs->trigger_set = 1;
    gpr_cv_signal(&pargs->trigger);
    gpr_cv_wait(&pollcv_cv, &g_cvfds.mu, deadline);
    res = result->retval;
    errno = result->err;
    result->watchcount--;
    remove_cvn(&result->watchers, pollcv);
  } else if (!skip_poll) {
    gpr_cv_wait(&pollcv_cv, &g_cvfds.mu, deadline);
  }

  idx = 0;
  for (i = 0; i < nfds; i++) {
    if (fds[i].fd < 0 && (fds[i].events & POLLIN)) {
      remove_cvn(&g_cvfds.cvfds[GRPC_FD_TO_IDX(fds[i].fd)].cvs, &(fd_cvs[i]));
      if (g_cvfds.cvfds[GRPC_FD_TO_IDX(fds[i].fd)].is_set) {
        fds[i].revents = POLLIN;
        if (res >= 0) res++;
      }
    } else if (!skip_poll && fds[i].fd >= 0 && result->completed) {
      fds[i].revents = result->fds[idx].revents;
      idx++;
    }
  }

  gpr_free(fd_cvs);
  gpr_free(pollcv);
  if (result) {
    decref_poll_result(result);
  }

  gpr_mu_unlock(&g_cvfds.mu);

  return res;
}

static void global_cv_fd_table_init() {
  gpr_mu_init(&g_cvfds.mu);
  gpr_mu_lock(&g_cvfds.mu);
  gpr_cv_init(&g_cvfds.shutdown_cv);
  gpr_ref_init(&g_cvfds.pollcount, 1);
  g_cvfds.size = CV_DEFAULT_TABLE_SIZE;
  g_cvfds.cvfds =
      (fd_node *)gpr_malloc(sizeof(fd_node) * CV_DEFAULT_TABLE_SIZE);
  g_cvfds.free_fds = NULL;
  thread_grace = gpr_time_from_millis(POLLCV_THREAD_GRACE_MS, GPR_TIMESPAN);
  for (int i = 0; i < CV_DEFAULT_TABLE_SIZE; i++) {
    g_cvfds.cvfds[i].is_set = 0;
    g_cvfds.cvfds[i].cvs = NULL;
    g_cvfds.cvfds[i].next_free = g_cvfds.free_fds;
    g_cvfds.free_fds = &g_cvfds.cvfds[i];
  }
  // Override the poll function with one that supports cvfds
  g_cvfds.poll = grpc_poll_function;
  grpc_poll_function = &cvfd_poll;

  // Initialize the cache
  poll_cache.size = 32;
  poll_cache.count = 0;
  poll_cache.free_pollers = NULL;
  poll_cache.active_pollers = (poll_args **)gpr_malloc(sizeof(void *) * 32);
  for (unsigned int i = 0; i < poll_cache.size; i++) {
    poll_cache.active_pollers[i] = NULL;
  }

  gpr_mu_unlock(&g_cvfds.mu);
}

static void global_cv_fd_table_shutdown() {
  gpr_mu_lock(&g_cvfds.mu);
  // Attempt to wait for all abandoned poll() threads to terminate;
  // not doing so will result in reported memory leaks
  if (!gpr_unref(&g_cvfds.pollcount)) {
    int res = gpr_cv_wait(&g_cvfds.shutdown_cv, &g_cvfds.mu,
                          gpr_time_add(gpr_now(GPR_CLOCK_REALTIME),
                                       gpr_time_from_seconds(3, GPR_TIMESPAN)));
    GPR_ASSERT(res == 0);
  }
  gpr_cv_destroy(&g_cvfds.shutdown_cv);
  grpc_poll_function = g_cvfds.poll;
  gpr_free(g_cvfds.cvfds);

  gpr_free(poll_cache.active_pollers);

  gpr_mu_unlock(&g_cvfds.mu);
  gpr_mu_destroy(&g_cvfds.mu);
}

/*******************************************************************************
 * event engine binding
 */

static void shutdown_engine(void) {
  pollset_global_shutdown();
  if (grpc_cv_wakeup_fds_enabled()) {
    global_cv_fd_table_shutdown();
  }
}

static const grpc_event_engine_vtable vtable = {
    sizeof(grpc_pollset),

    fd_create,
    fd_wrapped_fd,
    fd_orphan,
    fd_shutdown,
    fd_notify_on_read,
    fd_notify_on_write,
    fd_is_shutdown,
    fd_get_read_notifier_pollset,

    pollset_init,
    pollset_shutdown,
    pollset_destroy,
    pollset_work,
    pollset_kick,
    pollset_add_fd,

    pollset_set_create,
    pollset_set_destroy,
    pollset_set_add_pollset,
    pollset_set_del_pollset,
    pollset_set_add_pollset_set,
    pollset_set_del_pollset_set,
    pollset_set_add_fd,
    pollset_set_del_fd,

    shutdown_engine,
};

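/* Engine constructors: the plain poll engine requires a real wakeup fd
   mechanism, while the cv variant installs the condition-variable based
   cvfd_poll shim before initializing pollsets and unwinds it again on
   failure. */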
const grpc_event_engine_vtable *grpc_init_poll_posix(bool explicit_request) {
  if (!grpc_has_wakeup_fd()) {
    return NULL;
  }
  if (!GRPC_LOG_IF_ERROR("pollset_global_init", pollset_global_init())) {
    return NULL;
  }
  return &vtable;
}

const grpc_event_engine_vtable *grpc_init_poll_cv_posix(bool explicit_request) {
  global_cv_fd_table_init();
  grpc_enable_cv_wakeup_fds(1);
  if (!GRPC_LOG_IF_ERROR("pollset_global_init", pollset_global_init())) {
    global_cv_fd_table_shutdown();
    grpc_enable_cv_wakeup_fds(0);
    return NULL;
  }
  return &vtable;
}

#endif