/*
 *
 * Copyright 2015 gRPC authors.
 *
 * Licensed under the Apache License, Version 2.0 (the "License");
 * you may not use this file except in compliance with the License.
 * You may obtain a copy of the License at
 *
 *     http://www.apache.org/licenses/LICENSE-2.0
 *
 * Unless required by applicable law or agreed to in writing, software
 * distributed under the License is distributed on an "AS IS" BASIS,
 * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
 * See the License for the specific language governing permissions and
 * limitations under the License.
 *
 */

#include "src/core/lib/iomgr/executor.h"

#include <string.h>

#include <grpc/support/alloc.h>
#include <grpc/support/cpu.h>
#include <grpc/support/log.h>
#include <grpc/support/sync.h>
#include <grpc/support/thd.h>
#include <grpc/support/tls.h>
#include <grpc/support/useful.h>

#include "src/core/lib/iomgr/exec_ctx.h"
#include "src/core/lib/support/spinlock.h"

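/* Number of closures that may be queued on a thread before executor_push
   considers spinning up an additional executor thread. */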
#define MAX_DEPTH 2

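/* Per-thread state: the closures queued to this thread, an approximate count
   of pending work (depth), and flags used for shutdown and for routing new
   work around long-running jobs. */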
typedef struct {
  gpr_mu mu;
  gpr_cv cv;
  grpc_closure_list elems;
  size_t depth;
  bool shutdown;
  bool queued_long_job;
  gpr_thd_id id;
} thread_state;

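/* Pool bookkeeping: g_thread_state is sized for g_max_threads entries,
   g_cur_threads counts the threads actually running, and
   g_adding_thread_lock serializes pool growth and shutdown. */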
static thread_state *g_thread_state;
static size_t g_max_threads;
static gpr_atm g_cur_threads;
static gpr_spinlock g_adding_thread_lock = GPR_SPINLOCK_STATIC_INITIALIZER;

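/* Thread-local pointer back to the executing thread's thread_state (set in
   executor_thread), so closures pushed from an executor thread are queued to
   that same thread. */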
GPR_TLS_DECL(g_this_thread_state);

static grpc_tracer_flag executor_trace =
    GRPC_TRACER_INITIALIZER(false, "executor");

static void executor_thread(void *arg);

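/* Run every closure in the list on the calling thread and return the number
   of closures executed (the caller uses this to reduce its depth). */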
static size_t run_closures(grpc_exec_ctx *exec_ctx, grpc_closure_list list) {
  size_t n = 0;

  grpc_closure *c = list.head;
  while (c != NULL) {
    grpc_closure *next = c->next_data.next;
    grpc_error *error = c->error_data.error;
    if (GRPC_TRACER_ON(executor_trace)) {
#ifndef NDEBUG
      gpr_log(GPR_DEBUG, "EXECUTOR: run %p [created by %s:%d]", c,
              c->file_created, c->line_created);
#else
      gpr_log(GPR_DEBUG, "EXECUTOR: run %p", c);
#endif
    }
#ifndef NDEBUG
    c->scheduled = false;
#endif
    c->cb(exec_ctx, c->cb_arg, error);
    GRPC_ERROR_UNREF(error);
    c = next;
    n++;
  }

  return n;
}

bool grpc_executor_is_threaded() {
  return gpr_atm_no_barrier_load(&g_cur_threads) > 0;
}

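/* Switch the executor's background thread pool on or off. Enabling sizes the
   pool at twice the core count (minimum one) and starts a single thread;
   disabling signals every thread to shut down, joins them, and drains any
   closures left on their queues. */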
void grpc_executor_set_threading(grpc_exec_ctx *exec_ctx, bool threading) {
  gpr_atm cur_threads = gpr_atm_no_barrier_load(&g_cur_threads);
  if (threading) {
    if (cur_threads > 0) return;
    g_max_threads = GPR_MAX(1, 2 * gpr_cpu_num_cores());
    gpr_atm_no_barrier_store(&g_cur_threads, 1);
    gpr_tls_init(&g_this_thread_state);
    g_thread_state = gpr_zalloc(sizeof(thread_state) * g_max_threads);
    for (size_t i = 0; i < g_max_threads; i++) {
      gpr_mu_init(&g_thread_state[i].mu);
      gpr_cv_init(&g_thread_state[i].cv);
      g_thread_state[i].elems = (grpc_closure_list)GRPC_CLOSURE_LIST_INIT;
    }

    gpr_thd_options opt = gpr_thd_options_default();
    gpr_thd_options_set_joinable(&opt);
    gpr_thd_new(&g_thread_state[0].id, executor_thread, &g_thread_state[0],
                &opt);
  } else {
    if (cur_threads == 0) return;
    for (size_t i = 0; i < g_max_threads; i++) {
      gpr_mu_lock(&g_thread_state[i].mu);
      g_thread_state[i].shutdown = true;
      gpr_cv_signal(&g_thread_state[i].cv);
      gpr_mu_unlock(&g_thread_state[i].mu);
    }
    /* Ensure no thread is still in the middle of adding a new thread: once we
       pass this lock/unlock pair, no thread will try to add one either, since
       shutdown is now set on every thread_state. */
    gpr_spinlock_lock(&g_adding_thread_lock);
    gpr_spinlock_unlock(&g_adding_thread_lock);
    for (gpr_atm i = 0; i < g_cur_threads; i++) {
      gpr_thd_join(g_thread_state[i].id);
    }
    gpr_atm_no_barrier_store(&g_cur_threads, 0);
    for (size_t i = 0; i < g_max_threads; i++) {
      gpr_mu_destroy(&g_thread_state[i].mu);
      gpr_cv_destroy(&g_thread_state[i].cv);
      run_closures(exec_ctx, g_thread_state[i].elems);
    }
    gpr_free(g_thread_state);
    gpr_tls_destroy(&g_this_thread_state);
  }
}

void grpc_executor_init(grpc_exec_ctx *exec_ctx) {
  grpc_register_tracer(&executor_trace);
  gpr_atm_no_barrier_store(&g_cur_threads, 0);
  grpc_executor_set_threading(exec_ctx, true);
}

void grpc_executor_shutdown(grpc_exec_ctx *exec_ctx) {
  grpc_executor_set_threading(exec_ctx, false);
}

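/* Worker loop for a pool thread: wait until closures are queued (or shutdown
   is requested), swap the queue out under the lock, run it, and feed the
   count of executed closures back in as subtract_depth. */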
static void executor_thread(void *arg) {
  thread_state *ts = arg;
  gpr_tls_set(&g_this_thread_state, (intptr_t)ts);

  grpc_exec_ctx exec_ctx =
      GRPC_EXEC_CTX_INITIALIZER(0, grpc_never_ready_to_finish, NULL);

  size_t subtract_depth = 0;
  for (;;) {
    if (GRPC_TRACER_ON(executor_trace)) {
      gpr_log(GPR_DEBUG,
              "EXECUTOR[%" PRIdPTR "]: step (sub_depth=%" PRIdPTR ")",
              ts - g_thread_state, subtract_depth);
    }
    gpr_mu_lock(&ts->mu);
    ts->depth -= subtract_depth;
    while (grpc_closure_list_empty(ts->elems) && !ts->shutdown) {
      gpr_cv_wait(&ts->cv, &ts->mu, gpr_inf_future(GPR_CLOCK_REALTIME));
    }
    if (ts->shutdown) {
      if (GRPC_TRACER_ON(executor_trace)) {
        gpr_log(GPR_DEBUG, "EXECUTOR[%" PRIdPTR "]: shutdown",
                ts - g_thread_state);
      }
      gpr_mu_unlock(&ts->mu);
      break;
    }
    ts->queued_long_job = false;
    grpc_closure_list exec = ts->elems;
    ts->elems = (grpc_closure_list)GRPC_CLOSURE_LIST_INIT;
    gpr_mu_unlock(&ts->mu);
    if (GRPC_TRACER_ON(executor_trace)) {
      gpr_log(GPR_DEBUG, "EXECUTOR[%" PRIdPTR "]: execute",
              ts - g_thread_state);
    }

    subtract_depth = run_closures(&exec_ctx, exec);
    grpc_exec_ctx_flush(&exec_ctx);
  }
  grpc_exec_ctx_finish(&exec_ctx);
}

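/* Schedule a closure on the executor. With no threads running, the closure is
   appended to the caller's exec_ctx and runs inline at flush time. Otherwise
   the initial target is the current executor thread (via TLS) or a thread
   picked by hashing the exec_ctx; threads that already hold a queued long job
   are skipped, and if the chosen queue grows past MAX_DEPTH a new thread may
   be started. */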
static void executor_push(grpc_exec_ctx *exec_ctx, grpc_closure *closure,
                          grpc_error *error, bool is_short) {
  size_t cur_thread_count = (size_t)gpr_atm_no_barrier_load(&g_cur_threads);
  if (cur_thread_count == 0) {
    if (GRPC_TRACER_ON(executor_trace)) {
#ifndef NDEBUG
      gpr_log(GPR_DEBUG, "EXECUTOR: schedule %p (created %s:%d) inline",
              closure, closure->file_created, closure->line_created);
#else
      gpr_log(GPR_DEBUG, "EXECUTOR: schedule %p inline", closure);
#endif
    }
    grpc_closure_list_append(&exec_ctx->closure_list, closure, error);
    return;
  }
  thread_state *ts = (thread_state *)gpr_tls_get(&g_this_thread_state);
  if (ts == NULL) {
    ts = &g_thread_state[GPR_HASH_POINTER(exec_ctx, cur_thread_count)];
  }
  thread_state *orig_ts = ts;

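  /* Probe threads starting at ts: a thread that already has a long job queued
     is skipped so this closure does not wait behind it. If every thread is in
     that state there is nowhere left to queue. */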
  bool try_new_thread;
  for (;;) {
    if (GRPC_TRACER_ON(executor_trace)) {
#ifndef NDEBUG
      gpr_log(GPR_DEBUG,
              "EXECUTOR: try to schedule %p (%s) (created %s:%d) to thread "
              "%" PRIdPTR,
              closure, is_short ? "short" : "long", closure->file_created,
              closure->line_created, ts - g_thread_state);
#else
      gpr_log(GPR_DEBUG,
              "EXECUTOR: try to schedule %p (%s) to thread %" PRIdPTR, closure,
              is_short ? "short" : "long", ts - g_thread_state);
#endif
    }
    gpr_mu_lock(&ts->mu);
    if (ts->queued_long_job) {
      gpr_mu_unlock(&ts->mu);
      intptr_t idx = ts - g_thread_state;
      ts = &g_thread_state[(idx + 1) % g_cur_threads];
      if (ts == orig_ts) {
        // Every thread already has a long job queued; there is nowhere left
        // to place this closure.
        abort();
      }
      continue;
    }
    if (grpc_closure_list_empty(ts->elems)) {
      gpr_cv_signal(&ts->cv);
    }
    grpc_closure_list_append(&ts->elems, closure, error);
    ts->depth++;
    try_new_thread = ts->depth > MAX_DEPTH &&
                     cur_thread_count < g_max_threads && !ts->shutdown;
    if (!is_short) ts->queued_long_job = true;
    gpr_mu_unlock(&ts->mu);
    break;
  }
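  /* Grow the pool outside ts->mu: the spinlock serializes additions, and the
     thread count is re-read under it because another push may have already
     started a thread. */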
  if (try_new_thread && gpr_spinlock_trylock(&g_adding_thread_lock)) {
    cur_thread_count = (size_t)gpr_atm_no_barrier_load(&g_cur_threads);
    if (cur_thread_count < g_max_threads) {
      gpr_atm_no_barrier_store(&g_cur_threads, cur_thread_count + 1);

      gpr_thd_options opt = gpr_thd_options_default();
      gpr_thd_options_set_joinable(&opt);
      gpr_thd_new(&g_thread_state[cur_thread_count].id, executor_thread,
                  &g_thread_state[cur_thread_count], &opt);
    }
    gpr_spinlock_unlock(&g_adding_thread_lock);
  }
}

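/* Thin adaptors that bind the is_short flag so executor_push can be used as a
   closure-scheduler callback. */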
static void executor_push_short(grpc_exec_ctx *exec_ctx, grpc_closure *closure,
                                grpc_error *error) {
  executor_push(exec_ctx, closure, error, true);
}

static void executor_push_long(grpc_exec_ctx *exec_ctx, grpc_closure *closure,
                               grpc_error *error) {
  executor_push(exec_ctx, closure, error, false);
}

static const grpc_closure_scheduler_vtable executor_vtable_short = {
    executor_push_short, executor_push_short, "executor"};
static grpc_closure_scheduler executor_scheduler_short = {
    &executor_vtable_short};

static const grpc_closure_scheduler_vtable executor_vtable_long = {
    executor_push_long, executor_push_long, "executor"};
static grpc_closure_scheduler executor_scheduler_long = {&executor_vtable_long};

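/* Return the scheduler that tags closures as short or long executor jobs;
   long jobs mark their target thread via queued_long_job so later work routes
   around it. */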
grpc_closure_scheduler *grpc_executor_scheduler(
    grpc_executor_job_length length) {
  return length == GRPC_EXECUTOR_SHORT ? &executor_scheduler_short
                                       : &executor_scheduler_long;
}