blob: 3c3a784966ab7b8bc95cbeda86da312502184fff [file] [log] [blame]
David Garcia Quintas4bc34632015-10-07 16:12:35 -07001/*
2 *
Jan Tattermusch7897ae92017-06-07 22:57:36 +02003 * Copyright 2015 gRPC authors.
David Garcia Quintas4bc34632015-10-07 16:12:35 -07004 *
Jan Tattermusch7897ae92017-06-07 22:57:36 +02005 * Licensed under the Apache License, Version 2.0 (the "License");
6 * you may not use this file except in compliance with the License.
7 * You may obtain a copy of the License at
David Garcia Quintas4bc34632015-10-07 16:12:35 -07008 *
Jan Tattermusch7897ae92017-06-07 22:57:36 +02009 * http://www.apache.org/licenses/LICENSE-2.0
David Garcia Quintas4bc34632015-10-07 16:12:35 -070010 *
Jan Tattermusch7897ae92017-06-07 22:57:36 +020011 * Unless required by applicable law or agreed to in writing, software
12 * distributed under the License is distributed on an "AS IS" BASIS,
13 * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
14 * See the License for the specific language governing permissions and
15 * limitations under the License.
David Garcia Quintas4bc34632015-10-07 16:12:35 -070016 *
17 */
18
Alexander Polcyndb3e8982018-02-21 16:59:24 -080019#include <grpc/support/port_platform.h>
20
Craig Tiller9533d042016-03-25 17:11:06 -070021#include "src/core/lib/iomgr/executor.h"
David Garcia Quintas4bc34632015-10-07 16:12:35 -070022
23#include <string.h>
24
25#include <grpc/support/alloc.h>
Craig Tiller3e9f98e2017-05-12 13:17:47 -070026#include <grpc/support/cpu.h>
David Garcia Quintas4bc34632015-10-07 16:12:35 -070027#include <grpc/support/log.h>
28#include <grpc/support/sync.h>
Craig Tiller3e9f98e2017-05-12 13:17:47 -070029
Craig Tiller57bb9a92017-08-31 16:44:15 -070030#include "src/core/lib/debug/stats.h"
Vijay Paib6cf1232018-01-25 21:02:26 -080031#include "src/core/lib/gpr/tls.h"
Vijay Paid4d0a302018-01-25 13:24:03 -080032#include "src/core/lib/gpr/useful.h"
Sree Kuchibhotla37e49902018-07-11 18:46:29 -070033#include "src/core/lib/gprpp/memory.h"
Craig Tiller9533d042016-03-25 17:11:06 -070034#include "src/core/lib/iomgr/exec_ctx.h"
David Garcia Quintas4bc34632015-10-07 16:12:35 -070035
Craig Tiller2477cf32017-09-26 12:20:35 -070036#define MAX_DEPTH 2
37
Sree Kuchibhotla83d0bfa2018-07-10 11:29:43 -070038#define EXECUTOR_TRACE(format, ...) \
39 if (executor_trace.enabled()) { \
40 gpr_log(GPR_INFO, "EXECUTOR " format, __VA_ARGS__); \
Sree Kuchibhotla7e9d5252018-07-09 14:53:54 -070041 }
Craig Tiller3e9f98e2017-05-12 13:17:47 -070042
Sree Kuchibhotla00476fd2018-07-16 18:09:27 -070043#define EXECUTOR_TRACE0(str) \
44 if (executor_trace.enabled()) { \
45 gpr_log(GPR_INFO, "EXECUTOR " str); \
46 }
47
ncteisen72afb762017-11-10 12:23:12 -080048grpc_core::TraceFlag executor_trace(false, "executor");
Craig Tilleraf723b02017-07-17 17:56:28 -070049
Sree Kuchibhotla7e9d5252018-07-09 14:53:54 -070050GPR_TLS_DECL(g_this_thread_state);
David Garcia Quintas4bc34632015-10-07 16:12:35 -070051
// Construct an executor named 'name'. No threads are created here; call
// Init() (or SetThreading(true)) to start the pool.
GrpcExecutor::GrpcExecutor(const char* name) : name_(name) {
  adding_thread_lock_ = GPR_SPINLOCK_STATIC_INITIALIZER;
  // Zero threads until SetThreading(true) runs.
  gpr_atm_rel_store(&num_threads_, 0);
  // Cap the pool at twice the core count, but always allow at least one.
  max_threads_ = GPR_MAX(1, 2 * gpr_cpu_num_cores());
}

// Bring up the thread pool: allocates ThreadState slots and starts thread 0.
void GrpcExecutor::Init() { SetThreading(true); }
59
// Drain 'list', invoking each closure in order with its stored error, and
// return the number of closures run. The ExecCtx is flushed after every
// closure so that work scheduled by a closure makes progress before the next
// one runs. 'list' is taken by value; the caller's list is not modified.
size_t GrpcExecutor::RunClosures(const char* executor_name,
                                 grpc_closure_list list) {
  size_t n = 0;  // closures executed so far

  grpc_closure* c = list.head;
  while (c != nullptr) {
    // Capture next/error before running: the callback may re-schedule the
    // closure and overwrite its own next_data/error_data fields.
    grpc_closure* next = c->next_data.next;
    grpc_error* error = c->error_data.error;
#ifndef NDEBUG
    EXECUTOR_TRACE("(%s) run %p [created by %s:%d]", executor_name, c,
                   c->file_created, c->line_created);
    // Debug-only bookkeeping: the closure is no longer scheduled once it runs.
    c->scheduled = false;
#else
    EXECUTOR_TRACE("(%s) run %p", executor_name, c);
#endif
    c->cb(c->cb_arg, error);
    GRPC_ERROR_UNREF(error);
    c = next;
    n++;
    grpc_core::ExecCtx::Get()->Flush();
  }

  return n;
}
84
// True if the executor currently has any threads (i.e. SetThreading(true)
// has run and the pool has not been torn down).
bool GrpcExecutor::IsThreaded() const {
  return gpr_atm_acq_load(&num_threads_) > 0;
}
88
Sree Kuchibhotla7e9d5252018-07-09 14:53:54 -070089void GrpcExecutor::SetThreading(bool threading) {
Sree Kuchibhotla00476fd2018-07-16 18:09:27 -070090 gpr_atm curr_num_threads = gpr_atm_acq_load(&num_threads_);
91 EXECUTOR_TRACE("(%s) SetThreading(%d) begin", name_, threading);
Sree Kuchibhotla7e9d5252018-07-09 14:53:54 -070092
Craig Tiller5e56f002017-05-16 15:02:50 -070093 if (threading) {
Sree Kuchibhotla00476fd2018-07-16 18:09:27 -070094 if (curr_num_threads > 0) {
95 EXECUTOR_TRACE("(%s) SetThreading(true). curr_num_threads == 0", name_);
96 return;
97 }
Sree Kuchibhotla7e9d5252018-07-09 14:53:54 -070098
Sree Kuchibhotla83d0bfa2018-07-10 11:29:43 -070099 GPR_ASSERT(num_threads_ == 0);
Sree Kuchibhotla00476fd2018-07-16 18:09:27 -0700100 gpr_atm_rel_store(&num_threads_, 1);
Sree Kuchibhotla8cc3a002018-07-10 13:32:35 -0700101 gpr_tls_init(&g_this_thread_state);
Sree Kuchibhotla83d0bfa2018-07-10 11:29:43 -0700102 thd_state_ = static_cast<ThreadState*>(
103 gpr_zalloc(sizeof(ThreadState) * max_threads_));
Sree Kuchibhotla7e9d5252018-07-09 14:53:54 -0700104
Sree Kuchibhotla83d0bfa2018-07-10 11:29:43 -0700105 for (size_t i = 0; i < max_threads_; i++) {
106 gpr_mu_init(&thd_state_[i].mu);
107 gpr_cv_init(&thd_state_[i].cv);
108 thd_state_[i].id = i;
Sree Kuchibhotla00476fd2018-07-16 18:09:27 -0700109 thd_state_[i].name = name_;
Sree Kuchibhotla83d0bfa2018-07-10 11:29:43 -0700110 thd_state_[i].thd = grpc_core::Thread();
111 thd_state_[i].elems = GRPC_CLOSURE_LIST_INIT;
Craig Tiller5e56f002017-05-16 15:02:50 -0700112 }
113
Sree Kuchibhotla83d0bfa2018-07-10 11:29:43 -0700114 thd_state_[0].thd =
115 grpc_core::Thread(name_, &GrpcExecutor::ThreadMain, &thd_state_[0]);
116 thd_state_[0].thd.Start();
Sree Kuchibhotla02872df2018-07-10 14:21:51 -0700117 } else { // !threading
Sree Kuchibhotla00476fd2018-07-16 18:09:27 -0700118 if (curr_num_threads == 0) {
119 EXECUTOR_TRACE("(%s) SetThreading(false). curr_num_threads == 0", name_);
120 return;
121 }
Sree Kuchibhotla7e9d5252018-07-09 14:53:54 -0700122
Sree Kuchibhotla83d0bfa2018-07-10 11:29:43 -0700123 for (size_t i = 0; i < max_threads_; i++) {
124 gpr_mu_lock(&thd_state_[i].mu);
125 thd_state_[i].shutdown = true;
126 gpr_cv_signal(&thd_state_[i].cv);
127 gpr_mu_unlock(&thd_state_[i].mu);
Craig Tiller5e56f002017-05-16 15:02:50 -0700128 }
Sree Kuchibhotla7e9d5252018-07-09 14:53:54 -0700129
130 /* Ensure no thread is adding a new thread. Once this is past, then no
131 * thread will try to add a new one either (since shutdown is true) */
Sree Kuchibhotla83d0bfa2018-07-10 11:29:43 -0700132 gpr_spinlock_lock(&adding_thread_lock_);
133 gpr_spinlock_unlock(&adding_thread_lock_);
Sree Kuchibhotla7e9d5252018-07-09 14:53:54 -0700134
Sree Kuchibhotla7b8a6b62018-07-11 11:51:46 -0700135 curr_num_threads = gpr_atm_no_barrier_load(&num_threads_);
136 for (gpr_atm i = 0; i < curr_num_threads; i++) {
Sree Kuchibhotla83d0bfa2018-07-10 11:29:43 -0700137 thd_state_[i].thd.Join();
Sree Kuchibhotla00476fd2018-07-16 18:09:27 -0700138 EXECUTOR_TRACE("(%s) Thread %" PRIdPTR " of %" PRIdPTR " joined", name_,
139 i + 1, curr_num_threads);
Craig Tiller5e56f002017-05-16 15:02:50 -0700140 }
Sree Kuchibhotla7e9d5252018-07-09 14:53:54 -0700141
Sree Kuchibhotla00476fd2018-07-16 18:09:27 -0700142 gpr_atm_rel_store(&num_threads_, 0);
Sree Kuchibhotla83d0bfa2018-07-10 11:29:43 -0700143 for (size_t i = 0; i < max_threads_; i++) {
144 gpr_mu_destroy(&thd_state_[i].mu);
145 gpr_cv_destroy(&thd_state_[i].cv);
Sree Kuchibhotla00476fd2018-07-16 18:09:27 -0700146 RunClosures(thd_state_[i].name, thd_state_[i].elems);
Craig Tiller5e56f002017-05-16 15:02:50 -0700147 }
Sree Kuchibhotla7e9d5252018-07-09 14:53:54 -0700148
Sree Kuchibhotla83d0bfa2018-07-10 11:29:43 -0700149 gpr_free(thd_state_);
Craig Tiller5e56f002017-05-16 15:02:50 -0700150 gpr_tls_destroy(&g_this_thread_state);
David Garcia Quintas4bc34632015-10-07 16:12:35 -0700151 }
Sree Kuchibhotla00476fd2018-07-16 18:09:27 -0700152
153 EXECUTOR_TRACE("(%s) SetThreading(%d) done", name_, threading);
Craig Tiller5e56f002017-05-16 15:02:50 -0700154}
155
Sree Kuchibhotla7e9d5252018-07-09 14:53:54 -0700156void GrpcExecutor::Shutdown() { SetThreading(false); }
Craig Tiller5e56f002017-05-16 15:02:50 -0700157
// Entry point for every executor thread. Loops: wait for closures queued on
// this thread's ThreadState, steal the whole list, run it outside the lock;
// exits when ThreadState::shutdown is set.
void GrpcExecutor::ThreadMain(void* arg) {
  ThreadState* ts = static_cast<ThreadState*>(arg);
  // Publish this thread's state so Enqueue() can detect scheduling-to-self.
  gpr_tls_set(&g_this_thread_state, reinterpret_cast<intptr_t>(ts));

  grpc_core::ExecCtx exec_ctx(GRPC_EXEC_CTX_FLAG_IS_INTERNAL_THREAD);

  size_t subtract_depth = 0;  // closures drained in the previous iteration
  for (;;) {
    EXECUTOR_TRACE("(%s) [%" PRIdPTR "]: step (sub_depth=%" PRIdPTR ")",
                   ts->name, ts->id, subtract_depth);

    gpr_mu_lock(&ts->mu);
    // Account for the work run last iteration (depth is incremented by
    // Enqueue() under ts->mu).
    ts->depth -= subtract_depth;
    // Wait for closures to be enqueued or for the executor to be shutdown
    while (grpc_closure_list_empty(ts->elems) && !ts->shutdown) {
      // The queue is empty, so no long job can be pending on it anymore.
      ts->queued_long_job = false;
      gpr_cv_wait(&ts->cv, &ts->mu, gpr_inf_future(GPR_CLOCK_MONOTONIC));
    }

    if (ts->shutdown) {
      EXECUTOR_TRACE("(%s) [%" PRIdPTR "]: shutdown", ts->name, ts->id);
      gpr_mu_unlock(&ts->mu);
      break;
    }

    GRPC_STATS_INC_EXECUTOR_QUEUE_DRAINED();
    // Steal the entire list and run it outside the lock so Enqueue() can keep
    // appending work concurrently.
    grpc_closure_list closures = ts->elems;
    ts->elems = GRPC_CLOSURE_LIST_INIT;
    gpr_mu_unlock(&ts->mu);

    EXECUTOR_TRACE("(%s) [%" PRIdPTR "]: execute", ts->name, ts->id);

    grpc_core::ExecCtx::Get()->InvalidateNow();
    subtract_depth = RunClosures(ts->name, closures);
  }
}
194
// Schedule 'closure' (with 'error') on this executor. 'is_short' hints that
// the job is short-lived; a long job gets a queue to itself so it cannot
// starve other closures. If the executor has no threads (not threaded, or
// already shut down), the closure is instead appended to the calling thread's
// ExecCtx closure list and runs there.
void GrpcExecutor::Enqueue(grpc_closure* closure, grpc_error* error,
                           bool is_short) {
  bool retry_push;
  if (is_short) {
    GRPC_STATS_INC_EXECUTOR_SCHEDULED_SHORT_ITEMS();
  } else {
    GRPC_STATS_INC_EXECUTOR_SCHEDULED_LONG_ITEMS();
  }

  do {
    retry_push = false;
    size_t cur_thread_count =
        static_cast<size_t>(gpr_atm_acq_load(&num_threads_));

    // If the number of threads is zero(i.e either the executor is not threaded
    // or already shutdown), then queue the closure on the exec context itself
    if (cur_thread_count == 0) {
#ifndef NDEBUG
      EXECUTOR_TRACE("(%s) schedule %p (created %s:%d) inline", name_, closure,
                     closure->file_created, closure->line_created);
#else
      EXECUTOR_TRACE("(%s) schedule %p inline", name_, closure);
#endif
      grpc_closure_list_append(grpc_core::ExecCtx::Get()->closure_list(),
                               closure, error);
      return;
    }

    // Prefer the current thread's own queue when running on an executor
    // thread; otherwise hash the ExecCtx pointer to pick a starting thread.
    ThreadState* ts = (ThreadState*)gpr_tls_get(&g_this_thread_state);
    if (ts == nullptr) {
      ts = &thd_state_[GPR_HASH_POINTER(grpc_core::ExecCtx::Get(),
                                        cur_thread_count)];
    } else {
      GRPC_STATS_INC_EXECUTOR_SCHEDULED_TO_SELF();
    }

    ThreadState* orig_ts = ts;  // remember start to detect a full cycle
    bool try_new_thread = false;

    for (;;) {
#ifndef NDEBUG
      EXECUTOR_TRACE(
          "(%s) try to schedule %p (%s) (created %s:%d) to thread "
          "%" PRIdPTR,
          name_, closure, is_short ? "short" : "long", closure->file_created,
          closure->line_created, ts->id);
#else
      EXECUTOR_TRACE("(%s) try to schedule %p (%s) to thread %" PRIdPTR, name_,
                     closure, is_short ? "short" : "long", ts->id);
#endif

      gpr_mu_lock(&ts->mu);
      if (ts->queued_long_job) {
        // if there's a long job queued, we never queue anything else to this
        // queue (since long jobs can take 'infinite' time and we need to
        // guarantee no starvation). Spin through queues and try again
        gpr_mu_unlock(&ts->mu);
        size_t idx = ts->id;
        ts = &thd_state_[(idx + 1) % cur_thread_count];
        if (ts == orig_ts) {
          // We cycled through all the threads. Retry enqueue again by creating
          // a new thread
          //
          // TODO (sreek): There is a potential issue here. We are
          // unconditionally setting try_new_thread to true here. What if the
          // executor is shutdown OR if cur_thread_count is already equal to
          // max_threads ?
          // (Fortunately, this is not an issue yet (as of july 2018) because
          // there is only one instance of long job in gRPC and hence we will
          // not hit this code path)
          retry_push = true;
          try_new_thread = true;
          break;
        }

        continue;  // Try the next thread-state
      }

      // == Found the thread state (i.e thread) to enqueue this closure! ==

      // Also, if this thread has been waiting for closures, wake it up.
      // - If grpc_closure_list_empty() is true and the Executor is not
      //   shutdown, it means that the thread must be waiting in ThreadMain()
      // - Note that gpr_cv_signal() won't immediately wakeup the thread. That
      //   happens after we release the mutex &ts->mu a few lines below
      if (grpc_closure_list_empty(ts->elems) && !ts->shutdown) {
        GRPC_STATS_INC_EXECUTOR_WAKEUP_INITIATED();
        gpr_cv_signal(&ts->cv);
      }

      grpc_closure_list_append(&ts->elems, closure, error);

      // If we already queued more than MAX_DEPTH number of closures on this
      // thread, use this as a hint to create more threads
      ts->depth++;
      try_new_thread = ts->depth > MAX_DEPTH &&
                       cur_thread_count < max_threads_ && !ts->shutdown;

      // A long job reserves this queue until the thread drains it.
      ts->queued_long_job = !is_short;

      gpr_mu_unlock(&ts->mu);
      break;
    }

    // Spawn one more thread if hinted; the spinlock serializes thread adds
    // (and SetThreading(false) uses it to quiesce adders before joining).
    if (try_new_thread && gpr_spinlock_trylock(&adding_thread_lock_)) {
      cur_thread_count = static_cast<size_t>(gpr_atm_acq_load(&num_threads_));
      if (cur_thread_count < max_threads_) {
        // Increment num_threads (safe to do a store instead of a cas because we
        // always increment num_threads under the 'adding_thread_lock')
        gpr_atm_rel_store(&num_threads_, cur_thread_count + 1);

        thd_state_[cur_thread_count].thd = grpc_core::Thread(
            name_, &GrpcExecutor::ThreadMain, &thd_state_[cur_thread_count]);
        thd_state_[cur_thread_count].thd.Start();
      }
      gpr_spinlock_unlock(&adding_thread_lock_);
    }

    if (retry_push) {
      GRPC_STATS_INC_EXECUTOR_PUSH_RETRIES();
    }
  } while (retry_push);
}
Craig Tiller91031da2016-12-28 15:44:25 -0800318
// One GrpcExecutor instance per GrpcExecutorType; created in
// grpc_executor_init() and deleted in grpc_executor_shutdown().
static GrpcExecutor* executors[GRPC_NUM_EXECUTORS];

// Scheduler-vtable trampolines: each forwards a closure to the matching
// executor instance with the matching short/long job hint.

void default_enqueue_short(grpc_closure* closure, grpc_error* error) {
  executors[GRPC_DEFAULT_EXECUTOR]->Enqueue(closure, error,
                                            true /* is_short */);
}

void default_enqueue_long(grpc_closure* closure, grpc_error* error) {
  executors[GRPC_DEFAULT_EXECUTOR]->Enqueue(closure, error,
                                            false /* is_short */);
}

void resolver_enqueue_short(grpc_closure* closure, grpc_error* error) {
  executors[GRPC_RESOLVER_EXECUTOR]->Enqueue(closure, error,
                                             true /* is_short */);
}

void resolver_enqueue_long(grpc_closure* closure, grpc_error* error) {
  executors[GRPC_RESOLVER_EXECUTOR]->Enqueue(closure, error,
                                             false /* is_short */);
}
340
// Closure-scheduler vtables: both run and sched entries enqueue (the executor
// has no separate run-immediately path).
static const grpc_closure_scheduler_vtable vtables_[] = {
    {&default_enqueue_short, &default_enqueue_short, "def-ex-short"},
    {&default_enqueue_long, &default_enqueue_long, "def-ex-long"},
    {&resolver_enqueue_short, &resolver_enqueue_short, "res-ex-short"},
    {&resolver_enqueue_long, &resolver_enqueue_long, "res-ex-long"}};

// Flattened [executor][job-type] table, indexed by grpc_executor_scheduler().
static grpc_closure_scheduler schedulers_[] = {
    {&vtables_[0]},  // Default short
    {&vtables_[1]},  // Default long
    {&vtables_[2]},  // Resolver short
    {&vtables_[3]}   // Resolver long
};
353
// Human-readable name for an executor type; used as the executor's (and its
// threads') name and in trace output.
const char* executor_name(GrpcExecutorType executor_type) {
  switch (executor_type) {
    case GRPC_DEFAULT_EXECUTOR:
      return "default-executor";
    case GRPC_RESOLVER_EXECUTOR:
      return "resolver-executor";
    default:
      GPR_UNREACHABLE_CODE(return "unknown");
  }
  // Deliberately redundant with the default case above; presumably kept to
  // satisfy compilers that warn about a missing trailing return.
  GPR_UNREACHABLE_CODE(return "unknown");
}
Sree Kuchibhotla7e9d5252018-07-09 14:53:54 -0700365
Sree Kuchibhotla69e5dff2018-07-12 10:58:04 -0700366// grpc_executor_init() and grpc_executor_shutdown() functions are called in the
367// the grpc_init() and grpc_shutdown() code paths which are protected by a
368// global mutex. So it is okay to assume that these functions are thread-safe
// Create and start every executor. Idempotent: a second call is detected via
// executors[0] and becomes a no-op. Callers (grpc_init) are presumed to
// serialize calls externally.
void grpc_executor_init() {
  EXECUTOR_TRACE0("grpc_executor_init() enter");
  for (int i = 0; i < GRPC_NUM_EXECUTORS; i++) {
    // Return if grpc_executor_init() already called earlier
    if (executors[i] != nullptr) {
      // Ideally we should also assert that all executors i.e executor[0] to
      // executor[GRPC_NUM_EXECUTORS-1] are != nullptr too.
      GPR_ASSERT(i == 0);
      break;
    }

    executors[i] = grpc_core::New<GrpcExecutor>(
        executor_name(static_cast<GrpcExecutorType>(i)));
    executors[i]->Init();
  }
  EXECUTOR_TRACE0("grpc_executor_init() done");
}
Sree Kuchibhotla7e9d5252018-07-09 14:53:54 -0700386
// Map (executor type, job type) to its grpc_closure_scheduler in schedulers_.
// NOTE(review): the index math multiplies by GRPC_NUM_EXECUTORS, which only
// matches the schedulers_ layout while the number of job types equals the
// number of executors — confirm if either enum grows.
grpc_closure_scheduler* grpc_executor_scheduler(GrpcExecutorType executor_type,
                                                GrpcExecutorJobType job_type) {
  return &schedulers_[(executor_type * GRPC_NUM_EXECUTORS) + job_type];
}

// Convenience overload: scheduler on the default executor.
grpc_closure_scheduler* grpc_executor_scheduler(GrpcExecutorJobType job_type) {
  return grpc_executor_scheduler(GRPC_DEFAULT_EXECUTOR, job_type);
}
395
// Shut down and then delete every executor. Idempotent: a second call is
// detected via executors[0] and becomes a no-op.
void grpc_executor_shutdown() {
  EXECUTOR_TRACE0("grpc_executor_shutdown() enter");
  for (int i = 0; i < GRPC_NUM_EXECUTORS; i++) {
    // Return if grpc_executor_shutdown() is already called earlier
    if (executors[i] == nullptr) {
      // Ideally we should also assert that all executors i.e executor[0] to
      // executor[GRPC_NUM_EXECUTORS-1] are nullptr too.
      GPR_ASSERT(i == 0);
      break;
    }
    executors[i]->Shutdown();
  }

  // Delete the executor objects.
  //
  // NOTE: It is important to do this in a separate loop (i.e ONLY after all the
  // executors are 'Shutdown' first) because it is possible for one executor
  // (that is not shutdown yet) to call Enqueue() on a different executor which
  // is already shutdown. This is legal and in such cases, the Enqueue()
  // operation effectively "fails" and enqueues that closure on the calling
  // thread's exec_ctx.
  //
  // By ensuring that all executors are shutdown first, we are also ensuring
  // that no thread is active across all executors.
  for (int i = 0; i < GRPC_NUM_EXECUTORS; i++) {
    grpc_core::Delete<GrpcExecutor>(executors[i]);
    executors[i] = nullptr;
  }
  EXECUTOR_TRACE0("grpc_executor_shutdown() done");
}
426
// True if the given executor currently has threads running.
bool grpc_executor_is_threaded(GrpcExecutorType executor_type) {
  GPR_ASSERT(executor_type < GRPC_NUM_EXECUTORS);
  return executors[executor_type]->IsThreaded();
}

// Convenience overload: queries the default executor.
bool grpc_executor_is_threaded() {
  return grpc_executor_is_threaded(GRPC_DEFAULT_EXECUTOR);
}

// Turn threading on/off for every executor at once.
// NOTE(review): dereferences executors[i] unconditionally — assumes
// grpc_executor_init() has run and shutdown has not; confirm with callers.
void grpc_executor_set_threading(bool enable) {
  EXECUTOR_TRACE("grpc_executor_set_threading(%d) called", enable);
  for (int i = 0; i < GRPC_NUM_EXECUTORS; i++) {
    executors[i]->SetThreading(enable);
  }
}