blob: da5877479b2930545d3ecbb9eb7552a787ad6b82 [file] [log] [blame]
/*
 *
 * Copyright 2015 gRPC authors.
 *
 * Licensed under the Apache License, Version 2.0 (the "License");
 * you may not use this file except in compliance with the License.
 * You may obtain a copy of the License at
 *
 *     http://www.apache.org/licenses/LICENSE-2.0
 *
 * Unless required by applicable law or agreed to in writing, software
 * distributed under the License is distributed on an "AS IS" BASIS,
 * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
 * See the License for the specific language governing permissions and
 * limitations under the License.
 *
 */
18
Craig Tiller9533d042016-03-25 17:11:06 -070019#include "src/core/lib/iomgr/executor.h"
David Garcia Quintas4bc34632015-10-07 16:12:35 -070020
21#include <string.h>
22
23#include <grpc/support/alloc.h>
Craig Tiller3e9f98e2017-05-12 13:17:47 -070024#include <grpc/support/cpu.h>
David Garcia Quintas4bc34632015-10-07 16:12:35 -070025#include <grpc/support/log.h>
26#include <grpc/support/sync.h>
27#include <grpc/support/thd.h>
Craig Tiller3e9f98e2017-05-12 13:17:47 -070028#include <grpc/support/tls.h>
29#include <grpc/support/useful.h>
30
Craig Tiller57bb9a92017-08-31 16:44:15 -070031#include "src/core/lib/debug/stats.h"
Craig Tiller9533d042016-03-25 17:11:06 -070032#include "src/core/lib/iomgr/exec_ctx.h"
Craig Tiller3e9f98e2017-05-12 13:17:47 -070033#include "src/core/lib/support/spinlock.h"
David Garcia Quintas4bc34632015-10-07 16:12:35 -070034
/* Queue-depth threshold: when a push leaves a thread's `depth` counter above
   this value, executor_push() attempts to start an additional executor thread
   (subject to g_max_threads and the thread not being shut down). */
#define MAX_DEPTH 2
36
Craig Tiller3e9f98e2017-05-12 13:17:47 -070037typedef struct {
David Garcia Quintas4bc34632015-10-07 16:12:35 -070038 gpr_mu mu;
Craig Tiller3e9f98e2017-05-12 13:17:47 -070039 gpr_cv cv;
40 grpc_closure_list elems;
Craig Tiller2477cf32017-09-26 12:20:35 -070041 size_t depth;
Craig Tiller3e9f98e2017-05-12 13:17:47 -070042 bool shutdown;
Craig Tillerc2fb83e2017-07-18 12:38:25 -070043 bool queued_long_job;
Craig Tiller3e9f98e2017-05-12 13:17:47 -070044 gpr_thd_id id;
45} thread_state;
David Garcia Quintas4bc34632015-10-07 16:12:35 -070046
Craig Tillerbaa14a92017-11-03 09:09:36 -070047static thread_state* g_thread_state;
Craig Tiller3e9f98e2017-05-12 13:17:47 -070048static size_t g_max_threads;
49static gpr_atm g_cur_threads;
50static gpr_spinlock g_adding_thread_lock = GPR_SPINLOCK_STATIC_INITIALIZER;
51
52GPR_TLS_DECL(g_this_thread_state);
53
ncteisen72afb762017-11-10 12:23:12 -080054grpc_core::TraceFlag executor_trace(false, "executor");
Craig Tilleraf723b02017-07-17 17:56:28 -070055
Craig Tillerbaa14a92017-11-03 09:09:36 -070056static void executor_thread(void* arg);
David Garcia Quintas4bc34632015-10-07 16:12:35 -070057
Craig Tillerbaa14a92017-11-03 09:09:36 -070058static size_t run_closures(grpc_exec_ctx* exec_ctx, grpc_closure_list list) {
Craig Tiller2477cf32017-09-26 12:20:35 -070059 size_t n = 0;
David Garcia Quintas4bc34632015-10-07 16:12:35 -070060
Craig Tillerbaa14a92017-11-03 09:09:36 -070061 grpc_closure* c = list.head;
Craig Tiller2477cf32017-09-26 12:20:35 -070062 while (c != NULL) {
Craig Tillerbaa14a92017-11-03 09:09:36 -070063 grpc_closure* next = c->next_data.next;
64 grpc_error* error = c->error_data.error;
Craig Tiller6014e8a2017-10-16 13:50:29 -070065 if (executor_trace.enabled()) {
Craig Tilleraf723b02017-07-17 17:56:28 -070066#ifndef NDEBUG
Craig Tiller2477cf32017-09-26 12:20:35 -070067 gpr_log(GPR_DEBUG, "EXECUTOR: run %p [created by %s:%d]", c,
68 c->file_created, c->line_created);
Craig Tilleraf723b02017-07-17 17:56:28 -070069#else
Craig Tiller2477cf32017-09-26 12:20:35 -070070 gpr_log(GPR_DEBUG, "EXECUTOR: run %p", c);
Craig Tilleraf723b02017-07-17 17:56:28 -070071#endif
Craig Tillerfb0262b2017-09-13 15:19:19 -070072 }
Craig Tiller2477cf32017-09-26 12:20:35 -070073#ifndef NDEBUG
74 c->scheduled = false;
75#endif
76 c->cb(exec_ctx, c->cb_arg, error);
77 GRPC_ERROR_UNREF(error);
78 c = next;
79 n++;
80 grpc_exec_ctx_flush(exec_ctx);
Craig Tiller061ef742016-12-29 10:54:09 -080081 }
Craig Tiller3e9f98e2017-05-12 13:17:47 -070082
Craig Tiller2477cf32017-09-26 12:20:35 -070083 return n;
Craig Tiller3e9f98e2017-05-12 13:17:47 -070084}
85
Craig Tiller5e56f002017-05-16 15:02:50 -070086bool grpc_executor_is_threaded() {
87 return gpr_atm_no_barrier_load(&g_cur_threads) > 0;
88}
89
Craig Tillerbaa14a92017-11-03 09:09:36 -070090void grpc_executor_set_threading(grpc_exec_ctx* exec_ctx, bool threading) {
Craig Tiller5e56f002017-05-16 15:02:50 -070091 gpr_atm cur_threads = gpr_atm_no_barrier_load(&g_cur_threads);
92 if (threading) {
93 if (cur_threads > 0) return;
94 g_max_threads = GPR_MAX(1, 2 * gpr_cpu_num_cores());
95 gpr_atm_no_barrier_store(&g_cur_threads, 1);
96 gpr_tls_init(&g_this_thread_state);
Yash Tibrewalca3c1c02017-09-07 22:47:16 -070097 g_thread_state =
Craig Tillerbaa14a92017-11-03 09:09:36 -070098 (thread_state*)gpr_zalloc(sizeof(thread_state) * g_max_threads);
Craig Tiller5e56f002017-05-16 15:02:50 -070099 for (size_t i = 0; i < g_max_threads; i++) {
100 gpr_mu_init(&g_thread_state[i].mu);
101 gpr_cv_init(&g_thread_state[i].cv);
Yash Tibrewal37fdb732017-09-25 16:45:02 -0700102 g_thread_state[i].elems = GRPC_CLOSURE_LIST_INIT;
Craig Tiller5e56f002017-05-16 15:02:50 -0700103 }
104
105 gpr_thd_options opt = gpr_thd_options_default();
106 gpr_thd_options_set_joinable(&opt);
107 gpr_thd_new(&g_thread_state[0].id, executor_thread, &g_thread_state[0],
108 &opt);
109 } else {
110 if (cur_threads == 0) return;
111 for (size_t i = 0; i < g_max_threads; i++) {
112 gpr_mu_lock(&g_thread_state[i].mu);
113 g_thread_state[i].shutdown = true;
114 gpr_cv_signal(&g_thread_state[i].cv);
115 gpr_mu_unlock(&g_thread_state[i].mu);
116 }
Craig Tillerf7c8c9f2017-05-17 15:22:05 -0700117 /* ensure no thread is adding a new thread... once this is past, then
118 no thread will try to add a new one either (since shutdown is true) */
119 gpr_spinlock_lock(&g_adding_thread_lock);
120 gpr_spinlock_unlock(&g_adding_thread_lock);
Craig Tiller5e56f002017-05-16 15:02:50 -0700121 for (gpr_atm i = 0; i < g_cur_threads; i++) {
122 gpr_thd_join(g_thread_state[i].id);
123 }
124 gpr_atm_no_barrier_store(&g_cur_threads, 0);
125 for (size_t i = 0; i < g_max_threads; i++) {
126 gpr_mu_destroy(&g_thread_state[i].mu);
127 gpr_cv_destroy(&g_thread_state[i].cv);
Craig Tiller2477cf32017-09-26 12:20:35 -0700128 run_closures(exec_ctx, g_thread_state[i].elems);
Craig Tiller5e56f002017-05-16 15:02:50 -0700129 }
130 gpr_free(g_thread_state);
131 gpr_tls_destroy(&g_this_thread_state);
David Garcia Quintas4bc34632015-10-07 16:12:35 -0700132 }
Craig Tiller5e56f002017-05-16 15:02:50 -0700133}
134
Craig Tillerbaa14a92017-11-03 09:09:36 -0700135void grpc_executor_init(grpc_exec_ctx* exec_ctx) {
Craig Tillerb9b01ce2017-05-12 13:47:10 -0700136 gpr_atm_no_barrier_store(&g_cur_threads, 0);
Craig Tiller5e56f002017-05-16 15:02:50 -0700137 grpc_executor_set_threading(exec_ctx, true);
138}
139
Craig Tillerbaa14a92017-11-03 09:09:36 -0700140void grpc_executor_shutdown(grpc_exec_ctx* exec_ctx) {
Craig Tiller5e56f002017-05-16 15:02:50 -0700141 grpc_executor_set_threading(exec_ctx, false);
Craig Tiller3e9f98e2017-05-12 13:17:47 -0700142}
143
Craig Tillerbaa14a92017-11-03 09:09:36 -0700144static void executor_thread(void* arg) {
145 thread_state* ts = (thread_state*)arg;
Craig Tiller3e9f98e2017-05-12 13:17:47 -0700146 gpr_tls_set(&g_this_thread_state, (intptr_t)ts);
147
Craig Tiller89962082017-05-12 14:30:42 -0700148 grpc_exec_ctx exec_ctx =
149 GRPC_EXEC_CTX_INITIALIZER(0, grpc_never_ready_to_finish, NULL);
150
Craig Tiller2477cf32017-09-26 12:20:35 -0700151 size_t subtract_depth = 0;
Craig Tiller3e9f98e2017-05-12 13:17:47 -0700152 for (;;) {
Craig Tiller6014e8a2017-10-16 13:50:29 -0700153 if (executor_trace.enabled()) {
Craig Tiller2477cf32017-09-26 12:20:35 -0700154 gpr_log(GPR_DEBUG, "EXECUTOR[%d]: step (sub_depth=%" PRIdPTR ")",
155 (int)(ts - g_thread_state), subtract_depth);
Craig Tilleraf723b02017-07-17 17:56:28 -0700156 }
Craig Tiller3e9f98e2017-05-12 13:17:47 -0700157 gpr_mu_lock(&ts->mu);
Craig Tiller2477cf32017-09-26 12:20:35 -0700158 ts->depth -= subtract_depth;
Craig Tiller3e9f98e2017-05-12 13:17:47 -0700159 while (grpc_closure_list_empty(ts->elems) && !ts->shutdown) {
Craig Tiller1ab56d82017-07-19 09:55:57 -0700160 ts->queued_long_job = false;
Craig Tiller3e9f98e2017-05-12 13:17:47 -0700161 gpr_cv_wait(&ts->cv, &ts->mu, gpr_inf_future(GPR_CLOCK_REALTIME));
162 }
163 if (ts->shutdown) {
Craig Tiller6014e8a2017-10-16 13:50:29 -0700164 if (executor_trace.enabled()) {
Craig Tiller7d079942017-09-05 12:46:48 -0700165 gpr_log(GPR_DEBUG, "EXECUTOR[%d]: shutdown",
166 (int)(ts - g_thread_state));
Craig Tilleraf723b02017-07-17 17:56:28 -0700167 }
Craig Tiller3e9f98e2017-05-12 13:17:47 -0700168 gpr_mu_unlock(&ts->mu);
169 break;
170 }
Craig Tiller57bb9a92017-08-31 16:44:15 -0700171 GRPC_STATS_INC_EXECUTOR_QUEUE_DRAINED(&exec_ctx);
Craig Tiller2477cf32017-09-26 12:20:35 -0700172 grpc_closure_list exec = ts->elems;
Yash Tibrewal37fdb732017-09-25 16:45:02 -0700173 ts->elems = GRPC_CLOSURE_LIST_INIT;
Craig Tiller3e9f98e2017-05-12 13:17:47 -0700174 gpr_mu_unlock(&ts->mu);
Craig Tiller6014e8a2017-10-16 13:50:29 -0700175 if (executor_trace.enabled()) {
Craig Tiller7d079942017-09-05 12:46:48 -0700176 gpr_log(GPR_DEBUG, "EXECUTOR[%d]: execute", (int)(ts - g_thread_state));
Craig Tilleraf723b02017-07-17 17:56:28 -0700177 }
Craig Tiller3e9f98e2017-05-12 13:17:47 -0700178
Craig Tiller49a09642017-09-20 03:46:27 +0000179 grpc_exec_ctx_invalidate_now(&exec_ctx);
Craig Tiller2477cf32017-09-26 12:20:35 -0700180 subtract_depth = run_closures(&exec_ctx, exec);
Craig Tiller3e9f98e2017-05-12 13:17:47 -0700181 }
Craig Tiller89962082017-05-12 14:30:42 -0700182 grpc_exec_ctx_finish(&exec_ctx);
Craig Tiller3e9f98e2017-05-12 13:17:47 -0700183}
184
Craig Tillerbaa14a92017-11-03 09:09:36 -0700185static void executor_push(grpc_exec_ctx* exec_ctx, grpc_closure* closure,
186 grpc_error* error, bool is_short) {
Craig Tiller2f767eb2017-07-20 12:06:14 -0700187 bool retry_push;
Craig Tiller07d2fa72017-09-07 13:13:36 -0700188 if (is_short) {
189 GRPC_STATS_INC_EXECUTOR_SCHEDULED_SHORT_ITEMS(exec_ctx);
190 } else {
191 GRPC_STATS_INC_EXECUTOR_SCHEDULED_LONG_ITEMS(exec_ctx);
192 }
Craig Tiller2f767eb2017-07-20 12:06:14 -0700193 do {
194 retry_push = false;
195 size_t cur_thread_count = (size_t)gpr_atm_no_barrier_load(&g_cur_threads);
196 if (cur_thread_count == 0) {
Craig Tiller6014e8a2017-10-16 13:50:29 -0700197 if (executor_trace.enabled()) {
Craig Tillerc2fb83e2017-07-18 12:38:25 -0700198#ifndef NDEBUG
Craig Tiller2f767eb2017-07-20 12:06:14 -0700199 gpr_log(GPR_DEBUG, "EXECUTOR: schedule %p (created %s:%d) inline",
200 closure, closure->file_created, closure->line_created);
Craig Tillerc2fb83e2017-07-18 12:38:25 -0700201#else
Craig Tiller2f767eb2017-07-20 12:06:14 -0700202 gpr_log(GPR_DEBUG, "EXECUTOR: schedule %p inline", closure);
Craig Tillerc2fb83e2017-07-18 12:38:25 -0700203#endif
Craig Tillerc2fb83e2017-07-18 12:38:25 -0700204 }
Craig Tiller2f767eb2017-07-20 12:06:14 -0700205 grpc_closure_list_append(&exec_ctx->closure_list, closure, error);
206 return;
Craig Tillerc2fb83e2017-07-18 12:38:25 -0700207 }
Craig Tillerbaa14a92017-11-03 09:09:36 -0700208 thread_state* ts = (thread_state*)gpr_tls_get(&g_this_thread_state);
Craig Tiller2f767eb2017-07-20 12:06:14 -0700209 if (ts == NULL) {
210 ts = &g_thread_state[GPR_HASH_POINTER(exec_ctx, cur_thread_count)];
Craig Tiller022ad3a2017-09-07 13:01:56 -0700211 } else {
212 GRPC_STATS_INC_EXECUTOR_SCHEDULED_TO_SELF(exec_ctx);
Craig Tillerc2fb83e2017-07-18 12:38:25 -0700213 }
Craig Tillerbaa14a92017-11-03 09:09:36 -0700214 thread_state* orig_ts = ts;
Craig Tiller3e9f98e2017-05-12 13:17:47 -0700215
Craig Tiller2f767eb2017-07-20 12:06:14 -0700216 bool try_new_thread;
217 for (;;) {
Craig Tiller6014e8a2017-10-16 13:50:29 -0700218 if (executor_trace.enabled()) {
Craig Tiller2f767eb2017-07-20 12:06:14 -0700219#ifndef NDEBUG
Craig Tiller8af33db2017-07-20 16:37:36 -0700220 gpr_log(
221 GPR_DEBUG,
222 "EXECUTOR: try to schedule %p (%s) (created %s:%d) to thread %d",
223 closure, is_short ? "short" : "long", closure->file_created,
224 closure->line_created, (int)(ts - g_thread_state));
Craig Tiller2f767eb2017-07-20 12:06:14 -0700225#else
Craig Tiller8af33db2017-07-20 16:37:36 -0700226 gpr_log(GPR_DEBUG, "EXECUTOR: try to schedule %p (%s) to thread %d",
227 closure, is_short ? "short" : "long",
228 (int)(ts - g_thread_state));
Craig Tiller2f767eb2017-07-20 12:06:14 -0700229#endif
230 }
231 gpr_mu_lock(&ts->mu);
232 if (ts->queued_long_job) {
Craig Tillerb0ce25e2017-09-08 14:42:26 -0700233 // if there's a long job queued, we never queue anything else to this
234 // queue (since long jobs can take 'infinite' time and we need to
235 // guarantee no starvation)
236 // ... spin through queues and try again
Craig Tiller2f767eb2017-07-20 12:06:14 -0700237 gpr_mu_unlock(&ts->mu);
Craig Tillera0d51852017-07-21 13:49:49 -0700238 size_t idx = (size_t)(ts - g_thread_state);
Craig Tiller18908832017-07-21 13:27:27 -0700239 ts = &g_thread_state[(idx + 1) % cur_thread_count];
Craig Tiller2f767eb2017-07-20 12:06:14 -0700240 if (ts == orig_ts) {
241 retry_push = true;
242 try_new_thread = true;
243 break;
244 }
245 continue;
246 }
247 if (grpc_closure_list_empty(ts->elems)) {
Craig Tiller022ad3a2017-09-07 13:01:56 -0700248 GRPC_STATS_INC_EXECUTOR_WAKEUP_INITIATED(exec_ctx);
Craig Tiller2f767eb2017-07-20 12:06:14 -0700249 gpr_cv_signal(&ts->cv);
250 }
251 grpc_closure_list_append(&ts->elems, closure, error);
Craig Tiller2477cf32017-09-26 12:20:35 -0700252 ts->depth++;
253 try_new_thread = ts->depth > MAX_DEPTH &&
Craig Tiller2f767eb2017-07-20 12:06:14 -0700254 cur_thread_count < g_max_threads && !ts->shutdown;
255 if (!is_short) ts->queued_long_job = true;
256 gpr_mu_unlock(&ts->mu);
257 break;
Craig Tiller3e9f98e2017-05-12 13:17:47 -0700258 }
Craig Tiller2f767eb2017-07-20 12:06:14 -0700259 if (try_new_thread && gpr_spinlock_trylock(&g_adding_thread_lock)) {
260 cur_thread_count = (size_t)gpr_atm_no_barrier_load(&g_cur_threads);
261 if (cur_thread_count < g_max_threads) {
262 gpr_atm_no_barrier_store(&g_cur_threads, cur_thread_count + 1);
263
264 gpr_thd_options opt = gpr_thd_options_default();
265 gpr_thd_options_set_joinable(&opt);
266 gpr_thd_new(&g_thread_state[cur_thread_count].id, executor_thread,
267 &g_thread_state[cur_thread_count], &opt);
268 }
269 gpr_spinlock_unlock(&g_adding_thread_lock);
270 }
Craig Tiller07d2fa72017-09-07 13:13:36 -0700271 if (retry_push) {
272 GRPC_STATS_INC_EXECUTOR_PUSH_RETRIES(exec_ctx);
273 }
Craig Tiller2f767eb2017-07-20 12:06:14 -0700274 } while (retry_push);
David Garcia Quintas4bc34632015-10-07 16:12:35 -0700275}
Craig Tiller91031da2016-12-28 15:44:25 -0800276
Craig Tillerbaa14a92017-11-03 09:09:36 -0700277static void executor_push_short(grpc_exec_ctx* exec_ctx, grpc_closure* closure,
278 grpc_error* error) {
Craig Tiller7a82afd2017-07-18 09:40:40 -0700279 executor_push(exec_ctx, closure, error, true);
280}
281
Craig Tillerbaa14a92017-11-03 09:09:36 -0700282static void executor_push_long(grpc_exec_ctx* exec_ctx, grpc_closure* closure,
283 grpc_error* error) {
Craig Tiller7a82afd2017-07-18 09:40:40 -0700284 executor_push(exec_ctx, closure, error, false);
285}
286
287static const grpc_closure_scheduler_vtable executor_vtable_short = {
288 executor_push_short, executor_push_short, "executor"};
289static grpc_closure_scheduler executor_scheduler_short = {
290 &executor_vtable_short};
291
292static const grpc_closure_scheduler_vtable executor_vtable_long = {
293 executor_push_long, executor_push_long, "executor"};
294static grpc_closure_scheduler executor_scheduler_long = {&executor_vtable_long};
295
Craig Tillerbaa14a92017-11-03 09:09:36 -0700296grpc_closure_scheduler* grpc_executor_scheduler(
Craig Tiller7a82afd2017-07-18 09:40:40 -0700297 grpc_executor_job_length length) {
298 return length == GRPC_EXECUTOR_SHORT ? &executor_scheduler_short
299 : &executor_scheduler_long;
300}