blob: 1ad13b831ddbd6dc9e36ce1c27d5eb32211af0dc [file] [log] [blame]
David Garcia Quintas4bc34632015-10-07 16:12:35 -07001/*
2 *
Jan Tattermusch7897ae92017-06-07 22:57:36 +02003 * Copyright 2015 gRPC authors.
David Garcia Quintas4bc34632015-10-07 16:12:35 -07004 *
Jan Tattermusch7897ae92017-06-07 22:57:36 +02005 * Licensed under the Apache License, Version 2.0 (the "License");
6 * you may not use this file except in compliance with the License.
7 * You may obtain a copy of the License at
David Garcia Quintas4bc34632015-10-07 16:12:35 -07008 *
Jan Tattermusch7897ae92017-06-07 22:57:36 +02009 * http://www.apache.org/licenses/LICENSE-2.0
David Garcia Quintas4bc34632015-10-07 16:12:35 -070010 *
Jan Tattermusch7897ae92017-06-07 22:57:36 +020011 * Unless required by applicable law or agreed to in writing, software
12 * distributed under the License is distributed on an "AS IS" BASIS,
13 * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
14 * See the License for the specific language governing permissions and
15 * limitations under the License.
David Garcia Quintas4bc34632015-10-07 16:12:35 -070016 *
17 */
18
Alexander Polcyndb3e8982018-02-21 16:59:24 -080019#include <grpc/support/port_platform.h>
20
Craig Tiller9533d042016-03-25 17:11:06 -070021#include "src/core/lib/iomgr/executor.h"
David Garcia Quintas4bc34632015-10-07 16:12:35 -070022
23#include <string.h>
24
25#include <grpc/support/alloc.h>
Craig Tiller3e9f98e2017-05-12 13:17:47 -070026#include <grpc/support/cpu.h>
David Garcia Quintas4bc34632015-10-07 16:12:35 -070027#include <grpc/support/log.h>
28#include <grpc/support/sync.h>
Craig Tiller3e9f98e2017-05-12 13:17:47 -070029
Craig Tiller57bb9a92017-08-31 16:44:15 -070030#include "src/core/lib/debug/stats.h"
Vijay Paib6cf1232018-01-25 21:02:26 -080031#include "src/core/lib/gpr/tls.h"
Vijay Paid4d0a302018-01-25 13:24:03 -080032#include "src/core/lib/gpr/useful.h"
Sree Kuchibhotla37e49902018-07-11 18:46:29 -070033#include "src/core/lib/gprpp/memory.h"
Craig Tiller9533d042016-03-25 17:11:06 -070034#include "src/core/lib/iomgr/exec_ctx.h"
David Garcia Quintas4bc34632015-10-07 16:12:35 -070035
Craig Tiller2477cf32017-09-26 12:20:35 -070036#define MAX_DEPTH 2
37
Sree Kuchibhotla83d0bfa2018-07-10 11:29:43 -070038#define EXECUTOR_TRACE(format, ...) \
39 if (executor_trace.enabled()) { \
40 gpr_log(GPR_INFO, "EXECUTOR " format, __VA_ARGS__); \
Sree Kuchibhotla7e9d5252018-07-09 14:53:54 -070041 }
Craig Tiller3e9f98e2017-05-12 13:17:47 -070042
// Trace flag gating EXECUTOR_TRACE logging; enable with the "executor" tracer.
grpc_core::TraceFlag executor_trace(false, "executor");

// Thread-local pointer to the ThreadState of the executor thread currently
// running (unset/null on non-executor threads). Written in ThreadMain() and
// read by Enqueue() to detect scheduling-to-self.
GPR_TLS_DECL(g_this_thread_state);
David Garcia Quintas4bc34632015-10-07 16:12:35 -070046
// Constructs an executor named |executor_name|. No threads are created here;
// the thread pool is brought up later by Init()/SetThreading(true).
GrpcExecutor::GrpcExecutor(const char* executor_name) : name_(executor_name) {
  adding_thread_lock_ = GPR_SPINLOCK_STATIC_INITIALIZER;
  gpr_atm_no_barrier_store(&num_threads_, 0);
  // Hard cap on pool size: twice the number of cores, but at least one thread.
  max_threads_ = GPR_MAX(1, 2 * gpr_cpu_num_cores());
}
52
53void GrpcExecutor::Init() { SetThreading(true); }
54
// Runs every closure in |list| inline on the calling thread and returns the
// number of closures executed. Each closure's error ref is consumed, and the
// exec-ctx is flushed after every closure so work it scheduled runs promptly.
size_t GrpcExecutor::RunClosures(grpc_closure_list list) {
  size_t n = 0;  // Number of closures executed.

  grpc_closure* c = list.head;
  while (c != nullptr) {
    // Save the link/error before invoking the callback: the closure may be
    // re-scheduled (or freed) by its own cb.
    grpc_closure* next = c->next_data.next;
    grpc_error* error = c->error_data.error;
#ifndef NDEBUG
    EXECUTOR_TRACE("run %p [created by %s:%d]", c, c->file_created,
                   c->line_created);
    // Debug-only bookkeeping: mark the closure as no longer scheduled so a
    // double-schedule can be caught.
    c->scheduled = false;
#else
    EXECUTOR_TRACE("run %p", c);
#endif
    c->cb(c->cb_arg, error);
    GRPC_ERROR_UNREF(error);
    c = next;
    n++;
    grpc_core::ExecCtx::Get()->Flush();
  }

  return n;
}
78
Sree Kuchibhotla83d0bfa2018-07-10 11:29:43 -070079bool GrpcExecutor::IsThreaded() const {
80 return gpr_atm_no_barrier_load(&num_threads_) > 0;
Craig Tiller5e56f002017-05-16 15:02:50 -070081}
82
// Switches the executor between threaded and non-threaded mode.
//  - threading == true : allocates per-thread state for max_threads_ slots and
//    starts the first worker thread (no-op if already threaded).
//  - threading == false: signals shutdown to every slot, joins all live
//    threads, drains any leftover closures inline, and frees all state
//    (no-op if already non-threaded).
void GrpcExecutor::SetThreading(bool threading) {
  gpr_atm curr_num_threads = gpr_atm_no_barrier_load(&num_threads_);

  if (threading) {
    if (curr_num_threads > 0) return;  // Already threaded; nothing to do.

    GPR_ASSERT(num_threads_ == 0);
    gpr_atm_no_barrier_store(&num_threads_, 1);
    gpr_tls_init(&g_this_thread_state);
    // Allocate state for the maximum pool size up front; only slot 0 gets a
    // live thread now, the rest are spawned on demand by Enqueue().
    thd_state_ = static_cast<ThreadState*>(
        gpr_zalloc(sizeof(ThreadState) * max_threads_));

    for (size_t i = 0; i < max_threads_; i++) {
      gpr_mu_init(&thd_state_[i].mu);
      gpr_cv_init(&thd_state_[i].cv);
      thd_state_[i].id = i;
      thd_state_[i].thd = grpc_core::Thread();
      thd_state_[i].elems = GRPC_CLOSURE_LIST_INIT;
    }

    thd_state_[0].thd =
        grpc_core::Thread(name_, &GrpcExecutor::ThreadMain, &thd_state_[0]);
    thd_state_[0].thd.Start();
  } else {  // !threading
    if (curr_num_threads == 0) return;  // Already shut down; nothing to do.

    // Mark every slot shutdown and wake any thread blocked in gpr_cv_wait().
    for (size_t i = 0; i < max_threads_; i++) {
      gpr_mu_lock(&thd_state_[i].mu);
      thd_state_[i].shutdown = true;
      gpr_cv_signal(&thd_state_[i].cv);
      gpr_mu_unlock(&thd_state_[i].mu);
    }

    /* Ensure no thread is adding a new thread. Once this is past, then no
     * thread will try to add a new one either (since shutdown is true) */
    gpr_spinlock_lock(&adding_thread_lock_);
    gpr_spinlock_unlock(&adding_thread_lock_);

    // Re-read the count: threads may have been added between the first load
    // and acquiring adding_thread_lock_ above.
    curr_num_threads = gpr_atm_no_barrier_load(&num_threads_);
    for (gpr_atm i = 0; i < curr_num_threads; i++) {
      thd_state_[i].thd.Join();
      EXECUTOR_TRACE(" Thread %" PRIdPTR " of %" PRIdPTR " joined", i,
                     curr_num_threads);
    }

    gpr_atm_no_barrier_store(&num_threads_, 0);
    for (size_t i = 0; i < max_threads_; i++) {
      gpr_mu_destroy(&thd_state_[i].mu);
      gpr_cv_destroy(&thd_state_[i].cv);
      // All threads are joined, so any closures still queued are run inline
      // on the caller's thread.
      RunClosures(thd_state_[i].elems);
    }

    gpr_free(thd_state_);
    gpr_tls_destroy(&g_this_thread_state);
  }
}
139
Sree Kuchibhotla7e9d5252018-07-09 14:53:54 -0700140void GrpcExecutor::Shutdown() { SetThreading(false); }
Craig Tiller5e56f002017-05-16 15:02:50 -0700141
// Entry point for every executor worker thread. |arg| is the ThreadState slot
// owned by this thread. Repeatedly waits for closures to be enqueued on its
// slot, takes the whole batch, runs it outside the lock, and loops until
// shutdown is signaled.
void GrpcExecutor::ThreadMain(void* arg) {
  ThreadState* ts = static_cast<ThreadState*>(arg);
  // Publish this thread's state in TLS so Enqueue() can detect that it is
  // being called from an executor thread.
  gpr_tls_set(&g_this_thread_state, reinterpret_cast<intptr_t>(ts));

  grpc_core::ExecCtx exec_ctx(GRPC_EXEC_CTX_FLAG_IS_INTERNAL_THREAD);

  // Number of closures run in the previous iteration; subtracted from
  // ts->depth (which Enqueue() increments) at the top of each loop.
  size_t subtract_depth = 0;
  for (;;) {
    EXECUTOR_TRACE("[%" PRIdPTR "]: step (sub_depth=%" PRIdPTR ")", ts->id,
                   subtract_depth);

    gpr_mu_lock(&ts->mu);
    ts->depth -= subtract_depth;
    // Wait for closures to be enqueued or for the executor to be shutdown
    while (grpc_closure_list_empty(ts->elems) && !ts->shutdown) {
      // Nothing queued, so this slot no longer holds a blocking long job.
      ts->queued_long_job = false;
      gpr_cv_wait(&ts->cv, &ts->mu, gpr_inf_future(GPR_CLOCK_MONOTONIC));
    }

    if (ts->shutdown) {
      EXECUTOR_TRACE("[%" PRIdPTR "]: shutdown", ts->id);
      gpr_mu_unlock(&ts->mu);
      break;
    }

    GRPC_STATS_INC_EXECUTOR_QUEUE_DRAINED();
    // Steal the entire pending list so the closures can run without holding
    // the slot mutex.
    grpc_closure_list closures = ts->elems;
    ts->elems = GRPC_CLOSURE_LIST_INIT;
    gpr_mu_unlock(&ts->mu);

    EXECUTOR_TRACE("[%" PRIdPTR "]: execute", ts->id);

    grpc_core::ExecCtx::Get()->InvalidateNow();
    subtract_depth = RunClosures(closures);
  }
}
178
// Schedules |closure| (with |error|) on the executor. |is_short| distinguishes
// short jobs from long ones: a slot holding a long job accepts no further
// work, so the scan below skips such slots. If the executor is not threaded,
// the closure is queued on the current exec-ctx instead. May grow the thread
// pool (up to max_threads_) when queues look congested.
void GrpcExecutor::Enqueue(grpc_closure* closure, grpc_error* error,
                           bool is_short) {
  bool retry_push;
  if (is_short) {
    GRPC_STATS_INC_EXECUTOR_SCHEDULED_SHORT_ITEMS();
  } else {
    GRPC_STATS_INC_EXECUTOR_SCHEDULED_LONG_ITEMS();
  }

  do {
    retry_push = false;
    size_t cur_thread_count =
        static_cast<size_t>(gpr_atm_no_barrier_load(&num_threads_));

    // If the number of threads is zero(i.e either the executor is not threaded
    // or already shutdown), then queue the closure on the exec context itself
    if (cur_thread_count == 0) {
#ifndef NDEBUG
      EXECUTOR_TRACE("schedule %p (created %s:%d) inline", closure,
                     closure->file_created, closure->line_created);
#else
      EXECUTOR_TRACE("schedule %p inline", closure);
#endif
      grpc_closure_list_append(grpc_core::ExecCtx::Get()->closure_list(),
                               closure, error);
      return;
    }

    // Prefer the current thread's own slot when called from an executor
    // thread; otherwise pick a slot by hashing the exec-ctx pointer.
    ThreadState* ts = (ThreadState*)gpr_tls_get(&g_this_thread_state);
    if (ts == nullptr) {
      ts = &thd_state_[GPR_HASH_POINTER(grpc_core::ExecCtx::Get(),
                                        cur_thread_count)];
    } else {
      GRPC_STATS_INC_EXECUTOR_SCHEDULED_TO_SELF();
    }

    // Remember the starting slot so we can tell when the scan has wrapped.
    ThreadState* orig_ts = ts;

    bool try_new_thread = false;
    for (;;) {
#ifndef NDEBUG
      EXECUTOR_TRACE(
          "try to schedule %p (%s) (created %s:%d) to thread "
          "%" PRIdPTR,
          closure, is_short ? "short" : "long", closure->file_created,
          closure->line_created, ts->id);
#else
      EXECUTOR_TRACE("try to schedule %p (%s) to thread %" PRIdPTR, closure,
                     is_short ? "short" : "long", ts->id);
#endif

      gpr_mu_lock(&ts->mu);
      if (ts->queued_long_job) {
        // if there's a long job queued, we never queue anything else to this
        // queue (since long jobs can take 'infinite' time and we need to
        // guarantee no starvation). Spin through queues and try again
        gpr_mu_unlock(&ts->mu);
        size_t idx = ts->id;
        ts = &thd_state_[(idx + 1) % cur_thread_count];
        if (ts == orig_ts) {
          // We cycled through all the threads. Retry enqueue again (by creating
          // a new thread)
          retry_push = true;
          // TODO (sreek): What if the executor is shutdown OR if
          // cur_thread_count is already equal to max_threads ? (currently - as
          // of July 2018, we do not run in to this issue because there is only
          // one instance of long job in gRPC. This has to be fixed soon)
          try_new_thread = true;
          break;
        }

        continue;
      }

      // == Found the thread state (i.e thread) to enqueue this closure! ==

      // Also, if this thread has been waiting for closures, wake it up.
      // - If grpc_closure_list_empty() is true and the Executor is not
      //   shutdown, it means that the thread must be waiting in ThreadMain()
      // - Note that gpr_cv_signal() won't immediately wakeup the thread. That
      //   happens after we release the mutex &ts->mu a few lines below
      if (grpc_closure_list_empty(ts->elems) && !ts->shutdown) {
        GRPC_STATS_INC_EXECUTOR_WAKEUP_INITIATED();
        gpr_cv_signal(&ts->cv);
      }

      grpc_closure_list_append(&ts->elems, closure, error);

      // If we already queued more than MAX_DEPTH number of closures on this
      // thread, use this as a hint to create more threads
      ts->depth++;
      try_new_thread = ts->depth > MAX_DEPTH &&
                       cur_thread_count < max_threads_ && !ts->shutdown;

      // Mark the slot as holding a long job so later enqueues skip it.
      ts->queued_long_job = !is_short;

      gpr_mu_unlock(&ts->mu);
      break;
    }

    // Spawn a new worker if hinted above; trylock keeps this best-effort and
    // non-blocking (someone else adding a thread is good enough).
    if (try_new_thread && gpr_spinlock_trylock(&adding_thread_lock_)) {
      cur_thread_count =
          static_cast<size_t>(gpr_atm_no_barrier_load(&num_threads_));
      if (cur_thread_count < max_threads_) {
        // Increment num_threads (Safe to do a no_barrier_store instead of a
        // cas because we always increment num_threads under the
        // 'adding_thread_lock')
        gpr_atm_no_barrier_store(&num_threads_, cur_thread_count + 1);

        thd_state_[cur_thread_count].thd = grpc_core::Thread(
            name_, &GrpcExecutor::ThreadMain, &thd_state_[cur_thread_count]);
        thd_state_[cur_thread_count].thd.Start();
      }
      gpr_spinlock_unlock(&adding_thread_lock_);
    }

    if (retry_push) {
      GRPC_STATS_INC_EXECUTOR_PUSH_RETRIES();
    }
  } while (retry_push);
}
Craig Tiller91031da2016-12-28 15:44:25 -0800300
Sree Kuchibhotla8aefdd32018-07-11 17:44:08 -0700301static GrpcExecutor* global_executor;
Sree Kuchibhotla7e9d5252018-07-09 14:53:54 -0700302
303void enqueue_long(grpc_closure* closure, grpc_error* error) {
Sree Kuchibhotla8aefdd32018-07-11 17:44:08 -0700304 global_executor->Enqueue(closure, error, false /* is_short */);
Craig Tiller7a82afd2017-07-18 09:40:40 -0700305}
306
Sree Kuchibhotla7e9d5252018-07-09 14:53:54 -0700307void enqueue_short(grpc_closure* closure, grpc_error* error) {
Sree Kuchibhotla8aefdd32018-07-11 17:44:08 -0700308 global_executor->Enqueue(closure, error, true /* is_short */);
Craig Tiller7a82afd2017-07-18 09:40:40 -0700309}
310
// Short-Job executor scheduler: both run and sched entries enqueue as short.
static const grpc_closure_scheduler_vtable global_executor_vtable_short = {
    enqueue_short, enqueue_short, "executor-short"};
static grpc_closure_scheduler global_scheduler_short = {
    &global_executor_vtable_short};

// Long-job executor scheduler: both run and sched entries enqueue as long.
static const grpc_closure_scheduler_vtable global_executor_vtable_long = {
    enqueue_long, enqueue_long, "executor-long"};
static grpc_closure_scheduler global_scheduler_long = {
    &global_executor_vtable_long};
322
Sree Kuchibhotla69e5dff2018-07-12 10:58:04 -0700323// grpc_executor_init() and grpc_executor_shutdown() functions are called in the
324// the grpc_init() and grpc_shutdown() code paths which are protected by a
325// global mutex. So it is okay to assume that these functions are thread-safe
Sree Kuchibhotla8aefdd32018-07-11 17:44:08 -0700326void grpc_executor_init() {
Sree Kuchibhotla69e5dff2018-07-12 10:58:04 -0700327 if (global_executor != nullptr) {
328 // grpc_executor_init() already called once (and grpc_executor_shutdown()
329 // wasn't called)
330 return;
331 }
332
Sree Kuchibhotla37e49902018-07-11 18:46:29 -0700333 global_executor = grpc_core::New<GrpcExecutor>("global-executor");
Sree Kuchibhotla8aefdd32018-07-11 17:44:08 -0700334 global_executor->Init();
335}
Sree Kuchibhotla7e9d5252018-07-09 14:53:54 -0700336
Sree Kuchibhotla8aefdd32018-07-11 17:44:08 -0700337void grpc_executor_shutdown() {
Sree Kuchibhotla69e5dff2018-07-12 10:58:04 -0700338 // Shutdown already called
339 if (global_executor == nullptr) {
340 return;
341 }
342
Sree Kuchibhotla8aefdd32018-07-11 17:44:08 -0700343 global_executor->Shutdown();
Sree Kuchibhotla37e49902018-07-11 18:46:29 -0700344 grpc_core::Delete<GrpcExecutor>(global_executor);
Sree Kuchibhotla8aefdd32018-07-11 17:44:08 -0700345 global_executor = nullptr;
346}
Sree Kuchibhotla7e9d5252018-07-09 14:53:54 -0700347
Sree Kuchibhotla8aefdd32018-07-11 17:44:08 -0700348bool grpc_executor_is_threaded() { return global_executor->IsThreaded(); }
Sree Kuchibhotla7e9d5252018-07-09 14:53:54 -0700349
350void grpc_executor_set_threading(bool enable) {
Sree Kuchibhotla8aefdd32018-07-11 17:44:08 -0700351 global_executor->SetThreading(enable);
Sree Kuchibhotla7e9d5252018-07-09 14:53:54 -0700352}
Craig Tiller7a82afd2017-07-18 09:40:40 -0700353
Sree Kuchibhotla1e69b7c2018-07-10 19:30:09 -0700354grpc_closure_scheduler* grpc_executor_scheduler(GrpcExecutorJobType job_type) {
Sree Kuchibhotla83d0bfa2018-07-10 11:29:43 -0700355 return job_type == GRPC_EXECUTOR_SHORT ? &global_scheduler_short
356 : &global_scheduler_long;
Craig Tiller7a82afd2017-07-18 09:40:40 -0700357}