blob: d5534ec39735dcecaf3eac7690b3dc9dbdf37b5f [file] [log] [blame]
Sree Kuchibhotlaf448c342016-05-19 10:51:24 -07001/*
2 *
3 * Copyright 2016, Google Inc.
4 * All rights reserved.
5 *
6 * Redistribution and use in source and binary forms, with or without
7 * modification, are permitted provided that the following conditions are
8 * met:
9 *
10 * * Redistributions of source code must retain the above copyright
11 * notice, this list of conditions and the following disclaimer.
12 * * Redistributions in binary form must reproduce the above
13 * copyright notice, this list of conditions and the following disclaimer
14 * in the documentation and/or other materials provided with the
15 * distribution.
16 * * Neither the name of Google Inc. nor the names of its
17 * contributors may be used to endorse or promote products derived from
18 * this software without specific prior written permission.
19 *
20 * THIS SOFTWARE IS PROVIDED BY THE COPYRIGHT HOLDERS AND CONTRIBUTORS
21 * "AS IS" AND ANY EXPRESS OR IMPLIED WARRANTIES, INCLUDING, BUT NOT
22 * LIMITED TO, THE IMPLIED WARRANTIES OF MERCHANTABILITY AND FITNESS FOR
23 * A PARTICULAR PURPOSE ARE DISCLAIMED. IN NO EVENT SHALL THE COPYRIGHT
24 * OWNER OR CONTRIBUTORS BE LIABLE FOR ANY DIRECT, INDIRECT, INCIDENTAL,
25 * SPECIAL, EXEMPLARY, OR CONSEQUENTIAL DAMAGES (INCLUDING, BUT NOT
26 * LIMITED TO, PROCUREMENT OF SUBSTITUTE GOODS OR SERVICES; LOSS OF USE,
27 * DATA, OR PROFITS; OR BUSINESS INTERRUPTION) HOWEVER CAUSED AND ON ANY
28 * THEORY OF LIABILITY, WHETHER IN CONTRACT, STRICT LIABILITY, OR TORT
29 * (INCLUDING NEGLIGENCE OR OTHERWISE) ARISING IN ANY WAY OUT OF THE USE
30 * OF THIS SOFTWARE, EVEN IF ADVISED OF THE POSSIBILITY OF SUCH DAMAGE.
31 *
32 */
33
murgatroid9954070892016-08-08 17:01:18 -070034#include "src/core/lib/iomgr/port.h"
Sree Kuchibhotlaf448c342016-05-19 10:51:24 -070035
Sree Kuchibhotla76a07952016-06-22 15:09:06 -070036/* This polling engine is only relevant on linux kernels supporting epoll() */
murgatroid99623dd4f2016-08-08 17:31:27 -070037#ifdef GRPC_LINUX_EPOLL
Sree Kuchibhotlaf448c342016-05-19 10:51:24 -070038
Craig Tiller4509c472017-04-27 19:05:13 +000039#include "src/core/lib/iomgr/ev_epollsig_linux.h"
Sree Kuchibhotlaf448c342016-05-19 10:51:24 -070040
41#include <assert.h>
42#include <errno.h>
43#include <poll.h>
Sree Kuchibhotla4998e302016-08-16 07:26:31 -070044#include <pthread.h>
Sree Kuchibhotlaf448c342016-05-19 10:51:24 -070045#include <signal.h>
46#include <string.h>
47#include <sys/epoll.h>
48#include <sys/socket.h>
49#include <unistd.h>
50
51#include <grpc/support/alloc.h>
52#include <grpc/support/log.h>
53#include <grpc/support/string_util.h>
54#include <grpc/support/tls.h>
55#include <grpc/support/useful.h>
56
57#include "src/core/lib/iomgr/ev_posix.h"
58#include "src/core/lib/iomgr/iomgr_internal.h"
Craig Tiller376887d2017-04-06 08:27:03 -070059#include "src/core/lib/iomgr/lockfree_event.h"
Craig Tiller185f6c92017-03-17 08:33:19 -070060#include "src/core/lib/iomgr/timer.h"
Sree Kuchibhotlaf448c342016-05-19 10:51:24 -070061#include "src/core/lib/iomgr/wakeup_fd_posix.h"
Craig Tillerb39307d2016-06-30 15:39:13 -070062#include "src/core/lib/iomgr/workqueue.h"
Sree Kuchibhotlaf448c342016-05-19 10:51:24 -070063#include "src/core/lib/profiling/timers.h"
64#include "src/core/lib/support/block_annotate.h"
65
Craig Tillere24b24d2017-04-06 16:05:45 -070066#define GRPC_POLLSET_KICK_BROADCAST ((grpc_pollset_worker *)1)
67
Craig Tiller2d1e8cd2017-05-17 12:41:44 -070068#define GRPC_POLLING_TRACE(...) \
Craig Tillerbc0ab082017-05-05 10:42:44 -070069 if (GRPC_TRACER_ON(grpc_polling_trace)) { \
Craig Tiller2d1e8cd2017-05-17 12:41:44 -070070 gpr_log(GPR_INFO, __VA_ARGS__); \
Sree Kuchibhotla1e776682016-06-28 14:09:26 -070071 }
72
Sree Kuchibhotla82d73412017-02-09 18:27:45 -080073/* Uncomment the following to enable extra checks on poll_object operations */
Sree Kuchibhotlae6f516e2016-12-08 12:20:23 -080074/* #define PO_DEBUG */
75
Sree Kuchibhotlac7be7c62016-06-09 17:08:50 -070076static int grpc_wakeup_signal = -1;
77static bool is_grpc_wakeup_signal_initialized = false;
Sree Kuchibhotlaf448c342016-05-19 10:51:24 -070078
Sree Kuchibhotlac7be7c62016-06-09 17:08:50 -070079/* Implements the function defined in grpc_posix.h. This function might be
80 * called before even calling grpc_init() to set either a different signal to
81 * use. If signum == -1, then the use of signals is disabled */
82void grpc_use_signal(int signum) {
83 grpc_wakeup_signal = signum;
84 is_grpc_wakeup_signal_initialized = true;
85
86 if (grpc_wakeup_signal < 0) {
87 gpr_log(GPR_INFO,
88 "Use of signals is disabled. Epoll engine will not be used");
89 } else {
90 gpr_log(GPR_INFO, "epoll engine will be using signal: %d",
91 grpc_wakeup_signal);
92 }
93}
94
95struct polling_island;
Sree Kuchibhotla5855c472016-06-08 12:56:56 -070096
/* The three kinds of pollable objects this engine tracks. */
typedef enum {
  POLL_OBJ_FD,
  POLL_OBJ_POLLSET,
  POLL_OBJ_POLLSET_SET
} poll_obj_type;

/* Common header embedded (as the first field) in grpc_fd, grpc_pollset and
   grpc_pollset_set, so that shared polling-island plumbing can operate on any
   of them uniformly. */
typedef struct poll_obj {
#ifdef PO_DEBUG
  /* Only present in debug builds: sanity-checks that callers pass the kind of
     object they claim to. */
  poll_obj_type obj_type;
#endif
  gpr_mu mu; /* Protects 'pi' */
  /* The polling island this object currently belongs to (may be NULL). */
  struct polling_island *pi;
} poll_obj;
110
111const char *poll_obj_string(poll_obj_type po_type) {
112 switch (po_type) {
113 case POLL_OBJ_FD:
114 return "fd";
115 case POLL_OBJ_POLLSET:
116 return "pollset";
117 case POLL_OBJ_POLLSET_SET:
118 return "pollset_set";
119 }
120
121 GPR_UNREACHABLE_CODE(return "UNKNOWN");
122}
123
Sree Kuchibhotlaf448c342016-05-19 10:51:24 -0700124/*******************************************************************************
Sree Kuchibhotla0bcbd792016-06-01 15:43:03 -0700125 * Fd Declarations
Sree Kuchibhotlaf448c342016-05-19 10:51:24 -0700126 */
Sree Kuchibhotlaf6f33d72016-11-18 11:38:52 -0800127
128#define FD_FROM_PO(po) ((grpc_fd *)(po))
129
/* Wrapper around a file descriptor that participates in polling. */
struct grpc_fd {
  /* Common poll-object header; MUST be the first field (FD_FROM_PO casts a
     poll_obj* straight to grpc_fd*). */
  poll_obj po;

  int fd;
  /* refst format:
     bit 0    : 1=Active / 0=Orphaned
     bits 1-n : refcount
     Ref/Unref by two to avoid altering the orphaned bit */
  gpr_atm refst;

  /* The fd is either closed or we relinquished control of it. In either
     cases, this indicates that the 'fd' on this structure is no longer
     valid */
  bool orphaned;

  /* Read / write readiness event state. NOTE(review): presumably these hold
     (grpc_closure *) or sentinel values managed by the lockfree_event API
     (see lockfree_event.h) — confirm. */
  gpr_atm read_closure;
  gpr_atm write_closure;

  /* Intrusive link for the global fd freelist (see fd_global_*). */
  struct grpc_fd *freelist_next;
  /* Closure scheduled once the fd has been fully orphaned/shut down. */
  grpc_closure *on_done_closure;

  /* The pollset that last noticed that the fd is readable. The actual type
   * stored in this is (grpc_pollset *) */
  gpr_atm read_notifier_pollset;

  grpc_iomgr_object iomgr_object;
};
157
Sree Kuchibhotlaf448c342016-05-19 10:51:24 -0700158/* Reference counting for fds */
Sree Kuchibhotla6a295452016-06-23 15:53:10 -0700159// #define GRPC_FD_REF_COUNT_DEBUG
Sree Kuchibhotlaf448c342016-05-19 10:51:24 -0700160#ifdef GRPC_FD_REF_COUNT_DEBUG
161static void fd_ref(grpc_fd *fd, const char *reason, const char *file, int line);
162static void fd_unref(grpc_fd *fd, const char *reason, const char *file,
163 int line);
164#define GRPC_FD_REF(fd, reason) fd_ref(fd, reason, __FILE__, __LINE__)
165#define GRPC_FD_UNREF(fd, reason) fd_unref(fd, reason, __FILE__, __LINE__)
166#else
167static void fd_ref(grpc_fd *fd);
168static void fd_unref(grpc_fd *fd);
169#define GRPC_FD_REF(fd, reason) fd_ref(fd)
170#define GRPC_FD_UNREF(fd, reason) fd_unref(fd)
171#endif
172
173static void fd_global_init(void);
174static void fd_global_shutdown(void);
175
Sree Kuchibhotlaf448c342016-05-19 10:51:24 -0700176/*******************************************************************************
Sree Kuchibhotla2f8ade02016-06-17 13:28:38 -0700177 * Polling island Declarations
Sree Kuchibhotlaf448c342016-05-19 10:51:24 -0700178 */
Sree Kuchibhotla2f8ade02016-06-17 13:28:38 -0700179
Craig Tillerd8a3c042016-09-09 12:42:37 -0700180#ifdef GRPC_WORKQUEUE_REFCOUNT_DEBUG
Sree Kuchibhotla2f8ade02016-06-17 13:28:38 -0700181
Sree Kuchibhotla6a295452016-06-23 15:53:10 -0700182#define PI_ADD_REF(p, r) pi_add_ref_dbg((p), (r), __FILE__, __LINE__)
Craig Tillerb39307d2016-06-30 15:39:13 -0700183#define PI_UNREF(exec_ctx, p, r) \
184 pi_unref_dbg((exec_ctx), (p), (r), __FILE__, __LINE__)
Sree Kuchibhotla2f8ade02016-06-17 13:28:38 -0700185
Craig Tillerd8a3c042016-09-09 12:42:37 -0700186#else /* defined(GRPC_WORKQUEUE_REFCOUNT_DEBUG) */
Sree Kuchibhotla2f8ade02016-06-17 13:28:38 -0700187
Sree Kuchibhotla6a295452016-06-23 15:53:10 -0700188#define PI_ADD_REF(p, r) pi_add_ref((p))
Craig Tillerb39307d2016-06-30 15:39:13 -0700189#define PI_UNREF(exec_ctx, p, r) pi_unref((exec_ctx), (p))
Sree Kuchibhotla2f8ade02016-06-17 13:28:38 -0700190
#endif /* !defined(GRPC_WORKQUEUE_REFCOUNT_DEBUG) */
Sree Kuchibhotla2f8ade02016-06-17 13:28:38 -0700192
/* This is also used as grpc_workqueue (by directly casting it) */
typedef struct polling_island {
  /* Scheduler for closures enqueued on this island's workqueue.
     NOTE(review): appears to be kept as the first field so that a pointer to
     the island can double as a grpc_closure_scheduler — confirm. */
  grpc_closure_scheduler workqueue_scheduler;

  gpr_mu mu;
  /* Ref count. Use PI_ADD_REF() and PI_UNREF() macros to increment/decrement
     the refcount.
     Once the ref count becomes zero, this structure is destroyed which means
     we should ensure that there is never a scenario where a PI_ADD_REF() is
     racing with a PI_UNREF() that just made the ref_count zero. */
  gpr_atm ref_count;

  /* Pointer to the polling_island this merged into.
   * merged_to value is only set once in polling_island's lifetime (and that too
   * only if the island is merged with another island). Because of this, we can
   * use gpr_atm type here so that we can do atomic access on this and reduce
   * lock contention on 'mu' mutex.
   *
   * Note that if this field is not NULL (i.e not 0), all the remaining fields
   * (except mu and ref_count) are invalid and must be ignored. */
  gpr_atm merged_to;

  /* Number of threads currently polling on this island */
  gpr_atm poller_count;
  /* Mutex guarding the read end of the workqueue (must be held to pop from
   * workqueue_items) */
  gpr_mu workqueue_read_mu;
  /* Queue of closures to be executed */
  gpr_mpscq workqueue_items;
  /* Count of items in workqueue_items */
  gpr_atm workqueue_item_count;
  /* Wakeup fd used to wake pollers to check the contents of workqueue_items */
  grpc_wakeup_fd workqueue_wakeup_fd;

  /* The fd of the underlying epoll set */
  int epoll_fd;

  /* The file descriptors in the epoll set */
  size_t fd_cnt;
  size_t fd_capacity;
  grpc_fd **fds;
} polling_island;
235
Sree Kuchibhotla0bcbd792016-06-01 15:43:03 -0700236/*******************************************************************************
237 * Pollset Declarations
238 */
/* One thread blocked in pollset_work(); workers form a doubly-linked list
   rooted at grpc_pollset.root_worker. */
struct grpc_pollset_worker {
  /* Thread id of this worker */
  pthread_t pt_id;

  /* Used to prevent a worker from getting kicked multiple times */
  gpr_atm is_kicked;
  /* Links in the pollset's circular worker list. */
  struct grpc_pollset_worker *next;
  struct grpc_pollset_worker *prev;
};

struct grpc_pollset {
  /* Common poll-object header; MUST be the first field. */
  poll_obj po;

  /* Sentinel head of the circular list of workers currently polling. */
  grpc_pollset_worker root_worker;
  /* Set when a kick arrives while no worker is polling, so the next worker
     returns immediately instead of blocking. */
  bool kicked_without_pollers;

  bool shutting_down;          /* Is the pollset shutting down ? */
  bool finish_shutdown_called; /* Is the 'finish_shutdown_locked()' called ? */
  grpc_closure *shutdown_done; /* Called after shutdown is complete */
};

/*******************************************************************************
 * Pollset-set Declarations
 */
/* A pollset_set is just a poll object: all state lives in the shared header. */
struct grpc_pollset_set {
  poll_obj po;
};
266
267/*******************************************************************************
Sree Kuchibhotla20d0a162016-06-23 15:14:03 -0700268 * Common helpers
269 */
270
Craig Tillerf975f742016-07-01 14:56:27 -0700271static bool append_error(grpc_error **composite, grpc_error *error,
Sree Kuchibhotla20d0a162016-06-23 15:14:03 -0700272 const char *desc) {
Craig Tillerf975f742016-07-01 14:56:27 -0700273 if (error == GRPC_ERROR_NONE) return true;
Sree Kuchibhotla20d0a162016-06-23 15:14:03 -0700274 if (*composite == GRPC_ERROR_NONE) {
Noah Eisen3005ce82017-03-14 13:38:41 -0700275 *composite = GRPC_ERROR_CREATE_FROM_COPIED_STRING(desc);
Sree Kuchibhotla20d0a162016-06-23 15:14:03 -0700276 }
277 *composite = grpc_error_add_child(*composite, error);
Craig Tillerf975f742016-07-01 14:56:27 -0700278 return false;
Sree Kuchibhotla20d0a162016-06-23 15:14:03 -0700279}
280
281/*******************************************************************************
Sree Kuchibhotla24b10622016-06-08 15:20:17 -0700282 * Polling island Definitions
Sree Kuchibhotla0bcbd792016-06-01 15:43:03 -0700283 */
284
Sree Kuchibhotla24b10622016-06-08 15:20:17 -0700285/* The wakeup fd that is used to wake up all threads in a Polling island. This
286 is useful in the polling island merge operation where we need to wakeup all
287 the threads currently polling the smaller polling island (so that they can
288 start polling the new/merged polling island)
289
290 NOTE: This fd is initialized to be readable and MUST NOT be consumed i.e the
291 threads that woke up MUST NOT call grpc_wakeup_fd_consume_wakeup() */
292static grpc_wakeup_fd polling_island_wakeup_fd;
293
Craig Tiller2e620132016-10-10 15:27:44 -0700294/* The polling island being polled right now.
295 See comments in workqueue_maybe_wakeup for why this is tracked. */
Craig Tillerd8a3c042016-09-09 12:42:37 -0700296static __thread polling_island *g_current_thread_polling_island;
297
Craig Tillerb39307d2016-06-30 15:39:13 -0700298/* Forward declaration */
Craig Tiller2b49ea92016-07-01 13:21:27 -0700299static void polling_island_delete(grpc_exec_ctx *exec_ctx, polling_island *pi);
Craig Tiller91031da2016-12-28 15:44:25 -0800300static void workqueue_enqueue(grpc_exec_ctx *exec_ctx, grpc_closure *closure,
301 grpc_error *error);
Sree Kuchibhotla2f8ade02016-06-17 13:28:38 -0700302
Sree Kuchibhotlaad2c4772016-06-13 19:06:54 -0700303#ifdef GRPC_TSAN
Sree Kuchibhotla41622a82016-06-13 16:43:14 -0700304/* Currently TSAN may incorrectly flag data races between epoll_ctl and
305 epoll_wait for any grpc_fd structs that are added to the epoll set via
306 epoll_ctl and are returned (within a very short window) via epoll_wait().
307
308 To work-around this race, we establish a happens-before relation between
309 the code just-before epoll_ctl() and the code after epoll_wait() by using
310 this atomic */
311gpr_atm g_epoll_sync;
Sree Kuchibhotlaad2c4772016-06-13 19:06:54 -0700312#endif /* defined(GRPC_TSAN) */
Sree Kuchibhotla41622a82016-06-13 16:43:14 -0700313
Craig Tiller91031da2016-12-28 15:44:25 -0800314static const grpc_closure_scheduler_vtable workqueue_scheduler_vtable = {
Craig Tiller7c70b6c2017-01-23 07:48:42 -0800315 workqueue_enqueue, workqueue_enqueue, "workqueue"};
Craig Tiller91031da2016-12-28 15:44:25 -0800316
Craig Tillerb39307d2016-06-30 15:39:13 -0700317static void pi_add_ref(polling_island *pi);
318static void pi_unref(grpc_exec_ctx *exec_ctx, polling_island *pi);
Sree Kuchibhotla2f8ade02016-06-17 13:28:38 -0700319
#ifdef GRPC_WORKQUEUE_REFCOUNT_DEBUG
/* Debug variant of PI_ADD_REF: logs the before/after refcount with the
   caller's reason/file/line. Note: the "old" count is sampled with a separate
   load, so under contention the logged values are approximate. */
static void pi_add_ref_dbg(polling_island *pi, const char *reason,
                           const char *file, int line) {
  long old_cnt = gpr_atm_acq_load(&pi->ref_count);
  pi_add_ref(pi);
  gpr_log(GPR_DEBUG, "Add ref pi: %p, old: %ld -> new:%ld (%s) - (%s, %d)",
          (void *)pi, old_cnt, old_cnt + 1, reason, file, line);
}

/* Debug variant of PI_UNREF; same approximate-logging caveat as above. */
static void pi_unref_dbg(grpc_exec_ctx *exec_ctx, polling_island *pi,
                         const char *reason, const char *file, int line) {
  long old_cnt = gpr_atm_acq_load(&pi->ref_count);
  pi_unref(exec_ctx, pi);
  gpr_log(GPR_DEBUG, "Unref pi: %p, old:%ld -> new:%ld (%s) - (%s, %d)",
          (void *)pi, old_cnt, (old_cnt - 1), reason, file, line);
}

/* grpc_workqueue is a polling_island in disguise (direct cast); these
   forward workqueue ref/unref onto the island's refcount (debug flavour). */
static grpc_workqueue *workqueue_ref(grpc_workqueue *workqueue,
                                     const char *file, int line,
                                     const char *reason) {
  if (workqueue != NULL) {
    pi_add_ref_dbg((polling_island *)workqueue, reason, file, line);
  }
  return workqueue;
}

static void workqueue_unref(grpc_exec_ctx *exec_ctx, grpc_workqueue *workqueue,
                            const char *file, int line, const char *reason) {
  if (workqueue != NULL) {
    pi_unref_dbg(exec_ctx, (polling_island *)workqueue, reason, file, line);
  }
}
#else
/* Non-debug flavour: ref/unref the underlying polling island directly.
   NULL workqueues are tolerated (no-op). */
static grpc_workqueue *workqueue_ref(grpc_workqueue *workqueue) {
  if (workqueue != NULL) {
    pi_add_ref((polling_island *)workqueue);
  }
  return workqueue;
}

static void workqueue_unref(grpc_exec_ctx *exec_ctx,
                            grpc_workqueue *workqueue) {
  if (workqueue != NULL) {
    pi_unref(exec_ctx, (polling_island *)workqueue);
  }
}
#endif
367
/* Takes a ref on the polling island. Relaxed ordering suffices here because
   the caller must already hold a valid reference (see the struct comment). */
static void pi_add_ref(polling_island *pi) {
  gpr_atm_no_barrier_fetch_add(&pi->ref_count, 1);
}

/* Drops a ref; the thread that drops the last ref destroys the island. */
static void pi_unref(grpc_exec_ctx *exec_ctx, polling_island *pi) {
  /* If ref count went to zero, delete the polling island.
     Note that this deletion need not be done under a lock. Once the ref count
     goes to zero, we are guaranteed that no one else holds a reference to the
     polling island (and that there is no racing pi_add_ref() call either).

     Also, if we are deleting the polling island and the merged_to field is
     non-empty, we should remove a ref to the merged_to polling island
   */
  if (1 == gpr_atm_full_fetch_add(&pi->ref_count, -1)) {
    polling_island *next = (polling_island *)gpr_atm_acq_load(&pi->merged_to);
    polling_island_delete(exec_ctx, pi);
    if (next != NULL) {
      PI_UNREF(exec_ctx, next, "pi_delete"); /* Recursive call */
    }
  }
}
389
/* Adds 'fd_count' fds to the island's epoll set and to its fds[] bookkeeping
   array. Failures on individual fds are accumulated into '*error' (EEXIST is
   silently ignored) and do not stop the loop. If 'add_fd_refs' is true, a
   "polling_island" ref is taken on each fd actually added.
   The caller is expected to hold pi->mu lock before calling this function */
static void polling_island_add_fds_locked(polling_island *pi, grpc_fd **fds,
                                          size_t fd_count, bool add_fd_refs,
                                          grpc_error **error) {
  int err;
  size_t i;
  struct epoll_event ev;
  char *err_msg;
  const char *err_desc = "polling_island_add_fds";

#ifdef GRPC_TSAN
  /* See the definition of g_epoll_sync for more context */
  gpr_atm_rel_store(&g_epoll_sync, (gpr_atm)0);
#endif /* defined(GRPC_TSAN) */

  for (i = 0; i < fd_count; i++) {
    /* Edge-triggered, interested in both read and write readiness. */
    ev.events = (uint32_t)(EPOLLIN | EPOLLOUT | EPOLLET);
    ev.data.ptr = fds[i]; /* epoll_wait() hands this pointer back to us */
    err = epoll_ctl(pi->epoll_fd, EPOLL_CTL_ADD, fds[i]->fd, &ev);

    if (err < 0) {
      if (errno != EEXIST) {
        gpr_asprintf(
            &err_msg,
            "epoll_ctl (epoll_fd: %d) add fd: %d failed with error: %d (%s)",
            pi->epoll_fd, fds[i]->fd, errno, strerror(errno));
        append_error(error, GRPC_OS_ERROR(errno, err_msg), err_desc);
        gpr_free(err_msg);
      }

      /* Skip bookkeeping/ref for fds that were not actually added. */
      continue;
    }

    /* Grow fds[] geometrically (at least +8) when full. */
    if (pi->fd_cnt == pi->fd_capacity) {
      pi->fd_capacity = GPR_MAX(pi->fd_capacity + 8, pi->fd_cnt * 3 / 2);
      pi->fds = gpr_realloc(pi->fds, sizeof(grpc_fd *) * pi->fd_capacity);
    }

    pi->fds[pi->fd_cnt++] = fds[i];
    if (add_fd_refs) {
      GRPC_FD_REF(fds[i], "polling_island");
    }
  }
}
434
/* Adds a wakeup fd (edge-triggered, read-interest only) to the island's epoll
   set. Unlike grpc_fds, wakeup fds are NOT tracked in pi->fds[]. EEXIST is
   ignored; other failures are accumulated into '*error'.
   The caller is expected to hold pi->mu before calling this */
static void polling_island_add_wakeup_fd_locked(polling_island *pi,
                                                grpc_wakeup_fd *wakeup_fd,
                                                grpc_error **error) {
  struct epoll_event ev;
  int err;
  char *err_msg;
  const char *err_desc = "polling_island_add_wakeup_fd";

  ev.events = (uint32_t)(EPOLLIN | EPOLLET);
  ev.data.ptr = wakeup_fd; /* distinguishes wakeup fds from grpc_fds */
  err = epoll_ctl(pi->epoll_fd, EPOLL_CTL_ADD,
                  GRPC_WAKEUP_FD_GET_READ_FD(wakeup_fd), &ev);
  if (err < 0 && errno != EEXIST) {
    gpr_asprintf(&err_msg,
                 "epoll_ctl (epoll_fd: %d) add wakeup fd: %d failed with "
                 "error: %d (%s)",
                 pi->epoll_fd, GRPC_WAKEUP_FD_GET_READ_FD(wakeup_fd), errno,
                 strerror(errno));
    append_error(error, GRPC_OS_ERROR(errno, err_msg), err_desc);
    gpr_free(err_msg);
  }
}
458
/* Removes every tracked fd from the island's epoll set and resets fd_cnt to
   zero. ENOENT is ignored (the fd may already be gone, e.g. after close());
   other epoll_ctl failures are accumulated into '*error'. If 'remove_fd_refs'
   is true, the "polling_island" ref on each fd is dropped.
   The caller is expected to hold pi->mu lock before calling this function */
static void polling_island_remove_all_fds_locked(polling_island *pi,
                                                 bool remove_fd_refs,
                                                 grpc_error **error) {
  int err;
  size_t i;
  char *err_msg;
  const char *err_desc = "polling_island_remove_fds";

  for (i = 0; i < pi->fd_cnt; i++) {
    err = epoll_ctl(pi->epoll_fd, EPOLL_CTL_DEL, pi->fds[i]->fd, NULL);
    if (err < 0 && errno != ENOENT) {
      gpr_asprintf(&err_msg,
                   "epoll_ctl (epoll_fd: %d) delete fds[%zu]: %d failed with "
                   "error: %d (%s)",
                   pi->epoll_fd, i, pi->fds[i]->fd, errno, strerror(errno));
      append_error(error, GRPC_OS_ERROR(errno, err_msg), err_desc);
      gpr_free(err_msg);
    }

    if (remove_fd_refs) {
      GRPC_FD_UNREF(pi->fds[i], "polling_island");
    }
  }

  pi->fd_cnt = 0;
}
486
/* Removes a single fd from the island: drops it from the epoll set (unless
   the fd is already closed, in which case the kernel removed it for us) and
   swap-removes it from pi->fds[], releasing the "polling_island" ref.
   The caller is expected to hold pi->mu lock before calling this function */
static void polling_island_remove_fd_locked(polling_island *pi, grpc_fd *fd,
                                            bool is_fd_closed,
                                            grpc_error **error) {
  int err;
  size_t i;
  char *err_msg;
  const char *err_desc = "polling_island_remove_fd";

  /* If fd is already closed, then it would have been automatically been removed
     from the epoll set */
  if (!is_fd_closed) {
    err = epoll_ctl(pi->epoll_fd, EPOLL_CTL_DEL, fd->fd, NULL);
    if (err < 0 && errno != ENOENT) { /* ENOENT: fd was never in the set */
      gpr_asprintf(
          &err_msg,
          "epoll_ctl (epoll_fd: %d) del fd: %d failed with error: %d (%s)",
          pi->epoll_fd, fd->fd, errno, strerror(errno));
      append_error(error, GRPC_OS_ERROR(errno, err_msg), err_desc);
      gpr_free(err_msg);
    }
  }

  /* O(n) swap-remove from the bookkeeping array (order is not preserved). */
  for (i = 0; i < pi->fd_cnt; i++) {
    if (pi->fds[i] == fd) {
      pi->fds[i] = pi->fds[--pi->fd_cnt];
      GRPC_FD_UNREF(fd, "polling_island");
      break;
    }
  }
}
518
/* Creates a new polling island: initializes mutexes/atomics/workqueue, creates
   the epoll set (CLOEXEC), registers the workqueue wakeup fd, and — if
   'initial_fd' is non-NULL — adds it (taking a ref). On any failure '*error'
   is set, the partially-built island is deleted, and NULL is returned.
   Might return NULL in case of an error */
static polling_island *polling_island_create(grpc_exec_ctx *exec_ctx,
                                             grpc_fd *initial_fd,
                                             grpc_error **error) {
  polling_island *pi = NULL;
  const char *err_desc = "polling_island_create";

  *error = GRPC_ERROR_NONE;

  pi = gpr_malloc(sizeof(*pi));
  pi->workqueue_scheduler.vtable = &workqueue_scheduler_vtable;
  gpr_mu_init(&pi->mu);
  pi->fd_cnt = 0;
  pi->fd_capacity = 0;
  pi->fds = NULL;
  pi->epoll_fd = -1; /* sentinel: lets polling_island_delete skip close() */

  gpr_mu_init(&pi->workqueue_read_mu);
  gpr_mpscq_init(&pi->workqueue_items);
  gpr_atm_rel_store(&pi->workqueue_item_count, 0);

  gpr_atm_rel_store(&pi->ref_count, 0);
  gpr_atm_rel_store(&pi->poller_count, 0);
  gpr_atm_rel_store(&pi->merged_to, (gpr_atm)NULL);

  /* NOTE(review): if this init fails, the 'done' path still calls
     polling_island_delete(), which destroys workqueue_wakeup_fd — verify
     grpc_wakeup_fd_destroy() is safe on a fd whose init failed. */
  if (!append_error(error, grpc_wakeup_fd_init(&pi->workqueue_wakeup_fd),
                    err_desc)) {
    goto done;
  }

  pi->epoll_fd = epoll_create1(EPOLL_CLOEXEC);

  if (pi->epoll_fd < 0) {
    append_error(error, GRPC_OS_ERROR(errno, "epoll_create1"), err_desc);
    goto done;
  }

  polling_island_add_wakeup_fd_locked(pi, &pi->workqueue_wakeup_fd, error);

  if (initial_fd != NULL) {
    polling_island_add_fds_locked(pi, &initial_fd, 1, true, error);
  }

done:
  if (*error != GRPC_ERROR_NONE) {
    polling_island_delete(exec_ctx, pi);
    pi = NULL;
  }
  return pi;
}
569
/* Destroys a polling island. Precondition: no fds remain tracked and the
   workqueue has been drained (both asserted). Called only once the refcount
   has dropped to zero, so no locking is needed here. */
static void polling_island_delete(grpc_exec_ctx *exec_ctx, polling_island *pi) {
  GPR_ASSERT(pi->fd_cnt == 0);

  if (pi->epoll_fd >= 0) { /* may be -1 if creation failed early */
    close(pi->epoll_fd);
  }
  GPR_ASSERT(gpr_atm_no_barrier_load(&pi->workqueue_item_count) == 0);
  gpr_mu_destroy(&pi->workqueue_read_mu);
  gpr_mpscq_destroy(&pi->workqueue_items);
  gpr_mu_destroy(&pi->mu);
  grpc_wakeup_fd_destroy(&pi->workqueue_wakeup_fd);
  gpr_free(pi->fds);
  gpr_free(pi);
}
584
Sree Kuchibhotla229533b12016-06-21 20:42:52 -0700585/* Attempts to gets the last polling island in the linked list (liked by the
586 * 'merged_to' field). Since this does not lock the polling island, there are no
587 * guarantees that the island returned is the last island */
588static polling_island *polling_island_maybe_get_latest(polling_island *pi) {
589 polling_island *next = (polling_island *)gpr_atm_acq_load(&pi->merged_to);
590 while (next != NULL) {
591 pi = next;
592 next = (polling_island *)gpr_atm_acq_load(&pi->merged_to);
593 }
594
595 return pi;
596}
597
/* Gets the lock on the *latest* polling island i.e the last polling island in
   the linked list (linked by the 'merged_to' field). Call gpr_mu_unlock on the
   returned polling island's mu.
   Usage: To lock/unlock polling island "pi", do the following:
        polling_island *pi_latest = polling_island_lock(pi);
        ...
        ... critical section ..
        ...
        gpr_mu_unlock(&pi_latest->mu); // NOTE: use pi_latest->mu. NOT pi->mu */
static polling_island *polling_island_lock(polling_island *pi) {
  polling_island *next = NULL;

  while (true) {
    next = (polling_island *)gpr_atm_acq_load(&pi->merged_to);
    if (next == NULL) {
      /* Looks like 'pi' is the last node in the linked list but unless we check
         this by holding the pi->mu lock, we cannot be sure (i.e without the
         pi->mu lock, we don't prevent island merges).
         To be absolutely sure, check once more by holding the pi->mu lock */
      gpr_mu_lock(&pi->mu);
      next = (polling_island *)gpr_atm_acq_load(&pi->merged_to);
      if (next == NULL) {
        /* pi is infact the last node and we have the pi->mu lock. we're done */
        break;
      }

      /* pi->merged_to is not NULL i.e pi isn't the last node anymore. pi->mu
       * isn't the lock we are interested in. Continue traversing the list */
      gpr_mu_unlock(&pi->mu);
    }

    pi = next;
  }

  /* Returns with pi->mu held (on the tail island). */
  return pi;
}
634
/* Gets the lock on the *latest* polling islands in the linked lists pointed by
   *p and *q (and also updates *p and *q to point to the latest polling islands)

   This function is needed because calling the following block of code to obtain
   locks on polling islands (*p and *q) is prone to deadlocks.
   {
     polling_island_lock(*p, true);
     polling_island_lock(*q, true);
   }

   Usage/example:
   polling_island *p1;
   polling_island *p2;
   ..
   polling_island_lock_pair(&p1, &p2);
   ..
   .. Critical section with both p1 and p2 locked
   ..
   // Release locks: Always call polling_island_unlock_pair() to release locks
   polling_island_unlock_pair(p1, p2);
*/
static void polling_island_lock_pair(polling_island **p, polling_island **q) {
  polling_island *pi_1 = *p;
  polling_island *pi_2 = *q;
  polling_island *next_1 = NULL;
  polling_island *next_2 = NULL;

  /* The algorithm is simple:
     - Go to the last polling islands in the linked lists *pi_1 and *pi_2 (and
       keep updating pi_1 and pi_2)
     - Then obtain locks on the islands by following a lock order rule of
       locking polling_island with lower address first
       Special case: Before obtaining the locks, check if pi_1 and pi_2 are
       pointing to the same island. If that is the case, we can just call
       polling_island_lock()
     - After obtaining both the locks, double check that the polling islands
       are still the last polling islands in their respective linked lists
       (this is because there might have been polling island merges before
       we got the lock)
     - If the polling islands are the last islands, we are done. If not,
       release the locks and continue the process from the first step */
  while (true) {
    next_1 = (polling_island *)gpr_atm_acq_load(&pi_1->merged_to);
    while (next_1 != NULL) {
      pi_1 = next_1;
      next_1 = (polling_island *)gpr_atm_acq_load(&pi_1->merged_to);
    }

    next_2 = (polling_island *)gpr_atm_acq_load(&pi_2->merged_to);
    while (next_2 != NULL) {
      pi_2 = next_2;
      next_2 = (polling_island *)gpr_atm_acq_load(&pi_2->merged_to);
    }

    if (pi_1 == pi_2) {
      pi_1 = pi_2 = polling_island_lock(pi_1);
      break;
    }

    /* Address-ordered locking prevents deadlock against a concurrent caller
       locking the same pair in the opposite order */
    if (pi_1 < pi_2) {
      gpr_mu_lock(&pi_1->mu);
      gpr_mu_lock(&pi_2->mu);
    } else {
      gpr_mu_lock(&pi_2->mu);
      gpr_mu_lock(&pi_1->mu);
    }

    next_1 = (polling_island *)gpr_atm_acq_load(&pi_1->merged_to);
    next_2 = (polling_island *)gpr_atm_acq_load(&pi_2->merged_to);
    if (next_1 == NULL && next_2 == NULL) {
      break;
    }

    /* One of the islands was merged while we were acquiring the locks:
       retry from the top */
    gpr_mu_unlock(&pi_1->mu);
    gpr_mu_unlock(&pi_2->mu);
  }

  *p = pi_1;
  *q = pi_2;
}
715
Sree Kuchibhotla20d0a162016-06-23 15:14:03 -0700716static void polling_island_unlock_pair(polling_island *p, polling_island *q) {
717 if (p == q) {
718 gpr_mu_unlock(&p->mu);
719 } else {
720 gpr_mu_unlock(&p->mu);
721 gpr_mu_unlock(&q->mu);
722 }
723}
724
/* Possibly signals pi's workqueue wakeup fd so that a poller picks up newly
   enqueued workqueue items. */
static void workqueue_maybe_wakeup(polling_island *pi) {
  /* If this thread is the current poller, then it may be that it's about to
     decrement the current poller count, so we need to look past this thread */
  bool is_current_poller = (g_current_thread_polling_island == pi);
  gpr_atm min_current_pollers_for_wakeup = is_current_poller ? 1 : 0;
  gpr_atm current_pollers = gpr_atm_no_barrier_load(&pi->poller_count);
  /* Only issue a wakeup if it's likely that some poller could come in and take
     it right now. Note that since we do an anticipatory mpscq_pop every poll
     loop, it's ok if we miss the wakeup here, as we'll get the work item when
     the next poller enters anyway. */
  /* NOTE(review): the comparison is '>=' (not '>'), so a wakeup is issued even
     when no *other* poller is currently present. This looks like a deliberate
     "err on the side of waking up" choice -- confirm against upstream history
     before changing. */
  if (current_pollers >= min_current_pollers_for_wakeup) {
    GRPC_LOG_IF_ERROR("workqueue_wakeup_fd",
                      grpc_wakeup_fd_wakeup(&pi->workqueue_wakeup_fd));
  }
}
740
741static void workqueue_move_items_to_parent(polling_island *q) {
742 polling_island *p = (polling_island *)gpr_atm_no_barrier_load(&q->merged_to);
743 if (p == NULL) {
744 return;
745 }
746 gpr_mu_lock(&q->workqueue_read_mu);
747 int num_added = 0;
748 while (gpr_atm_no_barrier_load(&q->workqueue_item_count) > 0) {
749 gpr_mpscq_node *n = gpr_mpscq_pop(&q->workqueue_items);
750 if (n != NULL) {
751 gpr_atm_no_barrier_fetch_add(&q->workqueue_item_count, -1);
752 gpr_atm_no_barrier_fetch_add(&p->workqueue_item_count, 1);
753 gpr_mpscq_push(&p->workqueue_items, n);
754 num_added++;
755 }
756 }
757 gpr_mu_unlock(&q->workqueue_read_mu);
758 if (num_added > 0) {
759 workqueue_maybe_wakeup(p);
760 }
761 workqueue_move_items_to_parent(p);
762}
763
/* Merges the polling islands p and q and returns the island that survives the
   merge (always the one that started with more fds). On return, the smaller
   island's 'merged_to' points at the survivor. Errors from the epoll
   manipulation are accumulated into *error. */
static polling_island *polling_island_merge(polling_island *p,
                                            polling_island *q,
                                            grpc_error **error) {
  /* Get locks on both the polling islands */
  polling_island_lock_pair(&p, &q);

  if (p != q) {
    /* Make sure that p points to the polling island with fewer fds than q */
    if (p->fd_cnt > q->fd_cnt) {
      GPR_SWAP(polling_island *, p, q);
    }

    /* Merge p with q i.e move all the fds from p (The one with fewer fds) to q
       Note that the refcounts on the fds being moved will not change here.
       This is why the last param in the following two functions is 'false') */
    polling_island_add_fds_locked(q, p->fds, p->fd_cnt, false, error);
    polling_island_remove_all_fds_locked(p, false, error);

    /* Wakeup all the pollers (if any) on p so that they pickup this change */
    polling_island_add_wakeup_fd_locked(p, &polling_island_wakeup_fd, error);

    /* Add the 'merged_to' link from p --> q */
    gpr_atm_rel_store(&p->merged_to, (gpr_atm)q);
    PI_ADD_REF(q, "pi_merge"); /* To account for the new incoming ref from p */

    /* Hand any closures queued on p's workqueue over to q */
    workqueue_move_items_to_parent(p);
  }
  /* else if p == q, nothing needs to be done */

  polling_island_unlock_pair(p, q);

  /* Return the merged polling island (Note that no merge would have happened
     if p == q which is ok) */
  return q;
}
799
/* Closure-scheduler callback: pushes 'closure' onto the workqueue (polling
   island) recorded in closure->scheduler. The first item pushed onto an empty
   queue triggers a poller wakeup; items are then forwarded up the merge chain
   so they land on the live island. */
static void workqueue_enqueue(grpc_exec_ctx *exec_ctx, grpc_closure *closure,
                              grpc_error *error) {
  GPR_TIMER_BEGIN("workqueue.enqueue", 0);
  grpc_workqueue *workqueue = (grpc_workqueue *)closure->scheduler;
  /* take a ref to the workqueue: otherwise it can happen that whatever events
   * this kicks off ends up destroying the workqueue before this function
   * completes */
  GRPC_WORKQUEUE_REF(workqueue, "enqueue");
  polling_island *pi = (polling_island *)workqueue;
  gpr_atm last = gpr_atm_no_barrier_fetch_add(&pi->workqueue_item_count, 1);
  closure->error_data.error = error;
  gpr_mpscq_push(&pi->workqueue_items, &closure->next_data.atm_next);
  if (last == 0) {
    /* Queue transitioned empty -> non-empty: a poller may need waking */
    workqueue_maybe_wakeup(pi);
  }
  workqueue_move_items_to_parent(pi);
  GRPC_WORKQUEUE_UNREF(exec_ctx, workqueue, "enqueue");
  GPR_TIMER_END("workqueue.enqueue", 0);
}
819
Craig Tiller91031da2016-12-28 15:44:25 -0800820static grpc_closure_scheduler *workqueue_scheduler(grpc_workqueue *workqueue) {
821 polling_island *pi = (polling_island *)workqueue;
Craig Tiller801c6cc2017-01-03 08:13:13 -0800822 return workqueue == NULL ? grpc_schedule_on_exec_ctx
823 : &pi->workqueue_scheduler;
Craig Tiller91031da2016-12-28 15:44:25 -0800824}
825
Sree Kuchibhotla3131c262016-06-21 17:28:28 -0700826static grpc_error *polling_island_global_init() {
827 grpc_error *error = GRPC_ERROR_NONE;
828
Sree Kuchibhotla3131c262016-06-21 17:28:28 -0700829 error = grpc_wakeup_fd_init(&polling_island_wakeup_fd);
830 if (error == GRPC_ERROR_NONE) {
831 error = grpc_wakeup_fd_wakeup(&polling_island_wakeup_fd);
832 }
833
834 return error;
Sree Kuchibhotlaf448c342016-05-19 10:51:24 -0700835}
836
/* Tears down the global wakeup fd created by polling_island_global_init(). */
static void polling_island_global_shutdown() {
  grpc_wakeup_fd_destroy(&polling_island_wakeup_fd);
}
840
Sree Kuchibhotlaf448c342016-05-19 10:51:24 -0700841/*******************************************************************************
Sree Kuchibhotla0bcbd792016-06-01 15:43:03 -0700842 * Fd Definitions
Sree Kuchibhotlaf448c342016-05-19 10:51:24 -0700843 */
844
Sree Kuchibhotla0bcbd792016-06-01 15:43:03 -0700845/* We need to keep a freelist not because of any concerns of malloc performance
Sree Kuchibhotlaf448c342016-05-19 10:51:24 -0700846 * but instead so that implementations with multiple threads in (for example)
847 * epoll_wait deal with the race between pollset removal and incoming poll
848 * notifications.
849 *
850 * The problem is that the poller ultimately holds a reference to this
851 * object, so it is very difficult to know when is safe to free it, at least
852 * without some expensive synchronization.
853 *
854 * If we keep the object freelisted, in the worst case losing this race just
855 * becomes a spurious read notification on a reused fd.
856 */
Sree Kuchibhotla0bcbd792016-06-01 15:43:03 -0700857
858/* The alarm system needs to be able to wakeup 'some poller' sometimes
859 * (specifically when a new alarm needs to be triggered earlier than the next
860 * alarm 'epoch'). This wakeup_fd gives us something to alert on when such a
861 * case occurs. */
Sree Kuchibhotla9bc3d2d2016-06-06 10:27:56 -0700862
Sree Kuchibhotlaf448c342016-05-19 10:51:24 -0700863static grpc_fd *fd_freelist = NULL;
864static gpr_mu fd_freelist_mu;
865
/* Adds 'n' references to fd. REF_BY/UNREF_BY select the debug or release
   variant depending on GRPC_FD_REF_COUNT_DEBUG. Note the unusual layout: the
   two alternative function headers live inside the #ifdef, while the shared
   body follows the #endif. */
#ifdef GRPC_FD_REF_COUNT_DEBUG
#define REF_BY(fd, n, reason) ref_by(fd, n, reason, __FILE__, __LINE__)
#define UNREF_BY(fd, n, reason) unref_by(fd, n, reason, __FILE__, __LINE__)
static void ref_by(grpc_fd *fd, int n, const char *reason, const char *file,
                   int line) {
  gpr_log(GPR_DEBUG, "FD %d %p ref %d %ld -> %ld [%s; %s:%d]", fd->fd,
          (void *)fd, n, gpr_atm_no_barrier_load(&fd->refst),
          gpr_atm_no_barrier_load(&fd->refst) + n, reason, file, line);
#else
#define REF_BY(fd, n, reason) ref_by(fd, n)
#define UNREF_BY(fd, n, reason) unref_by(fd, n)
static void ref_by(grpc_fd *fd, int n) {
#endif
  /* The fd must already be alive (refst > 0) when taking another ref */
  GPR_ASSERT(gpr_atm_no_barrier_fetch_add(&fd->refst, n) > 0);
}
881
/* Drops 'n' references from fd. When the count hits zero the grpc_fd struct is
   returned to the freelist (not freed -- see the freelist rationale above) and
   its iomgr registration and lock-free event state are torn down. As with
   ref_by, the two alternative headers live inside the #ifdef and the shared
   body follows the #endif. */
#ifdef GRPC_FD_REF_COUNT_DEBUG
static void unref_by(grpc_fd *fd, int n, const char *reason, const char *file,
                     int line) {
  gpr_atm old;
  gpr_log(GPR_DEBUG, "FD %d %p unref %d %ld -> %ld [%s; %s:%d]", fd->fd,
          (void *)fd, n, gpr_atm_no_barrier_load(&fd->refst),
          gpr_atm_no_barrier_load(&fd->refst) - n, reason, file, line);
#else
static void unref_by(grpc_fd *fd, int n) {
  gpr_atm old;
#endif
  old = gpr_atm_full_fetch_add(&fd->refst, -n);
  if (old == n) {
    /* This was the last reference: add the fd to the freelist */
    gpr_mu_lock(&fd_freelist_mu);
    fd->freelist_next = fd_freelist;
    fd_freelist = fd;
    grpc_iomgr_unregister_object(&fd->iomgr_object);

    grpc_lfev_destroy(&fd->read_closure);
    grpc_lfev_destroy(&fd->write_closure);

    gpr_mu_unlock(&fd_freelist_mu);
  } else {
    /* Underflow would mean a ref-counting bug somewhere */
    GPR_ASSERT(old > n);
  }
}
909
/* Public ref/unref for grpc_fd. Increment refcount by two to avoid changing
   the orphan bit (the low bit of refst appears to be reserved for orphan
   tracking -- hence steps of 2; confirm against the refst encoding where it is
   declared). */
#ifdef GRPC_FD_REF_COUNT_DEBUG
static void fd_ref(grpc_fd *fd, const char *reason, const char *file,
                   int line) {
  ref_by(fd, 2, reason, file, line);
}

static void fd_unref(grpc_fd *fd, const char *reason, const char *file,
                     int line) {
  unref_by(fd, 2, reason, file, line);
}
#else
static void fd_ref(grpc_fd *fd) { ref_by(fd, 2); }
static void fd_unref(grpc_fd *fd) { unref_by(fd, 2); }
#endif
925
Sree Kuchibhotla0bcbd792016-06-01 15:43:03 -0700926static void fd_global_init(void) { gpr_mu_init(&fd_freelist_mu); }
927
928static void fd_global_shutdown(void) {
929 gpr_mu_lock(&fd_freelist_mu);
930 gpr_mu_unlock(&fd_freelist_mu);
931 while (fd_freelist != NULL) {
932 grpc_fd *fd = fd_freelist;
933 fd_freelist = fd_freelist->freelist_next;
Sree Kuchibhotlaf6f33d72016-11-18 11:38:52 -0800934 gpr_mu_destroy(&fd->po.mu);
Sree Kuchibhotla0bcbd792016-06-01 15:43:03 -0700935 gpr_free(fd);
936 }
937 gpr_mu_destroy(&fd_freelist_mu);
938}
939
/* Creates a grpc_fd wrapping the OS file descriptor 'fd', reusing a struct
   from the freelist when one is available. The new grpc_fd starts with
   refst == 1, no polling island, and freshly-initialized read/write event
   state. 'name' is used only for iomgr object registration/debug logging. */
static grpc_fd *fd_create(int fd, const char *name) {
  grpc_fd *new_fd = NULL;

  /* Try to reuse a previously-freed grpc_fd first */
  gpr_mu_lock(&fd_freelist_mu);
  if (fd_freelist != NULL) {
    new_fd = fd_freelist;
    fd_freelist = fd_freelist->freelist_next;
  }
  gpr_mu_unlock(&fd_freelist_mu);

  if (new_fd == NULL) {
    new_fd = gpr_malloc(sizeof(grpc_fd));
    gpr_mu_init(&new_fd->po.mu);
  }

  /* Note: It is not really needed to get the new_fd->po.mu lock here. If this
   * is a newly created fd (or an fd we got from the freelist), no one else
   * would be holding a lock to it anyway. */
  gpr_mu_lock(&new_fd->po.mu);
  new_fd->po.pi = NULL;
#ifdef PO_DEBUG
  new_fd->po.obj_type = POLL_OBJ_FD;
#endif

  gpr_atm_rel_store(&new_fd->refst, (gpr_atm)1);
  new_fd->fd = fd;
  new_fd->orphaned = false;
  grpc_lfev_init(&new_fd->read_closure);
  grpc_lfev_init(&new_fd->write_closure);
  gpr_atm_no_barrier_store(&new_fd->read_notifier_pollset, (gpr_atm)NULL);

  new_fd->freelist_next = NULL;
  new_fd->on_done_closure = NULL;

  gpr_mu_unlock(&new_fd->po.mu);

  char *fd_name;
  gpr_asprintf(&fd_name, "%s fd=%d", name, fd);
  grpc_iomgr_register_object(&new_fd->iomgr_object, fd_name);
#ifdef GRPC_FD_REF_COUNT_DEBUG
  gpr_log(GPR_DEBUG, "FD %d %p create %s", fd, (void *)new_fd, fd_name);
#endif
  gpr_free(fd_name);
  return new_fd;
}
985
Sree Kuchibhotla0bcbd792016-06-01 15:43:03 -0700986static int fd_wrapped_fd(grpc_fd *fd) {
987 int ret_fd = -1;
Sree Kuchibhotlaf6f33d72016-11-18 11:38:52 -0800988 gpr_mu_lock(&fd->po.mu);
Sree Kuchibhotla79a62332016-06-04 14:01:03 -0700989 if (!fd->orphaned) {
Sree Kuchibhotla0bcbd792016-06-01 15:43:03 -0700990 ret_fd = fd->fd;
991 }
Sree Kuchibhotlaf6f33d72016-11-18 11:38:52 -0800992 gpr_mu_unlock(&fd->po.mu);
Sree Kuchibhotla0bcbd792016-06-01 15:43:03 -0700993
994 return ret_fd;
995}
996
/* Severs the grpc_fd from its OS file descriptor and from its polling island.
   If release_fd is non-NULL, ownership of the raw fd is handed back to the
   caller through it (the fd is NOT closed here); otherwise the fd is closed.
   'on_done' is scheduled (with any accumulated error) after the fd has been
   removed from its island. */
static void fd_orphan(grpc_exec_ctx *exec_ctx, grpc_fd *fd,
                      grpc_closure *on_done, int *release_fd,
                      const char *reason) {
  bool is_fd_closed = false;
  grpc_error *error = GRPC_ERROR_NONE;
  polling_island *unref_pi = NULL;

  gpr_mu_lock(&fd->po.mu);
  fd->on_done_closure = on_done;

  /* If release_fd is not NULL, we should be relinquishing control of the file
     descriptor fd->fd (but we still own the grpc_fd structure). */
  if (release_fd != NULL) {
    *release_fd = fd->fd;
  } else {
    close(fd->fd);
    is_fd_closed = true;
  }

  fd->orphaned = true;

  /* Remove the active status but keep referenced. We want this grpc_fd struct
     to be alive (and not added to freelist) until the end of this function.
     (Net effect of the +1 here and the -2 below is a single ref drop.) */
  REF_BY(fd, 1, reason);

  /* Remove the fd from the polling island:
     - Get a lock on the latest polling island (i.e the last island in the
       linked list pointed by fd->po.pi). This is the island that
       would actually contain the fd
     - Remove the fd from the latest polling island
     - Unlock the latest polling island
     - Set fd->po.pi to NULL (but remove the ref on the polling island
       before doing this.) */
  if (fd->po.pi != NULL) {
    polling_island *pi_latest = polling_island_lock(fd->po.pi);
    polling_island_remove_fd_locked(pi_latest, fd, is_fd_closed, &error);
    gpr_mu_unlock(&pi_latest->mu);

    unref_pi = fd->po.pi;
    fd->po.pi = NULL;
  }

  grpc_closure_sched(exec_ctx, fd->on_done_closure, GRPC_ERROR_REF(error));

  gpr_mu_unlock(&fd->po.mu);
  UNREF_BY(fd, 2, reason); /* Drop the reference */
  if (unref_pi != NULL) {
    /* Unref stale polling island here, outside the fd lock above.
       The polling island owns a workqueue which owns an fd, and unreffing
       inside the lock can cause an eventual lock loop that makes TSAN very
       unhappy. */
    PI_UNREF(exec_ctx, unref_pi, "fd_orphan");
  }
  if (error != GRPC_ERROR_NONE) {
    const char *msg = grpc_error_string(error);
    gpr_log(GPR_DEBUG, "fd_orphan: %s", msg);
  }
  GRPC_ERROR_UNREF(error);
}
1056
Sree Kuchibhotla5855c472016-06-08 12:56:56 -07001057static grpc_pollset *fd_get_read_notifier_pollset(grpc_exec_ctx *exec_ctx,
1058 grpc_fd *fd) {
Sree Kuchibhotlafb7ced62017-02-15 18:31:57 -08001059 gpr_atm notifier = gpr_atm_acq_load(&fd->read_notifier_pollset);
Sree Kuchibhotla82d73412017-02-09 18:27:45 -08001060 return (grpc_pollset *)notifier;
Sree Kuchibhotla5855c472016-06-08 12:56:56 -07001061}
1062
/* True once fd_shutdown() has marked this fd's read event state shut down
   (only the read side is consulted here). */
static bool fd_is_shutdown(grpc_fd *fd) {
  return grpc_lfev_is_shutdown(&fd->read_closure);
}
1066
/* Might be called multiple times: only the first call (the one that wins the
   read-side shutdown transition) performs the actual shutdown(2) and marks the
   write side. Takes ownership of a ref on 'why'; each grpc_lfev_set_shutdown
   call is given its own ref and the incoming ref is released at the end. */
static void fd_shutdown(grpc_exec_ctx *exec_ctx, grpc_fd *fd, grpc_error *why) {
  if (grpc_lfev_set_shutdown(exec_ctx, &fd->read_closure,
                             GRPC_ERROR_REF(why))) {
    shutdown(fd->fd, SHUT_RDWR);
    grpc_lfev_set_shutdown(exec_ctx, &fd->write_closure, GRPC_ERROR_REF(why));
  }
  GRPC_ERROR_UNREF(why);
}
1076
/* Registers 'closure' with the fd's lock-free read-event state; it will be
   scheduled when a read event is delivered (or immediately if one is already
   pending -- semantics delegated to grpc_lfev_notify_on). */
static void fd_notify_on_read(grpc_exec_ctx *exec_ctx, grpc_fd *fd,
                              grpc_closure *closure) {
  grpc_lfev_notify_on(exec_ctx, &fd->read_closure, closure);
}
1081
/* Registers 'closure' with the fd's lock-free write-event state; scheduling
   semantics are delegated to grpc_lfev_notify_on. */
static void fd_notify_on_write(grpc_exec_ctx *exec_ctx, grpc_fd *fd,
                               grpc_closure *closure) {
  grpc_lfev_notify_on(exec_ctx, &fd->write_closure, closure);
}
1086
Craig Tillerd6ba6192016-06-30 15:42:41 -07001087static grpc_workqueue *fd_get_workqueue(grpc_fd *fd) {
Sree Kuchibhotlaf6f33d72016-11-18 11:38:52 -08001088 gpr_mu_lock(&fd->po.mu);
Sree Kuchibhotla499b94b2016-11-18 14:35:47 -08001089 grpc_workqueue *workqueue =
1090 GRPC_WORKQUEUE_REF((grpc_workqueue *)fd->po.pi, "fd_get_workqueue");
Sree Kuchibhotlaf6f33d72016-11-18 11:38:52 -08001091 gpr_mu_unlock(&fd->po.mu);
Craig Tillerd6ba6192016-06-30 15:42:41 -07001092 return workqueue;
1093}
Craig Tiller70bd4832016-06-30 14:20:46 -07001094
Sree Kuchibhotlaf448c342016-05-19 10:51:24 -07001095/*******************************************************************************
Sree Kuchibhotla0bcbd792016-06-01 15:43:03 -07001096 * Pollset Definitions
Sree Kuchibhotlaf448c342016-05-19 10:51:24 -07001097 */
Sree Kuchibhotla8e4926c2016-06-08 20:33:19 -07001098GPR_TLS_DECL(g_current_thread_pollset);
1099GPR_TLS_DECL(g_current_thread_worker);
Craig Tiller19196992016-06-27 18:45:56 -07001100static __thread bool g_initialized_sigmask;
1101static __thread sigset_t g_orig_sigmask;
Sree Kuchibhotlaf448c342016-05-19 10:51:24 -07001102
/* Handler installed for grpc_wakeup_signal (see poller_kick_init /
   pollset_worker_kick). Deliberately does (almost) nothing: delivering the
   signal is enough to interrupt a blocked poll/epoll_wait in the target
   thread. NOTE(review): gpr_log is not async-signal-safe, so the
   GRPC_EPOLL_DEBUG branch is acceptable only for debugging builds. */
static void sig_handler(int sig_num) {
#ifdef GRPC_EPOLL_DEBUG
  gpr_log(GPR_INFO, "Received signal %d", sig_num);
#endif
}
Sree Kuchibhotlaf448c342016-05-19 10:51:24 -07001108
Sree Kuchibhotlac7be7c62016-06-09 17:08:50 -07001109static void poller_kick_init() { signal(grpc_wakeup_signal, sig_handler); }
Sree Kuchibhotla5855c472016-06-08 12:56:56 -07001110
Sree Kuchibhotla0bcbd792016-06-01 15:43:03 -07001111/* Global state management */
/* Global state management: per-thread slots tracking which pollset/worker the
   current thread is running, plus the wakeup-signal handler. Always succeeds
   (returns GRPC_ERROR_NONE); the error return type matches the engine's init
   signature. */
static grpc_error *pollset_global_init(void) {
  gpr_tls_init(&g_current_thread_pollset);
  gpr_tls_init(&g_current_thread_worker);
  poller_kick_init();
  return GRPC_ERROR_NONE;
}
1118
/* Releases the thread-local slots created in pollset_global_init(). */
static void pollset_global_shutdown(void) {
  gpr_tls_destroy(&g_current_thread_pollset);
  gpr_tls_destroy(&g_current_thread_worker);
}
1123
/* Interrupts a single polling worker by sending grpc_wakeup_signal to its
   thread. The CAS on worker->is_kicked makes kicks idempotent: only the first
   caller to flip 0 -> 1 actually delivers the signal. Returns a GRPC_OS_ERROR
   if pthread_kill fails, GRPC_ERROR_NONE otherwise. */
static grpc_error *pollset_worker_kick(grpc_pollset_worker *worker) {
  grpc_error *err = GRPC_ERROR_NONE;

  /* Kick the worker only if it was not already kicked */
  if (gpr_atm_no_barrier_cas(&worker->is_kicked, (gpr_atm)0, (gpr_atm)1)) {
    GRPC_POLLING_TRACE(
        "pollset_worker_kick: Kicking worker: %p (thread id: %ld)",
        (void *)worker, (long int)worker->pt_id);
    int err_num = pthread_kill(worker->pt_id, grpc_wakeup_signal);
    if (err_num != 0) {
      err = GRPC_OS_ERROR(err_num, "pthread_kill");
    }
  }
  return err;
}
1139
/* Return 1 if the pollset has active threads in pollset_work (pollset must
 * be locked). root_worker is a sentinel node: the worker list is empty exactly
 * when the sentinel points back at itself. */
static int pollset_has_workers(grpc_pollset *p) {
  return p->root_worker.next != &p->root_worker;
}
Sree Kuchibhotlaf448c342016-05-19 10:51:24 -07001145
/* Unlinks 'worker' from the doubly-linked worker list. The pollset parameter
   is unused in the body; presumably kept for symmetry with the push_*_worker
   helpers. Caller must hold the pollset lock. */
static void remove_worker(grpc_pollset *p, grpc_pollset_worker *worker) {
  worker->prev->next = worker->next;
  worker->next->prev = worker->prev;
}
1150
Sree Kuchibhotlaf448c342016-05-19 10:51:24 -07001151static grpc_pollset_worker *pop_front_worker(grpc_pollset *p) {
1152 if (pollset_has_workers(p)) {
1153 grpc_pollset_worker *w = p->root_worker.next;
1154 remove_worker(p, w);
1155 return w;
1156 } else {
1157 return NULL;
1158 }
1159}
1160
1161static void push_back_worker(grpc_pollset *p, grpc_pollset_worker *worker) {
1162 worker->next = &p->root_worker;
1163 worker->prev = worker->next->prev;
1164 worker->prev->next = worker->next->prev = worker;
1165}
1166
1167static void push_front_worker(grpc_pollset *p, grpc_pollset_worker *worker) {
1168 worker->prev = &p->root_worker;
1169 worker->next = worker->prev->next;
1170 worker->prev->next = worker->next->prev = worker;
1171}
1172
Sree Kuchibhotla0bcbd792016-06-01 15:43:03 -07001173/* p->mu must be held before calling this function */
Sree Kuchibhotla3131c262016-06-21 17:28:28 -07001174static grpc_error *pollset_kick(grpc_pollset *p,
1175 grpc_pollset_worker *specific_worker) {
Sree Kuchibhotla0bcbd792016-06-01 15:43:03 -07001176 GPR_TIMER_BEGIN("pollset_kick", 0);
Sree Kuchibhotla3131c262016-06-21 17:28:28 -07001177 grpc_error *error = GRPC_ERROR_NONE;
Sree Kuchibhotla20d0a162016-06-23 15:14:03 -07001178 const char *err_desc = "Kick Failure";
Sree Kuchibhotla0bcbd792016-06-01 15:43:03 -07001179 grpc_pollset_worker *worker = specific_worker;
1180 if (worker != NULL) {
1181 if (worker == GRPC_POLLSET_KICK_BROADCAST) {
Sree Kuchibhotla0bcbd792016-06-01 15:43:03 -07001182 if (pollset_has_workers(p)) {
Sree Kuchibhotla79a62332016-06-04 14:01:03 -07001183 GPR_TIMER_BEGIN("pollset_kick.broadcast", 0);
Sree Kuchibhotla0bcbd792016-06-01 15:43:03 -07001184 for (worker = p->root_worker.next; worker != &p->root_worker;
1185 worker = worker->next) {
Sree Kuchibhotla8e4926c2016-06-08 20:33:19 -07001186 if (gpr_tls_get(&g_current_thread_worker) != (intptr_t)worker) {
Sree Kuchibhotla20d0a162016-06-23 15:14:03 -07001187 append_error(&error, pollset_worker_kick(worker), err_desc);
Sree Kuchibhotla8e4926c2016-06-08 20:33:19 -07001188 }
Sree Kuchibhotlaf448c342016-05-19 10:51:24 -07001189 }
Craig Tillera218a062016-06-26 09:58:37 -07001190 GPR_TIMER_END("pollset_kick.broadcast", 0);
Sree Kuchibhotla0bcbd792016-06-01 15:43:03 -07001191 } else {
1192 p->kicked_without_pollers = true;
Sree Kuchibhotlaf448c342016-05-19 10:51:24 -07001193 }
Sree Kuchibhotla0bcbd792016-06-01 15:43:03 -07001194 } else {
1195 GPR_TIMER_MARK("kicked_specifically", 0);
Sree Kuchibhotla8e4926c2016-06-08 20:33:19 -07001196 if (gpr_tls_get(&g_current_thread_worker) != (intptr_t)worker) {
Sree Kuchibhotla20d0a162016-06-23 15:14:03 -07001197 append_error(&error, pollset_worker_kick(worker), err_desc);
Sree Kuchibhotla8e4926c2016-06-08 20:33:19 -07001198 }
Sree Kuchibhotla0bcbd792016-06-01 15:43:03 -07001199 }
Sree Kuchibhotla8e4926c2016-06-08 20:33:19 -07001200 } else if (gpr_tls_get(&g_current_thread_pollset) != (intptr_t)p) {
1201 /* Since worker == NULL, it means that we can kick "any" worker on this
1202 pollset 'p'. If 'p' happens to be the same pollset this thread is
1203 currently polling (i.e in pollset_work() function), then there is no need
1204 to kick any other worker since the current thread can just absorb the
1205 kick. This is the reason why we enter this case only when
1206 g_current_thread_pollset is != p */
1207
Sree Kuchibhotla0bcbd792016-06-01 15:43:03 -07001208 GPR_TIMER_MARK("kick_anonymous", 0);
1209 worker = pop_front_worker(p);
1210 if (worker != NULL) {
1211 GPR_TIMER_MARK("finally_kick", 0);
1212 push_back_worker(p, worker);
Sree Kuchibhotla20d0a162016-06-23 15:14:03 -07001213 append_error(&error, pollset_worker_kick(worker), err_desc);
Sree Kuchibhotlaf448c342016-05-19 10:51:24 -07001214 } else {
1215 GPR_TIMER_MARK("kicked_no_pollers", 0);
Sree Kuchibhotla0bcbd792016-06-01 15:43:03 -07001216 p->kicked_without_pollers = true;
Sree Kuchibhotlaf448c342016-05-19 10:51:24 -07001217 }
1218 }
1219
Sree Kuchibhotla0bcbd792016-06-01 15:43:03 -07001220 GPR_TIMER_END("pollset_kick", 0);
Sree Kuchibhotla3131c262016-06-21 17:28:28 -07001221 GRPC_LOG_IF_ERROR("pollset_kick", GRPC_ERROR_REF(error));
1222 return error;
Sree Kuchibhotlaf448c342016-05-19 10:51:24 -07001223}
1224
Sree Kuchibhotlaf448c342016-05-19 10:51:24 -07001225static void pollset_init(grpc_pollset *pollset, gpr_mu **mu) {
Sree Kuchibhotlaf6f33d72016-11-18 11:38:52 -08001226 gpr_mu_init(&pollset->po.mu);
1227 *mu = &pollset->po.mu;
1228 pollset->po.pi = NULL;
Sree Kuchibhotlaa0749a62016-11-18 20:22:09 -08001229#ifdef PO_DEBUG
1230 pollset->po.obj_type = POLL_OBJ_POLLSET;
1231#endif
Sree Kuchibhotla0bcbd792016-06-01 15:43:03 -07001232
Sree Kuchibhotlaf448c342016-05-19 10:51:24 -07001233 pollset->root_worker.next = pollset->root_worker.prev = &pollset->root_worker;
Sree Kuchibhotla0bcbd792016-06-01 15:43:03 -07001234 pollset->kicked_without_pollers = false;
1235
1236 pollset->shutting_down = false;
1237 pollset->finish_shutdown_called = false;
1238 pollset->shutdown_done = NULL;
Sree Kuchibhotlaf448c342016-05-19 10:51:24 -07001239}
1240
Sree Kuchibhotla0bcbd792016-06-01 15:43:03 -07001241/* Convert a timespec to milliseconds:
1242 - Very small or negative poll times are clamped to zero to do a non-blocking
1243 poll (which becomes spin polling)
1244 - Other small values are rounded up to one millisecond
1245 - Longer than a millisecond polls are rounded up to the next nearest
1246 millisecond to avoid spinning
1247 - Infinite timeouts are converted to -1 */
Sree Kuchibhotlaf448c342016-05-19 10:51:24 -07001248static int poll_deadline_to_millis_timeout(gpr_timespec deadline,
1249 gpr_timespec now) {
1250 gpr_timespec timeout;
1251 static const int64_t max_spin_polling_us = 10;
1252 if (gpr_time_cmp(deadline, gpr_inf_future(deadline.clock_type)) == 0) {
1253 return -1;
1254 }
Sree Kuchibhotla0bcbd792016-06-01 15:43:03 -07001255
Sree Kuchibhotlaf448c342016-05-19 10:51:24 -07001256 if (gpr_time_cmp(deadline, gpr_time_add(now, gpr_time_from_micros(
1257 max_spin_polling_us,
1258 GPR_TIMESPAN))) <= 0) {
1259 return 0;
1260 }
1261 timeout = gpr_time_sub(deadline, now);
Craig Tiller799e7e82017-03-27 12:42:34 -07001262 int millis = gpr_time_to_millis(gpr_time_add(
1263 timeout, gpr_time_from_nanos(GPR_NS_PER_MS - 1, GPR_TIMESPAN)));
1264 return millis >= 1 ? millis : 1;
Sree Kuchibhotlaf448c342016-05-19 10:51:24 -07001265}
1266
/* Mark fd's read event ready and record 'notifier' as the pollset that
   observed the readability (read back via an acquire load elsewhere) */
static void fd_become_readable(grpc_exec_ctx *exec_ctx, grpc_fd *fd,
                               grpc_pollset *notifier) {
  grpc_lfev_set_ready(exec_ctx, &fd->read_closure);

  /* Note, it is possible that fd_become_readable might be called twice with
     different 'notifier's when an fd becomes readable and it is in two epoll
     sets (This can happen briefly during polling island merges). In such cases
     it does not really matter which notifer is set as the read_notifier_pollset
     (They would both point to the same polling island anyway) */
  /* Use release store to match with acquire load in fd_get_read_notifier */
  gpr_atm_rel_store(&fd->read_notifier_pollset, (gpr_atm)notifier);
}
1279
/* Mark fd's write event ready (no notifier pollset is tracked for writes) */
static void fd_become_writable(grpc_exec_ctx *exec_ctx, grpc_fd *fd) {
  grpc_lfev_set_ready(exec_ctx, &fd->write_closure);
}
1283
Craig Tillerb39307d2016-06-30 15:39:13 -07001284static void pollset_release_polling_island(grpc_exec_ctx *exec_ctx,
1285 grpc_pollset *ps, char *reason) {
Sree Kuchibhotlaf6f33d72016-11-18 11:38:52 -08001286 if (ps->po.pi != NULL) {
1287 PI_UNREF(exec_ctx, ps->po.pi, reason);
Sree Kuchibhotla8e4926c2016-06-08 20:33:19 -07001288 }
Sree Kuchibhotlaf6f33d72016-11-18 11:38:52 -08001289 ps->po.pi = NULL;
Sree Kuchibhotla8e4926c2016-06-08 20:33:19 -07001290}
1291
/* Complete the pollset shutdown sequence: release the polling island ref and
   schedule the shutdown_done closure. Must be called with pollset->po.mu held
   and only after the last worker has left (asserted below). */
static void finish_shutdown_locked(grpc_exec_ctx *exec_ctx,
                                   grpc_pollset *pollset) {
  /* The pollset cannot have any workers if we are at this stage */
  GPR_ASSERT(!pollset_has_workers(pollset));

  pollset->finish_shutdown_called = true;

  /* Release the ref and set pollset->po.pi to NULL */
  pollset_release_polling_island(exec_ctx, pollset, "ps_shutdown");
  grpc_closure_sched(exec_ctx, pollset->shutdown_done, GRPC_ERROR_NONE);
}
1303
/* Begin shutting down the pollset: 'closure' runs once shutdown completes.
   pollset->po.mu lock must be held by the caller before calling this. */
static void pollset_shutdown(grpc_exec_ctx *exec_ctx, grpc_pollset *pollset,
                             grpc_closure *closure) {
  GPR_TIMER_BEGIN("pollset_shutdown", 0);
  GPR_ASSERT(!pollset->shutting_down);
  pollset->shutting_down = true;
  pollset->shutdown_done = closure;
  /* Wake every worker so it can observe shutting_down and exit.
     NOTE(review): the grpc_error returned by pollset_kick is discarded here;
     confirm this cannot leak an error ref on kick failure */
  pollset_kick(pollset, GRPC_POLLSET_KICK_BROADCAST);

  /* If the pollset has any workers, we cannot call finish_shutdown_locked()
     because it would release the underlying polling island. In such a case, we
     let the last worker call finish_shutdown_locked() from pollset_work() */
  if (!pollset_has_workers(pollset)) {
    GPR_ASSERT(!pollset->finish_shutdown_called);
    GPR_TIMER_MARK("pollset_shutdown.finish_shutdown_locked", 0);
    finish_shutdown_locked(exec_ctx, pollset);
  }
  GPR_TIMER_END("pollset_shutdown", 0);
}
1323
/* pollset_shutdown is guaranteed to be called before pollset_destroy. So other
 * than destroying the mutexes, there is nothing special that needs to be done
 * here (exec_ctx is unused on this path) */
static void pollset_destroy(grpc_exec_ctx *exec_ctx, grpc_pollset *pollset) {
  GPR_ASSERT(!pollset_has_workers(pollset));
  gpr_mu_destroy(&pollset->po.mu);
}
1331
/* Try to pop and run exactly one closure from the polling island's MPSC
   workqueue. Returns true iff a closure was executed. Only one thread drains
   the queue at a time (workqueue_read_mu); if the trylock fails we simply
   report "no work done" and let the lock holder drain. */
static bool maybe_do_workqueue_work(grpc_exec_ctx *exec_ctx,
                                    polling_island *pi) {
  if (gpr_mu_trylock(&pi->workqueue_read_mu)) {
    gpr_mpscq_node *n = gpr_mpscq_pop(&pi->workqueue_items);
    gpr_mu_unlock(&pi->workqueue_read_mu);
    if (n != NULL) {
      /* Decrement the pending count; if items remain, wake another poller so
         the queue keeps draining while we run this callback */
      gpr_atm remaining =
          gpr_atm_full_fetch_add(&pi->workqueue_item_count, -1) - 1;
      GRPC_POLLING_TRACE(
          "maybe_do_workqueue_work: pi: %p: got closure %p, remaining = "
          "%" PRIdPTR,
          pi, n, remaining);
      if (remaining > 0) {
        workqueue_maybe_wakeup(pi);
      }
      /* The queue node is embedded at the head of grpc_closure, so the cast
         recovers the closure; this thread takes ownership of its error ref */
      grpc_closure *c = (grpc_closure *)n;
      grpc_error *error = c->error_data.error;
#ifndef NDEBUG
      c->scheduled = false;
#endif
      c->cb(exec_ctx, c->cb_arg, error);
      GRPC_ERROR_UNREF(error);
      return true;
    } else if (gpr_atm_no_barrier_load(&pi->workqueue_item_count) > 0) {
      /* n == NULL might mean there's work but it's not available to be popped
       * yet - try to ensure another workqueue wakes up to check shortly if so
       */
      GRPC_POLLING_TRACE(
          "maybe_do_workqueue_work: pi: %p: more to do, but not yet", pi);
      workqueue_maybe_wakeup(pi);
    }
  } else {
    GRPC_POLLING_TRACE("maybe_do_workqueue_work: pi: %p: read already locked",
                       pi);
  }
  return false;
}
1369
#define GRPC_EPOLL_MAX_EVENTS 100
/* Run one polling iteration for 'pollset' and release pollset->po.mu (which
   must be held on entry). Either drains one workqueue closure, or does an
   epoll_pwait() on the pollset's polling island and dispatches fd readiness.
   Note: sig_mask contains the signal mask to use *during* epoll_wait().
   Failures are accumulated into '*error'. */
static void pollset_work_and_unlock(grpc_exec_ctx *exec_ctx,
                                    grpc_pollset *pollset,
                                    grpc_pollset_worker *worker, int timeout_ms,
                                    sigset_t *sig_mask, grpc_error **error) {
  struct epoll_event ep_ev[GRPC_EPOLL_MAX_EVENTS];
  int epoll_fd = -1;
  int ep_rv;
  polling_island *pi = NULL;
  char *err_msg;
  const char *err_desc = "pollset_work_and_unlock";
  GPR_TIMER_BEGIN("pollset_work_and_unlock", 0);

  /* We need to get the epoll_fd to wait on. The epoll_fd is in inside the
     latest polling island pointed by pollset->po.pi

     Since epoll_fd is immutable, we can read it without obtaining the polling
     island lock. There is however a possibility that the polling island (from
     which we got the epoll_fd) got merged with another island while we are
     in this function. This is still okay because in such a case, we will wakeup
     right-away from epoll_wait() and pick up the latest polling_island the next
     this function (i.e pollset_work_and_unlock()) is called */

  if (pollset->po.pi == NULL) {
    /* Lazily create a polling island on first use of this pollset */
    pollset->po.pi = polling_island_create(exec_ctx, NULL, error);
    if (pollset->po.pi == NULL) {
      GPR_TIMER_END("pollset_work_and_unlock", 0);
      return; /* Fatal error. We cannot continue */
    }

    PI_ADD_REF(pollset->po.pi, "ps");
    GRPC_POLLING_TRACE("pollset_work: pollset: %p created new pi: %p",
                       (void *)pollset, (void *)pollset->po.pi);
  }

  pi = polling_island_maybe_get_latest(pollset->po.pi);
  epoll_fd = pi->epoll_fd;

  /* Update the pollset->po.pi since the island being pointed by
     pollset->po.pi maybe older than the one pointed by pi) */
  if (pollset->po.pi != pi) {
    /* Always do PI_ADD_REF before PI_UNREF because PI_UNREF may cause the
       polling island to be deleted */
    PI_ADD_REF(pi, "ps");
    PI_UNREF(exec_ctx, pollset->po.pi, "ps");
    pollset->po.pi = pi;
  }

  /* Add an extra ref so that the island does not get destroyed (which means
     the epoll_fd won't be closed) while we are are doing an epoll_wait() on the
     epoll_fd */
  PI_ADD_REF(pi, "ps_work");
  gpr_mu_unlock(&pollset->po.mu);

  /* If we get some workqueue work to do, it might end up completing an item on
     the completion queue, so there's no need to poll... so we skip that and
     redo the complete loop to verify */
  GRPC_POLLING_TRACE("pollset_work: pollset: %p, worker %p, pi %p", pollset,
                     worker, pi);
  if (!maybe_do_workqueue_work(exec_ctx, pi)) {
    GRPC_POLLING_TRACE("pollset_work: begins");
    gpr_atm_no_barrier_fetch_add(&pi->poller_count, 1);
    g_current_thread_polling_island = pi;

    GRPC_SCHEDULING_START_BLOCKING_REGION;
    /* epoll_pwait atomically swaps in sig_mask so the wakeup signal can
       interrupt the wait (that is how kicks work in this engine) */
    ep_rv = epoll_pwait(epoll_fd, ep_ev, GRPC_EPOLL_MAX_EVENTS, timeout_ms,
                        sig_mask);
    GRPC_SCHEDULING_END_BLOCKING_REGION;
    if (ep_rv < 0) {
      if (errno != EINTR) {
        gpr_asprintf(&err_msg,
                     "epoll_wait() epoll fd: %d failed with error: %d (%s)",
                     epoll_fd, errno, strerror(errno));
        /* NOTE(review): err_msg allocated by gpr_asprintf appears to be handed
           to GRPC_OS_ERROR; confirm ownership transfers (else this leaks) */
        append_error(error, GRPC_OS_ERROR(errno, err_msg), err_desc);
      } else {
        /* We were interrupted. Save an interation by doing a zero timeout
           epoll_wait to see if there are any other events of interest */
        GRPC_POLLING_TRACE(
            "pollset_work: pollset: %p, worker: %p received kick",
            (void *)pollset, (void *)worker);
        ep_rv = epoll_wait(epoll_fd, ep_ev, GRPC_EPOLL_MAX_EVENTS, 0);
      }
    }

#ifdef GRPC_TSAN
    /* See the definition of g_poll_sync for more details */
    gpr_atm_acq_load(&g_epoll_sync);
#endif /* defined(GRPC_TSAN) */

    for (int i = 0; i < ep_rv; ++i) {
      void *data_ptr = ep_ev[i].data.ptr;
      if (data_ptr == &pi->workqueue_wakeup_fd) {
        /* Wakeup fd for the island's workqueue: consume it and try to drain
           one closure */
        append_error(error,
                     grpc_wakeup_fd_consume_wakeup(&pi->workqueue_wakeup_fd),
                     err_desc);
        maybe_do_workqueue_work(exec_ctx, pi);
      } else if (data_ptr == &polling_island_wakeup_fd) {
        GRPC_POLLING_TRACE(
            "pollset_work: pollset: %p, worker: %p polling island (epoll_fd: "
            "%d) got merged",
            (void *)pollset, (void *)worker, epoll_fd);
        /* This means that our polling island is merged with a different
           island. We do not have to do anything here since the subsequent call
           to the function pollset_work_and_unlock() will pick up the correct
           epoll_fd */
      } else {
        /* A real fd event: EPOLLERR/EPOLLHUP ('cancel') wakes both read and
           write paths so pending closures observe the failure */
        grpc_fd *fd = data_ptr;
        int cancel = ep_ev[i].events & (EPOLLERR | EPOLLHUP);
        int read_ev = ep_ev[i].events & (EPOLLIN | EPOLLPRI);
        int write_ev = ep_ev[i].events & EPOLLOUT;
        if (read_ev || cancel) {
          fd_become_readable(exec_ctx, fd, pollset);
        }
        if (write_ev || cancel) {
          fd_become_writable(exec_ctx, fd);
        }
      }
    }

    g_current_thread_polling_island = NULL;
    gpr_atm_no_barrier_fetch_add(&pi->poller_count, -1);
    GRPC_POLLING_TRACE("pollset_work: ends");
  }

  GPR_ASSERT(pi != NULL);

  /* Before leaving, release the extra ref we added to the polling island. It
     is important to use "pi" here (i.e our old copy of pollset->po.pi
     that we got before releasing the polling island lock). This is because
     pollset->po.pi pointer might get udpated in other parts of the
     code when there is an island merge while we are doing epoll_wait() above */
  PI_UNREF(exec_ctx, pi, "ps_work");

  GPR_TIMER_END("pollset_work_and_unlock", 0);
}
1506
/* pollset->po.mu lock must be held by the caller before calling this.
   The function pollset_work() may temporarily release the lock (pollset->po.mu)
   during the course of its execution but it will always re-acquire the lock and
   ensure that it is held by the time the function returns */
static grpc_error *pollset_work(grpc_exec_ctx *exec_ctx, grpc_pollset *pollset,
                                grpc_pollset_worker **worker_hdl,
                                gpr_timespec now, gpr_timespec deadline) {
  GPR_TIMER_BEGIN("pollset_work", 0);
  grpc_error *error = GRPC_ERROR_NONE;
  int timeout_ms = poll_deadline_to_millis_timeout(deadline, now);

  sigset_t new_mask;

  /* The worker lives on this thread's stack; it is only linked into the
     pollset's list for the duration of this call */
  grpc_pollset_worker worker;
  worker.next = worker.prev = NULL;
  worker.pt_id = pthread_self();
  gpr_atm_no_barrier_store(&worker.is_kicked, (gpr_atm)0);

  if (worker_hdl) *worker_hdl = &worker;

  /* Publish thread-local identity so pollset_kick can avoid signaling the
     calling thread's own worker/pollset */
  gpr_tls_set(&g_current_thread_pollset, (intptr_t)pollset);
  gpr_tls_set(&g_current_thread_worker, (intptr_t)&worker);

  if (pollset->kicked_without_pollers) {
    /* If the pollset was kicked without pollers, pretend that the current
       worker got the kick and skip polling. A kick indicates that there is some
       work that needs attention like an event on the completion queue or an
       alarm */
    GPR_TIMER_MARK("pollset_work.kicked_without_pollers", 0);
    pollset->kicked_without_pollers = 0;
  } else if (!pollset->shutting_down) {
    /* We use the posix-signal with number 'grpc_wakeup_signal' for waking up
       (i.e 'kicking') a worker in the pollset. A 'kick' is a way to inform the
       worker that there is some pending work that needs immediate attention
       (like an event on the completion queue, or a polling island merge that
       results in a new epoll-fd to wait on) and that the worker should not
       spend time waiting in epoll_pwait().

       A worker can be kicked anytime from the point it is added to the pollset
       via push_front_worker() (or push_back_worker()) to the point it is
       removed via remove_worker().
       If the worker is kicked before/during it calls epoll_pwait(), it should
       immediately exit from epoll_wait(). If the worker is kicked after it
       returns from epoll_wait(), then nothing really needs to be done.

       To accomplish this, we mask 'grpc_wakeup_signal' on this thread at all
       times *except* when it is in epoll_pwait(). This way, the worker never
       misses acting on a kick */

    if (!g_initialized_sigmask) {
      /* One-time, per-thread-ish setup of the two signal masks.
         NOTE(review): g_initialized_sigmask/g_orig_sigmask look like globals
         guarding a per-thread pthread_sigmask — confirm they are thread-local
         (declared above this chunk) */
      sigemptyset(&new_mask);
      sigaddset(&new_mask, grpc_wakeup_signal);
      pthread_sigmask(SIG_BLOCK, &new_mask, &g_orig_sigmask);
      sigdelset(&g_orig_sigmask, grpc_wakeup_signal);
      g_initialized_sigmask = true;
      /* new_mask: The new thread mask which blocks 'grpc_wakeup_signal'.
         This is the mask used at all times *except during
         epoll_wait()*"
         g_orig_sigmask: The thread mask which allows 'grpc_wakeup_signal' and
         this is the mask to use *during epoll_wait()*

         The new_mask is set on the worker before it is added to the pollset
         (i.e before it can be kicked) */
    }

    push_front_worker(pollset, &worker); /* Add worker to pollset */

    /* Releases pollset->po.mu during the epoll wait */
    pollset_work_and_unlock(exec_ctx, pollset, &worker, timeout_ms,
                            &g_orig_sigmask, &error);
    grpc_exec_ctx_flush(exec_ctx);

    gpr_mu_lock(&pollset->po.mu);

    /* Note: There is no need to reset worker.is_kicked to 0 since we are no
       longer going to use this worker */
    remove_worker(pollset, &worker);
  }

  /* If we are the last worker on the pollset (i.e pollset_has_workers() is
     false at this point) and the pollset is shutting down, we may have to
     finish the shutdown process by calling finish_shutdown_locked().
     See pollset_shutdown() for more details.

     Note: Continuing to access pollset here is safe; it is the caller's
     responsibility to not destroy a pollset when it has outstanding calls to
     pollset_work() */
  if (pollset->shutting_down && !pollset_has_workers(pollset) &&
      !pollset->finish_shutdown_called) {
    GPR_TIMER_MARK("pollset_work.finish_shutdown_locked", 0);
    finish_shutdown_locked(exec_ctx, pollset);

    /* Drop the lock while flushing so scheduled closures can run */
    gpr_mu_unlock(&pollset->po.mu);
    grpc_exec_ctx_flush(exec_ctx);
    gpr_mu_lock(&pollset->po.mu);
  }

  if (worker_hdl) *worker_hdl = NULL;

  gpr_tls_set(&g_current_thread_pollset, (intptr_t)0);
  gpr_tls_set(&g_current_thread_worker, (intptr_t)0);

  GPR_TIMER_END("pollset_work", 0);

  GRPC_LOG_IF_ERROR("pollset_work", GRPC_ERROR_REF(error));
  return error;
}
1613
Sree Kuchibhotlaf6f33d72016-11-18 11:38:52 -08001614static void add_poll_object(grpc_exec_ctx *exec_ctx, poll_obj *bag,
Sree Kuchibhotlaa0749a62016-11-18 20:22:09 -08001615 poll_obj_type bag_type, poll_obj *item,
Sree Kuchibhotlaf6f33d72016-11-18 11:38:52 -08001616 poll_obj_type item_type) {
1617 GPR_TIMER_BEGIN("add_poll_object", 0);
1618
Sree Kuchibhotlaa0749a62016-11-18 20:22:09 -08001619#ifdef PO_DEBUG
1620 GPR_ASSERT(item->obj_type == item_type);
1621 GPR_ASSERT(bag->obj_type == bag_type);
1622#endif
Craig Tiller57726ca2016-09-12 11:59:45 -07001623
Sree Kuchibhotla20d0a162016-06-23 15:14:03 -07001624 grpc_error *error = GRPC_ERROR_NONE;
Sree Kuchibhotla9442bab2016-05-20 17:54:06 -07001625 polling_island *pi_new = NULL;
Sree Kuchibhotlaf448c342016-05-19 10:51:24 -07001626
Sree Kuchibhotlaf6f33d72016-11-18 11:38:52 -08001627 gpr_mu_lock(&bag->mu);
1628 gpr_mu_lock(&item->mu);
1629
Craig Tiller7212c232016-07-06 13:11:09 -07001630retry:
Sree Kuchibhotlaf6f33d72016-11-18 11:38:52 -08001631 /*
1632 * 1) If item->pi and bag->pi are both non-NULL and equal, do nothing
1633 * 2) If item->pi and bag->pi are both NULL, create a new polling island (with
1634 * a refcount of 2) and point item->pi and bag->pi to the new island
1635 * 3) If exactly one of item->pi or bag->pi is NULL, update it to point to
1636 * the other's non-NULL pi
1637 * 4) Finally if item->pi and bag-pi are non-NULL and not-equal, merge the
1638 * polling islands and update item->pi and bag->pi to point to the new
1639 * island
Sree Kuchibhotla0bcbd792016-06-01 15:43:03 -07001640 */
Craig Tiller8e8027b2016-07-07 10:42:09 -07001641
Sree Kuchibhotlaf6f33d72016-11-18 11:38:52 -08001642 /* Early out if we are trying to add an 'fd' to a 'bag' but the fd is already
1643 * orphaned */
1644 if (item_type == POLL_OBJ_FD && (FD_FROM_PO(item))->orphaned) {
1645 gpr_mu_unlock(&item->mu);
1646 gpr_mu_unlock(&bag->mu);
Craig Tiller42ac6db2016-07-06 17:13:56 -07001647 return;
1648 }
1649
Sree Kuchibhotlaf6f33d72016-11-18 11:38:52 -08001650 if (item->pi == bag->pi) {
1651 pi_new = item->pi;
Sree Kuchibhotla9442bab2016-05-20 17:54:06 -07001652 if (pi_new == NULL) {
Sree Kuchibhotlaf6f33d72016-11-18 11:38:52 -08001653 /* GPR_ASSERT(item->pi == bag->pi == NULL) */
Sree Kuchibhotlaa129adf2016-10-26 16:44:44 -07001654
Sree Kuchibhotlaf6f33d72016-11-18 11:38:52 -08001655 /* If we are adding an fd to a bag (i.e pollset or pollset_set), then
1656 * we need to do some extra work to make TSAN happy */
1657 if (item_type == POLL_OBJ_FD) {
1658 /* Unlock before creating a new polling island: the polling island will
1659 create a workqueue which creates a file descriptor, and holding an fd
1660 lock here can eventually cause a loop to appear to TSAN (making it
1661 unhappy). We don't think it's a real loop (there's an epoch point
1662 where that loop possibility disappears), but the advantages of
1663 keeping TSAN happy outweigh any performance advantage we might have
1664 by keeping the lock held. */
1665 gpr_mu_unlock(&item->mu);
1666 pi_new = polling_island_create(exec_ctx, FD_FROM_PO(item), &error);
1667 gpr_mu_lock(&item->mu);
Sree Kuchibhotlaa129adf2016-10-26 16:44:44 -07001668
Sree Kuchibhotlaf6f33d72016-11-18 11:38:52 -08001669 /* Need to reverify any assumptions made between the initial lock and
1670 getting to this branch: if they've changed, we need to throw away our
1671 work and figure things out again. */
1672 if (item->pi != NULL) {
Sree Kuchibhotla499b94b2016-11-18 14:35:47 -08001673 GRPC_POLLING_TRACE(
1674 "add_poll_object: Raced creating new polling island. pi_new: %p "
1675 "(fd: %d, %s: %p)",
1676 (void *)pi_new, FD_FROM_PO(item)->fd, poll_obj_string(bag_type),
1677 (void *)bag);
Sree Kuchibhotlaf6f33d72016-11-18 11:38:52 -08001678 /* No need to lock 'pi_new' here since this is a new polling island
Sree Kuchibhotla8214b872017-02-23 12:49:48 -08001679 and no one has a reference to it yet */
Sree Kuchibhotlaf6f33d72016-11-18 11:38:52 -08001680 polling_island_remove_all_fds_locked(pi_new, true, &error);
1681
1682 /* Ref and unref so that the polling island gets deleted during unref
1683 */
1684 PI_ADD_REF(pi_new, "dance_of_destruction");
1685 PI_UNREF(exec_ctx, pi_new, "dance_of_destruction");
1686 goto retry;
1687 }
Craig Tiller27da6422016-07-06 13:14:46 -07001688 } else {
Sree Kuchibhotlaf6f33d72016-11-18 11:38:52 -08001689 pi_new = polling_island_create(exec_ctx, NULL, &error);
Craig Tiller7212c232016-07-06 13:11:09 -07001690 }
Sree Kuchibhotla499b94b2016-11-18 14:35:47 -08001691
1692 GRPC_POLLING_TRACE(
1693 "add_poll_object: Created new polling island. pi_new: %p (%s: %p, "
1694 "%s: %p)",
1695 (void *)pi_new, poll_obj_string(item_type), (void *)item,
1696 poll_obj_string(bag_type), (void *)bag);
1697 } else {
1698 GRPC_POLLING_TRACE(
1699 "add_poll_object: Same polling island. pi: %p (%s, %s)",
1700 (void *)pi_new, poll_obj_string(item_type),
1701 poll_obj_string(bag_type));
Sree Kuchibhotlaf448c342016-05-19 10:51:24 -07001702 }
Sree Kuchibhotlaf6f33d72016-11-18 11:38:52 -08001703 } else if (item->pi == NULL) {
1704 /* GPR_ASSERT(bag->pi != NULL) */
1705 /* Make pi_new point to latest pi*/
1706 pi_new = polling_island_lock(bag->pi);
Sree Kuchibhotla1e776682016-06-28 14:09:26 -07001707
Sree Kuchibhotlaf6f33d72016-11-18 11:38:52 -08001708 if (item_type == POLL_OBJ_FD) {
1709 grpc_fd *fd = FD_FROM_PO(item);
1710 polling_island_add_fds_locked(pi_new, &fd, 1, true, &error);
1711 }
Sree Kuchibhotla1e776682016-06-28 14:09:26 -07001712
Sree Kuchibhotlaf6f33d72016-11-18 11:38:52 -08001713 gpr_mu_unlock(&pi_new->mu);
Sree Kuchibhotla1e776682016-06-28 14:09:26 -07001714 GRPC_POLLING_TRACE(
Sree Kuchibhotla499b94b2016-11-18 14:35:47 -08001715 "add_poll_obj: item->pi was NULL. pi_new: %p (item(%s): %p, "
1716 "bag(%s): %p)",
1717 (void *)pi_new, poll_obj_string(item_type), (void *)item,
1718 poll_obj_string(bag_type), (void *)bag);
Sree Kuchibhotlaf6f33d72016-11-18 11:38:52 -08001719 } else if (bag->pi == NULL) {
1720 /* GPR_ASSERT(item->pi != NULL) */
1721 /* Make pi_new to point to latest pi */
1722 pi_new = polling_island_lock(item->pi);
1723 gpr_mu_unlock(&pi_new->mu);
Sree Kuchibhotla499b94b2016-11-18 14:35:47 -08001724 GRPC_POLLING_TRACE(
1725 "add_poll_obj: bag->pi was NULL. pi_new: %p (item(%s): %p, "
1726 "bag(%s): %p)",
1727 (void *)pi_new, poll_obj_string(item_type), (void *)item,
1728 poll_obj_string(bag_type), (void *)bag);
Sree Kuchibhotla5098f912016-05-31 10:58:17 -07001729 } else {
Sree Kuchibhotlaf6f33d72016-11-18 11:38:52 -08001730 pi_new = polling_island_merge(item->pi, bag->pi, &error);
Sree Kuchibhotla1e776682016-06-28 14:09:26 -07001731 GRPC_POLLING_TRACE(
Sree Kuchibhotla499b94b2016-11-18 14:35:47 -08001732 "add_poll_obj: polling islands merged. pi_new: %p (item(%s): %p, "
1733 "bag(%s): %p)",
1734 (void *)pi_new, poll_obj_string(item_type), (void *)item,
1735 poll_obj_string(bag_type), (void *)bag);
Sree Kuchibhotlaf448c342016-05-19 10:51:24 -07001736 }
1737
Sree Kuchibhotlaf6f33d72016-11-18 11:38:52 -08001738 /* At this point, pi_new is the polling island that both item->pi and bag->pi
1739 MUST be pointing to */
Sree Kuchibhotla20d0a162016-06-23 15:14:03 -07001740
Sree Kuchibhotlaf6f33d72016-11-18 11:38:52 -08001741 if (item->pi != pi_new) {
1742 PI_ADD_REF(pi_new, poll_obj_string(item_type));
1743 if (item->pi != NULL) {
1744 PI_UNREF(exec_ctx, item->pi, poll_obj_string(item_type));
Sree Kuchibhotla2f8ade02016-06-17 13:28:38 -07001745 }
Sree Kuchibhotlaf6f33d72016-11-18 11:38:52 -08001746 item->pi = pi_new;
Sree Kuchibhotla2f8ade02016-06-17 13:28:38 -07001747 }
1748
Sree Kuchibhotlaf6f33d72016-11-18 11:38:52 -08001749 if (bag->pi != pi_new) {
1750 PI_ADD_REF(pi_new, poll_obj_string(bag_type));
1751 if (bag->pi != NULL) {
1752 PI_UNREF(exec_ctx, bag->pi, poll_obj_string(bag_type));
Sree Kuchibhotla2f8ade02016-06-17 13:28:38 -07001753 }
Sree Kuchibhotlaf6f33d72016-11-18 11:38:52 -08001754 bag->pi = pi_new;
Sree Kuchibhotla2f8ade02016-06-17 13:28:38 -07001755 }
Sree Kuchibhotlaf448c342016-05-19 10:51:24 -07001756
Sree Kuchibhotlaf6f33d72016-11-18 11:38:52 -08001757 gpr_mu_unlock(&item->mu);
1758 gpr_mu_unlock(&bag->mu);
Craig Tiller15007612016-07-06 09:36:16 -07001759
Sree Kuchibhotlaf6f33d72016-11-18 11:38:52 -08001760 GRPC_LOG_IF_ERROR("add_poll_object", error);
1761 GPR_TIMER_END("add_poll_object", 0);
1762}
Craig Tiller57726ca2016-09-12 11:59:45 -07001763
Sree Kuchibhotla9442bab2016-05-20 17:54:06 -07001764static void pollset_add_fd(grpc_exec_ctx *exec_ctx, grpc_pollset *pollset,
1765 grpc_fd *fd) {
Sree Kuchibhotlaa0749a62016-11-18 20:22:09 -08001766 add_poll_object(exec_ctx, &pollset->po, POLL_OBJ_POLLSET, &fd->po,
Sree Kuchibhotla499b94b2016-11-18 14:35:47 -08001767 POLL_OBJ_FD);
Sree Kuchibhotlaf448c342016-05-19 10:51:24 -07001768}
1769
Sree Kuchibhotlaf448c342016-05-19 10:51:24 -07001770/*******************************************************************************
Sree Kuchibhotla0bcbd792016-06-01 15:43:03 -07001771 * Pollset-set Definitions
Sree Kuchibhotlaf448c342016-05-19 10:51:24 -07001772 */
1773
1774static grpc_pollset_set *pollset_set_create(void) {
Sree Kuchibhotla2385fd72016-11-18 16:30:41 -08001775 grpc_pollset_set *pss = gpr_malloc(sizeof(*pss));
Sree Kuchibhotla2385fd72016-11-18 16:30:41 -08001776 gpr_mu_init(&pss->po.mu);
Sree Kuchibhotlaa0749a62016-11-18 20:22:09 -08001777 pss->po.pi = NULL;
1778#ifdef PO_DEBUG
1779 pss->po.obj_type = POLL_OBJ_POLLSET_SET;
1780#endif
Sree Kuchibhotla2385fd72016-11-18 16:30:41 -08001781 return pss;
Sree Kuchibhotlaf448c342016-05-19 10:51:24 -07001782}
1783
Craig Tiller9e5ac1b2017-02-14 22:25:50 -08001784static void pollset_set_destroy(grpc_exec_ctx *exec_ctx,
1785 grpc_pollset_set *pss) {
Sree Kuchibhotla2385fd72016-11-18 16:30:41 -08001786 gpr_mu_destroy(&pss->po.mu);
1787
1788 if (pss->po.pi != NULL) {
Craig Tiller9e5ac1b2017-02-14 22:25:50 -08001789 PI_UNREF(exec_ctx, pss->po.pi, "pss_destroy");
Sree Kuchibhotlaf448c342016-05-19 10:51:24 -07001790 }
Sree Kuchibhotla2385fd72016-11-18 16:30:41 -08001791
1792 gpr_free(pss);
Sree Kuchibhotlaf448c342016-05-19 10:51:24 -07001793}
1794
Sree Kuchibhotla2385fd72016-11-18 16:30:41 -08001795static void pollset_set_add_fd(grpc_exec_ctx *exec_ctx, grpc_pollset_set *pss,
1796 grpc_fd *fd) {
Sree Kuchibhotlaa0749a62016-11-18 20:22:09 -08001797 add_poll_object(exec_ctx, &pss->po, POLL_OBJ_POLLSET_SET, &fd->po,
Sree Kuchibhotla2385fd72016-11-18 16:30:41 -08001798 POLL_OBJ_FD);
Sree Kuchibhotla0bcbd792016-06-01 15:43:03 -07001799}
1800
/* Intentionally a no-op: in this engine polling islands are only ever
   merged, never split, so there is no per-pollset_set state to undo when
   an fd is removed. */
static void pollset_set_del_fd(grpc_exec_ctx *exec_ctx, grpc_pollset_set *pss,
                               grpc_fd *fd) {
  /* Nothing to do */
}
1805
Sree Kuchibhotlaf448c342016-05-19 10:51:24 -07001806static void pollset_set_add_pollset(grpc_exec_ctx *exec_ctx,
Sree Kuchibhotla2385fd72016-11-18 16:30:41 -08001807 grpc_pollset_set *pss, grpc_pollset *ps) {
Sree Kuchibhotlaa0749a62016-11-18 20:22:09 -08001808 add_poll_object(exec_ctx, &pss->po, POLL_OBJ_POLLSET_SET, &ps->po,
Sree Kuchibhotla2385fd72016-11-18 16:30:41 -08001809 POLL_OBJ_POLLSET);
Sree Kuchibhotlaf448c342016-05-19 10:51:24 -07001810}
1811
/* Intentionally a no-op: polling islands are never split once merged, so
   removing a pollset from a pollset_set requires no work here. */
static void pollset_set_del_pollset(grpc_exec_ctx *exec_ctx,
                                    grpc_pollset_set *pss, grpc_pollset *ps) {
  /* Nothing to do */
}
1816
1817static void pollset_set_add_pollset_set(grpc_exec_ctx *exec_ctx,
1818 grpc_pollset_set *bag,
1819 grpc_pollset_set *item) {
Sree Kuchibhotlaa0749a62016-11-18 20:22:09 -08001820 add_poll_object(exec_ctx, &bag->po, POLL_OBJ_POLLSET_SET, &item->po,
Sree Kuchibhotla2385fd72016-11-18 16:30:41 -08001821 POLL_OBJ_POLLSET_SET);
Sree Kuchibhotlaf448c342016-05-19 10:51:24 -07001822}
1823
/* Intentionally a no-op: polling islands are never split once merged, so
   removing a nested pollset_set requires no work here. */
static void pollset_set_del_pollset_set(grpc_exec_ctx *exec_ctx,
                                        grpc_pollset_set *bag,
                                        grpc_pollset_set *item) {
  /* Nothing to do */
}
1829
/* Test-only helpers: expose the polling island currently attached to an
 * fd / pollset so tests can verify island merging behavior. */
1832void *grpc_fd_get_polling_island(grpc_fd *fd) {
1833 polling_island *pi;
1834
Sree Kuchibhotlaf6f33d72016-11-18 11:38:52 -08001835 gpr_mu_lock(&fd->po.mu);
1836 pi = fd->po.pi;
1837 gpr_mu_unlock(&fd->po.mu);
Sree Kuchibhotla2e12db92016-06-16 16:53:59 -07001838
1839 return pi;
1840}
1841
1842void *grpc_pollset_get_polling_island(grpc_pollset *ps) {
1843 polling_island *pi;
1844
Sree Kuchibhotlaf6f33d72016-11-18 11:38:52 -08001845 gpr_mu_lock(&ps->po.mu);
1846 pi = ps->po.pi;
1847 gpr_mu_unlock(&ps->po.mu);
Sree Kuchibhotla2e12db92016-06-16 16:53:59 -07001848
1849 return pi;
1850}
1851
Sree Kuchibhotla2e12db92016-06-16 16:53:59 -07001852bool grpc_are_polling_islands_equal(void *p, void *q) {
Sree Kuchibhotla2f8ade02016-06-17 13:28:38 -07001853 polling_island *p1 = p;
1854 polling_island *p2 = q;
1855
Sree Kuchibhotla20d0a162016-06-23 15:14:03 -07001856 /* Note: polling_island_lock_pair() may change p1 and p2 to point to the
1857 latest polling islands in their respective linked lists */
Sree Kuchibhotla2f8ade02016-06-17 13:28:38 -07001858 polling_island_lock_pair(&p1, &p2);
Sree Kuchibhotla20d0a162016-06-23 15:14:03 -07001859 polling_island_unlock_pair(p1, p2);
Sree Kuchibhotla2f8ade02016-06-17 13:28:38 -07001860
1861 return p1 == p2;
Sree Kuchibhotla2e12db92016-06-16 16:53:59 -07001862}
1863
Sree Kuchibhotlaf448c342016-05-19 10:51:24 -07001864/*******************************************************************************
Sree Kuchibhotla0bcbd792016-06-01 15:43:03 -07001865 * Event engine binding
Sree Kuchibhotlaf448c342016-05-19 10:51:24 -07001866 */
1867
/* Tear down the engine's global state; mirrors the *_global_init() calls
   made in grpc_init_epollsig_linux(). */
static void shutdown_engine(void) {
  fd_global_shutdown();
  pollset_global_shutdown();
  polling_island_global_shutdown();
}
1873
/* The event-engine vtable for this (signal-assisted epoll) polling engine;
   returned by grpc_init_epollsig_linux() when the engine is usable. */
static const grpc_event_engine_vtable vtable = {
    .pollset_size = sizeof(grpc_pollset),

    .fd_create = fd_create,
    .fd_wrapped_fd = fd_wrapped_fd,
    .fd_orphan = fd_orphan,
    .fd_shutdown = fd_shutdown,
    .fd_is_shutdown = fd_is_shutdown,
    .fd_notify_on_read = fd_notify_on_read,
    .fd_notify_on_write = fd_notify_on_write,
    .fd_get_read_notifier_pollset = fd_get_read_notifier_pollset,
    .fd_get_workqueue = fd_get_workqueue,

    .pollset_init = pollset_init,
    .pollset_shutdown = pollset_shutdown,
    .pollset_destroy = pollset_destroy,
    .pollset_work = pollset_work,
    .pollset_kick = pollset_kick,
    .pollset_add_fd = pollset_add_fd,

    .pollset_set_create = pollset_set_create,
    .pollset_set_destroy = pollset_set_destroy,
    .pollset_set_add_pollset = pollset_set_add_pollset,
    .pollset_set_del_pollset = pollset_set_del_pollset,
    .pollset_set_add_pollset_set = pollset_set_add_pollset_set,
    .pollset_set_del_pollset_set = pollset_set_del_pollset_set,
    .pollset_set_add_fd = pollset_set_add_fd,
    .pollset_set_del_fd = pollset_set_del_fd,

    .workqueue_ref = workqueue_ref,
    .workqueue_unref = workqueue_unref,
    .workqueue_scheduler = workqueue_scheduler,

    .shutdown_engine = shutdown_engine,
};
1909
Sree Kuchibhotla72744022016-06-09 09:42:06 -07001910/* It is possible that GLIBC has epoll but the underlying kernel doesn't.
1911 * Create a dummy epoll_fd to make sure epoll support is available */
1912static bool is_epoll_available() {
1913 int fd = epoll_create1(EPOLL_CLOEXEC);
1914 if (fd < 0) {
1915 gpr_log(
1916 GPR_ERROR,
1917 "epoll_create1 failed with error: %d. Not using epoll polling engine",
1918 fd);
1919 return false;
1920 }
1921 close(fd);
1922 return true;
1923}
1924
/* Engine entry point: probe whether this polling engine can run on the
   current system, initialize its global state, and return its vtable.
   Returns NULL if any prerequisite (wakeup signal, wakeup fd, kernel epoll
   support, global init) is unavailable, so the caller can fall back to
   another engine. */
const grpc_event_engine_vtable *grpc_init_epollsig_linux(
    bool explicit_request) {
  /* If use of signals is disabled, we cannot use epoll engine*/
  if (is_grpc_wakeup_signal_initialized && grpc_wakeup_signal < 0) {
    return NULL;
  }

  /* A wakeup fd mechanism is required to kick pollers. */
  if (!grpc_has_wakeup_fd()) {
    return NULL;
  }

  /* GLIBC may expose epoll even when the kernel lacks it; verify. */
  if (!is_epoll_available()) {
    return NULL;
  }

  if (!is_grpc_wakeup_signal_initialized) {
    /* TODO(ctiller): when other epoll engines are ready, remove the true || to
     * force this to be explicitly chosen if needed */
    if (true || explicit_request) {
      grpc_use_signal(SIGRTMIN + 6);
    } else {
      return NULL;
    }
  }

  fd_global_init();

  if (!GRPC_LOG_IF_ERROR("pollset_global_init", pollset_global_init())) {
    return NULL;
  }

  if (!GRPC_LOG_IF_ERROR("polling_island_global_init",
                         polling_island_global_init())) {
    return NULL;
  }

  return &vtable;
}
1963
murgatroid99623dd4f2016-08-08 17:31:27 -07001964#else /* defined(GRPC_LINUX_EPOLL) */
1965#if defined(GRPC_POSIX_SOCKET)
Sree Kuchibhotla41622a82016-06-13 16:43:14 -07001966#include "src/core/lib/iomgr/ev_posix.h"
murgatroid99623dd4f2016-08-08 17:31:27 -07001967/* If GRPC_LINUX_EPOLL is not defined, it means epoll is not available. Return
Sree Kuchibhotla5855c472016-06-08 12:56:56 -07001968 * NULL */
/* epoll is unavailable on this platform, so this engine can never be
   selected: always return NULL. */
const grpc_event_engine_vtable *grpc_init_epollsig_linux(
    bool explicit_request) {
  return NULL;
}
murgatroid99623dd4f2016-08-08 17:31:27 -07001973#endif /* defined(GRPC_POSIX_SOCKET) */
Sree Kuchibhotla5855c472016-06-08 12:56:56 -07001974
/* No-op on platforms without epoll: there is no signal-based polling engine
   to configure a wakeup signal for. */
void grpc_use_signal(int signum) {}
murgatroid99623dd4f2016-08-08 17:31:27 -07001976#endif /* !defined(GRPC_LINUX_EPOLL) */