blob: 6c813bf27affec619312636ad4565c6a391b09d3 [file] [log] [blame]
ctiller58393c22015-01-07 14:03:30 -08001/*
2 *
Craig Tiller06059952015-02-18 08:34:56 -08003 * Copyright 2015, Google Inc.
ctiller58393c22015-01-07 14:03:30 -08004 * All rights reserved.
5 *
6 * Redistribution and use in source and binary forms, with or without
7 * modification, are permitted provided that the following conditions are
8 * met:
9 *
10 * * Redistributions of source code must retain the above copyright
11 * notice, this list of conditions and the following disclaimer.
12 * * Redistributions in binary form must reproduce the above
13 * copyright notice, this list of conditions and the following disclaimer
14 * in the documentation and/or other materials provided with the
15 * distribution.
16 * * Neither the name of Google Inc. nor the names of its
17 * contributors may be used to endorse or promote products derived from
18 * this software without specific prior written permission.
19 *
20 * THIS SOFTWARE IS PROVIDED BY THE COPYRIGHT HOLDERS AND CONTRIBUTORS
21 * "AS IS" AND ANY EXPRESS OR IMPLIED WARRANTIES, INCLUDING, BUT NOT
22 * LIMITED TO, THE IMPLIED WARRANTIES OF MERCHANTABILITY AND FITNESS FOR
23 * A PARTICULAR PURPOSE ARE DISCLAIMED. IN NO EVENT SHALL THE COPYRIGHT
24 * OWNER OR CONTRIBUTORS BE LIABLE FOR ANY DIRECT, INDIRECT, INCIDENTAL,
25 * SPECIAL, EXEMPLARY, OR CONSEQUENTIAL DAMAGES (INCLUDING, BUT NOT
26 * LIMITED TO, PROCUREMENT OF SUBSTITUTE GOODS OR SERVICES; LOSS OF USE,
27 * DATA, OR PROFITS; OR BUSINESS INTERRUPTION) HOWEVER CAUSED AND ON ANY
28 * THEORY OF LIABILITY, WHETHER IN CONTRACT, STRICT LIABILITY, OR TORT
29 * (INCLUDING NEGLIGENCE OR OTHERWISE) ARISING IN ANY WAY OUT OF THE USE
30 * OF THIS SOFTWARE, EVEN IF ADVISED OF THE POSSIBILITY OF SUCH DAMAGE.
31 *
32 */
33
Craig Tillerd14a1a52015-01-21 15:26:29 -080034#include <grpc/support/port_platform.h>
35
36#ifdef GPR_POSIX_SOCKET
37
ctiller58393c22015-01-07 14:03:30 -080038#include "src/core/iomgr/pollset_posix.h"
39
40#include <errno.h>
ctiller58393c22015-01-07 14:03:30 -080041#include <stdlib.h>
42#include <string.h>
43#include <unistd.h>
44
45#include "src/core/iomgr/alarm_internal.h"
46#include "src/core/iomgr/fd_posix.h"
47#include "src/core/iomgr/iomgr_internal.h"
48#include "src/core/iomgr/socket_utils_posix.h"
Craig Tiller1be373c2015-04-27 07:58:16 -070049#include "src/core/profiling/timers.h"
vjpai9839d282015-09-24 17:55:18 -070050#include "src/core/support/block_annotate.h"
ctiller58393c22015-01-07 14:03:30 -080051#include <grpc/support/alloc.h>
52#include <grpc/support/log.h>
53#include <grpc/support/thd.h>
Craig Tiller1be373c2015-04-27 07:58:16 -070054#include <grpc/support/tls.h>
ctiller58393c22015-01-07 14:03:30 -080055#include <grpc/support/useful.h>
56
Craig Tillera82950e2015-09-22 12:33:20 -070057GPR_TLS_DECL(g_current_thread_poller);
58GPR_TLS_DECL(g_current_thread_worker);
Craig Tiller1be373c2015-04-27 07:58:16 -070059
Craig Tiller191b79c2015-09-30 10:49:58 -070060/** Default poll() function - a pointer so that it can be overridden by some
61 * tests */
Craig Tiller69f90e62015-08-06 08:32:35 -070062grpc_poll_function_type grpc_poll_function = poll;
Craig Tiller191b79c2015-09-30 10:49:58 -070063
64/** The alarm system needs to be able to wakeup 'some poller' sometimes
65 * (specifically when a new alarm needs to be triggered earlier than the next
66 * alarm 'epoch').
67 * This wakeup_fd gives us something to alert on when such a case occurs. */
Craig Tiller8afeec82015-09-28 17:03:34 -070068grpc_wakeup_fd grpc_global_wakeup_fd;
Craig Tiller69f90e62015-08-06 08:32:35 -070069
Craig Tillera82950e2015-09-22 12:33:20 -070070static void remove_worker(grpc_pollset *p, grpc_pollset_worker *worker) {
Craig Tiller5ddbb9d2015-07-29 15:58:11 -070071 worker->prev->next = worker->next;
72 worker->next->prev = worker->prev;
73}
74
Craig Tillera82950e2015-09-22 12:33:20 -070075int grpc_pollset_has_workers(grpc_pollset *p) {
Craig Tiller5ddbb9d2015-07-29 15:58:11 -070076 return p->root_worker.next != &p->root_worker;
77}
78
Craig Tillera82950e2015-09-22 12:33:20 -070079static grpc_pollset_worker *pop_front_worker(grpc_pollset *p) {
80 if (grpc_pollset_has_workers(p)) {
81 grpc_pollset_worker *w = p->root_worker.next;
82 remove_worker(p, w);
83 return w;
84 } else {
85 return NULL;
86 }
ctiller58393c22015-01-07 14:03:30 -080087}
88
Craig Tillera82950e2015-09-22 12:33:20 -070089static void push_back_worker(grpc_pollset *p, grpc_pollset_worker *worker) {
Craig Tiller5ddbb9d2015-07-29 15:58:11 -070090 worker->next = &p->root_worker;
91 worker->prev = worker->next->prev;
92 worker->prev->next = worker->next->prev = worker;
Craig Tiller7d413212015-02-09 08:00:02 -080093}
ctiller58393c22015-01-07 14:03:30 -080094
Craig Tillera82950e2015-09-22 12:33:20 -070095static void push_front_worker(grpc_pollset *p, grpc_pollset_worker *worker) {
Craig Tiller5ddbb9d2015-07-29 15:58:11 -070096 worker->prev = &p->root_worker;
97 worker->next = worker->prev->next;
98 worker->prev->next = worker->next->prev = worker;
99}
100
/* Wake up one (or all) pollers on `p` so they notice new state.
   pollset->mu must already be held by the caller.

   Targets:
   - specific_worker == GRPC_POLLSET_KICK_BROADCAST: wake every attached
     worker via its wakeup_fd.
   - specific_worker != NULL: wake exactly that worker (skipping the wakeup
     write if it is the calling thread, unless GRPC_POLLSET_CAN_KICK_SELF).
   - specific_worker == NULL: wake "some" worker, preferring one that is not
     the calling thread; if none is attached, record the kick in
     kicked_without_pollers so the next grpc_pollset_work returns promptly.

   GRPC_POLLSET_REEVALUATE_POLLING_ON_WAKEUP requests that the kicked worker
   re-enter its polling loop rather than return; it is only meaningful with a
   specific (non-broadcast) worker target, hence the asserts. */
void grpc_pollset_kick_ext(grpc_pollset *p,
                           grpc_pollset_worker *specific_worker,
                           gpr_uint32 flags) {
  GRPC_TIMER_BEGIN("grpc_pollset_kick_ext", 0);

  /* pollset->mu already held */
  if (specific_worker != NULL) {
    if (specific_worker == GRPC_POLLSET_KICK_BROADCAST) {
      GRPC_TIMER_BEGIN("grpc_pollset_kick_ext.broadcast", 0);
      GPR_ASSERT((flags & GRPC_POLLSET_REEVALUATE_POLLING_ON_WAKEUP) == 0);
      /* Walk the worker ring and poke every worker's wakeup fd. */
      for (specific_worker = p->root_worker.next;
           specific_worker != &p->root_worker;
           specific_worker = specific_worker->next) {
        grpc_wakeup_fd_wakeup(&specific_worker->wakeup_fd);
      }
      /* Also latch the kick for any worker that arrives later. */
      p->kicked_without_pollers = 1;
      GRPC_TIMER_END("grpc_pollset_kick_ext.broadcast", 0);
    } else if (gpr_tls_get(&g_current_thread_worker) !=
               (gpr_intptr)specific_worker) {
      /* Target is some other thread's worker: safe to wake it directly. */
      GRPC_TIMER_MARK("different_thread_worker", 0);
      if ((flags & GRPC_POLLSET_REEVALUATE_POLLING_ON_WAKEUP) != 0) {
        specific_worker->reevaluate_polling_on_wakeup = 1;
      }
      grpc_wakeup_fd_wakeup(&specific_worker->wakeup_fd);
    } else if ((flags & GRPC_POLLSET_CAN_KICK_SELF) != 0) {
      /* Target is the calling thread itself; only wake if explicitly
         permitted (self-kicks are normally pointless and can spin). */
      GRPC_TIMER_MARK("kick_yoself", 0);
      if ((flags & GRPC_POLLSET_REEVALUATE_POLLING_ON_WAKEUP) != 0) {
        specific_worker->reevaluate_polling_on_wakeup = 1;
      }
      grpc_wakeup_fd_wakeup(&specific_worker->wakeup_fd);
    }
  } else if (gpr_tls_get(&g_current_thread_poller) != (gpr_intptr)p) {
    /* Anonymous kick: only needed when the caller is not itself currently
       polling this pollset. */
    GPR_ASSERT((flags & GRPC_POLLSET_REEVALUATE_POLLING_ON_WAKEUP) == 0);
    GRPC_TIMER_MARK("kick_anonymous", 0);
    specific_worker = pop_front_worker(p);
    if (specific_worker != NULL) {
      if (gpr_tls_get(&g_current_thread_worker) ==
          (gpr_intptr)specific_worker) {
        /* We popped our own worker; rotate it to the back and try the next
           one so the kick lands on a different thread if possible. */
        GRPC_TIMER_MARK("kick_anonymous_not_self", 0);
        push_back_worker(p, specific_worker);
        specific_worker = pop_front_worker(p);
        if ((flags & GRPC_POLLSET_CAN_KICK_SELF) == 0 &&
            gpr_tls_get(&g_current_thread_worker) ==
                (gpr_intptr)specific_worker) {
          /* Only ourselves available and self-kick not allowed: give up. */
          push_back_worker(p, specific_worker);
          specific_worker = NULL;
        }
      }
      if (specific_worker != NULL) {
        GRPC_TIMER_MARK("finally_kick", 0);
        push_back_worker(p, specific_worker);
        grpc_wakeup_fd_wakeup(&specific_worker->wakeup_fd);
      }
    } else {
      /* Nobody is polling: remember the kick for the next worker. */
      GRPC_TIMER_MARK("kicked_no_pollers", 0);
      p->kicked_without_pollers = 1;
    }
  }

  GRPC_TIMER_END("grpc_pollset_kick_ext", 0);
}
162
/* Convenience wrapper around grpc_pollset_kick_ext with no flags.
   pollset->mu must be held by the caller. */
void grpc_pollset_kick(grpc_pollset *p, grpc_pollset_worker *specific_worker) {
  grpc_pollset_kick_ext(p, specific_worker, 0);
}
166
ctiller58393c22015-01-07 14:03:30 -0800167/* global state management */
168
/* One-time process-wide initialization for the pollset subsystem: sets up
   the thread-local slots used to detect self-kicks and creates the global
   wakeup fd used by the alarm system.
   NOTE: grpc_wakeup_fd_global_init() must run before grpc_wakeup_fd_init()
   on the global fd, as done here. */
void grpc_pollset_global_init(void) {
  gpr_tls_init(&g_current_thread_poller);
  gpr_tls_init(&g_current_thread_worker);
  grpc_wakeup_fd_global_init();
  grpc_wakeup_fd_init(&grpc_global_wakeup_fd);
}
175
/* Tear down process-wide pollset state; exact mirror (in reverse order) of
   grpc_pollset_global_init. */
void grpc_pollset_global_shutdown(void) {
  grpc_wakeup_fd_destroy(&grpc_global_wakeup_fd);
  grpc_wakeup_fd_global_destroy();
  gpr_tls_destroy(&g_current_thread_poller);
  gpr_tls_destroy(&g_current_thread_worker);
}
182
Craig Tiller71a0f9d2015-09-28 17:22:01 -0700183void grpc_kick_poller(void) { grpc_wakeup_fd_wakeup(&grpc_global_wakeup_fd); }
ctiller58393c22015-01-07 14:03:30 -0800184
185/* main interface */
186
Craig Tillera82950e2015-09-22 12:33:20 -0700187static void become_basic_pollset(grpc_pollset *pollset, grpc_fd *fd_or_null);
ctiller58393c22015-01-07 14:03:30 -0800188
Craig Tillera82950e2015-09-22 12:33:20 -0700189void grpc_pollset_init(grpc_pollset *pollset) {
190 gpr_mu_init(&pollset->mu);
Craig Tiller5ddbb9d2015-07-29 15:58:11 -0700191 pollset->root_worker.next = pollset->root_worker.prev = &pollset->root_worker;
David Klempnerb5056612015-02-24 14:22:50 -0800192 pollset->in_flight_cbs = 0;
193 pollset->shutting_down = 0;
Jan Tattermuschd6ca8b42015-06-10 15:58:39 -0700194 pollset->called_shutdown = 0;
Craig Tillerd1bec032015-09-18 17:29:00 -0700195 pollset->idle_jobs.head = pollset->idle_jobs.tail = NULL;
Craig Tillera82950e2015-09-22 12:33:20 -0700196 become_basic_pollset(pollset, NULL);
ctiller58393c22015-01-07 14:03:30 -0800197}
198
/* Add `fd` to `pollset`. Takes pollset->mu and delegates to the current
   vtable's add_fd, which is responsible for releasing the lock (the final
   `1` argument requests that unlock). */
void grpc_pollset_add_fd(grpc_exec_ctx *exec_ctx, grpc_pollset *pollset,
                         grpc_fd *fd) {
  gpr_mu_lock(&pollset->mu);
  pollset->vtable->add_fd(exec_ctx, pollset, fd, 1);
/* the following (enabled only in debug) will reacquire and then release
   our lock - meaning that if the unlocking flag passed to add_fd above is
   not respected, the code will deadlock (in a way that we have a chance of
   debugging) */
#ifndef NDEBUG
  gpr_mu_lock(&pollset->mu);
  gpr_mu_unlock(&pollset->mu);
#endif
}
212
/* Remove `fd` from `pollset`. Takes pollset->mu and delegates to the
   current vtable's del_fd, which must release the lock (final `1`). */
void grpc_pollset_del_fd(grpc_exec_ctx *exec_ctx, grpc_pollset *pollset,
                         grpc_fd *fd) {
  gpr_mu_lock(&pollset->mu);
  pollset->vtable->del_fd(exec_ctx, pollset, fd, 1);
/* the following (enabled only in debug) will reacquire and then release
   our lock - meaning that if the unlocking flag passed to del_fd above is
   not respected, the code will deadlock (in a way that we have a chance of
   debugging) */
#ifndef NDEBUG
  gpr_mu_lock(&pollset->mu);
  gpr_mu_unlock(&pollset->mu);
#endif
}
226
/* Complete a pollset shutdown: let the active vtable clean up its polling
   state, then schedule the user's shutdown_done closure. Idle jobs must
   already have been drained by the caller. Called without pollset->mu. */
static void finish_shutdown(grpc_exec_ctx *exec_ctx, grpc_pollset *pollset) {
  GPR_ASSERT(grpc_closure_list_empty(pollset->idle_jobs));
  pollset->vtable->finish_shutdown(pollset);
  grpc_exec_ctx_enqueue(exec_ctx, pollset->shutdown_done, 1);
}
232
/* Run one iteration of work on `pollset` on behalf of `worker`, blocking in
   poll() (via the vtable) until `deadline` or until kicked.
   pollset->mu must be held on entry and is held again on return, though it
   is dropped internally around polling and callback execution.

   NOTE(review): the `done:` label sits inside the while loop; early `goto
   done` paths from before the loop jump into the loop body, which then exits
   because keep_polling was initialized to 0 and added_worker is unset. */
void grpc_pollset_work(grpc_exec_ctx *exec_ctx, grpc_pollset *pollset,
                       grpc_pollset_worker *worker, gpr_timespec now,
                       gpr_timespec deadline) {
  /* pollset->mu already held */
  int added_worker = 0;  /* is `worker` currently linked into the ring? */
  int locked = 1;        /* do we currently hold pollset->mu? */
  int queued_work = 0;   /* did flushing the exec_ctx run any closures? */
  int keep_polling = 0;
  GRPC_TIMER_BEGIN("grpc_pollset_work", 0);
  /* this must happen before we (potentially) drop pollset->mu */
  worker->next = worker->prev = NULL;
  worker->reevaluate_polling_on_wakeup = 0;
  /* TODO(ctiller): pool these */
  grpc_wakeup_fd_init(&worker->wakeup_fd);
  /* If there's work waiting for the pollset to be idle, and the
     pollset is idle, then do that work */
  if (!grpc_pollset_has_workers(pollset) &&
      !grpc_closure_list_empty(pollset->idle_jobs)) {
    grpc_exec_ctx_enqueue_list(exec_ctx, &pollset->idle_jobs);
    goto done;
  }
  /* Check alarms - these are a global resource so we just ping
     each time through on every pollset.
     May update deadline to ensure timely wakeups.
     TODO(ctiller): can this work be localized? */
  if (grpc_alarm_check(exec_ctx, now, &deadline)) {
    gpr_mu_unlock(&pollset->mu);
    locked = 0;
    goto done;
  }
  /* If we're shutting down then we don't execute any extended work */
  if (pollset->shutting_down) {
    goto done;
  }
  /* Give do_promote priority so we don't starve it out */
  if (pollset->in_flight_cbs) {
    gpr_mu_unlock(&pollset->mu);
    locked = 0;
    goto done;
  }
  /* Start polling, and keep doing so while we're being asked to
     re-evaluate our pollers (this allows poll() based pollers to
     ensure they don't miss wakeups) */
  keep_polling = 1;
  while (keep_polling) {
    keep_polling = 0;
    if (!pollset->kicked_without_pollers) {
      if (!added_worker) {
        push_front_worker(pollset, worker);
        added_worker = 1;
      }
      /* Record who is polling so grpc_pollset_kick can avoid waking the
         kicking thread itself. */
      gpr_tls_set(&g_current_thread_poller, (gpr_intptr)pollset);
      gpr_tls_set(&g_current_thread_worker, (gpr_intptr)worker);
      GRPC_TIMER_BEGIN("maybe_work_and_unlock", 0);
      pollset->vtable->maybe_work_and_unlock(exec_ctx, pollset, worker,
                                             deadline, now);
      GRPC_TIMER_END("maybe_work_and_unlock", 0);
      /* maybe_work_and_unlock drops pollset->mu before returning */
      locked = 0;
      gpr_tls_set(&g_current_thread_poller, 0);
      gpr_tls_set(&g_current_thread_worker, 0);
    } else {
      /* A kick arrived while nobody was polling: consume it and skip the
         poll this time around. */
      pollset->kicked_without_pollers = 0;
    }
    /* Finished execution - start cleaning up.
       Note that we may arrive here from outside the enclosing while() loop.
       In that case we won't loop though as we haven't added worker to the
       worker list, which means nobody could ask us to re-evaluate polling). */
  done:
    if (!locked) {
      queued_work |= grpc_exec_ctx_flush(exec_ctx);
      gpr_mu_lock(&pollset->mu);
      locked = 1;
    }
    /* If we're forced to re-evaluate polling (via grpc_pollset_kick with
       GRPC_POLLSET_REEVALUATE_POLLING_ON_WAKEUP) then we land here and force
       a loop */
    if (worker->reevaluate_polling_on_wakeup) {
      worker->reevaluate_polling_on_wakeup = 0;
      pollset->kicked_without_pollers = 0;
      if (queued_work) {
        /* If there's queued work on the list, then set the deadline to be
           immediate so we get back out of the polling loop quickly */
        deadline = gpr_inf_past(GPR_CLOCK_MONOTONIC);
      }
      keep_polling = 1;
    }
  }
  if (added_worker) {
    remove_worker(pollset, worker);
  }
  grpc_wakeup_fd_destroy(&worker->wakeup_fd);
  if (pollset->shutting_down) {
    if (grpc_pollset_has_workers(pollset)) {
      /* Other workers remain: pass the shutdown duty along. */
      grpc_pollset_kick(pollset, NULL);
    } else if (!pollset->called_shutdown && pollset->in_flight_cbs == 0) {
      /* We are the last worker out: complete the shutdown. */
      pollset->called_shutdown = 1;
      gpr_mu_unlock(&pollset->mu);
      finish_shutdown(exec_ctx, pollset);
      grpc_exec_ctx_flush(exec_ctx);
      /* Continuing to access pollset here is safe -- it is the caller's
       * responsibility to not destroy when it has outstanding calls to
       * grpc_pollset_work.
       * TODO(dklempner): Can we refactor the shutdown logic to avoid this? */
      gpr_mu_lock(&pollset->mu);
    } else if (!grpc_closure_list_empty(pollset->idle_jobs)) {
      /* Drain idle jobs outside the lock before returning. */
      gpr_mu_unlock(&pollset->mu);
      grpc_exec_ctx_enqueue_list(exec_ctx, &pollset->idle_jobs);
      grpc_exec_ctx_flush(exec_ctx);
      gpr_mu_lock(&pollset->mu);
    }
  }
  GRPC_TIMER_END("grpc_pollset_work", 0);
}
346
/* Begin shutting down `pollset`, scheduling `closure` when shutdown
   completes. Must be called at most once (asserted). If no workers and no
   in-flight callbacks exist, shutdown finishes synchronously; otherwise the
   broadcast kick below pushes existing workers toward the shutdown path in
   grpc_pollset_work. Called without pollset->mu held. */
void grpc_pollset_shutdown(grpc_exec_ctx *exec_ctx, grpc_pollset *pollset,
                           grpc_closure *closure) {
  int call_shutdown = 0;
  gpr_mu_lock(&pollset->mu);
  GPR_ASSERT(!pollset->shutting_down);
  pollset->shutting_down = 1;
  if (!pollset->called_shutdown && pollset->in_flight_cbs == 0 &&
      !grpc_pollset_has_workers(pollset)) {
    pollset->called_shutdown = 1;
    call_shutdown = 1;
  }
  if (!grpc_pollset_has_workers(pollset)) {
    /* Nobody will become idle later; run the pending idle jobs now. */
    grpc_exec_ctx_enqueue_list(exec_ctx, &pollset->idle_jobs);
  }
  pollset->shutdown_done = closure;
  grpc_pollset_kick(pollset, GRPC_POLLSET_KICK_BROADCAST);
  gpr_mu_unlock(&pollset->mu);

  if (call_shutdown) {
    /* finish_shutdown must run without pollset->mu held. */
    finish_shutdown(exec_ctx, pollset);
  }
}
369
/* Free `pollset`'s resources. Legal only after shutdown has been initiated
   and all in-flight callbacks and workers have drained (asserted). */
void grpc_pollset_destroy(grpc_pollset *pollset) {
  GPR_ASSERT(pollset->shutting_down);
  GPR_ASSERT(pollset->in_flight_cbs == 0);
  GPR_ASSERT(!grpc_pollset_has_workers(pollset));
  pollset->vtable->destroy(pollset);
  gpr_mu_destroy(&pollset->mu);
}
377
Craig Tillera82950e2015-09-22 12:33:20 -0700378int grpc_poll_deadline_to_millis_timeout(gpr_timespec deadline,
379 gpr_timespec now) {
Craig Tiller6174b9a2015-06-18 08:13:05 -0700380 gpr_timespec timeout;
381 static const int max_spin_polling_us = 10;
Craig Tillera82950e2015-09-22 12:33:20 -0700382 if (gpr_time_cmp(deadline, gpr_inf_future(deadline.clock_type)) == 0) {
383 return -1;
384 }
385 if (gpr_time_cmp(deadline, gpr_time_add(now, gpr_time_from_micros(
386 max_spin_polling_us,
387 GPR_TIMESPAN))) <= 0) {
388 return 0;
389 }
390 timeout = gpr_time_sub(deadline, now);
391 return gpr_time_to_millis(gpr_time_add(
Craig Tiller8afeec82015-09-28 17:03:34 -0700392 timeout, gpr_time_from_nanos(GPR_NS_PER_MS - 1, GPR_TIMESPAN)));
Craig Tiller6174b9a2015-06-18 08:13:05 -0700393}
394
ctiller58393c22015-01-07 14:03:30 -0800395/*
Craig Tiller5ec3bfa2015-05-28 14:13:18 -0700396 * basic_pollset - a vtable that provides polling for zero or one file
Craig Tiller3f529a2c2015-05-28 14:00:47 -0700397 * descriptor via poll()
ctiller58393c22015-01-07 14:03:30 -0800398 */
399
/* Arguments carried through the asynchronous promotion of a basic (unary)
   pollset to a multipoller; see basic_pollset_add_fd / basic_do_promote. */
typedef struct grpc_unary_promote_args {
  const grpc_pollset_vtable *original_vtable; /* vtable at promotion request
                                                 time, to detect races */
  grpc_pollset *pollset;                      /* pollset being promoted */
  grpc_fd *fd;                                /* fd being added */
  grpc_closure promotion_closure;             /* runs basic_do_promote */
} grpc_unary_promote_args;
406
/* Idle-job callback that completes the promotion requested by
   basic_pollset_add_fd: with no workers polling, either become a
   multipoller over {existing fd, new fd} or remain a unary poller,
   accounting for orphaned fds, vtable changes and shutdown. */
static void basic_do_promote(grpc_exec_ctx *exec_ctx, void *args, int success) {
  grpc_unary_promote_args *up_args = args;
  const grpc_pollset_vtable *original_vtable = up_args->original_vtable;
  grpc_pollset *pollset = up_args->pollset;
  grpc_fd *fd = up_args->fd;

  /*
   * This is quite tricky. There are a number of cases to keep in mind here:
   * 1. fd may have been orphaned
   * 2. The pollset may no longer be a unary poller (and we can't let case #1
   * leak to other pollset types!)
   * 3. pollset's fd (which may have changed) may have been orphaned
   * 4. The pollset may be shutting down.
   */

  gpr_mu_lock(&pollset->mu);
  /* First we need to ensure that nobody is polling concurrently */
  GPR_ASSERT(!grpc_pollset_has_workers(pollset));

  gpr_free(up_args);
  /* At this point the pollset may no longer be a unary poller. In that case
   * we should just call the right add function and be done. */
  /* TODO(klempner): If we're not careful this could cause infinite recursion.
   * That's not a problem for now because empty_pollset has a trivial poller
   * and we don't have any mechanism to unbecome multipoller. */
  pollset->in_flight_cbs--;
  if (pollset->shutting_down) {
    /* We don't care about this pollset anymore. */
    if (pollset->in_flight_cbs == 0 && !pollset->called_shutdown) {
      pollset->called_shutdown = 1;
      finish_shutdown(exec_ctx, pollset);
    }
  } else if (grpc_fd_is_orphaned(fd)) {
    /* Don't try to add it to anything, we'll drop our ref on it below */
  } else if (pollset->vtable != original_vtable) {
    /* Somebody else already changed the poller type; delegate to it. */
    pollset->vtable->add_fd(exec_ctx, pollset, fd, 0);
  } else if (fd != pollset->data.ptr) {
    grpc_fd *fds[2];
    fds[0] = pollset->data.ptr;
    fds[1] = fd;

    if (fds[0] && !grpc_fd_is_orphaned(fds[0])) {
      /* Two live fds: promote to a platform multipoller. */
      grpc_platform_become_multipoller(exec_ctx, pollset, fds,
                                       GPR_ARRAY_SIZE(fds));
      GRPC_FD_UNREF(fds[0], "basicpoll");
    } else {
      /* old fd is orphaned and we haven't cleaned it up until now, so remain a
       * unary poller */
      /* Note that it is possible that fds[1] is also orphaned at this point.
       * That's okay, we'll correct it at the next add or poll. */
      if (fds[0]) GRPC_FD_UNREF(fds[0], "basicpoll");
      pollset->data.ptr = fd;
      GRPC_FD_REF(fd, "basicpoll");
    }
  }

  gpr_mu_unlock(&pollset->mu);

  /* Matching ref in basic_pollset_add_fd */
  GRPC_FD_UNREF(fd, "basicpoll_add");
}
468
/* add_fd for the basic (zero-or-one fd) pollset. If no workers are polling
   we can adjust state synchronously (fast path); otherwise we schedule an
   idle job (basic_do_promote) and kick all workers so the promotion can run
   once the pollset is quiescent. Called with pollset->mu held; unlocks it
   when and_unlock_pollset is set. */
static void basic_pollset_add_fd(grpc_exec_ctx *exec_ctx, grpc_pollset *pollset,
                                 grpc_fd *fd, int and_unlock_pollset) {
  grpc_unary_promote_args *up_args;
  GPR_ASSERT(fd);
  if (fd == pollset->data.ptr) goto exit; /* already tracking this fd */

  if (!grpc_pollset_has_workers(pollset)) {
    /* Fast path -- no in flight cbs */
    /* TODO(klempner): Comment this out and fix any test failures or establish
     * they are due to timing issues */
    grpc_fd *fds[2];
    fds[0] = pollset->data.ptr;
    fds[1] = fd;

    if (fds[0] == NULL) {
      /* No fd yet: just become a unary poller over `fd`. */
      pollset->data.ptr = fd;
      GRPC_FD_REF(fd, "basicpoll");
    } else if (!grpc_fd_is_orphaned(fds[0])) {
      /* Two live fds: promote to a platform multipoller. */
      grpc_platform_become_multipoller(exec_ctx, pollset, fds,
                                       GPR_ARRAY_SIZE(fds));
      GRPC_FD_UNREF(fds[0], "basicpoll");
    } else {
      /* old fd is orphaned and we haven't cleaned it up until now, so remain a
       * unary poller */
      GRPC_FD_UNREF(fds[0], "basicpoll");
      pollset->data.ptr = fd;
      GRPC_FD_REF(fd, "basicpoll");
    }
    goto exit;
  }

  /* Now we need to promote. This needs to happen when we're not polling. Since
   * this may be called from poll, the wait needs to happen asynchronously. */
  GRPC_FD_REF(fd, "basicpoll_add"); /* released in basic_do_promote */
  pollset->in_flight_cbs++;
  up_args = gpr_malloc(sizeof(*up_args));
  up_args->fd = fd;
  up_args->original_vtable = pollset->vtable;
  up_args->pollset = pollset;
  up_args->promotion_closure.cb = basic_do_promote;
  up_args->promotion_closure.cb_arg = up_args;

  grpc_closure_list_add(&pollset->idle_jobs, &up_args->promotion_closure, 1);
  grpc_pollset_kick(pollset, GRPC_POLLSET_KICK_BROADCAST);

exit:
  if (and_unlock_pollset) {
    gpr_mu_unlock(&pollset->mu);
  }
}
519
Craig Tillera82950e2015-09-22 12:33:20 -0700520static void basic_pollset_del_fd(grpc_exec_ctx *exec_ctx, grpc_pollset *pollset,
521 grpc_fd *fd, int and_unlock_pollset) {
522 GPR_ASSERT(fd);
523 if (fd == pollset->data.ptr) {
524 GRPC_FD_UNREF(pollset->data.ptr, "basicpoll");
525 pollset->data.ptr = NULL;
526 }
Craig Tiller5c785d42015-07-14 08:23:43 -0700527
Craig Tillera82950e2015-09-22 12:33:20 -0700528 if (and_unlock_pollset) {
529 gpr_mu_unlock(&pollset->mu);
530 }
ctiller58393c22015-01-07 14:03:30 -0800531}
532
/* maybe_work_and_unlock for the basic pollset: poll() on up to three fds --
   the global wakeup fd (pfd[0]), this worker's wakeup fd (pfd[1]) and the
   pollset's single tracked fd (pfd[2], if any and if begin_poll grants
   events). Drops pollset->mu before blocking and returns with it released,
   as the vtable contract requires. */
static void basic_pollset_maybe_work_and_unlock(grpc_exec_ctx *exec_ctx,
                                                grpc_pollset *pollset,
                                                grpc_pollset_worker *worker,
                                                gpr_timespec deadline,
                                                gpr_timespec now) {
/* Treat hangup/error as both readable and writable so callers observe the
   condition and can fail the fd. */
#define POLLOUT_CHECK (POLLOUT | POLLHUP | POLLERR)
#define POLLIN_CHECK (POLLIN | POLLHUP | POLLERR)

  struct pollfd pfd[3];
  grpc_fd *fd;
  grpc_fd_watcher fd_watcher;
  int timeout;
  int r;
  nfds_t nfds;

  fd = pollset->data.ptr;
  if (fd && grpc_fd_is_orphaned(fd)) {
    /* Lazily drop an orphaned fd before polling. */
    GRPC_FD_UNREF(fd, "basicpoll");
    fd = pollset->data.ptr = NULL;
  }
  timeout = grpc_poll_deadline_to_millis_timeout(deadline, now);
  pfd[0].fd = GRPC_WAKEUP_FD_GET_READ_FD(&grpc_global_wakeup_fd);
  pfd[0].events = POLLIN;
  pfd[0].revents = 0;
  pfd[1].fd = GRPC_WAKEUP_FD_GET_READ_FD(&worker->wakeup_fd);
  pfd[1].events = POLLIN;
  pfd[1].revents = 0;
  nfds = 2;
  if (fd) {
    pfd[2].fd = fd->fd;
    pfd[2].revents = 0;
    /* Hold an extra ref across the (unlocked) poll. */
    GRPC_FD_REF(fd, "basicpoll_begin");
    gpr_mu_unlock(&pollset->mu);
    pfd[2].events = (short)grpc_fd_begin_poll(fd, pollset, worker, POLLIN,
                                              POLLOUT, &fd_watcher);
    if (pfd[2].events != 0) {
      nfds++;
    }
  } else {
    gpr_mu_unlock(&pollset->mu);
  }

  /* TODO(vpai): Consider first doing a 0 timeout poll here to avoid
     even going into the blocking annotation if possible */
  /* poll fd count (argument 2) is shortened by one if we have no events
     to poll on - such that it only includes the kicker */
  GRPC_TIMER_BEGIN("poll", 0);
  GRPC_SCHEDULING_START_BLOCKING_REGION;
  r = grpc_poll_function(pfd, nfds, timeout);
  GRPC_SCHEDULING_END_BLOCKING_REGION;
  GRPC_TIMER_END("poll", 0);

  if (r < 0) {
    gpr_log(GPR_ERROR, "poll() failed: %s", strerror(errno));
    if (fd) {
      /* begin_poll was called; always balance with end_poll. */
      grpc_fd_end_poll(exec_ctx, &fd_watcher, 0, 0);
    }
  } else if (r == 0) {
    /* timeout: nothing became ready */
    if (fd) {
      grpc_fd_end_poll(exec_ctx, &fd_watcher, 0, 0);
    }
  } else {
    if (pfd[0].revents & POLLIN_CHECK) {
      grpc_wakeup_fd_consume_wakeup(&grpc_global_wakeup_fd);
    }
    if (pfd[1].revents & POLLIN_CHECK) {
      grpc_wakeup_fd_consume_wakeup(&worker->wakeup_fd);
    }
    if (nfds > 2) {
      /* pfd[2] was actually polled: report its readiness. */
      grpc_fd_end_poll(exec_ctx, &fd_watcher, pfd[2].revents & POLLIN_CHECK,
                       pfd[2].revents & POLLOUT_CHECK);
    } else if (fd) {
      grpc_fd_end_poll(exec_ctx, &fd_watcher, 0, 0);
    }
  }

  if (fd) {
    /* Release the ref taken before unlocking above. */
    GRPC_FD_UNREF(fd, "basicpoll_begin");
  }
}
613
Craig Tillera82950e2015-09-22 12:33:20 -0700614static void basic_pollset_destroy(grpc_pollset *pollset) {
615 if (pollset->data.ptr != NULL) {
616 GRPC_FD_UNREF(pollset->data.ptr, "basicpoll");
617 pollset->data.ptr = NULL;
618 }
ctiller58393c22015-01-07 14:03:30 -0800619}
620
/* Vtable for the basic pollset. Note destroy doubles as finish_shutdown:
   both just release the tracked fd. Order: add_fd, del_fd,
   maybe_work_and_unlock, finish_shutdown, destroy. */
static const grpc_pollset_vtable basic_pollset = {
    basic_pollset_add_fd, basic_pollset_del_fd,
    basic_pollset_maybe_work_and_unlock, basic_pollset_destroy,
    basic_pollset_destroy};
ctiller58393c22015-01-07 14:03:30 -0800625
Craig Tillera82950e2015-09-22 12:33:20 -0700626static void become_basic_pollset(grpc_pollset *pollset, grpc_fd *fd_or_null) {
Craig Tiller3f529a2c2015-05-28 14:00:47 -0700627 pollset->vtable = &basic_pollset;
Craig Tiller3f529a2c2015-05-28 14:00:47 -0700628 pollset->data.ptr = fd_or_null;
Craig Tillera82950e2015-09-22 12:33:20 -0700629 if (fd_or_null != NULL) {
630 GRPC_FD_REF(fd_or_null, "basicpoll");
631 }
ctiller58393c22015-01-07 14:03:30 -0800632}
Craig Tillerd14a1a52015-01-21 15:26:29 -0800633
Craig Tiller190d3602015-02-18 09:23:38 -0800634#endif /* GPR_POSIX_POLLSET */