blob: 9dff1c4d636c75658340a7cd0a4c7500fcdbeaab [file] [log] [blame]
Mark D. Roth14c072c2016-08-26 08:31:34 -07001//
2// Copyright 2016, Google Inc.
3// All rights reserved.
4//
5// Redistribution and use in source and binary forms, with or without
6// modification, are permitted provided that the following conditions are
7// met:
8//
9// * Redistributions of source code must retain the above copyright
10// notice, this list of conditions and the following disclaimer.
11// * Redistributions in binary form must reproduce the above
12// copyright notice, this list of conditions and the following disclaimer
13// in the documentation and/or other materials provided with the
14// distribution.
15// * Neither the name of Google Inc. nor the names of its
16// contributors may be used to endorse or promote products derived from
17// this software without specific prior written permission.
18//
19// THIS SOFTWARE IS PROVIDED BY THE COPYRIGHT HOLDERS AND CONTRIBUTORS
20// "AS IS" AND ANY EXPRESS OR IMPLIED WARRANTIES, INCLUDING, BUT NOT
21// LIMITED TO, THE IMPLIED WARRANTIES OF MERCHANTABILITY AND FITNESS FOR
22// A PARTICULAR PURPOSE ARE DISCLAIMED. IN NO EVENT SHALL THE COPYRIGHT
23// OWNER OR CONTRIBUTORS BE LIABLE FOR ANY DIRECT, INDIRECT, INCIDENTAL,
24// SPECIAL, EXEMPLARY, OR CONSEQUENTIAL DAMAGES (INCLUDING, BUT NOT
25// LIMITED TO, PROCUREMENT OF SUBSTITUTE GOODS OR SERVICES; LOSS OF USE,
26// DATA, OR PROFITS; OR BUSINESS INTERRUPTION) HOWEVER CAUSED AND ON ANY
27// THEORY OF LIABILITY, WHETHER IN CONTRACT, STRICT LIABILITY, OR TORT
28// (INCLUDING NEGLIGENCE OR OTHERWISE) ARISING IN ANY WAY OUT OF THE USE
29// OF THIS SOFTWARE, EVEN IF ADVISED OF THE POSSIBILITY OF SUCH DAMAGE.
30//
31
32#include "src/core/lib/channel/deadline_filter.h"
33
34#include <stdbool.h>
35#include <string.h>
36
37#include <grpc/support/log.h>
Mark D. Roth1bbe6cb2016-08-31 08:33:37 -070038#include <grpc/support/sync.h>
Mark D. Roth14c072c2016-08-26 08:31:34 -070039#include <grpc/support/time.h>
40
41#include "src/core/lib/iomgr/timer.h"
42
// Channel data shared by the client and server filters.
// The filter keeps no per-channel state. ISO C does not permit a struct
// with no members (that is a compiler extension), so a single placeholder
// member is declared to keep the type portable. Nothing in this file reads
// or writes it.
typedef struct channel_data {
  char unused;  // Placeholder only; never accessed.
} channel_data;
46
47// Call data used for both client and server filter.
48typedef struct base_call_data {
Mark D. Rothd2b45332016-08-26 11:18:00 -070049 // We take a reference to the call stack for the timer callback.
Mark D. Roth14c072c2016-08-26 08:31:34 -070050 grpc_call_stack* call_stack;
Mark D. Roth1bbe6cb2016-08-31 08:33:37 -070051 // Guards access to timer_pending and timer.
52 gpr_mu timer_mu;
Mark D. Rothd2b45332016-08-26 11:18:00 -070053 // True if the timer callback is currently pending.
Mark D. Roth14c072c2016-08-26 08:31:34 -070054 bool timer_pending;
Mark D. Rothd2b45332016-08-26 11:18:00 -070055 // The deadline timer.
Mark D. Roth14c072c2016-08-26 08:31:34 -070056 grpc_timer timer;
Mark D. Rothd2b45332016-08-26 11:18:00 -070057 // Closure to invoke when the call is complete.
58 // We use this to cancel the timer.
59 grpc_closure on_complete;
60 // The original on_complete closure, which we chain to after our own
61 // closure is invoked.
62 grpc_closure* next_on_complete;
Mark D. Roth14c072c2016-08-26 08:31:34 -070063} base_call_data;
64
65// Additional call data used only for the server filter.
66typedef struct server_call_data {
67 base_call_data base; // Must be first.
68 // The closure for receiving initial metadata.
69 grpc_closure recv_initial_metadata_ready;
70 // Received initial metadata batch.
71 grpc_metadata_batch* recv_initial_metadata;
72 // The original recv_initial_metadata_ready closure, which we chain to
73 // after our own closure is invoked.
74 grpc_closure* next_recv_initial_metadata_ready;
75} server_call_data;
76
77// Constructor for channel_data. Used for both client and server filters.
78static void init_channel_elem(grpc_exec_ctx* exec_ctx,
79 grpc_channel_element* elem,
80 grpc_channel_element_args* args) {
81 GPR_ASSERT(!args->is_last);
82}
83
84// Destructor for channel_data. Used for both client and server filters.
85static void destroy_channel_elem(grpc_exec_ctx* exec_ctx,
86 grpc_channel_element* elem) {
87}
88
89// Constructor for call_data. Used for both client and server filters.
90static grpc_error *init_call_elem(grpc_exec_ctx* exec_ctx,
91 grpc_call_element* elem,
92 grpc_call_element_args* args) {
93 base_call_data* calld = elem->call_data;
94 // Note: size of call data is different between client and server.
95 memset(calld, 0, elem->filter->sizeof_call_data);
96 calld->call_stack = args->call_stack;
Mark D. Roth1bbe6cb2016-08-31 08:33:37 -070097 gpr_mu_init(&calld->timer_mu);
Mark D. Roth14c072c2016-08-26 08:31:34 -070098 return GRPC_ERROR_NONE;
99}
100
101// Destructor for call_data. Used for both client and server filters.
102static void destroy_call_elem(grpc_exec_ctx* exec_ctx, grpc_call_element* elem,
103 const grpc_call_final_info* final_info,
104 void* and_free_memory) {
Mark D. Roth1bbe6cb2016-08-31 08:33:37 -0700105 base_call_data* calld = elem->call_data;
106 gpr_mu_destroy(&calld->timer_mu);
Mark D. Roth14c072c2016-08-26 08:31:34 -0700107}
108
109// Timer callback.
110static void timer_callback(grpc_exec_ctx *exec_ctx, void *arg,
111 grpc_error *error) {
112 grpc_call_element* elem = arg;
113 base_call_data* calld = elem->call_data;
Mark D. Roth1bbe6cb2016-08-31 08:33:37 -0700114 gpr_mu_lock(&calld->timer_mu);
Mark D. Roth14c072c2016-08-26 08:31:34 -0700115 calld->timer_pending = false;
Mark D. Roth1bbe6cb2016-08-31 08:33:37 -0700116 gpr_mu_unlock(&calld->timer_mu);
Mark D. Roth14c072c2016-08-26 08:31:34 -0700117 if (error != GRPC_ERROR_CANCELLED) {
Mark D. Roth1bbe6cb2016-08-31 08:33:37 -0700118 grpc_call_element_send_cancel(exec_ctx, elem);
Mark D. Roth14c072c2016-08-26 08:31:34 -0700119 }
Mark D. Roth1bbe6cb2016-08-31 08:33:37 -0700120 GRPC_CALL_STACK_UNREF(exec_ctx, calld->call_stack, "deadline_timer");
Mark D. Roth14c072c2016-08-26 08:31:34 -0700121}
122
123// Starts the deadline timer.
124static void start_timer_if_needed(grpc_exec_ctx *exec_ctx,
125 grpc_call_element* elem,
126 gpr_timespec deadline) {
127 base_call_data* calld = elem->call_data;
128 deadline = gpr_convert_clock_type(deadline, GPR_CLOCK_MONOTONIC);
129 if (gpr_time_cmp(deadline, gpr_inf_future(GPR_CLOCK_MONOTONIC)) != 0) {
130 // Take a reference to the call stack, to be owned by the timer.
Mark D. Roth1bbe6cb2016-08-31 08:33:37 -0700131 GRPC_CALL_STACK_REF(calld->call_stack, "deadline_timer");
132 gpr_mu_lock(&calld->timer_mu);
133 calld->timer_pending = true;
Mark D. Roth14c072c2016-08-26 08:31:34 -0700134 grpc_timer_init(exec_ctx, &calld->timer, deadline, timer_callback, elem,
135 gpr_now(GPR_CLOCK_MONOTONIC));
Mark D. Roth1bbe6cb2016-08-31 08:33:37 -0700136 gpr_mu_unlock(&calld->timer_mu);
Mark D. Roth14c072c2016-08-26 08:31:34 -0700137 }
138}
139
Mark D. Roth1bbe6cb2016-08-31 08:33:37 -0700140// Cancels the deadline timer.
141static void cancel_timer_if_needed(grpc_exec_ctx* exec_ctx,
142 base_call_data* calld) {
143 gpr_mu_lock(&calld->timer_mu);
144 if (calld->timer_pending) {
145 grpc_timer_cancel(exec_ctx, &calld->timer);
146 calld->timer_pending = false;
147 }
148 gpr_mu_unlock(&calld->timer_mu);
149}
150
Mark D. Rothd2b45332016-08-26 11:18:00 -0700151// Callback run when the call is complete.
152static void on_complete(grpc_exec_ctx *exec_ctx, void *arg, grpc_error *error) {
153 base_call_data* calld = arg;
Mark D. Roth1bbe6cb2016-08-31 08:33:37 -0700154 cancel_timer_if_needed(exec_ctx, calld);
Mark D. Rothd2b45332016-08-26 11:18:00 -0700155 // Invoke the next callback.
156 calld->next_on_complete->cb(exec_ctx, calld->next_on_complete->cb_arg, error);
157}
158
Mark D. Roth14c072c2016-08-26 08:31:34 -0700159// Method for starting a call op for client filter.
160static void client_start_transport_stream_op(grpc_exec_ctx* exec_ctx,
161 grpc_call_element* elem,
162 grpc_transport_stream_op* op) {
Mark D. Rothd2b45332016-08-26 11:18:00 -0700163 base_call_data* calld = elem->call_data;
Mark D. Roth1bbe6cb2016-08-31 08:33:37 -0700164 // If the call is cancelled or closed, cancel the timer.
165 if (op->cancel_error != GRPC_ERROR_NONE ||
166 op->close_error != GRPC_ERROR_NONE) {
167 cancel_timer_if_needed(exec_ctx, calld);
168 } else {
169 // If we're sending initial metadata, get the deadline from the metadata
170 // and start the timer if needed.
171 if (op->send_initial_metadata != NULL) {
172 start_timer_if_needed(exec_ctx, elem,
173 op->send_initial_metadata->deadline);
174 }
175 // Make sure we know when the call is complete, so that we can cancel
176 // the timer.
177 if (op->recv_trailing_metadata != NULL) {
178 calld->next_on_complete = op->on_complete;
179 grpc_closure_init(&calld->on_complete, on_complete, calld);
180 op->on_complete = &calld->on_complete;
181 }
Mark D. Rothd2b45332016-08-26 11:18:00 -0700182 }
Mark D. Roth14c072c2016-08-26 08:31:34 -0700183 // Chain to next filter.
184 grpc_call_next_op(exec_ctx, elem, op);
185}
186
187// Callback for receiving initial metadata on the server.
188static void recv_initial_metadata_ready(grpc_exec_ctx *exec_ctx, void *arg,
189 grpc_error *error) {
190 grpc_call_element* elem = arg;
191 server_call_data* calld = elem->call_data;
192 // Get deadline from metadata and start the timer if needed.
193 start_timer_if_needed(exec_ctx, elem,
194 calld->recv_initial_metadata->deadline);
195 // Invoke the next callback.
196 calld->next_recv_initial_metadata_ready->cb(
197 exec_ctx, calld->next_recv_initial_metadata_ready->cb_arg, error);
198}
199
200// Method for starting a call op for server filter.
201static void server_start_transport_stream_op(grpc_exec_ctx* exec_ctx,
202 grpc_call_element* elem,
203 grpc_transport_stream_op* op) {
204 server_call_data* calld = elem->call_data;
Mark D. Roth1bbe6cb2016-08-31 08:33:37 -0700205 // If the call is cancelled or closed, cancel the timer.
206 if (op->cancel_error != GRPC_ERROR_NONE ||
207 op->close_error != GRPC_ERROR_NONE) {
208 cancel_timer_if_needed(exec_ctx, &calld->base);
209 } else {
210 // If we're receiving initial metadata, we need to get the deadline
211 // from the recv_initial_metadata_ready callback. So we inject our
212 // own callback into that hook.
213 if (op->recv_initial_metadata_ready != NULL) {
214 calld->next_recv_initial_metadata_ready = op->recv_initial_metadata_ready;
215 calld->recv_initial_metadata = op->recv_initial_metadata;
216 grpc_closure_init(&calld->recv_initial_metadata_ready,
217 recv_initial_metadata_ready, elem);
218 op->recv_initial_metadata_ready = &calld->recv_initial_metadata_ready;
219 }
220 // Make sure we know when the call is complete, so that we can cancel
221 // the timer.
222 // Note that we trigger this on recv_trailing_metadata, even though
223 // the client never sends trailing metadata, because this is the
224 // hook that tells us when the call is complete on the server side.
225 if (op->recv_trailing_metadata != NULL) {
226 calld->base.next_on_complete = op->on_complete;
227 grpc_closure_init(&calld->base.on_complete, on_complete, calld);
228 op->on_complete = &calld->base.on_complete;
229 }
Mark D. Rothd2b45332016-08-26 11:18:00 -0700230 }
Mark D. Roth14c072c2016-08-26 08:31:34 -0700231 // Chain to next filter.
232 grpc_call_next_op(exec_ctx, elem, op);
233}
234
235const grpc_channel_filter grpc_client_deadline_filter = {
236 client_start_transport_stream_op,
237 grpc_channel_next_op,
238 sizeof(base_call_data),
239 init_call_elem,
240 grpc_call_stack_ignore_set_pollset_or_pollset_set,
241 destroy_call_elem,
242 sizeof(channel_data),
243 init_channel_elem,
244 destroy_channel_elem,
245 grpc_call_next_get_peer,
246 "deadline",
247};
248
249const grpc_channel_filter grpc_server_deadline_filter = {
250 server_start_transport_stream_op,
251 grpc_channel_next_op,
252 sizeof(server_call_data),
253 init_call_elem,
254 grpc_call_stack_ignore_set_pollset_or_pollset_set,
255 destroy_call_elem,
256 sizeof(channel_data),
257 init_channel_elem,
258 destroy_channel_elem,
259 grpc_call_next_get_peer,
260 "deadline",
261};