blob: 2a2544dc1f6fb6324e8e1715447ea07d3328d3c2 [file] [log] [blame]
/*
 *
 * Copyright 2015, Google Inc.
 * All rights reserved.
 *
 * Redistribution and use in source and binary forms, with or without
 * modification, are permitted provided that the following conditions are
 * met:
 *
 *     * Redistributions of source code must retain the above copyright
 * notice, this list of conditions and the following disclaimer.
 *     * Redistributions in binary form must reproduce the above
 * copyright notice, this list of conditions and the following disclaimer
 * in the documentation and/or other materials provided with the
 * distribution.
 *     * Neither the name of Google Inc. nor the names of its
 * contributors may be used to endorse or promote products derived from
 * this software without specific prior written permission.
 *
 * THIS SOFTWARE IS PROVIDED BY THE COPYRIGHT HOLDERS AND CONTRIBUTORS
 * "AS IS" AND ANY EXPRESS OR IMPLIED WARRANTIES, INCLUDING, BUT NOT
 * LIMITED TO, THE IMPLIED WARRANTIES OF MERCHANTABILITY AND FITNESS FOR
 * A PARTICULAR PURPOSE ARE DISCLAIMED. IN NO EVENT SHALL THE COPYRIGHT
 * OWNER OR CONTRIBUTORS BE LIABLE FOR ANY DIRECT, INDIRECT, INCIDENTAL,
 * SPECIAL, EXEMPLARY, OR CONSEQUENTIAL DAMAGES (INCLUDING, BUT NOT
 * LIMITED TO, PROCUREMENT OF SUBSTITUTE GOODS OR SERVICES; LOSS OF USE,
 * DATA, OR PROFITS; OR BUSINESS INTERRUPTION) HOWEVER CAUSED AND ON ANY
 * THEORY OF LIABILITY, WHETHER IN CONTRACT, STRICT LIABILITY, OR TORT
 * (INCLUDING NEGLIGENCE OR OTHERWISE) ARISING IN ANY WAY OUT OF THE USE
 * OF THIS SOFTWARE, EVEN IF ADVISED OF THE POSSIBILITY OF SUCH DAMAGE.
 *
 */
33
Craig Tiller9533d042016-03-25 17:11:06 -070034#include "src/core/lib/iomgr/executor.h"
David Garcia Quintas4bc34632015-10-07 16:12:35 -070035
36#include <string.h>
37
38#include <grpc/support/alloc.h>
Craig Tiller3e9f98e2017-05-12 13:17:47 -070039#include <grpc/support/cpu.h>
David Garcia Quintas4bc34632015-10-07 16:12:35 -070040#include <grpc/support/log.h>
41#include <grpc/support/sync.h>
42#include <grpc/support/thd.h>
Craig Tiller3e9f98e2017-05-12 13:17:47 -070043#include <grpc/support/tls.h>
44#include <grpc/support/useful.h>
45
Craig Tiller9533d042016-03-25 17:11:06 -070046#include "src/core/lib/iomgr/exec_ctx.h"
Craig Tiller3e9f98e2017-05-12 13:17:47 -070047#include "src/core/lib/support/spinlock.h"
David Garcia Quintas4bc34632015-10-07 16:12:35 -070048
/* Per-thread queue depth threshold: once a thread_state's depth exceeds this
 * value, executor_push() attempts to start an additional executor thread
 * (subject to g_max_threads). */
#define MAX_DEPTH 32
50
51typedef struct {
David Garcia Quintas4bc34632015-10-07 16:12:35 -070052 gpr_mu mu;
Craig Tiller3e9f98e2017-05-12 13:17:47 -070053 gpr_cv cv;
54 grpc_closure_list elems;
55 size_t depth;
56 bool shutdown;
57 gpr_thd_id id;
58} thread_state;
David Garcia Quintas4bc34632015-10-07 16:12:35 -070059
Craig Tiller3e9f98e2017-05-12 13:17:47 -070060static thread_state *g_thread_state;
61static size_t g_max_threads;
62static gpr_atm g_cur_threads;
63static gpr_spinlock g_adding_thread_lock = GPR_SPINLOCK_STATIC_INITIALIZER;
64
65GPR_TLS_DECL(g_this_thread_state);
66
67static void executor_thread(void *arg);
David Garcia Quintas4bc34632015-10-07 16:12:35 -070068
69void grpc_executor_init() {
Craig Tiller3e9f98e2017-05-12 13:17:47 -070070 g_max_threads = GPR_MAX(1, 2 * gpr_cpu_num_cores());
71 gpr_atm_no_barrier_store(&g_cur_threads, 1);
72 gpr_tls_init(&g_this_thread_state);
73 g_thread_state = gpr_zalloc(sizeof(thread_state) * g_max_threads);
74 for (size_t i = 0; i < g_max_threads; i++) {
75 gpr_mu_init(&g_thread_state[i].mu);
76 gpr_cv_init(&g_thread_state[i].cv);
77 g_thread_state[i].elems = (grpc_closure_list)GRPC_CLOSURE_LIST_INIT;
78 }
79
80 gpr_thd_options opt = gpr_thd_options_default();
81 gpr_thd_options_set_joinable(&opt);
82 gpr_thd_new(&g_thread_state[0].id, executor_thread, &g_thread_state[0], &opt);
David Garcia Quintas4bc34632015-10-07 16:12:35 -070083}
84
Craig Tiller3e9f98e2017-05-12 13:17:47 -070085static size_t run_closures(grpc_exec_ctx *exec_ctx, grpc_closure_list list) {
86 size_t n = 0;
David Garcia Quintas4bc34632015-10-07 16:12:35 -070087
Craig Tiller3e9f98e2017-05-12 13:17:47 -070088 grpc_closure *c = list.head;
Craig Tiller061ef742016-12-29 10:54:09 -080089 while (c != NULL) {
90 grpc_closure *next = c->next_data.next;
91 grpc_error *error = c->error_data.error;
Mark D. Roth43f774e2017-04-04 16:35:37 -070092#ifndef NDEBUG
Craig Tillerb9b01ce2017-05-12 13:47:10 -070093 c->scheduled = false;
Mark D. Roth43f774e2017-04-04 16:35:37 -070094#endif
Craig Tiller0b093412017-01-03 09:49:07 -080095 c->cb(exec_ctx, c->cb_arg, error);
Craig Tiller061ef742016-12-29 10:54:09 -080096 GRPC_ERROR_UNREF(error);
97 c = next;
98 }
Craig Tiller3e9f98e2017-05-12 13:17:47 -070099
100 return n;
101}
102
103void grpc_executor_shutdown(grpc_exec_ctx *exec_ctx) {
104 for (size_t i = 0; i < g_max_threads; i++) {
105 gpr_mu_lock(&g_thread_state[i].mu);
106 g_thread_state[i].shutdown = true;
107 gpr_cv_signal(&g_thread_state[i].cv);
108 gpr_mu_unlock(&g_thread_state[i].mu);
David Garcia Quintas4bc34632015-10-07 16:12:35 -0700109 }
Craig Tiller3e9f98e2017-05-12 13:17:47 -0700110 for (gpr_atm i = 0; i < g_cur_threads; i++) {
111 gpr_thd_join(g_thread_state[i].id);
112 }
Craig Tillerb9b01ce2017-05-12 13:47:10 -0700113 gpr_atm_no_barrier_store(&g_cur_threads, 0);
Craig Tiller3e9f98e2017-05-12 13:17:47 -0700114 for (size_t i = 0; i < g_max_threads; i++) {
115 gpr_mu_destroy(&g_thread_state[i].mu);
116 gpr_cv_destroy(&g_thread_state[i].cv);
117 run_closures(exec_ctx, g_thread_state[i].elems);
118 }
119 gpr_free(g_thread_state);
120 gpr_tls_destroy(&g_this_thread_state);
121}
122
123static void executor_thread(void *arg) {
124 thread_state *ts = arg;
125 gpr_tls_set(&g_this_thread_state, (intptr_t)ts);
126
Craig Tiller89962082017-05-12 14:30:42 -0700127 grpc_exec_ctx exec_ctx =
128 GRPC_EXEC_CTX_INITIALIZER(0, grpc_never_ready_to_finish, NULL);
129
Craig Tiller3e9f98e2017-05-12 13:17:47 -0700130 size_t subtract_depth = 0;
131 for (;;) {
132 gpr_mu_lock(&ts->mu);
133 ts->depth -= subtract_depth;
134 while (grpc_closure_list_empty(ts->elems) && !ts->shutdown) {
135 gpr_cv_wait(&ts->cv, &ts->mu, gpr_inf_future(GPR_CLOCK_REALTIME));
136 }
137 if (ts->shutdown) {
138 gpr_mu_unlock(&ts->mu);
139 break;
140 }
141 grpc_closure_list exec = ts->elems;
142 ts->elems = (grpc_closure_list)GRPC_CLOSURE_LIST_INIT;
143 gpr_mu_unlock(&ts->mu);
144
Craig Tiller3e9f98e2017-05-12 13:17:47 -0700145 subtract_depth = run_closures(&exec_ctx, exec);
Craig Tiller89962082017-05-12 14:30:42 -0700146 grpc_exec_ctx_flush(&exec_ctx);
Craig Tiller3e9f98e2017-05-12 13:17:47 -0700147 }
Craig Tiller89962082017-05-12 14:30:42 -0700148 grpc_exec_ctx_finish(&exec_ctx);
Craig Tiller3e9f98e2017-05-12 13:17:47 -0700149}
150
151static void executor_push(grpc_exec_ctx *exec_ctx, grpc_closure *closure,
152 grpc_error *error) {
Craig Tiller61f96c12017-05-12 13:36:39 -0700153 size_t cur_thread_count = (size_t)gpr_atm_no_barrier_load(&g_cur_threads);
Craig Tillerb9b01ce2017-05-12 13:47:10 -0700154 if (cur_thread_count == 0) {
155 grpc_closure_list_append(&exec_ctx->closure_list, closure, error);
156 return;
157 }
158 thread_state *ts = (thread_state *)gpr_tls_get(&g_this_thread_state);
Craig Tiller3e9f98e2017-05-12 13:17:47 -0700159 if (ts == NULL) {
Craig Tiller61f96c12017-05-12 13:36:39 -0700160 ts = &g_thread_state[GPR_HASH_POINTER(exec_ctx, cur_thread_count)];
Craig Tiller3e9f98e2017-05-12 13:17:47 -0700161 }
162 gpr_mu_lock(&ts->mu);
Craig Tiller89962082017-05-12 14:30:42 -0700163 if (grpc_closure_list_empty(ts->elems)) {
164 gpr_cv_signal(&ts->cv);
165 }
Craig Tiller3e9f98e2017-05-12 13:17:47 -0700166 grpc_closure_list_append(&ts->elems, closure, error);
167 ts->depth++;
168 bool try_new_thread =
169 ts->depth > MAX_DEPTH && cur_thread_count < g_max_threads;
170 gpr_mu_unlock(&ts->mu);
171 if (try_new_thread && gpr_spinlock_trylock(&g_adding_thread_lock)) {
Craig Tiller61f96c12017-05-12 13:36:39 -0700172 cur_thread_count = (size_t)gpr_atm_no_barrier_load(&g_cur_threads);
Craig Tiller3e9f98e2017-05-12 13:17:47 -0700173 if (cur_thread_count < g_max_threads) {
174 gpr_atm_no_barrier_store(&g_cur_threads, cur_thread_count + 1);
175
176 gpr_thd_options opt = gpr_thd_options_default();
177 gpr_thd_options_set_joinable(&opt);
178 gpr_thd_new(&g_thread_state[cur_thread_count].id, executor_thread,
179 &g_thread_state[cur_thread_count], &opt);
180 }
181 gpr_spinlock_unlock(&g_adding_thread_lock);
182 }
David Garcia Quintas4bc34632015-10-07 16:12:35 -0700183}
Craig Tiller91031da2016-12-28 15:44:25 -0800184
Craig Tiller7c70b6c2017-01-23 07:48:42 -0800185static const grpc_closure_scheduler_vtable executor_vtable = {
186 executor_push, executor_push, "executor"};
Craig Tiller91031da2016-12-28 15:44:25 -0800187static grpc_closure_scheduler executor_scheduler = {&executor_vtable};
188grpc_closure_scheduler *grpc_executor_scheduler = &executor_scheduler;