blob: 16782dacc5195797cb9a723ba43d9c90087b6515 [file] [log] [blame]
David Garcia Quintas4bc34632015-10-07 16:12:35 -07001/*
2 *
Jan Tattermusch7897ae92017-06-07 22:57:36 +02003 * Copyright 2015 gRPC authors.
David Garcia Quintas4bc34632015-10-07 16:12:35 -07004 *
Jan Tattermusch7897ae92017-06-07 22:57:36 +02005 * Licensed under the Apache License, Version 2.0 (the "License");
6 * you may not use this file except in compliance with the License.
7 * You may obtain a copy of the License at
David Garcia Quintas4bc34632015-10-07 16:12:35 -07008 *
Jan Tattermusch7897ae92017-06-07 22:57:36 +02009 * http://www.apache.org/licenses/LICENSE-2.0
David Garcia Quintas4bc34632015-10-07 16:12:35 -070010 *
Jan Tattermusch7897ae92017-06-07 22:57:36 +020011 * Unless required by applicable law or agreed to in writing, software
12 * distributed under the License is distributed on an "AS IS" BASIS,
13 * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
14 * See the License for the specific language governing permissions and
15 * limitations under the License.
David Garcia Quintas4bc34632015-10-07 16:12:35 -070016 *
17 */
18
Craig Tiller9533d042016-03-25 17:11:06 -070019#include "src/core/lib/iomgr/executor.h"
David Garcia Quintas4bc34632015-10-07 16:12:35 -070020
21#include <string.h>
22
23#include <grpc/support/alloc.h>
Craig Tiller3e9f98e2017-05-12 13:17:47 -070024#include <grpc/support/cpu.h>
David Garcia Quintas4bc34632015-10-07 16:12:35 -070025#include <grpc/support/log.h>
26#include <grpc/support/sync.h>
27#include <grpc/support/thd.h>
Craig Tiller3e9f98e2017-05-12 13:17:47 -070028
Craig Tiller57bb9a92017-08-31 16:44:15 -070029#include "src/core/lib/debug/stats.h"
Mark D. Rothdbdf4952018-01-18 11:21:12 -080030#include "src/core/lib/gpr/spinlock.h"
Vijay Paib6cf1232018-01-25 21:02:26 -080031#include "src/core/lib/gpr/tls.h"
Vijay Paid4d0a302018-01-25 13:24:03 -080032#include "src/core/lib/gpr/useful.h"
Craig Tiller9533d042016-03-25 17:11:06 -070033#include "src/core/lib/iomgr/exec_ctx.h"
David Garcia Quintas4bc34632015-10-07 16:12:35 -070034
/* If a thread's pending-closure depth exceeds this, executor_push will try to
   spin up an additional executor thread (see executor_push). */
#define MAX_DEPTH 2

/* Per-thread state for one executor thread. All mutable fields are protected
   by \a mu unless noted otherwise. */
typedef struct {
  gpr_mu mu;              // guards elems, depth, shutdown, queued_long_job
  gpr_cv cv;              // signalled when work is appended or shutdown is set
  grpc_closure_list elems;  // closures queued for this thread to run
  size_t depth;  // closures enqueued but not yet drained; compared against
                 // MAX_DEPTH to decide whether to add a thread
  bool shutdown;         // set by grpc_executor_set_threading(false)
  bool queued_long_job;  // a long job is queued: no more work may be pushed
                         // here until the thread drains (avoids starvation)
  gpr_thd_id id;         // thread handle, joined during shutdown
} thread_state;
David Garcia Quintas4bc34632015-10-07 16:12:35 -070046
static thread_state* g_thread_state;  // array of g_max_threads entries
static size_t g_max_threads;          // set to max(1, 2 * num cores)
static gpr_atm g_cur_threads;         // number of threads currently started
// Serializes thread creation; also used as a barrier during shutdown.
static gpr_spinlock g_adding_thread_lock = GPR_SPINLOCK_STATIC_INITIALIZER;

// Thread-local pointer to this thread's thread_state (null on non-executor
// threads); lets executor_push prefer the current thread's queue.
GPR_TLS_DECL(g_this_thread_state);

// Enable with the "executor" trace flag for scheduling/step debug logs.
grpc_core::TraceFlag executor_trace(false, "executor");

static void executor_thread(void* arg);
David Garcia Quintas4bc34632015-10-07 16:12:35 -070057
Yash Tibrewal8cf14702017-12-06 09:47:54 -080058static size_t run_closures(grpc_closure_list list) {
Craig Tiller2477cf32017-09-26 12:20:35 -070059 size_t n = 0;
David Garcia Quintas4bc34632015-10-07 16:12:35 -070060
Craig Tillerbaa14a92017-11-03 09:09:36 -070061 grpc_closure* c = list.head;
Craig Tiller4782d922017-11-10 09:53:21 -080062 while (c != nullptr) {
Craig Tillerbaa14a92017-11-03 09:09:36 -070063 grpc_closure* next = c->next_data.next;
64 grpc_error* error = c->error_data.error;
Craig Tiller6014e8a2017-10-16 13:50:29 -070065 if (executor_trace.enabled()) {
Craig Tilleraf723b02017-07-17 17:56:28 -070066#ifndef NDEBUG
Craig Tiller2477cf32017-09-26 12:20:35 -070067 gpr_log(GPR_DEBUG, "EXECUTOR: run %p [created by %s:%d]", c,
68 c->file_created, c->line_created);
Craig Tilleraf723b02017-07-17 17:56:28 -070069#else
Craig Tiller2477cf32017-09-26 12:20:35 -070070 gpr_log(GPR_DEBUG, "EXECUTOR: run %p", c);
Craig Tilleraf723b02017-07-17 17:56:28 -070071#endif
Craig Tillerfb0262b2017-09-13 15:19:19 -070072 }
Craig Tiller2477cf32017-09-26 12:20:35 -070073#ifndef NDEBUG
74 c->scheduled = false;
75#endif
Yash Tibrewal8cf14702017-12-06 09:47:54 -080076 c->cb(c->cb_arg, error);
Craig Tiller2477cf32017-09-26 12:20:35 -070077 GRPC_ERROR_UNREF(error);
78 c = next;
79 n++;
Yash Tibrewal8cf14702017-12-06 09:47:54 -080080 grpc_core::ExecCtx::Get()->Flush();
Craig Tiller061ef742016-12-29 10:54:09 -080081 }
Craig Tiller3e9f98e2017-05-12 13:17:47 -070082
Craig Tiller2477cf32017-09-26 12:20:35 -070083 return n;
Craig Tiller3e9f98e2017-05-12 13:17:47 -070084}
85
Craig Tiller5e56f002017-05-16 15:02:50 -070086bool grpc_executor_is_threaded() {
87 return gpr_atm_no_barrier_load(&g_cur_threads) > 0;
88}
89
/* Switch the executor between threaded and non-threaded mode.
   threading=true: allocate per-thread state for up to 2*ncores threads and
   start the first executor thread. threading=false: signal every thread to
   shut down, join them, destroy their state, and run any leftover closures
   inline. Idempotent in both directions (early-returns if already in the
   requested mode). */
void grpc_executor_set_threading(bool threading) {
  gpr_atm cur_threads = gpr_atm_no_barrier_load(&g_cur_threads);
  if (threading) {
    if (cur_threads > 0) return;  // already threaded
    g_max_threads = GPR_MAX(1, 2 * gpr_cpu_num_cores());
    gpr_atm_no_barrier_store(&g_cur_threads, 1);
    gpr_tls_init(&g_this_thread_state);
    // zalloc: shutdown/queued_long_job/depth start zeroed for every slot
    g_thread_state = static_cast<thread_state*>(
        gpr_zalloc(sizeof(thread_state) * g_max_threads));
    for (size_t i = 0; i < g_max_threads; i++) {
      gpr_mu_init(&g_thread_state[i].mu);
      gpr_cv_init(&g_thread_state[i].cv);
      g_thread_state[i].elems = GRPC_CLOSURE_LIST_INIT;
    }

    // Only thread 0 starts now; executor_push lazily adds more as load grows.
    gpr_thd_options opt = gpr_thd_options_default();
    gpr_thd_options_set_joinable(&opt);
    gpr_thd_new(&g_thread_state[0].id, "grpc_executor", executor_thread,
                &g_thread_state[0], &opt);
  } else {
    if (cur_threads == 0) return;  // already non-threaded
    // Mark every slot shut down and wake any waiting thread.
    for (size_t i = 0; i < g_max_threads; i++) {
      gpr_mu_lock(&g_thread_state[i].mu);
      g_thread_state[i].shutdown = true;
      gpr_cv_signal(&g_thread_state[i].cv);
      gpr_mu_unlock(&g_thread_state[i].mu);
    }
    /* ensure no thread is adding a new thread... once this is past, then
       no thread will try to add a new one either (since shutdown is true) */
    gpr_spinlock_lock(&g_adding_thread_lock);
    gpr_spinlock_unlock(&g_adding_thread_lock);
    // Join only the threads that were actually started.
    for (gpr_atm i = 0; i < g_cur_threads; i++) {
      gpr_thd_join(g_thread_state[i].id);
    }
    gpr_atm_no_barrier_store(&g_cur_threads, 0);
    for (size_t i = 0; i < g_max_threads; i++) {
      gpr_mu_destroy(&g_thread_state[i].mu);
      gpr_cv_destroy(&g_thread_state[i].cv);
      // Run (inline) anything still queued so no closure is leaked.
      run_closures(g_thread_state[i].elems);
    }
    gpr_free(g_thread_state);
    gpr_tls_destroy(&g_this_thread_state);
  }
}
134
/* Initialize the executor: reset the thread count, then bring up threading
   (set_threading(true) early-returns unless the count is 0, so the store
   must happen first). */
void grpc_executor_init() {
  gpr_atm_no_barrier_store(&g_cur_threads, 0);
  grpc_executor_set_threading(true);
}
139
Yash Tibrewal8cf14702017-12-06 09:47:54 -0800140void grpc_executor_shutdown() { grpc_executor_set_threading(false); }
Craig Tiller3e9f98e2017-05-12 13:17:47 -0700141
/* Main loop for one executor thread. \a arg is this thread's thread_state.
   Repeatedly waits for closures on ts->elems, takes the whole list under the
   lock, and runs it outside the lock; exits when ts->shutdown is set. */
static void executor_thread(void* arg) {
  thread_state* ts = static_cast<thread_state*>(arg);
  // Publish our thread_state so executor_push can detect "pushed from an
  // executor thread" and queue to ourselves.
  gpr_tls_set(&g_this_thread_state, (intptr_t)ts);

  grpc_core::ExecCtx exec_ctx(0);

  // Number of closures run in the previous iteration; deducted from
  // ts->depth once we re-acquire the lock.
  size_t subtract_depth = 0;
  for (;;) {
    if (executor_trace.enabled()) {
      gpr_log(GPR_DEBUG, "EXECUTOR[%d]: step (sub_depth=%" PRIdPTR ")",
              static_cast<int>(ts - g_thread_state), subtract_depth);
    }
    gpr_mu_lock(&ts->mu);
    ts->depth -= subtract_depth;
    while (grpc_closure_list_empty(ts->elems) && !ts->shutdown) {
      // Queue fully drained: we can accept long jobs again.
      ts->queued_long_job = false;
      gpr_cv_wait(&ts->cv, &ts->mu, gpr_inf_future(GPR_CLOCK_MONOTONIC));
    }
    if (ts->shutdown) {
      if (executor_trace.enabled()) {
        gpr_log(GPR_DEBUG, "EXECUTOR[%d]: shutdown",
                static_cast<int>(ts - g_thread_state));
      }
      gpr_mu_unlock(&ts->mu);
      break;
    }
    GRPC_STATS_INC_EXECUTOR_QUEUE_DRAINED();
    // Steal the whole list so closures run without holding ts->mu.
    grpc_closure_list exec = ts->elems;
    ts->elems = GRPC_CLOSURE_LIST_INIT;
    gpr_mu_unlock(&ts->mu);
    if (executor_trace.enabled()) {
      gpr_log(GPR_DEBUG, "EXECUTOR[%d]: execute",
              static_cast<int>(ts - g_thread_state));
    }

    grpc_core::ExecCtx::Get()->InvalidateNow();
    subtract_depth = run_closures(exec);
  }
}
181
Yash Tibrewal8cf14702017-12-06 09:47:54 -0800182static void executor_push(grpc_closure* closure, grpc_error* error,
183 bool is_short) {
Craig Tiller2f767eb2017-07-20 12:06:14 -0700184 bool retry_push;
Craig Tiller07d2fa72017-09-07 13:13:36 -0700185 if (is_short) {
Yash Tibrewal8cf14702017-12-06 09:47:54 -0800186 GRPC_STATS_INC_EXECUTOR_SCHEDULED_SHORT_ITEMS();
Craig Tiller07d2fa72017-09-07 13:13:36 -0700187 } else {
Yash Tibrewal8cf14702017-12-06 09:47:54 -0800188 GRPC_STATS_INC_EXECUTOR_SCHEDULED_LONG_ITEMS();
Craig Tiller07d2fa72017-09-07 13:13:36 -0700189 }
Craig Tiller2f767eb2017-07-20 12:06:14 -0700190 do {
191 retry_push = false;
Noah Eisen4d20a662018-02-09 09:34:04 -0800192 size_t cur_thread_count =
193 static_cast<size_t> gpr_atm_no_barrier_load(&g_cur_threads);
Craig Tiller2f767eb2017-07-20 12:06:14 -0700194 if (cur_thread_count == 0) {
Craig Tiller6014e8a2017-10-16 13:50:29 -0700195 if (executor_trace.enabled()) {
Craig Tillerc2fb83e2017-07-18 12:38:25 -0700196#ifndef NDEBUG
Craig Tiller2f767eb2017-07-20 12:06:14 -0700197 gpr_log(GPR_DEBUG, "EXECUTOR: schedule %p (created %s:%d) inline",
198 closure, closure->file_created, closure->line_created);
Craig Tillerc2fb83e2017-07-18 12:38:25 -0700199#else
Craig Tiller2f767eb2017-07-20 12:06:14 -0700200 gpr_log(GPR_DEBUG, "EXECUTOR: schedule %p inline", closure);
Craig Tillerc2fb83e2017-07-18 12:38:25 -0700201#endif
Craig Tillerc2fb83e2017-07-18 12:38:25 -0700202 }
Yash Tibrewal8cf14702017-12-06 09:47:54 -0800203 grpc_closure_list_append(grpc_core::ExecCtx::Get()->closure_list(),
204 closure, error);
Craig Tiller2f767eb2017-07-20 12:06:14 -0700205 return;
Craig Tillerc2fb83e2017-07-18 12:38:25 -0700206 }
Craig Tillerbaa14a92017-11-03 09:09:36 -0700207 thread_state* ts = (thread_state*)gpr_tls_get(&g_this_thread_state);
Craig Tiller4782d922017-11-10 09:53:21 -0800208 if (ts == nullptr) {
Yash Tibrewal8cf14702017-12-06 09:47:54 -0800209 ts = &g_thread_state[GPR_HASH_POINTER(grpc_core::ExecCtx::Get(),
210 cur_thread_count)];
Craig Tiller022ad3a2017-09-07 13:01:56 -0700211 } else {
Yash Tibrewal8cf14702017-12-06 09:47:54 -0800212 GRPC_STATS_INC_EXECUTOR_SCHEDULED_TO_SELF();
Craig Tillerc2fb83e2017-07-18 12:38:25 -0700213 }
Craig Tillerbaa14a92017-11-03 09:09:36 -0700214 thread_state* orig_ts = ts;
Craig Tiller3e9f98e2017-05-12 13:17:47 -0700215
Craig Tiller2f767eb2017-07-20 12:06:14 -0700216 bool try_new_thread;
217 for (;;) {
Craig Tiller6014e8a2017-10-16 13:50:29 -0700218 if (executor_trace.enabled()) {
Craig Tiller2f767eb2017-07-20 12:06:14 -0700219#ifndef NDEBUG
Craig Tiller8af33db2017-07-20 16:37:36 -0700220 gpr_log(
221 GPR_DEBUG,
222 "EXECUTOR: try to schedule %p (%s) (created %s:%d) to thread %d",
223 closure, is_short ? "short" : "long", closure->file_created,
Noah Eisenbe82e642018-02-09 09:16:55 -0800224 closure->line_created, static_cast<int>(ts - g_thread_state));
Craig Tiller2f767eb2017-07-20 12:06:14 -0700225#else
Craig Tiller8af33db2017-07-20 16:37:36 -0700226 gpr_log(GPR_DEBUG, "EXECUTOR: try to schedule %p (%s) to thread %d",
227 closure, is_short ? "short" : "long",
228 (int)(ts - g_thread_state));
Craig Tiller2f767eb2017-07-20 12:06:14 -0700229#endif
230 }
231 gpr_mu_lock(&ts->mu);
232 if (ts->queued_long_job) {
Craig Tillerb0ce25e2017-09-08 14:42:26 -0700233 // if there's a long job queued, we never queue anything else to this
234 // queue (since long jobs can take 'infinite' time and we need to
235 // guarantee no starvation)
236 // ... spin through queues and try again
Craig Tiller2f767eb2017-07-20 12:06:14 -0700237 gpr_mu_unlock(&ts->mu);
Noah Eisenbe82e642018-02-09 09:16:55 -0800238 size_t idx = static_cast<size_t>(ts - g_thread_state);
Craig Tiller18908832017-07-21 13:27:27 -0700239 ts = &g_thread_state[(idx + 1) % cur_thread_count];
Craig Tiller2f767eb2017-07-20 12:06:14 -0700240 if (ts == orig_ts) {
241 retry_push = true;
242 try_new_thread = true;
243 break;
244 }
245 continue;
246 }
ncteisenc0b00c32017-12-14 11:30:38 -0800247 if (grpc_closure_list_empty(ts->elems) && !ts->shutdown) {
Yash Tibrewal8cf14702017-12-06 09:47:54 -0800248 GRPC_STATS_INC_EXECUTOR_WAKEUP_INITIATED();
Craig Tiller2f767eb2017-07-20 12:06:14 -0700249 gpr_cv_signal(&ts->cv);
250 }
251 grpc_closure_list_append(&ts->elems, closure, error);
Craig Tiller2477cf32017-09-26 12:20:35 -0700252 ts->depth++;
253 try_new_thread = ts->depth > MAX_DEPTH &&
Craig Tiller2f767eb2017-07-20 12:06:14 -0700254 cur_thread_count < g_max_threads && !ts->shutdown;
255 if (!is_short) ts->queued_long_job = true;
256 gpr_mu_unlock(&ts->mu);
257 break;
Craig Tiller3e9f98e2017-05-12 13:17:47 -0700258 }
Craig Tiller2f767eb2017-07-20 12:06:14 -0700259 if (try_new_thread && gpr_spinlock_trylock(&g_adding_thread_lock)) {
Noah Eisen4d20a662018-02-09 09:34:04 -0800260 cur_thread_count =
261 static_cast<size_t> gpr_atm_no_barrier_load(&g_cur_threads);
Craig Tiller2f767eb2017-07-20 12:06:14 -0700262 if (cur_thread_count < g_max_threads) {
263 gpr_atm_no_barrier_store(&g_cur_threads, cur_thread_count + 1);
264
265 gpr_thd_options opt = gpr_thd_options_default();
266 gpr_thd_options_set_joinable(&opt);
Dave MacLachlanaf5c54d2017-11-29 16:25:10 -0800267 gpr_thd_new(&g_thread_state[cur_thread_count].id, "gpr_executor",
268 executor_thread, &g_thread_state[cur_thread_count], &opt);
Craig Tiller2f767eb2017-07-20 12:06:14 -0700269 }
270 gpr_spinlock_unlock(&g_adding_thread_lock);
271 }
Craig Tiller07d2fa72017-09-07 13:13:36 -0700272 if (retry_push) {
Yash Tibrewal8cf14702017-12-06 09:47:54 -0800273 GRPC_STATS_INC_EXECUTOR_PUSH_RETRIES();
Craig Tiller07d2fa72017-09-07 13:13:36 -0700274 }
Craig Tiller2f767eb2017-07-20 12:06:14 -0700275 } while (retry_push);
David Garcia Quintas4bc34632015-10-07 16:12:35 -0700276}
Craig Tiller91031da2016-12-28 15:44:25 -0800277
/* Scheduler callback for short (bounded-duration) closures. */
static void executor_push_short(grpc_closure* closure, grpc_error* error) {
  executor_push(closure, error, true);
}
281
/* Scheduler callback for long (potentially unbounded) closures; these mark
   the target queue as holding a long job so nothing else piles up behind. */
static void executor_push_long(grpc_closure* closure, grpc_error* error) {
  executor_push(closure, error, false);
}
285
// Closure-scheduler wiring: both the run and sched slots point at the same
// push function; the short and long variants differ only in the is_short
// flag passed through to executor_push.
static const grpc_closure_scheduler_vtable executor_vtable_short = {
    executor_push_short, executor_push_short, "executor"};
static grpc_closure_scheduler executor_scheduler_short = {
    &executor_vtable_short};

static const grpc_closure_scheduler_vtable executor_vtable_long = {
    executor_push_long, executor_push_long, "executor"};
static grpc_closure_scheduler executor_scheduler_long = {&executor_vtable_long};
294
Craig Tillerbaa14a92017-11-03 09:09:36 -0700295grpc_closure_scheduler* grpc_executor_scheduler(
Craig Tiller7a82afd2017-07-18 09:40:40 -0700296 grpc_executor_job_length length) {
297 return length == GRPC_EXECUTOR_SHORT ? &executor_scheduler_short
298 : &executor_scheduler_long;
299}