blob: dfe2d143a5090fafbb0b8dd502844103d76c2040 [file] [log] [blame]
Craig Tillerc67cc992017-04-27 10:15:51 -07001/*
2 *
Jan Tattermusch7897ae92017-06-07 22:57:36 +02003 * Copyright 2017 gRPC authors.
Craig Tillerc67cc992017-04-27 10:15:51 -07004 *
Jan Tattermusch7897ae92017-06-07 22:57:36 +02005 * Licensed under the Apache License, Version 2.0 (the "License");
6 * you may not use this file except in compliance with the License.
7 * You may obtain a copy of the License at
Craig Tillerc67cc992017-04-27 10:15:51 -07008 *
Jan Tattermusch7897ae92017-06-07 22:57:36 +02009 * http://www.apache.org/licenses/LICENSE-2.0
Craig Tillerc67cc992017-04-27 10:15:51 -070010 *
Jan Tattermusch7897ae92017-06-07 22:57:36 +020011 * Unless required by applicable law or agreed to in writing, software
12 * distributed under the License is distributed on an "AS IS" BASIS,
13 * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
14 * See the License for the specific language governing permissions and
15 * limitations under the License.
Craig Tillerc67cc992017-04-27 10:15:51 -070016 *
17 */
18
19#include "src/core/lib/iomgr/port.h"
20
21/* This polling engine is only relevant on linux kernels supporting epoll() */
22#ifdef GRPC_LINUX_EPOLL
23
Craig Tiller4509c472017-04-27 19:05:13 +000024#include "src/core/lib/iomgr/ev_epoll1_linux.h"
Craig Tillerc67cc992017-04-27 10:15:51 -070025
26#include <assert.h>
27#include <errno.h>
28#include <poll.h>
29#include <pthread.h>
30#include <string.h>
31#include <sys/epoll.h>
32#include <sys/socket.h>
33#include <unistd.h>
34
35#include <grpc/support/alloc.h>
Craig Tiller6de05932017-04-28 09:17:38 -070036#include <grpc/support/cpu.h>
Craig Tillerc67cc992017-04-27 10:15:51 -070037#include <grpc/support/log.h>
38#include <grpc/support/string_util.h>
39#include <grpc/support/tls.h>
40#include <grpc/support/useful.h>
41
42#include "src/core/lib/iomgr/ev_posix.h"
43#include "src/core/lib/iomgr/iomgr_internal.h"
44#include "src/core/lib/iomgr/lockfree_event.h"
Craig Tillerc67cc992017-04-27 10:15:51 -070045#include "src/core/lib/iomgr/wakeup_fd_posix.h"
Craig Tillerc67cc992017-04-27 10:15:51 -070046#include "src/core/lib/profiling/timers.h"
47#include "src/core/lib/support/block_annotate.h"
Craig Tillerb89bac02017-05-26 15:20:32 +000048#include "src/core/lib/support/string.h"
Craig Tillerc67cc992017-04-27 10:15:51 -070049
Craig Tillerc67cc992017-04-27 10:15:51 -070050static grpc_wakeup_fd global_wakeup_fd;
51static int g_epfd;
52
53/*******************************************************************************
54 * Fd Declarations
55 */
56
57struct grpc_fd {
58 int fd;
59
Craig Tillerc67cc992017-04-27 10:15:51 -070060 gpr_atm read_closure;
61 gpr_atm write_closure;
62
63 struct grpc_fd *freelist_next;
Craig Tillerc67cc992017-04-27 10:15:51 -070064
65 /* The pollset that last noticed that the fd is readable. The actual type
66 * stored in this is (grpc_pollset *) */
67 gpr_atm read_notifier_pollset;
68
69 grpc_iomgr_object iomgr_object;
70};
71
72static void fd_global_init(void);
73static void fd_global_shutdown(void);
74
75/*******************************************************************************
76 * Pollset Declarations
77 */
78
Craig Tiller43bf2592017-04-28 23:21:01 +000079typedef enum { UNKICKED, KICKED, DESIGNATED_POLLER } kick_state;
Craig Tillerc67cc992017-04-27 10:15:51 -070080
Craig Tiller830e82a2017-05-31 16:26:27 -070081static const char *kick_state_string(kick_state st) {
82 switch (st) {
83 case UNKICKED:
84 return "UNKICKED";
85 case KICKED:
86 return "KICKED";
87 case DESIGNATED_POLLER:
88 return "DESIGNATED_POLLER";
89 }
90 GPR_UNREACHABLE_CODE(return "UNKNOWN");
91}
92
Craig Tillerc67cc992017-04-27 10:15:51 -070093struct grpc_pollset_worker {
Craig Tiller32f90ee2017-04-28 12:46:41 -070094 kick_state kick_state;
Craig Tiller55624a32017-05-26 08:14:44 -070095 int kick_state_mutator; // which line of code last changed kick state
Craig Tillerc67cc992017-04-27 10:15:51 -070096 bool initialized_cv;
Craig Tiller32f90ee2017-04-28 12:46:41 -070097 grpc_pollset_worker *next;
98 grpc_pollset_worker *prev;
Craig Tillerc67cc992017-04-27 10:15:51 -070099 gpr_cv cv;
Craig Tiller50da5ec2017-05-01 13:51:14 -0700100 grpc_closure_list schedule_on_end_work;
Craig Tillerc67cc992017-04-27 10:15:51 -0700101};
102
Craig Tiller55624a32017-05-26 08:14:44 -0700103#define SET_KICK_STATE(worker, state) \
104 do { \
105 (worker)->kick_state = (state); \
106 (worker)->kick_state_mutator = __LINE__; \
107 } while (false)
108
Craig Tillerba550da2017-05-01 14:26:31 +0000109#define MAX_NEIGHBOURHOODS 1024
110
Craig Tiller6de05932017-04-28 09:17:38 -0700111typedef struct pollset_neighbourhood {
112 gpr_mu mu;
113 grpc_pollset *active_root;
Craig Tiller6de05932017-04-28 09:17:38 -0700114 char pad[GPR_CACHELINE_SIZE];
115} pollset_neighbourhood;
116
Craig Tillerc67cc992017-04-27 10:15:51 -0700117struct grpc_pollset {
Craig Tiller6de05932017-04-28 09:17:38 -0700118 gpr_mu mu;
119 pollset_neighbourhood *neighbourhood;
Craig Tillere00d7332017-05-01 15:43:51 +0000120 bool reassigning_neighbourhood;
Craig Tiller4509c472017-04-27 19:05:13 +0000121 grpc_pollset_worker *root_worker;
122 bool kicked_without_poller;
Craig Tiller6de05932017-04-28 09:17:38 -0700123 bool seen_inactive;
Craig Tillerc67cc992017-04-27 10:15:51 -0700124 bool shutting_down; /* Is the pollset shutting down ? */
125 bool finish_shutdown_called; /* Is the 'finish_shutdown_locked()' called ? */
Craig Tiller4509c472017-04-27 19:05:13 +0000126 grpc_closure *shutdown_closure; /* Called after after shutdown is complete */
Craig Tillerba550da2017-05-01 14:26:31 +0000127 int begin_refs;
Craig Tiller6de05932017-04-28 09:17:38 -0700128
129 grpc_pollset *next;
130 grpc_pollset *prev;
Craig Tillerc67cc992017-04-27 10:15:51 -0700131};
132
133/*******************************************************************************
134 * Pollset-set Declarations
135 */
Craig Tiller6de05932017-04-28 09:17:38 -0700136
/* This engine keeps no per-pollset-set state (presumably because every fd is
 * already in the single g_epfd set). The member exists only because ISO C
 * does not allow an empty struct. */
struct grpc_pollset_set {
  char unused;
};
Craig Tillerc67cc992017-04-27 10:15:51 -0700140
141/*******************************************************************************
142 * Common helpers
143 */
144
145static bool append_error(grpc_error **composite, grpc_error *error,
146 const char *desc) {
147 if (error == GRPC_ERROR_NONE) return true;
148 if (*composite == GRPC_ERROR_NONE) {
149 *composite = GRPC_ERROR_CREATE_FROM_COPIED_STRING(desc);
150 }
151 *composite = grpc_error_add_child(*composite, error);
152 return false;
153}
154
155/*******************************************************************************
156 * Fd Definitions
157 */
158
/* We need to keep a freelist not because of any concerns of malloc performance
 * but instead so that implementations with multiple threads in (for example)
 * epoll_wait can deal with the race between pollset removal and incoming poll
 * notifications.
 *
 * The problem is that the poller ultimately holds a reference to this
 * object, so it is very difficult to know when it is safe to free it, at least
 * without some expensive synchronization.
 *
 * If we keep the object freelisted, in the worst case losing this race just
 * becomes a spurious read notification on a reused fd.
 */
171
/* Note on global_wakeup_fd (declared near the top of this file): the alarm
 * system needs to be able to wake up 'some poller' at times, specifically when
 * a new alarm must fire earlier than the next alarm 'epoch'. That wakeup fd
 * gives pollers something to be alerted on when this happens. */
176
177static grpc_fd *fd_freelist = NULL;
178static gpr_mu fd_freelist_mu;
179
Craig Tillerc67cc992017-04-27 10:15:51 -0700180static void fd_global_init(void) { gpr_mu_init(&fd_freelist_mu); }
181
182static void fd_global_shutdown(void) {
183 gpr_mu_lock(&fd_freelist_mu);
184 gpr_mu_unlock(&fd_freelist_mu);
185 while (fd_freelist != NULL) {
186 grpc_fd *fd = fd_freelist;
187 fd_freelist = fd_freelist->freelist_next;
Craig Tillerc67cc992017-04-27 10:15:51 -0700188 gpr_free(fd);
189 }
190 gpr_mu_destroy(&fd_freelist_mu);
191}
192
193static grpc_fd *fd_create(int fd, const char *name) {
194 grpc_fd *new_fd = NULL;
195
196 gpr_mu_lock(&fd_freelist_mu);
197 if (fd_freelist != NULL) {
198 new_fd = fd_freelist;
199 fd_freelist = fd_freelist->freelist_next;
200 }
201 gpr_mu_unlock(&fd_freelist_mu);
202
203 if (new_fd == NULL) {
204 new_fd = gpr_malloc(sizeof(grpc_fd));
Craig Tillerc67cc992017-04-27 10:15:51 -0700205 }
206
Craig Tillerc67cc992017-04-27 10:15:51 -0700207 new_fd->fd = fd;
Craig Tillerc67cc992017-04-27 10:15:51 -0700208 grpc_lfev_init(&new_fd->read_closure);
209 grpc_lfev_init(&new_fd->write_closure);
210 gpr_atm_no_barrier_store(&new_fd->read_notifier_pollset, (gpr_atm)NULL);
211
212 new_fd->freelist_next = NULL;
Craig Tillerc67cc992017-04-27 10:15:51 -0700213
214 char *fd_name;
215 gpr_asprintf(&fd_name, "%s fd=%d", name, fd);
216 grpc_iomgr_register_object(&new_fd->iomgr_object, fd_name);
Noah Eisen264879f2017-06-20 17:14:47 -0700217#ifndef NDEBUG
218 if (GRPC_TRACER_ON(grpc_trace_fd_refcount)) {
219 gpr_log(GPR_DEBUG, "FD %d %p create %s", fd, new_fd, fd_name);
220 }
Craig Tillerc67cc992017-04-27 10:15:51 -0700221#endif
222 gpr_free(fd_name);
Craig Tiller9ddb3152017-04-27 21:32:56 +0000223
224 struct epoll_event ev = {.events = (uint32_t)(EPOLLIN | EPOLLOUT | EPOLLET),
225 .data.ptr = new_fd};
226 if (epoll_ctl(g_epfd, EPOLL_CTL_ADD, fd, &ev) != 0) {
227 gpr_log(GPR_ERROR, "epoll_ctl failed: %s", strerror(errno));
228 }
229
Craig Tillerc67cc992017-04-27 10:15:51 -0700230 return new_fd;
231}
232
Craig Tiller4509c472017-04-27 19:05:13 +0000233static int fd_wrapped_fd(grpc_fd *fd) { return fd->fd; }
Craig Tillerc67cc992017-04-27 10:15:51 -0700234
Craig Tiller9ddb3152017-04-27 21:32:56 +0000235/* Might be called multiple times */
236static void fd_shutdown(grpc_exec_ctx *exec_ctx, grpc_fd *fd, grpc_error *why) {
237 if (grpc_lfev_set_shutdown(exec_ctx, &fd->read_closure,
238 GRPC_ERROR_REF(why))) {
239 shutdown(fd->fd, SHUT_RDWR);
240 grpc_lfev_set_shutdown(exec_ctx, &fd->write_closure, GRPC_ERROR_REF(why));
241 }
242 GRPC_ERROR_UNREF(why);
243}
244
Craig Tillerc67cc992017-04-27 10:15:51 -0700245static void fd_orphan(grpc_exec_ctx *exec_ctx, grpc_fd *fd,
246 grpc_closure *on_done, int *release_fd,
247 const char *reason) {
Craig Tillerc67cc992017-04-27 10:15:51 -0700248 grpc_error *error = GRPC_ERROR_NONE;
Craig Tillerc67cc992017-04-27 10:15:51 -0700249
Craig Tiller9ddb3152017-04-27 21:32:56 +0000250 if (!grpc_lfev_is_shutdown(&fd->read_closure)) {
251 fd_shutdown(exec_ctx, fd, GRPC_ERROR_CREATE_FROM_COPIED_STRING(reason));
252 }
253
Craig Tillerc67cc992017-04-27 10:15:51 -0700254 /* If release_fd is not NULL, we should be relinquishing control of the file
255 descriptor fd->fd (but we still own the grpc_fd structure). */
256 if (release_fd != NULL) {
257 *release_fd = fd->fd;
258 } else {
259 close(fd->fd);
Craig Tillerc67cc992017-04-27 10:15:51 -0700260 }
261
ncteisen274bbbe2017-06-08 14:57:11 -0700262 GRPC_CLOSURE_SCHED(exec_ctx, on_done, GRPC_ERROR_REF(error));
Craig Tillerc67cc992017-04-27 10:15:51 -0700263
Craig Tiller4509c472017-04-27 19:05:13 +0000264 grpc_iomgr_unregister_object(&fd->iomgr_object);
265 grpc_lfev_destroy(&fd->read_closure);
266 grpc_lfev_destroy(&fd->write_closure);
Craig Tillerc67cc992017-04-27 10:15:51 -0700267
Craig Tiller4509c472017-04-27 19:05:13 +0000268 gpr_mu_lock(&fd_freelist_mu);
269 fd->freelist_next = fd_freelist;
270 fd_freelist = fd;
271 gpr_mu_unlock(&fd_freelist_mu);
Craig Tillerc67cc992017-04-27 10:15:51 -0700272}
273
274static grpc_pollset *fd_get_read_notifier_pollset(grpc_exec_ctx *exec_ctx,
275 grpc_fd *fd) {
276 gpr_atm notifier = gpr_atm_acq_load(&fd->read_notifier_pollset);
277 return (grpc_pollset *)notifier;
278}
279
280static bool fd_is_shutdown(grpc_fd *fd) {
281 return grpc_lfev_is_shutdown(&fd->read_closure);
282}
283
Craig Tillerc67cc992017-04-27 10:15:51 -0700284static void fd_notify_on_read(grpc_exec_ctx *exec_ctx, grpc_fd *fd,
285 grpc_closure *closure) {
Craig Tiller830e82a2017-05-31 16:26:27 -0700286 grpc_lfev_notify_on(exec_ctx, &fd->read_closure, closure, "read");
Craig Tillerc67cc992017-04-27 10:15:51 -0700287}
288
289static void fd_notify_on_write(grpc_exec_ctx *exec_ctx, grpc_fd *fd,
290 grpc_closure *closure) {
Craig Tiller830e82a2017-05-31 16:26:27 -0700291 grpc_lfev_notify_on(exec_ctx, &fd->write_closure, closure, "write");
Craig Tillerc67cc992017-04-27 10:15:51 -0700292}
293
Craig Tiller4509c472017-04-27 19:05:13 +0000294static void fd_become_readable(grpc_exec_ctx *exec_ctx, grpc_fd *fd,
295 grpc_pollset *notifier) {
Craig Tiller830e82a2017-05-31 16:26:27 -0700296 grpc_lfev_set_ready(exec_ctx, &fd->read_closure, "read");
Craig Tiller4509c472017-04-27 19:05:13 +0000297
298 /* Note, it is possible that fd_become_readable might be called twice with
299 different 'notifier's when an fd becomes readable and it is in two epoll
300 sets (This can happen briefly during polling island merges). In such cases
301 it does not really matter which notifer is set as the read_notifier_pollset
302 (They would both point to the same polling island anyway) */
303 /* Use release store to match with acquire load in fd_get_read_notifier */
304 gpr_atm_rel_store(&fd->read_notifier_pollset, (gpr_atm)notifier);
305}
306
307static void fd_become_writable(grpc_exec_ctx *exec_ctx, grpc_fd *fd) {
Craig Tiller830e82a2017-05-31 16:26:27 -0700308 grpc_lfev_set_ready(exec_ctx, &fd->write_closure, "write");
Craig Tillerc67cc992017-04-27 10:15:51 -0700309}
310
311/*******************************************************************************
312 * Pollset Definitions
313 */
314
Craig Tiller6de05932017-04-28 09:17:38 -0700315GPR_TLS_DECL(g_current_thread_pollset);
316GPR_TLS_DECL(g_current_thread_worker);
317static gpr_atm g_active_poller;
318static pollset_neighbourhood *g_neighbourhoods;
Craig Tiller32f90ee2017-04-28 12:46:41 -0700319static size_t g_num_neighbourhoods;
Craig Tiller6de05932017-04-28 09:17:38 -0700320
Craig Tillerc67cc992017-04-27 10:15:51 -0700321/* Return true if first in list */
Craig Tiller32f90ee2017-04-28 12:46:41 -0700322static bool worker_insert(grpc_pollset *pollset, grpc_pollset_worker *worker) {
323 if (pollset->root_worker == NULL) {
324 pollset->root_worker = worker;
325 worker->next = worker->prev = worker;
Craig Tillerc67cc992017-04-27 10:15:51 -0700326 return true;
327 } else {
Craig Tiller32f90ee2017-04-28 12:46:41 -0700328 worker->next = pollset->root_worker;
329 worker->prev = worker->next->prev;
330 worker->next->prev = worker;
331 worker->prev->next = worker;
Craig Tillerc67cc992017-04-27 10:15:51 -0700332 return false;
333 }
334}
335
336/* Return true if last in list */
337typedef enum { EMPTIED, NEW_ROOT, REMOVED } worker_remove_result;
338
Craig Tiller32f90ee2017-04-28 12:46:41 -0700339static worker_remove_result worker_remove(grpc_pollset *pollset,
Craig Tillerc67cc992017-04-27 10:15:51 -0700340 grpc_pollset_worker *worker) {
Craig Tiller32f90ee2017-04-28 12:46:41 -0700341 if (worker == pollset->root_worker) {
342 if (worker == worker->next) {
343 pollset->root_worker = NULL;
Craig Tillerc67cc992017-04-27 10:15:51 -0700344 return EMPTIED;
345 } else {
Craig Tiller32f90ee2017-04-28 12:46:41 -0700346 pollset->root_worker = worker->next;
347 worker->prev->next = worker->next;
348 worker->next->prev = worker->prev;
Craig Tillerc67cc992017-04-27 10:15:51 -0700349 return NEW_ROOT;
350 }
351 } else {
Craig Tiller32f90ee2017-04-28 12:46:41 -0700352 worker->prev->next = worker->next;
353 worker->next->prev = worker->prev;
Craig Tillerc67cc992017-04-27 10:15:51 -0700354 return REMOVED;
355 }
356}
357
Craig Tillerba550da2017-05-01 14:26:31 +0000358static size_t choose_neighbourhood(void) {
359 return (size_t)gpr_cpu_current_cpu() % g_num_neighbourhoods;
360}
361
Craig Tiller4509c472017-04-27 19:05:13 +0000362static grpc_error *pollset_global_init(void) {
Craig Tiller4509c472017-04-27 19:05:13 +0000363 gpr_tls_init(&g_current_thread_pollset);
364 gpr_tls_init(&g_current_thread_worker);
Craig Tiller6de05932017-04-28 09:17:38 -0700365 gpr_atm_no_barrier_store(&g_active_poller, 0);
Craig Tiller375eb252017-04-27 23:29:12 +0000366 global_wakeup_fd.read_fd = -1;
367 grpc_error *err = grpc_wakeup_fd_init(&global_wakeup_fd);
368 if (err != GRPC_ERROR_NONE) return err;
Craig Tiller4509c472017-04-27 19:05:13 +0000369 struct epoll_event ev = {.events = (uint32_t)(EPOLLIN | EPOLLET),
370 .data.ptr = &global_wakeup_fd};
371 if (epoll_ctl(g_epfd, EPOLL_CTL_ADD, global_wakeup_fd.read_fd, &ev) != 0) {
372 return GRPC_OS_ERROR(errno, "epoll_ctl");
373 }
Craig Tillerba550da2017-05-01 14:26:31 +0000374 g_num_neighbourhoods = GPR_CLAMP(gpr_cpu_num_cores(), 1, MAX_NEIGHBOURHOODS);
Craig Tiller32f90ee2017-04-28 12:46:41 -0700375 g_neighbourhoods =
376 gpr_zalloc(sizeof(*g_neighbourhoods) * g_num_neighbourhoods);
377 for (size_t i = 0; i < g_num_neighbourhoods; i++) {
378 gpr_mu_init(&g_neighbourhoods[i].mu);
Craig Tiller32f90ee2017-04-28 12:46:41 -0700379 }
Craig Tiller4509c472017-04-27 19:05:13 +0000380 return GRPC_ERROR_NONE;
381}
382
383static void pollset_global_shutdown(void) {
Craig Tiller4509c472017-04-27 19:05:13 +0000384 gpr_tls_destroy(&g_current_thread_pollset);
385 gpr_tls_destroy(&g_current_thread_worker);
Craig Tiller375eb252017-04-27 23:29:12 +0000386 if (global_wakeup_fd.read_fd != -1) grpc_wakeup_fd_destroy(&global_wakeup_fd);
Craig Tiller32f90ee2017-04-28 12:46:41 -0700387 for (size_t i = 0; i < g_num_neighbourhoods; i++) {
388 gpr_mu_destroy(&g_neighbourhoods[i].mu);
389 }
390 gpr_free(g_neighbourhoods);
Craig Tiller4509c472017-04-27 19:05:13 +0000391}
392
393static void pollset_init(grpc_pollset *pollset, gpr_mu **mu) {
Craig Tiller6de05932017-04-28 09:17:38 -0700394 gpr_mu_init(&pollset->mu);
395 *mu = &pollset->mu;
Craig Tillerba550da2017-05-01 14:26:31 +0000396 pollset->neighbourhood = &g_neighbourhoods[choose_neighbourhood()];
Craig Tiller6de05932017-04-28 09:17:38 -0700397 pollset->seen_inactive = true;
Craig Tiller6de05932017-04-28 09:17:38 -0700398}
399
Craig Tillerc6109852017-05-01 14:26:49 -0700400static void pollset_destroy(grpc_exec_ctx *exec_ctx, grpc_pollset *pollset) {
Craig Tillere00d7332017-05-01 15:43:51 +0000401 gpr_mu_lock(&pollset->mu);
Craig Tillerba550da2017-05-01 14:26:31 +0000402 if (!pollset->seen_inactive) {
Craig Tillere00d7332017-05-01 15:43:51 +0000403 pollset_neighbourhood *neighbourhood = pollset->neighbourhood;
404 gpr_mu_unlock(&pollset->mu);
Craig Tillera95bacf2017-05-01 12:51:24 -0700405 retry_lock_neighbourhood:
Craig Tillere00d7332017-05-01 15:43:51 +0000406 gpr_mu_lock(&neighbourhood->mu);
407 gpr_mu_lock(&pollset->mu);
408 if (!pollset->seen_inactive) {
409 if (pollset->neighbourhood != neighbourhood) {
410 gpr_mu_unlock(&neighbourhood->mu);
411 neighbourhood = pollset->neighbourhood;
412 gpr_mu_unlock(&pollset->mu);
413 goto retry_lock_neighbourhood;
414 }
415 pollset->prev->next = pollset->next;
416 pollset->next->prev = pollset->prev;
417 if (pollset == pollset->neighbourhood->active_root) {
418 pollset->neighbourhood->active_root =
419 pollset->next == pollset ? NULL : pollset->next;
420 }
Craig Tillerba550da2017-05-01 14:26:31 +0000421 }
422 gpr_mu_unlock(&pollset->neighbourhood->mu);
Craig Tiller6de05932017-04-28 09:17:38 -0700423 }
Craig Tillere00d7332017-05-01 15:43:51 +0000424 gpr_mu_unlock(&pollset->mu);
Craig Tiller32f90ee2017-04-28 12:46:41 -0700425 gpr_mu_destroy(&pollset->mu);
Craig Tiller4509c472017-04-27 19:05:13 +0000426}
427
428static grpc_error *pollset_kick_all(grpc_pollset *pollset) {
429 grpc_error *error = GRPC_ERROR_NONE;
430 if (pollset->root_worker != NULL) {
431 grpc_pollset_worker *worker = pollset->root_worker;
432 do {
Craig Tiller55624a32017-05-26 08:14:44 -0700433 switch (worker->kick_state) {
434 case KICKED:
435 break;
436 case UNKICKED:
437 SET_KICK_STATE(worker, KICKED);
438 if (worker->initialized_cv) {
439 gpr_cv_signal(&worker->cv);
440 }
441 break;
442 case DESIGNATED_POLLER:
443 SET_KICK_STATE(worker, KICKED);
444 append_error(&error, grpc_wakeup_fd_wakeup(&global_wakeup_fd),
445 "pollset_shutdown");
446 break;
Craig Tiller4509c472017-04-27 19:05:13 +0000447 }
448
Craig Tiller32f90ee2017-04-28 12:46:41 -0700449 worker = worker->next;
Craig Tiller4509c472017-04-27 19:05:13 +0000450 } while (worker != pollset->root_worker);
451 }
452 return error;
453}
454
455static void pollset_maybe_finish_shutdown(grpc_exec_ctx *exec_ctx,
456 grpc_pollset *pollset) {
Craig Tillerba550da2017-05-01 14:26:31 +0000457 if (pollset->shutdown_closure != NULL && pollset->root_worker == NULL &&
458 pollset->begin_refs == 0) {
ncteisen274bbbe2017-06-08 14:57:11 -0700459 GRPC_CLOSURE_SCHED(exec_ctx, pollset->shutdown_closure, GRPC_ERROR_NONE);
Craig Tiller4509c472017-04-27 19:05:13 +0000460 pollset->shutdown_closure = NULL;
461 }
462}
463
464static void pollset_shutdown(grpc_exec_ctx *exec_ctx, grpc_pollset *pollset,
465 grpc_closure *closure) {
466 GPR_ASSERT(pollset->shutdown_closure == NULL);
Craig Tillerc81512a2017-05-26 09:53:58 -0700467 GPR_ASSERT(!pollset->shutting_down);
Craig Tiller4509c472017-04-27 19:05:13 +0000468 pollset->shutdown_closure = closure;
Craig Tillerc81512a2017-05-26 09:53:58 -0700469 pollset->shutting_down = true;
Craig Tiller4509c472017-04-27 19:05:13 +0000470 GRPC_LOG_IF_ERROR("pollset_shutdown", pollset_kick_all(pollset));
471 pollset_maybe_finish_shutdown(exec_ctx, pollset);
472}
473
Craig Tillera95bacf2017-05-01 12:51:24 -0700474#define MAX_EPOLL_EVENTS 100
Craig Tiller4509c472017-04-27 19:05:13 +0000475
476static int poll_deadline_to_millis_timeout(gpr_timespec deadline,
477 gpr_timespec now) {
478 gpr_timespec timeout;
479 if (gpr_time_cmp(deadline, gpr_inf_future(deadline.clock_type)) == 0) {
480 return -1;
481 }
482
483 if (gpr_time_cmp(deadline, now) <= 0) {
484 return 0;
485 }
486
487 static const gpr_timespec round_up = {
488 .clock_type = GPR_TIMESPAN, .tv_sec = 0, .tv_nsec = GPR_NS_PER_MS - 1};
489 timeout = gpr_time_sub(deadline, now);
490 int millis = gpr_time_to_millis(gpr_time_add(timeout, round_up));
491 return millis >= 1 ? millis : 1;
492}
493
494static grpc_error *pollset_epoll(grpc_exec_ctx *exec_ctx, grpc_pollset *pollset,
495 gpr_timespec now, gpr_timespec deadline) {
496 struct epoll_event events[MAX_EPOLL_EVENTS];
497 static const char *err_desc = "pollset_poll";
498
499 int timeout = poll_deadline_to_millis_timeout(deadline, now);
500
501 if (timeout != 0) {
502 GRPC_SCHEDULING_START_BLOCKING_REGION;
503 }
504 int r;
505 do {
506 r = epoll_wait(g_epfd, events, MAX_EPOLL_EVENTS, timeout);
507 } while (r < 0 && errno == EINTR);
508 if (timeout != 0) {
509 GRPC_SCHEDULING_END_BLOCKING_REGION;
510 }
511
512 if (r < 0) return GRPC_OS_ERROR(errno, "epoll_wait");
513
514 grpc_error *error = GRPC_ERROR_NONE;
515 for (int i = 0; i < r; i++) {
516 void *data_ptr = events[i].data.ptr;
517 if (data_ptr == &global_wakeup_fd) {
Craig Tiller4509c472017-04-27 19:05:13 +0000518 append_error(&error, grpc_wakeup_fd_consume_wakeup(&global_wakeup_fd),
519 err_desc);
520 } else {
521 grpc_fd *fd = (grpc_fd *)(data_ptr);
522 bool cancel = (events[i].events & (EPOLLERR | EPOLLHUP)) != 0;
523 bool read_ev = (events[i].events & (EPOLLIN | EPOLLPRI)) != 0;
524 bool write_ev = (events[i].events & EPOLLOUT) != 0;
525 if (read_ev || cancel) {
526 fd_become_readable(exec_ctx, fd, pollset);
527 }
528 if (write_ev || cancel) {
529 fd_become_writable(exec_ctx, fd);
530 }
531 }
532 }
533
534 return error;
535}
536
537static bool begin_worker(grpc_pollset *pollset, grpc_pollset_worker *worker,
538 grpc_pollset_worker **worker_hdl, gpr_timespec *now,
539 gpr_timespec deadline) {
Craig Tiller4509c472017-04-27 19:05:13 +0000540 if (worker_hdl != NULL) *worker_hdl = worker;
541 worker->initialized_cv = false;
Craig Tiller55624a32017-05-26 08:14:44 -0700542 SET_KICK_STATE(worker, UNKICKED);
Craig Tiller50da5ec2017-05-01 13:51:14 -0700543 worker->schedule_on_end_work = (grpc_closure_list)GRPC_CLOSURE_LIST_INIT;
Craig Tillerba550da2017-05-01 14:26:31 +0000544 pollset->begin_refs++;
Craig Tiller4509c472017-04-27 19:05:13 +0000545
Craig Tiller830e82a2017-05-31 16:26:27 -0700546 if (GRPC_TRACER_ON(grpc_polling_trace)) {
547 gpr_log(GPR_ERROR, "PS:%p BEGIN_STARTS:%p", pollset, worker);
548 }
549
Craig Tiller32f90ee2017-04-28 12:46:41 -0700550 if (pollset->seen_inactive) {
551 // pollset has been observed to be inactive, we need to move back to the
552 // active list
Craig Tillere00d7332017-05-01 15:43:51 +0000553 bool is_reassigning = false;
554 if (!pollset->reassigning_neighbourhood) {
555 is_reassigning = true;
556 pollset->reassigning_neighbourhood = true;
557 pollset->neighbourhood = &g_neighbourhoods[choose_neighbourhood()];
558 }
559 pollset_neighbourhood *neighbourhood = pollset->neighbourhood;
Craig Tiller32f90ee2017-04-28 12:46:41 -0700560 gpr_mu_unlock(&pollset->mu);
Craig Tillerba550da2017-05-01 14:26:31 +0000561 // pollset unlocked: state may change (even worker->kick_state)
562 retry_lock_neighbourhood:
Craig Tiller32f90ee2017-04-28 12:46:41 -0700563 gpr_mu_lock(&neighbourhood->mu);
564 gpr_mu_lock(&pollset->mu);
Craig Tiller830e82a2017-05-31 16:26:27 -0700565 if (GRPC_TRACER_ON(grpc_polling_trace)) {
566 gpr_log(GPR_ERROR, "PS:%p BEGIN_REORG:%p kick_state=%s is_reassigning=%d",
567 pollset, worker, kick_state_string(worker->kick_state),
568 is_reassigning);
569 }
Craig Tiller32f90ee2017-04-28 12:46:41 -0700570 if (pollset->seen_inactive) {
Craig Tiller2acab6e2017-04-30 23:06:33 +0000571 if (neighbourhood != pollset->neighbourhood) {
572 gpr_mu_unlock(&neighbourhood->mu);
573 neighbourhood = pollset->neighbourhood;
574 gpr_mu_unlock(&pollset->mu);
575 goto retry_lock_neighbourhood;
576 }
Craig Tiller32f90ee2017-04-28 12:46:41 -0700577 pollset->seen_inactive = false;
Craig Tiller2acab6e2017-04-30 23:06:33 +0000578 if (neighbourhood->active_root == NULL) {
579 neighbourhood->active_root = pollset->next = pollset->prev = pollset;
Craig Tiller55624a32017-05-26 08:14:44 -0700580 if (worker->kick_state == UNKICKED &&
581 gpr_atm_no_barrier_cas(&g_active_poller, 0, (gpr_atm)worker)) {
582 SET_KICK_STATE(worker, DESIGNATED_POLLER);
Craig Tiller32f90ee2017-04-28 12:46:41 -0700583 }
Craig Tiller2acab6e2017-04-30 23:06:33 +0000584 } else {
585 pollset->next = neighbourhood->active_root;
586 pollset->prev = pollset->next->prev;
587 pollset->next->prev = pollset->prev->next = pollset;
Craig Tiller4509c472017-04-27 19:05:13 +0000588 }
589 }
Craig Tillere00d7332017-05-01 15:43:51 +0000590 if (is_reassigning) {
591 GPR_ASSERT(pollset->reassigning_neighbourhood);
592 pollset->reassigning_neighbourhood = false;
593 }
Craig Tiller32f90ee2017-04-28 12:46:41 -0700594 gpr_mu_unlock(&neighbourhood->mu);
595 }
596 worker_insert(pollset, worker);
Craig Tillerba550da2017-05-01 14:26:31 +0000597 pollset->begin_refs--;
Craig Tiller32f90ee2017-04-28 12:46:41 -0700598 if (worker->kick_state == UNKICKED) {
Craig Tillera4b8eb02017-04-29 00:13:52 +0000599 GPR_ASSERT(gpr_atm_no_barrier_load(&g_active_poller) != (gpr_atm)worker);
Craig Tiller32f90ee2017-04-28 12:46:41 -0700600 worker->initialized_cv = true;
601 gpr_cv_init(&worker->cv);
Craig Tillerc81512a2017-05-26 09:53:58 -0700602 while (worker->kick_state == UNKICKED && !pollset->shutting_down) {
Craig Tiller830e82a2017-05-31 16:26:27 -0700603 if (GRPC_TRACER_ON(grpc_polling_trace)) {
604 gpr_log(GPR_ERROR, "PS:%p BEGIN_WAIT:%p kick_state=%s shutdown=%d",
605 pollset, worker, kick_state_string(worker->kick_state),
606 pollset->shutting_down);
607 }
Craig Tiller32f90ee2017-04-28 12:46:41 -0700608 if (gpr_cv_wait(&worker->cv, &pollset->mu, deadline) &&
609 worker->kick_state == UNKICKED) {
Craig Tiller55624a32017-05-26 08:14:44 -0700610 SET_KICK_STATE(worker, KICKED);
Craig Tiller32f90ee2017-04-28 12:46:41 -0700611 }
Craig Tillerba550da2017-05-01 14:26:31 +0000612 }
Craig Tiller4509c472017-04-27 19:05:13 +0000613 *now = gpr_now(now->clock_type);
614 }
Craig Tiller830e82a2017-05-31 16:26:27 -0700615 if (GRPC_TRACER_ON(grpc_polling_trace)) {
616 gpr_log(GPR_ERROR, "PS:%p BEGIN_DONE:%p kick_state=%s shutdown=%d", pollset,
617 worker, kick_state_string(worker->kick_state),
618 pollset->shutting_down);
619 }
Craig Tiller4509c472017-04-27 19:05:13 +0000620
Craig Tillerc81512a2017-05-26 09:53:58 -0700621 return worker->kick_state == DESIGNATED_POLLER && !pollset->shutting_down;
Craig Tiller4509c472017-04-27 19:05:13 +0000622}
623
Craig Tillerbbf4c7a2017-04-28 15:12:10 -0700624static bool check_neighbourhood_for_available_poller(
Craig Tillera4b8eb02017-04-29 00:13:52 +0000625 pollset_neighbourhood *neighbourhood) {
Craig Tillerbbf4c7a2017-04-28 15:12:10 -0700626 bool found_worker = false;
627 do {
628 grpc_pollset *inspect = neighbourhood->active_root;
629 if (inspect == NULL) {
630 break;
631 }
632 gpr_mu_lock(&inspect->mu);
633 GPR_ASSERT(!inspect->seen_inactive);
634 grpc_pollset_worker *inspect_worker = inspect->root_worker;
Craig Tillerbbf4c7a2017-04-28 15:12:10 -0700635 if (inspect_worker != NULL) {
Craig Tillera4b8eb02017-04-29 00:13:52 +0000636 do {
Craig Tillerba550da2017-05-01 14:26:31 +0000637 switch (inspect_worker->kick_state) {
638 case UNKICKED:
639 if (gpr_atm_no_barrier_cas(&g_active_poller, 0,
640 (gpr_atm)inspect_worker)) {
Craig Tiller830e82a2017-05-31 16:26:27 -0700641 if (GRPC_TRACER_ON(grpc_polling_trace)) {
642 gpr_log(GPR_DEBUG, " .. choose next poller to be %p",
643 inspect_worker);
644 }
Craig Tiller55624a32017-05-26 08:14:44 -0700645 SET_KICK_STATE(inspect_worker, DESIGNATED_POLLER);
Craig Tillerba550da2017-05-01 14:26:31 +0000646 if (inspect_worker->initialized_cv) {
647 gpr_cv_signal(&inspect_worker->cv);
648 }
Craig Tiller830e82a2017-05-31 16:26:27 -0700649 } else {
650 if (GRPC_TRACER_ON(grpc_polling_trace)) {
651 gpr_log(GPR_DEBUG, " .. beaten to choose next poller");
652 }
Craig Tillera4b8eb02017-04-29 00:13:52 +0000653 }
Craig Tillerba550da2017-05-01 14:26:31 +0000654 // even if we didn't win the cas, there's a worker, we can stop
655 found_worker = true;
656 break;
657 case KICKED:
658 break;
659 case DESIGNATED_POLLER:
660 found_worker = true; // ok, so someone else found the worker, but
661 // we'll accept that
662 break;
Craig Tillerbbf4c7a2017-04-28 15:12:10 -0700663 }
Craig Tillera4b8eb02017-04-29 00:13:52 +0000664 inspect_worker = inspect_worker->next;
Craig Tiller830e82a2017-05-31 16:26:27 -0700665 } while (!found_worker && inspect_worker != inspect->root_worker);
Craig Tillera4b8eb02017-04-29 00:13:52 +0000666 }
667 if (!found_worker) {
Craig Tiller830e82a2017-05-31 16:26:27 -0700668 if (GRPC_TRACER_ON(grpc_polling_trace)) {
669 gpr_log(GPR_DEBUG, " .. mark pollset %p inactive", inspect);
670 }
Craig Tillerbbf4c7a2017-04-28 15:12:10 -0700671 inspect->seen_inactive = true;
Craig Tiller2acab6e2017-04-30 23:06:33 +0000672 if (inspect == neighbourhood->active_root) {
Craig Tillera95bacf2017-05-01 12:51:24 -0700673 neighbourhood->active_root =
674 inspect->next == inspect ? NULL : inspect->next;
Craig Tiller2acab6e2017-04-30 23:06:33 +0000675 }
676 inspect->next->prev = inspect->prev;
677 inspect->prev->next = inspect->next;
Craig Tillere00d7332017-05-01 15:43:51 +0000678 inspect->next = inspect->prev = NULL;
Craig Tillerbbf4c7a2017-04-28 15:12:10 -0700679 }
680 gpr_mu_unlock(&inspect->mu);
681 } while (!found_worker);
Craig Tillerbbf4c7a2017-04-28 15:12:10 -0700682 return found_worker;
683}
684
Craig Tiller4509c472017-04-27 19:05:13 +0000685static void end_worker(grpc_exec_ctx *exec_ctx, grpc_pollset *pollset,
686 grpc_pollset_worker *worker,
687 grpc_pollset_worker **worker_hdl) {
Craig Tiller830e82a2017-05-31 16:26:27 -0700688 if (GRPC_TRACER_ON(grpc_polling_trace)) {
689 gpr_log(GPR_DEBUG, "PS:%p END_WORKER:%p", pollset, worker);
690 }
Craig Tiller8502ecb2017-04-28 14:22:01 -0700691 if (worker_hdl != NULL) *worker_hdl = NULL;
Craig Tiller830e82a2017-05-31 16:26:27 -0700692 /* Make sure we appear kicked */
Craig Tiller55624a32017-05-26 08:14:44 -0700693 SET_KICK_STATE(worker, KICKED);
Craig Tiller50da5ec2017-05-01 13:51:14 -0700694 grpc_closure_list_move(&worker->schedule_on_end_work,
695 &exec_ctx->closure_list);
Craig Tiller8502ecb2017-04-28 14:22:01 -0700696 if (gpr_atm_no_barrier_load(&g_active_poller) == (gpr_atm)worker) {
Craig Tillera4b8eb02017-04-29 00:13:52 +0000697 if (worker->next != worker && worker->next->kick_state == UNKICKED) {
Craig Tiller830e82a2017-05-31 16:26:27 -0700698 if (GRPC_TRACER_ON(grpc_polling_trace)) {
699 gpr_log(GPR_DEBUG, " .. choose next poller to be peer %p", worker);
700 }
Craig Tiller2acab6e2017-04-30 23:06:33 +0000701 GPR_ASSERT(worker->next->initialized_cv);
Craig Tiller32f90ee2017-04-28 12:46:41 -0700702 gpr_atm_no_barrier_store(&g_active_poller, (gpr_atm)worker->next);
Craig Tiller55624a32017-05-26 08:14:44 -0700703 SET_KICK_STATE(worker->next, DESIGNATED_POLLER);
Craig Tiller32f90ee2017-04-28 12:46:41 -0700704 gpr_cv_signal(&worker->next->cv);
Craig Tiller8502ecb2017-04-28 14:22:01 -0700705 if (grpc_exec_ctx_has_work(exec_ctx)) {
706 gpr_mu_unlock(&pollset->mu);
707 grpc_exec_ctx_flush(exec_ctx);
708 gpr_mu_lock(&pollset->mu);
709 }
Craig Tiller32f90ee2017-04-28 12:46:41 -0700710 } else {
711 gpr_atm_no_barrier_store(&g_active_poller, 0);
Craig Tillerbbf4c7a2017-04-28 15:12:10 -0700712 size_t poller_neighbourhood_idx =
713 (size_t)(pollset->neighbourhood - g_neighbourhoods);
Craig Tillerbb742672017-05-17 22:19:05 +0000714 gpr_mu_unlock(&pollset->mu);
Craig Tiller32f90ee2017-04-28 12:46:41 -0700715 bool found_worker = false;
Craig Tillerba550da2017-05-01 14:26:31 +0000716 bool scan_state[MAX_NEIGHBOURHOODS];
Craig Tillerbbf4c7a2017-04-28 15:12:10 -0700717 for (size_t i = 0; !found_worker && i < g_num_neighbourhoods; i++) {
718 pollset_neighbourhood *neighbourhood =
719 &g_neighbourhoods[(poller_neighbourhood_idx + i) %
720 g_num_neighbourhoods];
721 if (gpr_mu_trylock(&neighbourhood->mu)) {
722 found_worker =
Craig Tillera4b8eb02017-04-29 00:13:52 +0000723 check_neighbourhood_for_available_poller(neighbourhood);
Craig Tillerbbf4c7a2017-04-28 15:12:10 -0700724 gpr_mu_unlock(&neighbourhood->mu);
Craig Tillerba550da2017-05-01 14:26:31 +0000725 scan_state[i] = true;
Craig Tillerbbf4c7a2017-04-28 15:12:10 -0700726 } else {
Craig Tillerba550da2017-05-01 14:26:31 +0000727 scan_state[i] = false;
Craig Tiller32f90ee2017-04-28 12:46:41 -0700728 }
Craig Tillerbbf4c7a2017-04-28 15:12:10 -0700729 }
Craig Tiller2acab6e2017-04-30 23:06:33 +0000730 for (size_t i = 0; !found_worker && i < g_num_neighbourhoods; i++) {
Craig Tillerba550da2017-05-01 14:26:31 +0000731 if (scan_state[i]) continue;
Craig Tiller2acab6e2017-04-30 23:06:33 +0000732 pollset_neighbourhood *neighbourhood =
733 &g_neighbourhoods[(poller_neighbourhood_idx + i) %
734 g_num_neighbourhoods];
735 gpr_mu_lock(&neighbourhood->mu);
Craig Tillerba550da2017-05-01 14:26:31 +0000736 found_worker = check_neighbourhood_for_available_poller(neighbourhood);
Craig Tiller2acab6e2017-04-30 23:06:33 +0000737 gpr_mu_unlock(&neighbourhood->mu);
Craig Tillerbbf4c7a2017-04-28 15:12:10 -0700738 }
Craig Tiller8502ecb2017-04-28 14:22:01 -0700739 grpc_exec_ctx_flush(exec_ctx);
Craig Tiller32f90ee2017-04-28 12:46:41 -0700740 gpr_mu_lock(&pollset->mu);
741 }
Craig Tiller50da5ec2017-05-01 13:51:14 -0700742 } else if (grpc_exec_ctx_has_work(exec_ctx)) {
743 gpr_mu_unlock(&pollset->mu);
744 grpc_exec_ctx_flush(exec_ctx);
745 gpr_mu_lock(&pollset->mu);
Craig Tiller4509c472017-04-27 19:05:13 +0000746 }
747 if (worker->initialized_cv) {
748 gpr_cv_destroy(&worker->cv);
749 }
Craig Tiller830e82a2017-05-31 16:26:27 -0700750 if (GRPC_TRACER_ON(grpc_polling_trace)) {
751 gpr_log(GPR_DEBUG, " .. remove worker");
752 }
Craig Tiller32f90ee2017-04-28 12:46:41 -0700753 if (EMPTIED == worker_remove(pollset, worker)) {
Craig Tiller4509c472017-04-27 19:05:13 +0000754 pollset_maybe_finish_shutdown(exec_ctx, pollset);
755 }
Craig Tillera4b8eb02017-04-29 00:13:52 +0000756 GPR_ASSERT(gpr_atm_no_barrier_load(&g_active_poller) != (gpr_atm)worker);
Craig Tiller4509c472017-04-27 19:05:13 +0000757}
758
759/* pollset->po.mu lock must be held by the caller before calling this.
760 The function pollset_work() may temporarily release the lock (pollset->po.mu)
761 during the course of its execution but it will always re-acquire the lock and
762 ensure that it is held by the time the function returns */
763static grpc_error *pollset_work(grpc_exec_ctx *exec_ctx, grpc_pollset *pollset,
764 grpc_pollset_worker **worker_hdl,
765 gpr_timespec now, gpr_timespec deadline) {
766 grpc_pollset_worker worker;
767 grpc_error *error = GRPC_ERROR_NONE;
768 static const char *err_desc = "pollset_work";
769 if (pollset->kicked_without_poller) {
770 pollset->kicked_without_poller = false;
771 return GRPC_ERROR_NONE;
772 }
773 if (begin_worker(pollset, &worker, worker_hdl, &now, deadline)) {
Craig Tiller830e82a2017-05-31 16:26:27 -0700774 gpr_tls_set(&g_current_thread_pollset, (intptr_t)pollset);
Craig Tiller4509c472017-04-27 19:05:13 +0000775 gpr_tls_set(&g_current_thread_worker, (intptr_t)&worker);
Craig Tillerc81512a2017-05-26 09:53:58 -0700776 GPR_ASSERT(!pollset->shutting_down);
Craig Tiller2acab6e2017-04-30 23:06:33 +0000777 GPR_ASSERT(!pollset->seen_inactive);
Craig Tiller32f90ee2017-04-28 12:46:41 -0700778 gpr_mu_unlock(&pollset->mu);
Craig Tiller4509c472017-04-27 19:05:13 +0000779 append_error(&error, pollset_epoll(exec_ctx, pollset, now, deadline),
780 err_desc);
Craig Tiller32f90ee2017-04-28 12:46:41 -0700781 gpr_mu_lock(&pollset->mu);
Craig Tiller4509c472017-04-27 19:05:13 +0000782 gpr_tls_set(&g_current_thread_worker, 0);
Craig Tiller830e82a2017-05-31 16:26:27 -0700783 } else {
784 gpr_tls_set(&g_current_thread_pollset, (intptr_t)pollset);
Craig Tiller4509c472017-04-27 19:05:13 +0000785 }
786 end_worker(exec_ctx, pollset, &worker, worker_hdl);
Craig Tiller8502ecb2017-04-28 14:22:01 -0700787 gpr_tls_set(&g_current_thread_pollset, 0);
Craig Tiller4509c472017-04-27 19:05:13 +0000788 return error;
789}
790
791static grpc_error *pollset_kick(grpc_pollset *pollset,
792 grpc_pollset_worker *specific_worker) {
Craig Tillerb89bac02017-05-26 15:20:32 +0000793 if (GRPC_TRACER_ON(grpc_polling_trace)) {
794 gpr_strvec log;
795 gpr_strvec_init(&log);
796 char *tmp;
Craig Tiller75aef7f2017-05-26 08:26:08 -0700797 gpr_asprintf(
798 &tmp, "PS:%p KICK:%p curps=%p curworker=%p root=%p", pollset,
799 specific_worker, (void *)gpr_tls_get(&g_current_thread_pollset),
800 (void *)gpr_tls_get(&g_current_thread_worker), pollset->root_worker);
Craig Tillerb89bac02017-05-26 15:20:32 +0000801 gpr_strvec_add(&log, tmp);
802 if (pollset->root_worker != NULL) {
Craig Tiller830e82a2017-05-31 16:26:27 -0700803 gpr_asprintf(&tmp, " {kick_state=%s next=%p {kick_state=%s}}",
804 kick_state_string(pollset->root_worker->kick_state),
805 pollset->root_worker->next,
806 kick_state_string(pollset->root_worker->next->kick_state));
Craig Tillerb89bac02017-05-26 15:20:32 +0000807 gpr_strvec_add(&log, tmp);
808 }
809 if (specific_worker != NULL) {
Craig Tiller830e82a2017-05-31 16:26:27 -0700810 gpr_asprintf(&tmp, " worker_kick_state=%s",
811 kick_state_string(specific_worker->kick_state));
Craig Tillerb89bac02017-05-26 15:20:32 +0000812 gpr_strvec_add(&log, tmp);
813 }
814 tmp = gpr_strvec_flatten(&log, NULL);
815 gpr_strvec_destroy(&log);
Craig Tiller830e82a2017-05-31 16:26:27 -0700816 gpr_log(GPR_ERROR, "%s", tmp);
Craig Tillerb89bac02017-05-26 15:20:32 +0000817 gpr_free(tmp);
818 }
Craig Tiller4509c472017-04-27 19:05:13 +0000819 if (specific_worker == NULL) {
820 if (gpr_tls_get(&g_current_thread_pollset) != (intptr_t)pollset) {
Craig Tiller375eb252017-04-27 23:29:12 +0000821 grpc_pollset_worker *root_worker = pollset->root_worker;
822 if (root_worker == NULL) {
Craig Tiller4509c472017-04-27 19:05:13 +0000823 pollset->kicked_without_poller = true;
Craig Tiller75aef7f2017-05-26 08:26:08 -0700824 if (GRPC_TRACER_ON(grpc_polling_trace)) {
Craig Tiller830e82a2017-05-31 16:26:27 -0700825 gpr_log(GPR_ERROR, " .. kicked_without_poller");
Craig Tiller75aef7f2017-05-26 08:26:08 -0700826 }
Craig Tiller4509c472017-04-27 19:05:13 +0000827 return GRPC_ERROR_NONE;
Craig Tiller375eb252017-04-27 23:29:12 +0000828 }
Craig Tiller32f90ee2017-04-28 12:46:41 -0700829 grpc_pollset_worker *next_worker = root_worker->next;
Craig Tiller830e82a2017-05-31 16:26:27 -0700830 if (root_worker->kick_state == KICKED) {
Craig Tiller75aef7f2017-05-26 08:26:08 -0700831 if (GRPC_TRACER_ON(grpc_polling_trace)) {
Craig Tiller830e82a2017-05-31 16:26:27 -0700832 gpr_log(GPR_ERROR, " .. already kicked %p", root_worker);
833 }
834 SET_KICK_STATE(root_worker, KICKED);
835 return GRPC_ERROR_NONE;
836 } else if (next_worker->kick_state == KICKED) {
837 if (GRPC_TRACER_ON(grpc_polling_trace)) {
838 gpr_log(GPR_ERROR, " .. already kicked %p", next_worker);
839 }
840 SET_KICK_STATE(next_worker, KICKED);
841 return GRPC_ERROR_NONE;
842 } else if (root_worker ==
843 next_worker && // only try and wake up a poller if
844 // there is no next worker
845 root_worker == (grpc_pollset_worker *)gpr_atm_no_barrier_load(
846 &g_active_poller)) {
847 if (GRPC_TRACER_ON(grpc_polling_trace)) {
848 gpr_log(GPR_ERROR, " .. kicked %p", root_worker);
Craig Tiller75aef7f2017-05-26 08:26:08 -0700849 }
Craig Tiller55624a32017-05-26 08:14:44 -0700850 SET_KICK_STATE(root_worker, KICKED);
Craig Tiller4509c472017-04-27 19:05:13 +0000851 return grpc_wakeup_fd_wakeup(&global_wakeup_fd);
Craig Tiller8502ecb2017-04-28 14:22:01 -0700852 } else if (next_worker->kick_state == UNKICKED) {
Craig Tiller75aef7f2017-05-26 08:26:08 -0700853 if (GRPC_TRACER_ON(grpc_polling_trace)) {
Craig Tiller830e82a2017-05-31 16:26:27 -0700854 gpr_log(GPR_ERROR, " .. kicked %p", next_worker);
Craig Tiller75aef7f2017-05-26 08:26:08 -0700855 }
Craig Tiller8502ecb2017-04-28 14:22:01 -0700856 GPR_ASSERT(next_worker->initialized_cv);
Craig Tiller55624a32017-05-26 08:14:44 -0700857 SET_KICK_STATE(next_worker, KICKED);
Craig Tiller375eb252017-04-27 23:29:12 +0000858 gpr_cv_signal(&next_worker->cv);
859 return GRPC_ERROR_NONE;
Craig Tiller55624a32017-05-26 08:14:44 -0700860 } else if (next_worker->kick_state == DESIGNATED_POLLER) {
861 if (root_worker->kick_state != DESIGNATED_POLLER) {
Craig Tiller75aef7f2017-05-26 08:26:08 -0700862 if (GRPC_TRACER_ON(grpc_polling_trace)) {
Craig Tiller830e82a2017-05-31 16:26:27 -0700863 gpr_log(
864 GPR_ERROR,
865 " .. kicked root non-poller %p (initialized_cv=%d) (poller=%p)",
866 root_worker, root_worker->initialized_cv, next_worker);
Craig Tiller75aef7f2017-05-26 08:26:08 -0700867 }
Craig Tiller55624a32017-05-26 08:14:44 -0700868 SET_KICK_STATE(root_worker, KICKED);
869 if (root_worker->initialized_cv) {
870 gpr_cv_signal(&root_worker->cv);
871 }
872 return GRPC_ERROR_NONE;
873 } else {
Craig Tiller75aef7f2017-05-26 08:26:08 -0700874 if (GRPC_TRACER_ON(grpc_polling_trace)) {
Craig Tiller830e82a2017-05-31 16:26:27 -0700875 gpr_log(GPR_ERROR, " .. non-root poller %p (root=%p)", next_worker,
Craig Tiller75aef7f2017-05-26 08:26:08 -0700876 root_worker);
877 }
Craig Tiller55624a32017-05-26 08:14:44 -0700878 SET_KICK_STATE(next_worker, KICKED);
879 return grpc_wakeup_fd_wakeup(&global_wakeup_fd);
880 }
Craig Tiller8502ecb2017-04-28 14:22:01 -0700881 } else {
Craig Tiller55624a32017-05-26 08:14:44 -0700882 GPR_ASSERT(next_worker->kick_state == KICKED);
883 SET_KICK_STATE(next_worker, KICKED);
Craig Tiller8502ecb2017-04-28 14:22:01 -0700884 return GRPC_ERROR_NONE;
Craig Tiller4509c472017-04-27 19:05:13 +0000885 }
886 } else {
Craig Tiller830e82a2017-05-31 16:26:27 -0700887 if (GRPC_TRACER_ON(grpc_polling_trace)) {
888 gpr_log(GPR_ERROR, " .. kicked while waking up");
889 }
Craig Tiller4509c472017-04-27 19:05:13 +0000890 return GRPC_ERROR_NONE;
891 }
Craig Tiller43bf2592017-04-28 23:21:01 +0000892 } else if (specific_worker->kick_state == KICKED) {
Craig Tiller75aef7f2017-05-26 08:26:08 -0700893 if (GRPC_TRACER_ON(grpc_polling_trace)) {
Craig Tiller830e82a2017-05-31 16:26:27 -0700894 gpr_log(GPR_ERROR, " .. specific worker already kicked");
Craig Tiller75aef7f2017-05-26 08:26:08 -0700895 }
Craig Tiller4509c472017-04-27 19:05:13 +0000896 return GRPC_ERROR_NONE;
897 } else if (gpr_tls_get(&g_current_thread_worker) ==
898 (intptr_t)specific_worker) {
Craig Tiller75aef7f2017-05-26 08:26:08 -0700899 if (GRPC_TRACER_ON(grpc_polling_trace)) {
Craig Tiller830e82a2017-05-31 16:26:27 -0700900 gpr_log(GPR_ERROR, " .. mark %p kicked", specific_worker);
Craig Tiller75aef7f2017-05-26 08:26:08 -0700901 }
Craig Tiller55624a32017-05-26 08:14:44 -0700902 SET_KICK_STATE(specific_worker, KICKED);
Craig Tiller4509c472017-04-27 19:05:13 +0000903 return GRPC_ERROR_NONE;
Craig Tiller32f90ee2017-04-28 12:46:41 -0700904 } else if (specific_worker ==
905 (grpc_pollset_worker *)gpr_atm_no_barrier_load(&g_active_poller)) {
Craig Tiller75aef7f2017-05-26 08:26:08 -0700906 if (GRPC_TRACER_ON(grpc_polling_trace)) {
Craig Tiller830e82a2017-05-31 16:26:27 -0700907 gpr_log(GPR_ERROR, " .. kick active poller");
Craig Tiller75aef7f2017-05-26 08:26:08 -0700908 }
Craig Tiller55624a32017-05-26 08:14:44 -0700909 SET_KICK_STATE(specific_worker, KICKED);
Craig Tiller4509c472017-04-27 19:05:13 +0000910 return grpc_wakeup_fd_wakeup(&global_wakeup_fd);
Craig Tiller8502ecb2017-04-28 14:22:01 -0700911 } else if (specific_worker->initialized_cv) {
Craig Tiller75aef7f2017-05-26 08:26:08 -0700912 if (GRPC_TRACER_ON(grpc_polling_trace)) {
Craig Tiller830e82a2017-05-31 16:26:27 -0700913 gpr_log(GPR_ERROR, " .. kick waiting worker");
Craig Tiller75aef7f2017-05-26 08:26:08 -0700914 }
Craig Tiller55624a32017-05-26 08:14:44 -0700915 SET_KICK_STATE(specific_worker, KICKED);
Craig Tiller4509c472017-04-27 19:05:13 +0000916 gpr_cv_signal(&specific_worker->cv);
917 return GRPC_ERROR_NONE;
Craig Tiller8502ecb2017-04-28 14:22:01 -0700918 } else {
Craig Tiller75aef7f2017-05-26 08:26:08 -0700919 if (GRPC_TRACER_ON(grpc_polling_trace)) {
Craig Tiller830e82a2017-05-31 16:26:27 -0700920 gpr_log(GPR_ERROR, " .. kick non-waiting worker");
Craig Tiller75aef7f2017-05-26 08:26:08 -0700921 }
Craig Tiller55624a32017-05-26 08:14:44 -0700922 SET_KICK_STATE(specific_worker, KICKED);
Craig Tiller8502ecb2017-04-28 14:22:01 -0700923 return GRPC_ERROR_NONE;
Craig Tiller4509c472017-04-27 19:05:13 +0000924 }
925}
926
927static void pollset_add_fd(grpc_exec_ctx *exec_ctx, grpc_pollset *pollset,
928 grpc_fd *fd) {}
929
Craig Tiller4509c472017-04-27 19:05:13 +0000930/*******************************************************************************
Craig Tillerc67cc992017-04-27 10:15:51 -0700931 * Pollset-set Definitions
932 */
933
934static grpc_pollset_set *pollset_set_create(void) {
935 return (grpc_pollset_set *)((intptr_t)0xdeafbeef);
936}
937
938static void pollset_set_destroy(grpc_exec_ctx *exec_ctx,
939 grpc_pollset_set *pss) {}
940
941static void pollset_set_add_fd(grpc_exec_ctx *exec_ctx, grpc_pollset_set *pss,
942 grpc_fd *fd) {}
943
944static void pollset_set_del_fd(grpc_exec_ctx *exec_ctx, grpc_pollset_set *pss,
945 grpc_fd *fd) {}
946
947static void pollset_set_add_pollset(grpc_exec_ctx *exec_ctx,
948 grpc_pollset_set *pss, grpc_pollset *ps) {}
949
950static void pollset_set_del_pollset(grpc_exec_ctx *exec_ctx,
951 grpc_pollset_set *pss, grpc_pollset *ps) {}
952
953static void pollset_set_add_pollset_set(grpc_exec_ctx *exec_ctx,
954 grpc_pollset_set *bag,
955 grpc_pollset_set *item) {}
956
957static void pollset_set_del_pollset_set(grpc_exec_ctx *exec_ctx,
958 grpc_pollset_set *bag,
959 grpc_pollset_set *item) {}
960
961/*******************************************************************************
962 * Event engine binding
963 */
964
965static void shutdown_engine(void) {
966 fd_global_shutdown();
967 pollset_global_shutdown();
Sree Kuchibhotla54c31c72017-07-17 14:57:27 -0700968 close(g_epfd);
Craig Tillerc67cc992017-04-27 10:15:51 -0700969}
970
971static const grpc_event_engine_vtable vtable = {
972 .pollset_size = sizeof(grpc_pollset),
973
974 .fd_create = fd_create,
975 .fd_wrapped_fd = fd_wrapped_fd,
976 .fd_orphan = fd_orphan,
977 .fd_shutdown = fd_shutdown,
978 .fd_is_shutdown = fd_is_shutdown,
979 .fd_notify_on_read = fd_notify_on_read,
980 .fd_notify_on_write = fd_notify_on_write,
981 .fd_get_read_notifier_pollset = fd_get_read_notifier_pollset,
Craig Tillerc67cc992017-04-27 10:15:51 -0700982
983 .pollset_init = pollset_init,
984 .pollset_shutdown = pollset_shutdown,
985 .pollset_destroy = pollset_destroy,
986 .pollset_work = pollset_work,
987 .pollset_kick = pollset_kick,
988 .pollset_add_fd = pollset_add_fd,
989
990 .pollset_set_create = pollset_set_create,
991 .pollset_set_destroy = pollset_set_destroy,
992 .pollset_set_add_pollset = pollset_set_add_pollset,
993 .pollset_set_del_pollset = pollset_set_del_pollset,
994 .pollset_set_add_pollset_set = pollset_set_add_pollset_set,
995 .pollset_set_del_pollset_set = pollset_set_del_pollset_set,
996 .pollset_set_add_fd = pollset_set_add_fd,
997 .pollset_set_del_fd = pollset_set_del_fd,
998
Craig Tillerc67cc992017-04-27 10:15:51 -0700999 .shutdown_engine = shutdown_engine,
1000};
1001
1002/* It is possible that GLIBC has epoll but the underlying kernel doesn't.
1003 * Create a dummy epoll_fd to make sure epoll support is available */
Craig Tiller6f0af492017-04-27 19:26:16 +00001004const grpc_event_engine_vtable *grpc_init_epoll1_linux(bool explicit_request) {
Craig Tillerc67cc992017-04-27 10:15:51 -07001005 if (!grpc_has_wakeup_fd()) {
1006 return NULL;
1007 }
1008
Craig Tiller4509c472017-04-27 19:05:13 +00001009 g_epfd = epoll_create1(EPOLL_CLOEXEC);
1010 if (g_epfd < 0) {
1011 gpr_log(GPR_ERROR, "epoll unavailable");
Craig Tillerc67cc992017-04-27 10:15:51 -07001012 return NULL;
1013 }
1014
Craig Tillerc67cc992017-04-27 10:15:51 -07001015 fd_global_init();
1016
1017 if (!GRPC_LOG_IF_ERROR("pollset_global_init", pollset_global_init())) {
Craig Tiller4509c472017-04-27 19:05:13 +00001018 close(g_epfd);
1019 fd_global_shutdown();
Craig Tillerc67cc992017-04-27 10:15:51 -07001020 return NULL;
1021 }
1022
Craig Tiller830e82a2017-05-31 16:26:27 -07001023 gpr_log(GPR_ERROR, "grpc epoll fd: %d", g_epfd);
1024
Craig Tillerc67cc992017-04-27 10:15:51 -07001025 return &vtable;
1026}
1027
1028#else /* defined(GRPC_LINUX_EPOLL) */
1029#if defined(GRPC_POSIX_SOCKET)
1030#include "src/core/lib/iomgr/ev_posix.h"
1031/* If GRPC_LINUX_EPOLL is not defined, it means epoll is not available. Return
1032 * NULL */
Craig Tiller9ddb3152017-04-27 21:32:56 +00001033const grpc_event_engine_vtable *grpc_init_epoll1_linux(bool explicit_request) {
1034 return NULL;
1035}
Craig Tillerc67cc992017-04-27 10:15:51 -07001036#endif /* defined(GRPC_POSIX_SOCKET) */
1037#endif /* !defined(GRPC_LINUX_EPOLL) */