blob: b5066e5f588bb46d85a4d84017e5ac438c08d3f7 [file] [log] [blame]
Craig Tillerc67cc992017-04-27 10:15:51 -07001/*
2 *
Jan Tattermusch7897ae92017-06-07 22:57:36 +02003 * Copyright 2017 gRPC authors.
Craig Tillerc67cc992017-04-27 10:15:51 -07004 *
Jan Tattermusch7897ae92017-06-07 22:57:36 +02005 * Licensed under the Apache License, Version 2.0 (the "License");
6 * you may not use this file except in compliance with the License.
7 * You may obtain a copy of the License at
Craig Tillerc67cc992017-04-27 10:15:51 -07008 *
Jan Tattermusch7897ae92017-06-07 22:57:36 +02009 * http://www.apache.org/licenses/LICENSE-2.0
Craig Tillerc67cc992017-04-27 10:15:51 -070010 *
Jan Tattermusch7897ae92017-06-07 22:57:36 +020011 * Unless required by applicable law or agreed to in writing, software
12 * distributed under the License is distributed on an "AS IS" BASIS,
13 * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
14 * See the License for the specific language governing permissions and
15 * limitations under the License.
Craig Tillerc67cc992017-04-27 10:15:51 -070016 *
17 */
18
19#include "src/core/lib/iomgr/port.h"
20
21/* This polling engine is only relevant on linux kernels supporting epoll() */
22#ifdef GRPC_LINUX_EPOLL
23
Craig Tiller4509c472017-04-27 19:05:13 +000024#include "src/core/lib/iomgr/ev_epoll1_linux.h"
Craig Tillerc67cc992017-04-27 10:15:51 -070025
26#include <assert.h>
27#include <errno.h>
28#include <poll.h>
29#include <pthread.h>
30#include <string.h>
31#include <sys/epoll.h>
32#include <sys/socket.h>
33#include <unistd.h>
34
35#include <grpc/support/alloc.h>
Craig Tiller6de05932017-04-28 09:17:38 -070036#include <grpc/support/cpu.h>
Craig Tillerc67cc992017-04-27 10:15:51 -070037#include <grpc/support/log.h>
38#include <grpc/support/string_util.h>
39#include <grpc/support/tls.h>
40#include <grpc/support/useful.h>
41
Craig Tillerb4bb1cd2017-07-20 14:18:17 -070042#include "src/core/lib/debug/stats.h"
Craig Tillerc67cc992017-04-27 10:15:51 -070043#include "src/core/lib/iomgr/ev_posix.h"
44#include "src/core/lib/iomgr/iomgr_internal.h"
45#include "src/core/lib/iomgr/lockfree_event.h"
Craig Tillerc67cc992017-04-27 10:15:51 -070046#include "src/core/lib/iomgr/wakeup_fd_posix.h"
Craig Tillerc67cc992017-04-27 10:15:51 -070047#include "src/core/lib/profiling/timers.h"
48#include "src/core/lib/support/block_annotate.h"
Craig Tillerb89bac02017-05-26 15:20:32 +000049#include "src/core/lib/support/string.h"
Craig Tillerc67cc992017-04-27 10:15:51 -070050
/* Wakeup fd used to kick the designated poller thread out of epoll_wait() */
static grpc_wakeup_fd global_wakeup_fd;

/*******************************************************************************
 * Singleton epoll set related fields
 */

/* Capacity of the event buffer handed to epoll_wait() */
#define MAX_EPOLL_EVENTS 100
/* Max events consumed per process_epoll_events() call; presumably kept small
   so the designated poller can hand off frequently -- confirm with callers */
#define MAX_EPOLL_EVENTS_HANDLED_PER_ITERATION 1
Sree Kuchibhotla5efc9132017-08-17 14:10:38 -070059
/* NOTE ON SYNCHRONIZATION:
 * - Fields in this struct are only modified by the designated poller. Hence
 *   there is no need for any locks to protect the struct.
 * - num_events and cursor fields have to be of atomic type to provide memory
 *   visibility guarantees only. i.e In case of multiple pollers, the designated
 *   polling thread keeps changing; the thread that wrote these values may be
 *   different from the thread reading the values
 */
typedef struct epoll_set {
  /* File descriptor returned by epoll_create1(); -1 once shut down */
  int epfd;

  /* The epoll_events after the last call to epoll_wait() */
  struct epoll_event events[MAX_EPOLL_EVENTS];

  /* The number of epoll_events after the last call to epoll_wait() */
  gpr_atm num_events;

  /* Index of the first event in epoll_events that has to be processed. This
   * field is only valid if num_events > 0 */
  gpr_atm cursor;
} epoll_set;

/* The global singleton epoll set */
static epoll_set g_epoll_set;
84
85/* Must be called *only* once */
86static bool epoll_set_init() {
87 g_epoll_set.epfd = epoll_create1(EPOLL_CLOEXEC);
88 if (g_epoll_set.epfd < 0) {
89 gpr_log(GPR_ERROR, "epoll unavailable");
90 return false;
91 }
92
Sree Kuchibhotlaa92a9cc2017-08-27 14:02:15 -070093 gpr_log(GPR_INFO, "grpc epoll fd: %d", g_epoll_set.epfd);
94 gpr_atm_no_barrier_store(&g_epoll_set.num_events, 0);
95 gpr_atm_no_barrier_store(&g_epoll_set.cursor, 0);
Sree Kuchibhotla5efc9132017-08-17 14:10:38 -070096 return true;
97}
98
99/* epoll_set_init() MUST be called before calling this. */
100static void epoll_set_shutdown() {
101 if (g_epoll_set.epfd >= 0) {
102 close(g_epoll_set.epfd);
103 g_epoll_set.epfd = -1;
104 }
105}
Craig Tillerc67cc992017-04-27 10:15:51 -0700106
107/*******************************************************************************
108 * Fd Declarations
109 */
110
struct grpc_fd {
  /* The wrapped OS file descriptor */
  int fd;

  /* Lock-free event state for read/write readiness; stored as gpr_atm and
     manipulated exclusively through the grpc_lfev_* helpers */
  gpr_atm read_closure;
  gpr_atm write_closure;

  /* Link used when this object sits on the global grpc_fd freelist */
  struct grpc_fd *freelist_next;

  /* The pollset that last noticed that the fd is readable. The actual type
   * stored in this is (grpc_pollset *) */
  gpr_atm read_notifier_pollset;

  grpc_iomgr_object iomgr_object;
};
125
126static void fd_global_init(void);
127static void fd_global_shutdown(void);
128
129/*******************************************************************************
130 * Pollset Declarations
131 */
132
/* Kick state of a pollset worker: UNKICKED until kicked; KICKED after a kick;
   DESIGNATED_POLLER while it is the thread driving epoll_wait() */
typedef enum { UNKICKED, KICKED, DESIGNATED_POLLER } kick_state;
Craig Tillerc67cc992017-04-27 10:15:51 -0700134
Yash Tibrewalb2a54ac2017-09-13 10:18:07 -0700135static const char *kick_state_string(kick_state st) {
Craig Tiller830e82a2017-05-31 16:26:27 -0700136 switch (st) {
137 case UNKICKED:
138 return "UNKICKED";
139 case KICKED:
140 return "KICKED";
141 case DESIGNATED_POLLER:
142 return "DESIGNATED_POLLER";
143 }
144 GPR_UNREACHABLE_CODE(return "UNKNOWN");
145}
146
struct grpc_pollset_worker {
  /* Current kick state; only change via SET_KICK_STATE */
  kick_state state;
  int kick_state_mutator;  // which line of code last changed kick state
  /* Whether |cv| below is valid (guards gpr_cv_signal in pollset_kick_all) */
  bool initialized_cv;
  /* Links in the pollset's circular doubly-linked worker list */
  grpc_pollset_worker *next;
  grpc_pollset_worker *prev;
  gpr_cv cv;
  /* Closures to be scheduled when this worker finishes -- presumably drained
     in end_worker; confirm with the rest of the file */
  grpc_closure_list schedule_on_end_work;
};
156
/* Change a worker's kick state, recording (for debugging) the source line
   that performed the mutation. */
#define SET_KICK_STATE(worker, kick_state)   \
  do {                                       \
    (worker)->state = (kick_state);          \
    (worker)->kick_state_mutator = __LINE__; \
  } while (false)
162
/* Upper bound on the number of pollset neighborhoods (actual count is
   derived from the core count in pollset_global_init) */
#define MAX_NEIGHBORHOODS 1024

/* A neighborhood groups active pollsets under one mutex; the pad presumably
   keeps each neighborhood on its own cacheline to avoid false sharing */
typedef struct pollset_neighborhood {
  gpr_mu mu;
  /* Root of this neighborhood's circular list of active pollsets */
  grpc_pollset *active_root;
  char pad[GPR_CACHELINE_SIZE];
} pollset_neighborhood;
Craig Tiller6de05932017-04-28 09:17:38 -0700170
struct grpc_pollset {
  gpr_mu mu;
  /* Neighborhood this pollset belongs to; may be reassigned when the pollset
     transitions from inactive back to active (see begin_worker) */
  pollset_neighborhood *neighborhood;
  /* Set while a thread is re-picking the neighborhood, so only one does it */
  bool reassigning_neighborhood;
  grpc_pollset_worker *root_worker;
  bool kicked_without_poller;

  /* Set to true if the pollset is observed to have no workers available to
     poll */
  bool seen_inactive;
  bool shutting_down;             /* Is the pollset shutting down ? */
  grpc_closure *shutdown_closure; /* Called after shutdown is complete */

  /* Number of workers who are *about-to* attach themselves to the pollset
   * worker list */
  int begin_refs;

  /* Links in the neighborhood's circular list of active pollsets */
  grpc_pollset *next;
  grpc_pollset *prev;
};
191
192/*******************************************************************************
193 * Pollset-set Declarations
194 */
Craig Tiller6de05932017-04-28 09:17:38 -0700195
/* This engine keeps no per-pollset_set state; the dummy member only exists
   because an empty struct is not valid C */
struct grpc_pollset_set {
  char unused;
};
Craig Tillerc67cc992017-04-27 10:15:51 -0700199
200/*******************************************************************************
201 * Common helpers
202 */
203
204static bool append_error(grpc_error **composite, grpc_error *error,
205 const char *desc) {
206 if (error == GRPC_ERROR_NONE) return true;
207 if (*composite == GRPC_ERROR_NONE) {
208 *composite = GRPC_ERROR_CREATE_FROM_COPIED_STRING(desc);
209 }
210 *composite = grpc_error_add_child(*composite, error);
211 return false;
212}
213
214/*******************************************************************************
215 * Fd Definitions
216 */
217
218/* We need to keep a freelist not because of any concerns of malloc performance
219 * but instead so that implementations with multiple threads in (for example)
220 * epoll_wait deal with the race between pollset removal and incoming poll
221 * notifications.
222 *
223 * The problem is that the poller ultimately holds a reference to this
224 * object, so it is very difficult to know when is safe to free it, at least
225 * without some expensive synchronization.
226 *
227 * If we keep the object freelisted, in the worst case losing this race just
228 * becomes a spurious read notification on a reused fd.
229 */
230
231/* The alarm system needs to be able to wakeup 'some poller' sometimes
232 * (specifically when a new alarm needs to be triggered earlier than the next
233 * alarm 'epoch'). This wakeup_fd gives us something to alert on when such a
234 * case occurs. */
235
/* Global freelist of grpc_fd objects, protected by fd_freelist_mu */
static grpc_fd *fd_freelist = NULL;
static gpr_mu fd_freelist_mu;
238
Craig Tillerc67cc992017-04-27 10:15:51 -0700239static void fd_global_init(void) { gpr_mu_init(&fd_freelist_mu); }
240
static void fd_global_shutdown(void) {
  /* NOTE(review): this lock/unlock pair looks like a barrier ensuring no
     other thread still holds fd_freelist_mu before teardown -- confirm
     against the callers of fd_create/fd_orphan. */
  gpr_mu_lock(&fd_freelist_mu);
  gpr_mu_unlock(&fd_freelist_mu);
  /* Free every pooled grpc_fd. */
  while (fd_freelist != NULL) {
    grpc_fd *fd = fd_freelist;
    fd_freelist = fd_freelist->freelist_next;
    gpr_free(fd);
  }
  gpr_mu_destroy(&fd_freelist_mu);
}
251
/* Wrap OS descriptor |fd| in a grpc_fd (reusing a freelisted object when
   possible) and register it with the singleton epoll set, edge-triggered for
   both read and write. |name| is used only for the iomgr object label. */
static grpc_fd *fd_create(int fd, const char *name) {
  grpc_fd *new_fd = NULL;

  /* Try to pop a recycled grpc_fd off the freelist first. */
  gpr_mu_lock(&fd_freelist_mu);
  if (fd_freelist != NULL) {
    new_fd = fd_freelist;
    fd_freelist = fd_freelist->freelist_next;
  }
  gpr_mu_unlock(&fd_freelist_mu);

  if (new_fd == NULL) {
    new_fd = (grpc_fd *)gpr_malloc(sizeof(grpc_fd));
  }

  new_fd->fd = fd;
  grpc_lfev_init(&new_fd->read_closure);
  grpc_lfev_init(&new_fd->write_closure);
  gpr_atm_no_barrier_store(&new_fd->read_notifier_pollset, (gpr_atm)NULL);

  new_fd->freelist_next = NULL;

  char *fd_name;
  gpr_asprintf(&fd_name, "%s fd=%d", name, fd);
  grpc_iomgr_register_object(&new_fd->iomgr_object, fd_name);
#ifndef NDEBUG
  if (GRPC_TRACER_ON(grpc_trace_fd_refcount)) {
    gpr_log(GPR_DEBUG, "FD %d %p create %s", fd, new_fd, fd_name);
  }
#endif
  gpr_free(fd_name);

  /* Register with epoll; EPOLLET = edge triggered. The grpc_fd pointer is
     stored in the event payload so the poller can find it later. */
  struct epoll_event ev;
  ev.events = (uint32_t)(EPOLLIN | EPOLLOUT | EPOLLET);
  ev.data.ptr = new_fd;
  /* Failure is logged but not propagated: the grpc_fd is returned anyway. */
  if (epoll_ctl(g_epoll_set.epfd, EPOLL_CTL_ADD, fd, &ev) != 0) {
    gpr_log(GPR_ERROR, "epoll_ctl failed: %s", strerror(errno));
  }

  return new_fd;
}
292
Craig Tiller4509c472017-04-27 19:05:13 +0000293static int fd_wrapped_fd(grpc_fd *fd) { return fd->fd; }
Craig Tillerc67cc992017-04-27 10:15:51 -0700294
/* if 'releasing_fd' is true, it means that we are going to detach the internal
 * fd from grpc_fd structure (i.e which means we should not be calling
 * shutdown() syscall on that fd) */
static void fd_shutdown_internal(grpc_exec_ctx *exec_ctx, grpc_fd *fd,
                                 grpc_error *why, bool releasing_fd) {
  /* The read-side lfev shutdown appears to return true only on the first
     transition (consistent with fd_shutdown's "might be called multiple
     times" note), gating the syscall and the write-side shutdown so they
     happen at most once -- confirm against lockfree_event.h. */
  if (grpc_lfev_set_shutdown(exec_ctx, &fd->read_closure,
                             GRPC_ERROR_REF(why))) {
    if (!releasing_fd) {
      shutdown(fd->fd, SHUT_RDWR);
    }
    grpc_lfev_set_shutdown(exec_ctx, &fd->write_closure, GRPC_ERROR_REF(why));
  }
  /* Drop the caller's reference; the lfev calls took their own refs above. */
  GRPC_ERROR_UNREF(why);
}
309
Sree Kuchibhotlaf2641472017-08-02 23:46:40 -0700310/* Might be called multiple times */
311static void fd_shutdown(grpc_exec_ctx *exec_ctx, grpc_fd *fd, grpc_error *why) {
312 fd_shutdown_internal(exec_ctx, fd, why, false);
313}
314
/* Release |fd|: shut it down if needed, either hand the raw descriptor back
   through |release_fd| or close it, schedule |on_done|, and recycle the
   grpc_fd object onto the global freelist. */
static void fd_orphan(grpc_exec_ctx *exec_ctx, grpc_fd *fd,
                      grpc_closure *on_done, int *release_fd,
                      bool already_closed, const char *reason) {
  grpc_error *error = GRPC_ERROR_NONE;
  bool is_release_fd = (release_fd != NULL);

  /* Only shut down if nobody already did (fd_shutdown may have run). */
  if (!grpc_lfev_is_shutdown(&fd->read_closure)) {
    fd_shutdown_internal(exec_ctx, fd,
                         GRPC_ERROR_CREATE_FROM_COPIED_STRING(reason),
                         is_release_fd);
  }

  /* If release_fd is not NULL, we should be relinquishing control of the file
     descriptor fd->fd (but we still own the grpc_fd structure). */
  if (is_release_fd) {
    *release_fd = fd->fd;
  } else if (!already_closed) {
    close(fd->fd);
  }

  GRPC_CLOSURE_SCHED(exec_ctx, on_done, GRPC_ERROR_REF(error));

  grpc_iomgr_unregister_object(&fd->iomgr_object);
  grpc_lfev_destroy(&fd->read_closure);
  grpc_lfev_destroy(&fd->write_closure);

  /* Recycle rather than free: see the freelist rationale comment above. */
  gpr_mu_lock(&fd_freelist_mu);
  fd->freelist_next = fd_freelist;
  fd_freelist = fd;
  gpr_mu_unlock(&fd_freelist_mu);
}
346
347static grpc_pollset *fd_get_read_notifier_pollset(grpc_exec_ctx *exec_ctx,
348 grpc_fd *fd) {
349 gpr_atm notifier = gpr_atm_acq_load(&fd->read_notifier_pollset);
350 return (grpc_pollset *)notifier;
351}
352
353static bool fd_is_shutdown(grpc_fd *fd) {
354 return grpc_lfev_is_shutdown(&fd->read_closure);
355}
356
/* Register |closure| to run when |fd| becomes readable. */
static void fd_notify_on_read(grpc_exec_ctx *exec_ctx, grpc_fd *fd,
                              grpc_closure *closure) {
  grpc_lfev_notify_on(exec_ctx, &fd->read_closure, closure, "read");
}

/* Register |closure| to run when |fd| becomes writable. */
static void fd_notify_on_write(grpc_exec_ctx *exec_ctx, grpc_fd *fd,
                               grpc_closure *closure) {
  grpc_lfev_notify_on(exec_ctx, &fd->write_closure, closure, "write");
}

/* Fire the read event and record which pollset noticed readability. */
static void fd_become_readable(grpc_exec_ctx *exec_ctx, grpc_fd *fd,
                               grpc_pollset *notifier) {
  grpc_lfev_set_ready(exec_ctx, &fd->read_closure, "read");
  /* Use release store to match with acquire load in fd_get_read_notifier */
  gpr_atm_rel_store(&fd->read_notifier_pollset, (gpr_atm)notifier);
}

/* Fire the write event. */
static void fd_become_writable(grpc_exec_ctx *exec_ctx, grpc_fd *fd) {
  grpc_lfev_set_ready(exec_ctx, &fd->write_closure, "write");
}
377
378/*******************************************************************************
379 * Pollset Definitions
380 */
381
/* Per-thread markers; presumably record which pollset/worker the current
   thread is servicing -- confirm with usage later in the file */
GPR_TLS_DECL(g_current_thread_pollset);
GPR_TLS_DECL(g_current_thread_worker);

/* The designated poller */
static gpr_atm g_active_poller;

/* Neighborhood array and its size; allocated in pollset_global_init() */
static pollset_neighborhood *g_neighborhoods;
static size_t g_num_neighborhoods;
Craig Tiller6de05932017-04-28 09:17:38 -0700390
Craig Tillerc67cc992017-04-27 10:15:51 -0700391/* Return true if first in list */
Craig Tiller32f90ee2017-04-28 12:46:41 -0700392static bool worker_insert(grpc_pollset *pollset, grpc_pollset_worker *worker) {
393 if (pollset->root_worker == NULL) {
394 pollset->root_worker = worker;
395 worker->next = worker->prev = worker;
Craig Tillerc67cc992017-04-27 10:15:51 -0700396 return true;
397 } else {
Craig Tiller32f90ee2017-04-28 12:46:41 -0700398 worker->next = pollset->root_worker;
399 worker->prev = worker->next->prev;
400 worker->next->prev = worker;
401 worker->prev->next = worker;
Craig Tillerc67cc992017-04-27 10:15:51 -0700402 return false;
403 }
404}
405
406/* Return true if last in list */
407typedef enum { EMPTIED, NEW_ROOT, REMOVED } worker_remove_result;
408
Craig Tiller32f90ee2017-04-28 12:46:41 -0700409static worker_remove_result worker_remove(grpc_pollset *pollset,
Craig Tillerc67cc992017-04-27 10:15:51 -0700410 grpc_pollset_worker *worker) {
Craig Tiller32f90ee2017-04-28 12:46:41 -0700411 if (worker == pollset->root_worker) {
412 if (worker == worker->next) {
413 pollset->root_worker = NULL;
Craig Tillerc67cc992017-04-27 10:15:51 -0700414 return EMPTIED;
415 } else {
Craig Tiller32f90ee2017-04-28 12:46:41 -0700416 pollset->root_worker = worker->next;
417 worker->prev->next = worker->next;
418 worker->next->prev = worker->prev;
Craig Tillerc67cc992017-04-27 10:15:51 -0700419 return NEW_ROOT;
420 }
421 } else {
Craig Tiller32f90ee2017-04-28 12:46:41 -0700422 worker->prev->next = worker->next;
423 worker->next->prev = worker->prev;
Craig Tillerc67cc992017-04-27 10:15:51 -0700424 return REMOVED;
425 }
426}
427
Vijay Pai4b7ef4d2017-09-11 23:09:22 -0700428static size_t choose_neighborhood(void) {
429 return (size_t)gpr_cpu_current_cpu() % g_num_neighborhoods;
Craig Tillerba550da2017-05-01 14:26:31 +0000430}
431
/* One-time pollset subsystem setup: TLS keys, the global wakeup fd (added to
   the singleton epoll set), and the neighborhood array. Returns a grpc_error
   if the wakeup fd cannot be created or registered. */
static grpc_error *pollset_global_init(void) {
  gpr_tls_init(&g_current_thread_pollset);
  gpr_tls_init(&g_current_thread_worker);
  gpr_atm_no_barrier_store(&g_active_poller, 0);
  global_wakeup_fd.read_fd = -1;
  grpc_error *err = grpc_wakeup_fd_init(&global_wakeup_fd);
  if (err != GRPC_ERROR_NONE) return err;
  /* Register the wakeup fd (edge triggered) so kicks can pull the designated
     poller out of epoll_wait(). */
  struct epoll_event ev;
  ev.events = (uint32_t)(EPOLLIN | EPOLLET);
  ev.data.ptr = &global_wakeup_fd;
  if (epoll_ctl(g_epoll_set.epfd, EPOLL_CTL_ADD, global_wakeup_fd.read_fd,
                &ev) != 0) {
    return GRPC_OS_ERROR(errno, "epoll_ctl");
  }
  /* One neighborhood per core, clamped to [1, MAX_NEIGHBORHOODS]. */
  g_num_neighborhoods = GPR_CLAMP(gpr_cpu_num_cores(), 1, MAX_NEIGHBORHOODS);
  g_neighborhoods = (pollset_neighborhood *)gpr_zalloc(
      sizeof(*g_neighborhoods) * g_num_neighborhoods);
  for (size_t i = 0; i < g_num_neighborhoods; i++) {
    gpr_mu_init(&g_neighborhoods[i].mu);
  }
  return GRPC_ERROR_NONE;
}
454
455static void pollset_global_shutdown(void) {
Craig Tiller4509c472017-04-27 19:05:13 +0000456 gpr_tls_destroy(&g_current_thread_pollset);
457 gpr_tls_destroy(&g_current_thread_worker);
Craig Tiller375eb252017-04-27 23:29:12 +0000458 if (global_wakeup_fd.read_fd != -1) grpc_wakeup_fd_destroy(&global_wakeup_fd);
Vijay Pai4b7ef4d2017-09-11 23:09:22 -0700459 for (size_t i = 0; i < g_num_neighborhoods; i++) {
460 gpr_mu_destroy(&g_neighborhoods[i].mu);
Craig Tiller32f90ee2017-04-28 12:46:41 -0700461 }
Vijay Pai4b7ef4d2017-09-11 23:09:22 -0700462 gpr_free(g_neighborhoods);
Craig Tiller4509c472017-04-27 19:05:13 +0000463}
464
/* Initialize |pollset| and return its mutex through |mu| (callers lock it
   themselves, per the grpc_pollset contract). */
static void pollset_init(grpc_pollset *pollset, gpr_mu **mu) {
  gpr_mu_init(&pollset->mu);
  *mu = &pollset->mu;
  /* New pollsets start inactive: assigned to the current CPU's neighborhood
     but not yet linked into its active list (next/prev stay NULL). */
  pollset->neighborhood = &g_neighborhoods[choose_neighborhood()];
  pollset->reassigning_neighborhood = false;
  pollset->root_worker = NULL;
  pollset->kicked_without_poller = false;
  pollset->seen_inactive = true;
  pollset->shutting_down = false;
  pollset->shutdown_closure = NULL;
  pollset->begin_refs = 0;
  pollset->next = pollset->prev = NULL;
}
478
/* Unlink |pollset| from its neighborhood's active list (if it is on one) and
   destroy its mutex. The retry loop exists because the neighborhood lock must
   be taken before the pollset lock, yet the pollset's neighborhood can change
   while neither lock is held. */
static void pollset_destroy(grpc_exec_ctx *exec_ctx, grpc_pollset *pollset) {
  gpr_mu_lock(&pollset->mu);
  if (!pollset->seen_inactive) {
    pollset_neighborhood *neighborhood = pollset->neighborhood;
    gpr_mu_unlock(&pollset->mu);
  retry_lock_neighborhood:
    gpr_mu_lock(&neighborhood->mu);
    gpr_mu_lock(&pollset->mu);
    /* Re-check: the pollset may have gone inactive while unlocked. */
    if (!pollset->seen_inactive) {
      if (pollset->neighborhood != neighborhood) {
        /* The pollset migrated to another neighborhood while we were taking
           the locks; drop both and retry against the new neighborhood. */
        gpr_mu_unlock(&neighborhood->mu);
        neighborhood = pollset->neighborhood;
        gpr_mu_unlock(&pollset->mu);
        goto retry_lock_neighborhood;
      }
      /* Splice out of the neighborhood's circular active list. */
      pollset->prev->next = pollset->next;
      pollset->next->prev = pollset->prev;
      if (pollset == pollset->neighborhood->active_root) {
        pollset->neighborhood->active_root =
            pollset->next == pollset ? NULL : pollset->next;
      }
    }
    gpr_mu_unlock(&pollset->neighborhood->mu);
  }
  gpr_mu_unlock(&pollset->mu);
  gpr_mu_destroy(&pollset->mu);
}
506
/* Mark every worker attached to |pollset| as KICKED: CV waiters are
   signalled, and the designated poller is woken via the global wakeup fd.
   (Presumably called with pollset->mu held -- confirm with callers.) */
static grpc_error *pollset_kick_all(grpc_pollset *pollset) {
  GPR_TIMER_BEGIN("pollset_kick_all", 0);
  grpc_error *error = GRPC_ERROR_NONE;
  if (pollset->root_worker != NULL) {
    grpc_pollset_worker *worker = pollset->root_worker;
    do {
      switch (worker->state) {
        case KICKED:
          /* already kicked: nothing to do */
          break;
        case UNKICKED:
          SET_KICK_STATE(worker, KICKED);
          /* only signal workers whose condvar has been initialized */
          if (worker->initialized_cv) {
            gpr_cv_signal(&worker->cv);
          }
          break;
        case DESIGNATED_POLLER:
          /* this worker may be blocked inside epoll_wait(): wake it up */
          SET_KICK_STATE(worker, KICKED);
          append_error(&error, grpc_wakeup_fd_wakeup(&global_wakeup_fd),
                       "pollset_kick_all");
          break;
      }

      worker = worker->next;
    } while (worker != pollset->root_worker);
  }
  // TODO: sreek. Check if we need to set 'kicked_without_poller' to true here
  // in the else case
  GPR_TIMER_END("pollset_kick_all", 0);
  return error;
}
537
538static void pollset_maybe_finish_shutdown(grpc_exec_ctx *exec_ctx,
539 grpc_pollset *pollset) {
Craig Tillerba550da2017-05-01 14:26:31 +0000540 if (pollset->shutdown_closure != NULL && pollset->root_worker == NULL &&
541 pollset->begin_refs == 0) {
yang-gdf92a642017-08-21 22:38:45 -0700542 GPR_TIMER_MARK("pollset_finish_shutdown", 0);
ncteisen274bbbe2017-06-08 14:57:11 -0700543 GRPC_CLOSURE_SCHED(exec_ctx, pollset->shutdown_closure, GRPC_ERROR_NONE);
Craig Tiller4509c472017-04-27 19:05:13 +0000544 pollset->shutdown_closure = NULL;
545 }
546}
547
/* Begin pollset shutdown: record |closure| to run once all workers are gone,
   kick every attached worker, and finish immediately if none are attached.
   Must not be called twice on the same pollset (asserted). */
static void pollset_shutdown(grpc_exec_ctx *exec_ctx, grpc_pollset *pollset,
                             grpc_closure *closure) {
  GPR_TIMER_BEGIN("pollset_shutdown", 0);
  GPR_ASSERT(pollset->shutdown_closure == NULL);
  GPR_ASSERT(!pollset->shutting_down);
  pollset->shutdown_closure = closure;
  pollset->shutting_down = true;
  GRPC_LOG_IF_ERROR("pollset_shutdown", pollset_kick_all(pollset));
  pollset_maybe_finish_shutdown(exec_ctx, pollset);
  GPR_TIMER_END("pollset_shutdown", 0);
}
559
Craig Tiller4509c472017-04-27 19:05:13 +0000560static int poll_deadline_to_millis_timeout(gpr_timespec deadline,
561 gpr_timespec now) {
562 gpr_timespec timeout;
563 if (gpr_time_cmp(deadline, gpr_inf_future(deadline.clock_type)) == 0) {
564 return -1;
565 }
566
567 if (gpr_time_cmp(deadline, now) <= 0) {
568 return 0;
569 }
570
Yash Tibrewal533d1182017-09-18 10:48:22 -0700571 static const gpr_timespec round_up = {0, GPR_NS_PER_MS - 1, GPR_TIMESPAN};
Craig Tiller4509c472017-04-27 19:05:13 +0000572 timeout = gpr_time_sub(deadline, now);
573 int millis = gpr_time_to_millis(gpr_time_add(timeout, round_up));
574 return millis >= 1 ? millis : 1;
575}
576
/* Process the epoll events found by do_epoll_wait() function.
   - g_epoll_set.cursor points to the index of the first event to be processed
   - This function then processes up-to MAX_EPOLL_EVENTS_PER_ITERATION and
     updates the g_epoll_set.cursor

   NOTE ON SYNCHRONIZATION: Similar to do_epoll_wait(), this function is only
   called by g_active_poller thread. So there is no need for synchronization
   when accessing fields in g_epoll_set */
static grpc_error *process_epoll_events(grpc_exec_ctx *exec_ctx,
                                        grpc_pollset *pollset) {
  static const char *err_desc = "process_events";
  grpc_error *error = GRPC_ERROR_NONE;

  GPR_TIMER_BEGIN("process_epoll_events", 0);
  /* Acquire loads pair with the release stores below and in do_epoll_wait()
     so a newly-designated poller sees what the previous one wrote. */
  long num_events = gpr_atm_acq_load(&g_epoll_set.num_events);
  long cursor = gpr_atm_acq_load(&g_epoll_set.cursor);
  for (int idx = 0;
       (idx < MAX_EPOLL_EVENTS_HANDLED_PER_ITERATION) && cursor != num_events;
       idx++) {
    long c = cursor++;
    struct epoll_event *ev = &g_epoll_set.events[c];
    void *data_ptr = ev->data.ptr;

    if (data_ptr == &global_wakeup_fd) {
      /* Not a real fd event -- a kick. Just drain the wakeup fd. */
      append_error(&error, grpc_wakeup_fd_consume_wakeup(&global_wakeup_fd),
                   err_desc);
    } else {
      grpc_fd *fd = (grpc_fd *)(data_ptr);
      /* On error/hangup, wake both sides so their closures can observe it. */
      bool cancel = (ev->events & (EPOLLERR | EPOLLHUP)) != 0;
      bool read_ev = (ev->events & (EPOLLIN | EPOLLPRI)) != 0;
      bool write_ev = (ev->events & EPOLLOUT) != 0;

      if (read_ev || cancel) {
        fd_become_readable(exec_ctx, fd, pollset);
      }

      if (write_ev || cancel) {
        fd_become_writable(exec_ctx, fd);
      }
    }
  }
  /* Publish the new cursor for the next designated poller. */
  gpr_atm_rel_store(&g_epoll_set.cursor, cursor);
  GPR_TIMER_END("process_epoll_events", 0);
  return error;
}
622
/* Do epoll_wait and store the events in g_epoll_set.events field. This does not
   "process" any of the events yet; that is done in process_epoll_events().
   See process_epoll_events() function for more details.

   NOTE ON SYNCHRONIZATION: At any point of time, only the g_active_poller
   (i.e the designated poller thread) will be calling this function. So there is
   no need for any synchronization when accessing fields in g_epoll_set */
static grpc_error *do_epoll_wait(grpc_exec_ctx *exec_ctx, grpc_pollset *ps,
                                 gpr_timespec now, gpr_timespec deadline) {
  GPR_TIMER_BEGIN("do_epoll_wait", 0);

  int r;
  int timeout = poll_deadline_to_millis_timeout(deadline, now);
  /* Only mark this a blocking region if we may actually block (timeout != 0) */
  if (timeout != 0) {
    GRPC_SCHEDULING_START_BLOCKING_REGION;
  }
  do {
    GRPC_STATS_INC_SYSCALL_POLL(exec_ctx);
    r = epoll_wait(g_epoll_set.epfd, g_epoll_set.events, MAX_EPOLL_EVENTS,
                   timeout);
  } while (r < 0 && errno == EINTR); /* retry if interrupted by a signal */
  if (timeout != 0) {
    GRPC_SCHEDULING_END_BLOCKING_REGION;
  }

  if (r < 0) return GRPC_OS_ERROR(errno, "epoll_wait");

  if (GRPC_TRACER_ON(grpc_polling_trace)) {
    gpr_log(GPR_DEBUG, "ps: %p poll got %d events", ps, r);
  }

  /* Publish the freshly filled event buffer: release-stores pair with the
     acquire-loads in pollset_work()/process_epoll_events() */
  gpr_atm_rel_store(&g_epoll_set.num_events, r);
  gpr_atm_rel_store(&g_epoll_set.cursor, 0);

  GPR_TIMER_END("do_epoll_wait", 0);
  return GRPC_ERROR_NONE;
}
660
/* Register 'worker' with 'pollset' and decide whether this thread should poll.
   Called with pollset->mu held; may briefly drop it (see note at the end).
   Returns true iff this worker ended up as the DESIGNATED_POLLER and the
   pollset is not shutting down — i.e. the caller should go do epoll work. */
static bool begin_worker(grpc_pollset *pollset, grpc_pollset_worker *worker,
                         grpc_pollset_worker **worker_hdl, gpr_timespec *now,
                         gpr_timespec deadline) {
  GPR_TIMER_BEGIN("begin_worker", 0);
  if (worker_hdl != NULL) *worker_hdl = worker;
  worker->initialized_cv = false;
  SET_KICK_STATE(worker, UNKICKED);
  worker->schedule_on_end_work = (grpc_closure_list)GRPC_CLOSURE_LIST_INIT;
  pollset->begin_refs++;

  if (GRPC_TRACER_ON(grpc_polling_trace)) {
    gpr_log(GPR_ERROR, "PS:%p BEGIN_STARTS:%p", pollset, worker);
  }

  if (pollset->seen_inactive) {
    // pollset has been observed to be inactive, we need to move back to the
    // active list
    bool is_reassigning = false;
    if (!pollset->reassigning_neighborhood) {
      is_reassigning = true;
      pollset->reassigning_neighborhood = true;
      pollset->neighborhood = &g_neighborhoods[choose_neighborhood()];
    }
    pollset_neighborhood *neighborhood = pollset->neighborhood;
    gpr_mu_unlock(&pollset->mu);
  // pollset unlocked: state may change (even worker->kick_state)
  retry_lock_neighborhood:
    /* Lock order is neighborhood->mu then pollset->mu, which is why the
       pollset lock had to be dropped above */
    gpr_mu_lock(&neighborhood->mu);
    gpr_mu_lock(&pollset->mu);
    if (GRPC_TRACER_ON(grpc_polling_trace)) {
      gpr_log(GPR_ERROR, "PS:%p BEGIN_REORG:%p kick_state=%s is_reassigning=%d",
              pollset, worker, kick_state_string(worker->state),
              is_reassigning);
    }
    if (pollset->seen_inactive) {
      if (neighborhood != pollset->neighborhood) {
        /* Another thread moved the pollset to a different neighborhood while
           we were unlocked; chase it */
        gpr_mu_unlock(&neighborhood->mu);
        neighborhood = pollset->neighborhood;
        gpr_mu_unlock(&pollset->mu);
        goto retry_lock_neighborhood;
      }

      /* In the brief time we released the pollset locks above, the worker MAY
         have been kicked. In this case, the worker should get out of this
         pollset ASAP and hence this should neither add the pollset to
         neighborhood nor mark the pollset as active.

         On a side note, the only way a worker's kick state could have changed
         at this point is if it were "kicked specifically". Since the worker has
         not added itself to the pollset yet (by calling worker_insert()), it is
         not visible in the "kick any" path yet */
      if (worker->state == UNKICKED) {
        pollset->seen_inactive = false;
        if (neighborhood->active_root == NULL) {
          /* First active pollset in this neighborhood: becomes the sole node
             of the circular doubly-linked active list */
          neighborhood->active_root = pollset->next = pollset->prev = pollset;
          /* Make this the designated poller if there isn't one already */
          if (worker->state == UNKICKED &&
              gpr_atm_no_barrier_cas(&g_active_poller, 0, (gpr_atm)worker)) {
            SET_KICK_STATE(worker, DESIGNATED_POLLER);
          }
        } else {
          /* Splice the pollset into the existing circular active list */
          pollset->next = neighborhood->active_root;
          pollset->prev = pollset->next->prev;
          pollset->next->prev = pollset->prev->next = pollset;
        }
      }
    }
    if (is_reassigning) {
      GPR_ASSERT(pollset->reassigning_neighborhood);
      pollset->reassigning_neighborhood = false;
    }
    gpr_mu_unlock(&neighborhood->mu);
  }

  worker_insert(pollset, worker);
  pollset->begin_refs--;
  if (worker->state == UNKICKED && !pollset->kicked_without_poller) {
    /* Not the designated poller and not pre-kicked: block on our condvar
       until kicked, shut down, or the deadline passes */
    GPR_ASSERT(gpr_atm_no_barrier_load(&g_active_poller) != (gpr_atm)worker);
    worker->initialized_cv = true;
    gpr_cv_init(&worker->cv);
    while (worker->state == UNKICKED && !pollset->shutting_down) {
      if (GRPC_TRACER_ON(grpc_polling_trace)) {
        gpr_log(GPR_ERROR, "PS:%p BEGIN_WAIT:%p kick_state=%s shutdown=%d",
                pollset, worker, kick_state_string(worker->state),
                pollset->shutting_down);
      }

      if (gpr_cv_wait(&worker->cv, &pollset->mu, deadline) &&
          worker->state == UNKICKED) {
        /* If gpr_cv_wait returns true (i.e a timeout), pretend that the worker
           received a kick */
        SET_KICK_STATE(worker, KICKED);
      }
    }
    /* Time has passed while waiting; refresh the caller's notion of 'now' */
    *now = gpr_now(now->clock_type);
  }

  if (GRPC_TRACER_ON(grpc_polling_trace)) {
    gpr_log(GPR_ERROR,
            "PS:%p BEGIN_DONE:%p kick_state=%s shutdown=%d "
            "kicked_without_poller: %d",
            pollset, worker, kick_state_string(worker->state),
            pollset->shutting_down, pollset->kicked_without_poller);
  }

  /* We release pollset lock in this function at a couple of places:
   * 1. Briefly when assigning pollset to a neighborhood
   * 2. When doing gpr_cv_wait()
   * It is possible that 'kicked_without_poller' was set to true during (1) and
   * 'shutting_down' is set to true during (1) or (2). If either of them is
   * true, this worker cannot do polling */
  /* TODO(sreek): Perhaps there is a better way to handle kicked_without_poller
   * case; especially when the worker is the DESIGNATED_POLLER */

  if (pollset->kicked_without_poller) {
    pollset->kicked_without_poller = false;
    GPR_TIMER_END("begin_worker", 0);
    return false;
  }

  GPR_TIMER_END("begin_worker", 0);
  return worker->state == DESIGNATED_POLLER && !pollset->shutting_down;
}
784
/* Scan the neighborhood's active pollset list for a worker that can take over
   as the designated poller (g_active_poller). Pollsets on which no candidate
   is found are marked inactive and unlinked from the active list.
   Called with neighborhood->mu held. Returns true iff a poller was found (or
   some other thread installed one first). */
static bool check_neighborhood_for_available_poller(
    pollset_neighborhood *neighborhood) {
  GPR_TIMER_BEGIN("check_neighborhood_for_available_poller", 0);
  bool found_worker = false;
  do {
    grpc_pollset *inspect = neighborhood->active_root;
    if (inspect == NULL) {
      break;
    }
    gpr_mu_lock(&inspect->mu);
    GPR_ASSERT(!inspect->seen_inactive);
    grpc_pollset_worker *inspect_worker = inspect->root_worker;
    if (inspect_worker != NULL) {
      /* Walk the circular worker list looking for an UNKICKED worker to
         promote */
      do {
        switch (inspect_worker->state) {
          case UNKICKED:
            /* Try to install this worker as the designated poller; CAS can
               lose to a concurrent promotion elsewhere */
            if (gpr_atm_no_barrier_cas(&g_active_poller, 0,
                                       (gpr_atm)inspect_worker)) {
              if (GRPC_TRACER_ON(grpc_polling_trace)) {
                gpr_log(GPR_DEBUG, " .. choose next poller to be %p",
                        inspect_worker);
              }
              SET_KICK_STATE(inspect_worker, DESIGNATED_POLLER);
              if (inspect_worker->initialized_cv) {
                GPR_TIMER_MARK("signal worker", 0);
                gpr_cv_signal(&inspect_worker->cv);
              }
            } else {
              if (GRPC_TRACER_ON(grpc_polling_trace)) {
                gpr_log(GPR_DEBUG, " .. beaten to choose next poller");
              }
            }
            // even if we didn't win the cas, there's a worker, we can stop
            found_worker = true;
            break;
          case KICKED:
            break;
          case DESIGNATED_POLLER:
            found_worker = true;  // ok, so someone else found the worker, but
                                  // we'll accept that
            break;
        }
        inspect_worker = inspect_worker->next;
      } while (!found_worker && inspect_worker != inspect->root_worker);
    }
    if (!found_worker) {
      /* No usable worker on this pollset: mark it inactive and unlink it from
         the neighborhood's circular active list */
      if (GRPC_TRACER_ON(grpc_polling_trace)) {
        gpr_log(GPR_DEBUG, " .. mark pollset %p inactive", inspect);
      }
      inspect->seen_inactive = true;
      if (inspect == neighborhood->active_root) {
        neighborhood->active_root =
            inspect->next == inspect ? NULL : inspect->next;
      }
      inspect->next->prev = inspect->prev;
      inspect->prev->next = inspect->next;
      inspect->next = inspect->prev = NULL;
    }
    gpr_mu_unlock(&inspect->mu);
  } while (!found_worker);
  GPR_TIMER_END("check_neighborhood_for_available_poller", 0);
  return found_worker;
}
848
/* Tear down 'worker' after pollset_work(). If this worker was the designated
   poller, hand the role to a peer on the same pollset or scan all
   neighborhoods for a replacement so polling never stalls. Called with
   pollset->mu held; may temporarily release it to flush the exec_ctx, but
   always re-acquires it before returning. */
static void end_worker(grpc_exec_ctx *exec_ctx, grpc_pollset *pollset,
                       grpc_pollset_worker *worker,
                       grpc_pollset_worker **worker_hdl) {
  GPR_TIMER_BEGIN("end_worker", 0);
  if (GRPC_TRACER_ON(grpc_polling_trace)) {
    gpr_log(GPR_DEBUG, "PS:%p END_WORKER:%p", pollset, worker);
  }
  if (worker_hdl != NULL) *worker_hdl = NULL;
  /* Make sure we appear kicked */
  SET_KICK_STATE(worker, KICKED);
  /* Closures queued by process_epoll_events() get moved onto the exec_ctx so
     they run after a new designated poller has been chosen */
  grpc_closure_list_move(&worker->schedule_on_end_work,
                         &exec_ctx->closure_list);
  if (gpr_atm_no_barrier_load(&g_active_poller) == (gpr_atm)worker) {
    if (worker->next != worker && worker->next->state == UNKICKED) {
      /* Cheap path: promote our peer on the same pollset */
      if (GRPC_TRACER_ON(grpc_polling_trace)) {
        gpr_log(GPR_DEBUG, " .. choose next poller to be peer %p", worker);
      }
      GPR_ASSERT(worker->next->initialized_cv);
      gpr_atm_no_barrier_store(&g_active_poller, (gpr_atm)worker->next);
      SET_KICK_STATE(worker->next, DESIGNATED_POLLER);
      gpr_cv_signal(&worker->next->cv);
      if (grpc_exec_ctx_has_work(exec_ctx)) {
        gpr_mu_unlock(&pollset->mu);
        grpc_exec_ctx_flush(exec_ctx);
        gpr_mu_lock(&pollset->mu);
      }
    } else {
      /* No local peer: clear the poller slot and search every neighborhood,
         starting with our own */
      gpr_atm_no_barrier_store(&g_active_poller, 0);
      size_t poller_neighborhood_idx =
          (size_t)(pollset->neighborhood - g_neighborhoods);
      gpr_mu_unlock(&pollset->mu);
      bool found_worker = false;
      bool scan_state[MAX_NEIGHBORHOODS];
      /* First pass: trylock only, so we never block behind a busy
         neighborhood; remember which ones we managed to scan */
      for (size_t i = 0; !found_worker && i < g_num_neighborhoods; i++) {
        pollset_neighborhood *neighborhood =
            &g_neighborhoods[(poller_neighborhood_idx + i) %
                             g_num_neighborhoods];
        if (gpr_mu_trylock(&neighborhood->mu)) {
          found_worker = check_neighborhood_for_available_poller(neighborhood);
          gpr_mu_unlock(&neighborhood->mu);
          scan_state[i] = true;
        } else {
          scan_state[i] = false;
        }
      }
      /* Second pass: blocking lock on the neighborhoods we skipped */
      for (size_t i = 0; !found_worker && i < g_num_neighborhoods; i++) {
        if (scan_state[i]) continue;
        pollset_neighborhood *neighborhood =
            &g_neighborhoods[(poller_neighborhood_idx + i) %
                             g_num_neighborhoods];
        gpr_mu_lock(&neighborhood->mu);
        found_worker = check_neighborhood_for_available_poller(neighborhood);
        gpr_mu_unlock(&neighborhood->mu);
      }
      grpc_exec_ctx_flush(exec_ctx);
      gpr_mu_lock(&pollset->mu);
    }
  } else if (grpc_exec_ctx_has_work(exec_ctx)) {
    gpr_mu_unlock(&pollset->mu);
    grpc_exec_ctx_flush(exec_ctx);
    gpr_mu_lock(&pollset->mu);
  }
  if (worker->initialized_cv) {
    gpr_cv_destroy(&worker->cv);
  }
  if (GRPC_TRACER_ON(grpc_polling_trace)) {
    gpr_log(GPR_DEBUG, " .. remove worker");
  }
  if (EMPTIED == worker_remove(pollset, worker)) {
    pollset_maybe_finish_shutdown(exec_ctx, pollset);
  }
  GPR_ASSERT(gpr_atm_no_barrier_load(&g_active_poller) != (gpr_atm)worker);
  GPR_TIMER_END("end_worker", 0);
}
923
/* pollset->mu lock must be held by the caller before calling this.
   The function pollset_work() may temporarily release the lock (pollset->mu)
   during the course of its execution but it will always re-acquire the lock and
   ensure that it is held by the time the function returns */
static grpc_error *pollset_work(grpc_exec_ctx *exec_ctx, grpc_pollset *ps,
                                grpc_pollset_worker **worker_hdl,
                                gpr_timespec now, gpr_timespec deadline) {
  grpc_pollset_worker worker;
  grpc_error *error = GRPC_ERROR_NONE;
  static const char *err_desc = "pollset_work";
  GPR_TIMER_BEGIN("pollset_work", 0);
  /* A kick that arrived before any worker existed satisfies this call
     immediately */
  if (ps->kicked_without_poller) {
    ps->kicked_without_poller = false;
    GPR_TIMER_END("pollset_work", 0);
    return GRPC_ERROR_NONE;
  }

  if (begin_worker(ps, &worker, worker_hdl, &now, deadline)) {
    /* This worker is the designated poller */
    gpr_tls_set(&g_current_thread_pollset, (intptr_t)ps);
    gpr_tls_set(&g_current_thread_worker, (intptr_t)&worker);
    GPR_ASSERT(!ps->shutting_down);
    GPR_ASSERT(!ps->seen_inactive);

    gpr_mu_unlock(&ps->mu); /* unlock */
    /* This is the designated polling thread at this point and should ideally do
       polling. However, if there are unprocessed events left from a previous
       call to do_epoll_wait(), skip calling epoll_wait() in this iteration and
       process the pending epoll events.

       The reason for decoupling do_epoll_wait and process_epoll_events is to
       better distribute the work (i.e handling epoll events) across multiple
       threads

       process_epoll_events() returns very quickly: It just queues the work on
       exec_ctx but does not execute it (the actual execution or more
       accurately grpc_exec_ctx_flush() happens in end_worker() AFTER selecting
       a designated poller). So we are not waiting long periods without a
       designated poller */
    if (gpr_atm_acq_load(&g_epoll_set.cursor) ==
        gpr_atm_acq_load(&g_epoll_set.num_events)) {
      append_error(&error, do_epoll_wait(exec_ctx, ps, now, deadline),
                   err_desc);
    }
    append_error(&error, process_epoll_events(exec_ctx, ps), err_desc);

    gpr_mu_lock(&ps->mu); /* lock */

    gpr_tls_set(&g_current_thread_worker, 0);
  } else {
    gpr_tls_set(&g_current_thread_pollset, (intptr_t)ps);
  }
  end_worker(exec_ctx, ps, &worker, worker_hdl);

  gpr_tls_set(&g_current_thread_pollset, 0);
  GPR_TIMER_END("pollset_work", 0);
  return error;
}
981
/* Wake up a worker on 'pollset' so pollset_work() can return to its caller.
   If specific_worker is NULL, pick a victim: the root worker, its successor,
   or — if the target is the designated poller blocked in epoll_wait() — the
   global wakeup fd. Marks the chosen worker KICKED. Called with pollset->mu
   held. Returns an error only if writing the wakeup fd fails. */
static grpc_error *pollset_kick(grpc_pollset *pollset,
                                grpc_pollset_worker *specific_worker) {
  GPR_TIMER_BEGIN("pollset_kick", 0);
  grpc_error *ret_err = GRPC_ERROR_NONE;
  if (GRPC_TRACER_ON(grpc_polling_trace)) {
    gpr_strvec log;
    gpr_strvec_init(&log);
    char *tmp;
    gpr_asprintf(
        &tmp, "PS:%p KICK:%p curps=%p curworker=%p root=%p", pollset,
        specific_worker, (void *)gpr_tls_get(&g_current_thread_pollset),
        (void *)gpr_tls_get(&g_current_thread_worker), pollset->root_worker);
    gpr_strvec_add(&log, tmp);
    if (pollset->root_worker != NULL) {
      gpr_asprintf(&tmp, " {kick_state=%s next=%p {kick_state=%s}}",
                   kick_state_string(pollset->root_worker->state),
                   pollset->root_worker->next,
                   kick_state_string(pollset->root_worker->next->state));
      gpr_strvec_add(&log, tmp);
    }
    if (specific_worker != NULL) {
      gpr_asprintf(&tmp, " worker_kick_state=%s",
                   kick_state_string(specific_worker->state));
      gpr_strvec_add(&log, tmp);
    }
    tmp = gpr_strvec_flatten(&log, NULL);
    gpr_strvec_destroy(&log);
    gpr_log(GPR_ERROR, "%s", tmp);
    gpr_free(tmp);
  }

  if (specific_worker == NULL) {
    /* "Kick any" path: only meaningful when the kicker is not itself working
       on this pollset */
    if (gpr_tls_get(&g_current_thread_pollset) != (intptr_t)pollset) {
      grpc_pollset_worker *root_worker = pollset->root_worker;
      if (root_worker == NULL) {
        /* No workers at all: remember the kick so the next pollset_work()
           call returns immediately */
        pollset->kicked_without_poller = true;
        if (GRPC_TRACER_ON(grpc_polling_trace)) {
          gpr_log(GPR_ERROR, " .. kicked_without_poller");
        }
        goto done;
      }
      grpc_pollset_worker *next_worker = root_worker->next;
      if (root_worker->state == KICKED) {
        /* Already kicked: nothing more to do */
        if (GRPC_TRACER_ON(grpc_polling_trace)) {
          gpr_log(GPR_ERROR, " .. already kicked %p", root_worker);
        }
        SET_KICK_STATE(root_worker, KICKED);
        goto done;
      } else if (next_worker->state == KICKED) {
        if (GRPC_TRACER_ON(grpc_polling_trace)) {
          gpr_log(GPR_ERROR, " .. already kicked %p", next_worker);
        }
        SET_KICK_STATE(next_worker, KICKED);
        goto done;
      } else if (root_worker ==
                     next_worker &&  // only try and wake up a poller if
                                     // there is no next worker
                 root_worker == (grpc_pollset_worker *)gpr_atm_no_barrier_load(
                                    &g_active_poller)) {
        /* Sole worker and it is the designated poller (blocked in epoll_wait,
           not on a condvar): wake it via the global wakeup fd */
        if (GRPC_TRACER_ON(grpc_polling_trace)) {
          gpr_log(GPR_ERROR, " .. kicked %p", root_worker);
        }
        SET_KICK_STATE(root_worker, KICKED);
        ret_err = grpc_wakeup_fd_wakeup(&global_wakeup_fd);
        goto done;
      } else if (next_worker->state == UNKICKED) {
        /* Next worker is waiting on its condvar: signal it */
        if (GRPC_TRACER_ON(grpc_polling_trace)) {
          gpr_log(GPR_ERROR, " .. kicked %p", next_worker);
        }
        GPR_ASSERT(next_worker->initialized_cv);
        SET_KICK_STATE(next_worker, KICKED);
        gpr_cv_signal(&next_worker->cv);
        goto done;
      } else if (next_worker->state == DESIGNATED_POLLER) {
        if (root_worker->state != DESIGNATED_POLLER) {
          /* Prefer kicking the non-poller so the poller keeps polling */
          if (GRPC_TRACER_ON(grpc_polling_trace)) {
            gpr_log(
                GPR_ERROR,
                " .. kicked root non-poller %p (initialized_cv=%d) (poller=%p)",
                root_worker, root_worker->initialized_cv, next_worker);
          }
          SET_KICK_STATE(root_worker, KICKED);
          if (root_worker->initialized_cv) {
            gpr_cv_signal(&root_worker->cv);
          }
          goto done;
        } else {
          if (GRPC_TRACER_ON(grpc_polling_trace)) {
            gpr_log(GPR_ERROR, " .. non-root poller %p (root=%p)", next_worker,
                    root_worker);
          }
          SET_KICK_STATE(next_worker, KICKED);
          ret_err = grpc_wakeup_fd_wakeup(&global_wakeup_fd);
          goto done;
        }
      } else {
        GPR_ASSERT(next_worker->state == KICKED);
        SET_KICK_STATE(next_worker, KICKED);
        goto done;
      }
    } else {
      /* Kicker is this pollset's own worker thread: it is awake by
         definition, so there is nothing to wake */
      if (GRPC_TRACER_ON(grpc_polling_trace)) {
        gpr_log(GPR_ERROR, " .. kicked while waking up");
      }
      goto done;
    }

    GPR_UNREACHABLE_CODE(goto done);
  }

  /* "Kick specific worker" path */
  if (specific_worker->state == KICKED) {
    if (GRPC_TRACER_ON(grpc_polling_trace)) {
      gpr_log(GPR_ERROR, " .. specific worker already kicked");
    }
    goto done;
  } else if (gpr_tls_get(&g_current_thread_worker) ==
             (intptr_t)specific_worker) {
    /* Kicking ourselves: just mark the state, no wakeup needed */
    if (GRPC_TRACER_ON(grpc_polling_trace)) {
      gpr_log(GPR_ERROR, " .. mark %p kicked", specific_worker);
    }
    SET_KICK_STATE(specific_worker, KICKED);
    goto done;
  } else if (specific_worker ==
             (grpc_pollset_worker *)gpr_atm_no_barrier_load(&g_active_poller)) {
    /* Target is blocked in epoll_wait: poke the global wakeup fd */
    if (GRPC_TRACER_ON(grpc_polling_trace)) {
      gpr_log(GPR_ERROR, " .. kick active poller");
    }
    SET_KICK_STATE(specific_worker, KICKED);
    ret_err = grpc_wakeup_fd_wakeup(&global_wakeup_fd);
    goto done;
  } else if (specific_worker->initialized_cv) {
    /* Target is waiting on its condvar: signal it */
    if (GRPC_TRACER_ON(grpc_polling_trace)) {
      gpr_log(GPR_ERROR, " .. kick waiting worker");
    }
    SET_KICK_STATE(specific_worker, KICKED);
    gpr_cv_signal(&specific_worker->cv);
    goto done;
  } else {
    /* Target has not started waiting yet; the KICKED state alone is enough
       for it to bail out of begin_worker() */
    if (GRPC_TRACER_ON(grpc_polling_trace)) {
      gpr_log(GPR_ERROR, " .. kick non-waiting worker");
    }
    SET_KICK_STATE(specific_worker, KICKED);
    goto done;
  }
done:
  GPR_TIMER_END("pollset_kick", 0);
  return ret_err;
}
1130
/* No-op: this engine polls a single global epoll set (g_epoll_set.epfd, see
   do_epoll_wait()), so there is no per-pollset fd registration to perform.
   NOTE(review): fds are presumably added to the global epoll set at creation
   time — confirm against fd_create(). */
static void pollset_add_fd(grpc_exec_ctx *exec_ctx, grpc_pollset *pollset,
                           grpc_fd *fd) {}
1133
Craig Tiller4509c472017-04-27 19:05:13 +00001134/*******************************************************************************
Craig Tillerc67cc992017-04-27 10:15:51 -07001135 * Pollset-set Definitions
1136 */
1137
1138static grpc_pollset_set *pollset_set_create(void) {
1139 return (grpc_pollset_set *)((intptr_t)0xdeafbeef);
1140}
1141
/* The pollset_set operations below are all no-ops: with a single global epoll
   set there is nothing to track per pollset_set (the handle itself is a dummy
   pointer — see pollset_set_create()). */
static void pollset_set_destroy(grpc_exec_ctx *exec_ctx,
                                grpc_pollset_set *pss) {}

static void pollset_set_add_fd(grpc_exec_ctx *exec_ctx, grpc_pollset_set *pss,
                               grpc_fd *fd) {}

static void pollset_set_del_fd(grpc_exec_ctx *exec_ctx, grpc_pollset_set *pss,
                               grpc_fd *fd) {}

static void pollset_set_add_pollset(grpc_exec_ctx *exec_ctx,
                                    grpc_pollset_set *pss, grpc_pollset *ps) {}

static void pollset_set_del_pollset(grpc_exec_ctx *exec_ctx,
                                    grpc_pollset_set *pss, grpc_pollset *ps) {}

static void pollset_set_add_pollset_set(grpc_exec_ctx *exec_ctx,
                                        grpc_pollset_set *bag,
                                        grpc_pollset_set *item) {}

static void pollset_set_del_pollset_set(grpc_exec_ctx *exec_ctx,
                                        grpc_pollset_set *bag,
                                        grpc_pollset_set *item) {}
1164
1165/*******************************************************************************
1166 * Event engine binding
1167 */
1168
/* Engine-wide teardown, mirroring the init order in grpc_init_epoll1_linux():
   fd state, then pollset state, then the epoll set itself. */
static void shutdown_engine(void) {
  fd_global_shutdown();
  pollset_global_shutdown();
  epoll_set_shutdown();
}
1174
/* Event-engine vtable for the epoll1 engine. Initializers are positional and
   must stay in grpc_event_engine_vtable declaration order. */
static const grpc_event_engine_vtable vtable = {
    sizeof(grpc_pollset),

    /* fd operations */
    fd_create,
    fd_wrapped_fd,
    fd_orphan,
    fd_shutdown,
    fd_notify_on_read,
    fd_notify_on_write,
    fd_is_shutdown,
    fd_get_read_notifier_pollset,

    /* pollset operations */
    pollset_init,
    pollset_shutdown,
    pollset_destroy,
    pollset_work,
    pollset_kick,
    pollset_add_fd,

    /* pollset_set operations (no-ops in this engine) */
    pollset_set_create,
    pollset_set_destroy,
    pollset_set_add_pollset,
    pollset_set_del_pollset,
    pollset_set_add_pollset_set,
    pollset_set_del_pollset_set,
    pollset_set_add_fd,
    pollset_set_del_fd,

    shutdown_engine,
};
1205
1206/* It is possible that GLIBC has epoll but the underlying kernel doesn't.
Sree Kuchibhotla5efc9132017-08-17 14:10:38 -07001207 * Create epoll_fd (epoll_set_init() takes care of that) to make sure epoll
1208 * support is available */
Craig Tiller6f0af492017-04-27 19:26:16 +00001209const grpc_event_engine_vtable *grpc_init_epoll1_linux(bool explicit_request) {
Craig Tillerc67cc992017-04-27 10:15:51 -07001210 if (!grpc_has_wakeup_fd()) {
1211 return NULL;
1212 }
1213
Sree Kuchibhotla5efc9132017-08-17 14:10:38 -07001214 if (!epoll_set_init()) {
Craig Tillerc67cc992017-04-27 10:15:51 -07001215 return NULL;
1216 }
1217
Craig Tillerc67cc992017-04-27 10:15:51 -07001218 fd_global_init();
1219
1220 if (!GRPC_LOG_IF_ERROR("pollset_global_init", pollset_global_init())) {
Craig Tiller4509c472017-04-27 19:05:13 +00001221 fd_global_shutdown();
Sree Kuchibhotla5efc9132017-08-17 14:10:38 -07001222 epoll_set_shutdown();
Craig Tillerc67cc992017-04-27 10:15:51 -07001223 return NULL;
1224 }
1225
1226 return &vtable;
1227}
1228
1229#else /* defined(GRPC_LINUX_EPOLL) */
1230#if defined(GRPC_POSIX_SOCKET)
1231#include "src/core/lib/iomgr/ev_posix.h"
/* If GRPC_LINUX_EPOLL is not defined, it means epoll is not available. Return
 * NULL so the engine selector falls back to another polling strategy. */
const grpc_event_engine_vtable *grpc_init_epoll1_linux(bool explicit_request) {
  return NULL;
}
Craig Tillerc67cc992017-04-27 10:15:51 -07001237#endif /* defined(GRPC_POSIX_SOCKET) */
1238#endif /* !defined(GRPC_LINUX_EPOLL) */