/*
 *
 * Copyright 2014, Google Inc.
 * All rights reserved.
 *
 * Redistribution and use in source and binary forms, with or without
 * modification, are permitted provided that the following conditions are
 * met:
 *
 *     * Redistributions of source code must retain the above copyright
 * notice, this list of conditions and the following disclaimer.
 *     * Redistributions in binary form must reproduce the above
 * copyright notice, this list of conditions and the following disclaimer
 * in the documentation and/or other materials provided with the
 * distribution.
 *     * Neither the name of Google Inc. nor the names of its
 * contributors may be used to endorse or promote products derived from
 * this software without specific prior written permission.
 *
 * THIS SOFTWARE IS PROVIDED BY THE COPYRIGHT HOLDERS AND CONTRIBUTORS
 * "AS IS" AND ANY EXPRESS OR IMPLIED WARRANTIES, INCLUDING, BUT NOT
 * LIMITED TO, THE IMPLIED WARRANTIES OF MERCHANTABILITY AND FITNESS FOR
 * A PARTICULAR PURPOSE ARE DISCLAIMED. IN NO EVENT SHALL THE COPYRIGHT
 * OWNER OR CONTRIBUTORS BE LIABLE FOR ANY DIRECT, INDIRECT, INCIDENTAL,
 * SPECIAL, EXEMPLARY, OR CONSEQUENTIAL DAMAGES (INCLUDING, BUT NOT
 * LIMITED TO, PROCUREMENT OF SUBSTITUTE GOODS OR SERVICES; LOSS OF USE,
 * DATA, OR PROFITS; OR BUSINESS INTERRUPTION) HOWEVER CAUSED AND ON ANY
 * THEORY OF LIABILITY, WHETHER IN CONTRACT, STRICT LIABILITY, OR TORT
 * (INCLUDING NEGLIGENCE OR OTHERWISE) ARISING IN ANY WAY OUT OF THE USE
 * OF THIS SOFTWARE, EVEN IF ADVISED OF THE POSSIBILITY OF SUCH DAMAGE.
 *
 */

#include <grpc/support/port_platform.h>

#ifdef GPR_POSIX_SOCKET

#include "src/core/iomgr/fd_posix.h"

#include <assert.h>
#include <unistd.h>

#include "src/core/iomgr/iomgr_internal.h"
#include <grpc/support/alloc.h>
#include <grpc/support/log.h>
#include <grpc/support/useful.h>

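/* Each grpc_fd tracks read and write readiness separately; readst.state and
 * writest.state each hold one of the values below:
 *   NOT_READY -> WAITING    notify_on registered a callback before readiness
 *   NOT_READY -> READY      set_ready fired before a callback was registered
 *   WAITING   -> NOT_READY  set_ready consumed the pending callback
 *   READY     -> NOT_READY  notify_on consumed the stored readiness */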
enum descriptor_state { NOT_READY, READY, WAITING };

/* We need to keep a freelist not because of any concerns of malloc performance
 * but instead so that implementations with multiple threads in (for example)
 * epoll_wait deal with the race between pollset removal and incoming poll
 * notifications.
 *
 * The problem is that the poller ultimately holds a reference to this
 * object, so it is very difficult to know when it is safe to free it, at least
 * without some expensive synchronization.
 *
 * If we keep the object freelisted, in the worst case losing this race just
 * becomes a spurious read notification on a reused fd.
 */
/* TODO(klempner): We could use some form of polling generation count to know
 * when these are safe to free. */
/* TODO(klempner): Consider disabling freelisting if we don't have multiple
 * threads in poll on the same fd */
/* TODO(klempner): Batch these allocations to reduce fragmentation */
static grpc_fd *fd_freelist = NULL;
static gpr_mu fd_freelist_mu;

static void freelist_fd(grpc_fd *fd) {
  gpr_free(fd->watchers);
  gpr_mu_lock(&fd_freelist_mu);
  fd->freelist_next = fd_freelist;
  fd_freelist = fd;
  gpr_mu_unlock(&fd_freelist_mu);
}

static grpc_fd *alloc_fd(int fd) {
  grpc_fd *r = NULL;
  gpr_mu_lock(&fd_freelist_mu);
  if (fd_freelist != NULL) {
    r = fd_freelist;
    fd_freelist = fd_freelist->freelist_next;
  }
  gpr_mu_unlock(&fd_freelist_mu);
  if (r == NULL) {
    r = gpr_malloc(sizeof(grpc_fd));
    gpr_mu_init(&r->set_state_mu);
    gpr_mu_init(&r->watcher_mu);
  }
  gpr_atm_rel_store(&r->refst, 1);
  gpr_atm_rel_store(&r->readst.state, NOT_READY);
  gpr_atm_rel_store(&r->writest.state, NOT_READY);
  gpr_atm_rel_store(&r->shutdown, 0);
  r->fd = fd;
  r->watchers = NULL;
  r->watcher_count = 0;
  r->watcher_capacity = 0;
  r->freelist_next = NULL;
  return r;
}

static void destroy(grpc_fd *fd) {
  gpr_mu_destroy(&fd->set_state_mu);
  gpr_mu_destroy(&fd->watcher_mu);
  gpr_free(fd);
}

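/* fd->refst packs a reference count and an "active" flag: the low bit is 1
 * while the fd has not yet been orphaned, and each reference adds 2 so the
 * flag is left untouched.  grpc_fd_orphan adds 1 to clear the flag before
 * dropping its own reference; when the count reaches zero the on_done
 * callback is scheduled and the fd is returned to the freelist. */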
static void ref_by(grpc_fd *fd, int n) {
  gpr_atm_no_barrier_fetch_add(&fd->refst, n);
}

static void unref_by(grpc_fd *fd, int n) {
  if (gpr_atm_full_fetch_add(&fd->refst, -n) == n) {
    grpc_iomgr_add_callback(fd->on_done, fd->on_done_user_data);
    freelist_fd(fd);
    grpc_iomgr_unref();
  }
}

void grpc_fd_global_init(void) {
  gpr_mu_init(&fd_freelist_mu);
}

void grpc_fd_global_shutdown(void) {
  while (fd_freelist != NULL) {
    grpc_fd *fd = fd_freelist;
    fd_freelist = fd_freelist->freelist_next;
    destroy(fd);
  }
  gpr_mu_destroy(&fd_freelist_mu);
}

static void do_nothing(void *ignored, int success) {}

grpc_fd *grpc_fd_create(int fd) {
  grpc_fd *r = alloc_fd(fd);
  grpc_iomgr_ref();
  grpc_pollset_add_fd(grpc_backup_pollset(), r);
  return r;
}
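
/* Typical lifecycle (sketch; on_readable and on_closed are hypothetical
 * grpc_iomgr_cb_func callbacks supplied by the caller):
 *
 *   grpc_fd *gfd = grpc_fd_create(sock);
 *   grpc_fd_notify_on_read(gfd, on_readable, user_data);
 *   ...
 *   grpc_fd_orphan(gfd, on_closed, user_data);
 *
 * grpc_fd_orphan closes the underlying descriptor and schedules on_closed
 * once every reference has been released. */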

int grpc_fd_is_orphaned(grpc_fd *fd) {
  return (gpr_atm_acq_load(&fd->refst) & 1) == 0;
}

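/* Force-kick every pollset currently polling this fd so that it re-examines
 * the fd's state. */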
static void wake_watchers(grpc_fd *fd) {
  size_t i, n;
  gpr_mu_lock(&fd->watcher_mu);
  n = fd->watcher_count;
  for (i = 0; i < n; i++) {
    grpc_pollset_force_kick(fd->watchers[i]);
  }
  gpr_mu_unlock(&fd->watcher_mu);
}

void grpc_fd_orphan(grpc_fd *fd, grpc_iomgr_cb_func on_done, void *user_data) {
  fd->on_done = on_done ? on_done : do_nothing;
  fd->on_done_user_data = user_data;
  ref_by(fd, 1); /* remove active status, but keep referenced */
  wake_watchers(fd);
  close(fd->fd);
  unref_by(fd, 2); /* drop the reference */
}

/* increment refcount by two to avoid changing the orphan bit */
void grpc_fd_ref(grpc_fd *fd) { ref_by(fd, 2); }

void grpc_fd_unref(grpc_fd *fd) { unref_by(fd, 2); }

typedef struct {
  grpc_iomgr_cb_func cb;
  void *arg;
} callback;

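/* Dispatch a single callback either inline (when synchronous callbacks are
 * allowed) or via the iomgr delayed callback queue. */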
static void make_callback(grpc_iomgr_cb_func cb, void *arg, int success,
                          int allow_synchronous_callback) {
  if (allow_synchronous_callback) {
    cb(arg, success);
  } else {
    grpc_iomgr_add_delayed_callback(cb, arg, success);
  }
}

static void make_callbacks(callback *callbacks, size_t n, int success,
                           int allow_synchronous_callback) {
  size_t i;
  for (i = 0; i < n; i++) {
    make_callback(callbacks[i].cb, callbacks[i].arg, success,
                  allow_synchronous_callback);
  }
}

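/* Register cb/arg to be invoked when the given direction becomes ready.
 * notify_on races with set_ready via compare-and-swap on st->state: whichever
 * side loses the race falls through and completes the hand-off itself, so at
 * most one callback can ever be pending per direction. */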
static void notify_on(grpc_fd *fd, grpc_fd_state *st, grpc_iomgr_cb_func cb,
                      void *arg, int allow_synchronous_callback) {
  switch ((enum descriptor_state)gpr_atm_acq_load(&st->state)) {
    case NOT_READY:
      /* There is no race if the descriptor is already ready, so we skip
         the interlocked op in that case.  As long as the app doesn't
         try to set the same upcall twice (which it shouldn't) then
         oldval should never be anything other than READY or NOT_READY.
         We don't check for user error on the fast path. */
      st->cb = cb;
      st->cb_arg = arg;
      if (gpr_atm_rel_cas(&st->state, NOT_READY, WAITING)) {
        /* swap was successful -- the closure will run after the next
           set_ready call.  NOTE: we don't have an ABA problem here,
           since we should never have concurrent calls to the same
           notify_on function. */
        wake_watchers(fd);
        return;
      }
      /* swap was unsuccessful due to an intervening set_ready call.
         Fall through to the READY code below */
    case READY:
      assert(gpr_atm_acq_load(&st->state) == READY);
      gpr_atm_rel_store(&st->state, NOT_READY);
      make_callback(cb, arg, !gpr_atm_acq_load(&fd->shutdown),
                    allow_synchronous_callback);
      return;
    case WAITING:
      /* upcallptr was set to a different closure. This is an error! */
      gpr_log(GPR_ERROR,
              "User called a notify_on function with a previous callback still "
              "pending");
      abort();
  }
  gpr_log(GPR_ERROR, "Corrupt memory in &st->state");
  abort();
}

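/* Mark one direction ready while holding fd->set_state_mu.  If a callback is
 * already registered (WAITING) it is appended to the caller's callbacks array
 * for later dispatch; otherwise the readiness is recorded for a subsequent
 * notify_on call. */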
static void set_ready_locked(grpc_fd_state *st, callback *callbacks,
                             size_t *ncallbacks) {
  callback *c;

  switch ((enum descriptor_state)gpr_atm_acq_load(&st->state)) {
    case NOT_READY:
      if (gpr_atm_rel_cas(&st->state, NOT_READY, READY)) {
        /* swap was successful -- the closure will run after the next
           notify_on call. */
        return;
      }
      /* swap was unsuccessful due to an intervening notify_on call.
         Fall through to the WAITING code below */
    case WAITING:
      assert(gpr_atm_acq_load(&st->state) == WAITING);
      c = &callbacks[(*ncallbacks)++];
      c->cb = st->cb;
      c->arg = st->cb_arg;
      gpr_atm_rel_store(&st->state, NOT_READY);
      return;
    case READY:
      /* duplicate ready, ignore */
      return;
  }
}

static void set_ready(grpc_fd *fd, grpc_fd_state *st,
                      int allow_synchronous_callback) {
  /* only one set_ready can be active at once (but there may be a racing
     notify_on) */
  int success;
  callback cb;
  size_t ncb = 0;
  gpr_mu_lock(&fd->set_state_mu);
  set_ready_locked(st, &cb, &ncb);
  gpr_mu_unlock(&fd->set_state_mu);
  success = !gpr_atm_acq_load(&fd->shutdown);
  make_callbacks(&cb, ncb, success, allow_synchronous_callback);
}

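/* Flag the fd as shut down and flush any pending read/write callbacks with
 * success == 0. */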
void grpc_fd_shutdown(grpc_fd *fd) {
  callback cb[2];
  size_t ncb = 0;
  gpr_mu_lock(&fd->set_state_mu);
  GPR_ASSERT(!gpr_atm_acq_load(&fd->shutdown));
  gpr_atm_rel_store(&fd->shutdown, 1);
  set_ready_locked(&fd->readst, cb, &ncb);
  set_ready_locked(&fd->writest, cb, &ncb);
  gpr_mu_unlock(&fd->set_state_mu);
  make_callbacks(cb, ncb, 0, 0);
}

void grpc_fd_notify_on_read(grpc_fd *fd, grpc_iomgr_cb_func read_cb,
                            void *read_cb_arg) {
  notify_on(fd, &fd->readst, read_cb, read_cb_arg, 0);
}

void grpc_fd_notify_on_write(grpc_fd *fd, grpc_iomgr_cb_func write_cb,
                             void *write_cb_arg) {
  notify_on(fd, &fd->writest, write_cb, write_cb_arg, 0);
}

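/* Called by a pollset before polling: registers the pollset as a watcher of
 * this fd and returns the subset of read_mask/write_mask that still needs to
 * be polled (directions that are already READY are excluded). */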
gpr_uint32 grpc_fd_begin_poll(grpc_fd *fd, grpc_pollset *pollset,
                              gpr_uint32 read_mask, gpr_uint32 write_mask) {
  /* keep track of pollers that have requested our events, in case they change
   */
  gpr_mu_lock(&fd->watcher_mu);
  if (fd->watcher_capacity == fd->watcher_count) {
    fd->watcher_capacity =
        GPR_MAX(fd->watcher_capacity + 8, fd->watcher_capacity * 3 / 2);
    fd->watchers = gpr_realloc(fd->watchers,
                               fd->watcher_capacity * sizeof(grpc_pollset *));
  }
  fd->watchers[fd->watcher_count++] = pollset;
  gpr_mu_unlock(&fd->watcher_mu);

  return (gpr_atm_acq_load(&fd->readst.state) != READY ? read_mask : 0) |
         (gpr_atm_acq_load(&fd->writest.state) != READY ? write_mask : 0);
}

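/* Called by a pollset after polling: removes the pollset from this fd's
 * watcher list by compacting the array in place. */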
void grpc_fd_end_poll(grpc_fd *fd, grpc_pollset *pollset) {
  size_t r, w, n;

  gpr_mu_lock(&fd->watcher_mu);
  n = fd->watcher_count;
  for (r = 0, w = 0; r < n; r++) {
    if (fd->watchers[r] == pollset) {
      fd->watcher_count--;
      continue;
    }
    fd->watchers[w++] = fd->watchers[r];
  }
  gpr_mu_unlock(&fd->watcher_mu);
}

void grpc_fd_become_readable(grpc_fd *fd, int allow_synchronous_callback) {
  set_ready(fd, &fd->readst, allow_synchronous_callback);
}

void grpc_fd_become_writable(grpc_fd *fd, int allow_synchronous_callback) {
  set_ready(fd, &fd->writest, allow_synchronous_callback);
}

#endif