/*
 *
 * Copyright 2014, Google Inc.
 * All rights reserved.
 *
 * Redistribution and use in source and binary forms, with or without
 * modification, are permitted provided that the following conditions are
 * met:
 *
 *     * Redistributions of source code must retain the above copyright
 * notice, this list of conditions and the following disclaimer.
 *     * Redistributions in binary form must reproduce the above
 * copyright notice, this list of conditions and the following disclaimer
 * in the documentation and/or other materials provided with the
 * distribution.
 *     * Neither the name of Google Inc. nor the names of its
 * contributors may be used to endorse or promote products derived from
 * this software without specific prior written permission.
 *
 * THIS SOFTWARE IS PROVIDED BY THE COPYRIGHT HOLDERS AND CONTRIBUTORS
 * "AS IS" AND ANY EXPRESS OR IMPLIED WARRANTIES, INCLUDING, BUT NOT
 * LIMITED TO, THE IMPLIED WARRANTIES OF MERCHANTABILITY AND FITNESS FOR
 * A PARTICULAR PURPOSE ARE DISCLAIMED. IN NO EVENT SHALL THE COPYRIGHT
 * OWNER OR CONTRIBUTORS BE LIABLE FOR ANY DIRECT, INDIRECT, INCIDENTAL,
 * SPECIAL, EXEMPLARY, OR CONSEQUENTIAL DAMAGES (INCLUDING, BUT NOT
 * LIMITED TO, PROCUREMENT OF SUBSTITUTE GOODS OR SERVICES; LOSS OF USE,
 * DATA, OR PROFITS; OR BUSINESS INTERRUPTION) HOWEVER CAUSED AND ON ANY
 * THEORY OF LIABILITY, WHETHER IN CONTRACT, STRICT LIABILITY, OR TORT
 * (INCLUDING NEGLIGENCE OR OTHERWISE) ARISING IN ANY WAY OUT OF THE USE
 * OF THIS SOFTWARE, EVEN IF ADVISED OF THE POSSIBILITY OF SUCH DAMAGE.
 *
 */
33
34#include "src/core/iomgr/iomgr.h"
35
36#include <stdlib.h>
37
38#include "src/core/iomgr/iomgr_internal.h"
39#include "src/core/iomgr/alarm_internal.h"
40#include <grpc/support/alloc.h>
41#include <grpc/support/log.h>
42#include <grpc/support/thd.h>
43#include <grpc/support/sync.h>
44
45typedef struct delayed_callback {
46 grpc_iomgr_cb_func cb;
47 void *cb_arg;
48 int success;
49 struct delayed_callback *next;
50} delayed_callback;
51
52static gpr_mu g_mu;
53static gpr_cv g_cv;
54static delayed_callback *g_cbs_head = NULL;
55static delayed_callback *g_cbs_tail = NULL;
56static int g_shutdown;
57static int g_refs;
58static gpr_event g_background_callback_executor_done;
59
60/* Execute followup callbacks continuously.
61 Other threads may check in and help during pollset_work() */
62static void background_callback_executor(void *ignored) {
63 gpr_mu_lock(&g_mu);
64 while (!g_shutdown) {
65 gpr_timespec deadline = gpr_inf_future;
66 if (g_cbs_head) {
67 delayed_callback *cb = g_cbs_head;
68 g_cbs_head = cb->next;
69 if (!g_cbs_head) g_cbs_tail = NULL;
70 gpr_mu_unlock(&g_mu);
71 cb->cb(cb->cb_arg, cb->success);
72 gpr_free(cb);
73 gpr_mu_lock(&g_mu);
74 } else if (grpc_alarm_check(&g_mu, gpr_now(), &deadline)) {
75 } else {
76 gpr_cv_wait(&g_cv, &g_mu, deadline);
77 }
78 }
79 gpr_mu_unlock(&g_mu);
80 gpr_event_set(&g_background_callback_executor_done, (void *)1);
81}
82
83void grpc_kick_poller() { gpr_cv_broadcast(&g_cv); }
84
85void grpc_iomgr_init() {
86 gpr_thd_id id;
87 gpr_mu_init(&g_mu);
88 gpr_cv_init(&g_cv);
89 grpc_alarm_list_init(gpr_now());
90 g_refs = 0;
91 grpc_iomgr_platform_init();
92 gpr_event_init(&g_background_callback_executor_done);
93 gpr_thd_new(&id, background_callback_executor, NULL, NULL);
94}
95
96void grpc_iomgr_shutdown() {
97 delayed_callback *cb;
98 gpr_timespec shutdown_deadline =
99 gpr_time_add(gpr_now(), gpr_time_from_seconds(10));
100
101 grpc_iomgr_platform_shutdown();
102
103 gpr_mu_lock(&g_mu);
104 g_shutdown = 1;
105 while (g_cbs_head || g_refs) {
106 gpr_log(GPR_DEBUG, "Waiting for %d iomgr objects to be destroyed%s", g_refs,
107 g_cbs_head ? " and executing final callbacks" : "");
108 while (g_cbs_head) {
109 cb = g_cbs_head;
110 g_cbs_head = cb->next;
111 if (!g_cbs_head) g_cbs_tail = NULL;
112 gpr_mu_unlock(&g_mu);
113
114 cb->cb(cb->cb_arg, 0);
115 gpr_free(cb);
116 gpr_mu_lock(&g_mu);
117 }
118 if (g_refs) {
119 if (gpr_cv_wait(&g_cv, &g_mu, shutdown_deadline) && g_cbs_head == NULL) {
120 gpr_log(GPR_DEBUG,
121 "Failed to free %d iomgr objects before shutdown deadline: "
122 "memory leaks are likely",
123 g_refs);
124 break;
125 }
126 }
127 }
128 gpr_mu_unlock(&g_mu);
129
130 gpr_event_wait(&g_background_callback_executor_done, gpr_inf_future);
131
132 grpc_alarm_list_shutdown();
133 gpr_mu_destroy(&g_mu);
134 gpr_cv_destroy(&g_cv);
135}
136
137void grpc_iomgr_ref() {
138 gpr_mu_lock(&g_mu);
139 ++g_refs;
140 gpr_mu_unlock(&g_mu);
141}
142
143void grpc_iomgr_unref() {
144 gpr_mu_lock(&g_mu);
145 if (0 == --g_refs) {
146 gpr_cv_signal(&g_cv);
147 }
148 gpr_mu_unlock(&g_mu);
149}
150
151void grpc_iomgr_add_delayed_callback(grpc_iomgr_cb_func cb, void *cb_arg,
152 int success) {
153 delayed_callback *dcb = gpr_malloc(sizeof(delayed_callback));
154 dcb->cb = cb;
155 dcb->cb_arg = cb_arg;
156 dcb->success = success;
157 gpr_mu_lock(&g_mu);
158 dcb->next = NULL;
159 if (!g_cbs_tail) {
160 g_cbs_head = g_cbs_tail = dcb;
161 } else {
162 g_cbs_tail->next = dcb;
163 g_cbs_tail = dcb;
164 }
165 gpr_cv_signal(&g_cv);
166 gpr_mu_unlock(&g_mu);
167}
168
169void grpc_iomgr_add_callback(grpc_iomgr_cb_func cb, void *cb_arg) {
170 grpc_iomgr_add_delayed_callback(cb, cb_arg, 1);
171}
172
173int grpc_maybe_call_delayed_callbacks(gpr_mu *drop_mu, int success) {
174 int n = 0;
175 gpr_mu *retake_mu = NULL;
176 delayed_callback *cb;
177 for (;;) {
178 /* check for new work */
179 if (!gpr_mu_trylock(&g_mu)) {
180 break;
181 }
182 cb = g_cbs_head;
183 if (!cb) {
184 gpr_mu_unlock(&g_mu);
185 break;
186 }
187 g_cbs_head = cb->next;
188 if (!g_cbs_head) g_cbs_tail = NULL;
189 gpr_mu_unlock(&g_mu);
190 /* if we have a mutex to drop, do so before executing work */
191 if (drop_mu) {
192 gpr_mu_unlock(drop_mu);
193 retake_mu = drop_mu;
194 drop_mu = NULL;
195 }
196 cb->cb(cb->cb_arg, success && cb->success);
197 gpr_free(cb);
198 n++;
199 }
200 if (retake_mu) {
201 gpr_mu_lock(retake_mu);
202 }
203 return n;
204}