blob: 931f3fc3556a5a03cf502c21cf6ed68fd9521825 [file] [log] [blame]
Craig Tillerdd86b692017-04-06 10:43:11 -07001/*
2 *
Jan Tattermusch7897ae92017-06-07 22:57:36 +02003 * Copyright 2017 gRPC authors.
Craig Tillerdd86b692017-04-06 10:43:11 -07004 *
Jan Tattermusch7897ae92017-06-07 22:57:36 +02005 * Licensed under the Apache License, Version 2.0 (the "License");
6 * you may not use this file except in compliance with the License.
7 * You may obtain a copy of the License at
Craig Tillerdd86b692017-04-06 10:43:11 -07008 *
Jan Tattermusch7897ae92017-06-07 22:57:36 +02009 * http://www.apache.org/licenses/LICENSE-2.0
Craig Tillerdd86b692017-04-06 10:43:11 -070010 *
Jan Tattermusch7897ae92017-06-07 22:57:36 +020011 * Unless required by applicable law or agreed to in writing, software
12 * distributed under the License is distributed on an "AS IS" BASIS,
13 * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
14 * See the License for the specific language governing permissions and
15 * limitations under the License.
Craig Tillerdd86b692017-04-06 10:43:11 -070016 *
17 */
18
19#include "src/core/lib/iomgr/port.h"
20
yang-gceb24752017-11-07 12:06:37 -080021#include <grpc/support/log.h>
22
Craig Tillerdd86b692017-04-06 10:43:11 -070023/* This polling engine is only relevant on linux kernels supporting epoll() */
24#ifdef GRPC_LINUX_EPOLL
25
Craig Tillerc6109852017-05-01 14:26:49 -070026#include "src/core/lib/iomgr/ev_epollex_linux.h"
Craig Tillerdd86b692017-04-06 10:43:11 -070027
28#include <assert.h>
29#include <errno.h>
Craig Tiller20397792017-07-18 11:35:27 -070030#include <limits.h>
Craig Tillerdd86b692017-04-06 10:43:11 -070031#include <poll.h>
32#include <pthread.h>
33#include <string.h>
Craig Tillerdd86b692017-04-06 10:43:11 -070034#include <sys/socket.h>
Craig Tiller1e8c2ab2017-10-12 15:50:13 -070035#include <sys/syscall.h>
Craig Tillerdd86b692017-04-06 10:43:11 -070036#include <unistd.h>
37
38#include <grpc/support/alloc.h>
Craig Tillerdd86b692017-04-06 10:43:11 -070039#include <grpc/support/string_util.h>
40#include <grpc/support/tls.h>
41#include <grpc/support/useful.h>
42
Craig Tillerb4bb1cd2017-07-20 14:18:17 -070043#include "src/core/lib/debug/stats.h"
Craig Tiller6b7c1fb2017-07-19 15:45:03 -070044#include "src/core/lib/iomgr/block_annotate.h"
Craig Tillerdd86b692017-04-06 10:43:11 -070045#include "src/core/lib/iomgr/iomgr_internal.h"
Craig Tiller819cd882017-04-25 13:18:22 -070046#include "src/core/lib/iomgr/is_epollexclusive_available.h"
Craig Tillerdd86b692017-04-06 10:43:11 -070047#include "src/core/lib/iomgr/lockfree_event.h"
Craig Tiller819cd882017-04-25 13:18:22 -070048#include "src/core/lib/iomgr/sys_epoll_wrapper.h"
Craig Tillerdd86b692017-04-06 10:43:11 -070049#include "src/core/lib/iomgr/timer.h"
50#include "src/core/lib/iomgr/wakeup_fd_posix.h"
Craig Tillerdd86b692017-04-06 10:43:11 -070051#include "src/core/lib/profiling/timers.h"
Craig Tillerfbf61bb2017-11-08 11:50:14 -080052#include "src/core/lib/support/manual_constructor.h"
Craig Tiller0b4c9012017-04-06 17:19:37 -070053#include "src/core/lib/support/spinlock.h"
Craig Tillerdd86b692017-04-06 10:43:11 -070054
Craig Tiller29a9c3a2017-10-04 15:15:04 -070055// debug aid: create workers on the heap (allows asan to spot
56// use-after-destruction)
Craig Tiller513daab2017-10-06 09:18:34 -070057//#define GRPC_EPOLLEX_CREATE_WORKERS_ON_HEAP 1
Craig Tiller8fc1ca12017-04-07 13:01:48 -070058
Craig Tiller8223f412017-10-17 22:05:55 +000059#define MAX_EPOLL_EVENTS 100
60#define MAX_EPOLL_EVENTS_HANDLED_EACH_POLL_CALL 5
Craig Tillerf18286b2017-04-10 14:44:09 -070061
Noah Eisenc384d812017-11-12 20:14:27 -080062grpc_core::DebugOnlyTraceFlag grpc_trace_pollable_refcount(false,
63 "pollable_refcount");
Craig Tillerbd412f92017-04-07 13:45:19 -070064
Craig Tiller8fc1ca12017-04-07 13:01:48 -070065/*******************************************************************************
Craig Tillerd1d7fdd2017-04-14 16:16:24 -070066 * pollable Declarations
67 */
68
Craig Tiller249de2b2017-10-02 11:49:19 -070069typedef enum { PO_MULTI, PO_FD, PO_EMPTY } pollable_type;
Craig Tillerdd86b692017-04-06 10:43:11 -070070
Craig Tiller249de2b2017-10-02 11:49:19 -070071typedef struct pollable pollable;
Craig Tillerdd86b692017-04-06 10:43:11 -070072
Craig Tiller5d9db0c2017-10-18 09:43:55 -070073/// A pollable is something that can be polled: it has an epoll set to poll on,
74/// and a wakeup fd for kicks
75/// There are three broad types:
76/// - PO_EMPTY - the empty pollable, used before file descriptors are added to
77/// a pollset
78/// - PO_FD - a pollable containing only one FD - used to optimize single-fd
79/// pollsets (which are common with synchronous api usage)
80/// - PO_MULTI - a pollable containing many fds
Craig Tiller249de2b2017-10-02 11:49:19 -070081struct pollable {
82 pollable_type type; // immutable
Craig Tillerdd86b692017-04-06 10:43:11 -070083 gpr_refcount refs;
Craig Tillerdd86b692017-04-06 10:43:11 -070084
Craig Tillerd1d7fdd2017-04-14 16:16:24 -070085 int epfd;
86 grpc_wakeup_fd wakeup;
Craig Tillerd1d7fdd2017-04-14 16:16:24 -070087
Craig Tiller249de2b2017-10-02 11:49:19 -070088 // only for type fd... one ref to the owner fd
Craig Tillerbaa14a92017-11-03 09:09:36 -070089 grpc_fd* owner_fd;
Craig Tiller249de2b2017-10-02 11:49:19 -070090
Craig Tillerbaa14a92017-11-03 09:09:36 -070091 grpc_pollset_set* pollset_set;
92 pollable* next;
93 pollable* prev;
Craig Tiller249de2b2017-10-02 11:49:19 -070094
95 gpr_mu mu;
Craig Tillerbaa14a92017-11-03 09:09:36 -070096 grpc_pollset_worker* root_worker;
Craig Tiller8223f412017-10-17 22:05:55 +000097
98 int event_cursor;
99 int event_count;
100 struct epoll_event events[MAX_EPOLL_EVENTS];
Craig Tiller249de2b2017-10-02 11:49:19 -0700101};
102
Craig Tillerbaa14a92017-11-03 09:09:36 -0700103static const char* pollable_type_string(pollable_type t) {
Craig Tiller86ffd652017-06-20 00:16:54 +0000104 switch (t) {
Craig Tiller249de2b2017-10-02 11:49:19 -0700105 case PO_MULTI:
Craig Tiller7fd8f672017-07-05 15:10:12 -0700106 return "pollset";
107 case PO_FD:
108 return "fd";
Craig Tiller249de2b2017-10-02 11:49:19 -0700109 case PO_EMPTY:
110 return "empty";
Craig Tiller86ffd652017-06-20 00:16:54 +0000111 }
112 return "<invalid>";
113}
114
Craig Tillerbaa14a92017-11-03 09:09:36 -0700115static char* pollable_desc(pollable* p) {
116 char* out;
Craig Tiller249de2b2017-10-02 11:49:19 -0700117 gpr_asprintf(&out, "type=%s epfd=%d wakeup=%d", pollable_type_string(p->type),
118 p->epfd, p->wakeup.read_fd);
Craig Tiller86ffd652017-06-20 00:16:54 +0000119 return out;
120}
121
Craig Tiller5d9db0c2017-10-18 09:43:55 -0700122/// Shared empty pollable - used by pollset to poll on until the first fd is
123/// added
Craig Tillerbaa14a92017-11-03 09:09:36 -0700124static pollable* g_empty_pollable;
Craig Tillerd1d7fdd2017-04-14 16:16:24 -0700125
Craig Tillerbaa14a92017-11-03 09:09:36 -0700126static grpc_error* pollable_create(pollable_type type, pollable** p);
Craig Tiller389ea902017-10-04 17:42:49 +0000127#ifdef NDEBUG
Craig Tillerbaa14a92017-11-03 09:09:36 -0700128static pollable* pollable_ref(pollable* p);
129static void pollable_unref(pollable* p);
Craig Tiller389ea902017-10-04 17:42:49 +0000130#define POLLABLE_REF(p, r) pollable_ref(p)
131#define POLLABLE_UNREF(p, r) pollable_unref(p)
132#else
Craig Tillerbaa14a92017-11-03 09:09:36 -0700133static pollable* pollable_ref(pollable* p, int line, const char* reason);
134static void pollable_unref(pollable* p, int line, const char* reason);
Craig Tiller389ea902017-10-04 17:42:49 +0000135#define POLLABLE_REF(p, r) pollable_ref((p), __LINE__, (r))
136#define POLLABLE_UNREF(p, r) pollable_unref((p), __LINE__, (r))
137#endif
Craig Tillerd1d7fdd2017-04-14 16:16:24 -0700138
139/*******************************************************************************
Craig Tillerdd86b692017-04-06 10:43:11 -0700140 * Fd Declarations
141 */
142
Craig Tillerdd86b692017-04-06 10:43:11 -0700143struct grpc_fd {
Craig Tillerdd86b692017-04-06 10:43:11 -0700144 int fd;
145 /* refst format:
146 bit 0 : 1=Active / 0=Orphaned
147 bits 1-n : refcount
148 Ref/Unref by two to avoid altering the orphaned bit */
149 gpr_atm refst;
150
Craig Tiller4fd6a412017-10-04 22:41:13 +0000151 gpr_mu orphan_mu;
152
Craig Tiller249de2b2017-10-02 11:49:19 -0700153 gpr_mu pollable_mu;
Craig Tillerbaa14a92017-11-03 09:09:36 -0700154 pollable* pollable_obj;
Craig Tillerdd86b692017-04-06 10:43:11 -0700155
Craig Tillerfbf61bb2017-11-08 11:50:14 -0800156 grpc_core::ManualConstructor<grpc_core::LockfreeEvent> read_closure;
157 grpc_core::ManualConstructor<grpc_core::LockfreeEvent> write_closure;
Craig Tillerdd86b692017-04-06 10:43:11 -0700158
Craig Tillerbaa14a92017-11-03 09:09:36 -0700159 struct grpc_fd* freelist_next;
160 grpc_closure* on_done_closure;
Craig Tillerdd86b692017-04-06 10:43:11 -0700161
162 /* The pollset that last noticed that the fd is readable. The actual type
163 * stored in this is (grpc_pollset *) */
164 gpr_atm read_notifier_pollset;
165
166 grpc_iomgr_object iomgr_object;
167};
168
Craig Tillerdd86b692017-04-06 10:43:11 -0700169static void fd_global_init(void);
170static void fd_global_shutdown(void);
171
Craig Tillerdd86b692017-04-06 10:43:11 -0700172/*******************************************************************************
173 * Pollset Declarations
174 */
Craig Tillerd1d7fdd2017-04-14 16:16:24 -0700175
Craig Tiller23adbd52017-10-02 15:29:18 -0700176typedef struct {
Craig Tillerbaa14a92017-11-03 09:09:36 -0700177 grpc_pollset_worker* next;
178 grpc_pollset_worker* prev;
Craig Tiller23adbd52017-10-02 15:29:18 -0700179} pwlink;
Craig Tillerd1d7fdd2017-04-14 16:16:24 -0700180
Craig Tiller23adbd52017-10-02 15:29:18 -0700181typedef enum { PWLINK_POLLABLE = 0, PWLINK_POLLSET, PWLINK_COUNT } pwlinks;
Craig Tillerd1d7fdd2017-04-14 16:16:24 -0700182
Craig Tillerdd86b692017-04-06 10:43:11 -0700183struct grpc_pollset_worker {
Craig Tillere24b24d2017-04-06 16:05:45 -0700184 bool kicked;
185 bool initialized_cv;
Craig Tiller5d9db0c2017-10-18 09:43:55 -0700186#ifndef NDEBUG
187 // debug aid: which thread started this worker
Craig Tillerad059f72017-10-12 22:47:05 +0000188 pid_t originator;
Craig Tiller5d9db0c2017-10-18 09:43:55 -0700189#endif
Craig Tillere24b24d2017-04-06 16:05:45 -0700190 gpr_cv cv;
Craig Tillerbaa14a92017-11-03 09:09:36 -0700191 grpc_pollset* pollset;
192 pollable* pollable_obj;
Craig Tiller249de2b2017-10-02 11:49:19 -0700193
Craig Tiller23adbd52017-10-02 15:29:18 -0700194 pwlink links[PWLINK_COUNT];
Craig Tillerdd86b692017-04-06 10:43:11 -0700195};
196
Craig Tillere24b24d2017-04-06 16:05:45 -0700197struct grpc_pollset {
Craig Tiller249de2b2017-10-02 11:49:19 -0700198 gpr_mu mu;
Craig Tillerbaa14a92017-11-03 09:09:36 -0700199 pollable* active_pollable;
Craig Tillerbb93af62017-04-07 23:49:00 +0000200 bool kicked_without_poller;
Craig Tillerbaa14a92017-11-03 09:09:36 -0700201 grpc_closure* shutdown_closure;
202 grpc_pollset_worker* root_worker;
Craig Tillerad059f72017-10-12 22:47:05 +0000203 int containing_pollset_set_count;
Craig Tillere24b24d2017-04-06 16:05:45 -0700204};
Craig Tillerdd86b692017-04-06 10:43:11 -0700205
206/*******************************************************************************
207 * Pollset-set Declarations
208 */
Craig Tiller23adbd52017-10-02 15:29:18 -0700209
Craig Tiller8fc1ca12017-04-07 13:01:48 -0700210struct grpc_pollset_set {
Craig Tiller249de2b2017-10-02 11:49:19 -0700211 gpr_refcount refs;
212 gpr_mu mu;
Craig Tillerbaa14a92017-11-03 09:09:36 -0700213 grpc_pollset_set* parent;
Craig Tiller23adbd52017-10-02 15:29:18 -0700214
215 size_t pollset_count;
216 size_t pollset_capacity;
Craig Tillerbaa14a92017-11-03 09:09:36 -0700217 grpc_pollset** pollsets;
Craig Tiller23adbd52017-10-02 15:29:18 -0700218
219 size_t fd_count;
220 size_t fd_capacity;
Craig Tillerbaa14a92017-11-03 09:09:36 -0700221 grpc_fd** fds;
Craig Tiller8fc1ca12017-04-07 13:01:48 -0700222};
Craig Tillerdd86b692017-04-06 10:43:11 -0700223
224/*******************************************************************************
225 * Common helpers
226 */
227
Craig Tillerbaa14a92017-11-03 09:09:36 -0700228static bool append_error(grpc_error** composite, grpc_error* error,
229 const char* desc) {
Craig Tillere24b24d2017-04-06 16:05:45 -0700230 if (error == GRPC_ERROR_NONE) return true;
231 if (*composite == GRPC_ERROR_NONE) {
232 *composite = GRPC_ERROR_CREATE_FROM_COPIED_STRING(desc);
233 }
234 *composite = grpc_error_add_child(*composite, error);
235 return false;
236}
237
Craig Tillerdd86b692017-04-06 10:43:11 -0700238/*******************************************************************************
239 * Fd Definitions
240 */
241
242/* We need to keep a freelist not because of any concerns of malloc performance
243 * but instead so that implementations with multiple threads in (for example)
244 * epoll_wait deal with the race between pollset removal and incoming poll
245 * notifications.
246 *
247 * The problem is that the poller ultimately holds a reference to this
248 * object, so it is very difficult to know when is safe to free it, at least
249 * without some expensive synchronization.
250 *
251 * If we keep the object freelisted, in the worst case losing this race just
252 * becomes a spurious read notification on a reused fd.
253 */
254
Noah Eisen882dfed2017-11-14 14:58:20 -0800255static grpc_fd* fd_freelist = nullptr;
Craig Tillerdd86b692017-04-06 10:43:11 -0700256static gpr_mu fd_freelist_mu;
257
Noah Eisen264879f2017-06-20 17:14:47 -0700258#ifndef NDEBUG
Craig Tillerdd86b692017-04-06 10:43:11 -0700259#define REF_BY(fd, n, reason) ref_by(fd, n, reason, __FILE__, __LINE__)
Craig Tillerf8401102017-04-17 09:47:28 -0700260#define UNREF_BY(ec, fd, n, reason) \
261 unref_by(ec, fd, n, reason, __FILE__, __LINE__)
Craig Tillerbaa14a92017-11-03 09:09:36 -0700262static void ref_by(grpc_fd* fd, int n, const char* reason, const char* file,
Craig Tillerdd86b692017-04-06 10:43:11 -0700263 int line) {
ncteisen9ffb1492017-11-10 14:00:49 -0800264 if (grpc_trace_fd_refcount.enabled()) {
Noah Eisen264879f2017-06-20 17:14:47 -0700265 gpr_log(GPR_DEBUG,
266 "FD %d %p ref %d %" PRIdPTR " -> %" PRIdPTR " [%s; %s:%d]",
267 fd->fd, fd, n, gpr_atm_no_barrier_load(&fd->refst),
268 gpr_atm_no_barrier_load(&fd->refst) + n, reason, file, line);
269 }
Craig Tillerdd86b692017-04-06 10:43:11 -0700270#else
271#define REF_BY(fd, n, reason) ref_by(fd, n)
Craig Tiller50480b22017-04-17 16:34:52 +0000272#define UNREF_BY(ec, fd, n, reason) unref_by(ec, fd, n)
Craig Tillerbaa14a92017-11-03 09:09:36 -0700273static void ref_by(grpc_fd* fd, int n) {
Craig Tillerdd86b692017-04-06 10:43:11 -0700274#endif
275 GPR_ASSERT(gpr_atm_no_barrier_fetch_add(&fd->refst, n) > 0);
276}
277
Craig Tillerbaa14a92017-11-03 09:09:36 -0700278static void fd_destroy(grpc_exec_ctx* exec_ctx, void* arg, grpc_error* error) {
279 grpc_fd* fd = (grpc_fd*)arg;
Craig Tillerf8401102017-04-17 09:47:28 -0700280 /* Add the fd to the freelist */
281 grpc_iomgr_unregister_object(&fd->iomgr_object);
Craig Tiller389ea902017-10-04 17:42:49 +0000282 POLLABLE_UNREF(fd->pollable_obj, "fd_pollable");
Craig Tiller249de2b2017-10-02 11:49:19 -0700283 gpr_mu_destroy(&fd->pollable_mu);
Craig Tiller4fd6a412017-10-04 22:41:13 +0000284 gpr_mu_destroy(&fd->orphan_mu);
Craig Tillerf8401102017-04-17 09:47:28 -0700285 gpr_mu_lock(&fd_freelist_mu);
286 fd->freelist_next = fd_freelist;
287 fd_freelist = fd;
Craig Tillerdd86b692017-04-06 10:43:11 -0700288
yang-ged49fe52017-11-20 13:49:54 -0800289 fd->read_closure->DestroyEvent();
290 fd->write_closure->DestroyEvent();
Craig Tillerdd86b692017-04-06 10:43:11 -0700291
Craig Tillerf8401102017-04-17 09:47:28 -0700292 gpr_mu_unlock(&fd_freelist_mu);
Craig Tiller50480b22017-04-17 16:34:52 +0000293}
294
Noah Eisen264879f2017-06-20 17:14:47 -0700295#ifndef NDEBUG
Craig Tillerbaa14a92017-11-03 09:09:36 -0700296static void unref_by(grpc_exec_ctx* exec_ctx, grpc_fd* fd, int n,
297 const char* reason, const char* file, int line) {
ncteisen9ffb1492017-11-10 14:00:49 -0800298 if (grpc_trace_fd_refcount.enabled()) {
Noah Eisen264879f2017-06-20 17:14:47 -0700299 gpr_log(GPR_DEBUG,
300 "FD %d %p unref %d %" PRIdPTR " -> %" PRIdPTR " [%s; %s:%d]",
301 fd->fd, fd, n, gpr_atm_no_barrier_load(&fd->refst),
302 gpr_atm_no_barrier_load(&fd->refst) - n, reason, file, line);
303 }
Craig Tiller50480b22017-04-17 16:34:52 +0000304#else
Craig Tillerbaa14a92017-11-03 09:09:36 -0700305static void unref_by(grpc_exec_ctx* exec_ctx, grpc_fd* fd, int n) {
Craig Tiller50480b22017-04-17 16:34:52 +0000306#endif
Noah Eisen264879f2017-06-20 17:14:47 -0700307 gpr_atm old = gpr_atm_full_fetch_add(&fd->refst, -n);
Craig Tiller50480b22017-04-17 16:34:52 +0000308 if (old == n) {
Craig Tillerbaa14a92017-11-03 09:09:36 -0700309 GRPC_CLOSURE_SCHED(
310 exec_ctx,
311 GRPC_CLOSURE_CREATE(fd_destroy, fd, grpc_schedule_on_exec_ctx),
312 GRPC_ERROR_NONE);
Craig Tillerdd86b692017-04-06 10:43:11 -0700313 } else {
314 GPR_ASSERT(old > n);
315 }
316}
317
318static void fd_global_init(void) { gpr_mu_init(&fd_freelist_mu); }
319
320static void fd_global_shutdown(void) {
321 gpr_mu_lock(&fd_freelist_mu);
322 gpr_mu_unlock(&fd_freelist_mu);
Noah Eisen882dfed2017-11-14 14:58:20 -0800323 while (fd_freelist != nullptr) {
Craig Tillerbaa14a92017-11-03 09:09:36 -0700324 grpc_fd* fd = fd_freelist;
Craig Tillerdd86b692017-04-06 10:43:11 -0700325 fd_freelist = fd_freelist->freelist_next;
Craig Tillerdd86b692017-04-06 10:43:11 -0700326 gpr_free(fd);
327 }
328 gpr_mu_destroy(&fd_freelist_mu);
329}
330
Craig Tillerbaa14a92017-11-03 09:09:36 -0700331static grpc_fd* fd_create(int fd, const char* name) {
Noah Eisen882dfed2017-11-14 14:58:20 -0800332 grpc_fd* new_fd = nullptr;
Craig Tillerdd86b692017-04-06 10:43:11 -0700333
334 gpr_mu_lock(&fd_freelist_mu);
Noah Eisen882dfed2017-11-14 14:58:20 -0800335 if (fd_freelist != nullptr) {
Craig Tillerdd86b692017-04-06 10:43:11 -0700336 new_fd = fd_freelist;
337 fd_freelist = fd_freelist->freelist_next;
338 }
339 gpr_mu_unlock(&fd_freelist_mu);
340
Noah Eisen882dfed2017-11-14 14:58:20 -0800341 if (new_fd == nullptr) {
Craig Tillerbaa14a92017-11-03 09:09:36 -0700342 new_fd = (grpc_fd*)gpr_malloc(sizeof(grpc_fd));
yang-g26521b32017-11-17 17:15:37 -0800343 new_fd->read_closure.Init();
344 new_fd->write_closure.Init();
Craig Tillerdd86b692017-04-06 10:43:11 -0700345 }
346
Craig Tiller249de2b2017-10-02 11:49:19 -0700347 gpr_mu_init(&new_fd->pollable_mu);
Craig Tiller4fd6a412017-10-04 22:41:13 +0000348 gpr_mu_init(&new_fd->orphan_mu);
Noah Eisen882dfed2017-11-14 14:58:20 -0800349 new_fd->pollable_obj = nullptr;
Craig Tillerdd86b692017-04-06 10:43:11 -0700350 gpr_atm_rel_store(&new_fd->refst, (gpr_atm)1);
351 new_fd->fd = fd;
yang-ged49fe52017-11-20 13:49:54 -0800352 new_fd->read_closure->InitEvent();
353 new_fd->write_closure->InitEvent();
Craig Tillerdd86b692017-04-06 10:43:11 -0700354 gpr_atm_no_barrier_store(&new_fd->read_notifier_pollset, (gpr_atm)NULL);
355
Noah Eisen882dfed2017-11-14 14:58:20 -0800356 new_fd->freelist_next = nullptr;
357 new_fd->on_done_closure = nullptr;
Craig Tillerdd86b692017-04-06 10:43:11 -0700358
Craig Tillerbaa14a92017-11-03 09:09:36 -0700359 char* fd_name;
Craig Tillerdd86b692017-04-06 10:43:11 -0700360 gpr_asprintf(&fd_name, "%s fd=%d", name, fd);
361 grpc_iomgr_register_object(&new_fd->iomgr_object, fd_name);
Noah Eisen264879f2017-06-20 17:14:47 -0700362#ifndef NDEBUG
ncteisen9ffb1492017-11-10 14:00:49 -0800363 if (grpc_trace_fd_refcount.enabled()) {
Noah Eisen264879f2017-06-20 17:14:47 -0700364 gpr_log(GPR_DEBUG, "FD %d %p create %s", fd, new_fd, fd_name);
365 }
Craig Tillerdd86b692017-04-06 10:43:11 -0700366#endif
367 gpr_free(fd_name);
368 return new_fd;
369}
370
Craig Tillerbaa14a92017-11-03 09:09:36 -0700371static int fd_wrapped_fd(grpc_fd* fd) {
Craig Tiller249de2b2017-10-02 11:49:19 -0700372 int ret_fd = fd->fd;
373 return (gpr_atm_acq_load(&fd->refst) & 1) ? ret_fd : -1;
Craig Tillerdd86b692017-04-06 10:43:11 -0700374}
375
Craig Tillerbaa14a92017-11-03 09:09:36 -0700376static void fd_orphan(grpc_exec_ctx* exec_ctx, grpc_fd* fd,
377 grpc_closure* on_done, int* release_fd,
378 bool already_closed, const char* reason) {
Yuchen Zengd40a7ae2017-07-12 15:59:56 -0700379 bool is_fd_closed = already_closed;
Craig Tillerdd86b692017-04-06 10:43:11 -0700380
Craig Tiller4fd6a412017-10-04 22:41:13 +0000381 gpr_mu_lock(&fd->orphan_mu);
382
Craig Tillerdd86b692017-04-06 10:43:11 -0700383 fd->on_done_closure = on_done;
384
385 /* If release_fd is not NULL, we should be relinquishing control of the file
386 descriptor fd->fd (but we still own the grpc_fd structure). */
Noah Eisen882dfed2017-11-14 14:58:20 -0800387 if (release_fd != nullptr) {
Craig Tillerdd86b692017-04-06 10:43:11 -0700388 *release_fd = fd->fd;
Yuchen Zengd40a7ae2017-07-12 15:59:56 -0700389 } else if (!is_fd_closed) {
Craig Tillerdd86b692017-04-06 10:43:11 -0700390 close(fd->fd);
391 is_fd_closed = true;
392 }
393
Craig Tillerf4360d72017-04-07 08:51:00 -0700394 if (!is_fd_closed) {
395 gpr_log(GPR_DEBUG, "TODO: handle fd removal?");
396 }
397
Craig Tillerdd86b692017-04-06 10:43:11 -0700398 /* Remove the active status but keep referenced. We want this grpc_fd struct
399 to be alive (and not added to freelist) until the end of this function */
400 REF_BY(fd, 1, reason);
401
Craig Tiller249de2b2017-10-02 11:49:19 -0700402 GRPC_CLOSURE_SCHED(exec_ctx, fd->on_done_closure, GRPC_ERROR_NONE);
Craig Tillerdd86b692017-04-06 10:43:11 -0700403
Craig Tiller4fd6a412017-10-04 22:41:13 +0000404 gpr_mu_unlock(&fd->orphan_mu);
405
Craig Tiller50480b22017-04-17 16:34:52 +0000406 UNREF_BY(exec_ctx, fd, 2, reason); /* Drop the reference */
Craig Tillerdd86b692017-04-06 10:43:11 -0700407}
408
Craig Tillerbaa14a92017-11-03 09:09:36 -0700409static grpc_pollset* fd_get_read_notifier_pollset(grpc_exec_ctx* exec_ctx,
410 grpc_fd* fd) {
Craig Tillerdd86b692017-04-06 10:43:11 -0700411 gpr_atm notifier = gpr_atm_acq_load(&fd->read_notifier_pollset);
Craig Tillerbaa14a92017-11-03 09:09:36 -0700412 return (grpc_pollset*)notifier;
Craig Tillerdd86b692017-04-06 10:43:11 -0700413}
414
Craig Tillerbaa14a92017-11-03 09:09:36 -0700415static bool fd_is_shutdown(grpc_fd* fd) {
Craig Tillerfbf61bb2017-11-08 11:50:14 -0800416 return fd->read_closure->IsShutdown();
Craig Tillerdd86b692017-04-06 10:43:11 -0700417}
418
419/* Might be called multiple times */
Craig Tillerbaa14a92017-11-03 09:09:36 -0700420static void fd_shutdown(grpc_exec_ctx* exec_ctx, grpc_fd* fd, grpc_error* why) {
Craig Tillerfbf61bb2017-11-08 11:50:14 -0800421 if (fd->read_closure->SetShutdown(exec_ctx, GRPC_ERROR_REF(why))) {
Craig Tillerdd86b692017-04-06 10:43:11 -0700422 shutdown(fd->fd, SHUT_RDWR);
Craig Tillerfbf61bb2017-11-08 11:50:14 -0800423 fd->write_closure->SetShutdown(exec_ctx, GRPC_ERROR_REF(why));
Craig Tillerdd86b692017-04-06 10:43:11 -0700424 }
425 GRPC_ERROR_UNREF(why);
426}
427
Craig Tillerbaa14a92017-11-03 09:09:36 -0700428static void fd_notify_on_read(grpc_exec_ctx* exec_ctx, grpc_fd* fd,
429 grpc_closure* closure) {
Craig Tillerfbf61bb2017-11-08 11:50:14 -0800430 fd->read_closure->NotifyOn(exec_ctx, closure);
Craig Tillerdd86b692017-04-06 10:43:11 -0700431}
432
Craig Tillerbaa14a92017-11-03 09:09:36 -0700433static void fd_notify_on_write(grpc_exec_ctx* exec_ctx, grpc_fd* fd,
434 grpc_closure* closure) {
Craig Tillerfbf61bb2017-11-08 11:50:14 -0800435 fd->write_closure->NotifyOn(exec_ctx, closure);
Craig Tillerdd86b692017-04-06 10:43:11 -0700436}
437
Craig Tillerdd86b692017-04-06 10:43:11 -0700438/*******************************************************************************
Craig Tillerd1d7fdd2017-04-14 16:16:24 -0700439 * Pollable Definitions
440 */
441
Craig Tillerbaa14a92017-11-03 09:09:36 -0700442static grpc_error* pollable_create(pollable_type type, pollable** p) {
Noah Eisen882dfed2017-11-14 14:58:20 -0800443 *p = nullptr;
Craig Tillerd1d7fdd2017-04-14 16:16:24 -0700444
Craig Tiller249de2b2017-10-02 11:49:19 -0700445 int epfd = epoll_create1(EPOLL_CLOEXEC);
446 if (epfd == -1) {
447 return GRPC_OS_ERROR(errno, "epoll_create1");
Craig Tiller79d24fb2017-04-17 19:35:19 +0000448 }
Craig Tillerbaa14a92017-11-03 09:09:36 -0700449 *p = (pollable*)gpr_malloc(sizeof(**p));
450 grpc_error* err = grpc_wakeup_fd_init(&(*p)->wakeup);
Craig Tiller249de2b2017-10-02 11:49:19 -0700451 if (err != GRPC_ERROR_NONE) {
452 close(epfd);
Craig Tiller90a9d7d2017-10-04 21:24:03 +0000453 gpr_free(*p);
Noah Eisen882dfed2017-11-14 14:58:20 -0800454 *p = nullptr;
Craig Tiller249de2b2017-10-02 11:49:19 -0700455 return err;
Craig Tillerd1d7fdd2017-04-14 16:16:24 -0700456 }
Craig Tiller249de2b2017-10-02 11:49:19 -0700457 struct epoll_event ev;
458 ev.events = (uint32_t)(EPOLLIN | EPOLLET);
Craig Tillerbaa14a92017-11-03 09:09:36 -0700459 ev.data.ptr = (void*)(1 | (intptr_t) & (*p)->wakeup);
Craig Tiller90a9d7d2017-10-04 21:24:03 +0000460 if (epoll_ctl(epfd, EPOLL_CTL_ADD, (*p)->wakeup.read_fd, &ev) != 0) {
Craig Tiller249de2b2017-10-02 11:49:19 -0700461 err = GRPC_OS_ERROR(errno, "epoll_ctl");
462 close(epfd);
Craig Tiller90a9d7d2017-10-04 21:24:03 +0000463 grpc_wakeup_fd_destroy(&(*p)->wakeup);
Craig Tiller29a9c3a2017-10-04 15:15:04 -0700464 gpr_free(*p);
Noah Eisen882dfed2017-11-14 14:58:20 -0800465 *p = nullptr;
Craig Tiller249de2b2017-10-02 11:49:19 -0700466 return err;
467 }
468
Craig Tiller249de2b2017-10-02 11:49:19 -0700469 (*p)->type = type;
470 gpr_ref_init(&(*p)->refs, 1);
Craig Tiller65da0ef2017-10-02 15:35:59 -0700471 gpr_mu_init(&(*p)->mu);
Craig Tiller249de2b2017-10-02 11:49:19 -0700472 (*p)->epfd = epfd;
Noah Eisen882dfed2017-11-14 14:58:20 -0800473 (*p)->owner_fd = nullptr;
474 (*p)->pollset_set = nullptr;
Craig Tiller249de2b2017-10-02 11:49:19 -0700475 (*p)->next = (*p)->prev = *p;
Noah Eisen882dfed2017-11-14 14:58:20 -0800476 (*p)->root_worker = nullptr;
Craig Tillere29196b2017-10-18 19:52:33 +0000477 (*p)->event_cursor = 0;
478 (*p)->event_count = 0;
Craig Tillerd1d7fdd2017-04-14 16:16:24 -0700479 return GRPC_ERROR_NONE;
480}
481
Craig Tiller389ea902017-10-04 17:42:49 +0000482#ifdef NDEBUG
Craig Tillerbaa14a92017-11-03 09:09:36 -0700483static pollable* pollable_ref(pollable* p) {
Craig Tiller389ea902017-10-04 17:42:49 +0000484#else
Craig Tillerbaa14a92017-11-03 09:09:36 -0700485static pollable* pollable_ref(pollable* p, int line, const char* reason) {
ncteisen9ffb1492017-11-10 14:00:49 -0800486 if (grpc_trace_pollable_refcount.enabled()) {
Craig Tiller389ea902017-10-04 17:42:49 +0000487 int r = (int)gpr_atm_no_barrier_load(&p->refs.count);
Craig Tiller29a9c3a2017-10-04 15:15:04 -0700488 gpr_log(__FILE__, line, GPR_LOG_SEVERITY_DEBUG,
489 "POLLABLE:%p ref %d->%d %s", p, r, r + 1, reason);
Craig Tiller389ea902017-10-04 17:42:49 +0000490 }
491#endif
Craig Tiller249de2b2017-10-02 11:49:19 -0700492 gpr_ref(&p->refs);
493 return p;
494}
495
Craig Tiller389ea902017-10-04 17:42:49 +0000496#ifdef NDEBUG
Craig Tillerbaa14a92017-11-03 09:09:36 -0700497static void pollable_unref(pollable* p) {
Craig Tiller389ea902017-10-04 17:42:49 +0000498#else
Craig Tillerbaa14a92017-11-03 09:09:36 -0700499static void pollable_unref(pollable* p, int line, const char* reason) {
Noah Eisen882dfed2017-11-14 14:58:20 -0800500 if (p == nullptr) return;
ncteisen9ffb1492017-11-10 14:00:49 -0800501 if (grpc_trace_pollable_refcount.enabled()) {
Craig Tiller389ea902017-10-04 17:42:49 +0000502 int r = (int)gpr_atm_no_barrier_load(&p->refs.count);
Craig Tiller29a9c3a2017-10-04 15:15:04 -0700503 gpr_log(__FILE__, line, GPR_LOG_SEVERITY_DEBUG,
504 "POLLABLE:%p unref %d->%d %s", p, r, r - 1, reason);
Craig Tiller389ea902017-10-04 17:42:49 +0000505 }
506#endif
Noah Eisen882dfed2017-11-14 14:58:20 -0800507 if (p != nullptr && gpr_unref(&p->refs)) {
Craig Tillerd1d7fdd2017-04-14 16:16:24 -0700508 close(p->epfd);
509 grpc_wakeup_fd_destroy(&p->wakeup);
Craig Tiller389ea902017-10-04 17:42:49 +0000510 gpr_free(p);
Craig Tillerd1d7fdd2017-04-14 16:16:24 -0700511 }
512}
513
Craig Tillerbaa14a92017-11-03 09:09:36 -0700514static grpc_error* pollable_add_fd(pollable* p, grpc_fd* fd) {
515 grpc_error* error = GRPC_ERROR_NONE;
516 static const char* err_desc = "pollable_add_fd";
Craig Tillerd1d7fdd2017-04-14 16:16:24 -0700517 const int epfd = p->epfd;
518
ncteisen9ffb1492017-11-10 14:00:49 -0800519 if (grpc_polling_trace.enabled()) {
Craig Tiller86ffd652017-06-20 00:16:54 +0000520 gpr_log(GPR_DEBUG, "add fd %p (%d) to pollable %p", fd, fd->fd, p);
Craig Tiller6c8383a2017-05-05 16:54:42 +0000521 }
522
Yash Tibrewal533d1182017-09-18 10:48:22 -0700523 struct epoll_event ev_fd;
524 ev_fd.events = (uint32_t)(EPOLLET | EPOLLIN | EPOLLOUT | EPOLLEXCLUSIVE);
525 ev_fd.data.ptr = fd;
Craig Tillerd1d7fdd2017-04-14 16:16:24 -0700526 if (epoll_ctl(epfd, EPOLL_CTL_ADD, fd->fd, &ev_fd) != 0) {
527 switch (errno) {
Craig Tiller61f96c12017-05-12 13:36:39 -0700528 case EEXIST:
Craig Tillerd1d7fdd2017-04-14 16:16:24 -0700529 break;
530 default:
531 append_error(&error, GRPC_OS_ERROR(errno, "epoll_ctl"), err_desc);
532 }
533 }
534
535 return error;
536}
537
538/*******************************************************************************
Craig Tillerdd86b692017-04-06 10:43:11 -0700539 * Pollset Definitions
540 */
Craig Tillerd1d7fdd2017-04-14 16:16:24 -0700541
Craig Tillerdd86b692017-04-06 10:43:11 -0700542GPR_TLS_DECL(g_current_thread_pollset);
543GPR_TLS_DECL(g_current_thread_worker);
544
Craig Tillerdd86b692017-04-06 10:43:11 -0700545/* Global state management */
Craig Tillerbaa14a92017-11-03 09:09:36 -0700546static grpc_error* pollset_global_init(void) {
Craig Tillerdd86b692017-04-06 10:43:11 -0700547 gpr_tls_init(&g_current_thread_pollset);
548 gpr_tls_init(&g_current_thread_worker);
Craig Tiller249de2b2017-10-02 11:49:19 -0700549 return pollable_create(PO_EMPTY, &g_empty_pollable);
Craig Tillerdd86b692017-04-06 10:43:11 -0700550}
551
552static void pollset_global_shutdown(void) {
Craig Tiller389ea902017-10-04 17:42:49 +0000553 POLLABLE_UNREF(g_empty_pollable, "g_empty_pollable");
Craig Tillerdd86b692017-04-06 10:43:11 -0700554 gpr_tls_destroy(&g_current_thread_pollset);
555 gpr_tls_destroy(&g_current_thread_worker);
556}
557
Craig Tiller5d9db0c2017-10-18 09:43:55 -0700558/* pollset->mu must be held while calling this function */
Craig Tillerbaa14a92017-11-03 09:09:36 -0700559static void pollset_maybe_finish_shutdown(grpc_exec_ctx* exec_ctx,
560 grpc_pollset* pollset) {
ncteisen9ffb1492017-11-10 14:00:49 -0800561 if (grpc_polling_trace.enabled()) {
Craig Tillerb653dff2017-10-16 14:38:36 -0700562 gpr_log(GPR_DEBUG,
563 "PS:%p (pollable:%p) maybe_finish_shutdown sc=%p (target:!NULL) "
564 "rw=%p (target:NULL) cpsc=%d (target:0)",
565 pollset, pollset->active_pollable, pollset->shutdown_closure,
566 pollset->root_worker, pollset->containing_pollset_set_count);
567 }
Noah Eisen882dfed2017-11-14 14:58:20 -0800568 if (pollset->shutdown_closure != nullptr && pollset->root_worker == nullptr &&
Craig Tiller1e8c2ab2017-10-12 15:50:13 -0700569 pollset->containing_pollset_set_count == 0) {
Craig Tillerca669b02017-06-09 12:41:51 -0700570 GRPC_CLOSURE_SCHED(exec_ctx, pollset->shutdown_closure, GRPC_ERROR_NONE);
Noah Eisen882dfed2017-11-14 14:58:20 -0800571 pollset->shutdown_closure = nullptr;
Craig Tillere16722c2017-05-17 21:50:37 +0000572 }
573}
574
Craig Tiller1e8c2ab2017-10-12 15:50:13 -0700575/* pollset->mu must be held before calling this function,
576 * pollset->active_pollable->mu & specific_worker->pollable_obj->mu must not be
577 * held */
Craig Tillerbaa14a92017-11-03 09:09:36 -0700578static grpc_error* kick_one_worker(grpc_exec_ctx* exec_ctx,
579 grpc_pollset_worker* specific_worker) {
580 pollable* p = specific_worker->pollable_obj;
Craig Tillerad059f72017-10-12 22:47:05 +0000581 grpc_core::mu_guard lock(&p->mu);
Noah Eisen882dfed2017-11-14 14:58:20 -0800582 GPR_ASSERT(specific_worker != nullptr);
Craig Tiller23adbd52017-10-02 15:29:18 -0700583 if (specific_worker->kicked) {
ncteisen9ffb1492017-11-10 14:00:49 -0800584 if (grpc_polling_trace.enabled()) {
Craig Tillerd37d1422017-04-18 19:41:07 +0000585 gpr_log(GPR_DEBUG, "PS:%p kicked_specific_but_already_kicked", p);
586 }
Craig Tiller97633742017-10-15 15:34:40 -0700587 GRPC_STATS_INC_POLLSET_KICKED_AGAIN(exec_ctx);
Craig Tillerd37d1422017-04-18 19:41:07 +0000588 return GRPC_ERROR_NONE;
Craig Tillerad059f72017-10-12 22:47:05 +0000589 }
Craig Tiller1e8c2ab2017-10-12 15:50:13 -0700590 if (gpr_tls_get(&g_current_thread_worker) == (intptr_t)specific_worker) {
ncteisen9ffb1492017-11-10 14:00:49 -0800591 if (grpc_polling_trace.enabled()) {
Craig Tillerd1d7fdd2017-04-14 16:16:24 -0700592 gpr_log(GPR_DEBUG, "PS:%p kicked_specific_but_awake", p);
Craig Tiller1ad94772017-04-11 16:15:19 +0000593 }
Craig Tiller97633742017-10-15 15:34:40 -0700594 GRPC_STATS_INC_POLLSET_KICK_OWN_THREAD(exec_ctx);
Craig Tillerd37d1422017-04-18 19:41:07 +0000595 specific_worker->kicked = true;
Craig Tillere24b24d2017-04-06 16:05:45 -0700596 return GRPC_ERROR_NONE;
Craig Tiller97633742017-10-15 15:34:40 -0700597 }
598 if (specific_worker == p->root_worker) {
599 GRPC_STATS_INC_POLLSET_KICK_WAKEUP_FD(exec_ctx);
ncteisen9ffb1492017-11-10 14:00:49 -0800600 if (grpc_polling_trace.enabled()) {
Craig Tillerd1d7fdd2017-04-14 16:16:24 -0700601 gpr_log(GPR_DEBUG, "PS:%p kicked_specific_via_wakeup_fd", p);
Craig Tiller1ad94772017-04-11 16:15:19 +0000602 }
Craig Tillerd37d1422017-04-18 19:41:07 +0000603 specific_worker->kicked = true;
Craig Tillerbaa14a92017-11-03 09:09:36 -0700604 grpc_error* error = grpc_wakeup_fd_wakeup(&p->wakeup);
Craig Tillerad059f72017-10-12 22:47:05 +0000605 return error;
606 }
Craig Tiller97633742017-10-15 15:34:40 -0700607 if (specific_worker->initialized_cv) {
608 GRPC_STATS_INC_POLLSET_KICK_WAKEUP_CV(exec_ctx);
ncteisen9ffb1492017-11-10 14:00:49 -0800609 if (grpc_polling_trace.enabled()) {
Craig Tillerd1d7fdd2017-04-14 16:16:24 -0700610 gpr_log(GPR_DEBUG, "PS:%p kicked_specific_via_cv", p);
Craig Tiller1ad94772017-04-11 16:15:19 +0000611 }
Craig Tillere3a69332017-04-18 16:45:40 +0000612 specific_worker->kicked = true;
Craig Tillere24b24d2017-04-06 16:05:45 -0700613 gpr_cv_signal(&specific_worker->cv);
614 return GRPC_ERROR_NONE;
615 }
Craig Tiller1e8c2ab2017-10-12 15:50:13 -0700616 // we can get here during end_worker after removing specific_worker from the
617 // pollable list but before removing it from the pollset list
Craig Tillerad059f72017-10-12 22:47:05 +0000618 return GRPC_ERROR_NONE;
Craig Tiller26017de2017-04-15 00:07:25 +0000619}
620
Craig Tillerbaa14a92017-11-03 09:09:36 -0700621static grpc_error* pollset_kick(grpc_exec_ctx* exec_ctx, grpc_pollset* pollset,
622 grpc_pollset_worker* specific_worker) {
ncteisen09615092017-10-31 17:48:01 -0700623 GRPC_STATS_INC_POLLSET_KICK(exec_ctx);
ncteisen9ffb1492017-11-10 14:00:49 -0800624 if (grpc_polling_trace.enabled()) {
Craig Tillerdd86b692017-04-06 10:43:11 -0700625 gpr_log(GPR_DEBUG,
Craig Tiller3d073c22017-10-04 22:10:58 +0000626 "PS:%p kick %p tls_pollset=%p tls_worker=%p pollset.root_worker=%p",
Craig Tiller29a9c3a2017-10-04 15:15:04 -0700627 pollset, specific_worker,
Craig Tillerbaa14a92017-11-03 09:09:36 -0700628 (void*)gpr_tls_get(&g_current_thread_pollset),
629 (void*)gpr_tls_get(&g_current_thread_worker), pollset->root_worker);
Craig Tiller26017de2017-04-15 00:07:25 +0000630 }
Noah Eisen882dfed2017-11-14 14:58:20 -0800631 if (specific_worker == nullptr) {
Craig Tillerdd86b692017-04-06 10:43:11 -0700632 if (gpr_tls_get(&g_current_thread_pollset) != (intptr_t)pollset) {
Noah Eisen882dfed2017-11-14 14:58:20 -0800633 if (pollset->root_worker == nullptr) {
ncteisen9ffb1492017-11-10 14:00:49 -0800634 if (grpc_polling_trace.enabled()) {
Craig Tiller3d073c22017-10-04 22:10:58 +0000635 gpr_log(GPR_DEBUG, "PS:%p kicked_any_without_poller", pollset);
Craig Tillerdd86b692017-04-06 10:43:11 -0700636 }
Craig Tiller5d9db0c2017-10-18 09:43:55 -0700637 GRPC_STATS_INC_POLLSET_KICKED_WITHOUT_POLLER(exec_ctx);
Craig Tillerdd86b692017-04-06 10:43:11 -0700638 pollset->kicked_without_poller = true;
639 return GRPC_ERROR_NONE;
640 } else {
Craig Tiller7c8e59d2017-10-18 11:57:25 -0700641 // We've been asked to kick a poller, but we haven't been told which one
642 // ... any will do
643 // We look at the pollset worker list because:
644 // 1. the pollable list may include workers from other pollers, so we'd
645 // need to do an O(N) search
646 // 2. we'd additionally need to take the pollable lock, which we've so
647 // far avoided
648 // Now, we would prefer to wake a poller in cv_wait, and not in
649 // epoll_wait (since the latter would imply the need to do an additional
650 // wakeup)
651 // We know that if a worker is at the root of a pollable, it's (likely)
652 // also the root of a pollset, and we know that if a worker is NOT at
653 // the root of a pollset, it's (likely) not at the root of a pollable,
654 // so we take our chances and choose the SECOND worker enqueued against
655 // the pollset as a worker that's likely to be in cv_wait
Craig Tiller5d9db0c2017-10-18 09:43:55 -0700656 return kick_one_worker(
657 exec_ctx, pollset->root_worker->links[PWLINK_POLLSET].next);
Craig Tillerdd86b692017-04-06 10:43:11 -0700658 }
659 } else {
ncteisen9ffb1492017-11-10 14:00:49 -0800660 if (grpc_polling_trace.enabled()) {
Craig Tiller3d073c22017-10-04 22:10:58 +0000661 gpr_log(GPR_DEBUG, "PS:%p kicked_any_but_awake", pollset);
Craig Tillerdd86b692017-04-06 10:43:11 -0700662 }
Craig Tiller5d9db0c2017-10-18 09:43:55 -0700663 GRPC_STATS_INC_POLLSET_KICK_OWN_THREAD(exec_ctx);
Craig Tillerdd86b692017-04-06 10:43:11 -0700664 return GRPC_ERROR_NONE;
665 }
Craig Tillerdd86b692017-04-06 10:43:11 -0700666 } else {
Craig Tiller5d9db0c2017-10-18 09:43:55 -0700667 return kick_one_worker(exec_ctx, specific_worker);
Craig Tiller26017de2017-04-15 00:07:25 +0000668 }
669}
670
Craig Tillerbaa14a92017-11-03 09:09:36 -0700671static grpc_error* pollset_kick_all(grpc_exec_ctx* exec_ctx,
672 grpc_pollset* pollset) {
673 grpc_error* error = GRPC_ERROR_NONE;
674 const char* err_desc = "pollset_kick_all";
675 grpc_pollset_worker* w = pollset->root_worker;
Noah Eisen882dfed2017-11-14 14:58:20 -0800676 if (w != nullptr) {
Craig Tiller29a9c3a2017-10-04 15:15:04 -0700677 do {
ncteisen09615092017-10-31 17:48:01 -0700678 GRPC_STATS_INC_POLLSET_KICK(exec_ctx);
Craig Tiller5d9db0c2017-10-18 09:43:55 -0700679 append_error(&error, kick_one_worker(exec_ctx, w), err_desc);
Craig Tiller29a9c3a2017-10-04 15:15:04 -0700680 w = w->links[PWLINK_POLLSET].next;
681 } while (w != pollset->root_worker);
Craig Tiller26017de2017-04-15 00:07:25 +0000682 }
683 return error;
Craig Tillerdd86b692017-04-06 10:43:11 -0700684}
685
Craig Tillerbaa14a92017-11-03 09:09:36 -0700686static void pollset_init(grpc_pollset* pollset, gpr_mu** mu) {
Craig Tiller249de2b2017-10-02 11:49:19 -0700687 gpr_mu_init(&pollset->mu);
Craig Tiller389ea902017-10-04 17:42:49 +0000688 pollset->active_pollable = POLLABLE_REF(g_empty_pollable, "pollset");
Craig Tiller249de2b2017-10-02 11:49:19 -0700689 *mu = &pollset->mu;
Craig Tiller9f012512017-04-13 15:37:14 -0700690}
691
Craig Tillerbaa14a92017-11-03 09:09:36 -0700692static int poll_deadline_to_millis_timeout(grpc_exec_ctx* exec_ctx,
Craig Tiller20397792017-07-18 11:35:27 -0700693 grpc_millis millis) {
694 if (millis == GRPC_MILLIS_INF_FUTURE) return -1;
695 grpc_millis delta = millis - grpc_exec_ctx_now(exec_ctx);
696 if (delta > INT_MAX)
697 return INT_MAX;
698 else if (delta < 0)
Craig Tillerdd86b692017-04-06 10:43:11 -0700699 return 0;
Craig Tiller20397792017-07-18 11:35:27 -0700700 else
701 return (int)delta;
Craig Tillerdd86b692017-04-06 10:43:11 -0700702}
703
Craig Tillerbaa14a92017-11-03 09:09:36 -0700704static void fd_become_readable(grpc_exec_ctx* exec_ctx, grpc_fd* fd,
705 grpc_pollset* notifier) {
Craig Tillerfbf61bb2017-11-08 11:50:14 -0800706 fd->read_closure->SetReady(exec_ctx);
Craig Tillerdd86b692017-04-06 10:43:11 -0700707
708 /* Note, it is possible that fd_become_readable might be called twice with
709 different 'notifier's when an fd becomes readable and it is in two epoll
710 sets (This can happen briefly during polling island merges). In such cases
711 it does not really matter which notifer is set as the read_notifier_pollset
712 (They would both point to the same polling island anyway) */
713 /* Use release store to match with acquire load in fd_get_read_notifier */
714 gpr_atm_rel_store(&fd->read_notifier_pollset, (gpr_atm)notifier);
715}
716
Craig Tillerbaa14a92017-11-03 09:09:36 -0700717static void fd_become_writable(grpc_exec_ctx* exec_ctx, grpc_fd* fd) {
Craig Tillerfbf61bb2017-11-08 11:50:14 -0800718 fd->write_closure->SetReady(exec_ctx);
Craig Tillerdd86b692017-04-06 10:43:11 -0700719}
720
Craig Tillerbaa14a92017-11-03 09:09:36 -0700721static grpc_error* fd_get_or_become_pollable(grpc_fd* fd, pollable** p) {
Craig Tiller249de2b2017-10-02 11:49:19 -0700722 gpr_mu_lock(&fd->pollable_mu);
Craig Tillerbaa14a92017-11-03 09:09:36 -0700723 grpc_error* error = GRPC_ERROR_NONE;
724 static const char* err_desc = "fd_get_or_become_pollable";
Noah Eisen882dfed2017-11-14 14:58:20 -0800725 if (fd->pollable_obj == nullptr) {
Craig Tiller249de2b2017-10-02 11:49:19 -0700726 if (append_error(&error, pollable_create(PO_FD, &fd->pollable_obj),
727 err_desc)) {
Craig Tiller29a9c3a2017-10-04 15:15:04 -0700728 fd->pollable_obj->owner_fd = fd;
Craig Tiller249de2b2017-10-02 11:49:19 -0700729 if (!append_error(&error, pollable_add_fd(fd->pollable_obj, fd),
730 err_desc)) {
Craig Tiller389ea902017-10-04 17:42:49 +0000731 POLLABLE_UNREF(fd->pollable_obj, "fd_pollable");
Noah Eisen882dfed2017-11-14 14:58:20 -0800732 fd->pollable_obj = nullptr;
Craig Tiller249de2b2017-10-02 11:49:19 -0700733 }
734 }
Craig Tillerd1d7fdd2017-04-14 16:16:24 -0700735 }
Craig Tiller249de2b2017-10-02 11:49:19 -0700736 if (error == GRPC_ERROR_NONE) {
Noah Eisen882dfed2017-11-14 14:58:20 -0800737 GPR_ASSERT(fd->pollable_obj != nullptr);
Craig Tiller389ea902017-10-04 17:42:49 +0000738 *p = POLLABLE_REF(fd->pollable_obj, "pollset");
Craig Tiller249de2b2017-10-02 11:49:19 -0700739 } else {
Noah Eisen882dfed2017-11-14 14:58:20 -0800740 GPR_ASSERT(fd->pollable_obj == nullptr);
741 *p = nullptr;
Craig Tiller249de2b2017-10-02 11:49:19 -0700742 }
743 gpr_mu_unlock(&fd->pollable_mu);
Craig Tillerd1d7fdd2017-04-14 16:16:24 -0700744 return error;
745}
746
Craig Tillerdd86b692017-04-06 10:43:11 -0700747/* pollset->po.mu lock must be held by the caller before calling this */
Craig Tillerbaa14a92017-11-03 09:09:36 -0700748static void pollset_shutdown(grpc_exec_ctx* exec_ctx, grpc_pollset* pollset,
749 grpc_closure* closure) {
Noah Eisen882dfed2017-11-14 14:58:20 -0800750 GPR_ASSERT(pollset->shutdown_closure == nullptr);
Craig Tillere24b24d2017-04-06 16:05:45 -0700751 pollset->shutdown_closure = closure;
Craig Tiller23adbd52017-10-02 15:29:18 -0700752 GRPC_LOG_IF_ERROR("pollset_shutdown", pollset_kick_all(exec_ctx, pollset));
Craig Tillerd9cd8f02017-04-07 16:26:02 -0700753 pollset_maybe_finish_shutdown(exec_ctx, pollset);
Craig Tillere24b24d2017-04-06 16:05:45 -0700754}
Craig Tillerdd86b692017-04-06 10:43:11 -0700755
Craig Tillerbaa14a92017-11-03 09:09:36 -0700756static grpc_error* pollable_process_events(grpc_exec_ctx* exec_ctx,
757 grpc_pollset* pollset,
758 pollable* pollable_obj, bool drain) {
759 static const char* err_desc = "pollset_process_events";
760 grpc_error* error = GRPC_ERROR_NONE;
Craig Tillerca4fc662017-06-09 11:13:56 -0700761 for (int i = 0; (drain || i < MAX_EPOLL_EVENTS_HANDLED_EACH_POLL_CALL) &&
Craig Tiller8223f412017-10-17 22:05:55 +0000762 pollable_obj->event_cursor != pollable_obj->event_count;
Craig Tillerca4fc662017-06-09 11:13:56 -0700763 i++) {
Craig Tiller8223f412017-10-17 22:05:55 +0000764 int n = pollable_obj->event_cursor++;
Craig Tillerbaa14a92017-11-03 09:09:36 -0700765 struct epoll_event* ev = &pollable_obj->events[n];
766 void* data_ptr = ev->data.ptr;
Craig Tiller4f07ea82017-06-08 23:56:06 +0000767 if (1 & (intptr_t)data_ptr) {
ncteisen9ffb1492017-11-10 14:00:49 -0800768 if (grpc_polling_trace.enabled()) {
Craig Tiller4f07ea82017-06-08 23:56:06 +0000769 gpr_log(GPR_DEBUG, "PS:%p got pollset_wakeup %p", pollset, data_ptr);
Craig Tillercd0354b2017-04-10 16:19:18 -0700770 }
Yash Tibrewal52778c42017-09-11 15:00:11 -0700771 append_error(&error,
772 grpc_wakeup_fd_consume_wakeup(
Craig Tillerbaa14a92017-11-03 09:09:36 -0700773 (grpc_wakeup_fd*)((~(intptr_t)1) & (intptr_t)data_ptr)),
Craig Tiller9bedddd2017-06-08 17:05:00 -0700774 err_desc);
Craig Tillere24b24d2017-04-06 16:05:45 -0700775 } else {
Craig Tillerbaa14a92017-11-03 09:09:36 -0700776 grpc_fd* fd = (grpc_fd*)data_ptr;
Craig Tiller4f07ea82017-06-08 23:56:06 +0000777 bool cancel = (ev->events & (EPOLLERR | EPOLLHUP)) != 0;
778 bool read_ev = (ev->events & (EPOLLIN | EPOLLPRI)) != 0;
779 bool write_ev = (ev->events & EPOLLOUT) != 0;
ncteisen9ffb1492017-11-10 14:00:49 -0800780 if (grpc_polling_trace.enabled()) {
Craig Tillere3a69332017-04-18 16:45:40 +0000781 gpr_log(GPR_DEBUG,
Craig Tiller4f07ea82017-06-08 23:56:06 +0000782 "PS:%p got fd %p: cancel=%d read=%d "
Craig Tillere3a69332017-04-18 16:45:40 +0000783 "write=%d",
Craig Tiller4f07ea82017-06-08 23:56:06 +0000784 pollset, fd, cancel, read_ev, write_ev);
Craig Tillercd0354b2017-04-10 16:19:18 -0700785 }
Craig Tiller61f96c12017-05-12 13:36:39 -0700786 if (read_ev || cancel) {
787 fd_become_readable(exec_ctx, fd, pollset);
788 }
789 if (write_ev || cancel) {
790 fd_become_writable(exec_ctx, fd);
Craig Tillere24b24d2017-04-06 16:05:45 -0700791 }
792 }
793 }
794
795 return error;
796}
797
Craig Tiller4f07ea82017-06-08 23:56:06 +0000798/* pollset_shutdown is guaranteed to be called before pollset_destroy. */
Craig Tillerbaa14a92017-11-03 09:09:36 -0700799static void pollset_destroy(grpc_exec_ctx* exec_ctx, grpc_pollset* pollset) {
Craig Tiller389ea902017-10-04 17:42:49 +0000800 POLLABLE_UNREF(pollset->active_pollable, "pollset");
Noah Eisen882dfed2017-11-14 14:58:20 -0800801 pollset->active_pollable = nullptr;
Craig Tiller4f07ea82017-06-08 23:56:06 +0000802}
803
Craig Tillerbaa14a92017-11-03 09:09:36 -0700804static grpc_error* pollable_epoll(grpc_exec_ctx* exec_ctx, pollable* p,
Craig Tiller5d9db0c2017-10-18 09:43:55 -0700805 grpc_millis deadline) {
Craig Tiller20397792017-07-18 11:35:27 -0700806 int timeout = poll_deadline_to_millis_timeout(exec_ctx, deadline);
Craig Tiller4f07ea82017-06-08 23:56:06 +0000807
ncteisen9ffb1492017-11-10 14:00:49 -0800808 if (grpc_polling_trace.enabled()) {
Craig Tillerbaa14a92017-11-03 09:09:36 -0700809 char* desc = pollable_desc(p);
Craig Tiller5d9db0c2017-10-18 09:43:55 -0700810 gpr_log(GPR_DEBUG, "POLLABLE:%p[%s] poll for %dms", p, desc, timeout);
Craig Tiller86ffd652017-06-20 00:16:54 +0000811 gpr_free(desc);
Craig Tiller4f07ea82017-06-08 23:56:06 +0000812 }
813
814 if (timeout != 0) {
815 GRPC_SCHEDULING_START_BLOCKING_REGION;
816 }
817 int r;
818 do {
Craig Tillerb4bb1cd2017-07-20 14:18:17 -0700819 GRPC_STATS_INC_SYSCALL_POLL(exec_ctx);
Craig Tiller8223f412017-10-17 22:05:55 +0000820 r = epoll_wait(p->epfd, p->events, MAX_EPOLL_EVENTS, timeout);
Craig Tiller4f07ea82017-06-08 23:56:06 +0000821 } while (r < 0 && errno == EINTR);
822 if (timeout != 0) {
Craig Tiller781e91a2017-07-17 16:21:00 -0700823 GRPC_SCHEDULING_END_BLOCKING_REGION_WITH_EXEC_CTX(exec_ctx);
Craig Tiller4f07ea82017-06-08 23:56:06 +0000824 }
825
826 if (r < 0) return GRPC_OS_ERROR(errno, "epoll_wait");
827
ncteisen9ffb1492017-11-10 14:00:49 -0800828 if (grpc_polling_trace.enabled()) {
Craig Tiller5d9db0c2017-10-18 09:43:55 -0700829 gpr_log(GPR_DEBUG, "POLLABLE:%p got %d events", p, r);
Craig Tiller4f07ea82017-06-08 23:56:06 +0000830 }
831
Craig Tiller8223f412017-10-17 22:05:55 +0000832 p->event_cursor = 0;
833 p->event_count = r;
Craig Tiller4f07ea82017-06-08 23:56:06 +0000834
835 return GRPC_ERROR_NONE;
836}
837
Craig Tillerd1d7fdd2017-04-14 16:16:24 -0700838/* Return true if first in list */
Craig Tillerbaa14a92017-11-03 09:09:36 -0700839static bool worker_insert(grpc_pollset_worker** root_worker,
840 grpc_pollset_worker* worker, pwlinks link) {
Noah Eisen882dfed2017-11-14 14:58:20 -0800841 if (*root_worker == nullptr) {
Craig Tiller23adbd52017-10-02 15:29:18 -0700842 *root_worker = worker;
Craig Tillerd1d7fdd2017-04-14 16:16:24 -0700843 worker->links[link].next = worker->links[link].prev = worker;
844 return true;
845 } else {
Craig Tiller23adbd52017-10-02 15:29:18 -0700846 worker->links[link].next = *root_worker;
Craig Tillerd1d7fdd2017-04-14 16:16:24 -0700847 worker->links[link].prev = worker->links[link].next->links[link].prev;
848 worker->links[link].next->links[link].prev = worker;
849 worker->links[link].prev->links[link].next = worker;
850 return false;
851 }
852}
853
Craig Tiller249de2b2017-10-02 11:49:19 -0700854/* returns the new root IFF the root changed */
Craig Tiller23adbd52017-10-02 15:29:18 -0700855typedef enum { WRR_NEW_ROOT, WRR_EMPTIED, WRR_REMOVED } worker_remove_result;
Craig Tillerd1d7fdd2017-04-14 16:16:24 -0700856
Craig Tillerbaa14a92017-11-03 09:09:36 -0700857static worker_remove_result worker_remove(grpc_pollset_worker** root_worker,
858 grpc_pollset_worker* worker,
Craig Tiller23adbd52017-10-02 15:29:18 -0700859 pwlinks link) {
860 if (worker == *root_worker) {
Craig Tillerd1d7fdd2017-04-14 16:16:24 -0700861 if (worker == worker->links[link].next) {
Noah Eisen882dfed2017-11-14 14:58:20 -0800862 *root_worker = nullptr;
Craig Tiller23adbd52017-10-02 15:29:18 -0700863 return WRR_EMPTIED;
Craig Tillerd1d7fdd2017-04-14 16:16:24 -0700864 } else {
Craig Tiller23adbd52017-10-02 15:29:18 -0700865 *root_worker = worker->links[link].next;
Craig Tillerd1d7fdd2017-04-14 16:16:24 -0700866 worker->links[link].prev->links[link].next = worker->links[link].next;
867 worker->links[link].next->links[link].prev = worker->links[link].prev;
Craig Tiller23adbd52017-10-02 15:29:18 -0700868 return WRR_NEW_ROOT;
Craig Tillerd1d7fdd2017-04-14 16:16:24 -0700869 }
870 } else {
871 worker->links[link].prev->links[link].next = worker->links[link].next;
872 worker->links[link].next->links[link].prev = worker->links[link].prev;
Craig Tiller23adbd52017-10-02 15:29:18 -0700873 return WRR_REMOVED;
Craig Tillerd1d7fdd2017-04-14 16:16:24 -0700874 }
875}
876
/* Return true if this thread should poll.
 *
 * Registers `worker` on the pollset's worker list and on the worker list of
 * the pollset's current active pollable. It takes a "pollset_worker" ref on
 * that pollable and publishes the worker through *worker_hdl when worker_hdl
 * is non-null.
 *
 * Locking: called with pollset->mu held. pollset->mu is released before this
 * function returns (end_worker re-acquires it). The pollable's mu is taken
 * here and released before returning.
 *
 * If another worker is already the pollable's root, this worker blocks on its
 * own condition variable until it becomes the root, the deadline passes, or
 * it is kicked. In the last two cases it will not poll. A worker never polls
 * once a shutdown closure has been registered. */
static bool begin_worker(grpc_exec_ctx* exec_ctx, grpc_pollset* pollset,
                         grpc_pollset_worker* worker,
                         grpc_pollset_worker** worker_hdl,
                         grpc_millis deadline) {
  // A pending shutdown means this worker must not poll.
  bool do_poll = (pollset->shutdown_closure == nullptr);
  if (worker_hdl != nullptr) *worker_hdl = worker;
  worker->initialized_cv = false;
  worker->kicked = false;
  worker->pollset = pollset;
  worker->pollable_obj =
      POLLABLE_REF(pollset->active_pollable, "pollset_worker");
  worker_insert(&pollset->root_worker, worker, PWLINK_POLLSET);
  gpr_mu_lock(&worker->pollable_obj->mu);
  // worker_insert returns false when the pollable already had a root worker.
  // Only the root polls, so this worker gets a cv and waits on it.
  if (!worker_insert(&worker->pollable_obj->root_worker, worker,
                     PWLINK_POLLABLE)) {
    worker->initialized_cv = true;
    gpr_cv_init(&worker->cv);
    gpr_mu_unlock(&pollset->mu);
    if (grpc_polling_trace.enabled() &&
        worker->pollable_obj->root_worker != worker) {
      gpr_log(GPR_DEBUG, "PS:%p wait %p w=%p for %dms", pollset,
              worker->pollable_obj, worker,
              poll_deadline_to_millis_timeout(exec_ctx, deadline));
    }
    // Wait until this worker becomes the pollable's root (end_worker signals
    // the new root's cv), the deadline expires, or a kick arrives.
    while (do_poll && worker->pollable_obj->root_worker != worker) {
      // A non-zero return from gpr_cv_wait is treated as a timeout.
      if (gpr_cv_wait(&worker->cv, &worker->pollable_obj->mu,
                      grpc_millis_to_timespec(deadline, GPR_CLOCK_REALTIME))) {
        if (grpc_polling_trace.enabled()) {
          gpr_log(GPR_DEBUG, "PS:%p timeout_wait %p w=%p", pollset,
                  worker->pollable_obj, worker);
        }
        do_poll = false;
      } else if (worker->kicked) {
        if (grpc_polling_trace.enabled()) {
          gpr_log(GPR_DEBUG, "PS:%p wakeup %p w=%p", pollset,
                  worker->pollable_obj, worker);
        }
        do_poll = false;
      } else if (grpc_polling_trace.enabled() &&
                 worker->pollable_obj->root_worker != worker) {
        gpr_log(GPR_DEBUG, "PS:%p spurious_wakeup %p w=%p", pollset,
                worker->pollable_obj, worker);
      }
    }
    // We may have blocked, so invalidate exec_ctx's cached notion of "now".
    grpc_exec_ctx_invalidate_now(exec_ctx);
  } else {
    gpr_mu_unlock(&pollset->mu);
  }
  gpr_mu_unlock(&worker->pollable_obj->mu);

  return do_poll;
}
930
Craig Tillerbaa14a92017-11-03 09:09:36 -0700931static void end_worker(grpc_exec_ctx* exec_ctx, grpc_pollset* pollset,
932 grpc_pollset_worker* worker,
933 grpc_pollset_worker** worker_hdl) {
Craig Tiller8223f412017-10-17 22:05:55 +0000934 gpr_mu_lock(&pollset->mu);
Craig Tiller249de2b2017-10-02 11:49:19 -0700935 gpr_mu_lock(&worker->pollable_obj->mu);
Craig Tiller8223f412017-10-17 22:05:55 +0000936 switch (worker_remove(&worker->pollable_obj->root_worker, worker,
Craig Tiller96e24702017-10-17 15:06:46 -0700937 PWLINK_POLLABLE)) {
938 case WRR_NEW_ROOT: {
Craig Tiller8223f412017-10-17 22:05:55 +0000939 // wakeup new poller
Craig Tillerbaa14a92017-11-03 09:09:36 -0700940 grpc_pollset_worker* new_root = worker->pollable_obj->root_worker;
Craig Tiller8223f412017-10-17 22:05:55 +0000941 GPR_ASSERT(new_root->initialized_cv);
942 gpr_cv_signal(&new_root->cv);
943 break;
944 }
Craig Tiller96e24702017-10-17 15:06:46 -0700945 case WRR_EMPTIED:
946 if (pollset->active_pollable != worker->pollable_obj) {
947 // pollable no longer being polled: flush events
948 pollable_process_events(exec_ctx, pollset, worker->pollable_obj, true);
949 }
950 break;
951 case WRR_REMOVED:
952 break;
Craig Tillerd1d7fdd2017-04-14 16:16:24 -0700953 }
Craig Tillerad059f72017-10-12 22:47:05 +0000954 gpr_mu_unlock(&worker->pollable_obj->mu);
955 POLLABLE_UNREF(worker->pollable_obj, "pollset_worker");
Craig Tiller1e8c2ab2017-10-12 15:50:13 -0700956 if (worker_remove(&pollset->root_worker, worker, PWLINK_POLLSET) ==
957 WRR_EMPTIED) {
Craig Tillerad059f72017-10-12 22:47:05 +0000958 pollset_maybe_finish_shutdown(exec_ctx, pollset);
Craig Tillerd1d7fdd2017-04-14 16:16:24 -0700959 }
960 if (worker->initialized_cv) {
961 gpr_cv_destroy(&worker->cv);
962 }
Craig Tillere24b24d2017-04-06 16:05:45 -0700963}
964
Craig Tiller5d9db0c2017-10-18 09:43:55 -0700965#ifndef NDEBUG
Craig Tillerad059f72017-10-12 22:47:05 +0000966static long gettid(void) { return syscall(__NR_gettid); }
Craig Tiller5d9db0c2017-10-18 09:43:55 -0700967#endif
Craig Tillerad059f72017-10-12 22:47:05 +0000968
Craig Tiller5d9db0c2017-10-18 09:43:55 -0700969/* pollset->mu lock must be held by the caller before calling this.
Craig Tiller8fc1ca12017-04-07 13:01:48 -0700970 The function pollset_work() may temporarily release the lock (pollset->po.mu)
Craig Tillerdd86b692017-04-06 10:43:11 -0700971 during the course of its execution but it will always re-acquire the lock and
972 ensure that it is held by the time the function returns */
Craig Tillerbaa14a92017-11-03 09:09:36 -0700973static grpc_error* pollset_work(grpc_exec_ctx* exec_ctx, grpc_pollset* pollset,
974 grpc_pollset_worker** worker_hdl,
Craig Tiller20397792017-07-18 11:35:27 -0700975 grpc_millis deadline) {
Craig Tiller90a9d7d2017-10-04 21:24:03 +0000976#ifdef GRPC_EPOLLEX_CREATE_WORKERS_ON_HEAP
Craig Tillerbaa14a92017-11-03 09:09:36 -0700977 grpc_pollset_worker* worker =
978 (grpc_pollset_worker*)gpr_malloc(sizeof(*worker));
Craig Tiller90a9d7d2017-10-04 21:24:03 +0000979#define WORKER_PTR (worker)
980#else
Craig Tillere24b24d2017-04-06 16:05:45 -0700981 grpc_pollset_worker worker;
Craig Tiller90a9d7d2017-10-04 21:24:03 +0000982#define WORKER_PTR (&worker)
983#endif
Craig Tiller5d9db0c2017-10-18 09:43:55 -0700984#ifndef NDEBUG
Craig Tillerad059f72017-10-12 22:47:05 +0000985 WORKER_PTR->originator = gettid();
Craig Tiller5d9db0c2017-10-18 09:43:55 -0700986#endif
ncteisen9ffb1492017-11-10 14:00:49 -0800987 if (grpc_polling_trace.enabled()) {
Craig Tillerbaa14a92017-11-03 09:09:36 -0700988 gpr_log(GPR_DEBUG,
989 "PS:%p work hdl=%p worker=%p now=%" PRIdPTR " deadline=%" PRIdPTR
990 " kwp=%d pollable=%p",
Craig Tillerbfec10f2017-10-11 17:31:06 -0700991 pollset, worker_hdl, WORKER_PTR, grpc_exec_ctx_now(exec_ctx),
992 deadline, pollset->kicked_without_poller, pollset->active_pollable);
Craig Tillercd0354b2017-04-10 16:19:18 -0700993 }
Craig Tillerbaa14a92017-11-03 09:09:36 -0700994 static const char* err_desc = "pollset_work";
995 grpc_error* error = GRPC_ERROR_NONE;
Craig Tillerbb93af62017-04-07 23:49:00 +0000996 if (pollset->kicked_without_poller) {
997 pollset->kicked_without_poller = false;
Craig Tiller90a9d7d2017-10-04 21:24:03 +0000998 } else {
Craig Tiller6b9a99c2017-10-12 00:27:00 +0000999 if (begin_worker(exec_ctx, pollset, WORKER_PTR, worker_hdl, deadline)) {
Craig Tiller29a9c3a2017-10-04 15:15:04 -07001000 gpr_tls_set(&g_current_thread_pollset, (intptr_t)pollset);
1001 gpr_tls_set(&g_current_thread_worker, (intptr_t)WORKER_PTR);
Craig Tiller96e24702017-10-17 15:06:46 -07001002 if (WORKER_PTR->pollable_obj->event_cursor ==
1003 WORKER_PTR->pollable_obj->event_count) {
Craig Tillerbaa14a92017-11-03 09:09:36 -07001004 append_error(
1005 &error,
1006 pollable_epoll(exec_ctx, WORKER_PTR->pollable_obj, deadline),
1007 err_desc);
Craig Tiller29a9c3a2017-10-04 15:15:04 -07001008 }
Craig Tiller8223f412017-10-17 22:05:55 +00001009 append_error(&error,
Craig Tiller96e24702017-10-17 15:06:46 -07001010 pollable_process_events(exec_ctx, pollset,
1011 WORKER_PTR->pollable_obj, false),
Craig Tiller4f07ea82017-06-08 23:56:06 +00001012 err_desc);
Craig Tiller29a9c3a2017-10-04 15:15:04 -07001013 grpc_exec_ctx_flush(exec_ctx);
Craig Tiller29a9c3a2017-10-04 15:15:04 -07001014 gpr_tls_set(&g_current_thread_pollset, 0);
1015 gpr_tls_set(&g_current_thread_worker, 0);
Craig Tillerd1d7fdd2017-04-14 16:16:24 -07001016 }
Craig Tiller29a9c3a2017-10-04 15:15:04 -07001017 end_worker(exec_ctx, pollset, WORKER_PTR, worker_hdl);
Craig Tiller90a9d7d2017-10-04 21:24:03 +00001018 }
1019#ifdef GRPC_EPOLLEX_CREATE_WORKERS_ON_HEAP
1020 gpr_free(worker);
1021#endif
Craig Tiller8223f412017-10-17 22:05:55 +00001022#undef WORKER_PTR
Craig Tillere24b24d2017-04-06 16:05:45 -07001023 return error;
Craig Tillerdd86b692017-04-06 10:43:11 -07001024}
1025
Craig Tillerbaa14a92017-11-03 09:09:36 -07001026static grpc_error* pollset_transition_pollable_from_empty_to_fd_locked(
1027 grpc_exec_ctx* exec_ctx, grpc_pollset* pollset, grpc_fd* fd) {
1028 static const char* err_desc = "pollset_transition_pollable_from_empty_to_fd";
1029 grpc_error* error = GRPC_ERROR_NONE;
ncteisen9ffb1492017-11-10 14:00:49 -08001030 if (grpc_polling_trace.enabled()) {
Craig Tiller29a9c3a2017-10-04 15:15:04 -07001031 gpr_log(GPR_DEBUG,
1032 "PS:%p add fd %p (%d); transition pollable from empty to fd",
Craig Tillerc5ce0572017-10-03 23:09:41 +00001033 pollset, fd, fd->fd);
Craig Tiller23adbd52017-10-02 15:29:18 -07001034 }
1035 append_error(&error, pollset_kick_all(exec_ctx, pollset), err_desc);
Craig Tiller389ea902017-10-04 17:42:49 +00001036 POLLABLE_UNREF(pollset->active_pollable, "pollset");
Craig Tiller5d9db0c2017-10-18 09:43:55 -07001037 append_error(&error, fd_get_or_become_pollable(fd, &pollset->active_pollable),
Craig Tiller23adbd52017-10-02 15:29:18 -07001038 err_desc);
1039 return error;
1040}
1041
Craig Tillerbaa14a92017-11-03 09:09:36 -07001042static grpc_error* pollset_transition_pollable_from_fd_to_multi_locked(
1043 grpc_exec_ctx* exec_ctx, grpc_pollset* pollset, grpc_fd* and_add_fd) {
1044 static const char* err_desc = "pollset_transition_pollable_from_fd_to_multi";
1045 grpc_error* error = GRPC_ERROR_NONE;
ncteisen9ffb1492017-11-10 14:00:49 -08001046 if (grpc_polling_trace.enabled()) {
Craig Tiller29a9c3a2017-10-04 15:15:04 -07001047 gpr_log(
1048 GPR_DEBUG,
1049 "PS:%p add fd %p (%d); transition pollable from fd %p to multipoller",
1050 pollset, and_add_fd, and_add_fd ? and_add_fd->fd : -1,
1051 pollset->active_pollable->owner_fd);
Craig Tiller23adbd52017-10-02 15:29:18 -07001052 }
1053 append_error(&error, pollset_kick_all(exec_ctx, pollset), err_desc);
Craig Tillerbaa14a92017-11-03 09:09:36 -07001054 grpc_fd* initial_fd = pollset->active_pollable->owner_fd;
Craig Tiller389ea902017-10-04 17:42:49 +00001055 POLLABLE_UNREF(pollset->active_pollable, "pollset");
Noah Eisen882dfed2017-11-14 14:58:20 -08001056 pollset->active_pollable = nullptr;
Craig Tiller23adbd52017-10-02 15:29:18 -07001057 if (append_error(&error, pollable_create(PO_MULTI, &pollset->active_pollable),
1058 err_desc)) {
1059 append_error(&error, pollable_add_fd(pollset->active_pollable, initial_fd),
1060 err_desc);
Noah Eisen882dfed2017-11-14 14:58:20 -08001061 if (and_add_fd != nullptr) {
Craig Tiller23adbd52017-10-02 15:29:18 -07001062 append_error(&error,
1063 pollable_add_fd(pollset->active_pollable, and_add_fd),
Craig Tillerf8401102017-04-17 09:47:28 -07001064 err_desc);
Craig Tiller50480b22017-04-17 16:34:52 +00001065 }
Craig Tiller6c8383a2017-05-05 16:54:42 +00001066 }
1067 return error;
Craig Tillerd1d7fdd2017-04-14 16:16:24 -07001068}
Craig Tillere16722c2017-05-17 21:50:37 +00001069
/* Adds |fd| to |pollset|, transitioning the pollset's active pollable when
 * needed. Expects pollset->mu to be held by the caller.
 *
 *   PO_EMPTY -> single-fd pollable for |fd|
 *   PO_FD    -> if the current owner fd's refst low bit is clear (orphaned),
 *               the pollset is treated as empty and switched to a single-fd
 *               pollable for |fd|; otherwise it is upgraded to a multi-fd
 *               pollable that also polls |fd|
 *   PO_MULTI -> |fd| is added to the existing multi pollable
 *
 * On error the pollset's active pollable is restored to the one it had on
 * entry. Returns the resulting error (GRPC_ERROR_NONE on success). */
static grpc_error* pollset_add_fd_locked(grpc_exec_ctx* exec_ctx,
                                         grpc_pollset* pollset, grpc_fd* fd) {
  grpc_error* error = GRPC_ERROR_NONE;
  // Take an extra ref on the starting pollable so it can be reinstated if the
  // transition below fails.
  pollable* po_at_start =
      POLLABLE_REF(pollset->active_pollable, "pollset_add_fd");
  switch (pollset->active_pollable->type) {
    case PO_EMPTY:
      /* empty pollable --> single fd pollable */
      error = pollset_transition_pollable_from_empty_to_fd_locked(exec_ctx,
                                                                  pollset, fd);
      break;
    case PO_FD:
      // Check the owner fd's orphan bit while holding its orphan_mu.
      gpr_mu_lock(&po_at_start->owner_fd->orphan_mu);
      if ((gpr_atm_no_barrier_load(&pollset->active_pollable->owner_fd->refst) &
           1) == 0) {
        // Owner fd orphaned: behave as if the pollset were empty.
        error = pollset_transition_pollable_from_empty_to_fd_locked(
            exec_ctx, pollset, fd);
      } else {
        /* fd --> multipoller */
        error = pollset_transition_pollable_from_fd_to_multi_locked(
            exec_ctx, pollset, fd);
      }
      // Unlock through po_at_start: the transition may have replaced
      // pollset->active_pollable.
      gpr_mu_unlock(&po_at_start->owner_fd->orphan_mu);
      break;
    case PO_MULTI:
      error = pollable_add_fd(pollset->active_pollable, fd);
      break;
  }
  if (error != GRPC_ERROR_NONE) {
    // Roll back: drop whatever is active now and reinstate the starting
    // pollable; the ref taken above becomes the pollset's ref.
    POLLABLE_UNREF(pollset->active_pollable, "pollset");
    pollset->active_pollable = po_at_start;
  } else {
    // Success: the extra ref on the starting pollable is no longer needed.
    POLLABLE_UNREF(po_at_start, "pollset_add_fd");
  }
  return error;
}
Craig Tillerd1d7fdd2017-04-14 16:16:24 -07001107
/* Ensures |pollset| polls through a multi-fd (PO_MULTI) pollable, converting
 * its current active pollable when necessary. Expects pollset->mu to be held
 * by the caller.
 *
 *   PO_EMPTY -> replaced by a freshly created PO_MULTI pollable
 *   PO_FD    -> if the owner fd's refst low bit is clear (orphaned), replaced
 *               by a fresh PO_MULTI pollable; otherwise upgraded through
 *               pollset_transition_pollable_from_fd_to_multi_locked with no
 *               additional fd
 *   PO_MULTI -> left unchanged
 *
 * On success *pollable_obj receives a new ref (tag "pollset_set") on the
 * resulting active pollable, which the caller is responsible for releasing.
 * On error the starting pollable is reinstated and *pollable_obj is set to
 * nullptr. Returns the resulting error (GRPC_ERROR_NONE on success). */
static grpc_error* pollset_as_multipollable_locked(grpc_exec_ctx* exec_ctx,
                                                   grpc_pollset* pollset,
                                                   pollable** pollable_obj) {
  grpc_error* error = GRPC_ERROR_NONE;
  // Extra ref so the starting pollable can be reinstated on failure.
  pollable* po_at_start =
      POLLABLE_REF(pollset->active_pollable, "pollset_as_multipollable");
  switch (pollset->active_pollable->type) {
    case PO_EMPTY:
      // Drop the pollset's ref on the empty pollable before replacing it.
      POLLABLE_UNREF(pollset->active_pollable, "pollset");
      error = pollable_create(PO_MULTI, &pollset->active_pollable);
      break;
    case PO_FD:
      gpr_mu_lock(&po_at_start->owner_fd->orphan_mu);
      if ((gpr_atm_no_barrier_load(&pollset->active_pollable->owner_fd->refst) &
           1) == 0) {
        // Owner fd orphaned: start a fresh multi pollable rather than
        // upgrading the fd pollable.
        POLLABLE_UNREF(pollset->active_pollable, "pollset");
        error = pollable_create(PO_MULTI, &pollset->active_pollable);
      } else {
        error = pollset_transition_pollable_from_fd_to_multi_locked(
            exec_ctx, pollset, nullptr);
      }
      // Unlock through po_at_start: active_pollable may have been replaced.
      gpr_mu_unlock(&po_at_start->owner_fd->orphan_mu);
      break;
    case PO_MULTI:
      // Already multipollable; nothing to do.
      break;
  }
  if (error != GRPC_ERROR_NONE) {
    // NOTE(review): in the PO_EMPTY / orphaned-PO_FD paths the pollset's ref
    // was already dropped above. If pollable_create can fail without writing
    // its out-param, active_pollable is still po_at_start here and this unref
    // releases it a second time. Confirm pollable_create's failure contract.
    POLLABLE_UNREF(pollset->active_pollable, "pollset");
    pollset->active_pollable = po_at_start;
    *pollable_obj = nullptr;
  } else {
    *pollable_obj = POLLABLE_REF(pollset->active_pollable, "pollset_set");
    POLLABLE_UNREF(po_at_start, "pollset_as_multipollable");
  }
  return error;
}
1144
Craig Tillerbaa14a92017-11-03 09:09:36 -07001145static void pollset_add_fd(grpc_exec_ctx* exec_ctx, grpc_pollset* pollset,
1146 grpc_fd* fd) {
Craig Tiller249de2b2017-10-02 11:49:19 -07001147 gpr_mu_lock(&pollset->mu);
Craig Tillerbaa14a92017-11-03 09:09:36 -07001148 grpc_error* error = pollset_add_fd_locked(exec_ctx, pollset, fd);
Craig Tiller249de2b2017-10-02 11:49:19 -07001149 gpr_mu_unlock(&pollset->mu);
Craig Tillere24b24d2017-04-06 16:05:45 -07001150 GRPC_LOG_IF_ERROR("pollset_add_fd", error);
Craig Tillerdd86b692017-04-06 10:43:11 -07001151}
1152
1153/*******************************************************************************
1154 * Pollset-set Definitions
1155 */
1156
Craig Tillerbaa14a92017-11-03 09:09:36 -07001157static grpc_pollset_set* pss_lock_adam(grpc_pollset_set* pss) {
Craig Tiller249de2b2017-10-02 11:49:19 -07001158 gpr_mu_lock(&pss->mu);
Noah Eisen882dfed2017-11-14 14:58:20 -08001159 while (pss->parent != nullptr) {
Craig Tiller249de2b2017-10-02 11:49:19 -07001160 gpr_mu_unlock(&pss->mu);
1161 pss = pss->parent;
Craig Tillerc5ce0572017-10-03 23:09:41 +00001162 gpr_mu_lock(&pss->mu);
Craig Tiller249de2b2017-10-02 11:49:19 -07001163 }
Craig Tillerdd86b692017-04-06 10:43:11 -07001164 return pss;
1165}
1166
Craig Tillerbaa14a92017-11-03 09:09:36 -07001167static grpc_pollset_set* pollset_set_create(void) {
1168 grpc_pollset_set* pss = (grpc_pollset_set*)gpr_zalloc(sizeof(*pss));
Craig Tiller249de2b2017-10-02 11:49:19 -07001169 gpr_mu_init(&pss->mu);
1170 gpr_ref_init(&pss->refs, 1);
Craig Tillerdd86b692017-04-06 10:43:11 -07001171 return pss;
1172}
1173
Craig Tillerbaa14a92017-11-03 09:09:36 -07001174static void pollset_set_unref(grpc_exec_ctx* exec_ctx, grpc_pollset_set* pss) {
Noah Eisen882dfed2017-11-14 14:58:20 -08001175 if (pss == nullptr) return;
Craig Tiller389ea902017-10-04 17:42:49 +00001176 if (!gpr_unref(&pss->refs)) return;
1177 pollset_set_unref(exec_ctx, pss->parent);
1178 gpr_mu_destroy(&pss->mu);
Craig Tiller29a9c3a2017-10-04 15:15:04 -07001179 for (size_t i = 0; i < pss->pollset_count; i++) {
Craig Tiller82324922017-10-16 21:37:28 +00001180 gpr_mu_lock(&pss->pollsets[i]->mu);
Craig Tillerb653dff2017-10-16 14:38:36 -07001181 if (0 == --pss->pollsets[i]->containing_pollset_set_count) {
Craig Tiller82324922017-10-16 21:37:28 +00001182 pollset_maybe_finish_shutdown(exec_ctx, pss->pollsets[i]);
1183 }
1184 gpr_mu_unlock(&pss->pollsets[i]->mu);
Craig Tiller389ea902017-10-04 17:42:49 +00001185 }
Craig Tiller29a9c3a2017-10-04 15:15:04 -07001186 for (size_t i = 0; i < pss->fd_count; i++) {
Craig Tiller389ea902017-10-04 17:42:49 +00001187 UNREF_BY(exec_ctx, pss->fds[i], 2, "pollset_set");
1188 }
1189 gpr_free(pss->pollsets);
1190 gpr_free(pss->fds);
Craig Tiller83d5fb62017-04-10 22:13:43 +00001191 gpr_free(pss);
Craig Tillerdd86b692017-04-06 10:43:11 -07001192}
1193
Craig Tillerbaa14a92017-11-03 09:09:36 -07001194static void pollset_set_add_fd(grpc_exec_ctx* exec_ctx, grpc_pollset_set* pss,
1195 grpc_fd* fd) {
ncteisen9ffb1492017-11-10 14:00:49 -08001196 if (grpc_polling_trace.enabled()) {
Craig Tiller29a9c3a2017-10-04 15:15:04 -07001197 gpr_log(GPR_DEBUG, "PSS:%p: add fd %p (%d)", pss, fd, fd->fd);
1198 }
Craig Tillerbaa14a92017-11-03 09:09:36 -07001199 grpc_error* error = GRPC_ERROR_NONE;
1200 static const char* err_desc = "pollset_set_add_fd";
Craig Tiller249de2b2017-10-02 11:49:19 -07001201 pss = pss_lock_adam(pss);
Craig Tiller23adbd52017-10-02 15:29:18 -07001202 for (size_t i = 0; i < pss->pollset_count; i++) {
Craig Tillerb653dff2017-10-16 14:38:36 -07001203 append_error(&error, pollable_add_fd(pss->pollsets[i]->active_pollable, fd),
1204 err_desc);
Craig Tiller249de2b2017-10-02 11:49:19 -07001205 }
Craig Tiller23adbd52017-10-02 15:29:18 -07001206 if (pss->fd_count == pss->fd_capacity) {
1207 pss->fd_capacity = GPR_MAX(pss->fd_capacity * 2, 8);
Craig Tiller513daab2017-10-06 09:18:34 -07001208 pss->fds =
Craig Tillerbaa14a92017-11-03 09:09:36 -07001209 (grpc_fd**)gpr_realloc(pss->fds, pss->fd_capacity * sizeof(*pss->fds));
Craig Tiller23adbd52017-10-02 15:29:18 -07001210 }
1211 REF_BY(fd, 2, "pollset_set");
1212 pss->fds[pss->fd_count++] = fd;
Craig Tiller249de2b2017-10-02 11:49:19 -07001213 gpr_mu_unlock(&pss->mu);
1214
Craig Tiller23adbd52017-10-02 15:29:18 -07001215 GRPC_LOG_IF_ERROR(err_desc, error);
Craig Tillerdd86b692017-04-06 10:43:11 -07001216}
1217
Craig Tillerbaa14a92017-11-03 09:09:36 -07001218static void pollset_set_del_fd(grpc_exec_ctx* exec_ctx, grpc_pollset_set* pss,
1219 grpc_fd* fd) {
ncteisen9ffb1492017-11-10 14:00:49 -08001220 if (grpc_polling_trace.enabled()) {
Craig Tiller29a9c3a2017-10-04 15:15:04 -07001221 gpr_log(GPR_DEBUG, "PSS:%p: del fd %p", pss, fd);
1222 }
Craig Tiller23adbd52017-10-02 15:29:18 -07001223 pss = pss_lock_adam(pss);
1224 size_t i;
1225 for (i = 0; i < pss->fd_count; i++) {
1226 if (pss->fds[i] == fd) {
1227 UNREF_BY(exec_ctx, fd, 2, "pollset_set");
1228 break;
1229 }
1230 }
1231 GPR_ASSERT(i != pss->fd_count);
1232 for (; i < pss->fd_count - 1; i++) {
1233 pss->fds[i] = pss->fds[i + 1];
1234 }
1235 pss->fd_count--;
1236 gpr_mu_unlock(&pss->mu);
Craig Tillerdd86b692017-04-06 10:43:11 -07001237}
1238
Craig Tillerbaa14a92017-11-03 09:09:36 -07001239static void pollset_set_del_pollset(grpc_exec_ctx* exec_ctx,
1240 grpc_pollset_set* pss, grpc_pollset* ps) {
ncteisen9ffb1492017-11-10 14:00:49 -08001241 if (grpc_polling_trace.enabled()) {
Craig Tiller29a9c3a2017-10-04 15:15:04 -07001242 gpr_log(GPR_DEBUG, "PSS:%p: del pollset %p", pss, ps);
1243 }
Craig Tiller23adbd52017-10-02 15:29:18 -07001244 pss = pss_lock_adam(pss);
1245 size_t i;
1246 for (i = 0; i < pss->pollset_count; i++) {
Craig Tiller82324922017-10-16 21:37:28 +00001247 if (pss->pollsets[i] == ps) {
Craig Tiller23adbd52017-10-02 15:29:18 -07001248 break;
1249 }
1250 }
1251 GPR_ASSERT(i != pss->pollset_count);
1252 for (; i < pss->pollset_count - 1; i++) {
1253 pss->pollsets[i] = pss->pollsets[i + 1];
1254 }
1255 pss->pollset_count--;
1256 gpr_mu_unlock(&pss->mu);
Craig Tillerad059f72017-10-12 22:47:05 +00001257 gpr_mu_lock(&ps->mu);
1258 if (0 == --ps->containing_pollset_set_count) {
1259 pollset_maybe_finish_shutdown(exec_ctx, ps);
1260 }
1261 gpr_mu_unlock(&ps->mu);
Craig Tiller23adbd52017-10-02 15:29:18 -07001262}
1263
Craig Tiller4fd6a412017-10-04 22:41:13 +00001264// add all fds to pollables, and output a new array of unorphaned out_fds
Craig Tiller82324922017-10-16 21:37:28 +00001265// assumes pollsets are multipollable
Craig Tillerbaa14a92017-11-03 09:09:36 -07001266static grpc_error* add_fds_to_pollsets(grpc_exec_ctx* exec_ctx, grpc_fd** fds,
1267 size_t fd_count, grpc_pollset** pollsets,
Craig Tillerb653dff2017-10-16 14:38:36 -07001268 size_t pollset_count,
Craig Tillerbaa14a92017-11-03 09:09:36 -07001269 const char* err_desc, grpc_fd** out_fds,
1270 size_t* out_fd_count) {
1271 grpc_error* error = GRPC_ERROR_NONE;
Craig Tiller23adbd52017-10-02 15:29:18 -07001272 for (size_t i = 0; i < fd_count; i++) {
Craig Tiller4fd6a412017-10-04 22:41:13 +00001273 gpr_mu_lock(&fds[i]->orphan_mu);
1274 if ((gpr_atm_no_barrier_load(&fds[i]->refst) & 1) == 0) {
1275 gpr_mu_unlock(&fds[i]->orphan_mu);
1276 UNREF_BY(exec_ctx, fds[i], 2, "pollset_set");
1277 } else {
Craig Tiller82324922017-10-16 21:37:28 +00001278 for (size_t j = 0; j < pollset_count; j++) {
Craig Tillerb653dff2017-10-16 14:38:36 -07001279 append_error(&error,
1280 pollable_add_fd(pollsets[j]->active_pollable, fds[i]),
1281 err_desc);
Craig Tiller4fd6a412017-10-04 22:41:13 +00001282 }
1283 gpr_mu_unlock(&fds[i]->orphan_mu);
1284 out_fds[(*out_fd_count)++] = fds[i];
Craig Tiller23adbd52017-10-02 15:29:18 -07001285 }
1286 }
1287 return error;
1288}
Craig Tillerdd86b692017-04-06 10:43:11 -07001289
Craig Tillerbaa14a92017-11-03 09:09:36 -07001290static void pollset_set_add_pollset(grpc_exec_ctx* exec_ctx,
1291 grpc_pollset_set* pss, grpc_pollset* ps) {
ncteisen9ffb1492017-11-10 14:00:49 -08001292 if (grpc_polling_trace.enabled()) {
Craig Tiller79c4f1f2017-10-12 01:12:40 +00001293 gpr_log(GPR_DEBUG, "PSS:%p: add pollset %p", pss, ps);
1294 }
Craig Tillerbaa14a92017-11-03 09:09:36 -07001295 grpc_error* error = GRPC_ERROR_NONE;
1296 static const char* err_desc = "pollset_set_add_pollset";
Noah Eisen882dfed2017-11-14 14:58:20 -08001297 pollable* pollable_obj = nullptr;
Craig Tiller1e8c2ab2017-10-12 15:50:13 -07001298 gpr_mu_lock(&ps->mu);
1299 if (!GRPC_LOG_IF_ERROR(err_desc, pollset_as_multipollable_locked(
1300 exec_ctx, ps, &pollable_obj))) {
Noah Eisen882dfed2017-11-14 14:58:20 -08001301 GPR_ASSERT(pollable_obj == nullptr);
Craig Tiller1e8c2ab2017-10-12 15:50:13 -07001302 gpr_mu_unlock(&ps->mu);
Craig Tiller79c4f1f2017-10-12 01:12:40 +00001303 return;
1304 }
Craig Tiller1e8c2ab2017-10-12 15:50:13 -07001305 ps->containing_pollset_set_count++;
1306 gpr_mu_unlock(&ps->mu);
Craig Tiller79c4f1f2017-10-12 01:12:40 +00001307 pss = pss_lock_adam(pss);
1308 size_t initial_fd_count = pss->fd_count;
1309 pss->fd_count = 0;
Craig Tillerb653dff2017-10-16 14:38:36 -07001310 append_error(&error,
1311 add_fds_to_pollsets(exec_ctx, pss->fds, initial_fd_count, &ps, 1,
1312 err_desc, pss->fds, &pss->fd_count),
Craig Tiller79c4f1f2017-10-12 01:12:40 +00001313 err_desc);
1314 if (pss->pollset_count == pss->pollset_capacity) {
1315 pss->pollset_capacity = GPR_MAX(pss->pollset_capacity * 2, 8);
Craig Tillerbaa14a92017-11-03 09:09:36 -07001316 pss->pollsets = (grpc_pollset**)gpr_realloc(
Craig Tiller79c4f1f2017-10-12 01:12:40 +00001317 pss->pollsets, pss->pollset_capacity * sizeof(*pss->pollsets));
1318 }
Craig Tiller82324922017-10-16 21:37:28 +00001319 pss->pollsets[pss->pollset_count++] = ps;
Craig Tiller79c4f1f2017-10-12 01:12:40 +00001320 gpr_mu_unlock(&pss->mu);
Craig Tillerb653dff2017-10-16 14:38:36 -07001321 POLLABLE_UNREF(pollable_obj, "pollset_set");
Craig Tiller79c4f1f2017-10-12 01:12:40 +00001322
1323 GRPC_LOG_IF_ERROR(err_desc, error);
1324}
Craig Tillerdd86b692017-04-06 10:43:11 -07001325
Craig Tillerbaa14a92017-11-03 09:09:36 -07001326static void pollset_set_add_pollset_set(grpc_exec_ctx* exec_ctx,
1327 grpc_pollset_set* a,
1328 grpc_pollset_set* b) {
ncteisen9ffb1492017-11-10 14:00:49 -08001329 if (grpc_polling_trace.enabled()) {
Craig Tiller29a9c3a2017-10-04 15:15:04 -07001330 gpr_log(GPR_DEBUG, "PSS: merge (%p, %p)", a, b);
1331 }
Craig Tillerbaa14a92017-11-03 09:09:36 -07001332 grpc_error* error = GRPC_ERROR_NONE;
1333 static const char* err_desc = "pollset_set_add_fd";
Craig Tiller23adbd52017-10-02 15:29:18 -07001334 for (;;) {
1335 if (a == b) {
1336 // pollset ancestors are the same: nothing to do
1337 return;
1338 }
1339 if (a > b) {
Craig Tillerbaa14a92017-11-03 09:09:36 -07001340 GPR_SWAP(grpc_pollset_set*, a, b);
Craig Tiller23adbd52017-10-02 15:29:18 -07001341 }
Craig Tillerbaa14a92017-11-03 09:09:36 -07001342 gpr_mu* a_mu = &a->mu;
1343 gpr_mu* b_mu = &b->mu;
Craig Tillerc5ce0572017-10-03 23:09:41 +00001344 gpr_mu_lock(a_mu);
1345 gpr_mu_lock(b_mu);
Noah Eisen882dfed2017-11-14 14:58:20 -08001346 if (a->parent != nullptr) {
Craig Tiller23adbd52017-10-02 15:29:18 -07001347 a = a->parent;
Noah Eisen882dfed2017-11-14 14:58:20 -08001348 } else if (b->parent != nullptr) {
Craig Tiller23adbd52017-10-02 15:29:18 -07001349 b = b->parent;
1350 } else {
1351 break; // exit loop, both pollsets locked
1352 }
Craig Tillerc5ce0572017-10-03 23:09:41 +00001353 gpr_mu_unlock(a_mu);
1354 gpr_mu_unlock(b_mu);
Craig Tiller23adbd52017-10-02 15:29:18 -07001355 }
1356 // try to do the least copying possible
1357 // TODO(ctiller): there's probably a better heuristic here
1358 const size_t a_size = a->fd_count + a->pollset_count;
1359 const size_t b_size = b->fd_count + b->pollset_count;
1360 if (b_size > a_size) {
Craig Tillerbaa14a92017-11-03 09:09:36 -07001361 GPR_SWAP(grpc_pollset_set*, a, b);
Craig Tiller23adbd52017-10-02 15:29:18 -07001362 }
ncteisen9ffb1492017-11-10 14:00:49 -08001363 if (grpc_polling_trace.enabled()) {
Craig Tiller29a9c3a2017-10-04 15:15:04 -07001364 gpr_log(GPR_DEBUG, "PSS: parent %p to %p", b, a);
1365 }
Craig Tiller23adbd52017-10-02 15:29:18 -07001366 gpr_ref(&a->refs);
1367 b->parent = a;
Craig Tiller23adbd52017-10-02 15:29:18 -07001368 if (a->fd_capacity < a->fd_count + b->fd_count) {
1369 a->fd_capacity = GPR_MAX(2 * a->fd_capacity, a->fd_count + b->fd_count);
Craig Tillerbaa14a92017-11-03 09:09:36 -07001370 a->fds = (grpc_fd**)gpr_realloc(a->fds, a->fd_capacity * sizeof(*a->fds));
Craig Tiller23adbd52017-10-02 15:29:18 -07001371 }
Craig Tiller4fd6a412017-10-04 22:41:13 +00001372 size_t initial_a_fd_count = a->fd_count;
1373 a->fd_count = 0;
Craig Tillerbaa14a92017-11-03 09:09:36 -07001374 append_error(
1375 &error,
1376 add_fds_to_pollsets(exec_ctx, a->fds, initial_a_fd_count, b->pollsets,
1377 b->pollset_count, "merge_a2b", a->fds, &a->fd_count),
1378 err_desc);
1379 append_error(
1380 &error,
1381 add_fds_to_pollsets(exec_ctx, b->fds, b->fd_count, a->pollsets,
1382 a->pollset_count, "merge_b2a", a->fds, &a->fd_count),
1383 err_desc);
Craig Tiller23adbd52017-10-02 15:29:18 -07001384 if (a->pollset_capacity < a->pollset_count + b->pollset_count) {
1385 a->pollset_capacity =
1386 GPR_MAX(2 * a->pollset_capacity, a->pollset_count + b->pollset_count);
Craig Tillerbaa14a92017-11-03 09:09:36 -07001387 a->pollsets = (grpc_pollset**)gpr_realloc(
Craig Tiller513daab2017-10-06 09:18:34 -07001388 a->pollsets, a->pollset_capacity * sizeof(*a->pollsets));
Craig Tiller23adbd52017-10-02 15:29:18 -07001389 }
Craig Tiller658f5bd2017-10-17 09:12:34 -07001390 if (b->pollset_count > 0) {
1391 memcpy(a->pollsets + a->pollset_count, b->pollsets,
1392 b->pollset_count * sizeof(*b->pollsets));
1393 }
Craig Tiller23adbd52017-10-02 15:29:18 -07001394 a->pollset_count += b->pollset_count;
1395 gpr_free(b->fds);
1396 gpr_free(b->pollsets);
Noah Eisen882dfed2017-11-14 14:58:20 -08001397 b->fds = nullptr;
1398 b->pollsets = nullptr;
Craig Tiller23adbd52017-10-02 15:29:18 -07001399 b->fd_count = b->fd_capacity = b->pollset_count = b->pollset_capacity = 0;
1400 gpr_mu_unlock(&a->mu);
1401 gpr_mu_unlock(&b->mu);
Craig Tillerdd86b692017-04-06 10:43:11 -07001402}
1403
Craig Tillerbaa14a92017-11-03 09:09:36 -07001404static void pollset_set_del_pollset_set(grpc_exec_ctx* exec_ctx,
1405 grpc_pollset_set* bag,
1406 grpc_pollset_set* item) {}
Craig Tillerdd86b692017-04-06 10:43:11 -07001407
1408/*******************************************************************************
1409 * Event engine binding
1410 */
1411
/* Engine teardown hook (last entry of the vtable): shuts down the fd globals,
 * then the pollset globals.
 * NOTE(review): grpc_init_epollex_linux's failure path tears these down in the
 * opposite order (pollset first); confirm whether the order matters. */
static void shutdown_engine(void) {
  fd_global_shutdown();
  pollset_global_shutdown();
}
1416
/* The epollex event-engine vtable. The initialiser is positional, so the
 * entry order must match the member order of grpc_event_engine_vtable
 * (declared elsewhere). Keep the two in sync. */
static const grpc_event_engine_vtable vtable = {
    sizeof(grpc_pollset),

    /* fd operations */
    fd_create,
    fd_wrapped_fd,
    fd_orphan,
    fd_shutdown,
    fd_notify_on_read,
    fd_notify_on_write,
    fd_is_shutdown,
    fd_get_read_notifier_pollset,

    /* pollset operations */
    pollset_init,
    pollset_shutdown,
    pollset_destroy,
    pollset_work,
    pollset_kick,
    pollset_add_fd,

    /* pollset_set operations */
    pollset_set_create,
    pollset_set_unref,  // destroy ==> unref 1 public ref
    pollset_set_add_pollset,
    pollset_set_del_pollset,
    pollset_set_add_pollset_set,
    pollset_set_del_pollset_set,
    pollset_set_add_fd,
    pollset_set_del_fd,

    /* engine teardown */
    shutdown_engine,
};
1447
Craig Tillerbaa14a92017-11-03 09:09:36 -07001448const grpc_event_engine_vtable* grpc_init_epollex_linux(
Craig Tiller6c8383a2017-05-05 16:54:42 +00001449 bool explicitly_requested) {
Craig Tillerd4a0eb22017-10-18 09:27:15 -07001450 if (!explicitly_requested) {
Noah Eisen882dfed2017-11-14 14:58:20 -08001451 return nullptr;
Craig Tillerd4a0eb22017-10-18 09:27:15 -07001452 }
1453
Craig Tillerdd86b692017-04-06 10:43:11 -07001454 if (!grpc_has_wakeup_fd()) {
yang-g30101b02017-11-06 14:35:30 -08001455 gpr_log(GPR_ERROR, "Skipping epollex because of no wakeup fd.");
Noah Eisen882dfed2017-11-14 14:58:20 -08001456 return nullptr;
Craig Tillerdd86b692017-04-06 10:43:11 -07001457 }
1458
Craig Tiller819cd882017-04-25 13:18:22 -07001459 if (!grpc_is_epollexclusive_available()) {
yang-g30101b02017-11-06 14:35:30 -08001460 gpr_log(GPR_INFO, "Skipping epollex because it is not supported.");
Noah Eisen882dfed2017-11-14 14:58:20 -08001461 return nullptr;
Craig Tillerdd86b692017-04-06 10:43:11 -07001462 }
1463
1464 fd_global_init();
1465
1466 if (!GRPC_LOG_IF_ERROR("pollset_global_init", pollset_global_init())) {
Craig Tiller54d388d2017-04-14 16:20:45 -07001467 pollset_global_shutdown();
1468 fd_global_shutdown();
Noah Eisen882dfed2017-11-14 14:58:20 -08001469 return nullptr;
Craig Tillerdd86b692017-04-06 10:43:11 -07001470 }
1471
1472 return &vtable;
1473}
1474
1475#else /* defined(GRPC_LINUX_EPOLL) */
1476#if defined(GRPC_POSIX_SOCKET)
Yash Tibrewal1cac2232017-09-26 11:31:11 -07001477#include "src/core/lib/iomgr/ev_epollex_linux.h"
Craig Tillerdd86b692017-04-06 10:43:11 -07001478/* If GRPC_LINUX_EPOLL is not defined, it means epoll is not available. Return
1479 * NULL */
Craig Tillerbaa14a92017-11-03 09:09:36 -07001480const grpc_event_engine_vtable* grpc_init_epollex_linux(
Craig Tiller6c8383a2017-05-05 16:54:42 +00001481 bool explicitly_requested) {
1482 return NULL;
1483}
Craig Tillerdd86b692017-04-06 10:43:11 -07001484#endif /* defined(GRPC_POSIX_SOCKET) */
1485
1486#endif /* !defined(GRPC_LINUX_EPOLL) */