/*
 *
 * Copyright 2017 gRPC authors.
 *
 * Licensed under the Apache License, Version 2.0 (the "License");
 * you may not use this file except in compliance with the License.
 * You may obtain a copy of the License at
 *
 *     http://www.apache.org/licenses/LICENSE-2.0
 *
 * Unless required by applicable law or agreed to in writing, software
 * distributed under the License is distributed on an "AS IS" BASIS,
 * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
 * See the License for the specific language governing permissions and
 * limitations under the License.
 *
 */

#include "src/core/lib/iomgr/call_combiner.h"

#include <grpc/support/log.h>

grpc_tracer_flag grpc_call_combiner_trace =
    GRPC_TRACER_INITIALIZER(false, "call_combiner");

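// cancel_state is a tagged value: if the low bit is set, the remaining bits
// hold a grpc_error* describing the cancellation; otherwise the value is
// either 0 or a grpc_closure* registered via
// grpc_call_combiner_set_notify_on_cancel().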
static grpc_error* decode_cancel_state_error(gpr_atm cancel_state) {
  if (cancel_state & 1) {
    return (grpc_error*)(cancel_state & ~(gpr_atm)1);
  }
  return GRPC_ERROR_NONE;
}

static gpr_atm encode_cancel_state_error(grpc_error* error) {
  return (gpr_atm)1 | (gpr_atm)error;
}

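// Initializes the MPSC queue used to hold closures waiting their turn in
// the combiner.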
void grpc_call_combiner_init(grpc_call_combiner* call_combiner) {
  gpr_mpscq_init(&call_combiner->queue);
}

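// Releases the queue and any cancellation error still stored in
// cancel_state.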
void grpc_call_combiner_destroy(grpc_call_combiner* call_combiner) {
  gpr_mpscq_destroy(&call_combiner->queue);
  GRPC_ERROR_UNREF(decode_cancel_state_error(call_combiner->cancel_state));
}

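// In debug builds, each entry point also receives the caller's file and
// line so that trace output can attribute activity to its call site.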
#ifndef NDEBUG
#define DEBUG_ARGS , const char *file, int line
#define DEBUG_FMT_STR "%s:%d: "
#define DEBUG_FMT_ARGS , file, line
#else
#define DEBUG_ARGS
#define DEBUG_FMT_STR
#define DEBUG_FMT_ARGS
#endif

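// Adds a closure to the combiner.  If the combiner was idle (size was 0),
// the closure is scheduled immediately; otherwise it is pushed onto the
// queue and will run when a prior holder calls grpc_call_combiner_stop().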
void grpc_call_combiner_start(grpc_exec_ctx* exec_ctx,
                              grpc_call_combiner* call_combiner,
                              grpc_closure* closure,
                              grpc_error* error DEBUG_ARGS,
                              const char* reason) {
  if (GRPC_TRACER_ON(grpc_call_combiner_trace)) {
    gpr_log(GPR_DEBUG,
            "==> grpc_call_combiner_start() [%p] closure=%p [" DEBUG_FMT_STR
            "%s] error=%s",
            call_combiner, closure DEBUG_FMT_ARGS, reason,
            grpc_error_string(error));
  }
  size_t prev_size =
      (size_t)gpr_atm_full_fetch_add(&call_combiner->size, (gpr_atm)1);
  if (GRPC_TRACER_ON(grpc_call_combiner_trace)) {
    gpr_log(GPR_DEBUG, " size: %" PRIdPTR " -> %" PRIdPTR, prev_size,
            prev_size + 1);
  }
  if (prev_size == 0) {
    if (GRPC_TRACER_ON(grpc_call_combiner_trace)) {
      gpr_log(GPR_DEBUG, " EXECUTING IMMEDIATELY");
    }
    // Queue was empty, so execute this closure immediately.
    GRPC_CLOSURE_SCHED(exec_ctx, closure, error);
  } else {
    if (GRPC_TRACER_ON(grpc_call_combiner_trace)) {
      gpr_log(GPR_DEBUG, " QUEUING");
    }
    // Queue was not empty, so add closure to queue.
    closure->error_data.error = error;
    gpr_mpscq_push(&call_combiner->queue, (gpr_mpscq_node*)closure);
  }
}

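// Signals that the current closure is done with the combiner.  If other
// closures are waiting (size is still > 0 after the decrement), the next
// one is popped from the queue and scheduled; the pop is retried until a
// closure is returned, since the queue can transiently appear empty while
// a concurrent grpc_call_combiner_start() is still linking its node.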
void grpc_call_combiner_stop(grpc_exec_ctx* exec_ctx,
                             grpc_call_combiner* call_combiner DEBUG_ARGS,
                             const char* reason) {
  if (GRPC_TRACER_ON(grpc_call_combiner_trace)) {
    gpr_log(GPR_DEBUG,
            "==> grpc_call_combiner_stop() [%p] [" DEBUG_FMT_STR "%s]",
            call_combiner DEBUG_FMT_ARGS, reason);
  }
  size_t prev_size =
      (size_t)gpr_atm_full_fetch_add(&call_combiner->size, (gpr_atm)-1);
  if (GRPC_TRACER_ON(grpc_call_combiner_trace)) {
    gpr_log(GPR_DEBUG, " size: %" PRIdPTR " -> %" PRIdPTR, prev_size,
            prev_size - 1);
  }
  GPR_ASSERT(prev_size >= 1);
  if (prev_size > 1) {
    while (true) {
      if (GRPC_TRACER_ON(grpc_call_combiner_trace)) {
        gpr_log(GPR_DEBUG, " checking queue");
      }
      bool empty;
      grpc_closure* closure = (grpc_closure*)gpr_mpscq_pop_and_check_end(
          &call_combiner->queue, &empty);
      if (closure == NULL) {
        // This can happen either due to a race condition within the mpscq
        // code or because of a race with grpc_call_combiner_start().
        if (GRPC_TRACER_ON(grpc_call_combiner_trace)) {
          gpr_log(GPR_DEBUG, " queue returned no result; checking again");
        }
        continue;
      }
      if (GRPC_TRACER_ON(grpc_call_combiner_trace)) {
        gpr_log(GPR_DEBUG, " EXECUTING FROM QUEUE: closure=%p error=%s",
                closure, grpc_error_string(closure->error_data.error));
      }
      GRPC_CLOSURE_SCHED(exec_ctx, closure, closure->error_data.error);
      break;
    }
  } else if (GRPC_TRACER_ON(grpc_call_combiner_trace)) {
    gpr_log(GPR_DEBUG, " queue empty");
  }
}

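// Registers (or replaces) the closure to be notified of cancellation.  If a
// cancellation error has already been recorded in cancel_state, the closure
// is scheduled immediately with a ref to that error; otherwise the closure
// is stored via CAS, and any previously registered closure is scheduled
// with GRPC_ERROR_NONE so its owner can release associated resources.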
void grpc_call_combiner_set_notify_on_cancel(grpc_exec_ctx* exec_ctx,
                                             grpc_call_combiner* call_combiner,
                                             grpc_closure* closure) {
  while (true) {
    // Decode original state.
    gpr_atm original_state = gpr_atm_acq_load(&call_combiner->cancel_state);
    grpc_error* original_error = decode_cancel_state_error(original_state);
    // If error is set, invoke the cancellation closure immediately.
    // Otherwise, store the new closure.
    if (original_error != GRPC_ERROR_NONE) {
      if (GRPC_TRACER_ON(grpc_call_combiner_trace)) {
        gpr_log(GPR_DEBUG,
                "call_combiner=%p: scheduling notify_on_cancel callback=%p "
                "for pre-existing cancellation",
                call_combiner, closure);
      }
      GRPC_CLOSURE_SCHED(exec_ctx, closure, GRPC_ERROR_REF(original_error));
      break;
    } else {
      if (gpr_atm_full_cas(&call_combiner->cancel_state, original_state,
                           (gpr_atm)closure)) {
        if (GRPC_TRACER_ON(grpc_call_combiner_trace)) {
          gpr_log(GPR_DEBUG, "call_combiner=%p: setting notify_on_cancel=%p",
                  call_combiner, closure);
        }
        // If we replaced an earlier closure, invoke the original
        // closure with GRPC_ERROR_NONE. This allows callers to clean
        // up any resources they may be holding for the callback.
        if (original_state != 0) {
          closure = (grpc_closure*)original_state;
          if (GRPC_TRACER_ON(grpc_call_combiner_trace)) {
            gpr_log(GPR_DEBUG,
                    "call_combiner=%p: scheduling old cancel callback=%p",
                    call_combiner, closure);
          }
          GRPC_CLOSURE_SCHED(exec_ctx, closure, GRPC_ERROR_NONE);
        }
        break;
      }
    }
    // cas failed, try again.
  }
}

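// Records a cancellation error in cancel_state (taking ownership of the
// error).  The first caller wins: if an error is already stored, the new
// one is unreffed and ignored.  If a notify_on_cancel closure was
// registered, it is scheduled with a ref to the error.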
void grpc_call_combiner_cancel(grpc_exec_ctx* exec_ctx,
                               grpc_call_combiner* call_combiner,
                               grpc_error* error) {
  while (true) {
    gpr_atm original_state = gpr_atm_acq_load(&call_combiner->cancel_state);
    grpc_error* original_error = decode_cancel_state_error(original_state);
    if (original_error != GRPC_ERROR_NONE) {
      GRPC_ERROR_UNREF(error);
      break;
    }
    if (gpr_atm_full_cas(&call_combiner->cancel_state, original_state,
                         encode_cancel_state_error(error))) {
      if (original_state != 0) {
        grpc_closure* notify_on_cancel = (grpc_closure*)original_state;
        if (GRPC_TRACER_ON(grpc_call_combiner_trace)) {
          gpr_log(GPR_DEBUG,
                  "call_combiner=%p: scheduling notify_on_cancel callback=%p",
                  call_combiner, notify_on_cancel);
        }
        GRPC_CLOSURE_SCHED(exec_ctx, notify_on_cancel, GRPC_ERROR_REF(error));
      }
      break;
    }
    // cas failed, try again.
  }
}