blob: 513248ca57b105641509d3b5d9c5a3dcc53a6837 [file] [log] [blame]
David Garcia Quintas4bc34632015-10-07 16:12:35 -07001/*
2 *
Craig Tiller6169d5f2016-03-31 07:46:18 -07003 * Copyright 2015, Google Inc.
David Garcia Quintas4bc34632015-10-07 16:12:35 -07004 * All rights reserved.
5 *
6 * Redistribution and use in source and binary forms, with or without
7 * modification, are permitted provided that the following conditions are
8 * met:
9 *
10 * * Redistributions of source code must retain the above copyright
11 * notice, this list of conditions and the following disclaimer.
12 * * Redistributions in binary form must reproduce the above
13 * copyright notice, this list of conditions and the following disclaimer
14 * in the documentation and/or other materials provided with the
15 * distribution.
16 * * Neither the name of Google Inc. nor the names of its
17 * contributors may be used to endorse or promote products derived from
18 * this software without specific prior written permission.
19 *
20 * THIS SOFTWARE IS PROVIDED BY THE COPYRIGHT HOLDERS AND CONTRIBUTORS
21 * "AS IS" AND ANY EXPRESS OR IMPLIED WARRANTIES, INCLUDING, BUT NOT
22 * LIMITED TO, THE IMPLIED WARRANTIES OF MERCHANTABILITY AND FITNESS FOR
23 * A PARTICULAR PURPOSE ARE DISCLAIMED. IN NO EVENT SHALL THE COPYRIGHT
24 * OWNER OR CONTRIBUTORS BE LIABLE FOR ANY DIRECT, INDIRECT, INCIDENTAL,
25 * SPECIAL, EXEMPLARY, OR CONSEQUENTIAL DAMAGES (INCLUDING, BUT NOT
26 * LIMITED TO, PROCUREMENT OF SUBSTITUTE GOODS OR SERVICES; LOSS OF USE,
27 * DATA, OR PROFITS; OR BUSINESS INTERRUPTION) HOWEVER CAUSED AND ON ANY
28 * THEORY OF LIABILITY, WHETHER IN CONTRACT, STRICT LIABILITY, OR TORT
29 * (INCLUDING NEGLIGENCE OR OTHERWISE) ARISING IN ANY WAY OUT OF THE USE
30 * OF THIS SOFTWARE, EVEN IF ADVISED OF THE POSSIBILITY OF SUCH DAMAGE.
31 *
32 */
33
Craig Tiller9533d042016-03-25 17:11:06 -070034#include "src/core/lib/iomgr/executor.h"
David Garcia Quintas4bc34632015-10-07 16:12:35 -070035
36#include <string.h>
37
38#include <grpc/support/alloc.h>
Craig Tiller3e9f98e2017-05-12 13:17:47 -070039#include <grpc/support/cpu.h>
David Garcia Quintas4bc34632015-10-07 16:12:35 -070040#include <grpc/support/log.h>
41#include <grpc/support/sync.h>
42#include <grpc/support/thd.h>
Craig Tiller3e9f98e2017-05-12 13:17:47 -070043#include <grpc/support/tls.h>
44#include <grpc/support/useful.h>
45
Craig Tiller9533d042016-03-25 17:11:06 -070046#include "src/core/lib/iomgr/exec_ctx.h"
Craig Tiller3e9f98e2017-05-12 13:17:47 -070047#include "src/core/lib/support/spinlock.h"
David Garcia Quintas4bc34632015-10-07 16:12:35 -070048
/* Per-thread queue depth above which executor_push tries to start one more
   executor thread (still bounded by g_max_threads). */
#define MAX_DEPTH 32
50
51typedef struct {
David Garcia Quintas4bc34632015-10-07 16:12:35 -070052 gpr_mu mu;
Craig Tiller3e9f98e2017-05-12 13:17:47 -070053 gpr_cv cv;
54 grpc_closure_list elems;
55 size_t depth;
56 bool shutdown;
57 gpr_thd_id id;
58} thread_state;
David Garcia Quintas4bc34632015-10-07 16:12:35 -070059
Craig Tiller3e9f98e2017-05-12 13:17:47 -070060static thread_state *g_thread_state;
61static size_t g_max_threads;
62static gpr_atm g_cur_threads;
63static gpr_spinlock g_adding_thread_lock = GPR_SPINLOCK_STATIC_INITIALIZER;
64
65GPR_TLS_DECL(g_this_thread_state);
66
67static void executor_thread(void *arg);
David Garcia Quintas4bc34632015-10-07 16:12:35 -070068
Craig Tiller3e9f98e2017-05-12 13:17:47 -070069static size_t run_closures(grpc_exec_ctx *exec_ctx, grpc_closure_list list) {
70 size_t n = 0;
David Garcia Quintas4bc34632015-10-07 16:12:35 -070071
Craig Tiller3e9f98e2017-05-12 13:17:47 -070072 grpc_closure *c = list.head;
Craig Tiller061ef742016-12-29 10:54:09 -080073 while (c != NULL) {
74 grpc_closure *next = c->next_data.next;
75 grpc_error *error = c->error_data.error;
Mark D. Roth43f774e2017-04-04 16:35:37 -070076#ifndef NDEBUG
Craig Tillerb9b01ce2017-05-12 13:47:10 -070077 c->scheduled = false;
Mark D. Roth43f774e2017-04-04 16:35:37 -070078#endif
Craig Tiller0b093412017-01-03 09:49:07 -080079 c->cb(exec_ctx, c->cb_arg, error);
Craig Tiller061ef742016-12-29 10:54:09 -080080 GRPC_ERROR_UNREF(error);
81 c = next;
82 }
Craig Tiller3e9f98e2017-05-12 13:17:47 -070083
84 return n;
85}
86
Craig Tiller5e56f002017-05-16 15:02:50 -070087bool grpc_executor_is_threaded() {
88 return gpr_atm_no_barrier_load(&g_cur_threads) > 0;
89}
90
91void grpc_executor_set_threading(grpc_exec_ctx *exec_ctx, bool threading) {
92 gpr_atm cur_threads = gpr_atm_no_barrier_load(&g_cur_threads);
93 if (threading) {
94 if (cur_threads > 0) return;
95 g_max_threads = GPR_MAX(1, 2 * gpr_cpu_num_cores());
96 gpr_atm_no_barrier_store(&g_cur_threads, 1);
97 gpr_tls_init(&g_this_thread_state);
98 g_thread_state = gpr_zalloc(sizeof(thread_state) * g_max_threads);
99 for (size_t i = 0; i < g_max_threads; i++) {
100 gpr_mu_init(&g_thread_state[i].mu);
101 gpr_cv_init(&g_thread_state[i].cv);
102 g_thread_state[i].elems = (grpc_closure_list)GRPC_CLOSURE_LIST_INIT;
103 }
104
105 gpr_thd_options opt = gpr_thd_options_default();
106 gpr_thd_options_set_joinable(&opt);
107 gpr_thd_new(&g_thread_state[0].id, executor_thread, &g_thread_state[0],
108 &opt);
109 } else {
110 if (cur_threads == 0) return;
111 for (size_t i = 0; i < g_max_threads; i++) {
112 gpr_mu_lock(&g_thread_state[i].mu);
113 g_thread_state[i].shutdown = true;
114 gpr_cv_signal(&g_thread_state[i].cv);
115 gpr_mu_unlock(&g_thread_state[i].mu);
116 }
117 for (gpr_atm i = 0; i < g_cur_threads; i++) {
118 gpr_thd_join(g_thread_state[i].id);
119 }
120 gpr_atm_no_barrier_store(&g_cur_threads, 0);
121 for (size_t i = 0; i < g_max_threads; i++) {
122 gpr_mu_destroy(&g_thread_state[i].mu);
123 gpr_cv_destroy(&g_thread_state[i].cv);
124 run_closures(exec_ctx, g_thread_state[i].elems);
125 }
126 gpr_free(g_thread_state);
127 gpr_tls_destroy(&g_this_thread_state);
David Garcia Quintas4bc34632015-10-07 16:12:35 -0700128 }
Craig Tiller5e56f002017-05-16 15:02:50 -0700129}
130
131void grpc_executor_init(grpc_exec_ctx *exec_ctx) {
Craig Tillerb9b01ce2017-05-12 13:47:10 -0700132 gpr_atm_no_barrier_store(&g_cur_threads, 0);
Craig Tiller5e56f002017-05-16 15:02:50 -0700133 grpc_executor_set_threading(exec_ctx, true);
134}
135
136void grpc_executor_shutdown(grpc_exec_ctx *exec_ctx) {
137 grpc_executor_set_threading(exec_ctx, false);
Craig Tiller3e9f98e2017-05-12 13:17:47 -0700138}
139
140static void executor_thread(void *arg) {
141 thread_state *ts = arg;
142 gpr_tls_set(&g_this_thread_state, (intptr_t)ts);
143
Craig Tiller89962082017-05-12 14:30:42 -0700144 grpc_exec_ctx exec_ctx =
145 GRPC_EXEC_CTX_INITIALIZER(0, grpc_never_ready_to_finish, NULL);
146
Craig Tiller3e9f98e2017-05-12 13:17:47 -0700147 size_t subtract_depth = 0;
148 for (;;) {
149 gpr_mu_lock(&ts->mu);
150 ts->depth -= subtract_depth;
151 while (grpc_closure_list_empty(ts->elems) && !ts->shutdown) {
152 gpr_cv_wait(&ts->cv, &ts->mu, gpr_inf_future(GPR_CLOCK_REALTIME));
153 }
154 if (ts->shutdown) {
155 gpr_mu_unlock(&ts->mu);
156 break;
157 }
158 grpc_closure_list exec = ts->elems;
159 ts->elems = (grpc_closure_list)GRPC_CLOSURE_LIST_INIT;
160 gpr_mu_unlock(&ts->mu);
161
Craig Tiller3e9f98e2017-05-12 13:17:47 -0700162 subtract_depth = run_closures(&exec_ctx, exec);
Craig Tiller89962082017-05-12 14:30:42 -0700163 grpc_exec_ctx_flush(&exec_ctx);
Craig Tiller3e9f98e2017-05-12 13:17:47 -0700164 }
Craig Tiller89962082017-05-12 14:30:42 -0700165 grpc_exec_ctx_finish(&exec_ctx);
Craig Tiller3e9f98e2017-05-12 13:17:47 -0700166}
167
168static void executor_push(grpc_exec_ctx *exec_ctx, grpc_closure *closure,
169 grpc_error *error) {
Craig Tiller61f96c12017-05-12 13:36:39 -0700170 size_t cur_thread_count = (size_t)gpr_atm_no_barrier_load(&g_cur_threads);
Craig Tillerb9b01ce2017-05-12 13:47:10 -0700171 if (cur_thread_count == 0) {
172 grpc_closure_list_append(&exec_ctx->closure_list, closure, error);
173 return;
174 }
175 thread_state *ts = (thread_state *)gpr_tls_get(&g_this_thread_state);
Craig Tiller3e9f98e2017-05-12 13:17:47 -0700176 if (ts == NULL) {
Craig Tiller61f96c12017-05-12 13:36:39 -0700177 ts = &g_thread_state[GPR_HASH_POINTER(exec_ctx, cur_thread_count)];
Craig Tiller3e9f98e2017-05-12 13:17:47 -0700178 }
179 gpr_mu_lock(&ts->mu);
Craig Tiller89962082017-05-12 14:30:42 -0700180 if (grpc_closure_list_empty(ts->elems)) {
181 gpr_cv_signal(&ts->cv);
182 }
Craig Tiller3e9f98e2017-05-12 13:17:47 -0700183 grpc_closure_list_append(&ts->elems, closure, error);
184 ts->depth++;
185 bool try_new_thread =
186 ts->depth > MAX_DEPTH && cur_thread_count < g_max_threads;
187 gpr_mu_unlock(&ts->mu);
188 if (try_new_thread && gpr_spinlock_trylock(&g_adding_thread_lock)) {
Craig Tiller61f96c12017-05-12 13:36:39 -0700189 cur_thread_count = (size_t)gpr_atm_no_barrier_load(&g_cur_threads);
Craig Tiller3e9f98e2017-05-12 13:17:47 -0700190 if (cur_thread_count < g_max_threads) {
191 gpr_atm_no_barrier_store(&g_cur_threads, cur_thread_count + 1);
192
193 gpr_thd_options opt = gpr_thd_options_default();
194 gpr_thd_options_set_joinable(&opt);
195 gpr_thd_new(&g_thread_state[cur_thread_count].id, executor_thread,
196 &g_thread_state[cur_thread_count], &opt);
197 }
198 gpr_spinlock_unlock(&g_adding_thread_lock);
199 }
David Garcia Quintas4bc34632015-10-07 16:12:35 -0700200}
Craig Tiller91031da2016-12-28 15:44:25 -0800201
Craig Tiller7c70b6c2017-01-23 07:48:42 -0800202static const grpc_closure_scheduler_vtable executor_vtable = {
203 executor_push, executor_push, "executor"};
Craig Tiller91031da2016-12-28 15:44:25 -0800204static grpc_closure_scheduler executor_scheduler = {&executor_vtable};
205grpc_closure_scheduler *grpc_executor_scheduler = &executor_scheduler;