/*
 *
 * Copyright 2017 gRPC authors.
 *
 * Licensed under the Apache License, Version 2.0 (the "License");
 * you may not use this file except in compliance with the License.
 * You may obtain a copy of the License at
 *
 *     http://www.apache.org/licenses/LICENSE-2.0
 *
 * Unless required by applicable law or agreed to in writing, software
 * distributed under the License is distributed on an "AS IS" BASIS,
 * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
 * See the License for the specific language governing permissions and
 * limitations under the License.
 *
 */
18
#include <grpc/support/port_platform.h>

#include "src/core/lib/iomgr/call_combiner.h"

#include <inttypes.h>

#include <grpc/support/log.h>
#include "src/core/lib/debug/stats.h"
#include "src/core/lib/profiling/timers.h"
// Trace flag gating the verbose gpr_log() output in this file; constructed
// disabled and registered under the name "call_combiner".
grpc_core::TraceFlag grpc_call_combiner_trace(false, "call_combiner");
Mark D. Roth764cf042017-09-01 09:00:06 -070030
31static grpc_error* decode_cancel_state_error(gpr_atm cancel_state) {
32 if (cancel_state & 1) {
Noah Eisenbe82e642018-02-09 09:16:55 -080033 return (grpc_error*)(cancel_state & ~static_cast<gpr_atm>(1));
Mark D. Roth764cf042017-09-01 09:00:06 -070034 }
35 return GRPC_ERROR_NONE;
36}
37
38static gpr_atm encode_cancel_state_error(grpc_error* error) {
Noah Eisenbe82e642018-02-09 09:16:55 -080039 return static_cast<gpr_atm>(1) | (gpr_atm)error;
Mark D. Roth764cf042017-09-01 09:00:06 -070040}
41
// Initializes a call combiner.  Only the MPSC closure queue requires
// explicit setup here; the `size` and `cancel_state` atomics are presumably
// zero-initialized by the caller's allocation — TODO confirm with callers.
void grpc_call_combiner_init(grpc_call_combiner* call_combiner) {
  gpr_mpscq_init(&call_combiner->queue);
}
45
// Destroys a call combiner: tears down the closure queue and drops the ref
// on any cancellation error still encoded in cancel_state (a no-op when no
// cancellation was recorded, since decode returns GRPC_ERROR_NONE).
void grpc_call_combiner_destroy(grpc_call_combiner* call_combiner) {
  gpr_mpscq_destroy(&call_combiner->queue);
  GRPC_ERROR_UNREF(decode_cancel_state_error(call_combiner->cancel_state));
}
50
// In debug builds the combiner entry points take the caller's file/line so
// trace output can attribute each start/stop.  These macros splice the
// extra parameters — and the matching printf format pieces — into or out
// of the signatures and gpr_log() calls below.
#ifndef NDEBUG
#define DEBUG_ARGS , const char *file, int line
#define DEBUG_FMT_STR "%s:%d: "
#define DEBUG_FMT_ARGS , file, line
#else
#define DEBUG_ARGS
#define DEBUG_FMT_STR
#define DEBUG_FMT_ARGS
#endif
60
// Schedules `closure` (with `error`) under the call combiner.  The `size`
// counter is atomically incremented first; if this call moved it from 0 to
// 1 the combiner was idle and the closure is scheduled immediately,
// otherwise the closure is pushed onto the MPSC queue to be drained by a
// later grpc_call_combiner_stop().  Ownership of `error` passes to the
// closure in either branch.
void grpc_call_combiner_start(grpc_call_combiner* call_combiner,
                              grpc_closure* closure,
                              grpc_error* error DEBUG_ARGS,
                              const char* reason) {
  GPR_TIMER_SCOPE("call_combiner_start", 0);
  if (grpc_call_combiner_trace.enabled()) {
    gpr_log(GPR_INFO,
            "==> grpc_call_combiner_start() [%p] closure=%p [" DEBUG_FMT_STR
            "%s] error=%s",
            call_combiner, closure DEBUG_FMT_ARGS, reason,
            grpc_error_string(error));
  }
  // Full-barrier fetch-add: the returned previous value decides whether we
  // own the combiner (prev 0) or must queue behind existing work.
  size_t prev_size = static_cast<size_t>(
      gpr_atm_full_fetch_add(&call_combiner->size, (gpr_atm)1));
  if (grpc_call_combiner_trace.enabled()) {
    gpr_log(GPR_INFO, " size: %" PRIdPTR " -> %" PRIdPTR, prev_size,
            prev_size + 1);
  }
  GRPC_STATS_INC_CALL_COMBINER_LOCKS_SCHEDULED_ITEMS();
  if (prev_size == 0) {
    GRPC_STATS_INC_CALL_COMBINER_LOCKS_INITIATED();

    GPR_TIMER_MARK("call_combiner_initiate", 0);
    if (grpc_call_combiner_trace.enabled()) {
      gpr_log(GPR_INFO, " EXECUTING IMMEDIATELY");
    }
    // Queue was empty, so execute this closure immediately.
    GRPC_CLOSURE_SCHED(closure, error);
  } else {
    if (grpc_call_combiner_trace.enabled()) {
      gpr_log(GPR_INFO, "  QUEUING");
    }
    // Queue was not empty, so add closure to queue.  The error travels with
    // the closure via its error_data field until it is popped in stop().
    closure->error_data.error = error;
    gpr_mpscq_push(&call_combiner->queue,
                   reinterpret_cast<gpr_mpscq_node*>(closure));
  }
}
99
// Releases the call combiner after a closure scheduled via
// grpc_call_combiner_start() has finished.  Atomically decrements `size`;
// if other work is pending (previous size > 1) this call pops exactly one
// queued closure and schedules it, handing the combiner to the next holder.
void grpc_call_combiner_stop(grpc_call_combiner* call_combiner DEBUG_ARGS,
                             const char* reason) {
  GPR_TIMER_SCOPE("call_combiner_stop", 0);
  if (grpc_call_combiner_trace.enabled()) {
    gpr_log(GPR_INFO,
            "==> grpc_call_combiner_stop() [%p] [" DEBUG_FMT_STR "%s]",
            call_combiner DEBUG_FMT_ARGS, reason);
  }
  size_t prev_size = static_cast<size_t>(
      gpr_atm_full_fetch_add(&call_combiner->size, (gpr_atm)-1));
  if (grpc_call_combiner_trace.enabled()) {
    gpr_log(GPR_INFO, " size: %" PRIdPTR " -> %" PRIdPTR, prev_size,
            prev_size - 1);
  }
  // stop() without a matching start() is a caller bug.
  GPR_ASSERT(prev_size >= 1);
  if (prev_size > 1) {
    while (true) {
      if (grpc_call_combiner_trace.enabled()) {
        gpr_log(GPR_INFO, " checking queue");
      }
      bool empty;
      grpc_closure* closure = reinterpret_cast<grpc_closure*>(
          gpr_mpscq_pop_and_check_end(&call_combiner->queue, &empty));
      if (closure == nullptr) {
        // This can happen either due to a race condition within the mpscq
        // code or because of a race with grpc_call_combiner_start(); the
        // size counter guarantees an item will arrive, so spin until the
        // pop succeeds.
        if (grpc_call_combiner_trace.enabled()) {
          gpr_log(GPR_INFO, " queue returned no result; checking again");
        }
        continue;
      }
      if (grpc_call_combiner_trace.enabled()) {
        gpr_log(GPR_INFO, " EXECUTING FROM QUEUE: closure=%p error=%s",
                closure, grpc_error_string(closure->error_data.error));
      }
      // Schedule with the error stashed by start(); ownership of the error
      // passes along with the closure.
      GRPC_CLOSURE_SCHED(closure, closure->error_data.error);
      break;
    }
  } else if (grpc_call_combiner_trace.enabled()) {
    gpr_log(GPR_INFO, " queue empty");
  }
}
142
// Registers `closure` to be notified when the call combiner is cancelled.
// cancel_state is a tagged word: 0 = nothing registered, an untagged value
// is a previously-registered grpc_closure*, and a tag-bit value is a
// cancellation error (see encode/decode_cancel_state_error).  Runs a CAS
// loop to install the closure; races with grpc_call_combiner_cancel() are
// resolved by retrying.
void grpc_call_combiner_set_notify_on_cancel(grpc_call_combiner* call_combiner,
                                             grpc_closure* closure) {
  GRPC_STATS_INC_CALL_COMBINER_SET_NOTIFY_ON_CANCEL();
  while (true) {
    // Decode original state.
    gpr_atm original_state = gpr_atm_acq_load(&call_combiner->cancel_state);
    grpc_error* original_error = decode_cancel_state_error(original_state);
    // If error is set, invoke the cancellation closure immediately.
    // Otherwise, store the new closure.
    if (original_error != GRPC_ERROR_NONE) {
      if (grpc_call_combiner_trace.enabled()) {
        gpr_log(GPR_INFO,
                "call_combiner=%p: scheduling notify_on_cancel callback=%p "
                "for pre-existing cancellation",
                call_combiner, closure);
      }
      // The closure gets its own ref on the error; cancel_state keeps the
      // original ref for grpc_call_combiner_destroy().
      GRPC_CLOSURE_SCHED(closure, GRPC_ERROR_REF(original_error));
      break;
    } else {
      if (gpr_atm_full_cas(&call_combiner->cancel_state, original_state,
                           (gpr_atm)closure)) {
        if (grpc_call_combiner_trace.enabled()) {
          gpr_log(GPR_INFO, "call_combiner=%p: setting notify_on_cancel=%p",
                  call_combiner, closure);
        }
        // If we replaced an earlier closure, invoke the original
        // closure with GRPC_ERROR_NONE. This allows callers to clean
        // up any resources they may be holding for the callback.
        if (original_state != 0) {
          closure = (grpc_closure*)original_state;
          if (grpc_call_combiner_trace.enabled()) {
            gpr_log(GPR_INFO,
                    "call_combiner=%p: scheduling old cancel callback=%p",
                    call_combiner, closure);
          }
          GRPC_CLOSURE_SCHED(closure, GRPC_ERROR_NONE);
        }
        break;
      }
    }
    // cas failed, try again.
  }
}
186
// Records a cancellation error on the call combiner.  Takes ownership of
// `error`.  If a cancellation was already recorded the new error is
// unreffed and dropped; otherwise a CAS installs the tagged error into
// cancel_state and, if a notify_on_cancel closure was registered, schedules
// it with its own ref on the error.
void grpc_call_combiner_cancel(grpc_call_combiner* call_combiner,
                               grpc_error* error) {
  GRPC_STATS_INC_CALL_COMBINER_CANCELLED();
  while (true) {
    gpr_atm original_state = gpr_atm_acq_load(&call_combiner->cancel_state);
    grpc_error* original_error = decode_cancel_state_error(original_state);
    if (original_error != GRPC_ERROR_NONE) {
      // First cancellation wins; drop our ref on the losing error.
      GRPC_ERROR_UNREF(error);
      break;
    }
    if (gpr_atm_full_cas(&call_combiner->cancel_state, original_state,
                         encode_cancel_state_error(error))) {
      // A nonzero untagged prior state is a registered notification closure
      // (see grpc_call_combiner_set_notify_on_cancel()).
      if (original_state != 0) {
        grpc_closure* notify_on_cancel = (grpc_closure*)original_state;
        if (grpc_call_combiner_trace.enabled()) {
          gpr_log(GPR_INFO,
                  "call_combiner=%p: scheduling notify_on_cancel callback=%p",
                  call_combiner, notify_on_cancel);
        }
        // Extra ref for the closure; cancel_state retains the original ref
        // until grpc_call_combiner_destroy().
        GRPC_CLOSURE_SCHED(notify_on_cancel, GRPC_ERROR_REF(error));
      }
      break;
    }
    // cas failed, try again.
  }
}