blob: 671c4cd730d19af26d5a54e23359067471c948a3 [file] [log] [blame]
Craig Tillerdd86b692017-04-06 10:43:11 -07001/*
2 *
Jan Tattermusch7897ae92017-06-07 22:57:36 +02003 * Copyright 2017 gRPC authors.
Craig Tillerdd86b692017-04-06 10:43:11 -07004 *
Jan Tattermusch7897ae92017-06-07 22:57:36 +02005 * Licensed under the Apache License, Version 2.0 (the "License");
6 * you may not use this file except in compliance with the License.
7 * You may obtain a copy of the License at
Craig Tillerdd86b692017-04-06 10:43:11 -07008 *
Jan Tattermusch7897ae92017-06-07 22:57:36 +02009 * http://www.apache.org/licenses/LICENSE-2.0
Craig Tillerdd86b692017-04-06 10:43:11 -070010 *
Jan Tattermusch7897ae92017-06-07 22:57:36 +020011 * Unless required by applicable law or agreed to in writing, software
12 * distributed under the License is distributed on an "AS IS" BASIS,
13 * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
14 * See the License for the specific language governing permissions and
15 * limitations under the License.
Craig Tillerdd86b692017-04-06 10:43:11 -070016 *
17 */
18
19#include "src/core/lib/iomgr/port.h"
20
21/* This polling engine is only relevant on linux kernels supporting epoll() */
22#ifdef GRPC_LINUX_EPOLL
23
Craig Tillerc6109852017-05-01 14:26:49 -070024#include "src/core/lib/iomgr/ev_epollex_linux.h"
Craig Tillerdd86b692017-04-06 10:43:11 -070025
26#include <assert.h>
27#include <errno.h>
28#include <poll.h>
29#include <pthread.h>
30#include <string.h>
Craig Tillerdd86b692017-04-06 10:43:11 -070031#include <sys/socket.h>
32#include <unistd.h>
33
34#include <grpc/support/alloc.h>
35#include <grpc/support/log.h>
36#include <grpc/support/string_util.h>
37#include <grpc/support/tls.h>
38#include <grpc/support/useful.h>
39
Craig Tillerb4bb1cd2017-07-20 14:18:17 -070040#include "src/core/lib/debug/stats.h"
Craig Tillerdd86b692017-04-06 10:43:11 -070041#include "src/core/lib/iomgr/ev_posix.h"
42#include "src/core/lib/iomgr/iomgr_internal.h"
Craig Tiller819cd882017-04-25 13:18:22 -070043#include "src/core/lib/iomgr/is_epollexclusive_available.h"
Craig Tillerdd86b692017-04-06 10:43:11 -070044#include "src/core/lib/iomgr/lockfree_event.h"
Craig Tiller819cd882017-04-25 13:18:22 -070045#include "src/core/lib/iomgr/sys_epoll_wrapper.h"
Craig Tillerdd86b692017-04-06 10:43:11 -070046#include "src/core/lib/iomgr/timer.h"
47#include "src/core/lib/iomgr/wakeup_fd_posix.h"
Craig Tillerdd86b692017-04-06 10:43:11 -070048#include "src/core/lib/profiling/timers.h"
49#include "src/core/lib/support/block_annotate.h"
Craig Tiller0b4c9012017-04-06 17:19:37 -070050#include "src/core/lib/support/spinlock.h"
Craig Tillerdd86b692017-04-06 10:43:11 -070051
Craig Tillerdd86b692017-04-06 10:43:11 -070052/*******************************************************************************
Sree Kuchibhotla5efc9132017-08-17 14:10:38 -070053 * Polling object
Craig Tiller8fc1ca12017-04-07 13:01:48 -070054 */
55
/* Kinds of polling object tracked by this engine. The enum value doubles as
   a lock-ordering rank (see po_join/po_cmp): objects are always locked in
   increasing enum order. */
typedef enum {
  PO_POLLING_GROUP,
  PO_POLLSET_SET,
  PO_POLLSET,
  PO_FD, /* ordering is important: we always want to lock pollsets before fds:
            this guarantees that using an fd as a pollable is safe */
  PO_EMPTY_POLLABLE,
  PO_COUNT /* sentinel: number of real types, not a valid type itself */
} polling_obj_type;
Craig Tiller8fc1ca12017-04-07 13:01:48 -070065
Craig Tillerf18286b2017-04-10 14:44:09 -070066typedef struct polling_obj polling_obj;
67typedef struct polling_group polling_group;
68
/* Base header shared by every polling object (group, pollset_set, pollset,
   fd, empty pollable). */
struct polling_obj {
  gpr_mu mu;             /* protects this object's polling state */
  polling_obj_type type; /* which concrete kind this header belongs to */
  polling_group *group;  /* group this object has been joined into, or NULL */
  /* next/prev: intrusive doubly-linked list threading the members of a
     polling_group (maintained by the pg_*/po_* helpers declared below) */
  struct polling_obj *next;
  struct polling_obj *prev;
};
Craig Tiller8fc1ca12017-04-07 13:01:48 -070076
/* A refcounted set of polling objects that poll together. Inherits the
   polling_obj header (po.type == PO_POLLING_GROUP). */
struct polling_group {
  polling_obj po;     /* base header; po.mu guards the member list */
  gpr_refcount refs;  /* manipulated via pg_ref/pg_unref */
};
81
82static void po_init(polling_obj *po, polling_obj_type type);
83static void po_destroy(polling_obj *po);
84static void po_join(grpc_exec_ctx *exec_ctx, polling_obj *a, polling_obj *b);
Craig Tillerd1d7fdd2017-04-14 16:16:24 -070085static int po_cmp(polling_obj *a, polling_obj *b);
Craig Tillerf18286b2017-04-10 14:44:09 -070086
87static void pg_create(grpc_exec_ctx *exec_ctx, polling_obj **initial_po,
88 size_t initial_po_count);
89static polling_group *pg_ref(polling_group *pg);
90static void pg_unref(polling_group *pg);
91static void pg_merge(grpc_exec_ctx *exec_ctx, polling_group *a,
92 polling_group *b);
93static void pg_join(grpc_exec_ctx *exec_ctx, polling_group *pg,
94 polling_obj *po);
Craig Tillerbd412f92017-04-07 13:45:19 -070095
Craig Tiller8fc1ca12017-04-07 13:01:48 -070096/*******************************************************************************
Craig Tillerd1d7fdd2017-04-14 16:16:24 -070097 * pollable Declarations
98 */
99
/* A thing that can be polled: owns (lazily) an epoll set plus a wakeup fd.
   epfd stays -1 until pollable_materialize() creates the epoll set. */
typedef struct pollable {
  polling_obj po;                  /* base header; po.mu guards this struct */
  int epfd;                        /* epoll fd, or -1 if not yet materialized */
  grpc_wakeup_fd wakeup;           /* used to interrupt a blocked epoll_wait */
  grpc_pollset_worker *root_worker; /* head of workers polling this pollable */
} pollable;
Craig Tillerd1d7fdd2017-04-14 16:16:24 -0700106
Craig Tiller86ffd652017-06-20 00:16:54 +0000107static const char *polling_obj_type_string(polling_obj_type t) {
108 switch (t) {
Craig Tiller7fd8f672017-07-05 15:10:12 -0700109 case PO_POLLING_GROUP:
110 return "polling_group";
111 case PO_POLLSET_SET:
112 return "pollset_set";
113 case PO_POLLSET:
114 return "pollset";
115 case PO_FD:
116 return "fd";
117 case PO_EMPTY_POLLABLE:
118 return "empty_pollable";
119 case PO_COUNT:
120 return "<invalid:count>";
Craig Tiller86ffd652017-06-20 00:16:54 +0000121 }
122 return "<invalid>";
123}
124
/* Build a one-line debug description of a pollable.
   Returns a heap string allocated by gpr_asprintf; the caller owns it and
   must release it with gpr_free. */
static char *pollable_desc(pollable *p) {
  char *out;
  gpr_asprintf(&out, "type=%s group=%p epfd=%d wakeup=%d",
               polling_obj_type_string(p->po.type), p->po.group, p->epfd,
               p->wakeup.read_fd);
  return out;
}
132
Yash Tibrewalb2a54ac2017-09-13 10:18:07 -0700133static pollable g_empty_pollable;
Craig Tillerd1d7fdd2017-04-14 16:16:24 -0700134
Yash Tibrewalb2a54ac2017-09-13 10:18:07 -0700135static void pollable_init(pollable *p, polling_obj_type type);
136static void pollable_destroy(pollable *p);
Craig Tillerd1d7fdd2017-04-14 16:16:24 -0700137/* ensure that p->epfd, p->wakeup are initialized; p->po.mu must be held */
Yash Tibrewalb2a54ac2017-09-13 10:18:07 -0700138static grpc_error *pollable_materialize(pollable *p);
Craig Tillerd1d7fdd2017-04-14 16:16:24 -0700139
140/*******************************************************************************
Craig Tillerdd86b692017-04-06 10:43:11 -0700141 * Fd Declarations
142 */
143
/* Wrapper around a kernel file descriptor registered with the polling
   engine. Instances are recycled through fd_freelist (see the comment above
   fd_global_init) rather than freed, so a late epoll notification can at
   worst wake a reused fd. */
struct grpc_fd {
  pollable pollable_obj; /* an fd is itself a pollable (po.type == PO_FD) */
  int fd;                /* the underlying kernel fd */
  /* refst format:
     bit 0 : 1=Active / 0=Orphaned
     bits 1-n : refcount
     Ref/Unref by two to avoid altering the orphaned bit */
  gpr_atm refst;

  /* The fd is either closed or we relinquished control of it. In either
     cases, this indicates that the 'fd' on this structure is no longer
     valid */
  gpr_mu orphaned_mu; /* guards 'orphaned' */
  bool orphaned;

  /* lockfree-event state words (see lockfree_event.h; initialized with
     grpc_lfev_init) for readability/writability notifications */
  gpr_atm read_closure;
  gpr_atm write_closure;

  struct grpc_fd *freelist_next;  /* link used while parked on fd_freelist */
  grpc_closure *on_done_closure;  /* scheduled when the fd is orphaned */

  /* The pollset that last noticed that the fd is readable. The actual type
   * stored in this is (grpc_pollset *) */
  gpr_atm read_notifier_pollset;

  grpc_iomgr_object iomgr_object; /* registration for leak tracking */
};
171
Craig Tillerdd86b692017-04-06 10:43:11 -0700172static void fd_global_init(void);
173static void fd_global_shutdown(void);
174
Craig Tillerdd86b692017-04-06 10:43:11 -0700175/*******************************************************************************
176 * Pollset Declarations
177 */
Craig Tillerd1d7fdd2017-04-14 16:16:24 -0700178
/* One intrusive doubly-linked-list link; each worker carries one link per
   list it can appear on (see pollset_worker_links). */
typedef struct pollset_worker_link {
  grpc_pollset_worker *next;
  grpc_pollset_worker *prev;
} pollset_worker_link;
183
/* Index into grpc_pollset_worker.links: a worker is simultaneously linked
   into its pollset's worker list and its pollable's worker list. */
typedef enum {
  PWL_POLLSET,
  PWL_POLLABLE,
  POLLSET_WORKER_LINK_COUNT /* sentinel: number of link slots per worker */
} pollset_worker_links;
189
/* A thread currently executing pollset_work on some pollset. */
struct grpc_pollset_worker {
  bool kicked;         /* set when this worker has been explicitly woken */
  bool initialized_cv; /* true iff 'cv' below was gpr_cv_init'd and is usable */
  pollset_worker_link links[POLLSET_WORKER_LINK_COUNT];
  gpr_cv cv;               /* used to park workers that are not the poller */
  grpc_pollset *pollset;   /* the pollset this worker is working on */
  pollable *pollable_obj;  /* the pollable this worker is polling */
};
198
/* Max events fetched per epoll_wait call, and how many of those are handled
   per pass through pollset_work (the rest wait in the cursor'd buffer). */
#define MAX_EPOLL_EVENTS 100
#define MAX_EPOLL_EVENTS_HANDLED_EACH_POLL_CALL 5

struct grpc_pollset {
  pollable pollable_obj;          /* embedded pollable owned by this pollset */
  pollable *current_pollable_obj; /* pollable being polled right now (may be
                                     &pollable_obj, g_empty_pollable, or an
                                     fd's pollable) */
  int kick_alls_pending;    /* count of do_kick_all closures still in flight;
                               shutdown completes only when this hits 0 */
  bool kicked_without_poller; /* a kick arrived while nobody was polling */
  grpc_closure *shutdown_closure; /* scheduled once shutdown can complete */
  grpc_pollset_worker *root_worker; /* head of this pollset's worker list */

  /* buffered epoll_wait results: events[event_cursor..event_count) are
     fetched but not yet processed */
  int event_cursor;
  int event_count;
  struct epoll_event events[MAX_EPOLL_EVENTS];
};
Craig Tillerdd86b692017-04-06 10:43:11 -0700214
215/*******************************************************************************
216 * Pollset-set Declarations
217 */
/* A pollset_set is just a polling object header (po.type == PO_POLLSET_SET);
   all of its behavior lives in the generic po_*/pg_* machinery. */
struct grpc_pollset_set {
  polling_obj po;
};
Craig Tillerdd86b692017-04-06 10:43:11 -0700221
222/*******************************************************************************
223 * Common helpers
224 */
225
Craig Tillere24b24d2017-04-06 16:05:45 -0700226static bool append_error(grpc_error **composite, grpc_error *error,
227 const char *desc) {
228 if (error == GRPC_ERROR_NONE) return true;
229 if (*composite == GRPC_ERROR_NONE) {
230 *composite = GRPC_ERROR_CREATE_FROM_COPIED_STRING(desc);
231 }
232 *composite = grpc_error_add_child(*composite, error);
233 return false;
234}
235
Craig Tillerdd86b692017-04-06 10:43:11 -0700236/*******************************************************************************
237 * Fd Definitions
238 */
239
240/* We need to keep a freelist not because of any concerns of malloc performance
241 * but instead so that implementations with multiple threads in (for example)
242 * epoll_wait deal with the race between pollset removal and incoming poll
243 * notifications.
244 *
245 * The problem is that the poller ultimately holds a reference to this
246 * object, so it is very difficult to know when is safe to free it, at least
247 * without some expensive synchronization.
248 *
249 * If we keep the object freelisted, in the worst case losing this race just
250 * becomes a spurious read notification on a reused fd.
251 */
252
253/* The alarm system needs to be able to wakeup 'some poller' sometimes
254 * (specifically when a new alarm needs to be triggered earlier than the next
255 * alarm 'epoch'). This wakeup_fd gives us something to alert on when such a
256 * case occurs. */
257
258static grpc_fd *fd_freelist = NULL;
259static gpr_mu fd_freelist_mu;
260
/* Take n references on fd. refs are stored doubled in 'refst' so bit 0 can
   track active/orphaned (see the struct comment); callers therefore always
   pass even n via REF_BY. Debug builds additionally log the transition. */
#ifndef NDEBUG
#define REF_BY(fd, n, reason) ref_by(fd, n, reason, __FILE__, __LINE__)
#define UNREF_BY(ec, fd, n, reason) \
  unref_by(ec, fd, n, reason, __FILE__, __LINE__)
static void ref_by(grpc_fd *fd, int n, const char *reason, const char *file,
                   int line) {
  if (GRPC_TRACER_ON(grpc_trace_fd_refcount)) {
    gpr_log(GPR_DEBUG,
            "FD %d %p ref %d %" PRIdPTR " -> %" PRIdPTR " [%s; %s:%d]",
            fd->fd, fd, n, gpr_atm_no_barrier_load(&fd->refst),
            gpr_atm_no_barrier_load(&fd->refst) + n, reason, file, line);
  }
#else
#define REF_BY(fd, n, reason) ref_by(fd, n)
#define UNREF_BY(ec, fd, n, reason) unref_by(ec, fd, n)
static void ref_by(grpc_fd *fd, int n) {
#endif
  /* must already hold a reference: the count may never revive from zero */
  GPR_ASSERT(gpr_atm_no_barrier_fetch_add(&fd->refst, n) > 0);
}
280
/* Closure callback run when the last ref on an fd is dropped: tear down the
   fd's resources and park the struct on the freelist for reuse. */
static void fd_destroy(grpc_exec_ctx *exec_ctx, void *arg, grpc_error *error) {
  grpc_fd *fd = (grpc_fd *)arg;
  /* Add the fd to the freelist */
  grpc_iomgr_unregister_object(&fd->iomgr_object);
  pollable_destroy(&fd->pollable_obj);
  gpr_mu_destroy(&fd->orphaned_mu);
  gpr_mu_lock(&fd_freelist_mu);
  fd->freelist_next = fd_freelist;
  fd_freelist = fd;

  /* lfev teardown happens while still holding fd_freelist_mu, so fd_create
     cannot hand this struct out before the events are destroyed */
  grpc_lfev_destroy(&fd->read_closure);
  grpc_lfev_destroy(&fd->write_closure);

  gpr_mu_unlock(&fd_freelist_mu);
}
296
/* Drop n references on fd; when the count reaches zero, schedule fd_destroy
   on the exec_ctx (destruction is deferred, not inline). Debug builds log
   the transition. */
#ifndef NDEBUG
static void unref_by(grpc_exec_ctx *exec_ctx, grpc_fd *fd, int n,
                     const char *reason, const char *file, int line) {
  if (GRPC_TRACER_ON(grpc_trace_fd_refcount)) {
    gpr_log(GPR_DEBUG,
            "FD %d %p unref %d %" PRIdPTR " -> %" PRIdPTR " [%s; %s:%d]",
            fd->fd, fd, n, gpr_atm_no_barrier_load(&fd->refst),
            gpr_atm_no_barrier_load(&fd->refst) - n, reason, file, line);
  }
#else
static void unref_by(grpc_exec_ctx *exec_ctx, grpc_fd *fd, int n) {
#endif
  /* full barrier so all prior writes are visible before destruction */
  gpr_atm old = gpr_atm_full_fetch_add(&fd->refst, -n);
  if (old == n) {
    GRPC_CLOSURE_SCHED(exec_ctx, GRPC_CLOSURE_CREATE(fd_destroy, fd,
                                                     grpc_schedule_on_exec_ctx),
                       GRPC_ERROR_NONE);
  } else {
    GPR_ASSERT(old > n); /* refcount must never go negative */
  }
}
318
319static void fd_global_init(void) { gpr_mu_init(&fd_freelist_mu); }
320
/* Free every fd parked on the freelist and destroy its lock.
   NOTE(review): the immediate lock/unlock looks like a barrier to ensure no
   concurrent freelist user is mid-critical-section before the unlocked walk
   below — confirm shutdown is single-threaded at this point. */
static void fd_global_shutdown(void) {
  gpr_mu_lock(&fd_freelist_mu);
  gpr_mu_unlock(&fd_freelist_mu);
  while (fd_freelist != NULL) {
    grpc_fd *fd = fd_freelist;
    fd_freelist = fd_freelist->freelist_next;
    gpr_free(fd);
  }
  gpr_mu_destroy(&fd_freelist_mu);
}
331
/* Create (or recycle from the freelist) a grpc_fd wrapping kernel fd 'fd'.
   'name' is used only for debug/iomgr registration. The new fd starts with
   one (doubled-encoding) reference and is active (not orphaned). */
static grpc_fd *fd_create(int fd, const char *name) {
  grpc_fd *new_fd = NULL;

  /* try to reuse a previously-destroyed struct first */
  gpr_mu_lock(&fd_freelist_mu);
  if (fd_freelist != NULL) {
    new_fd = fd_freelist;
    fd_freelist = fd_freelist->freelist_next;
  }
  gpr_mu_unlock(&fd_freelist_mu);

  if (new_fd == NULL) {
    new_fd = (grpc_fd *)gpr_malloc(sizeof(grpc_fd));
  }

  pollable_init(&new_fd->pollable_obj, PO_FD);

  /* release store publishes the fully-initialized refcount */
  gpr_atm_rel_store(&new_fd->refst, (gpr_atm)1);
  new_fd->fd = fd;
  gpr_mu_init(&new_fd->orphaned_mu);
  new_fd->orphaned = false;
  grpc_lfev_init(&new_fd->read_closure);
  grpc_lfev_init(&new_fd->write_closure);
  gpr_atm_no_barrier_store(&new_fd->read_notifier_pollset, (gpr_atm)NULL);

  new_fd->freelist_next = NULL;
  new_fd->on_done_closure = NULL;

  char *fd_name;
  gpr_asprintf(&fd_name, "%s fd=%d", name, fd);
  grpc_iomgr_register_object(&new_fd->iomgr_object, fd_name);
#ifndef NDEBUG
  if (GRPC_TRACER_ON(grpc_trace_fd_refcount)) {
    gpr_log(GPR_DEBUG, "FD %d %p create %s", fd, new_fd, fd_name);
  }
#endif
  gpr_free(fd_name);
  return new_fd;
}
370
371static int fd_wrapped_fd(grpc_fd *fd) {
372 int ret_fd = -1;
Craig Tiller79d24fb2017-04-17 19:35:19 +0000373 gpr_mu_lock(&fd->orphaned_mu);
Craig Tillerdd86b692017-04-06 10:43:11 -0700374 if (!fd->orphaned) {
375 ret_fd = fd->fd;
376 }
Craig Tiller79d24fb2017-04-17 19:35:19 +0000377 gpr_mu_unlock(&fd->orphaned_mu);
Craig Tillerdd86b692017-04-06 10:43:11 -0700378
379 return ret_fd;
380}
381
/* Orphan an fd: close it (or hand the raw descriptor back via *release_fd),
   mark it orphaned, schedule 'on_done', and drop the "active" reference.
   Lock order: the fd's pollable lock, then orphaned_mu (pollables before
   fds per the PO_* ordering comment). */
static void fd_orphan(grpc_exec_ctx *exec_ctx, grpc_fd *fd,
                      grpc_closure *on_done, int *release_fd,
                      bool already_closed, const char *reason) {
  bool is_fd_closed = already_closed;
  /* NOTE(review): 'error' is never set to anything but NONE in this
     function; the REF/LOG/UNREF dance below keeps the struct alive until
     the end regardless. */
  grpc_error *error = GRPC_ERROR_NONE;

  gpr_mu_lock(&fd->pollable_obj.po.mu);
  gpr_mu_lock(&fd->orphaned_mu);
  fd->on_done_closure = on_done;

  /* If release_fd is not NULL, we should be relinquishing control of the file
     descriptor fd->fd (but we still own the grpc_fd structure). */
  if (release_fd != NULL) {
    *release_fd = fd->fd;
  } else if (!is_fd_closed) {
    close(fd->fd);
    is_fd_closed = true;
  }

  fd->orphaned = true;

  if (!is_fd_closed) {
    gpr_log(GPR_DEBUG, "TODO: handle fd removal?");
  }

  /* Remove the active status but keep referenced. We want this grpc_fd struct
     to be alive (and not added to freelist) until the end of this function */
  REF_BY(fd, 1, reason);

  GRPC_CLOSURE_SCHED(exec_ctx, fd->on_done_closure, GRPC_ERROR_REF(error));

  gpr_mu_unlock(&fd->orphaned_mu);
  gpr_mu_unlock(&fd->pollable_obj.po.mu);
  /* -2 = the +1 above plus the "active" bit taken at fd_create */
  UNREF_BY(exec_ctx, fd, 2, reason); /* Drop the reference */
  GRPC_LOG_IF_ERROR("fd_orphan", GRPC_ERROR_REF(error));
  GRPC_ERROR_UNREF(error);
}
419
420static grpc_pollset *fd_get_read_notifier_pollset(grpc_exec_ctx *exec_ctx,
421 grpc_fd *fd) {
422 gpr_atm notifier = gpr_atm_acq_load(&fd->read_notifier_pollset);
423 return (grpc_pollset *)notifier;
424}
425
/* An fd is considered shut down once its read event has been shut down
   (fd_shutdown shuts the read side first). */
static bool fd_is_shutdown(grpc_fd *fd) {
  return grpc_lfev_is_shutdown(&fd->read_closure);
}
429
/* Might be called multiple times */
/* Shut down both directions of the fd. grpc_lfev_set_shutdown returns true
   only for the first successful shutdown of the read event, so the kernel
   shutdown() and the write-side shutdown run exactly once. Consumes the
   caller's ref on 'why'. */
static void fd_shutdown(grpc_exec_ctx *exec_ctx, grpc_fd *fd, grpc_error *why) {
  if (grpc_lfev_set_shutdown(exec_ctx, &fd->read_closure,
                             GRPC_ERROR_REF(why))) {
    shutdown(fd->fd, SHUT_RDWR);
    grpc_lfev_set_shutdown(exec_ctx, &fd->write_closure, GRPC_ERROR_REF(why));
  }
  GRPC_ERROR_UNREF(why);
}
439
/* Register 'closure' to run when the fd becomes readable (or is shut down). */
static void fd_notify_on_read(grpc_exec_ctx *exec_ctx, grpc_fd *fd,
                              grpc_closure *closure) {
  grpc_lfev_notify_on(exec_ctx, &fd->read_closure, closure, "read");
}
444
/* Register 'closure' to run when the fd becomes writable (or is shut down). */
static void fd_notify_on_write(grpc_exec_ctx *exec_ctx, grpc_fd *fd,
                               grpc_closure *closure) {
  grpc_lfev_notify_on(exec_ctx, &fd->write_closure, closure, "write");
}
449
Craig Tillerdd86b692017-04-06 10:43:11 -0700450/*******************************************************************************
Craig Tillerd1d7fdd2017-04-14 16:16:24 -0700451 * Pollable Definitions
452 */
453
Yash Tibrewalb2a54ac2017-09-13 10:18:07 -0700454static void pollable_init(pollable *p, polling_obj_type type) {
Craig Tillerd1d7fdd2017-04-14 16:16:24 -0700455 po_init(&p->po, type);
456 p->root_worker = NULL;
457 p->epfd = -1;
458}
459
/* Tear down a pollable; safe to call whether or not it was materialized
   (epfd == -1 means the epoll set and wakeup fd were never created). */
static void pollable_destroy(pollable *p) {
  po_destroy(&p->po);
  if (p->epfd != -1) {
    close(p->epfd);
    grpc_wakeup_fd_destroy(&p->wakeup);
  }
}
467
/* ensure that p->epfd, p->wakeup are initialized; p->po.mu must be held */
/* Returns GRPC_ERROR_NONE on success. On any failure, everything created so
   far is torn down again so the pollable stays un-materialized (epfd -1)
   and a later retry is possible. */
static grpc_error *pollable_materialize(pollable *p) {
  if (p->epfd == -1) {
    int new_epfd = epoll_create1(EPOLL_CLOEXEC);
    if (new_epfd < 0) {
      return GRPC_OS_ERROR(errno, "epoll_create1");
    }
    grpc_error *err = grpc_wakeup_fd_init(&p->wakeup);
    if (err != GRPC_ERROR_NONE) {
      close(new_epfd);
      return err;
    }
    struct epoll_event ev;
    ev.events = (uint32_t)(EPOLLIN | EPOLLET);
    /* low bit set in data.ptr tags this event as the wakeup fd, so the
       poll loop can distinguish it from a grpc_fd pointer */
    ev.data.ptr = (void *)(1 | (intptr_t)&p->wakeup);
    if (epoll_ctl(new_epfd, EPOLL_CTL_ADD, p->wakeup.read_fd, &ev) != 0) {
      err = GRPC_OS_ERROR(errno, "epoll_ctl");
      close(new_epfd);
      grpc_wakeup_fd_destroy(&p->wakeup);
      return err;
    }

    /* publish only after full success */
    p->epfd = new_epfd;
  }
  return GRPC_ERROR_NONE;
}
494
/* pollable must be materialized */
/* Add 'fd' to p's epoll set (edge-triggered, exclusive wakeup). A no-op if
   the fd has already been orphaned, and EEXIST (already registered) is
   deliberately ignored; other epoll_ctl failures are folded into the
   returned error. */
static grpc_error *pollable_add_fd(pollable *p, grpc_fd *fd) {
  grpc_error *error = GRPC_ERROR_NONE;
  static const char *err_desc = "pollable_add_fd";
  const int epfd = p->epfd;
  GPR_ASSERT(epfd != -1);

  if (GRPC_TRACER_ON(grpc_polling_trace)) {
    gpr_log(GPR_DEBUG, "add fd %p (%d) to pollable %p", fd, fd->fd, p);
  }

  /* hold orphaned_mu across epoll_ctl so the fd cannot be closed from
     under us mid-registration */
  gpr_mu_lock(&fd->orphaned_mu);
  if (fd->orphaned) {
    gpr_mu_unlock(&fd->orphaned_mu);
    return GRPC_ERROR_NONE;
  }
  struct epoll_event ev_fd;
  ev_fd.events = (uint32_t)(EPOLLET | EPOLLIN | EPOLLOUT | EPOLLEXCLUSIVE);
  ev_fd.data.ptr = fd;
  if (epoll_ctl(epfd, EPOLL_CTL_ADD, fd->fd, &ev_fd) != 0) {
    switch (errno) {
      case EEXIST:
        break;
      default:
        append_error(&error, GRPC_OS_ERROR(errno, "epoll_ctl"), err_desc);
    }
  }
  gpr_mu_unlock(&fd->orphaned_mu);

  return error;
}
526
527/*******************************************************************************
Craig Tillerdd86b692017-04-06 10:43:11 -0700528 * Pollset Definitions
529 */
Craig Tillerd1d7fdd2017-04-14 16:16:24 -0700530
Craig Tillerdd86b692017-04-06 10:43:11 -0700531GPR_TLS_DECL(g_current_thread_pollset);
532GPR_TLS_DECL(g_current_thread_worker);
533
/* Global state management */
/* One-time pollset subsystem init: thread-local slots used to detect
   self-kicks, plus the shared empty pollable. Always succeeds. */
static grpc_error *pollset_global_init(void) {
  gpr_tls_init(&g_current_thread_pollset);
  gpr_tls_init(&g_current_thread_worker);
  pollable_init(&g_empty_pollable, PO_EMPTY_POLLABLE);
  return GRPC_ERROR_NONE;
}
541
/* Mirror of pollset_global_init: release the empty pollable and the
   thread-local slots. */
static void pollset_global_shutdown(void) {
  pollable_destroy(&g_empty_pollable);
  gpr_tls_destroy(&g_current_thread_pollset);
  gpr_tls_destroy(&g_current_thread_worker);
}
547
Craig Tillere16722c2017-05-17 21:50:37 +0000548static void pollset_maybe_finish_shutdown(grpc_exec_ctx *exec_ctx,
549 grpc_pollset *pollset) {
Craig Tillerf7e07482017-05-18 10:13:53 -0700550 if (pollset->shutdown_closure != NULL && pollset->root_worker == NULL &&
551 pollset->kick_alls_pending == 0) {
Craig Tillerca669b02017-06-09 12:41:51 -0700552 GRPC_CLOSURE_SCHED(exec_ctx, pollset->shutdown_closure, GRPC_ERROR_NONE);
Craig Tillere16722c2017-05-17 21:50:37 +0000553 pollset->shutdown_closure = NULL;
554 }
555}
556
/* Closure body for pollset_kick_all: wake every worker on the pollset.
   Workers parked on a condvar (and not the root poller) are kicked via
   gpr_cv_signal; the rest via their pollable's wakeup fd. If a worker is
   polling a pollable other than the pollset's own, that pollable's lock is
   taken around the kick (pollset lock is already held, and pollsets rank
   before fds in the lock order). */
static void do_kick_all(grpc_exec_ctx *exec_ctx, void *arg,
                        grpc_error *error_unused) {
  grpc_error *error = GRPC_ERROR_NONE;
  grpc_pollset *pollset = (grpc_pollset *)arg;
  gpr_mu_lock(&pollset->pollable_obj.po.mu);
  if (pollset->root_worker != NULL) {
    grpc_pollset_worker *worker = pollset->root_worker;
    do {
      if (worker->pollable_obj != &pollset->pollable_obj) {
        gpr_mu_lock(&worker->pollable_obj->po.mu);
      }
      if (worker->initialized_cv && worker != pollset->root_worker) {
        if (GRPC_TRACER_ON(grpc_polling_trace)) {
          gpr_log(GPR_DEBUG, "PS:%p kickall_via_cv %p (pollable %p vs %p)",
                  pollset, worker, &pollset->pollable_obj,
                  worker->pollable_obj);
        }
        worker->kicked = true;
        gpr_cv_signal(&worker->cv);
      } else {
        if (GRPC_TRACER_ON(grpc_polling_trace)) {
          gpr_log(GPR_DEBUG, "PS:%p kickall_via_wakeup %p (pollable %p vs %p)",
                  pollset, worker, &pollset->pollable_obj,
                  worker->pollable_obj);
        }
        append_error(&error,
                     grpc_wakeup_fd_wakeup(&worker->pollable_obj->wakeup),
                     "pollset_shutdown");
      }
      if (worker->pollable_obj != &pollset->pollable_obj) {
        gpr_mu_unlock(&worker->pollable_obj->po.mu);
      }

      worker = worker->links[PWL_POLLSET].next;
    } while (worker != pollset->root_worker);
  }
  /* this closure no longer blocks shutdown completion */
  pollset->kick_alls_pending--;
  pollset_maybe_finish_shutdown(exec_ctx, pollset);
  gpr_mu_unlock(&pollset->pollable_obj.po.mu);
  GRPC_LOG_IF_ERROR("kick_all", error);
}
598
/* Asynchronously wake all workers on 'pollset': the actual kicking happens
   in do_kick_all, scheduled as a closure. kick_alls_pending is incremented
   here (and decremented in do_kick_all) so shutdown waits for it. */
static void pollset_kick_all(grpc_exec_ctx *exec_ctx, grpc_pollset *pollset) {
  pollset->kick_alls_pending++;
  GRPC_CLOSURE_SCHED(exec_ctx, GRPC_CLOSURE_CREATE(do_kick_all, pollset,
                                                   grpc_schedule_on_exec_ctx),
                     GRPC_ERROR_NONE);
}
605
/* Core kick logic; both pollset->pollable_obj.po.mu and (if distinct)
   p->po.mu are held by the caller (see pollset_kick).
   specific_worker == NULL means "wake any worker": if the calling thread is
   already working on this pollset nothing is needed; otherwise either note
   the kick for a future poller or poke the wakeup fd. A specific worker is
   woken via the wakeup fd if it is the root poller, or via its condvar
   otherwise; already-kicked or currently-running workers are no-ops. */
static grpc_error *pollset_kick_inner(grpc_pollset *pollset, pollable *p,
                                      grpc_pollset_worker *specific_worker) {
  if (GRPC_TRACER_ON(grpc_polling_trace)) {
    gpr_log(GPR_DEBUG,
            "PS:%p kick %p tls_pollset=%p tls_worker=%p "
            "root_worker=(pollset:%p pollable:%p)",
            p, specific_worker, (void *)gpr_tls_get(&g_current_thread_pollset),
            (void *)gpr_tls_get(&g_current_thread_worker), pollset->root_worker,
            p->root_worker);
  }
  if (specific_worker == NULL) {
    if (gpr_tls_get(&g_current_thread_pollset) != (intptr_t)pollset) {
      if (pollset->root_worker == NULL) {
        if (GRPC_TRACER_ON(grpc_polling_trace)) {
          gpr_log(GPR_DEBUG, "PS:%p kicked_any_without_poller", p);
        }
        /* nobody is polling: remember the kick so the next worker returns
           immediately instead of blocking */
        pollset->kicked_without_poller = true;
        return GRPC_ERROR_NONE;
      } else {
        if (GRPC_TRACER_ON(grpc_polling_trace)) {
          gpr_log(GPR_DEBUG, "PS:%p kicked_any_via_wakeup_fd", p);
        }
        grpc_error *err = pollable_materialize(p);
        if (err != GRPC_ERROR_NONE) return err;
        return grpc_wakeup_fd_wakeup(&p->wakeup);
      }
    } else {
      /* this thread is already working this pollset: no wakeup needed */
      if (GRPC_TRACER_ON(grpc_polling_trace)) {
        gpr_log(GPR_DEBUG, "PS:%p kicked_any_but_awake", p);
      }
      return GRPC_ERROR_NONE;
    }
  } else if (specific_worker->kicked) {
    if (GRPC_TRACER_ON(grpc_polling_trace)) {
      gpr_log(GPR_DEBUG, "PS:%p kicked_specific_but_already_kicked", p);
    }
    return GRPC_ERROR_NONE;
  } else if (gpr_tls_get(&g_current_thread_worker) ==
             (intptr_t)specific_worker) {
    if (GRPC_TRACER_ON(grpc_polling_trace)) {
      gpr_log(GPR_DEBUG, "PS:%p kicked_specific_but_awake", p);
    }
    specific_worker->kicked = true;
    return GRPC_ERROR_NONE;
  } else if (specific_worker == p->root_worker) {
    if (GRPC_TRACER_ON(grpc_polling_trace)) {
      gpr_log(GPR_DEBUG, "PS:%p kicked_specific_via_wakeup_fd", p);
    }
    grpc_error *err = pollable_materialize(p);
    if (err != GRPC_ERROR_NONE) return err;
    specific_worker->kicked = true;
    return grpc_wakeup_fd_wakeup(&p->wakeup);
  } else {
    if (GRPC_TRACER_ON(grpc_polling_trace)) {
      gpr_log(GPR_DEBUG, "PS:%p kicked_specific_via_cv", p);
    }
    specific_worker->kicked = true;
    gpr_cv_signal(&specific_worker->cv);
    return GRPC_ERROR_NONE;
  }
}
667
668/* p->po.mu must be held before calling this function */
669static grpc_error *pollset_kick(grpc_pollset *pollset,
670 grpc_pollset_worker *specific_worker) {
Yash Tibrewal8f798e92017-09-13 15:45:47 -0700671 pollable *p = pollset->current_pollable_obj;
Yash Tibrewalb2a54ac2017-09-13 10:18:07 -0700672 if (p != &pollset->pollable_obj) {
Craig Tiller26017de2017-04-15 00:07:25 +0000673 gpr_mu_lock(&p->po.mu);
674 }
675 grpc_error *error = pollset_kick_inner(pollset, p, specific_worker);
Yash Tibrewalb2a54ac2017-09-13 10:18:07 -0700676 if (p != &pollset->pollable_obj) {
Craig Tiller26017de2017-04-15 00:07:25 +0000677 gpr_mu_unlock(&p->po.mu);
678 }
679 return error;
Craig Tillerdd86b692017-04-06 10:43:11 -0700680}
681
Craig Tillere24b24d2017-04-06 16:05:45 -0700682static void pollset_init(grpc_pollset *pollset, gpr_mu **mu) {
Yash Tibrewalb2a54ac2017-09-13 10:18:07 -0700683 pollable_init(&pollset->pollable_obj, PO_POLLSET);
Yash Tibrewal8f798e92017-09-13 15:45:47 -0700684 pollset->current_pollable_obj = &g_empty_pollable;
Craig Tillerbb93af62017-04-07 23:49:00 +0000685 pollset->kicked_without_poller = false;
Craig Tillere24b24d2017-04-06 16:05:45 -0700686 pollset->shutdown_closure = NULL;
Craig Tillere24b24d2017-04-06 16:05:45 -0700687 pollset->root_worker = NULL;
Yash Tibrewalb2a54ac2017-09-13 10:18:07 -0700688 *mu = &pollset->pollable_obj.po.mu;
Craig Tiller9f012512017-04-13 15:37:14 -0700689}
690
Craig Tillerdd86b692017-04-06 10:43:11 -0700691/* Convert a timespec to milliseconds:
692 - Very small or negative poll times are clamped to zero to do a non-blocking
693 poll (which becomes spin polling)
694 - Other small values are rounded up to one millisecond
695 - Longer than a millisecond polls are rounded up to the next nearest
696 millisecond to avoid spinning
697 - Infinite timeouts are converted to -1 */
698static int poll_deadline_to_millis_timeout(gpr_timespec deadline,
699 gpr_timespec now) {
700 gpr_timespec timeout;
Craig Tillerdd86b692017-04-06 10:43:11 -0700701 if (gpr_time_cmp(deadline, gpr_inf_future(deadline.clock_type)) == 0) {
702 return -1;
703 }
704
Craig Tiller7f5fac92017-04-07 21:47:41 +0000705 if (gpr_time_cmp(deadline, now) <= 0) {
Craig Tillerdd86b692017-04-06 10:43:11 -0700706 return 0;
707 }
Craig Tiller7f5fac92017-04-07 21:47:41 +0000708
Yash Tibrewal533d1182017-09-18 10:48:22 -0700709 static const gpr_timespec round_up = {0, GPR_NS_PER_MS - 1, GPR_TIMESPAN};
Craig Tillerdd86b692017-04-06 10:43:11 -0700710 timeout = gpr_time_sub(deadline, now);
Craig Tiller7f5fac92017-04-07 21:47:41 +0000711 int millis = gpr_time_to_millis(gpr_time_add(timeout, round_up));
Craig Tillerdd86b692017-04-06 10:43:11 -0700712 return millis >= 1 ? millis : 1;
713}
714
/* Signal that 'fd' is readable: schedule its read closure and record the
   pollset that observed the event. */
static void fd_become_readable(grpc_exec_ctx *exec_ctx, grpc_fd *fd,
                               grpc_pollset *notifier) {
  grpc_lfev_set_ready(exec_ctx, &fd->read_closure, "read");

  /* Note, it is possible that fd_become_readable might be called twice with
     different 'notifier's when an fd becomes readable and it is in two epoll
     sets (This can happen briefly during polling island merges). In such cases
     it does not really matter which notifer is set as the read_notifier_pollset
     (They would both point to the same polling island anyway) */
  /* Use release store to match with acquire load in fd_get_read_notifier */
  gpr_atm_rel_store(&fd->read_notifier_pollset, (gpr_atm)notifier);
}
727
/* Signal that 'fd' is writable: schedule its write closure. */
static void fd_become_writable(grpc_exec_ctx *exec_ctx, grpc_fd *fd) {
  grpc_lfev_set_ready(exec_ctx, &fd->write_closure, "write");
}
731
Craig Tiller50480b22017-04-17 16:34:52 +0000732static grpc_error *fd_become_pollable_locked(grpc_fd *fd) {
Craig Tillerd1d7fdd2017-04-14 16:16:24 -0700733 grpc_error *error = GRPC_ERROR_NONE;
734 static const char *err_desc = "fd_become_pollable";
Yash Tibrewalb2a54ac2017-09-13 10:18:07 -0700735 if (append_error(&error, pollable_materialize(&fd->pollable_obj), err_desc)) {
736 append_error(&error, pollable_add_fd(&fd->pollable_obj, fd), err_desc);
Craig Tillerd1d7fdd2017-04-14 16:16:24 -0700737 }
Craig Tillerd1d7fdd2017-04-14 16:16:24 -0700738 return error;
739}
740
/* Begin shutting down the pollset: record the closure to run when shutdown
   completes and wake all workers so they can observe it. */
/* pollset->po.mu lock must be held by the caller before calling this */
static void pollset_shutdown(grpc_exec_ctx *exec_ctx, grpc_pollset *pollset,
                             grpc_closure *closure) {
  /* Shutting down twice is a caller bug. */
  GPR_ASSERT(pollset->shutdown_closure == NULL);
  pollset->shutdown_closure = closure;
  /* Wake every worker so each one can notice the pending shutdown. */
  pollset_kick_all(exec_ctx, pollset);
  /* NOTE(review): presumably completes immediately when no workers are
     active -- confirm against pollset_maybe_finish_shutdown. */
  pollset_maybe_finish_shutdown(exec_ctx, pollset);
}
Craig Tillerdd86b692017-04-06 10:43:11 -0700749
Yash Tibrewalb2a54ac2017-09-13 10:18:07 -0700750static bool pollset_is_pollable_fd(grpc_pollset *pollset, pollable *p) {
751 return p != &g_empty_pollable && p != &pollset->pollable_obj;
Craig Tillerd1d7fdd2017-04-14 16:16:24 -0700752}
753
/* Dispatch events buffered by the last epoll_wait (see pollset_epoll).
   Processes at most MAX_EPOLL_EVENTS_HANDLED_EACH_POLL_CALL events per call
   unless 'drain' is true, in which case everything remaining is consumed.
   The cursor persists across calls so leftover events are handled later. */
static grpc_error *pollset_process_events(grpc_exec_ctx *exec_ctx,
                                          grpc_pollset *pollset, bool drain) {
  static const char *err_desc = "pollset_process_events";
  grpc_error *error = GRPC_ERROR_NONE;
  for (int i = 0; (drain || i < MAX_EPOLL_EVENTS_HANDLED_EACH_POLL_CALL) &&
                  pollset->event_cursor != pollset->event_count;
       i++) {
    int n = pollset->event_cursor++;
    struct epoll_event *ev = &pollset->events[n];
    void *data_ptr = ev->data.ptr;
    /* The low bit of data.ptr tags wakeup fds; untagged pointers are
       grpc_fd*. */
    if (1 & (intptr_t)data_ptr) {
      if (GRPC_TRACER_ON(grpc_polling_trace)) {
        gpr_log(GPR_DEBUG, "PS:%p got pollset_wakeup %p", pollset, data_ptr);
      }
      /* Clear the tag bit to recover the wakeup fd pointer, then drain it. */
      append_error(&error,
                   grpc_wakeup_fd_consume_wakeup(
                       (grpc_wakeup_fd *)((~(intptr_t)1) & (intptr_t)data_ptr)),
                   err_desc);
    } else {
      grpc_fd *fd = (grpc_fd *)data_ptr;
      /* Errors/hangups mark the fd both readable and writable so pending
         closures get a chance to observe the failure. */
      bool cancel = (ev->events & (EPOLLERR | EPOLLHUP)) != 0;
      bool read_ev = (ev->events & (EPOLLIN | EPOLLPRI)) != 0;
      bool write_ev = (ev->events & EPOLLOUT) != 0;
      if (GRPC_TRACER_ON(grpc_polling_trace)) {
        gpr_log(GPR_DEBUG,
                "PS:%p got fd %p: cancel=%d read=%d "
                "write=%d",
                pollset, fd, cancel, read_ev, write_ev);
      }
      if (read_ev || cancel) {
        fd_become_readable(exec_ctx, fd, pollset);
      }
      if (write_ev || cancel) {
        fd_become_writable(exec_ctx, fd);
      }
    }
  }

  return error;
}
794
/* Release all resources owned by the pollset.
   pollset_shutdown is guaranteed to be called before pollset_destroy. */
static void pollset_destroy(grpc_exec_ctx *exec_ctx, grpc_pollset *pollset) {
  pollable_destroy(&pollset->pollable_obj);
  /* If the pollset was polling a single fd's pollable, drop the ref taken
     when that pollable was installed (see pollset_add_fd_locked). */
  if (pollset_is_pollable_fd(pollset, pollset->current_pollable_obj)) {
    UNREF_BY(exec_ctx, (grpc_fd *)pollset->current_pollable_obj, 2,
             "pollset_pollable");
  }
  /* Drain any events still buffered from the last epoll pass. */
  GRPC_LOG_IF_ERROR("pollset_process_events",
                    pollset_process_events(exec_ctx, pollset, true));
}
805
/* Run one epoll_wait pass over pollable 'p', buffering the results into
   pollset->events and resetting the dispatch cursor.  Returns an OS error
   on epoll_wait failure. */
static grpc_error *pollset_epoll(grpc_exec_ctx *exec_ctx, grpc_pollset *pollset,
                                 pollable *p, gpr_timespec now,
                                 gpr_timespec deadline) {
  int timeout = poll_deadline_to_millis_timeout(deadline, now);

  if (GRPC_TRACER_ON(grpc_polling_trace)) {
    char *desc = pollable_desc(p);
    gpr_log(GPR_DEBUG, "PS:%p poll %p[%s] for %dms", pollset, p, desc, timeout);
    gpr_free(desc);
  }

  /* Only mark a blocking region when we may actually block (timeout != 0). */
  if (timeout != 0) {
    GRPC_SCHEDULING_START_BLOCKING_REGION;
  }
  int r;
  do {
    GRPC_STATS_INC_SYSCALL_POLL(exec_ctx);
    /* Retry if the syscall is interrupted by a signal. */
    r = epoll_wait(p->epfd, pollset->events, MAX_EPOLL_EVENTS, timeout);
  } while (r < 0 && errno == EINTR);
  if (timeout != 0) {
    GRPC_SCHEDULING_END_BLOCKING_REGION;
  }

  if (r < 0) return GRPC_OS_ERROR(errno, "epoll_wait");

  if (GRPC_TRACER_ON(grpc_polling_trace)) {
    gpr_log(GPR_DEBUG, "PS:%p poll %p got %d events", pollset, p, r);
  }

  /* Buffer the batch; pollset_process_events consumes it incrementally. */
  pollset->event_cursor = 0;
  pollset->event_count = r;

  return GRPC_ERROR_NONE;
}
840
Craig Tillerd1d7fdd2017-04-14 16:16:24 -0700841/* Return true if first in list */
842static bool worker_insert(grpc_pollset_worker **root, pollset_worker_links link,
843 grpc_pollset_worker *worker) {
844 if (*root == NULL) {
845 *root = worker;
846 worker->links[link].next = worker->links[link].prev = worker;
847 return true;
848 } else {
849 worker->links[link].next = *root;
850 worker->links[link].prev = worker->links[link].next->links[link].prev;
851 worker->links[link].next->links[link].prev = worker;
852 worker->links[link].prev->links[link].next = worker;
853 return false;
854 }
855}
856
857/* Return true if last in list */
858typedef enum { EMPTIED, NEW_ROOT, REMOVED } worker_remove_result;
859
860static worker_remove_result worker_remove(grpc_pollset_worker **root,
861 pollset_worker_links link,
862 grpc_pollset_worker *worker) {
863 if (worker == *root) {
864 if (worker == worker->links[link].next) {
865 *root = NULL;
866 return EMPTIED;
867 } else {
868 *root = worker->links[link].next;
869 worker->links[link].prev->links[link].next = worker->links[link].next;
870 worker->links[link].next->links[link].prev = worker->links[link].prev;
871 return NEW_ROOT;
872 }
873 } else {
874 worker->links[link].prev->links[link].next = worker->links[link].next;
875 worker->links[link].next->links[link].prev = worker->links[link].prev;
876 return REMOVED;
877 }
878}
879
/* Register 'worker' with the pollset and its current pollable, then (if it
   is not the pollable's designated poller) wait on its condvar until it is
   promoted to poller, kicked, or the deadline expires.
   Return true if this thread should poll */
static bool begin_worker(grpc_pollset *pollset, grpc_pollset_worker *worker,
                         grpc_pollset_worker **worker_hdl, gpr_timespec *now,
                         gpr_timespec deadline) {
  bool do_poll = true;
  if (worker_hdl != NULL) *worker_hdl = worker;
  worker->initialized_cv = false;
  worker->kicked = false;
  worker->pollset = pollset;
  worker->pollable_obj = pollset->current_pollable_obj;

  /* Hold a ref on an fd-owned pollable for the duration of this poll. */
  if (pollset_is_pollable_fd(pollset, worker->pollable_obj)) {
    REF_BY((grpc_fd *)worker->pollable_obj, 2, "one_poll");
  }

  worker_insert(&pollset->root_worker, PWL_POLLSET, worker);
  /* If we did not become the pollable's first worker, someone else is (or
     will be) polling it: wait until we are promoted or kicked. */
  if (!worker_insert(&worker->pollable_obj->root_worker, PWL_POLLABLE,
                     worker)) {
    worker->initialized_cv = true;
    gpr_cv_init(&worker->cv);
    /* Release the pollset lock while waiting on a non-owned pollable. */
    if (worker->pollable_obj != &pollset->pollable_obj) {
      gpr_mu_unlock(&pollset->pollable_obj.po.mu);
    }
    if (GRPC_TRACER_ON(grpc_polling_trace) &&
        worker->pollable_obj->root_worker != worker) {
      gpr_log(GPR_DEBUG, "PS:%p wait %p w=%p for %dms", pollset,
              worker->pollable_obj, worker,
              poll_deadline_to_millis_timeout(deadline, *now));
    }
    while (do_poll && worker->pollable_obj->root_worker != worker) {
      /* gpr_cv_wait returns nonzero on timeout. */
      if (gpr_cv_wait(&worker->cv, &worker->pollable_obj->po.mu, deadline)) {
        if (GRPC_TRACER_ON(grpc_polling_trace)) {
          gpr_log(GPR_DEBUG, "PS:%p timeout_wait %p w=%p", pollset,
                  worker->pollable_obj, worker);
        }
        do_poll = false;
      } else if (worker->kicked) {
        if (GRPC_TRACER_ON(grpc_polling_trace)) {
          gpr_log(GPR_DEBUG, "PS:%p wakeup %p w=%p", pollset,
                  worker->pollable_obj, worker);
        }
        do_poll = false;
      } else if (GRPC_TRACER_ON(grpc_polling_trace) &&
                 worker->pollable_obj->root_worker != worker) {
        gpr_log(GPR_DEBUG, "PS:%p spurious_wakeup %p w=%p", pollset,
                worker->pollable_obj, worker);
      }
    }
    /* Re-acquire both locks in canonical order: drop the pollable lock,
       take the pollset lock first, then the pollable lock again. */
    if (worker->pollable_obj != &pollset->pollable_obj) {
      gpr_mu_unlock(&worker->pollable_obj->po.mu);
      gpr_mu_lock(&pollset->pollable_obj.po.mu);
      gpr_mu_lock(&worker->pollable_obj->po.mu);
    }
    /* Time has passed while waiting; refresh 'now' for the caller. */
    *now = gpr_now(now->clock_type);
  }

  /* Poll only if we were not kicked/timed out, the pollset is not shutting
     down, and the pollset has not switched pollables underneath us. */
  return do_poll && pollset->shutdown_closure == NULL &&
         pollset->current_pollable_obj == worker->pollable_obj;
}
939
/* Unregister 'worker' after polling: hand off the poller role if needed,
   release per-worker resources, and finish shutdown if this was the last
   worker.  Locks held as at the end of begin_worker. */
static void end_worker(grpc_exec_ctx *exec_ctx, grpc_pollset *pollset,
                       grpc_pollset_worker *worker,
                       grpc_pollset_worker **worker_hdl) {
  /* If removal promoted a new root worker for the pollable, wake it so it
     takes over polling. */
  if (NEW_ROOT ==
      worker_remove(&worker->pollable_obj->root_worker, PWL_POLLABLE, worker)) {
    gpr_cv_signal(&worker->pollable_obj->root_worker->cv);
  }
  if (worker->initialized_cv) {
    gpr_cv_destroy(&worker->cv);
  }
  /* Drop the per-poll ref taken in begin_worker on fd-owned pollables. */
  if (pollset_is_pollable_fd(pollset, worker->pollable_obj)) {
    UNREF_BY(exec_ctx, (grpc_fd *)worker->pollable_obj, 2, "one_poll");
  }
  if (EMPTIED == worker_remove(&pollset->root_worker, PWL_POLLSET, worker)) {
    pollset_maybe_finish_shutdown(exec_ctx, pollset);
  }
}
957
/* pollset->po.mu lock must be held by the caller before calling this.
   The function pollset_work() may temporarily release the lock (pollset->po.mu)
   during the course of its execution but it will always re-acquire the lock and
   ensure that it is held by the time the function returns */
static grpc_error *pollset_work(grpc_exec_ctx *exec_ctx, grpc_pollset *pollset,
                                grpc_pollset_worker **worker_hdl,
                                gpr_timespec now, gpr_timespec deadline) {
  grpc_pollset_worker worker;
  /* Deliberately disabled (0 &&) verbose trace; flip to enable locally. */
  if (0 && GRPC_TRACER_ON(grpc_polling_trace)) {
    gpr_log(GPR_DEBUG, "PS:%p work hdl=%p worker=%p now=%" PRId64
                       ".%09d deadline=%" PRId64 ".%09d kwp=%d root_worker=%p",
            pollset, worker_hdl, &worker, now.tv_sec, now.tv_nsec,
            deadline.tv_sec, deadline.tv_nsec, pollset->kicked_without_poller,
            pollset->root_worker);
  }
  grpc_error *error = GRPC_ERROR_NONE;
  static const char *err_desc = "pollset_work";
  /* A kick that arrived with no poller present is consumed here without
     doing any work. */
  if (pollset->kicked_without_poller) {
    pollset->kicked_without_poller = false;
    return GRPC_ERROR_NONE;
  }
  /* Lock the current pollable too when it is not the pollset's own. */
  if (pollset->current_pollable_obj != &pollset->pollable_obj) {
    gpr_mu_lock(&pollset->current_pollable_obj->po.mu);
  }
  if (begin_worker(pollset, &worker, worker_hdl, &now, deadline)) {
    gpr_tls_set(&g_current_thread_pollset, (intptr_t)pollset);
    gpr_tls_set(&g_current_thread_worker, (intptr_t)&worker);
    GPR_ASSERT(!pollset->shutdown_closure);
    append_error(&error, pollable_materialize(worker.pollable_obj), err_desc);
    /* Drop all locks around the (potentially blocking) epoll call. */
    if (worker.pollable_obj != &pollset->pollable_obj) {
      gpr_mu_unlock(&worker.pollable_obj->po.mu);
    }
    gpr_mu_unlock(&pollset->pollable_obj.po.mu);
    /* Only hit the kernel when the previous batch is fully consumed. */
    if (pollset->event_cursor == pollset->event_count) {
      append_error(&error, pollset_epoll(exec_ctx, pollset, worker.pollable_obj,
                                         now, deadline),
                   err_desc);
    }
    append_error(&error, pollset_process_events(exec_ctx, pollset, false),
                 err_desc);
    /* Re-acquire in canonical order: pollset lock first, then pollable. */
    gpr_mu_lock(&pollset->pollable_obj.po.mu);
    if (worker.pollable_obj != &pollset->pollable_obj) {
      gpr_mu_lock(&worker.pollable_obj->po.mu);
    }
    gpr_tls_set(&g_current_thread_pollset, 0);
    gpr_tls_set(&g_current_thread_worker, 0);
    pollset_maybe_finish_shutdown(exec_ctx, pollset);
  }
  end_worker(exec_ctx, pollset, &worker, worker_hdl);
  if (worker.pollable_obj != &pollset->pollable_obj) {
    gpr_mu_unlock(&worker.pollable_obj->po.mu);
  }
  /* Flush any closures produced above, without holding the pollset lock. */
  if (grpc_exec_ctx_has_work(exec_ctx)) {
    gpr_mu_unlock(&pollset->pollable_obj.po.mu);
    grpc_exec_ctx_flush(exec_ctx);
    gpr_mu_lock(&pollset->pollable_obj.po.mu);
  }
  return error;
}
1017
/* Closure callback: drop the ref taken when an fd's pollable was installed
   as a pollset's poller (see pollset_add_fd_locked). */
static void unref_fd_no_longer_poller(grpc_exec_ctx *exec_ctx, void *arg,
                                      grpc_error *error) {
  grpc_fd *fd = (grpc_fd *)arg;
  UNREF_BY(exec_ctx, fd, 2, "pollset_pollable");
}
1023
/* Add 'fd' to 'pollset', transitioning the pollset's current pollable as
   needed: empty -> single-fd, single-fd -> multipoller, or just adding to
   an existing multipoller. */
/* expects pollsets locked, flag whether fd is locked or not */
static grpc_error *pollset_add_fd_locked(grpc_exec_ctx *exec_ctx,
                                         grpc_pollset *pollset, grpc_fd *fd,
                                         bool fd_locked) {
  static const char *err_desc = "pollset_add_fd";
  grpc_error *error = GRPC_ERROR_NONE;
  if (pollset->current_pollable_obj == &g_empty_pollable) {
    if (GRPC_TRACER_ON(grpc_polling_trace)) {
      gpr_log(GPR_DEBUG,
              "PS:%p add fd %p; transition pollable from empty to fd", pollset,
              fd);
    }
    /* empty pollable --> single fd pollable */
    pollset_kick_all(exec_ctx, pollset);
    pollset->current_pollable_obj = &fd->pollable_obj;
    if (!fd_locked) gpr_mu_lock(&fd->pollable_obj.po.mu);
    append_error(&error, fd_become_pollable_locked(fd), err_desc);
    if (!fd_locked) gpr_mu_unlock(&fd->pollable_obj.po.mu);
    /* The pollset now holds a ref on the fd; released via
       unref_fd_no_longer_poller or pollset_destroy. */
    REF_BY(fd, 2, "pollset_pollable");
  } else if (pollset->current_pollable_obj == &pollset->pollable_obj) {
    if (GRPC_TRACER_ON(grpc_polling_trace)) {
      gpr_log(GPR_DEBUG, "PS:%p add fd %p; already multipolling", pollset, fd);
    }
    append_error(&error, pollable_add_fd(pollset->current_pollable_obj, fd),
                 err_desc);
  } else if (pollset->current_pollable_obj != &fd->pollable_obj) {
    grpc_fd *had_fd = (grpc_fd *)pollset->current_pollable_obj;
    if (GRPC_TRACER_ON(grpc_polling_trace)) {
      gpr_log(GPR_DEBUG,
              "PS:%p add fd %p; transition pollable from fd %p to multipoller",
              pollset, fd, had_fd);
    }
    /* Introduce a spurious completion.
       If we do not, then it may be that the fd-specific epoll set consumed
       a completion without being polled, leading to a missed edge going up. */
    grpc_lfev_set_ready(exec_ctx, &had_fd->read_closure, "read");
    grpc_lfev_set_ready(exec_ctx, &had_fd->write_closure, "write");
    pollset_kick_all(exec_ctx, pollset);
    pollset->current_pollable_obj = &pollset->pollable_obj;
    /* Move both the old fd and the new one into the multipoller. */
    if (append_error(&error, pollable_materialize(&pollset->pollable_obj),
                     err_desc)) {
      pollable_add_fd(&pollset->pollable_obj, had_fd);
      pollable_add_fd(&pollset->pollable_obj, fd);
    }
    /* Release the ref on the displaced fd asynchronously. */
    GRPC_CLOSURE_SCHED(exec_ctx,
                       GRPC_CLOSURE_CREATE(unref_fd_no_longer_poller, had_fd,
                                           grpc_schedule_on_exec_ctx),
                       GRPC_ERROR_NONE);
  }
  return error;
}
Craig Tillerd1d7fdd2017-04-14 16:16:24 -07001075
Craig Tiller50480b22017-04-17 16:34:52 +00001076static void pollset_add_fd(grpc_exec_ctx *exec_ctx, grpc_pollset *pollset,
1077 grpc_fd *fd) {
Yash Tibrewalb2a54ac2017-09-13 10:18:07 -07001078 gpr_mu_lock(&pollset->pollable_obj.po.mu);
Craig Tiller50480b22017-04-17 16:34:52 +00001079 grpc_error *error = pollset_add_fd_locked(exec_ctx, pollset, fd, false);
Yash Tibrewalb2a54ac2017-09-13 10:18:07 -07001080 gpr_mu_unlock(&pollset->pollable_obj.po.mu);
Craig Tillere24b24d2017-04-06 16:05:45 -07001081 GRPC_LOG_IF_ERROR("pollset_add_fd", error);
Craig Tillerdd86b692017-04-06 10:43:11 -07001082}
1083
1084/*******************************************************************************
1085 * Pollset-set Definitions
1086 */
1087
1088static grpc_pollset_set *pollset_set_create(void) {
Yash Tibrewalca3c1c02017-09-07 22:47:16 -07001089 grpc_pollset_set *pss = (grpc_pollset_set *)gpr_zalloc(sizeof(*pss));
Craig Tillerf18286b2017-04-10 14:44:09 -07001090 po_init(&pss->po, PO_POLLSET_SET);
Craig Tillerdd86b692017-04-06 10:43:11 -07001091 return pss;
1092}
1093
/* Destroy a pollset_set: tear down its polling object, then free it. */
static void pollset_set_destroy(grpc_exec_ctx *exec_ctx,
                                grpc_pollset_set *pss) {
  po_destroy(&pss->po);
  gpr_free(pss);
}
1099
/* Add an fd to a pollset_set by joining their polling objects into a
   common polling group. */
static void pollset_set_add_fd(grpc_exec_ctx *exec_ctx, grpc_pollset_set *pss,
                               grpc_fd *fd) {
  po_join(exec_ctx, &pss->po, &fd->pollable_obj.po);
}
1104
/* Intentionally a no-op: NOTE(review) polling-group joins appear to be
   permanent in this engine (no unjoin path is visible) -- confirm. */
static void pollset_set_del_fd(grpc_exec_ctx *exec_ctx, grpc_pollset_set *pss,
                               grpc_fd *fd) {}
Craig Tillerdd86b692017-04-06 10:43:11 -07001107
/* Add a pollset to a pollset_set by joining their polling objects into a
   common polling group. */
static void pollset_set_add_pollset(grpc_exec_ctx *exec_ctx,
                                    grpc_pollset_set *pss, grpc_pollset *ps) {
  po_join(exec_ctx, &pss->po, &ps->pollable_obj.po);
}
1112
/* Intentionally a no-op: NOTE(review) polling-group joins appear to be
   permanent in this engine (no unjoin path is visible) -- confirm. */
static void pollset_set_del_pollset(grpc_exec_ctx *exec_ctx,
                                    grpc_pollset_set *pss, grpc_pollset *ps) {}
Craig Tillerdd86b692017-04-06 10:43:11 -07001115
/* Merge two pollset_sets by joining their polling objects. */
static void pollset_set_add_pollset_set(grpc_exec_ctx *exec_ctx,
                                        grpc_pollset_set *bag,
                                        grpc_pollset_set *item) {
  po_join(exec_ctx, &bag->po, &item->po);
}
1121
/* Intentionally a no-op: NOTE(review) polling-group joins appear to be
   permanent in this engine (no unjoin path is visible) -- confirm. */
static void pollset_set_del_pollset_set(grpc_exec_ctx *exec_ctx,
                                        grpc_pollset_set *bag,
                                        grpc_pollset_set *item) {}
Craig Tillerdd86b692017-04-06 10:43:11 -07001125
Craig Tillerf18286b2017-04-10 14:44:09 -07001126static void po_init(polling_obj *po, polling_obj_type type) {
1127 gpr_mu_init(&po->mu);
1128 po->type = type;
1129 po->group = NULL;
1130 po->next = po;
1131 po->prev = po;
1132}
1133
/* Chase the group chain to the most-recent (merged-into) group and return
   it locked. */
static polling_group *pg_lock_latest(polling_group *pg) {
  /* assumes pg unlocked; consumes ref, returns ref */
  gpr_mu_lock(&pg->po.mu);
  while (pg->po.group != NULL) {
    /* Hand-over-hand: ref the newer group before releasing the current one
       so neither can be freed underneath us. */
    polling_group *new_pg = pg_ref(pg->po.group);
    gpr_mu_unlock(&pg->po.mu);
    pg_unref(pg);
    pg = new_pg;
    gpr_mu_lock(&pg->po.mu);
  }
  return pg;
}
1146
/* Tear down a polling object: unlink it from its group (if any) and
   destroy its mutex. */
static void po_destroy(polling_obj *po) {
  if (po->group != NULL) {
    polling_group *pg = pg_lock_latest(po->group);
    /* Remove 'po' from the group's circular list. */
    po->prev->next = po->next;
    po->next->prev = po->prev;
    gpr_mu_unlock(&pg->po.mu);
    pg_unref(pg);
  }
  gpr_mu_destroy(&po->mu);
}
1157
/* Take a reference on a polling group; returns 'pg' for chaining. */
static polling_group *pg_ref(polling_group *pg) {
  gpr_ref(&pg->refs);
  return pg;
}
1162
/* Drop a reference on a polling group, destroying and freeing it when the
   last reference goes away. */
static void pg_unref(polling_group *pg) {
  if (gpr_unref(&pg->refs)) {
    po_destroy(&pg->po);
    gpr_free(pg);
  }
}
1169
Craig Tillerd1d7fdd2017-04-14 16:16:24 -07001170static int po_cmp(polling_obj *a, polling_obj *b) {
1171 if (a == b) return 0;
1172 if (a->type < b->type) return -1;
1173 if (a->type > b->type) return 1;
1174 if (a < b) return -1;
1175 assert(a > b);
1176 return 1;
1177}
1178
/* Join two polling objects into a common polling group.  The objects are
   locked in canonical po_cmp order to avoid deadlock against concurrent
   joins of the same pair. */
static void po_join(grpc_exec_ctx *exec_ctx, polling_obj *a, polling_obj *b) {
  switch (po_cmp(a, b)) {
    case 0:
      /* Same object: nothing to join. */
      return;
    case 1:
      /* Normalize so 'a' orders before 'b', then lock a before b. */
      GPR_SWAP(polling_obj *, a, b);
    /* fall through */
    case -1:
      gpr_mu_lock(&a->mu);
      gpr_mu_lock(&b->mu);

      if (a->group == NULL) {
        if (b->group == NULL) {
          /* Neither is grouped: create a fresh group containing both. */
          polling_obj *initial_po[] = {a, b};
          pg_create(exec_ctx, initial_po, GPR_ARRAY_SIZE(initial_po));
          gpr_mu_unlock(&a->mu);
          gpr_mu_unlock(&b->mu);
        } else {
          /* Only b is grouped: pull a into b's group. */
          polling_group *b_group = pg_ref(b->group);
          gpr_mu_unlock(&b->mu);
          gpr_mu_unlock(&a->mu);
          pg_join(exec_ctx, b_group, a);
        }
      } else if (b->group == NULL) {
        /* Only a is grouped: pull b into a's group. */
        polling_group *a_group = pg_ref(a->group);
        gpr_mu_unlock(&a->mu);
        gpr_mu_unlock(&b->mu);
        pg_join(exec_ctx, a_group, b);
      } else if (a->group == b->group) {
        /* nothing to do */
        gpr_mu_unlock(&a->mu);
        gpr_mu_unlock(&b->mu);
      } else {
        /* Both grouped, differently: merge the two groups. */
        polling_group *a_group = pg_ref(a->group);
        polling_group *b_group = pg_ref(b->group);
        gpr_mu_unlock(&a->mu);
        gpr_mu_unlock(&b->mu);
        pg_merge(exec_ctx, a_group, b_group);
      }
  }
}
1220
Craig Tillerf18286b2017-04-10 14:44:09 -07001221static void pg_notify(grpc_exec_ctx *exec_ctx, polling_obj *a, polling_obj *b) {
Craig Tiller6462fd82017-04-13 17:32:07 +00001222 if (a->type == PO_FD && b->type == PO_POLLSET) {
Craig Tiller50480b22017-04-17 16:34:52 +00001223 pollset_add_fd_locked(exec_ctx, (grpc_pollset *)b, (grpc_fd *)a, true);
Craig Tiller6462fd82017-04-13 17:32:07 +00001224 } else if (a->type == PO_POLLSET && b->type == PO_FD) {
Craig Tiller50480b22017-04-17 16:34:52 +00001225 pollset_add_fd_locked(exec_ctx, (grpc_pollset *)a, (grpc_fd *)b, true);
Craig Tillerf18286b2017-04-10 14:44:09 -07001226 }
1227}
1228
/* Run pg_notify over the cross product of two groups' member lists, used
   during a merge so every member of one group learns about every member of
   the other.  Caller (pg_merge) holds both groups' mutexes; each member
   pair is locked in po_cmp order to preserve the global lock hierarchy. */
static void pg_broadcast(grpc_exec_ctx *exec_ctx, polling_group *from,
                         polling_group *to) {
  for (polling_obj *a = from->po.next; a != &from->po; a = a->next) {
    for (polling_obj *b = to->po.next; b != &to->po; b = b->next) {
      if (po_cmp(a, b) < 0) {
        gpr_mu_lock(&a->mu);
        gpr_mu_lock(&b->mu);
      } else {
        /* the same object must never appear in both lists */
        GPR_ASSERT(po_cmp(a, b) != 0);
        gpr_mu_lock(&b->mu);
        gpr_mu_lock(&a->mu);
      }
      pg_notify(exec_ctx, a, b);
      gpr_mu_unlock(&a->mu);
      gpr_mu_unlock(&b->mu);
    }
  }
}
1247
/* Create a new polling group containing the given objects, linking them
   into the group's circular doubly-linked member list (the group's own
   embedded polling_obj serves as the list sentinel), then notify every
   pair of members about each other.  The group starts with one ref per
   initial member. */
static void pg_create(grpc_exec_ctx *exec_ctx, polling_obj **initial_po,
                      size_t initial_po_count) {
  /* assumes all polling objects in initial_po are locked */
  polling_group *pg = (polling_group *)gpr_malloc(sizeof(*pg));
  po_init(&pg->po, PO_POLLING_GROUP);
  gpr_ref_init(&pg->refs, (int)initial_po_count);
  for (size_t i = 0; i < initial_po_count; i++) {
    GPR_ASSERT(initial_po[i]->group == NULL);
    initial_po[i]->group = pg;
  }
  /* chain members together: prev pointers ... */
  for (size_t i = 1; i < initial_po_count; i++) {
    initial_po[i]->prev = initial_po[i - 1];
  }
  /* ... and next pointers */
  for (size_t i = 0; i < initial_po_count - 1; i++) {
    initial_po[i]->next = initial_po[i + 1];
  }
  /* close the ring through the group's sentinel node */
  initial_po[0]->prev = &pg->po;
  initial_po[initial_po_count - 1]->next = &pg->po;
  pg->po.next = initial_po[0];
  pg->po.prev = initial_po[initial_po_count - 1];
  /* notify each unordered pair of members exactly once */
  for (size_t i = 1; i < initial_po_count; i++) {
    for (size_t j = 0; j < i; j++) {
      pg_notify(exec_ctx, initial_po[i], initial_po[j]);
    }
  }
}
1274
/* Add polling object po to group pg, notifying po against each existing
   member.  If po acquires a group concurrently (it can, because member
   locks are dropped between iterations), fall back to a full pg_merge. */
static void pg_join(grpc_exec_ctx *exec_ctx, polling_group *pg,
                    polling_obj *po) {
  /* assumes neither pg nor po are locked; consumes one ref to pg */
  pg = pg_lock_latest(pg);
  /* pg locked */
  for (polling_obj *existing = pg->po.next /* skip pg - it's just a stub */;
       existing != &pg->po; existing = existing->next) {
    /* lock the pair in po_cmp order to keep the lock hierarchy acyclic */
    if (po_cmp(po, existing) < 0) {
      gpr_mu_lock(&po->mu);
      gpr_mu_lock(&existing->mu);
    } else {
      GPR_ASSERT(po_cmp(po, existing) != 0);
      gpr_mu_lock(&existing->mu);
      gpr_mu_lock(&po->mu);
    }
    /* pg, po, existing locked */
    if (po->group != NULL) {
      gpr_mu_unlock(&pg->po.mu);
      polling_group *po_group = pg_ref(po->group);
      gpr_mu_unlock(&po->mu);
      gpr_mu_unlock(&existing->mu);
      pg_merge(exec_ctx, pg, po_group);
      /* early exit: polling obj picked up a group during joining: we needed
         to do a full merge */
      return;
    }
    pg_notify(exec_ctx, po, existing);
    gpr_mu_unlock(&po->mu);
    gpr_mu_unlock(&existing->mu);
  }
  gpr_mu_lock(&po->mu);
  /* re-check: po may have been given a group while its lock was dropped */
  if (po->group != NULL) {
    gpr_mu_unlock(&pg->po.mu);
    polling_group *po_group = pg_ref(po->group);
    gpr_mu_unlock(&po->mu);
    pg_merge(exec_ctx, pg, po_group);
    /* early exit: polling obj picked up a group during joining: we needed
       to do a full merge */
    return;
  }
  /* splice po at the tail of pg's circular member list; the pg ref we
     consumed is now owned by po->group */
  po->group = pg;
  po->next = &pg->po;
  po->prev = pg->po.prev;
  po->prev->next = po->next->prev = po;
  gpr_mu_unlock(&pg->po.mu);
  gpr_mu_unlock(&po->mu);
}
1322
/* Merge polling group b into polling group a.  Consumes one ref to each of
   a and b.  First chases each group's 'group' forwarding pointer to its
   latest incarnation (locking the pair in address order to avoid deadlock),
   then cross-notifies the two member lists and migrates b's members into
   a's circular list. */
static void pg_merge(grpc_exec_ctx *exec_ctx, polling_group *a,
                     polling_group *b) {
  for (;;) {
    if (a == b) {
      /* both refs point at the same (already merged) group */
      pg_unref(a);
      pg_unref(b);
      return;
    }
    /* lock in address order */
    if (a > b) GPR_SWAP(polling_group *, a, b);
    gpr_mu_lock(&a->po.mu);
    gpr_mu_lock(&b->po.mu);
    if (a->po.group != NULL) {
      /* a was itself merged into another group: follow the link */
      polling_group *m2 = pg_ref(a->po.group);
      gpr_mu_unlock(&a->po.mu);
      gpr_mu_unlock(&b->po.mu);
      pg_unref(a);
      a = m2;
    } else if (b->po.group != NULL) {
      /* likewise for b */
      polling_group *m2 = pg_ref(b->po.group);
      gpr_mu_unlock(&a->po.mu);
      gpr_mu_unlock(&b->po.mu);
      pg_unref(b);
      b = m2;
    } else {
      /* both a and b are the latest incarnations; proceed (still locked) */
      break;
    }
  }
  /* deferred unrefs: member->group refs released only after all locks are
     dropped, since pg_unref may destroy the group */
  polling_group **unref = NULL;
  size_t unref_count = 0;
  size_t unref_cap = 0;
  /* mark b as merged-into-a so concurrent pg_lock_latest callers follow it */
  b->po.group = a;
  pg_broadcast(exec_ctx, a, b);
  pg_broadcast(exec_ctx, b, a);
  /* move every member of b into a's list */
  while (b->po.next != &b->po) {
    polling_obj *po = b->po.next;
    gpr_mu_lock(&po->mu);
    if (unref_count == unref_cap) {
      /* grow by 1.5x, minimum 8 slots */
      unref_cap = GPR_MAX(8, 3 * unref_cap / 2);
      unref = (polling_group **)gpr_realloc(unref, unref_cap * sizeof(*unref));
    }
    unref[unref_count++] = po->group;
    po->group = pg_ref(a);
    // unlink from b
    po->prev->next = po->next;
    po->next->prev = po->prev;
    // link to a
    po->next = &a->po;
    po->prev = a->po.prev;
    po->next->prev = po->prev->next = po;
    gpr_mu_unlock(&po->mu);
  }
  gpr_mu_unlock(&a->po.mu);
  gpr_mu_unlock(&b->po.mu);
  for (size_t i = 0; i < unref_count; i++) {
    pg_unref(unref[i]);
  }
  gpr_free(unref);
  /* drop the caller's ref to b (a's ref was transferred to b->po.group) */
  pg_unref(b);
}
1382
Craig Tillerdd86b692017-04-06 10:43:11 -07001383/*******************************************************************************
1384 * Event engine binding
1385 */
1386
/* Release engine-global state; counterpart to the fd_global_init /
   pollset_global_init calls made in grpc_init_epollex_linux. */
static void shutdown_engine(void) {
  fd_global_shutdown();
  pollset_global_shutdown();
}
1391
/* Function table exported to the generic event-engine layer.  Positional
   initialization: field order must match grpc_event_engine_vtable (declared
   in ev_posix.h, included above) — pollset size first, then fd ops, pollset
   ops, pollset_set ops, and finally engine shutdown. */
static const grpc_event_engine_vtable vtable = {
    sizeof(grpc_pollset),

    /* fd operations */
    fd_create,
    fd_wrapped_fd,
    fd_orphan,
    fd_shutdown,
    fd_notify_on_read,
    fd_notify_on_write,
    fd_is_shutdown,
    fd_get_read_notifier_pollset,

    /* pollset operations */
    pollset_init,
    pollset_shutdown,
    pollset_destroy,
    pollset_work,
    pollset_kick,
    pollset_add_fd,

    /* pollset_set operations */
    pollset_set_create,
    pollset_set_destroy,
    pollset_set_add_pollset,
    pollset_set_del_pollset,
    pollset_set_add_pollset_set,
    pollset_set_del_pollset_set,
    pollset_set_add_fd,
    pollset_set_del_fd,

    /* engine teardown */
    shutdown_engine,
};
1422
Craig Tiller6c8383a2017-05-05 16:54:42 +00001423const grpc_event_engine_vtable *grpc_init_epollex_linux(
1424 bool explicitly_requested) {
Craig Tillerdd86b692017-04-06 10:43:11 -07001425 if (!grpc_has_wakeup_fd()) {
1426 return NULL;
1427 }
1428
Craig Tiller819cd882017-04-25 13:18:22 -07001429 if (!grpc_is_epollexclusive_available()) {
Craig Tillerdd86b692017-04-06 10:43:11 -07001430 return NULL;
1431 }
1432
1433 fd_global_init();
1434
1435 if (!GRPC_LOG_IF_ERROR("pollset_global_init", pollset_global_init())) {
Craig Tiller54d388d2017-04-14 16:20:45 -07001436 pollset_global_shutdown();
1437 fd_global_shutdown();
Craig Tillerdd86b692017-04-06 10:43:11 -07001438 return NULL;
1439 }
1440
1441 return &vtable;
1442}
1443
1444#else /* defined(GRPC_LINUX_EPOLL) */
1445#if defined(GRPC_POSIX_SOCKET)
1446#include "src/core/lib/iomgr/ev_posix.h"
1447/* If GRPC_LINUX_EPOLL is not defined, it means epoll is not available. Return
1448 * NULL */
Craig Tiller6c8383a2017-05-05 16:54:42 +00001449const grpc_event_engine_vtable *grpc_init_epollex_linux(
1450 bool explicitly_requested) {
1451 return NULL;
1452}
Craig Tillerdd86b692017-04-06 10:43:11 -07001453#endif /* defined(GRPC_POSIX_SOCKET) */
1454
1455#endif /* !defined(GRPC_LINUX_EPOLL) */