blob: abdd49bbda36b960fe1206a0e1a6095493eb08d7 [file] [log] [blame]
ctiller58393c22015-01-07 14:03:30 -08001/*
2 *
Craig Tiller06059952015-02-18 08:34:56 -08003 * Copyright 2015, Google Inc.
ctiller58393c22015-01-07 14:03:30 -08004 * All rights reserved.
5 *
6 * Redistribution and use in source and binary forms, with or without
7 * modification, are permitted provided that the following conditions are
8 * met:
9 *
10 * * Redistributions of source code must retain the above copyright
11 * notice, this list of conditions and the following disclaimer.
12 * * Redistributions in binary form must reproduce the above
13 * copyright notice, this list of conditions and the following disclaimer
14 * in the documentation and/or other materials provided with the
15 * distribution.
16 * * Neither the name of Google Inc. nor the names of its
17 * contributors may be used to endorse or promote products derived from
18 * this software without specific prior written permission.
19 *
20 * THIS SOFTWARE IS PROVIDED BY THE COPYRIGHT HOLDERS AND CONTRIBUTORS
21 * "AS IS" AND ANY EXPRESS OR IMPLIED WARRANTIES, INCLUDING, BUT NOT
22 * LIMITED TO, THE IMPLIED WARRANTIES OF MERCHANTABILITY AND FITNESS FOR
23 * A PARTICULAR PURPOSE ARE DISCLAIMED. IN NO EVENT SHALL THE COPYRIGHT
24 * OWNER OR CONTRIBUTORS BE LIABLE FOR ANY DIRECT, INDIRECT, INCIDENTAL,
25 * SPECIAL, EXEMPLARY, OR CONSEQUENTIAL DAMAGES (INCLUDING, BUT NOT
26 * LIMITED TO, PROCUREMENT OF SUBSTITUTE GOODS OR SERVICES; LOSS OF USE,
27 * DATA, OR PROFITS; OR BUSINESS INTERRUPTION) HOWEVER CAUSED AND ON ANY
28 * THEORY OF LIABILITY, WHETHER IN CONTRACT, STRICT LIABILITY, OR TORT
29 * (INCLUDING NEGLIGENCE OR OTHERWISE) ARISING IN ANY WAY OUT OF THE USE
30 * OF THIS SOFTWARE, EVEN IF ADVISED OF THE POSSIBILITY OF SUCH DAMAGE.
31 *
32 */
33
Craig Tillerd14a1a52015-01-21 15:26:29 -080034#include <grpc/support/port_platform.h>
35
36#ifdef GPR_POSIX_SOCKET
37
ctiller58393c22015-01-07 14:03:30 -080038#include "src/core/iomgr/fd_posix.h"
39
40#include <assert.h>
David Klempnerc6bccc22015-02-24 17:33:05 -080041#include <sys/socket.h>
ctiller58393c22015-01-07 14:03:30 -080042#include <unistd.h>
43
44#include "src/core/iomgr/iomgr_internal.h"
45#include <grpc/support/alloc.h>
46#include <grpc/support/log.h>
47#include <grpc/support/useful.h>
48
/* State stored in fd->readst / fd->writest. In addition to the two enum
   values below, the atomic word may hold a pointer to a grpc_iomgr_closure
   registered via a notify_on call, waiting for the fd to become ready. */
enum descriptor_state {
  NOT_READY = 0,
  READY = 1
}; /* or a pointer to a closure to call */
ctiller58393c22015-01-07 14:03:30 -080053
David Klempnerd1785242015-01-28 17:00:21 -080054/* We need to keep a freelist not because of any concerns of malloc performance
55 * but instead so that implementations with multiple threads in (for example)
56 * epoll_wait deal with the race between pollset removal and incoming poll
57 * notifications.
58 *
59 * The problem is that the poller ultimately holds a reference to this
60 * object, so it is very difficult to know when is safe to free it, at least
61 * without some expensive synchronization.
62 *
63 * If we keep the object freelisted, in the worst case losing this race just
64 * becomes a spurious read notification on a reused fd.
65 */
66/* TODO(klempner): We could use some form of polling generation count to know
67 * when these are safe to free. */
68/* TODO(klempner): Consider disabling freelisting if we don't have multiple
69 * threads in poll on the same fd */
70/* TODO(klempner): Batch these allocations to reduce fragmentation */
static grpc_fd *fd_freelist = NULL; /* head of the freelist; guarded by fd_freelist_mu */
static gpr_mu fd_freelist_mu;       /* protects fd_freelist */
73
74static void freelist_fd(grpc_fd *fd) {
David Klempnerd1785242015-01-28 17:00:21 -080075 gpr_mu_lock(&fd_freelist_mu);
76 fd->freelist_next = fd_freelist;
77 fd_freelist = fd;
78 gpr_mu_unlock(&fd_freelist_mu);
79}
80
81static grpc_fd *alloc_fd(int fd) {
82 grpc_fd *r = NULL;
83 gpr_mu_lock(&fd_freelist_mu);
84 if (fd_freelist != NULL) {
85 r = fd_freelist;
86 fd_freelist = fd_freelist->freelist_next;
87 }
88 gpr_mu_unlock(&fd_freelist_mu);
89 if (r == NULL) {
90 r = gpr_malloc(sizeof(grpc_fd));
91 gpr_mu_init(&r->set_state_mu);
92 gpr_mu_init(&r->watcher_mu);
93 }
94 gpr_atm_rel_store(&r->refst, 1);
Craig Tiller0fcd53c2015-02-18 15:10:53 -080095 gpr_atm_rel_store(&r->readst, NOT_READY);
96 gpr_atm_rel_store(&r->writest, NOT_READY);
David Klempnerd1785242015-01-28 17:00:21 -080097 gpr_atm_rel_store(&r->shutdown, 0);
98 r->fd = fd;
Craig Tiller7d413212015-02-09 08:00:02 -080099 r->watcher_root.next = r->watcher_root.prev = &r->watcher_root;
David Klempnerd1785242015-01-28 17:00:21 -0800100 r->freelist_next = NULL;
101 return r;
102}
103
/* Actually free an fd object. Only called while draining the freelist in
   grpc_fd_global_shutdown(); during normal operation objects go back to the
   freelist instead (see the freelisting comment above). */
static void destroy(grpc_fd *fd) {
  gpr_mu_destroy(&fd->set_state_mu);
  gpr_mu_destroy(&fd->watcher_mu);
  gpr_free(fd);
}
109
110static void ref_by(grpc_fd *fd, int n) {
Craig Tiller23139ae2015-02-17 15:46:13 -0800111 GPR_ASSERT(gpr_atm_no_barrier_fetch_add(&fd->refst, n) > 0);
ctiller58393c22015-01-07 14:03:30 -0800112}
113
/* Drop n references. When the count reaches zero the descriptor is closed,
   the orphan callback is scheduled, and the object is returned to the
   freelist (not freed -- see the freelisting rationale above). The full
   barrier on the fetch_add makes prior writes (e.g. on_done set by
   grpc_fd_orphan) visible to the thread that performs the final unref. */
static void unref_by(grpc_fd *fd, int n) {
  gpr_atm old = gpr_atm_full_fetch_add(&fd->refst, -n);
  if (old == n) {
    /* last reference dropped */
    close(fd->fd);
    grpc_iomgr_add_callback(fd->on_done, fd->on_done_user_data);
    freelist_fd(fd);
    grpc_iomgr_unref();
  } else {
    GPR_ASSERT(old > n); /* otherwise the refcount went negative */
  }
}
125
Craig Tiller7d413212015-02-09 08:00:02 -0800126void grpc_fd_global_init(void) { gpr_mu_init(&fd_freelist_mu); }
David Klempnerd1785242015-01-28 17:00:21 -0800127
128void grpc_fd_global_shutdown(void) {
129 while (fd_freelist != NULL) {
130 grpc_fd *fd = fd_freelist;
131 fd_freelist = fd_freelist->freelist_next;
ctiller58393c22015-01-07 14:03:30 -0800132 destroy(fd);
133 }
David Klempnerd1785242015-01-28 17:00:21 -0800134 gpr_mu_destroy(&fd_freelist_mu);
ctiller58393c22015-01-07 14:03:30 -0800135}
136
/* No-op callback substituted when grpc_fd_orphan's caller passes NULL
   for on_done. */
static void do_nothing(void *ignored, int success) {}
138
139grpc_fd *grpc_fd_create(int fd) {
David Klempnerd1785242015-01-28 17:00:21 -0800140 grpc_fd *r = alloc_fd(fd);
ctiller58393c22015-01-07 14:03:30 -0800141 grpc_iomgr_ref();
ctiller58393c22015-01-07 14:03:30 -0800142 grpc_pollset_add_fd(grpc_backup_pollset(), r);
143 return r;
144}
145
/* The low bit of refst doubles as an "active" flag: refst starts at 1
   (odd), grpc_fd_ref/grpc_fd_unref always move it by 2, and grpc_fd_orphan
   adds 1, clearing the bit. An even count therefore means "orphaned". */
int grpc_fd_is_orphaned(grpc_fd *fd) {
  return (gpr_atm_acq_load(&fd->refst) & 1) == 0;
}
149
150static void wake_watchers(grpc_fd *fd) {
Craig Tiller7d413212015-02-09 08:00:02 -0800151 grpc_fd_watcher *watcher;
ctiller58393c22015-01-07 14:03:30 -0800152 gpr_mu_lock(&fd->watcher_mu);
Craig Tiller7d413212015-02-09 08:00:02 -0800153 for (watcher = fd->watcher_root.next; watcher != &fd->watcher_root;
154 watcher = watcher->next) {
155 grpc_pollset_force_kick(watcher->pollset);
ctiller58393c22015-01-07 14:03:30 -0800156 }
157 gpr_mu_unlock(&fd->watcher_mu);
158}
159
/* Release the application's ownership of fd. on_done (or do_nothing when
   NULL) is invoked once the last reference is dropped and the descriptor
   has been closed. Note the ref arithmetic: +1 clears the low "active"
   bit (see grpc_fd_is_orphaned) while holding a temporary reference, then
   -2 drops the application's reference. */
void grpc_fd_orphan(grpc_fd *fd, grpc_iomgr_cb_func on_done, void *user_data) {
  fd->on_done = on_done ? on_done : do_nothing;
  fd->on_done_user_data = user_data;
  /* unblock any thread sitting in poll/epoll on this descriptor */
  shutdown(fd->fd, SHUT_RDWR);
  ref_by(fd, 1); /* remove active status, but keep referenced */
  wake_watchers(fd);
  unref_by(fd, 2); /* drop the reference */
}
168
/* increment refcount by two to avoid changing the orphan bit (the low bit
   of refst encodes "not yet orphaned" -- see grpc_fd_is_orphaned) */
void grpc_fd_ref(grpc_fd *fd) { ref_by(fd, 2); }

void grpc_fd_unref(grpc_fd *fd) { unref_by(fd, 2); }
173
ctiller58393c22015-01-07 14:03:30 -0800174static void make_callback(grpc_iomgr_cb_func cb, void *arg, int success,
175 int allow_synchronous_callback) {
176 if (allow_synchronous_callback) {
177 cb(arg, success);
178 } else {
179 grpc_iomgr_add_delayed_callback(cb, arg, success);
180 }
181}
182
Craig Tiller0fcd53c2015-02-18 15:10:53 -0800183static void make_callbacks(grpc_iomgr_closure *callbacks, size_t n, int success,
ctiller58393c22015-01-07 14:03:30 -0800184 int allow_synchronous_callback) {
185 size_t i;
186 for (i = 0; i < n; i++) {
Craig Tiller0fcd53c2015-02-18 15:10:53 -0800187 make_callback(callbacks[i].cb, callbacks[i].cb_arg, success,
ctiller58393c22015-01-07 14:03:30 -0800188 allow_synchronous_callback);
189 }
190}
191
/* Register *closure to run when the event tracked by *st (fd->readst or
   fd->writest) fires; if the event already fired, run it now. Lock-free:
   races against set_ready() are resolved by compare-and-swap on *st.
   At most one closure may be pending per state word at any time. */
static void notify_on(grpc_fd *fd, gpr_atm *st, grpc_iomgr_closure *closure,
                      int allow_synchronous_callback) {
  switch (gpr_atm_acq_load(st)) {
    case NOT_READY:
      /* There is no race if the descriptor is already ready, so we skip
         the interlocked op in that case. As long as the app doesn't
         try to set the same upcall twice (which it shouldn't) then
         oldval should never be anything other than READY or NOT_READY. We
         don't check for user error on the fast path. */
      if (gpr_atm_rel_cas(st, NOT_READY, (gpr_intptr)closure)) {
        /* swap was successful -- the closure will run after the next
           set_ready call. NOTE: we don't have an ABA problem here,
           since we should never have concurrent calls to the same
           notify_on function. */
        wake_watchers(fd); /* pollers must re-evaluate their event masks */
        return;
      }
      /* swap was unsuccessful due to an intervening set_ready call.
         Fall through to the READY code below */
    case READY:
      assert(gpr_atm_acq_load(st) == READY);
      gpr_atm_rel_store(st, NOT_READY);
      /* run the closure now; success is 0 once shutdown has begun */
      make_callback(closure->cb, closure->cb_arg,
                    !gpr_atm_acq_load(&fd->shutdown),
                    allow_synchronous_callback);
      return;
    default: /* WAITING: a closure is already registered on this word */
      /* upcallptr was set to a different closure. This is an error! */
      gpr_log(GPR_ERROR,
              "User called a notify_on function with a previous callback still "
              "pending");
      abort();
  }
  /* not reached: every switch case returns or aborts */
  gpr_log(GPR_ERROR, "Corrupt memory in &st->state");
  abort();
}
229
/* Transition *st to READY, or collect the closure that was waiting on it.
   Must be called with fd->set_state_mu held; races only against
   notify_on(). A pending closure is copied into callbacks[] (advancing
   *ncallbacks) so the caller can run it after dropping the lock. */
static void set_ready_locked(gpr_atm *st, grpc_iomgr_closure *callbacks,
                             size_t *ncallbacks) {
  gpr_intptr state = gpr_atm_acq_load(st);

  switch (state) {
    case READY:
      /* duplicate ready, ignore */
      return;
    case NOT_READY:
      if (gpr_atm_rel_cas(st, NOT_READY, READY)) {
        /* swap was successful -- the closure will run after the next
           notify_on call. */
        return;
      }
      /* swap was unsuccessful due to an intervening notify_on call that
         registered a closure. Reload and fall through to the WAITING
         code below */
      state = gpr_atm_acq_load(st);
    default: /* waiting: state holds a closure pointer */
      assert(gpr_atm_acq_load(st) != READY &&
             gpr_atm_acq_load(st) != NOT_READY);
      /* hand the registered closure to the caller and reset the word */
      callbacks[(*ncallbacks)++] = *(grpc_iomgr_closure *)state;
      gpr_atm_rel_store(st, NOT_READY);
      return;
  }
}
255
/* Signal readiness on *st and run any closure that was waiting for it.
   Serialized by fd->set_state_mu, so at most one closure is collected. */
static void set_ready(grpc_fd *fd, gpr_atm *st,
                      int allow_synchronous_callback) {
  /* only one set_ready can be active at once (but there may be a racing
     notify_on) */
  int success;
  grpc_iomgr_closure cb; /* room for the single possible pending closure */
  size_t ncb = 0;
  gpr_mu_lock(&fd->set_state_mu);
  set_ready_locked(st, &cb, &ncb);
  gpr_mu_unlock(&fd->set_state_mu);
  /* deliver outside the lock; success is 0 once shutdown has begun */
  success = !gpr_atm_acq_load(&fd->shutdown);
  make_callbacks(&cb, ncb, success, allow_synchronous_callback);
}
269
/* Mark the fd as shut down and flush any pending read/write closures,
   delivering them asynchronously with success == 0. May only be called
   once per fd (enforced by the assert). */
void grpc_fd_shutdown(grpc_fd *fd) {
  grpc_iomgr_closure cb[2]; /* at most one pending closure per direction */
  size_t ncb = 0;
  gpr_mu_lock(&fd->set_state_mu);
  GPR_ASSERT(!gpr_atm_acq_load(&fd->shutdown));
  gpr_atm_rel_store(&fd->shutdown, 1);
  set_ready_locked(&fd->readst, cb, &ncb);
  set_ready_locked(&fd->writest, cb, &ncb);
  gpr_mu_unlock(&fd->set_state_mu);
  make_callbacks(cb, ncb, 0, 0);
}
281
/* Register closure to run when fd becomes readable; delivery is always
   deferred (allow_synchronous_callback == 0). */
void grpc_fd_notify_on_read(grpc_fd *fd, grpc_iomgr_closure *closure) {
  notify_on(fd, &fd->readst, closure, 0);
}
285
/* Register closure to run when fd becomes writable; delivery is always
   deferred (allow_synchronous_callback == 0). */
void grpc_fd_notify_on_write(grpc_fd *fd, grpc_iomgr_closure *closure) {
  notify_on(fd, &fd->writest, closure, 0);
}
289
/* Called by a pollset before polling this fd: registers *watcher so the
   pollset can be kicked later (see wake_watchers), and returns the subset
   of read_mask/write_mask that still needs polling -- an event that is
   already READY need not be waited for. Takes a reference on fd that is
   released by grpc_fd_end_poll. */
gpr_uint32 grpc_fd_begin_poll(grpc_fd *fd, grpc_pollset *pollset,
                              gpr_uint32 read_mask, gpr_uint32 write_mask,
                              grpc_fd_watcher *watcher) {
  /* keep track of pollers that have requested our events, in case they change
   */
  grpc_fd_ref(fd);

  gpr_mu_lock(&fd->watcher_mu);
  /* splice watcher into the fd's circular doubly-linked watcher ring */
  watcher->next = &fd->watcher_root;
  watcher->prev = watcher->next->prev;
  watcher->next->prev = watcher->prev->next = watcher;
  watcher->pollset = pollset;
  watcher->fd = fd;
  gpr_mu_unlock(&fd->watcher_mu);

  return (gpr_atm_acq_load(&fd->readst) != READY ? read_mask : 0) |
         (gpr_atm_acq_load(&fd->writest) != READY ? write_mask : 0);
}
308
Craig Tiller7d413212015-02-09 08:00:02 -0800309void grpc_fd_end_poll(grpc_fd_watcher *watcher) {
310 gpr_mu_lock(&watcher->fd->watcher_mu);
311 watcher->next->prev = watcher->prev;
312 watcher->prev->next = watcher->next;
313 gpr_mu_unlock(&watcher->fd->watcher_mu);
Craig Tiller59ea16f2015-02-18 16:18:08 -0800314
315 grpc_fd_unref(watcher->fd);
ctiller58393c22015-01-07 14:03:30 -0800316}
317
/* Event-loop upcall: mark the fd readable, running any closure registered
   via grpc_fd_notify_on_read. */
void grpc_fd_become_readable(grpc_fd *fd, int allow_synchronous_callback) {
  set_ready(fd, &fd->readst, allow_synchronous_callback);
}
321
/* Event-loop upcall: mark the fd writable, running any closure registered
   via grpc_fd_notify_on_write. */
void grpc_fd_become_writable(grpc_fd *fd, int allow_synchronous_callback) {
  set_ready(fd, &fd->writest, allow_synchronous_callback);
}
Craig Tillerd14a1a52015-01-21 15:26:29 -0800325
Craig Tiller190d3602015-02-18 09:23:38 -0800326#endif