blob: b76eb9e1c9f188f896245073e501b799c4edf0eb [file] [log] [blame]
Craig Tillerc67cc992017-04-27 10:15:51 -07001/*
2 *
Jan Tattermusch7897ae92017-06-07 22:57:36 +02003 * Copyright 2017 gRPC authors.
Craig Tillerc67cc992017-04-27 10:15:51 -07004 *
Jan Tattermusch7897ae92017-06-07 22:57:36 +02005 * Licensed under the Apache License, Version 2.0 (the "License");
6 * you may not use this file except in compliance with the License.
7 * You may obtain a copy of the License at
Craig Tillerc67cc992017-04-27 10:15:51 -07008 *
Jan Tattermusch7897ae92017-06-07 22:57:36 +02009 * http://www.apache.org/licenses/LICENSE-2.0
Craig Tillerc67cc992017-04-27 10:15:51 -070010 *
Jan Tattermusch7897ae92017-06-07 22:57:36 +020011 * Unless required by applicable law or agreed to in writing, software
12 * distributed under the License is distributed on an "AS IS" BASIS,
13 * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
14 * See the License for the specific language governing permissions and
15 * limitations under the License.
Craig Tillerc67cc992017-04-27 10:15:51 -070016 *
17 */
18
19#include "src/core/lib/iomgr/port.h"
20
21/* This polling engine is only relevant on linux kernels supporting epoll() */
22#ifdef GRPC_LINUX_EPOLL
23
Craig Tiller4509c472017-04-27 19:05:13 +000024#include "src/core/lib/iomgr/ev_epoll1_linux.h"
Craig Tillerc67cc992017-04-27 10:15:51 -070025
26#include <assert.h>
27#include <errno.h>
28#include <poll.h>
29#include <pthread.h>
30#include <string.h>
31#include <sys/epoll.h>
32#include <sys/socket.h>
33#include <unistd.h>
34
35#include <grpc/support/alloc.h>
Craig Tiller6de05932017-04-28 09:17:38 -070036#include <grpc/support/cpu.h>
Craig Tillerc67cc992017-04-27 10:15:51 -070037#include <grpc/support/log.h>
38#include <grpc/support/string_util.h>
39#include <grpc/support/tls.h>
40#include <grpc/support/useful.h>
41
Craig Tillerb4bb1cd2017-07-20 14:18:17 -070042#include "src/core/lib/debug/stats.h"
Craig Tillerc67cc992017-04-27 10:15:51 -070043#include "src/core/lib/iomgr/ev_posix.h"
44#include "src/core/lib/iomgr/iomgr_internal.h"
45#include "src/core/lib/iomgr/lockfree_event.h"
Craig Tillerc67cc992017-04-27 10:15:51 -070046#include "src/core/lib/iomgr/wakeup_fd_posix.h"
Craig Tillerc67cc992017-04-27 10:15:51 -070047#include "src/core/lib/profiling/timers.h"
48#include "src/core/lib/support/block_annotate.h"
Craig Tillerb89bac02017-05-26 15:20:32 +000049#include "src/core/lib/support/string.h"
Craig Tillerc67cc992017-04-27 10:15:51 -070050
/* Single wakeup fd registered with the epoll set; used to kick the designated
 * poller thread out of epoll_wait() (see pollset_kick_all / the alarm-system
 * note further below). */
static grpc_wakeup_fd global_wakeup_fd;

/*******************************************************************************
 * Singleton epoll set related fields
 */

/* Maximum number of events fetched by one epoll_wait() call. */
#define MAX_EPOLL_EVENTS 100

/* Maximum number of buffered events *processed* per call to
 * process_epoll_events(); the rest stay queued in g_epoll_set. */
#define MAX_EPOLL_EVENTS_HANDLED_PER_ITERATION 1
Sree Kuchibhotla5efc9132017-08-17 14:10:38 -070059
/* NOTE ON SYNCHRONIZATION:
 * - Fields in this struct are only modified by the designated poller. Hence
 *   there is no need for any locks to protect the struct.
 * - num_events and cursor fields have to be of atomic type to provide memory
 *   visibility guarantees only. i.e In case of multiple pollers, the
 *   designated polling thread keeps changing; the thread that wrote these
 *   values may be different from the thread reading the values.
 */
typedef struct epoll_set {
  /* The epoll instance's file descriptor; -1 after epoll_set_shutdown(). */
  int epfd;

  /* The epoll_events after the last call to epoll_wait() */
  struct epoll_event events[MAX_EPOLL_EVENTS];

  /* The number of epoll_events after the last call to epoll_wait() */
  gpr_atm num_events;

  /* Index of the first event in epoll_events that has to be processed. This
   * field is only valid if num_events > 0 */
  gpr_atm cursor;
} epoll_set;
81
82/* The global singleton epoll set */
83static epoll_set g_epoll_set;
84
85/* Must be called *only* once */
86static bool epoll_set_init() {
87 g_epoll_set.epfd = epoll_create1(EPOLL_CLOEXEC);
88 if (g_epoll_set.epfd < 0) {
89 gpr_log(GPR_ERROR, "epoll unavailable");
90 return false;
91 }
92
Sree Kuchibhotlaa92a9cc2017-08-27 14:02:15 -070093 gpr_log(GPR_INFO, "grpc epoll fd: %d", g_epoll_set.epfd);
94 gpr_atm_no_barrier_store(&g_epoll_set.num_events, 0);
95 gpr_atm_no_barrier_store(&g_epoll_set.cursor, 0);
Sree Kuchibhotla5efc9132017-08-17 14:10:38 -070096 return true;
97}
98
99/* epoll_set_init() MUST be called before calling this. */
100static void epoll_set_shutdown() {
101 if (g_epoll_set.epfd >= 0) {
102 close(g_epoll_set.epfd);
103 g_epoll_set.epfd = -1;
104 }
105}
Craig Tillerc67cc992017-04-27 10:15:51 -0700106
/*******************************************************************************
 * Fd Declarations
 */

struct grpc_fd {
  /* The underlying system file descriptor. */
  int fd;

  /* Lock-free event state for read/write readiness; stored as gpr_atm and
   * manipulated only through the grpc_lfev_* helpers (see lockfree_event.h). */
  gpr_atm read_closure;
  gpr_atm write_closure;

  /* Intrusive link for the fd freelist (see fd_freelist below). */
  struct grpc_fd *freelist_next;

  /* The pollset that last noticed that the fd is readable. The actual type
   * stored in this is (grpc_pollset *) */
  gpr_atm read_notifier_pollset;

  grpc_iomgr_object iomgr_object;
};

static void fd_global_init(void);
static void fd_global_shutdown(void);
128
129/*******************************************************************************
130 * Pollset Declarations
131 */
132
typedef enum { UNKICKED, KICKED, DESIGNATED_POLLER } kick_state;

/* Human-readable name of a kick_state value, for trace logging. */
static const char *kick_state_string(kick_state st) {
  switch (st) {
    case UNKICKED:
      return "UNKICKED";
    case KICKED:
      return "KICKED";
    case DESIGNATED_POLLER:
      return "DESIGNATED_POLLER";
  }
  GPR_UNREACHABLE_CODE(return "UNKNOWN");
}
146
struct grpc_pollset_worker {
  kick_state kick_state;
  int kick_state_mutator;  // which line of code last changed kick state
  bool initialized_cv;     // true once 'cv' has been gpr_cv_init()-ed
  grpc_pollset_worker *next;  // circular doubly-linked list of workers
  grpc_pollset_worker *prev;
  gpr_cv cv;  // signalled (e.g. by pollset_kick_all) to wake a parked worker
  grpc_closure_list schedule_on_end_work;
};

/* Set a worker's kick state, recording the source line of the mutation to aid
 * debugging of kick-state transitions. */
#define SET_KICK_STATE(worker, state)        \
  do {                                       \
    (worker)->kick_state = (state);          \
    (worker)->kick_state_mutator = __LINE__; \
  } while (false)
162
#define MAX_NEIGHBOURHOODS 1024

/* A 'neighbourhood' groups pollsets under one mutex so that pollset
 * (de)registration contends only within its neighbourhood, not globally.
 * The pad is sized to a cache line — presumably to avoid false sharing
 * between adjacent neighbourhoods. */
typedef struct pollset_neighbourhood {
  gpr_mu mu;
  grpc_pollset *active_root;  // circular list of active pollsets, or NULL
  char pad[GPR_CACHELINE_SIZE];
} pollset_neighbourhood;

struct grpc_pollset {
  gpr_mu mu;
  pollset_neighbourhood *neighbourhood;
  bool reassigning_neighbourhood;
  grpc_pollset_worker *root_worker;
  bool kicked_without_poller;

  /* Set to true if the pollset is observed to have no workers available to
     poll */
  bool seen_inactive;
  bool shutting_down;             /* Is the pollset shutting down ? */
  grpc_closure *shutdown_closure; /* Called once shutdown is complete */

  /* Number of workers who are *about-to* attach themselves to the pollset
   * worker list */
  int begin_refs;

  grpc_pollset *next;  // links in the neighbourhood's active-pollset ring
  grpc_pollset *prev;
};

/*******************************************************************************
 * Pollset-set Declarations
 */

/* This engine keeps no per-pollset_set state; the type merely needs to exist. */
struct grpc_pollset_set {
  char unused;
};
Craig Tillerc67cc992017-04-27 10:15:51 -0700199
200/*******************************************************************************
201 * Common helpers
202 */
203
/* Fold 'error' into '*composite', creating the composite wrapper (described by
 * 'desc') on the first failure. Returns true iff 'error' was GRPC_ERROR_NONE.
 * NOTE(review): 'error' is handed to grpc_error_add_child — ownership
 * presumably transfers; confirm against error.h. */
static bool append_error(grpc_error **composite, grpc_error *error,
                         const char *desc) {
  if (error == GRPC_ERROR_NONE) return true;
  if (*composite == GRPC_ERROR_NONE) {
    *composite = GRPC_ERROR_CREATE_FROM_COPIED_STRING(desc);
  }
  *composite = grpc_error_add_child(*composite, error);
  return false;
}
213
214/*******************************************************************************
215 * Fd Definitions
216 */
217
218/* We need to keep a freelist not because of any concerns of malloc performance
219 * but instead so that implementations with multiple threads in (for example)
220 * epoll_wait deal with the race between pollset removal and incoming poll
221 * notifications.
222 *
223 * The problem is that the poller ultimately holds a reference to this
224 * object, so it is very difficult to know when is safe to free it, at least
225 * without some expensive synchronization.
226 *
227 * If we keep the object freelisted, in the worst case losing this race just
228 * becomes a spurious read notification on a reused fd.
229 */
230
231/* The alarm system needs to be able to wakeup 'some poller' sometimes
232 * (specifically when a new alarm needs to be triggered earlier than the next
233 * alarm 'epoch'). This wakeup_fd gives us something to alert on when such a
234 * case occurs. */
235
236static grpc_fd *fd_freelist = NULL;
237static gpr_mu fd_freelist_mu;
238
Craig Tillerc67cc992017-04-27 10:15:51 -0700239static void fd_global_init(void) { gpr_mu_init(&fd_freelist_mu); }
240
241static void fd_global_shutdown(void) {
242 gpr_mu_lock(&fd_freelist_mu);
243 gpr_mu_unlock(&fd_freelist_mu);
244 while (fd_freelist != NULL) {
245 grpc_fd *fd = fd_freelist;
246 fd_freelist = fd_freelist->freelist_next;
Craig Tillerc67cc992017-04-27 10:15:51 -0700247 gpr_free(fd);
248 }
249 gpr_mu_destroy(&fd_freelist_mu);
250}
251
252static grpc_fd *fd_create(int fd, const char *name) {
253 grpc_fd *new_fd = NULL;
254
255 gpr_mu_lock(&fd_freelist_mu);
256 if (fd_freelist != NULL) {
257 new_fd = fd_freelist;
258 fd_freelist = fd_freelist->freelist_next;
259 }
260 gpr_mu_unlock(&fd_freelist_mu);
261
262 if (new_fd == NULL) {
263 new_fd = gpr_malloc(sizeof(grpc_fd));
Craig Tillerc67cc992017-04-27 10:15:51 -0700264 }
265
Craig Tillerc67cc992017-04-27 10:15:51 -0700266 new_fd->fd = fd;
Craig Tillerc67cc992017-04-27 10:15:51 -0700267 grpc_lfev_init(&new_fd->read_closure);
268 grpc_lfev_init(&new_fd->write_closure);
269 gpr_atm_no_barrier_store(&new_fd->read_notifier_pollset, (gpr_atm)NULL);
270
271 new_fd->freelist_next = NULL;
Craig Tillerc67cc992017-04-27 10:15:51 -0700272
273 char *fd_name;
274 gpr_asprintf(&fd_name, "%s fd=%d", name, fd);
275 grpc_iomgr_register_object(&new_fd->iomgr_object, fd_name);
Noah Eisen264879f2017-06-20 17:14:47 -0700276#ifndef NDEBUG
277 if (GRPC_TRACER_ON(grpc_trace_fd_refcount)) {
278 gpr_log(GPR_DEBUG, "FD %d %p create %s", fd, new_fd, fd_name);
279 }
Craig Tillerc67cc992017-04-27 10:15:51 -0700280#endif
281 gpr_free(fd_name);
Craig Tiller9ddb3152017-04-27 21:32:56 +0000282
283 struct epoll_event ev = {.events = (uint32_t)(EPOLLIN | EPOLLOUT | EPOLLET),
284 .data.ptr = new_fd};
Sree Kuchibhotla5efc9132017-08-17 14:10:38 -0700285 if (epoll_ctl(g_epoll_set.epfd, EPOLL_CTL_ADD, fd, &ev) != 0) {
Craig Tiller9ddb3152017-04-27 21:32:56 +0000286 gpr_log(GPR_ERROR, "epoll_ctl failed: %s", strerror(errno));
287 }
288
Craig Tillerc67cc992017-04-27 10:15:51 -0700289 return new_fd;
290}
291
/* Return the underlying system file descriptor. */
static int fd_wrapped_fd(grpc_fd *fd) { return fd->fd; }
Craig Tillerc67cc992017-04-27 10:15:51 -0700293
Sree Kuchibhotlaf2641472017-08-02 23:46:40 -0700294/* if 'releasing_fd' is true, it means that we are going to detach the internal
295 * fd from grpc_fd structure (i.e which means we should not be calling
296 * shutdown() syscall on that fd) */
297static void fd_shutdown_internal(grpc_exec_ctx *exec_ctx, grpc_fd *fd,
298 grpc_error *why, bool releasing_fd) {
Craig Tiller9ddb3152017-04-27 21:32:56 +0000299 if (grpc_lfev_set_shutdown(exec_ctx, &fd->read_closure,
300 GRPC_ERROR_REF(why))) {
Sree Kuchibhotlaf2641472017-08-02 23:46:40 -0700301 if (!releasing_fd) {
302 shutdown(fd->fd, SHUT_RDWR);
303 }
Craig Tiller9ddb3152017-04-27 21:32:56 +0000304 grpc_lfev_set_shutdown(exec_ctx, &fd->write_closure, GRPC_ERROR_REF(why));
305 }
306 GRPC_ERROR_UNREF(why);
307}
308
Sree Kuchibhotlaf2641472017-08-02 23:46:40 -0700309/* Might be called multiple times */
310static void fd_shutdown(grpc_exec_ctx *exec_ctx, grpc_fd *fd, grpc_error *why) {
311 fd_shutdown_internal(exec_ctx, fd, why, false);
312}
313
Craig Tillerc67cc992017-04-27 10:15:51 -0700314static void fd_orphan(grpc_exec_ctx *exec_ctx, grpc_fd *fd,
315 grpc_closure *on_done, int *release_fd,
Yuchen Zengd40a7ae2017-07-12 15:59:56 -0700316 bool already_closed, const char *reason) {
Craig Tillerc67cc992017-04-27 10:15:51 -0700317 grpc_error *error = GRPC_ERROR_NONE;
Sree Kuchibhotlaf2641472017-08-02 23:46:40 -0700318 bool is_release_fd = (release_fd != NULL);
Craig Tillerc67cc992017-04-27 10:15:51 -0700319
Craig Tiller9ddb3152017-04-27 21:32:56 +0000320 if (!grpc_lfev_is_shutdown(&fd->read_closure)) {
Sree Kuchibhotlaf2641472017-08-02 23:46:40 -0700321 fd_shutdown_internal(exec_ctx, fd,
322 GRPC_ERROR_CREATE_FROM_COPIED_STRING(reason),
323 is_release_fd);
Craig Tiller9ddb3152017-04-27 21:32:56 +0000324 }
325
Craig Tillerc67cc992017-04-27 10:15:51 -0700326 /* If release_fd is not NULL, we should be relinquishing control of the file
327 descriptor fd->fd (but we still own the grpc_fd structure). */
Sree Kuchibhotlaf2641472017-08-02 23:46:40 -0700328 if (is_release_fd) {
Craig Tillerc67cc992017-04-27 10:15:51 -0700329 *release_fd = fd->fd;
Yuchen Zengd40a7ae2017-07-12 15:59:56 -0700330 } else if (!already_closed) {
Craig Tillerc67cc992017-04-27 10:15:51 -0700331 close(fd->fd);
Craig Tillerc67cc992017-04-27 10:15:51 -0700332 }
333
ncteisen274bbbe2017-06-08 14:57:11 -0700334 GRPC_CLOSURE_SCHED(exec_ctx, on_done, GRPC_ERROR_REF(error));
Craig Tillerc67cc992017-04-27 10:15:51 -0700335
Craig Tiller4509c472017-04-27 19:05:13 +0000336 grpc_iomgr_unregister_object(&fd->iomgr_object);
337 grpc_lfev_destroy(&fd->read_closure);
338 grpc_lfev_destroy(&fd->write_closure);
Craig Tillerc67cc992017-04-27 10:15:51 -0700339
Craig Tiller4509c472017-04-27 19:05:13 +0000340 gpr_mu_lock(&fd_freelist_mu);
341 fd->freelist_next = fd_freelist;
342 fd_freelist = fd;
343 gpr_mu_unlock(&fd_freelist_mu);
Craig Tillerc67cc992017-04-27 10:15:51 -0700344}
345
346static grpc_pollset *fd_get_read_notifier_pollset(grpc_exec_ctx *exec_ctx,
347 grpc_fd *fd) {
348 gpr_atm notifier = gpr_atm_acq_load(&fd->read_notifier_pollset);
349 return (grpc_pollset *)notifier;
350}
351
352static bool fd_is_shutdown(grpc_fd *fd) {
353 return grpc_lfev_is_shutdown(&fd->read_closure);
354}
355
/* Register 'closure' with the fd's read lock-free event. */
static void fd_notify_on_read(grpc_exec_ctx *exec_ctx, grpc_fd *fd,
                              grpc_closure *closure) {
  grpc_lfev_notify_on(exec_ctx, &fd->read_closure, closure, "read");
}

/* Register 'closure' with the fd's write lock-free event. */
static void fd_notify_on_write(grpc_exec_ctx *exec_ctx, grpc_fd *fd,
                               grpc_closure *closure) {
  grpc_lfev_notify_on(exec_ctx, &fd->write_closure, closure, "write");
}

/* Mark the fd readable and record 'notifier' as the pollset that noticed. */
static void fd_become_readable(grpc_exec_ctx *exec_ctx, grpc_fd *fd,
                               grpc_pollset *notifier) {
  grpc_lfev_set_ready(exec_ctx, &fd->read_closure, "read");
  /* Use release store to match with acquire load in fd_get_read_notifier */
  gpr_atm_rel_store(&fd->read_notifier_pollset, (gpr_atm)notifier);
}

static void fd_become_writable(grpc_exec_ctx *exec_ctx, grpc_fd *fd) {
  grpc_lfev_set_ready(exec_ctx, &fd->write_closure, "write");
}
376
377/*******************************************************************************
378 * Pollset Definitions
379 */
380
GPR_TLS_DECL(g_current_thread_pollset);
GPR_TLS_DECL(g_current_thread_worker);

/* The designated poller */
static gpr_atm g_active_poller;

static pollset_neighbourhood *g_neighbourhoods;  // array sized at global init
static size_t g_num_neighbourhoods;
Craig Tiller6de05932017-04-28 09:17:38 -0700389
Craig Tillerc67cc992017-04-27 10:15:51 -0700390/* Return true if first in list */
Craig Tiller32f90ee2017-04-28 12:46:41 -0700391static bool worker_insert(grpc_pollset *pollset, grpc_pollset_worker *worker) {
392 if (pollset->root_worker == NULL) {
393 pollset->root_worker = worker;
394 worker->next = worker->prev = worker;
Craig Tillerc67cc992017-04-27 10:15:51 -0700395 return true;
396 } else {
Craig Tiller32f90ee2017-04-28 12:46:41 -0700397 worker->next = pollset->root_worker;
398 worker->prev = worker->next->prev;
399 worker->next->prev = worker;
400 worker->prev->next = worker;
Craig Tillerc67cc992017-04-27 10:15:51 -0700401 return false;
402 }
403}
404
405/* Return true if last in list */
406typedef enum { EMPTIED, NEW_ROOT, REMOVED } worker_remove_result;
407
Craig Tiller32f90ee2017-04-28 12:46:41 -0700408static worker_remove_result worker_remove(grpc_pollset *pollset,
Craig Tillerc67cc992017-04-27 10:15:51 -0700409 grpc_pollset_worker *worker) {
Craig Tiller32f90ee2017-04-28 12:46:41 -0700410 if (worker == pollset->root_worker) {
411 if (worker == worker->next) {
412 pollset->root_worker = NULL;
Craig Tillerc67cc992017-04-27 10:15:51 -0700413 return EMPTIED;
414 } else {
Craig Tiller32f90ee2017-04-28 12:46:41 -0700415 pollset->root_worker = worker->next;
416 worker->prev->next = worker->next;
417 worker->next->prev = worker->prev;
Craig Tillerc67cc992017-04-27 10:15:51 -0700418 return NEW_ROOT;
419 }
420 } else {
Craig Tiller32f90ee2017-04-28 12:46:41 -0700421 worker->prev->next = worker->next;
422 worker->next->prev = worker->prev;
Craig Tillerc67cc992017-04-27 10:15:51 -0700423 return REMOVED;
424 }
425}
426
/* Pick a neighbourhood index keyed off the CPU the calling thread is
 * currently running on. */
static size_t choose_neighbourhood(void) {
  return (size_t)gpr_cpu_current_cpu() % g_num_neighbourhoods;
}
430
/* One-time pollset subsystem initialization: TLS slots, the global wakeup fd
 * (registered edge-triggered with the singleton epoll set), and the
 * neighbourhood array. Assumes epoll_set_init() has already run (uses
 * g_epoll_set.epfd). */
static grpc_error *pollset_global_init(void) {
  gpr_tls_init(&g_current_thread_pollset);
  gpr_tls_init(&g_current_thread_worker);
  gpr_atm_no_barrier_store(&g_active_poller, 0);
  global_wakeup_fd.read_fd = -1;
  grpc_error *err = grpc_wakeup_fd_init(&global_wakeup_fd);
  if (err != GRPC_ERROR_NONE) return err;
  struct epoll_event ev = {.events = (uint32_t)(EPOLLIN | EPOLLET),
                           .data.ptr = &global_wakeup_fd};
  if (epoll_ctl(g_epoll_set.epfd, EPOLL_CTL_ADD, global_wakeup_fd.read_fd,
                &ev) != 0) {
    return GRPC_OS_ERROR(errno, "epoll_ctl");
  }
  /* One neighbourhood per core, clamped to [1, MAX_NEIGHBOURHOODS]. */
  g_num_neighbourhoods = GPR_CLAMP(gpr_cpu_num_cores(), 1, MAX_NEIGHBOURHOODS);
  g_neighbourhoods =
      gpr_zalloc(sizeof(*g_neighbourhoods) * g_num_neighbourhoods);
  for (size_t i = 0; i < g_num_neighbourhoods; i++) {
    gpr_mu_init(&g_neighbourhoods[i].mu);
  }
  return GRPC_ERROR_NONE;
}
452
453static void pollset_global_shutdown(void) {
Craig Tiller4509c472017-04-27 19:05:13 +0000454 gpr_tls_destroy(&g_current_thread_pollset);
455 gpr_tls_destroy(&g_current_thread_worker);
Craig Tiller375eb252017-04-27 23:29:12 +0000456 if (global_wakeup_fd.read_fd != -1) grpc_wakeup_fd_destroy(&global_wakeup_fd);
Craig Tiller32f90ee2017-04-28 12:46:41 -0700457 for (size_t i = 0; i < g_num_neighbourhoods; i++) {
458 gpr_mu_destroy(&g_neighbourhoods[i].mu);
459 }
460 gpr_free(g_neighbourhoods);
Craig Tiller4509c472017-04-27 19:05:13 +0000461}
462
/* Initialize 'pollset' and expose its internal mutex through '*mu'. A
 * neighbourhood is assigned up-front; the pollset starts 'seen_inactive'
 * (i.e. not linked into any neighbourhood's active list). */
static void pollset_init(grpc_pollset *pollset, gpr_mu **mu) {
  gpr_mu_init(&pollset->mu);
  *mu = &pollset->mu;
  pollset->neighbourhood = &g_neighbourhoods[choose_neighbourhood()];
  pollset->reassigning_neighbourhood = false;
  pollset->root_worker = NULL;
  pollset->kicked_without_poller = false;
  pollset->seen_inactive = true;
  pollset->shutting_down = false;
  pollset->shutdown_closure = NULL;
  pollset->begin_refs = 0;
  pollset->next = pollset->prev = NULL;
}
476
/* Destroy 'pollset', first unlinking it from its neighbourhood's active list
 * if it is still on one. */
static void pollset_destroy(grpc_exec_ctx *exec_ctx, grpc_pollset *pollset) {
  gpr_mu_lock(&pollset->mu);
  if (!pollset->seen_inactive) {
    /* Lock order is neighbourhood->mu before pollset->mu (mirrors
       begin_worker): drop the pollset lock, take the neighbourhood lock,
       re-take the pollset lock and re-check. If the pollset migrated to a
       different neighbourhood in the window, retry against the new one. */
    pollset_neighbourhood *neighbourhood = pollset->neighbourhood;
    gpr_mu_unlock(&pollset->mu);
  retry_lock_neighbourhood:
    gpr_mu_lock(&neighbourhood->mu);
    gpr_mu_lock(&pollset->mu);
    if (!pollset->seen_inactive) {
      if (pollset->neighbourhood != neighbourhood) {
        gpr_mu_unlock(&neighbourhood->mu);
        neighbourhood = pollset->neighbourhood;
        gpr_mu_unlock(&pollset->mu);
        goto retry_lock_neighbourhood;
      }
      /* Unlink from the circular active list, fixing up the root pointer. */
      pollset->prev->next = pollset->next;
      pollset->next->prev = pollset->prev;
      if (pollset == pollset->neighbourhood->active_root) {
        pollset->neighbourhood->active_root =
            pollset->next == pollset ? NULL : pollset->next;
      }
    }
    gpr_mu_unlock(&pollset->neighbourhood->mu);
  }
  gpr_mu_unlock(&pollset->mu);
  gpr_mu_destroy(&pollset->mu);
}
504
/* Kick every worker attached to 'pollset': parked workers are signalled via
 * their condition variable, the designated poller is woken through the global
 * wakeup fd, already-kicked workers are left alone. Caller must hold
 * pollset->mu (NOTE(review): inferred from the unsynchronized worker-list
 * walk — confirm at call sites). */
static grpc_error *pollset_kick_all(grpc_pollset *pollset) {
  GPR_TIMER_BEGIN("pollset_kick_all", 0);
  grpc_error *error = GRPC_ERROR_NONE;
  if (pollset->root_worker != NULL) {
    grpc_pollset_worker *worker = pollset->root_worker;
    do {
      switch (worker->kick_state) {
        case KICKED:
          break;
        case UNKICKED:
          SET_KICK_STATE(worker, KICKED);
          if (worker->initialized_cv) {
            gpr_cv_signal(&worker->cv);
          }
          break;
        case DESIGNATED_POLLER:
          SET_KICK_STATE(worker, KICKED);
          append_error(&error, grpc_wakeup_fd_wakeup(&global_wakeup_fd),
                       "pollset_kick_all");
          break;
      }

      worker = worker->next;
    } while (worker != pollset->root_worker);
  }
  // TODO: sreek. Check if we need to set 'kicked_without_poller' to true here
  // in the else case
  GPR_TIMER_END("pollset_kick_all", 0);
  return error;
}
535
536static void pollset_maybe_finish_shutdown(grpc_exec_ctx *exec_ctx,
537 grpc_pollset *pollset) {
Craig Tillerba550da2017-05-01 14:26:31 +0000538 if (pollset->shutdown_closure != NULL && pollset->root_worker == NULL &&
539 pollset->begin_refs == 0) {
yang-gdf92a642017-08-21 22:38:45 -0700540 GPR_TIMER_MARK("pollset_finish_shutdown", 0);
ncteisen274bbbe2017-06-08 14:57:11 -0700541 GRPC_CLOSURE_SCHED(exec_ctx, pollset->shutdown_closure, GRPC_ERROR_NONE);
Craig Tiller4509c472017-04-27 19:05:13 +0000542 pollset->shutdown_closure = NULL;
543 }
544}
545
/* Begin shutting down 'pollset'; 'closure' runs once shutdown completes (see
 * pollset_maybe_finish_shutdown). Asserts it is not called twice. */
static void pollset_shutdown(grpc_exec_ctx *exec_ctx, grpc_pollset *pollset,
                             grpc_closure *closure) {
  GPR_TIMER_BEGIN("pollset_shutdown", 0);
  GPR_ASSERT(pollset->shutdown_closure == NULL);
  GPR_ASSERT(!pollset->shutting_down);
  pollset->shutdown_closure = closure;
  pollset->shutting_down = true;
  GRPC_LOG_IF_ERROR("pollset_shutdown", pollset_kick_all(pollset));
  pollset_maybe_finish_shutdown(exec_ctx, pollset);
  GPR_TIMER_END("pollset_shutdown", 0);
}
557
Craig Tiller4509c472017-04-27 19:05:13 +0000558static int poll_deadline_to_millis_timeout(gpr_timespec deadline,
559 gpr_timespec now) {
560 gpr_timespec timeout;
561 if (gpr_time_cmp(deadline, gpr_inf_future(deadline.clock_type)) == 0) {
562 return -1;
563 }
564
565 if (gpr_time_cmp(deadline, now) <= 0) {
566 return 0;
567 }
568
569 static const gpr_timespec round_up = {
570 .clock_type = GPR_TIMESPAN, .tv_sec = 0, .tv_nsec = GPR_NS_PER_MS - 1};
571 timeout = gpr_time_sub(deadline, now);
572 int millis = gpr_time_to_millis(gpr_time_add(timeout, round_up));
573 return millis >= 1 ? millis : 1;
574}
575
/* Process the epoll events found by do_epoll_wait() function.
   - g_epoll_set.cursor points to the index of the first event to be processed
   - This function then processes up-to MAX_EPOLL_EVENTS_HANDLED_PER_ITERATION
     events and updates the g_epoll_set.cursor

   NOTE ON SYNCHRONIZATION: Similar to do_epoll_wait(), this function is only
   called by the g_active_poller thread. So there is no need for
   synchronization when accessing fields in g_epoll_set */
static grpc_error *process_epoll_events(grpc_exec_ctx *exec_ctx,
                                        grpc_pollset *pollset) {
  static const char *err_desc = "process_events";
  grpc_error *error = GRPC_ERROR_NONE;

  GPR_TIMER_BEGIN("process_epoll_events", 0);
  /* Acquire loads pair with the release stores in do_epoll_wait(). */
  long num_events = gpr_atm_acq_load(&g_epoll_set.num_events);
  long cursor = gpr_atm_acq_load(&g_epoll_set.cursor);
  for (int idx = 0;
       (idx < MAX_EPOLL_EVENTS_HANDLED_PER_ITERATION) && cursor != num_events;
       idx++) {
    long c = cursor++;
    struct epoll_event *ev = &g_epoll_set.events[c];
    void *data_ptr = ev->data.ptr;

    if (data_ptr == &global_wakeup_fd) {
      /* Wakeup-fd event: just consume it (drains the fd). */
      append_error(&error, grpc_wakeup_fd_consume_wakeup(&global_wakeup_fd),
                   err_desc);
    } else {
      grpc_fd *fd = (grpc_fd *)(data_ptr);
      /* On error/hangup, wake both sides so their closures see the failure. */
      bool cancel = (ev->events & (EPOLLERR | EPOLLHUP)) != 0;
      bool read_ev = (ev->events & (EPOLLIN | EPOLLPRI)) != 0;
      bool write_ev = (ev->events & EPOLLOUT) != 0;

      if (read_ev || cancel) {
        fd_become_readable(exec_ctx, fd, pollset);
      }

      if (write_ev || cancel) {
        fd_become_writable(exec_ctx, fd);
      }
    }
  }
  /* Publish the new cursor for whichever thread becomes designated next. */
  gpr_atm_rel_store(&g_epoll_set.cursor, cursor);
  GPR_TIMER_END("process_epoll_events", 0);
  return error;
}
621
/* Do epoll_wait and store the events in g_epoll_set.events field. This does
   not "process" any of the events yet; that is done in process_epoll_events().
   See process_epoll_events() function for more details.

   NOTE ON SYNCHRONIZATION: At any point of time, only the g_active_poller
   (i.e the designated poller thread) will be calling this function. So there
   is no need for any synchronization when accessing fields in g_epoll_set */
static grpc_error *do_epoll_wait(grpc_exec_ctx *exec_ctx, grpc_pollset *ps,
                                 gpr_timespec now, gpr_timespec deadline) {
  GPR_TIMER_BEGIN("do_epoll_wait", 0);

  int r;
  int timeout = poll_deadline_to_millis_timeout(deadline, now);
  /* Only mark the thread as blocking when the wait may actually block. */
  if (timeout != 0) {
    GRPC_SCHEDULING_START_BLOCKING_REGION;
  }
  do {
    GRPC_STATS_INC_SYSCALL_POLL(exec_ctx);
    r = epoll_wait(g_epoll_set.epfd, g_epoll_set.events, MAX_EPOLL_EVENTS,
                   timeout);
  } while (r < 0 && errno == EINTR); /* retry when interrupted by a signal */
  if (timeout != 0) {
    GRPC_SCHEDULING_END_BLOCKING_REGION;
  }

  /* NOTE(review): this early return skips GPR_TIMER_END — preserved from the
     original behavior. */
  if (r < 0) return GRPC_OS_ERROR(errno, "epoll_wait");

  if (GRPC_TRACER_ON(grpc_polling_trace)) {
    gpr_log(GPR_DEBUG, "ps: %p poll got %d events", ps, r);
  }

  /* Publish the batch; release stores pair with the acquire loads in
     process_epoll_events(). */
  gpr_atm_rel_store(&g_epoll_set.num_events, r);
  gpr_atm_rel_store(&g_epoll_set.cursor, 0);

  GPR_TIMER_END("do_epoll_wait", 0);
  return GRPC_ERROR_NONE;
}
659
/* Initializes 'worker', inserts it into 'pollset', and — if the pollset had
   been observed inactive — re-links the pollset into a neighbourhood's active
   list. May block on worker->cv until the worker is kicked, the pollset shuts
   down, or 'deadline' expires (a cv timeout is treated as a kick).

   Requires pollset->mu held on entry (it is unlocked/relocked here when
   taking a neighbourhood lock, and released inside gpr_cv_wait); it is held
   again when the function returns.

   Returns true iff this worker ended up as the DESIGNATED_POLLER and the
   pollset is not shutting down, i.e. the caller should actually poll. */
static bool begin_worker(grpc_pollset *pollset, grpc_pollset_worker *worker,
                         grpc_pollset_worker **worker_hdl, gpr_timespec *now,
                         gpr_timespec deadline) {
  GPR_TIMER_BEGIN("begin_worker", 0);
  if (worker_hdl != NULL) *worker_hdl = worker;
  worker->initialized_cv = false;
  SET_KICK_STATE(worker, UNKICKED);
  worker->schedule_on_end_work = (grpc_closure_list)GRPC_CLOSURE_LIST_INIT;
  pollset->begin_refs++;

  if (GRPC_TRACER_ON(grpc_polling_trace)) {
    gpr_log(GPR_ERROR, "PS:%p BEGIN_STARTS:%p", pollset, worker);
  }

  if (pollset->seen_inactive) {
    // pollset has been observed to be inactive, we need to move back to the
    // active list
    bool is_reassigning = false;
    if (!pollset->reassigning_neighbourhood) {
      is_reassigning = true;
      pollset->reassigning_neighbourhood = true;
      pollset->neighbourhood = &g_neighbourhoods[choose_neighbourhood()];
    }
    pollset_neighbourhood *neighbourhood = pollset->neighbourhood;
    gpr_mu_unlock(&pollset->mu);
  // pollset unlocked: state may change (even worker->kick_state)
  retry_lock_neighbourhood:
    /* Lock order is neighbourhood first, then pollset — hence the
       unlock/relock dance above and the retry when the pollset migrated to a
       different neighbourhood while unlocked */
    gpr_mu_lock(&neighbourhood->mu);
    gpr_mu_lock(&pollset->mu);
    if (GRPC_TRACER_ON(grpc_polling_trace)) {
      gpr_log(GPR_ERROR, "PS:%p BEGIN_REORG:%p kick_state=%s is_reassigning=%d",
              pollset, worker, kick_state_string(worker->kick_state),
              is_reassigning);
    }
    if (pollset->seen_inactive) {
      if (neighbourhood != pollset->neighbourhood) {
        gpr_mu_unlock(&neighbourhood->mu);
        neighbourhood = pollset->neighbourhood;
        gpr_mu_unlock(&pollset->mu);
        goto retry_lock_neighbourhood;
      }

      /* In the brief time we released the pollset locks above, the worker MAY
         have been kicked. In this case, the worker should get out of this
         pollset ASAP and hence this should neither add the pollset to
         neighbourhood nor mark the pollset as active.

         On a side note, the only way a worker's kick state could have changed
         at this point is if it were "kicked specifically". Since the worker has
         not added itself to the pollset yet (by calling worker_insert()), it is
         not visible in the "kick any" path yet */
      if (worker->kick_state == UNKICKED) {
        pollset->seen_inactive = false;
        if (neighbourhood->active_root == NULL) {
          /* First active pollset in this neighbourhood: becomes the sole
             element of the circular doubly-linked active list */
          neighbourhood->active_root = pollset->next = pollset->prev = pollset;
          /* Make this the designated poller if there isn't one already */
          if (worker->kick_state == UNKICKED &&
              gpr_atm_no_barrier_cas(&g_active_poller, 0, (gpr_atm)worker)) {
            SET_KICK_STATE(worker, DESIGNATED_POLLER);
          }
        } else {
          /* Splice the pollset into the existing circular active list */
          pollset->next = neighbourhood->active_root;
          pollset->prev = pollset->next->prev;
          pollset->next->prev = pollset->prev->next = pollset;
        }
      }
    }
    if (is_reassigning) {
      GPR_ASSERT(pollset->reassigning_neighbourhood);
      pollset->reassigning_neighbourhood = false;
    }
    gpr_mu_unlock(&neighbourhood->mu);
  }

  worker_insert(pollset, worker);
  pollset->begin_refs--;
  if (worker->kick_state == UNKICKED && !pollset->kicked_without_poller) {
    /* Not the designated poller and not already kicked: wait on the worker's
       condition variable until kicked, shutdown, or deadline */
    GPR_ASSERT(gpr_atm_no_barrier_load(&g_active_poller) != (gpr_atm)worker);
    worker->initialized_cv = true;
    gpr_cv_init(&worker->cv);
    while (worker->kick_state == UNKICKED && !pollset->shutting_down) {
      if (GRPC_TRACER_ON(grpc_polling_trace)) {
        gpr_log(GPR_ERROR, "PS:%p BEGIN_WAIT:%p kick_state=%s shutdown=%d",
                pollset, worker, kick_state_string(worker->kick_state),
                pollset->shutting_down);
      }

      if (gpr_cv_wait(&worker->cv, &pollset->mu, deadline) &&
          worker->kick_state == UNKICKED) {
        /* If gpr_cv_wait returns true (i.e a timeout), pretend that the worker
           received a kick */
        SET_KICK_STATE(worker, KICKED);
      }
    }
    /* Refresh 'now' for the caller: an unknown amount of time passed while
       blocked on the cv */
    *now = gpr_now(now->clock_type);
  }

  if (GRPC_TRACER_ON(grpc_polling_trace)) {
    gpr_log(GPR_ERROR,
            "PS:%p BEGIN_DONE:%p kick_state=%s shutdown=%d "
            "kicked_without_poller: %d",
            pollset, worker, kick_state_string(worker->kick_state),
            pollset->shutting_down, pollset->kicked_without_poller);
  }

  /* We release pollset lock in this function at a couple of places:
   * 1. Briefly when assigning pollset to a neighbourhood
   * 2. When doing gpr_cv_wait()
   * It is possible that 'kicked_without_poller' was set to true during (1) and
   * 'shutting_down' is set to true during (1) or (2). If either of them is
   * true, this worker cannot do polling */
  /* TODO(sreek): Perhaps there is a better way to handle kicked_without_poller
   * case; especially when the worker is the DESIGNATED_POLLER */

  if (pollset->kicked_without_poller) {
    pollset->kicked_without_poller = false;
    GPR_TIMER_END("begin_worker", 0);
    return false;
  }

  GPR_TIMER_END("begin_worker", 0);
  return worker->kick_state == DESIGNATED_POLLER && !pollset->shutting_down;
}
783
/* Scans the pollsets on 'neighbourhood's circular active list for an UNKICKED
   worker that can be promoted to DESIGNATED_POLLER (via CAS on
   g_active_poller). Pollsets on which no usable worker is found are marked
   seen_inactive and unlinked from the active list.

   Caller must hold neighbourhood->mu (each inspected pollset's mu is taken
   here). Returns true iff a worker was found (or someone else concurrently
   designated one). */
static bool check_neighbourhood_for_available_poller(
    pollset_neighbourhood *neighbourhood) {
  GPR_TIMER_BEGIN("check_neighbourhood_for_available_poller", 0);
  bool found_worker = false;
  do {
    grpc_pollset *inspect = neighbourhood->active_root;
    if (inspect == NULL) {
      break;
    }
    gpr_mu_lock(&inspect->mu);
    GPR_ASSERT(!inspect->seen_inactive);
    grpc_pollset_worker *inspect_worker = inspect->root_worker;
    if (inspect_worker != NULL) {
      /* Walk the pollset's circular worker list looking for a candidate */
      do {
        switch (inspect_worker->kick_state) {
          case UNKICKED:
            if (gpr_atm_no_barrier_cas(&g_active_poller, 0,
                                       (gpr_atm)inspect_worker)) {
              if (GRPC_TRACER_ON(grpc_polling_trace)) {
                gpr_log(GPR_DEBUG, " .. choose next poller to be %p",
                        inspect_worker);
              }
              SET_KICK_STATE(inspect_worker, DESIGNATED_POLLER);
              if (inspect_worker->initialized_cv) {
                GPR_TIMER_MARK("signal worker", 0);
                gpr_cv_signal(&inspect_worker->cv);
              }
            } else {
              if (GRPC_TRACER_ON(grpc_polling_trace)) {
                gpr_log(GPR_DEBUG, " .. beaten to choose next poller");
              }
            }
            // even if we didn't win the cas, there's a worker, we can stop
            found_worker = true;
            break;
          case KICKED:
            /* Kicked workers are on their way out; keep scanning */
            break;
          case DESIGNATED_POLLER:
            found_worker = true;  // ok, so someone else found the worker, but
                                  // we'll accept that
            break;
        }
        inspect_worker = inspect_worker->next;
      } while (!found_worker && inspect_worker != inspect->root_worker);
    }
    if (!found_worker) {
      if (GRPC_TRACER_ON(grpc_polling_trace)) {
        gpr_log(GPR_DEBUG, " .. mark pollset %p inactive", inspect);
      }
      /* No usable worker on this pollset: drop it from the active list so we
         don't keep re-inspecting it */
      inspect->seen_inactive = true;
      if (inspect == neighbourhood->active_root) {
        neighbourhood->active_root =
            inspect->next == inspect ? NULL : inspect->next;
      }
      inspect->next->prev = inspect->prev;
      inspect->prev->next = inspect->next;
      inspect->next = inspect->prev = NULL;
    }
    gpr_mu_unlock(&inspect->mu);
  } while (!found_worker);
  GPR_TIMER_END("check_neighbourhood_for_available_poller", 0);
  return found_worker;
}
847
/* Finishes 'worker's participation in the pollset: moves its accumulated
   closures onto exec_ctx, and — if this worker was the designated poller —
   hands the role to an UNKICKED peer on the same pollset, or scans the
   neighbourhoods (trylock pass first, then a blocking pass over the ones that
   were skipped) for another pollset with an available worker. Finally removes
   the worker and, if the pollset became empty, lets shutdown complete.

   Requires pollset->mu held; it is temporarily released around
   grpc_exec_ctx_flush() and the neighbourhood scan, and re-held on return. */
static void end_worker(grpc_exec_ctx *exec_ctx, grpc_pollset *pollset,
                       grpc_pollset_worker *worker,
                       grpc_pollset_worker **worker_hdl) {
  GPR_TIMER_BEGIN("end_worker", 0);
  if (GRPC_TRACER_ON(grpc_polling_trace)) {
    gpr_log(GPR_DEBUG, "PS:%p END_WORKER:%p", pollset, worker);
  }
  if (worker_hdl != NULL) *worker_hdl = NULL;
  /* Make sure we appear kicked */
  SET_KICK_STATE(worker, KICKED);
  grpc_closure_list_move(&worker->schedule_on_end_work,
                         &exec_ctx->closure_list);
  if (gpr_atm_no_barrier_load(&g_active_poller) == (gpr_atm)worker) {
    if (worker->next != worker && worker->next->kick_state == UNKICKED) {
      /* Cheap handoff: the next worker on this pollset takes over polling */
      if (GRPC_TRACER_ON(grpc_polling_trace)) {
        gpr_log(GPR_DEBUG, " .. choose next poller to be peer %p", worker);
      }
      GPR_ASSERT(worker->next->initialized_cv);
      gpr_atm_no_barrier_store(&g_active_poller, (gpr_atm)worker->next);
      SET_KICK_STATE(worker->next, DESIGNATED_POLLER);
      gpr_cv_signal(&worker->next->cv);
      if (grpc_exec_ctx_has_work(exec_ctx)) {
        gpr_mu_unlock(&pollset->mu);
        grpc_exec_ctx_flush(exec_ctx);
        gpr_mu_lock(&pollset->mu);
      }
    } else {
      /* No local successor: give up the designated-poller slot and search
         other pollsets, starting from this pollset's own neighbourhood */
      gpr_atm_no_barrier_store(&g_active_poller, 0);
      size_t poller_neighbourhood_idx =
          (size_t)(pollset->neighbourhood - g_neighbourhoods);
      gpr_mu_unlock(&pollset->mu);
      bool found_worker = false;
      bool scan_state[MAX_NEIGHBOURHOODS];
      /* First pass: trylock only, remembering which neighbourhoods were
         actually inspected */
      for (size_t i = 0; !found_worker && i < g_num_neighbourhoods; i++) {
        pollset_neighbourhood *neighbourhood =
            &g_neighbourhoods[(poller_neighbourhood_idx + i) %
                              g_num_neighbourhoods];
        if (gpr_mu_trylock(&neighbourhood->mu)) {
          found_worker =
              check_neighbourhood_for_available_poller(neighbourhood);
          gpr_mu_unlock(&neighbourhood->mu);
          scan_state[i] = true;
        } else {
          scan_state[i] = false;
        }
      }
      /* Second pass: block on the neighbourhoods the trylock pass skipped */
      for (size_t i = 0; !found_worker && i < g_num_neighbourhoods; i++) {
        if (scan_state[i]) continue;
        pollset_neighbourhood *neighbourhood =
            &g_neighbourhoods[(poller_neighbourhood_idx + i) %
                              g_num_neighbourhoods];
        gpr_mu_lock(&neighbourhood->mu);
        found_worker = check_neighbourhood_for_available_poller(neighbourhood);
        gpr_mu_unlock(&neighbourhood->mu);
      }
      grpc_exec_ctx_flush(exec_ctx);
      gpr_mu_lock(&pollset->mu);
    }
  } else if (grpc_exec_ctx_has_work(exec_ctx)) {
    gpr_mu_unlock(&pollset->mu);
    grpc_exec_ctx_flush(exec_ctx);
    gpr_mu_lock(&pollset->mu);
  }
  if (worker->initialized_cv) {
    gpr_cv_destroy(&worker->cv);
  }
  if (GRPC_TRACER_ON(grpc_polling_trace)) {
    gpr_log(GPR_DEBUG, " .. remove worker");
  }
  if (EMPTIED == worker_remove(pollset, worker)) {
    pollset_maybe_finish_shutdown(exec_ctx, pollset);
  }
  GPR_ASSERT(gpr_atm_no_barrier_load(&g_active_poller) != (gpr_atm)worker);
  GPR_TIMER_END("end_worker", 0);
}
923
/* pollset->mu lock must be held by the caller before calling this.
   The function pollset_work() may temporarily release the lock (pollset->mu)
   during the course of its execution but it will always re-acquire the lock
   and ensure that it is held by the time the function returns */
static grpc_error *pollset_work(grpc_exec_ctx *exec_ctx, grpc_pollset *ps,
                                grpc_pollset_worker **worker_hdl,
                                gpr_timespec now, gpr_timespec deadline) {
  grpc_pollset_worker worker;
  grpc_error *error = GRPC_ERROR_NONE;
  static const char *err_desc = "pollset_work";
  GPR_TIMER_BEGIN("pollset_work", 0);
  /* Fast path: a kick arrived while no worker was around; consume it */
  if (ps->kicked_without_poller) {
    ps->kicked_without_poller = false;
    GPR_TIMER_END("pollset_work", 0);
    return GRPC_ERROR_NONE;
  }

  if (begin_worker(ps, &worker, worker_hdl, &now, deadline)) {
    /* This worker is the designated poller */
    gpr_tls_set(&g_current_thread_pollset, (intptr_t)ps);
    gpr_tls_set(&g_current_thread_worker, (intptr_t)&worker);
    GPR_ASSERT(!ps->shutting_down);
    GPR_ASSERT(!ps->seen_inactive);

    gpr_mu_unlock(&ps->mu); /* unlock */
    /* This is the designated polling thread at this point and should ideally do
       polling. However, if there are unprocessed events left from a previous
       call to do_epoll_wait(), skip calling epoll_wait() in this iteration and
       process the pending epoll events.

       The reason for decoupling do_epoll_wait and process_epoll_events is to
       better distribute the work (i.e handling epoll events) across multiple
       threads

       process_epoll_events() returns very quickly: It just queues the work on
       exec_ctx but does not execute it (the actual execution or more
       accurately grpc_exec_ctx_flush() happens in end_worker() AFTER selecting
       a designated poller). So we are not waiting long periods without a
       designated poller */
    if (gpr_atm_acq_load(&g_epoll_set.cursor) ==
        gpr_atm_acq_load(&g_epoll_set.num_events)) {
      append_error(&error, do_epoll_wait(exec_ctx, ps, now, deadline),
                   err_desc);
    }
    append_error(&error, process_epoll_events(exec_ctx, ps), err_desc);

    gpr_mu_lock(&ps->mu); /* lock */

    gpr_tls_set(&g_current_thread_worker, 0);
  } else {
    gpr_tls_set(&g_current_thread_pollset, (intptr_t)ps);
  }
  end_worker(exec_ctx, ps, &worker, worker_hdl);

  gpr_tls_set(&g_current_thread_pollset, 0);
  GPR_TIMER_END("pollset_work", 0);
  return error;
}
981
/* Wakes up a worker on 'pollset'. With specific_worker == NULL, a victim is
   chosen from the pollset's root/next workers based on their kick states
   (recording kicked_without_poller when no worker exists at all); otherwise
   exactly that worker is kicked. The active poller is woken by writing to
   global_wakeup_fd; cv-waiting workers are woken via gpr_cv_signal.
   NOTE(review): the caller appears expected to hold pollset->mu — root_worker
   is read without any locking here; confirm against call sites. */
static grpc_error *pollset_kick(grpc_pollset *pollset,
                                grpc_pollset_worker *specific_worker) {
  GPR_TIMER_BEGIN("pollset_kick", 0);
  grpc_error *ret_err = GRPC_ERROR_NONE;
  if (GRPC_TRACER_ON(grpc_polling_trace)) {
    /* Assemble a single trace line describing the kick request and the
       current worker states */
    gpr_strvec log;
    gpr_strvec_init(&log);
    char *tmp;
    gpr_asprintf(
        &tmp, "PS:%p KICK:%p curps=%p curworker=%p root=%p", pollset,
        specific_worker, (void *)gpr_tls_get(&g_current_thread_pollset),
        (void *)gpr_tls_get(&g_current_thread_worker), pollset->root_worker);
    gpr_strvec_add(&log, tmp);
    if (pollset->root_worker != NULL) {
      gpr_asprintf(&tmp, " {kick_state=%s next=%p {kick_state=%s}}",
                   kick_state_string(pollset->root_worker->kick_state),
                   pollset->root_worker->next,
                   kick_state_string(pollset->root_worker->next->kick_state));
      gpr_strvec_add(&log, tmp);
    }
    if (specific_worker != NULL) {
      gpr_asprintf(&tmp, " worker_kick_state=%s",
                   kick_state_string(specific_worker->kick_state));
      gpr_strvec_add(&log, tmp);
    }
    tmp = gpr_strvec_flatten(&log, NULL);
    gpr_strvec_destroy(&log);
    gpr_log(GPR_ERROR, "%s", tmp);
    gpr_free(tmp);
  }

  if (specific_worker == NULL) {
    /* "Kick any" path: pick a worker based on current states */
    if (gpr_tls_get(&g_current_thread_pollset) != (intptr_t)pollset) {
      grpc_pollset_worker *root_worker = pollset->root_worker;
      if (root_worker == NULL) {
        /* No workers at all: remember the kick so the next pollset_work()
           returns immediately */
        pollset->kicked_without_poller = true;
        if (GRPC_TRACER_ON(grpc_polling_trace)) {
          gpr_log(GPR_ERROR, " .. kicked_without_poller");
        }
        goto done;
      }
      grpc_pollset_worker *next_worker = root_worker->next;
      if (root_worker->kick_state == KICKED) {
        if (GRPC_TRACER_ON(grpc_polling_trace)) {
          gpr_log(GPR_ERROR, " .. already kicked %p", root_worker);
        }
        SET_KICK_STATE(root_worker, KICKED);
        goto done;
      } else if (next_worker->kick_state == KICKED) {
        if (GRPC_TRACER_ON(grpc_polling_trace)) {
          gpr_log(GPR_ERROR, " .. already kicked %p", next_worker);
        }
        SET_KICK_STATE(next_worker, KICKED);
        goto done;
      } else if (root_worker ==
                     next_worker &&  // only try and wake up a poller if
                                     // there is no next worker
                 root_worker == (grpc_pollset_worker *)gpr_atm_no_barrier_load(
                                    &g_active_poller)) {
        /* Sole worker is the active poller: interrupt its epoll_wait */
        if (GRPC_TRACER_ON(grpc_polling_trace)) {
          gpr_log(GPR_ERROR, " .. kicked %p", root_worker);
        }
        SET_KICK_STATE(root_worker, KICKED);
        ret_err = grpc_wakeup_fd_wakeup(&global_wakeup_fd);
        goto done;
      } else if (next_worker->kick_state == UNKICKED) {
        /* Wake a cv-waiting worker */
        if (GRPC_TRACER_ON(grpc_polling_trace)) {
          gpr_log(GPR_ERROR, " .. kicked %p", next_worker);
        }
        GPR_ASSERT(next_worker->initialized_cv);
        SET_KICK_STATE(next_worker, KICKED);
        gpr_cv_signal(&next_worker->cv);
        goto done;
      } else if (next_worker->kick_state == DESIGNATED_POLLER) {
        if (root_worker->kick_state != DESIGNATED_POLLER) {
          if (GRPC_TRACER_ON(grpc_polling_trace)) {
            gpr_log(
                GPR_ERROR,
                " .. kicked root non-poller %p (initialized_cv=%d) (poller=%p)",
                root_worker, root_worker->initialized_cv, next_worker);
          }
          SET_KICK_STATE(root_worker, KICKED);
          if (root_worker->initialized_cv) {
            gpr_cv_signal(&root_worker->cv);
          }
          goto done;
        } else {
          /* Both are designated pollers: interrupt epoll via the wakeup fd */
          if (GRPC_TRACER_ON(grpc_polling_trace)) {
            gpr_log(GPR_ERROR, " .. non-root poller %p (root=%p)", next_worker,
                    root_worker);
          }
          SET_KICK_STATE(next_worker, KICKED);
          ret_err = grpc_wakeup_fd_wakeup(&global_wakeup_fd);
          goto done;
        }
      } else {
        GPR_ASSERT(next_worker->kick_state == KICKED);
        SET_KICK_STATE(next_worker, KICKED);
        goto done;
      }
    } else {
      /* Kicking from within this pollset's own worker thread: nothing to
         wake, the thread is already running */
      if (GRPC_TRACER_ON(grpc_polling_trace)) {
        gpr_log(GPR_ERROR, " .. kicked while waking up");
      }
      goto done;
    }

    GPR_UNREACHABLE_CODE(goto done);
  }

  /* "Kick specific worker" path */
  if (specific_worker->kick_state == KICKED) {
    if (GRPC_TRACER_ON(grpc_polling_trace)) {
      gpr_log(GPR_ERROR, " .. specific worker already kicked");
    }
    goto done;
  } else if (gpr_tls_get(&g_current_thread_worker) ==
             (intptr_t)specific_worker) {
    /* Kicking ourselves: just mark the state */
    if (GRPC_TRACER_ON(grpc_polling_trace)) {
      gpr_log(GPR_ERROR, " .. mark %p kicked", specific_worker);
    }
    SET_KICK_STATE(specific_worker, KICKED);
    goto done;
  } else if (specific_worker ==
             (grpc_pollset_worker *)gpr_atm_no_barrier_load(&g_active_poller)) {
    /* Target is inside epoll_wait: interrupt it via the wakeup fd */
    if (GRPC_TRACER_ON(grpc_polling_trace)) {
      gpr_log(GPR_ERROR, " .. kick active poller");
    }
    SET_KICK_STATE(specific_worker, KICKED);
    ret_err = grpc_wakeup_fd_wakeup(&global_wakeup_fd);
    goto done;
  } else if (specific_worker->initialized_cv) {
    /* Target is blocked on its condition variable */
    if (GRPC_TRACER_ON(grpc_polling_trace)) {
      gpr_log(GPR_ERROR, " .. kick waiting worker");
    }
    SET_KICK_STATE(specific_worker, KICKED);
    gpr_cv_signal(&specific_worker->cv);
    goto done;
  } else {
    if (GRPC_TRACER_ON(grpc_polling_trace)) {
      gpr_log(GPR_ERROR, " .. kick non-waiting worker");
    }
    SET_KICK_STATE(specific_worker, KICKED);
    goto done;
  }
done:
  GPR_TIMER_END("pollset_kick", 0);
  return ret_err;
}
1130
/* No-op in this engine — presumably fds are tracked by the single global
   epoll set rather than per-pollset; verify against fd_create(). */
static void pollset_add_fd(grpc_exec_ctx *exec_ctx, grpc_pollset *pollset,
                           grpc_fd *fd) {}
1133
Craig Tiller4509c472017-04-27 19:05:13 +00001134/*******************************************************************************
Craig Tillerc67cc992017-04-27 10:15:51 -07001135 * Pollset-set Definitions
1136 */
1137
/* Pollset-sets carry no state in this engine: create returns a non-NULL dummy
   sentinel pointer and every mutation below is a no-op. */
static grpc_pollset_set *pollset_set_create(void) {
  return (grpc_pollset_set *)((intptr_t)0xdeafbeef);
}

static void pollset_set_destroy(grpc_exec_ctx *exec_ctx,
                                grpc_pollset_set *pss) {}

static void pollset_set_add_fd(grpc_exec_ctx *exec_ctx, grpc_pollset_set *pss,
                               grpc_fd *fd) {}

static void pollset_set_del_fd(grpc_exec_ctx *exec_ctx, grpc_pollset_set *pss,
                               grpc_fd *fd) {}

static void pollset_set_add_pollset(grpc_exec_ctx *exec_ctx,
                                    grpc_pollset_set *pss, grpc_pollset *ps) {}

static void pollset_set_del_pollset(grpc_exec_ctx *exec_ctx,
                                    grpc_pollset_set *pss, grpc_pollset *ps) {}

static void pollset_set_add_pollset_set(grpc_exec_ctx *exec_ctx,
                                        grpc_pollset_set *bag,
                                        grpc_pollset_set *item) {}

static void pollset_set_del_pollset_set(grpc_exec_ctx *exec_ctx,
                                        grpc_pollset_set *bag,
                                        grpc_pollset_set *item) {}
1164
1165/*******************************************************************************
1166 * Event engine binding
1167 */
1168
/* Tears down all engine-global state: fd bookkeeping, pollset globals, and
   the epoll set itself. */
static void shutdown_engine(void) {
  fd_global_shutdown();
  pollset_global_shutdown();
  epoll_set_shutdown();
}
1174
/* Binds this file's static implementations into the generic event-engine
   interface returned by grpc_init_epoll1_linux(). */
static const grpc_event_engine_vtable vtable = {
    .pollset_size = sizeof(grpc_pollset),

    .fd_create = fd_create,
    .fd_wrapped_fd = fd_wrapped_fd,
    .fd_orphan = fd_orphan,
    .fd_shutdown = fd_shutdown,
    .fd_is_shutdown = fd_is_shutdown,
    .fd_notify_on_read = fd_notify_on_read,
    .fd_notify_on_write = fd_notify_on_write,
    .fd_get_read_notifier_pollset = fd_get_read_notifier_pollset,

    .pollset_init = pollset_init,
    .pollset_shutdown = pollset_shutdown,
    .pollset_destroy = pollset_destroy,
    .pollset_work = pollset_work,
    .pollset_kick = pollset_kick,
    .pollset_add_fd = pollset_add_fd,

    .pollset_set_create = pollset_set_create,
    .pollset_set_destroy = pollset_set_destroy,
    .pollset_set_add_pollset = pollset_set_add_pollset,
    .pollset_set_del_pollset = pollset_set_del_pollset,
    .pollset_set_add_pollset_set = pollset_set_add_pollset_set,
    .pollset_set_del_pollset_set = pollset_set_del_pollset_set,
    .pollset_set_add_fd = pollset_set_add_fd,
    .pollset_set_del_fd = pollset_set_del_fd,

    .shutdown_engine = shutdown_engine,
};
1205
1206/* It is possible that GLIBC has epoll but the underlying kernel doesn't.
Sree Kuchibhotla5efc9132017-08-17 14:10:38 -07001207 * Create epoll_fd (epoll_set_init() takes care of that) to make sure epoll
1208 * support is available */
/* Entry point: probes for the prerequisites of this engine (a wakeup fd and a
   working epoll) and initializes global state. Returns the engine vtable on
   success, or NULL if any prerequisite/initialization fails (with partial
   init undone). 'explicit_request' is accepted for interface compatibility
   but not used here. */
const grpc_event_engine_vtable *grpc_init_epoll1_linux(bool explicit_request) {
  if (!grpc_has_wakeup_fd()) {
    return NULL;
  }

  if (!epoll_set_init()) {
    return NULL;
  }

  fd_global_init();

  if (!GRPC_LOG_IF_ERROR("pollset_global_init", pollset_global_init())) {
    /* Unwind the initialization done above before reporting failure */
    fd_global_shutdown();
    epoll_set_shutdown();
    return NULL;
  }

  return &vtable;
}
1228
1229#else /* defined(GRPC_LINUX_EPOLL) */
1230#if defined(GRPC_POSIX_SOCKET)
1231#include "src/core/lib/iomgr/ev_posix.h"
1232/* If GRPC_LINUX_EPOLL is not defined, it means epoll is not available. Return
1233 * NULL */
const grpc_event_engine_vtable *grpc_init_epoll1_linux(bool explicit_request) {
  return NULL; /* epoll not available on this platform */
}
Craig Tillerc67cc992017-04-27 10:15:51 -07001237#endif /* defined(GRPC_POSIX_SOCKET) */
1238#endif /* !defined(GRPC_LINUX_EPOLL) */