/*
 *
 * Copyright 2014, Google Inc.
 * All rights reserved.
 *
 * Redistribution and use in source and binary forms, with or without
 * modification, are permitted provided that the following conditions are
 * met:
 *
 *     * Redistributions of source code must retain the above copyright
 * notice, this list of conditions and the following disclaimer.
 *     * Redistributions in binary form must reproduce the above
 * copyright notice, this list of conditions and the following disclaimer
 * in the documentation and/or other materials provided with the
 * distribution.
 *     * Neither the name of Google Inc. nor the names of its
 * contributors may be used to endorse or promote products derived from
 * this software without specific prior written permission.
 *
 * THIS SOFTWARE IS PROVIDED BY THE COPYRIGHT HOLDERS AND CONTRIBUTORS
 * "AS IS" AND ANY EXPRESS OR IMPLIED WARRANTIES, INCLUDING, BUT NOT
 * LIMITED TO, THE IMPLIED WARRANTIES OF MERCHANTABILITY AND FITNESS FOR
 * A PARTICULAR PURPOSE ARE DISCLAIMED. IN NO EVENT SHALL THE COPYRIGHT
 * OWNER OR CONTRIBUTORS BE LIABLE FOR ANY DIRECT, INDIRECT, INCIDENTAL,
 * SPECIAL, EXEMPLARY, OR CONSEQUENTIAL DAMAGES (INCLUDING, BUT NOT
 * LIMITED TO, PROCUREMENT OF SUBSTITUTE GOODS OR SERVICES; LOSS OF USE,
 * DATA, OR PROFITS; OR BUSINESS INTERRUPTION) HOWEVER CAUSED AND ON ANY
 * THEORY OF LIABILITY, WHETHER IN CONTRACT, STRICT LIABILITY, OR TORT
 * (INCLUDING NEGLIGENCE OR OTHERWISE) ARISING IN ANY WAY OUT OF THE USE
 * OF THIS SOFTWARE, EVEN IF ADVISED OF THE POSSIBILITY OF SUCH DAMAGE.
 *
 */

#include <grpc/support/port_platform.h>

#ifdef GPR_POSIX_SOCKET

#include "src/core/iomgr/fd_posix.h"

#include <assert.h>
#include <unistd.h>

#include "src/core/iomgr/iomgr_internal.h"
#include <grpc/support/alloc.h>
#include <grpc/support/log.h>
#include <grpc/support/useful.h>

enum descriptor_state { NOT_READY, READY, WAITING };
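
/* State machine for fd->readst.state and fd->writest.state:

     NOT_READY --notify_on--> WAITING    (callback parked, awaiting I/O)
     WAITING   --set_ready--> NOT_READY  (parked callback is scheduled)
     NOT_READY --set_ready--> READY      (I/O arrived with no callback set)
     READY     --notify_on--> NOT_READY  (callback dispatched immediately)

   The two transitions out of NOT_READY race with each other; each is
   attempted with a compare-and-swap, and the loser falls through to handle
   the state the winner installed. */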

/* We need to keep a freelist not because of any concerns of malloc
 * performance but instead so that implementations with multiple threads in
 * (for example) epoll_wait deal with the race between pollset removal and
 * incoming poll notifications.
 *
 * The problem is that the poller ultimately holds a reference to this
 * object, so it is very difficult to know when it is safe to free it, at
 * least without some expensive synchronization.
 *
 * If we keep the object freelisted, in the worst case losing this race just
 * becomes a spurious read notification on a reused fd.
 */
/* TODO(klempner): We could use some form of polling generation count to know
 * when these are safe to free. */
/* TODO(klempner): Consider disabling freelisting if we don't have multiple
 * threads in poll on the same fd */
/* TODO(klempner): Batch these allocations to reduce fragmentation */
static grpc_fd *fd_freelist = NULL;
static gpr_mu fd_freelist_mu;

static void freelist_fd(grpc_fd *fd) {
  gpr_mu_lock(&fd_freelist_mu);
  fd->freelist_next = fd_freelist;
  fd_freelist = fd;
  gpr_mu_unlock(&fd_freelist_mu);
}

static grpc_fd *alloc_fd(int fd) {
  grpc_fd *r = NULL;
  gpr_mu_lock(&fd_freelist_mu);
  if (fd_freelist != NULL) {
    r = fd_freelist;
    fd_freelist = fd_freelist->freelist_next;
  }
  gpr_mu_unlock(&fd_freelist_mu);
  if (r == NULL) {
    r = gpr_malloc(sizeof(grpc_fd));
    gpr_mu_init(&r->set_state_mu);
    gpr_mu_init(&r->watcher_mu);
  }
  gpr_atm_rel_store(&r->refst, 1);
  gpr_atm_rel_store(&r->readst.state, NOT_READY);
  gpr_atm_rel_store(&r->writest.state, NOT_READY);
  gpr_atm_rel_store(&r->shutdown, 0);
  r->fd = fd;
  r->watcher_root.next = r->watcher_root.prev = &r->watcher_root;
  r->freelist_next = NULL;
  return r;
}

static void destroy(grpc_fd *fd) {
  gpr_mu_destroy(&fd->set_state_mu);
  gpr_mu_destroy(&fd->watcher_mu);
  gpr_free(fd);
}

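/* Reference counting: fd->refst holds twice the number of outstanding
   references, and its low bit is set while the fd is still "active" (not
   yet orphaned) -- grpc_fd_is_orphaned below simply tests that bit.
   grpc_fd_ref/grpc_fd_unref move in steps of two so they can never flip
   it; grpc_fd_orphan adds one (clearing active status while completing a
   reference) and then drops that reference with unref_by(fd, 2). */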
static void ref_by(grpc_fd *fd, int n) {
  gpr_atm_no_barrier_fetch_add(&fd->refst, n);
}

static void unref_by(grpc_fd *fd, int n) {
  if (gpr_atm_full_fetch_add(&fd->refst, -n) == n) {
    grpc_iomgr_add_callback(fd->on_done, fd->on_done_user_data);
    freelist_fd(fd);
    grpc_iomgr_unref();
  }
}

void grpc_fd_global_init(void) { gpr_mu_init(&fd_freelist_mu); }

void grpc_fd_global_shutdown(void) {
  while (fd_freelist != NULL) {
    grpc_fd *fd = fd_freelist;
    fd_freelist = fd_freelist->freelist_next;
    destroy(fd);
  }
  gpr_mu_destroy(&fd_freelist_mu);
}

static void do_nothing(void *ignored, int success) {}

grpc_fd *grpc_fd_create(int fd) {
  grpc_fd *r = alloc_fd(fd);
  grpc_iomgr_ref();
  grpc_pollset_add_fd(grpc_backup_pollset(), r);
  return r;
}

int grpc_fd_is_orphaned(grpc_fd *fd) {
  return (gpr_atm_acq_load(&fd->refst) & 1) == 0;
}

static void wake_watchers(grpc_fd *fd) {
  grpc_fd_watcher *watcher;
  gpr_mu_lock(&fd->watcher_mu);
  for (watcher = fd->watcher_root.next; watcher != &fd->watcher_root;
       watcher = watcher->next) {
    grpc_pollset_force_kick(watcher->pollset);
  }
  gpr_mu_unlock(&fd->watcher_mu);
}

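/* Called by the fd's owner when it no longer needs the descriptor: every
   watching pollset is force-kicked (presumably so that no poller stays
   blocked on the descriptor that close() below invalidates), the fd is
   closed, and the owner's reference is released. on_done runs once all
   remaining references have been dropped. */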
void grpc_fd_orphan(grpc_fd *fd, grpc_iomgr_cb_func on_done, void *user_data) {
  fd->on_done = on_done ? on_done : do_nothing;
  fd->on_done_user_data = user_data;
  ref_by(fd, 1); /* remove active status, but keep referenced */
  wake_watchers(fd);
  close(fd->fd);
  unref_by(fd, 2); /* drop the reference */
}

/* increment refcount by two to avoid changing the orphan bit */
void grpc_fd_ref(grpc_fd *fd) { ref_by(fd, 2); }

void grpc_fd_unref(grpc_fd *fd) { unref_by(fd, 2); }

typedef struct {
  grpc_iomgr_cb_func cb;
  void *arg;
} callback;

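/* Run a single callback, either inline (when the caller permits synchronous
   execution) or deferred onto the iomgr's delayed-callback queue so it runs
   outside the current call stack. */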
static void make_callback(grpc_iomgr_cb_func cb, void *arg, int success,
                          int allow_synchronous_callback) {
  if (allow_synchronous_callback) {
    cb(arg, success);
  } else {
    grpc_iomgr_add_delayed_callback(cb, arg, success);
  }
}

static void make_callbacks(callback *callbacks, size_t n, int success,
                           int allow_synchronous_callback) {
  size_t i;
  for (i = 0; i < n; i++) {
    make_callback(callbacks[i].cb, callbacks[i].arg, success,
                  allow_synchronous_callback);
  }
}

static void notify_on(grpc_fd *fd, grpc_fd_state *st, grpc_iomgr_cb_func cb,
                      void *arg, int allow_synchronous_callback) {
  switch ((enum descriptor_state)gpr_atm_acq_load(&st->state)) {
    case NOT_READY:
      /* There is no race if the descriptor is already ready, so we skip
         the interlocked op in that case. As long as the app doesn't
         try to set the same upcall twice (which it shouldn't) then
         oldval should never be anything other than READY or NOT_READY. We
         don't check for user error on the fast path. */
      st->cb = cb;
      st->cb_arg = arg;
      if (gpr_atm_rel_cas(&st->state, NOT_READY, WAITING)) {
        /* swap was successful -- the closure will run after the next
           set_ready call. NOTE: we don't have an ABA problem here,
           since we should never have concurrent calls to the same
           notify_on function. */
        wake_watchers(fd);
        return;
      }
      /* swap was unsuccessful due to an intervening set_ready call.
         Fall through to the READY code below */
    case READY:
      assert(gpr_atm_acq_load(&st->state) == READY);
      gpr_atm_rel_store(&st->state, NOT_READY);
      make_callback(cb, arg, !gpr_atm_acq_load(&fd->shutdown),
                    allow_synchronous_callback);
      return;
    case WAITING:
      /* upcall ptr was set to a different closure. This is an error! */
      gpr_log(GPR_ERROR,
              "User called a notify_on function with a previous callback still "
              "pending");
      abort();
  }
  gpr_log(GPR_ERROR, "Corrupt memory in &st->state");
  abort();
}

static void set_ready_locked(grpc_fd_state *st, callback *callbacks,
                             size_t *ncallbacks) {
  callback *c;

  switch ((enum descriptor_state)gpr_atm_acq_load(&st->state)) {
    case NOT_READY:
      if (gpr_atm_rel_cas(&st->state, NOT_READY, READY)) {
        /* swap was successful -- the closure will run after the next
           notify_on call. */
        return;
      }
      /* swap was unsuccessful due to an intervening notify_on call.
         Fall through to the WAITING code below */
    case WAITING:
      assert(gpr_atm_acq_load(&st->state) == WAITING);
      c = &callbacks[(*ncallbacks)++];
      c->cb = st->cb;
      c->arg = st->cb_arg;
      gpr_atm_rel_store(&st->state, NOT_READY);
      return;
    case READY:
      /* duplicate ready, ignore */
      return;
  }
}

static void set_ready(grpc_fd *fd, grpc_fd_state *st,
                      int allow_synchronous_callback) {
  /* only one set_ready can be active at once (but there may be a racing
     notify_on) */
  int success;
  callback cb;
  size_t ncb = 0;
  gpr_mu_lock(&fd->set_state_mu);
  set_ready_locked(st, &cb, &ncb);
  gpr_mu_unlock(&fd->set_state_mu);
  success = !gpr_atm_acq_load(&fd->shutdown);
  make_callbacks(&cb, ncb, success, allow_synchronous_callback);
}

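/* Shutdown flushes any parked read/write callbacks with success == 0. At
   most one callback can be parked per direction, hence the two-element
   array below. */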
void grpc_fd_shutdown(grpc_fd *fd) {
  callback cb[2];
  size_t ncb = 0;
  gpr_mu_lock(&fd->set_state_mu);
  GPR_ASSERT(!gpr_atm_acq_load(&fd->shutdown));
  gpr_atm_rel_store(&fd->shutdown, 1);
  set_ready_locked(&fd->readst, cb, &ncb);
  set_ready_locked(&fd->writest, cb, &ncb);
  gpr_mu_unlock(&fd->set_state_mu);
  make_callbacks(cb, ncb, 0, 0);
}

void grpc_fd_notify_on_read(grpc_fd *fd, grpc_iomgr_cb_func read_cb,
                            void *read_cb_arg) {
  notify_on(fd, &fd->readst, read_cb, read_cb_arg, 0);
}

void grpc_fd_notify_on_write(grpc_fd *fd, grpc_iomgr_cb_func write_cb,
                             void *write_cb_arg) {
  notify_on(fd, &fd->writest, write_cb, write_cb_arg, 0);
}
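
/* Usage sketch (hypothetical caller, not part of this file): notifications
   are one-shot, so a callback that wants a stream of events must re-arm
   itself. Assuming a caller-defined `session` type holding a grpc_fd* in
   s->fd, with hypothetical helpers session_read_some() and session_close():

     static void on_readable(void *arg, int success) {
       session *s = arg;
       if (!success) {          // fd was shut down
         session_close(s);
         return;
       }
       session_read_some(s);    // drain whatever is available
       grpc_fd_notify_on_read(s->fd, on_readable, s);  // re-arm
     }

     // initial registration:
     //   grpc_fd_notify_on_read(s->fd, on_readable, s);                    */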
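
/* grpc_fd_begin_poll registers the poller as a watcher and returns the
   subset of (read_mask | write_mask) actually worth polling for: a
   direction whose state is already READY is excluded, since its readiness
   has been recorded and need not be polled for again. */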
gpr_uint32 grpc_fd_begin_poll(grpc_fd *fd, grpc_pollset *pollset,
                              gpr_uint32 read_mask, gpr_uint32 write_mask,
                              grpc_fd_watcher *watcher) {
  /* keep track of pollers that have requested our events, in case they change
   */
  gpr_mu_lock(&fd->watcher_mu);
  /* splice the watcher into the tail of the circular watcher list, whose
     sentinel is fd->watcher_root */
  watcher->next = &fd->watcher_root;
  watcher->prev = watcher->next->prev;
  watcher->next->prev = watcher->prev->next = watcher;
  watcher->pollset = pollset;
  watcher->fd = fd;
  gpr_mu_unlock(&fd->watcher_mu);

  return (gpr_atm_acq_load(&fd->readst.state) != READY ? read_mask : 0) |
         (gpr_atm_acq_load(&fd->writest.state) != READY ? write_mask : 0);
}

void grpc_fd_end_poll(grpc_fd_watcher *watcher) {
  gpr_mu_lock(&watcher->fd->watcher_mu);
  /* unlink the watcher from the circular list */
  watcher->next->prev = watcher->prev;
  watcher->prev->next = watcher->next;
  gpr_mu_unlock(&watcher->fd->watcher_mu);
}

void grpc_fd_become_readable(grpc_fd *fd, int allow_synchronous_callback) {
  set_ready(fd, &fd->readst, allow_synchronous_callback);
}

void grpc_fd_become_writable(grpc_fd *fd, int allow_synchronous_callback) {
  set_ready(fd, &fd->writest, allow_synchronous_callback);
}

#endif