blob: 689aac15bf21c2cbe72860685a4ecea792de0ceb [file] [log] [blame]
Craig Tillerc67cc992017-04-27 10:15:51 -07001/*
2 *
Jan Tattermusch7897ae92017-06-07 22:57:36 +02003 * Copyright 2017 gRPC authors.
Craig Tillerc67cc992017-04-27 10:15:51 -07004 *
Jan Tattermusch7897ae92017-06-07 22:57:36 +02005 * Licensed under the Apache License, Version 2.0 (the "License");
6 * you may not use this file except in compliance with the License.
7 * You may obtain a copy of the License at
Craig Tillerc67cc992017-04-27 10:15:51 -07008 *
Jan Tattermusch7897ae92017-06-07 22:57:36 +02009 * http://www.apache.org/licenses/LICENSE-2.0
Craig Tillerc67cc992017-04-27 10:15:51 -070010 *
Jan Tattermusch7897ae92017-06-07 22:57:36 +020011 * Unless required by applicable law or agreed to in writing, software
12 * distributed under the License is distributed on an "AS IS" BASIS,
13 * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
14 * See the License for the specific language governing permissions and
15 * limitations under the License.
Craig Tillerc67cc992017-04-27 10:15:51 -070016 *
17 */
18
19#include "src/core/lib/iomgr/port.h"
20
21/* This polling engine is only relevant on linux kernels supporting epoll() */
22#ifdef GRPC_LINUX_EPOLL
Yash Tibrewal1cac2232017-09-26 11:31:11 -070023#include "src/core/lib/iomgr/ev_epoll1_linux.h"
Craig Tillerc67cc992017-04-27 10:15:51 -070024
Craig Tillerc67cc992017-04-27 10:15:51 -070025#include <assert.h>
26#include <errno.h>
27#include <poll.h>
28#include <pthread.h>
29#include <string.h>
30#include <sys/epoll.h>
31#include <sys/socket.h>
32#include <unistd.h>
33
34#include <grpc/support/alloc.h>
Craig Tiller6de05932017-04-28 09:17:38 -070035#include <grpc/support/cpu.h>
Craig Tillerc67cc992017-04-27 10:15:51 -070036#include <grpc/support/log.h>
37#include <grpc/support/string_util.h>
38#include <grpc/support/tls.h>
39#include <grpc/support/useful.h>
40
Craig Tillerb4bb1cd2017-07-20 14:18:17 -070041#include "src/core/lib/debug/stats.h"
Craig Tillerc67cc992017-04-27 10:15:51 -070042#include "src/core/lib/iomgr/ev_posix.h"
43#include "src/core/lib/iomgr/iomgr_internal.h"
44#include "src/core/lib/iomgr/lockfree_event.h"
Craig Tillerc67cc992017-04-27 10:15:51 -070045#include "src/core/lib/iomgr/wakeup_fd_posix.h"
Craig Tillerc67cc992017-04-27 10:15:51 -070046#include "src/core/lib/profiling/timers.h"
47#include "src/core/lib/support/block_annotate.h"
Craig Tillerb89bac02017-05-26 15:20:32 +000048#include "src/core/lib/support/string.h"
Craig Tillerc67cc992017-04-27 10:15:51 -070049
Craig Tillerc67cc992017-04-27 10:15:51 -070050static grpc_wakeup_fd global_wakeup_fd;
Sree Kuchibhotla5efc9132017-08-17 14:10:38 -070051
52/*******************************************************************************
53 * Singleton epoll set related fields
54 */
55
56#define MAX_EPOLL_EVENTS 100
Sree Kuchibhotla19614522017-08-25 17:10:10 -070057#define MAX_EPOLL_EVENTS_HANDLED_PER_ITERATION 1
Sree Kuchibhotla5efc9132017-08-17 14:10:38 -070058
Sree Kuchibhotlae01940f2017-08-27 18:10:12 -070059/* NOTE ON SYNCHRONIZATION:
60 * - Fields in this struct are only modified by the designated poller. Hence
61 * there is no need for any locks to protect the struct.
62 * - num_events and cursor fields have to be of atomic type to provide memory
63 * visibility guarantees only. i.e In case of multiple pollers, the designated
64 * polling thread keeps changing; the thread that wrote these values may be
65 * different from the thread reading the values
66 */
Sree Kuchibhotla5efc9132017-08-17 14:10:38 -070067typedef struct epoll_set {
68 int epfd;
69
70 /* The epoll_events after the last call to epoll_wait() */
71 struct epoll_event events[MAX_EPOLL_EVENTS];
72
73 /* The number of epoll_events after the last call to epoll_wait() */
Sree Kuchibhotlaa92a9cc2017-08-27 14:02:15 -070074 gpr_atm num_events;
Sree Kuchibhotla5efc9132017-08-17 14:10:38 -070075
76 /* Index of the first event in epoll_events that has to be processed. This
77 * field is only valid if num_events > 0 */
Sree Kuchibhotlaa92a9cc2017-08-27 14:02:15 -070078 gpr_atm cursor;
Sree Kuchibhotla5efc9132017-08-17 14:10:38 -070079} epoll_set;
80
81/* The global singleton epoll set */
82static epoll_set g_epoll_set;
83
84/* Must be called *only* once */
85static bool epoll_set_init() {
86 g_epoll_set.epfd = epoll_create1(EPOLL_CLOEXEC);
87 if (g_epoll_set.epfd < 0) {
88 gpr_log(GPR_ERROR, "epoll unavailable");
89 return false;
90 }
91
Sree Kuchibhotlaa92a9cc2017-08-27 14:02:15 -070092 gpr_log(GPR_INFO, "grpc epoll fd: %d", g_epoll_set.epfd);
93 gpr_atm_no_barrier_store(&g_epoll_set.num_events, 0);
94 gpr_atm_no_barrier_store(&g_epoll_set.cursor, 0);
Sree Kuchibhotla5efc9132017-08-17 14:10:38 -070095 return true;
96}
97
98/* epoll_set_init() MUST be called before calling this. */
99static void epoll_set_shutdown() {
100 if (g_epoll_set.epfd >= 0) {
101 close(g_epoll_set.epfd);
102 g_epoll_set.epfd = -1;
103 }
104}
Craig Tillerc67cc992017-04-27 10:15:51 -0700105
106/*******************************************************************************
107 * Fd Declarations
108 */
109
110struct grpc_fd {
111 int fd;
112
Craig Tillerc67cc992017-04-27 10:15:51 -0700113 gpr_atm read_closure;
114 gpr_atm write_closure;
115
116 struct grpc_fd *freelist_next;
Craig Tillerc67cc992017-04-27 10:15:51 -0700117
118 /* The pollset that last noticed that the fd is readable. The actual type
119 * stored in this is (grpc_pollset *) */
120 gpr_atm read_notifier_pollset;
121
122 grpc_iomgr_object iomgr_object;
123};
124
125static void fd_global_init(void);
126static void fd_global_shutdown(void);
127
128/*******************************************************************************
129 * Pollset Declarations
130 */
131
Yash Tibrewalb2a54ac2017-09-13 10:18:07 -0700132typedef enum { UNKICKED, KICKED, DESIGNATED_POLLER } kick_state;
Craig Tillerc67cc992017-04-27 10:15:51 -0700133
Yash Tibrewalb2a54ac2017-09-13 10:18:07 -0700134static const char *kick_state_string(kick_state st) {
Craig Tiller830e82a2017-05-31 16:26:27 -0700135 switch (st) {
136 case UNKICKED:
137 return "UNKICKED";
138 case KICKED:
139 return "KICKED";
140 case DESIGNATED_POLLER:
141 return "DESIGNATED_POLLER";
142 }
143 GPR_UNREACHABLE_CODE(return "UNKNOWN");
144}
145
Craig Tillerc67cc992017-04-27 10:15:51 -0700146struct grpc_pollset_worker {
Yash Tibrewalb2a54ac2017-09-13 10:18:07 -0700147 kick_state state;
Craig Tiller55624a32017-05-26 08:14:44 -0700148 int kick_state_mutator; // which line of code last changed kick state
Craig Tillerc67cc992017-04-27 10:15:51 -0700149 bool initialized_cv;
Craig Tiller32f90ee2017-04-28 12:46:41 -0700150 grpc_pollset_worker *next;
151 grpc_pollset_worker *prev;
Craig Tillerc67cc992017-04-27 10:15:51 -0700152 gpr_cv cv;
Craig Tiller50da5ec2017-05-01 13:51:14 -0700153 grpc_closure_list schedule_on_end_work;
Craig Tillerc67cc992017-04-27 10:15:51 -0700154};
155
Yash Tibrewalb2a54ac2017-09-13 10:18:07 -0700156#define SET_KICK_STATE(worker, kick_state) \
Craig Tiller55624a32017-05-26 08:14:44 -0700157 do { \
Yash Tibrewalb2a54ac2017-09-13 10:18:07 -0700158 (worker)->state = (kick_state); \
Craig Tiller55624a32017-05-26 08:14:44 -0700159 (worker)->kick_state_mutator = __LINE__; \
160 } while (false)
161
Vijay Pai4b7ef4d2017-09-11 23:09:22 -0700162#define MAX_NEIGHBORHOODS 1024
Craig Tillerba550da2017-05-01 14:26:31 +0000163
Vijay Pai4b7ef4d2017-09-11 23:09:22 -0700164typedef struct pollset_neighborhood {
Craig Tiller6de05932017-04-28 09:17:38 -0700165 gpr_mu mu;
166 grpc_pollset *active_root;
Craig Tiller6de05932017-04-28 09:17:38 -0700167 char pad[GPR_CACHELINE_SIZE];
Vijay Pai4b7ef4d2017-09-11 23:09:22 -0700168} pollset_neighborhood;
Craig Tiller6de05932017-04-28 09:17:38 -0700169
Craig Tillerc67cc992017-04-27 10:15:51 -0700170struct grpc_pollset {
Craig Tiller6de05932017-04-28 09:17:38 -0700171 gpr_mu mu;
Vijay Pai4b7ef4d2017-09-11 23:09:22 -0700172 pollset_neighborhood *neighborhood;
173 bool reassigning_neighborhood;
Craig Tiller4509c472017-04-27 19:05:13 +0000174 grpc_pollset_worker *root_worker;
175 bool kicked_without_poller;
Sree Kuchibhotla0d8431a2017-07-18 16:21:54 -0700176
177 /* Set to true if the pollset is observed to have no workers available to
Sree Kuchibhotla5efc9132017-08-17 14:10:38 -0700178 poll */
Craig Tiller6de05932017-04-28 09:17:38 -0700179 bool seen_inactive;
Sree Kuchibhotla0d8431a2017-07-18 16:21:54 -0700180 bool shutting_down; /* Is the pollset shutting down ? */
Craig Tiller4509c472017-04-27 19:05:13 +0000181 grpc_closure *shutdown_closure; /* Called after after shutdown is complete */
Sree Kuchibhotla0d8431a2017-07-18 16:21:54 -0700182
183 /* Number of workers who are *about-to* attach themselves to the pollset
184 * worker list */
Craig Tillerba550da2017-05-01 14:26:31 +0000185 int begin_refs;
Craig Tiller6de05932017-04-28 09:17:38 -0700186
187 grpc_pollset *next;
188 grpc_pollset *prev;
Craig Tillerc67cc992017-04-27 10:15:51 -0700189};
190
191/*******************************************************************************
192 * Pollset-set Declarations
193 */
Craig Tiller6de05932017-04-28 09:17:38 -0700194
Craig Tiller61f96c12017-05-12 13:36:39 -0700195struct grpc_pollset_set {
196 char unused;
197};
Craig Tillerc67cc992017-04-27 10:15:51 -0700198
199/*******************************************************************************
200 * Common helpers
201 */
202
203static bool append_error(grpc_error **composite, grpc_error *error,
204 const char *desc) {
205 if (error == GRPC_ERROR_NONE) return true;
206 if (*composite == GRPC_ERROR_NONE) {
207 *composite = GRPC_ERROR_CREATE_FROM_COPIED_STRING(desc);
208 }
209 *composite = grpc_error_add_child(*composite, error);
210 return false;
211}
212
213/*******************************************************************************
214 * Fd Definitions
215 */
216
217/* We need to keep a freelist not because of any concerns of malloc performance
218 * but instead so that implementations with multiple threads in (for example)
219 * epoll_wait deal with the race between pollset removal and incoming poll
220 * notifications.
221 *
222 * The problem is that the poller ultimately holds a reference to this
223 * object, so it is very difficult to know when is safe to free it, at least
224 * without some expensive synchronization.
225 *
226 * If we keep the object freelisted, in the worst case losing this race just
227 * becomes a spurious read notification on a reused fd.
228 */
229
230/* The alarm system needs to be able to wakeup 'some poller' sometimes
231 * (specifically when a new alarm needs to be triggered earlier than the next
232 * alarm 'epoch'). This wakeup_fd gives us something to alert on when such a
233 * case occurs. */
234
235static grpc_fd *fd_freelist = NULL;
236static gpr_mu fd_freelist_mu;
237
Craig Tillerc67cc992017-04-27 10:15:51 -0700238static void fd_global_init(void) { gpr_mu_init(&fd_freelist_mu); }
239
240static void fd_global_shutdown(void) {
241 gpr_mu_lock(&fd_freelist_mu);
242 gpr_mu_unlock(&fd_freelist_mu);
243 while (fd_freelist != NULL) {
244 grpc_fd *fd = fd_freelist;
245 fd_freelist = fd_freelist->freelist_next;
Craig Tillerc67cc992017-04-27 10:15:51 -0700246 gpr_free(fd);
247 }
248 gpr_mu_destroy(&fd_freelist_mu);
249}
250
251static grpc_fd *fd_create(int fd, const char *name) {
252 grpc_fd *new_fd = NULL;
253
254 gpr_mu_lock(&fd_freelist_mu);
255 if (fd_freelist != NULL) {
256 new_fd = fd_freelist;
257 fd_freelist = fd_freelist->freelist_next;
258 }
259 gpr_mu_unlock(&fd_freelist_mu);
260
261 if (new_fd == NULL) {
Yash Tibrewal7cdd99c2017-09-08 16:04:12 -0700262 new_fd = (grpc_fd *)gpr_malloc(sizeof(grpc_fd));
Craig Tillerc67cc992017-04-27 10:15:51 -0700263 }
264
Craig Tillerc67cc992017-04-27 10:15:51 -0700265 new_fd->fd = fd;
Craig Tillerc67cc992017-04-27 10:15:51 -0700266 grpc_lfev_init(&new_fd->read_closure);
267 grpc_lfev_init(&new_fd->write_closure);
268 gpr_atm_no_barrier_store(&new_fd->read_notifier_pollset, (gpr_atm)NULL);
269
270 new_fd->freelist_next = NULL;
Craig Tillerc67cc992017-04-27 10:15:51 -0700271
272 char *fd_name;
273 gpr_asprintf(&fd_name, "%s fd=%d", name, fd);
274 grpc_iomgr_register_object(&new_fd->iomgr_object, fd_name);
Noah Eisen264879f2017-06-20 17:14:47 -0700275#ifndef NDEBUG
276 if (GRPC_TRACER_ON(grpc_trace_fd_refcount)) {
277 gpr_log(GPR_DEBUG, "FD %d %p create %s", fd, new_fd, fd_name);
278 }
Craig Tillerc67cc992017-04-27 10:15:51 -0700279#endif
280 gpr_free(fd_name);
Craig Tiller9ddb3152017-04-27 21:32:56 +0000281
Yash Tibrewal533d1182017-09-18 10:48:22 -0700282 struct epoll_event ev;
283 ev.events = (uint32_t)(EPOLLIN | EPOLLOUT | EPOLLET);
284 ev.data.ptr = new_fd;
Sree Kuchibhotla5efc9132017-08-17 14:10:38 -0700285 if (epoll_ctl(g_epoll_set.epfd, EPOLL_CTL_ADD, fd, &ev) != 0) {
Craig Tiller9ddb3152017-04-27 21:32:56 +0000286 gpr_log(GPR_ERROR, "epoll_ctl failed: %s", strerror(errno));
287 }
288
Craig Tillerc67cc992017-04-27 10:15:51 -0700289 return new_fd;
290}
291
Craig Tiller4509c472017-04-27 19:05:13 +0000292static int fd_wrapped_fd(grpc_fd *fd) { return fd->fd; }
Craig Tillerc67cc992017-04-27 10:15:51 -0700293
/* Shuts down |fd|'s read and write lockfree events with |why| as the cause.
 * If 'releasing_fd' is true, it means that we are going to detach the internal
 * fd from grpc_fd structure (i.e which means we should not be calling
 * shutdown() syscall on that fd).
 * Takes ownership of one ref on |why|. */
static void fd_shutdown_internal(grpc_exec_ctx *exec_ctx, grpc_fd *fd,
                                 grpc_error *why, bool releasing_fd) {
  /* grpc_lfev_set_shutdown on the read side returns true only on the first
   * transition to shutdown; only then do we touch the OS fd / write side. */
  if (grpc_lfev_set_shutdown(exec_ctx, &fd->read_closure,
                             GRPC_ERROR_REF(why))) {
    if (!releasing_fd) {
      shutdown(fd->fd, SHUT_RDWR);
    }
    grpc_lfev_set_shutdown(exec_ctx, &fd->write_closure, GRPC_ERROR_REF(why));
  }
  /* Balance the caller's ref on |why|. */
  GRPC_ERROR_UNREF(why);
}
308
Sree Kuchibhotlaf2641472017-08-02 23:46:40 -0700309/* Might be called multiple times */
310static void fd_shutdown(grpc_exec_ctx *exec_ctx, grpc_fd *fd, grpc_error *why) {
311 fd_shutdown_internal(exec_ctx, fd, why, false);
312}
313
Craig Tillerc67cc992017-04-27 10:15:51 -0700314static void fd_orphan(grpc_exec_ctx *exec_ctx, grpc_fd *fd,
315 grpc_closure *on_done, int *release_fd,
Yuchen Zengd40a7ae2017-07-12 15:59:56 -0700316 bool already_closed, const char *reason) {
Craig Tillerc67cc992017-04-27 10:15:51 -0700317 grpc_error *error = GRPC_ERROR_NONE;
Sree Kuchibhotlaf2641472017-08-02 23:46:40 -0700318 bool is_release_fd = (release_fd != NULL);
Craig Tillerc67cc992017-04-27 10:15:51 -0700319
Craig Tiller9ddb3152017-04-27 21:32:56 +0000320 if (!grpc_lfev_is_shutdown(&fd->read_closure)) {
Sree Kuchibhotlaf2641472017-08-02 23:46:40 -0700321 fd_shutdown_internal(exec_ctx, fd,
322 GRPC_ERROR_CREATE_FROM_COPIED_STRING(reason),
323 is_release_fd);
Craig Tiller9ddb3152017-04-27 21:32:56 +0000324 }
325
Craig Tillerc67cc992017-04-27 10:15:51 -0700326 /* If release_fd is not NULL, we should be relinquishing control of the file
327 descriptor fd->fd (but we still own the grpc_fd structure). */
Sree Kuchibhotlaf2641472017-08-02 23:46:40 -0700328 if (is_release_fd) {
Craig Tillerc67cc992017-04-27 10:15:51 -0700329 *release_fd = fd->fd;
Yuchen Zengd40a7ae2017-07-12 15:59:56 -0700330 } else if (!already_closed) {
Craig Tillerc67cc992017-04-27 10:15:51 -0700331 close(fd->fd);
Craig Tillerc67cc992017-04-27 10:15:51 -0700332 }
333
ncteisen274bbbe2017-06-08 14:57:11 -0700334 GRPC_CLOSURE_SCHED(exec_ctx, on_done, GRPC_ERROR_REF(error));
Craig Tillerc67cc992017-04-27 10:15:51 -0700335
Craig Tiller4509c472017-04-27 19:05:13 +0000336 grpc_iomgr_unregister_object(&fd->iomgr_object);
337 grpc_lfev_destroy(&fd->read_closure);
338 grpc_lfev_destroy(&fd->write_closure);
Craig Tillerc67cc992017-04-27 10:15:51 -0700339
Craig Tiller4509c472017-04-27 19:05:13 +0000340 gpr_mu_lock(&fd_freelist_mu);
341 fd->freelist_next = fd_freelist;
342 fd_freelist = fd;
343 gpr_mu_unlock(&fd_freelist_mu);
Craig Tillerc67cc992017-04-27 10:15:51 -0700344}
345
346static grpc_pollset *fd_get_read_notifier_pollset(grpc_exec_ctx *exec_ctx,
347 grpc_fd *fd) {
348 gpr_atm notifier = gpr_atm_acq_load(&fd->read_notifier_pollset);
349 return (grpc_pollset *)notifier;
350}
351
352static bool fd_is_shutdown(grpc_fd *fd) {
353 return grpc_lfev_is_shutdown(&fd->read_closure);
354}
355
Craig Tillerc67cc992017-04-27 10:15:51 -0700356static void fd_notify_on_read(grpc_exec_ctx *exec_ctx, grpc_fd *fd,
357 grpc_closure *closure) {
Craig Tiller830e82a2017-05-31 16:26:27 -0700358 grpc_lfev_notify_on(exec_ctx, &fd->read_closure, closure, "read");
Craig Tillerc67cc992017-04-27 10:15:51 -0700359}
360
361static void fd_notify_on_write(grpc_exec_ctx *exec_ctx, grpc_fd *fd,
362 grpc_closure *closure) {
Craig Tiller830e82a2017-05-31 16:26:27 -0700363 grpc_lfev_notify_on(exec_ctx, &fd->write_closure, closure, "write");
Craig Tillerc67cc992017-04-27 10:15:51 -0700364}
365
Craig Tiller4509c472017-04-27 19:05:13 +0000366static void fd_become_readable(grpc_exec_ctx *exec_ctx, grpc_fd *fd,
367 grpc_pollset *notifier) {
Craig Tiller830e82a2017-05-31 16:26:27 -0700368 grpc_lfev_set_ready(exec_ctx, &fd->read_closure, "read");
Craig Tiller4509c472017-04-27 19:05:13 +0000369 /* Use release store to match with acquire load in fd_get_read_notifier */
370 gpr_atm_rel_store(&fd->read_notifier_pollset, (gpr_atm)notifier);
371}
372
373static void fd_become_writable(grpc_exec_ctx *exec_ctx, grpc_fd *fd) {
Craig Tiller830e82a2017-05-31 16:26:27 -0700374 grpc_lfev_set_ready(exec_ctx, &fd->write_closure, "write");
Craig Tillerc67cc992017-04-27 10:15:51 -0700375}
376
377/*******************************************************************************
378 * Pollset Definitions
379 */
380
Craig Tiller6de05932017-04-28 09:17:38 -0700381GPR_TLS_DECL(g_current_thread_pollset);
382GPR_TLS_DECL(g_current_thread_worker);
Sree Kuchibhotla5efc9132017-08-17 14:10:38 -0700383
384/* The designated poller */
Craig Tiller6de05932017-04-28 09:17:38 -0700385static gpr_atm g_active_poller;
Sree Kuchibhotla5efc9132017-08-17 14:10:38 -0700386
Vijay Pai4b7ef4d2017-09-11 23:09:22 -0700387static pollset_neighborhood *g_neighborhoods;
388static size_t g_num_neighborhoods;
Craig Tiller6de05932017-04-28 09:17:38 -0700389
Craig Tillerc67cc992017-04-27 10:15:51 -0700390/* Return true if first in list */
Craig Tiller32f90ee2017-04-28 12:46:41 -0700391static bool worker_insert(grpc_pollset *pollset, grpc_pollset_worker *worker) {
392 if (pollset->root_worker == NULL) {
393 pollset->root_worker = worker;
394 worker->next = worker->prev = worker;
Craig Tillerc67cc992017-04-27 10:15:51 -0700395 return true;
396 } else {
Craig Tiller32f90ee2017-04-28 12:46:41 -0700397 worker->next = pollset->root_worker;
398 worker->prev = worker->next->prev;
399 worker->next->prev = worker;
400 worker->prev->next = worker;
Craig Tillerc67cc992017-04-27 10:15:51 -0700401 return false;
402 }
403}
404
405/* Return true if last in list */
406typedef enum { EMPTIED, NEW_ROOT, REMOVED } worker_remove_result;
407
Craig Tiller32f90ee2017-04-28 12:46:41 -0700408static worker_remove_result worker_remove(grpc_pollset *pollset,
Craig Tillerc67cc992017-04-27 10:15:51 -0700409 grpc_pollset_worker *worker) {
Craig Tiller32f90ee2017-04-28 12:46:41 -0700410 if (worker == pollset->root_worker) {
411 if (worker == worker->next) {
412 pollset->root_worker = NULL;
Craig Tillerc67cc992017-04-27 10:15:51 -0700413 return EMPTIED;
414 } else {
Craig Tiller32f90ee2017-04-28 12:46:41 -0700415 pollset->root_worker = worker->next;
416 worker->prev->next = worker->next;
417 worker->next->prev = worker->prev;
Craig Tillerc67cc992017-04-27 10:15:51 -0700418 return NEW_ROOT;
419 }
420 } else {
Craig Tiller32f90ee2017-04-28 12:46:41 -0700421 worker->prev->next = worker->next;
422 worker->next->prev = worker->prev;
Craig Tillerc67cc992017-04-27 10:15:51 -0700423 return REMOVED;
424 }
425}
426
Vijay Pai4b7ef4d2017-09-11 23:09:22 -0700427static size_t choose_neighborhood(void) {
428 return (size_t)gpr_cpu_current_cpu() % g_num_neighborhoods;
Craig Tillerba550da2017-05-01 14:26:31 +0000429}
430
Craig Tiller4509c472017-04-27 19:05:13 +0000431static grpc_error *pollset_global_init(void) {
Craig Tiller4509c472017-04-27 19:05:13 +0000432 gpr_tls_init(&g_current_thread_pollset);
433 gpr_tls_init(&g_current_thread_worker);
Craig Tiller6de05932017-04-28 09:17:38 -0700434 gpr_atm_no_barrier_store(&g_active_poller, 0);
Craig Tiller375eb252017-04-27 23:29:12 +0000435 global_wakeup_fd.read_fd = -1;
436 grpc_error *err = grpc_wakeup_fd_init(&global_wakeup_fd);
437 if (err != GRPC_ERROR_NONE) return err;
Yash Tibrewal533d1182017-09-18 10:48:22 -0700438 struct epoll_event ev;
439 ev.events = (uint32_t)(EPOLLIN | EPOLLET);
440 ev.data.ptr = &global_wakeup_fd;
Sree Kuchibhotla5efc9132017-08-17 14:10:38 -0700441 if (epoll_ctl(g_epoll_set.epfd, EPOLL_CTL_ADD, global_wakeup_fd.read_fd,
442 &ev) != 0) {
Craig Tiller4509c472017-04-27 19:05:13 +0000443 return GRPC_OS_ERROR(errno, "epoll_ctl");
444 }
Vijay Pai4b7ef4d2017-09-11 23:09:22 -0700445 g_num_neighborhoods = GPR_CLAMP(gpr_cpu_num_cores(), 1, MAX_NEIGHBORHOODS);
446 g_neighborhoods = (pollset_neighborhood *)gpr_zalloc(
447 sizeof(*g_neighborhoods) * g_num_neighborhoods);
448 for (size_t i = 0; i < g_num_neighborhoods; i++) {
449 gpr_mu_init(&g_neighborhoods[i].mu);
Craig Tiller32f90ee2017-04-28 12:46:41 -0700450 }
Craig Tiller4509c472017-04-27 19:05:13 +0000451 return GRPC_ERROR_NONE;
452}
453
454static void pollset_global_shutdown(void) {
Craig Tiller4509c472017-04-27 19:05:13 +0000455 gpr_tls_destroy(&g_current_thread_pollset);
456 gpr_tls_destroy(&g_current_thread_worker);
Craig Tiller375eb252017-04-27 23:29:12 +0000457 if (global_wakeup_fd.read_fd != -1) grpc_wakeup_fd_destroy(&global_wakeup_fd);
Vijay Pai4b7ef4d2017-09-11 23:09:22 -0700458 for (size_t i = 0; i < g_num_neighborhoods; i++) {
459 gpr_mu_destroy(&g_neighborhoods[i].mu);
Craig Tiller32f90ee2017-04-28 12:46:41 -0700460 }
Vijay Pai4b7ef4d2017-09-11 23:09:22 -0700461 gpr_free(g_neighborhoods);
Craig Tiller4509c472017-04-27 19:05:13 +0000462}
463
464static void pollset_init(grpc_pollset *pollset, gpr_mu **mu) {
Craig Tiller6de05932017-04-28 09:17:38 -0700465 gpr_mu_init(&pollset->mu);
466 *mu = &pollset->mu;
Vijay Pai4b7ef4d2017-09-11 23:09:22 -0700467 pollset->neighborhood = &g_neighborhoods[choose_neighborhood()];
468 pollset->reassigning_neighborhood = false;
Sree Kuchibhotla30882302017-08-16 13:46:52 -0700469 pollset->root_worker = NULL;
470 pollset->kicked_without_poller = false;
Craig Tiller6de05932017-04-28 09:17:38 -0700471 pollset->seen_inactive = true;
Sree Kuchibhotla30882302017-08-16 13:46:52 -0700472 pollset->shutting_down = false;
473 pollset->shutdown_closure = NULL;
474 pollset->begin_refs = 0;
475 pollset->next = pollset->prev = NULL;
Craig Tiller6de05932017-04-28 09:17:38 -0700476}
477
/* Destroys |pollset|, unlinking it from its neighborhood's active list if it
 * is still on one. Lock ordering is neighborhood->mu before pollset->mu, so
 * we must drop pollset->mu, take the neighborhood lock, re-take pollset->mu,
 * and re-check state — retrying if the pollset migrated neighborhoods in the
 * window where we held neither lock. */
static void pollset_destroy(grpc_exec_ctx *exec_ctx, grpc_pollset *pollset) {
  gpr_mu_lock(&pollset->mu);
  if (!pollset->seen_inactive) {
    pollset_neighborhood *neighborhood = pollset->neighborhood;
    gpr_mu_unlock(&pollset->mu);
  retry_lock_neighborhood:
    gpr_mu_lock(&neighborhood->mu);
    gpr_mu_lock(&pollset->mu);
    if (!pollset->seen_inactive) {
      if (pollset->neighborhood != neighborhood) {
        /* The pollset moved to a different neighborhood while we were
         * unlocked; chase it. */
        gpr_mu_unlock(&neighborhood->mu);
        neighborhood = pollset->neighborhood;
        gpr_mu_unlock(&pollset->mu);
        goto retry_lock_neighborhood;
      }
      /* Unlink from the circular active list. */
      pollset->prev->next = pollset->next;
      pollset->next->prev = pollset->prev;
      if (pollset == pollset->neighborhood->active_root) {
        pollset->neighborhood->active_root =
            pollset->next == pollset ? NULL : pollset->next;
      }
    }
    gpr_mu_unlock(&pollset->neighborhood->mu);
  }
  gpr_mu_unlock(&pollset->mu);
  gpr_mu_destroy(&pollset->mu);
}
505
/* Kicks every worker attached to |pollset|: CV-waiters are signalled, the
 * designated poller is woken via the global wakeup fd, and already-kicked
 * workers are left alone. Caller must hold pollset->mu (the worker list is
 * only traversed under it — NOTE(review): inferred from all call sites
 * visible here; confirm against remaining callers). */
static grpc_error *pollset_kick_all(grpc_exec_ctx *exec_ctx,
                                    grpc_pollset *pollset) {
  GPR_TIMER_BEGIN("pollset_kick_all", 0);
  grpc_error *error = GRPC_ERROR_NONE;
  if (pollset->root_worker != NULL) {
    grpc_pollset_worker *worker = pollset->root_worker;
    do {
      GRPC_STATS_INC_POLLSET_KICK(exec_ctx);
      switch (worker->state) {
        case KICKED:
          /* Already kicked: nothing to do beyond the stat. */
          GRPC_STATS_INC_POLLSET_KICKED_AGAIN(exec_ctx);
          break;
        case UNKICKED:
          SET_KICK_STATE(worker, KICKED);
          if (worker->initialized_cv) {
            /* Worker is (or will be) blocked on its condvar — wake it. */
            GRPC_STATS_INC_POLLSET_KICK_WAKEUP_CV(exec_ctx);
            gpr_cv_signal(&worker->cv);
          }
          break;
        case DESIGNATED_POLLER:
          /* Worker is inside epoll_wait; poke the wakeup fd to return it. */
          GRPC_STATS_INC_POLLSET_KICK_WAKEUP_FD(exec_ctx);
          SET_KICK_STATE(worker, KICKED);
          append_error(&error, grpc_wakeup_fd_wakeup(&global_wakeup_fd),
                       "pollset_kick_all");
          break;
      }

      worker = worker->next;
    } while (worker != pollset->root_worker);
  }
  // TODO: sreek. Check if we need to set 'kicked_without_poller' to true here
  // in the else case
  GPR_TIMER_END("pollset_kick_all", 0);
  return error;
}
541
542static void pollset_maybe_finish_shutdown(grpc_exec_ctx *exec_ctx,
543 grpc_pollset *pollset) {
Craig Tillerba550da2017-05-01 14:26:31 +0000544 if (pollset->shutdown_closure != NULL && pollset->root_worker == NULL &&
545 pollset->begin_refs == 0) {
yang-gdf92a642017-08-21 22:38:45 -0700546 GPR_TIMER_MARK("pollset_finish_shutdown", 0);
ncteisen274bbbe2017-06-08 14:57:11 -0700547 GRPC_CLOSURE_SCHED(exec_ctx, pollset->shutdown_closure, GRPC_ERROR_NONE);
Craig Tiller4509c472017-04-27 19:05:13 +0000548 pollset->shutdown_closure = NULL;
549 }
550}
551
552static void pollset_shutdown(grpc_exec_ctx *exec_ctx, grpc_pollset *pollset,
553 grpc_closure *closure) {
yang-gdf92a642017-08-21 22:38:45 -0700554 GPR_TIMER_BEGIN("pollset_shutdown", 0);
Craig Tiller4509c472017-04-27 19:05:13 +0000555 GPR_ASSERT(pollset->shutdown_closure == NULL);
Craig Tillerc81512a2017-05-26 09:53:58 -0700556 GPR_ASSERT(!pollset->shutting_down);
Craig Tiller4509c472017-04-27 19:05:13 +0000557 pollset->shutdown_closure = closure;
Craig Tillerc81512a2017-05-26 09:53:58 -0700558 pollset->shutting_down = true;
Craig Tiller0ff222a2017-09-01 09:41:43 -0700559 GRPC_LOG_IF_ERROR("pollset_shutdown", pollset_kick_all(exec_ctx, pollset));
Craig Tiller4509c472017-04-27 19:05:13 +0000560 pollset_maybe_finish_shutdown(exec_ctx, pollset);
yang-gdf92a642017-08-21 22:38:45 -0700561 GPR_TIMER_END("pollset_shutdown", 0);
Craig Tiller4509c472017-04-27 19:05:13 +0000562}
563
Craig Tiller4509c472017-04-27 19:05:13 +0000564static int poll_deadline_to_millis_timeout(gpr_timespec deadline,
565 gpr_timespec now) {
566 gpr_timespec timeout;
567 if (gpr_time_cmp(deadline, gpr_inf_future(deadline.clock_type)) == 0) {
568 return -1;
569 }
570
571 if (gpr_time_cmp(deadline, now) <= 0) {
572 return 0;
573 }
574
575 static const gpr_timespec round_up = {
Yash Tibrewal06312bd2017-09-18 15:10:22 -0700576 0, /* tv_sec */
577 GPR_NS_PER_MS - 1, /* tv_nsec */
578 GPR_TIMESPAN /* clock_type */
579 };
Craig Tiller4509c472017-04-27 19:05:13 +0000580 timeout = gpr_time_sub(deadline, now);
581 int millis = gpr_time_to_millis(gpr_time_add(timeout, round_up));
582 return millis >= 1 ? millis : 1;
583}
584
/* Process the epoll events found by do_epoll_wait() function.
   - g_epoll_set.cursor points to the index of the first event to be processed
   - This function then processes up-to MAX_EPOLL_EVENTS_PER_ITERATION and
     updates the g_epoll_set.cursor

   NOTE ON SYNCRHONIZATION: Similar to do_epoll_wait(), this function is only
   called by g_active_poller thread. So there is no need for synchronization
   when accessing fields in g_epoll_set */
static grpc_error *process_epoll_events(grpc_exec_ctx *exec_ctx,
                                        grpc_pollset *pollset) {
  static const char *err_desc = "process_events";
  grpc_error *error = GRPC_ERROR_NONE;

  GPR_TIMER_BEGIN("process_epoll_events", 0);
  /* Acquire loads pair with the release stores below / in do_epoll_wait():
   * the designated-poller role migrates between threads, so these provide
   * the necessary memory visibility (not mutual exclusion). */
  long num_events = gpr_atm_acq_load(&g_epoll_set.num_events);
  long cursor = gpr_atm_acq_load(&g_epoll_set.cursor);
  for (int idx = 0;
       (idx < MAX_EPOLL_EVENTS_HANDLED_PER_ITERATION) && cursor != num_events;
       idx++) {
    long c = cursor++;
    struct epoll_event *ev = &g_epoll_set.events[c];
    void *data_ptr = ev->data.ptr;

    if (data_ptr == &global_wakeup_fd) {
      /* A kick, not an fd event: drain the wakeup fd. */
      append_error(&error, grpc_wakeup_fd_consume_wakeup(&global_wakeup_fd),
                   err_desc);
    } else {
      grpc_fd *fd = (grpc_fd *)(data_ptr);
      /* Error/hangup wakes BOTH directions so pending closures observe the
       * failure. */
      bool cancel = (ev->events & (EPOLLERR | EPOLLHUP)) != 0;
      bool read_ev = (ev->events & (EPOLLIN | EPOLLPRI)) != 0;
      bool write_ev = (ev->events & EPOLLOUT) != 0;

      if (read_ev || cancel) {
        fd_become_readable(exec_ctx, fd, pollset);
      }

      if (write_ev || cancel) {
        fd_become_writable(exec_ctx, fd);
      }
    }
  }
  /* Publish how far we got for the next designated poller. */
  gpr_atm_rel_store(&g_epoll_set.cursor, cursor);
  GPR_TIMER_END("process_epoll_events", 0);
  return error;
}
630
/* Do epoll_wait and store the events in g_epoll_set.events field. This does
   not "process" any of the events yet; that is done in
   process_epoll_events(). See process_epoll_events() function for more
   details.

   NOTE ON SYNCHRONIZATION: At any point of time, only the g_active_poller
   (i.e the designated poller thread) will be calling this function. So there
   is no need for any synchronization when accesing fields in g_epoll_set */
static grpc_error *do_epoll_wait(grpc_exec_ctx *exec_ctx, grpc_pollset *ps,
                                 gpr_timespec now, gpr_timespec deadline) {
  GPR_TIMER_BEGIN("do_epoll_wait", 0);

  int r;
  int timeout = poll_deadline_to_millis_timeout(deadline, now);
  /* Only mark a blocking region when we may actually block (timeout != 0). */
  if (timeout != 0) {
    GRPC_SCHEDULING_START_BLOCKING_REGION;
  }
  do {
    GRPC_STATS_INC_SYSCALL_POLL(exec_ctx);
    r = epoll_wait(g_epoll_set.epfd, g_epoll_set.events, MAX_EPOLL_EVENTS,
                   timeout);
  } while (r < 0 && errno == EINTR); /* retry on signal interruption */
  if (timeout != 0) {
    GRPC_SCHEDULING_END_BLOCKING_REGION;
  }

  if (r < 0) return GRPC_OS_ERROR(errno, "epoll_wait");

  GRPC_STATS_INC_POLL_EVENTS_RETURNED(exec_ctx, r);

  if (GRPC_TRACER_ON(grpc_polling_trace)) {
    gpr_log(GPR_DEBUG, "ps: %p poll got %d events", ps, r);
  }

  /* Release stores pair with the acquire loads in process_epoll_events()
   * when a different thread becomes the designated poller. */
  gpr_atm_rel_store(&g_epoll_set.num_events, r);
  gpr_atm_rel_store(&g_epoll_set.cursor, 0);

  GPR_TIMER_END("do_epoll_wait", 0);
  return GRPC_ERROR_NONE;
}
670
/* Prepare 'worker' to poll on 'pollset': re-activate the pollset in a
   neighborhood if it was seen inactive, insert the worker, and (unless this
   worker becomes the designated poller immediately) block on its condition
   variable until kicked, shut down, or timed out.

   Called with pollset->mu held; may temporarily drop and re-acquire it (see
   the comment near the bottom). Returns true iff this worker ended up as the
   DESIGNATED_POLLER and the pollset is not shutting down — i.e. iff the
   caller should actually do epoll work. */
static bool begin_worker(grpc_pollset *pollset, grpc_pollset_worker *worker,
                         grpc_pollset_worker **worker_hdl, gpr_timespec *now,
                         gpr_timespec deadline) {
  GPR_TIMER_BEGIN("begin_worker", 0);
  if (worker_hdl != NULL) *worker_hdl = worker;
  worker->initialized_cv = false;
  SET_KICK_STATE(worker, UNKICKED);
  worker->schedule_on_end_work = (grpc_closure_list)GRPC_CLOSURE_LIST_INIT;
  pollset->begin_refs++;

  if (GRPC_TRACER_ON(grpc_polling_trace)) {
    gpr_log(GPR_ERROR, "PS:%p BEGIN_STARTS:%p", pollset, worker);
  }

  if (pollset->seen_inactive) {
    // pollset has been observed to be inactive, we need to move back to the
    // active list
    bool is_reassigning = false;
    if (!pollset->reassigning_neighborhood) {
      is_reassigning = true;
      pollset->reassigning_neighborhood = true;
      pollset->neighborhood = &g_neighborhoods[choose_neighborhood()];
    }
    pollset_neighborhood *neighborhood = pollset->neighborhood;
    /* Lock-ordering: neighborhood->mu must be taken before pollset->mu, so
       drop the pollset lock first and re-take both below. */
    gpr_mu_unlock(&pollset->mu);
  // pollset unlocked: state may change (even worker->kick_state)
  retry_lock_neighborhood:
    gpr_mu_lock(&neighborhood->mu);
    gpr_mu_lock(&pollset->mu);
    if (GRPC_TRACER_ON(grpc_polling_trace)) {
      gpr_log(GPR_ERROR, "PS:%p BEGIN_REORG:%p kick_state=%s is_reassigning=%d",
              pollset, worker, kick_state_string(worker->state),
              is_reassigning);
    }
    if (pollset->seen_inactive) {
      /* Another thread may have moved the pollset to a different neighborhood
         while we held no locks; chase the current one. */
      if (neighborhood != pollset->neighborhood) {
        gpr_mu_unlock(&neighborhood->mu);
        neighborhood = pollset->neighborhood;
        gpr_mu_unlock(&pollset->mu);
        goto retry_lock_neighborhood;
      }

      /* In the brief time we released the pollset locks above, the worker MAY
         have been kicked. In this case, the worker should get out of this
         pollset ASAP and hence this should neither add the pollset to
         neighborhood nor mark the pollset as active.

         On a side note, the only way a worker's kick state could have changed
         at this point is if it were "kicked specifically". Since the worker has
         not added itself to the pollset yet (by calling worker_insert()), it is
         not visible in the "kick any" path yet */
      if (worker->state == UNKICKED) {
        pollset->seen_inactive = false;
        if (neighborhood->active_root == NULL) {
          /* Empty active list: this pollset becomes a 1-element circular
             doubly-linked list. */
          neighborhood->active_root = pollset->next = pollset->prev = pollset;
          /* Make this the designated poller if there isn't one already */
          if (worker->state == UNKICKED &&
              gpr_atm_no_barrier_cas(&g_active_poller, 0, (gpr_atm)worker)) {
            SET_KICK_STATE(worker, DESIGNATED_POLLER);
          }
        } else {
          /* Splice the pollset in front of the current active_root. */
          pollset->next = neighborhood->active_root;
          pollset->prev = pollset->next->prev;
          pollset->next->prev = pollset->prev->next = pollset;
        }
      }
    }
    if (is_reassigning) {
      GPR_ASSERT(pollset->reassigning_neighborhood);
      pollset->reassigning_neighborhood = false;
    }
    gpr_mu_unlock(&neighborhood->mu);
  }

  worker_insert(pollset, worker);
  pollset->begin_refs--;
  if (worker->state == UNKICKED && !pollset->kicked_without_poller) {
    GPR_ASSERT(gpr_atm_no_barrier_load(&g_active_poller) != (gpr_atm)worker);
    worker->initialized_cv = true;
    gpr_cv_init(&worker->cv);
    /* Wait (releasing pollset->mu inside gpr_cv_wait) until kicked or the
       pollset starts shutting down. */
    while (worker->state == UNKICKED && !pollset->shutting_down) {
      if (GRPC_TRACER_ON(grpc_polling_trace)) {
        gpr_log(GPR_ERROR, "PS:%p BEGIN_WAIT:%p kick_state=%s shutdown=%d",
                pollset, worker, kick_state_string(worker->state),
                pollset->shutting_down);
      }

      if (gpr_cv_wait(&worker->cv, &pollset->mu, deadline) &&
          worker->state == UNKICKED) {
        /* If gpr_cv_wait returns true (i.e a timeout), pretend that the worker
           received a kick */
        SET_KICK_STATE(worker, KICKED);
      }
    }
    /* We may have slept for a while: refresh the caller's notion of "now". */
    *now = gpr_now(now->clock_type);
  }

  if (GRPC_TRACER_ON(grpc_polling_trace)) {
    gpr_log(GPR_ERROR,
            "PS:%p BEGIN_DONE:%p kick_state=%s shutdown=%d "
            "kicked_without_poller: %d",
            pollset, worker, kick_state_string(worker->state),
            pollset->shutting_down, pollset->kicked_without_poller);
  }

  /* We release pollset lock in this function at a couple of places:
   * 1. Briefly when assigning pollset to a neighborhood
   * 2. When doing gpr_cv_wait()
   * It is possible that 'kicked_without_poller' was set to true during (1) and
   * 'shutting_down' is set to true during (1) or (2). If either of them is
   * true, this worker cannot do polling */
  /* TODO(sreek): Perhaps there is a better way to handle kicked_without_poller
   * case; especially when the worker is the DESIGNATED_POLLER */

  if (pollset->kicked_without_poller) {
    pollset->kicked_without_poller = false;
    GPR_TIMER_END("begin_worker", 0);
    return false;
  }

  GPR_TIMER_END("begin_worker", 0);
  return worker->state == DESIGNATED_POLLER && !pollset->shutting_down;
}
794
/* Scan 'neighborhood' (whose mu the caller holds) for a worker that can take
   over as the designated poller. Pollsets whose workers are all already
   kicked are marked inactive and unlinked from the neighborhood's active
   list as a side effect. Returns true iff a designated poller was found (or
   some other thread installed one concurrently). */
static bool check_neighborhood_for_available_poller(
    grpc_exec_ctx *exec_ctx, pollset_neighborhood *neighborhood) {
  GPR_TIMER_BEGIN("check_neighborhood_for_available_poller", 0);
  bool found_worker = false;
  do {
    grpc_pollset *inspect = neighborhood->active_root;
    if (inspect == NULL) {
      break;
    }
    gpr_mu_lock(&inspect->mu);
    GPR_ASSERT(!inspect->seen_inactive);
    grpc_pollset_worker *inspect_worker = inspect->root_worker;
    if (inspect_worker != NULL) {
      /* Walk the circular worker list of this pollset. */
      do {
        switch (inspect_worker->state) {
          case UNKICKED:
            /* Try to claim the designated-poller slot for this worker. */
            if (gpr_atm_no_barrier_cas(&g_active_poller, 0,
                                       (gpr_atm)inspect_worker)) {
              if (GRPC_TRACER_ON(grpc_polling_trace)) {
                gpr_log(GPR_DEBUG, " .. choose next poller to be %p",
                        inspect_worker);
              }
              SET_KICK_STATE(inspect_worker, DESIGNATED_POLLER);
              if (inspect_worker->initialized_cv) {
                GPR_TIMER_MARK("signal worker", 0);
                GRPC_STATS_INC_POLLSET_KICK_WAKEUP_CV(exec_ctx);
                gpr_cv_signal(&inspect_worker->cv);
              }
            } else {
              if (GRPC_TRACER_ON(grpc_polling_trace)) {
                gpr_log(GPR_DEBUG, " .. beaten to choose next poller");
              }
            }
            // even if we didn't win the cas, there's a worker, we can stop
            found_worker = true;
            break;
          case KICKED:
            /* Already kicked: cannot become the poller; keep scanning. */
            break;
          case DESIGNATED_POLLER:
            found_worker = true;  // ok, so someone else found the worker, but
                                  // we'll accept that
            break;
        }
        inspect_worker = inspect_worker->next;
      } while (!found_worker && inspect_worker != inspect->root_worker);
    }
    if (!found_worker) {
      if (GRPC_TRACER_ON(grpc_polling_trace)) {
        gpr_log(GPR_DEBUG, " .. mark pollset %p inactive", inspect);
      }
      /* No usable worker here: retire this pollset from the active list. */
      inspect->seen_inactive = true;
      if (inspect == neighborhood->active_root) {
        neighborhood->active_root =
            inspect->next == inspect ? NULL : inspect->next;
      }
      inspect->next->prev = inspect->prev;
      inspect->prev->next = inspect->next;
      inspect->next = inspect->prev = NULL;
    }
    gpr_mu_unlock(&inspect->mu);
  } while (!found_worker);
  GPR_TIMER_END("check_neighborhood_for_available_poller", 0);
  return found_worker;
}
859
/* Tear down 'worker' after polling: if it was the designated poller, hand
   the role to a peer on this pollset or scan the neighborhoods for a
   replacement; flush any work queued during polling; finally remove the
   worker and possibly finish a pending pollset shutdown.

   Called with pollset->mu held; temporarily releases it in several branches
   (exec_ctx flush, neighborhood scan) and re-acquires it before returning. */
static void end_worker(grpc_exec_ctx *exec_ctx, grpc_pollset *pollset,
                       grpc_pollset_worker *worker,
                       grpc_pollset_worker **worker_hdl) {
  GPR_TIMER_BEGIN("end_worker", 0);
  if (GRPC_TRACER_ON(grpc_polling_trace)) {
    gpr_log(GPR_DEBUG, "PS:%p END_WORKER:%p", pollset, worker);
  }
  if (worker_hdl != NULL) *worker_hdl = NULL;
  /* Make sure we appear kicked */
  SET_KICK_STATE(worker, KICKED);
  grpc_closure_list_move(&worker->schedule_on_end_work,
                         &exec_ctx->closure_list);
  if (gpr_atm_no_barrier_load(&g_active_poller) == (gpr_atm)worker) {
    /* We were the designated poller: pick a successor. Prefer an unkicked
       peer on this very pollset (cheap: just signal its cv). */
    if (worker->next != worker && worker->next->state == UNKICKED) {
      if (GRPC_TRACER_ON(grpc_polling_trace)) {
        gpr_log(GPR_DEBUG, " .. choose next poller to be peer %p", worker);
      }
      GPR_ASSERT(worker->next->initialized_cv);
      gpr_atm_no_barrier_store(&g_active_poller, (gpr_atm)worker->next);
      SET_KICK_STATE(worker->next, DESIGNATED_POLLER);
      GRPC_STATS_INC_POLLSET_KICK_WAKEUP_CV(exec_ctx);
      gpr_cv_signal(&worker->next->cv);
      if (grpc_exec_ctx_has_work(exec_ctx)) {
        gpr_mu_unlock(&pollset->mu);
        grpc_exec_ctx_flush(exec_ctx);
        gpr_mu_lock(&pollset->mu);
      }
    } else {
      /* No local successor: relinquish the slot and search all
         neighborhoods, starting from our own. */
      gpr_atm_no_barrier_store(&g_active_poller, 0);
      size_t poller_neighborhood_idx =
          (size_t)(pollset->neighborhood - g_neighborhoods);
      gpr_mu_unlock(&pollset->mu);
      bool found_worker = false;
      bool scan_state[MAX_NEIGHBORHOODS];
      /* First pass: opportunistic trylock so we never block behind a busy
         neighborhood; remember which ones we actually scanned. */
      for (size_t i = 0; !found_worker && i < g_num_neighborhoods; i++) {
        pollset_neighborhood *neighborhood =
            &g_neighborhoods[(poller_neighborhood_idx + i) %
                             g_num_neighborhoods];
        if (gpr_mu_trylock(&neighborhood->mu)) {
          found_worker =
              check_neighborhood_for_available_poller(exec_ctx, neighborhood);
          gpr_mu_unlock(&neighborhood->mu);
          scan_state[i] = true;
        } else {
          scan_state[i] = false;
        }
      }
      /* Second pass: blocking lock on the neighborhoods we skipped. */
      for (size_t i = 0; !found_worker && i < g_num_neighborhoods; i++) {
        if (scan_state[i]) continue;
        pollset_neighborhood *neighborhood =
            &g_neighborhoods[(poller_neighborhood_idx + i) %
                             g_num_neighborhoods];
        gpr_mu_lock(&neighborhood->mu);
        found_worker =
            check_neighborhood_for_available_poller(exec_ctx, neighborhood);
        gpr_mu_unlock(&neighborhood->mu);
      }
      grpc_exec_ctx_flush(exec_ctx);
      gpr_mu_lock(&pollset->mu);
    }
  } else if (grpc_exec_ctx_has_work(exec_ctx)) {
    gpr_mu_unlock(&pollset->mu);
    grpc_exec_ctx_flush(exec_ctx);
    gpr_mu_lock(&pollset->mu);
  }
  if (worker->initialized_cv) {
    gpr_cv_destroy(&worker->cv);
  }
  if (GRPC_TRACER_ON(grpc_polling_trace)) {
    gpr_log(GPR_DEBUG, " .. remove worker");
  }
  if (EMPTIED == worker_remove(pollset, worker)) {
    pollset_maybe_finish_shutdown(exec_ctx, pollset);
  }
  GPR_ASSERT(gpr_atm_no_barrier_load(&g_active_poller) != (gpr_atm)worker);
  GPR_TIMER_END("end_worker", 0);
}
937
/* pollset->po.mu lock must be held by the caller before calling this.
   The function pollset_work() may temporarily release the lock (pollset->po.mu)
   during the course of its execution but it will always re-acquire the lock and
   ensure that it is held by the time the function returns */
static grpc_error *pollset_work(grpc_exec_ctx *exec_ctx, grpc_pollset *ps,
                                grpc_pollset_worker **worker_hdl,
                                gpr_timespec now, gpr_timespec deadline) {
  grpc_pollset_worker worker;
  grpc_error *error = GRPC_ERROR_NONE;
  static const char *err_desc = "pollset_work";
  GPR_TIMER_BEGIN("pollset_work", 0);
  /* Fast path: a kick arrived while no worker was polling; consume it and
     return without doing any polling work. */
  if (ps->kicked_without_poller) {
    ps->kicked_without_poller = false;
    GPR_TIMER_END("pollset_work", 0);
    return GRPC_ERROR_NONE;
  }

  /* begin_worker() returns true iff this thread became the designated
     poller (see begin_worker) */
  if (begin_worker(ps, &worker, worker_hdl, &now, deadline)) {
    gpr_tls_set(&g_current_thread_pollset, (intptr_t)ps);
    gpr_tls_set(&g_current_thread_worker, (intptr_t)&worker);
    GPR_ASSERT(!ps->shutting_down);
    GPR_ASSERT(!ps->seen_inactive);

    gpr_mu_unlock(&ps->mu); /* unlock */
    /* This is the designated polling thread at this point and should ideally do
       polling. However, if there are unprocessed events left from a previous
       call to do_epoll_wait(), skip calling epoll_wait() in this iteration and
       process the pending epoll events.

       The reason for decoupling do_epoll_wait and process_epoll_events is to
       better distribute the work (i.e handling epoll events) across multiple
       threads

       process_epoll_events() returns very quickly: It just queues the work on
       exec_ctx but does not execute it (the actual execution or more
       accurately grpc_exec_ctx_flush() happens in end_worker() AFTER selecting
       a designated poller). So we are not waiting long periods without a
       designated poller */
    if (gpr_atm_acq_load(&g_epoll_set.cursor) ==
        gpr_atm_acq_load(&g_epoll_set.num_events)) {
      append_error(&error, do_epoll_wait(exec_ctx, ps, now, deadline),
                   err_desc);
    }
    append_error(&error, process_epoll_events(exec_ctx, ps), err_desc);

    gpr_mu_lock(&ps->mu); /* lock */

    gpr_tls_set(&g_current_thread_worker, 0);
  } else {
    gpr_tls_set(&g_current_thread_pollset, (intptr_t)ps);
  }
  /* end_worker() hands off the designated-poller role and flushes exec_ctx */
  end_worker(exec_ctx, ps, &worker, worker_hdl);

  gpr_tls_set(&g_current_thread_pollset, 0);
  GPR_TIMER_END("pollset_work", 0);
  return error;
}
995
/* Wake up a worker on 'pollset' so it returns from pollset_work().
   If 'specific_worker' is NULL, pick a victim: the root worker, its
   neighbor, or — if nobody is polling — latch 'kicked_without_poller'.
   The designated poller is woken via the global wakeup fd (it is inside
   epoll_wait); cv-waiting workers are woken via their condition variable.
   Caller holds pollset->mu (all state inspected here is mu-protected). */
static grpc_error *pollset_kick(grpc_exec_ctx *exec_ctx, grpc_pollset *pollset,
                                grpc_pollset_worker *specific_worker) {
  GPR_TIMER_BEGIN("pollset_kick", 0);
  GRPC_STATS_INC_POLLSET_KICK(exec_ctx);
  grpc_error *ret_err = GRPC_ERROR_NONE;
  if (GRPC_TRACER_ON(grpc_polling_trace)) {
    /* Build a single trace line describing the kick request and the current
       worker list state. */
    gpr_strvec log;
    gpr_strvec_init(&log);
    char *tmp;
    gpr_asprintf(
        &tmp, "PS:%p KICK:%p curps=%p curworker=%p root=%p", pollset,
        specific_worker, (void *)gpr_tls_get(&g_current_thread_pollset),
        (void *)gpr_tls_get(&g_current_thread_worker), pollset->root_worker);
    gpr_strvec_add(&log, tmp);
    if (pollset->root_worker != NULL) {
      gpr_asprintf(&tmp, " {kick_state=%s next=%p {kick_state=%s}}",
                   kick_state_string(pollset->root_worker->state),
                   pollset->root_worker->next,
                   kick_state_string(pollset->root_worker->next->state));
      gpr_strvec_add(&log, tmp);
    }
    if (specific_worker != NULL) {
      gpr_asprintf(&tmp, " worker_kick_state=%s",
                   kick_state_string(specific_worker->state));
      gpr_strvec_add(&log, tmp);
    }
    tmp = gpr_strvec_flatten(&log, NULL);
    gpr_strvec_destroy(&log);
    gpr_log(GPR_ERROR, "%s", tmp);
    gpr_free(tmp);
  }

  if (specific_worker == NULL) {
    /* "Kick any" path: choose which worker to wake. */
    if (gpr_tls_get(&g_current_thread_pollset) != (intptr_t)pollset) {
      grpc_pollset_worker *root_worker = pollset->root_worker;
      if (root_worker == NULL) {
        /* Nobody is polling: remember the kick so the next pollset_work()
           call returns immediately. */
        GRPC_STATS_INC_POLLSET_KICKED_WITHOUT_POLLER(exec_ctx);
        pollset->kicked_without_poller = true;
        if (GRPC_TRACER_ON(grpc_polling_trace)) {
          gpr_log(GPR_ERROR, " .. kicked_without_poller");
        }
        goto done;
      }
      grpc_pollset_worker *next_worker = root_worker->next;
      if (root_worker->state == KICKED) {
        /* Root already kicked: nothing more to do. */
        GRPC_STATS_INC_POLLSET_KICKED_AGAIN(exec_ctx);
        if (GRPC_TRACER_ON(grpc_polling_trace)) {
          gpr_log(GPR_ERROR, " .. already kicked %p", root_worker);
        }
        SET_KICK_STATE(root_worker, KICKED);
        goto done;
      } else if (next_worker->state == KICKED) {
        GRPC_STATS_INC_POLLSET_KICKED_AGAIN(exec_ctx);
        if (GRPC_TRACER_ON(grpc_polling_trace)) {
          gpr_log(GPR_ERROR, " .. already kicked %p", next_worker);
        }
        SET_KICK_STATE(next_worker, KICKED);
        goto done;
      } else if (root_worker ==
                     next_worker &&  // only try and wake up a poller if
                                     // there is no next worker
                 root_worker == (grpc_pollset_worker *)gpr_atm_no_barrier_load(
                                    &g_active_poller)) {
        /* Sole worker is the designated poller: it is inside epoll_wait, so
           poke the global wakeup fd. */
        GRPC_STATS_INC_POLLSET_KICK_WAKEUP_FD(exec_ctx);
        if (GRPC_TRACER_ON(grpc_polling_trace)) {
          gpr_log(GPR_ERROR, " .. kicked %p", root_worker);
        }
        SET_KICK_STATE(root_worker, KICKED);
        ret_err = grpc_wakeup_fd_wakeup(&global_wakeup_fd);
        goto done;
      } else if (next_worker->state == UNKICKED) {
        /* Neighbor is cv-waiting: cheapest wakeup. */
        GRPC_STATS_INC_POLLSET_KICK_WAKEUP_CV(exec_ctx);
        if (GRPC_TRACER_ON(grpc_polling_trace)) {
          gpr_log(GPR_ERROR, " .. kicked %p", next_worker);
        }
        GPR_ASSERT(next_worker->initialized_cv);
        SET_KICK_STATE(next_worker, KICKED);
        gpr_cv_signal(&next_worker->cv);
        goto done;
      } else if (next_worker->state == DESIGNATED_POLLER) {
        if (root_worker->state != DESIGNATED_POLLER) {
          /* Wake the non-polling root via its cv and leave the poller
             undisturbed. */
          if (GRPC_TRACER_ON(grpc_polling_trace)) {
            gpr_log(
                GPR_ERROR,
                " .. kicked root non-poller %p (initialized_cv=%d) (poller=%p)",
                root_worker, root_worker->initialized_cv, next_worker);
          }
          SET_KICK_STATE(root_worker, KICKED);
          if (root_worker->initialized_cv) {
            GRPC_STATS_INC_POLLSET_KICK_WAKEUP_CV(exec_ctx);
            gpr_cv_signal(&root_worker->cv);
          }
          goto done;
        } else {
          /* Both root and next are designated pollers: wake via the fd. */
          GRPC_STATS_INC_POLLSET_KICK_WAKEUP_FD(exec_ctx);
          if (GRPC_TRACER_ON(grpc_polling_trace)) {
            gpr_log(GPR_ERROR, " .. non-root poller %p (root=%p)", next_worker,
                    root_worker);
          }
          SET_KICK_STATE(next_worker, KICKED);
          ret_err = grpc_wakeup_fd_wakeup(&global_wakeup_fd);
          goto done;
        }
      } else {
        GRPC_STATS_INC_POLLSET_KICKED_AGAIN(exec_ctx);
        GPR_ASSERT(next_worker->state == KICKED);
        SET_KICK_STATE(next_worker, KICKED);
        goto done;
      }
    } else {
      /* The kicking thread is itself working on this pollset: it will notice
         the state change on its own; no wakeup needed. */
      GRPC_STATS_INC_POLLSET_KICK_OWN_THREAD(exec_ctx);
      if (GRPC_TRACER_ON(grpc_polling_trace)) {
        gpr_log(GPR_ERROR, " .. kicked while waking up");
      }
      goto done;
    }

    GPR_UNREACHABLE_CODE(goto done);
  }

  /* "Kick specific worker" path. */
  if (specific_worker->state == KICKED) {
    if (GRPC_TRACER_ON(grpc_polling_trace)) {
      gpr_log(GPR_ERROR, " .. specific worker already kicked");
    }
    goto done;
  } else if (gpr_tls_get(&g_current_thread_worker) ==
             (intptr_t)specific_worker) {
    /* Kicking ourselves: just mark the state; no wakeup required. */
    GRPC_STATS_INC_POLLSET_KICK_OWN_THREAD(exec_ctx);
    if (GRPC_TRACER_ON(grpc_polling_trace)) {
      gpr_log(GPR_ERROR, " .. mark %p kicked", specific_worker);
    }
    SET_KICK_STATE(specific_worker, KICKED);
    goto done;
  } else if (specific_worker ==
             (grpc_pollset_worker *)gpr_atm_no_barrier_load(&g_active_poller)) {
    /* Target is inside epoll_wait: use the global wakeup fd. */
    GRPC_STATS_INC_POLLSET_KICK_WAKEUP_FD(exec_ctx);
    if (GRPC_TRACER_ON(grpc_polling_trace)) {
      gpr_log(GPR_ERROR, " .. kick active poller");
    }
    SET_KICK_STATE(specific_worker, KICKED);
    ret_err = grpc_wakeup_fd_wakeup(&global_wakeup_fd);
    goto done;
  } else if (specific_worker->initialized_cv) {
    /* Target is cv-waiting in begin_worker(): signal its cv. */
    GRPC_STATS_INC_POLLSET_KICK_WAKEUP_CV(exec_ctx);
    if (GRPC_TRACER_ON(grpc_polling_trace)) {
      gpr_log(GPR_ERROR, " .. kick waiting worker");
    }
    SET_KICK_STATE(specific_worker, KICKED);
    gpr_cv_signal(&specific_worker->cv);
    goto done;
  } else {
    /* Target has no cv yet: marking it KICKED is sufficient. */
    GRPC_STATS_INC_POLLSET_KICKED_AGAIN(exec_ctx);
    if (GRPC_TRACER_ON(grpc_polling_trace)) {
      gpr_log(GPR_ERROR, " .. kick non-waiting worker");
    }
    SET_KICK_STATE(specific_worker, KICKED);
    goto done;
  }
done:
  GPR_TIMER_END("pollset_kick", 0);
  return ret_err;
}
1158
/* No-op: this engine has nothing to do when an fd is added to a pollset
   (presumably fds are registered with the global epoll set when created —
   TODO(review): confirm against fd_create). */
static void pollset_add_fd(grpc_exec_ctx *exec_ctx, grpc_pollset *pollset,
                           grpc_fd *fd) {}
1161
Craig Tiller4509c472017-04-27 19:05:13 +00001162/*******************************************************************************
Craig Tillerc67cc992017-04-27 10:15:51 -07001163 * Pollset-set Definitions
1164 */
1165
/* Returns a distinctive non-NULL dummy handle. It is never dereferenced:
   every pollset_set_* operation in this engine is a no-op. */
static grpc_pollset_set *pollset_set_create(void) {
  return (grpc_pollset_set *)((intptr_t)0xdeafbeef);
}
1169
/* No-op: pollset_set_create hands out a dummy handle with no state to free. */
static void pollset_set_destroy(grpc_exec_ctx *exec_ctx,
                                grpc_pollset_set *pss) {}
1172
/* No-op: pollset_sets carry no state in this engine. */
static void pollset_set_add_fd(grpc_exec_ctx *exec_ctx, grpc_pollset_set *pss,
                               grpc_fd *fd) {}
1175
/* No-op: pollset_sets carry no state in this engine. */
static void pollset_set_del_fd(grpc_exec_ctx *exec_ctx, grpc_pollset_set *pss,
                               grpc_fd *fd) {}
1178
/* No-op: pollset_sets carry no state in this engine. */
static void pollset_set_add_pollset(grpc_exec_ctx *exec_ctx,
                                    grpc_pollset_set *pss, grpc_pollset *ps) {}
1181
/* No-op: pollset_sets carry no state in this engine. */
static void pollset_set_del_pollset(grpc_exec_ctx *exec_ctx,
                                    grpc_pollset_set *pss, grpc_pollset *ps) {}
1184
/* No-op: pollset_sets carry no state in this engine. */
static void pollset_set_add_pollset_set(grpc_exec_ctx *exec_ctx,
                                        grpc_pollset_set *bag,
                                        grpc_pollset_set *item) {}
1188
/* No-op: pollset_sets carry no state in this engine. */
static void pollset_set_del_pollset_set(grpc_exec_ctx *exec_ctx,
                                        grpc_pollset_set *bag,
                                        grpc_pollset_set *item) {}
1192
1193/*******************************************************************************
1194 * Event engine binding
1195 */
1196
/* Releases the global state set up by grpc_init_epoll1_linux(). */
static void shutdown_engine(void) {
  fd_global_shutdown();
  pollset_global_shutdown();
  /* The epoll set is closed last, after fd and pollset globals are down. */
  epoll_set_shutdown();
}
1202
/* Positional initializer: entry order must match the field order of the
   grpc_event_engine_vtable declaration. */
static const grpc_event_engine_vtable vtable = {
    sizeof(grpc_pollset),

    /* fd operations */
    fd_create,
    fd_wrapped_fd,
    fd_orphan,
    fd_shutdown,
    fd_notify_on_read,
    fd_notify_on_write,
    fd_is_shutdown,
    fd_get_read_notifier_pollset,

    /* pollset operations */
    pollset_init,
    pollset_shutdown,
    pollset_destroy,
    pollset_work,
    pollset_kick,
    pollset_add_fd,

    /* pollset_set operations (all no-ops in this engine) */
    pollset_set_create,
    pollset_set_destroy,
    pollset_set_add_pollset,
    pollset_set_del_pollset,
    pollset_set_add_pollset_set,
    pollset_set_del_pollset_set,
    pollset_set_add_fd,
    pollset_set_del_fd,

    /* engine teardown */
    shutdown_engine,
};
1233
1234/* It is possible that GLIBC has epoll but the underlying kernel doesn't.
Sree Kuchibhotla5efc9132017-08-17 14:10:38 -07001235 * Create epoll_fd (epoll_set_init() takes care of that) to make sure epoll
1236 * support is available */
Craig Tiller6f0af492017-04-27 19:26:16 +00001237const grpc_event_engine_vtable *grpc_init_epoll1_linux(bool explicit_request) {
Craig Tillerc67cc992017-04-27 10:15:51 -07001238 if (!grpc_has_wakeup_fd()) {
1239 return NULL;
1240 }
1241
Sree Kuchibhotla5efc9132017-08-17 14:10:38 -07001242 if (!epoll_set_init()) {
Craig Tillerc67cc992017-04-27 10:15:51 -07001243 return NULL;
1244 }
1245
Craig Tillerc67cc992017-04-27 10:15:51 -07001246 fd_global_init();
1247
1248 if (!GRPC_LOG_IF_ERROR("pollset_global_init", pollset_global_init())) {
Craig Tiller4509c472017-04-27 19:05:13 +00001249 fd_global_shutdown();
Sree Kuchibhotla5efc9132017-08-17 14:10:38 -07001250 epoll_set_shutdown();
Craig Tillerc67cc992017-04-27 10:15:51 -07001251 return NULL;
1252 }
1253
1254 return &vtable;
1255}
1256
1257#else /* defined(GRPC_LINUX_EPOLL) */
1258#if defined(GRPC_POSIX_SOCKET)
Yash Tibrewal1cac2232017-09-26 11:31:11 -07001259#include "src/core/lib/iomgr/ev_epoll1_linux.h"
Craig Tillerc67cc992017-04-27 10:15:51 -07001260/* If GRPC_LINUX_EPOLL is not defined, it means epoll is not available. Return
1261 * NULL */
const grpc_event_engine_vtable *grpc_init_epoll1_linux(bool explicit_request) {
  return NULL; /* epoll is unavailable on this platform */
}
Craig Tillerc67cc992017-04-27 10:15:51 -07001265#endif /* defined(GRPC_POSIX_SOCKET) */
1266#endif /* !defined(GRPC_LINUX_EPOLL) */