/*
 *
 * Copyright 2015, Google Inc.
 * All rights reserved.
 *
 * Redistribution and use in source and binary forms, with or without
 * modification, are permitted provided that the following conditions are
 * met:
 *
 *     * Redistributions of source code must retain the above copyright
 * notice, this list of conditions and the following disclaimer.
 *     * Redistributions in binary form must reproduce the above
 * copyright notice, this list of conditions and the following disclaimer
 * in the documentation and/or other materials provided with the
 * distribution.
 *     * Neither the name of Google Inc. nor the names of its
 * contributors may be used to endorse or promote products derived from
 * this software without specific prior written permission.
 *
 * THIS SOFTWARE IS PROVIDED BY THE COPYRIGHT HOLDERS AND CONTRIBUTORS
 * "AS IS" AND ANY EXPRESS OR IMPLIED WARRANTIES, INCLUDING, BUT NOT
 * LIMITED TO, THE IMPLIED WARRANTIES OF MERCHANTABILITY AND FITNESS FOR
 * A PARTICULAR PURPOSE ARE DISCLAIMED. IN NO EVENT SHALL THE COPYRIGHT
 * OWNER OR CONTRIBUTORS BE LIABLE FOR ANY DIRECT, INDIRECT, INCIDENTAL,
 * SPECIAL, EXEMPLARY, OR CONSEQUENTIAL DAMAGES (INCLUDING, BUT NOT
 * LIMITED TO, PROCUREMENT OF SUBSTITUTE GOODS OR SERVICES; LOSS OF USE,
 * DATA, OR PROFITS; OR BUSINESS INTERRUPTION) HOWEVER CAUSED AND ON ANY
 * THEORY OF LIABILITY, WHETHER IN CONTRACT, STRICT LIABILITY, OR TORT
 * (INCLUDING NEGLIGENCE OR OTHERWISE) ARISING IN ANY WAY OUT OF THE USE
 * OF THIS SOFTWARE, EVEN IF ADVISED OF THE POSSIBILITY OF SUCH DAMAGE.
 *
 */
33
#include <grpc/support/port_platform.h>

/* This whole translation unit is POSIX-socket only; on other platforms it
   compiles to nothing (see the matching #endif at the bottom of the file). */
#ifdef GPR_POSIX_SOCKET

#include "src/core/iomgr/fd_posix.h"

#include <assert.h>
#include <unistd.h>

#include "src/core/iomgr/iomgr_internal.h"
#include <grpc/support/alloc.h>
#include <grpc/support/log.h>
#include <grpc/support/useful.h>
/* State of one notification slot (readst or writest) on an fd:
   NOT_READY - no event seen, no callback registered;
   READY     - event arrived before a callback was registered;
   WAITING   - a callback is registered and waiting for the event.
   Stored in a gpr_atm and manipulated with acquire/release atomics. */
enum descriptor_state { NOT_READY, READY, WAITING };

/* We need to keep a freelist not because of any concerns of malloc performance
 * but instead so that implementations with multiple threads in (for example)
 * epoll_wait deal with the race between pollset removal and incoming poll
 * notifications.
 *
 * The problem is that the poller ultimately holds a reference to this
 * object, so it is very difficult to know when is safe to free it, at least
 * without some expensive synchronization.
 *
 * If we keep the object freelisted, in the worst case losing this race just
 * becomes a spurious read notification on a reused fd.
 */
/* TODO(klempner): We could use some form of polling generation count to know
 * when these are safe to free. */
/* TODO(klempner): Consider disabling freelisting if we don't have multiple
 * threads in poll on the same fd */
/* TODO(klempner): Batch these allocations to reduce fragmentation */
static grpc_fd *fd_freelist = NULL;   /* singly linked via fd->freelist_next */
static gpr_mu fd_freelist_mu;         /* guards fd_freelist */
69
/* Push a retired grpc_fd onto the global freelist (see the freelist rationale
   comment above). Called once the fd's refcount reaches zero in unref_by. */
static void freelist_fd(grpc_fd *fd) {
  gpr_mu_lock(&fd_freelist_mu);
  fd->freelist_next = fd_freelist;
  fd_freelist = fd;
  gpr_mu_unlock(&fd_freelist_mu);
}
76
/* Obtain a grpc_fd wrapper for the raw descriptor 'fd': reuse one from the
   freelist if possible, otherwise malloc a fresh one. The two mutexes are
   initialized only on first allocation and persist across freelist reuse
   (they are destroyed in destroy(), at global shutdown). All shared fields
   are (re)set with release stores so a reusing thread sees a clean object. */
static grpc_fd *alloc_fd(int fd) {
  grpc_fd *r = NULL;
  gpr_mu_lock(&fd_freelist_mu);
  if (fd_freelist != NULL) {
    r = fd_freelist;
    fd_freelist = fd_freelist->freelist_next;
  }
  gpr_mu_unlock(&fd_freelist_mu);
  if (r == NULL) {
    r = gpr_malloc(sizeof(grpc_fd));
    gpr_mu_init(&r->set_state_mu);
    gpr_mu_init(&r->watcher_mu);
  }
  /* refst starts at 1: the low bit doubles as the "not orphaned" flag
     (see grpc_fd_is_orphaned). */
  gpr_atm_rel_store(&r->refst, 1);
  gpr_atm_rel_store(&r->readst.state, NOT_READY);
  gpr_atm_rel_store(&r->writest.state, NOT_READY);
  gpr_atm_rel_store(&r->shutdown, 0);
  r->fd = fd;
  /* empty circular doubly-linked watcher list: root points at itself */
  r->watcher_root.next = r->watcher_root.prev = &r->watcher_root;
  r->freelist_next = NULL;
  return r;
}
99
/* Final teardown of a grpc_fd: release its mutexes, then the memory.
   Only called from grpc_fd_global_shutdown while draining the freelist. */
static void destroy(grpc_fd *fd) {
  gpr_mu_destroy(&fd->set_state_mu);
  gpr_mu_destroy(&fd->watcher_mu);
  gpr_free(fd);
}
105
/* Add n to the fd's refcount; asserts the prior count was positive
   (i.e. never resurrects an already-released fd). */
static void ref_by(grpc_fd *fd, int n) {
  GPR_ASSERT(gpr_atm_no_barrier_fetch_add(&fd->refst, n) > 0);
}
109
/* Subtract n from the fd's refcount. When the count reaches zero
   (old == n), schedule the user's on_done callback, return the object to
   the freelist, and drop this fd's reference on the iomgr. */
static void unref_by(grpc_fd *fd, int n) {
  gpr_atm old = gpr_atm_full_fetch_add(&fd->refst, -n);
  if (old == n) {
    grpc_iomgr_add_callback(fd->on_done, fd->on_done_user_data);
    freelist_fd(fd);
    grpc_iomgr_unref();
  } else {
    /* must not have gone negative */
    GPR_ASSERT(old > n);
  }
}
120
Craig Tiller7d413212015-02-09 08:00:02 -0800121void grpc_fd_global_init(void) { gpr_mu_init(&fd_freelist_mu); }
David Klempnerd1785242015-01-28 17:00:21 -0800122
123void grpc_fd_global_shutdown(void) {
124 while (fd_freelist != NULL) {
125 grpc_fd *fd = fd_freelist;
126 fd_freelist = fd_freelist->freelist_next;
ctiller58393c22015-01-07 14:03:30 -0800127 destroy(fd);
128 }
David Klempnerd1785242015-01-28 17:00:21 -0800129 gpr_mu_destroy(&fd_freelist_mu);
ctiller58393c22015-01-07 14:03:30 -0800130}
131
132static void do_nothing(void *ignored, int success) {}
133
/* Wrap raw descriptor 'fd' in a grpc_fd, take an iomgr reference for it,
   and register it with the backup pollset so it is polled even before any
   specific pollset adopts it. Ownership of 'fd' passes to the returned
   object (closed later by grpc_fd_orphan). */
grpc_fd *grpc_fd_create(int fd) {
  grpc_fd *r = alloc_fd(fd);
  grpc_iomgr_ref();
  grpc_pollset_add_fd(grpc_backup_pollset(), r);
  return r;
}
140
141int grpc_fd_is_orphaned(grpc_fd *fd) {
142 return (gpr_atm_acq_load(&fd->refst) & 1) == 0;
143}
144
/* Kick every pollset currently polling this fd, forcing their pollers to
   re-evaluate (used when the fd's interest set or lifetime changes).
   Walks the circular watcher list under watcher_mu. */
static void wake_watchers(grpc_fd *fd) {
  grpc_fd_watcher *watcher;
  gpr_mu_lock(&fd->watcher_mu);
  for (watcher = fd->watcher_root.next; watcher != &fd->watcher_root;
       watcher = watcher->next) {
    grpc_pollset_force_kick(watcher->pollset);
  }
  gpr_mu_unlock(&fd->watcher_mu);
}
154
/* Release the fd back to the system. on_done (or a no-op if NULL) runs once
   all references are gone. The +1/-2 pair yields a net -1 on refst and flips
   its low "active" bit from odd to even, so grpc_fd_is_orphaned becomes true
   while any poller-held references (always multiples of 2) remain intact. */
void grpc_fd_orphan(grpc_fd *fd, grpc_iomgr_cb_func on_done, void *user_data) {
  fd->on_done = on_done ? on_done : do_nothing;
  fd->on_done_user_data = user_data;
  ref_by(fd, 1); /* remove active status, but keep referenced */
  wake_watchers(fd);
  close(fd->fd);
  unref_by(fd, 2); /* drop the reference */
}
163
/* increment refcount by two to avoid changing the orphan bit */
void grpc_fd_ref(grpc_fd *fd) { ref_by(fd, 2); }

/* matching decrement-by-two; may trigger final cleanup via unref_by */
void grpc_fd_unref(grpc_fd *fd) { unref_by(fd, 2); }
168
/* A (callback, argument) pair harvested under set_state_mu and invoked
   after the lock is released (see set_ready / grpc_fd_shutdown). */
typedef struct {
  grpc_iomgr_cb_func cb;
  void *arg;
} callback;
173
174static void make_callback(grpc_iomgr_cb_func cb, void *arg, int success,
175 int allow_synchronous_callback) {
176 if (allow_synchronous_callback) {
177 cb(arg, success);
178 } else {
179 grpc_iomgr_add_delayed_callback(cb, arg, success);
180 }
181}
182
183static void make_callbacks(callback *callbacks, size_t n, int success,
184 int allow_synchronous_callback) {
185 size_t i;
186 for (i = 0; i < n; i++) {
187 make_callback(callbacks[i].cb, callbacks[i].arg, success,
188 allow_synchronous_callback);
189 }
190}
191
/* Register 'cb' to run when slot 'st' (readst or writest) becomes ready.
   Implements one side of the NOT_READY/READY/WAITING state machine; the
   other side is set_ready_locked. Only one notify_on per slot may be
   outstanding at a time (WAITING case below aborts otherwise). */
static void notify_on(grpc_fd *fd, grpc_fd_state *st, grpc_iomgr_cb_func cb,
                      void *arg, int allow_synchronous_callback) {
  switch ((enum descriptor_state)gpr_atm_acq_load(&st->state)) {
    case NOT_READY:
      /* There is no race if the descriptor is already ready, so we skip
         the interlocked op in that case. As long as the app doesn't
         try to set the same upcall twice (which it shouldn't) then
         oldval should never be anything other than READY or NOT_READY.
         We don't check for user error on the fast path. */
      st->cb = cb;
      st->cb_arg = arg;
      if (gpr_atm_rel_cas(&st->state, NOT_READY, WAITING)) {
        /* swap was successful -- the closure will run after the next
           set_ready call. NOTE: we don't have an ABA problem here,
           since we should never have concurrent calls to the same
           notify_on function. */
        wake_watchers(fd);
        return;
      }
      /* swap was unsuccessful due to an intervening set_ready call.
         Fall through to the READY code below */
    case READY:
      assert(gpr_atm_acq_load(&st->state) == READY);
      gpr_atm_rel_store(&st->state, NOT_READY);
      /* run immediately; success is 0 if the fd has been shut down */
      make_callback(cb, arg, !gpr_atm_acq_load(&fd->shutdown),
                    allow_synchronous_callback);
      return;
    case WAITING:
      /* upcallptr was set to a different closure. This is an error! */
      gpr_log(GPR_ERROR,
              "User called a notify_on function with a previous callback still "
              "pending");
      abort();
  }
  /* unreachable unless st->state held a value outside the enum */
  gpr_log(GPR_ERROR, "Corrupt memory in &st->state");
  abort();
}
230
/* Mark slot 'st' ready, called with fd->set_state_mu held. If a callback
   is WAITING it is appended to 'callbacks' (and *ncallbacks bumped) for the
   caller to run after dropping the lock; otherwise the READY flag is left
   set for a future notify_on to consume. */
static void set_ready_locked(grpc_fd_state *st, callback *callbacks,
                             size_t *ncallbacks) {
  callback *c;

  switch ((enum descriptor_state)gpr_atm_acq_load(&st->state)) {
    case NOT_READY:
      if (gpr_atm_rel_cas(&st->state, NOT_READY, READY)) {
        /* swap was successful -- the closure will run after the next
           notify_on call. */
        return;
      }
      /* swap was unsuccessful due to an intervening set_ready call.
         Fall through to the WAITING code below */
    case WAITING:
      assert(gpr_atm_acq_load(&st->state) == WAITING);
      /* harvest the registered callback and reset the slot */
      c = &callbacks[(*ncallbacks)++];
      c->cb = st->cb;
      c->arg = st->cb_arg;
      gpr_atm_rel_store(&st->state, NOT_READY);
      return;
    case READY:
      /* duplicate ready, ignore */
      return;
  }
}
256
/* Mark one slot ready and deliver its pending callback (if any) outside the
   lock. The callback's success flag reflects whether the fd has been shut
   down at delivery time. */
static void set_ready(grpc_fd *fd, grpc_fd_state *st,
                      int allow_synchronous_callback) {
  /* only one set_ready can be active at once (but there may be a racing
     notify_on) */
  int success;
  callback cb;      /* at most one callback can be harvested per slot */
  size_t ncb = 0;
  gpr_mu_lock(&fd->set_state_mu);
  set_ready_locked(st, &cb, &ncb);
  gpr_mu_unlock(&fd->set_state_mu);
  success = !gpr_atm_acq_load(&fd->shutdown);
  make_callbacks(&cb, ncb, success, allow_synchronous_callback);
}
270
/* Shut the fd down: set the shutdown flag (asserting it wasn't already set)
   and flush any pending read/write callbacks with success == 0, delivered
   asynchronously after the lock is released. */
void grpc_fd_shutdown(grpc_fd *fd) {
  callback cb[2];   /* at most one harvested callback per slot */
  size_t ncb = 0;
  gpr_mu_lock(&fd->set_state_mu);
  GPR_ASSERT(!gpr_atm_acq_load(&fd->shutdown));
  gpr_atm_rel_store(&fd->shutdown, 1);
  set_ready_locked(&fd->readst, cb, &ncb);
  set_ready_locked(&fd->writest, cb, &ncb);
  gpr_mu_unlock(&fd->set_state_mu);
  make_callbacks(cb, ncb, 0, 0);
}
282
283void grpc_fd_notify_on_read(grpc_fd *fd, grpc_iomgr_cb_func read_cb,
284 void *read_cb_arg) {
285 notify_on(fd, &fd->readst, read_cb, read_cb_arg, 0);
286}
287
288void grpc_fd_notify_on_write(grpc_fd *fd, grpc_iomgr_cb_func write_cb,
289 void *write_cb_arg) {
290 notify_on(fd, &fd->writest, write_cb, write_cb_arg, 0);
291}
292
/* Called by a pollset as it starts polling this fd: splice 'watcher' into
   the fd's circular watcher list (so wake_watchers can kick the pollset),
   and return the subset of read_mask/write_mask that should actually be
   polled — a slot already READY needs no polling. Paired with
   grpc_fd_end_poll. */
gpr_uint32 grpc_fd_begin_poll(grpc_fd *fd, grpc_pollset *pollset,
                              gpr_uint32 read_mask, gpr_uint32 write_mask,
                              grpc_fd_watcher *watcher) {
  /* keep track of pollers that have requested our events, in case they change
   */
  gpr_mu_lock(&fd->watcher_mu);
  /* insert watcher just before the root of the circular list */
  watcher->next = &fd->watcher_root;
  watcher->prev = watcher->next->prev;
  watcher->next->prev = watcher->prev->next = watcher;
  watcher->pollset = pollset;
  watcher->fd = fd;
  gpr_mu_unlock(&fd->watcher_mu);

  return (gpr_atm_acq_load(&fd->readst.state) != READY ? read_mask : 0) |
         (gpr_atm_acq_load(&fd->writest.state) != READY ? write_mask : 0);
}
309
Craig Tiller7d413212015-02-09 08:00:02 -0800310void grpc_fd_end_poll(grpc_fd_watcher *watcher) {
311 gpr_mu_lock(&watcher->fd->watcher_mu);
312 watcher->next->prev = watcher->prev;
313 watcher->prev->next = watcher->next;
314 gpr_mu_unlock(&watcher->fd->watcher_mu);
ctiller58393c22015-01-07 14:03:30 -0800315}
316
317void grpc_fd_become_readable(grpc_fd *fd, int allow_synchronous_callback) {
318 set_ready(fd, &fd->readst, allow_synchronous_callback);
319}
320
321void grpc_fd_become_writable(grpc_fd *fd, int allow_synchronous_callback) {
322 set_ready(fd, &fd->writest, allow_synchronous_callback);
323}
Craig Tillerd14a1a52015-01-21 15:26:29 -0800324
Craig Tiller190d3602015-02-18 09:23:38 -0800325#endif