/*
 *
 * Copyright 2015 gRPC authors.
 *
 * Licensed under the Apache License, Version 2.0 (the "License");
 * you may not use this file except in compliance with the License.
 * You may obtain a copy of the License at
 *
 *     http://www.apache.org/licenses/LICENSE-2.0
 *
 * Unless required by applicable law or agreed to in writing, software
 * distributed under the License is distributed on an "AS IS" BASIS,
 * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
 * See the License for the specific language governing permissions and
 * limitations under the License.
 *
 */

#include "src/core/lib/iomgr/executor.h"

#include <string.h>

#include <grpc/support/alloc.h>
#include <grpc/support/cpu.h>
#include <grpc/support/log.h>
#include <grpc/support/sync.h>
#include <grpc/support/thd.h>
#include <grpc/support/useful.h>

#include "src/core/lib/debug/stats.h"
#include "src/core/lib/gpr/spinlock.h"
#include "src/core/lib/gpr/tls.h"
#include "src/core/lib/iomgr/exec_ctx.h"

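// Maximum number of pending closures a worker may accumulate before a push
// attempts to spawn an additional executor thread (bounded by g_max_threads).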
#define MAX_DEPTH 2

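// Per-thread state for an executor worker: its lock and condvar, the list of
// pending closures, a depth counter feeding the MAX_DEPTH heuristic, and
// flags tracking shutdown and whether a "long" job is already queued here.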
typedef struct {
  gpr_mu mu;
  gpr_cv cv;
  grpc_closure_list elems;
  size_t depth;
  bool shutdown;
  bool queued_long_job;
  gpr_thd_id id;
} thread_state;

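// Global executor state: the worker array, maximum and current thread counts,
// and a spinlock serializing thread creation. g_this_thread_state lets a
// worker recognize pushes issued from its own thread.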
static thread_state* g_thread_state;
static size_t g_max_threads;
static gpr_atm g_cur_threads;
static gpr_spinlock g_adding_thread_lock = GPR_SPINLOCK_STATIC_INITIALIZER;

GPR_TLS_DECL(g_this_thread_state);

grpc_core::TraceFlag executor_trace(false, "executor");

static void executor_thread(void* arg);

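// Runs every closure in 'list' on the calling thread, flushing the exec_ctx
// after each one, and returns the number of closures executed (the worker
// loop uses this count to decrement its queue depth).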
static size_t run_closures(grpc_closure_list list) {
  size_t n = 0;

  grpc_closure* c = list.head;
  while (c != nullptr) {
    grpc_closure* next = c->next_data.next;
    grpc_error* error = c->error_data.error;
    if (executor_trace.enabled()) {
#ifndef NDEBUG
      gpr_log(GPR_DEBUG, "EXECUTOR: run %p [created by %s:%d]", c,
              c->file_created, c->line_created);
#else
      gpr_log(GPR_DEBUG, "EXECUTOR: run %p", c);
#endif
    }
#ifndef NDEBUG
    c->scheduled = false;
#endif
    c->cb(c->cb_arg, error);
    GRPC_ERROR_UNREF(error);
    c = next;
    n++;
    grpc_core::ExecCtx::Get()->Flush();
  }

  return n;
}

bool grpc_executor_is_threaded() {
  return gpr_atm_no_barrier_load(&g_cur_threads) > 0;
}

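// Switches the executor between threaded and non-threaded mode. Enabling
// threading spawns the first worker and sizes the pool at twice the core
// count (at least 1); disabling it signals every worker to shut down, joins
// them, and drains any closures still queued on their lists.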
void grpc_executor_set_threading(bool threading) {
  gpr_atm cur_threads = gpr_atm_no_barrier_load(&g_cur_threads);
  if (threading) {
    if (cur_threads > 0) return;
    g_max_threads = GPR_MAX(1, 2 * gpr_cpu_num_cores());
    gpr_atm_no_barrier_store(&g_cur_threads, 1);
    gpr_tls_init(&g_this_thread_state);
    g_thread_state =
        (thread_state*)gpr_zalloc(sizeof(thread_state) * g_max_threads);
    for (size_t i = 0; i < g_max_threads; i++) {
      gpr_mu_init(&g_thread_state[i].mu);
      gpr_cv_init(&g_thread_state[i].cv);
      g_thread_state[i].elems = GRPC_CLOSURE_LIST_INIT;
    }

    gpr_thd_options opt = gpr_thd_options_default();
    gpr_thd_options_set_joinable(&opt);
    gpr_thd_new(&g_thread_state[0].id, "grpc_executor", executor_thread,
                &g_thread_state[0], &opt);
  } else {
    if (cur_threads == 0) return;
    for (size_t i = 0; i < g_max_threads; i++) {
      gpr_mu_lock(&g_thread_state[i].mu);
      g_thread_state[i].shutdown = true;
      gpr_cv_signal(&g_thread_state[i].cv);
      gpr_mu_unlock(&g_thread_state[i].mu);
    }
    /* ensure no thread is adding a new thread... once this is past, then
       no thread will try to add a new one either (since shutdown is true) */
    gpr_spinlock_lock(&g_adding_thread_lock);
    gpr_spinlock_unlock(&g_adding_thread_lock);
    for (gpr_atm i = 0; i < g_cur_threads; i++) {
      gpr_thd_join(g_thread_state[i].id);
    }
    gpr_atm_no_barrier_store(&g_cur_threads, 0);
    for (size_t i = 0; i < g_max_threads; i++) {
      gpr_mu_destroy(&g_thread_state[i].mu);
      gpr_cv_destroy(&g_thread_state[i].cv);
      run_closures(g_thread_state[i].elems);
    }
    gpr_free(g_thread_state);
    gpr_tls_destroy(&g_this_thread_state);
  }
}

void grpc_executor_init() {
  gpr_atm_no_barrier_store(&g_cur_threads, 0);
  grpc_executor_set_threading(true);
}

void grpc_executor_shutdown() { grpc_executor_set_threading(false); }

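// Worker loop: waits on its condvar for closures (or shutdown), swaps the
// queued list out under the lock, and runs it outside the lock so new work
// can keep arriving while closures execute.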
static void executor_thread(void* arg) {
  thread_state* ts = (thread_state*)arg;
  gpr_tls_set(&g_this_thread_state, (intptr_t)ts);

  grpc_core::ExecCtx exec_ctx(0);

  size_t subtract_depth = 0;
  for (;;) {
    if (executor_trace.enabled()) {
      gpr_log(GPR_DEBUG, "EXECUTOR[%d]: step (sub_depth=%" PRIdPTR ")",
              (int)(ts - g_thread_state), subtract_depth);
    }
    gpr_mu_lock(&ts->mu);
    ts->depth -= subtract_depth;
    while (grpc_closure_list_empty(ts->elems) && !ts->shutdown) {
      ts->queued_long_job = false;
      gpr_cv_wait(&ts->cv, &ts->mu, gpr_inf_future(GPR_CLOCK_MONOTONIC));
    }
    if (ts->shutdown) {
      if (executor_trace.enabled()) {
        gpr_log(GPR_DEBUG, "EXECUTOR[%d]: shutdown",
                (int)(ts - g_thread_state));
      }
      gpr_mu_unlock(&ts->mu);
      break;
    }
    GRPC_STATS_INC_EXECUTOR_QUEUE_DRAINED();
    grpc_closure_list exec = ts->elems;
    ts->elems = GRPC_CLOSURE_LIST_INIT;
    gpr_mu_unlock(&ts->mu);
    if (executor_trace.enabled()) {
      gpr_log(GPR_DEBUG, "EXECUTOR[%d]: execute", (int)(ts - g_thread_state));
    }

    grpc_core::ExecCtx::Get()->InvalidateNow();
    subtract_depth = run_closures(exec);
  }
}

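// Schedules 'closure' on some executor thread. With no threads running, the
// closure is appended to the current exec_ctx instead. Otherwise the push
// prefers the calling worker (or a hashed pick for non-executor callers),
// skips any thread that already holds a queued long job, and may spin up a
// new thread once a queue grows past MAX_DEPTH; if every queue is busy the
// whole push retries.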
static void executor_push(grpc_closure* closure, grpc_error* error,
                          bool is_short) {
  bool retry_push;
  if (is_short) {
    GRPC_STATS_INC_EXECUTOR_SCHEDULED_SHORT_ITEMS();
  } else {
    GRPC_STATS_INC_EXECUTOR_SCHEDULED_LONG_ITEMS();
  }
  do {
    retry_push = false;
    size_t cur_thread_count = (size_t)gpr_atm_no_barrier_load(&g_cur_threads);
    if (cur_thread_count == 0) {
      if (executor_trace.enabled()) {
#ifndef NDEBUG
        gpr_log(GPR_DEBUG, "EXECUTOR: schedule %p (created %s:%d) inline",
                closure, closure->file_created, closure->line_created);
#else
        gpr_log(GPR_DEBUG, "EXECUTOR: schedule %p inline", closure);
#endif
      }
      grpc_closure_list_append(grpc_core::ExecCtx::Get()->closure_list(),
                               closure, error);
      return;
    }
    thread_state* ts = (thread_state*)gpr_tls_get(&g_this_thread_state);
    if (ts == nullptr) {
      ts = &g_thread_state[GPR_HASH_POINTER(grpc_core::ExecCtx::Get(),
                                            cur_thread_count)];
    } else {
      GRPC_STATS_INC_EXECUTOR_SCHEDULED_TO_SELF();
    }
    thread_state* orig_ts = ts;

    bool try_new_thread;
    for (;;) {
      if (executor_trace.enabled()) {
#ifndef NDEBUG
        gpr_log(
            GPR_DEBUG,
            "EXECUTOR: try to schedule %p (%s) (created %s:%d) to thread %d",
            closure, is_short ? "short" : "long", closure->file_created,
            closure->line_created, (int)(ts - g_thread_state));
#else
        gpr_log(GPR_DEBUG, "EXECUTOR: try to schedule %p (%s) to thread %d",
                closure, is_short ? "short" : "long",
                (int)(ts - g_thread_state));
#endif
      }
      gpr_mu_lock(&ts->mu);
      if (ts->queued_long_job) {
        // if there's a long job queued, we never queue anything else to this
        // queue (since long jobs can take 'infinite' time and we need to
        // guarantee no starvation)
        // ... spin through queues and try again
        gpr_mu_unlock(&ts->mu);
        size_t idx = (size_t)(ts - g_thread_state);
        ts = &g_thread_state[(idx + 1) % cur_thread_count];
        if (ts == orig_ts) {
          retry_push = true;
          try_new_thread = true;
          break;
        }
        continue;
      }
      if (grpc_closure_list_empty(ts->elems) && !ts->shutdown) {
        GRPC_STATS_INC_EXECUTOR_WAKEUP_INITIATED();
        gpr_cv_signal(&ts->cv);
      }
      grpc_closure_list_append(&ts->elems, closure, error);
      ts->depth++;
      try_new_thread = ts->depth > MAX_DEPTH &&
                       cur_thread_count < g_max_threads && !ts->shutdown;
      if (!is_short) ts->queued_long_job = true;
      gpr_mu_unlock(&ts->mu);
      break;
    }
    if (try_new_thread && gpr_spinlock_trylock(&g_adding_thread_lock)) {
      cur_thread_count = (size_t)gpr_atm_no_barrier_load(&g_cur_threads);
      if (cur_thread_count < g_max_threads) {
        gpr_atm_no_barrier_store(&g_cur_threads, cur_thread_count + 1);

        gpr_thd_options opt = gpr_thd_options_default();
        gpr_thd_options_set_joinable(&opt);
        gpr_thd_new(&g_thread_state[cur_thread_count].id, "gpr_executor",
                    executor_thread, &g_thread_state[cur_thread_count], &opt);
      }
      gpr_spinlock_unlock(&g_adding_thread_lock);
    }
    if (retry_push) {
      GRPC_STATS_INC_EXECUTOR_PUSH_RETRIES();
    }
  } while (retry_push);
}

static void executor_push_short(grpc_closure* closure, grpc_error* error) {
  executor_push(closure, error, true);
}

static void executor_push_long(grpc_closure* closure, grpc_error* error) {
  executor_push(closure, error, false);
}

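// Two schedulers share the same push path: "short" for closures expected to
// finish quickly and "long" for closures that may run for an unbounded time
// (a queued long job blocks further scheduling onto that thread).
// grpc_executor_scheduler() selects between them by job length.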
static const grpc_closure_scheduler_vtable executor_vtable_short = {
    executor_push_short, executor_push_short, "executor"};
static grpc_closure_scheduler executor_scheduler_short = {
    &executor_vtable_short};

static const grpc_closure_scheduler_vtable executor_vtable_long = {
    executor_push_long, executor_push_long, "executor"};
static grpc_closure_scheduler executor_scheduler_long = {&executor_vtable_long};

grpc_closure_scheduler* grpc_executor_scheduler(
    grpc_executor_job_length length) {
  return length == GRPC_EXECUTOR_SHORT ? &executor_scheduler_short
                                       : &executor_scheduler_long;
}