blob: 396731758e0111946a25de8b98641e338118b318 [file] [log] [blame]
Craig Tillerdd86b692017-04-06 10:43:11 -07001/*
2 *
Jan Tattermusch7897ae92017-06-07 22:57:36 +02003 * Copyright 2017 gRPC authors.
Craig Tillerdd86b692017-04-06 10:43:11 -07004 *
Jan Tattermusch7897ae92017-06-07 22:57:36 +02005 * Licensed under the Apache License, Version 2.0 (the "License");
6 * you may not use this file except in compliance with the License.
7 * You may obtain a copy of the License at
Craig Tillerdd86b692017-04-06 10:43:11 -07008 *
Jan Tattermusch7897ae92017-06-07 22:57:36 +02009 * http://www.apache.org/licenses/LICENSE-2.0
Craig Tillerdd86b692017-04-06 10:43:11 -070010 *
Jan Tattermusch7897ae92017-06-07 22:57:36 +020011 * Unless required by applicable law or agreed to in writing, software
12 * distributed under the License is distributed on an "AS IS" BASIS,
13 * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
14 * See the License for the specific language governing permissions and
15 * limitations under the License.
Craig Tillerdd86b692017-04-06 10:43:11 -070016 *
17 */
18
19#include "src/core/lib/iomgr/port.h"
20
21/* This polling engine is only relevant on linux kernels supporting epoll() */
22#ifdef GRPC_LINUX_EPOLL
23
Craig Tillerc6109852017-05-01 14:26:49 -070024#include "src/core/lib/iomgr/ev_epollex_linux.h"
Craig Tillerdd86b692017-04-06 10:43:11 -070025
26#include <assert.h>
27#include <errno.h>
28#include <poll.h>
29#include <pthread.h>
30#include <string.h>
Craig Tillerdd86b692017-04-06 10:43:11 -070031#include <sys/socket.h>
32#include <unistd.h>
33
34#include <grpc/support/alloc.h>
35#include <grpc/support/log.h>
36#include <grpc/support/string_util.h>
37#include <grpc/support/tls.h>
38#include <grpc/support/useful.h>
39
Craig Tillerb4bb1cd2017-07-20 14:18:17 -070040#include "src/core/lib/debug/stats.h"
Craig Tillerdd86b692017-04-06 10:43:11 -070041#include "src/core/lib/iomgr/ev_posix.h"
42#include "src/core/lib/iomgr/iomgr_internal.h"
Craig Tiller819cd882017-04-25 13:18:22 -070043#include "src/core/lib/iomgr/is_epollexclusive_available.h"
Craig Tillerdd86b692017-04-06 10:43:11 -070044#include "src/core/lib/iomgr/lockfree_event.h"
Craig Tiller819cd882017-04-25 13:18:22 -070045#include "src/core/lib/iomgr/sys_epoll_wrapper.h"
Craig Tillerdd86b692017-04-06 10:43:11 -070046#include "src/core/lib/iomgr/timer.h"
47#include "src/core/lib/iomgr/wakeup_fd_posix.h"
Craig Tillerdd86b692017-04-06 10:43:11 -070048#include "src/core/lib/profiling/timers.h"
49#include "src/core/lib/support/block_annotate.h"
Craig Tiller0b4c9012017-04-06 17:19:37 -070050#include "src/core/lib/support/spinlock.h"
Craig Tillerdd86b692017-04-06 10:43:11 -070051
Craig Tiller90a9d7d2017-10-04 21:24:03 +000052// debug aid: create workers on the heap (allows asan to spot use-after-destruction)
53#define GRPC_EPOLLEX_CREATE_WORKERS_ON_HEAP 1
54
Craig Tiller389ea902017-10-04 17:42:49 +000055#ifndef NDEBUG
56grpc_tracer_flag grpc_trace_pollable_refcount =
57 GRPC_TRACER_INITIALIZER(false, "pollable_refcount");
58#endif
59
Craig Tillerdd86b692017-04-06 10:43:11 -070060/*******************************************************************************
Craig Tiller389ea902017-10-04 17:42:49 +000061 * pollable Declarations
Craig Tiller8fc1ca12017-04-07 13:01:48 -070062 */
63
/* The kind of thing a pollable polls on:
   PO_MULTI - a multi-fd epoll set shared by a pollset
   PO_FD    - an epoll set wrapping exactly one fd
   PO_EMPTY - the shared "polls nothing" pollable */
typedef enum { PO_MULTI, PO_FD, PO_EMPTY } pollable_type;

typedef struct pollable pollable;

/* A pollable: one epoll set plus a wakeup fd registered within it.
   Refcounted; see POLLABLE_REF/POLLABLE_UNREF. */
struct pollable {
  pollable_type type;  // immutable
  gpr_refcount refs;

  int epfd;               // epoll_create1'd fd owned by this pollable
  grpc_wakeup_fd wakeup;  // registered in epfd; used to kick the poller

  // only for type fd... one ref to the owner fd
  grpc_fd *owner_fd;

  grpc_pollset_set *pollset_set;
  pollable *next;  // intrusive ring links (a lone pollable links to itself)
  pollable *prev;

  gpr_mu mu;
  grpc_pollset_worker *root_worker;  // head of PWLINK_POLLABLE worker ring
};
85
86static const char *pollable_type_string(pollable_type t) {
Craig Tiller86ffd652017-06-20 00:16:54 +000087 switch (t) {
Craig Tiller249de2b2017-10-02 11:49:19 -070088 case PO_MULTI:
Craig Tiller7fd8f672017-07-05 15:10:12 -070089 return "pollset";
90 case PO_FD:
91 return "fd";
Craig Tiller249de2b2017-10-02 11:49:19 -070092 case PO_EMPTY:
93 return "empty";
Craig Tiller86ffd652017-06-20 00:16:54 +000094 }
95 return "<invalid>";
96}
97
Yash Tibrewalb2a54ac2017-09-13 10:18:07 -070098static char *pollable_desc(pollable *p) {
Craig Tiller86ffd652017-06-20 00:16:54 +000099 char *out;
Craig Tiller249de2b2017-10-02 11:49:19 -0700100 gpr_asprintf(&out, "type=%s epfd=%d wakeup=%d", pollable_type_string(p->type),
101 p->epfd, p->wakeup.read_fd);
Craig Tiller86ffd652017-06-20 00:16:54 +0000102 return out;
103}
104
Craig Tiller249de2b2017-10-02 11:49:19 -0700105static pollable *g_empty_pollable;
Craig Tillerd1d7fdd2017-04-14 16:16:24 -0700106
Craig Tiller249de2b2017-10-02 11:49:19 -0700107static grpc_error *pollable_create(pollable_type type, pollable **p);
Craig Tiller389ea902017-10-04 17:42:49 +0000108#ifdef NDEBUG
Craig Tiller249de2b2017-10-02 11:49:19 -0700109static pollable *pollable_ref(pollable *p);
110static void pollable_unref(pollable *p);
Craig Tiller389ea902017-10-04 17:42:49 +0000111#define POLLABLE_REF(p, r) pollable_ref(p)
112#define POLLABLE_UNREF(p, r) pollable_unref(p)
113#else
114static pollable *pollable_ref(pollable *p, int line, const char *reason);
115static void pollable_unref(pollable *p, int line, const char *reason);
116#define POLLABLE_REF(p, r) pollable_ref((p), __LINE__, (r))
117#define POLLABLE_UNREF(p, r) pollable_unref((p), __LINE__, (r))
118#endif
Craig Tillerd1d7fdd2017-04-14 16:16:24 -0700119
120/*******************************************************************************
Craig Tillerdd86b692017-04-06 10:43:11 -0700121 * Fd Declarations
122 */
123
/* Wrapper around a kernel file descriptor tracked by this polling engine. */
struct grpc_fd {
  int fd;
  /* refst format:
     bit 0 : 1=Active / 0=Orphaned
     bits 1-n : refcount
     Ref/Unref by two to avoid altering the orphaned bit */
  gpr_atm refst;

  // guards lazy creation of pollable_obj (see fd_become_pollable)
  gpr_mu pollable_mu;
  pollable *pollable_obj;  // owned ref; NULL until first needed

  // lockfree_event state words for readiness notification
  gpr_atm read_closure;
  gpr_atm write_closure;

  struct grpc_fd *freelist_next;  // link in the global fd freelist
  grpc_closure *on_done_closure;  // scheduled by fd_orphan

  /* The pollset that last noticed that the fd is readable. The actual type
   * stored in this is (grpc_pollset *) */
  gpr_atm read_notifier_pollset;

  grpc_iomgr_object iomgr_object;
};
147
Craig Tillerdd86b692017-04-06 10:43:11 -0700148static void fd_global_init(void);
149static void fd_global_shutdown(void);
150
Craig Tillerdd86b692017-04-06 10:43:11 -0700151/*******************************************************************************
152 * Pollset Declarations
153 */
Craig Tillerd1d7fdd2017-04-14 16:16:24 -0700154
/* Intrusive doubly-linked ring links for a worker. */
typedef struct {
  grpc_pollset_worker *next;
  grpc_pollset_worker *prev;
} pwlink;

/* A worker is simultaneously a member of up to two rings: the pollable's
   worker ring and the pollset's worker ring. */
typedef enum { PWLINK_POLLABLE = 0, PWLINK_POLLSET, PWLINK_COUNT } pwlinks;

struct grpc_pollset_worker {
  bool kicked;          // set once this worker has been kicked (see pollset_kick_one)
  bool initialized_cv;  // NOTE(review): presumably tracks whether cv needs destruction — confirm
  gpr_cv cv;            // signalled to wake a worker that is not the epoll poller
  grpc_pollset *pollset;
  pollable *pollable_obj;

  pwlink links[PWLINK_COUNT];  // ring membership, indexed by pwlinks
};
171
// Maximum events fetched by one epoll_wait call; consumed incrementally.
#define MAX_EPOLL_EVENTS 100
#define MAX_EPOLL_EVENTS_HANDLED_EACH_POLL_CALL 5

struct grpc_pollset {
  gpr_mu mu;
  pollable *active_pollable;   // owned ref to the pollable currently polled
  bool kicked_without_poller;  // a kick arrived while no worker was present
  grpc_closure *shutdown_closure;  // non-NULL once shutdown is requested
  grpc_pollset_worker *root_worker;  // head of PWLINK_POLLSET worker ring

  // buffered epoll results: entries [event_cursor, event_count) are pending
  int event_cursor;
  int event_count;
  struct epoll_event events[MAX_EPOLL_EVENTS];
};
Craig Tillerdd86b692017-04-06 10:43:11 -0700186
187/*******************************************************************************
188 * Pollset-set Declarations
189 */
Craig Tiller23adbd52017-10-02 15:29:18 -0700190
struct grpc_pollset_set {
  gpr_refcount refs;
  gpr_mu mu;
  // NOTE(review): parent appears to chain pollset_sets into a tree — confirm
  grpc_pollset_set *parent;

  // dynamic array of pollables belonging to member pollsets
  size_t pollset_count;
  size_t pollset_capacity;
  pollable **pollsets;

  // dynamic array of member fds
  size_t fd_count;
  size_t fd_capacity;
  grpc_fd **fds;
};
Craig Tillerdd86b692017-04-06 10:43:11 -0700204
205/*******************************************************************************
206 * Common helpers
207 */
208
Craig Tillere24b24d2017-04-06 16:05:45 -0700209static bool append_error(grpc_error **composite, grpc_error *error,
210 const char *desc) {
211 if (error == GRPC_ERROR_NONE) return true;
212 if (*composite == GRPC_ERROR_NONE) {
213 *composite = GRPC_ERROR_CREATE_FROM_COPIED_STRING(desc);
214 }
215 *composite = grpc_error_add_child(*composite, error);
216 return false;
217}
218
Craig Tillerdd86b692017-04-06 10:43:11 -0700219/*******************************************************************************
220 * Fd Definitions
221 */
222
223/* We need to keep a freelist not because of any concerns of malloc performance
224 * but instead so that implementations with multiple threads in (for example)
225 * epoll_wait deal with the race between pollset removal and incoming poll
226 * notifications.
227 *
228 * The problem is that the poller ultimately holds a reference to this
229 * object, so it is very difficult to know when is safe to free it, at least
230 * without some expensive synchronization.
231 *
232 * If we keep the object freelisted, in the worst case losing this race just
233 * becomes a spurious read notification on a reused fd.
234 */
235
236/* The alarm system needs to be able to wakeup 'some poller' sometimes
237 * (specifically when a new alarm needs to be triggered earlier than the next
238 * alarm 'epoch'). This wakeup_fd gives us something to alert on when such a
239 * case occurs. */
240
241static grpc_fd *fd_freelist = NULL;
242static gpr_mu fd_freelist_mu;
243
#ifndef NDEBUG
#define REF_BY(fd, n, reason) ref_by(fd, n, reason, __FILE__, __LINE__)
#define UNREF_BY(ec, fd, n, reason) \
  unref_by(ec, fd, n, reason, __FILE__, __LINE__)
/* Add n to fd->refst. Normal refs step by two so the low (active) bit is
   untouched; fd_orphan uses an odd step to clear that bit. */
static void ref_by(grpc_fd *fd, int n, const char *reason, const char *file,
                   int line) {
  if (GRPC_TRACER_ON(grpc_trace_fd_refcount)) {
    gpr_log(GPR_DEBUG,
            "FD %d %p ref %d %" PRIdPTR " -> %" PRIdPTR " [%s; %s:%d]",
            fd->fd, fd, n, gpr_atm_no_barrier_load(&fd->refst),
            gpr_atm_no_barrier_load(&fd->refst) + n, reason, file, line);
  }
#else
#define REF_BY(fd, n, reason) ref_by(fd, n)
#define UNREF_BY(ec, fd, n, reason) unref_by(ec, fd, n)
static void ref_by(grpc_fd *fd, int n) {
#endif
  /* must never resurrect an already-released fd */
  GPR_ASSERT(gpr_atm_no_barrier_fetch_add(&fd->refst, n) > 0);
}
263
/* Closure: final destruction once the last ref is dropped (see unref_by).
   Returns the grpc_fd struct to the freelist rather than freeing it, so a
   racing poller at worst sees a spurious notification on a reused fd. */
static void fd_destroy(grpc_exec_ctx *exec_ctx, void *arg, grpc_error *error) {
  grpc_fd *fd = (grpc_fd *)arg;
  /* Add the fd to the freelist */
  grpc_iomgr_unregister_object(&fd->iomgr_object);
  POLLABLE_UNREF(fd->pollable_obj, "fd_pollable");
  gpr_mu_destroy(&fd->pollable_mu);
  gpr_mu_lock(&fd_freelist_mu);
  fd->freelist_next = fd_freelist;
  fd_freelist = fd;

  /* lockfree_event teardown happens while the freelist lock is held */
  grpc_lfev_destroy(&fd->read_closure);
  grpc_lfev_destroy(&fd->write_closure);

  gpr_mu_unlock(&fd_freelist_mu);
}
279
#ifndef NDEBUG
/* Subtract n from fd->refst; schedules fd_destroy when the count reaches
   zero. Debug variant logs the transition when fd refcount tracing is on. */
static void unref_by(grpc_exec_ctx *exec_ctx, grpc_fd *fd, int n,
                     const char *reason, const char *file, int line) {
  if (GRPC_TRACER_ON(grpc_trace_fd_refcount)) {
    gpr_log(GPR_DEBUG,
            "FD %d %p unref %d %" PRIdPTR " -> %" PRIdPTR " [%s; %s:%d]",
            fd->fd, fd, n, gpr_atm_no_barrier_load(&fd->refst),
            gpr_atm_no_barrier_load(&fd->refst) - n, reason, file, line);
  }
#else
static void unref_by(grpc_exec_ctx *exec_ctx, grpc_fd *fd, int n) {
#endif
  gpr_atm old = gpr_atm_full_fetch_add(&fd->refst, -n);
  if (old == n) {
    /* count hit zero: defer destruction via a closure rather than freeing
       inline (a poller may still briefly hold a pointer; see freelist note) */
    GRPC_CLOSURE_SCHED(exec_ctx, GRPC_CLOSURE_CREATE(fd_destroy, fd,
                                                     grpc_schedule_on_exec_ctx),
                       GRPC_ERROR_NONE);
  } else {
    GPR_ASSERT(old > n);
  }
}
301
/* One-time setup of the fd freelist lock. */
static void fd_global_init(void) { gpr_mu_init(&fd_freelist_mu); }
303
/* Drain and free the fd freelist at shutdown. */
static void fd_global_shutdown(void) {
  /* NOTE(review): lock/unlock pair presumably acts as a barrier against any
     in-flight freelist mutation before the unlocked drain below — confirm */
  gpr_mu_lock(&fd_freelist_mu);
  gpr_mu_unlock(&fd_freelist_mu);
  while (fd_freelist != NULL) {
    grpc_fd *fd = fd_freelist;
    fd_freelist = fd_freelist->freelist_next;
    gpr_free(fd);
  }
  gpr_mu_destroy(&fd_freelist_mu);
}
314
315static grpc_fd *fd_create(int fd, const char *name) {
316 grpc_fd *new_fd = NULL;
317
318 gpr_mu_lock(&fd_freelist_mu);
319 if (fd_freelist != NULL) {
320 new_fd = fd_freelist;
321 fd_freelist = fd_freelist->freelist_next;
322 }
323 gpr_mu_unlock(&fd_freelist_mu);
324
325 if (new_fd == NULL) {
Yash Tibrewalca3c1c02017-09-07 22:47:16 -0700326 new_fd = (grpc_fd *)gpr_malloc(sizeof(grpc_fd));
Craig Tillerdd86b692017-04-06 10:43:11 -0700327 }
328
Craig Tiller249de2b2017-10-02 11:49:19 -0700329 gpr_mu_init(&new_fd->pollable_mu);
330 new_fd->pollable_obj = NULL;
Craig Tillerdd86b692017-04-06 10:43:11 -0700331 gpr_atm_rel_store(&new_fd->refst, (gpr_atm)1);
332 new_fd->fd = fd;
Craig Tillerdd86b692017-04-06 10:43:11 -0700333 grpc_lfev_init(&new_fd->read_closure);
334 grpc_lfev_init(&new_fd->write_closure);
335 gpr_atm_no_barrier_store(&new_fd->read_notifier_pollset, (gpr_atm)NULL);
336
337 new_fd->freelist_next = NULL;
338 new_fd->on_done_closure = NULL;
339
Craig Tillerdd86b692017-04-06 10:43:11 -0700340 char *fd_name;
341 gpr_asprintf(&fd_name, "%s fd=%d", name, fd);
342 grpc_iomgr_register_object(&new_fd->iomgr_object, fd_name);
Noah Eisen264879f2017-06-20 17:14:47 -0700343#ifndef NDEBUG
344 if (GRPC_TRACER_ON(grpc_trace_fd_refcount)) {
345 gpr_log(GPR_DEBUG, "FD %d %p create %s", fd, new_fd, fd_name);
346 }
Craig Tillerdd86b692017-04-06 10:43:11 -0700347#endif
348 gpr_free(fd_name);
349 return new_fd;
350}
351
352static int fd_wrapped_fd(grpc_fd *fd) {
Craig Tiller249de2b2017-10-02 11:49:19 -0700353 int ret_fd = fd->fd;
354 return (gpr_atm_acq_load(&fd->refst) & 1) ? ret_fd : -1;
Craig Tillerdd86b692017-04-06 10:43:11 -0700355}
356
/* Release the kernel fd (or hand it back via *release_fd) and drop the
   grpc_fd's "active" status. |on_done| runs once the orphan is complete. */
static void fd_orphan(grpc_exec_ctx *exec_ctx, grpc_fd *fd,
                      grpc_closure *on_done, int *release_fd,
                      bool already_closed, const char *reason) {
  bool is_fd_closed = already_closed;

  fd->on_done_closure = on_done;

  /* If release_fd is not NULL, we should be relinquishing control of the file
     descriptor fd->fd (but we still own the grpc_fd structure). */
  if (release_fd != NULL) {
    *release_fd = fd->fd;
  } else if (!is_fd_closed) {
    close(fd->fd);
    is_fd_closed = true;
  }

  if (!is_fd_closed) {
    /* reached only on the release_fd path when the fd was not already
       closed; epoll deregistration is acknowledged as unhandled here */
    gpr_log(GPR_DEBUG, "TODO: handle fd removal?");
  }

  /* Remove the active status but keep referenced. We want this grpc_fd struct
     to be alive (and not added to freelist) until the end of this function */
  REF_BY(fd, 1, reason);

  GRPC_CLOSURE_SCHED(exec_ctx, fd->on_done_closure, GRPC_ERROR_NONE);

  /* odd/even trick: +1 above then -2 here clears the active bit (bit 0)
     while dropping our reference */
  UNREF_BY(exec_ctx, fd, 2, reason); /* Drop the reference */
}
385
386static grpc_pollset *fd_get_read_notifier_pollset(grpc_exec_ctx *exec_ctx,
387 grpc_fd *fd) {
388 gpr_atm notifier = gpr_atm_acq_load(&fd->read_notifier_pollset);
389 return (grpc_pollset *)notifier;
390}
391
/* True once fd_shutdown has run; shutdown state lives on the read side. */
static bool fd_is_shutdown(grpc_fd *fd) {
  return grpc_lfev_is_shutdown(&fd->read_closure);
}
395
/* Might be called multiple times */
static void fd_shutdown(grpc_exec_ctx *exec_ctx, grpc_fd *fd, grpc_error *why) {
  /* only the first successful shutdown of the read side actually shuts the
     socket down and propagates to the write side */
  if (grpc_lfev_set_shutdown(exec_ctx, &fd->read_closure,
                             GRPC_ERROR_REF(why))) {
    shutdown(fd->fd, SHUT_RDWR);
    grpc_lfev_set_shutdown(exec_ctx, &fd->write_closure, GRPC_ERROR_REF(why));
  }
  GRPC_ERROR_UNREF(why); /* balance the caller's ref */
}
405
/* Arrange for |closure| to run when the fd becomes readable. */
static void fd_notify_on_read(grpc_exec_ctx *exec_ctx, grpc_fd *fd,
                              grpc_closure *closure) {
  grpc_lfev_notify_on(exec_ctx, &fd->read_closure, closure, "read");
}
410
/* Arrange for |closure| to run when the fd becomes writable. */
static void fd_notify_on_write(grpc_exec_ctx *exec_ctx, grpc_fd *fd,
                               grpc_closure *closure) {
  grpc_lfev_notify_on(exec_ctx, &fd->write_closure, closure, "write");
}
415
Craig Tillerdd86b692017-04-06 10:43:11 -0700416/*******************************************************************************
Craig Tillerd1d7fdd2017-04-14 16:16:24 -0700417 * Pollable Definitions
418 */
419
Craig Tiller249de2b2017-10-02 11:49:19 -0700420static grpc_error *pollable_create(pollable_type type, pollable **p) {
421 *p = NULL;
422
423 int epfd = epoll_create1(EPOLL_CLOEXEC);
424 if (epfd == -1) {
425 return GRPC_OS_ERROR(errno, "epoll_create1");
426 }
Craig Tiller90a9d7d2017-10-04 21:24:03 +0000427 *p = gpr_malloc(sizeof(**p));
428 grpc_error *err = grpc_wakeup_fd_init(&(*p)->wakeup);
Craig Tiller249de2b2017-10-02 11:49:19 -0700429 if (err != GRPC_ERROR_NONE) {
430 close(epfd);
Craig Tiller90a9d7d2017-10-04 21:24:03 +0000431 gpr_free(*p);
432 *p = NULL;
Craig Tiller249de2b2017-10-02 11:49:19 -0700433 return err;
434 }
435 struct epoll_event ev;
436 ev.events = (uint32_t)(EPOLLIN | EPOLLET);
Craig Tiller90a9d7d2017-10-04 21:24:03 +0000437 ev.data.ptr = (void*)(1 | (intptr_t)&(*p)->wakeup);
438 if (epoll_ctl(epfd, EPOLL_CTL_ADD, (*p)->wakeup.read_fd, &ev) != 0) {
Craig Tiller249de2b2017-10-02 11:49:19 -0700439 err = GRPC_OS_ERROR(errno, "epoll_ctl");
440 close(epfd);
Craig Tiller90a9d7d2017-10-04 21:24:03 +0000441 grpc_wakeup_fd_destroy(&(*p)->wakeup);
442gpr_free(*p);
443*p = NULL;
Craig Tiller249de2b2017-10-02 11:49:19 -0700444 return err;
445 }
446
Craig Tiller249de2b2017-10-02 11:49:19 -0700447 (*p)->type = type;
448 gpr_ref_init(&(*p)->refs, 1);
Craig Tiller65da0ef2017-10-02 15:35:59 -0700449 gpr_mu_init(&(*p)->mu);
Craig Tiller249de2b2017-10-02 11:49:19 -0700450 (*p)->epfd = epfd;
Craig Tiller249de2b2017-10-02 11:49:19 -0700451 (*p)->owner_fd = NULL;
452 (*p)->pollset_set = NULL;
453 (*p)->next = (*p)->prev = *p;
454 (*p)->root_worker = NULL;
455 return GRPC_ERROR_NONE;
Craig Tillerd1d7fdd2017-04-14 16:16:24 -0700456}
457
#ifdef NDEBUG
static pollable *pollable_ref(pollable *p) {
#else
/* Debug build: logs the ref transition with the caller's line and reason
   when the pollable_refcount tracer is enabled. */
static pollable *pollable_ref(pollable *p, int line, const char *reason) {
  if (GRPC_TRACER_ON(grpc_trace_pollable_refcount)) {
    int r = (int)gpr_atm_no_barrier_load(&p->refs.count);
    gpr_log(__FILE__, line, GPR_LOG_SEVERITY_DEBUG,
            "POLLABLE:%p ref %d->%d %s", p, r, r + 1, reason);
  }
#endif
  gpr_ref(&p->refs);
  return p; /* convenience: returns its argument */
}
470
#ifdef NDEBUG
static void pollable_unref(pollable *p) {
#else
/* NULL-tolerant unref; debug build logs the transition when tracing. */
static void pollable_unref(pollable *p, int line, const char *reason) {
  if (p == NULL) return;
  if (GRPC_TRACER_ON(grpc_trace_pollable_refcount)) {
    int r = (int)gpr_atm_no_barrier_load(&p->refs.count);
    gpr_log(__FILE__, line, GPR_LOG_SEVERITY_DEBUG,
            "POLLABLE:%p unref %d->%d %s", p, r, r - 1, reason);
  }
#endif
  /* final unref closes the epoll fd, tears down the wakeup fd, and frees */
  if (p != NULL && gpr_unref(&p->refs)) {
    close(p->epfd);
    grpc_wakeup_fd_destroy(&p->wakeup);
    gpr_free(p);
  }
}
487
Yash Tibrewalb2a54ac2017-09-13 10:18:07 -0700488static grpc_error *pollable_add_fd(pollable *p, grpc_fd *fd) {
Craig Tillerd1d7fdd2017-04-14 16:16:24 -0700489 grpc_error *error = GRPC_ERROR_NONE;
490 static const char *err_desc = "pollable_add_fd";
491 const int epfd = p->epfd;
492
Craig Tiller6c8383a2017-05-05 16:54:42 +0000493 if (GRPC_TRACER_ON(grpc_polling_trace)) {
Craig Tiller86ffd652017-06-20 00:16:54 +0000494 gpr_log(GPR_DEBUG, "add fd %p (%d) to pollable %p", fd, fd->fd, p);
Craig Tiller6c8383a2017-05-05 16:54:42 +0000495 }
496
Yash Tibrewal533d1182017-09-18 10:48:22 -0700497 struct epoll_event ev_fd;
498 ev_fd.events = (uint32_t)(EPOLLET | EPOLLIN | EPOLLOUT | EPOLLEXCLUSIVE);
499 ev_fd.data.ptr = fd;
Craig Tillerd1d7fdd2017-04-14 16:16:24 -0700500 if (epoll_ctl(epfd, EPOLL_CTL_ADD, fd->fd, &ev_fd) != 0) {
501 switch (errno) {
Craig Tiller61f96c12017-05-12 13:36:39 -0700502 case EEXIST:
Craig Tillerd1d7fdd2017-04-14 16:16:24 -0700503 break;
504 default:
505 append_error(&error, GRPC_OS_ERROR(errno, "epoll_ctl"), err_desc);
506 }
507 }
508
509 return error;
510}
511
512/*******************************************************************************
Craig Tillerdd86b692017-04-06 10:43:11 -0700513 * Pollset Definitions
514 */
Craig Tillerd1d7fdd2017-04-14 16:16:24 -0700515
Craig Tillerdd86b692017-04-06 10:43:11 -0700516GPR_TLS_DECL(g_current_thread_pollset);
517GPR_TLS_DECL(g_current_thread_worker);
518
Craig Tillerdd86b692017-04-06 10:43:11 -0700519/* Global state management */
static grpc_error *pollset_global_init(void) {
  gpr_tls_init(&g_current_thread_pollset);
  gpr_tls_init(&g_current_thread_worker);
  /* shared pollable used by pollsets that currently watch no fds */
  return pollable_create(PO_EMPTY, &g_empty_pollable);
}
525
/* Tear down state created by pollset_global_init. */
static void pollset_global_shutdown(void) {
  POLLABLE_UNREF(g_empty_pollable, "g_empty_pollable");
  gpr_tls_destroy(&g_current_thread_pollset);
  gpr_tls_destroy(&g_current_thread_worker);
}
531
Craig Tillere16722c2017-05-17 21:50:37 +0000532static void pollset_maybe_finish_shutdown(grpc_exec_ctx *exec_ctx,
533 grpc_pollset *pollset) {
Craig Tiller23adbd52017-10-02 15:29:18 -0700534 if (pollset->shutdown_closure != NULL && pollset->root_worker == NULL) {
Craig Tillerca669b02017-06-09 12:41:51 -0700535 GRPC_CLOSURE_SCHED(exec_ctx, pollset->shutdown_closure, GRPC_ERROR_NONE);
Craig Tillere16722c2017-05-17 21:50:37 +0000536 pollset->shutdown_closure = NULL;
537 }
538}
539
/* both pollset->active_pollable->mu, pollset->mu must be held before calling
 * this function */
/* Kick exactly one worker, choosing the cheapest wakeup that applies:
   no-op (already kicked / currently awake on this thread), wakeup-fd write
   (worker is the epoll poller), or condvar signal (worker waits on its cv). */
static grpc_error *pollset_kick_one(grpc_exec_ctx *exec_ctx,
                                    grpc_pollset *pollset,
                                    grpc_pollset_worker *specific_worker) {
  pollable *p = pollset->active_pollable;
  GPR_ASSERT(specific_worker != NULL);
  if (specific_worker->kicked) {
    if (GRPC_TRACER_ON(grpc_polling_trace)) {
      gpr_log(GPR_DEBUG, "PS:%p kicked_specific_but_already_kicked", p);
    }
    return GRPC_ERROR_NONE;
  } else if (gpr_tls_get(&g_current_thread_worker) ==
             (intptr_t)specific_worker) {
    /* kicking ourselves: just mark kicked, we are already awake */
    if (GRPC_TRACER_ON(grpc_polling_trace)) {
      gpr_log(GPR_DEBUG, "PS:%p kicked_specific_but_awake", p);
    }
    specific_worker->kicked = true;
    return GRPC_ERROR_NONE;
  } else if (specific_worker == p->root_worker) {
    /* the root worker is in epoll_wait: poke the wakeup fd */
    if (GRPC_TRACER_ON(grpc_polling_trace)) {
      gpr_log(GPR_DEBUG, "PS:%p kicked_specific_via_wakeup_fd", p);
    }
    specific_worker->kicked = true;
    return grpc_wakeup_fd_wakeup(&p->wakeup);
  } else {
    /* non-root workers wait on their condvar */
    if (GRPC_TRACER_ON(grpc_polling_trace)) {
      gpr_log(GPR_DEBUG, "PS:%p kicked_specific_via_cv", p);
    }
    specific_worker->kicked = true;
    gpr_cv_signal(&specific_worker->cv);
    return GRPC_ERROR_NONE;
  }
}
574
/* both pollset->active_pollable->mu, pollset->mu must be held before calling
 * this function */
/* Dispatch a kick: a specific worker goes straight to pollset_kick_one; a
   NULL worker means "any worker", which kicks the root worker, records
   kicked_without_poller when none exists, or no-ops when the kicking thread
   is itself polling this pollset. */
static grpc_error *pollset_kick_inner(grpc_exec_ctx *exec_ctx,
                                      grpc_pollset *pollset,
                                      grpc_pollset_worker *specific_worker) {
  pollable *p = pollset->active_pollable;
  if (GRPC_TRACER_ON(grpc_polling_trace)) {
    gpr_log(GPR_DEBUG,
            "PS:%p kick %p tls_pollset=%p tls_worker=%p "
            "root_worker=(pollset:%p pollable:%p)",
            p, specific_worker, (void *)gpr_tls_get(&g_current_thread_pollset),
            (void *)gpr_tls_get(&g_current_thread_worker), pollset->root_worker,
            p->root_worker);
  }
  if (specific_worker == NULL) {
    if (gpr_tls_get(&g_current_thread_pollset) != (intptr_t)pollset) {
      if (pollset->root_worker == NULL) {
        if (GRPC_TRACER_ON(grpc_polling_trace)) {
          gpr_log(GPR_DEBUG, "PS:%p kicked_any_without_poller", p);
        }
        /* remember the kick so the next worker to arrive returns at once */
        pollset->kicked_without_poller = true;
        return GRPC_ERROR_NONE;
      } else {
        return pollset_kick_one(exec_ctx, pollset, pollset->root_worker);
      }
    } else {
      /* this thread is already polling the pollset: nothing to wake */
      if (GRPC_TRACER_ON(grpc_polling_trace)) {
        gpr_log(GPR_DEBUG, "PS:%p kicked_any_but_awake", p);
      }
      return GRPC_ERROR_NONE;
    }
  } else {
    return pollset_kick_one(exec_ctx, pollset, specific_worker);
  }
}
610
Craig Tiller0ff222a2017-09-01 09:41:43 -0700611static grpc_error *pollset_kick(grpc_exec_ctx *exec_ctx, grpc_pollset *pollset,
Craig Tiller26017de2017-04-15 00:07:25 +0000612 grpc_pollset_worker *specific_worker) {
Craig Tiller23adbd52017-10-02 15:29:18 -0700613 pollable *p = pollset->active_pollable;
614 gpr_mu_lock(&p->mu);
615 grpc_error *error = pollset_kick_inner(exec_ctx, pollset, specific_worker);
616 gpr_mu_unlock(&p->mu);
Craig Tiller26017de2017-04-15 00:07:25 +0000617 return error;
Craig Tiller23adbd52017-10-02 15:29:18 -0700618}
619
620static grpc_error *pollset_kick_all(grpc_exec_ctx *exec_ctx,
621 grpc_pollset *pollset) {
622 pollable *p = pollset->active_pollable;
623 grpc_error *error = GRPC_ERROR_NONE;
624 const char *err_desc = "pollset_kick_all";
625 gpr_mu_lock(&p->mu);
Craig Tiller90a9d7d2017-10-04 21:24:03 +0000626grpc_pollset_worker *w = pollset->root_worker;
627if (w!=NULL) {
628do {
Craig Tiller23adbd52017-10-02 15:29:18 -0700629 append_error(&error, pollset_kick_one(exec_ctx, pollset, w), err_desc);
Craig Tiller90a9d7d2017-10-04 21:24:03 +0000630 w = w->links[PWLINK_POLLSET].next;
631} while (w != pollset->root_worker);
632}
Craig Tiller23adbd52017-10-02 15:29:18 -0700633 gpr_mu_unlock(&p->mu);
634 return error;
Craig Tillerdd86b692017-04-06 10:43:11 -0700635}
636
/* Initialize |pollset|; it starts attached to the shared empty pollable.
   NOTE(review): remaining fields (root_worker, shutdown_closure, event
   cursors) are not set here — presumably the caller zero-allocates; confirm */
static void pollset_init(grpc_pollset *pollset, gpr_mu **mu) {
  gpr_mu_init(&pollset->mu);
  pollset->active_pollable = POLLABLE_REF(g_empty_pollable, "pollset");
  *mu = &pollset->mu;
}
642
Craig Tillerdd86b692017-04-06 10:43:11 -0700643/* Convert a timespec to milliseconds:
644 - Very small or negative poll times are clamped to zero to do a non-blocking
645 poll (which becomes spin polling)
646 - Other small values are rounded up to one millisecond
647 - Longer than a millisecond polls are rounded up to the next nearest
648 millisecond to avoid spinning
649 - Infinite timeouts are converted to -1 */
650static int poll_deadline_to_millis_timeout(gpr_timespec deadline,
651 gpr_timespec now) {
652 gpr_timespec timeout;
Craig Tillerdd86b692017-04-06 10:43:11 -0700653 if (gpr_time_cmp(deadline, gpr_inf_future(deadline.clock_type)) == 0) {
654 return -1;
655 }
656
Craig Tiller7f5fac92017-04-07 21:47:41 +0000657 if (gpr_time_cmp(deadline, now) <= 0) {
Craig Tillerdd86b692017-04-06 10:43:11 -0700658 return 0;
659 }
Craig Tiller7f5fac92017-04-07 21:47:41 +0000660
Craig Tiller8c6878b2017-04-07 15:15:30 -0700661 static const gpr_timespec round_up = {
Yash Tibrewal06312bd2017-09-18 15:10:22 -0700662 0, /* tv_sec */
663 GPR_NS_PER_MS - 1, /* tv_nsec */
664 GPR_TIMESPAN /* clock_type */
665 };
Craig Tillerdd86b692017-04-06 10:43:11 -0700666 timeout = gpr_time_sub(deadline, now);
Craig Tiller7f5fac92017-04-07 21:47:41 +0000667 int millis = gpr_time_to_millis(gpr_time_add(timeout, round_up));
Craig Tillerdd86b692017-04-06 10:43:11 -0700668 return millis >= 1 ? millis : 1;
669}
670
/* Mark |fd| readable (scheduling any pending read closure) and record the
   pollset that observed the event. */
static void fd_become_readable(grpc_exec_ctx *exec_ctx, grpc_fd *fd,
                               grpc_pollset *notifier) {
  grpc_lfev_set_ready(exec_ctx, &fd->read_closure, "read");

  /* Note, it is possible that fd_become_readable might be called twice with
     different 'notifier's when an fd becomes readable and it is in two epoll
     sets (This can happen briefly during polling island merges). In such cases
     it does not really matter which notifer is set as the read_notifier_pollset
     (They would both point to the same polling island anyway) */
  /* Use release store to match with acquire load in fd_get_read_notifier */
  gpr_atm_rel_store(&fd->read_notifier_pollset, (gpr_atm)notifier);
}
683
/* Mark |fd| writable, scheduling any pending write closure. */
static void fd_become_writable(grpc_exec_ctx *exec_ctx, grpc_fd *fd) {
  grpc_lfev_set_ready(exec_ctx, &fd->write_closure, "write");
}
687
Craig Tiller249de2b2017-10-02 11:49:19 -0700688static grpc_error *fd_become_pollable(grpc_fd *fd, pollable **p) {
689 gpr_mu_lock(&fd->pollable_mu);
Craig Tillerd1d7fdd2017-04-14 16:16:24 -0700690 grpc_error *error = GRPC_ERROR_NONE;
691 static const char *err_desc = "fd_become_pollable";
Craig Tiller249de2b2017-10-02 11:49:19 -0700692 if (fd->pollable_obj == NULL) {
693 if (append_error(&error, pollable_create(PO_FD, &fd->pollable_obj),
694 err_desc)) {
Craig Tiller39908712017-10-02 22:58:24 +0000695fd->pollable_obj->owner_fd = fd;
Craig Tiller249de2b2017-10-02 11:49:19 -0700696 if (!append_error(&error, pollable_add_fd(fd->pollable_obj, fd),
697 err_desc)) {
Craig Tiller389ea902017-10-04 17:42:49 +0000698 POLLABLE_UNREF(fd->pollable_obj, "fd_pollable");
Craig Tiller249de2b2017-10-02 11:49:19 -0700699 fd->pollable_obj = NULL;
700 }
701 }
Craig Tillerd1d7fdd2017-04-14 16:16:24 -0700702 }
Craig Tiller249de2b2017-10-02 11:49:19 -0700703 if (error == GRPC_ERROR_NONE) {
704 GPR_ASSERT(fd->pollable_obj != NULL);
Craig Tiller389ea902017-10-04 17:42:49 +0000705 *p = POLLABLE_REF(fd->pollable_obj, "pollset");
Craig Tiller249de2b2017-10-02 11:49:19 -0700706 } else {
707 GPR_ASSERT(fd->pollable_obj == NULL);
708 *p = NULL;
709 }
710 gpr_mu_unlock(&fd->pollable_mu);
Craig Tillerd1d7fdd2017-04-14 16:16:24 -0700711 return error;
712}
713
/* pollset->po.mu lock must be held by the caller before calling this */
static void pollset_shutdown(grpc_exec_ctx *exec_ctx, grpc_pollset *pollset,
                             grpc_closure *closure) {
  /* Only one shutdown may be in flight per pollset. */
  GPR_ASSERT(pollset->shutdown_closure == NULL);
  pollset->shutdown_closure = closure;
  /* Wake every worker so they observe the shutdown and drain out. */
  GRPC_LOG_IF_ERROR("pollset_shutdown", pollset_kick_all(exec_ctx, pollset));
  /* If there are no active workers, the closure can be scheduled now. */
  pollset_maybe_finish_shutdown(exec_ctx, pollset);
}
Craig Tillerdd86b692017-04-06 10:43:11 -0700722
/* Dispatch events already gathered by pollset_epoll into pollset->events.
   Processes at most MAX_EPOLL_EVENTS_HANDLED_EACH_POLL_CALL events per call
   unless |drain| is true, in which case the whole batch is consumed.
   Returns the accumulated (possibly composite) error. */
static grpc_error *pollset_process_events(grpc_exec_ctx *exec_ctx,
                                          grpc_pollset *pollset, bool drain) {
  static const char *err_desc = "pollset_process_events";
  grpc_error *error = GRPC_ERROR_NONE;
  for (int i = 0; (drain || i < MAX_EPOLL_EVENTS_HANDLED_EACH_POLL_CALL) &&
                  pollset->event_cursor != pollset->event_count;
       i++) {
    int n = pollset->event_cursor++;
    struct epoll_event *ev = &pollset->events[n];
    void *data_ptr = ev->data.ptr;
    /* The low bit of data.ptr tags the entry as a wakeup fd rather than a
       grpc_fd (pointers are at least 2-byte aligned, so the bit is free). */
    if (1 & (intptr_t)data_ptr) {
      if (GRPC_TRACER_ON(grpc_polling_trace)) {
        gpr_log(GPR_DEBUG, "PS:%p got pollset_wakeup %p", pollset, data_ptr);
      }
      /* mask the tag bit off to recover the wakeup fd pointer */
      append_error(&error,
                   grpc_wakeup_fd_consume_wakeup(
                       (grpc_wakeup_fd *)((~(intptr_t)1) & (intptr_t)data_ptr)),
                   err_desc);
    } else {
      grpc_fd *fd = (grpc_fd *)data_ptr;
      bool cancel = (ev->events & (EPOLLERR | EPOLLHUP)) != 0;
      bool read_ev = (ev->events & (EPOLLIN | EPOLLPRI)) != 0;
      bool write_ev = (ev->events & EPOLLOUT) != 0;
      if (GRPC_TRACER_ON(grpc_polling_trace)) {
        gpr_log(GPR_DEBUG,
                "PS:%p got fd %p: cancel=%d read=%d "
                "write=%d",
                pollset, fd, cancel, read_ev, write_ev);
      }
      /* error/hangup wakes both directions so pending closures can observe
         the failure */
      if (read_ev || cancel) {
        fd_become_readable(exec_ctx, fd, pollset);
      }
      if (write_ev || cancel) {
        fd_become_writable(exec_ctx, fd);
      }
    }
  }

  return error;
}
763
/* pollset_shutdown is guaranteed to be called before pollset_destroy. */
static void pollset_destroy(grpc_exec_ctx *exec_ctx, grpc_pollset *pollset) {
  POLLABLE_UNREF(pollset->active_pollable, "pollset");
  pollset->active_pollable = NULL;
  /* drain==true: flush any events still queued from the last poll */
  GRPC_LOG_IF_ERROR("pollset_process_events",
                    pollset_process_events(exec_ctx, pollset, true));
}
771
/* Run one epoll_wait on pollable |p|, bounded by |deadline|, stashing the
   results into pollset->events for later dispatch by
   pollset_process_events. */
static grpc_error *pollset_epoll(grpc_exec_ctx *exec_ctx, grpc_pollset *pollset,
                                 pollable *p, gpr_timespec now,
                                 gpr_timespec deadline) {
  int timeout = poll_deadline_to_millis_timeout(deadline, now);

  if (GRPC_TRACER_ON(grpc_polling_trace)) {
    char *desc = pollable_desc(p);
    gpr_log(GPR_DEBUG, "PS:%p poll %p[%s] for %dms", pollset, p, desc, timeout);
    gpr_free(desc);
  }

  /* only mark a blocking region when we may actually block */
  if (timeout != 0) {
    GRPC_SCHEDULING_START_BLOCKING_REGION;
  }
  int r;
  do {
    GRPC_STATS_INC_SYSCALL_POLL(exec_ctx);
    r = epoll_wait(p->epfd, pollset->events, MAX_EPOLL_EVENTS, timeout);
  } while (r < 0 && errno == EINTR); /* retry when interrupted by a signal */
  if (timeout != 0) {
    GRPC_SCHEDULING_END_BLOCKING_REGION;
  }

  if (r < 0) return GRPC_OS_ERROR(errno, "epoll_wait");

  if (GRPC_TRACER_ON(grpc_polling_trace)) {
    gpr_log(GPR_DEBUG, "PS:%p poll %p got %d events", pollset, p, r);
  }

  /* reset the cursor over the freshly gathered batch */
  pollset->event_cursor = 0;
  pollset->event_count = r;

  return GRPC_ERROR_NONE;
}
806
Craig Tillerd1d7fdd2017-04-14 16:16:24 -0700807/* Return true if first in list */
Craig Tiller23adbd52017-10-02 15:29:18 -0700808static bool worker_insert(grpc_pollset_worker **root_worker,
809 grpc_pollset_worker *worker, pwlinks link) {
810 if (*root_worker == NULL) {
811 *root_worker = worker;
812 worker->links[link].next = worker->links[link].prev = worker;
Craig Tillerd1d7fdd2017-04-14 16:16:24 -0700813 return true;
814 } else {
Craig Tiller23adbd52017-10-02 15:29:18 -0700815 worker->links[link].next = *root_worker;
816 worker->links[link].prev = worker->links[link].next->links[link].prev;
817 worker->links[link].next->links[link].prev = worker;
818 worker->links[link].prev->links[link].next = worker;
Craig Tillerd1d7fdd2017-04-14 16:16:24 -0700819 return false;
820 }
821}
822
Craig Tiller249de2b2017-10-02 11:49:19 -0700823/* returns the new root IFF the root changed */
Craig Tiller23adbd52017-10-02 15:29:18 -0700824typedef enum { WRR_NEW_ROOT, WRR_EMPTIED, WRR_REMOVED } worker_remove_result;
825
826static worker_remove_result worker_remove(grpc_pollset_worker **root_worker,
827 grpc_pollset_worker *worker,
828 pwlinks link) {
829 if (worker == *root_worker) {
830 if (worker == worker->links[link].next) {
831 *root_worker = NULL;
832 return WRR_EMPTIED;
Craig Tillerd1d7fdd2017-04-14 16:16:24 -0700833 } else {
Craig Tiller23adbd52017-10-02 15:29:18 -0700834 *root_worker = worker->links[link].next;
835 worker->links[link].prev->links[link].next = worker->links[link].next;
836 worker->links[link].next->links[link].prev = worker->links[link].prev;
837 return WRR_NEW_ROOT;
Craig Tillerd1d7fdd2017-04-14 16:16:24 -0700838 }
839 } else {
Craig Tiller23adbd52017-10-02 15:29:18 -0700840 worker->links[link].prev->links[link].next = worker->links[link].next;
841 worker->links[link].next->links[link].prev = worker->links[link].prev;
842 return WRR_REMOVED;
Craig Tillerd1d7fdd2017-04-14 16:16:24 -0700843 }
844}
845
/* Return true if this thread should poll */
static bool begin_worker(grpc_pollset *pollset, grpc_pollset_worker *worker,
                         grpc_pollset_worker **worker_hdl, gpr_timespec *now,
                         gpr_timespec deadline) {
  bool do_poll = true;
  if (worker_hdl != NULL) *worker_hdl = worker;
  worker->initialized_cv = false;
  worker->kicked = false;
  worker->pollset = pollset;
  /* Pin the pollable this worker will poll: pollset->active_pollable may be
     swapped out while we wait below. */
  worker->pollable_obj = POLLABLE_REF(pollset->active_pollable, "pollset_worker");
  worker_insert(&pollset->root_worker, worker, PWLINK_POLLSET);
  gpr_mu_lock(&worker->pollable_obj->mu);
  if (!worker_insert(&worker->pollable_obj->root_worker, worker,
                     PWLINK_POLLABLE)) {
    /* Another worker already owns this pollable: wait on our cv until it is
       our turn (we become root), we are kicked, or the deadline passes. */
    worker->initialized_cv = true;
    gpr_cv_init(&worker->cv);
    if (GRPC_TRACER_ON(grpc_polling_trace) &&
        worker->pollable_obj->root_worker != worker) {
      gpr_log(GPR_DEBUG, "PS:%p wait %p w=%p for %dms", pollset,
              worker->pollable_obj, worker,
              poll_deadline_to_millis_timeout(deadline, *now));
    }
    while (do_poll && worker->pollable_obj->root_worker != worker) {
      /* gpr_cv_wait returns nonzero on timeout */
      if (gpr_cv_wait(&worker->cv, &worker->pollable_obj->mu, deadline)) {
        if (GRPC_TRACER_ON(grpc_polling_trace)) {
          gpr_log(GPR_DEBUG, "PS:%p timeout_wait %p w=%p", pollset,
                  worker->pollable_obj, worker);
        }
        do_poll = false;
      } else if (worker->kicked) {
        if (GRPC_TRACER_ON(grpc_polling_trace)) {
          gpr_log(GPR_DEBUG, "PS:%p wakeup %p w=%p", pollset,
                  worker->pollable_obj, worker);
        }
        do_poll = false;
      } else if (GRPC_TRACER_ON(grpc_polling_trace) &&
                 worker->pollable_obj->root_worker != worker) {
        /* spurious wakeup: loop and wait again */
        gpr_log(GPR_DEBUG, "PS:%p spurious_wakeup %p w=%p", pollset,
                worker->pollable_obj, worker);
      }
    }
    /* refresh |now| after blocking so the caller polls with a fresh clock */
    *now = gpr_now(now->clock_type);
  }
  gpr_mu_unlock(&worker->pollable_obj->mu);

  /* Poll only if we were not kicked or timed out, the pollset is not
     shutting down, and the pollable we pinned is still the active one. */
  return do_poll && pollset->shutdown_closure == NULL &&
         pollset->active_pollable == worker->pollable_obj;
}
894
Craig Tillerf8401102017-04-17 09:47:28 -0700895static void end_worker(grpc_exec_ctx *exec_ctx, grpc_pollset *pollset,
896 grpc_pollset_worker *worker,
Craig Tillere24b24d2017-04-06 16:05:45 -0700897 grpc_pollset_worker **worker_hdl) {
Craig Tiller249de2b2017-10-02 11:49:19 -0700898 gpr_mu_lock(&worker->pollable_obj->mu);
Craig Tiller23adbd52017-10-02 15:29:18 -0700899 if (worker_remove(&worker->pollable_obj->root_worker, worker,
900 PWLINK_POLLABLE) == WRR_NEW_ROOT) {
901 grpc_pollset_worker *new_root = worker->pollable_obj->root_worker;
Craig Tiller249de2b2017-10-02 11:49:19 -0700902 GPR_ASSERT(new_root->initialized_cv);
903 gpr_cv_signal(&new_root->cv);
Craig Tillerd1d7fdd2017-04-14 16:16:24 -0700904 }
905 if (worker->initialized_cv) {
906 gpr_cv_destroy(&worker->cv);
907 }
Craig Tiller249de2b2017-10-02 11:49:19 -0700908 gpr_mu_unlock(&worker->pollable_obj->mu);
Craig Tiller23adbd52017-10-02 15:29:18 -0700909 if (worker_remove(&pollset->root_worker, worker, PWLINK_POLLSET)) {
Craig Tiller227e11b2017-04-20 18:25:02 +0000910 pollset_maybe_finish_shutdown(exec_ctx, pollset);
911 }
Craig Tiller389ea902017-10-04 17:42:49 +0000912 POLLABLE_UNREF(worker->pollable_obj, "pollset_worker");
Craig Tillere24b24d2017-04-06 16:05:45 -0700913}
914
/* pollset->po.mu lock must be held by the caller before calling this.
   The function pollset_work() may temporarily release the lock (pollset->po.mu)
   during the course of its execution but it will always re-acquire the lock and
   ensure that it is held by the time the function returns */
static grpc_error *pollset_work(grpc_exec_ctx *exec_ctx, grpc_pollset *pollset,
                                grpc_pollset_worker **worker_hdl,
                                gpr_timespec now, gpr_timespec deadline) {
#ifdef GRPC_EPOLLEX_CREATE_WORKERS_ON_HEAP
  /* heap-allocated worker variant; WORKER_PTR abstracts over both layouts */
  grpc_pollset_worker *worker = gpr_malloc(sizeof(*worker));
#define WORKER_PTR (worker)
#else
  grpc_pollset_worker worker;
#define WORKER_PTR (&worker)
#endif
  if (GRPC_TRACER_ON(grpc_polling_trace)) {
    gpr_log(GPR_DEBUG, "PS:%p work hdl=%p worker=%p now=%" PRId64
                       ".%09d deadline=%" PRId64 ".%09d kwp=%d pollable=%p",
            pollset, worker_hdl, WORKER_PTR, now.tv_sec, now.tv_nsec,
            deadline.tv_sec, deadline.tv_nsec, pollset->kicked_without_poller,
            pollset->active_pollable);
  }
  static const char *err_desc = "pollset_work";
  grpc_error *error = GRPC_ERROR_NONE;
  if (pollset->kicked_without_poller) {
    /* a kick arrived while no poller was present: consume it and return */
    pollset->kicked_without_poller = false;
  } else {
    if (begin_worker(pollset, WORKER_PTR, worker_hdl, &now, deadline)) {
      gpr_tls_set(&g_current_thread_pollset, (intptr_t)pollset);
      gpr_tls_set(&g_current_thread_worker, (intptr_t)WORKER_PTR);
      GPR_ASSERT(!pollset->shutdown_closure);
      /* drop the pollset lock while blocking in epoll / running closures */
      gpr_mu_unlock(&pollset->mu);
      if (pollset->event_cursor == pollset->event_count) {
        /* no leftover events from a previous poll: gather a new batch */
        append_error(&error, pollset_epoll(exec_ctx, pollset,
                                           WORKER_PTR->pollable_obj, now,
                                           deadline),
                     err_desc);
      }
      append_error(&error, pollset_process_events(exec_ctx, pollset, false),
                   err_desc);
      grpc_exec_ctx_flush(exec_ctx);
      gpr_mu_lock(&pollset->mu);
      gpr_tls_set(&g_current_thread_pollset, 0);
      gpr_tls_set(&g_current_thread_worker, 0);
    }
    end_worker(exec_ctx, pollset, WORKER_PTR, worker_hdl);
  }
#ifdef GRPC_EPOLLEX_CREATE_WORKERS_ON_HEAP
  gpr_free(worker);
#endif
  return error;
}
964
/* Replace the pollset's empty pollable with |fd|'s single-fd pollable.
   Caller must hold pollset->mu. */
static grpc_error *pollset_transition_pollable_from_empty_to_fd_locked(
    grpc_exec_ctx *exec_ctx, grpc_pollset *pollset, grpc_fd *fd) {
  static const char *err_desc = "pollset_transition_pollable_from_empty_to_fd";
  grpc_error *error = GRPC_ERROR_NONE;
  if (GRPC_TRACER_ON(grpc_polling_trace)) {
    gpr_log(GPR_DEBUG, "PS:%p add fd %p (%d); transition pollable from empty to fd",
            pollset, fd, fd->fd);
  }
  /* evict workers parked on the old (empty) pollable ... */
  append_error(&error, pollset_kick_all(exec_ctx, pollset), err_desc);
  /* ... drop the pollset's ref on it ... */
  POLLABLE_UNREF(pollset->active_pollable, "pollset");
  /* ... and adopt the fd's own pollable in its place */
  append_error(&error, fd_become_pollable(fd, &pollset->active_pollable),
               err_desc);
  return error;
}
979
/* Replace the pollset's single-fd pollable with a fresh multipoller that
   contains the original fd plus (optionally) |and_add_fd|. Caller must hold
   pollset->mu. On failure active_pollable may be left NULL; the caller
   (pollset_add_fd_locked / pollset_as_multipollable) rolls back. */
static grpc_error *pollset_transition_pollable_from_fd_to_multi_locked(
    grpc_exec_ctx *exec_ctx, grpc_pollset *pollset, grpc_fd *and_add_fd) {
  static const char *err_desc = "pollset_transition_pollable_from_fd_to_multi";
  grpc_error *error = GRPC_ERROR_NONE;
  if (GRPC_TRACER_ON(grpc_polling_trace)) {
    gpr_log(GPR_DEBUG,
            "PS:%p add fd %p (%d); transition pollable from fd %p to multipoller",
            pollset, and_add_fd, and_add_fd ? and_add_fd->fd : -1,
            pollset->active_pollable->owner_fd);
  }
  /* wake workers parked on the single-fd pollable so they re-acquire */
  append_error(&error, pollset_kick_all(exec_ctx, pollset), err_desc);
  /* remember the owning fd before dropping our ref on the old pollable */
  grpc_fd *initial_fd = pollset->active_pollable->owner_fd;
  POLLABLE_UNREF(pollset->active_pollable, "pollset");
  pollset->active_pollable = NULL;
  if (append_error(&error, pollable_create(PO_MULTI, &pollset->active_pollable),
                   err_desc)) {
    append_error(&error, pollable_add_fd(pollset->active_pollable, initial_fd),
                 err_desc);
    if (and_add_fd != NULL) {
      append_error(&error,
                   pollable_add_fd(pollset->active_pollable, and_add_fd),
                   err_desc);
    }
  }
  return error;
}
1005
/* expects pollsets locked, flag whether fd is locked or not */
static grpc_error *pollset_add_fd_locked(grpc_exec_ctx *exec_ctx,
                                         grpc_pollset *pollset, grpc_fd *fd) {
  grpc_error *error = GRPC_ERROR_NONE;
  /* hold a ref to the pollable active on entry so a failed transition can be
     rolled back */
  pollable *po_at_start = POLLABLE_REF(pollset->active_pollable, "pollset_add_fd");
  switch (pollset->active_pollable->type) {
    case PO_EMPTY:
      /* empty pollable --> single fd pollable */
      error = pollset_transition_pollable_from_empty_to_fd_locked(exec_ctx,
                                                                  pollset, fd);
      break;
    case PO_FD:
      /* fd --> multipoller */
      error = pollset_transition_pollable_from_fd_to_multi_locked(exec_ctx,
                                                                  pollset, fd);
      break;
    case PO_MULTI:
      /* already a multipoller: just add the fd */
      error = pollable_add_fd(pollset->active_pollable, fd);
      break;
  }
  if (error != GRPC_ERROR_NONE) {
    /* roll back to the pollable that was active on entry.
       NOTE(review): this relies on POLLABLE_UNREF tolerating a NULL
       active_pollable left behind by a failed transition — confirm. */
    POLLABLE_UNREF(pollset->active_pollable, "pollset");
    pollset->active_pollable = po_at_start;
  } else {
    /* success: the rollback ref is no longer needed */
    POLLABLE_UNREF(po_at_start, "pollset_add_fd");
  }
  return error;
}
Craig Tillerd1d7fdd2017-04-14 16:16:24 -07001034
/* Force |pollset|'s active pollable to be a multipoller and return an owned
   ref to it via *pollable_obj (NULL on failure). The caller is responsible
   for eventually unreffing *pollable_obj. */
static grpc_error *pollset_as_multipollable(grpc_exec_ctx *exec_ctx,
                                            grpc_pollset *pollset,
                                            pollable **pollable_obj) {
  grpc_error *error = GRPC_ERROR_NONE;
  gpr_mu_lock(&pollset->mu);
  /* keep a ref to the pollable active on entry so failures can roll back */
  pollable *po_at_start = POLLABLE_REF(pollset->active_pollable, "pollset_as_multipollable");
  switch (pollset->active_pollable->type) {
    case PO_EMPTY:
      POLLABLE_UNREF(pollset->active_pollable, "pollset");
      error = pollable_create(PO_MULTI, &pollset->active_pollable);
      break;
    case PO_FD:
      error = pollset_transition_pollable_from_fd_to_multi_locked(
          exec_ctx, pollset, NULL);
      break;
    case PO_MULTI:
      /* already a multipoller: nothing to do */
      break;
  }
  if (error != GRPC_ERROR_NONE) {
    /* roll back and report no pollable to the caller */
    POLLABLE_UNREF(pollset->active_pollable, "pollset");
    pollset->active_pollable = po_at_start;
    *pollable_obj = NULL;
  } else {
    *pollable_obj = POLLABLE_REF(pollset->active_pollable, "pollset_set");
    POLLABLE_UNREF(po_at_start, "pollset_as_multipollable");
  }
  gpr_mu_unlock(&pollset->mu);
  return error;
}
1064
Craig Tiller50480b22017-04-17 16:34:52 +00001065static void pollset_add_fd(grpc_exec_ctx *exec_ctx, grpc_pollset *pollset,
1066 grpc_fd *fd) {
Craig Tiller249de2b2017-10-02 11:49:19 -07001067 gpr_mu_lock(&pollset->mu);
1068 grpc_error *error = pollset_add_fd_locked(exec_ctx, pollset, fd);
1069 gpr_mu_unlock(&pollset->mu);
Craig Tillere24b24d2017-04-06 16:05:45 -07001070 GRPC_LOG_IF_ERROR("pollset_add_fd", error);
Craig Tillerdd86b692017-04-06 10:43:11 -07001071}
1072
1073/*******************************************************************************
1074 * Pollset-set Definitions
1075 */
1076
Craig Tiller249de2b2017-10-02 11:49:19 -07001077static grpc_pollset_set *pss_lock_adam(grpc_pollset_set *pss) {
1078 gpr_mu_lock(&pss->mu);
1079 while (pss->parent != NULL) {
Craig Tiller249de2b2017-10-02 11:49:19 -07001080 gpr_mu_unlock(&pss->mu);
1081 pss = pss->parent;
Craig Tillerc5ce0572017-10-03 23:09:41 +00001082 gpr_mu_lock(&pss->mu);
Craig Tiller249de2b2017-10-02 11:49:19 -07001083 }
1084 return pss;
1085}
1086
/* Create a zero-initialized pollset_set; the caller owns the initial ref. */
static grpc_pollset_set *pollset_set_create(void) {
  grpc_pollset_set *pss = (grpc_pollset_set *)gpr_zalloc(sizeof(*pss));
  gpr_mu_init(&pss->mu);
  gpr_ref_init(&pss->refs, 1);
  return pss;
}
1093
Craig Tiller389ea902017-10-04 17:42:49 +00001094static void pollset_set_unref(grpc_exec_ctx *exec_ctx,
1095 grpc_pollset_set *pss) {
1096 if (pss == NULL) return;
1097 if (!gpr_unref(&pss->refs)) return;
1098 pollset_set_unref(exec_ctx, pss->parent);
1099 gpr_mu_destroy(&pss->mu);
1100 for (size_t i=0; i<pss->pollset_count; i++) {
1101 POLLABLE_UNREF(pss->pollsets[i], "pollset_set");
1102 }
1103 for (size_t i=0;i<pss->fd_count; i++) {
1104 UNREF_BY(exec_ctx, pss->fds[i], 2, "pollset_set");
1105 }
1106 gpr_free(pss->pollsets);
1107 gpr_free(pss->fds);
1108 gpr_free(pss);
1109}
Craig Tillerdd86b692017-04-06 10:43:11 -07001110
1111static void pollset_set_add_fd(grpc_exec_ctx *exec_ctx, grpc_pollset_set *pss,
1112 grpc_fd *fd) {
Craig Tillerc5ce0572017-10-03 23:09:41 +00001113 if (GRPC_TRACER_ON(grpc_polling_trace)) {
1114 gpr_log(GPR_DEBUG, "PSS:%p: add fd %p (%d)", pss, fd, fd->fd);
1115 }
Craig Tiller249de2b2017-10-02 11:49:19 -07001116 grpc_error *error = GRPC_ERROR_NONE;
1117 static const char *err_desc = "pollset_set_add_fd";
1118 pss = pss_lock_adam(pss);
Craig Tiller23adbd52017-10-02 15:29:18 -07001119 for (size_t i = 0; i < pss->pollset_count; i++) {
1120 append_error(&error, pollable_add_fd(pss->pollsets[i], fd), err_desc);
Craig Tiller249de2b2017-10-02 11:49:19 -07001121 }
Craig Tiller23adbd52017-10-02 15:29:18 -07001122 if (pss->fd_count == pss->fd_capacity) {
1123 pss->fd_capacity = GPR_MAX(pss->fd_capacity * 2, 8);
1124 pss->fds = gpr_realloc(pss->fds, pss->fd_capacity * sizeof(*pss->fds));
1125 }
1126 REF_BY(fd, 2, "pollset_set");
1127 pss->fds[pss->fd_count++] = fd;
Craig Tiller249de2b2017-10-02 11:49:19 -07001128 gpr_mu_unlock(&pss->mu);
1129
Craig Tiller23adbd52017-10-02 15:29:18 -07001130 GRPC_LOG_IF_ERROR(err_desc, error);
Craig Tillerdd86b692017-04-06 10:43:11 -07001131}
1132
1133static void pollset_set_del_fd(grpc_exec_ctx *exec_ctx, grpc_pollset_set *pss,
Craig Tiller23adbd52017-10-02 15:29:18 -07001134 grpc_fd *fd) {
Craig Tillerc5ce0572017-10-03 23:09:41 +00001135 if (GRPC_TRACER_ON(grpc_polling_trace)) {
1136 gpr_log(GPR_DEBUG, "PSS:%p: del fd %p", pss, fd);
1137 }
Craig Tiller23adbd52017-10-02 15:29:18 -07001138 pss = pss_lock_adam(pss);
1139 size_t i;
1140 for (i = 0; i < pss->fd_count; i++) {
1141 if (pss->fds[i] == fd) {
1142 UNREF_BY(exec_ctx, fd, 2, "pollset_set");
1143 break;
1144 }
1145 }
1146 GPR_ASSERT(i != pss->fd_count);
1147 for (; i < pss->fd_count - 1; i++) {
1148 pss->fds[i] = pss->fds[i + 1];
1149 }
1150 pss->fd_count--;
1151 gpr_mu_unlock(&pss->mu);
1152}
Craig Tillerdd86b692017-04-06 10:43:11 -07001153
/* Add pollset |ps| to the pollset_set: force it to be a multipoller, expose
   every fd already in the set to it, and record its pollable (taking over
   the ref returned by pollset_as_multipollable). */
static void pollset_set_add_pollset(grpc_exec_ctx *exec_ctx,
                                    grpc_pollset_set *pss, grpc_pollset *ps) {
  if (GRPC_TRACER_ON(grpc_polling_trace)) {
    gpr_log(GPR_DEBUG, "PSS:%p: add pollset %p", pss, ps);
  }
  grpc_error *error = GRPC_ERROR_NONE;
  static const char *err_desc = "pollset_set_add_pollset";
  pollable *pollable_obj = NULL;
  /* a pollset must be a multipoller before it can join a pollset_set */
  if (!GRPC_LOG_IF_ERROR(
          err_desc, pollset_as_multipollable(exec_ctx, ps, &pollable_obj))) {
    GPR_ASSERT(pollable_obj == NULL);
    return;
  }
  /* all state lives on the root of the parent chain */
  pss = pss_lock_adam(pss);
  for (size_t i = 0; i < pss->fd_count; i++) {
    append_error(&error, pollable_add_fd(pollable_obj, pss->fds[i]), err_desc);
  }
  /* grow pollsets[] geometrically when full */
  if (pss->pollset_count == pss->pollset_capacity) {
    pss->pollset_capacity = GPR_MAX(pss->pollset_capacity * 2, 8);
    pss->pollsets = gpr_realloc(pss->pollsets,
                                pss->pollset_capacity * sizeof(*pss->pollsets));
  }
  /* ownership of pollable_obj's ref transfers into the array */
  pss->pollsets[pss->pollset_count++] = pollable_obj;
  gpr_mu_unlock(&pss->mu);

  GRPC_LOG_IF_ERROR(err_desc, error);
}
Craig Tillerdd86b692017-04-06 10:43:11 -07001181
1182static void pollset_set_del_pollset(grpc_exec_ctx *exec_ctx,
Craig Tiller23adbd52017-10-02 15:29:18 -07001183 grpc_pollset_set *pss, grpc_pollset *ps) {
Craig Tillerc5ce0572017-10-03 23:09:41 +00001184 if (GRPC_TRACER_ON(grpc_polling_trace)) {
1185 gpr_log(GPR_DEBUG, "PSS:%p: del pollset %p", pss, ps);
1186 }
Craig Tiller23adbd52017-10-02 15:29:18 -07001187 pss = pss_lock_adam(pss);
1188 size_t i;
1189 for (i = 0; i < pss->pollset_count; i++) {
1190 if (pss->pollsets[i] == ps->active_pollable) {
Craig Tiller389ea902017-10-04 17:42:49 +00001191 POLLABLE_UNREF(pss->pollsets[i], "pollset_set");
Craig Tiller23adbd52017-10-02 15:29:18 -07001192 break;
1193 }
1194 }
1195 GPR_ASSERT(i != pss->pollset_count);
1196 for (; i < pss->pollset_count - 1; i++) {
1197 pss->pollsets[i] = pss->pollsets[i + 1];
1198 }
1199 pss->pollset_count--;
1200 gpr_mu_unlock(&pss->mu);
1201}
1202
1203static grpc_error *add_fds_to_pollables(grpc_exec_ctx *exec_ctx, grpc_fd **fds,
1204 size_t fd_count, pollable **pollables,
1205 size_t pollable_count,
1206 const char *err_desc) {
1207 grpc_error *error = GRPC_ERROR_NONE;
1208 for (size_t i = 0; i < fd_count; i++) {
1209 for (size_t j = 0; j < pollable_count; j++) {
1210 append_error(&error, pollable_add_fd(pollables[j], fds[i]), err_desc);
1211 }
1212 }
1213 return error;
1214}
Craig Tillerdd86b692017-04-06 10:43:11 -07001215
/* Merge two pollset_sets: the smaller root becomes a child of the larger,
   fds and pollsets are cross-propagated, and the child's storage migrates
   into the parent. Locks are taken in address order to avoid deadlock with
   concurrent merges. */
static void pollset_set_add_pollset_set(grpc_exec_ctx *exec_ctx,
                                        grpc_pollset_set *a,
                                        grpc_pollset_set *b) {
  if (GRPC_TRACER_ON(grpc_polling_trace)) {
    gpr_log(GPR_DEBUG, "PSS: merge (%p, %p)", a, b);
  }
  grpc_error *error = GRPC_ERROR_NONE;
  static const char *err_desc = "pollset_set_add_fd";
  /* climb both parent chains until we hold both roots locked */
  for (;;) {
    if (a == b) {
      // pollset ancestors are the same: nothing to do
      return;
    }
    if (a > b) {
      /* canonicalize so locks are always taken in address order */
      GPR_SWAP(grpc_pollset_set *, a, b);
    }
    gpr_mu *a_mu = &a->mu;
    gpr_mu *b_mu = &b->mu;
    gpr_mu_lock(a_mu);
    gpr_mu_lock(b_mu);
    if (a->parent != NULL) {
      a = a->parent;
    } else if (b->parent != NULL) {
      b = b->parent;
    } else {
      break;  // exit loop, both pollsets locked
    }
    gpr_mu_unlock(a_mu);
    gpr_mu_unlock(b_mu);
  }
  // try to do the least copying possible
  // TODO(ctiller): there's probably a better heuristic here
  const size_t a_size = a->fd_count + a->pollset_count;
  const size_t b_size = b->fd_count + b->pollset_count;
  if (b_size > a_size) {
    GPR_SWAP(grpc_pollset_set *, a, b);
  }
  if (GRPC_TRACER_ON(grpc_polling_trace)) {
    gpr_log(GPR_DEBUG, "PSS: parent %p to %p", b, a);
  }
  /* b's parent pointer holds a ref on a (released in pollset_set_unref) */
  gpr_ref(&a->refs);
  b->parent = a;
  /* cross-propagate: each side's fds must reach the other's pollsets */
  append_error(&error,
               add_fds_to_pollables(exec_ctx, a->fds, a->fd_count, b->pollsets,
                                    b->pollset_count, "merge_a2b"),
               err_desc);
  append_error(&error,
               add_fds_to_pollables(exec_ctx, b->fds, b->fd_count, a->pollsets,
                                    a->pollset_count, "merge_b2a"),
               err_desc);
  if (a->fd_capacity < a->fd_count + b->fd_count) {
    a->fd_capacity = GPR_MAX(2 * a->fd_capacity, a->fd_count + b->fd_count);
    a->fds = gpr_realloc(a->fds, a->fd_capacity * sizeof(*a->fds));
  }
  if (a->pollset_capacity < a->pollset_count + b->pollset_count) {
    a->pollset_capacity =
        GPR_MAX(2 * a->pollset_capacity, a->pollset_count + b->pollset_count);
    a->pollsets =
        gpr_realloc(a->pollsets, a->pollset_capacity * sizeof(*a->pollsets));
  }
  /* migrate b's entries (and the refs they carry) into a */
  memcpy(a->fds + a->fd_count, b->fds, b->fd_count * sizeof(*b->fds));
  memcpy(a->pollsets + a->pollset_count, b->pollsets,
         b->pollset_count * sizeof(*b->pollsets));
  a->fd_count += b->fd_count;
  a->pollset_count += b->pollset_count;
  gpr_free(b->fds);
  gpr_free(b->pollsets);
  b->fds = NULL;
  b->pollsets = NULL;
  b->fd_count = b->fd_capacity = b->pollset_count = b->pollset_capacity = 0;
  gpr_mu_unlock(&a->mu);
  gpr_mu_unlock(&b->mu);
}
Craig Tillerdd86b692017-04-06 10:43:11 -07001289
/* Intentionally a no-op: pollset_set merges performed by
   pollset_set_add_pollset_set link the sets permanently via parent pointers,
   so there is nothing to undo here — presumably deletion is handled by
   unref/destruction of the set itself (see pollset_set_unref in the vtable). */
static void pollset_set_del_pollset_set(grpc_exec_ctx *exec_ctx,
                                        grpc_pollset_set *bag,
                                        grpc_pollset_set *item) {}
Craig Tillerdd86b692017-04-06 10:43:11 -07001293
1294/*******************************************************************************
1295 * Event engine binding
1296 */
1297
/* Engine-wide teardown: releases the fd-cache and pollset global state
   initialized by grpc_init_epollex_linux (fd_global_init /
   pollset_global_init). */
static void shutdown_engine(void) {
  fd_global_shutdown();
  pollset_global_shutdown();
}
1302
/* Event-engine dispatch table for the epollex polling strategy.
   Entries are positional; the order must match grpc_event_engine_vtable. */
static const grpc_event_engine_vtable vtable = {
    sizeof(grpc_pollset),

    /* fd operations */
    fd_create,
    fd_wrapped_fd,
    fd_orphan,
    fd_shutdown,
    fd_notify_on_read,
    fd_notify_on_write,
    fd_is_shutdown,
    fd_get_read_notifier_pollset,

    /* pollset operations */
    pollset_init,
    pollset_shutdown,
    pollset_destroy,
    pollset_work,
    pollset_kick,
    pollset_add_fd,

    /* pollset_set operations */
    pollset_set_create,
    pollset_set_unref,  // destroy ==> unref 1 public ref
    pollset_set_add_pollset,
    pollset_set_del_pollset,
    pollset_set_add_pollset_set,
    pollset_set_del_pollset_set,
    pollset_set_add_fd,
    pollset_set_del_fd,

    /* engine lifetime */
    shutdown_engine,
};
1333
Craig Tiller6c8383a2017-05-05 16:54:42 +00001334const grpc_event_engine_vtable *grpc_init_epollex_linux(
1335 bool explicitly_requested) {
Craig Tillerdd86b692017-04-06 10:43:11 -07001336 if (!grpc_has_wakeup_fd()) {
1337 return NULL;
1338 }
1339
Craig Tiller819cd882017-04-25 13:18:22 -07001340 if (!grpc_is_epollexclusive_available()) {
Craig Tillerdd86b692017-04-06 10:43:11 -07001341 return NULL;
1342 }
1343
Craig Tiller389ea902017-10-04 17:42:49 +00001344 grpc_register_tracer(&grpc_trace_pollable_refcount);
1345
Craig Tillerdd86b692017-04-06 10:43:11 -07001346 fd_global_init();
1347
1348 if (!GRPC_LOG_IF_ERROR("pollset_global_init", pollset_global_init())) {
Craig Tiller54d388d2017-04-14 16:20:45 -07001349 pollset_global_shutdown();
1350 fd_global_shutdown();
Craig Tillerdd86b692017-04-06 10:43:11 -07001351 return NULL;
1352 }
1353
1354 return &vtable;
1355}
1356
1357#else /* defined(GRPC_LINUX_EPOLL) */
1358#if defined(GRPC_POSIX_SOCKET)
1359#include "src/core/lib/iomgr/ev_posix.h"
1360/* If GRPC_LINUX_EPOLL is not defined, it means epoll is not available. Return
1361 * NULL */
const grpc_event_engine_vtable *grpc_init_epollex_linux(
    bool explicitly_requested) {
  /* epollex requires GRPC_LINUX_EPOLL; unavailable on this platform. */
  return NULL;
}
Craig Tillerdd86b692017-04-06 10:43:11 -07001366#endif /* defined(GRPC_POSIX_SOCKET) */
1367
1368#endif /* !defined(GRPC_LINUX_EPOLL) */