/*
 *
 * Copyright 2016 gRPC authors.
 *
 * Licensed under the Apache License, Version 2.0 (the "License");
 * you may not use this file except in compliance with the License.
 * You may obtain a copy of the License at
 *
 *     http://www.apache.org/licenses/LICENSE-2.0
 *
 * Unless required by applicable law or agreed to in writing, software
 * distributed under the License is distributed on an "AS IS" BASIS,
 * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
 * See the License for the specific language governing permissions and
 * limitations under the License.
 *
 */

#include "src/core/lib/iomgr/port.h"

/* This polling engine is only relevant on linux kernels supporting epoll() */
#ifdef GRPC_LINUX_EPOLL

#include "src/core/lib/iomgr/ev_epollsig_linux.h"

#include <assert.h>
#include <errno.h>
#include <poll.h>
#include <pthread.h>
#include <signal.h>
#include <string.h>
#include <sys/epoll.h>
#include <sys/socket.h>
#include <unistd.h>

#include <grpc/support/alloc.h>
#include <grpc/support/log.h>
#include <grpc/support/string_util.h>
#include <grpc/support/tls.h>
#include <grpc/support/useful.h>

#include "src/core/lib/iomgr/ev_posix.h"
#include "src/core/lib/iomgr/iomgr_internal.h"
#include "src/core/lib/iomgr/lockfree_event.h"
#include "src/core/lib/iomgr/timer.h"
#include "src/core/lib/iomgr/wakeup_fd_posix.h"
#include "src/core/lib/iomgr/workqueue.h"
#include "src/core/lib/profiling/timers.h"
#include "src/core/lib/support/block_annotate.h"

#define GRPC_POLLSET_KICK_BROADCAST ((grpc_pollset_worker *)1)

#define GRPC_POLLING_TRACE(...)             \
  if (GRPC_TRACER_ON(grpc_polling_trace)) { \
    gpr_log(GPR_INFO, __VA_ARGS__);         \
  }

/* Uncomment the following to enable extra checks on poll_object operations */
/* #define PO_DEBUG */

static int grpc_wakeup_signal = -1;
static bool is_grpc_wakeup_signal_initialized = false;

/* Implements the function defined in grpc_posix.h. This function may be
 * called even before grpc_init() to set a different signal for this engine
 * to use. If signum == -1, the use of signals is disabled */
void grpc_use_signal(int signum) {
  grpc_wakeup_signal = signum;
  is_grpc_wakeup_signal_initialized = true;

  if (grpc_wakeup_signal < 0) {
    gpr_log(GPR_INFO,
            "Use of signals is disabled. Epoll engine will not be used");
  } else {
    gpr_log(GPR_INFO, "epoll engine will be using signal: %d",
            grpc_wakeup_signal);
  }
}

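/* Example (an illustrative sketch of a hypothetical caller, not part of the
   original file): an application that wants this engine to use a different
   signal would call grpc_use_signal() before grpc_init():

     grpc_use_signal(SIGRTMIN + 2);  // any signal the application reserves
     grpc_init();

   Calling grpc_use_signal(-1) disables signals, and with them this engine. */
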
struct polling_island;

typedef enum {
  POLL_OBJ_FD,
  POLL_OBJ_POLLSET,
  POLL_OBJ_POLLSET_SET
} poll_obj_type;

typedef struct poll_obj {
#ifdef PO_DEBUG
  poll_obj_type obj_type;
#endif
  gpr_mu mu;
  struct polling_island *pi;
} poll_obj;

const char *poll_obj_string(poll_obj_type po_type) {
  switch (po_type) {
    case POLL_OBJ_FD:
      return "fd";
    case POLL_OBJ_POLLSET:
      return "pollset";
    case POLL_OBJ_POLLSET_SET:
      return "pollset_set";
  }

  GPR_UNREACHABLE_CODE(return "UNKNOWN");
}

/*******************************************************************************
 * Fd Declarations
 */

#define FD_FROM_PO(po) ((grpc_fd *)(po))
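
/* Illustrative note (a sketch, not part of the original file): the cast in
   FD_FROM_PO is valid because 'po' is the first member of grpc_fd below, so a
   poll_obj known to describe an fd can be converted back:

     poll_obj *po = ...;            // handed around by common polling code
     grpc_fd *fd = FD_FROM_PO(po);  // only valid for POLL_OBJ_FD objects
*/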

struct grpc_fd {
  poll_obj po;

  int fd;
  /* refst format:
     bit 0    : 1=Active / 0=Orphaned
     bits 1-n : refcount
     Ref/Unref by two to avoid altering the orphaned bit */
  gpr_atm refst;

  /* The fd is either closed or we relinquished control of it. In either
     case, this indicates that the 'fd' on this structure is no longer
     valid */
  bool orphaned;

  gpr_atm read_closure;
  gpr_atm write_closure;

  struct grpc_fd *freelist_next;
  grpc_closure *on_done_closure;

  /* The pollset that last noticed that the fd is readable. The actual type
   * stored in this is (grpc_pollset *) */
  gpr_atm read_notifier_pollset;

  grpc_iomgr_object iomgr_object;
};

/* Reference counting for fds */
// #define GRPC_FD_REF_COUNT_DEBUG
#ifdef GRPC_FD_REF_COUNT_DEBUG
static void fd_ref(grpc_fd *fd, const char *reason, const char *file, int line);
static void fd_unref(grpc_fd *fd, const char *reason, const char *file,
                     int line);
#define GRPC_FD_REF(fd, reason) fd_ref(fd, reason, __FILE__, __LINE__)
#define GRPC_FD_UNREF(fd, reason) fd_unref(fd, reason, __FILE__, __LINE__)
#else
static void fd_ref(grpc_fd *fd);
static void fd_unref(grpc_fd *fd);
#define GRPC_FD_REF(fd, reason) fd_ref(fd)
#define GRPC_FD_UNREF(fd, reason) fd_unref(fd)
#endif

static void fd_global_init(void);
static void fd_global_shutdown(void);

/*******************************************************************************
 * Polling island Declarations
 */

#ifdef GRPC_WORKQUEUE_REFCOUNT_DEBUG

#define PI_ADD_REF(p, r) pi_add_ref_dbg((p), (r), __FILE__, __LINE__)
#define PI_UNREF(exec_ctx, p, r) \
  pi_unref_dbg((exec_ctx), (p), (r), __FILE__, __LINE__)

#else /* defined(GRPC_WORKQUEUE_REFCOUNT_DEBUG) */

#define PI_ADD_REF(p, r) pi_add_ref((p))
#define PI_UNREF(exec_ctx, p, r) pi_unref((exec_ctx), (p))

#endif /* !defined(GRPC_WORKQUEUE_REFCOUNT_DEBUG) */

/* This is also used as grpc_workqueue (by directly casting it) */
typedef struct polling_island {
  grpc_closure_scheduler workqueue_scheduler;

  gpr_mu mu;
  /* Ref count. Use PI_ADD_REF() and PI_UNREF() macros to increment/decrement
     the refcount.
     Once the ref count becomes zero, this structure is destroyed which means
     we should ensure that there is never a scenario where a PI_ADD_REF() is
     racing with a PI_UNREF() that just made the ref_count zero. */
  gpr_atm ref_count;

  /* Pointer to the polling_island this merged into.
   * merged_to value is only set once in polling_island's lifetime (and that too
   * only if the island is merged with another island). Because of this, we can
   * use gpr_atm type here so that we can do atomic access on this and reduce
   * lock contention on 'mu' mutex.
   *
   * Note that if this field is not NULL (i.e not 0), all the remaining fields
   * (except mu and ref_count) are invalid and must be ignored. */
  gpr_atm merged_to;

  /* Number of threads currently polling on this island */
  gpr_atm poller_count;
  /* Mutex guarding the read end of the workqueue (must be held to pop from
   * workqueue_items) */
  gpr_mu workqueue_read_mu;
  /* Queue of closures to be executed */
  gpr_mpscq workqueue_items;
  /* Count of items in workqueue_items */
  gpr_atm workqueue_item_count;
  /* Wakeup fd used to wake pollers to check the contents of workqueue_items */
  grpc_wakeup_fd workqueue_wakeup_fd;

  /* The fd of the underlying epoll set */
  int epoll_fd;

  /* The file descriptors in the epoll set */
  size_t fd_cnt;
  size_t fd_capacity;
  grpc_fd **fds;
} polling_island;

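/* Example (illustrative, not part of the original file): after merging island
   A into island B, A->merged_to points at B and only B's epoll set and
   workqueue remain live; A's fds have already been moved into B:

     fd1, fd2 --> A --merged_to--> B <-- fd3
*/
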
/*******************************************************************************
 * Pollset Declarations
 */
struct grpc_pollset_worker {
  /* Thread id of this worker */
  pthread_t pt_id;

  /* Used to prevent a worker from getting kicked multiple times */
  gpr_atm is_kicked;
  struct grpc_pollset_worker *next;
  struct grpc_pollset_worker *prev;
};

struct grpc_pollset {
  poll_obj po;

  grpc_pollset_worker root_worker;
  bool kicked_without_pollers;

  bool shutting_down;          /* Is the pollset shutting down ? */
  bool finish_shutdown_called; /* Is the 'finish_shutdown_locked()' called ? */
  grpc_closure *shutdown_done; /* Called after shutdown is complete */
};

/*******************************************************************************
 * Pollset-set Declarations
 */
struct grpc_pollset_set {
  poll_obj po;
};

/*******************************************************************************
 * Common helpers
 */

static bool append_error(grpc_error **composite, grpc_error *error,
                         const char *desc) {
  if (error == GRPC_ERROR_NONE) return true;
  if (*composite == GRPC_ERROR_NONE) {
    *composite = GRPC_ERROR_CREATE_FROM_COPIED_STRING(desc);
  }
  *composite = grpc_error_add_child(*composite, error);
  return false;
}
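
/* Illustrative usage (a sketch with hypothetical helpers, not part of the
   original file): append_error() folds several grpc_errors into one composite
   while tracking overall success:

     grpc_error *composite = GRPC_ERROR_NONE;
     bool ok = true;
     ok &= append_error(&composite, do_step_one(), "my_operation");
     ok &= append_error(&composite, do_step_two(), "my_operation");
     // 'ok' is true iff every step returned GRPC_ERROR_NONE
*/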

/*******************************************************************************
 * Polling island Definitions
 */

/* The wakeup fd that is used to wake up all threads in a Polling island. This
   is useful in the polling island merge operation where we need to wakeup all
   the threads currently polling the smaller polling island (so that they can
   start polling the new/merged polling island)

   NOTE: This fd is initialized to be readable and MUST NOT be consumed i.e the
   threads that woke up MUST NOT call grpc_wakeup_fd_consume_wakeup() */
static grpc_wakeup_fd polling_island_wakeup_fd;

/* The polling island being polled right now.
   See comments in workqueue_maybe_wakeup for why this is tracked. */
static __thread polling_island *g_current_thread_polling_island;

/* Forward declaration */
static void polling_island_delete(grpc_exec_ctx *exec_ctx, polling_island *pi);
static void workqueue_enqueue(grpc_exec_ctx *exec_ctx, grpc_closure *closure,
                              grpc_error *error);

#ifdef GRPC_TSAN
/* Currently TSAN may incorrectly flag data races between epoll_ctl and
   epoll_wait for any grpc_fd structs that are added to the epoll set via
   epoll_ctl and are returned (within a very short window) via epoll_wait().

   To work-around this race, we establish a happens-before relation between
   the code just-before epoll_ctl() and the code after epoll_wait() by using
   this atomic */
gpr_atm g_epoll_sync;
#endif /* defined(GRPC_TSAN) */

static const grpc_closure_scheduler_vtable workqueue_scheduler_vtable = {
    workqueue_enqueue, workqueue_enqueue, "workqueue"};

static void pi_add_ref(polling_island *pi);
static void pi_unref(grpc_exec_ctx *exec_ctx, polling_island *pi);

#ifdef GRPC_WORKQUEUE_REFCOUNT_DEBUG
static void pi_add_ref_dbg(polling_island *pi, const char *reason,
                           const char *file, int line) {
  long old_cnt = gpr_atm_acq_load(&pi->ref_count);
  pi_add_ref(pi);
  gpr_log(GPR_DEBUG, "Add ref pi: %p, old: %ld -> new:%ld (%s) - (%s, %d)",
          (void *)pi, old_cnt, old_cnt + 1, reason, file, line);
}

static void pi_unref_dbg(grpc_exec_ctx *exec_ctx, polling_island *pi,
                         const char *reason, const char *file, int line) {
  long old_cnt = gpr_atm_acq_load(&pi->ref_count);
  pi_unref(exec_ctx, pi);
  gpr_log(GPR_DEBUG, "Unref pi: %p, old:%ld -> new:%ld (%s) - (%s, %d)",
          (void *)pi, old_cnt, (old_cnt - 1), reason, file, line);
}

static grpc_workqueue *workqueue_ref(grpc_workqueue *workqueue,
                                     const char *file, int line,
                                     const char *reason) {
  if (workqueue != NULL) {
    pi_add_ref_dbg((polling_island *)workqueue, reason, file, line);
  }
  return workqueue;
}

static void workqueue_unref(grpc_exec_ctx *exec_ctx, grpc_workqueue *workqueue,
                            const char *file, int line, const char *reason) {
  if (workqueue != NULL) {
    pi_unref_dbg(exec_ctx, (polling_island *)workqueue, reason, file, line);
  }
}
#else
static grpc_workqueue *workqueue_ref(grpc_workqueue *workqueue) {
  if (workqueue != NULL) {
    pi_add_ref((polling_island *)workqueue);
  }
  return workqueue;
}

static void workqueue_unref(grpc_exec_ctx *exec_ctx,
                            grpc_workqueue *workqueue) {
  if (workqueue != NULL) {
    pi_unref(exec_ctx, (polling_island *)workqueue);
  }
}
#endif

static void pi_add_ref(polling_island *pi) {
  gpr_atm_no_barrier_fetch_add(&pi->ref_count, 1);
}

static void pi_unref(grpc_exec_ctx *exec_ctx, polling_island *pi) {
  /* If ref count went to zero, delete the polling island.
     Note that this deletion need not be done under a lock. Once the ref count
     goes to zero, we are guaranteed that no one else holds a reference to the
     polling island (and that there is no racing pi_add_ref() call either).

     Also, if we are deleting the polling island and the merged_to field is
     non-empty, we should remove a ref to the merged_to polling island
   */
  if (1 == gpr_atm_full_fetch_add(&pi->ref_count, -1)) {
    polling_island *next = (polling_island *)gpr_atm_acq_load(&pi->merged_to);
    polling_island_delete(exec_ctx, pi);
    if (next != NULL) {
      PI_UNREF(exec_ctx, next, "pi_delete"); /* Recursive call */
    }
  }
}
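
/* Illustrative pairing (a sketch with a hypothetical reason string, not part
   of the original file): every PI_ADD_REF must eventually be matched by a
   PI_UNREF with the same reason, e.g.

     PI_ADD_REF(pi, "my_reason");
     ...
     PI_UNREF(exec_ctx, pi, "my_reason");

   With GRPC_WORKQUEUE_REFCOUNT_DEBUG defined, the reason/file/line triplet is
   logged on every transition, which helps track down ref leaks. */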

/* The caller is expected to hold pi->mu lock before calling this function */
static void polling_island_add_fds_locked(polling_island *pi, grpc_fd **fds,
                                          size_t fd_count, bool add_fd_refs,
                                          grpc_error **error) {
  int err;
  size_t i;
  struct epoll_event ev;
  char *err_msg;
  const char *err_desc = "polling_island_add_fds";

#ifdef GRPC_TSAN
  /* See the definition of g_epoll_sync for more context */
  gpr_atm_rel_store(&g_epoll_sync, (gpr_atm)0);
#endif /* defined(GRPC_TSAN) */

  for (i = 0; i < fd_count; i++) {
    ev.events = (uint32_t)(EPOLLIN | EPOLLOUT | EPOLLET);
    ev.data.ptr = fds[i];
    err = epoll_ctl(pi->epoll_fd, EPOLL_CTL_ADD, fds[i]->fd, &ev);

    if (err < 0) {
      if (errno != EEXIST) {
        gpr_asprintf(
            &err_msg,
            "epoll_ctl (epoll_fd: %d) add fd: %d failed with error: %d (%s)",
            pi->epoll_fd, fds[i]->fd, errno, strerror(errno));
        append_error(error, GRPC_OS_ERROR(errno, err_msg), err_desc);
        gpr_free(err_msg);
      }

      continue;
    }

    if (pi->fd_cnt == pi->fd_capacity) {
      pi->fd_capacity = GPR_MAX(pi->fd_capacity + 8, pi->fd_cnt * 3 / 2);
      pi->fds = gpr_realloc(pi->fds, sizeof(grpc_fd *) * pi->fd_capacity);
    }

    pi->fds[pi->fd_cnt++] = fds[i];
    if (add_fd_refs) {
      GRPC_FD_REF(fds[i], "polling_island");
    }
  }
}
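
/* Note on EPOLLET above (an explanatory sketch, not part of the original
   file): fds are registered edge-triggered, so epoll reports readability only
   on the transition to readable. Whoever consumes the fd must therefore drain
   it until EAGAIN before expecting another event, roughly:

     char buf[4096];
     for (;;) {
       ssize_t n = read(fd, buf, sizeof(buf));
       if (n < 0 && errno == EAGAIN) break;  // drained; wait for next edge
       // ... handle n bytes (or n == 0 for EOF) ...
     }
*/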

/* The caller is expected to hold pi->mu before calling this */
static void polling_island_add_wakeup_fd_locked(polling_island *pi,
                                                grpc_wakeup_fd *wakeup_fd,
                                                grpc_error **error) {
  struct epoll_event ev;
  int err;
  char *err_msg;
  const char *err_desc = "polling_island_add_wakeup_fd";

  ev.events = (uint32_t)(EPOLLIN | EPOLLET);
  ev.data.ptr = wakeup_fd;
  err = epoll_ctl(pi->epoll_fd, EPOLL_CTL_ADD,
                  GRPC_WAKEUP_FD_GET_READ_FD(wakeup_fd), &ev);
  if (err < 0 && errno != EEXIST) {
    gpr_asprintf(&err_msg,
                 "epoll_ctl (epoll_fd: %d) add wakeup fd: %d failed with "
                 "error: %d (%s)",
                 pi->epoll_fd, GRPC_WAKEUP_FD_GET_READ_FD(wakeup_fd), errno,
                 strerror(errno));
    append_error(error, GRPC_OS_ERROR(errno, err_msg), err_desc);
    gpr_free(err_msg);
  }
}

/* The caller is expected to hold pi->mu lock before calling this function */
static void polling_island_remove_all_fds_locked(polling_island *pi,
                                                 bool remove_fd_refs,
                                                 grpc_error **error) {
  int err;
  size_t i;
  char *err_msg;
  const char *err_desc = "polling_island_remove_fds";

  for (i = 0; i < pi->fd_cnt; i++) {
    err = epoll_ctl(pi->epoll_fd, EPOLL_CTL_DEL, pi->fds[i]->fd, NULL);
    if (err < 0 && errno != ENOENT) {
      gpr_asprintf(&err_msg,
                   "epoll_ctl (epoll_fd: %d) delete fds[%zu]: %d failed with "
                   "error: %d (%s)",
                   pi->epoll_fd, i, pi->fds[i]->fd, errno, strerror(errno));
      append_error(error, GRPC_OS_ERROR(errno, err_msg), err_desc);
      gpr_free(err_msg);
    }

    if (remove_fd_refs) {
      GRPC_FD_UNREF(pi->fds[i], "polling_island");
    }
  }

  pi->fd_cnt = 0;
}

/* The caller is expected to hold pi->mu lock before calling this function */
static void polling_island_remove_fd_locked(polling_island *pi, grpc_fd *fd,
                                            bool is_fd_closed,
                                            grpc_error **error) {
  int err;
  size_t i;
  char *err_msg;
  const char *err_desc = "polling_island_remove_fd";

  /* If fd is already closed, then it would have been automatically removed
     from the epoll set */
  if (!is_fd_closed) {
    err = epoll_ctl(pi->epoll_fd, EPOLL_CTL_DEL, fd->fd, NULL);
    if (err < 0 && errno != ENOENT) {
      gpr_asprintf(
          &err_msg,
          "epoll_ctl (epoll_fd: %d) del fd: %d failed with error: %d (%s)",
          pi->epoll_fd, fd->fd, errno, strerror(errno));
      append_error(error, GRPC_OS_ERROR(errno, err_msg), err_desc);
      gpr_free(err_msg);
    }
  }

  for (i = 0; i < pi->fd_cnt; i++) {
    if (pi->fds[i] == fd) {
      pi->fds[i] = pi->fds[--pi->fd_cnt];
      GRPC_FD_UNREF(fd, "polling_island");
      break;
    }
  }
}

/* Might return NULL in case of an error */
static polling_island *polling_island_create(grpc_exec_ctx *exec_ctx,
                                             grpc_fd *initial_fd,
                                             grpc_error **error) {
  polling_island *pi = NULL;
  const char *err_desc = "polling_island_create";

  *error = GRPC_ERROR_NONE;

  pi = gpr_malloc(sizeof(*pi));
  pi->workqueue_scheduler.vtable = &workqueue_scheduler_vtable;
  gpr_mu_init(&pi->mu);
  pi->fd_cnt = 0;
  pi->fd_capacity = 0;
  pi->fds = NULL;
  pi->epoll_fd = -1;

  gpr_mu_init(&pi->workqueue_read_mu);
  gpr_mpscq_init(&pi->workqueue_items);
  gpr_atm_rel_store(&pi->workqueue_item_count, 0);

  gpr_atm_rel_store(&pi->ref_count, 0);
  gpr_atm_rel_store(&pi->poller_count, 0);
  gpr_atm_rel_store(&pi->merged_to, (gpr_atm)NULL);

  if (!append_error(error, grpc_wakeup_fd_init(&pi->workqueue_wakeup_fd),
                    err_desc)) {
    goto done;
  }

  pi->epoll_fd = epoll_create1(EPOLL_CLOEXEC);

  if (pi->epoll_fd < 0) {
    append_error(error, GRPC_OS_ERROR(errno, "epoll_create1"), err_desc);
    goto done;
  }

  polling_island_add_wakeup_fd_locked(pi, &pi->workqueue_wakeup_fd, error);

  if (initial_fd != NULL) {
    polling_island_add_fds_locked(pi, &initial_fd, 1, true, error);
  }

done:
  if (*error != GRPC_ERROR_NONE) {
    polling_island_delete(exec_ctx, pi);
    pi = NULL;
  }
  return pi;
}

static void polling_island_delete(grpc_exec_ctx *exec_ctx, polling_island *pi) {
  GPR_ASSERT(pi->fd_cnt == 0);

  if (pi->epoll_fd >= 0) {
    close(pi->epoll_fd);
  }
  GPR_ASSERT(gpr_atm_no_barrier_load(&pi->workqueue_item_count) == 0);
  gpr_mu_destroy(&pi->workqueue_read_mu);
  gpr_mpscq_destroy(&pi->workqueue_items);
  gpr_mu_destroy(&pi->mu);
  grpc_wakeup_fd_destroy(&pi->workqueue_wakeup_fd);
  gpr_free(pi->fds);
  gpr_free(pi);
}

/* Attempts to get the last polling island in the linked list (linked by the
 * 'merged_to' field). Since this does not lock the polling island, there are
 * no guarantees that the island returned is the last island */
static polling_island *polling_island_maybe_get_latest(polling_island *pi) {
  polling_island *next = (polling_island *)gpr_atm_acq_load(&pi->merged_to);
  while (next != NULL) {
    pi = next;
    next = (polling_island *)gpr_atm_acq_load(&pi->merged_to);
  }

  return pi;
}

/* Gets the lock on the *latest* polling island i.e the last polling island in
   the linked list (linked by the 'merged_to' field). Call gpr_mu_unlock on the
   returned polling island's mu.
   Usage: To lock/unlock polling island "pi", do the following:
     polling_island *pi_latest = polling_island_lock(pi);
     ...
     ... critical section ..
     ...
     gpr_mu_unlock(&pi_latest->mu); // NOTE: use pi_latest->mu. NOT pi->mu */
static polling_island *polling_island_lock(polling_island *pi) {
  polling_island *next = NULL;

  while (true) {
    next = (polling_island *)gpr_atm_acq_load(&pi->merged_to);
    if (next == NULL) {
      /* Looks like 'pi' is the last node in the linked list but unless we check
         this by holding the pi->mu lock, we cannot be sure (i.e without the
         pi->mu lock, we don't prevent island merges).
         To be absolutely sure, check once more by holding the pi->mu lock */
      gpr_mu_lock(&pi->mu);
      next = (polling_island *)gpr_atm_acq_load(&pi->merged_to);
      if (next == NULL) {
        /* pi is in fact the last node and we have the pi->mu lock. we're done */
        break;
      }

      /* pi->merged_to is not NULL i.e pi isn't the last node anymore. pi->mu
       * isn't the lock we are interested in. Continue traversing the list */
      gpr_mu_unlock(&pi->mu);
    }

    pi = next;
  }

  return pi;
}

/* Gets the lock on the *latest* polling islands in the linked lists pointed by
   *p and *q (and also updates *p and *q to point to the latest polling islands)

   This function is needed because calling the following block of code to
   obtain locks on polling islands (*p and *q) is prone to deadlocks.
     {
       polling_island_lock(*p, true);
       polling_island_lock(*q, true);
     }

   Usage/example:
     polling_island *p1;
     polling_island *p2;
     ..
     polling_island_lock_pair(&p1, &p2);
     ..
     .. Critical section with both p1 and p2 locked
     ..
     // Release locks: Always call polling_island_unlock_pair() to release locks
     polling_island_unlock_pair(p1, p2);
*/
static void polling_island_lock_pair(polling_island **p, polling_island **q) {
  polling_island *pi_1 = *p;
  polling_island *pi_2 = *q;
  polling_island *next_1 = NULL;
  polling_island *next_2 = NULL;

  /* The algorithm is simple:
     - Go to the last polling islands in the linked lists *pi_1 and *pi_2 (and
       keep updating pi_1 and pi_2)
     - Then obtain locks on the islands by following a lock order rule of
       locking polling_island with lower address first
       Special case: Before obtaining the locks, check if pi_1 and pi_2 are
       pointing to the same island. If that is the case, we can just call
       polling_island_lock()
     - After obtaining both the locks, double check that the polling islands
       are still the last polling islands in their respective linked lists
       (this is because there might have been polling island merges before
       we got the lock)
     - If the polling islands are the last islands, we are done. If not,
       release the locks and continue the process from the first step */
  while (true) {
    next_1 = (polling_island *)gpr_atm_acq_load(&pi_1->merged_to);
    while (next_1 != NULL) {
      pi_1 = next_1;
      next_1 = (polling_island *)gpr_atm_acq_load(&pi_1->merged_to);
    }

    next_2 = (polling_island *)gpr_atm_acq_load(&pi_2->merged_to);
    while (next_2 != NULL) {
      pi_2 = next_2;
      next_2 = (polling_island *)gpr_atm_acq_load(&pi_2->merged_to);
    }

    if (pi_1 == pi_2) {
      pi_1 = pi_2 = polling_island_lock(pi_1);
      break;
    }

    if (pi_1 < pi_2) {
      gpr_mu_lock(&pi_1->mu);
      gpr_mu_lock(&pi_2->mu);
    } else {
      gpr_mu_lock(&pi_2->mu);
      gpr_mu_lock(&pi_1->mu);
    }

    next_1 = (polling_island *)gpr_atm_acq_load(&pi_1->merged_to);
    next_2 = (polling_island *)gpr_atm_acq_load(&pi_2->merged_to);
    if (next_1 == NULL && next_2 == NULL) {
      break;
    }

    gpr_mu_unlock(&pi_1->mu);
    gpr_mu_unlock(&pi_2->mu);
  }

  *p = pi_1;
  *q = pi_2;
}

static void polling_island_unlock_pair(polling_island *p, polling_island *q) {
  if (p == q) {
    gpr_mu_unlock(&p->mu);
  } else {
    gpr_mu_unlock(&p->mu);
    gpr_mu_unlock(&q->mu);
  }
}

static void workqueue_maybe_wakeup(polling_island *pi) {
  /* If this thread is the current poller, then it may be that it's about to
     decrement the current poller count, so we need to look past this thread */
  bool is_current_poller = (g_current_thread_polling_island == pi);
  gpr_atm min_current_pollers_for_wakeup = is_current_poller ? 1 : 0;
  gpr_atm current_pollers = gpr_atm_no_barrier_load(&pi->poller_count);
  /* Only issue a wakeup if it's likely that some poller could come in and take
     it right now. Note that since we do an anticipatory mpscq_pop every poll
     loop, it's ok if we miss the wakeup here, as we'll get the work item when
     the next poller enters anyway. */
  if (current_pollers >= min_current_pollers_for_wakeup) {
    GRPC_LOG_IF_ERROR("workqueue_wakeup_fd",
                      grpc_wakeup_fd_wakeup(&pi->workqueue_wakeup_fd));
  }
}

static void workqueue_move_items_to_parent(polling_island *q) {
  polling_island *p = (polling_island *)gpr_atm_no_barrier_load(&q->merged_to);
  if (p == NULL) {
    return;
  }
  gpr_mu_lock(&q->workqueue_read_mu);
  int num_added = 0;
  while (gpr_atm_no_barrier_load(&q->workqueue_item_count) > 0) {
    gpr_mpscq_node *n = gpr_mpscq_pop(&q->workqueue_items);
    if (n != NULL) {
      gpr_atm_no_barrier_fetch_add(&q->workqueue_item_count, -1);
      gpr_atm_no_barrier_fetch_add(&p->workqueue_item_count, 1);
      gpr_mpscq_push(&p->workqueue_items, n);
      num_added++;
    }
  }
  gpr_mu_unlock(&q->workqueue_read_mu);
  if (num_added > 0) {
    workqueue_maybe_wakeup(p);
  }
  workqueue_move_items_to_parent(p);
}
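
/* Example (illustrative): if islands form the chain a -> b -> c via
   'merged_to', calling workqueue_move_items_to_parent(a) first moves a's
   items to b, then recurses on b so that everything (including the items just
   moved) ends up on c, the island that live pollers are actually watching. */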

static polling_island *polling_island_merge(polling_island *p,
                                            polling_island *q,
                                            grpc_error **error) {
  /* Get locks on both the polling islands */
  polling_island_lock_pair(&p, &q);

  if (p != q) {
    /* Make sure that p points to the polling island with fewer fds than q */
    if (p->fd_cnt > q->fd_cnt) {
      GPR_SWAP(polling_island *, p, q);
    }

    /* Merge p with q i.e move all the fds from p (The one with fewer fds) to q
       Note that the refcounts on the fds being moved will not change here.
       This is why the last param in the following two functions is 'false') */
    polling_island_add_fds_locked(q, p->fds, p->fd_cnt, false, error);
    polling_island_remove_all_fds_locked(p, false, error);

    /* Wakeup all the pollers (if any) on p so that they pickup this change */
    polling_island_add_wakeup_fd_locked(p, &polling_island_wakeup_fd, error);

    /* Add the 'merged_to' link from p --> q */
    gpr_atm_rel_store(&p->merged_to, (gpr_atm)q);
    PI_ADD_REF(q, "pi_merge"); /* To account for the new incoming ref from p */

    workqueue_move_items_to_parent(p);
  }
  /* else if p == q, nothing needs to be done */

  polling_island_unlock_pair(p, q);

  /* Return the merged polling island (Note that no merge would have happened
     if p == q which is ok) */
  return q;
}

static void workqueue_enqueue(grpc_exec_ctx *exec_ctx, grpc_closure *closure,
                              grpc_error *error) {
  GPR_TIMER_BEGIN("workqueue.enqueue", 0);
  grpc_workqueue *workqueue = (grpc_workqueue *)closure->scheduler;
  /* take a ref to the workqueue: otherwise it can happen that whatever events
   * this kicks off ends up destroying the workqueue before this function
   * completes */
  GRPC_WORKQUEUE_REF(workqueue, "enqueue");
  polling_island *pi = (polling_island *)workqueue;
  gpr_atm last = gpr_atm_no_barrier_fetch_add(&pi->workqueue_item_count, 1);
  closure->error_data.error = error;
  gpr_mpscq_push(&pi->workqueue_items, &closure->next_data.atm_next);
  if (last == 0) {
    workqueue_maybe_wakeup(pi);
  }
  workqueue_move_items_to_parent(pi);
  GRPC_WORKQUEUE_UNREF(exec_ctx, workqueue, "enqueue");
  GPR_TIMER_END("workqueue.enqueue", 0);
}

static grpc_closure_scheduler *workqueue_scheduler(grpc_workqueue *workqueue) {
  polling_island *pi = (polling_island *)workqueue;
  return workqueue == NULL ? grpc_schedule_on_exec_ctx
                           : &pi->workqueue_scheduler;
}
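
/* Illustrative usage (a sketch with a hypothetical callback, not part of the
   original file): a closure is routed to an island's workqueue by
   initializing it with that island's scheduler; grpc_closure_sched() then
   lands in workqueue_enqueue() above:

     grpc_closure c;
     grpc_closure_init(&c, my_callback, my_arg,
                       workqueue_scheduler((grpc_workqueue *)pi));
     grpc_closure_sched(exec_ctx, &c, GRPC_ERROR_NONE);
*/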

static grpc_error *polling_island_global_init() {
  grpc_error *error = GRPC_ERROR_NONE;

  error = grpc_wakeup_fd_init(&polling_island_wakeup_fd);
  if (error == GRPC_ERROR_NONE) {
    error = grpc_wakeup_fd_wakeup(&polling_island_wakeup_fd);
  }

  return error;
}

static void polling_island_global_shutdown() {
  grpc_wakeup_fd_destroy(&polling_island_wakeup_fd);
}

/*******************************************************************************
 * Fd Definitions
 */

/* We need to keep a freelist not because of any concerns of malloc performance
 * but instead so that implementations with multiple threads in (for example)
 * epoll_wait deal with the race between pollset removal and incoming poll
 * notifications.
 *
 * The problem is that the poller ultimately holds a reference to this
 * object, so it is very difficult to know when it is safe to free it, at
 * least without some expensive synchronization.
 *
 * If we keep the object freelisted, in the worst case losing this race just
 * becomes a spurious read notification on a reused fd.
 */

/* The alarm system needs to be able to wakeup 'some poller' sometimes
 * (specifically when a new alarm needs to be triggered earlier than the next
 * alarm 'epoch'). This wakeup_fd gives us something to alert on when such a
 * case occurs. */

static grpc_fd *fd_freelist = NULL;
static gpr_mu fd_freelist_mu;

#ifdef GRPC_FD_REF_COUNT_DEBUG
#define REF_BY(fd, n, reason) ref_by(fd, n, reason, __FILE__, __LINE__)
#define UNREF_BY(fd, n, reason) unref_by(fd, n, reason, __FILE__, __LINE__)
static void ref_by(grpc_fd *fd, int n, const char *reason, const char *file,
                   int line) {
  gpr_log(GPR_DEBUG, "FD %d %p ref %d %ld -> %ld [%s; %s:%d]", fd->fd,
          (void *)fd, n, gpr_atm_no_barrier_load(&fd->refst),
          gpr_atm_no_barrier_load(&fd->refst) + n, reason, file, line);
#else
#define REF_BY(fd, n, reason) ref_by(fd, n)
#define UNREF_BY(fd, n, reason) unref_by(fd, n)
static void ref_by(grpc_fd *fd, int n) {
#endif
  GPR_ASSERT(gpr_atm_no_barrier_fetch_add(&fd->refst, n) > 0);
}

#ifdef GRPC_FD_REF_COUNT_DEBUG
static void unref_by(grpc_fd *fd, int n, const char *reason, const char *file,
                     int line) {
  gpr_atm old;
  gpr_log(GPR_DEBUG, "FD %d %p unref %d %ld -> %ld [%s; %s:%d]", fd->fd,
          (void *)fd, n, gpr_atm_no_barrier_load(&fd->refst),
          gpr_atm_no_barrier_load(&fd->refst) - n, reason, file, line);
#else
static void unref_by(grpc_fd *fd, int n) {
  gpr_atm old;
#endif
  old = gpr_atm_full_fetch_add(&fd->refst, -n);
  if (old == n) {
    /* Add the fd to the freelist */
    gpr_mu_lock(&fd_freelist_mu);
    fd->freelist_next = fd_freelist;
    fd_freelist = fd;
    grpc_iomgr_unregister_object(&fd->iomgr_object);

    grpc_lfev_destroy(&fd->read_closure);
    grpc_lfev_destroy(&fd->write_closure);

    gpr_mu_unlock(&fd_freelist_mu);
  } else {
    GPR_ASSERT(old > n);
  }
}

/* Increment refcount by two to avoid changing the orphan bit */
#ifdef GRPC_FD_REF_COUNT_DEBUG
static void fd_ref(grpc_fd *fd, const char *reason, const char *file,
                   int line) {
  ref_by(fd, 2, reason, file, line);
}

static void fd_unref(grpc_fd *fd, const char *reason, const char *file,
                     int line) {
  unref_by(fd, 2, reason, file, line);
}
#else
static void fd_ref(grpc_fd *fd) { ref_by(fd, 2); }
static void fd_unref(grpc_fd *fd) { unref_by(fd, 2); }
#endif
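
/* Worked example (illustrative): fd_create below stores refst = 1 (active
   bit set, zero refs). Each GRPC_FD_REF adds 2 and each GRPC_FD_UNREF
   subtracts 2, so a live fd always holds refst = 2*n + 1 and the low bit is
   never disturbed. fd_orphan's REF_BY(fd, 1, ...) adds only 1, which both
   clears the active bit and takes the extra reference that its closing
   UNREF_BY(fd, 2, ...) later drops; the struct is freelisted when refst
   reaches zero. */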

static void fd_global_init(void) { gpr_mu_init(&fd_freelist_mu); }

static void fd_global_shutdown(void) {
  gpr_mu_lock(&fd_freelist_mu);
  gpr_mu_unlock(&fd_freelist_mu);
  while (fd_freelist != NULL) {
    grpc_fd *fd = fd_freelist;
    fd_freelist = fd_freelist->freelist_next;
    gpr_mu_destroy(&fd->po.mu);
    gpr_free(fd);
  }
  gpr_mu_destroy(&fd_freelist_mu);
}

static grpc_fd *fd_create(int fd, const char *name) {
  grpc_fd *new_fd = NULL;

  gpr_mu_lock(&fd_freelist_mu);
  if (fd_freelist != NULL) {
    new_fd = fd_freelist;
    fd_freelist = fd_freelist->freelist_next;
  }
  gpr_mu_unlock(&fd_freelist_mu);

  if (new_fd == NULL) {
    new_fd = gpr_malloc(sizeof(grpc_fd));
    gpr_mu_init(&new_fd->po.mu);
  }

  /* Note: It is not really needed to get the new_fd->po.mu lock here. If this
   * is a newly created fd (or an fd we got from the freelist), no one else
   * would be holding a lock to it anyway. */
  gpr_mu_lock(&new_fd->po.mu);
  new_fd->po.pi = NULL;
#ifdef PO_DEBUG
  new_fd->po.obj_type = POLL_OBJ_FD;
#endif

  gpr_atm_rel_store(&new_fd->refst, (gpr_atm)1);
  new_fd->fd = fd;
  new_fd->orphaned = false;
  grpc_lfev_init(&new_fd->read_closure);
  grpc_lfev_init(&new_fd->write_closure);
  gpr_atm_no_barrier_store(&new_fd->read_notifier_pollset, (gpr_atm)NULL);

  new_fd->freelist_next = NULL;
  new_fd->on_done_closure = NULL;

  gpr_mu_unlock(&new_fd->po.mu);

  char *fd_name;
  gpr_asprintf(&fd_name, "%s fd=%d", name, fd);
  grpc_iomgr_register_object(&new_fd->iomgr_object, fd_name);
#ifdef GRPC_FD_REF_COUNT_DEBUG
  gpr_log(GPR_DEBUG, "FD %d %p create %s", fd, (void *)new_fd, fd_name);
#endif
  gpr_free(fd_name);
  return new_fd;
}

static int fd_wrapped_fd(grpc_fd *fd) {
  int ret_fd = -1;
  gpr_mu_lock(&fd->po.mu);
  if (!fd->orphaned) {
    ret_fd = fd->fd;
  }
  gpr_mu_unlock(&fd->po.mu);

  return ret_fd;
}

static void fd_orphan(grpc_exec_ctx *exec_ctx, grpc_fd *fd,
                      grpc_closure *on_done, int *release_fd,
                      const char *reason) {
  bool is_fd_closed = false;
  grpc_error *error = GRPC_ERROR_NONE;
  polling_island *unref_pi = NULL;

  gpr_mu_lock(&fd->po.mu);
  fd->on_done_closure = on_done;

  /* If release_fd is not NULL, we should be relinquishing control of the file
     descriptor fd->fd (but we still own the grpc_fd structure). */
  if (release_fd != NULL) {
    *release_fd = fd->fd;
  } else {
    close(fd->fd);
    is_fd_closed = true;
  }

  fd->orphaned = true;

  /* Remove the active status but keep referenced. We want this grpc_fd struct
     to be alive (and not added to freelist) until the end of this function */
  REF_BY(fd, 1, reason);

  /* Remove the fd from the polling island:
     - Get a lock on the latest polling island (i.e the last island in the
       linked list pointed by fd->po.pi). This is the island that
       would actually contain the fd
     - Remove the fd from the latest polling island
     - Unlock the latest polling island
     - Set fd->po.pi to NULL (but remove the ref on the polling island
       before doing this.) */
  if (fd->po.pi != NULL) {
    polling_island *pi_latest = polling_island_lock(fd->po.pi);
    polling_island_remove_fd_locked(pi_latest, fd, is_fd_closed, &error);
    gpr_mu_unlock(&pi_latest->mu);

    unref_pi = fd->po.pi;
    fd->po.pi = NULL;
  }

  grpc_closure_sched(exec_ctx, fd->on_done_closure, GRPC_ERROR_REF(error));

  gpr_mu_unlock(&fd->po.mu);
  UNREF_BY(fd, 2, reason); /* Drop the reference */
  if (unref_pi != NULL) {
    /* Unref stale polling island here, outside the fd lock above.
       The polling island owns a workqueue which owns an fd, and unreffing
       inside the lock can cause an eventual lock loop that makes TSAN very
       unhappy. */
    PI_UNREF(exec_ctx, unref_pi, "fd_orphan");
  }
  GRPC_LOG_IF_ERROR("fd_orphan", GRPC_ERROR_REF(error));
  GRPC_ERROR_UNREF(error);
}

static grpc_pollset *fd_get_read_notifier_pollset(grpc_exec_ctx *exec_ctx,
                                                  grpc_fd *fd) {
  gpr_atm notifier = gpr_atm_acq_load(&fd->read_notifier_pollset);
  return (grpc_pollset *)notifier;
}

static bool fd_is_shutdown(grpc_fd *fd) {
  return grpc_lfev_is_shutdown(&fd->read_closure);
}

/* Might be called multiple times */
static void fd_shutdown(grpc_exec_ctx *exec_ctx, grpc_fd *fd, grpc_error *why) {
  if (grpc_lfev_set_shutdown(exec_ctx, &fd->read_closure,
                             GRPC_ERROR_REF(why))) {
    shutdown(fd->fd, SHUT_RDWR);
    grpc_lfev_set_shutdown(exec_ctx, &fd->write_closure, GRPC_ERROR_REF(why));
  }
  GRPC_ERROR_UNREF(why);
}

static void fd_notify_on_read(grpc_exec_ctx *exec_ctx, grpc_fd *fd,
                              grpc_closure *closure) {
  grpc_lfev_notify_on(exec_ctx, &fd->read_closure, closure);
}

static void fd_notify_on_write(grpc_exec_ctx *exec_ctx, grpc_fd *fd,
                               grpc_closure *closure) {
  grpc_lfev_notify_on(exec_ctx, &fd->write_closure, closure);
}

static grpc_workqueue *fd_get_workqueue(grpc_fd *fd) {
  gpr_mu_lock(&fd->po.mu);
  grpc_workqueue *workqueue =
      GRPC_WORKQUEUE_REF((grpc_workqueue *)fd->po.pi, "fd_get_workqueue");
  gpr_mu_unlock(&fd->po.mu);
  return workqueue;
}
Craig Tiller70bd4832016-06-30 14:20:46 -07001076
Sree Kuchibhotlaf448c342016-05-19 10:51:24 -07001077/*******************************************************************************
Sree Kuchibhotla0bcbd792016-06-01 15:43:03 -07001078 * Pollset Definitions
Sree Kuchibhotlaf448c342016-05-19 10:51:24 -07001079 */
Sree Kuchibhotla8e4926c2016-06-08 20:33:19 -07001080GPR_TLS_DECL(g_current_thread_pollset);
1081GPR_TLS_DECL(g_current_thread_worker);
Craig Tiller19196992016-06-27 18:45:56 -07001082static __thread bool g_initialized_sigmask;
1083static __thread sigset_t g_orig_sigmask;
Sree Kuchibhotlaf448c342016-05-19 10:51:24 -07001084
Sree Kuchibhotla0bcbd792016-06-01 15:43:03 -07001085static void sig_handler(int sig_num) {
Sree Kuchibhotlad627c102016-06-06 15:49:32 -07001086#ifdef GRPC_EPOLL_DEBUG
Sree Kuchibhotla0bcbd792016-06-01 15:43:03 -07001087 gpr_log(GPR_INFO, "Received signal %d", sig_num);
Sree Kuchibhotla9bc3d2d2016-06-06 10:27:56 -07001088#endif
Sree Kuchibhotla0bcbd792016-06-01 15:43:03 -07001089}
Sree Kuchibhotlaf448c342016-05-19 10:51:24 -07001090
Sree Kuchibhotlac7be7c62016-06-09 17:08:50 -07001091static void poller_kick_init() { signal(grpc_wakeup_signal, sig_handler); }
Sree Kuchibhotla5855c472016-06-08 12:56:56 -07001092
Sree Kuchibhotla0bcbd792016-06-01 15:43:03 -07001093/* Global state management */
Sree Kuchibhotla3131c262016-06-21 17:28:28 -07001094static grpc_error *pollset_global_init(void) {
Sree Kuchibhotla8e4926c2016-06-08 20:33:19 -07001095 gpr_tls_init(&g_current_thread_pollset);
1096 gpr_tls_init(&g_current_thread_worker);
Sree Kuchibhotla5855c472016-06-08 12:56:56 -07001097 poller_kick_init();
Craig Tillerc3571792017-05-02 12:33:38 -07001098 return GRPC_ERROR_NONE;
Sree Kuchibhotla0bcbd792016-06-01 15:43:03 -07001099}
1100
1101static void pollset_global_shutdown(void) {
Sree Kuchibhotla8e4926c2016-06-08 20:33:19 -07001102 gpr_tls_destroy(&g_current_thread_pollset);
1103 gpr_tls_destroy(&g_current_thread_worker);
Sree Kuchibhotla0bcbd792016-06-01 15:43:03 -07001104}
1105
Sree Kuchibhotla3131c262016-06-21 17:28:28 -07001106static grpc_error *pollset_worker_kick(grpc_pollset_worker *worker) {
1107 grpc_error *err = GRPC_ERROR_NONE;
Sree Kuchibhotla34217242016-06-29 00:19:07 -07001108
1109 /* Kick the worker only if it was not already kicked */
1110 if (gpr_atm_no_barrier_cas(&worker->is_kicked, (gpr_atm)0, (gpr_atm)1)) {
1111 GRPC_POLLING_TRACE(
1112 "pollset_worker_kick: Kicking worker: %p (thread id: %ld)",
Ken Payson975b5102017-03-30 17:38:40 -07001113 (void *)worker, (long int)worker->pt_id);
Sree Kuchibhotla34217242016-06-29 00:19:07 -07001114 int err_num = pthread_kill(worker->pt_id, grpc_wakeup_signal);
1115 if (err_num != 0) {
1116 err = GRPC_OS_ERROR(err_num, "pthread_kill");
1117 }
Sree Kuchibhotla3131c262016-06-21 17:28:28 -07001118 }
1119 return err;
Sree Kuchibhotla5855c472016-06-08 12:56:56 -07001120}
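
/* A standalone sketch (compiled out) of the "kick at most once" shape used
   by pollset_worker_kick() above: a CAS guards pthread_kill() so that
   concurrent kickers signal the worker thread only once per wakeup. SIGUSR1
   stands in for grpc_wakeup_signal; sketch_worker/sketch_kick are
   hypothetical names. */
#if 0
#include <pthread.h>
#include <signal.h>
#include <stdatomic.h>

typedef struct {
  pthread_t tid;
  atomic_int is_kicked; /* 0 = not kicked, 1 = kick pending */
} sketch_worker;

static int sketch_kick(sketch_worker *w) {
  int expected = 0;
  /* Only the caller that wins the 0 -> 1 CAS issues the signal; a second
     concurrent kicker sees is_kicked == 1 and returns immediately. */
  if (atomic_compare_exchange_strong(&w->is_kicked, &expected, 1)) {
    return pthread_kill(w->tid, SIGUSR1); /* 0 on success, else an errno */
  }
  return 0;
}
#endif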
1121
Sree Kuchibhotla0bcbd792016-06-01 15:43:03 -07001122/* Return 1 if the pollset has active threads in pollset_work (pollset must
1123 * be locked) */
1124static int pollset_has_workers(grpc_pollset *p) {
1125 return p->root_worker.next != &p->root_worker;
1126}
Sree Kuchibhotlaf448c342016-05-19 10:51:24 -07001127
1128static void remove_worker(grpc_pollset *p, grpc_pollset_worker *worker) {
1129 worker->prev->next = worker->next;
1130 worker->next->prev = worker->prev;
1131}
1132
Sree Kuchibhotlaf448c342016-05-19 10:51:24 -07001133static grpc_pollset_worker *pop_front_worker(grpc_pollset *p) {
1134 if (pollset_has_workers(p)) {
1135 grpc_pollset_worker *w = p->root_worker.next;
1136 remove_worker(p, w);
1137 return w;
1138 } else {
1139 return NULL;
1140 }
1141}
1142
1143static void push_back_worker(grpc_pollset *p, grpc_pollset_worker *worker) {
1144 worker->next = &p->root_worker;
1145 worker->prev = worker->next->prev;
1146 worker->prev->next = worker->next->prev = worker;
1147}
1148
1149static void push_front_worker(grpc_pollset *p, grpc_pollset_worker *worker) {
1150 worker->prev = &p->root_worker;
1151 worker->next = worker->prev->next;
1152 worker->prev->next = worker->next->prev = worker;
1153}
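
/* The worker list above is a classic intrusive doubly-linked list with a
   sentinel: root_worker is never removed, and the list is empty exactly
   when root.next points back at the root, which is what
   pollset_has_workers() tests. A compiled-out standalone sketch: */
#if 0
typedef struct node {
  struct node *next;
  struct node *prev;
} node;

static void list_init(node *root) { root->next = root->prev = root; }

static int list_has_nodes(node *root) { return root->next != root; }

static void list_push_front(node *root, node *n) {
  n->prev = root;
  n->next = root->next;
  n->prev->next = n->next->prev = n; /* splice n in after the sentinel */
}

static void list_remove(node *n) {
  n->prev->next = n->next;
  n->next->prev = n->prev;
}
#endif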
1154
Sree Kuchibhotla0bcbd792016-06-01 15:43:03 -07001155/* p->mu must be held before calling this function */
Sree Kuchibhotla3131c262016-06-21 17:28:28 -07001156static grpc_error *pollset_kick(grpc_pollset *p,
1157 grpc_pollset_worker *specific_worker) {
Sree Kuchibhotla0bcbd792016-06-01 15:43:03 -07001158 GPR_TIMER_BEGIN("pollset_kick", 0);
Sree Kuchibhotla3131c262016-06-21 17:28:28 -07001159 grpc_error *error = GRPC_ERROR_NONE;
Sree Kuchibhotla20d0a162016-06-23 15:14:03 -07001160 const char *err_desc = "Kick Failure";
Sree Kuchibhotla0bcbd792016-06-01 15:43:03 -07001161 grpc_pollset_worker *worker = specific_worker;
1162 if (worker != NULL) {
1163 if (worker == GRPC_POLLSET_KICK_BROADCAST) {
Sree Kuchibhotla0bcbd792016-06-01 15:43:03 -07001164 if (pollset_has_workers(p)) {
Sree Kuchibhotla79a62332016-06-04 14:01:03 -07001165 GPR_TIMER_BEGIN("pollset_kick.broadcast", 0);
Sree Kuchibhotla0bcbd792016-06-01 15:43:03 -07001166 for (worker = p->root_worker.next; worker != &p->root_worker;
1167 worker = worker->next) {
Sree Kuchibhotla8e4926c2016-06-08 20:33:19 -07001168 if (gpr_tls_get(&g_current_thread_worker) != (intptr_t)worker) {
Sree Kuchibhotla20d0a162016-06-23 15:14:03 -07001169 append_error(&error, pollset_worker_kick(worker), err_desc);
Sree Kuchibhotla8e4926c2016-06-08 20:33:19 -07001170 }
Sree Kuchibhotlaf448c342016-05-19 10:51:24 -07001171 }
Craig Tillera218a062016-06-26 09:58:37 -07001172 GPR_TIMER_END("pollset_kick.broadcast", 0);
Sree Kuchibhotla0bcbd792016-06-01 15:43:03 -07001173 } else {
1174 p->kicked_without_pollers = true;
Sree Kuchibhotlaf448c342016-05-19 10:51:24 -07001175 }
Sree Kuchibhotla0bcbd792016-06-01 15:43:03 -07001176 } else {
1177 GPR_TIMER_MARK("kicked_specifically", 0);
Sree Kuchibhotla8e4926c2016-06-08 20:33:19 -07001178 if (gpr_tls_get(&g_current_thread_worker) != (intptr_t)worker) {
Sree Kuchibhotla20d0a162016-06-23 15:14:03 -07001179 append_error(&error, pollset_worker_kick(worker), err_desc);
Sree Kuchibhotla8e4926c2016-06-08 20:33:19 -07001180 }
Sree Kuchibhotla0bcbd792016-06-01 15:43:03 -07001181 }
Sree Kuchibhotla8e4926c2016-06-08 20:33:19 -07001182 } else if (gpr_tls_get(&g_current_thread_pollset) != (intptr_t)p) {
1183 /* Since worker == NULL, it means that we can kick "any" worker on this
1184 pollset 'p'. If 'p' happens to be the same pollset this thread is
1185 currently polling (i.e in pollset_work() function), then there is no need
1186 to kick any other worker since the current thread can just absorb the
1187 kick. This is the reason why we enter this case only when
1188 g_current_thread_pollset is != p */
1189
Sree Kuchibhotla0bcbd792016-06-01 15:43:03 -07001190 GPR_TIMER_MARK("kick_anonymous", 0);
1191 worker = pop_front_worker(p);
1192 if (worker != NULL) {
1193 GPR_TIMER_MARK("finally_kick", 0);
1194 push_back_worker(p, worker);
Sree Kuchibhotla20d0a162016-06-23 15:14:03 -07001195 append_error(&error, pollset_worker_kick(worker), err_desc);
Sree Kuchibhotlaf448c342016-05-19 10:51:24 -07001196 } else {
1197 GPR_TIMER_MARK("kicked_no_pollers", 0);
Sree Kuchibhotla0bcbd792016-06-01 15:43:03 -07001198 p->kicked_without_pollers = true;
Sree Kuchibhotlaf448c342016-05-19 10:51:24 -07001199 }
1200 }
1201
Sree Kuchibhotla0bcbd792016-06-01 15:43:03 -07001202 GPR_TIMER_END("pollset_kick", 0);
Sree Kuchibhotla3131c262016-06-21 17:28:28 -07001203 GRPC_LOG_IF_ERROR("pollset_kick", GRPC_ERROR_REF(error));
1204 return error;
Sree Kuchibhotlaf448c342016-05-19 10:51:24 -07001205}
1206
Sree Kuchibhotlaf448c342016-05-19 10:51:24 -07001207static void pollset_init(grpc_pollset *pollset, gpr_mu **mu) {
Sree Kuchibhotlaf6f33d72016-11-18 11:38:52 -08001208 gpr_mu_init(&pollset->po.mu);
1209 *mu = &pollset->po.mu;
1210 pollset->po.pi = NULL;
Sree Kuchibhotlaa0749a62016-11-18 20:22:09 -08001211#ifdef PO_DEBUG
1212 pollset->po.obj_type = POLL_OBJ_POLLSET;
1213#endif
Sree Kuchibhotla0bcbd792016-06-01 15:43:03 -07001214
Sree Kuchibhotlaf448c342016-05-19 10:51:24 -07001215 pollset->root_worker.next = pollset->root_worker.prev = &pollset->root_worker;
Sree Kuchibhotla0bcbd792016-06-01 15:43:03 -07001216 pollset->kicked_without_pollers = false;
1217
1218 pollset->shutting_down = false;
1219 pollset->finish_shutdown_called = false;
1220 pollset->shutdown_done = NULL;
Sree Kuchibhotlaf448c342016-05-19 10:51:24 -07001221}
1222
Sree Kuchibhotla0bcbd792016-06-01 15:43:03 -07001223/* Convert a timespec to milliseconds:
1224 - Very small or negative poll times are clamped to zero to do a non-blocking
1225 poll (which becomes spin polling)
1226 - Other small values are rounded up to one millisecond
1227     - Polls longer than a millisecond are rounded up to the next
1228       millisecond to avoid spinning
1229 - Infinite timeouts are converted to -1 */
Sree Kuchibhotlaf448c342016-05-19 10:51:24 -07001230static int poll_deadline_to_millis_timeout(gpr_timespec deadline,
1231 gpr_timespec now) {
1232 gpr_timespec timeout;
1233 static const int64_t max_spin_polling_us = 10;
1234 if (gpr_time_cmp(deadline, gpr_inf_future(deadline.clock_type)) == 0) {
1235 return -1;
1236 }
Sree Kuchibhotla0bcbd792016-06-01 15:43:03 -07001237
Sree Kuchibhotlaf448c342016-05-19 10:51:24 -07001238 if (gpr_time_cmp(deadline, gpr_time_add(now, gpr_time_from_micros(
1239 max_spin_polling_us,
1240 GPR_TIMESPAN))) <= 0) {
1241 return 0;
1242 }
1243 timeout = gpr_time_sub(deadline, now);
Craig Tiller799e7e82017-03-27 12:42:34 -07001244 int millis = gpr_time_to_millis(gpr_time_add(
1245 timeout, gpr_time_from_nanos(GPR_NS_PER_MS - 1, GPR_TIMESPAN)));
1246 return millis >= 1 ? millis : 1;
Sree Kuchibhotlaf448c342016-05-19 10:51:24 -07001247}
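
/* The rounding rules above in a compiled-out standalone sketch, over a
   plain nanosecond delta instead of gpr_timespec (-1 stands for infinity).
   Worked examples: -5ns -> 0 (non-blocking), 3us -> 0 (within the 10us spin
   threshold), 200us -> 1ms, 2.5ms -> 3ms (always rounded up, never down,
   so a short sleep cannot degenerate into a spin). */
#if 0
#include <stdint.h>

#define NS_PER_MS 1000000LL
#define MAX_SPIN_POLLING_NS 10000LL /* 10 microseconds */

static int sketch_deadline_to_millis(int64_t delta_ns, int is_infinite) {
  if (is_infinite) return -1;                    /* block forever */
  if (delta_ns <= MAX_SPIN_POLLING_NS) return 0; /* non-blocking poll */
  /* Round up to the next whole millisecond so we never wake up early. */
  int64_t millis = (delta_ns + NS_PER_MS - 1) / NS_PER_MS;
  return millis >= 1 ? (int)millis : 1;
}
#endif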
1248
Sree Kuchibhotla5855c472016-06-08 12:56:56 -07001249static void fd_become_readable(grpc_exec_ctx *exec_ctx, grpc_fd *fd,
1250 grpc_pollset *notifier) {
Craig Tiller70652142017-04-06 08:31:23 -07001251 grpc_lfev_set_ready(exec_ctx, &fd->read_closure);
Sree Kuchibhotla82d73412017-02-09 18:27:45 -08001252
Sree Kuchibhotla4db8c822017-02-12 17:07:31 -08001253   /* Note: it is possible that fd_become_readable might be called twice with
Sree Kuchibhotla4c60d0d2017-02-12 17:09:08 -08001254      different 'notifier's when an fd becomes readable while it is in two epoll
1255      sets (this can happen briefly during polling island merges). In such cases
1256      it does not really matter which notifier is set as the read_notifier_pollset
1257      (they would both point to the same polling island anyway) */
Sree Kuchibhotlafb7ced62017-02-15 18:31:57 -08001258 /* Use release store to match with acquire load in fd_get_read_notifier */
1259 gpr_atm_rel_store(&fd->read_notifier_pollset, (gpr_atm)notifier);
Sree Kuchibhotlaf448c342016-05-19 10:51:24 -07001260}
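
/* The release store above pairs with the acquire load in
   fd_get_read_notifier_pollset(): the acquire load is guaranteed to observe
   everything written before the release store. A compiled-out sketch of the
   same pairing in C11 atomics (sketch_pollset is a hypothetical type): */
#if 0
#include <stdatomic.h>

typedef struct sketch_pollset sketch_pollset;

static _Atomic(sketch_pollset *) g_notifier;

static void publish_notifier(sketch_pollset *ps) {
  /* Release: all prior writes become visible to an acquire reader. */
  atomic_store_explicit(&g_notifier, ps, memory_order_release);
}

static sketch_pollset *read_notifier(void) {
  return atomic_load_explicit(&g_notifier, memory_order_acquire);
}
#endif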
1261
Sree Kuchibhotlaf448c342016-05-19 10:51:24 -07001262static void fd_become_writable(grpc_exec_ctx *exec_ctx, grpc_fd *fd) {
Craig Tillere16372b2017-04-06 08:51:39 -07001263 grpc_lfev_set_ready(exec_ctx, &fd->write_closure);
Sree Kuchibhotlaf448c342016-05-19 10:51:24 -07001264}
1265
Craig Tillerb39307d2016-06-30 15:39:13 -07001266static void pollset_release_polling_island(grpc_exec_ctx *exec_ctx,
1267 grpc_pollset *ps, char *reason) {
Sree Kuchibhotlaf6f33d72016-11-18 11:38:52 -08001268 if (ps->po.pi != NULL) {
1269 PI_UNREF(exec_ctx, ps->po.pi, reason);
Sree Kuchibhotla8e4926c2016-06-08 20:33:19 -07001270 }
Sree Kuchibhotlaf6f33d72016-11-18 11:38:52 -08001271 ps->po.pi = NULL;
Sree Kuchibhotla8e4926c2016-06-08 20:33:19 -07001272}
1273
1274static void finish_shutdown_locked(grpc_exec_ctx *exec_ctx,
1275 grpc_pollset *pollset) {
1276 /* The pollset cannot have any workers if we are at this stage */
1277 GPR_ASSERT(!pollset_has_workers(pollset));
1278
1279 pollset->finish_shutdown_called = true;
Sree Kuchibhotla8e4926c2016-06-08 20:33:19 -07001280
Sree Kuchibhotlaf6f33d72016-11-18 11:38:52 -08001281 /* Release the ref and set pollset->po.pi to NULL */
Craig Tillerb39307d2016-06-30 15:39:13 -07001282 pollset_release_polling_island(exec_ctx, pollset, "ps_shutdown");
Craig Tiller91031da2016-12-28 15:44:25 -08001283 grpc_closure_sched(exec_ctx, pollset->shutdown_done, GRPC_ERROR_NONE);
Sree Kuchibhotla8e4926c2016-06-08 20:33:19 -07001284}
1285
Sree Kuchibhotlaf6f33d72016-11-18 11:38:52 -08001286/* pollset->po.mu lock must be held by the caller before calling this */
Sree Kuchibhotla8e4926c2016-06-08 20:33:19 -07001287static void pollset_shutdown(grpc_exec_ctx *exec_ctx, grpc_pollset *pollset,
1288 grpc_closure *closure) {
1289 GPR_TIMER_BEGIN("pollset_shutdown", 0);
1290 GPR_ASSERT(!pollset->shutting_down);
1291 pollset->shutting_down = true;
1292 pollset->shutdown_done = closure;
1293 pollset_kick(pollset, GRPC_POLLSET_KICK_BROADCAST);
1294
1295 /* If the pollset has any workers, we cannot call finish_shutdown_locked()
1296 because it would release the underlying polling island. In such a case, we
1297 let the last worker call finish_shutdown_locked() from pollset_work() */
1298 if (!pollset_has_workers(pollset)) {
1299 GPR_ASSERT(!pollset->finish_shutdown_called);
1300 GPR_TIMER_MARK("pollset_shutdown.finish_shutdown_locked", 0);
1301 finish_shutdown_locked(exec_ctx, pollset);
1302 }
1303 GPR_TIMER_END("pollset_shutdown", 0);
1304}
1305
1306/* pollset_shutdown is guaranteed to be called before pollset_destroy. So other
1307  * than destroying the mutex, there is nothing special that needs to be done
1308 * here */
Craig Tillerf8401102017-04-17 09:47:28 -07001309static void pollset_destroy(grpc_exec_ctx *exec_ctx, grpc_pollset *pollset) {
Sree Kuchibhotla8e4926c2016-06-08 20:33:19 -07001310 GPR_ASSERT(!pollset_has_workers(pollset));
Sree Kuchibhotlaf6f33d72016-11-18 11:38:52 -08001311 gpr_mu_destroy(&pollset->po.mu);
Sree Kuchibhotla8e4926c2016-06-08 20:33:19 -07001312}
1313
Craig Tillerd8a3c042016-09-09 12:42:37 -07001314static bool maybe_do_workqueue_work(grpc_exec_ctx *exec_ctx,
1315 polling_island *pi) {
1316 if (gpr_mu_trylock(&pi->workqueue_read_mu)) {
1317 gpr_mpscq_node *n = gpr_mpscq_pop(&pi->workqueue_items);
1318 gpr_mu_unlock(&pi->workqueue_read_mu);
1319 if (n != NULL) {
Craig Tiller2d1e8cd2017-05-17 12:41:44 -07001320 gpr_atm remaining =
1321 gpr_atm_full_fetch_add(&pi->workqueue_item_count, -1) - 1;
1322 GRPC_POLLING_TRACE(
1323 "maybe_do_workqueue_work: pi: %p: got closure %p, remaining = "
1324 "%" PRIdPTR,
1325 pi, n, remaining);
1326 if (remaining > 0) {
Craig Tillerd8a3c042016-09-09 12:42:37 -07001327 workqueue_maybe_wakeup(pi);
1328 }
1329 grpc_closure *c = (grpc_closure *)n;
Craig Tiller061ef742016-12-29 10:54:09 -08001330 grpc_error *error = c->error_data.error;
Mark D. Roth43f774e2017-04-04 16:35:37 -07001331#ifndef NDEBUG
1332 c->scheduled = false;
1333#endif
Craig Tiller061ef742016-12-29 10:54:09 -08001334 c->cb(exec_ctx, c->cb_arg, error);
1335 GRPC_ERROR_UNREF(error);
Craig Tillerd8a3c042016-09-09 12:42:37 -07001336 return true;
1337 } else if (gpr_atm_no_barrier_load(&pi->workqueue_item_count) > 0) {
Craig Tiller460502e2016-10-13 10:02:08 -07001338       /* n == NULL might mean there's work but it's not yet available to be
1339        * popped - if so, try to ensure another poller wakes up to check shortly
1340 */
Craig Tiller2d1e8cd2017-05-17 12:41:44 -07001341 GRPC_POLLING_TRACE(
1342 "maybe_do_workqueue_work: pi: %p: more to do, but not yet", pi);
Craig Tillerd8a3c042016-09-09 12:42:37 -07001343 workqueue_maybe_wakeup(pi);
1344 }
Craig Tiller2d1e8cd2017-05-17 12:41:44 -07001345 } else {
1346 GRPC_POLLING_TRACE("maybe_do_workqueue_work: pi: %p: read already locked",
1347 pi);
Craig Tillerd8a3c042016-09-09 12:42:37 -07001348 }
1349 return false;
1350}
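
/* A compiled-out sketch of the trylock + item-count shape above: a single
   consumer at a time drains one item; if the counter says more work
   remains, another poller is woken instead of looping under the lock. The
   queue/pop/wakeup pieces are stubs; all sketch_* names are hypothetical. */
#if 0
#include <pthread.h>
#include <stdatomic.h>
#include <stdbool.h>
#include <stddef.h>

typedef struct {
  pthread_mutex_t read_mu; /* guards the single-consumer pop */
  atomic_long item_count;  /* number of queued closures */
} sketch_workqueue;

static void *sketch_pop(sketch_workqueue *q) { (void)q; return NULL; /* stub */ }
static void sketch_wakeup(sketch_workqueue *q) { (void)q; /* stub */ }
static void sketch_run(void *closure) { (void)closure; /* stub */ }

static bool sketch_do_one(sketch_workqueue *q) {
  if (pthread_mutex_trylock(&q->read_mu) != 0) {
    return false; /* someone else is already draining */
  }
  void *item = sketch_pop(q);
  pthread_mutex_unlock(&q->read_mu);
  if (item != NULL) {
    long remaining = atomic_fetch_sub(&q->item_count, 1) - 1;
    if (remaining > 0) sketch_wakeup(q); /* more work: wake another poller */
    sketch_run(item); /* run the closure outside the lock */
    return true;
  }
  if (atomic_load(&q->item_count) > 0) {
    sketch_wakeup(q); /* an item exists but was not yet poppable; retry soon */
  }
  return false;
}
#endif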
1351
Craig Tiller84ea3412016-09-08 14:57:56 -07001352#define GRPC_EPOLL_MAX_EVENTS 100
Sree Kuchibhotla20d0a162016-06-23 15:14:03 -07001353/* Note: sig_mask contains the signal mask to use *during* epoll_wait() */
1354static void pollset_work_and_unlock(grpc_exec_ctx *exec_ctx,
Sree Kuchibhotla1e776682016-06-28 14:09:26 -07001355 grpc_pollset *pollset,
1356 grpc_pollset_worker *worker, int timeout_ms,
Sree Kuchibhotla20d0a162016-06-23 15:14:03 -07001357 sigset_t *sig_mask, grpc_error **error) {
Sree Kuchibhotla0bcbd792016-06-01 15:43:03 -07001358 struct epoll_event ep_ev[GRPC_EPOLL_MAX_EVENTS];
Sree Kuchibhotla88ee12f2016-06-03 19:26:48 -07001359 int epoll_fd = -1;
Sree Kuchibhotla0bcbd792016-06-01 15:43:03 -07001360 int ep_rv;
Sree Kuchibhotla24b10622016-06-08 15:20:17 -07001361 polling_island *pi = NULL;
Sree Kuchibhotla20d0a162016-06-23 15:14:03 -07001362 char *err_msg;
1363 const char *err_desc = "pollset_work_and_unlock";
Sree Kuchibhotla0bcbd792016-06-01 15:43:03 -07001364 GPR_TIMER_BEGIN("pollset_work_and_unlock", 0);
1365
1366   /* We need to get the epoll_fd to wait on. The epoll_fd is inside the
Sree Kuchibhotlaf6f33d72016-11-18 11:38:52 -08001367      latest polling island pointed to by pollset->po.pi
Sree Kuchibhotla229533b12016-06-21 20:42:52 -07001368
1369      Since epoll_fd is immutable, we can read it without obtaining the polling
1370      island lock. There is however a possibility that the polling island (from
1371      which we got the epoll_fd) got merged with another island while we are
1372      in this function. This is still okay because in such a case, we will wake
1373      up right away from epoll_wait() and pick up the latest polling_island the
Sree Kuchibhotla20d0a162016-06-23 15:14:03 -07001374      next time this function (i.e pollset_work_and_unlock()) is called */
Sree Kuchibhotla0bcbd792016-06-01 15:43:03 -07001375
Sree Kuchibhotlaf6f33d72016-11-18 11:38:52 -08001376 if (pollset->po.pi == NULL) {
1377 pollset->po.pi = polling_island_create(exec_ctx, NULL, error);
1378 if (pollset->po.pi == NULL) {
Sree Kuchibhotla20d0a162016-06-23 15:14:03 -07001379 GPR_TIMER_END("pollset_work_and_unlock", 0);
1380 return; /* Fatal error. We cannot continue */
1381 }
1382
Sree Kuchibhotlaf6f33d72016-11-18 11:38:52 -08001383 PI_ADD_REF(pollset->po.pi, "ps");
Sree Kuchibhotla1e776682016-06-28 14:09:26 -07001384 GRPC_POLLING_TRACE("pollset_work: pollset: %p created new pi: %p",
Sree Kuchibhotlaf6f33d72016-11-18 11:38:52 -08001385 (void *)pollset, (void *)pollset->po.pi);
Sree Kuchibhotla88ee12f2016-06-03 19:26:48 -07001386 }
Sree Kuchibhotla0bcbd792016-06-01 15:43:03 -07001387
Sree Kuchibhotlaf6f33d72016-11-18 11:38:52 -08001388 pi = polling_island_maybe_get_latest(pollset->po.pi);
Sree Kuchibhotla24b10622016-06-08 15:20:17 -07001389 epoll_fd = pi->epoll_fd;
1390
Sree Kuchibhotlaf6f33d72016-11-18 11:38:52 -08001391   /* Update pollset->po.pi since the island pointed to by pollset->po.pi
1392      may be older than the one pointed to by pi */
1393 if (pollset->po.pi != pi) {
Sree Kuchibhotla2f8ade02016-06-17 13:28:38 -07001394 /* Always do PI_ADD_REF before PI_UNREF because PI_UNREF may cause the
1395 polling island to be deleted */
1396 PI_ADD_REF(pi, "ps");
Sree Kuchibhotlaf6f33d72016-11-18 11:38:52 -08001397 PI_UNREF(exec_ctx, pollset->po.pi, "ps");
1398 pollset->po.pi = pi;
Sree Kuchibhotla2f8ade02016-06-17 13:28:38 -07001399 }
Sree Kuchibhotlad627c102016-06-06 15:49:32 -07001400
Sree Kuchibhotla2f8ade02016-06-17 13:28:38 -07001401 /* Add an extra ref so that the island does not get destroyed (which means
1402      the epoll_fd won't be closed) while we are doing an epoll_wait() on the
1403 epoll_fd */
1404 PI_ADD_REF(pi, "ps_work");
Sree Kuchibhotlaf6f33d72016-11-18 11:38:52 -08001405 gpr_mu_unlock(&pollset->po.mu);
Sree Kuchibhotla0bcbd792016-06-01 15:43:03 -07001406
Craig Tiller460502e2016-10-13 10:02:08 -07001407 /* If we get some workqueue work to do, it might end up completing an item on
1408      the completion queue, so there's no need to poll; we skip the poll and
1409      redo the complete loop to verify */
Craig Tiller2d1e8cd2017-05-17 12:41:44 -07001410 GRPC_POLLING_TRACE("pollset_work: pollset: %p, worker %p, pi %p", pollset,
1411 worker, pi);
Craig Tillerd8a3c042016-09-09 12:42:37 -07001412 if (!maybe_do_workqueue_work(exec_ctx, pi)) {
Craig Tiller2d1e8cd2017-05-17 12:41:44 -07001413 GRPC_POLLING_TRACE("pollset_work: begins");
Craig Tillerd8a3c042016-09-09 12:42:37 -07001414 gpr_atm_no_barrier_fetch_add(&pi->poller_count, 1);
1415 g_current_thread_polling_island = pi;
1416
Vijay Paicef54012016-08-28 23:05:31 -07001417 GRPC_SCHEDULING_START_BLOCKING_REGION;
Sree Kuchibhotlae5012ba2016-06-06 16:01:45 -07001418 ep_rv = epoll_pwait(epoll_fd, ep_ev, GRPC_EPOLL_MAX_EVENTS, timeout_ms,
1419 sig_mask);
Vijay Paicef54012016-08-28 23:05:31 -07001420 GRPC_SCHEDULING_END_BLOCKING_REGION;
Sree Kuchibhotlae5012ba2016-06-06 16:01:45 -07001421 if (ep_rv < 0) {
1422 if (errno != EINTR) {
Sree Kuchibhotla20d0a162016-06-23 15:14:03 -07001423 gpr_asprintf(&err_msg,
1424 "epoll_wait() epoll fd: %d failed with error: %d (%s)",
1425 epoll_fd, errno, strerror(errno));
1426 append_error(error, GRPC_OS_ERROR(errno, err_msg), err_desc);
Sree Kuchibhotlae5012ba2016-06-06 16:01:45 -07001427 } else {
Sree Kuchibhotla24b10622016-06-08 15:20:17 -07001428        /* We were interrupted. Save an iteration by doing a zero timeout
1429 epoll_wait to see if there are any other events of interest */
Sree Kuchibhotla1e776682016-06-28 14:09:26 -07001430 GRPC_POLLING_TRACE(
1431 "pollset_work: pollset: %p, worker: %p received kick",
1432 (void *)pollset, (void *)worker);
Sree Kuchibhotlae5012ba2016-06-06 16:01:45 -07001433 ep_rv = epoll_wait(epoll_fd, ep_ev, GRPC_EPOLL_MAX_EVENTS, 0);
Sree Kuchibhotla79a62332016-06-04 14:01:03 -07001434 }
Sree Kuchibhotlae5012ba2016-06-06 16:01:45 -07001435 }
Sree Kuchibhotla79a62332016-06-04 14:01:03 -07001436
Sree Kuchibhotlaad2c4772016-06-13 19:06:54 -07001437#ifdef GRPC_TSAN
Sree Kuchibhotla41622a82016-06-13 16:43:14 -07001438 /* See the definition of g_poll_sync for more details */
1439 gpr_atm_acq_load(&g_epoll_sync);
Sree Kuchibhotlaad2c4772016-06-13 19:06:54 -07001440#endif /* defined(GRPC_TSAN) */
Sree Kuchibhotla41622a82016-06-13 16:43:14 -07001441
Sree Kuchibhotla58e58962016-06-13 00:52:56 -07001442 for (int i = 0; i < ep_rv; ++i) {
Sree Kuchibhotla24b10622016-06-08 15:20:17 -07001443 void *data_ptr = ep_ev[i].data.ptr;
Craig Tillerc3571792017-05-02 12:33:38 -07001444 if (data_ptr == &pi->workqueue_wakeup_fd) {
Craig Tillere49959d2017-01-26 08:39:38 -08001445 append_error(error,
1446 grpc_wakeup_fd_consume_wakeup(&pi->workqueue_wakeup_fd),
Craig Tillerd8a3c042016-09-09 12:42:37 -07001447 err_desc);
1448 maybe_do_workqueue_work(exec_ctx, pi);
Sree Kuchibhotla24b10622016-06-08 15:20:17 -07001449 } else if (data_ptr == &polling_island_wakeup_fd) {
Sree Kuchibhotla1e776682016-06-28 14:09:26 -07001450 GRPC_POLLING_TRACE(
1451 "pollset_work: pollset: %p, worker: %p polling island (epoll_fd: "
1452 "%d) got merged",
1453 (void *)pollset, (void *)worker, epoll_fd);
Sree Kuchibhotla24b10622016-06-08 15:20:17 -07001454 /* This means that our polling island is merged with a different
1455 island. We do not have to do anything here since the subsequent call
1456 to the function pollset_work_and_unlock() will pick up the correct
1457 epoll_fd */
Sree Kuchibhotlae5012ba2016-06-06 16:01:45 -07001458 } else {
Sree Kuchibhotla24b10622016-06-08 15:20:17 -07001459 grpc_fd *fd = data_ptr;
1460 int cancel = ep_ev[i].events & (EPOLLERR | EPOLLHUP);
1461 int read_ev = ep_ev[i].events & (EPOLLIN | EPOLLPRI);
1462 int write_ev = ep_ev[i].events & EPOLLOUT;
Sree Kuchibhotlae5012ba2016-06-06 16:01:45 -07001463 if (read_ev || cancel) {
Sree Kuchibhotla5855c472016-06-08 12:56:56 -07001464 fd_become_readable(exec_ctx, fd, pollset);
Sree Kuchibhotlae5012ba2016-06-06 16:01:45 -07001465 }
1466 if (write_ev || cancel) {
1467 fd_become_writable(exec_ctx, fd);
Sree Kuchibhotla0bcbd792016-06-01 15:43:03 -07001468 }
1469 }
Sree Kuchibhotlae5012ba2016-06-06 16:01:45 -07001470 }
Craig Tillerd8a3c042016-09-09 12:42:37 -07001471
1472 g_current_thread_polling_island = NULL;
1473 gpr_atm_no_barrier_fetch_add(&pi->poller_count, -1);
Craig Tiller2d1e8cd2017-05-17 12:41:44 -07001474 GRPC_POLLING_TRACE("pollset_work: ends");
Craig Tillerd8a3c042016-09-09 12:42:37 -07001475 }
Sree Kuchibhotla24b10622016-06-08 15:20:17 -07001476
1477 GPR_ASSERT(pi != NULL);
1478
Sree Kuchibhotla2f8ade02016-06-17 13:28:38 -07001479 /* Before leaving, release the extra ref we added to the polling island. It
Sree Kuchibhotlaf6f33d72016-11-18 11:38:52 -08001480 is important to use "pi" here (i.e our old copy of pollset->po.pi
Sree Kuchibhotla2f8ade02016-06-17 13:28:38 -07001481 that we got before releasing the polling island lock). This is because
Sree Kuchibhotlaf6f33d72016-11-18 11:38:52 -08001482      pollset->po.pi pointer might get updated in other parts of the
Sree Kuchibhotla2f8ade02016-06-17 13:28:38 -07001483 code when there is an island merge while we are doing epoll_wait() above */
Craig Tillerb39307d2016-06-30 15:39:13 -07001484 PI_UNREF(exec_ctx, pi, "ps_work");
Sree Kuchibhotla24b10622016-06-08 15:20:17 -07001485
Sree Kuchibhotla0bcbd792016-06-01 15:43:03 -07001486 GPR_TIMER_END("pollset_work_and_unlock", 0);
1487}
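
/* The event-classification rule used in the dispatch loop above, shown as a
   compiled-out standalone epoll_wait() loop: EPOLLIN/EPOLLPRI means
   readable, EPOLLOUT means writable, and an EPOLLERR/EPOLLHUP "cancel"
   event wakes both sides so pending closures observe the error promptly. */
#if 0
#include <stdio.h>
#include <sys/epoll.h>

#define MAX_EVENTS 100

static void sketch_poll_once(int epoll_fd) {
  struct epoll_event evs[MAX_EVENTS];
  int n = epoll_wait(epoll_fd, evs, MAX_EVENTS, 1000 /* ms */);
  for (int i = 0; i < n; i++) {
    int cancel = evs[i].events & (EPOLLERR | EPOLLHUP);
    int read_ev = evs[i].events & (EPOLLIN | EPOLLPRI);
    int write_ev = evs[i].events & EPOLLOUT;
    if (read_ev || cancel) printf("fd readable\n");  /* cf. fd_become_readable */
    if (write_ev || cancel) printf("fd writable\n"); /* cf. fd_become_writable */
  }
}
#endif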
1488
Sree Kuchibhotlaf6f33d72016-11-18 11:38:52 -08001489/* pollset->po.mu lock must be held by the caller before calling this.
1490 The function pollset_work() may temporarily release the lock (pollset->po.mu)
Sree Kuchibhotla0bcbd792016-06-01 15:43:03 -07001491 during the course of its execution but it will always re-acquire the lock and
1492 ensure that it is held by the time the function returns */
Sree Kuchibhotla3131c262016-06-21 17:28:28 -07001493static grpc_error *pollset_work(grpc_exec_ctx *exec_ctx, grpc_pollset *pollset,
1494 grpc_pollset_worker **worker_hdl,
1495 gpr_timespec now, gpr_timespec deadline) {
Sree Kuchibhotla0bcbd792016-06-01 15:43:03 -07001496 GPR_TIMER_BEGIN("pollset_work", 0);
Sree Kuchibhotla3131c262016-06-21 17:28:28 -07001497 grpc_error *error = GRPC_ERROR_NONE;
Sree Kuchibhotla0bcbd792016-06-01 15:43:03 -07001498 int timeout_ms = poll_deadline_to_millis_timeout(deadline, now);
1499
1500 sigset_t new_mask;
Sree Kuchibhotla0bcbd792016-06-01 15:43:03 -07001501
1502 grpc_pollset_worker worker;
1503 worker.next = worker.prev = NULL;
Sree Kuchibhotla0bcbd792016-06-01 15:43:03 -07001504 worker.pt_id = pthread_self();
Sree Kuchibhotla34217242016-06-29 00:19:07 -07001505 gpr_atm_no_barrier_store(&worker.is_kicked, (gpr_atm)0);
Sree Kuchibhotla0bcbd792016-06-01 15:43:03 -07001506
Craig Tiller557c88c2017-04-05 17:20:18 -07001507 if (worker_hdl) *worker_hdl = &worker;
Sree Kuchibhotla20d0a162016-06-23 15:14:03 -07001508
Sree Kuchibhotla8e4926c2016-06-08 20:33:19 -07001509 gpr_tls_set(&g_current_thread_pollset, (intptr_t)pollset);
1510 gpr_tls_set(&g_current_thread_worker, (intptr_t)&worker);
Sree Kuchibhotla0bcbd792016-06-01 15:43:03 -07001511
1512 if (pollset->kicked_without_pollers) {
1513 /* If the pollset was kicked without pollers, pretend that the current
1514 worker got the kick and skip polling. A kick indicates that there is some
1515 work that needs attention like an event on the completion queue or an
1516 alarm */
1517 GPR_TIMER_MARK("pollset_work.kicked_without_pollers", 0);
1518 pollset->kicked_without_pollers = 0;
1519 } else if (!pollset->shutting_down) {
Sree Kuchibhotla20d0a162016-06-23 15:14:03 -07001520 /* We use the posix-signal with number 'grpc_wakeup_signal' for waking up
Sree Kuchibhotla34217242016-06-29 00:19:07 -07001521 (i.e 'kicking') a worker in the pollset. A 'kick' is a way to inform the
1522 worker that there is some pending work that needs immediate attention
1523 (like an event on the completion queue, or a polling island merge that
1524 results in a new epoll-fd to wait on) and that the worker should not
1525 spend time waiting in epoll_pwait().
Sree Kuchibhotla20d0a162016-06-23 15:14:03 -07001526
Sree Kuchibhotla34217242016-06-29 00:19:07 -07001527 A worker can be kicked anytime from the point it is added to the pollset
1528 via push_front_worker() (or push_back_worker()) to the point it is
1529 removed via remove_worker().
1530        If the worker is kicked before or while it is in epoll_pwait(), it
1531        should immediately exit from epoll_pwait(). If the worker is kicked
1532        after it returns from epoll_pwait(), then nothing really needs to be done.
Sree Kuchibhotla20d0a162016-06-23 15:14:03 -07001533
Sree Kuchibhotla34217242016-06-29 00:19:07 -07001534 To accomplish this, we mask 'grpc_wakeup_signal' on this thread at all
Sree Kuchibhotla20d0a162016-06-23 15:14:03 -07001535 times *except* when it is in epoll_pwait(). This way, the worker never
1536 misses acting on a kick */
1537
Craig Tiller19196992016-06-27 18:45:56 -07001538 if (!g_initialized_sigmask) {
1539 sigemptyset(&new_mask);
1540 sigaddset(&new_mask, grpc_wakeup_signal);
1541 pthread_sigmask(SIG_BLOCK, &new_mask, &g_orig_sigmask);
1542 sigdelset(&g_orig_sigmask, grpc_wakeup_signal);
1543 g_initialized_sigmask = true;
1544 /* new_mask: The new thread mask which blocks 'grpc_wakeup_signal'.
1545 This is the mask used at all times *except during
1546                            epoll_wait()*
1547 g_orig_sigmask: The thread mask which allows 'grpc_wakeup_signal' and
Craig Tiller510ff692016-06-27 20:31:49 -07001548 this is the mask to use *during epoll_wait()*
Sree Kuchibhotla0bcbd792016-06-01 15:43:03 -07001549
Craig Tiller19196992016-06-27 18:45:56 -07001550 The new_mask is set on the worker before it is added to the pollset
Craig Tiller510ff692016-06-27 20:31:49 -07001551 (i.e before it can be kicked) */
Craig Tiller19196992016-06-27 18:45:56 -07001552 }
Sree Kuchibhotla0bcbd792016-06-01 15:43:03 -07001553
Sree Kuchibhotla20d0a162016-06-23 15:14:03 -07001554 push_front_worker(pollset, &worker); /* Add worker to pollset */
1555
Sree Kuchibhotla1e776682016-06-28 14:09:26 -07001556 pollset_work_and_unlock(exec_ctx, pollset, &worker, timeout_ms,
1557 &g_orig_sigmask, &error);
Sree Kuchibhotla0bcbd792016-06-01 15:43:03 -07001558 grpc_exec_ctx_flush(exec_ctx);
1559
Sree Kuchibhotlaf6f33d72016-11-18 11:38:52 -08001560 gpr_mu_lock(&pollset->po.mu);
Sree Kuchibhotla34217242016-06-29 00:19:07 -07001561
1562 /* Note: There is no need to reset worker.is_kicked to 0 since we are no
1563 longer going to use this worker */
Sree Kuchibhotla0bcbd792016-06-01 15:43:03 -07001564 remove_worker(pollset, &worker);
1565 }
1566
1567 /* If we are the last worker on the pollset (i.e pollset_has_workers() is
1568 false at this point) and the pollset is shutting down, we may have to
1569 finish the shutdown process by calling finish_shutdown_locked().
1570 See pollset_shutdown() for more details.
1571
1572 Note: Continuing to access pollset here is safe; it is the caller's
1573 responsibility to not destroy a pollset when it has outstanding calls to
1574 pollset_work() */
1575 if (pollset->shutting_down && !pollset_has_workers(pollset) &&
1576 !pollset->finish_shutdown_called) {
1577 GPR_TIMER_MARK("pollset_work.finish_shutdown_locked", 0);
1578 finish_shutdown_locked(exec_ctx, pollset);
1579
Sree Kuchibhotlaf6f33d72016-11-18 11:38:52 -08001580 gpr_mu_unlock(&pollset->po.mu);
Sree Kuchibhotla0bcbd792016-06-01 15:43:03 -07001581 grpc_exec_ctx_flush(exec_ctx);
Sree Kuchibhotlaf6f33d72016-11-18 11:38:52 -08001582 gpr_mu_lock(&pollset->po.mu);
Sree Kuchibhotla0bcbd792016-06-01 15:43:03 -07001583 }
1584
Craig Tiller557c88c2017-04-05 17:20:18 -07001585 if (worker_hdl) *worker_hdl = NULL;
Sree Kuchibhotla20d0a162016-06-23 15:14:03 -07001586
Sree Kuchibhotla8e4926c2016-06-08 20:33:19 -07001587 gpr_tls_set(&g_current_thread_pollset, (intptr_t)0);
1588 gpr_tls_set(&g_current_thread_worker, (intptr_t)0);
Sree Kuchibhotla20d0a162016-06-23 15:14:03 -07001589
Sree Kuchibhotla0bcbd792016-06-01 15:43:03 -07001590 GPR_TIMER_END("pollset_work", 0);
Sree Kuchibhotla20d0a162016-06-23 15:14:03 -07001591
Sree Kuchibhotla3131c262016-06-21 17:28:28 -07001592 GRPC_LOG_IF_ERROR("pollset_work", GRPC_ERROR_REF(error));
1593 return error;
Sree Kuchibhotla0bcbd792016-06-01 15:43:03 -07001594}
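
/* The sigmask discipline described above, as a compiled-out standalone
   sketch with SIGUSR1 standing in for grpc_wakeup_signal. The signal stays
   blocked on the thread at all times; epoll_pwait() atomically installs the
   permissive mask only for the duration of the wait, so a kick delivered at
   any point either interrupts the wait with EINTR or stays pending until
   the next wait -- it can never be missed. */
#if 0
#include <pthread.h>
#include <signal.h>
#include <sys/epoll.h>

static void sketch_sig_handler(int signum) { (void)signum; /* no-op */ }

static int sketch_wait(int epoll_fd, struct epoll_event *evs, int max_evs,
                       int timeout_ms) {
  sigset_t block_mask, wait_mask;
  signal(SIGUSR1, sketch_sig_handler);
  sigemptyset(&block_mask);
  sigaddset(&block_mask, SIGUSR1);
  /* Block the wakeup signal everywhere except inside epoll_pwait(). */
  pthread_sigmask(SIG_BLOCK, &block_mask, &wait_mask);
  sigdelset(&wait_mask, SIGUSR1); /* wait_mask: permissive, used while waiting */
  return epoll_pwait(epoll_fd, evs, max_evs, timeout_ms, &wait_mask);
}
#endif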
1595
Sree Kuchibhotlaf6f33d72016-11-18 11:38:52 -08001596static void add_poll_object(grpc_exec_ctx *exec_ctx, poll_obj *bag,
Sree Kuchibhotlaa0749a62016-11-18 20:22:09 -08001597 poll_obj_type bag_type, poll_obj *item,
Sree Kuchibhotlaf6f33d72016-11-18 11:38:52 -08001598 poll_obj_type item_type) {
1599 GPR_TIMER_BEGIN("add_poll_object", 0);
1600
Sree Kuchibhotlaa0749a62016-11-18 20:22:09 -08001601#ifdef PO_DEBUG
1602 GPR_ASSERT(item->obj_type == item_type);
1603 GPR_ASSERT(bag->obj_type == bag_type);
1604#endif
Craig Tiller57726ca2016-09-12 11:59:45 -07001605
Sree Kuchibhotla20d0a162016-06-23 15:14:03 -07001606 grpc_error *error = GRPC_ERROR_NONE;
Sree Kuchibhotla9442bab2016-05-20 17:54:06 -07001607 polling_island *pi_new = NULL;
Sree Kuchibhotlaf448c342016-05-19 10:51:24 -07001608
Sree Kuchibhotlaf6f33d72016-11-18 11:38:52 -08001609 gpr_mu_lock(&bag->mu);
1610 gpr_mu_lock(&item->mu);
1611
Craig Tiller7212c232016-07-06 13:11:09 -07001612retry:
Sree Kuchibhotlaf6f33d72016-11-18 11:38:52 -08001613 /*
1614 * 1) If item->pi and bag->pi are both non-NULL and equal, do nothing
1615 * 2) If item->pi and bag->pi are both NULL, create a new polling island (with
1616 * a refcount of 2) and point item->pi and bag->pi to the new island
1617 * 3) If exactly one of item->pi or bag->pi is NULL, update it to point to
1618 * the other's non-NULL pi
1619    * 4) Finally if item->pi and bag->pi are non-NULL and not equal, merge the
1620 * polling islands and update item->pi and bag->pi to point to the new
1621 * island
Sree Kuchibhotla0bcbd792016-06-01 15:43:03 -07001622 */
Craig Tiller8e8027b2016-07-07 10:42:09 -07001623
Sree Kuchibhotlaf6f33d72016-11-18 11:38:52 -08001624 /* Early out if we are trying to add an 'fd' to a 'bag' but the fd is already
1625 * orphaned */
1626 if (item_type == POLL_OBJ_FD && (FD_FROM_PO(item))->orphaned) {
1627 gpr_mu_unlock(&item->mu);
1628 gpr_mu_unlock(&bag->mu);
Craig Tiller42ac6db2016-07-06 17:13:56 -07001629 return;
1630 }
1631
Sree Kuchibhotlaf6f33d72016-11-18 11:38:52 -08001632 if (item->pi == bag->pi) {
1633 pi_new = item->pi;
Sree Kuchibhotla9442bab2016-05-20 17:54:06 -07001634 if (pi_new == NULL) {
Sree Kuchibhotlaf6f33d72016-11-18 11:38:52 -08001635 /* GPR_ASSERT(item->pi == bag->pi == NULL) */
Sree Kuchibhotlaa129adf2016-10-26 16:44:44 -07001636
Sree Kuchibhotlaf6f33d72016-11-18 11:38:52 -08001637 /* If we are adding an fd to a bag (i.e pollset or pollset_set), then
1638 * we need to do some extra work to make TSAN happy */
1639 if (item_type == POLL_OBJ_FD) {
1640 /* Unlock before creating a new polling island: the polling island will
1641 create a workqueue which creates a file descriptor, and holding an fd
1642 lock here can eventually cause a loop to appear to TSAN (making it
1643 unhappy). We don't think it's a real loop (there's an epoch point
1644 where that loop possibility disappears), but the advantages of
1645 keeping TSAN happy outweigh any performance advantage we might have
1646 by keeping the lock held. */
1647 gpr_mu_unlock(&item->mu);
1648 pi_new = polling_island_create(exec_ctx, FD_FROM_PO(item), &error);
1649 gpr_mu_lock(&item->mu);
Sree Kuchibhotlaa129adf2016-10-26 16:44:44 -07001650
Sree Kuchibhotlaf6f33d72016-11-18 11:38:52 -08001651 /* Need to reverify any assumptions made between the initial lock and
1652 getting to this branch: if they've changed, we need to throw away our
1653 work and figure things out again. */
1654 if (item->pi != NULL) {
Sree Kuchibhotla499b94b2016-11-18 14:35:47 -08001655 GRPC_POLLING_TRACE(
1656 "add_poll_object: Raced creating new polling island. pi_new: %p "
1657 "(fd: %d, %s: %p)",
1658 (void *)pi_new, FD_FROM_PO(item)->fd, poll_obj_string(bag_type),
1659 (void *)bag);
Sree Kuchibhotlaf6f33d72016-11-18 11:38:52 -08001660 /* No need to lock 'pi_new' here since this is a new polling island
Sree Kuchibhotla8214b872017-02-23 12:49:48 -08001661 and no one has a reference to it yet */
Sree Kuchibhotlaf6f33d72016-11-18 11:38:52 -08001662 polling_island_remove_all_fds_locked(pi_new, true, &error);
1663
1664 /* Ref and unref so that the polling island gets deleted during unref
1665 */
1666 PI_ADD_REF(pi_new, "dance_of_destruction");
1667 PI_UNREF(exec_ctx, pi_new, "dance_of_destruction");
1668 goto retry;
1669 }
Craig Tiller27da6422016-07-06 13:14:46 -07001670 } else {
Sree Kuchibhotlaf6f33d72016-11-18 11:38:52 -08001671 pi_new = polling_island_create(exec_ctx, NULL, &error);
Craig Tiller7212c232016-07-06 13:11:09 -07001672 }
Sree Kuchibhotla499b94b2016-11-18 14:35:47 -08001673
1674 GRPC_POLLING_TRACE(
1675 "add_poll_object: Created new polling island. pi_new: %p (%s: %p, "
1676 "%s: %p)",
1677 (void *)pi_new, poll_obj_string(item_type), (void *)item,
1678 poll_obj_string(bag_type), (void *)bag);
1679 } else {
1680 GRPC_POLLING_TRACE(
1681 "add_poll_object: Same polling island. pi: %p (%s, %s)",
1682 (void *)pi_new, poll_obj_string(item_type),
1683 poll_obj_string(bag_type));
Sree Kuchibhotlaf448c342016-05-19 10:51:24 -07001684 }
Sree Kuchibhotlaf6f33d72016-11-18 11:38:52 -08001685 } else if (item->pi == NULL) {
1686 /* GPR_ASSERT(bag->pi != NULL) */
1687    /* Make pi_new point to the latest pi */
1688 pi_new = polling_island_lock(bag->pi);
Sree Kuchibhotla1e776682016-06-28 14:09:26 -07001689
Sree Kuchibhotlaf6f33d72016-11-18 11:38:52 -08001690 if (item_type == POLL_OBJ_FD) {
1691 grpc_fd *fd = FD_FROM_PO(item);
1692 polling_island_add_fds_locked(pi_new, &fd, 1, true, &error);
1693 }
Sree Kuchibhotla1e776682016-06-28 14:09:26 -07001694
Sree Kuchibhotlaf6f33d72016-11-18 11:38:52 -08001695 gpr_mu_unlock(&pi_new->mu);
Sree Kuchibhotla1e776682016-06-28 14:09:26 -07001696 GRPC_POLLING_TRACE(
Sree Kuchibhotla499b94b2016-11-18 14:35:47 -08001697 "add_poll_obj: item->pi was NULL. pi_new: %p (item(%s): %p, "
1698 "bag(%s): %p)",
1699 (void *)pi_new, poll_obj_string(item_type), (void *)item,
1700 poll_obj_string(bag_type), (void *)bag);
Sree Kuchibhotlaf6f33d72016-11-18 11:38:52 -08001701 } else if (bag->pi == NULL) {
1702 /* GPR_ASSERT(item->pi != NULL) */
1703    /* Make pi_new point to the latest pi */
1704 pi_new = polling_island_lock(item->pi);
1705 gpr_mu_unlock(&pi_new->mu);
Sree Kuchibhotla499b94b2016-11-18 14:35:47 -08001706 GRPC_POLLING_TRACE(
1707 "add_poll_obj: bag->pi was NULL. pi_new: %p (item(%s): %p, "
1708 "bag(%s): %p)",
1709 (void *)pi_new, poll_obj_string(item_type), (void *)item,
1710 poll_obj_string(bag_type), (void *)bag);
Sree Kuchibhotla5098f912016-05-31 10:58:17 -07001711 } else {
Sree Kuchibhotlaf6f33d72016-11-18 11:38:52 -08001712 pi_new = polling_island_merge(item->pi, bag->pi, &error);
Sree Kuchibhotla1e776682016-06-28 14:09:26 -07001713 GRPC_POLLING_TRACE(
Sree Kuchibhotla499b94b2016-11-18 14:35:47 -08001714 "add_poll_obj: polling islands merged. pi_new: %p (item(%s): %p, "
1715 "bag(%s): %p)",
1716 (void *)pi_new, poll_obj_string(item_type), (void *)item,
1717 poll_obj_string(bag_type), (void *)bag);
Sree Kuchibhotlaf448c342016-05-19 10:51:24 -07001718 }
1719
Sree Kuchibhotlaf6f33d72016-11-18 11:38:52 -08001720 /* At this point, pi_new is the polling island that both item->pi and bag->pi
1721 MUST be pointing to */
Sree Kuchibhotla20d0a162016-06-23 15:14:03 -07001722
Sree Kuchibhotlaf6f33d72016-11-18 11:38:52 -08001723 if (item->pi != pi_new) {
1724 PI_ADD_REF(pi_new, poll_obj_string(item_type));
1725 if (item->pi != NULL) {
1726 PI_UNREF(exec_ctx, item->pi, poll_obj_string(item_type));
Sree Kuchibhotla2f8ade02016-06-17 13:28:38 -07001727 }
Sree Kuchibhotlaf6f33d72016-11-18 11:38:52 -08001728 item->pi = pi_new;
Sree Kuchibhotla2f8ade02016-06-17 13:28:38 -07001729 }
1730
Sree Kuchibhotlaf6f33d72016-11-18 11:38:52 -08001731 if (bag->pi != pi_new) {
1732 PI_ADD_REF(pi_new, poll_obj_string(bag_type));
1733 if (bag->pi != NULL) {
1734 PI_UNREF(exec_ctx, bag->pi, poll_obj_string(bag_type));
Sree Kuchibhotla2f8ade02016-06-17 13:28:38 -07001735 }
Sree Kuchibhotlaf6f33d72016-11-18 11:38:52 -08001736 bag->pi = pi_new;
Sree Kuchibhotla2f8ade02016-06-17 13:28:38 -07001737 }
Sree Kuchibhotlaf448c342016-05-19 10:51:24 -07001738
Sree Kuchibhotlaf6f33d72016-11-18 11:38:52 -08001739 gpr_mu_unlock(&item->mu);
1740 gpr_mu_unlock(&bag->mu);
Craig Tiller15007612016-07-06 09:36:16 -07001741
Sree Kuchibhotlaf6f33d72016-11-18 11:38:52 -08001742 GRPC_LOG_IF_ERROR("add_poll_object", error);
1743 GPR_TIMER_END("add_poll_object", 0);
1744}
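
/* A compiled-out sketch of the four-way case analysis above, over a toy
   refcounted "island". Locking, fd registration, and the merge retry loop
   are deliberately omitted; this only shows how the two pointers converge
   on a single island with ref-before-unref so the island cannot die early.
   All names are hypothetical. */
#if 0
#include <stdlib.h>

typedef struct island { int refs; } island;

static island *island_create(void) {
  island *pi = malloc(sizeof(*pi));
  pi->refs = 0;
  return pi;
}
static void island_ref(island *pi) { pi->refs++; }
static void island_unref(island *pi) {
  if (--pi->refs == 0) free(pi);
}
/* Stand-in for polling_island_merge(): picks one island as the winner. */
static island *island_merge(island *a, island *b) { (void)b; return a; }

static void sketch_add(island **item_pi, island **bag_pi) {
  island *pi_new;
  if (*item_pi == *bag_pi) {
    /* Cases 1 and 2: already equal; create a fresh island if both NULL. */
    pi_new = (*item_pi != NULL) ? *item_pi : island_create();
  } else if (*item_pi == NULL) {
    pi_new = *bag_pi; /* Case 3a: the item adopts the bag's island. */
  } else if (*bag_pi == NULL) {
    pi_new = *item_pi; /* Case 3b: the bag adopts the item's island. */
  } else {
    pi_new = island_merge(*item_pi, *bag_pi); /* Case 4: merge both. */
  }
  if (*item_pi != pi_new) {
    island_ref(pi_new); /* ref before unref, as in the code above */
    if (*item_pi != NULL) island_unref(*item_pi);
    *item_pi = pi_new;
  }
  if (*bag_pi != pi_new) {
    island_ref(pi_new);
    if (*bag_pi != NULL) island_unref(*bag_pi);
    *bag_pi = pi_new;
  }
}
#endif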
Craig Tiller57726ca2016-09-12 11:59:45 -07001745
Sree Kuchibhotla9442bab2016-05-20 17:54:06 -07001746static void pollset_add_fd(grpc_exec_ctx *exec_ctx, grpc_pollset *pollset,
1747 grpc_fd *fd) {
Sree Kuchibhotlaa0749a62016-11-18 20:22:09 -08001748 add_poll_object(exec_ctx, &pollset->po, POLL_OBJ_POLLSET, &fd->po,
Sree Kuchibhotla499b94b2016-11-18 14:35:47 -08001749 POLL_OBJ_FD);
Sree Kuchibhotlaf448c342016-05-19 10:51:24 -07001750}
1751
Sree Kuchibhotlaf448c342016-05-19 10:51:24 -07001752/*******************************************************************************
Sree Kuchibhotla0bcbd792016-06-01 15:43:03 -07001753 * Pollset-set Definitions
Sree Kuchibhotlaf448c342016-05-19 10:51:24 -07001754 */
1755
1756static grpc_pollset_set *pollset_set_create(void) {
Sree Kuchibhotla2385fd72016-11-18 16:30:41 -08001757 grpc_pollset_set *pss = gpr_malloc(sizeof(*pss));
Sree Kuchibhotla2385fd72016-11-18 16:30:41 -08001758 gpr_mu_init(&pss->po.mu);
Sree Kuchibhotlaa0749a62016-11-18 20:22:09 -08001759 pss->po.pi = NULL;
1760#ifdef PO_DEBUG
1761 pss->po.obj_type = POLL_OBJ_POLLSET_SET;
1762#endif
Sree Kuchibhotla2385fd72016-11-18 16:30:41 -08001763 return pss;
Sree Kuchibhotlaf448c342016-05-19 10:51:24 -07001764}
1765
Craig Tiller9e5ac1b2017-02-14 22:25:50 -08001766static void pollset_set_destroy(grpc_exec_ctx *exec_ctx,
1767 grpc_pollset_set *pss) {
Sree Kuchibhotla2385fd72016-11-18 16:30:41 -08001768 gpr_mu_destroy(&pss->po.mu);
1769
1770 if (pss->po.pi != NULL) {
Craig Tiller9e5ac1b2017-02-14 22:25:50 -08001771 PI_UNREF(exec_ctx, pss->po.pi, "pss_destroy");
Sree Kuchibhotlaf448c342016-05-19 10:51:24 -07001772 }
Sree Kuchibhotla2385fd72016-11-18 16:30:41 -08001773
1774 gpr_free(pss);
Sree Kuchibhotlaf448c342016-05-19 10:51:24 -07001775}
1776
Sree Kuchibhotla2385fd72016-11-18 16:30:41 -08001777static void pollset_set_add_fd(grpc_exec_ctx *exec_ctx, grpc_pollset_set *pss,
1778 grpc_fd *fd) {
Sree Kuchibhotlaa0749a62016-11-18 20:22:09 -08001779 add_poll_object(exec_ctx, &pss->po, POLL_OBJ_POLLSET_SET, &fd->po,
Sree Kuchibhotla2385fd72016-11-18 16:30:41 -08001780 POLL_OBJ_FD);
Sree Kuchibhotla0bcbd792016-06-01 15:43:03 -07001781}
1782
Sree Kuchibhotla2385fd72016-11-18 16:30:41 -08001783static void pollset_set_del_fd(grpc_exec_ctx *exec_ctx, grpc_pollset_set *pss,
1784 grpc_fd *fd) {
1785 /* Nothing to do */
Sree Kuchibhotla0bcbd792016-06-01 15:43:03 -07001786}
1787
Sree Kuchibhotlaf448c342016-05-19 10:51:24 -07001788static void pollset_set_add_pollset(grpc_exec_ctx *exec_ctx,
Sree Kuchibhotla2385fd72016-11-18 16:30:41 -08001789 grpc_pollset_set *pss, grpc_pollset *ps) {
Sree Kuchibhotlaa0749a62016-11-18 20:22:09 -08001790 add_poll_object(exec_ctx, &pss->po, POLL_OBJ_POLLSET_SET, &ps->po,
Sree Kuchibhotla2385fd72016-11-18 16:30:41 -08001791 POLL_OBJ_POLLSET);
Sree Kuchibhotlaf448c342016-05-19 10:51:24 -07001792}
1793
1794static void pollset_set_del_pollset(grpc_exec_ctx *exec_ctx,
Sree Kuchibhotla2385fd72016-11-18 16:30:41 -08001795 grpc_pollset_set *pss, grpc_pollset *ps) {
1796 /* Nothing to do */
Sree Kuchibhotlaf448c342016-05-19 10:51:24 -07001797}
1798
1799static void pollset_set_add_pollset_set(grpc_exec_ctx *exec_ctx,
1800 grpc_pollset_set *bag,
1801 grpc_pollset_set *item) {
Sree Kuchibhotlaa0749a62016-11-18 20:22:09 -08001802 add_poll_object(exec_ctx, &bag->po, POLL_OBJ_POLLSET_SET, &item->po,
Sree Kuchibhotla2385fd72016-11-18 16:30:41 -08001803 POLL_OBJ_POLLSET_SET);
Sree Kuchibhotlaf448c342016-05-19 10:51:24 -07001804}
1805
1806static void pollset_set_del_pollset_set(grpc_exec_ctx *exec_ctx,
1807 grpc_pollset_set *bag,
1808 grpc_pollset_set *item) {
Sree Kuchibhotla2385fd72016-11-18 16:30:41 -08001809 /* Nothing to do */
Sree Kuchibhotlaf448c342016-05-19 10:51:24 -07001810}
1811
Sree Kuchibhotla2e12db92016-06-16 16:53:59 -07001812/* Test helper functions */
1814void *grpc_fd_get_polling_island(grpc_fd *fd) {
1815 polling_island *pi;
1816
Sree Kuchibhotlaf6f33d72016-11-18 11:38:52 -08001817 gpr_mu_lock(&fd->po.mu);
1818 pi = fd->po.pi;
1819 gpr_mu_unlock(&fd->po.mu);
Sree Kuchibhotla2e12db92016-06-16 16:53:59 -07001820
1821 return pi;
1822}
1823
1824void *grpc_pollset_get_polling_island(grpc_pollset *ps) {
1825 polling_island *pi;
1826
Sree Kuchibhotlaf6f33d72016-11-18 11:38:52 -08001827 gpr_mu_lock(&ps->po.mu);
1828 pi = ps->po.pi;
1829 gpr_mu_unlock(&ps->po.mu);
Sree Kuchibhotla2e12db92016-06-16 16:53:59 -07001830
1831 return pi;
1832}
1833
Sree Kuchibhotla2e12db92016-06-16 16:53:59 -07001834bool grpc_are_polling_islands_equal(void *p, void *q) {
Sree Kuchibhotla2f8ade02016-06-17 13:28:38 -07001835 polling_island *p1 = p;
1836 polling_island *p2 = q;
1837
Sree Kuchibhotla20d0a162016-06-23 15:14:03 -07001838 /* Note: polling_island_lock_pair() may change p1 and p2 to point to the
1839 latest polling islands in their respective linked lists */
Sree Kuchibhotla2f8ade02016-06-17 13:28:38 -07001840 polling_island_lock_pair(&p1, &p2);
Sree Kuchibhotla20d0a162016-06-23 15:14:03 -07001841 polling_island_unlock_pair(p1, p2);
Sree Kuchibhotla2f8ade02016-06-17 13:28:38 -07001842
1843 return p1 == p2;
Sree Kuchibhotla2e12db92016-06-16 16:53:59 -07001844}
1845
Sree Kuchibhotlaf448c342016-05-19 10:51:24 -07001846/*******************************************************************************
Sree Kuchibhotla0bcbd792016-06-01 15:43:03 -07001847 * Event engine binding
Sree Kuchibhotlaf448c342016-05-19 10:51:24 -07001848 */
1849
1850static void shutdown_engine(void) {
1851 fd_global_shutdown();
1852 pollset_global_shutdown();
Sree Kuchibhotlad627c102016-06-06 15:49:32 -07001853 polling_island_global_shutdown();
Sree Kuchibhotlaf448c342016-05-19 10:51:24 -07001854}
1855
1856static const grpc_event_engine_vtable vtable = {
1857 .pollset_size = sizeof(grpc_pollset),
1858
1859 .fd_create = fd_create,
1860 .fd_wrapped_fd = fd_wrapped_fd,
1861 .fd_orphan = fd_orphan,
1862 .fd_shutdown = fd_shutdown,
Sree Kuchibhotla24b6eae2016-06-21 18:01:14 -07001863 .fd_is_shutdown = fd_is_shutdown,
Sree Kuchibhotlaf448c342016-05-19 10:51:24 -07001864 .fd_notify_on_read = fd_notify_on_read,
1865 .fd_notify_on_write = fd_notify_on_write,
Sree Kuchibhotla5855c472016-06-08 12:56:56 -07001866 .fd_get_read_notifier_pollset = fd_get_read_notifier_pollset,
Craig Tiller70bd4832016-06-30 14:20:46 -07001867 .fd_get_workqueue = fd_get_workqueue,
Sree Kuchibhotlaf448c342016-05-19 10:51:24 -07001868
1869 .pollset_init = pollset_init,
1870 .pollset_shutdown = pollset_shutdown,
Sree Kuchibhotlaf448c342016-05-19 10:51:24 -07001871 .pollset_destroy = pollset_destroy,
1872 .pollset_work = pollset_work,
1873 .pollset_kick = pollset_kick,
1874 .pollset_add_fd = pollset_add_fd,
1875
1876 .pollset_set_create = pollset_set_create,
1877 .pollset_set_destroy = pollset_set_destroy,
1878 .pollset_set_add_pollset = pollset_set_add_pollset,
1879 .pollset_set_del_pollset = pollset_set_del_pollset,
1880 .pollset_set_add_pollset_set = pollset_set_add_pollset_set,
1881 .pollset_set_del_pollset_set = pollset_set_del_pollset_set,
1882 .pollset_set_add_fd = pollset_set_add_fd,
1883 .pollset_set_del_fd = pollset_set_del_fd,
1884
Craig Tillerd8a3c042016-09-09 12:42:37 -07001885 .workqueue_ref = workqueue_ref,
1886 .workqueue_unref = workqueue_unref,
Craig Tiller91031da2016-12-28 15:44:25 -08001887 .workqueue_scheduler = workqueue_scheduler,
Craig Tillerd8a3c042016-09-09 12:42:37 -07001888
Sree Kuchibhotlaf448c342016-05-19 10:51:24 -07001889 .shutdown_engine = shutdown_engine,
1890};
1891
Sree Kuchibhotla72744022016-06-09 09:42:06 -07001892/* It is possible that GLIBC has epoll but the underlying kernel doesn't.
1893 * Create a dummy epoll_fd to make sure epoll support is available */
1894static bool is_epoll_available() {
1895 int fd = epoll_create1(EPOLL_CLOEXEC);
1896 if (fd < 0) {
1897    gpr_log(
1898        GPR_ERROR,
1899        "epoll_create1 failed with error: %d (%s). Not using epoll polling engine",
1900        errno, strerror(errno));
1901 return false;
1902 }
1903 close(fd);
1904 return true;
1905}
1906
Craig Tillerf8382b82017-04-27 15:09:48 -07001907const grpc_event_engine_vtable *grpc_init_epollsig_linux(
1908 bool explicit_request) {
Sree Kuchibhotlac7be7c62016-06-09 17:08:50 -07001909  /* If use of signals is disabled, we cannot use the epoll engine */
1910 if (is_grpc_wakeup_signal_initialized && grpc_wakeup_signal < 0) {
1911 return NULL;
1912 }
1913
Ken Paysoncd7d0472016-10-11 12:24:20 -07001914 if (!grpc_has_wakeup_fd()) {
Ken Paysonbc544be2016-10-06 19:23:47 -07001915 return NULL;
1916 }
1917
Sree Kuchibhotla72744022016-06-09 09:42:06 -07001918 if (!is_epoll_available()) {
1919 return NULL;
1920 }
1921
Sree Kuchibhotlac7be7c62016-06-09 17:08:50 -07001922 if (!is_grpc_wakeup_signal_initialized) {
Craig Tillerbc0ab082017-05-05 10:42:44 -07001923 /* TODO(ctiller): when other epoll engines are ready, remove the true || to
1924     * force this to be explicitly chosen if needed */
Craig Tiller924353a2017-05-05 17:36:31 +00001925 if (true || explicit_request) {
Craig Tillerf8382b82017-04-27 15:09:48 -07001926 grpc_use_signal(SIGRTMIN + 6);
1927 } else {
1928 return NULL;
1929 }
Sree Kuchibhotlac7be7c62016-06-09 17:08:50 -07001930 }
1931
Sree Kuchibhotlaf448c342016-05-19 10:51:24 -07001932 fd_global_init();
Sree Kuchibhotla3131c262016-06-21 17:28:28 -07001933
1934 if (!GRPC_LOG_IF_ERROR("pollset_global_init", pollset_global_init())) {
1935 return NULL;
1936 }
1937
1938 if (!GRPC_LOG_IF_ERROR("polling_island_global_init",
1939 polling_island_global_init())) {
1940 return NULL;
1941 }
1942
Sree Kuchibhotlaf448c342016-05-19 10:51:24 -07001943 return &vtable;
1944}
1945
murgatroid99623dd4f2016-08-08 17:31:27 -07001946#else /* defined(GRPC_LINUX_EPOLL) */
1947#if defined(GRPC_POSIX_SOCKET)
Sree Kuchibhotla41622a82016-06-13 16:43:14 -07001948#include "src/core/lib/iomgr/ev_posix.h"
murgatroid99623dd4f2016-08-08 17:31:27 -07001949/* If GRPC_LINUX_EPOLL is not defined, it means epoll is not available. Return
Sree Kuchibhotla5855c472016-06-08 12:56:56 -07001950 * NULL */
Craig Tillerf8382b82017-04-27 15:09:48 -07001951const grpc_event_engine_vtable *grpc_init_epollsig_linux(
1952 bool explicit_request) {
1953 return NULL;
1954}
murgatroid99623dd4f2016-08-08 17:31:27 -07001955#endif /* defined(GRPC_POSIX_SOCKET) */
Sree Kuchibhotla5855c472016-06-08 12:56:56 -07001956
Sree Kuchibhotlac7be7c62016-06-09 17:08:50 -07001957void grpc_use_signal(int signum) {}
murgatroid99623dd4f2016-08-08 17:31:27 -07001958#endif /* !defined(GRPC_LINUX_EPOLL) */