blob: 4efd705fa8113e753e5e5e39462715659d696323 [file] [log] [blame]
/*
 *
 * Copyright 2017 gRPC authors.
 *
 * Licensed under the Apache License, Version 2.0 (the "License");
 * you may not use this file except in compliance with the License.
 * You may obtain a copy of the License at
 *
 *     http://www.apache.org/licenses/LICENSE-2.0
 *
 * Unless required by applicable law or agreed to in writing, software
 * distributed under the License is distributed on an "AS IS" BASIS,
 * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
 * See the License for the specific language governing permissions and
 * limitations under the License.
 *
 */
18
19#include "src/core/lib/iomgr/port.h"
20
21/* This polling engine is only relevant on linux kernels supporting epoll() */
22#ifdef GRPC_LINUX_EPOLL
23
Craig Tiller4509c472017-04-27 19:05:13 +000024#include "src/core/lib/iomgr/ev_epoll1_linux.h"
Craig Tillerc67cc992017-04-27 10:15:51 -070025
26#include <assert.h>
27#include <errno.h>
28#include <poll.h>
29#include <pthread.h>
30#include <string.h>
31#include <sys/epoll.h>
32#include <sys/socket.h>
33#include <unistd.h>
34
35#include <grpc/support/alloc.h>
Craig Tiller6de05932017-04-28 09:17:38 -070036#include <grpc/support/cpu.h>
Craig Tillerc67cc992017-04-27 10:15:51 -070037#include <grpc/support/log.h>
38#include <grpc/support/string_util.h>
39#include <grpc/support/tls.h>
40#include <grpc/support/useful.h>
41
Craig Tillerb4bb1cd2017-07-20 14:18:17 -070042#include "src/core/lib/debug/stats.h"
Craig Tillerc67cc992017-04-27 10:15:51 -070043#include "src/core/lib/iomgr/ev_posix.h"
44#include "src/core/lib/iomgr/iomgr_internal.h"
45#include "src/core/lib/iomgr/lockfree_event.h"
Craig Tillerc67cc992017-04-27 10:15:51 -070046#include "src/core/lib/iomgr/wakeup_fd_posix.h"
Craig Tillerc67cc992017-04-27 10:15:51 -070047#include "src/core/lib/profiling/timers.h"
48#include "src/core/lib/support/block_annotate.h"
Craig Tillerb89bac02017-05-26 15:20:32 +000049#include "src/core/lib/support/string.h"
Craig Tillerc67cc992017-04-27 10:15:51 -070050
Craig Tillerc67cc992017-04-27 10:15:51 -070051static grpc_wakeup_fd global_wakeup_fd;
Sree Kuchibhotla5efc9132017-08-17 14:10:38 -070052
53/*******************************************************************************
54 * Singleton epoll set related fields
55 */
56
57#define MAX_EPOLL_EVENTS 100
Sree Kuchibhotla19614522017-08-25 17:10:10 -070058#define MAX_EPOLL_EVENTS_HANDLED_PER_ITERATION 1
Sree Kuchibhotla5efc9132017-08-17 14:10:38 -070059
/* NOTE ON SYNCHRONIZATION:
 * - Fields in this struct are only modified by the designated poller. Hence
 *   there is no need for any locks to protect the struct.
 * - num_events and cursor fields have to be of atomic type to provide memory
 *   visibility guarantees only. i.e In case of multiple pollers, the
 *   designated polling thread keeps changing; the thread that wrote these
 *   values may be different from the thread reading the values
 */
typedef struct epoll_set {
  /* File descriptor of the singleton epoll instance (set to -1 by
   * epoll_set_shutdown()) */
  int epfd;

  /* The epoll_events after the last call to epoll_wait() */
  struct epoll_event events[MAX_EPOLL_EVENTS];

  /* The number of epoll_events after the last call to epoll_wait() */
  gpr_atm num_events;

  /* Index of the first event in epoll_events that has to be processed. This
   * field is only valid if num_events > 0 */
  gpr_atm cursor;
} epoll_set;
81
82/* The global singleton epoll set */
83static epoll_set g_epoll_set;
84
85/* Must be called *only* once */
86static bool epoll_set_init() {
87 g_epoll_set.epfd = epoll_create1(EPOLL_CLOEXEC);
88 if (g_epoll_set.epfd < 0) {
89 gpr_log(GPR_ERROR, "epoll unavailable");
90 return false;
91 }
92
Sree Kuchibhotlaa92a9cc2017-08-27 14:02:15 -070093 gpr_log(GPR_INFO, "grpc epoll fd: %d", g_epoll_set.epfd);
94 gpr_atm_no_barrier_store(&g_epoll_set.num_events, 0);
95 gpr_atm_no_barrier_store(&g_epoll_set.cursor, 0);
Sree Kuchibhotla5efc9132017-08-17 14:10:38 -070096 return true;
97}
98
99/* epoll_set_init() MUST be called before calling this. */
100static void epoll_set_shutdown() {
101 if (g_epoll_set.epfd >= 0) {
102 close(g_epoll_set.epfd);
103 g_epoll_set.epfd = -1;
104 }
105}
Craig Tillerc67cc992017-04-27 10:15:51 -0700106
107/*******************************************************************************
108 * Fd Declarations
109 */
110
/* Per-file-descriptor bookkeeping for this polling engine. */
struct grpc_fd {
  int fd;

  /* Lock-free read/write readiness events; managed via the grpc_lfev_*
   * helpers (actual state machine lives in lockfree_event) */
  gpr_atm read_closure;
  gpr_atm write_closure;

  /* Intrusive link for the fd freelist (see fd_freelist below) */
  struct grpc_fd *freelist_next;

  /* The pollset that last noticed that the fd is readable. The actual type
   * stored in this is (grpc_pollset *) */
  gpr_atm read_notifier_pollset;

  grpc_iomgr_object iomgr_object;
};
125
126static void fd_global_init(void);
127static void fd_global_shutdown(void);
128
129/*******************************************************************************
130 * Pollset Declarations
131 */
132
Craig Tiller43bf2592017-04-28 23:21:01 +0000133typedef enum { UNKICKED, KICKED, DESIGNATED_POLLER } kick_state;
Craig Tillerc67cc992017-04-27 10:15:51 -0700134
Craig Tiller830e82a2017-05-31 16:26:27 -0700135static const char *kick_state_string(kick_state st) {
136 switch (st) {
137 case UNKICKED:
138 return "UNKICKED";
139 case KICKED:
140 return "KICKED";
141 case DESIGNATED_POLLER:
142 return "DESIGNATED_POLLER";
143 }
144 GPR_UNREACHABLE_CODE(return "UNKNOWN");
145}
146
struct grpc_pollset_worker {
  kick_state kick_state;
  int kick_state_mutator;  // which line of code last changed kick state
  bool initialized_cv;     // true once 'cv' has been initialized
  grpc_pollset_worker *next;  // intrusive circular doubly-linked list of
  grpc_pollset_worker *prev;  // workers attached to the same pollset
  gpr_cv cv;  // signaled when this worker is kicked (see pollset_kick_all)
  grpc_closure_list schedule_on_end_work;
};
156
/* Change a worker's kick state, recording the __LINE__ of the mutation site
 * in kick_state_mutator to aid debugging of kick-state transitions. */
#define SET_KICK_STATE(worker, state)        \
  do {                                       \
    (worker)->kick_state = (state);          \
    (worker)->kick_state_mutator = __LINE__; \
  } while (false)
162
#define MAX_NEIGHBORHOODS 1024

/* A 'neighborhood' is a shard of active pollsets (assigned by CPU, see
 * choose_neighborhood()); 'pad' pushes each instance onto its own cache
 * line to avoid false sharing between shards. */
typedef struct pollset_neighborhood {
  gpr_mu mu;
  grpc_pollset *active_root;
  char pad[GPR_CACHELINE_SIZE];
} pollset_neighborhood;
Craig Tiller6de05932017-04-28 09:17:38 -0700170
struct grpc_pollset {
  gpr_mu mu;
  /* The neighborhood this pollset is currently assigned to */
  pollset_neighborhood *neighborhood;
  /* True while a worker is moving this pollset to a new neighborhood */
  bool reassigning_neighborhood;
  grpc_pollset_worker *root_worker;
  bool kicked_without_poller;

  /* Set to true if the pollset is observed to have no workers available to
     poll */
  bool seen_inactive;
  bool shutting_down;             /* Is the pollset shutting down ? */
  grpc_closure *shutdown_closure; /* Called after shutdown is complete */

  /* Number of workers who are *about-to* attach themselves to the pollset
   * worker list */
  int begin_refs;

  /* Links in the neighborhood's circular list of active pollsets */
  grpc_pollset *next;
  grpc_pollset *prev;
};
191
192/*******************************************************************************
193 * Pollset-set Declarations
194 */
Craig Tiller6de05932017-04-28 09:17:38 -0700195
/* This engine keeps no per-pollset_set state; the single dummy member keeps
 * the struct non-empty (empty structs are not standard C). */
struct grpc_pollset_set {
  char unused;
};
Craig Tillerc67cc992017-04-27 10:15:51 -0700199
200/*******************************************************************************
201 * Common helpers
202 */
203
204static bool append_error(grpc_error **composite, grpc_error *error,
205 const char *desc) {
206 if (error == GRPC_ERROR_NONE) return true;
207 if (*composite == GRPC_ERROR_NONE) {
208 *composite = GRPC_ERROR_CREATE_FROM_COPIED_STRING(desc);
209 }
210 *composite = grpc_error_add_child(*composite, error);
211 return false;
212}
213
214/*******************************************************************************
215 * Fd Definitions
216 */
217
218/* We need to keep a freelist not because of any concerns of malloc performance
219 * but instead so that implementations with multiple threads in (for example)
220 * epoll_wait deal with the race between pollset removal and incoming poll
221 * notifications.
222 *
223 * The problem is that the poller ultimately holds a reference to this
224 * object, so it is very difficult to know when is safe to free it, at least
225 * without some expensive synchronization.
226 *
227 * If we keep the object freelisted, in the worst case losing this race just
228 * becomes a spurious read notification on a reused fd.
229 */
230
231/* The alarm system needs to be able to wakeup 'some poller' sometimes
232 * (specifically when a new alarm needs to be triggered earlier than the next
233 * alarm 'epoch'). This wakeup_fd gives us something to alert on when such a
234 * case occurs. */
235
236static grpc_fd *fd_freelist = NULL;
237static gpr_mu fd_freelist_mu;
238
Craig Tillerc67cc992017-04-27 10:15:51 -0700239static void fd_global_init(void) { gpr_mu_init(&fd_freelist_mu); }
240
241static void fd_global_shutdown(void) {
242 gpr_mu_lock(&fd_freelist_mu);
243 gpr_mu_unlock(&fd_freelist_mu);
244 while (fd_freelist != NULL) {
245 grpc_fd *fd = fd_freelist;
246 fd_freelist = fd_freelist->freelist_next;
Craig Tillerc67cc992017-04-27 10:15:51 -0700247 gpr_free(fd);
248 }
249 gpr_mu_destroy(&fd_freelist_mu);
250}
251
/* Wraps OS file descriptor 'fd' in a grpc_fd (reusing a freelisted struct if
 * available) and registers it with the singleton epoll set for edge-triggered
 * read/write events. 'name' is used only for the iomgr object label. */
static grpc_fd *fd_create(int fd, const char *name) {
  grpc_fd *new_fd = NULL;

  /* Prefer popping a recycled grpc_fd off the freelist (see the freelist
   * rationale comment above). */
  gpr_mu_lock(&fd_freelist_mu);
  if (fd_freelist != NULL) {
    new_fd = fd_freelist;
    fd_freelist = fd_freelist->freelist_next;
  }
  gpr_mu_unlock(&fd_freelist_mu);

  if (new_fd == NULL) {
    new_fd = (grpc_fd *)gpr_malloc(sizeof(grpc_fd));
  }

  new_fd->fd = fd;
  grpc_lfev_init(&new_fd->read_closure);
  grpc_lfev_init(&new_fd->write_closure);
  gpr_atm_no_barrier_store(&new_fd->read_notifier_pollset, (gpr_atm)NULL);

  new_fd->freelist_next = NULL;

  char *fd_name;
  gpr_asprintf(&fd_name, "%s fd=%d", name, fd);
  grpc_iomgr_register_object(&new_fd->iomgr_object, fd_name);
#ifndef NDEBUG
  if (GRPC_TRACER_ON(grpc_trace_fd_refcount)) {
    gpr_log(GPR_DEBUG, "FD %d %p create %s", fd, new_fd, fd_name);
  }
#endif
  gpr_free(fd_name);

  /* Register with epoll edge-triggered for both directions; EPOLLERR/EPOLLHUP
   * are always reported by the kernel and need not be requested. */
  struct epoll_event ev = {.events = (uint32_t)(EPOLLIN | EPOLLOUT | EPOLLET),
                           .data.ptr = new_fd};
  if (epoll_ctl(g_epoll_set.epfd, EPOLL_CTL_ADD, fd, &ev) != 0) {
    /* Registration failure is logged but not fatal here. */
    gpr_log(GPR_ERROR, "epoll_ctl failed: %s", strerror(errno));
  }

  return new_fd;
}
291
Craig Tiller4509c472017-04-27 19:05:13 +0000292static int fd_wrapped_fd(grpc_fd *fd) { return fd->fd; }
Craig Tillerc67cc992017-04-27 10:15:51 -0700293
/* if 'releasing_fd' is true, it means that we are going to detach the internal
 * fd from grpc_fd structure (i.e which means we should not be calling
 * shutdown() syscall on that fd) */
static void fd_shutdown_internal(grpc_exec_ctx *exec_ctx, grpc_fd *fd,
                                 grpc_error *why, bool releasing_fd) {
  /* Only the first caller to shut down the read side (set_shutdown returns
   * true) performs the socket shutdown and shuts down the write side too. */
  if (grpc_lfev_set_shutdown(exec_ctx, &fd->read_closure,
                             GRPC_ERROR_REF(why))) {
    if (!releasing_fd) {
      shutdown(fd->fd, SHUT_RDWR);
    }
    grpc_lfev_set_shutdown(exec_ctx, &fd->write_closure, GRPC_ERROR_REF(why));
  }
  /* This function consumes the caller's reference on 'why'; the lfev calls
   * above each took their own ref. */
  GRPC_ERROR_UNREF(why);
}
308
/* Might be called multiple times; takes ownership of 'why'. Thin wrapper
 * that always performs the shutdown() syscall (releasing_fd == false). */
static void fd_shutdown(grpc_exec_ctx *exec_ctx, grpc_fd *fd, grpc_error *why) {
  fd_shutdown_internal(exec_ctx, fd, why, false);
}
313
/* Releases a grpc_fd: shuts it down (if not already shut down), either hands
 * the raw fd back via '*release_fd' or closes it, schedules 'on_done', and
 * returns the grpc_fd struct to the freelist for reuse. */
static void fd_orphan(grpc_exec_ctx *exec_ctx, grpc_fd *fd,
                      grpc_closure *on_done, int *release_fd,
                      bool already_closed, const char *reason) {
  grpc_error *error = GRPC_ERROR_NONE;
  bool is_release_fd = (release_fd != NULL);

  if (!grpc_lfev_is_shutdown(&fd->read_closure)) {
    /* Pass is_release_fd so shutdown() is skipped when the caller keeps the
     * raw descriptor. */
    fd_shutdown_internal(exec_ctx, fd,
                         GRPC_ERROR_CREATE_FROM_COPIED_STRING(reason),
                         is_release_fd);
  }

  /* If release_fd is not NULL, we should be relinquishing control of the file
     descriptor fd->fd (but we still own the grpc_fd structure). */
  if (is_release_fd) {
    *release_fd = fd->fd;
  } else if (!already_closed) {
    close(fd->fd);
  }

  GRPC_CLOSURE_SCHED(exec_ctx, on_done, GRPC_ERROR_REF(error));

  grpc_iomgr_unregister_object(&fd->iomgr_object);
  grpc_lfev_destroy(&fd->read_closure);
  grpc_lfev_destroy(&fd->write_closure);

  /* Recycle the struct rather than freeing it -- see the freelist rationale
   * comment above fd_freelist. */
  gpr_mu_lock(&fd_freelist_mu);
  fd->freelist_next = fd_freelist;
  fd_freelist = fd;
  gpr_mu_unlock(&fd_freelist_mu);
}
345
346static grpc_pollset *fd_get_read_notifier_pollset(grpc_exec_ctx *exec_ctx,
347 grpc_fd *fd) {
348 gpr_atm notifier = gpr_atm_acq_load(&fd->read_notifier_pollset);
349 return (grpc_pollset *)notifier;
350}
351
/* True iff the fd has been shut down (the read side carries the state). */
static bool fd_is_shutdown(grpc_fd *fd) {
  return grpc_lfev_is_shutdown(&fd->read_closure);
}
355
/* Registers 'closure' to run when the fd becomes readable. */
static void fd_notify_on_read(grpc_exec_ctx *exec_ctx, grpc_fd *fd,
                              grpc_closure *closure) {
  grpc_lfev_notify_on(exec_ctx, &fd->read_closure, closure, "read");
}
360
/* Registers 'closure' to run when the fd becomes writable. */
static void fd_notify_on_write(grpc_exec_ctx *exec_ctx, grpc_fd *fd,
                               grpc_closure *closure) {
  grpc_lfev_notify_on(exec_ctx, &fd->write_closure, closure, "write");
}
365
/* Marks the fd readable (firing any pending notify_on_read closure) and
 * records which pollset observed the readiness. */
static void fd_become_readable(grpc_exec_ctx *exec_ctx, grpc_fd *fd,
                               grpc_pollset *notifier) {
  grpc_lfev_set_ready(exec_ctx, &fd->read_closure, "read");
  /* Use release store to match with acquire load in fd_get_read_notifier */
  gpr_atm_rel_store(&fd->read_notifier_pollset, (gpr_atm)notifier);
}
372
/* Marks the fd writable (firing any pending notify_on_write closure). */
static void fd_become_writable(grpc_exec_ctx *exec_ctx, grpc_fd *fd) {
  grpc_lfev_set_ready(exec_ctx, &fd->write_closure, "write");
}
376
377/*******************************************************************************
378 * Pollset Definitions
379 */
380
/* Thread-local markers identifying the pollset/worker the current thread is
 * operating on (used for kick-self detection). */
GPR_TLS_DECL(g_current_thread_pollset);
GPR_TLS_DECL(g_current_thread_worker);

/* The designated poller */
static gpr_atm g_active_poller;

/* Shards of active pollsets; sized/allocated in pollset_global_init(). */
static pollset_neighborhood *g_neighborhoods;
static size_t g_num_neighborhoods;
Craig Tiller6de05932017-04-28 09:17:38 -0700389
Craig Tillerc67cc992017-04-27 10:15:51 -0700390/* Return true if first in list */
Craig Tiller32f90ee2017-04-28 12:46:41 -0700391static bool worker_insert(grpc_pollset *pollset, grpc_pollset_worker *worker) {
392 if (pollset->root_worker == NULL) {
393 pollset->root_worker = worker;
394 worker->next = worker->prev = worker;
Craig Tillerc67cc992017-04-27 10:15:51 -0700395 return true;
396 } else {
Craig Tiller32f90ee2017-04-28 12:46:41 -0700397 worker->next = pollset->root_worker;
398 worker->prev = worker->next->prev;
399 worker->next->prev = worker;
400 worker->prev->next = worker;
Craig Tillerc67cc992017-04-27 10:15:51 -0700401 return false;
402 }
403}
404
405/* Return true if last in list */
406typedef enum { EMPTIED, NEW_ROOT, REMOVED } worker_remove_result;
407
Craig Tiller32f90ee2017-04-28 12:46:41 -0700408static worker_remove_result worker_remove(grpc_pollset *pollset,
Craig Tillerc67cc992017-04-27 10:15:51 -0700409 grpc_pollset_worker *worker) {
Craig Tiller32f90ee2017-04-28 12:46:41 -0700410 if (worker == pollset->root_worker) {
411 if (worker == worker->next) {
412 pollset->root_worker = NULL;
Craig Tillerc67cc992017-04-27 10:15:51 -0700413 return EMPTIED;
414 } else {
Craig Tiller32f90ee2017-04-28 12:46:41 -0700415 pollset->root_worker = worker->next;
416 worker->prev->next = worker->next;
417 worker->next->prev = worker->prev;
Craig Tillerc67cc992017-04-27 10:15:51 -0700418 return NEW_ROOT;
419 }
420 } else {
Craig Tiller32f90ee2017-04-28 12:46:41 -0700421 worker->prev->next = worker->next;
422 worker->next->prev = worker->prev;
Craig Tillerc67cc992017-04-27 10:15:51 -0700423 return REMOVED;
424 }
425}
426
/* Picks a neighborhood shard for the current thread, keyed by the CPU it is
 * running on, to spread lock contention across g_neighborhoods. */
static size_t choose_neighborhood(void) {
  return (size_t)gpr_cpu_current_cpu() % g_num_neighborhoods;
}
430
/* One-time pollset subsystem setup: TLS slots, the global wakeup fd (added to
 * the epoll set so kicks can interrupt epoll_wait), and the neighborhood
 * shards. Requires epoll_set_init() to have succeeded already. */
static grpc_error *pollset_global_init(void) {
  gpr_tls_init(&g_current_thread_pollset);
  gpr_tls_init(&g_current_thread_worker);
  gpr_atm_no_barrier_store(&g_active_poller, 0);
  /* Sentinel so pollset_global_shutdown can tell whether init succeeded. */
  global_wakeup_fd.read_fd = -1;
  grpc_error *err = grpc_wakeup_fd_init(&global_wakeup_fd);
  if (err != GRPC_ERROR_NONE) return err;
  struct epoll_event ev = {.events = (uint32_t)(EPOLLIN | EPOLLET),
                           .data.ptr = &global_wakeup_fd};
  if (epoll_ctl(g_epoll_set.epfd, EPOLL_CTL_ADD, global_wakeup_fd.read_fd,
                &ev) != 0) {
    return GRPC_OS_ERROR(errno, "epoll_ctl");
  }
  /* One neighborhood per core, clamped to [1, MAX_NEIGHBORHOODS]. */
  g_num_neighborhoods = GPR_CLAMP(gpr_cpu_num_cores(), 1, MAX_NEIGHBORHOODS);
  g_neighborhoods = (pollset_neighborhood *)gpr_zalloc(
      sizeof(*g_neighborhoods) * g_num_neighborhoods);
  for (size_t i = 0; i < g_num_neighborhoods; i++) {
    gpr_mu_init(&g_neighborhoods[i].mu);
  }
  return GRPC_ERROR_NONE;
}
452
453static void pollset_global_shutdown(void) {
Craig Tiller4509c472017-04-27 19:05:13 +0000454 gpr_tls_destroy(&g_current_thread_pollset);
455 gpr_tls_destroy(&g_current_thread_worker);
Craig Tiller375eb252017-04-27 23:29:12 +0000456 if (global_wakeup_fd.read_fd != -1) grpc_wakeup_fd_destroy(&global_wakeup_fd);
Vijay Pai4b7ef4d2017-09-11 23:09:22 -0700457 for (size_t i = 0; i < g_num_neighborhoods; i++) {
458 gpr_mu_destroy(&g_neighborhoods[i].mu);
Craig Tiller32f90ee2017-04-28 12:46:41 -0700459 }
Vijay Pai4b7ef4d2017-09-11 23:09:22 -0700460 gpr_free(g_neighborhoods);
Craig Tiller4509c472017-04-27 19:05:13 +0000461}
462
463static void pollset_init(grpc_pollset *pollset, gpr_mu **mu) {
Craig Tiller6de05932017-04-28 09:17:38 -0700464 gpr_mu_init(&pollset->mu);
465 *mu = &pollset->mu;
Vijay Pai4b7ef4d2017-09-11 23:09:22 -0700466 pollset->neighborhood = &g_neighborhoods[choose_neighborhood()];
467 pollset->reassigning_neighborhood = false;
Sree Kuchibhotla30882302017-08-16 13:46:52 -0700468 pollset->root_worker = NULL;
469 pollset->kicked_without_poller = false;
Craig Tiller6de05932017-04-28 09:17:38 -0700470 pollset->seen_inactive = true;
Sree Kuchibhotla30882302017-08-16 13:46:52 -0700471 pollset->shutting_down = false;
472 pollset->shutdown_closure = NULL;
473 pollset->begin_refs = 0;
474 pollset->next = pollset->prev = NULL;
Craig Tiller6de05932017-04-28 09:17:38 -0700475}
476
/* Destroys a pollset. If it is still on a neighborhood's active list it must
 * first be unlinked under that neighborhood's lock. Lock order is
 * neighborhood->mu before pollset->mu, so we drop the pollset lock, take the
 * neighborhood lock, re-take the pollset lock, and re-check -- retrying if
 * the pollset was reassigned to a different neighborhood in the window. */
static void pollset_destroy(grpc_exec_ctx *exec_ctx, grpc_pollset *pollset) {
  gpr_mu_lock(&pollset->mu);
  if (!pollset->seen_inactive) {
    pollset_neighborhood *neighborhood = pollset->neighborhood;
    gpr_mu_unlock(&pollset->mu);
  retry_lock_neighborhood:
    gpr_mu_lock(&neighborhood->mu);
    gpr_mu_lock(&pollset->mu);
    if (!pollset->seen_inactive) {
      if (pollset->neighborhood != neighborhood) {
        /* Pollset moved while we were unlocked: chase the new neighborhood. */
        gpr_mu_unlock(&neighborhood->mu);
        neighborhood = pollset->neighborhood;
        gpr_mu_unlock(&pollset->mu);
        goto retry_lock_neighborhood;
      }
      /* Unlink from the neighborhood's circular active list. */
      pollset->prev->next = pollset->next;
      pollset->next->prev = pollset->prev;
      if (pollset == pollset->neighborhood->active_root) {
        pollset->neighborhood->active_root =
            pollset->next == pollset ? NULL : pollset->next;
      }
    }
    gpr_mu_unlock(&pollset->neighborhood->mu);
  }
  gpr_mu_unlock(&pollset->mu);
  gpr_mu_destroy(&pollset->mu);
}
504
/* Kicks every worker attached to the pollset: unkicked sleepers are signaled
 * on their cv; the designated poller is woken via the global wakeup fd so it
 * returns from epoll_wait. Caller must hold pollset->mu. */
static grpc_error *pollset_kick_all(grpc_pollset *pollset) {
  GPR_TIMER_BEGIN("pollset_kick_all", 0);
  grpc_error *error = GRPC_ERROR_NONE;
  if (pollset->root_worker != NULL) {
    grpc_pollset_worker *worker = pollset->root_worker;
    do {
      switch (worker->kick_state) {
        case KICKED:
          /* Already kicked: nothing to do. */
          break;
        case UNKICKED:
          SET_KICK_STATE(worker, KICKED);
          if (worker->initialized_cv) {
            gpr_cv_signal(&worker->cv);
          }
          break;
        case DESIGNATED_POLLER:
          SET_KICK_STATE(worker, KICKED);
          append_error(&error, grpc_wakeup_fd_wakeup(&global_wakeup_fd),
                       "pollset_kick_all");
          break;
      }

      worker = worker->next;
    } while (worker != pollset->root_worker);
  }
  // TODO: sreek. Check if we need to set 'kicked_without_poller' to true here
  // in the else case
  GPR_TIMER_END("pollset_kick_all", 0);
  return error;
}
535
/* Fires the pending shutdown closure once no workers remain attached and no
 * workers are in the process of attaching (begin_refs == 0). */
static void pollset_maybe_finish_shutdown(grpc_exec_ctx *exec_ctx,
                                          grpc_pollset *pollset) {
  if (pollset->shutdown_closure != NULL && pollset->root_worker == NULL &&
      pollset->begin_refs == 0) {
    GPR_TIMER_MARK("pollset_finish_shutdown", 0);
    GRPC_CLOSURE_SCHED(exec_ctx, pollset->shutdown_closure, GRPC_ERROR_NONE);
    /* Clear the closure so shutdown completes at most once. */
    pollset->shutdown_closure = NULL;
  }
}
545
/* Begins pollset shutdown: records 'closure' to run when shutdown completes,
 * kicks all attached workers so they drain out, and finishes immediately if
 * no workers are present. Must not be called twice (asserted). */
static void pollset_shutdown(grpc_exec_ctx *exec_ctx, grpc_pollset *pollset,
                             grpc_closure *closure) {
  GPR_TIMER_BEGIN("pollset_shutdown", 0);
  GPR_ASSERT(pollset->shutdown_closure == NULL);
  GPR_ASSERT(!pollset->shutting_down);
  pollset->shutdown_closure = closure;
  pollset->shutting_down = true;
  GRPC_LOG_IF_ERROR("pollset_shutdown", pollset_kick_all(pollset));
  pollset_maybe_finish_shutdown(exec_ctx, pollset);
  GPR_TIMER_END("pollset_shutdown", 0);
}
557
Craig Tiller4509c472017-04-27 19:05:13 +0000558static int poll_deadline_to_millis_timeout(gpr_timespec deadline,
559 gpr_timespec now) {
560 gpr_timespec timeout;
561 if (gpr_time_cmp(deadline, gpr_inf_future(deadline.clock_type)) == 0) {
562 return -1;
563 }
564
565 if (gpr_time_cmp(deadline, now) <= 0) {
566 return 0;
567 }
568
569 static const gpr_timespec round_up = {
570 .clock_type = GPR_TIMESPAN, .tv_sec = 0, .tv_nsec = GPR_NS_PER_MS - 1};
571 timeout = gpr_time_sub(deadline, now);
572 int millis = gpr_time_to_millis(gpr_time_add(timeout, round_up));
573 return millis >= 1 ? millis : 1;
574}
575
/* Process the epoll events found by do_epoll_wait() function.
   - g_epoll_set.cursor points to the index of the first event to be processed
   - This function then processes up-to MAX_EPOLL_EVENTS_PER_ITERATION and
     updates the g_epoll_set.cursor

   NOTE ON SYNCRHONIZATION: Similar to do_epoll_wait(), this function is only
   called by g_active_poller thread. So there is no need for synchronization
   when accessing fields in g_epoll_set */
static grpc_error *process_epoll_events(grpc_exec_ctx *exec_ctx,
                                        grpc_pollset *pollset) {
  static const char *err_desc = "process_events";
  grpc_error *error = GRPC_ERROR_NONE;

  GPR_TIMER_BEGIN("process_epoll_events", 0);
  /* Acquire loads pair with the release stores in do_epoll_wait() /
   * the previous designated poller (see NOTE ON SYNCHRONIZATION above). */
  long num_events = gpr_atm_acq_load(&g_epoll_set.num_events);
  long cursor = gpr_atm_acq_load(&g_epoll_set.cursor);
  for (int idx = 0;
       (idx < MAX_EPOLL_EVENTS_HANDLED_PER_ITERATION) && cursor != num_events;
       idx++) {
    long c = cursor++;
    struct epoll_event *ev = &g_epoll_set.events[c];
    void *data_ptr = ev->data.ptr;

    if (data_ptr == &global_wakeup_fd) {
      /* A kick: drain the wakeup fd; no grpc_fd is associated. */
      append_error(&error, grpc_wakeup_fd_consume_wakeup(&global_wakeup_fd),
                   err_desc);
    } else {
      grpc_fd *fd = (grpc_fd *)(data_ptr);
      bool cancel = (ev->events & (EPOLLERR | EPOLLHUP)) != 0;
      bool read_ev = (ev->events & (EPOLLIN | EPOLLPRI)) != 0;
      bool write_ev = (ev->events & EPOLLOUT) != 0;

      /* Error/hangup wakes both directions so pending closures can observe
       * the failure. */
      if (read_ev || cancel) {
        fd_become_readable(exec_ctx, fd, pollset);
      }

      if (write_ev || cancel) {
        fd_become_writable(exec_ctx, fd);
      }
    }
  }
  /* Publish the new cursor for whichever thread becomes the next poller. */
  gpr_atm_rel_store(&g_epoll_set.cursor, cursor);
  GPR_TIMER_END("process_epoll_events", 0);
  return error;
}
621
/* Do epoll_wait and store the events in g_epoll_set.events field. This does
   not "process" any of the events yet; that is done in process_epoll_events().
   *See process_epoll_events() function for more details.

   NOTE ON SYNCHRONIZATION: At any point of time, only the g_active_poller
   (i.e the designated poller thread) will be calling this function. So there
   is no need for any synchronization when accesing fields in g_epoll_set */
static grpc_error *do_epoll_wait(grpc_exec_ctx *exec_ctx, grpc_pollset *ps,
                                 gpr_timespec now, gpr_timespec deadline) {
  GPR_TIMER_BEGIN("do_epoll_wait", 0);

  int r;
  int timeout = poll_deadline_to_millis_timeout(deadline, now);
  if (timeout != 0) {
    /* Only mark a blocking region when we may actually block. */
    GRPC_SCHEDULING_START_BLOCKING_REGION;
  }
  do {
    GRPC_STATS_INC_SYSCALL_POLL(exec_ctx);
    r = epoll_wait(g_epoll_set.epfd, g_epoll_set.events, MAX_EPOLL_EVENTS,
                   timeout);
  } while (r < 0 && errno == EINTR); /* retry on signal interruption */
  if (timeout != 0) {
    GRPC_SCHEDULING_END_BLOCKING_REGION;
  }

  if (r < 0) return GRPC_OS_ERROR(errno, "epoll_wait");

  if (GRPC_TRACER_ON(grpc_polling_trace)) {
    gpr_log(GPR_DEBUG, "ps: %p poll got %d events", ps, r);
  }

  /* Release stores publish the fresh batch to the (possibly different)
   * thread that will run process_epoll_events() next. */
  gpr_atm_rel_store(&g_epoll_set.num_events, r);
  gpr_atm_rel_store(&g_epoll_set.cursor, 0);

  GPR_TIMER_END("do_epoll_wait", 0);
  return GRPC_ERROR_NONE;
}
659
/* Register 'worker' with 'pollset' and decide whether this thread should do
   the actual epoll polling.

   Returns true iff this worker ended up as the DESIGNATED_POLLER and the
   pollset is not shutting down; returns false if the pollset was kicked
   without a poller, is shutting down, or the worker was kicked while waiting.

   Caller must hold pollset->mu. The lock is temporarily released (a) while
   acquiring the neighborhood lock and (b) inside gpr_cv_wait(); it is held
   again when the function returns. */
static bool begin_worker(grpc_pollset *pollset, grpc_pollset_worker *worker,
                         grpc_pollset_worker **worker_hdl, gpr_timespec *now,
                         gpr_timespec deadline) {
  GPR_TIMER_BEGIN("begin_worker", 0);
  if (worker_hdl != NULL) *worker_hdl = worker;
  worker->initialized_cv = false;
  SET_KICK_STATE(worker, UNKICKED);
  worker->schedule_on_end_work = (grpc_closure_list)GRPC_CLOSURE_LIST_INIT;
  pollset->begin_refs++;

  if (GRPC_TRACER_ON(grpc_polling_trace)) {
    gpr_log(GPR_ERROR, "PS:%p BEGIN_STARTS:%p", pollset, worker);
  }

  if (pollset->seen_inactive) {
    // pollset has been observed to be inactive, we need to move back to the
    // active list
    bool is_reassigning = false;
    /* Only one thread at a time reassigns the pollset to a (possibly new)
       neighborhood; the flag is cleared below once the move completes */
    if (!pollset->reassigning_neighborhood) {
      is_reassigning = true;
      pollset->reassigning_neighborhood = true;
      pollset->neighborhood = &g_neighborhoods[choose_neighborhood()];
    }
    pollset_neighborhood *neighborhood = pollset->neighborhood;
    gpr_mu_unlock(&pollset->mu);
  // pollset unlocked: state may change (even worker->kick_state)
  retry_lock_neighborhood:
    /* Lock order: neighborhood->mu before pollset->mu */
    gpr_mu_lock(&neighborhood->mu);
    gpr_mu_lock(&pollset->mu);
    if (GRPC_TRACER_ON(grpc_polling_trace)) {
      gpr_log(GPR_ERROR, "PS:%p BEGIN_REORG:%p kick_state=%s is_reassigning=%d",
              pollset, worker, kick_state_string(worker->kick_state),
              is_reassigning);
    }
    if (pollset->seen_inactive) {
      /* The pollset may have been moved to a different neighborhood while we
         held no locks; if so, chase it */
      if (neighborhood != pollset->neighborhood) {
        gpr_mu_unlock(&neighborhood->mu);
        neighborhood = pollset->neighborhood;
        gpr_mu_unlock(&pollset->mu);
        goto retry_lock_neighborhood;
      }

      /* In the brief time we released the pollset locks above, the worker MAY
         have been kicked. In this case, the worker should get out of this
         pollset ASAP and hence this should neither add the pollset to
         neighborhood nor mark the pollset as active.

         On a side note, the only way a worker's kick state could have changed
         at this point is if it were "kicked specifically". Since the worker has
         not added itself to the pollset yet (by calling worker_insert()), it is
         not visible in the "kick any" path yet */
      if (worker->kick_state == UNKICKED) {
        pollset->seen_inactive = false;
        if (neighborhood->active_root == NULL) {
          /* First active pollset in this neighborhood: form a 1-element ring */
          neighborhood->active_root = pollset->next = pollset->prev = pollset;
          /* Make this the designated poller if there isn't one already */
          if (worker->kick_state == UNKICKED &&
              gpr_atm_no_barrier_cas(&g_active_poller, 0, (gpr_atm)worker)) {
            SET_KICK_STATE(worker, DESIGNATED_POLLER);
          }
        } else {
          /* Splice the pollset into the existing doubly-linked ring */
          pollset->next = neighborhood->active_root;
          pollset->prev = pollset->next->prev;
          pollset->next->prev = pollset->prev->next = pollset;
        }
      }
    }
    if (is_reassigning) {
      GPR_ASSERT(pollset->reassigning_neighborhood);
      pollset->reassigning_neighborhood = false;
    }
    gpr_mu_unlock(&neighborhood->mu);
  }

  worker_insert(pollset, worker);
  pollset->begin_refs--;
  if (worker->kick_state == UNKICKED && !pollset->kicked_without_poller) {
    GPR_ASSERT(gpr_atm_no_barrier_load(&g_active_poller) != (gpr_atm)worker);
    worker->initialized_cv = true;
    gpr_cv_init(&worker->cv);
    /* Wait until kicked (possibly to become designated poller) or shutdown */
    while (worker->kick_state == UNKICKED && !pollset->shutting_down) {
      if (GRPC_TRACER_ON(grpc_polling_trace)) {
        gpr_log(GPR_ERROR, "PS:%p BEGIN_WAIT:%p kick_state=%s shutdown=%d",
                pollset, worker, kick_state_string(worker->kick_state),
                pollset->shutting_down);
      }

      if (gpr_cv_wait(&worker->cv, &pollset->mu, deadline) &&
          worker->kick_state == UNKICKED) {
        /* If gpr_cv_wait returns true (i.e a timeout), pretend that the worker
           received a kick */
        SET_KICK_STATE(worker, KICKED);
      }
    }
    /* Refresh 'now' for the caller: we may have slept for a while */
    *now = gpr_now(now->clock_type);
  }

  if (GRPC_TRACER_ON(grpc_polling_trace)) {
    gpr_log(GPR_ERROR,
            "PS:%p BEGIN_DONE:%p kick_state=%s shutdown=%d "
            "kicked_without_poller: %d",
            pollset, worker, kick_state_string(worker->kick_state),
            pollset->shutting_down, pollset->kicked_without_poller);
  }

  /* We release pollset lock in this function at a couple of places:
   * 1. Briefly when assigning pollset to a neighborhood
   * 2. When doing gpr_cv_wait()
   * It is possible that 'kicked_without_poller' was set to true during (1) and
   * 'shutting_down' is set to true during (1) or (2). If either of them is
   * true, this worker cannot do polling */
  /* TODO(sreek): Perhaps there is a better way to handle kicked_without_poller
   * case; especially when the worker is the DESIGNATED_POLLER */

  if (pollset->kicked_without_poller) {
    pollset->kicked_without_poller = false;
    GPR_TIMER_END("begin_worker", 0);
    return false;
  }

  GPR_TIMER_END("begin_worker", 0);
  return worker->kick_state == DESIGNATED_POLLER && !pollset->shutting_down;
}
783
/* Scan the active-pollset ring of 'neighborhood' looking for an UNKICKED
   worker to promote to DESIGNATED_POLLER (via CAS on g_active_poller).

   Returns true if a worker was promoted, or if some other thread won the CAS
   or a DESIGNATED_POLLER was already present (either way, polling continues).
   Pollsets found with no usable worker are marked seen_inactive and unlinked
   from the ring.

   Caller must hold neighborhood->mu; individual pollset mutexes are acquired
   and released inside the loop. */
static bool check_neighborhood_for_available_poller(
    pollset_neighborhood *neighborhood) {
  GPR_TIMER_BEGIN("check_neighborhood_for_available_poller", 0);
  bool found_worker = false;
  do {
    grpc_pollset *inspect = neighborhood->active_root;
    if (inspect == NULL) {
      break;
    }
    gpr_mu_lock(&inspect->mu);
    GPR_ASSERT(!inspect->seen_inactive);
    grpc_pollset_worker *inspect_worker = inspect->root_worker;
    if (inspect_worker != NULL) {
      /* Walk the worker ring of this pollset */
      do {
        switch (inspect_worker->kick_state) {
          case UNKICKED:
            /* Try to claim the global designated-poller slot (0 == vacant) */
            if (gpr_atm_no_barrier_cas(&g_active_poller, 0,
                                       (gpr_atm)inspect_worker)) {
              if (GRPC_TRACER_ON(grpc_polling_trace)) {
                gpr_log(GPR_DEBUG, " .. choose next poller to be %p",
                        inspect_worker);
              }
              SET_KICK_STATE(inspect_worker, DESIGNATED_POLLER);
              if (inspect_worker->initialized_cv) {
                GPR_TIMER_MARK("signal worker", 0);
                gpr_cv_signal(&inspect_worker->cv);
              }
            } else {
              if (GRPC_TRACER_ON(grpc_polling_trace)) {
                gpr_log(GPR_DEBUG, " .. beaten to choose next poller");
              }
            }
            // even if we didn't win the cas, there's a worker, we can stop
            found_worker = true;
            break;
          case KICKED:
            /* Already kicked: keep scanning */
            break;
          case DESIGNATED_POLLER:
            found_worker = true;  // ok, so someone else found the worker, but
                                  // we'll accept that
            break;
        }
        inspect_worker = inspect_worker->next;
      } while (!found_worker && inspect_worker != inspect->root_worker);
    }
    if (!found_worker) {
      if (GRPC_TRACER_ON(grpc_polling_trace)) {
        gpr_log(GPR_DEBUG, " .. mark pollset %p inactive", inspect);
      }
      /* No usable worker in this pollset: retire it from the active ring */
      inspect->seen_inactive = true;
      if (inspect == neighborhood->active_root) {
        neighborhood->active_root =
            inspect->next == inspect ? NULL : inspect->next;
      }
      inspect->next->prev = inspect->prev;
      inspect->prev->next = inspect->next;
      inspect->next = inspect->prev = NULL;
    }
    gpr_mu_unlock(&inspect->mu);
  } while (!found_worker);
  GPR_TIMER_END("check_neighborhood_for_available_poller", 0);
  return found_worker;
}
847
/* Tear down 'worker' after a pollset_work iteration: move any deferred
   closures onto exec_ctx, hand off the designated-poller role if this worker
   held it, flush pending work, and remove the worker from the pollset
   (possibly completing pollset shutdown).

   Caller must hold pollset->mu; the lock is temporarily dropped around
   grpc_exec_ctx_flush() and the neighborhood scan, and is held on return. */
static void end_worker(grpc_exec_ctx *exec_ctx, grpc_pollset *pollset,
                       grpc_pollset_worker *worker,
                       grpc_pollset_worker **worker_hdl) {
  GPR_TIMER_BEGIN("end_worker", 0);
  if (GRPC_TRACER_ON(grpc_polling_trace)) {
    gpr_log(GPR_DEBUG, "PS:%p END_WORKER:%p", pollset, worker);
  }
  if (worker_hdl != NULL) *worker_hdl = NULL;
  /* Make sure we appear kicked */
  SET_KICK_STATE(worker, KICKED);
  grpc_closure_list_move(&worker->schedule_on_end_work,
                         &exec_ctx->closure_list);
  if (gpr_atm_no_barrier_load(&g_active_poller) == (gpr_atm)worker) {
    /* This worker was the designated poller: pick a successor */
    if (worker->next != worker && worker->next->kick_state == UNKICKED) {
      /* Cheap path: promote our ring neighbor in the same pollset */
      if (GRPC_TRACER_ON(grpc_polling_trace)) {
        gpr_log(GPR_DEBUG, " .. choose next poller to be peer %p", worker);
      }
      GPR_ASSERT(worker->next->initialized_cv);
      gpr_atm_no_barrier_store(&g_active_poller, (gpr_atm)worker->next);
      SET_KICK_STATE(worker->next, DESIGNATED_POLLER);
      gpr_cv_signal(&worker->next->cv);
      if (grpc_exec_ctx_has_work(exec_ctx)) {
        gpr_mu_unlock(&pollset->mu);
        grpc_exec_ctx_flush(exec_ctx);
        gpr_mu_lock(&pollset->mu);
      }
    } else {
      /* No local successor: vacate the slot, then scan neighborhoods starting
         from our own for any UNKICKED worker to promote */
      gpr_atm_no_barrier_store(&g_active_poller, 0);
      size_t poller_neighborhood_idx =
          (size_t)(pollset->neighborhood - g_neighborhoods);
      gpr_mu_unlock(&pollset->mu);
      bool found_worker = false;
      bool scan_state[MAX_NEIGHBORHOODS];
      /* First pass: opportunistic trylock to avoid blocking on contended
         neighborhoods; record which ones were actually scanned */
      for (size_t i = 0; !found_worker && i < g_num_neighborhoods; i++) {
        pollset_neighborhood *neighborhood =
            &g_neighborhoods[(poller_neighborhood_idx + i) %
                             g_num_neighborhoods];
        if (gpr_mu_trylock(&neighborhood->mu)) {
          found_worker = check_neighborhood_for_available_poller(neighborhood);
          gpr_mu_unlock(&neighborhood->mu);
          scan_state[i] = true;
        } else {
          scan_state[i] = false;
        }
      }
      /* Second pass: block on the neighborhoods the trylock pass skipped */
      for (size_t i = 0; !found_worker && i < g_num_neighborhoods; i++) {
        if (scan_state[i]) continue;
        pollset_neighborhood *neighborhood =
            &g_neighborhoods[(poller_neighborhood_idx + i) %
                             g_num_neighborhoods];
        gpr_mu_lock(&neighborhood->mu);
        found_worker = check_neighborhood_for_available_poller(neighborhood);
        gpr_mu_unlock(&neighborhood->mu);
      }
      grpc_exec_ctx_flush(exec_ctx);
      gpr_mu_lock(&pollset->mu);
    }
  } else if (grpc_exec_ctx_has_work(exec_ctx)) {
    gpr_mu_unlock(&pollset->mu);
    grpc_exec_ctx_flush(exec_ctx);
    gpr_mu_lock(&pollset->mu);
  }
  if (worker->initialized_cv) {
    gpr_cv_destroy(&worker->cv);
  }
  if (GRPC_TRACER_ON(grpc_polling_trace)) {
    gpr_log(GPR_DEBUG, " .. remove worker");
  }
  if (EMPTIED == worker_remove(pollset, worker)) {
    pollset_maybe_finish_shutdown(exec_ctx, pollset);
  }
  GPR_ASSERT(gpr_atm_no_barrier_load(&g_active_poller) != (gpr_atm)worker);
  GPR_TIMER_END("end_worker", 0);
}
922
/* pollset->po.mu lock must be held by the caller before calling this.
   The function pollset_work() may temporarily release the lock (pollset->po.mu)
   during the course of its execution but it will always re-acquire the lock and
   ensure that it is held by the time the function returns.

   Registers a worker on 'ps' via begin_worker(); if this thread becomes the
   designated poller it performs do_epoll_wait()/process_epoll_events(),
   otherwise it simply waits and then tears down via end_worker(). Returns any
   error accumulated from the epoll calls. */
static grpc_error *pollset_work(grpc_exec_ctx *exec_ctx, grpc_pollset *ps,
                                grpc_pollset_worker **worker_hdl,
                                gpr_timespec now, gpr_timespec deadline) {
  grpc_pollset_worker worker;
  grpc_error *error = GRPC_ERROR_NONE;
  static const char *err_desc = "pollset_work";
  GPR_TIMER_BEGIN("pollset_work", 0);
  /* Fast path: a kick arrived before any poller; consume it and return */
  if (ps->kicked_without_poller) {
    ps->kicked_without_poller = false;
    GPR_TIMER_END("pollset_work", 0);
    return GRPC_ERROR_NONE;
  }

  if (begin_worker(ps, &worker, worker_hdl, &now, deadline)) {
    gpr_tls_set(&g_current_thread_pollset, (intptr_t)ps);
    gpr_tls_set(&g_current_thread_worker, (intptr_t)&worker);
    GPR_ASSERT(!ps->shutting_down);
    GPR_ASSERT(!ps->seen_inactive);

    gpr_mu_unlock(&ps->mu); /* unlock */
    /* This is the designated polling thread at this point and should ideally do
       polling. However, if there are unprocessed events left from a previous
       call to do_epoll_wait(), skip calling epoll_wait() in this iteration and
       process the pending epoll events.

       The reason for decoupling do_epoll_wait and process_epoll_events is to
       better distribute the work (i.e handling epoll events) across multiple
       threads

       process_epoll_events() returns very quickly: It just queues the work on
       exec_ctx but does not execute it (the actual execution or more
       accurately grpc_exec_ctx_flush() happens in end_worker() AFTER selecting
       a designated poller). So we are not waiting long periods without a
       designated poller */
    /* cursor == num_events means the previous batch is fully consumed;
       acquire loads pair with the release stores in do_epoll_wait() /
       process_epoll_events() */
    if (gpr_atm_acq_load(&g_epoll_set.cursor) ==
        gpr_atm_acq_load(&g_epoll_set.num_events)) {
      append_error(&error, do_epoll_wait(exec_ctx, ps, now, deadline),
                   err_desc);
    }
    append_error(&error, process_epoll_events(exec_ctx, ps), err_desc);

    gpr_mu_lock(&ps->mu); /* lock */

    gpr_tls_set(&g_current_thread_worker, 0);
  } else {
    gpr_tls_set(&g_current_thread_pollset, (intptr_t)ps);
  }
  end_worker(exec_ctx, ps, &worker, worker_hdl);

  gpr_tls_set(&g_current_thread_pollset, 0);
  GPR_TIMER_END("pollset_work", 0);
  return error;
}
980
/* Kick the pollset: wake (or mark as kicked) either 'specific_worker', or —
   when specific_worker is NULL — whichever worker is most appropriate so that
   pollset_work() returns control to its caller.

   Wake-up mechanisms used, depending on the target's state:
     - SET_KICK_STATE(..., KICKED) alone when the target is already kicked or
       is the current thread;
     - gpr_cv_signal() when the target is blocked in gpr_cv_wait();
     - grpc_wakeup_fd_wakeup(&global_wakeup_fd) when the target is (or may be)
       blocked inside epoll_wait() as the designated poller.

   Caller must hold pollset->mu. Returns any error from the wakeup-fd write. */
static grpc_error *pollset_kick(grpc_pollset *pollset,
                                grpc_pollset_worker *specific_worker) {
  GPR_TIMER_BEGIN("pollset_kick", 0);
  grpc_error *ret_err = GRPC_ERROR_NONE;
  if (GRPC_TRACER_ON(grpc_polling_trace)) {
    /* Assemble a one-line snapshot of the kick for tracing */
    gpr_strvec log;
    gpr_strvec_init(&log);
    char *tmp;
    gpr_asprintf(
        &tmp, "PS:%p KICK:%p curps=%p curworker=%p root=%p", pollset,
        specific_worker, (void *)gpr_tls_get(&g_current_thread_pollset),
        (void *)gpr_tls_get(&g_current_thread_worker), pollset->root_worker);
    gpr_strvec_add(&log, tmp);
    if (pollset->root_worker != NULL) {
      gpr_asprintf(&tmp, " {kick_state=%s next=%p {kick_state=%s}}",
                   kick_state_string(pollset->root_worker->kick_state),
                   pollset->root_worker->next,
                   kick_state_string(pollset->root_worker->next->kick_state));
      gpr_strvec_add(&log, tmp);
    }
    if (specific_worker != NULL) {
      gpr_asprintf(&tmp, " worker_kick_state=%s",
                   kick_state_string(specific_worker->kick_state));
      gpr_strvec_add(&log, tmp);
    }
    tmp = gpr_strvec_flatten(&log, NULL);
    gpr_strvec_destroy(&log);
    gpr_log(GPR_ERROR, "%s", tmp);
    gpr_free(tmp);
  }

  if (specific_worker == NULL) {
    /* "Kick any" path */
    if (gpr_tls_get(&g_current_thread_pollset) != (intptr_t)pollset) {
      grpc_pollset_worker *root_worker = pollset->root_worker;
      if (root_worker == NULL) {
        /* No workers at all: remember the kick for the next pollset_work() */
        pollset->kicked_without_poller = true;
        if (GRPC_TRACER_ON(grpc_polling_trace)) {
          gpr_log(GPR_ERROR, " .. kicked_without_poller");
        }
        goto done;
      }
      grpc_pollset_worker *next_worker = root_worker->next;
      if (root_worker->kick_state == KICKED) {
        if (GRPC_TRACER_ON(grpc_polling_trace)) {
          gpr_log(GPR_ERROR, " .. already kicked %p", root_worker);
        }
        SET_KICK_STATE(root_worker, KICKED);
        goto done;
      } else if (next_worker->kick_state == KICKED) {
        if (GRPC_TRACER_ON(grpc_polling_trace)) {
          gpr_log(GPR_ERROR, " .. already kicked %p", next_worker);
        }
        SET_KICK_STATE(next_worker, KICKED);
        goto done;
      } else if (root_worker ==
                     next_worker &&  // only try and wake up a poller if
                                     // there is no next worker
                 root_worker == (grpc_pollset_worker *)gpr_atm_no_barrier_load(
                                    &g_active_poller)) {
        /* Sole worker is the designated poller: wake it out of epoll_wait() */
        if (GRPC_TRACER_ON(grpc_polling_trace)) {
          gpr_log(GPR_ERROR, " .. kicked %p", root_worker);
        }
        SET_KICK_STATE(root_worker, KICKED);
        ret_err = grpc_wakeup_fd_wakeup(&global_wakeup_fd);
        goto done;
      } else if (next_worker->kick_state == UNKICKED) {
        /* Wake a cv-waiting worker */
        if (GRPC_TRACER_ON(grpc_polling_trace)) {
          gpr_log(GPR_ERROR, " .. kicked %p", next_worker);
        }
        GPR_ASSERT(next_worker->initialized_cv);
        SET_KICK_STATE(next_worker, KICKED);
        gpr_cv_signal(&next_worker->cv);
        goto done;
      } else if (next_worker->kick_state == DESIGNATED_POLLER) {
        if (root_worker->kick_state != DESIGNATED_POLLER) {
          /* Prefer kicking the non-polling root worker over disturbing the
             designated poller */
          if (GRPC_TRACER_ON(grpc_polling_trace)) {
            gpr_log(
                GPR_ERROR,
                " .. kicked root non-poller %p (initialized_cv=%d) (poller=%p)",
                root_worker, root_worker->initialized_cv, next_worker);
          }
          SET_KICK_STATE(root_worker, KICKED);
          if (root_worker->initialized_cv) {
            gpr_cv_signal(&root_worker->cv);
          }
          goto done;
        } else {
          /* Both are designated pollers: wake via the wakeup fd */
          if (GRPC_TRACER_ON(grpc_polling_trace)) {
            gpr_log(GPR_ERROR, " .. non-root poller %p (root=%p)", next_worker,
                    root_worker);
          }
          SET_KICK_STATE(next_worker, KICKED);
          ret_err = grpc_wakeup_fd_wakeup(&global_wakeup_fd);
          goto done;
        }
      } else {
        GPR_ASSERT(next_worker->kick_state == KICKED);
        SET_KICK_STATE(next_worker, KICKED);
        goto done;
      }
    } else {
      /* Current thread is already working on this pollset: nothing to wake */
      if (GRPC_TRACER_ON(grpc_polling_trace)) {
        gpr_log(GPR_ERROR, " .. kicked while waking up");
      }
      goto done;
    }

    GPR_UNREACHABLE_CODE(goto done);
  }

  /* "Kick specific worker" path */
  if (specific_worker->kick_state == KICKED) {
    if (GRPC_TRACER_ON(grpc_polling_trace)) {
      gpr_log(GPR_ERROR, " .. specific worker already kicked");
    }
    goto done;
  } else if (gpr_tls_get(&g_current_thread_worker) ==
             (intptr_t)specific_worker) {
    /* Kicking ourselves: just mark the state */
    if (GRPC_TRACER_ON(grpc_polling_trace)) {
      gpr_log(GPR_ERROR, " .. mark %p kicked", specific_worker);
    }
    SET_KICK_STATE(specific_worker, KICKED);
    goto done;
  } else if (specific_worker ==
             (grpc_pollset_worker *)gpr_atm_no_barrier_load(&g_active_poller)) {
    /* Target may be blocked in epoll_wait(): use the wakeup fd */
    if (GRPC_TRACER_ON(grpc_polling_trace)) {
      gpr_log(GPR_ERROR, " .. kick active poller");
    }
    SET_KICK_STATE(specific_worker, KICKED);
    ret_err = grpc_wakeup_fd_wakeup(&global_wakeup_fd);
    goto done;
  } else if (specific_worker->initialized_cv) {
    /* Target is blocked in gpr_cv_wait(): signal its cv */
    if (GRPC_TRACER_ON(grpc_polling_trace)) {
      gpr_log(GPR_ERROR, " .. kick waiting worker");
    }
    SET_KICK_STATE(specific_worker, KICKED);
    gpr_cv_signal(&specific_worker->cv);
    goto done;
  } else {
    if (GRPC_TRACER_ON(grpc_polling_trace)) {
      gpr_log(GPR_ERROR, " .. kick non-waiting worker");
    }
    SET_KICK_STATE(specific_worker, KICKED);
    goto done;
  }
done:
  GPR_TIMER_END("pollset_kick", 0);
  return ret_err;
}
1129
/* Intentional no-op: this engine uses a single global epoll set
   (g_epoll_set), so there is no per-pollset fd registration to do here.
   NOTE(review): presumably fds are registered with g_epoll_set.epfd at fd
   creation time — confirm against fd_create (not visible in this chunk). */
static void pollset_add_fd(grpc_exec_ctx *exec_ctx, grpc_pollset *pollset,
                           grpc_fd *fd) {}
1132
Craig Tiller4509c472017-04-27 19:05:13 +00001133/*******************************************************************************
Craig Tillerc67cc992017-04-27 10:15:51 -07001134 * Pollset-set Definitions
1135 */
1136
1137static grpc_pollset_set *pollset_set_create(void) {
1138 return (grpc_pollset_set *)((intptr_t)0xdeafbeef);
1139}
1140
/* Intentional no-op: pollset-sets carry no state in this engine (see
   pollset_set_create, which returns a sentinel). */
static void pollset_set_destroy(grpc_exec_ctx *exec_ctx,
                                grpc_pollset_set *pss) {}
1143
/* Intentional no-op: pollset-set membership is not tracked in this engine. */
static void pollset_set_add_fd(grpc_exec_ctx *exec_ctx, grpc_pollset_set *pss,
                               grpc_fd *fd) {}
1146
/* Intentional no-op: pollset-set membership is not tracked in this engine. */
static void pollset_set_del_fd(grpc_exec_ctx *exec_ctx, grpc_pollset_set *pss,
                               grpc_fd *fd) {}
1149
/* Intentional no-op: pollset-set membership is not tracked in this engine. */
static void pollset_set_add_pollset(grpc_exec_ctx *exec_ctx,
                                    grpc_pollset_set *pss, grpc_pollset *ps) {}
1152
/* Intentional no-op: pollset-set membership is not tracked in this engine. */
static void pollset_set_del_pollset(grpc_exec_ctx *exec_ctx,
                                    grpc_pollset_set *pss, grpc_pollset *ps) {}
1155
/* Intentional no-op: pollset-set nesting is not tracked in this engine. */
static void pollset_set_add_pollset_set(grpc_exec_ctx *exec_ctx,
                                        grpc_pollset_set *bag,
                                        grpc_pollset_set *item) {}
1159
/* Intentional no-op: pollset-set nesting is not tracked in this engine. */
static void pollset_set_del_pollset_set(grpc_exec_ctx *exec_ctx,
                                        grpc_pollset_set *bag,
                                        grpc_pollset_set *item) {}
1163
1164/*******************************************************************************
1165 * Event engine binding
1166 */
1167
/* Tear down the engine's global state: fd bookkeeping, pollset globals, and
   finally the epoll set itself. Counterpart to the init sequence in
   grpc_init_epoll1_linux(). */
static void shutdown_engine(void) {
  fd_global_shutdown();
  pollset_global_shutdown();
  epoll_set_shutdown();
}
1173
/* Event-engine vtable exposing this epoll1 implementation to iomgr. Note the
   pollset_set_* entries are all no-ops in this engine (see above). */
static const grpc_event_engine_vtable vtable = {
    .pollset_size = sizeof(grpc_pollset),

    .fd_create = fd_create,
    .fd_wrapped_fd = fd_wrapped_fd,
    .fd_orphan = fd_orphan,
    .fd_shutdown = fd_shutdown,
    .fd_is_shutdown = fd_is_shutdown,
    .fd_notify_on_read = fd_notify_on_read,
    .fd_notify_on_write = fd_notify_on_write,
    .fd_get_read_notifier_pollset = fd_get_read_notifier_pollset,

    .pollset_init = pollset_init,
    .pollset_shutdown = pollset_shutdown,
    .pollset_destroy = pollset_destroy,
    .pollset_work = pollset_work,
    .pollset_kick = pollset_kick,
    .pollset_add_fd = pollset_add_fd,

    .pollset_set_create = pollset_set_create,
    .pollset_set_destroy = pollset_set_destroy,
    .pollset_set_add_pollset = pollset_set_add_pollset,
    .pollset_set_del_pollset = pollset_set_del_pollset,
    .pollset_set_add_pollset_set = pollset_set_add_pollset_set,
    .pollset_set_del_pollset_set = pollset_set_del_pollset_set,
    .pollset_set_add_fd = pollset_set_add_fd,
    .pollset_set_del_fd = pollset_set_del_fd,

    .shutdown_engine = shutdown_engine,
};
1204
/* It is possible that GLIBC has epoll but the underlying kernel doesn't.
 * Create epoll_fd (epoll_set_init() takes care of that) to make sure epoll
 * support is available.
 *
 * Returns the engine vtable on success, or NULL if a wakeup fd or working
 * epoll is unavailable, or if pollset global init fails. 'explicit_request'
 * is unused by this engine; the parameter exists to match the engine-factory
 * signature. On pollset_global_init failure, the already-initialized fd and
 * epoll-set subsystems are torn down before returning. */
const grpc_event_engine_vtable *grpc_init_epoll1_linux(bool explicit_request) {
  if (!grpc_has_wakeup_fd()) {
    return NULL;
  }

  if (!epoll_set_init()) {
    return NULL;
  }

  fd_global_init();

  if (!GRPC_LOG_IF_ERROR("pollset_global_init", pollset_global_init())) {
    fd_global_shutdown();
    epoll_set_shutdown();
    return NULL;
  }

  return &vtable;
}
1227
1228#else /* defined(GRPC_LINUX_EPOLL) */
1229#if defined(GRPC_POSIX_SOCKET)
1230#include "src/core/lib/iomgr/ev_posix.h"
/* If GRPC_LINUX_EPOLL is not defined, it means epoll is not available. Return
 * NULL so the engine-selection logic falls back to another poller. The
 * signature must stay identical to the GRPC_LINUX_EPOLL variant above. */
const grpc_event_engine_vtable *grpc_init_epoll1_linux(bool explicit_request) {
  return NULL;
}
Craig Tillerc67cc992017-04-27 10:15:51 -07001236#endif /* defined(GRPC_POSIX_SOCKET) */
1237#endif /* !defined(GRPC_LINUX_EPOLL) */