//
// Copyright 2016, Google Inc.
// All rights reserved.
//
// Redistribution and use in source and binary forms, with or without
// modification, are permitted provided that the following conditions are
// met:
//
//     * Redistributions of source code must retain the above copyright
// notice, this list of conditions and the following disclaimer.
//     * Redistributions in binary form must reproduce the above
// copyright notice, this list of conditions and the following disclaimer
// in the documentation and/or other materials provided with the
// distribution.
//     * Neither the name of Google Inc. nor the names of its
// contributors may be used to endorse or promote products derived from
// this software without specific prior written permission.
//
// THIS SOFTWARE IS PROVIDED BY THE COPYRIGHT HOLDERS AND CONTRIBUTORS
// "AS IS" AND ANY EXPRESS OR IMPLIED WARRANTIES, INCLUDING, BUT NOT
// LIMITED TO, THE IMPLIED WARRANTIES OF MERCHANTABILITY AND FITNESS FOR
// A PARTICULAR PURPOSE ARE DISCLAIMED. IN NO EVENT SHALL THE COPYRIGHT
// OWNER OR CONTRIBUTORS BE LIABLE FOR ANY DIRECT, INDIRECT, INCIDENTAL,
// SPECIAL, EXEMPLARY, OR CONSEQUENTIAL DAMAGES (INCLUDING, BUT NOT
// LIMITED TO, PROCUREMENT OF SUBSTITUTE GOODS OR SERVICES; LOSS OF USE,
// DATA, OR PROFITS; OR BUSINESS INTERRUPTION) HOWEVER CAUSED AND ON ANY
// THEORY OF LIABILITY, WHETHER IN CONTRACT, STRICT LIABILITY, OR TORT
// (INCLUDING NEGLIGENCE OR OTHERWISE) ARISING IN ANY WAY OUT OF THE USE
// OF THIS SOFTWARE, EVEN IF ADVISED OF THE POSSIBILITY OF SUCH DAMAGE.
//

#include "src/core/lib/channel/deadline_filter.h"

#include <stdbool.h>
#include <string.h>

#include <grpc/support/log.h>
#include <grpc/support/sync.h>
#include <grpc/support/time.h>

#include "src/core/lib/iomgr/timer.h"

//
// grpc_deadline_state
//

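// Shared deadline machinery: a timer is armed from the call's deadline (on
// the client when initial metadata is sent, on the server when initial
// metadata is received) and cancelled when the call completes or is
// cancelled first. If the timer fires, the call is cancelled with
// DEADLINE_EXCEEDED.
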
// Timer callback.
static void timer_callback(grpc_exec_ctx* exec_ctx, void* arg,
                           grpc_error* error) {
  grpc_call_element* elem = arg;
  grpc_deadline_state* deadline_state = elem->call_data;
  gpr_log(GPR_INFO, "==> %s(), is_client=%d", __func__,
          deadline_state->is_client);
  gpr_mu_lock(&deadline_state->timer_mu);
  deadline_state->timer_pending = false;
  gpr_mu_unlock(&deadline_state->timer_mu);
  // This closure is run with GRPC_ERROR_CANCELLED when the timer was
  // cancelled via grpc_timer_cancel(), so only act if the deadline
  // actually expired.
  if (error != GRPC_ERROR_CANCELLED) {
    gpr_log(GPR_INFO, "DEADLINE_EXCEEDED");
    // FIXME: change grpc_call_element_send_cancel_with_message to not call
    // close
    // grpc_call_element_send_cancel(exec_ctx, elem);
    grpc_transport_stream_op op;
    memset(&op, 0, sizeof(op));
    op.cancel_error = grpc_error_set_int(
        GRPC_ERROR_CREATE("Deadline Exceeded"), GRPC_ERROR_INT_GRPC_STATUS,
        GRPC_STATUS_DEADLINE_EXCEEDED);
    elem->filter->start_transport_stream_op(exec_ctx, elem, &op);
  }
  // Release the ref taken by start_timer_if_needed().
  GRPC_CALL_STACK_UNREF(exec_ctx, deadline_state->call_stack, "deadline_timer");
}

// Starts the deadline timer.
static void start_timer_if_needed(grpc_exec_ctx* exec_ctx,
                                  grpc_call_element* elem,
                                  gpr_timespec deadline) {
  grpc_deadline_state* deadline_state = elem->call_data;
  gpr_log(GPR_INFO, "==> %s(), is_client=%d", __func__,
          deadline_state->is_client);
  deadline = gpr_convert_clock_type(deadline, GPR_CLOCK_MONOTONIC);
  // An infinite deadline means the call may run forever, so no timer is
  // needed.
  if (gpr_time_cmp(deadline, gpr_inf_future(GPR_CLOCK_MONOTONIC)) != 0) {
    // Take a reference to the call stack, to be owned by the timer.
    GRPC_CALL_STACK_REF(deadline_state->call_stack, "deadline_timer");
    gpr_mu_lock(&deadline_state->timer_mu);
    gpr_log(GPR_INFO, "STARTING TIMER -- is_client=%d",
            deadline_state->is_client);
    deadline_state->timer_pending = true;
    grpc_timer_init(exec_ctx, &deadline_state->timer, deadline, timer_callback,
                    elem, gpr_now(GPR_CLOCK_MONOTONIC));
    gpr_mu_unlock(&deadline_state->timer_mu);
  }
}
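
// Example (a sketch, not code used by this filter): callers typically build
// a deadline as "now plus a timeout". A 30-second deadline might look like:
//
//   gpr_timespec deadline = gpr_time_add(
//       gpr_now(GPR_CLOCK_MONOTONIC), gpr_time_from_seconds(30, GPR_TIMESPAN));
//
// Passing gpr_inf_future(GPR_CLOCK_MONOTONIC) instead means "no deadline",
// in which case start_timer_if_needed() is a no-op.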

// Cancels the deadline timer.
static void cancel_timer_if_needed(grpc_exec_ctx* exec_ctx,
                                   grpc_deadline_state* deadline_state) {
  gpr_log(GPR_INFO, "==> %s(), is_client=%d", __func__,
          deadline_state->is_client);
  gpr_mu_lock(&deadline_state->timer_mu);
  if (deadline_state->timer_pending) {
    gpr_log(GPR_INFO, "CANCELLING TIMER -- is_client=%d",
            deadline_state->is_client);
    grpc_timer_cancel(exec_ctx, &deadline_state->timer);
    deadline_state->timer_pending = false;
  }
  gpr_mu_unlock(&deadline_state->timer_mu);
}

// Callback run when the call is complete.
static void on_complete(grpc_exec_ctx* exec_ctx, void* arg,
                        grpc_error* error) {
  grpc_deadline_state* deadline_state = arg;
  gpr_log(GPR_INFO, "==> %s(), is_client=%d, next_on_complete->cb=%p",
          __func__, deadline_state->is_client,
          deadline_state->next_on_complete->cb);
  cancel_timer_if_needed(exec_ctx, deadline_state);
  // Invoke the next callback.
  deadline_state->next_on_complete->cb(
      exec_ctx, deadline_state->next_on_complete->cb_arg, error);
}

// Inject our own on_complete callback into op.
static void inject_on_complete_cb(grpc_deadline_state* deadline_state,
                                  grpc_transport_stream_op* op) {
  gpr_log(GPR_INFO, "==> %s(), is_client=%d", __func__,
          deadline_state->is_client);
  deadline_state->next_on_complete = op->on_complete;
  grpc_closure_init(&deadline_state->on_complete, on_complete, deadline_state);
  op->on_complete = &deadline_state->on_complete;
}
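
// The save-and-substitute pattern above is how a filter observes a batch's
// completion without consuming the event: on_complete() cancels the timer
// and then chains to the saved closure.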

// Initializes deadline_state. The call stack pointer is stored for later
// use when arming the timer.
void grpc_deadline_state_init(grpc_deadline_state* deadline_state,
                              grpc_call_stack* call_stack) {
  gpr_log(GPR_INFO, "==> %s()", __func__);
  memset(deadline_state, 0, sizeof(*deadline_state));
  deadline_state->call_stack = call_stack;
  gpr_mu_init(&deadline_state->timer_mu);
}

// Destroys deadline_state, cancelling the timer if one is pending.
void grpc_deadline_state_destroy(grpc_exec_ctx* exec_ctx,
                                 grpc_deadline_state* deadline_state) {
  gpr_log(GPR_INFO, "==> %s(), is_client=%d", __func__,
          deadline_state->is_client);
  cancel_timer_if_needed(exec_ctx, deadline_state);
  gpr_mu_destroy(&deadline_state->timer_mu);
}

// Called by client-side filters to handle deadline-related aspects of a
// stream op: starts the timer when initial metadata is sent, cancels it on
// cancel/close, and arranges to cancel it when the call completes.
void grpc_deadline_state_client_start_transport_stream_op(
    grpc_exec_ctx* exec_ctx, grpc_call_element* elem,
    grpc_transport_stream_op* op) {
  gpr_log(GPR_INFO,
          "==> %s(): op=%p {on_complete=%p, cancel_error=%p, close_error=%p, "
          "send_initial_metadata=%p, send_trailing_metadata=%p, "
          "send_message=%p, recv_initial_metadata_ready=%p, "
          "recv_trailing_metadata=%p}",
          __func__, op, op->on_complete, op->cancel_error, op->close_error,
          op->send_initial_metadata, op->send_trailing_metadata,
          op->send_message, op->recv_initial_metadata_ready,
          op->recv_trailing_metadata);
  grpc_deadline_state* deadline_state = elem->call_data;
  if (op->cancel_error != GRPC_ERROR_NONE ||
      op->close_error != GRPC_ERROR_NONE) {
    cancel_timer_if_needed(exec_ctx, deadline_state);
  } else {
    // If we're sending initial metadata, get the deadline from the metadata
    // and start the timer if needed.
    if (op->send_initial_metadata != NULL) {
      start_timer_if_needed(exec_ctx, elem,
                            op->send_initial_metadata->deadline);
    }
    // Make sure we know when the call is complete, so that we can cancel
    // the timer.
    if (op->recv_trailing_metadata != NULL) {
      inject_on_complete_cb(deadline_state, op);
    }
  }
}
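
// Usage sketch for a client filter delegating to the helper above (assumes
// call data that embeds grpc_deadline_state as its first field, like
// base_call_data below; my_client_start_op is a hypothetical name):
//
//   static void my_client_start_op(grpc_exec_ctx* exec_ctx,
//                                  grpc_call_element* elem,
//                                  grpc_transport_stream_op* op) {
//     grpc_deadline_state_client_start_transport_stream_op(exec_ctx, elem, op);
//     grpc_call_next_op(exec_ctx, elem, op);
//   }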

//
// filter code
//

// Used for both client and server filters.
typedef struct channel_data {
} channel_data;

// Constructor for channel_data. Used for both client and server filters.
static void init_channel_elem(grpc_exec_ctx* exec_ctx,
                              grpc_channel_element* elem,
                              grpc_channel_element_args* args) {
  GPR_ASSERT(!args->is_last);
}

// Destructor for channel_data. Used for both client and server filters.
static void destroy_channel_elem(grpc_exec_ctx* exec_ctx,
                                 grpc_channel_element* elem) {}

// Call data used for both client and server filter.
typedef struct base_call_data {
  grpc_deadline_state deadline_state;
} base_call_data;

// Additional call data used only for the server filter.
typedef struct server_call_data {
  base_call_data base;  // Must be first.
  // The closure for receiving initial metadata.
  grpc_closure recv_initial_metadata_ready;
  // Received initial metadata batch.
  grpc_metadata_batch* recv_initial_metadata;
  // The original recv_initial_metadata_ready closure, which we chain to
  // after our own closure is invoked.
  grpc_closure* next_recv_initial_metadata_ready;
} server_call_data;

// Constructor for call_data. Used for both client and server filters.
static grpc_error* init_call_elem(grpc_exec_ctx* exec_ctx,
                                  grpc_call_element* elem,
                                  grpc_call_element_args* args) {
  gpr_log(GPR_INFO, "==> %s() -- call_data_size=%lu", __func__,
          (unsigned long)elem->filter->sizeof_call_data);
  base_call_data* calld = elem->call_data;
  // Note: size of call data is different between client and server.
  memset(calld, 0, elem->filter->sizeof_call_data);
  grpc_deadline_state_init(&calld->deadline_state, args->call_stack);
  // Distinguish client from server by call data size: only the server
  // filter uses the larger server_call_data.
  calld->deadline_state.is_client =
      elem->filter->sizeof_call_data == sizeof(base_call_data);
  return GRPC_ERROR_NONE;
}

// Destructor for call_data. Used for both client and server filters.
static void destroy_call_elem(grpc_exec_ctx* exec_ctx, grpc_call_element* elem,
                              const grpc_call_final_info* final_info,
                              void* and_free_memory) {
  base_call_data* calld = elem->call_data;
  grpc_deadline_state_destroy(exec_ctx, &calld->deadline_state);
}

// Method for starting a call op for client filter.
static void client_start_transport_stream_op(grpc_exec_ctx* exec_ctx,
                                             grpc_call_element* elem,
                                             grpc_transport_stream_op* op) {
  gpr_log(GPR_INFO, "==> %s()", __func__);
  grpc_deadline_state_client_start_transport_stream_op(exec_ctx, elem, op);
  // Chain to next filter.
  grpc_call_next_op(exec_ctx, elem, op);
}

// Callback for receiving initial metadata on the server.
static void recv_initial_metadata_ready(grpc_exec_ctx* exec_ctx, void* arg,
                                        grpc_error* error) {
  grpc_call_element* elem = arg;
  server_call_data* calld = elem->call_data;
  // Get deadline from metadata and start the timer if needed.
  start_timer_if_needed(exec_ctx, elem,
                        calld->recv_initial_metadata->deadline);
  // Invoke the next callback.
  calld->next_recv_initial_metadata_ready->cb(
      exec_ctx, calld->next_recv_initial_metadata_ready->cb_arg, error);
}
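
// Note: on the server, the deadline is carried by the grpc-timeout header;
// the transport translates it into the deadline field of the received
// initial metadata batch, which is what is read above.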

// Method for starting a call op for server filter.
static void server_start_transport_stream_op(grpc_exec_ctx* exec_ctx,
                                             grpc_call_element* elem,
                                             grpc_transport_stream_op* op) {
  gpr_log(GPR_INFO,
          "==> %s(): op=%p {on_complete=%p, cancel_error=%p, close_error=%p, "
          "send_initial_metadata=%p, send_trailing_metadata=%p, "
          "send_message=%p, recv_initial_metadata_ready=%p, "
          "recv_trailing_metadata=%p}",
          __func__, op, op->on_complete, op->cancel_error, op->close_error,
          op->send_initial_metadata, op->send_trailing_metadata,
          op->send_message, op->recv_initial_metadata_ready,
          op->recv_trailing_metadata);
  server_call_data* calld = elem->call_data;
  if (op->cancel_error != GRPC_ERROR_NONE ||
      op->close_error != GRPC_ERROR_NONE) {
    cancel_timer_if_needed(exec_ctx, &calld->base.deadline_state);
  } else {
    // If we're receiving initial metadata, we need to get the deadline
    // from the recv_initial_metadata_ready callback. So we inject our
    // own callback into that hook.
    if (op->recv_initial_metadata_ready != NULL) {
      calld->next_recv_initial_metadata_ready = op->recv_initial_metadata_ready;
      calld->recv_initial_metadata = op->recv_initial_metadata;
      grpc_closure_init(&calld->recv_initial_metadata_ready,
                        recv_initial_metadata_ready, elem);
      op->recv_initial_metadata_ready = &calld->recv_initial_metadata_ready;
    }
    // Make sure we know when the call is complete, so that we can cancel
    // the timer.
    // Note that we trigger this on recv_trailing_metadata, even though
    // the client never sends trailing metadata, because this is the
    // hook that tells us when the call is complete on the server side.
    if (op->recv_trailing_metadata != NULL) {
      inject_on_complete_cb(&calld->base.deadline_state, op);
    }
  }
  // Chain to next filter.
  grpc_call_next_op(exec_ctx, elem, op);
}

const grpc_channel_filter grpc_client_deadline_filter = {
    client_start_transport_stream_op,
    grpc_channel_next_op,
    sizeof(base_call_data),
    init_call_elem,
    grpc_call_stack_ignore_set_pollset_or_pollset_set,
    destroy_call_elem,
    sizeof(channel_data),
    init_channel_elem,
    destroy_channel_elem,
    grpc_call_next_get_peer,
    "deadline",
};

const grpc_channel_filter grpc_server_deadline_filter = {
    server_start_transport_stream_op,
    grpc_channel_next_op,
    sizeof(server_call_data),
    init_call_elem,
    grpc_call_stack_ignore_set_pollset_or_pollset_set,
    destroy_call_elem,
    sizeof(channel_data),
    init_channel_elem,
    destroy_channel_elem,
    grpc_call_next_get_peer,
    "deadline",
};
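
// Registration sketch (an assumption for illustration: names follow
// src/core/lib/surface/channel_init.h; the stack types and priority shown
// here are illustrative, not necessarily what gRPC registers):
//
//   static bool maybe_add_deadline_filter(grpc_channel_stack_builder* builder,
//                                         void* arg) {
//     return grpc_channel_stack_builder_prepend_filter(
//         builder, (const grpc_channel_filter*)arg, NULL, NULL);
//   }
//
//   // During grpc_init():
//   grpc_channel_init_register_stage(GRPC_CLIENT_DIRECT_CHANNEL, 10,
//                                    maybe_add_deadline_filter,
//                                    (void*)&grpc_client_deadline_filter);
//   grpc_channel_init_register_stage(GRPC_SERVER_CHANNEL, 10,
//                                    maybe_add_deadline_filter,
//                                    (void*)&grpc_server_deadline_filter);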