blob: 7d7aa44912f016ded48e006961f89e26fe2af917 [file] [log] [blame]
Craig Tillerc67cc992017-04-27 10:15:51 -07001/*
2 *
Jan Tattermusch7897ae92017-06-07 22:57:36 +02003 * Copyright 2017 gRPC authors.
Craig Tillerc67cc992017-04-27 10:15:51 -07004 *
Jan Tattermusch7897ae92017-06-07 22:57:36 +02005 * Licensed under the Apache License, Version 2.0 (the "License");
6 * you may not use this file except in compliance with the License.
7 * You may obtain a copy of the License at
Craig Tillerc67cc992017-04-27 10:15:51 -07008 *
Jan Tattermusch7897ae92017-06-07 22:57:36 +02009 * http://www.apache.org/licenses/LICENSE-2.0
Craig Tillerc67cc992017-04-27 10:15:51 -070010 *
Jan Tattermusch7897ae92017-06-07 22:57:36 +020011 * Unless required by applicable law or agreed to in writing, software
12 * distributed under the License is distributed on an "AS IS" BASIS,
13 * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
14 * See the License for the specific language governing permissions and
15 * limitations under the License.
Craig Tillerc67cc992017-04-27 10:15:51 -070016 *
17 */
18
19#include "src/core/lib/iomgr/port.h"
20
21/* This polling engine is only relevant on linux kernels supporting epoll() */
22#ifdef GRPC_LINUX_EPOLL
23
Craig Tiller4509c472017-04-27 19:05:13 +000024#include "src/core/lib/iomgr/ev_epoll1_linux.h"
Craig Tillerc67cc992017-04-27 10:15:51 -070025
26#include <assert.h>
27#include <errno.h>
28#include <poll.h>
29#include <pthread.h>
30#include <string.h>
31#include <sys/epoll.h>
32#include <sys/socket.h>
33#include <unistd.h>
34
35#include <grpc/support/alloc.h>
Craig Tiller6de05932017-04-28 09:17:38 -070036#include <grpc/support/cpu.h>
Craig Tillerc67cc992017-04-27 10:15:51 -070037#include <grpc/support/log.h>
38#include <grpc/support/string_util.h>
39#include <grpc/support/tls.h>
40#include <grpc/support/useful.h>
41
42#include "src/core/lib/iomgr/ev_posix.h"
43#include "src/core/lib/iomgr/iomgr_internal.h"
44#include "src/core/lib/iomgr/lockfree_event.h"
Craig Tillerc67cc992017-04-27 10:15:51 -070045#include "src/core/lib/iomgr/wakeup_fd_posix.h"
46#include "src/core/lib/iomgr/workqueue.h"
47#include "src/core/lib/profiling/timers.h"
48#include "src/core/lib/support/block_annotate.h"
49
Craig Tillerc67cc992017-04-27 10:15:51 -070050static grpc_wakeup_fd global_wakeup_fd;
51static int g_epfd;
52
53/*******************************************************************************
54 * Fd Declarations
55 */
56
/* Per-file-descriptor state tracked by this polling engine. */
struct grpc_fd {
  int fd; /* the underlying kernel file descriptor */

  /* Lockfree-event state for read/write readiness; manipulated only via the
     grpc_lfev_* helpers (see lockfree_event.h). */
  gpr_atm read_closure;
  gpr_atm write_closure;

  /* Intrusive link for the fd freelist (see fd_freelist below). */
  struct grpc_fd *freelist_next;

  /* The pollset that last noticed that the fd is readable. The actual type
   * stored in this is (grpc_pollset *) */
  gpr_atm read_notifier_pollset;

  grpc_iomgr_object iomgr_object;
};
71
72static void fd_global_init(void);
73static void fd_global_shutdown(void);
74
75/*******************************************************************************
76 * Pollset Declarations
77 */
78
Craig Tiller43bf2592017-04-28 23:21:01 +000079typedef enum { UNKICKED, KICKED, DESIGNATED_POLLER } kick_state;
Craig Tillerc67cc992017-04-27 10:15:51 -070080
/* One thread blocked inside pollset_work(); lives on the worker's stack and
   is linked into the owning pollset's circular doubly-linked worker list. */
struct grpc_pollset_worker {
  kick_state kick_state;      /* protected by the pollset mutex */
  bool initialized_cv;        /* true once |cv| below has been gpr_cv_init'd */
  grpc_pollset_worker *next;  /* circular list links; valid while inserted */
  grpc_pollset_worker *prev;
  gpr_cv cv;                  /* waited on while UNKICKED */
  grpc_closure_list schedule_on_end_work; /* closures to run in end_worker() */
};
89
Craig Tillerba550da2017-05-01 14:26:31 +000090#define MAX_NEIGHBOURHOODS 1024
91
/* A shard of active pollsets; sharding by CPU reduces contention on the list
   of pollsets that may need a designated poller. */
typedef struct pollset_neighbourhood {
  gpr_mu mu;                /* guards |active_root| and the pollset links */
  grpc_pollset *active_root; /* circular list of non-inactive pollsets */
  char pad[GPR_CACHELINE_SIZE]; /* avoid false sharing between shards */
} pollset_neighbourhood;
97
struct grpc_pollset {
  gpr_mu mu; /* protects all fields below unless noted otherwise */
  /* Shard this pollset currently belongs to; may be reassigned while the
     pollset is being reactivated (see begin_worker). */
  pollset_neighbourhood *neighbourhood;
  bool reassigning_neighbourhood; /* guards against concurrent reassignment */
  grpc_pollset_worker *root_worker; /* head of the circular worker list */
  bool kicked_without_poller; /* a kick arrived while no worker was present */
  /* true when the pollset has been observed idle and unlinked from its
     neighbourhood's active list */
  bool seen_inactive;
  bool shutting_down; /* Is the pollset shutting down ? */
  bool finish_shutdown_called; /* Is the 'finish_shutdown_locked()' called ? */
  grpc_closure *shutdown_closure; /* Called after after shutdown is complete */
  int begin_refs; /* # of threads inside begin_worker(); delays shutdown */

  grpc_pollset *next; /* links in the neighbourhood's active list */
  grpc_pollset *prev;
};
113
114/*******************************************************************************
115 * Pollset-set Declarations
116 */
Craig Tiller6de05932017-04-28 09:17:38 -0700117
Craig Tillerc67cc992017-04-27 10:15:51 -0700118struct grpc_pollset_set {};
119
120/*******************************************************************************
121 * Common helpers
122 */
123
124static bool append_error(grpc_error **composite, grpc_error *error,
125 const char *desc) {
126 if (error == GRPC_ERROR_NONE) return true;
127 if (*composite == GRPC_ERROR_NONE) {
128 *composite = GRPC_ERROR_CREATE_FROM_COPIED_STRING(desc);
129 }
130 *composite = grpc_error_add_child(*composite, error);
131 return false;
132}
133
134/*******************************************************************************
135 * Fd Definitions
136 */
137
138/* We need to keep a freelist not because of any concerns of malloc performance
139 * but instead so that implementations with multiple threads in (for example)
140 * epoll_wait deal with the race between pollset removal and incoming poll
141 * notifications.
142 *
143 * The problem is that the poller ultimately holds a reference to this
144 * object, so it is very difficult to know when is safe to free it, at least
145 * without some expensive synchronization.
146 *
147 * If we keep the object freelisted, in the worst case losing this race just
148 * becomes a spurious read notification on a reused fd.
149 */
150
151/* The alarm system needs to be able to wakeup 'some poller' sometimes
152 * (specifically when a new alarm needs to be triggered earlier than the next
153 * alarm 'epoch'). This wakeup_fd gives us something to alert on when such a
154 * case occurs. */
155
156static grpc_fd *fd_freelist = NULL;
157static gpr_mu fd_freelist_mu;
158
Craig Tillerc67cc992017-04-27 10:15:51 -0700159static void fd_global_init(void) { gpr_mu_init(&fd_freelist_mu); }
160
161static void fd_global_shutdown(void) {
162 gpr_mu_lock(&fd_freelist_mu);
163 gpr_mu_unlock(&fd_freelist_mu);
164 while (fd_freelist != NULL) {
165 grpc_fd *fd = fd_freelist;
166 fd_freelist = fd_freelist->freelist_next;
Craig Tillerc67cc992017-04-27 10:15:51 -0700167 gpr_free(fd);
168 }
169 gpr_mu_destroy(&fd_freelist_mu);
170}
171
172static grpc_fd *fd_create(int fd, const char *name) {
173 grpc_fd *new_fd = NULL;
174
175 gpr_mu_lock(&fd_freelist_mu);
176 if (fd_freelist != NULL) {
177 new_fd = fd_freelist;
178 fd_freelist = fd_freelist->freelist_next;
179 }
180 gpr_mu_unlock(&fd_freelist_mu);
181
182 if (new_fd == NULL) {
183 new_fd = gpr_malloc(sizeof(grpc_fd));
Craig Tillerc67cc992017-04-27 10:15:51 -0700184 }
185
Craig Tillerc67cc992017-04-27 10:15:51 -0700186 new_fd->fd = fd;
Craig Tillerc67cc992017-04-27 10:15:51 -0700187 grpc_lfev_init(&new_fd->read_closure);
188 grpc_lfev_init(&new_fd->write_closure);
189 gpr_atm_no_barrier_store(&new_fd->read_notifier_pollset, (gpr_atm)NULL);
190
191 new_fd->freelist_next = NULL;
Craig Tillerc67cc992017-04-27 10:15:51 -0700192
193 char *fd_name;
194 gpr_asprintf(&fd_name, "%s fd=%d", name, fd);
195 grpc_iomgr_register_object(&new_fd->iomgr_object, fd_name);
196#ifdef GRPC_FD_REF_COUNT_DEBUG
197 gpr_log(GPR_DEBUG, "FD %d %p create %s", fd, (void *)new_fd, fd_name);
198#endif
199 gpr_free(fd_name);
Craig Tiller9ddb3152017-04-27 21:32:56 +0000200
201 struct epoll_event ev = {.events = (uint32_t)(EPOLLIN | EPOLLOUT | EPOLLET),
202 .data.ptr = new_fd};
203 if (epoll_ctl(g_epfd, EPOLL_CTL_ADD, fd, &ev) != 0) {
204 gpr_log(GPR_ERROR, "epoll_ctl failed: %s", strerror(errno));
205 }
206
Craig Tillerc67cc992017-04-27 10:15:51 -0700207 return new_fd;
208}
209
Craig Tiller4509c472017-04-27 19:05:13 +0000210static int fd_wrapped_fd(grpc_fd *fd) { return fd->fd; }
Craig Tillerc67cc992017-04-27 10:15:51 -0700211
/* Might be called multiple times.  grpc_lfev_set_shutdown on the read
   closure returns true only for the first caller to shut the fd down, so the
   socket shutdown() and the write-side shutdown run exactly once; every
   caller's |why| reference is consumed. */
static void fd_shutdown(grpc_exec_ctx *exec_ctx, grpc_fd *fd, grpc_error *why) {
  if (grpc_lfev_set_shutdown(exec_ctx, &fd->read_closure,
                             GRPC_ERROR_REF(why))) {
    /* First shutdown wins: tear down the socket and the write side too. */
    shutdown(fd->fd, SHUT_RDWR);
    grpc_lfev_set_shutdown(exec_ctx, &fd->write_closure, GRPC_ERROR_REF(why));
  }
  GRPC_ERROR_UNREF(why); /* balance the caller's reference */
}
221
/* Release the caller's interest in |fd|: shut it down if not already shut
   down, close (or hand back) the kernel descriptor, schedule |on_done|, and
   return the wrapper to the freelist.  The wrapper is freelisted — never
   freed — so a racing poller that still sees it at worst produces a spurious
   wakeup on a reused fd (see the freelist comment above). */
static void fd_orphan(grpc_exec_ctx *exec_ctx, grpc_fd *fd,
                      grpc_closure *on_done, int *release_fd,
                      const char *reason) {
  grpc_error *error = GRPC_ERROR_NONE;

  if (!grpc_lfev_is_shutdown(&fd->read_closure)) {
    fd_shutdown(exec_ctx, fd, GRPC_ERROR_CREATE_FROM_COPIED_STRING(reason));
  }

  /* If release_fd is not NULL, we should be relinquishing control of the file
     descriptor fd->fd (but we still own the grpc_fd structure). */
  if (release_fd != NULL) {
    *release_fd = fd->fd;
  } else {
    /* Closing also removes the fd from the epoll set. */
    close(fd->fd);
  }

  grpc_closure_sched(exec_ctx, on_done, GRPC_ERROR_REF(error));

  grpc_iomgr_unregister_object(&fd->iomgr_object);
  grpc_lfev_destroy(&fd->read_closure);
  grpc_lfev_destroy(&fd->write_closure);

  /* Recycle the wrapper rather than freeing it. */
  gpr_mu_lock(&fd_freelist_mu);
  fd->freelist_next = fd_freelist;
  fd_freelist = fd;
  gpr_mu_unlock(&fd_freelist_mu);
}
250
251static grpc_pollset *fd_get_read_notifier_pollset(grpc_exec_ctx *exec_ctx,
252 grpc_fd *fd) {
253 gpr_atm notifier = gpr_atm_acq_load(&fd->read_notifier_pollset);
254 return (grpc_pollset *)notifier;
255}
256
257static bool fd_is_shutdown(grpc_fd *fd) {
258 return grpc_lfev_is_shutdown(&fd->read_closure);
259}
260
Craig Tillerc67cc992017-04-27 10:15:51 -0700261static void fd_notify_on_read(grpc_exec_ctx *exec_ctx, grpc_fd *fd,
262 grpc_closure *closure) {
263 grpc_lfev_notify_on(exec_ctx, &fd->read_closure, closure);
264}
265
266static void fd_notify_on_write(grpc_exec_ctx *exec_ctx, grpc_fd *fd,
267 grpc_closure *closure) {
268 grpc_lfev_notify_on(exec_ctx, &fd->write_closure, closure);
269}
270
/* This engine has no per-fd workqueue; a fixed non-NULL sentinel value is
   returned instead of a real object.  NOTE(review): callers presumably only
   test the result against NULL and never dereference it — confirm before
   relying on this. */
static grpc_workqueue *fd_get_workqueue(grpc_fd *fd) {
  return (grpc_workqueue *)0xb0b51ed;
}
274
/* Fire the fd's read-ready event and record which pollset noticed it. */
static void fd_become_readable(grpc_exec_ctx *exec_ctx, grpc_fd *fd,
                               grpc_pollset *notifier) {
  grpc_lfev_set_ready(exec_ctx, &fd->read_closure);

  /* Note, it is possible that fd_become_readable might be called twice with
     different 'notifier's when an fd becomes readable and it is in two epoll
     sets (This can happen briefly during polling island merges). In such cases
     it does not really matter which notifer is set as the read_notifier_pollset
     (They would both point to the same polling island anyway) */
  /* Use release store to match with acquire load in fd_get_read_notifier */
  gpr_atm_rel_store(&fd->read_notifier_pollset, (gpr_atm)notifier);
}
287
288static void fd_become_writable(grpc_exec_ctx *exec_ctx, grpc_fd *fd) {
289 grpc_lfev_set_ready(exec_ctx, &fd->write_closure);
Craig Tillerc67cc992017-04-27 10:15:51 -0700290}
291
292/*******************************************************************************
293 * Pollset Definitions
294 */
295
Craig Tiller6de05932017-04-28 09:17:38 -0700296GPR_TLS_DECL(g_current_thread_pollset);
297GPR_TLS_DECL(g_current_thread_worker);
298static gpr_atm g_active_poller;
299static pollset_neighbourhood *g_neighbourhoods;
Craig Tiller32f90ee2017-04-28 12:46:41 -0700300static size_t g_num_neighbourhoods;
Craig Tiller67e229e2017-05-01 20:57:59 +0000301static gpr_mu g_wq_mu;
302static grpc_closure_list g_wq_items;
Craig Tiller6de05932017-04-28 09:17:38 -0700303
Craig Tillerc67cc992017-04-27 10:15:51 -0700304/* Return true if first in list */
Craig Tiller32f90ee2017-04-28 12:46:41 -0700305static bool worker_insert(grpc_pollset *pollset, grpc_pollset_worker *worker) {
306 if (pollset->root_worker == NULL) {
307 pollset->root_worker = worker;
308 worker->next = worker->prev = worker;
Craig Tillerc67cc992017-04-27 10:15:51 -0700309 return true;
310 } else {
Craig Tiller32f90ee2017-04-28 12:46:41 -0700311 worker->next = pollset->root_worker;
312 worker->prev = worker->next->prev;
313 worker->next->prev = worker;
314 worker->prev->next = worker;
Craig Tillerc67cc992017-04-27 10:15:51 -0700315 return false;
316 }
317}
318
/* Return true if last in list */
typedef enum { EMPTIED, NEW_ROOT, REMOVED } worker_remove_result;

/* Unlink |worker| from |pollset|'s circular worker list.  Reports whether the
   list became empty (EMPTIED), the root moved to the next worker (NEW_ROOT),
   or a non-root worker was removed (REMOVED). */
static worker_remove_result worker_remove(grpc_pollset *pollset,
                                          grpc_pollset_worker *worker) {
  if (worker == pollset->root_worker) {
    if (worker == worker->next) {
      /* worker was the only element */
      pollset->root_worker = NULL;
      return EMPTIED;
    } else {
      pollset->root_worker = worker->next;
      worker->prev->next = worker->next;
      worker->next->prev = worker->prev;
      return NEW_ROOT;
    }
  } else {
    worker->prev->next = worker->next;
    worker->next->prev = worker->prev;
    return REMOVED;
  }
}
340
Craig Tillerba550da2017-05-01 14:26:31 +0000341static size_t choose_neighbourhood(void) {
342 return (size_t)gpr_cpu_current_cpu() % g_num_neighbourhoods;
343}
344
Craig Tiller4509c472017-04-27 19:05:13 +0000345static grpc_error *pollset_global_init(void) {
Craig Tiller4509c472017-04-27 19:05:13 +0000346 gpr_tls_init(&g_current_thread_pollset);
347 gpr_tls_init(&g_current_thread_worker);
Craig Tiller6de05932017-04-28 09:17:38 -0700348 gpr_atm_no_barrier_store(&g_active_poller, 0);
Craig Tiller375eb252017-04-27 23:29:12 +0000349 global_wakeup_fd.read_fd = -1;
350 grpc_error *err = grpc_wakeup_fd_init(&global_wakeup_fd);
Craig Tiller67e229e2017-05-01 20:57:59 +0000351 gpr_mu_init(&g_wq_mu);
352 g_wq_items = (grpc_closure_list)GRPC_CLOSURE_LIST_INIT;
Craig Tiller375eb252017-04-27 23:29:12 +0000353 if (err != GRPC_ERROR_NONE) return err;
Craig Tiller4509c472017-04-27 19:05:13 +0000354 struct epoll_event ev = {.events = (uint32_t)(EPOLLIN | EPOLLET),
355 .data.ptr = &global_wakeup_fd};
356 if (epoll_ctl(g_epfd, EPOLL_CTL_ADD, global_wakeup_fd.read_fd, &ev) != 0) {
357 return GRPC_OS_ERROR(errno, "epoll_ctl");
358 }
Craig Tillerba550da2017-05-01 14:26:31 +0000359 g_num_neighbourhoods = GPR_CLAMP(gpr_cpu_num_cores(), 1, MAX_NEIGHBOURHOODS);
Craig Tiller32f90ee2017-04-28 12:46:41 -0700360 g_neighbourhoods =
361 gpr_zalloc(sizeof(*g_neighbourhoods) * g_num_neighbourhoods);
362 for (size_t i = 0; i < g_num_neighbourhoods; i++) {
363 gpr_mu_init(&g_neighbourhoods[i].mu);
Craig Tiller32f90ee2017-04-28 12:46:41 -0700364 }
Craig Tiller4509c472017-04-27 19:05:13 +0000365 return GRPC_ERROR_NONE;
366}
367
368static void pollset_global_shutdown(void) {
Craig Tiller4509c472017-04-27 19:05:13 +0000369 gpr_tls_destroy(&g_current_thread_pollset);
370 gpr_tls_destroy(&g_current_thread_worker);
Craig Tiller67e229e2017-05-01 20:57:59 +0000371 gpr_mu_destroy(&g_wq_mu);
Craig Tiller375eb252017-04-27 23:29:12 +0000372 if (global_wakeup_fd.read_fd != -1) grpc_wakeup_fd_destroy(&global_wakeup_fd);
Craig Tiller32f90ee2017-04-28 12:46:41 -0700373 for (size_t i = 0; i < g_num_neighbourhoods; i++) {
374 gpr_mu_destroy(&g_neighbourhoods[i].mu);
375 }
376 gpr_free(g_neighbourhoods);
Craig Tiller4509c472017-04-27 19:05:13 +0000377}
378
/* Initialise |pollset| and hand its mutex back via |*mu|.  New pollsets start
   life inactive (not linked into any neighbourhood's active list); they are
   activated lazily by begin_worker().  NOTE(review): fields such as
   root_worker/next/prev are not set here — presumably the caller
   zero-allocates the pollset; confirm against grpc_pollset_size()/callers. */
static void pollset_init(grpc_pollset *pollset, gpr_mu **mu) {
  gpr_mu_init(&pollset->mu);
  *mu = &pollset->mu;
  pollset->neighbourhood = &g_neighbourhoods[choose_neighbourhood()];
  pollset->seen_inactive = true;
}
385
/* Destroy |pollset|, unlinking it from its neighbourhood's active list if it
   is still active.  Lock order is neighbourhood->mu before pollset->mu, so to
   honour that we must drop the pollset lock, take the neighbourhood lock, and
   re-check — retrying if the pollset migrated to a different neighbourhood in
   the window where it was unlocked. */
static void pollset_destroy(grpc_exec_ctx *exec_ctx, grpc_pollset *pollset) {
  gpr_mu_lock(&pollset->mu);
  if (!pollset->seen_inactive) {
    pollset_neighbourhood *neighbourhood = pollset->neighbourhood;
    gpr_mu_unlock(&pollset->mu);
  retry_lock_neighbourhood:
    gpr_mu_lock(&neighbourhood->mu);
    gpr_mu_lock(&pollset->mu);
    if (!pollset->seen_inactive) {
      if (pollset->neighbourhood != neighbourhood) {
        /* pollset moved while we were unlocked: chase it */
        gpr_mu_unlock(&neighbourhood->mu);
        neighbourhood = pollset->neighbourhood;
        gpr_mu_unlock(&pollset->mu);
        goto retry_lock_neighbourhood;
      }
      /* unlink from the circular active list */
      pollset->prev->next = pollset->next;
      pollset->next->prev = pollset->prev;
      if (pollset == pollset->neighbourhood->active_root) {
        pollset->neighbourhood->active_root =
            pollset->next == pollset ? NULL : pollset->next;
      }
    }
    gpr_mu_unlock(&pollset->neighbourhood->mu);
  }
  gpr_mu_unlock(&pollset->mu);
  gpr_mu_destroy(&pollset->mu);
}
413
414static grpc_error *pollset_kick_all(grpc_pollset *pollset) {
415 grpc_error *error = GRPC_ERROR_NONE;
416 if (pollset->root_worker != NULL) {
417 grpc_pollset_worker *worker = pollset->root_worker;
418 do {
419 if (worker->initialized_cv) {
Craig Tiller32f90ee2017-04-28 12:46:41 -0700420 worker->kick_state = KICKED;
Craig Tiller4509c472017-04-27 19:05:13 +0000421 gpr_cv_signal(&worker->cv);
422 } else {
Craig Tiller32f90ee2017-04-28 12:46:41 -0700423 worker->kick_state = KICKED;
Craig Tiller4509c472017-04-27 19:05:13 +0000424 append_error(&error, grpc_wakeup_fd_wakeup(&global_wakeup_fd),
425 "pollset_shutdown");
426 }
427
Craig Tiller32f90ee2017-04-28 12:46:41 -0700428 worker = worker->next;
Craig Tiller4509c472017-04-27 19:05:13 +0000429 } while (worker != pollset->root_worker);
430 }
431 return error;
432}
433
434static void pollset_maybe_finish_shutdown(grpc_exec_ctx *exec_ctx,
435 grpc_pollset *pollset) {
Craig Tillerba550da2017-05-01 14:26:31 +0000436 if (pollset->shutdown_closure != NULL && pollset->root_worker == NULL &&
437 pollset->begin_refs == 0) {
Craig Tiller4509c472017-04-27 19:05:13 +0000438 grpc_closure_sched(exec_ctx, pollset->shutdown_closure, GRPC_ERROR_NONE);
439 pollset->shutdown_closure = NULL;
440 }
441}
442
/* Begin shutting down |pollset| (caller holds the pollset mutex); |closure|
   runs once the last worker leaves.  Must not be called twice.  All current
   workers are kicked so they observe the pending shutdown; if there are none,
   the closure is scheduled immediately. */
static void pollset_shutdown(grpc_exec_ctx *exec_ctx, grpc_pollset *pollset,
                             grpc_closure *closure) {
  GPR_ASSERT(pollset->shutdown_closure == NULL);
  pollset->shutdown_closure = closure;
  GRPC_LOG_IF_ERROR("pollset_shutdown", pollset_kick_all(pollset));
  pollset_maybe_finish_shutdown(exec_ctx, pollset);
}
450
Craig Tillera95bacf2017-05-01 12:51:24 -0700451#define MAX_EPOLL_EVENTS 100
Craig Tiller4509c472017-04-27 19:05:13 +0000452
453static int poll_deadline_to_millis_timeout(gpr_timespec deadline,
454 gpr_timespec now) {
455 gpr_timespec timeout;
456 if (gpr_time_cmp(deadline, gpr_inf_future(deadline.clock_type)) == 0) {
457 return -1;
458 }
459
460 if (gpr_time_cmp(deadline, now) <= 0) {
461 return 0;
462 }
463
464 static const gpr_timespec round_up = {
465 .clock_type = GPR_TIMESPAN, .tv_sec = 0, .tv_nsec = GPR_NS_PER_MS - 1};
466 timeout = gpr_time_sub(deadline, now);
467 int millis = gpr_time_to_millis(gpr_time_add(timeout, round_up));
468 return millis >= 1 ? millis : 1;
469}
470
/* Block in epoll_wait (up to |deadline|) on the single global epoll set and
   dispatch the resulting events: the global wakeup fd drains the global
   workqueue into the exec_ctx; every other event belongs to a grpc_fd and
   fires its read/write lockfree events.  Called without the pollset mutex
   held. */
static grpc_error *pollset_epoll(grpc_exec_ctx *exec_ctx, grpc_pollset *pollset,
                                 gpr_timespec now, gpr_timespec deadline) {
  struct epoll_event events[MAX_EPOLL_EVENTS];
  static const char *err_desc = "pollset_poll";

  int timeout = poll_deadline_to_millis_timeout(deadline, now);

  /* Only mark a blocking region when we might actually block. */
  if (timeout != 0) {
    GRPC_SCHEDULING_START_BLOCKING_REGION;
  }
  int r;
  do {
    r = epoll_wait(g_epfd, events, MAX_EPOLL_EVENTS, timeout);
  } while (r < 0 && errno == EINTR); /* retry on signal interruption */
  if (timeout != 0) {
    GRPC_SCHEDULING_END_BLOCKING_REGION;
  }

  if (r < 0) return GRPC_OS_ERROR(errno, "epoll_wait");

  grpc_error *error = GRPC_ERROR_NONE;
  for (int i = 0; i < r; i++) {
    void *data_ptr = events[i].data.ptr;
    if (data_ptr == &global_wakeup_fd) {
      /* Global wakeup: move queued closures to this thread's exec_ctx and
         re-arm the wakeup fd. */
      gpr_mu_lock(&g_wq_mu);
      grpc_closure_list_move(&g_wq_items, &exec_ctx->closure_list);
      gpr_mu_unlock(&g_wq_mu);
      append_error(&error, grpc_wakeup_fd_consume_wakeup(&global_wakeup_fd),
                   err_desc);
    } else {
      grpc_fd *fd = (grpc_fd *)(data_ptr);
      /* Error/hangup wakes both sides so pending closures observe failure. */
      bool cancel = (events[i].events & (EPOLLERR | EPOLLHUP)) != 0;
      bool read_ev = (events[i].events & (EPOLLIN | EPOLLPRI)) != 0;
      bool write_ev = (events[i].events & EPOLLOUT) != 0;
      if (read_ev || cancel) {
        fd_become_readable(exec_ctx, fd, pollset);
      }
      if (write_ev || cancel) {
        fd_become_writable(exec_ctx, fd);
      }
    }
  }

  return error;
}
516
/* Attach |worker| to |pollset| (caller holds pollset->mu; the lock is held on
   return, though it may be released and re-acquired in between).  Reactivates
   an inactive pollset by linking it into a neighbourhood's active list, then
   either becomes the designated poller (via CAS on g_active_poller) or sleeps
   on a condvar until kicked, promoted, or shut down.  Returns true iff this
   worker should proceed to call epoll (it is the designated poller and no
   shutdown is pending). */
static bool begin_worker(grpc_pollset *pollset, grpc_pollset_worker *worker,
                         grpc_pollset_worker **worker_hdl, gpr_timespec *now,
                         gpr_timespec deadline) {
  if (worker_hdl != NULL) *worker_hdl = worker;
  worker->initialized_cv = false;
  worker->kick_state = UNKICKED;
  worker->schedule_on_end_work = (grpc_closure_list)GRPC_CLOSURE_LIST_INIT;
  pollset->begin_refs++; /* hold off shutdown completion while we juggle locks */

  if (pollset->seen_inactive) {
    // pollset has been observed to be inactive, we need to move back to the
    // active list
    bool is_reassigning = false;
    if (!pollset->reassigning_neighbourhood) {
      /* only one thread at a time re-picks the neighbourhood */
      is_reassigning = true;
      pollset->reassigning_neighbourhood = true;
      pollset->neighbourhood = &g_neighbourhoods[choose_neighbourhood()];
    }
    pollset_neighbourhood *neighbourhood = pollset->neighbourhood;
    gpr_mu_unlock(&pollset->mu);
  // pollset unlocked: state may change (even worker->kick_state)
  retry_lock_neighbourhood:
    /* lock order: neighbourhood->mu before pollset->mu */
    gpr_mu_lock(&neighbourhood->mu);
    gpr_mu_lock(&pollset->mu);
    if (pollset->seen_inactive) {
      if (neighbourhood != pollset->neighbourhood) {
        /* pollset was re-homed while unlocked: chase the new neighbourhood */
        gpr_mu_unlock(&neighbourhood->mu);
        neighbourhood = pollset->neighbourhood;
        gpr_mu_unlock(&pollset->mu);
        goto retry_lock_neighbourhood;
      }
      pollset->seen_inactive = false;
      if (neighbourhood->active_root == NULL) {
        neighbourhood->active_root = pollset->next = pollset->prev = pollset;
        /* if there's no designated poller, we can become it (CAS from 0) */
        if (gpr_atm_no_barrier_cas(&g_active_poller, 0, (gpr_atm)worker)) {
          worker->kick_state = DESIGNATED_POLLER;
        }
      } else {
        /* splice into the existing circular active list */
        pollset->next = neighbourhood->active_root;
        pollset->prev = pollset->next->prev;
        pollset->next->prev = pollset->prev->next = pollset;
      }
    }
    if (is_reassigning) {
      GPR_ASSERT(pollset->reassigning_neighbourhood);
      pollset->reassigning_neighbourhood = false;
    }
    gpr_mu_unlock(&neighbourhood->mu);
  }
  worker_insert(pollset, worker);
  pollset->begin_refs--;
  if (worker->kick_state == UNKICKED) {
    /* not the designated poller: sleep until kicked or promoted */
    GPR_ASSERT(gpr_atm_no_barrier_load(&g_active_poller) != (gpr_atm)worker);
    worker->initialized_cv = true;
    gpr_cv_init(&worker->cv);
    while (worker->kick_state == UNKICKED &&
           pollset->shutdown_closure == NULL) {
      /* a timed-out wait while still UNKICKED counts as a kick */
      if (gpr_cv_wait(&worker->cv, &pollset->mu, deadline) &&
          worker->kick_state == UNKICKED) {
        worker->kick_state = KICKED;
      }
    }
    *now = gpr_now(now->clock_type); /* refresh: we may have slept a while */
  }

  return worker->kick_state == DESIGNATED_POLLER &&
         pollset->shutdown_closure == NULL;
}
585
/* Scan |neighbourhood|'s active pollsets (caller holds neighbourhood->mu) for
   a worker that can take over as designated poller.  Pollsets found with no
   eligible worker are marked inactive and unlinked; the scan repeats until a
   worker is found or the active list is empty.  Returns true iff a designated
   poller now exists (either promoted here or found already designated). */
static bool check_neighbourhood_for_available_poller(
    pollset_neighbourhood *neighbourhood) {
  bool found_worker = false;
  do {
    grpc_pollset *inspect = neighbourhood->active_root;
    if (inspect == NULL) {
      break; /* nothing active in this neighbourhood */
    }
    gpr_mu_lock(&inspect->mu);
    GPR_ASSERT(!inspect->seen_inactive);
    grpc_pollset_worker *inspect_worker = inspect->root_worker;
    if (inspect_worker != NULL) {
      do {
        switch (inspect_worker->kick_state) {
          case UNKICKED:
            /* promote: CAS g_active_poller from 0 to this worker */
            if (gpr_atm_no_barrier_cas(&g_active_poller, 0,
                                       (gpr_atm)inspect_worker)) {
              inspect_worker->kick_state = DESIGNATED_POLLER;
              if (inspect_worker->initialized_cv) {
                gpr_cv_signal(&inspect_worker->cv);
              }
            }
            // even if we didn't win the cas, there's a worker, we can stop
            found_worker = true;
            break;
          case KICKED:
            break; /* already returning; can't take over polling */
          case DESIGNATED_POLLER:
            found_worker = true;  // ok, so someone else found the worker, but
                                  // we'll accept that
            break;
        }
        inspect_worker = inspect_worker->next;
      } while (inspect_worker != inspect->root_worker);
    }
    if (!found_worker) {
      /* no usable worker here: deactivate this pollset and unlink it */
      inspect->seen_inactive = true;
      if (inspect == neighbourhood->active_root) {
        neighbourhood->active_root =
            inspect->next == inspect ? NULL : inspect->next;
      }
      inspect->next->prev = inspect->prev;
      inspect->prev->next = inspect->next;
      inspect->next = inspect->prev = NULL;
    }
    gpr_mu_unlock(&inspect->mu);
  } while (!found_worker);
  return found_worker;
}
635
/* Detach |worker| from |pollset| after polling/waiting (caller holds
   pollset->mu; held again on return).  If this worker was the designated
   poller it must hand the role off: preferably to the next unkicked worker on
   the same pollset, otherwise by scanning all neighbourhoods (trylock pass
   first to avoid blocking, then a forced-lock pass over the ones skipped).
   Also flushes any exec_ctx work accumulated while polling. */
static void end_worker(grpc_exec_ctx *exec_ctx, grpc_pollset *pollset,
                       grpc_pollset_worker *worker,
                       grpc_pollset_worker **worker_hdl) {
  if (worker_hdl != NULL) *worker_hdl = NULL;
  worker->kick_state = KICKED; /* we're leaving: no further kicks needed */
  grpc_closure_list_move(&worker->schedule_on_end_work,
                         &exec_ctx->closure_list);
  if (gpr_atm_no_barrier_load(&g_active_poller) == (gpr_atm)worker) {
    if (worker->next != worker && worker->next->kick_state == UNKICKED) {
      /* cheap handoff: the next worker on this pollset takes over */
      GPR_ASSERT(worker->next->initialized_cv);
      gpr_atm_no_barrier_store(&g_active_poller, (gpr_atm)worker->next);
      worker->next->kick_state = DESIGNATED_POLLER;
      gpr_cv_signal(&worker->next->cv);
      if (grpc_exec_ctx_has_work(exec_ctx)) {
        gpr_mu_unlock(&pollset->mu);
        grpc_exec_ctx_flush(exec_ctx);
        gpr_mu_lock(&pollset->mu);
      }
    } else {
      /* no local successor: clear the poller slot, then search globally */
      gpr_atm_no_barrier_store(&g_active_poller, 0);
      gpr_mu_unlock(&pollset->mu);
      /* start the scan at our own neighbourhood for locality */
      size_t poller_neighbourhood_idx =
          (size_t)(pollset->neighbourhood - g_neighbourhoods);
      bool found_worker = false;
      bool scan_state[MAX_NEIGHBOURHOODS]; /* which shards pass 1 covered */
      for (size_t i = 0; !found_worker && i < g_num_neighbourhoods; i++) {
        pollset_neighbourhood *neighbourhood =
            &g_neighbourhoods[(poller_neighbourhood_idx + i) %
                              g_num_neighbourhoods];
        if (gpr_mu_trylock(&neighbourhood->mu)) {
          found_worker =
              check_neighbourhood_for_available_poller(neighbourhood);
          gpr_mu_unlock(&neighbourhood->mu);
          scan_state[i] = true;
        } else {
          scan_state[i] = false; /* contended: revisit in pass 2 */
        }
      }
      for (size_t i = 0; !found_worker && i < g_num_neighbourhoods; i++) {
        if (scan_state[i]) continue;
        pollset_neighbourhood *neighbourhood =
            &g_neighbourhoods[(poller_neighbourhood_idx + i) %
                              g_num_neighbourhoods];
        gpr_mu_lock(&neighbourhood->mu);
        found_worker = check_neighbourhood_for_available_poller(neighbourhood);
        gpr_mu_unlock(&neighbourhood->mu);
      }
      grpc_exec_ctx_flush(exec_ctx);
      gpr_mu_lock(&pollset->mu);
    }
  } else if (grpc_exec_ctx_has_work(exec_ctx)) {
    gpr_mu_unlock(&pollset->mu);
    grpc_exec_ctx_flush(exec_ctx);
    gpr_mu_lock(&pollset->mu);
  }
  if (worker->initialized_cv) {
    gpr_cv_destroy(&worker->cv);
  }
  if (EMPTIED == worker_remove(pollset, worker)) {
    /* last worker out: complete any pending shutdown */
    pollset_maybe_finish_shutdown(exec_ctx, pollset);
  }
  GPR_ASSERT(gpr_atm_no_barrier_load(&g_active_poller) != (gpr_atm)worker);
}
699
/* pollset->mu lock must be held by the caller before calling this.
   The function pollset_work() may temporarily release the lock (pollset->mu)
   during the course of its execution but it will always re-acquire the lock and
   ensure that it is held by the time the function returns */
static grpc_error *pollset_work(grpc_exec_ctx *exec_ctx, grpc_pollset *pollset,
                                grpc_pollset_worker **worker_hdl,
                                gpr_timespec now, gpr_timespec deadline) {
  grpc_pollset_worker worker;
  grpc_error *error = GRPC_ERROR_NONE;
  static const char *err_desc = "pollset_work";
  /* A kick arrived while no worker was present: consume it and return
     immediately without polling. */
  if (pollset->kicked_without_poller) {
    pollset->kicked_without_poller = false;
    return GRPC_ERROR_NONE;
  }
  gpr_tls_set(&g_current_thread_pollset, (intptr_t)pollset);
  /* begin_worker returns true iff this thread should become the poller
     (i.e. actually call epoll) rather than just sleep as a spare worker.
     -- NOTE(review): inferred from usage here; begin_worker is defined
     outside this chunk — confirm. */
  if (begin_worker(pollset, &worker, worker_hdl, &now, deadline)) {
    gpr_tls_set(&g_current_thread_worker, (intptr_t)&worker);
    GPR_ASSERT(!pollset->shutdown_closure);
    GPR_ASSERT(!pollset->seen_inactive);
    /* Drop the pollset lock while blocked in epoll; other threads may add
       workers or kick this pollset in the meantime. */
    gpr_mu_unlock(&pollset->mu);
    append_error(&error, pollset_epoll(exec_ctx, pollset, now, deadline),
                 err_desc);
    gpr_mu_lock(&pollset->mu);
    gpr_tls_set(&g_current_thread_worker, 0);
  }
  end_worker(exec_ctx, pollset, &worker, worker_hdl);
  gpr_tls_set(&g_current_thread_pollset, 0);
  return error;
}
729
/* Wake up a worker on this pollset. With specific_worker == NULL any one
   worker is woken (or the kick is latched in kicked_without_poller when no
   worker exists); otherwise exactly the named worker is targeted.
   NOTE(review): callers appear to hold pollset->mu (root_worker and
   kick_state are read unguarded here) — confirm against call sites. */
static grpc_error *pollset_kick(grpc_pollset *pollset,
                                grpc_pollset_worker *specific_worker) {
  if (specific_worker == NULL) {
    /* Kick "anyone". If this thread is itself working on the pollset, it is
       already awake and nothing needs to be done. */
    if (gpr_tls_get(&g_current_thread_pollset) != (intptr_t)pollset) {
      grpc_pollset_worker *root_worker = pollset->root_worker;
      if (root_worker == NULL) {
        /* No workers present: remember the kick so that the next
           pollset_work() call returns immediately. */
        pollset->kicked_without_poller = true;
        return GRPC_ERROR_NONE;
      }
      grpc_pollset_worker *next_worker = root_worker->next;
      if (root_worker == next_worker &&
          root_worker == (grpc_pollset_worker *)gpr_atm_no_barrier_load(
                             &g_active_poller)) {
        /* Sole worker and it is the designated poller (blocked in epoll):
           wake it through the wakeup fd. */
        root_worker->kick_state = KICKED;
        return grpc_wakeup_fd_wakeup(&global_wakeup_fd);
      } else if (next_worker->kick_state == UNKICKED) {
        /* Next worker is sleeping on its condition variable: signal it. */
        GPR_ASSERT(next_worker->initialized_cv);
        next_worker->kick_state = KICKED;
        gpr_cv_signal(&next_worker->cv);
        return GRPC_ERROR_NONE;
      } else {
        /* Next worker is already kicked (or otherwise not waiting). */
        return GRPC_ERROR_NONE;
      }
    } else {
      /* This thread is already a worker for the pollset: no-op. */
      return GRPC_ERROR_NONE;
    }
  } else if (specific_worker->kick_state == KICKED) {
    /* Already kicked: nothing more to do. */
    return GRPC_ERROR_NONE;
  } else if (gpr_tls_get(&g_current_thread_worker) ==
             (intptr_t)specific_worker) {
    /* Kicking ourselves: marking the state suffices, no wakeup needed. */
    specific_worker->kick_state = KICKED;
    return GRPC_ERROR_NONE;
  } else if (specific_worker ==
             (grpc_pollset_worker *)gpr_atm_no_barrier_load(&g_active_poller)) {
    /* Target is the active poller, blocked in epoll: use the wakeup fd. */
    specific_worker->kick_state = KICKED;
    return grpc_wakeup_fd_wakeup(&global_wakeup_fd);
  } else if (specific_worker->initialized_cv) {
    /* Target is (or may be) sleeping on its condition variable: signal it. */
    specific_worker->kick_state = KICKED;
    gpr_cv_signal(&specific_worker->cv);
    return GRPC_ERROR_NONE;
  } else {
    /* Target never slept: marking it KICKED is sufficient. */
    specific_worker->kick_state = KICKED;
    return GRPC_ERROR_NONE;
  }
}
775
/* No-op: this engine tracks fds in a single global epoll set, so there is
   no per-pollset fd registration to perform. -- NOTE(review): rationale
   inferred from the single g_epfd design; confirm against fd_create. */
static void pollset_add_fd(grpc_exec_ctx *exec_ctx, grpc_pollset *pollset,
                           grpc_fd *fd) {}
778
Craig Tiller4509c472017-04-27 19:05:13 +0000779/*******************************************************************************
780 * Workqueue Definitions
781 */
782
#ifdef GRPC_WORKQUEUE_REFCOUNT_DEBUG
/* Workqueues carry no state in this engine, so ref/unref are no-ops in both
   the refcount-debug and regular builds; ref returns its argument unchanged
   to satisfy the interface. */
static grpc_workqueue *workqueue_ref(grpc_workqueue *workqueue,
                                     const char *file, int line,
                                     const char *reason) {
  return workqueue;
}

static void workqueue_unref(grpc_exec_ctx *exec_ctx, grpc_workqueue *workqueue,
                            const char *file, int line, const char *reason) {}
#else
/* Non-debug variants: same no-op semantics without the call-site metadata. */
static grpc_workqueue *workqueue_ref(grpc_workqueue *workqueue) {
  return workqueue;
}

static void workqueue_unref(grpc_exec_ctx *exec_ctx,
                            grpc_workqueue *workqueue) {}
#endif
800
Craig Tiller50da5ec2017-05-01 13:51:14 -0700801static void wq_sched(grpc_exec_ctx *exec_ctx, grpc_closure *closure,
802 grpc_error *error) {
803 // find a neighbourhood to wakeup
804 bool scheduled = false;
805 size_t initial_neighbourhood = choose_neighbourhood();
806 for (size_t i = 0; !scheduled && i < g_num_neighbourhoods; i++) {
807 pollset_neighbourhood *neighbourhood =
808 &g_neighbourhoods[(initial_neighbourhood + i) % g_num_neighbourhoods];
809 if (gpr_mu_trylock(&neighbourhood->mu)) {
810 if (neighbourhood->active_root != NULL) {
811 grpc_pollset *inspect = neighbourhood->active_root;
812 do {
813 if (gpr_mu_trylock(&inspect->mu)) {
814 if (inspect->root_worker != NULL) {
815 grpc_pollset_worker *inspect_worker = inspect->root_worker;
816 do {
817 if (inspect_worker->kick_state == UNKICKED) {
818 inspect_worker->kick_state = KICKED;
819 grpc_closure_list_append(
820 &inspect_worker->schedule_on_end_work, closure, error);
821 if (inspect_worker->initialized_cv) {
822 gpr_cv_signal(&inspect_worker->cv);
823 }
824 scheduled = true;
825 }
826 inspect_worker = inspect_worker->next;
827 } while (!scheduled && inspect_worker != inspect->root_worker);
828 }
829 gpr_mu_unlock(&inspect->mu);
830 }
831 inspect = inspect->next;
832 } while (!scheduled && inspect != neighbourhood->active_root);
833 }
834 gpr_mu_unlock(&neighbourhood->mu);
835 }
836 }
837 if (!scheduled) {
Craig Tiller67e229e2017-05-01 20:57:59 +0000838 gpr_mu_lock(&g_wq_mu);
839 grpc_closure_list_append(&g_wq_items, closure, error);
840 gpr_mu_unlock(&g_wq_mu);
Craig Tiller50da5ec2017-05-01 13:51:14 -0700841 GRPC_LOG_IF_ERROR("workqueue_scheduler",
842 grpc_wakeup_fd_wakeup(&global_wakeup_fd));
843 }
844}
845
/* Single shared scheduler for all workqueues; both function slots of the
   vtable point at wq_sched. */
static const grpc_closure_scheduler_vtable
    singleton_workqueue_scheduler_vtable = {wq_sched, wq_sched,
                                            "epoll1_workqueue"};

static grpc_closure_scheduler singleton_workqueue_scheduler = {
    &singleton_workqueue_scheduler_vtable};
852
Craig Tiller4509c472017-04-27 19:05:13 +0000853static grpc_closure_scheduler *workqueue_scheduler(grpc_workqueue *workqueue) {
Craig Tiller50da5ec2017-05-01 13:51:14 -0700854 return &singleton_workqueue_scheduler;
Craig Tiller4509c472017-04-27 19:05:13 +0000855}
Craig Tillerc67cc992017-04-27 10:15:51 -0700856
857/*******************************************************************************
858 * Pollset-set Definitions
859 */
860
861static grpc_pollset_set *pollset_set_create(void) {
862 return (grpc_pollset_set *)((intptr_t)0xdeafbeef);
863}
864
/* All pollset-set operations are no-ops in this engine (matching the
   sentinel-valued pollset_set_create above): there is no per-set state to
   maintain. -- NOTE(review): rationale inferred from the single global epoll
   fd design; confirm. */
static void pollset_set_destroy(grpc_exec_ctx *exec_ctx,
                                grpc_pollset_set *pss) {}

static void pollset_set_add_fd(grpc_exec_ctx *exec_ctx, grpc_pollset_set *pss,
                               grpc_fd *fd) {}

static void pollset_set_del_fd(grpc_exec_ctx *exec_ctx, grpc_pollset_set *pss,
                               grpc_fd *fd) {}

static void pollset_set_add_pollset(grpc_exec_ctx *exec_ctx,
                                    grpc_pollset_set *pss, grpc_pollset *ps) {}

static void pollset_set_del_pollset(grpc_exec_ctx *exec_ctx,
                                    grpc_pollset_set *pss, grpc_pollset *ps) {}

static void pollset_set_add_pollset_set(grpc_exec_ctx *exec_ctx,
                                        grpc_pollset_set *bag,
                                        grpc_pollset_set *item) {}

static void pollset_set_del_pollset_set(grpc_exec_ctx *exec_ctx,
                                        grpc_pollset_set *bag,
                                        grpc_pollset_set *item) {}
887
888/*******************************************************************************
889 * Event engine binding
890 */
891
/* Tear down engine-global state: fd bookkeeping first, then pollset
   globals. */
static void shutdown_engine(void) {
  fd_global_shutdown();
  pollset_global_shutdown();
}
896
/* Event-engine function table binding this file's implementations into the
   iomgr polling abstraction (designated initializers, C99). */
static const grpc_event_engine_vtable vtable = {
    .pollset_size = sizeof(grpc_pollset),

    /* fd lifecycle and readiness notification */
    .fd_create = fd_create,
    .fd_wrapped_fd = fd_wrapped_fd,
    .fd_orphan = fd_orphan,
    .fd_shutdown = fd_shutdown,
    .fd_is_shutdown = fd_is_shutdown,
    .fd_notify_on_read = fd_notify_on_read,
    .fd_notify_on_write = fd_notify_on_write,
    .fd_get_read_notifier_pollset = fd_get_read_notifier_pollset,
    .fd_get_workqueue = fd_get_workqueue,

    /* pollset lifecycle and polling */
    .pollset_init = pollset_init,
    .pollset_shutdown = pollset_shutdown,
    .pollset_destroy = pollset_destroy,
    .pollset_work = pollset_work,
    .pollset_kick = pollset_kick,
    .pollset_add_fd = pollset_add_fd,

    /* pollset-set operations (no-ops in this engine) */
    .pollset_set_create = pollset_set_create,
    .pollset_set_destroy = pollset_set_destroy,
    .pollset_set_add_pollset = pollset_set_add_pollset,
    .pollset_set_del_pollset = pollset_set_del_pollset,
    .pollset_set_add_pollset_set = pollset_set_add_pollset_set,
    .pollset_set_del_pollset_set = pollset_set_del_pollset_set,
    .pollset_set_add_fd = pollset_set_add_fd,
    .pollset_set_del_fd = pollset_set_del_fd,

    /* workqueue (singleton scheduler, no refcounting) */
    .workqueue_ref = workqueue_ref,
    .workqueue_unref = workqueue_unref,
    .workqueue_scheduler = workqueue_scheduler,

    .shutdown_engine = shutdown_engine,
};
932
933/* It is possible that GLIBC has epoll but the underlying kernel doesn't.
934 * Create a dummy epoll_fd to make sure epoll support is available */
Craig Tiller6f0af492017-04-27 19:26:16 +0000935const grpc_event_engine_vtable *grpc_init_epoll1_linux(bool explicit_request) {
Craig Tiller924353a2017-05-05 17:36:31 +0000936 /* TODO(ctiller): temporary, until this stabilizes */
937 if (!explicit_request) return NULL;
938
Craig Tillerc67cc992017-04-27 10:15:51 -0700939 if (!grpc_has_wakeup_fd()) {
940 return NULL;
941 }
942
Craig Tiller4509c472017-04-27 19:05:13 +0000943 g_epfd = epoll_create1(EPOLL_CLOEXEC);
944 if (g_epfd < 0) {
945 gpr_log(GPR_ERROR, "epoll unavailable");
Craig Tillerc67cc992017-04-27 10:15:51 -0700946 return NULL;
947 }
948
Craig Tillerc67cc992017-04-27 10:15:51 -0700949 fd_global_init();
950
951 if (!GRPC_LOG_IF_ERROR("pollset_global_init", pollset_global_init())) {
Craig Tiller4509c472017-04-27 19:05:13 +0000952 close(g_epfd);
953 fd_global_shutdown();
Craig Tillerc67cc992017-04-27 10:15:51 -0700954 return NULL;
955 }
956
957 return &vtable;
958}
959
960#else /* defined(GRPC_LINUX_EPOLL) */
961#if defined(GRPC_POSIX_SOCKET)
962#include "src/core/lib/iomgr/ev_posix.h"
963/* If GRPC_LINUX_EPOLL is not defined, it means epoll is not available. Return
964 * NULL */
Craig Tiller9ddb3152017-04-27 21:32:56 +0000965const grpc_event_engine_vtable *grpc_init_epoll1_linux(bool explicit_request) {
966 return NULL;
967}
Craig Tillerc67cc992017-04-27 10:15:51 -0700968#endif /* defined(GRPC_POSIX_SOCKET) */
969#endif /* !defined(GRPC_LINUX_EPOLL) */