blob: afd6ddce64dbf90ee76d583f82f7effd733bc1f8 [file] [log] [blame]
ctiller58393c22015-01-07 14:03:30 -08001/*
2 *
Craig Tiller06059952015-02-18 08:34:56 -08003 * Copyright 2015, Google Inc.
ctiller58393c22015-01-07 14:03:30 -08004 * All rights reserved.
5 *
6 * Redistribution and use in source and binary forms, with or without
7 * modification, are permitted provided that the following conditions are
8 * met:
9 *
10 * * Redistributions of source code must retain the above copyright
11 * notice, this list of conditions and the following disclaimer.
12 * * Redistributions in binary form must reproduce the above
13 * copyright notice, this list of conditions and the following disclaimer
14 * in the documentation and/or other materials provided with the
15 * distribution.
16 * * Neither the name of Google Inc. nor the names of its
17 * contributors may be used to endorse or promote products derived from
18 * this software without specific prior written permission.
19 *
20 * THIS SOFTWARE IS PROVIDED BY THE COPYRIGHT HOLDERS AND CONTRIBUTORS
21 * "AS IS" AND ANY EXPRESS OR IMPLIED WARRANTIES, INCLUDING, BUT NOT
22 * LIMITED TO, THE IMPLIED WARRANTIES OF MERCHANTABILITY AND FITNESS FOR
23 * A PARTICULAR PURPOSE ARE DISCLAIMED. IN NO EVENT SHALL THE COPYRIGHT
24 * OWNER OR CONTRIBUTORS BE LIABLE FOR ANY DIRECT, INDIRECT, INCIDENTAL,
25 * SPECIAL, EXEMPLARY, OR CONSEQUENTIAL DAMAGES (INCLUDING, BUT NOT
26 * LIMITED TO, PROCUREMENT OF SUBSTITUTE GOODS OR SERVICES; LOSS OF USE,
27 * DATA, OR PROFITS; OR BUSINESS INTERRUPTION) HOWEVER CAUSED AND ON ANY
28 * THEORY OF LIABILITY, WHETHER IN CONTRACT, STRICT LIABILITY, OR TORT
29 * (INCLUDING NEGLIGENCE OR OTHERWISE) ARISING IN ANY WAY OUT OF THE USE
30 * OF THIS SOFTWARE, EVEN IF ADVISED OF THE POSSIBILITY OF SUCH DAMAGE.
31 *
32 */
33
Craig Tillerd14a1a52015-01-21 15:26:29 -080034#include <grpc/support/port_platform.h>
35
36#ifdef GPR_POSIX_SOCKET
37
ctiller58393c22015-01-07 14:03:30 -080038#include "src/core/iomgr/fd_posix.h"
39
40#include <assert.h>
David Klempnerc6bccc22015-02-24 17:33:05 -080041#include <sys/socket.h>
ctiller58393c22015-01-07 14:03:30 -080042#include <unistd.h>
43
44#include "src/core/iomgr/iomgr_internal.h"
45#include <grpc/support/alloc.h>
46#include <grpc/support/log.h>
47#include <grpc/support/useful.h>
48
/* State kept per direction in grpc_fd.readst / grpc_fd.writest.  The atomic
   slot holds either one of these two small integers, or (any value > READY)
   a pointer to the grpc_iomgr_closure registered via notify_on that is
   waiting for the descriptor to become ready. */
enum descriptor_state {
  NOT_READY = 0,
  READY = 1
}; /* or a pointer to a closure to call */
ctiller58393c22015-01-07 14:03:30 -080053
/* We need to keep a freelist not because of any concerns of malloc performance
 * but instead so that implementations with multiple threads in (for example)
 * epoll_wait deal with the race between pollset removal and incoming poll
 * notifications.
 *
 * The problem is that the poller ultimately holds a reference to this
 * object, so it is very difficult to know when is safe to free it, at least
 * without some expensive synchronization.
 *
 * If we keep the object freelisted, in the worst case losing this race just
 * becomes a spurious read notification on a reused fd.
 */
/* TODO(klempner): We could use some form of polling generation count to know
 * when these are safe to free. */
/* TODO(klempner): Consider disabling freelisting if we don't have multiple
 * threads in poll on the same fd */
/* TODO(klempner): Batch these allocations to reduce fragmentation */
/* Singly-linked (via freelist_next) list of recycled grpc_fd objects.
   Both variables are guarded by fd_freelist_mu. */
static grpc_fd *fd_freelist = NULL;
static gpr_mu fd_freelist_mu;
73
74static void freelist_fd(grpc_fd *fd) {
David Klempnerd1785242015-01-28 17:00:21 -080075 gpr_mu_lock(&fd_freelist_mu);
76 fd->freelist_next = fd_freelist;
77 fd_freelist = fd;
78 gpr_mu_unlock(&fd_freelist_mu);
79}
80
/* Obtain a grpc_fd wrapping the OS descriptor `fd`: pop one off the
   freelist if possible, otherwise allocate a fresh one (mutexes are
   initialized only on first allocation and survive recycling). */
static grpc_fd *alloc_fd(int fd) {
  grpc_fd *r = NULL;
  gpr_mu_lock(&fd_freelist_mu);
  if (fd_freelist != NULL) {
    r = fd_freelist;
    fd_freelist = fd_freelist->freelist_next;
  }
  gpr_mu_unlock(&fd_freelist_mu);
  if (r == NULL) {
    /* freelist empty: allocate and do the one-time mutex setup */
    r = gpr_malloc(sizeof(grpc_fd));
    gpr_mu_init(&r->set_state_mu);
    gpr_mu_init(&r->watcher_mu);
  }
  /* refst starts at 1 with the low bit set: "active, one reference"
     (see grpc_fd_is_orphaned). Release stores so a racing acquire-load
     in another thread sees fully initialized state. */
  gpr_atm_rel_store(&r->refst, 1);
  gpr_atm_rel_store(&r->readst, NOT_READY);
  gpr_atm_rel_store(&r->writest, NOT_READY);
  gpr_atm_rel_store(&r->shutdown, 0);
  r->fd = fd;
  /* empty circular doubly-linked watcher list: root points at itself */
  r->watcher_root.next = r->watcher_root.prev = &r->watcher_root;
  r->freelist_next = NULL;
  r->read_watcher = r->write_watcher = NULL;
  return r;
}
104
105static void destroy(grpc_fd *fd) {
106 gpr_mu_destroy(&fd->set_state_mu);
107 gpr_mu_destroy(&fd->watcher_mu);
ctiller58393c22015-01-07 14:03:30 -0800108 gpr_free(fd);
ctiller58393c22015-01-07 14:03:30 -0800109}
110
/* Add n to the reference count.  The previous count must have been
   positive -- you can only take a new reference from a live one. */
static void ref_by(grpc_fd *fd, int n) {
  GPR_ASSERT(gpr_atm_no_barrier_fetch_add(&fd->refst, n) > 0);
}
114
/* Subtract n from the reference count; when it hits zero, close the OS
   descriptor, schedule the orphan's on_done callback, and recycle the
   wrapper.  full_fetch_add gives the release/acquire pairing needed so
   the destroying thread sees all prior writes to the fd. */
static void unref_by(grpc_fd *fd, int n) {
  gpr_atm old = gpr_atm_full_fetch_add(&fd->refst, -n);
  if (old == n) {
    /* this was the last reference */
    close(fd->fd);
    grpc_iomgr_add_callback(fd->on_done, fd->on_done_user_data);
    freelist_fd(fd);
    grpc_iomgr_unref();
  } else {
    /* dropping below zero would indicate a ref/unref imbalance */
    GPR_ASSERT(old > n);
  }
}
126
Craig Tiller7d413212015-02-09 08:00:02 -0800127void grpc_fd_global_init(void) { gpr_mu_init(&fd_freelist_mu); }
David Klempnerd1785242015-01-28 17:00:21 -0800128
129void grpc_fd_global_shutdown(void) {
130 while (fd_freelist != NULL) {
131 grpc_fd *fd = fd_freelist;
132 fd_freelist = fd_freelist->freelist_next;
ctiller58393c22015-01-07 14:03:30 -0800133 destroy(fd);
134 }
David Klempnerd1785242015-01-28 17:00:21 -0800135 gpr_mu_destroy(&fd_freelist_mu);
ctiller58393c22015-01-07 14:03:30 -0800136}
137
138static void do_nothing(void *ignored, int success) {}
139
140grpc_fd *grpc_fd_create(int fd) {
David Klempnerd1785242015-01-28 17:00:21 -0800141 grpc_fd *r = alloc_fd(fd);
ctiller58393c22015-01-07 14:03:30 -0800142 grpc_iomgr_ref();
ctiller58393c22015-01-07 14:03:30 -0800143 grpc_pollset_add_fd(grpc_backup_pollset(), r);
144 return r;
145}
146
147int grpc_fd_is_orphaned(grpc_fd *fd) {
148 return (gpr_atm_acq_load(&fd->refst) & 1) == 0;
149}
150
Craig Tiller886d7ec2015-05-14 16:18:42 -0700151static void maybe_wake_one_watcher_locked(grpc_fd *fd) {
152 if (fd->watcher_root.next != &fd->watcher_root) {
153 grpc_pollset_force_kick(fd->watcher_root.next->pollset);
154 }
155}
156
/* Locking wrapper around maybe_wake_one_watcher_locked. */
static void maybe_wake_one_watcher(grpc_fd *fd) {
  gpr_mu_lock(&fd->watcher_mu);
  maybe_wake_one_watcher_locked(fd);
  gpr_mu_unlock(&fd->watcher_mu);
}
162
163static void wake_all_watchers(grpc_fd *fd) {
164 grpc_fd_watcher *watcher;
Craig Tiller7d413212015-02-09 08:00:02 -0800165 for (watcher = fd->watcher_root.next; watcher != &fd->watcher_root;
166 watcher = watcher->next) {
167 grpc_pollset_force_kick(watcher->pollset);
ctiller58393c22015-01-07 14:03:30 -0800168 }
ctiller58393c22015-01-07 14:03:30 -0800169}
170
/* Give up the caller's ownership of fd.  on_done (or a no-op if NULL) is
   scheduled once the final reference is released (see unref_by).
   shutdown() forces any in-flight reads/writes to complete so pollers
   wake up and let go of their references. */
void grpc_fd_orphan(grpc_fd *fd, grpc_iomgr_cb_func on_done, void *user_data) {
  fd->on_done = on_done ? on_done : do_nothing;
  fd->on_done_user_data = user_data;
  shutdown(fd->fd, SHUT_RDWR);
  /* net effect of the +1/-2 below is -1: clears the low "active" bit and
     drops the creation reference */
  ref_by(fd, 1); /* remove active status, but keep referenced */
  wake_all_watchers(fd);
  unref_by(fd, 2); /* drop the reference */
}
179
/* Public refcounting: steps of two keep the low "active/orphan" bit of
   refst intact (see grpc_fd_is_orphaned). */
/* increment refcount by two to avoid changing the orphan bit */
void grpc_fd_ref(grpc_fd *fd) { ref_by(fd, 2); }

void grpc_fd_unref(grpc_fd *fd) { unref_by(fd, 2); }
184
ctiller58393c22015-01-07 14:03:30 -0800185static void make_callback(grpc_iomgr_cb_func cb, void *arg, int success,
186 int allow_synchronous_callback) {
187 if (allow_synchronous_callback) {
188 cb(arg, success);
189 } else {
190 grpc_iomgr_add_delayed_callback(cb, arg, success);
191 }
192}
193
Craig Tiller0fcd53c2015-02-18 15:10:53 -0800194static void make_callbacks(grpc_iomgr_closure *callbacks, size_t n, int success,
ctiller58393c22015-01-07 14:03:30 -0800195 int allow_synchronous_callback) {
196 size_t i;
197 for (i = 0; i < n; i++) {
Craig Tiller0fcd53c2015-02-18 15:10:53 -0800198 make_callback(callbacks[i].cb, callbacks[i].cb_arg, success,
ctiller58393c22015-01-07 14:03:30 -0800199 allow_synchronous_callback);
200 }
201}
202
/* Register `closure` to run when the state slot `st` (fd->readst or
   fd->writest) becomes ready.  Lock-free against a racing set_ready on
   the same slot; concurrent notify_on calls on the same slot are a user
   error (aborts below). */
static void notify_on(grpc_fd *fd, gpr_atm *st, grpc_iomgr_closure *closure,
                      int allow_synchronous_callback) {
  switch (gpr_atm_acq_load(st)) {
    case NOT_READY:
      /* There is no race if the descriptor is already ready, so we skip
         the interlocked op in that case. As long as the app doesn't
         try to set the same upcall twice (which it shouldn't) then
         oldval should never be anything other than READY or NOT_READY. We
         don't check for user error on the fast path. */
      if (gpr_atm_rel_cas(st, NOT_READY, (gpr_intptr)closure)) {
        /* swap was successful -- the closure will run after the next
           set_ready call. NOTE: we don't have an ABA problem here,
           since we should never have concurrent calls to the same
           notify_on function. */
        maybe_wake_one_watcher(fd);
        return;
      }
      /* swap was unsuccessful due to an intervening set_ready call.
         Fall through to the READY code below */
    case READY:
      assert(gpr_atm_no_barrier_load(st) == READY);
      /* consume the readiness edge and fire the closure immediately;
         success reflects whether the fd has been shut down */
      gpr_atm_rel_store(st, NOT_READY);
      make_callback(closure->cb, closure->cb_arg,
                    !gpr_atm_acq_load(&fd->shutdown),
                    allow_synchronous_callback);
      return;
    default: /* WAITING */
      /* upcallptr was set to a different closure. This is an error! */
      gpr_log(GPR_ERROR,
              "User called a notify_on function with a previous callback still "
              "pending");
      abort();
  }
  /* NOTE(review): unreachable -- every switch arm above returns or aborts. */
  gpr_log(GPR_ERROR, "Corrupt memory in &st->state");
  abort();
}
240
/* Mark the state slot `st` ready.  If a closure was parked there by
   notify_on, copy it into `callbacks` (advancing *ncallbacks) for the
   caller to fire; appends at most one entry.  Caller holds
   fd->set_state_mu, so only a racing notify_on can contend. */
static void set_ready_locked(gpr_atm *st, grpc_iomgr_closure *callbacks,
                             size_t *ncallbacks) {
  gpr_intptr state = gpr_atm_acq_load(st);

  switch (state) {
    case READY:
      /* duplicate ready, ignore */
      return;
    case NOT_READY:
      if (gpr_atm_rel_cas(st, NOT_READY, READY)) {
        /* swap was successful -- the closure will run after the next
           notify_on call. */
        return;
      }
      /* swap was unsuccessful due to an intervening set_ready call.
         Fall through to the WAITING code below */
      state = gpr_atm_acq_load(st);
    default: /* waiting */
      /* state is a pointer to the closure notify_on parked here */
      assert(gpr_atm_no_barrier_load(st) != READY &&
             gpr_atm_no_barrier_load(st) != NOT_READY);
      callbacks[(*ncallbacks)++] = *(grpc_iomgr_closure *)state;
      gpr_atm_rel_store(st, NOT_READY);
      return;
  }
}
266
/* Signal readiness on one direction of fd and fire the waiting closure,
   if any (set_ready_locked stores at most one into `cb`).  success is 0
   once the fd has been shut down. */
static void set_ready(grpc_fd *fd, gpr_atm *st,
                      int allow_synchronous_callback) {
  /* only one set_ready can be active at once (but there may be a racing
     notify_on) */
  int success;
  grpc_iomgr_closure cb;
  size_t ncb = 0;
  gpr_mu_lock(&fd->set_state_mu);
  set_ready_locked(st, &cb, &ncb);
  gpr_mu_unlock(&fd->set_state_mu);
  success = !gpr_atm_acq_load(&fd->shutdown);
  make_callbacks(&cb, ncb, success, allow_synchronous_callback);
}
280
/* Shut the fd down: flag fd->shutdown and force both directions ready so
   any parked read/write closures fire (with success == 0).  Must be
   called at most once per fd (asserted). */
void grpc_fd_shutdown(grpc_fd *fd) {
  grpc_iomgr_closure cb[2]; /* at most one closure per direction */
  size_t ncb = 0;
  gpr_mu_lock(&fd->set_state_mu);
  GPR_ASSERT(!gpr_atm_no_barrier_load(&fd->shutdown));
  gpr_atm_rel_store(&fd->shutdown, 1);
  set_ready_locked(&fd->readst, cb, &ncb);
  set_ready_locked(&fd->writest, cb, &ncb);
  gpr_mu_unlock(&fd->set_state_mu);
  /* always delayed, always failure (success == 0) */
  make_callbacks(cb, ncb, 0, 0);
}
292
/* Run `closure` when fd becomes readable; delivery is always delayed
   (allow_synchronous_callback == 0). */
void grpc_fd_notify_on_read(grpc_fd *fd, grpc_iomgr_closure *closure) {
  notify_on(fd, &fd->readst, closure, 0);
}
296
/* Run `closure` when fd becomes writable; delivery is always delayed
   (allow_synchronous_callback == 0). */
void grpc_fd_notify_on_write(grpc_fd *fd, grpc_iomgr_closure *closure) {
  notify_on(fd, &fd->writest, closure, 0);
}
300
/* Called by a pollset before polling this fd.  Returns the event mask the
   caller should poll for: read/write bits are claimed only if nobody else
   is already polling that direction AND someone is waiting on it (the
   state slot holds a closure pointer, i.e. a value > READY).  A watcher
   that claims nothing is parked on the watcher list so it can be kicked
   into service later.  Pairs with grpc_fd_end_poll. */
gpr_uint32 grpc_fd_begin_poll(grpc_fd *fd, grpc_pollset *pollset,
                              gpr_uint32 read_mask, gpr_uint32 write_mask,
                              grpc_fd_watcher *watcher) {
  gpr_uint32 mask = 0;
  /* keep track of pollers that have requested our events, in case they change
   */
  grpc_fd_ref(fd);

  gpr_mu_lock(&fd->watcher_mu);
  /* if there is nobody polling for read, but we need to, then start doing so */
  if (!fd->read_watcher && gpr_atm_acq_load(&fd->readst) > READY) {
    fd->read_watcher = watcher;
    mask |= read_mask;
  }
  /* if there is nobody polling for write, but we need to, then start doing so */
  if (!fd->write_watcher && gpr_atm_acq_load(&fd->writest) > READY) {
    fd->write_watcher = watcher;
    mask |= write_mask;
  }
  /* if not polling, remember this watcher in case we need someone to later */
  if (mask == 0) {
    /* splice watcher just before the root (i.e. at the list tail) */
    watcher->next = &fd->watcher_root;
    watcher->prev = watcher->next->prev;
    watcher->next->prev = watcher->prev->next = watcher;
  }
  watcher->pollset = pollset;
  watcher->fd = fd;
  gpr_mu_unlock(&fd->watcher_mu);

  return mask;
}
332
/* Called by a pollset after polling.  Relinquishes any read/write
   "active poller" role this watcher held; if it held a role but did not
   observe that event (got_read/got_write == 0), kick an idle watcher to
   take over.  Watchers that held no role are unspliced from the idle
   list instead.  Drops the reference taken in grpc_fd_begin_poll. */
void grpc_fd_end_poll(grpc_fd_watcher *watcher, int got_read, int got_write) {
  int was_polling = 0;
  int kick = 0;
  grpc_fd *fd = watcher->fd;

  gpr_mu_lock(&fd->watcher_mu);
  if (watcher == fd->read_watcher) {
    was_polling = 1;
    kick |= !got_read; /* nobody saw the read event; someone must keep watching */
    fd->read_watcher = NULL;
  }
  if (watcher == fd->write_watcher) {
    was_polling = 1;
    kick |= !got_write;
    fd->write_watcher = NULL;
  }
  if (!was_polling) {
    /* idle watcher: remove it from the doubly-linked list */
    watcher->next->prev = watcher->prev;
    watcher->prev->next = watcher->next;
  }
  if (kick) {
    maybe_wake_one_watcher_locked(fd);
  }
  gpr_mu_unlock(&fd->watcher_mu);

  grpc_fd_unref(fd);
}
360
/* Poller-side notification that fd is readable. */
void grpc_fd_become_readable(grpc_fd *fd, int allow_synchronous_callback) {
  set_ready(fd, &fd->readst, allow_synchronous_callback);
}
364
/* Poller-side notification that fd is writable. */
void grpc_fd_become_writable(grpc_fd *fd, int allow_synchronous_callback) {
  set_ready(fd, &fd->writest, allow_synchronous_callback);
}
Craig Tillerd14a1a52015-01-21 15:26:29 -0800368
Craig Tiller190d3602015-02-18 09:23:38 -0800369#endif