/*
 *
 * Copyright 2015, Google Inc.
 * All rights reserved.
 *
 * Redistribution and use in source and binary forms, with or without
 * modification, are permitted provided that the following conditions are
 * met:
 *
 *     * Redistributions of source code must retain the above copyright
 * notice, this list of conditions and the following disclaimer.
 *     * Redistributions in binary form must reproduce the above
 * copyright notice, this list of conditions and the following disclaimer
 * in the documentation and/or other materials provided with the
 * distribution.
 *     * Neither the name of Google Inc. nor the names of its
 * contributors may be used to endorse or promote products derived from
 * this software without specific prior written permission.
 *
 * THIS SOFTWARE IS PROVIDED BY THE COPYRIGHT HOLDERS AND CONTRIBUTORS
 * "AS IS" AND ANY EXPRESS OR IMPLIED WARRANTIES, INCLUDING, BUT NOT
 * LIMITED TO, THE IMPLIED WARRANTIES OF MERCHANTABILITY AND FITNESS FOR
 * A PARTICULAR PURPOSE ARE DISCLAIMED. IN NO EVENT SHALL THE COPYRIGHT
 * OWNER OR CONTRIBUTORS BE LIABLE FOR ANY DIRECT, INDIRECT, INCIDENTAL,
 * SPECIAL, EXEMPLARY, OR CONSEQUENTIAL DAMAGES (INCLUDING, BUT NOT
 * LIMITED TO, PROCUREMENT OF SUBSTITUTE GOODS OR SERVICES; LOSS OF USE,
 * DATA, OR PROFITS; OR BUSINESS INTERRUPTION) HOWEVER CAUSED AND ON ANY
 * THEORY OF LIABILITY, WHETHER IN CONTRACT, STRICT LIABILITY, OR TORT
 * (INCLUDING NEGLIGENCE OR OTHERWISE) ARISING IN ANY WAY OUT OF THE USE
 * OF THIS SOFTWARE, EVEN IF ADVISED OF THE POSSIBILITY OF SUCH DAMAGE.
 *
 */

#include <grpc/support/port_platform.h>

#ifdef GPR_POSIX_SOCKET

#include "src/core/iomgr/fd_posix.h"

#include <assert.h>
#include <sys/socket.h>
#include <unistd.h>

#include "src/core/iomgr/iomgr_internal.h"
#include <grpc/support/alloc.h>
#include <grpc/support/log.h>
#include <grpc/support/useful.h>

/* The read/write notification words (readst and writest below) hold one of
   these values, or a pointer to the grpc_iomgr_closure to call when the
   descriptor becomes ready. */
enum descriptor_state {
  NOT_READY = 0,
  READY = 1
};
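
/* A sketch of the resulting state machine (notify_on and set_ready below
   implement the authoritative transitions):

     NOT_READY --notify_on(closure)--> closure    (wait for the event)
     NOT_READY --set_ready-----------> READY      (event arrived first)
     closure   --set_ready-----------> NOT_READY  (schedule the closure)
     READY     --notify_on(closure)--> NOT_READY  (run the closure now)

   Calling notify_on while a closure is already pending is a user error and
   aborts. */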

/* We need to keep a freelist not because of any concern about malloc
 * performance, but so that implementations with multiple threads in (for
 * example) epoll_wait deal with the race between pollset removal and
 * incoming poll notifications.
 *
 * The problem is that the poller ultimately holds a reference to this
 * object, so it is very difficult to know when it is safe to free it, at
 * least without some expensive synchronization.
 *
 * If we keep the object freelisted, then in the worst case losing this race
 * just becomes a spurious read notification on a reused fd.
 */
/* TODO(klempner): We could use some form of polling generation count to know
 * when these are safe to free. */
/* TODO(klempner): Consider disabling freelisting if we don't have multiple
 * threads in poll on the same fd */
/* TODO(klempner): Batch these allocations to reduce fragmentation */
static grpc_fd *fd_freelist = NULL;
static gpr_mu fd_freelist_mu;

static void freelist_fd(grpc_fd *fd) {
  gpr_mu_lock(&fd_freelist_mu);
  fd->freelist_next = fd_freelist;
  fd_freelist = fd;
  gpr_mu_unlock(&fd_freelist_mu);
}

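/* Note that the mutexes are initialized only when an fd is allocated fresh:
   fds recycled from the freelist keep their already-initialized mutexes,
   which are destroyed only by grpc_fd_global_shutdown (via destroy). */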
static grpc_fd *alloc_fd(int fd) {
  grpc_fd *r = NULL;
  gpr_mu_lock(&fd_freelist_mu);
  if (fd_freelist != NULL) {
    r = fd_freelist;
    fd_freelist = fd_freelist->freelist_next;
  }
  gpr_mu_unlock(&fd_freelist_mu);
  if (r == NULL) {
    r = gpr_malloc(sizeof(grpc_fd));
    gpr_mu_init(&r->set_state_mu);
    gpr_mu_init(&r->watcher_mu);
  }
  gpr_atm_rel_store(&r->refst, 1);
  gpr_atm_rel_store(&r->readst, NOT_READY);
  gpr_atm_rel_store(&r->writest, NOT_READY);
  gpr_atm_rel_store(&r->shutdown, 0);
  r->fd = fd;
  r->inactive_watcher_root.next = r->inactive_watcher_root.prev =
      &r->inactive_watcher_root;
  r->freelist_next = NULL;
  r->read_watcher = r->write_watcher = NULL;
  return r;
}

static void destroy(grpc_fd *fd) {
  gpr_mu_destroy(&fd->set_state_mu);
  gpr_mu_destroy(&fd->watcher_mu);
  gpr_free(fd);
}

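/* Reference-count encoding: the low bit of refst doubles as an "active"
   (not-yet-orphaned) flag. It is set at allocation (refst starts at 1),
   cleared by grpc_fd_orphan, and tested by grpc_fd_is_orphaned, so ordinary
   references are taken and dropped in units of 2 (see grpc_fd_ref and
   grpc_fd_unref below). */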
static void ref_by(grpc_fd *fd, int n) {
  GPR_ASSERT(gpr_atm_no_barrier_fetch_add(&fd->refst, n) > 0);
}

static void unref_by(grpc_fd *fd, int n) {
  gpr_atm old = gpr_atm_full_fetch_add(&fd->refst, -n);
  if (old == n) {
    close(fd->fd);
    grpc_iomgr_add_callback(fd->on_done, fd->on_done_user_data);
    freelist_fd(fd);
    grpc_iomgr_unref();
  } else {
    GPR_ASSERT(old > n);
  }
}

void grpc_fd_global_init(void) { gpr_mu_init(&fd_freelist_mu); }

void grpc_fd_global_shutdown(void) {
  while (fd_freelist != NULL) {
    grpc_fd *fd = fd_freelist;
    fd_freelist = fd_freelist->freelist_next;
    destroy(fd);
  }
  gpr_mu_destroy(&fd_freelist_mu);
}

static void do_nothing(void *ignored, int success) {}

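/* A hedged usage sketch; sock, my_read_closure, my_on_done and my_data are
   illustrative names, not part of this file:

     grpc_fd *fd = grpc_fd_create(sock);
     grpc_fd_notify_on_read(fd, &my_read_closure);
     ...
     grpc_fd_orphan(fd, my_on_done, my_data);

   Notifications are one-shot: set_ready resets the state word to NOT_READY
   as it schedules the closure, so a callback must call grpc_fd_notify_on_*
   again to wait for the next event. */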
grpc_fd *grpc_fd_create(int fd) {
  grpc_fd *r = alloc_fd(fd);
  grpc_iomgr_ref();
  grpc_pollset_add_fd(grpc_backup_pollset(), r);
  return r;
}

int grpc_fd_is_orphaned(grpc_fd *fd) {
  return (gpr_atm_acq_load(&fd->refst) & 1) == 0;
}

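/* Wake at most one poller: prefer an inactive watcher if one exists,
   otherwise the read watcher, otherwise the write watcher. Callers must
   hold watcher_mu. */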
static void maybe_wake_one_watcher_locked(grpc_fd *fd) {
  if (fd->inactive_watcher_root.next != &fd->inactive_watcher_root) {
    grpc_pollset_force_kick(fd->inactive_watcher_root.next->pollset);
  } else if (fd->read_watcher) {
    grpc_pollset_force_kick(fd->read_watcher->pollset);
  } else if (fd->write_watcher) {
    grpc_pollset_force_kick(fd->write_watcher->pollset);
  }
}

static void maybe_wake_one_watcher(grpc_fd *fd) {
  gpr_mu_lock(&fd->watcher_mu);
  maybe_wake_one_watcher_locked(fd);
  gpr_mu_unlock(&fd->watcher_mu);
}

static void wake_all_watchers(grpc_fd *fd) {
  grpc_fd_watcher *watcher;
  for (watcher = fd->inactive_watcher_root.next;
       watcher != &fd->inactive_watcher_root; watcher = watcher->next) {
    grpc_pollset_force_kick(watcher->pollset);
  }
  if (fd->read_watcher) {
    grpc_pollset_force_kick(fd->read_watcher->pollset);
  }
  if (fd->write_watcher && fd->write_watcher != fd->read_watcher) {
    grpc_pollset_force_kick(fd->write_watcher->pollset);
  }
}

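/* Orphaning relies on the refcount encoding above: ref_by(fd, 1) flips the
   low "active" bit off while still counting as a temporary reference that
   keeps fd alive across wake_all_watchers; unref_by(fd, 2) then drops that
   temporary reference plus the unit the active bit carried, for a net
   change of -1. */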
void grpc_fd_orphan(grpc_fd *fd, grpc_iomgr_cb_func on_done, void *user_data) {
  fd->on_done = on_done ? on_done : do_nothing;
  fd->on_done_user_data = user_data;
  shutdown(fd->fd, SHUT_RDWR);
  ref_by(fd, 1); /* remove active status, but keep referenced */
  wake_all_watchers(fd);
  unref_by(fd, 2); /* drop the reference */
}

/* increment refcount by two to avoid changing the orphan bit */
void grpc_fd_ref(grpc_fd *fd) { ref_by(fd, 2); }

void grpc_fd_unref(grpc_fd *fd) { unref_by(fd, 2); }

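/* When allow_synchronous_callback is zero, callbacks are deferred through
   the iomgr delayed-callback queue rather than run inline; presumably this
   is so they do not run on the caller's stack while its locks are held. */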
static void make_callback(grpc_iomgr_cb_func cb, void *arg, int success,
                          int allow_synchronous_callback) {
  if (allow_synchronous_callback) {
    cb(arg, success);
  } else {
    grpc_iomgr_add_delayed_callback(cb, arg, success);
  }
}

static void make_callbacks(grpc_iomgr_closure *callbacks, size_t n, int success,
                           int allow_synchronous_callback) {
  size_t i;
  for (i = 0; i < n; i++) {
    make_callback(callbacks[i].cb, callbacks[i].cb_arg, success,
                  allow_synchronous_callback);
  }
}

static void notify_on(grpc_fd *fd, gpr_atm *st, grpc_iomgr_closure *closure,
                      int allow_synchronous_callback) {
  switch (gpr_atm_acq_load(st)) {
    case NOT_READY:
      /* There is no race if the descriptor is already ready, so we skip
         the interlocked op in that case. As long as the app doesn't
         try to set the same upcall twice (which it shouldn't) then the
         old value should never be anything other than READY or NOT_READY.
         We don't check for user error on the fast path. */
      if (gpr_atm_rel_cas(st, NOT_READY, (gpr_intptr)closure)) {
        /* swap was successful -- the closure will run after the next
           set_ready call. NOTE: we don't have an ABA problem here,
           since we should never have concurrent calls to the same
           notify_on function. */
        maybe_wake_one_watcher(fd);
        return;
      }
      /* swap was unsuccessful due to an intervening set_ready call.
         Fall through to the READY code below */
    case READY:
      assert(gpr_atm_no_barrier_load(st) == READY);
      gpr_atm_rel_store(st, NOT_READY);
      make_callback(closure->cb, closure->cb_arg,
                    !gpr_atm_acq_load(&fd->shutdown),
                    allow_synchronous_callback);
      return;
    default: /* WAITING */
      /* upcallptr was set to a different closure. This is an error! */
      gpr_log(GPR_ERROR,
              "User called a notify_on function with a previous callback still "
              "pending");
      abort();
  }
  gpr_log(GPR_ERROR, "Corrupt memory in &st->state");
  abort();
}

static void set_ready_locked(gpr_atm *st, grpc_iomgr_closure *callbacks,
                             size_t *ncallbacks) {
  gpr_intptr state = gpr_atm_acq_load(st);

  switch (state) {
    case READY:
      /* duplicate ready, ignore */
      return;
    case NOT_READY:
      if (gpr_atm_rel_cas(st, NOT_READY, READY)) {
        /* swap was successful -- the closure will run after the next
           notify_on call. */
        return;
      }
      /* swap was unsuccessful due to an intervening notify_on call
         installing a closure. Fall through to the WAITING code below */
      state = gpr_atm_acq_load(st);
    default: /* waiting */
      assert(gpr_atm_no_barrier_load(st) != READY &&
             gpr_atm_no_barrier_load(st) != NOT_READY);
      callbacks[(*ncallbacks)++] = *(grpc_iomgr_closure *)state;
      gpr_atm_rel_store(st, NOT_READY);
      return;
  }
}

static void set_ready(grpc_fd *fd, gpr_atm *st,
                      int allow_synchronous_callback) {
  /* only one set_ready can be active at once (but there may be a racing
     notify_on) */
  int success;
  grpc_iomgr_closure cb;
  size_t ncb = 0;
  gpr_mu_lock(&fd->set_state_mu);
  set_ready_locked(st, &cb, &ncb);
  gpr_mu_unlock(&fd->set_state_mu);
  success = !gpr_atm_acq_load(&fd->shutdown);
  make_callbacks(&cb, ncb, success, allow_synchronous_callback);
}

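/* grpc_fd_shutdown sizes cb[2] for the worst case: at most one pending read
   closure and one pending write closure can be collected by the two
   set_ready_locked calls, and both are invoked with success == 0 to signal
   the shutdown. */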
void grpc_fd_shutdown(grpc_fd *fd) {
  grpc_iomgr_closure cb[2];
  size_t ncb = 0;
  gpr_mu_lock(&fd->set_state_mu);
  GPR_ASSERT(!gpr_atm_no_barrier_load(&fd->shutdown));
  gpr_atm_rel_store(&fd->shutdown, 1);
  set_ready_locked(&fd->readst, cb, &ncb);
  set_ready_locked(&fd->writest, cb, &ncb);
  gpr_mu_unlock(&fd->set_state_mu);
  make_callbacks(cb, ncb, 0, 0);
}

void grpc_fd_notify_on_read(grpc_fd *fd, grpc_iomgr_closure *closure) {
  notify_on(fd, &fd->readst, closure, 0);
}

void grpc_fd_notify_on_write(grpc_fd *fd, grpc_iomgr_closure *closure) {
  notify_on(fd, &fd->writest, closure, 0);
}

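/* Watcher protocol: the first poller that needs to poll for reads (writes)
   becomes the fd's read_watcher (write_watcher); any poller that ends up
   with nothing to poll for parks on the inactive list until kicked. A hedged
   sketch of the expected begin/end pairing, with POLLIN/POLLOUT as example
   masks (this file does not fix the mask values):

     grpc_fd_watcher w;
     gpr_uint32 events = grpc_fd_begin_poll(fd, pollset, POLLIN, POLLOUT, &w);
     // ... poll fd->fd for the events in `events` ...
     grpc_fd_end_poll(&w, got_read, got_write);
*/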
gpr_uint32 grpc_fd_begin_poll(grpc_fd *fd, grpc_pollset *pollset,
                              gpr_uint32 read_mask, gpr_uint32 write_mask,
                              grpc_fd_watcher *watcher) {
  gpr_uint32 mask = 0;
  /* keep track of pollers that have requested our events, in case they
     change */
  grpc_fd_ref(fd);

  gpr_mu_lock(&fd->watcher_mu);
  /* if there is nobody polling for read, but we need to, then start doing so */
  if (!fd->read_watcher && gpr_atm_acq_load(&fd->readst) > READY) {
    fd->read_watcher = watcher;
    mask |= read_mask;
  }
  /* if there is nobody polling for write, but we need to, then start doing so */
  if (!fd->write_watcher && gpr_atm_acq_load(&fd->writest) > READY) {
    fd->write_watcher = watcher;
    mask |= write_mask;
  }
  /* if not polling, remember this watcher in case we need someone to later */
  if (mask == 0) {
    watcher->next = &fd->inactive_watcher_root;
    watcher->prev = watcher->next->prev;
    watcher->next->prev = watcher->prev->next = watcher;
  }
  watcher->pollset = pollset;
  watcher->fd = fd;
  gpr_mu_unlock(&fd->watcher_mu);

  return mask;
}

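/* If this watcher was the active read (write) watcher but did not observe
   the event it was polling for, someone else must take over; the kick path
   below arranges that via maybe_wake_one_watcher_locked. */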
void grpc_fd_end_poll(grpc_fd_watcher *watcher, int got_read, int got_write) {
  int was_polling = 0;
  int kick = 0;
  grpc_fd *fd = watcher->fd;

  gpr_mu_lock(&fd->watcher_mu);
  if (watcher == fd->read_watcher) {
    /* remove read watcher, kick if we still need a read */
    was_polling = 1;
    kick = kick || !got_read;
    fd->read_watcher = NULL;
  }
  if (watcher == fd->write_watcher) {
    /* remove write watcher, kick if we still need a write */
    was_polling = 1;
    kick = kick || !got_write;
    fd->write_watcher = NULL;
  }
  if (!was_polling) {
    /* remove from inactive list */
    watcher->next->prev = watcher->prev;
    watcher->prev->next = watcher->next;
  }
  if (kick) {
    maybe_wake_one_watcher_locked(fd);
  }
  gpr_mu_unlock(&fd->watcher_mu);

  grpc_fd_unref(fd);
}

void grpc_fd_become_readable(grpc_fd *fd, int allow_synchronous_callback) {
  set_ready(fd, &fd->readst, allow_synchronous_callback);
}

void grpc_fd_become_writable(grpc_fd *fd, int allow_synchronous_callback) {
  set_ready(fd, &fd->writest, allow_synchronous_callback);
}

#endif