blob: e5f9764f5f6c84bf0358bf3d9fd92728d7a2d85d [file] [log] [blame]
David Garcia Quintas4bc34632015-10-07 16:12:35 -07001/*
2 *
Jan Tattermusch7897ae92017-06-07 22:57:36 +02003 * Copyright 2015 gRPC authors.
David Garcia Quintas4bc34632015-10-07 16:12:35 -07004 *
Jan Tattermusch7897ae92017-06-07 22:57:36 +02005 * Licensed under the Apache License, Version 2.0 (the "License");
6 * you may not use this file except in compliance with the License.
7 * You may obtain a copy of the License at
David Garcia Quintas4bc34632015-10-07 16:12:35 -07008 *
Jan Tattermusch7897ae92017-06-07 22:57:36 +02009 * http://www.apache.org/licenses/LICENSE-2.0
David Garcia Quintas4bc34632015-10-07 16:12:35 -070010 *
Jan Tattermusch7897ae92017-06-07 22:57:36 +020011 * Unless required by applicable law or agreed to in writing, software
12 * distributed under the License is distributed on an "AS IS" BASIS,
13 * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
14 * See the License for the specific language governing permissions and
15 * limitations under the License.
David Garcia Quintas4bc34632015-10-07 16:12:35 -070016 *
17 */
18
Craig Tiller9533d042016-03-25 17:11:06 -070019#include "src/core/lib/iomgr/executor.h"
David Garcia Quintas4bc34632015-10-07 16:12:35 -070020
21#include <string.h>
22
23#include <grpc/support/alloc.h>
Craig Tiller3e9f98e2017-05-12 13:17:47 -070024#include <grpc/support/cpu.h>
David Garcia Quintas4bc34632015-10-07 16:12:35 -070025#include <grpc/support/log.h>
26#include <grpc/support/sync.h>
27#include <grpc/support/thd.h>
Craig Tiller3e9f98e2017-05-12 13:17:47 -070028#include <grpc/support/tls.h>
29#include <grpc/support/useful.h>
30
Craig Tiller9533d042016-03-25 17:11:06 -070031#include "src/core/lib/iomgr/exec_ctx.h"
Craig Tiller3e9f98e2017-05-12 13:17:47 -070032#include "src/core/lib/support/spinlock.h"
David Garcia Quintas4bc34632015-10-07 16:12:35 -070033
/* Number of closure batches a thread may accumulate before pushers try to
   spin up an additional executor thread. */
#define MAX_DEPTH 2

/* Per-thread state for a single executor thread. */
typedef struct {
  gpr_mu mu;              /* guards all fields below */
  gpr_cv cv;              /* signalled when work arrives or shutdown is set */
  grpc_closure_list elems; /* closures queued for this thread */
  size_t depth;           /* batches queued since the thread last drained */
  bool shutdown;          /* true once teardown has begun */
  bool queued_long_job;   /* a long job is queued here; pushers skip this
                             thread until its queue drains */
  gpr_thd_id id;          /* thread handle, used for joining at shutdown */
} thread_state;

/* Array of g_max_threads entries, allocated when threading is enabled. */
static thread_state *g_thread_state;
static size_t g_max_threads;
/* Count of live executor threads; always accessed via gpr_atm_* ops. */
static gpr_atm g_cur_threads;
/* Serializes thread creation attempts in executor_push. */
static gpr_spinlock g_adding_thread_lock = GPR_SPINLOCK_STATIC_INITIALIZER;

/* Thread-local: points at this thread's thread_state on executor threads,
   NULL elsewhere. */
GPR_TLS_DECL(g_this_thread_state);

/* Trace flag; enable with GRPC_TRACE=executor. */
static grpc_tracer_flag executor_trace =
    GRPC_TRACER_INITIALIZER(false, "executor");

static void executor_thread(void *arg);
David Garcia Quintas4bc34632015-10-07 16:12:35 -070057
Craig Tiller3e9f98e2017-05-12 13:17:47 -070058static size_t run_closures(grpc_exec_ctx *exec_ctx, grpc_closure_list list) {
59 size_t n = 0;
David Garcia Quintas4bc34632015-10-07 16:12:35 -070060
Craig Tiller3e9f98e2017-05-12 13:17:47 -070061 grpc_closure *c = list.head;
Craig Tiller061ef742016-12-29 10:54:09 -080062 while (c != NULL) {
63 grpc_closure *next = c->next_data.next;
64 grpc_error *error = c->error_data.error;
Craig Tilleraf723b02017-07-17 17:56:28 -070065 if (GRPC_TRACER_ON(executor_trace)) {
66#ifndef NDEBUG
67 gpr_log(GPR_DEBUG, "EXECUTOR: run %p [created by %s:%d]", c,
68 c->file_created, c->line_created);
69#else
70 gpr_log(GPR_DEBUG, "EXECUTOR: run %p", c);
71#endif
72 }
ncteisenf8061e82017-06-09 10:44:42 -070073#ifndef NDEBUG
Craig Tillerb9b01ce2017-05-12 13:47:10 -070074 c->scheduled = false;
Mark D. Roth43f774e2017-04-04 16:35:37 -070075#endif
Craig Tiller0b093412017-01-03 09:49:07 -080076 c->cb(exec_ctx, c->cb_arg, error);
Craig Tiller061ef742016-12-29 10:54:09 -080077 GRPC_ERROR_UNREF(error);
78 c = next;
Craig Tillerf21acdd2017-06-08 08:09:35 -070079 n++;
Craig Tiller1ab56d82017-07-19 09:55:57 -070080 grpc_exec_ctx_flush(exec_ctx);
Craig Tiller061ef742016-12-29 10:54:09 -080081 }
Craig Tiller3e9f98e2017-05-12 13:17:47 -070082
83 return n;
84}
85
Craig Tiller5e56f002017-05-16 15:02:50 -070086bool grpc_executor_is_threaded() {
87 return gpr_atm_no_barrier_load(&g_cur_threads) > 0;
88}
89
90void grpc_executor_set_threading(grpc_exec_ctx *exec_ctx, bool threading) {
91 gpr_atm cur_threads = gpr_atm_no_barrier_load(&g_cur_threads);
92 if (threading) {
93 if (cur_threads > 0) return;
94 g_max_threads = GPR_MAX(1, 2 * gpr_cpu_num_cores());
95 gpr_atm_no_barrier_store(&g_cur_threads, 1);
96 gpr_tls_init(&g_this_thread_state);
97 g_thread_state = gpr_zalloc(sizeof(thread_state) * g_max_threads);
98 for (size_t i = 0; i < g_max_threads; i++) {
99 gpr_mu_init(&g_thread_state[i].mu);
100 gpr_cv_init(&g_thread_state[i].cv);
101 g_thread_state[i].elems = (grpc_closure_list)GRPC_CLOSURE_LIST_INIT;
102 }
103
104 gpr_thd_options opt = gpr_thd_options_default();
105 gpr_thd_options_set_joinable(&opt);
106 gpr_thd_new(&g_thread_state[0].id, executor_thread, &g_thread_state[0],
107 &opt);
108 } else {
109 if (cur_threads == 0) return;
110 for (size_t i = 0; i < g_max_threads; i++) {
111 gpr_mu_lock(&g_thread_state[i].mu);
112 g_thread_state[i].shutdown = true;
113 gpr_cv_signal(&g_thread_state[i].cv);
114 gpr_mu_unlock(&g_thread_state[i].mu);
115 }
Craig Tillerf7c8c9f2017-05-17 15:22:05 -0700116 /* ensure no thread is adding a new thread... once this is past, then
117 no thread will try to add a new one either (since shutdown is true) */
118 gpr_spinlock_lock(&g_adding_thread_lock);
119 gpr_spinlock_unlock(&g_adding_thread_lock);
Craig Tiller5e56f002017-05-16 15:02:50 -0700120 for (gpr_atm i = 0; i < g_cur_threads; i++) {
121 gpr_thd_join(g_thread_state[i].id);
122 }
123 gpr_atm_no_barrier_store(&g_cur_threads, 0);
124 for (size_t i = 0; i < g_max_threads; i++) {
125 gpr_mu_destroy(&g_thread_state[i].mu);
126 gpr_cv_destroy(&g_thread_state[i].cv);
127 run_closures(exec_ctx, g_thread_state[i].elems);
128 }
129 gpr_free(g_thread_state);
130 gpr_tls_destroy(&g_this_thread_state);
David Garcia Quintas4bc34632015-10-07 16:12:35 -0700131 }
Craig Tiller5e56f002017-05-16 15:02:50 -0700132}
133
/* One-time executor initialization: register the trace flag, zero the
   thread count, then bring up threading.  The store must precede
   grpc_executor_set_threading, which reads g_cur_threads to decide
   whether threads already exist. */
void grpc_executor_init(grpc_exec_ctx *exec_ctx) {
  grpc_register_tracer(&executor_trace);
  gpr_atm_no_barrier_store(&g_cur_threads, 0);
  grpc_executor_set_threading(exec_ctx, true);
}
139
/* Tear down the executor: joins all threads and runs leftover closures
   inline on 'exec_ctx' (see grpc_executor_set_threading). */
void grpc_executor_shutdown(grpc_exec_ctx *exec_ctx) {
  grpc_executor_set_threading(exec_ctx, false);
}
143
/* Body of one executor thread: sleep until closures are queued on this
   thread's list (or shutdown is flagged), then grab the whole list and run
   it outside the lock.  Repeats until shutdown. */
static void executor_thread(void *arg) {
  thread_state *ts = arg;
  /* Publish our state so executor_push can recognize and prefer the
     current thread's own queue. */
  gpr_tls_set(&g_this_thread_state, (intptr_t)ts);

  /* never_ready_to_finish: this exec_ctx only flushes when run_closures
     explicitly asks it to. */
  grpc_exec_ctx exec_ctx =
      GRPC_EXEC_CTX_INITIALIZER(0, grpc_never_ready_to_finish, NULL);

  size_t subtract_depth = 0;
  for (;;) {
    if (GRPC_TRACER_ON(executor_trace)) {
      gpr_log(GPR_DEBUG,
              "EXECUTOR[%" PRIdPTR "]: step (sub_depth=%" PRIdPTR ")",
              ts - g_thread_state, subtract_depth);
    }
    gpr_mu_lock(&ts->mu);
    /* Credit back the batch executed on the previous iteration. */
    ts->depth -= subtract_depth;
    while (grpc_closure_list_empty(ts->elems) && !ts->shutdown) {
      /* Queue is drained, so this thread may accept a long job again. */
      ts->queued_long_job = false;
      gpr_cv_wait(&ts->cv, &ts->mu, gpr_inf_future(GPR_CLOCK_REALTIME));
    }
    if (ts->shutdown) {
      if (GRPC_TRACER_ON(executor_trace)) {
        gpr_log(GPR_DEBUG, "EXECUTOR[%" PRIdPTR "]: shutdown",
                ts - g_thread_state);
      }
      gpr_mu_unlock(&ts->mu);
      break;
    }
    /* Steal the entire pending list; execute it without holding the lock. */
    grpc_closure_list exec = ts->elems;
    ts->elems = (grpc_closure_list)GRPC_CLOSURE_LIST_INIT;
    gpr_mu_unlock(&ts->mu);
    if (GRPC_TRACER_ON(executor_trace)) {
      gpr_log(GPR_DEBUG, "EXECUTOR[%" PRIdPTR "]: execute",
              ts - g_thread_state);
    }

    subtract_depth = run_closures(&exec_ctx, exec);
  }
  grpc_exec_ctx_finish(&exec_ctx);
}
184
185static void executor_push(grpc_exec_ctx *exec_ctx, grpc_closure *closure,
Craig Tiller7a82afd2017-07-18 09:40:40 -0700186 grpc_error *error, bool is_short) {
Craig Tiller2f767eb2017-07-20 12:06:14 -0700187 bool retry_push;
188 do {
189 retry_push = false;
190 size_t cur_thread_count = (size_t)gpr_atm_no_barrier_load(&g_cur_threads);
191 if (cur_thread_count == 0) {
192 if (GRPC_TRACER_ON(executor_trace)) {
Craig Tillerc2fb83e2017-07-18 12:38:25 -0700193#ifndef NDEBUG
Craig Tiller2f767eb2017-07-20 12:06:14 -0700194 gpr_log(GPR_DEBUG, "EXECUTOR: schedule %p (created %s:%d) inline",
195 closure, closure->file_created, closure->line_created);
Craig Tillerc2fb83e2017-07-18 12:38:25 -0700196#else
Craig Tiller2f767eb2017-07-20 12:06:14 -0700197 gpr_log(GPR_DEBUG, "EXECUTOR: schedule %p inline", closure);
Craig Tillerc2fb83e2017-07-18 12:38:25 -0700198#endif
Craig Tillerc2fb83e2017-07-18 12:38:25 -0700199 }
Craig Tiller2f767eb2017-07-20 12:06:14 -0700200 grpc_closure_list_append(&exec_ctx->closure_list, closure, error);
201 return;
Craig Tillerc2fb83e2017-07-18 12:38:25 -0700202 }
Craig Tiller2f767eb2017-07-20 12:06:14 -0700203 thread_state *ts = (thread_state *)gpr_tls_get(&g_this_thread_state);
204 if (ts == NULL) {
205 ts = &g_thread_state[GPR_HASH_POINTER(exec_ctx, cur_thread_count)];
Craig Tillerc2fb83e2017-07-18 12:38:25 -0700206 }
Craig Tiller2f767eb2017-07-20 12:06:14 -0700207 thread_state *orig_ts = ts;
Craig Tiller3e9f98e2017-05-12 13:17:47 -0700208
Craig Tiller2f767eb2017-07-20 12:06:14 -0700209 bool try_new_thread;
210 for (;;) {
211 if (GRPC_TRACER_ON(executor_trace)) {
212#ifndef NDEBUG
213 gpr_log(GPR_DEBUG,
214 "EXECUTOR: try to schedule %p (%s) (created %s:%d) to thread "
215 "%" PRIdPTR,
216 closure, is_short ? "short" : "long", closure->file_created,
217 closure->line_created, ts - g_thread_state);
218#else
219 gpr_log(GPR_DEBUG,
220 "EXECUTOR: try to schedule %p (%s) to thread %" PRIdPTR,
221 closure, is_short ? "short" : "long", ts - g_thread_state);
222#endif
223 }
224 gpr_mu_lock(&ts->mu);
225 if (ts->queued_long_job) {
226 gpr_mu_unlock(&ts->mu);
227 intptr_t idx = ts - g_thread_state;
228 ts = &g_thread_state[(idx + 1) % g_cur_threads];
229 if (ts == orig_ts) {
230 retry_push = true;
231 try_new_thread = true;
232 break;
233 }
234 continue;
235 }
236 if (grpc_closure_list_empty(ts->elems)) {
237 gpr_cv_signal(&ts->cv);
238 }
239 grpc_closure_list_append(&ts->elems, closure, error);
240 ts->depth++;
241 try_new_thread = ts->depth > MAX_DEPTH &&
242 cur_thread_count < g_max_threads && !ts->shutdown;
243 if (!is_short) ts->queued_long_job = true;
244 gpr_mu_unlock(&ts->mu);
245 break;
Craig Tiller3e9f98e2017-05-12 13:17:47 -0700246 }
Craig Tiller2f767eb2017-07-20 12:06:14 -0700247 if (try_new_thread && gpr_spinlock_trylock(&g_adding_thread_lock)) {
248 cur_thread_count = (size_t)gpr_atm_no_barrier_load(&g_cur_threads);
249 if (cur_thread_count < g_max_threads) {
250 gpr_atm_no_barrier_store(&g_cur_threads, cur_thread_count + 1);
251
252 gpr_thd_options opt = gpr_thd_options_default();
253 gpr_thd_options_set_joinable(&opt);
254 gpr_thd_new(&g_thread_state[cur_thread_count].id, executor_thread,
255 &g_thread_state[cur_thread_count], &opt);
256 }
257 gpr_spinlock_unlock(&g_adding_thread_lock);
258 }
259 } while (retry_push);
David Garcia Quintas4bc34632015-10-07 16:12:35 -0700260}
Craig Tiller91031da2016-12-28 15:44:25 -0800261
/* Scheduler callback for short jobs: forwards to executor_push with
   is_short=true. */
static void executor_push_short(grpc_exec_ctx *exec_ctx, grpc_closure *closure,
                                grpc_error *error) {
  executor_push(exec_ctx, closure, error, true);
}

/* Scheduler callback for long jobs: forwards to executor_push with
   is_short=false. */
static void executor_push_long(grpc_exec_ctx *exec_ctx, grpc_closure *closure,
                               grpc_error *error) {
  executor_push(exec_ctx, closure, error, false);
}
271
272static const grpc_closure_scheduler_vtable executor_vtable_short = {
273 executor_push_short, executor_push_short, "executor"};
274static grpc_closure_scheduler executor_scheduler_short = {
275 &executor_vtable_short};
276
277static const grpc_closure_scheduler_vtable executor_vtable_long = {
278 executor_push_long, executor_push_long, "executor"};
279static grpc_closure_scheduler executor_scheduler_long = {&executor_vtable_long};
280
281grpc_closure_scheduler *grpc_executor_scheduler(
282 grpc_executor_job_length length) {
283 return length == GRPC_EXECUTOR_SHORT ? &executor_scheduler_short
284 : &executor_scheduler_long;
285}