blob: dd5cb2a64e538918703cc768192b6120a58b8985 [file] [log] [blame]
David Garcia Quintas4bc34632015-10-07 16:12:35 -07001/*
2 *
Jan Tattermusch7897ae92017-06-07 22:57:36 +02003 * Copyright 2015 gRPC authors.
David Garcia Quintas4bc34632015-10-07 16:12:35 -07004 *
Jan Tattermusch7897ae92017-06-07 22:57:36 +02005 * Licensed under the Apache License, Version 2.0 (the "License");
6 * you may not use this file except in compliance with the License.
7 * You may obtain a copy of the License at
David Garcia Quintas4bc34632015-10-07 16:12:35 -07008 *
Jan Tattermusch7897ae92017-06-07 22:57:36 +02009 * http://www.apache.org/licenses/LICENSE-2.0
David Garcia Quintas4bc34632015-10-07 16:12:35 -070010 *
Jan Tattermusch7897ae92017-06-07 22:57:36 +020011 * Unless required by applicable law or agreed to in writing, software
12 * distributed under the License is distributed on an "AS IS" BASIS,
13 * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
14 * See the License for the specific language governing permissions and
15 * limitations under the License.
David Garcia Quintas4bc34632015-10-07 16:12:35 -070016 *
17 */
18
Craig Tiller9533d042016-03-25 17:11:06 -070019#include "src/core/lib/iomgr/executor.h"
David Garcia Quintas4bc34632015-10-07 16:12:35 -070020
21#include <string.h>
22
23#include <grpc/support/alloc.h>
Craig Tiller3e9f98e2017-05-12 13:17:47 -070024#include <grpc/support/cpu.h>
David Garcia Quintas4bc34632015-10-07 16:12:35 -070025#include <grpc/support/log.h>
26#include <grpc/support/sync.h>
27#include <grpc/support/thd.h>
Craig Tiller3e9f98e2017-05-12 13:17:47 -070028#include <grpc/support/tls.h>
29#include <grpc/support/useful.h>
30
Craig Tiller57bb9a92017-08-31 16:44:15 -070031#include "src/core/lib/debug/stats.h"
Craig Tiller9533d042016-03-25 17:11:06 -070032#include "src/core/lib/iomgr/exec_ctx.h"
Craig Tiller3e9f98e2017-05-12 13:17:47 -070033#include "src/core/lib/support/spinlock.h"
David Garcia Quintas4bc34632015-10-07 16:12:35 -070034
/* Queue depth (number of pending closures on one thread) above which
   executor_push will try to spin up an additional executor thread. */
#define MAX_DEPTH 2

/* Per-executor-thread state.  One instance per slot in g_thread_state. */
typedef struct {
  gpr_mu mu;               /* guards all fields below */
  gpr_cv cv;               /* signalled when elems becomes non-empty or on shutdown */
  grpc_closure_list elems; /* closures queued for this thread to run */
  size_t depth;            /* count of scheduled-but-not-yet-run closures */
  bool shutdown;           /* set under mu when the executor is being torn down */
  gpr_thd_id id;           /* thread handle, used for joining at shutdown */
} thread_state;
David Garcia Quintas4bc34632015-10-07 16:12:35 -070045
/* Array of g_max_threads thread_state slots, allocated lazily when threading
   is enabled; owned by grpc_executor_set_threading. */
static thread_state *g_thread_state;
/* Maximum number of executor threads (fixed once threading is enabled). */
static size_t g_max_threads;
/* Number of threads actually started so far; only slots [0, g_cur_threads)
   of g_thread_state have live threads. */
static gpr_atm g_cur_threads;
/* Serializes thread creation in executor_push and acts as a barrier during
   shutdown (see grpc_executor_set_threading). */
static gpr_spinlock g_adding_thread_lock = GPR_SPINLOCK_STATIC_INITIALIZER;

/* Points at the current thread's thread_state if it is an executor thread,
   else NULL; lets executor_push prefer the local queue. */
GPR_TLS_DECL(g_this_thread_state);

static void executor_thread(void *arg);
David Garcia Quintas4bc34632015-10-07 16:12:35 -070054
Craig Tiller3e9f98e2017-05-12 13:17:47 -070055static size_t run_closures(grpc_exec_ctx *exec_ctx, grpc_closure_list list) {
56 size_t n = 0;
David Garcia Quintas4bc34632015-10-07 16:12:35 -070057
Craig Tiller3e9f98e2017-05-12 13:17:47 -070058 grpc_closure *c = list.head;
Craig Tiller061ef742016-12-29 10:54:09 -080059 while (c != NULL) {
60 grpc_closure *next = c->next_data.next;
61 grpc_error *error = c->error_data.error;
ncteisenf8061e82017-06-09 10:44:42 -070062#ifndef NDEBUG
Craig Tillerb9b01ce2017-05-12 13:47:10 -070063 c->scheduled = false;
Mark D. Roth43f774e2017-04-04 16:35:37 -070064#endif
Craig Tiller0b093412017-01-03 09:49:07 -080065 c->cb(exec_ctx, c->cb_arg, error);
Craig Tiller061ef742016-12-29 10:54:09 -080066 GRPC_ERROR_UNREF(error);
67 c = next;
Craig Tillerf21acdd2017-06-08 08:09:35 -070068 n++;
Craig Tiller061ef742016-12-29 10:54:09 -080069 }
Craig Tiller3e9f98e2017-05-12 13:17:47 -070070
71 return n;
72}
73
Craig Tiller5e56f002017-05-16 15:02:50 -070074bool grpc_executor_is_threaded() {
75 return gpr_atm_no_barrier_load(&g_cur_threads) > 0;
76}
77
78void grpc_executor_set_threading(grpc_exec_ctx *exec_ctx, bool threading) {
79 gpr_atm cur_threads = gpr_atm_no_barrier_load(&g_cur_threads);
80 if (threading) {
81 if (cur_threads > 0) return;
82 g_max_threads = GPR_MAX(1, 2 * gpr_cpu_num_cores());
83 gpr_atm_no_barrier_store(&g_cur_threads, 1);
84 gpr_tls_init(&g_this_thread_state);
85 g_thread_state = gpr_zalloc(sizeof(thread_state) * g_max_threads);
86 for (size_t i = 0; i < g_max_threads; i++) {
87 gpr_mu_init(&g_thread_state[i].mu);
88 gpr_cv_init(&g_thread_state[i].cv);
89 g_thread_state[i].elems = (grpc_closure_list)GRPC_CLOSURE_LIST_INIT;
90 }
91
92 gpr_thd_options opt = gpr_thd_options_default();
93 gpr_thd_options_set_joinable(&opt);
94 gpr_thd_new(&g_thread_state[0].id, executor_thread, &g_thread_state[0],
95 &opt);
96 } else {
97 if (cur_threads == 0) return;
98 for (size_t i = 0; i < g_max_threads; i++) {
99 gpr_mu_lock(&g_thread_state[i].mu);
100 g_thread_state[i].shutdown = true;
101 gpr_cv_signal(&g_thread_state[i].cv);
102 gpr_mu_unlock(&g_thread_state[i].mu);
103 }
Craig Tillerf7c8c9f2017-05-17 15:22:05 -0700104 /* ensure no thread is adding a new thread... once this is past, then
105 no thread will try to add a new one either (since shutdown is true) */
106 gpr_spinlock_lock(&g_adding_thread_lock);
107 gpr_spinlock_unlock(&g_adding_thread_lock);
Craig Tiller5e56f002017-05-16 15:02:50 -0700108 for (gpr_atm i = 0; i < g_cur_threads; i++) {
109 gpr_thd_join(g_thread_state[i].id);
110 }
111 gpr_atm_no_barrier_store(&g_cur_threads, 0);
112 for (size_t i = 0; i < g_max_threads; i++) {
113 gpr_mu_destroy(&g_thread_state[i].mu);
114 gpr_cv_destroy(&g_thread_state[i].cv);
115 run_closures(exec_ctx, g_thread_state[i].elems);
116 }
117 gpr_free(g_thread_state);
118 gpr_tls_destroy(&g_this_thread_state);
David Garcia Quintas4bc34632015-10-07 16:12:35 -0700119 }
Craig Tiller5e56f002017-05-16 15:02:50 -0700120}
121
122void grpc_executor_init(grpc_exec_ctx *exec_ctx) {
Craig Tillerb9b01ce2017-05-12 13:47:10 -0700123 gpr_atm_no_barrier_store(&g_cur_threads, 0);
Craig Tiller5e56f002017-05-16 15:02:50 -0700124 grpc_executor_set_threading(exec_ctx, true);
125}
126
127void grpc_executor_shutdown(grpc_exec_ctx *exec_ctx) {
128 grpc_executor_set_threading(exec_ctx, false);
Craig Tiller3e9f98e2017-05-12 13:17:47 -0700129}
130
131static void executor_thread(void *arg) {
132 thread_state *ts = arg;
133 gpr_tls_set(&g_this_thread_state, (intptr_t)ts);
134
Craig Tiller89962082017-05-12 14:30:42 -0700135 grpc_exec_ctx exec_ctx =
136 GRPC_EXEC_CTX_INITIALIZER(0, grpc_never_ready_to_finish, NULL);
137
Craig Tiller3e9f98e2017-05-12 13:17:47 -0700138 size_t subtract_depth = 0;
139 for (;;) {
140 gpr_mu_lock(&ts->mu);
141 ts->depth -= subtract_depth;
142 while (grpc_closure_list_empty(ts->elems) && !ts->shutdown) {
143 gpr_cv_wait(&ts->cv, &ts->mu, gpr_inf_future(GPR_CLOCK_REALTIME));
144 }
145 if (ts->shutdown) {
146 gpr_mu_unlock(&ts->mu);
147 break;
148 }
Craig Tiller57bb9a92017-08-31 16:44:15 -0700149 GRPC_STATS_INC_EXECUTOR_QUEUE_DRAINED(&exec_ctx);
Craig Tiller3e9f98e2017-05-12 13:17:47 -0700150 grpc_closure_list exec = ts->elems;
151 ts->elems = (grpc_closure_list)GRPC_CLOSURE_LIST_INIT;
152 gpr_mu_unlock(&ts->mu);
153
Craig Tiller3e9f98e2017-05-12 13:17:47 -0700154 subtract_depth = run_closures(&exec_ctx, exec);
Craig Tiller89962082017-05-12 14:30:42 -0700155 grpc_exec_ctx_flush(&exec_ctx);
Craig Tiller3e9f98e2017-05-12 13:17:47 -0700156 }
Craig Tiller89962082017-05-12 14:30:42 -0700157 grpc_exec_ctx_finish(&exec_ctx);
Craig Tiller3e9f98e2017-05-12 13:17:47 -0700158}
159
160static void executor_push(grpc_exec_ctx *exec_ctx, grpc_closure *closure,
161 grpc_error *error) {
Craig Tiller61f96c12017-05-12 13:36:39 -0700162 size_t cur_thread_count = (size_t)gpr_atm_no_barrier_load(&g_cur_threads);
Craig Tiller57bb9a92017-08-31 16:44:15 -0700163 GRPC_STATS_INC_EXECUTOR_SCHEDULED_ITEMS(exec_ctx);
Craig Tillerb9b01ce2017-05-12 13:47:10 -0700164 if (cur_thread_count == 0) {
165 grpc_closure_list_append(&exec_ctx->closure_list, closure, error);
166 return;
167 }
168 thread_state *ts = (thread_state *)gpr_tls_get(&g_this_thread_state);
Craig Tiller3e9f98e2017-05-12 13:17:47 -0700169 if (ts == NULL) {
Craig Tiller61f96c12017-05-12 13:36:39 -0700170 ts = &g_thread_state[GPR_HASH_POINTER(exec_ctx, cur_thread_count)];
Craig Tiller57bb9a92017-08-31 16:44:15 -0700171 } else {
172 GRPC_STATS_INC_EXECUTOR_SCHEDULED_TO_SELF(exec_ctx);
Craig Tiller3e9f98e2017-05-12 13:17:47 -0700173 }
174 gpr_mu_lock(&ts->mu);
Craig Tiller89962082017-05-12 14:30:42 -0700175 if (grpc_closure_list_empty(ts->elems)) {
Craig Tiller57bb9a92017-08-31 16:44:15 -0700176 GRPC_STATS_INC_EXECUTOR_WAKEUP_INITIATED(exec_ctx);
Craig Tiller89962082017-05-12 14:30:42 -0700177 gpr_cv_signal(&ts->cv);
178 }
Craig Tiller3e9f98e2017-05-12 13:17:47 -0700179 grpc_closure_list_append(&ts->elems, closure, error);
180 ts->depth++;
Craig Tillerf7c8c9f2017-05-17 15:22:05 -0700181 bool try_new_thread = ts->depth > MAX_DEPTH &&
182 cur_thread_count < g_max_threads && !ts->shutdown;
Craig Tiller747216f2017-06-08 08:08:11 -0700183 gpr_mu_unlock(&ts->mu);
Craig Tiller3e9f98e2017-05-12 13:17:47 -0700184 if (try_new_thread && gpr_spinlock_trylock(&g_adding_thread_lock)) {
Craig Tiller61f96c12017-05-12 13:36:39 -0700185 cur_thread_count = (size_t)gpr_atm_no_barrier_load(&g_cur_threads);
Craig Tiller3e9f98e2017-05-12 13:17:47 -0700186 if (cur_thread_count < g_max_threads) {
187 gpr_atm_no_barrier_store(&g_cur_threads, cur_thread_count + 1);
188
189 gpr_thd_options opt = gpr_thd_options_default();
190 gpr_thd_options_set_joinable(&opt);
191 gpr_thd_new(&g_thread_state[cur_thread_count].id, executor_thread,
192 &g_thread_state[cur_thread_count], &opt);
193 }
194 gpr_spinlock_unlock(&g_adding_thread_lock);
195 }
David Garcia Quintas4bc34632015-10-07 16:12:35 -0700196}
Craig Tiller91031da2016-12-28 15:44:25 -0800197
/* Closure-scheduler vtable: both the "run" and "sched" slots dispatch through
   executor_push (the executor makes no distinction between them). */
static const grpc_closure_scheduler_vtable executor_vtable = {
    executor_push, executor_push, "executor"};
static grpc_closure_scheduler executor_scheduler = {&executor_vtable};
/* Public scheduler handle used by callers to schedule work on the executor. */
grpc_closure_scheduler *grpc_executor_scheduler = &executor_scheduler;