/*
 *
 * Copyright 2016, Google Inc.
 * All rights reserved.
 *
 * Redistribution and use in source and binary forms, with or without
 * modification, are permitted provided that the following conditions are
 * met:
 *
 *     * Redistributions of source code must retain the above copyright
 * notice, this list of conditions and the following disclaimer.
 *     * Redistributions in binary form must reproduce the above
 * copyright notice, this list of conditions and the following disclaimer
 * in the documentation and/or other materials provided with the
 * distribution.
 *     * Neither the name of Google Inc. nor the names of its
 * contributors may be used to endorse or promote products derived from
 * this software without specific prior written permission.
 *
 * THIS SOFTWARE IS PROVIDED BY THE COPYRIGHT HOLDERS AND CONTRIBUTORS
 * "AS IS" AND ANY EXPRESS OR IMPLIED WARRANTIES, INCLUDING, BUT NOT
 * LIMITED TO, THE IMPLIED WARRANTIES OF MERCHANTABILITY AND FITNESS FOR
 * A PARTICULAR PURPOSE ARE DISCLAIMED. IN NO EVENT SHALL THE COPYRIGHT
 * OWNER OR CONTRIBUTORS BE LIABLE FOR ANY DIRECT, INDIRECT, INCIDENTAL,
 * SPECIAL, EXEMPLARY, OR CONSEQUENTIAL DAMAGES (INCLUDING, BUT NOT
 * LIMITED TO, PROCUREMENT OF SUBSTITUTE GOODS OR SERVICES; LOSS OF USE,
 * DATA, OR PROFITS; OR BUSINESS INTERRUPTION) HOWEVER CAUSED AND ON ANY
 * THEORY OF LIABILITY, WHETHER IN CONTRACT, STRICT LIABILITY, OR TORT
 * (INCLUDING NEGLIGENCE OR OTHERWISE) ARISING IN ANY WAY OUT OF THE USE
 * OF THIS SOFTWARE, EVEN IF ADVISED OF THE POSSIBILITY OF SUCH DAMAGE.
 *
 */

#include "src/core/lib/iomgr/port.h"

/* This polling engine is only relevant on linux kernels supporting epoll() */
#ifdef GRPC_LINUX_EPOLL

#include "src/core/lib/iomgr/ev_epoll_linux.h"

#include <assert.h>
#include <errno.h>
#include <poll.h>
#include <pthread.h>
#include <signal.h>
#include <string.h>
#include <sys/epoll.h>
#include <sys/socket.h>
#include <unistd.h>

#include <grpc/support/alloc.h>
#include <grpc/support/log.h>
#include <grpc/support/string_util.h>
#include <grpc/support/tls.h>
#include <grpc/support/useful.h>

#include "src/core/lib/iomgr/ev_posix.h"
#include "src/core/lib/iomgr/iomgr_internal.h"
#include "src/core/lib/iomgr/lockfree_event.h"
#include "src/core/lib/iomgr/timer.h"
#include "src/core/lib/iomgr/wakeup_fd_posix.h"
#include "src/core/lib/iomgr/workqueue.h"
#include "src/core/lib/profiling/timers.h"
#include "src/core/lib/support/block_annotate.h"

/* TODO: sreek - Move this to init.c and initialize this like other tracers. */
static int grpc_polling_trace = 0; /* Disabled by default */
#define GRPC_POLLING_TRACE(fmt, ...)       \
  if (grpc_polling_trace) {                \
    gpr_log(GPR_INFO, (fmt), __VA_ARGS__); \
  }
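
/* Example (hypothetical call site; real uses appear further down this file):

     GRPC_POLLING_TRACE("pollset_work: pollset: %p, worker: %p",
                        (void *)pollset, (void *)worker);

   logs one GPR_INFO line when grpc_polling_trace is non-zero and is skipped
   otherwise. Note that the macro expands to a bare 'if' statement. */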

/* Uncomment the following to enable extra checks on poll_object operations */
/* #define PO_DEBUG */

static int grpc_wakeup_signal = -1;
static bool is_grpc_wakeup_signal_initialized = false;

/* TODO: sreek: Right now, this wakes up all pollers. In future we should make
 * sure to wake up one polling thread (which can wake up other threads if
 * needed) */
static grpc_wakeup_fd global_wakeup_fd;

/* Implements the function defined in grpc_posix.h. This function might be
 * called even before grpc_init() to set a different signal to use. If
 * signum == -1, then the use of signals is disabled */
void grpc_use_signal(int signum) {
  grpc_wakeup_signal = signum;
  is_grpc_wakeup_signal_initialized = true;

  if (grpc_wakeup_signal < 0) {
    gpr_log(GPR_INFO,
            "Use of signals is disabled. Epoll engine will not be used");
  } else {
    gpr_log(GPR_INFO, "epoll engine will be using signal: %d",
            grpc_wakeup_signal);
  }
}
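
/* Usage sketch (application code; the specific signal number is only an
   example): pick a signal the rest of the process does not use, and install
   it before initializing gRPC:

     grpc_use_signal(SIGRTMIN + 6);
     grpc_init();

   Passing -1 instead disables signals, in which case this polling engine is
   not selected at initialization time. */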

struct polling_island;

typedef enum {
  POLL_OBJ_FD,
  POLL_OBJ_POLLSET,
  POLL_OBJ_POLLSET_SET
} poll_obj_type;

typedef struct poll_obj {
#ifdef PO_DEBUG
  poll_obj_type obj_type;
#endif
  gpr_mu mu;
  struct polling_island *pi;
} poll_obj;

const char *poll_obj_string(poll_obj_type po_type) {
  switch (po_type) {
    case POLL_OBJ_FD:
      return "fd";
    case POLL_OBJ_POLLSET:
      return "pollset";
    case POLL_OBJ_POLLSET_SET:
      return "pollset_set";
  }

  GPR_UNREACHABLE_CODE(return "UNKNOWN");
}

/*******************************************************************************
 * Fd Declarations
 */

#define FD_FROM_PO(po) ((grpc_fd *)(po))

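/* The FD_FROM_PO cast is well-defined only because 'po' is the *first* member
   of grpc_fd below: C guarantees a pointer to a struct may be converted to a
   pointer to its first member and back. The same first-member trick is used
   later in this file to treat a polling_island as a grpc_workqueue via its
   embedded workqueue_scheduler. */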
struct grpc_fd {
  poll_obj po;

  int fd;
  /* refst format:
       bit 0    : 1=Active / 0=Orphaned
       bits 1-n : refcount
     Ref/Unref by two to avoid altering the orphaned bit */
  gpr_atm refst;

  /* The fd is either closed or we relinquished control of it. In either
     case, this indicates that the 'fd' on this structure is no longer
     valid */
  bool orphaned;

  gpr_atm read_closure;
  gpr_atm write_closure;

  struct grpc_fd *freelist_next;
  grpc_closure *on_done_closure;

  /* The pollset that last noticed that the fd is readable. The actual type
   * stored in this is (grpc_pollset *) */
  gpr_atm read_notifier_pollset;

  grpc_iomgr_object iomgr_object;
};

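/* Worked example of the refst encoding (illustration only): refst == 5
   (binary 101) is an active fd with refcount 2; UNREF_BY(fd, 2) takes it to
   3 (binary 011: active, refcount 1). fd_orphan() relies on the encoding by
   calling REF_BY(fd, 1), which clears the active bit *and* adds a reference
   in a single step: 3 (binary 011) + 1 == 4 (binary 100: orphaned,
   refcount 2). */
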
/* Reference counting for fds */
// #define GRPC_FD_REF_COUNT_DEBUG
#ifdef GRPC_FD_REF_COUNT_DEBUG
static void fd_ref(grpc_fd *fd, const char *reason, const char *file, int line);
static void fd_unref(grpc_fd *fd, const char *reason, const char *file,
                     int line);
#define GRPC_FD_REF(fd, reason) fd_ref(fd, reason, __FILE__, __LINE__)
#define GRPC_FD_UNREF(fd, reason) fd_unref(fd, reason, __FILE__, __LINE__)
#else
static void fd_ref(grpc_fd *fd);
static void fd_unref(grpc_fd *fd);
#define GRPC_FD_REF(fd, reason) fd_ref(fd)
#define GRPC_FD_UNREF(fd, reason) fd_unref(fd)
#endif

static void fd_global_init(void);
static void fd_global_shutdown(void);

/*******************************************************************************
 * Polling island Declarations
 */

#ifdef GRPC_WORKQUEUE_REFCOUNT_DEBUG

#define PI_ADD_REF(p, r) pi_add_ref_dbg((p), (r), __FILE__, __LINE__)
#define PI_UNREF(exec_ctx, p, r) \
  pi_unref_dbg((exec_ctx), (p), (r), __FILE__, __LINE__)

#else /* defined(GRPC_WORKQUEUE_REFCOUNT_DEBUG) */

#define PI_ADD_REF(p, r) pi_add_ref((p))
#define PI_UNREF(exec_ctx, p, r) pi_unref((exec_ctx), (p))

#endif /* !defined(GRPC_WORKQUEUE_REFCOUNT_DEBUG) */

/* This is also used as grpc_workqueue (by directly casting it) */
typedef struct polling_island {
  grpc_closure_scheduler workqueue_scheduler;

  gpr_mu mu;
  /* Ref count. Use PI_ADD_REF() and PI_UNREF() macros to increment/decrement
     the refcount.
     Once the ref count becomes zero, this structure is destroyed which means
     we should ensure that there is never a scenario where a PI_ADD_REF() is
     racing with a PI_UNREF() that just made the ref_count zero. */
  gpr_atm ref_count;

  /* Pointer to the polling_island this merged into.
   * merged_to value is only set once in polling_island's lifetime (and that
   * too only if the island is merged with another island). Because of this,
   * we can use gpr_atm type here so that we can do atomic access on this and
   * reduce lock contention on 'mu' mutex.
   *
   * Note that if this field is not NULL (i.e not 0), all the remaining fields
   * (except mu and ref_count) are invalid and must be ignored. */
  gpr_atm merged_to;

  /* Number of threads currently polling on this island */
  gpr_atm poller_count;
  /* Mutex guarding the read end of the workqueue (must be held to pop from
   * workqueue_items) */
  gpr_mu workqueue_read_mu;
  /* Queue of closures to be executed */
  gpr_mpscq workqueue_items;
  /* Count of items in workqueue_items */
  gpr_atm workqueue_item_count;
  /* Wakeup fd used to wake pollers to check the contents of workqueue_items */
  grpc_wakeup_fd workqueue_wakeup_fd;

  /* The fd of the underlying epoll set */
  int epoll_fd;

  /* The file descriptors in the epoll set */
  size_t fd_cnt;
  size_t fd_capacity;
  grpc_fd **fds;
} polling_island;

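/* Lifecycle sketch: an island is created for the first poll object that needs
   one (polling_island_create), grows as fds are added, and when two poll
   objects with distinct islands must poll together, the islands are merged
   (polling_island_merge). A merged island is never unlinked in place; readers
   chase the 'merged_to' chain to find the latest island, and the memory is
   reclaimed only when the ref count reaches zero (pi_unref). */
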
/*******************************************************************************
 * Pollset Declarations
 */
struct grpc_pollset_worker {
  /* Thread id of this worker */
  pthread_t pt_id;

  /* Used to prevent a worker from getting kicked multiple times */
  gpr_atm is_kicked;
  struct grpc_pollset_worker *next;
  struct grpc_pollset_worker *prev;
};

struct grpc_pollset {
  poll_obj po;

  grpc_pollset_worker root_worker;
  bool kicked_without_pollers;

  bool shutting_down;          /* Is the pollset shutting down ? */
  bool finish_shutdown_called; /* Is the 'finish_shutdown_locked()' called ? */
  grpc_closure *shutdown_done; /* Called after shutdown is complete */
};

/*******************************************************************************
 * Pollset-set Declarations
 */
struct grpc_pollset_set {
  poll_obj po;
};

/*******************************************************************************
 * Common helpers
 */

static bool append_error(grpc_error **composite, grpc_error *error,
                         const char *desc) {
  if (error == GRPC_ERROR_NONE) return true;
  if (*composite == GRPC_ERROR_NONE) {
    *composite = GRPC_ERROR_CREATE_FROM_COPIED_STRING(desc);
  }
  *composite = grpc_error_add_child(*composite, error);
  return false;
}

/*******************************************************************************
 * Polling island Definitions
 */

/* The wakeup fd that is used to wake up all threads in a Polling island. This
   is useful in the polling island merge operation where we need to wakeup all
   the threads currently polling the smaller polling island (so that they can
   start polling the new/merged polling island)

   NOTE: This fd is initialized to be readable and MUST NOT be consumed i.e the
   threads that woke up MUST NOT call grpc_wakeup_fd_consume_wakeup() */
static grpc_wakeup_fd polling_island_wakeup_fd;

/* The polling island being polled right now.
   See comments in workqueue_maybe_wakeup for why this is tracked. */
static __thread polling_island *g_current_thread_polling_island;

/* Forward declaration */
static void polling_island_delete(grpc_exec_ctx *exec_ctx, polling_island *pi);
static void workqueue_enqueue(grpc_exec_ctx *exec_ctx, grpc_closure *closure,
                              grpc_error *error);

#ifdef GRPC_TSAN
/* Currently TSAN may incorrectly flag data races between epoll_ctl and
   epoll_wait for any grpc_fd structs that are added to the epoll set via
   epoll_ctl and are returned (within a very short window) via epoll_wait().

   To work-around this race, we establish a happens-before relation between
   the code just-before epoll_ctl() and the code after epoll_wait() by using
   this atomic */
gpr_atm g_epoll_sync;
#endif /* defined(GRPC_TSAN) */

static const grpc_closure_scheduler_vtable workqueue_scheduler_vtable = {
    workqueue_enqueue, workqueue_enqueue, "workqueue"};

static void pi_add_ref(polling_island *pi);
static void pi_unref(grpc_exec_ctx *exec_ctx, polling_island *pi);

#ifdef GRPC_WORKQUEUE_REFCOUNT_DEBUG
static void pi_add_ref_dbg(polling_island *pi, const char *reason,
                           const char *file, int line) {
  long old_cnt = gpr_atm_acq_load(&pi->ref_count);
  pi_add_ref(pi);
  gpr_log(GPR_DEBUG, "Add ref pi: %p, old: %ld -> new:%ld (%s) - (%s, %d)",
          (void *)pi, old_cnt, old_cnt + 1, reason, file, line);
}

static void pi_unref_dbg(grpc_exec_ctx *exec_ctx, polling_island *pi,
                         const char *reason, const char *file, int line) {
  long old_cnt = gpr_atm_acq_load(&pi->ref_count);
  pi_unref(exec_ctx, pi);
  gpr_log(GPR_DEBUG, "Unref pi: %p, old:%ld -> new:%ld (%s) - (%s, %d)",
          (void *)pi, old_cnt, (old_cnt - 1), reason, file, line);
}

static grpc_workqueue *workqueue_ref(grpc_workqueue *workqueue,
                                     const char *file, int line,
                                     const char *reason) {
  if (workqueue != NULL) {
    pi_add_ref_dbg((polling_island *)workqueue, reason, file, line);
  }
  return workqueue;
}

static void workqueue_unref(grpc_exec_ctx *exec_ctx, grpc_workqueue *workqueue,
                            const char *file, int line, const char *reason) {
  if (workqueue != NULL) {
    pi_unref_dbg(exec_ctx, (polling_island *)workqueue, reason, file, line);
  }
}
#else
static grpc_workqueue *workqueue_ref(grpc_workqueue *workqueue) {
  if (workqueue != NULL) {
    pi_add_ref((polling_island *)workqueue);
  }
  return workqueue;
}

static void workqueue_unref(grpc_exec_ctx *exec_ctx,
                            grpc_workqueue *workqueue) {
  if (workqueue != NULL) {
    pi_unref(exec_ctx, (polling_island *)workqueue);
  }
}
#endif

static void pi_add_ref(polling_island *pi) {
  gpr_atm_no_barrier_fetch_add(&pi->ref_count, 1);
}

static void pi_unref(grpc_exec_ctx *exec_ctx, polling_island *pi) {
  /* If ref count went to zero, delete the polling island.
     Note that this deletion need not be done under a lock. Once the ref count
     goes to zero, we are guaranteed that no one else holds a reference to the
     polling island (and that there is no racing pi_add_ref() call either).

     Also, if we are deleting the polling island and the merged_to field is
     non-empty, we should remove a ref to the merged_to polling island
   */
  if (1 == gpr_atm_full_fetch_add(&pi->ref_count, -1)) {
    polling_island *next = (polling_island *)gpr_atm_acq_load(&pi->merged_to);
    polling_island_delete(exec_ctx, pi);
    if (next != NULL) {
      PI_UNREF(exec_ctx, next, "pi_delete"); /* Recursive call */
    }
  }
}

/* The caller is expected to hold pi->mu lock before calling this function */
static void polling_island_add_fds_locked(polling_island *pi, grpc_fd **fds,
                                          size_t fd_count, bool add_fd_refs,
                                          grpc_error **error) {
  int err;
  size_t i;
  struct epoll_event ev;
  char *err_msg;
  const char *err_desc = "polling_island_add_fds";

#ifdef GRPC_TSAN
  /* See the definition of g_epoll_sync for more context */
  gpr_atm_rel_store(&g_epoll_sync, (gpr_atm)0);
#endif /* defined(GRPC_TSAN) */

  for (i = 0; i < fd_count; i++) {
    ev.events = (uint32_t)(EPOLLIN | EPOLLOUT | EPOLLET);
    ev.data.ptr = fds[i];
    err = epoll_ctl(pi->epoll_fd, EPOLL_CTL_ADD, fds[i]->fd, &ev);

    if (err < 0) {
      if (errno != EEXIST) {
        gpr_asprintf(
            &err_msg,
            "epoll_ctl (epoll_fd: %d) add fd: %d failed with error: %d (%s)",
            pi->epoll_fd, fds[i]->fd, errno, strerror(errno));
        append_error(error, GRPC_OS_ERROR(errno, err_msg), err_desc);
        gpr_free(err_msg);
      }

      continue;
    }

    if (pi->fd_cnt == pi->fd_capacity) {
      pi->fd_capacity = GPR_MAX(pi->fd_capacity + 8, pi->fd_cnt * 3 / 2);
      pi->fds = gpr_realloc(pi->fds, sizeof(grpc_fd *) * pi->fd_capacity);
    }

    pi->fds[pi->fd_cnt++] = fds[i];
    if (add_fd_refs) {
      GRPC_FD_REF(fds[i], "polling_island");
    }
  }
}

/* The caller is expected to hold pi->mu before calling this */
static void polling_island_add_wakeup_fd_locked(polling_island *pi,
                                                grpc_wakeup_fd *wakeup_fd,
                                                grpc_error **error) {
  struct epoll_event ev;
  int err;
  char *err_msg;
  const char *err_desc = "polling_island_add_wakeup_fd";

  ev.events = (uint32_t)(EPOLLIN | EPOLLET);
  ev.data.ptr = wakeup_fd;
  err = epoll_ctl(pi->epoll_fd, EPOLL_CTL_ADD,
                  GRPC_WAKEUP_FD_GET_READ_FD(wakeup_fd), &ev);
  if (err < 0 && errno != EEXIST) {
    gpr_asprintf(&err_msg,
                 "epoll_ctl (epoll_fd: %d) add wakeup fd: %d failed with "
                 "error: %d (%s)",
                 pi->epoll_fd, GRPC_WAKEUP_FD_GET_READ_FD(wakeup_fd), errno,
                 strerror(errno));
    append_error(error, GRPC_OS_ERROR(errno, err_msg), err_desc);
    gpr_free(err_msg);
  }
}

/* The caller is expected to hold pi->mu lock before calling this function */
static void polling_island_remove_all_fds_locked(polling_island *pi,
                                                 bool remove_fd_refs,
                                                 grpc_error **error) {
  int err;
  size_t i;
  char *err_msg;
  const char *err_desc = "polling_island_remove_fds";

  for (i = 0; i < pi->fd_cnt; i++) {
    err = epoll_ctl(pi->epoll_fd, EPOLL_CTL_DEL, pi->fds[i]->fd, NULL);
    if (err < 0 && errno != ENOENT) {
      gpr_asprintf(&err_msg,
                   "epoll_ctl (epoll_fd: %d) delete fds[%zu]: %d failed with "
                   "error: %d (%s)",
                   pi->epoll_fd, i, pi->fds[i]->fd, errno, strerror(errno));
      append_error(error, GRPC_OS_ERROR(errno, err_msg), err_desc);
      gpr_free(err_msg);
    }

    if (remove_fd_refs) {
      GRPC_FD_UNREF(pi->fds[i], "polling_island");
    }
  }

  pi->fd_cnt = 0;
}

/* The caller is expected to hold pi->mu lock before calling this function */
static void polling_island_remove_fd_locked(polling_island *pi, grpc_fd *fd,
                                            bool is_fd_closed,
                                            grpc_error **error) {
  int err;
  size_t i;
  char *err_msg;
  const char *err_desc = "polling_island_remove_fd";

  /* If fd is already closed, then it would have automatically been removed
     from the epoll set */
  if (!is_fd_closed) {
    err = epoll_ctl(pi->epoll_fd, EPOLL_CTL_DEL, fd->fd, NULL);
    if (err < 0 && errno != ENOENT) {
      gpr_asprintf(
          &err_msg,
          "epoll_ctl (epoll_fd: %d) del fd: %d failed with error: %d (%s)",
          pi->epoll_fd, fd->fd, errno, strerror(errno));
      append_error(error, GRPC_OS_ERROR(errno, err_msg), err_desc);
      gpr_free(err_msg);
    }
  }

  for (i = 0; i < pi->fd_cnt; i++) {
    if (pi->fds[i] == fd) {
      pi->fds[i] = pi->fds[--pi->fd_cnt];
      GRPC_FD_UNREF(fd, "polling_island");
      break;
    }
  }
}

/* Might return NULL in case of an error */
static polling_island *polling_island_create(grpc_exec_ctx *exec_ctx,
                                             grpc_fd *initial_fd,
                                             grpc_error **error) {
  polling_island *pi = NULL;
  const char *err_desc = "polling_island_create";

  *error = GRPC_ERROR_NONE;

  pi = gpr_malloc(sizeof(*pi));
  pi->workqueue_scheduler.vtable = &workqueue_scheduler_vtable;
  gpr_mu_init(&pi->mu);
  pi->fd_cnt = 0;
  pi->fd_capacity = 0;
  pi->fds = NULL;
  pi->epoll_fd = -1;

  gpr_mu_init(&pi->workqueue_read_mu);
  gpr_mpscq_init(&pi->workqueue_items);
  gpr_atm_rel_store(&pi->workqueue_item_count, 0);

  gpr_atm_rel_store(&pi->ref_count, 0);
  gpr_atm_rel_store(&pi->poller_count, 0);
  gpr_atm_rel_store(&pi->merged_to, (gpr_atm)NULL);

  if (!append_error(error, grpc_wakeup_fd_init(&pi->workqueue_wakeup_fd),
                    err_desc)) {
    goto done;
  }

  pi->epoll_fd = epoll_create1(EPOLL_CLOEXEC);

  if (pi->epoll_fd < 0) {
    append_error(error, GRPC_OS_ERROR(errno, "epoll_create1"), err_desc);
    goto done;
  }

  polling_island_add_wakeup_fd_locked(pi, &global_wakeup_fd, error);
  polling_island_add_wakeup_fd_locked(pi, &pi->workqueue_wakeup_fd, error);

  if (initial_fd != NULL) {
    polling_island_add_fds_locked(pi, &initial_fd, 1, true, error);
  }

done:
  if (*error != GRPC_ERROR_NONE) {
    polling_island_delete(exec_ctx, pi);
    pi = NULL;
  }
  return pi;
}

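/* State after a successful polling_island_create() (summary): the new epoll
   set watches global_wakeup_fd (so timer/global events can wake any poller),
   the island's own workqueue_wakeup_fd (so enqueued closures can wake a
   poller), and, if supplied, initial_fd. */
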
static void polling_island_delete(grpc_exec_ctx *exec_ctx, polling_island *pi) {
  GPR_ASSERT(pi->fd_cnt == 0);

  if (pi->epoll_fd >= 0) {
    close(pi->epoll_fd);
  }
  GPR_ASSERT(gpr_atm_no_barrier_load(&pi->workqueue_item_count) == 0);
  gpr_mu_destroy(&pi->workqueue_read_mu);
  gpr_mpscq_destroy(&pi->workqueue_items);
  gpr_mu_destroy(&pi->mu);
  grpc_wakeup_fd_destroy(&pi->workqueue_wakeup_fd);
  gpr_free(pi->fds);
  gpr_free(pi);
}

/* Attempts to get the last polling island in the linked list (linked by the
 * 'merged_to' field). Since this does not lock the polling island, there are
 * no guarantees that the island returned is the last island */
static polling_island *polling_island_maybe_get_latest(polling_island *pi) {
  polling_island *next = (polling_island *)gpr_atm_acq_load(&pi->merged_to);
  while (next != NULL) {
    pi = next;
    next = (polling_island *)gpr_atm_acq_load(&pi->merged_to);
  }

  return pi;
}

/* Gets the lock on the *latest* polling island i.e the last polling island in
   the linked list (linked by the 'merged_to' field). Call gpr_mu_unlock on the
   returned polling island's mu.
   Usage: To lock/unlock polling island "pi", do the following:
     polling_island *pi_latest = polling_island_lock(pi);
     ...
     ... critical section ..
     ...
     gpr_mu_unlock(&pi_latest->mu); // NOTE: use pi_latest->mu. NOT pi->mu */
static polling_island *polling_island_lock(polling_island *pi) {
  polling_island *next = NULL;

  while (true) {
    next = (polling_island *)gpr_atm_acq_load(&pi->merged_to);
    if (next == NULL) {
      /* Looks like 'pi' is the last node in the linked list but unless we
         check this by holding the pi->mu lock, we cannot be sure (i.e without
         the pi->mu lock, we don't prevent island merges).
         To be absolutely sure, check once more by holding the pi->mu lock */
      gpr_mu_lock(&pi->mu);
      next = (polling_island *)gpr_atm_acq_load(&pi->merged_to);
      if (next == NULL) {
        /* pi is in fact the last node and we have the pi->mu lock. we're
           done */
        break;
      }

      /* pi->merged_to is not NULL i.e pi isn't the last node anymore. pi->mu
       * isn't the lock we are interested in. Continue traversing the list */
      gpr_mu_unlock(&pi->mu);
    }

    pi = next;
  }

  return pi;
}

/* Gets the lock on the *latest* polling islands in the linked lists pointed by
   *p and *q (and also updates *p and *q to point to the latest polling islands)

   This function is needed because calling the following block of code to obtain
   locks on polling islands (*p and *q) is prone to deadlocks.
     {
       polling_island_lock(*p);
       polling_island_lock(*q);
     }

   Usage/example:
     polling_island *p1;
     polling_island *p2;
     ..
     polling_island_lock_pair(&p1, &p2);
     ..
     .. Critical section with both p1 and p2 locked
     ..
     // Release locks: Always call polling_island_unlock_pair() to release locks
     polling_island_unlock_pair(p1, p2);
*/
static void polling_island_lock_pair(polling_island **p, polling_island **q) {
  polling_island *pi_1 = *p;
  polling_island *pi_2 = *q;
  polling_island *next_1 = NULL;
  polling_island *next_2 = NULL;

  /* The algorithm is simple:
     - Go to the last polling islands in the linked lists *pi_1 and *pi_2 (and
       keep updating pi_1 and pi_2)
     - Then obtain locks on the islands by following a lock order rule of
       locking polling_island with lower address first
       Special case: Before obtaining the locks, check if pi_1 and pi_2 are
       pointing to the same island. If that is the case, we can just call
       polling_island_lock()
     - After obtaining both the locks, double check that the polling islands
       are still the last polling islands in their respective linked lists
       (this is because there might have been polling island merges before
       we got the lock)
     - If the polling islands are the last islands, we are done. If not,
       release the locks and continue the process from the first step */
  while (true) {
    next_1 = (polling_island *)gpr_atm_acq_load(&pi_1->merged_to);
    while (next_1 != NULL) {
      pi_1 = next_1;
      next_1 = (polling_island *)gpr_atm_acq_load(&pi_1->merged_to);
    }

    next_2 = (polling_island *)gpr_atm_acq_load(&pi_2->merged_to);
    while (next_2 != NULL) {
      pi_2 = next_2;
      next_2 = (polling_island *)gpr_atm_acq_load(&pi_2->merged_to);
    }

    if (pi_1 == pi_2) {
      pi_1 = pi_2 = polling_island_lock(pi_1);
      break;
    }

    if (pi_1 < pi_2) {
      gpr_mu_lock(&pi_1->mu);
      gpr_mu_lock(&pi_2->mu);
    } else {
      gpr_mu_lock(&pi_2->mu);
      gpr_mu_lock(&pi_1->mu);
    }

    next_1 = (polling_island *)gpr_atm_acq_load(&pi_1->merged_to);
    next_2 = (polling_island *)gpr_atm_acq_load(&pi_2->merged_to);
    if (next_1 == NULL && next_2 == NULL) {
      break;
    }

    gpr_mu_unlock(&pi_1->mu);
    gpr_mu_unlock(&pi_2->mu);
  }

  *p = pi_1;
  *q = pi_2;
}

static void polling_island_unlock_pair(polling_island *p, polling_island *q) {
  if (p == q) {
    gpr_mu_unlock(&p->mu);
  } else {
    gpr_mu_unlock(&p->mu);
    gpr_mu_unlock(&q->mu);
  }
}

static void workqueue_maybe_wakeup(polling_island *pi) {
  /* If this thread is the current poller, then it may be that it's about to
     decrement the current poller count, so we need to look past this thread */
  bool is_current_poller = (g_current_thread_polling_island == pi);
  gpr_atm min_current_pollers_for_wakeup = is_current_poller ? 1 : 0;
  gpr_atm current_pollers = gpr_atm_no_barrier_load(&pi->poller_count);
  /* Only issue a wakeup if it's likely that some poller could come in and take
     it right now. Note that since we do an anticipatory mpscq_pop every poll
     loop, it's ok if we miss the wakeup here, as we'll get the work item when
     the next poller enters anyway. */
  if (current_pollers > min_current_pollers_for_wakeup) {
    GRPC_LOG_IF_ERROR("workqueue_wakeup_fd",
                      grpc_wakeup_fd_wakeup(&pi->workqueue_wakeup_fd));
  }
}

static void workqueue_move_items_to_parent(polling_island *q) {
  polling_island *p = (polling_island *)gpr_atm_no_barrier_load(&q->merged_to);
  if (p == NULL) {
    return;
  }
  gpr_mu_lock(&q->workqueue_read_mu);
  int num_added = 0;
  while (gpr_atm_no_barrier_load(&q->workqueue_item_count) > 0) {
    gpr_mpscq_node *n = gpr_mpscq_pop(&q->workqueue_items);
    if (n != NULL) {
      gpr_atm_no_barrier_fetch_add(&q->workqueue_item_count, -1);
      gpr_atm_no_barrier_fetch_add(&p->workqueue_item_count, 1);
      gpr_mpscq_push(&p->workqueue_items, n);
      num_added++;
    }
  }
  gpr_mu_unlock(&q->workqueue_read_mu);
  if (num_added > 0) {
    workqueue_maybe_wakeup(p);
  }
  workqueue_move_items_to_parent(p);
}

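/* Why move items to the parent (summary): once an island has been merged,
   pollers re-resolve to the island at the end of the 'merged_to' chain and
   only drain that island's queue, so closures left on a merged island would
   sit unexecuted. Moving items parent-ward (recursively, since the parent may
   itself have been merged) keeps pending work on an island that is actually
   polled. */
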
static polling_island *polling_island_merge(polling_island *p,
                                            polling_island *q,
                                            grpc_error **error) {
  /* Get locks on both the polling islands */
  polling_island_lock_pair(&p, &q);

  if (p != q) {
    /* Make sure that p points to the polling island with fewer fds than q */
    if (p->fd_cnt > q->fd_cnt) {
      GPR_SWAP(polling_island *, p, q);
    }

    /* Merge p with q i.e move all the fds from p (The one with fewer fds) to q
       Note that the refcounts on the fds being moved will not change here.
       This is why the last param in the following two functions is 'false') */
    polling_island_add_fds_locked(q, p->fds, p->fd_cnt, false, error);
    polling_island_remove_all_fds_locked(p, false, error);

    /* Wakeup all the pollers (if any) on p so that they pickup this change */
    polling_island_add_wakeup_fd_locked(p, &polling_island_wakeup_fd, error);

    /* Add the 'merged_to' link from p --> q */
    gpr_atm_rel_store(&p->merged_to, (gpr_atm)q);
    PI_ADD_REF(q, "pi_merge"); /* To account for the new incoming ref from p */

    workqueue_move_items_to_parent(p);
  }
  /* else if p == q, nothing needs to be done */

  polling_island_unlock_pair(p, q);

  /* Return the merged polling island (Note that no merge would have happened
     if p == q which is ok) */
  return q;
}

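/* Resulting state, as an example (sketch): after merging islands A and B
   where A->fd_cnt <= B->fd_cnt, we have A->merged_to == B, A holding a
   "pi_merge" ref on B, all of A's fds registered in B's epoll set, and
   polling_island_wakeup_fd added to A so that threads still blocked in
   epoll_wait() on A wake up and re-resolve to B. */
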
static void workqueue_enqueue(grpc_exec_ctx *exec_ctx, grpc_closure *closure,
                              grpc_error *error) {
  GPR_TIMER_BEGIN("workqueue.enqueue", 0);
  grpc_workqueue *workqueue = (grpc_workqueue *)closure->scheduler;
  /* take a ref to the workqueue: otherwise it can happen that whatever events
   * this kicks off ends up destroying the workqueue before this function
   * completes */
  GRPC_WORKQUEUE_REF(workqueue, "enqueue");
  polling_island *pi = (polling_island *)workqueue;
  gpr_atm last = gpr_atm_no_barrier_fetch_add(&pi->workqueue_item_count, 1);
  closure->error_data.error = error;
  gpr_mpscq_push(&pi->workqueue_items, &closure->next_data.atm_next);
  if (last == 0) {
    workqueue_maybe_wakeup(pi);
  }
  workqueue_move_items_to_parent(pi);
  GRPC_WORKQUEUE_UNREF(exec_ctx, workqueue, "enqueue");
  GPR_TIMER_END("workqueue.enqueue", 0);
}

static grpc_closure_scheduler *workqueue_scheduler(grpc_workqueue *workqueue) {
  polling_island *pi = (polling_island *)workqueue;
  return workqueue == NULL ? grpc_schedule_on_exec_ctx
                           : &pi->workqueue_scheduler;
}

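/* How a closure finds its way onto this workqueue (sketch; assumes the
   grpc_closure_init()/grpc_closure_sched() API from closure.h, with
   grpc_closure_init taking the scheduler as its last argument):

     grpc_closure_init(&c, cb, cb_arg, workqueue_scheduler(workqueue));
     grpc_closure_sched(exec_ctx, &c, GRPC_ERROR_NONE);

   grpc_closure_sched() dispatches through the scheduler vtable, landing in
   workqueue_enqueue() above; a NULL workqueue falls back to
   grpc_schedule_on_exec_ctx (the closure runs when the exec_ctx is
   flushed). */
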
static grpc_error *polling_island_global_init() {
  grpc_error *error = GRPC_ERROR_NONE;

  error = grpc_wakeup_fd_init(&polling_island_wakeup_fd);
  if (error == GRPC_ERROR_NONE) {
    error = grpc_wakeup_fd_wakeup(&polling_island_wakeup_fd);
  }

  return error;
}

static void polling_island_global_shutdown() {
  grpc_wakeup_fd_destroy(&polling_island_wakeup_fd);
}

/*******************************************************************************
 * Fd Definitions
 */

/* We need to keep a freelist not because of any concerns of malloc performance
 * but instead so that implementations with multiple threads in (for example)
 * epoll_wait deal with the race between pollset removal and incoming poll
 * notifications.
 *
 * The problem is that the poller ultimately holds a reference to this
 * object, so it is very difficult to know when it is safe to free it, at least
 * without some expensive synchronization.
 *
 * If we keep the object freelisted, in the worst case losing this race just
 * becomes a spurious read notification on a reused fd.
 */

/* The alarm system needs to be able to wakeup 'some poller' sometimes
 * (specifically when a new alarm needs to be triggered earlier than the next
 * alarm 'epoch'). This wakeup_fd gives us something to alert on when such a
 * case occurs. */

static grpc_fd *fd_freelist = NULL;
static gpr_mu fd_freelist_mu;

#ifdef GRPC_FD_REF_COUNT_DEBUG
#define REF_BY(fd, n, reason) ref_by(fd, n, reason, __FILE__, __LINE__)
#define UNREF_BY(fd, n, reason) unref_by(fd, n, reason, __FILE__, __LINE__)
static void ref_by(grpc_fd *fd, int n, const char *reason, const char *file,
                   int line) {
  gpr_log(GPR_DEBUG, "FD %d %p   ref %d %ld -> %ld [%s; %s:%d]", fd->fd,
          (void *)fd, n, gpr_atm_no_barrier_load(&fd->refst),
          gpr_atm_no_barrier_load(&fd->refst) + n, reason, file, line);
#else
#define REF_BY(fd, n, reason) ref_by(fd, n)
#define UNREF_BY(fd, n, reason) unref_by(fd, n)
static void ref_by(grpc_fd *fd, int n) {
#endif
  GPR_ASSERT(gpr_atm_no_barrier_fetch_add(&fd->refst, n) > 0);
}

#ifdef GRPC_FD_REF_COUNT_DEBUG
static void unref_by(grpc_fd *fd, int n, const char *reason, const char *file,
                     int line) {
  gpr_atm old;
  gpr_log(GPR_DEBUG, "FD %d %p unref %d %ld -> %ld [%s; %s:%d]", fd->fd,
          (void *)fd, n, gpr_atm_no_barrier_load(&fd->refst),
          gpr_atm_no_barrier_load(&fd->refst) - n, reason, file, line);
#else
static void unref_by(grpc_fd *fd, int n) {
  gpr_atm old;
#endif
  old = gpr_atm_full_fetch_add(&fd->refst, -n);
  if (old == n) {
    /* Add the fd to the freelist */
    gpr_mu_lock(&fd_freelist_mu);
    fd->freelist_next = fd_freelist;
    fd_freelist = fd;
    grpc_iomgr_unregister_object(&fd->iomgr_object);

    grpc_lfev_destroy(&fd->read_closure);
    grpc_lfev_destroy(&fd->write_closure);

    gpr_mu_unlock(&fd_freelist_mu);
  } else {
    GPR_ASSERT(old > n);
  }
}

/* Increment refcount by two to avoid changing the orphan bit */
#ifdef GRPC_FD_REF_COUNT_DEBUG
static void fd_ref(grpc_fd *fd, const char *reason, const char *file,
                   int line) {
  ref_by(fd, 2, reason, file, line);
}

static void fd_unref(grpc_fd *fd, const char *reason, const char *file,
                     int line) {
  unref_by(fd, 2, reason, file, line);
}
#else
static void fd_ref(grpc_fd *fd) { ref_by(fd, 2); }
static void fd_unref(grpc_fd *fd) { unref_by(fd, 2); }
#endif

static void fd_global_init(void) { gpr_mu_init(&fd_freelist_mu); }

static void fd_global_shutdown(void) {
  gpr_mu_lock(&fd_freelist_mu);
  gpr_mu_unlock(&fd_freelist_mu);
  while (fd_freelist != NULL) {
    grpc_fd *fd = fd_freelist;
    fd_freelist = fd_freelist->freelist_next;
    gpr_mu_destroy(&fd->po.mu);
    gpr_free(fd);
  }
  gpr_mu_destroy(&fd_freelist_mu);
}

static grpc_fd *fd_create(int fd, const char *name) {
  grpc_fd *new_fd = NULL;

  gpr_mu_lock(&fd_freelist_mu);
  if (fd_freelist != NULL) {
    new_fd = fd_freelist;
    fd_freelist = fd_freelist->freelist_next;
  }
  gpr_mu_unlock(&fd_freelist_mu);

  if (new_fd == NULL) {
    new_fd = gpr_malloc(sizeof(grpc_fd));
    gpr_mu_init(&new_fd->po.mu);
  }

  /* Note: It is not really needed to get the new_fd->po.mu lock here. If this
   * is a newly created fd (or an fd we got from the freelist), no one else
   * would be holding a lock to it anyway. */
  gpr_mu_lock(&new_fd->po.mu);
  new_fd->po.pi = NULL;
#ifdef PO_DEBUG
  new_fd->po.obj_type = POLL_OBJ_FD;
#endif

  gpr_atm_rel_store(&new_fd->refst, (gpr_atm)1);
  new_fd->fd = fd;
  new_fd->orphaned = false;
  grpc_lfev_init(&new_fd->read_closure);
  grpc_lfev_init(&new_fd->write_closure);
  gpr_atm_no_barrier_store(&new_fd->read_notifier_pollset, (gpr_atm)NULL);

  new_fd->freelist_next = NULL;
  new_fd->on_done_closure = NULL;

  gpr_mu_unlock(&new_fd->po.mu);

  char *fd_name;
  gpr_asprintf(&fd_name, "%s fd=%d", name, fd);
  grpc_iomgr_register_object(&new_fd->iomgr_object, fd_name);
#ifdef GRPC_FD_REF_COUNT_DEBUG
  gpr_log(GPR_DEBUG, "FD %d %p create %s", fd, (void *)new_fd, fd_name);
#endif
  gpr_free(fd_name);
  return new_fd;
}

static int fd_wrapped_fd(grpc_fd *fd) {
  int ret_fd = -1;
  gpr_mu_lock(&fd->po.mu);
  if (!fd->orphaned) {
    ret_fd = fd->fd;
  }
  gpr_mu_unlock(&fd->po.mu);

  return ret_fd;
}

static void fd_orphan(grpc_exec_ctx *exec_ctx, grpc_fd *fd,
                      grpc_closure *on_done, int *release_fd,
                      const char *reason) {
  bool is_fd_closed = false;
  grpc_error *error = GRPC_ERROR_NONE;
  polling_island *unref_pi = NULL;

  gpr_mu_lock(&fd->po.mu);
  fd->on_done_closure = on_done;

  /* If release_fd is not NULL, we should be relinquishing control of the file
     descriptor fd->fd (but we still own the grpc_fd structure). */
  if (release_fd != NULL) {
    *release_fd = fd->fd;
  } else {
    close(fd->fd);
    is_fd_closed = true;
  }

  fd->orphaned = true;

  /* Remove the active status but keep referenced. We want this grpc_fd struct
     to be alive (and not added to freelist) until the end of this function */
  REF_BY(fd, 1, reason);

  /* Remove the fd from the polling island:
     - Get a lock on the latest polling island (i.e the last island in the
       linked list pointed by fd->po.pi). This is the island that
       would actually contain the fd
     - Remove the fd from the latest polling island
     - Unlock the latest polling island
     - Set fd->po.pi to NULL (but remove the ref on the polling island
       before doing this.) */
  if (fd->po.pi != NULL) {
    polling_island *pi_latest = polling_island_lock(fd->po.pi);
    polling_island_remove_fd_locked(pi_latest, fd, is_fd_closed, &error);
    gpr_mu_unlock(&pi_latest->mu);

    unref_pi = fd->po.pi;
    fd->po.pi = NULL;
  }

  grpc_closure_sched(exec_ctx, fd->on_done_closure, GRPC_ERROR_REF(error));

  gpr_mu_unlock(&fd->po.mu);
  UNREF_BY(fd, 2, reason); /* Drop the reference */
  if (unref_pi != NULL) {
    /* Unref stale polling island here, outside the fd lock above.
       The polling island owns a workqueue which owns an fd, and unreffing
       inside the lock can cause an eventual lock loop that makes TSAN very
       unhappy. */
    PI_UNREF(exec_ctx, unref_pi, "fd_orphan");
  }
  GRPC_LOG_IF_ERROR("fd_orphan", GRPC_ERROR_REF(error));
  GRPC_ERROR_UNREF(error);
}

static grpc_pollset *fd_get_read_notifier_pollset(grpc_exec_ctx *exec_ctx,
                                                  grpc_fd *fd) {
  gpr_atm notifier = gpr_atm_acq_load(&fd->read_notifier_pollset);
  return (grpc_pollset *)notifier;
}

static bool fd_is_shutdown(grpc_fd *fd) {
  return grpc_lfev_is_shutdown(&fd->read_closure);
}

/* Might be called multiple times */
static void fd_shutdown(grpc_exec_ctx *exec_ctx, grpc_fd *fd, grpc_error *why) {
  if (grpc_lfev_set_shutdown(exec_ctx, &fd->read_closure,
                             GRPC_ERROR_REF(why))) {
    shutdown(fd->fd, SHUT_RDWR);
    grpc_lfev_set_shutdown(exec_ctx, &fd->write_closure, GRPC_ERROR_REF(why));
  }
  GRPC_ERROR_UNREF(why);
}

static void fd_notify_on_read(grpc_exec_ctx *exec_ctx, grpc_fd *fd,
                              grpc_closure *closure) {
  grpc_lfev_notify_on(exec_ctx, &fd->read_closure, closure);
}

static void fd_notify_on_write(grpc_exec_ctx *exec_ctx, grpc_fd *fd,
                               grpc_closure *closure) {
  grpc_lfev_notify_on(exec_ctx, &fd->write_closure, closure);
}

Craig Tillerd6ba6192016-06-30 15:42:41 -07001090static grpc_workqueue *fd_get_workqueue(grpc_fd *fd) {
Sree Kuchibhotlaf6f33d72016-11-18 11:38:52 -08001091 gpr_mu_lock(&fd->po.mu);
Sree Kuchibhotla499b94b2016-11-18 14:35:47 -08001092 grpc_workqueue *workqueue =
1093 GRPC_WORKQUEUE_REF((grpc_workqueue *)fd->po.pi, "fd_get_workqueue");
Sree Kuchibhotlaf6f33d72016-11-18 11:38:52 -08001094 gpr_mu_unlock(&fd->po.mu);
Craig Tillerd6ba6192016-06-30 15:42:41 -07001095 return workqueue;
1096}
Craig Tiller70bd4832016-06-30 14:20:46 -07001097
/*******************************************************************************
 * Pollset Definitions
 */
GPR_TLS_DECL(g_current_thread_pollset);
GPR_TLS_DECL(g_current_thread_worker);
static __thread bool g_initialized_sigmask;
static __thread sigset_t g_orig_sigmask;

static void sig_handler(int sig_num) {
#ifdef GRPC_EPOLL_DEBUG
  gpr_log(GPR_INFO, "Received signal %d", sig_num);
#endif
}

static void poller_kick_init() { signal(grpc_wakeup_signal, sig_handler); }

/* Global state management */
static grpc_error *pollset_global_init(void) {
  gpr_tls_init(&g_current_thread_pollset);
  gpr_tls_init(&g_current_thread_worker);
  poller_kick_init();
  return grpc_wakeup_fd_init(&global_wakeup_fd);
}

static void pollset_global_shutdown(void) {
  grpc_wakeup_fd_destroy(&global_wakeup_fd);
  gpr_tls_destroy(&g_current_thread_pollset);
  gpr_tls_destroy(&g_current_thread_worker);
}

static grpc_error *pollset_worker_kick(grpc_pollset_worker *worker) {
  grpc_error *err = GRPC_ERROR_NONE;

  /* Kick the worker only if it was not already kicked */
  if (gpr_atm_no_barrier_cas(&worker->is_kicked, (gpr_atm)0, (gpr_atm)1)) {
    GRPC_POLLING_TRACE(
        "pollset_worker_kick: Kicking worker: %p (thread id: %ld)",
        (void *)worker, worker->pt_id);
    int err_num = pthread_kill(worker->pt_id, grpc_wakeup_signal);
    if (err_num != 0) {
      err = GRPC_OS_ERROR(err_num, "pthread_kill");
    }
  }
  return err;
}

/* Return 1 if the pollset has active threads in pollset_work (pollset must
 * be locked) */
static int pollset_has_workers(grpc_pollset *p) {
  return p->root_worker.next != &p->root_worker;
}

static void remove_worker(grpc_pollset *p, grpc_pollset_worker *worker) {
  worker->prev->next = worker->next;
  worker->next->prev = worker->prev;
}

static grpc_pollset_worker *pop_front_worker(grpc_pollset *p) {
  if (pollset_has_workers(p)) {
    grpc_pollset_worker *w = p->root_worker.next;
    remove_worker(p, w);
    return w;
  } else {
    return NULL;
  }
}

static void push_back_worker(grpc_pollset *p, grpc_pollset_worker *worker) {
  worker->next = &p->root_worker;
  worker->prev = worker->next->prev;
  worker->prev->next = worker->next->prev = worker;
}

static void push_front_worker(grpc_pollset *p, grpc_pollset_worker *worker) {
  worker->prev = &p->root_worker;
  worker->next = worker->prev->next;
  worker->prev->next = worker->next->prev = worker;
}
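
/* Editorial note (illustrative, not in the original source): the worker list
   is a circular doubly-linked list threaded through a sentinel
   (p->root_worker), so splicing never needs NULL checks. A minimal sketch of
   the invariants, assuming workers w1 and w2:

     empty:                  root.next == root.prev == &root
     push_front_worker(w1):  root <-> w1 <-> root          (circular)
     push_front_worker(w2):  root <-> w2 <-> w1 <-> root
     pop_front_worker()   -> w2;  list becomes root <-> w1 <-> root

   pollset_kick()'s anonymous path below pops the front worker and pushes it
   to the back, rotating kicks round-robin across waiting workers. */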

/* p->mu must be held before calling this function */
static grpc_error *pollset_kick(grpc_pollset *p,
                                grpc_pollset_worker *specific_worker) {
  GPR_TIMER_BEGIN("pollset_kick", 0);
  grpc_error *error = GRPC_ERROR_NONE;
  const char *err_desc = "Kick Failure";
  grpc_pollset_worker *worker = specific_worker;
  if (worker != NULL) {
    if (worker == GRPC_POLLSET_KICK_BROADCAST) {
      if (pollset_has_workers(p)) {
        GPR_TIMER_BEGIN("pollset_kick.broadcast", 0);
        for (worker = p->root_worker.next; worker != &p->root_worker;
             worker = worker->next) {
          if (gpr_tls_get(&g_current_thread_worker) != (intptr_t)worker) {
            append_error(&error, pollset_worker_kick(worker), err_desc);
          }
        }
        GPR_TIMER_END("pollset_kick.broadcast", 0);
      } else {
        p->kicked_without_pollers = true;
      }
    } else {
      GPR_TIMER_MARK("kicked_specifically", 0);
      if (gpr_tls_get(&g_current_thread_worker) != (intptr_t)worker) {
        append_error(&error, pollset_worker_kick(worker), err_desc);
      }
    }
  } else if (gpr_tls_get(&g_current_thread_pollset) != (intptr_t)p) {
    /* Since worker == NULL, it means that we can kick "any" worker on this
       pollset 'p'. If 'p' happens to be the same pollset this thread is
       currently polling (i.e in pollset_work() function), then there is no need
       to kick any other worker since the current thread can just absorb the
       kick. This is the reason why we enter this case only when
       g_current_thread_pollset is != p */

    GPR_TIMER_MARK("kick_anonymous", 0);
    worker = pop_front_worker(p);
    if (worker != NULL) {
      GPR_TIMER_MARK("finally_kick", 0);
      push_back_worker(p, worker);
      append_error(&error, pollset_worker_kick(worker), err_desc);
    } else {
      GPR_TIMER_MARK("kicked_no_pollers", 0);
      p->kicked_without_pollers = true;
    }
  }

  GPR_TIMER_END("pollset_kick", 0);
  GRPC_LOG_IF_ERROR("pollset_kick", GRPC_ERROR_REF(error));
  return error;
}
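
/* Illustrative usage (editorial note, not in the original source): the three
   kick modes above, as a caller holding p->mu would exercise them. 'w' is
   assumed to be a worker currently parked in pollset_work():

     pollset_kick(p, NULL);                           - wake any one worker
     pollset_kick(p, w);                              - wake worker 'w' only
     pollset_kick(p, GRPC_POLLSET_KICK_BROADCAST);    - wake all workers

   Every path skips the calling thread itself (tracked through the
   g_current_thread_* TLS variables), since a thread that is already awake can
   absorb the kick without being signalled. */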

static grpc_error *kick_poller(void) {
  return grpc_wakeup_fd_wakeup(&global_wakeup_fd);
}

static void pollset_init(grpc_pollset *pollset, gpr_mu **mu) {
  gpr_mu_init(&pollset->po.mu);
  *mu = &pollset->po.mu;
  pollset->po.pi = NULL;
#ifdef PO_DEBUG
  pollset->po.obj_type = POLL_OBJ_POLLSET;
#endif

  pollset->root_worker.next = pollset->root_worker.prev = &pollset->root_worker;
  pollset->kicked_without_pollers = false;

  pollset->shutting_down = false;
  pollset->finish_shutdown_called = false;
  pollset->shutdown_done = NULL;
}

/* Convert a timespec to milliseconds:
   - Very small or negative poll times are clamped to zero to do a non-blocking
     poll (which becomes spin polling)
   - Other small values are rounded up to one millisecond
   - Polls longer than a millisecond are rounded up to the next millisecond to
     avoid spinning
   - Infinite timeouts are converted to -1 */
static int poll_deadline_to_millis_timeout(gpr_timespec deadline,
                                           gpr_timespec now) {
  gpr_timespec timeout;
  static const int64_t max_spin_polling_us = 10;
  if (gpr_time_cmp(deadline, gpr_inf_future(deadline.clock_type)) == 0) {
    return -1;
  }

  if (gpr_time_cmp(deadline, gpr_time_add(now, gpr_time_from_micros(
                                                   max_spin_polling_us,
                                                   GPR_TIMESPAN))) <= 0) {
    return 0;
  }
  timeout = gpr_time_sub(deadline, now);
  int millis = gpr_time_to_millis(gpr_time_add(
      timeout, gpr_time_from_nanos(GPR_NS_PER_MS - 1, GPR_TIMESPAN)));
  return millis >= 1 ? millis : 1;
}
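
/* Worked examples (editorial note, not in the original source), assuming 'now'
   is the current time and max_spin_polling_us == 10:

     deadline == gpr_inf_future(...)  ->  -1  (block indefinitely)
     deadline == now + 5us            ->   0  (inside the 10us spin window)
     deadline == now + 500us          ->   1  (rounded up to one millisecond)
     deadline == now + 2.3ms          ->   3  (rounded up to the next ms)

   Rounding up (never down) ensures epoll_pwait() does not wake up just before
   the deadline and then spin through a series of zero-length timeouts. */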

static void fd_become_readable(grpc_exec_ctx *exec_ctx, grpc_fd *fd,
                               grpc_pollset *notifier) {
  set_ready(exec_ctx, fd, &fd->read_closure);

  /* Note, it is possible that fd_become_readable might be called twice with
     different 'notifier's when an fd becomes readable and it is in two epoll
     sets (This can happen briefly during polling island merges). In such cases
     it does not really matter which notifier is set as the
     read_notifier_pollset (They would both point to the same polling island
     anyway) */
  /* Use release store to match with acquire load in fd_get_read_notifier */
  gpr_atm_rel_store(&fd->read_notifier_pollset, (gpr_atm)notifier);
}

static void fd_become_writable(grpc_exec_ctx *exec_ctx, grpc_fd *fd) {
  set_ready(exec_ctx, fd, &fd->write_closure);
}

static void pollset_release_polling_island(grpc_exec_ctx *exec_ctx,
                                           grpc_pollset *ps, char *reason) {
  if (ps->po.pi != NULL) {
    PI_UNREF(exec_ctx, ps->po.pi, reason);
  }
  ps->po.pi = NULL;
}

static void finish_shutdown_locked(grpc_exec_ctx *exec_ctx,
                                   grpc_pollset *pollset) {
  /* The pollset cannot have any workers if we are at this stage */
  GPR_ASSERT(!pollset_has_workers(pollset));

  pollset->finish_shutdown_called = true;

  /* Release the ref and set pollset->po.pi to NULL */
  pollset_release_polling_island(exec_ctx, pollset, "ps_shutdown");
  grpc_closure_sched(exec_ctx, pollset->shutdown_done, GRPC_ERROR_NONE);
}

/* pollset->po.mu lock must be held by the caller before calling this */
static void pollset_shutdown(grpc_exec_ctx *exec_ctx, grpc_pollset *pollset,
                             grpc_closure *closure) {
  GPR_TIMER_BEGIN("pollset_shutdown", 0);
  GPR_ASSERT(!pollset->shutting_down);
  pollset->shutting_down = true;
  pollset->shutdown_done = closure;
  pollset_kick(pollset, GRPC_POLLSET_KICK_BROADCAST);

  /* If the pollset has any workers, we cannot call finish_shutdown_locked()
     because it would release the underlying polling island. In such a case, we
     let the last worker call finish_shutdown_locked() from pollset_work() */
  if (!pollset_has_workers(pollset)) {
    GPR_ASSERT(!pollset->finish_shutdown_called);
    GPR_TIMER_MARK("pollset_shutdown.finish_shutdown_locked", 0);
    finish_shutdown_locked(exec_ctx, pollset);
  }
  GPR_TIMER_END("pollset_shutdown", 0);
}

/* pollset_shutdown is guaranteed to be called before pollset_destroy. So other
 * than destroying the mutexes, there is nothing special that needs to be done
 * here */
static void pollset_destroy(grpc_pollset *pollset) {
  GPR_ASSERT(!pollset_has_workers(pollset));
  gpr_mu_destroy(&pollset->po.mu);
}

static bool maybe_do_workqueue_work(grpc_exec_ctx *exec_ctx,
                                    polling_island *pi) {
  if (gpr_mu_trylock(&pi->workqueue_read_mu)) {
    gpr_mpscq_node *n = gpr_mpscq_pop(&pi->workqueue_items);
    gpr_mu_unlock(&pi->workqueue_read_mu);
    if (n != NULL) {
      if (gpr_atm_full_fetch_add(&pi->workqueue_item_count, -1) > 1) {
        workqueue_maybe_wakeup(pi);
      }
      grpc_closure *c = (grpc_closure *)n;
      grpc_error *error = c->error_data.error;
      c->cb(exec_ctx, c->cb_arg, error);
      GRPC_ERROR_UNREF(error);
      return true;
    } else if (gpr_atm_no_barrier_load(&pi->workqueue_item_count) > 0) {
      /* n == NULL might mean there's work but it's not available to be popped
       * yet - try to ensure another workqueue wakes up to check shortly if so
       */
      workqueue_maybe_wakeup(pi);
    }
  }
  return false;
}
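
/* Illustrative scenario (editorial note, not in the original source) for the
   n == NULL branch above. gpr_mpscq_pop() can transiently return NULL even
   when the queue is non-empty, because a producer bumps workqueue_item_count
   before its node is fully linked into the queue:

     producer: fetch_add(workqueue_item_count, +1)
     producer: gpr_mpscq_push(&pi->workqueue_items, node)   (in progress)
     consumer: gpr_mpscq_pop()            -> NULL (node not yet visible)
     consumer: workqueue_item_count > 0   -> workqueue_maybe_wakeup(pi)

   so another poller re-checks shortly rather than stranding the item. */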

#define GRPC_EPOLL_MAX_EVENTS 100
/* Note: sig_mask contains the signal mask to use *during* epoll_wait() */
static void pollset_work_and_unlock(grpc_exec_ctx *exec_ctx,
                                    grpc_pollset *pollset,
                                    grpc_pollset_worker *worker, int timeout_ms,
                                    sigset_t *sig_mask, grpc_error **error) {
  struct epoll_event ep_ev[GRPC_EPOLL_MAX_EVENTS];
  int epoll_fd = -1;
  int ep_rv;
  polling_island *pi = NULL;
  char *err_msg;
  const char *err_desc = "pollset_work_and_unlock";
  GPR_TIMER_BEGIN("pollset_work_and_unlock", 0);

  /* We need to get the epoll_fd to wait on. The epoll_fd is inside the
     latest polling island pointed by pollset->po.pi

     Since epoll_fd is immutable, we can read it without obtaining the polling
     island lock. There is however a possibility that the polling island (from
     which we got the epoll_fd) got merged with another island while we are
     in this function. This is still okay because in such a case, we will wakeup
     right-away from epoll_wait() and pick up the latest polling_island the next
     time this function (i.e pollset_work_and_unlock()) is called */

  if (pollset->po.pi == NULL) {
    pollset->po.pi = polling_island_create(exec_ctx, NULL, error);
    if (pollset->po.pi == NULL) {
      GPR_TIMER_END("pollset_work_and_unlock", 0);
      return; /* Fatal error. We cannot continue */
    }

    PI_ADD_REF(pollset->po.pi, "ps");
    GRPC_POLLING_TRACE("pollset_work: pollset: %p created new pi: %p",
                       (void *)pollset, (void *)pollset->po.pi);
  }

  pi = polling_island_maybe_get_latest(pollset->po.pi);
  epoll_fd = pi->epoll_fd;

  /* Update the pollset->po.pi since the island being pointed by
     pollset->po.pi may be older than the one pointed by pi */
  if (pollset->po.pi != pi) {
    /* Always do PI_ADD_REF before PI_UNREF because PI_UNREF may cause the
       polling island to be deleted */
    PI_ADD_REF(pi, "ps");
    PI_UNREF(exec_ctx, pollset->po.pi, "ps");
    pollset->po.pi = pi;
  }

  /* Add an extra ref so that the island does not get destroyed (which means
     the epoll_fd won't be closed) while we are doing an epoll_wait() on the
     epoll_fd */
  PI_ADD_REF(pi, "ps_work");
  gpr_mu_unlock(&pollset->po.mu);

  /* If we get some workqueue work to do, it might end up completing an item on
     the completion queue, so there's no need to poll... so we skip that and
     redo the complete loop to verify */
  if (!maybe_do_workqueue_work(exec_ctx, pi)) {
    gpr_atm_no_barrier_fetch_add(&pi->poller_count, 1);
    g_current_thread_polling_island = pi;

    GRPC_SCHEDULING_START_BLOCKING_REGION;
    ep_rv = epoll_pwait(epoll_fd, ep_ev, GRPC_EPOLL_MAX_EVENTS, timeout_ms,
                        sig_mask);
    GRPC_SCHEDULING_END_BLOCKING_REGION;
    if (ep_rv < 0) {
      if (errno != EINTR) {
        gpr_asprintf(&err_msg,
                     "epoll_wait() epoll fd: %d failed with error: %d (%s)",
                     epoll_fd, errno, strerror(errno));
        append_error(error, GRPC_OS_ERROR(errno, err_msg), err_desc);
      } else {
        /* We were interrupted. Save an iteration by doing a zero timeout
           epoll_wait to see if there are any other events of interest */
        GRPC_POLLING_TRACE(
            "pollset_work: pollset: %p, worker: %p received kick",
            (void *)pollset, (void *)worker);
        ep_rv = epoll_wait(epoll_fd, ep_ev, GRPC_EPOLL_MAX_EVENTS, 0);
      }
    }

#ifdef GRPC_TSAN
    /* See the definition of g_poll_sync for more details */
    gpr_atm_acq_load(&g_epoll_sync);
#endif /* defined(GRPC_TSAN) */

    for (int i = 0; i < ep_rv; ++i) {
      void *data_ptr = ep_ev[i].data.ptr;
      if (data_ptr == &global_wakeup_fd) {
        grpc_timer_consume_kick();
        append_error(error, grpc_wakeup_fd_consume_wakeup(&global_wakeup_fd),
                     err_desc);
      } else if (data_ptr == &pi->workqueue_wakeup_fd) {
        append_error(error,
                     grpc_wakeup_fd_consume_wakeup(&pi->workqueue_wakeup_fd),
                     err_desc);
        maybe_do_workqueue_work(exec_ctx, pi);
      } else if (data_ptr == &polling_island_wakeup_fd) {
        GRPC_POLLING_TRACE(
            "pollset_work: pollset: %p, worker: %p polling island (epoll_fd: "
            "%d) got merged",
            (void *)pollset, (void *)worker, epoll_fd);
        /* This means that our polling island is merged with a different
           island. We do not have to do anything here since the subsequent call
           to the function pollset_work_and_unlock() will pick up the correct
           epoll_fd */
      } else {
        grpc_fd *fd = data_ptr;
        int cancel = ep_ev[i].events & (EPOLLERR | EPOLLHUP);
        int read_ev = ep_ev[i].events & (EPOLLIN | EPOLLPRI);
        int write_ev = ep_ev[i].events & EPOLLOUT;
        if (read_ev || cancel) {
          fd_become_readable(exec_ctx, fd, pollset);
        }
        if (write_ev || cancel) {
          fd_become_writable(exec_ctx, fd);
        }
      }
    }

    g_current_thread_polling_island = NULL;
    gpr_atm_no_barrier_fetch_add(&pi->poller_count, -1);
  }

  GPR_ASSERT(pi != NULL);

  /* Before leaving, release the extra ref we added to the polling island. It
     is important to use "pi" here (i.e our old copy of pollset->po.pi
     that we got before releasing the polling island lock). This is because
     pollset->po.pi pointer might get updated in other parts of the
     code when there is an island merge while we are doing epoll_wait() above */
  PI_UNREF(exec_ctx, pi, "ps_work");

  GPR_TIMER_END("pollset_work_and_unlock", 0);
}

/* pollset->po.mu lock must be held by the caller before calling this.
   The function pollset_work() may temporarily release the lock (pollset->po.mu)
   during the course of its execution but it will always re-acquire the lock and
   ensure that it is held by the time the function returns */
static grpc_error *pollset_work(grpc_exec_ctx *exec_ctx, grpc_pollset *pollset,
                                grpc_pollset_worker **worker_hdl,
                                gpr_timespec now, gpr_timespec deadline) {
  GPR_TIMER_BEGIN("pollset_work", 0);
  grpc_error *error = GRPC_ERROR_NONE;
  int timeout_ms = poll_deadline_to_millis_timeout(deadline, now);

  sigset_t new_mask;

  grpc_pollset_worker worker;
  worker.next = worker.prev = NULL;
  worker.pt_id = pthread_self();
  gpr_atm_no_barrier_store(&worker.is_kicked, (gpr_atm)0);

  if (worker_hdl) *worker_hdl = &worker;

  gpr_tls_set(&g_current_thread_pollset, (intptr_t)pollset);
  gpr_tls_set(&g_current_thread_worker, (intptr_t)&worker);

  if (pollset->kicked_without_pollers) {
    /* If the pollset was kicked without pollers, pretend that the current
       worker got the kick and skip polling. A kick indicates that there is some
       work that needs attention like an event on the completion queue or an
       alarm */
    GPR_TIMER_MARK("pollset_work.kicked_without_pollers", 0);
    pollset->kicked_without_pollers = 0;
  } else if (!pollset->shutting_down) {
    /* We use the posix-signal with number 'grpc_wakeup_signal' for waking up
       (i.e 'kicking') a worker in the pollset. A 'kick' is a way to inform the
       worker that there is some pending work that needs immediate attention
       (like an event on the completion queue, or a polling island merge that
       results in a new epoll-fd to wait on) and that the worker should not
       spend time waiting in epoll_pwait().

       A worker can be kicked anytime from the point it is added to the pollset
       via push_front_worker() (or push_back_worker()) to the point it is
       removed via remove_worker().
       If the worker is kicked before/during it calls epoll_pwait(), it should
       immediately exit from epoll_wait(). If the worker is kicked after it
       returns from epoll_wait(), then nothing really needs to be done.

       To accomplish this, we mask 'grpc_wakeup_signal' on this thread at all
       times *except* when it is in epoll_pwait(). This way, the worker never
       misses acting on a kick */
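
    /* Illustrative timeline (editorial note, not in the original source) of
       why this works. Suppose a kicker thread runs pollset_worker_kick()
       while this worker is between push_front_worker() and epoll_pwait():

         worker thread                         kicker thread
         -------------                         -------------
         grpc_wakeup_signal blocked
         push_front_worker(pollset, &worker)
                                               CAS worker->is_kicked: 0 -> 1
                                               pthread_kill(pt_id, signal)
         (signal remains pending: masked)
         epoll_pwait(..., &g_orig_sigmask)
           -> pending signal delivered;
              returns -1 with errno EINTR

       A kick landing while the worker is already inside epoll_pwait()
       interrupts it the same way; one landing after it returns needs no
       action at all. */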

    if (!g_initialized_sigmask) {
      sigemptyset(&new_mask);
      sigaddset(&new_mask, grpc_wakeup_signal);
      pthread_sigmask(SIG_BLOCK, &new_mask, &g_orig_sigmask);
      sigdelset(&g_orig_sigmask, grpc_wakeup_signal);
      g_initialized_sigmask = true;
      /* new_mask:       The new thread mask which blocks 'grpc_wakeup_signal'.
                         This is the mask used at all times *except during
                         epoll_wait()*
         g_orig_sigmask: The thread mask which allows 'grpc_wakeup_signal' and
                         this is the mask to use *during epoll_wait()*

         The new_mask is set on the worker before it is added to the pollset
         (i.e before it can be kicked) */
    }

    push_front_worker(pollset, &worker); /* Add worker to pollset */

    pollset_work_and_unlock(exec_ctx, pollset, &worker, timeout_ms,
                            &g_orig_sigmask, &error);
    grpc_exec_ctx_flush(exec_ctx);

    gpr_mu_lock(&pollset->po.mu);

    /* Note: There is no need to reset worker.is_kicked to 0 since we are no
       longer going to use this worker */
    remove_worker(pollset, &worker);
  }

  /* If we are the last worker on the pollset (i.e pollset_has_workers() is
     false at this point) and the pollset is shutting down, we may have to
     finish the shutdown process by calling finish_shutdown_locked().
     See pollset_shutdown() for more details.

     Note: Continuing to access pollset here is safe; it is the caller's
     responsibility to not destroy a pollset when it has outstanding calls to
     pollset_work() */
  if (pollset->shutting_down && !pollset_has_workers(pollset) &&
      !pollset->finish_shutdown_called) {
    GPR_TIMER_MARK("pollset_work.finish_shutdown_locked", 0);
    finish_shutdown_locked(exec_ctx, pollset);

    gpr_mu_unlock(&pollset->po.mu);
    grpc_exec_ctx_flush(exec_ctx);
    gpr_mu_lock(&pollset->po.mu);
  }

  if (worker_hdl) *worker_hdl = NULL;

  gpr_tls_set(&g_current_thread_pollset, (intptr_t)0);
  gpr_tls_set(&g_current_thread_worker, (intptr_t)0);

  GPR_TIMER_END("pollset_work", 0);

  GRPC_LOG_IF_ERROR("pollset_work", GRPC_ERROR_REF(error));
  return error;
}

static void add_poll_object(grpc_exec_ctx *exec_ctx, poll_obj *bag,
                            poll_obj_type bag_type, poll_obj *item,
                            poll_obj_type item_type) {
  GPR_TIMER_BEGIN("add_poll_object", 0);

#ifdef PO_DEBUG
  GPR_ASSERT(item->obj_type == item_type);
  GPR_ASSERT(bag->obj_type == bag_type);
#endif

  grpc_error *error = GRPC_ERROR_NONE;
  polling_island *pi_new = NULL;

  gpr_mu_lock(&bag->mu);
  gpr_mu_lock(&item->mu);

retry:
  /*
   * 1) If item->pi and bag->pi are both non-NULL and equal, do nothing
   * 2) If item->pi and bag->pi are both NULL, create a new polling island
   *    (with a refcount of 2) and point item->pi and bag->pi to the new island
   * 3) If exactly one of item->pi or bag->pi is NULL, update it to point to
   *    the other's non-NULL pi
   * 4) Finally if item->pi and bag->pi are non-NULL and not-equal, merge the
   *    polling islands and update item->pi and bag->pi to point to the new
   *    island
   */
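
  /* Example (editorial note, not in the original source): pollset_add_fd() on
     a freshly created pollset and fd hits case 2: one island is created with a
     refcount of 2 and both poll_objs point at it. Re-adding the same fd to
     that pollset hits case 1, a no-op. Adding the fd to a second pollset that
     already owns an island hits case 4, merging the two islands. */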

  /* Early out if we are trying to add an 'fd' to a 'bag' but the fd is already
   * orphaned */
  if (item_type == POLL_OBJ_FD && (FD_FROM_PO(item))->orphaned) {
    gpr_mu_unlock(&item->mu);
    gpr_mu_unlock(&bag->mu);
    return;
  }

  if (item->pi == bag->pi) {
    pi_new = item->pi;
    if (pi_new == NULL) {
      /* GPR_ASSERT(item->pi == bag->pi == NULL) */

      /* If we are adding an fd to a bag (i.e pollset or pollset_set), then
       * we need to do some extra work to make TSAN happy */
      if (item_type == POLL_OBJ_FD) {
        /* Unlock before creating a new polling island: the polling island will
           create a workqueue which creates a file descriptor, and holding an fd
           lock here can eventually cause a loop to appear to TSAN (making it
           unhappy). We don't think it's a real loop (there's an epoch point
           where that loop possibility disappears), but the advantages of
           keeping TSAN happy outweigh any performance advantage we might have
           by keeping the lock held. */
        gpr_mu_unlock(&item->mu);
        pi_new = polling_island_create(exec_ctx, FD_FROM_PO(item), &error);
        gpr_mu_lock(&item->mu);

        /* Need to reverify any assumptions made between the initial lock and
           getting to this branch: if they've changed, we need to throw away our
           work and figure things out again. */
        if (item->pi != NULL) {
          GRPC_POLLING_TRACE(
              "add_poll_object: Raced creating new polling island. pi_new: %p "
              "(fd: %d, %s: %p)",
              (void *)pi_new, FD_FROM_PO(item)->fd, poll_obj_string(bag_type),
              (void *)bag);
          /* No need to lock 'pi_new' here since this is a new polling island
             and no one has a reference to it yet */
          polling_island_remove_all_fds_locked(pi_new, true, &error);

          /* Ref and unref so that the polling island gets deleted during unref
           */
          PI_ADD_REF(pi_new, "dance_of_destruction");
          PI_UNREF(exec_ctx, pi_new, "dance_of_destruction");
          goto retry;
        }
      } else {
        pi_new = polling_island_create(exec_ctx, NULL, &error);
      }

      GRPC_POLLING_TRACE(
          "add_poll_object: Created new polling island. pi_new: %p (%s: %p, "
          "%s: %p)",
          (void *)pi_new, poll_obj_string(item_type), (void *)item,
          poll_obj_string(bag_type), (void *)bag);
    } else {
      GRPC_POLLING_TRACE(
          "add_poll_object: Same polling island. pi: %p (%s, %s)",
          (void *)pi_new, poll_obj_string(item_type),
          poll_obj_string(bag_type));
    }
  } else if (item->pi == NULL) {
    /* GPR_ASSERT(bag->pi != NULL) */
    /* Make pi_new point to the latest pi */
    pi_new = polling_island_lock(bag->pi);

    if (item_type == POLL_OBJ_FD) {
      grpc_fd *fd = FD_FROM_PO(item);
      polling_island_add_fds_locked(pi_new, &fd, 1, true, &error);
    }

    gpr_mu_unlock(&pi_new->mu);
    GRPC_POLLING_TRACE(
        "add_poll_obj: item->pi was NULL. pi_new: %p (item(%s): %p, "
        "bag(%s): %p)",
        (void *)pi_new, poll_obj_string(item_type), (void *)item,
        poll_obj_string(bag_type), (void *)bag);
  } else if (bag->pi == NULL) {
    /* GPR_ASSERT(item->pi != NULL) */
    /* Make pi_new point to the latest pi */
    pi_new = polling_island_lock(item->pi);
    gpr_mu_unlock(&pi_new->mu);
    GRPC_POLLING_TRACE(
        "add_poll_obj: bag->pi was NULL. pi_new: %p (item(%s): %p, "
        "bag(%s): %p)",
        (void *)pi_new, poll_obj_string(item_type), (void *)item,
        poll_obj_string(bag_type), (void *)bag);
  } else {
    pi_new = polling_island_merge(item->pi, bag->pi, &error);
    GRPC_POLLING_TRACE(
        "add_poll_obj: polling islands merged. pi_new: %p (item(%s): %p, "
        "bag(%s): %p)",
        (void *)pi_new, poll_obj_string(item_type), (void *)item,
        poll_obj_string(bag_type), (void *)bag);
  }

  /* At this point, pi_new is the polling island that both item->pi and bag->pi
     MUST be pointing to */

  if (item->pi != pi_new) {
    PI_ADD_REF(pi_new, poll_obj_string(item_type));
    if (item->pi != NULL) {
      PI_UNREF(exec_ctx, item->pi, poll_obj_string(item_type));
    }
    item->pi = pi_new;
  }

  if (bag->pi != pi_new) {
    PI_ADD_REF(pi_new, poll_obj_string(bag_type));
    if (bag->pi != NULL) {
      PI_UNREF(exec_ctx, bag->pi, poll_obj_string(bag_type));
    }
    bag->pi = pi_new;
  }

  gpr_mu_unlock(&item->mu);
  gpr_mu_unlock(&bag->mu);

  GRPC_LOG_IF_ERROR("add_poll_object", error);
  GPR_TIMER_END("add_poll_object", 0);
}

static void pollset_add_fd(grpc_exec_ctx *exec_ctx, grpc_pollset *pollset,
                           grpc_fd *fd) {
  add_poll_object(exec_ctx, &pollset->po, POLL_OBJ_POLLSET, &fd->po,
                  POLL_OBJ_FD);
}

/*******************************************************************************
 * Pollset-set Definitions
 */

static grpc_pollset_set *pollset_set_create(void) {
  grpc_pollset_set *pss = gpr_malloc(sizeof(*pss));
  gpr_mu_init(&pss->po.mu);
  pss->po.pi = NULL;
#ifdef PO_DEBUG
  pss->po.obj_type = POLL_OBJ_POLLSET_SET;
#endif
  return pss;
}

static void pollset_set_destroy(grpc_exec_ctx *exec_ctx,
                                grpc_pollset_set *pss) {
  gpr_mu_destroy(&pss->po.mu);

  if (pss->po.pi != NULL) {
    PI_UNREF(exec_ctx, pss->po.pi, "pss_destroy");
  }

  gpr_free(pss);
}

static void pollset_set_add_fd(grpc_exec_ctx *exec_ctx, grpc_pollset_set *pss,
                               grpc_fd *fd) {
  add_poll_object(exec_ctx, &pss->po, POLL_OBJ_POLLSET_SET, &fd->po,
                  POLL_OBJ_FD);
}

static void pollset_set_del_fd(grpc_exec_ctx *exec_ctx, grpc_pollset_set *pss,
                               grpc_fd *fd) {
  /* Nothing to do */
}

static void pollset_set_add_pollset(grpc_exec_ctx *exec_ctx,
                                    grpc_pollset_set *pss, grpc_pollset *ps) {
  add_poll_object(exec_ctx, &pss->po, POLL_OBJ_POLLSET_SET, &ps->po,
                  POLL_OBJ_POLLSET);
}

static void pollset_set_del_pollset(grpc_exec_ctx *exec_ctx,
                                    grpc_pollset_set *pss, grpc_pollset *ps) {
  /* Nothing to do */
}

static void pollset_set_add_pollset_set(grpc_exec_ctx *exec_ctx,
                                        grpc_pollset_set *bag,
                                        grpc_pollset_set *item) {
  add_poll_object(exec_ctx, &bag->po, POLL_OBJ_POLLSET_SET, &item->po,
                  POLL_OBJ_POLLSET_SET);
}

static void pollset_set_del_pollset_set(grpc_exec_ctx *exec_ctx,
                                        grpc_pollset_set *bag,
                                        grpc_pollset_set *item) {
  /* Nothing to do */
}
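
/* Usage sketch (editorial note, not part of this file): these vtable entries
   are reached through the grpc_pollset_set_* wrappers in iomgr. For example, a
   component that must keep an fd pollable by several pollsets at once would do
   roughly:

     grpc_pollset_set *pss = grpc_pollset_set_create();
     grpc_pollset_set_add_pollset(exec_ctx, pss, pollset);
     grpc_pollset_set_add_fd(exec_ctx, pss, fd);

   The del operations are no-ops in this engine because polling islands only
   grow or merge; an fd leaves its island through fd_orphan(), not through
   pollset_set_del_fd(). */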

/* Test helper functions */
void *grpc_fd_get_polling_island(grpc_fd *fd) {
  polling_island *pi;

  gpr_mu_lock(&fd->po.mu);
  pi = fd->po.pi;
  gpr_mu_unlock(&fd->po.mu);

  return pi;
}

void *grpc_pollset_get_polling_island(grpc_pollset *ps) {
  polling_island *pi;

  gpr_mu_lock(&ps->po.mu);
  pi = ps->po.pi;
  gpr_mu_unlock(&ps->po.mu);

  return pi;
}

bool grpc_are_polling_islands_equal(void *p, void *q) {
  polling_island *p1 = p;
  polling_island *p2 = q;

  /* Note: polling_island_lock_pair() may change p1 and p2 to point to the
     latest polling islands in their respective linked lists */
  polling_island_lock_pair(&p1, &p2);
  polling_island_unlock_pair(p1, p2);

  return p1 == p2;
}
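
/* Illustrative use of the helpers above (editorial note; the exact shape is an
   assumption, not a quote from any test file):

     pollset_add_fd(exec_ctx, ps, fd);
     GPR_ASSERT(grpc_are_polling_islands_equal(
         grpc_fd_get_polling_island(fd),
         grpc_pollset_get_polling_island(ps)));

   i.e a test can verify that adding an fd to a pollset leaves both pointing
   at the same (latest) polling island. */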

/*******************************************************************************
 * Event engine binding
 */

static void shutdown_engine(void) {
  fd_global_shutdown();
  pollset_global_shutdown();
  polling_island_global_shutdown();
}

static const grpc_event_engine_vtable vtable = {
    .pollset_size = sizeof(grpc_pollset),

    .fd_create = fd_create,
    .fd_wrapped_fd = fd_wrapped_fd,
    .fd_orphan = fd_orphan,
    .fd_shutdown = fd_shutdown,
    .fd_is_shutdown = fd_is_shutdown,
    .fd_notify_on_read = fd_notify_on_read,
    .fd_notify_on_write = fd_notify_on_write,
    .fd_get_read_notifier_pollset = fd_get_read_notifier_pollset,
    .fd_get_workqueue = fd_get_workqueue,

    .pollset_init = pollset_init,
    .pollset_shutdown = pollset_shutdown,
    .pollset_destroy = pollset_destroy,
    .pollset_work = pollset_work,
    .pollset_kick = pollset_kick,
    .pollset_add_fd = pollset_add_fd,

    .pollset_set_create = pollset_set_create,
    .pollset_set_destroy = pollset_set_destroy,
    .pollset_set_add_pollset = pollset_set_add_pollset,
    .pollset_set_del_pollset = pollset_set_del_pollset,
    .pollset_set_add_pollset_set = pollset_set_add_pollset_set,
    .pollset_set_del_pollset_set = pollset_set_del_pollset_set,
    .pollset_set_add_fd = pollset_set_add_fd,
    .pollset_set_del_fd = pollset_set_del_fd,

    .kick_poller = kick_poller,

    .workqueue_ref = workqueue_ref,
    .workqueue_unref = workqueue_unref,
    .workqueue_scheduler = workqueue_scheduler,

    .shutdown_engine = shutdown_engine,
};

/* It is possible that GLIBC has epoll but the underlying kernel doesn't.
 * Create a dummy epoll_fd to make sure epoll support is available */
static bool is_epoll_available() {
  int fd = epoll_create1(EPOLL_CLOEXEC);
  if (fd < 0) {
    gpr_log(
        GPR_ERROR,
        "epoll_create1 failed with error: %d. Not using epoll polling engine",
        fd);
    return false;
  }
  close(fd);
  return true;
}

const grpc_event_engine_vtable *grpc_init_epoll_linux(void) {
  /* If use of signals is disabled, we cannot use the epoll engine */
  if (is_grpc_wakeup_signal_initialized && grpc_wakeup_signal < 0) {
    return NULL;
  }

  if (!grpc_has_wakeup_fd()) {
    return NULL;
  }

  if (!is_epoll_available()) {
    return NULL;
  }

  if (!is_grpc_wakeup_signal_initialized) {
    grpc_use_signal(SIGRTMIN + 6);
  }

  fd_global_init();

  if (!GRPC_LOG_IF_ERROR("pollset_global_init", pollset_global_init())) {
    return NULL;
  }

  if (!GRPC_LOG_IF_ERROR("polling_island_global_init",
                         polling_island_global_init())) {
    return NULL;
  }

  return &vtable;
}
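
/* Editorial note with a usage sketch (not in the original source): the
   signal-based kick can collide with applications that use SIGRTMIN + 6 for
   their own purposes. Before gRPC initialization, such applications can pick
   a different real-time signal, or opt out of this engine entirely:

     grpc_use_signal(SIGRTMIN + 2);   - use an alternate wakeup signal
     grpc_use_signal(-1);             - signals unusable; engine returns NULL

   A negative signum trips the is_grpc_wakeup_signal_initialized check at the
   top of grpc_init_epoll_linux(), causing a fallback to another poller. */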

#else /* defined(GRPC_LINUX_EPOLL) */
#if defined(GRPC_POSIX_SOCKET)
#include "src/core/lib/iomgr/ev_posix.h"
/* If GRPC_LINUX_EPOLL is not defined, it means epoll is not available. Return
 * NULL */
const grpc_event_engine_vtable *grpc_init_epoll_linux(void) { return NULL; }
#endif /* defined(GRPC_POSIX_SOCKET) */

void grpc_use_signal(int signum) {}
#endif /* !defined(GRPC_LINUX_EPOLL) */