/*
 *
 * Copyright 2015, Google Inc.
 * All rights reserved.
 *
 * Redistribution and use in source and binary forms, with or without
 * modification, are permitted provided that the following conditions are
 * met:
 *
 *     * Redistributions of source code must retain the above copyright
 * notice, this list of conditions and the following disclaimer.
 *     * Redistributions in binary form must reproduce the above
 * copyright notice, this list of conditions and the following disclaimer
 * in the documentation and/or other materials provided with the
 * distribution.
 *     * Neither the name of Google Inc. nor the names of its
 * contributors may be used to endorse or promote products derived from
 * this software without specific prior written permission.
 *
 * THIS SOFTWARE IS PROVIDED BY THE COPYRIGHT HOLDERS AND CONTRIBUTORS
 * "AS IS" AND ANY EXPRESS OR IMPLIED WARRANTIES, INCLUDING, BUT NOT
 * LIMITED TO, THE IMPLIED WARRANTIES OF MERCHANTABILITY AND FITNESS FOR
 * A PARTICULAR PURPOSE ARE DISCLAIMED. IN NO EVENT SHALL THE COPYRIGHT
 * OWNER OR CONTRIBUTORS BE LIABLE FOR ANY DIRECT, INDIRECT, INCIDENTAL,
 * SPECIAL, EXEMPLARY, OR CONSEQUENTIAL DAMAGES (INCLUDING, BUT NOT
 * LIMITED TO, PROCUREMENT OF SUBSTITUTE GOODS OR SERVICES; LOSS OF USE,
 * DATA, OR PROFITS; OR BUSINESS INTERRUPTION) HOWEVER CAUSED AND ON ANY
 * THEORY OF LIABILITY, WHETHER IN CONTRACT, STRICT LIABILITY, OR TORT
 * (INCLUDING NEGLIGENCE OR OTHERWISE) ARISING IN ANY WAY OUT OF THE USE
 * OF THIS SOFTWARE, EVEN IF ADVISED OF THE POSSIBILITY OF SUCH DAMAGE.
 *
 */

#include <assert.h>
#include <limits.h>
#include <stdbool.h>
#include <stdio.h>
#include <stdlib.h>
#include <string.h>

#include <grpc/compression.h>
#include <grpc/grpc.h>
#include <grpc/slice.h>
#include <grpc/support/alloc.h>
#include <grpc/support/log.h>
#include <grpc/support/string_util.h>
#include <grpc/support/useful.h>

#include "src/core/lib/channel/channel_stack.h"
#include "src/core/lib/compression/algorithm_metadata.h"
#include "src/core/lib/iomgr/timer.h"
#include "src/core/lib/profiling/timers.h"
#include "src/core/lib/slice/slice_internal.h"
#include "src/core/lib/slice/slice_string_helpers.h"
#include "src/core/lib/support/arena.h"
#include "src/core/lib/support/string.h"
#include "src/core/lib/surface/api_trace.h"
#include "src/core/lib/surface/call.h"
#include "src/core/lib/surface/channel.h"
#include "src/core/lib/surface/completion_queue.h"
#include "src/core/lib/surface/validate_metadata.h"
#include "src/core/lib/transport/error_utils.h"
#include "src/core/lib/transport/metadata.h"
#include "src/core/lib/transport/static_metadata.h"
#include "src/core/lib/transport/transport.h"

/** The maximum number of concurrent batches possible.
    Based upon the maximum number of individually queueable ops in the batch
    api:
      - initial metadata send
      - message send
      - status/close send (depending on client/server)
      - initial metadata recv
      - message recv
      - status/close recv (depending on client/server) */
#define MAX_CONCURRENT_BATCHES 6

/** Capacity of the grpc_call::send_extra_metadata array. */
#define MAX_SEND_EXTRA_METADATA_COUNT 3

/* Status data for a request can come from several sources; this
   enumerates them all, and acts as a priority sorting for which
   status to return to the application - earlier entries override
   later ones. STATUS_SOURCE_COUNT is not a source: it is the number of
   sources and sizes the per-call status array. */
typedef enum {
  /* Status came from the application layer overriding whatever
     the wire says */
  STATUS_FROM_API_OVERRIDE = 0,
  /* Status came from 'the wire' - or somewhere below the surface
     layer */
  STATUS_FROM_WIRE,
  /* Status was created by some internal channel stack operation: must come via
     add_batch_error */
  STATUS_FROM_CORE,
  /* Status was created by some surface error */
  STATUS_FROM_SURFACE,
  /* Status came from the server sending status */
  STATUS_FROM_SERVER_STATUS,
  STATUS_SOURCE_COUNT
} status_source;

Craig Tillera82950e2015-09-22 12:33:20 -0700100typedef struct {
Craig Tiller841a99d2016-12-12 16:58:57 -0800101 bool is_set;
102 grpc_error *error;
Craig Tiller68752722015-01-29 14:59:54 -0800103} received_status;
104
Craig Tiller4bab9462017-02-22 08:56:02 -0800105static gpr_atm pack_received_status(received_status r) {
106 return r.is_set ? (1 | (gpr_atm)r.error) : 0;
107}
108
109static received_status unpack_received_status(gpr_atm atm) {
110 return (atm & 1) == 0
111 ? (received_status){.is_set = false, .error = GRPC_ERROR_NONE}
112 : (received_status){.is_set = true,
113 .error = (grpc_error *)(atm & ~(gpr_atm)1)};
114}
115
yang-g23f777d2017-02-22 23:32:26 -0800116#define MAX_ERRORS_PER_BATCH 4
Craig Tiller94903892016-10-11 15:43:35 -0700117
Craig Tillerc7e1a2a2015-11-02 14:17:32 -0800118typedef struct batch_control {
119 grpc_call *call;
Craig Tillere198b712017-03-31 15:29:33 -0700120 /* Share memory for cq_completion and notify_tag as they are never needed
121 simultaneously. Each byte used in this data structure count as six bytes
Craig Tiller7a8232d2017-04-03 10:59:42 -0700122 per call, so any savings we can make are worthwhile,
123
124 We use notify_tag to determine whether or not to send notification to the
125 completion queue. Once we've made that determination, we can reuse the
126 memory for cq_completion. */
Craig Tillerea54b8c2017-03-01 16:58:28 -0800127 union {
128 grpc_cq_completion cq_completion;
129 struct {
Craig Tillere198b712017-03-31 15:29:33 -0700130 /* Any given op indicates completion by either (a) calling a closure or
131 (b) sending a notification on the call's completion queue. If
132 \a is_closure is true, \a tag indicates a closure to be invoked;
133 otherwise, \a tag indicates the tag to be used in the notification to
134 be sent to the completion queue. */
Craig Tillerea54b8c2017-03-01 16:58:28 -0800135 void *tag;
136 bool is_closure;
137 } notify_tag;
138 } completion_data;
Craig Tillerc7e1a2a2015-11-02 14:17:32 -0800139 grpc_closure finish_batch;
Craig Tillerc7e1a2a2015-11-02 14:17:32 -0800140 gpr_refcount steps_to_complete;
Craig Tiller94903892016-10-11 15:43:35 -0700141
142 grpc_error *errors[MAX_ERRORS_PER_BATCH];
143 gpr_atm num_errors;
Craig Tillerc7e1a2a2015-11-02 14:17:32 -0800144
Craig Tillera0f3abd2017-03-31 15:42:16 -0700145 grpc_transport_stream_op_batch op;
Craig Tillerc7e1a2a2015-11-02 14:17:32 -0800146} batch_control;
147
Craig Tiller1c10a7b2017-03-29 14:35:16 -0700148typedef struct {
149 gpr_mu child_list_mu;
150 grpc_call *first_child;
151} parent_call;
152
153typedef struct {
154 grpc_call *parent;
155 /** siblings: children of the same parent form a list, and this list is
156 protected under
157 parent->mu */
158 grpc_call *sibling_next;
159 grpc_call *sibling_prev;
160} child_call;
161
Craig Tillera82950e2015-09-22 12:33:20 -0700162struct grpc_call {
Craig Tillere7a17022017-03-13 10:20:38 -0700163 gpr_arena *arena;
Nicolas Nobleb7ebd3b2014-11-26 16:33:03 -0800164 grpc_completion_queue *cq;
David Garcia Quintas2a50dfe2016-05-31 15:09:12 -0700165 grpc_polling_entity pollent;
Nicolas Nobleb7ebd3b2014-11-26 16:33:03 -0800166 grpc_channel *channel;
Mark D. Roth3d883412016-11-07 13:42:54 -0800167 gpr_timespec start_time;
Craig Tiller1c10a7b2017-03-29 14:35:16 -0700168 /* parent_call* */ gpr_atm parent_call_atm;
169 child_call *child_call;
Nicolas Nobleb7ebd3b2014-11-26 16:33:03 -0800170
Craig Tillere5d683c2015-02-03 16:37:36 -0800171 /* client or server call */
Craig Tiller1cbf5762016-04-22 16:02:55 -0700172 bool is_client;
Craig Tillerf3fba742015-06-11 09:36:33 -0700173 /** has grpc_call_destroy been called */
Craig Tiller1cbf5762016-04-22 16:02:55 -0700174 bool destroy_called;
Craig Tillerc7df0df2015-08-03 08:06:50 -0700175 /** flag indicating that cancellation is inherited */
Craig Tiller1cbf5762016-04-22 16:02:55 -0700176 bool cancellation_is_inherited;
Craig Tillerc7e1a2a2015-11-02 14:17:32 -0800177 /** which ops are in-flight */
Craig Tiller1cbf5762016-04-22 16:02:55 -0700178 bool sent_initial_metadata;
179 bool sending_message;
180 bool sent_final_op;
181 bool received_initial_metadata;
182 bool receiving_message;
183 bool requested_final_op;
Craig Tillerb597dcf2017-03-09 07:02:11 -0800184 gpr_atm any_ops_sent_atm;
185 gpr_atm received_final_op_atm;
yang-g0b6ad7d2015-06-25 14:39:01 -0700186
Craig Tillera44cbfc2016-02-03 16:02:49 -0800187 /* have we received initial metadata */
188 bool has_initial_md_been_received;
189
Craig Tillerb58de722017-03-29 14:15:12 -0700190 batch_control *active_batches[MAX_CONCURRENT_BATCHES];
Craig Tillera0f3abd2017-03-31 15:42:16 -0700191 grpc_transport_stream_op_batch_payload stream_op_payload;
Nicolas Nobleb7ebd3b2014-11-26 16:33:03 -0800192
Craig Tillerc7e1a2a2015-11-02 14:17:32 -0800193 /* first idx: is_receiving, second idx: is_trailing */
194 grpc_metadata_batch metadata_batch[2][2];
Craig Tillerebf94bf2015-02-05 08:48:46 -0800195
Craig Tillere5d683c2015-02-03 16:37:36 -0800196 /* Buffered read metadata waiting to be returned to the application.
197 Element 0 is initial metadata, element 1 is trailing metadata. */
Craig Tillerc7e1a2a2015-11-02 14:17:32 -0800198 grpc_metadata_array *buffered_metadata[2];
Nicolas Nobleb7ebd3b2014-11-26 16:33:03 -0800199
Craig Tiller4bab9462017-02-22 08:56:02 -0800200 /* Packed received call statuses from various sources */
201 gpr_atm status[STATUS_SOURCE_COUNT];
Nicolas Nobleb7ebd3b2014-11-26 16:33:03 -0800202
David Garcia Quintas01c4d992016-07-07 20:11:27 -0700203 /* Call data useful used for reporting. Only valid after the call has
204 * completed */
205 grpc_call_final_info final_info;
Craig Tiller466129e2016-03-09 14:43:18 -0800206
David Garcia Quintas749367f2016-05-17 19:15:24 -0700207 /* Compression algorithm for *incoming* data */
208 grpc_compression_algorithm incoming_compression_algorithm;
David Garcia Quintase091af82015-07-15 21:37:02 -0700209 /* Supported encodings (compression algorithms), a bitset */
Craig Tiller7536af02015-12-22 13:49:30 -0800210 uint32_t encodings_accepted_by_peer;
David Garcia Quintasb8edf7e2015-07-08 20:18:57 -0700211
Julien Boeufc6f8d0a2015-05-11 22:40:02 -0700212 /* Contexts for various subsystems (security, tracing, ...). */
Julien Boeuf83b02972015-05-20 22:50:34 -0700213 grpc_call_context_element context[GRPC_CONTEXT_COUNT];
Craig Tiller935cf422015-05-01 14:10:46 -0700214
Craig Tillerc7e1a2a2015-11-02 14:17:32 -0800215 /* for the client, extra metadata is initial metadata; for the
216 server, it's trailing metadata */
217 grpc_linked_mdelem send_extra_metadata[MAX_SEND_EXTRA_METADATA_COUNT];
218 int send_extra_metadata_count;
Craig Tiller6902ad22015-04-16 08:01:49 -0700219 gpr_timespec send_deadline;
220
Craig Tillerc7e1a2a2015-11-02 14:17:32 -0800221 grpc_slice_buffer_stream sending_stream;
Craig Tiller94903892016-10-11 15:43:35 -0700222
Craig Tillerc7e1a2a2015-11-02 14:17:32 -0800223 grpc_byte_stream *receiving_stream;
224 grpc_byte_buffer **receiving_buffer;
Craig Tillerd41a4a72016-10-26 16:16:06 -0700225 grpc_slice receiving_slice;
Craig Tillerc7e1a2a2015-11-02 14:17:32 -0800226 grpc_closure receiving_slice_ready;
227 grpc_closure receiving_stream_ready;
Craig Tillera44cbfc2016-02-03 16:02:49 -0800228 grpc_closure receiving_initial_metadata_ready;
Craig Tiller7536af02015-12-22 13:49:30 -0800229 uint32_t test_only_last_message_flags;
Craig Tillerc7e1a2a2015-11-02 14:17:32 -0800230
Craig Tillere7a17022017-03-13 10:20:38 -0700231 grpc_closure release_call;
232
Craig Tillerc7e1a2a2015-11-02 14:17:32 -0800233 union {
234 struct {
235 grpc_status_code *status;
Craig Tiller68208fe2016-11-14 14:35:02 -0800236 grpc_slice *status_details;
Craig Tillerc7e1a2a2015-11-02 14:17:32 -0800237 } client;
238 struct {
239 int *cancelled;
240 } server;
241 } final_op;
Craig Tillera44cbfc2016-02-03 16:02:49 -0800242
Craig Tiller8a677802016-04-22 15:07:53 -0700243 void *saved_receiving_stream_ready_bctlp;
Nicolas Nobleb7ebd3b2014-11-26 16:33:03 -0800244};
245
/* Tracing flag for call errors; off (0) by default.
   NOTE(review): the readers of this flag are outside this part of the file. */
int grpc_call_error_trace = 0;

/* A grpc_call and its grpc_call_stack share one allocation, with the call
   stack placed directly after the grpc_call. These helpers convert between
   the two by stepping one grpc_call forwards or backwards. */
#define CALL_STACK_FROM_CALL(call) ((grpc_call_stack *)((call) + 1))
#define CALL_FROM_CALL_STACK(call_stack) (((grpc_call *)(call_stack)) - 1)
#define CALL_ELEM_FROM_CALL(call, idx) \
  grpc_call_stack_element(CALL_STACK_FROM_CALL(call), idx)
#define CALL_FROM_TOP_ELEM(top_elem) \
  CALL_FROM_CALL_STACK(grpc_call_stack_from_top_element(top_elem))

Craig Tillera82950e2015-09-22 12:33:20 -0700255static void execute_op(grpc_exec_ctx *exec_ctx, grpc_call *call,
Craig Tillera0f3abd2017-03-31 15:42:16 -0700256 grpc_transport_stream_op_batch *op);
Craig Tiller2dc32ea2017-01-31 15:32:34 -0800257static void cancel_with_status(grpc_exec_ctx *exec_ctx, grpc_call *c,
258 status_source source, grpc_status_code status,
259 const char *description);
Craig Tiller255edaa2016-12-13 09:04:55 -0800260static void cancel_with_error(grpc_exec_ctx *exec_ctx, grpc_call *c,
Craig Tiller2dc32ea2017-01-31 15:32:34 -0800261 status_source source, grpc_error *error);
Craig Tillerc7e1a2a2015-11-02 14:17:32 -0800262static void destroy_call(grpc_exec_ctx *exec_ctx, void *call_stack,
Craig Tillerc027e772016-05-03 16:27:00 -0700263 grpc_error *error);
Craig Tillerc7e1a2a2015-11-02 14:17:32 -0800264static void receiving_slice_ready(grpc_exec_ctx *exec_ctx, void *bctlp,
Craig Tillerc027e772016-05-03 16:27:00 -0700265 grpc_error *error);
Craig Tiller841a99d2016-12-12 16:58:57 -0800266static void get_final_status(grpc_call *call,
267 void (*set_value)(grpc_status_code code,
268 void *user_data),
269 void *set_value_user_data, grpc_slice *details);
270static void set_status_value_directly(grpc_status_code status, void *dest);
271static void set_status_from_error(grpc_exec_ctx *exec_ctx, grpc_call *call,
272 status_source source, grpc_error *error);
273static void process_data_after_md(grpc_exec_ctx *exec_ctx, batch_control *bctl);
274static void post_batch_completion(grpc_exec_ctx *exec_ctx, batch_control *bctl);
275static void add_batch_error(grpc_exec_ctx *exec_ctx, batch_control *bctl,
yang-g23f777d2017-02-22 23:32:26 -0800276 grpc_error *error, bool has_cancelled);
Craig Tillerbac41422015-05-29 16:32:28 -0700277
Craig Tillerf4484cd2017-02-01 08:28:40 -0800278static void add_init_error(grpc_error **composite, grpc_error *new) {
279 if (new == GRPC_ERROR_NONE) return;
280 if (*composite == GRPC_ERROR_NONE)
ncteisen4b36a3d2017-03-13 19:08:06 -0700281 *composite = GRPC_ERROR_CREATE_FROM_STATIC_STRING("Call creation failed");
Craig Tillerf4484cd2017-02-01 08:28:40 -0800282 *composite = grpc_error_add_child(*composite, new);
283}
284
Craig Tiller1c10a7b2017-03-29 14:35:16 -0700285static parent_call *get_or_create_parent_call(grpc_call *call) {
286 parent_call *p = (parent_call *)gpr_atm_acq_load(&call->parent_call_atm);
287 if (p == NULL) {
288 p = gpr_arena_alloc(call->arena, sizeof(*p));
289 gpr_mu_init(&p->child_list_mu);
290 if (!gpr_atm_rel_cas(&call->parent_call_atm, (gpr_atm)NULL, (gpr_atm)p)) {
291 gpr_mu_destroy(&p->child_list_mu);
292 p = (parent_call *)gpr_atm_acq_load(&call->parent_call_atm);
293 }
294 }
295 return p;
296}
297
298static parent_call *get_parent_call(grpc_call *call) {
299 return (parent_call *)gpr_atm_acq_load(&call->parent_call_atm);
300}
301
Craig Tillera59c16c2016-10-31 07:25:01 -0700302grpc_error *grpc_call_create(grpc_exec_ctx *exec_ctx,
303 const grpc_call_create_args *args,
Craig Tiller8e214652016-08-19 09:54:31 -0700304 grpc_call **out_call) {
Craig Tillerc7e1a2a2015-11-02 14:17:32 -0800305 size_t i, j;
Craig Tillerf4484cd2017-02-01 08:28:40 -0800306 grpc_error *error = GRPC_ERROR_NONE;
Craig Tiller8e214652016-08-19 09:54:31 -0700307 grpc_channel_stack *channel_stack =
308 grpc_channel_get_channel_stack(args->channel);
Craig Tiller1f41b6b2015-10-09 15:07:02 -0700309 grpc_call *call;
Craig Tiller0ba432d2015-10-09 16:57:11 -0700310 GPR_TIMER_BEGIN("grpc_call_create", 0);
Craig Tillera6bec8f2017-03-14 08:26:04 -0700311 gpr_arena *arena =
312 gpr_arena_create(grpc_channel_get_call_size_estimate(args->channel));
Craig Tillere7a17022017-03-13 10:20:38 -0700313 call = gpr_arena_alloc(arena,
314 sizeof(grpc_call) + channel_stack->call_stack_size);
315 call->arena = arena;
Craig Tiller0eaed722016-09-21 10:44:18 -0700316 *out_call = call;
Craig Tiller8e214652016-08-19 09:54:31 -0700317 call->channel = args->channel;
318 call->cq = args->cq;
Mark D. Roth3d883412016-11-07 13:42:54 -0800319 call->start_time = gpr_now(GPR_CLOCK_MONOTONIC);
David Garcia Quintas46123372016-05-09 15:28:42 -0700320 /* Always support no compression */
321 GPR_BITSET(&call->encodings_accepted_by_peer, GRPC_COMPRESS_NONE);
Craig Tiller8e214652016-08-19 09:54:31 -0700322 call->is_client = args->server_transport_data == NULL;
Craig Tillerea54b8c2017-03-01 16:58:28 -0800323 call->stream_op_payload.context = call->context;
Craig Tiller4eecdde2016-11-14 08:21:17 -0800324 grpc_slice path = grpc_empty_slice();
Craig Tillera82950e2015-09-22 12:33:20 -0700325 if (call->is_client) {
Craig Tiller8e214652016-08-19 09:54:31 -0700326 GPR_ASSERT(args->add_initial_metadata_count <
327 MAX_SEND_EXTRA_METADATA_COUNT);
328 for (i = 0; i < args->add_initial_metadata_count; i++) {
329 call->send_extra_metadata[i].md = args->add_initial_metadata[i];
Craig Tiller3b05e1d2016-11-21 13:46:31 -0800330 if (grpc_slice_eq(GRPC_MDKEY(args->add_initial_metadata[i]),
331 GRPC_MDSTR_PATH)) {
Craig Tiller0160de92016-11-18 08:46:46 -0800332 path = grpc_slice_ref_internal(
333 GRPC_MDVALUE(args->add_initial_metadata[i]));
Mark D. Rothaa850a72016-09-26 13:38:02 -0700334 }
Craig Tillerc7e1a2a2015-11-02 14:17:32 -0800335 }
Craig Tiller8e214652016-08-19 09:54:31 -0700336 call->send_extra_metadata_count = (int)args->add_initial_metadata_count;
Craig Tillerc7e1a2a2015-11-02 14:17:32 -0800337 } else {
Craig Tiller8e214652016-08-19 09:54:31 -0700338 GPR_ASSERT(args->add_initial_metadata_count == 0);
Craig Tillerc7e1a2a2015-11-02 14:17:32 -0800339 call->send_extra_metadata_count = 0;
Craig Tillera82950e2015-09-22 12:33:20 -0700340 }
Craig Tillerc7e1a2a2015-11-02 14:17:32 -0800341 for (i = 0; i < 2; i++) {
342 for (j = 0; j < 2; j++) {
343 call->metadata_batch[i][j].deadline = gpr_inf_future(GPR_CLOCK_MONOTONIC);
344 }
Craig Tillera82950e2015-09-22 12:33:20 -0700345 }
Craig Tillerca3451d2016-09-29 10:27:44 -0700346 gpr_timespec send_deadline =
Craig Tiller8e214652016-08-19 09:54:31 -0700347 gpr_convert_clock_type(args->send_deadline, GPR_CLOCK_MONOTONIC);
Mark D. Rothf28763c2016-09-14 15:18:40 -0700348
Craig Tiller8e214652016-08-19 09:54:31 -0700349 if (args->parent_call != NULL) {
Craig Tiller1c10a7b2017-03-29 14:35:16 -0700350 child_call *cc = call->child_call =
351 gpr_arena_alloc(arena, sizeof(child_call));
352 call->child_call->parent = args->parent_call;
353
Craig Tiller8e214652016-08-19 09:54:31 -0700354 GRPC_CALL_INTERNAL_REF(args->parent_call, "child");
Craig Tillera82950e2015-09-22 12:33:20 -0700355 GPR_ASSERT(call->is_client);
Craig Tiller8e214652016-08-19 09:54:31 -0700356 GPR_ASSERT(!args->parent_call->is_client);
Craig Tillera82950e2015-09-22 12:33:20 -0700357
Craig Tiller1c10a7b2017-03-29 14:35:16 -0700358 parent_call *pc = get_or_create_parent_call(args->parent_call);
359
360 gpr_mu_lock(&pc->child_list_mu);
Craig Tillera82950e2015-09-22 12:33:20 -0700361
Craig Tiller8e214652016-08-19 09:54:31 -0700362 if (args->propagation_mask & GRPC_PROPAGATE_DEADLINE) {
Craig Tillera82950e2015-09-22 12:33:20 -0700363 send_deadline = gpr_time_min(
364 gpr_convert_clock_type(send_deadline,
Craig Tiller8e214652016-08-19 09:54:31 -0700365 args->parent_call->send_deadline.clock_type),
366 args->parent_call->send_deadline);
Craig Tillerc7df0df2015-08-03 08:06:50 -0700367 }
Craig Tillera82950e2015-09-22 12:33:20 -0700368 /* for now GRPC_PROPAGATE_TRACING_CONTEXT *MUST* be passed with
369 * GRPC_PROPAGATE_STATS_CONTEXT */
370 /* TODO(ctiller): This should change to use the appropriate census start_op
371 * call. */
Craig Tiller8e214652016-08-19 09:54:31 -0700372 if (args->propagation_mask & GRPC_PROPAGATE_CENSUS_TRACING_CONTEXT) {
Craig Tiller239af8b2017-02-01 10:21:42 -0800373 if (0 == (args->propagation_mask & GRPC_PROPAGATE_CENSUS_STATS_CONTEXT)) {
ncteisen4b36a3d2017-03-13 19:08:06 -0700374 add_init_error(&error, GRPC_ERROR_CREATE_FROM_STATIC_STRING(
375 "Census tracing propagation requested "
376 "without Census context propagation"));
Craig Tiller239af8b2017-02-01 10:21:42 -0800377 }
Craig Tiller8e214652016-08-19 09:54:31 -0700378 grpc_call_context_set(
379 call, GRPC_CONTEXT_TRACING,
380 args->parent_call->context[GRPC_CONTEXT_TRACING].value, NULL);
Craig Tillerf20d3072017-02-01 10:39:26 -0800381 } else if (args->propagation_mask & GRPC_PROPAGATE_CENSUS_STATS_CONTEXT) {
ncteisen4b36a3d2017-03-13 19:08:06 -0700382 add_init_error(&error, GRPC_ERROR_CREATE_FROM_STATIC_STRING(
383 "Census context propagation requested "
384 "without Census tracing propagation"));
Craig Tiller45724b32015-09-22 10:42:19 -0700385 }
Craig Tiller8e214652016-08-19 09:54:31 -0700386 if (args->propagation_mask & GRPC_PROPAGATE_CANCELLATION) {
Craig Tillera82950e2015-09-22 12:33:20 -0700387 call->cancellation_is_inherited = 1;
Craig Tiller123c72b2017-03-10 07:33:27 -0800388 if (gpr_atm_acq_load(&args->parent_call->received_final_op_atm)) {
389 cancel_with_error(exec_ctx, call, STATUS_FROM_API_OVERRIDE,
390 GRPC_ERROR_CANCELLED);
391 }
Craig Tiller45724b32015-09-22 10:42:19 -0700392 }
Craig Tillera82950e2015-09-22 12:33:20 -0700393
Craig Tiller1c10a7b2017-03-29 14:35:16 -0700394 if (pc->first_child == NULL) {
395 pc->first_child = call;
396 cc->sibling_next = cc->sibling_prev = call;
Craig Tillera82950e2015-09-22 12:33:20 -0700397 } else {
Craig Tiller1c10a7b2017-03-29 14:35:16 -0700398 cc->sibling_next = pc->first_child;
399 cc->sibling_prev = pc->first_child->child_call->sibling_prev;
400 cc->sibling_next->child_call->sibling_prev =
401 cc->sibling_prev->child_call->sibling_next = call;
Craig Tillera82950e2015-09-22 12:33:20 -0700402 }
403
Craig Tiller1c10a7b2017-03-29 14:35:16 -0700404 gpr_mu_unlock(&pc->child_list_mu);
Craig Tillera82950e2015-09-22 12:33:20 -0700405 }
Mark D. Rothf28763c2016-09-14 15:18:40 -0700406
Mark D. Roth14c072c2016-08-26 08:31:34 -0700407 call->send_deadline = send_deadline;
Mark D. Rothf28763c2016-09-14 15:18:40 -0700408
Craig Tillerca3451d2016-09-29 10:27:44 -0700409 GRPC_CHANNEL_INTERNAL_REF(args->channel, "call");
Mark D. Rothf28763c2016-09-14 15:18:40 -0700410 /* initial refcount dropped by grpc_call_destroy */
Craig Tillerd426cac2017-03-13 12:30:45 -0700411 grpc_call_element_args call_args = {
412 .call_stack = CALL_STACK_FROM_CALL(call),
413 .server_transport_data = args->server_transport_data,
414 .context = call->context,
415 .path = path,
416 .start_time = call->start_time,
417 .deadline = send_deadline,
418 .arena = call->arena};
Craig Tillerf4484cd2017-02-01 08:28:40 -0800419 add_init_error(&error, grpc_call_stack_init(exec_ctx, channel_stack, 1,
Craig Tillerd426cac2017-03-13 12:30:45 -0700420 destroy_call, call, &call_args));
Mark D. Rothf28763c2016-09-14 15:18:40 -0700421 if (error != GRPC_ERROR_NONE) {
Craig Tiller58b30cd2017-01-31 17:07:36 -0800422 cancel_with_error(exec_ctx, call, STATUS_FROM_SURFACE,
423 GRPC_ERROR_REF(error));
Craig Tillera82950e2015-09-22 12:33:20 -0700424 }
Craig Tillerca3451d2016-09-29 10:27:44 -0700425 if (args->cq != NULL) {
Mark D. Rothf28763c2016-09-14 15:18:40 -0700426 GPR_ASSERT(
Craig Tillerca3451d2016-09-29 10:27:44 -0700427 args->pollset_set_alternative == NULL &&
Mark D. Rothf28763c2016-09-14 15:18:40 -0700428 "Only one of 'cq' and 'pollset_set_alternative' should be non-NULL.");
Craig Tillerca3451d2016-09-29 10:27:44 -0700429 GRPC_CQ_INTERNAL_REF(args->cq, "bind");
Mark D. Rothf28763c2016-09-14 15:18:40 -0700430 call->pollent =
Craig Tillerca3451d2016-09-29 10:27:44 -0700431 grpc_polling_entity_create_from_pollset(grpc_cq_pollset(args->cq));
Mark D. Rothf28763c2016-09-14 15:18:40 -0700432 }
Craig Tillerca3451d2016-09-29 10:27:44 -0700433 if (args->pollset_set_alternative != NULL) {
434 call->pollent = grpc_polling_entity_create_from_pollset_set(
435 args->pollset_set_alternative);
Mark D. Rothf28763c2016-09-14 15:18:40 -0700436 }
437 if (!grpc_polling_entity_is_empty(&call->pollent)) {
438 grpc_call_stack_set_pollset_or_pollset_set(
Craig Tillera59c16c2016-10-31 07:25:01 -0700439 exec_ctx, CALL_STACK_FROM_CALL(call), &call->pollent);
Mark D. Rothf28763c2016-09-14 15:18:40 -0700440 }
441
Craig Tiller4eecdde2016-11-14 08:21:17 -0800442 grpc_slice_unref_internal(exec_ctx, path);
Mark D. Rothaa850a72016-09-26 13:38:02 -0700443
Craig Tiller0ba432d2015-10-09 16:57:11 -0700444 GPR_TIMER_END("grpc_call_create", 0);
Craig Tiller8e214652016-08-19 09:54:31 -0700445 return error;
Nicolas Nobleb7ebd3b2014-11-26 16:33:03 -0800446}
447
Craig Tillera82950e2015-09-22 12:33:20 -0700448void grpc_call_set_completion_queue(grpc_exec_ctx *exec_ctx, grpc_call *call,
449 grpc_completion_queue *cq) {
Craig Tillerc7e1a2a2015-11-02 14:17:32 -0800450 GPR_ASSERT(cq);
David Garcia Quintasf72eb972016-05-03 18:28:09 -0700451
David Garcia Quintasc4d51122016-06-06 14:56:02 -0700452 if (grpc_polling_entity_pollset_set(&call->pollent) != NULL) {
David Garcia Quintasf72eb972016-05-03 18:28:09 -0700453 gpr_log(GPR_ERROR, "A pollset_set is already registered for this call.");
454 abort();
455 }
Craig Tiller166e2502015-02-03 20:14:41 -0800456 call->cq = cq;
Craig Tillerc7e1a2a2015-11-02 14:17:32 -0800457 GRPC_CQ_INTERNAL_REF(cq, "bind");
David Garcia Quintasc4d51122016-06-06 14:56:02 -0700458 call->pollent = grpc_polling_entity_create_from_pollset(grpc_cq_pollset(cq));
David Garcia Quintas4afce7e2016-04-18 16:25:17 -0700459 grpc_call_stack_set_pollset_or_pollset_set(
David Garcia Quintas2a50dfe2016-05-31 15:09:12 -0700460 exec_ctx, CALL_STACK_FROM_CALL(call), &call->pollent);
Craig Tiller166e2502015-02-03 20:14:41 -0800461}
462
Craig Tillerc7e1a2a2015-11-02 14:17:32 -0800463#ifdef GRPC_STREAM_REFCOUNT_DEBUG
Craig Tiller7b435612015-11-24 08:15:05 -0800464#define REF_REASON reason
465#define REF_ARG , const char *reason
Craig Tiller4df412b2015-04-28 07:57:54 -0700466#else
Craig Tiller7b435612015-11-24 08:15:05 -0800467#define REF_REASON ""
468#define REF_ARG
Craig Tillerc7e1a2a2015-11-02 14:17:32 -0800469#endif
Craig Tiller7b435612015-11-24 08:15:05 -0800470void grpc_call_internal_ref(grpc_call *c REF_ARG) {
471 GRPC_CALL_STACK_REF(CALL_STACK_FROM_CALL(c), REF_REASON);
472}
473void grpc_call_internal_unref(grpc_exec_ctx *exec_ctx, grpc_call *c REF_ARG) {
474 GRPC_CALL_STACK_UNREF(exec_ctx, CALL_STACK_FROM_CALL(c), REF_REASON);
475}
Nicolas Nobleb7ebd3b2014-11-26 16:33:03 -0800476
Craig Tillere7a17022017-03-13 10:20:38 -0700477static void release_call(grpc_exec_ctx *exec_ctx, void *call,
478 grpc_error *error) {
479 grpc_call *c = call;
Craig Tiller51006fe2017-03-15 08:07:02 -0700480 grpc_channel *channel = c->channel;
Craig Tillerdbad3702017-03-15 08:21:19 -0700481 grpc_channel_update_call_size_estimate(channel, gpr_arena_destroy(c->arena));
Craig Tiller51006fe2017-03-15 08:07:02 -0700482 GRPC_CHANNEL_INTERNAL_UNREF(exec_ctx, channel, "call");
Craig Tillere7a17022017-03-13 10:20:38 -0700483}
484
David Garcia Quintas01c4d992016-07-07 20:11:27 -0700485static void set_status_value_directly(grpc_status_code status, void *dest);
Craig Tillerc027e772016-05-03 16:27:00 -0700486static void destroy_call(grpc_exec_ctx *exec_ctx, void *call,
487 grpc_error *error) {
Craig Tiller566316f2015-02-02 15:25:32 -0800488 size_t i;
Craig Tillerc7e1a2a2015-11-02 14:17:32 -0800489 int ii;
Craig Tilleraef25da2015-01-29 17:19:45 -0800490 grpc_call *c = call;
Craig Tiller0ba432d2015-10-09 16:57:11 -0700491 GPR_TIMER_BEGIN("destroy_call", 0);
Craig Tillerc7e1a2a2015-11-02 14:17:32 -0800492 for (i = 0; i < 2; i++) {
493 grpc_metadata_batch_destroy(
Craig Tillera59c16c2016-10-31 07:25:01 -0700494 exec_ctx, &c->metadata_batch[1 /* is_receiving */][i /* is_initial */]);
Craig Tillerc7e1a2a2015-11-02 14:17:32 -0800495 }
496 if (c->receiving_stream != NULL) {
Craig Tiller3b66ab92015-12-09 19:42:22 -0800497 grpc_byte_stream_destroy(exec_ctx, c->receiving_stream);
Craig Tillerc7e1a2a2015-11-02 14:17:32 -0800498 }
Craig Tiller1c10a7b2017-03-29 14:35:16 -0700499 parent_call *pc = get_parent_call(c);
500 if (pc != NULL) {
501 gpr_mu_destroy(&pc->child_list_mu);
502 }
Craig Tillerc7e1a2a2015-11-02 14:17:32 -0800503 for (ii = 0; ii < c->send_extra_metadata_count; ii++) {
Craig Tillera59c16c2016-10-31 07:25:01 -0700504 GRPC_MDELEM_UNREF(exec_ctx, c->send_extra_metadata[ii].md);
Craig Tillera82950e2015-09-22 12:33:20 -0700505 }
506 for (i = 0; i < GRPC_CONTEXT_COUNT; i++) {
507 if (c->context[i].destroy) {
508 c->context[i].destroy(c->context[i].value);
Craig Tiller935cf422015-05-01 14:10:46 -0700509 }
Craig Tillera82950e2015-09-22 12:33:20 -0700510 }
Craig Tillera82950e2015-09-22 12:33:20 -0700511 if (c->cq) {
512 GRPC_CQ_INTERNAL_UNREF(c->cq, "bind");
513 }
David Garcia Quintas01c4d992016-07-07 20:11:27 -0700514
Craig Tiller841a99d2016-12-12 16:58:57 -0800515 get_final_status(call, set_status_value_directly, &c->final_info.final_status,
516 NULL);
Mark D. Roth3d883412016-11-07 13:42:54 -0800517 c->final_info.stats.latency =
518 gpr_time_sub(gpr_now(GPR_CLOCK_MONOTONIC), c->start_time);
David Garcia Quintas01c4d992016-07-07 20:11:27 -0700519
Craig Tiller841a99d2016-12-12 16:58:57 -0800520 for (i = 0; i < STATUS_SOURCE_COUNT; i++) {
Craig Tiller4bab9462017-02-22 08:56:02 -0800521 GRPC_ERROR_UNREF(
Craig Tillerb597dcf2017-03-09 07:02:11 -0800522 unpack_received_status(gpr_atm_acq_load(&c->status[i])).error);
Craig Tiller841a99d2016-12-12 16:58:57 -0800523 }
524
Craig Tillere7a17022017-03-13 10:20:38 -0700525 grpc_call_stack_destroy(exec_ctx, CALL_STACK_FROM_CALL(c), &c->final_info,
526 grpc_closure_init(&c->release_call, release_call, c,
527 grpc_schedule_on_exec_ctx));
Craig Tiller0ba432d2015-10-09 16:57:11 -0700528 GPR_TIMER_END("destroy_call", 0);
Craig Tillera4541102015-01-29 11:46:11 -0800529}
530
Craig Tiller841a99d2016-12-12 16:58:57 -0800531void grpc_call_destroy(grpc_call *c) {
Craig Tiller1c10a7b2017-03-29 14:35:16 -0700532 child_call *cc = c->child_call;
Craig Tiller841a99d2016-12-12 16:58:57 -0800533 grpc_exec_ctx exec_ctx = GRPC_EXEC_CTX_INIT;
Craig Tillerb8d3a312015-06-19 17:27:53 -0700534
Craig Tiller841a99d2016-12-12 16:58:57 -0800535 GPR_TIMER_BEGIN("grpc_call_destroy", 0);
536 GRPC_API_TRACE("grpc_call_destroy(c=%p)", 1, (c));
537
Craig Tiller9fd9a442017-04-05 16:52:34 -0700538 if (cc) {
539 parent_call *pc = get_parent_call(cc->parent);
Craig Tiller1c10a7b2017-03-29 14:35:16 -0700540 gpr_mu_lock(&pc->child_list_mu);
541 if (c == pc->first_child) {
542 pc->first_child = cc->sibling_next;
543 if (c == pc->first_child) {
544 pc->first_child = NULL;
Craig Tiller841a99d2016-12-12 16:58:57 -0800545 }
Craig Tiller841a99d2016-12-12 16:58:57 -0800546 }
Craig Tiller1c10a7b2017-03-29 14:35:16 -0700547 cc->sibling_prev->child_call->sibling_next = cc->sibling_next;
548 cc->sibling_next->child_call->sibling_prev = cc->sibling_prev;
549 gpr_mu_unlock(&pc->child_list_mu);
550 GRPC_CALL_INTERNAL_UNREF(&exec_ctx, cc->parent, "child");
Craig Tiller841a99d2016-12-12 16:58:57 -0800551 }
552
Craig Tiller841a99d2016-12-12 16:58:57 -0800553 GPR_ASSERT(!c->destroy_called);
554 c->destroy_called = 1;
Craig Tiller1c10a7b2017-03-29 14:35:16 -0700555 bool cancel = gpr_atm_acq_load(&c->any_ops_sent_atm) != 0 &&
556 gpr_atm_acq_load(&c->received_final_op_atm) == 0;
Craig Tiller37cbc3f2017-02-16 14:54:55 -0800557 if (cancel) {
558 cancel_with_error(&exec_ctx, c, STATUS_FROM_API_OVERRIDE,
559 GRPC_ERROR_CANCELLED);
560 }
Craig Tiller841a99d2016-12-12 16:58:57 -0800561 GRPC_CALL_INTERNAL_UNREF(&exec_ctx, c, "destroy");
562 grpc_exec_ctx_finish(&exec_ctx);
563 GPR_TIMER_END("grpc_call_destroy", 0);
Craig Tillerf0f70a82016-06-23 13:55:06 -0700564}
Craig Tiller30547562015-02-05 17:04:51 -0800565
Craig Tiller841a99d2016-12-12 16:58:57 -0800566grpc_call_error grpc_call_cancel(grpc_call *call, void *reserved) {
567 GRPC_API_TRACE("grpc_call_cancel(call=%p, reserved=%p)", 2, (call, reserved));
568 GPR_ASSERT(!reserved);
Craig Tiller37cbc3f2017-02-16 14:54:55 -0800569 grpc_exec_ctx exec_ctx = GRPC_EXEC_CTX_INIT;
570 cancel_with_error(&exec_ctx, call, STATUS_FROM_API_OVERRIDE,
571 GRPC_ERROR_CANCELLED);
572 grpc_exec_ctx_finish(&exec_ctx);
573 return GRPC_CALL_OK;
Craig Tiller841a99d2016-12-12 16:58:57 -0800574}
575
576static void execute_op(grpc_exec_ctx *exec_ctx, grpc_call *call,
Craig Tillera0f3abd2017-03-31 15:42:16 -0700577 grpc_transport_stream_op_batch *op) {
Craig Tiller841a99d2016-12-12 16:58:57 -0800578 grpc_call_element *elem;
579
580 GPR_TIMER_BEGIN("execute_op", 0);
581 elem = CALL_ELEM_FROM_CALL(call, 0);
Craig Tillera0f3abd2017-03-31 15:42:16 -0700582 elem->filter->start_transport_stream_op_batch(exec_ctx, elem, op);
Craig Tiller841a99d2016-12-12 16:58:57 -0800583 GPR_TIMER_END("execute_op", 0);
584}
585
586char *grpc_call_get_peer(grpc_call *call) {
587 grpc_call_element *elem = CALL_ELEM_FROM_CALL(call, 0);
588 grpc_exec_ctx exec_ctx = GRPC_EXEC_CTX_INIT;
589 char *result;
590 GRPC_API_TRACE("grpc_call_get_peer(%p)", 1, (call));
591 result = elem->filter->get_peer(&exec_ctx, elem);
592 if (result == NULL) {
593 result = grpc_channel_get_target(call->channel);
594 }
595 if (result == NULL) {
596 result = gpr_strdup("unknown");
597 }
598 grpc_exec_ctx_finish(&exec_ctx);
599 return result;
600}
601
602grpc_call *grpc_call_from_top_element(grpc_call_element *elem) {
603 return CALL_FROM_TOP_ELEM(elem);
604}
605
/*******************************************************************************
 * CANCELLATION
 */
609
610grpc_call_error grpc_call_cancel_with_status(grpc_call *c,
611 grpc_status_code status,
612 const char *description,
613 void *reserved) {
Craig Tiller841a99d2016-12-12 16:58:57 -0800614 grpc_exec_ctx exec_ctx = GRPC_EXEC_CTX_INIT;
615 GRPC_API_TRACE(
616 "grpc_call_cancel_with_status("
617 "c=%p, status=%d, description=%s, reserved=%p)",
618 4, (c, (int)status, description, reserved));
619 GPR_ASSERT(reserved == NULL);
Craig Tiller2dc32ea2017-01-31 15:32:34 -0800620 cancel_with_status(&exec_ctx, c, STATUS_FROM_API_OVERRIDE, status,
621 description);
Craig Tiller841a99d2016-12-12 16:58:57 -0800622 grpc_exec_ctx_finish(&exec_ctx);
Craig Tiller2dc32ea2017-01-31 15:32:34 -0800623 return GRPC_CALL_OK;
Craig Tiller841a99d2016-12-12 16:58:57 -0800624}
625
Craig Tillerc5b90df2017-03-10 16:11:08 -0800626static void done_termination(grpc_exec_ctx *exec_ctx, void *call,
Craig Tiller841a99d2016-12-12 16:58:57 -0800627 grpc_error *error) {
Craig Tillerc5b90df2017-03-10 16:11:08 -0800628 GRPC_CALL_INTERNAL_UNREF(exec_ctx, call, "termination");
Craig Tiller841a99d2016-12-12 16:58:57 -0800629}
630
Craig Tiller255edaa2016-12-13 09:04:55 -0800631static void cancel_with_error(grpc_exec_ctx *exec_ctx, grpc_call *c,
Craig Tiller2dc32ea2017-01-31 15:32:34 -0800632 status_source source, grpc_error *error) {
Craig Tillerc5b90df2017-03-10 16:11:08 -0800633 GRPC_CALL_INTERNAL_REF(c, "termination");
Craig Tiller2dc32ea2017-01-31 15:32:34 -0800634 set_status_from_error(exec_ctx, c, source, GRPC_ERROR_REF(error));
Craig Tillera0f3abd2017-03-31 15:42:16 -0700635 grpc_transport_stream_op_batch *op = grpc_make_transport_stream_op(
Craig Tillerc5b90df2017-03-10 16:11:08 -0800636 grpc_closure_create(done_termination, c, grpc_schedule_on_exec_ctx));
Craig Tiller22b182b2017-03-16 10:56:58 -0700637 op->cancel_stream = true;
638 op->payload->cancel_stream.cancel_error = error;
Craig Tillerc5b90df2017-03-10 16:11:08 -0800639 execute_op(exec_ctx, c, op);
Craig Tiller255edaa2016-12-13 09:04:55 -0800640}
641
Craig Tiller841a99d2016-12-12 16:58:57 -0800642static grpc_error *error_from_status(grpc_status_code status,
643 const char *description) {
644 return grpc_error_set_int(
ncteisen4b36a3d2017-03-13 19:08:06 -0700645 grpc_error_set_str(GRPC_ERROR_CREATE_FROM_COPIED_STRING(description),
646 GRPC_ERROR_STR_GRPC_MESSAGE,
647 grpc_slice_from_copied_string(description)),
Craig Tiller841a99d2016-12-12 16:58:57 -0800648 GRPC_ERROR_INT_GRPC_STATUS, status);
649}
650
Craig Tiller2dc32ea2017-01-31 15:32:34 -0800651static void cancel_with_status(grpc_exec_ctx *exec_ctx, grpc_call *c,
652 status_source source, grpc_status_code status,
653 const char *description) {
654 cancel_with_error(exec_ctx, c, source,
655 error_from_status(status, description));
Craig Tiller841a99d2016-12-12 16:58:57 -0800656}
657
/*******************************************************************************
 * FINAL STATUS CODE MANIPULATION
 */
661
Craig Tiller58b30cd2017-01-31 17:07:36 -0800662static bool get_final_status_from(
Craig Tiller4bab9462017-02-22 08:56:02 -0800663 grpc_call *call, grpc_error *error, bool allow_ok_status,
Craig Tiller58b30cd2017-01-31 17:07:36 -0800664 void (*set_value)(grpc_status_code code, void *user_data),
665 void *set_value_user_data, grpc_slice *details) {
Craig Tiller737b6252017-01-09 15:25:15 -0800666 grpc_status_code code;
ncteiseneb2b1152017-03-28 15:27:27 -0700667 grpc_slice slice = grpc_empty_slice();
ncteisenbbb38012017-03-10 14:58:43 -0800668 grpc_error_get_status(error, call->send_deadline, &code, &slice, NULL);
Craig Tiller58b30cd2017-01-31 17:07:36 -0800669 if (code == GRPC_STATUS_OK && !allow_ok_status) {
670 return false;
671 }
Craig Tiller737b6252017-01-09 15:25:15 -0800672
673 set_value(code, set_value_user_data);
674 if (details != NULL) {
ncteisenbbb38012017-03-10 14:58:43 -0800675 *details = grpc_slice_ref_internal(slice);
Craig Tiller737b6252017-01-09 15:25:15 -0800676 }
Craig Tiller58b30cd2017-01-31 17:07:36 -0800677 return true;
Craig Tiller737b6252017-01-09 15:25:15 -0800678}
679
Craig Tiller841a99d2016-12-12 16:58:57 -0800680static void get_final_status(grpc_call *call,
681 void (*set_value)(grpc_status_code code,
682 void *user_data),
683 void *set_value_user_data, grpc_slice *details) {
684 int i;
Craig Tiller4bab9462017-02-22 08:56:02 -0800685 received_status status[STATUS_SOURCE_COUNT];
686 for (i = 0; i < STATUS_SOURCE_COUNT; i++) {
687 status[i] = unpack_received_status(gpr_atm_acq_load(&call->status[i]));
688 }
Craig Tiller58b30cd2017-01-31 17:07:36 -0800689 if (grpc_call_error_trace) {
690 gpr_log(GPR_DEBUG, "get_final_status %s", call->is_client ? "CLI" : "SVR");
691 for (i = 0; i < STATUS_SOURCE_COUNT; i++) {
Craig Tiller4bab9462017-02-22 08:56:02 -0800692 if (status[i].is_set) {
693 gpr_log(GPR_DEBUG, " %d: %s", i, grpc_error_string(status[i].error));
Craig Tiller58b30cd2017-01-31 17:07:36 -0800694 }
Craig Tiller841a99d2016-12-12 16:58:57 -0800695 }
696 }
Craig Tiller58b30cd2017-01-31 17:07:36 -0800697 /* first search through ignoring "OK" statuses: if something went wrong,
698 * ensure we report it */
699 for (int allow_ok_status = 0; allow_ok_status < 2; allow_ok_status++) {
700 /* search for the best status we can present: ideally the error we use has a
701 clearly defined grpc-status, and we'll prefer that. */
702 for (i = 0; i < STATUS_SOURCE_COUNT; i++) {
Craig Tiller4bab9462017-02-22 08:56:02 -0800703 if (status[i].is_set &&
704 grpc_error_has_clear_grpc_status(status[i].error)) {
705 if (get_final_status_from(call, status[i].error, allow_ok_status != 0,
Craig Tiller58b30cd2017-01-31 17:07:36 -0800706 set_value, set_value_user_data, details)) {
707 return;
708 }
709 }
710 }
711 /* If no clearly defined status exists, search for 'anything' */
712 for (i = 0; i < STATUS_SOURCE_COUNT; i++) {
Craig Tiller4bab9462017-02-22 08:56:02 -0800713 if (status[i].is_set) {
714 if (get_final_status_from(call, status[i].error, allow_ok_status != 0,
Craig Tiller58b30cd2017-01-31 17:07:36 -0800715 set_value, set_value_user_data, details)) {
716 return;
717 }
718 }
Craig Tiller737b6252017-01-09 15:25:15 -0800719 }
720 }
721 /* If nothing exists, set some default */
Craig Tiller841a99d2016-12-12 16:58:57 -0800722 if (call->is_client) {
723 set_value(GRPC_STATUS_UNKNOWN, set_value_user_data);
Craig Tillerbe1b9a72016-06-24 13:22:11 -0700724 } else {
Craig Tiller841a99d2016-12-12 16:58:57 -0800725 set_value(GRPC_STATUS_OK, set_value_user_data);
Craig Tillerf0f70a82016-06-23 13:55:06 -0700726 }
Craig Tillerf0f70a82016-06-23 13:55:06 -0700727}
728
Craig Tillera59c16c2016-10-31 07:25:01 -0700729static void set_status_from_error(grpc_exec_ctx *exec_ctx, grpc_call *call,
730 status_source source, grpc_error *error) {
Craig Tiller4bab9462017-02-22 08:56:02 -0800731 if (!gpr_atm_rel_cas(&call->status[source],
732 pack_received_status((received_status){
733 .is_set = false, .error = GRPC_ERROR_NONE}),
734 pack_received_status((received_status){
735 .is_set = true, .error = error}))) {
Craig Tiller841a99d2016-12-12 16:58:57 -0800736 GRPC_ERROR_UNREF(error);
Craig Tiller841a99d2016-12-12 16:58:57 -0800737 }
Craig Tiller68752722015-01-29 14:59:54 -0800738}
739
/*******************************************************************************
 * COMPRESSION
 */
743
David Garcia Quintasac094472016-05-18 20:25:57 -0700744static void set_incoming_compression_algorithm(
745 grpc_call *call, grpc_compression_algorithm algo) {
David Garcia Quintas303d3082016-05-05 18:25:34 -0700746 GPR_ASSERT(algo < GRPC_COMPRESS_ALGORITHMS_COUNT);
David Garcia Quintas749367f2016-05-17 19:15:24 -0700747 call->incoming_compression_algorithm = algo;
David Garcia Quintasdb94b272015-06-15 18:37:01 -0700748}
749
David Garcia Quintas0c331882015-10-08 14:51:54 -0700750grpc_compression_algorithm grpc_call_test_only_get_compression_algorithm(
David Garcia Quintas64824be2015-10-06 19:45:36 -0700751 grpc_call *call) {
752 grpc_compression_algorithm algorithm;
David Garcia Quintas749367f2016-05-17 19:15:24 -0700753 algorithm = call->incoming_compression_algorithm;
David Garcia Quintas64824be2015-10-06 19:45:36 -0700754 return algorithm;
David Garcia Quintas7c0d9142015-07-23 04:58:20 -0700755}
756
David Garcia Quintasa301eaa2016-05-06 16:59:03 -0700757static grpc_compression_algorithm compression_algorithm_for_level_locked(
758 grpc_call *call, grpc_compression_level level) {
David Garcia Quintasac094472016-05-18 20:25:57 -0700759 return grpc_compression_algorithm_for_level(level,
760 call->encodings_accepted_by_peer);
David Garcia Quintasa301eaa2016-05-06 16:59:03 -0700761}
762
Craig Tiller7536af02015-12-22 13:49:30 -0800763uint32_t grpc_call_test_only_get_message_flags(grpc_call *call) {
764 uint32_t flags;
Craig Tillerc7e1a2a2015-11-02 14:17:32 -0800765 flags = call->test_only_last_message_flags;
Craig Tillerc7e1a2a2015-11-02 14:17:32 -0800766 return flags;
767}
768
/* Intentionally a no-op. This function is passed to
   grpc_mdelem_get_user_data()/grpc_mdelem_set_user_data() as the destroy
   callback for the cached accepted-encodings value, which is an integer
   stored in the pointer itself and so needs no cleanup. */
static void destroy_encodings_accepted_by_peer(void *p) { (void)p; }
770
Craig Tillera59c16c2016-10-31 07:25:01 -0700771static void set_encodings_accepted_by_peer(grpc_exec_ctx *exec_ctx,
Craig Tiller0160de92016-11-18 08:46:46 -0800772 grpc_call *call, grpc_mdelem mdel) {
David Garcia Quintasb8edf7e2015-07-08 20:18:57 -0700773 size_t i;
774 grpc_compression_algorithm algorithm;
Craig Tillerd41a4a72016-10-26 16:16:06 -0700775 grpc_slice_buffer accept_encoding_parts;
776 grpc_slice accept_encoding_slice;
Craig Tiller3ff27542015-10-09 15:39:44 -0700777 void *accepted_user_data;
David Garcia Quintasb8edf7e2015-07-08 20:18:57 -0700778
Craig Tiller3ff27542015-10-09 15:39:44 -0700779 accepted_user_data =
780 grpc_mdelem_get_user_data(mdel, destroy_encodings_accepted_by_peer);
781 if (accepted_user_data != NULL) {
782 call->encodings_accepted_by_peer =
Craig Tiller7536af02015-12-22 13:49:30 -0800783 (uint32_t)(((uintptr_t)accepted_user_data) - 1);
Craig Tiller3ff27542015-10-09 15:39:44 -0700784 return;
785 }
786
Craig Tiller0160de92016-11-18 08:46:46 -0800787 accept_encoding_slice = GRPC_MDVALUE(mdel);
Craig Tillerd41a4a72016-10-26 16:16:06 -0700788 grpc_slice_buffer_init(&accept_encoding_parts);
789 grpc_slice_split(accept_encoding_slice, ",", &accept_encoding_parts);
David Garcia Quintasb8edf7e2015-07-08 20:18:57 -0700790
David Garcia Quintase091af82015-07-15 21:37:02 -0700791 /* No need to zero call->encodings_accepted_by_peer: grpc_call_create already
792 * zeroes the whole grpc_call */
David Garcia Quintasb1866bd2015-07-08 22:37:01 -0700793 /* Always support no compression */
Craig Tillera82950e2015-09-22 12:33:20 -0700794 GPR_BITSET(&call->encodings_accepted_by_peer, GRPC_COMPRESS_NONE);
795 for (i = 0; i < accept_encoding_parts.count; i++) {
Craig Tiller68208fe2016-11-14 14:35:02 -0800796 grpc_slice accept_encoding_entry_slice = accept_encoding_parts.slices[i];
797 if (grpc_compression_algorithm_parse(accept_encoding_entry_slice,
798 &algorithm)) {
Craig Tillera82950e2015-09-22 12:33:20 -0700799 GPR_BITSET(&call->encodings_accepted_by_peer, algorithm);
800 } else {
801 char *accept_encoding_entry_str =
Craig Tillerb4aa70e2016-12-09 09:40:11 -0800802 grpc_slice_to_c_string(accept_encoding_entry_slice);
Craig Tillera82950e2015-09-22 12:33:20 -0700803 gpr_log(GPR_ERROR,
804 "Invalid entry in accept encoding metadata: '%s'. Ignoring.",
805 accept_encoding_entry_str);
806 gpr_free(accept_encoding_entry_str);
David Garcia Quintasb8edf7e2015-07-08 20:18:57 -0700807 }
Craig Tillera82950e2015-09-22 12:33:20 -0700808 }
Craig Tiller3ff27542015-10-09 15:39:44 -0700809
Craig Tillera59c16c2016-10-31 07:25:01 -0700810 grpc_slice_buffer_destroy_internal(exec_ctx, &accept_encoding_parts);
Craig Tiller3ff27542015-10-09 15:39:44 -0700811
812 grpc_mdelem_set_user_data(
813 mdel, destroy_encodings_accepted_by_peer,
Craig Tiller7536af02015-12-22 13:49:30 -0800814 (void *)(((uintptr_t)call->encodings_accepted_by_peer) + 1));
David Garcia Quintasb8edf7e2015-07-08 20:18:57 -0700815}
816
Craig Tiller7536af02015-12-22 13:49:30 -0800817uint32_t grpc_call_test_only_get_encodings_accepted_by_peer(grpc_call *call) {
818 uint32_t encodings_accepted_by_peer;
David Garcia Quintas0c331882015-10-08 14:51:54 -0700819 encodings_accepted_by_peer = call->encodings_accepted_by_peer;
David Garcia Quintas0c331882015-10-08 14:51:54 -0700820 return encodings_accepted_by_peer;
Craig Tiller68752722015-01-29 14:59:54 -0800821}
822
Craig Tillerc7e1a2a2015-11-02 14:17:32 -0800823static grpc_linked_mdelem *linked_from_md(grpc_metadata *md) {
824 return (grpc_linked_mdelem *)&md->internal_data;
Craig Tillerc12fee62015-02-03 11:55:50 -0800825}
826
David Garcia Quintasa301eaa2016-05-06 16:59:03 -0700827static grpc_metadata *get_md_elem(grpc_metadata *metadata,
828 grpc_metadata *additional_metadata, int i,
829 int count) {
830 grpc_metadata *res =
831 i < count ? &metadata[i] : &additional_metadata[i - count];
832 GPR_ASSERT(res);
833 return res;
834}
835
Craig Tillera59c16c2016-10-31 07:25:01 -0700836static int prepare_application_metadata(
837 grpc_exec_ctx *exec_ctx, grpc_call *call, int count,
838 grpc_metadata *metadata, int is_trailing, int prepend_extra_metadata,
839 grpc_metadata *additional_metadata, int additional_metadata_count) {
David Garcia Quintasa301eaa2016-05-06 16:59:03 -0700840 int total_count = count + additional_metadata_count;
Craig Tillerc7e1a2a2015-11-02 14:17:32 -0800841 int i;
842 grpc_metadata_batch *batch =
843 &call->metadata_batch[0 /* is_receiving */][is_trailing];
David Garcia Quintasa301eaa2016-05-06 16:59:03 -0700844 for (i = 0; i < total_count; i++) {
845 const grpc_metadata *md =
846 get_md_elem(metadata, additional_metadata, i, count);
Craig Tillerb42445c2016-04-22 13:11:44 -0700847 grpc_linked_mdelem *l = (grpc_linked_mdelem *)&md->internal_data;
848 GPR_ASSERT(sizeof(grpc_linked_mdelem) == sizeof(md->internal_data));
Craig Tillerf2b5b7e2017-01-10 08:28:59 -0800849 if (!GRPC_LOG_IF_ERROR("validate_metadata",
850 grpc_validate_header_key_is_legal(md->key))) {
Craig Tillerb42445c2016-04-22 13:11:44 -0700851 break;
Craig Tillerdf2d9222016-11-18 16:38:57 -0800852 } else if (!grpc_is_binary_header(md->key) &&
Craig Tillerf2b5b7e2017-01-10 08:28:59 -0800853 !GRPC_LOG_IF_ERROR(
854 "validate_metadata",
855 grpc_validate_header_nonbin_value_is_legal(md->value))) {
Craig Tillerb42445c2016-04-22 13:11:44 -0700856 break;
857 }
Craig Tiller1282a672016-11-18 14:57:53 -0800858 l->md = grpc_mdelem_from_grpc_metadata(exec_ctx, (grpc_metadata *)md);
Craig Tillerb42445c2016-04-22 13:11:44 -0700859 }
David Garcia Quintasa301eaa2016-05-06 16:59:03 -0700860 if (i != total_count) {
Craig Tiller5ae3ffb2016-11-18 14:58:32 -0800861 for (int j = 0; j < i; j++) {
David Garcia Quintasa301eaa2016-05-06 16:59:03 -0700862 const grpc_metadata *md =
863 get_md_elem(metadata, additional_metadata, j, count);
Craig Tillerb42445c2016-04-22 13:11:44 -0700864 grpc_linked_mdelem *l = (grpc_linked_mdelem *)&md->internal_data;
Craig Tillera59c16c2016-10-31 07:25:01 -0700865 GRPC_MDELEM_UNREF(exec_ctx, l->md);
Craig Tillerb42445c2016-04-22 13:11:44 -0700866 }
867 return 0;
868 }
Craig Tillerc7e1a2a2015-11-02 14:17:32 -0800869 if (prepend_extra_metadata) {
870 if (call->send_extra_metadata_count == 0) {
871 prepend_extra_metadata = 0;
Craig Tillera82950e2015-09-22 12:33:20 -0700872 } else {
Craig Tiller09608182016-11-22 15:43:56 -0800873 for (i = 0; i < call->send_extra_metadata_count; i++) {
874 GRPC_LOG_IF_ERROR("prepare_application_metadata",
875 grpc_metadata_batch_link_tail(
Craig Tiller9277aa72017-01-11 14:15:38 -0800876 exec_ctx, batch, &call->send_extra_metadata[i]));
Craig Tillera82950e2015-09-22 12:33:20 -0700877 }
Craig Tiller629b0ed2015-04-22 11:14:26 -0700878 }
Craig Tillera82950e2015-09-22 12:33:20 -0700879 }
Craig Tiller09608182016-11-22 15:43:56 -0800880 for (i = 0; i < total_count; i++) {
David Garcia Quintasa301eaa2016-05-06 16:59:03 -0700881 grpc_metadata *md = get_md_elem(metadata, additional_metadata, i, count);
Craig Tiller9277aa72017-01-11 14:15:38 -0800882 GRPC_LOG_IF_ERROR(
883 "prepare_application_metadata",
884 grpc_metadata_batch_link_tail(exec_ctx, batch, linked_from_md(md)));
Craig Tillerc7e1a2a2015-11-02 14:17:32 -0800885 }
Craig Tiller09608182016-11-22 15:43:56 -0800886 call->send_extra_metadata_count = 0;
Craig Tillerc7e1a2a2015-11-02 14:17:32 -0800887
Craig Tillerb96d0012015-05-06 15:33:23 -0700888 return 1;
889}
890
/* Decoded status codes are cached on metadata elements as user data. A zero
   (NULL) user-data value cannot be told apart from "nothing cached", yet 0 is
   the value of OK among grpc_status_codes, so every cached status is shifted
   up by STATUS_OFFSET before being stored and shifted back when read. */
#define STATUS_OFFSET 1
/* No-op destroy callback paired with the cached status user data; the cached
   value lives in the pointer itself, so there is nothing to release. */
static void destroy_status(void *ignored) { (void)ignored; }
Craig Tiller566316f2015-02-02 15:25:32 -0800896
Craig Tiller0160de92016-11-18 08:46:46 -0800897static uint32_t decode_status(grpc_mdelem md) {
Craig Tiller7536af02015-12-22 13:49:30 -0800898 uint32_t status;
Craig Tillerebdef9d2015-11-19 17:09:49 -0800899 void *user_data;
Craig Tiller0160de92016-11-18 08:46:46 -0800900 if (grpc_mdelem_eq(md, GRPC_MDELEM_GRPC_STATUS_0)) return 0;
901 if (grpc_mdelem_eq(md, GRPC_MDELEM_GRPC_STATUS_1)) return 1;
902 if (grpc_mdelem_eq(md, GRPC_MDELEM_GRPC_STATUS_2)) return 2;
Craig Tillerebdef9d2015-11-19 17:09:49 -0800903 user_data = grpc_mdelem_get_user_data(md, destroy_status);
904 if (user_data != NULL) {
Craig Tiller7536af02015-12-22 13:49:30 -0800905 status = ((uint32_t)(intptr_t)user_data) - STATUS_OFFSET;
Craig Tillera82950e2015-09-22 12:33:20 -0700906 } else {
Craig Tiller0160de92016-11-18 08:46:46 -0800907 if (!grpc_parse_slice_to_uint32(GRPC_MDVALUE(md), &status)) {
Craig Tillera82950e2015-09-22 12:33:20 -0700908 status = GRPC_STATUS_UNKNOWN; /* could not parse status code */
Craig Tiller566316f2015-02-02 15:25:32 -0800909 }
Craig Tillera82950e2015-09-22 12:33:20 -0700910 grpc_mdelem_set_user_data(md, destroy_status,
Craig Tiller7536af02015-12-22 13:49:30 -0800911 (void *)(intptr_t)(status + STATUS_OFFSET));
Craig Tillera82950e2015-09-22 12:33:20 -0700912 }
Craig Tiller566316f2015-02-02 15:25:32 -0800913 return status;
914}
915
Craig Tiller0160de92016-11-18 08:46:46 -0800916static grpc_compression_algorithm decode_compression(grpc_mdelem md) {
Craig Tillerebdef9d2015-11-19 17:09:49 -0800917 grpc_compression_algorithm algorithm =
Craig Tiller0160de92016-11-18 08:46:46 -0800918 grpc_compression_algorithm_from_slice(GRPC_MDVALUE(md));
Craig Tillerebdef9d2015-11-19 17:09:49 -0800919 if (algorithm == GRPC_COMPRESS_ALGORITHMS_COUNT) {
Craig Tillerb4aa70e2016-12-09 09:40:11 -0800920 char *md_c_str = grpc_slice_to_c_string(GRPC_MDVALUE(md));
David Garcia Quintas303d3082016-05-05 18:25:34 -0700921 gpr_log(GPR_ERROR,
922 "Invalid incoming compression algorithm: '%s'. Interpreting "
923 "incoming data as uncompressed.",
924 md_c_str);
Craig Tiller68208fe2016-11-14 14:35:02 -0800925 gpr_free(md_c_str);
David Garcia Quintas303d3082016-05-05 18:25:34 -0700926 return GRPC_COMPRESS_NONE;
Craig Tillera82950e2015-09-22 12:33:20 -0700927 }
David Garcia Quintasfc0fa332015-06-25 18:11:07 -0700928 return algorithm;
David Garcia Quintasdb94b272015-06-15 18:37:01 -0700929}
930
Craig Tillera7d37a32016-11-22 14:37:16 -0800931static void recv_common_filter(grpc_exec_ctx *exec_ctx, grpc_call *call,
932 grpc_metadata_batch *b) {
Craig Tillerde2508b2017-01-06 15:23:23 -0800933 if (b->idx.named.grpc_status != NULL) {
934 uint32_t status_code = decode_status(b->idx.named.grpc_status->md);
935 grpc_error *error =
936 status_code == GRPC_STATUS_OK
937 ? GRPC_ERROR_NONE
ncteisen4b36a3d2017-03-13 19:08:06 -0700938 : grpc_error_set_int(GRPC_ERROR_CREATE_FROM_STATIC_STRING(
939 "Error received from peer"),
Craig Tillercae37f32017-01-09 15:50:03 -0800940 GRPC_ERROR_INT_GRPC_STATUS,
941 (intptr_t)status_code);
Craig Tillera7d37a32016-11-22 14:37:16 -0800942
Craig Tillerde2508b2017-01-06 15:23:23 -0800943 if (b->idx.named.grpc_message != NULL) {
ncteisen4b36a3d2017-03-13 19:08:06 -0700944 error = grpc_error_set_str(
945 error, GRPC_ERROR_STR_GRPC_MESSAGE,
946 grpc_slice_ref_internal(GRPC_MDVALUE(b->idx.named.grpc_message->md)));
Craig Tillerde2508b2017-01-06 15:23:23 -0800947 grpc_metadata_batch_remove(exec_ctx, b, b->idx.named.grpc_message);
Craig Tiller8c58a482017-02-07 14:52:59 -0800948 } else if (error != GRPC_ERROR_NONE) {
ncteisen4b36a3d2017-03-13 19:08:06 -0700949 error = grpc_error_set_str(error, GRPC_ERROR_STR_GRPC_MESSAGE,
950 grpc_empty_slice());
Craig Tillerde2508b2017-01-06 15:23:23 -0800951 }
952
953 set_status_from_error(exec_ctx, call, STATUS_FROM_WIRE, error);
954 grpc_metadata_batch_remove(exec_ctx, b, b->idx.named.grpc_status);
Craig Tillera7d37a32016-11-22 14:37:16 -0800955 }
Craig Tillerc7e1a2a2015-11-02 14:17:32 -0800956}
957
Craig Tillera7d37a32016-11-22 14:37:16 -0800958static void publish_app_metadata(grpc_call *call, grpc_metadata_batch *b,
959 int is_trailing) {
Craig Tiller09608182016-11-22 15:43:56 -0800960 if (b->list.count == 0) return;
Craig Tillera7d37a32016-11-22 14:37:16 -0800961 GPR_TIMER_BEGIN("publish_app_metadata", 0);
Craig Tiller566316f2015-02-02 15:25:32 -0800962 grpc_metadata_array *dest;
963 grpc_metadata *mdusr;
Craig Tillerc7e1a2a2015-11-02 14:17:32 -0800964 dest = call->buffered_metadata[is_trailing];
Craig Tillerb0f3bca2016-11-22 14:54:10 -0800965 if (dest->count + b->list.count > dest->capacity) {
966 dest->capacity =
967 GPR_MAX(dest->capacity + b->list.count, dest->capacity * 3 / 2);
Craig Tillerc7e1a2a2015-11-02 14:17:32 -0800968 dest->metadata =
969 gpr_realloc(dest->metadata, sizeof(grpc_metadata) * dest->capacity);
970 }
Craig Tillera7d37a32016-11-22 14:37:16 -0800971 for (grpc_linked_mdelem *l = b->list.head; l != NULL; l = l->next) {
972 mdusr = &dest->metadata[dest->count++];
Craig Tillercf0a2022016-11-23 11:36:21 -0800973 /* we pass back borrowed slices that are valid whilst the call is valid */
974 mdusr->key = GRPC_MDKEY(l->md);
975 mdusr->value = GRPC_MDVALUE(l->md);
Craig Tillera7d37a32016-11-22 14:37:16 -0800976 }
Craig Tillerc7e1a2a2015-11-02 14:17:32 -0800977 GPR_TIMER_END("publish_app_metadata", 0);
Craig Tillerc7e1a2a2015-11-02 14:17:32 -0800978}
Craig Tiller566316f2015-02-02 15:25:32 -0800979
Craig Tillera7d37a32016-11-22 14:37:16 -0800980static void recv_initial_filter(grpc_exec_ctx *exec_ctx, grpc_call *call,
981 grpc_metadata_batch *b) {
982 recv_common_filter(exec_ctx, call, b);
983
984 if (b->idx.named.grpc_encoding != NULL) {
David Garcia Quintas749367f2016-05-17 19:15:24 -0700985 GPR_TIMER_BEGIN("incoming_compression_algorithm", 0);
Craig Tillera7d37a32016-11-22 14:37:16 -0800986 set_incoming_compression_algorithm(
987 call, decode_compression(b->idx.named.grpc_encoding->md));
David Garcia Quintas749367f2016-05-17 19:15:24 -0700988 GPR_TIMER_END("incoming_compression_algorithm", 0);
Craig Tillerde7b4672016-11-23 11:13:46 -0800989 grpc_metadata_batch_remove(exec_ctx, b, b->idx.named.grpc_encoding);
Craig Tillera82950e2015-09-22 12:33:20 -0700990 }
Craig Tillera7d37a32016-11-22 14:37:16 -0800991
992 if (b->idx.named.grpc_accept_encoding != NULL) {
993 GPR_TIMER_BEGIN("encodings_accepted_by_peer", 0);
994 set_encodings_accepted_by_peer(exec_ctx, call,
995 b->idx.named.grpc_accept_encoding->md);
Craig Tillerde7b4672016-11-23 11:13:46 -0800996 grpc_metadata_batch_remove(exec_ctx, b, b->idx.named.grpc_accept_encoding);
Craig Tillera7d37a32016-11-22 14:37:16 -0800997 GPR_TIMER_END("encodings_accepted_by_peer", 0);
998 }
999
1000 publish_app_metadata(call, b, false);
Craig Tillerc7e1a2a2015-11-02 14:17:32 -08001001}
Craig Tiller6902ad22015-04-16 08:01:49 -07001002
Craig Tillera7d37a32016-11-22 14:37:16 -08001003static void recv_trailing_filter(grpc_exec_ctx *exec_ctx, void *args,
1004 grpc_metadata_batch *b) {
Craig Tillerc5866662016-11-16 15:25:00 -08001005 grpc_call *call = args;
Craig Tillera7d37a32016-11-22 14:37:16 -08001006 recv_common_filter(exec_ctx, call, b);
1007 publish_app_metadata(call, b, true);
Craig Tiller629b0ed2015-04-22 11:14:26 -07001008}
Craig Tiller8b282cb2015-04-17 14:57:44 -07001009
Craig Tillera82950e2015-09-22 12:33:20 -07001010grpc_call_stack *grpc_call_get_call_stack(grpc_call *call) {
1011 return CALL_STACK_FROM_CALL(call);
Craig Tiller566316f2015-02-02 15:25:32 -08001012}
1013
/*******************************************************************************
 * BATCH API IMPLEMENTATION
 */
1017
Craig Tillera82950e2015-09-22 12:33:20 -07001018static void set_status_value_directly(grpc_status_code status, void *dest) {
1019 *(grpc_status_code *)dest = status;
Craig Tillerfb189f82015-02-03 12:07:07 -08001020}
1021
Craig Tillera82950e2015-09-22 12:33:20 -07001022static void set_cancelled_value(grpc_status_code status, void *dest) {
Craig Tillerc7e1a2a2015-11-02 14:17:32 -08001023 *(int *)dest = (status != GRPC_STATUS_OK);
Craig Tiller166e2502015-02-03 20:14:41 -08001024}
Craig Tillerfb189f82015-02-03 12:07:07 -08001025
Craig Tillerc6549762016-03-09 17:10:43 -08001026static bool are_write_flags_valid(uint32_t flags) {
David Garcia Quintas1d5aca52015-06-14 14:42:04 -07001027 /* check that only bits in GRPC_WRITE_(INTERNAL?)_USED_MASK are set */
Craig Tiller7536af02015-12-22 13:49:30 -08001028 const uint32_t allowed_write_positions =
Craig Tillera82950e2015-09-22 12:33:20 -07001029 (GRPC_WRITE_USED_MASK | GRPC_WRITE_INTERNAL_USED_MASK);
Craig Tiller7536af02015-12-22 13:49:30 -08001030 const uint32_t invalid_positions = ~allowed_write_positions;
David Garcia Quintas1d5aca52015-06-14 14:42:04 -07001031 return !(flags & invalid_positions);
1032}
1033
Craig Tillerc6549762016-03-09 17:10:43 -08001034static bool are_initial_metadata_flags_valid(uint32_t flags, bool is_client) {
1035 /* check that only bits in GRPC_WRITE_(INTERNAL?)_USED_MASK are set */
1036 uint32_t invalid_positions = ~GRPC_INITIAL_METADATA_USED_MASK;
1037 if (!is_client) {
1038 invalid_positions |= GRPC_INITIAL_METADATA_IDEMPOTENT_REQUEST;
1039 }
1040 return !(flags & invalid_positions);
1041}
1042
Craig Tiller2a11ad12017-02-08 17:09:02 -08001043static int batch_slot_for_op(grpc_op_type type) {
1044 switch (type) {
1045 case GRPC_OP_SEND_INITIAL_METADATA:
1046 return 0;
1047 case GRPC_OP_SEND_MESSAGE:
1048 return 1;
1049 case GRPC_OP_SEND_CLOSE_FROM_CLIENT:
1050 case GRPC_OP_SEND_STATUS_FROM_SERVER:
1051 return 2;
1052 case GRPC_OP_RECV_INITIAL_METADATA:
1053 return 3;
1054 case GRPC_OP_RECV_MESSAGE:
1055 return 4;
1056 case GRPC_OP_RECV_CLOSE_ON_SERVER:
1057 case GRPC_OP_RECV_STATUS_ON_CLIENT:
1058 return 5;
Craig Tillerc7e1a2a2015-11-02 14:17:32 -08001059 }
Craig Tillerc869da02017-02-08 17:11:17 -08001060 GPR_UNREACHABLE_CODE(return 123456789);
Craig Tiller2a11ad12017-02-08 17:09:02 -08001061}
Craig Tiller89d33792017-02-08 16:39:16 -08001062
1063static batch_control *allocate_batch_control(grpc_call *call,
1064 const grpc_op *ops,
1065 size_t num_ops) {
1066 int slot = batch_slot_for_op(ops[0].op);
Craig Tillerb58de722017-03-29 14:15:12 -07001067 batch_control **pslot = &call->active_batches[slot];
1068 if (*pslot == NULL) {
1069 *pslot = gpr_arena_alloc(call->arena, sizeof(batch_control));
1070 }
1071 batch_control *bctl = *pslot;
Craig Tiller5e5ef302017-02-09 08:46:49 -08001072 if (bctl->call != NULL) {
1073 return NULL;
1074 }
1075 memset(bctl, 0, sizeof(*bctl));
1076 bctl->call = call;
Craig Tiller3fddcb42017-03-10 11:01:48 -08001077 bctl->op.payload = &call->stream_op_payload;
Craig Tiller5e5ef302017-02-09 08:46:49 -08001078 return bctl;
Craig Tillerc7e1a2a2015-11-02 14:17:32 -08001079}
1080
1081static void finish_batch_completion(grpc_exec_ctx *exec_ctx, void *user_data,
1082 grpc_cq_completion *storage) {
1083 batch_control *bctl = user_data;
1084 grpc_call *call = bctl->call;
Craig Tiller5e5ef302017-02-09 08:46:49 -08001085 bctl->call = NULL;
Craig Tillerc7e1a2a2015-11-02 14:17:32 -08001086 GRPC_CALL_INTERNAL_UNREF(exec_ctx, call, "completion");
1087}
1088
Craig Tiller94903892016-10-11 15:43:35 -07001089static grpc_error *consolidate_batch_errors(batch_control *bctl) {
Craig Tillerb597dcf2017-03-09 07:02:11 -08001090 size_t n = (size_t)gpr_atm_acq_load(&bctl->num_errors);
Craig Tiller94903892016-10-11 15:43:35 -07001091 if (n == 0) {
1092 return GRPC_ERROR_NONE;
1093 } else if (n == 1) {
Craig Tillera78da602017-01-27 08:16:23 -08001094 /* Skip creating a composite error in the case that only one error was
1095 logged */
Craig Tillerad980e32017-01-23 07:46:25 -08001096 grpc_error *e = bctl->errors[0];
1097 bctl->errors[0] = NULL;
1098 return e;
Craig Tiller94903892016-10-11 15:43:35 -07001099 } else {
ncteisen4b36a3d2017-03-13 19:08:06 -07001100 grpc_error *error = GRPC_ERROR_CREATE_REFERENCING_FROM_STATIC_STRING(
1101 "Call batch failed", bctl->errors, n);
Craig Tiller1c4775c2017-01-06 16:07:45 -08001102 for (size_t i = 0; i < n; i++) {
1103 GRPC_ERROR_UNREF(bctl->errors[i]);
Craig Tillerad980e32017-01-23 07:46:25 -08001104 bctl->errors[i] = NULL;
Craig Tiller1c4775c2017-01-06 16:07:45 -08001105 }
1106 return error;
Craig Tiller94903892016-10-11 15:43:35 -07001107 }
1108}
1109
Craig Tillerc7e1a2a2015-11-02 14:17:32 -08001110static void post_batch_completion(grpc_exec_ctx *exec_ctx,
1111 batch_control *bctl) {
Craig Tiller94903892016-10-11 15:43:35 -07001112 grpc_call *next_child_call;
Craig Tillerc7e1a2a2015-11-02 14:17:32 -08001113 grpc_call *call = bctl->call;
Craig Tiller94903892016-10-11 15:43:35 -07001114 grpc_error *error = consolidate_batch_errors(bctl);
1115
Craig Tillerea54b8c2017-03-01 16:58:28 -08001116 if (bctl->op.send_initial_metadata) {
Craig Tiller94903892016-10-11 15:43:35 -07001117 grpc_metadata_batch_destroy(
1118 exec_ctx,
1119 &call->metadata_batch[0 /* is_receiving */][0 /* is_trailing */]);
1120 }
Craig Tillerea54b8c2017-03-01 16:58:28 -08001121 if (bctl->op.send_message) {
Craig Tillerf927ad12017-01-06 15:27:31 -08001122 call->sending_message = false;
1123 }
Craig Tillerea54b8c2017-03-01 16:58:28 -08001124 if (bctl->op.send_trailing_metadata) {
Craig Tiller94903892016-10-11 15:43:35 -07001125 grpc_metadata_batch_destroy(
1126 exec_ctx,
1127 &call->metadata_batch[0 /* is_receiving */][1 /* is_trailing */]);
1128 }
Craig Tillerea54b8c2017-03-01 16:58:28 -08001129 if (bctl->op.recv_trailing_metadata) {
Craig Tiller94903892016-10-11 15:43:35 -07001130 grpc_metadata_batch *md =
1131 &call->metadata_batch[1 /* is_receiving */][1 /* is_trailing */];
1132 recv_trailing_filter(exec_ctx, call, md);
1133
Craig Tiller94903892016-10-11 15:43:35 -07001134 /* propagate cancellation to any interested children */
Craig Tillerb18c8ba2017-03-13 15:51:37 -07001135 gpr_atm_rel_store(&call->received_final_op_atm, 1);
Craig Tiller1c10a7b2017-03-29 14:35:16 -07001136 parent_call *pc = get_parent_call(call);
1137 if (pc != NULL) {
1138 grpc_call *child;
1139 gpr_mu_lock(&pc->child_list_mu);
1140 child = pc->first_child;
1141 if (child != NULL) {
1142 do {
1143 next_child_call = child->child_call->sibling_next;
1144 if (child->cancellation_is_inherited) {
1145 GRPC_CALL_INTERNAL_REF(child, "propagate_cancel");
1146 cancel_with_error(exec_ctx, child, STATUS_FROM_API_OVERRIDE,
1147 GRPC_ERROR_CANCELLED);
1148 GRPC_CALL_INTERNAL_UNREF(exec_ctx, child, "propagate_cancel");
1149 }
1150 child = next_child_call;
1151 } while (child != pc->first_child);
1152 }
1153 gpr_mu_unlock(&pc->child_list_mu);
Craig Tiller94903892016-10-11 15:43:35 -07001154 }
1155
1156 if (call->is_client) {
1157 get_final_status(call, set_status_value_directly,
Craig Tiller841a99d2016-12-12 16:58:57 -08001158 call->final_op.client.status,
1159 call->final_op.client.status_details);
Craig Tiller94903892016-10-11 15:43:35 -07001160 } else {
1161 get_final_status(call, set_cancelled_value,
Craig Tiller841a99d2016-12-12 16:58:57 -08001162 call->final_op.server.cancelled, NULL);
Craig Tiller94903892016-10-11 15:43:35 -07001163 }
1164
Craig Tiller02b87cd2016-09-02 09:50:08 -07001165 GRPC_ERROR_UNREF(error);
1166 error = GRPC_ERROR_NONE;
1167 }
Craig Tiller94903892016-10-11 15:43:35 -07001168
Craig Tillerea54b8c2017-03-01 16:58:28 -08001169 if (bctl->completion_data.notify_tag.is_closure) {
Craig Tillerb08fa492016-05-10 14:56:05 -07001170 /* unrefs bctl->error */
Craig Tiller2db5bda2017-02-09 10:30:55 -08001171 bctl->call = NULL;
Craig Tillerea54b8c2017-03-01 16:58:28 -08001172 grpc_closure_run(exec_ctx, bctl->completion_data.notify_tag.tag, error);
Craig Tillerc7e1a2a2015-11-02 14:17:32 -08001173 GRPC_CALL_INTERNAL_UNREF(exec_ctx, call, "completion");
1174 } else {
Craig Tillerb08fa492016-05-10 14:56:05 -07001175 /* unrefs bctl->error */
Craig Tiller9c1ec542017-03-02 08:42:54 -08001176 grpc_cq_end_op(
1177 exec_ctx, bctl->call->cq, bctl->completion_data.notify_tag.tag, error,
1178 finish_batch_completion, bctl, &bctl->completion_data.cq_completion);
Craig Tillerc7e1a2a2015-11-02 14:17:32 -08001179 }
1180}
1181
Craig Tillerd04dc4d2017-01-09 14:40:28 -08001182static void finish_batch_step(grpc_exec_ctx *exec_ctx, batch_control *bctl) {
Craig Tiller065b1392017-01-09 14:05:07 -08001183 if (gpr_unref(&bctl->steps_to_complete)) {
1184 post_batch_completion(exec_ctx, bctl);
1185 }
1186}
1187
Craig Tillerc7e1a2a2015-11-02 14:17:32 -08001188static void continue_receiving_slices(grpc_exec_ctx *exec_ctx,
1189 batch_control *bctl) {
1190 grpc_call *call = bctl->call;
1191 for (;;) {
1192 size_t remaining = call->receiving_stream->length -
1193 (*call->receiving_buffer)->data.raw.slice_buffer.length;
1194 if (remaining == 0) {
1195 call->receiving_message = 0;
Craig Tiller3b66ab92015-12-09 19:42:22 -08001196 grpc_byte_stream_destroy(exec_ctx, call->receiving_stream);
Craig Tillerc7e1a2a2015-11-02 14:17:32 -08001197 call->receiving_stream = NULL;
Craig Tillerd04dc4d2017-01-09 14:40:28 -08001198 finish_batch_step(exec_ctx, bctl);
Craig Tillerc7e1a2a2015-11-02 14:17:32 -08001199 return;
1200 }
1201 if (grpc_byte_stream_next(exec_ctx, call->receiving_stream,
1202 &call->receiving_slice, remaining,
1203 &call->receiving_slice_ready)) {
Craig Tillerd41a4a72016-10-26 16:16:06 -07001204 grpc_slice_buffer_add(&(*call->receiving_buffer)->data.raw.slice_buffer,
Craig Tiller0f310802016-10-26 16:25:56 -07001205 call->receiving_slice);
Craig Tillerc7e1a2a2015-11-02 14:17:32 -08001206 } else {
1207 return;
1208 }
1209 }
1210}
1211
1212static void receiving_slice_ready(grpc_exec_ctx *exec_ctx, void *bctlp,
Craig Tillerc027e772016-05-03 16:27:00 -07001213 grpc_error *error) {
Craig Tillerc7e1a2a2015-11-02 14:17:32 -08001214 batch_control *bctl = bctlp;
1215 grpc_call *call = bctl->call;
1216
Craig Tillerc027e772016-05-03 16:27:00 -07001217 if (error == GRPC_ERROR_NONE) {
Craig Tillerd41a4a72016-10-26 16:16:06 -07001218 grpc_slice_buffer_add(&(*call->receiving_buffer)->data.raw.slice_buffer,
Craig Tiller0f310802016-10-26 16:25:56 -07001219 call->receiving_slice);
Craig Tiller38edec62015-12-14 15:01:29 -08001220 continue_receiving_slices(exec_ctx, bctl);
1221 } else {
Craig Tillera286b042016-06-13 15:20:39 +00001222 if (grpc_trace_operation_failures) {
1223 GRPC_LOG_IF_ERROR("receiving_slice_ready", GRPC_ERROR_REF(error));
1224 }
Craig Tillere1b8c2b2015-12-16 19:27:52 -08001225 grpc_byte_stream_destroy(exec_ctx, call->receiving_stream);
Craig Tiller38edec62015-12-14 15:01:29 -08001226 call->receiving_stream = NULL;
1227 grpc_byte_buffer_destroy(*call->receiving_buffer);
1228 *call->receiving_buffer = NULL;
Craig Tillerd04dc4d2017-01-09 14:40:28 -08001229 finish_batch_step(exec_ctx, bctl);
Craig Tiller38edec62015-12-14 15:01:29 -08001230 }
Craig Tillerc7e1a2a2015-11-02 14:17:32 -08001231}
1232
Mark D. Roth274c8ed2016-10-04 09:21:42 -07001233static void process_data_after_md(grpc_exec_ctx *exec_ctx,
1234 batch_control *bctl) {
Craig Tillera44cbfc2016-02-03 16:02:49 -08001235 grpc_call *call = bctl->call;
1236 if (call->receiving_stream == NULL) {
1237 *call->receiving_buffer = NULL;
1238 call->receiving_message = 0;
Craig Tillerd04dc4d2017-01-09 14:40:28 -08001239 finish_batch_step(exec_ctx, bctl);
Craig Tillera44cbfc2016-02-03 16:02:49 -08001240 } else {
1241 call->test_only_last_message_flags = call->receiving_stream->flags;
1242 if ((call->receiving_stream->flags & GRPC_WRITE_INTERNAL_COMPRESS) &&
David Garcia Quintas749367f2016-05-17 19:15:24 -07001243 (call->incoming_compression_algorithm > GRPC_COMPRESS_NONE)) {
Craig Tillera44cbfc2016-02-03 16:02:49 -08001244 *call->receiving_buffer = grpc_raw_compressed_byte_buffer_create(
David Garcia Quintas749367f2016-05-17 19:15:24 -07001245 NULL, 0, call->incoming_compression_algorithm);
Craig Tillera44cbfc2016-02-03 16:02:49 -08001246 } else {
1247 *call->receiving_buffer = grpc_raw_byte_buffer_create(NULL, 0);
1248 }
Craig Tiller91031da2016-12-28 15:44:25 -08001249 grpc_closure_init(&call->receiving_slice_ready, receiving_slice_ready, bctl,
1250 grpc_schedule_on_exec_ctx);
Craig Tillera44cbfc2016-02-03 16:02:49 -08001251 continue_receiving_slices(exec_ctx, bctl);
Craig Tillera44cbfc2016-02-03 16:02:49 -08001252 }
1253}
1254
1255static void receiving_stream_ready(grpc_exec_ctx *exec_ctx, void *bctlp,
Craig Tillerc027e772016-05-03 16:27:00 -07001256 grpc_error *error) {
Craig Tillera44cbfc2016-02-03 16:02:49 -08001257 batch_control *bctl = bctlp;
1258 grpc_call *call = bctl->call;
Mark D. Roth274c8ed2016-10-04 09:21:42 -07001259 if (error != GRPC_ERROR_NONE) {
yang-g23f777d2017-02-22 23:32:26 -08001260 if (call->receiving_stream != NULL) {
1261 grpc_byte_stream_destroy(exec_ctx, call->receiving_stream);
1262 call->receiving_stream = NULL;
1263 }
1264 add_batch_error(exec_ctx, bctl, GRPC_ERROR_REF(error), true);
Craig Tiller58b30cd2017-01-31 17:07:36 -08001265 cancel_with_error(exec_ctx, call, STATUS_FROM_SURFACE,
1266 GRPC_ERROR_REF(error));
Mark D. Roth274c8ed2016-10-04 09:21:42 -07001267 }
Craig Tiller065b1392017-01-09 14:05:07 -08001268 if (call->has_initial_md_been_received || error != GRPC_ERROR_NONE ||
Craig Tiller52cf8712016-04-23 22:54:21 -07001269 call->receiving_stream == NULL) {
Mark D. Roth274c8ed2016-10-04 09:21:42 -07001270 process_data_after_md(exec_ctx, bctlp);
Craig Tillera44cbfc2016-02-03 16:02:49 -08001271 } else {
Craig Tiller8a677802016-04-22 15:07:53 -07001272 call->saved_receiving_stream_ready_bctlp = bctlp;
Craig Tillera44cbfc2016-02-03 16:02:49 -08001273 }
1274}
1275
David Garcia Quintas3e71f772016-05-18 10:14:32 -07001276static void validate_filtered_metadata(grpc_exec_ctx *exec_ctx,
1277 batch_control *bctl) {
1278 grpc_call *call = bctl->call;
David Garcia Quintasac094472016-05-18 20:25:57 -07001279 /* validate call->incoming_compression_algorithm */
1280 if (call->incoming_compression_algorithm != GRPC_COMPRESS_NONE) {
1281 const grpc_compression_algorithm algo =
1282 call->incoming_compression_algorithm;
David Garcia Quintas3e71f772016-05-18 10:14:32 -07001283 char *error_msg = NULL;
1284 const grpc_compression_options compression_options =
David Garcia Quintasac094472016-05-18 20:25:57 -07001285 grpc_channel_compression_options(call->channel);
David Garcia Quintas3e71f772016-05-18 10:14:32 -07001286 /* check if algorithm is known */
1287 if (algo >= GRPC_COMPRESS_ALGORITHMS_COUNT) {
1288 gpr_asprintf(&error_msg, "Invalid compression algorithm value '%d'.",
1289 algo);
Yuchen Zeng64c0e8d2016-06-10 11:19:51 -07001290 gpr_log(GPR_ERROR, "%s", error_msg);
Craig Tiller2dc32ea2017-01-31 15:32:34 -08001291 cancel_with_status(exec_ctx, call, STATUS_FROM_SURFACE,
1292 GRPC_STATUS_UNIMPLEMENTED, error_msg);
David Garcia Quintas3e71f772016-05-18 10:14:32 -07001293 } else if (grpc_compression_options_is_algorithm_enabled(
1294 &compression_options, algo) == 0) {
1295 /* check if algorithm is supported by current channel config */
David Garcia Quintas1ff168a2016-06-30 10:25:04 -07001296 char *algo_name = NULL;
David Garcia Quintas3e71f772016-05-18 10:14:32 -07001297 grpc_compression_algorithm_name(algo, &algo_name);
1298 gpr_asprintf(&error_msg, "Compression algorithm '%s' is disabled.",
1299 algo_name);
Yuchen Zeng64c0e8d2016-06-10 11:19:51 -07001300 gpr_log(GPR_ERROR, "%s", error_msg);
Craig Tiller2dc32ea2017-01-31 15:32:34 -08001301 cancel_with_status(exec_ctx, call, STATUS_FROM_SURFACE,
1302 GRPC_STATUS_UNIMPLEMENTED, error_msg);
David Garcia Quintas3e71f772016-05-18 10:14:32 -07001303 } else {
David Garcia Quintasac094472016-05-18 20:25:57 -07001304 call->incoming_compression_algorithm = algo;
David Garcia Quintas3e71f772016-05-18 10:14:32 -07001305 }
1306 gpr_free(error_msg);
1307 }
David Garcia Quintasf1945f22016-05-18 10:53:14 -07001308
1309 /* make sure the received grpc-encoding is amongst the ones listed in
1310 * grpc-accept-encoding */
1311 GPR_ASSERT(call->encodings_accepted_by_peer != 0);
1312 if (!GPR_BITGET(call->encodings_accepted_by_peer,
David Garcia Quintasac094472016-05-18 20:25:57 -07001313 call->incoming_compression_algorithm)) {
David Garcia Quintasf1945f22016-05-18 10:53:14 -07001314 extern int grpc_compression_trace;
1315 if (grpc_compression_trace) {
David Garcia Quintas1ff168a2016-06-30 10:25:04 -07001316 char *algo_name = NULL;
David Garcia Quintasac094472016-05-18 20:25:57 -07001317 grpc_compression_algorithm_name(call->incoming_compression_algorithm,
1318 &algo_name);
David Garcia Quintasf1945f22016-05-18 10:53:14 -07001319 gpr_log(GPR_ERROR,
1320 "Compression algorithm (grpc-encoding = '%s') not present in "
1321 "the bitset of accepted encodings (grpc-accept-encodings: "
1322 "'0x%x')",
1323 algo_name, call->encodings_accepted_by_peer);
1324 }
1325 }
David Garcia Quintas3e71f772016-05-18 10:14:32 -07001326}
1327
Craig Tiller3ba16e42016-12-08 16:46:18 -08001328static void add_batch_error(grpc_exec_ctx *exec_ctx, batch_control *bctl,
yang-g23f777d2017-02-22 23:32:26 -08001329 grpc_error *error, bool has_cancelled) {
Craig Tiller452422e2016-09-01 15:54:56 -07001330 if (error == GRPC_ERROR_NONE) return;
Craig Tillerb597dcf2017-03-09 07:02:11 -08001331 int idx = (int)gpr_atm_full_fetch_add(&bctl->num_errors, 1);
yang-g23f777d2017-02-22 23:32:26 -08001332 if (idx == 0 && !has_cancelled) {
Craig Tiller2dc32ea2017-01-31 15:32:34 -08001333 cancel_with_error(exec_ctx, bctl->call, STATUS_FROM_CORE,
1334 GRPC_ERROR_REF(error));
1335 }
Craig Tiller94903892016-10-11 15:43:35 -07001336 bctl->errors[idx] = error;
Craig Tiller452422e2016-09-01 15:54:56 -07001337}
1338
Craig Tillera44cbfc2016-02-03 16:02:49 -08001339static void receiving_initial_metadata_ready(grpc_exec_ctx *exec_ctx,
Craig Tillerc027e772016-05-03 16:27:00 -07001340 void *bctlp, grpc_error *error) {
Craig Tillera44cbfc2016-02-03 16:02:49 -08001341 batch_control *bctl = bctlp;
1342 grpc_call *call = bctl->call;
1343
yang-g23f777d2017-02-22 23:32:26 -08001344 add_batch_error(exec_ctx, bctl, GRPC_ERROR_REF(error), false);
Craig Tiller452422e2016-09-01 15:54:56 -07001345 if (error == GRPC_ERROR_NONE) {
Craig Tillerc48ca712016-04-04 13:42:04 -07001346 grpc_metadata_batch *md =
1347 &call->metadata_batch[1 /* is_receiving */][0 /* is_trailing */];
Craig Tillera7d37a32016-11-22 14:37:16 -08001348 recv_initial_filter(exec_ctx, call, md);
Craig Tillera44cbfc2016-02-03 16:02:49 -08001349
Craig Tillera7d37a32016-11-22 14:37:16 -08001350 /* TODO(ctiller): this could be moved into recv_initial_filter now */
David Garcia Quintas3e71f772016-05-18 10:14:32 -07001351 GPR_TIMER_BEGIN("validate_filtered_metadata", 0);
1352 validate_filtered_metadata(exec_ctx, bctl);
1353 GPR_TIMER_END("validate_filtered_metadata", 0);
David Garcia Quintas46123372016-05-09 15:28:42 -07001354
Craig Tillerc48ca712016-04-04 13:42:04 -07001355 if (gpr_time_cmp(md->deadline, gpr_inf_future(md->deadline.clock_type)) !=
1356 0 &&
1357 !call->is_client) {
Mark D. Rothf28763c2016-09-14 15:18:40 -07001358 call->send_deadline =
1359 gpr_convert_clock_type(md->deadline, GPR_CLOCK_MONOTONIC);
Craig Tillerc48ca712016-04-04 13:42:04 -07001360 }
Craig Tillera44cbfc2016-02-03 16:02:49 -08001361 }
1362
Craig Tillerc48ca712016-04-04 13:42:04 -07001363 call->has_initial_md_been_received = true;
Craig Tiller8a677802016-04-22 15:07:53 -07001364 if (call->saved_receiving_stream_ready_bctlp != NULL) {
Craig Tillera44cbfc2016-02-03 16:02:49 -08001365 grpc_closure *saved_rsr_closure = grpc_closure_create(
Craig Tiller91031da2016-12-28 15:44:25 -08001366 receiving_stream_ready, call->saved_receiving_stream_ready_bctlp,
1367 grpc_schedule_on_exec_ctx);
Craig Tiller8a677802016-04-22 15:07:53 -07001368 call->saved_receiving_stream_ready_bctlp = NULL;
Craig Tillere0341792017-03-08 15:59:16 -08001369 grpc_closure_run(exec_ctx, saved_rsr_closure, GRPC_ERROR_REF(error));
Craig Tillera44cbfc2016-02-03 16:02:49 -08001370 }
1371
Craig Tillerd04dc4d2017-01-09 14:40:28 -08001372 finish_batch_step(exec_ctx, bctl);
Craig Tillera44cbfc2016-02-03 16:02:49 -08001373}
1374
Craig Tillerc027e772016-05-03 16:27:00 -07001375static void finish_batch(grpc_exec_ctx *exec_ctx, void *bctlp,
1376 grpc_error *error) {
Craig Tillerc7e1a2a2015-11-02 14:17:32 -08001377 batch_control *bctl = bctlp;
Craig Tillerc7e1a2a2015-11-02 14:17:32 -08001378
yang-g23f777d2017-02-22 23:32:26 -08001379 add_batch_error(exec_ctx, bctl, GRPC_ERROR_REF(error), false);
Craig Tillerd04dc4d2017-01-09 14:40:28 -08001380 finish_batch_step(exec_ctx, bctl);
Craig Tillerc7e1a2a2015-11-02 14:17:32 -08001381}
1382
Craig Tiller89d33792017-02-08 16:39:16 -08001383static void free_no_op_completion(grpc_exec_ctx *exec_ctx, void *p,
1384 grpc_cq_completion *completion) {
1385 gpr_free(completion);
1386}
1387
Craig Tillerc7e1a2a2015-11-02 14:17:32 -08001388static grpc_call_error call_start_batch(grpc_exec_ctx *exec_ctx,
1389 grpc_call *call, const grpc_op *ops,
1390 size_t nops, void *notify_tag,
1391 int is_notify_tag_closure) {
Craig Tillerc7e1a2a2015-11-02 14:17:32 -08001392 size_t i;
Craig Tillerfb189f82015-02-03 12:07:07 -08001393 const grpc_op *op;
Craig Tillerc7e1a2a2015-11-02 14:17:32 -08001394 batch_control *bctl;
1395 int num_completion_callbacks_needed = 1;
1396 grpc_call_error error = GRPC_CALL_OK;
Craig Tiller9928d392015-08-18 09:40:24 -07001397
Vitaly Bukae60003d2016-08-01 19:34:51 -07001398 // sent_initial_metadata guards against variable reuse.
1399 grpc_metadata compression_md;
1400
Craig Tiller0ba432d2015-10-09 16:57:11 -07001401 GPR_TIMER_BEGIN("grpc_call_start_batch", 0);
Craig Tillerc7e1a2a2015-11-02 14:17:32 -08001402 GRPC_CALL_LOG_BATCH(GPR_INFO, call, ops, nops, notify_tag);
Masood Malekghassemi76c3d742015-08-19 18:22:53 -07001403
Craig Tillera82950e2015-09-22 12:33:20 -07001404 if (nops == 0) {
Craig Tillerc7e1a2a2015-11-02 14:17:32 -08001405 if (!is_notify_tag_closure) {
Craig Tiller4bf29282015-12-14 11:25:48 -08001406 grpc_cq_begin_op(call->cq, notify_tag);
Craig Tiller89d33792017-02-08 16:39:16 -08001407 grpc_cq_end_op(exec_ctx, call->cq, notify_tag, GRPC_ERROR_NONE,
1408 free_no_op_completion, NULL,
1409 gpr_malloc(sizeof(grpc_cq_completion)));
Craig Tiller2db5bda2017-02-09 10:30:55 -08001410 } else {
1411 grpc_closure_sched(exec_ctx, notify_tag, GRPC_ERROR_NONE);
Craig Tillerc7e1a2a2015-11-02 14:17:32 -08001412 }
Craig Tillerea50b902015-12-15 07:05:25 -08001413 error = GRPC_CALL_OK;
1414 goto done;
Craig Tillera82950e2015-09-22 12:33:20 -07001415 }
1416
Craig Tiller89d33792017-02-08 16:39:16 -08001417 bctl = allocate_batch_control(call, ops, nops);
Craig Tiller5e5ef302017-02-09 08:46:49 -08001418 if (bctl == NULL) {
1419 return GRPC_CALL_ERROR_TOO_MANY_OPERATIONS;
1420 }
Craig Tillerea54b8c2017-03-01 16:58:28 -08001421 bctl->completion_data.notify_tag.tag = notify_tag;
Craig Tiller9c1ec542017-03-02 08:42:54 -08001422 bctl->completion_data.notify_tag.is_closure =
1423 (uint8_t)(is_notify_tag_closure != 0);
Craig Tillera82950e2015-09-22 12:33:20 -07001424
Craig Tillera0f3abd2017-03-31 15:42:16 -07001425 grpc_transport_stream_op_batch *stream_op = &bctl->op;
1426 grpc_transport_stream_op_batch_payload *stream_op_payload =
Craig Tiller9c1ec542017-03-02 08:42:54 -08001427 &call->stream_op_payload;
Craig Tillera82950e2015-09-22 12:33:20 -07001428 stream_op->covered_by_poller = true;
1429
Craig Tillerc7e1a2a2015-11-02 14:17:32 -08001430 /* rewrite batch ops into a transport op */
1431 for (i = 0; i < nops; i++) {
1432 op = &ops[i];
Craig Tillera82950e2015-09-22 12:33:20 -07001433 if (op->reserved != NULL) {
Craig Tiller3ffd8222015-09-21 08:21:57 -07001434 error = GRPC_CALL_ERROR;
Craig Tillerc7e1a2a2015-11-02 14:17:32 -08001435 goto done_with_error;
Craig Tiller3ffd8222015-09-21 08:21:57 -07001436 }
Craig Tillera82950e2015-09-22 12:33:20 -07001437 switch (op->op) {
1438 case GRPC_OP_SEND_INITIAL_METADATA:
1439 /* Flag validation: currently allow no flags */
Craig Tillerc6549762016-03-09 17:10:43 -08001440 if (!are_initial_metadata_flags_valid(op->flags, call->is_client)) {
Craig Tillera82950e2015-09-22 12:33:20 -07001441 error = GRPC_CALL_ERROR_INVALID_FLAGS;
Craig Tillerc7e1a2a2015-11-02 14:17:32 -08001442 goto done_with_error;
Craig Tillera82950e2015-09-22 12:33:20 -07001443 }
Craig Tillerc7e1a2a2015-11-02 14:17:32 -08001444 if (call->sent_initial_metadata) {
1445 error = GRPC_CALL_ERROR_TOO_MANY_OPERATIONS;
1446 goto done_with_error;
Craig Tillera82950e2015-09-22 12:33:20 -07001447 }
David Garcia Quintasa301eaa2016-05-06 16:59:03 -07001448 /* process compression level */
Vitaly Bukae60003d2016-08-01 19:34:51 -07001449 memset(&compression_md, 0, sizeof(compression_md));
David Garcia Quintasa301eaa2016-05-06 16:59:03 -07001450 size_t additional_metadata_count = 0;
David Garcia Quintas749367f2016-05-17 19:15:24 -07001451 grpc_compression_level effective_compression_level;
1452 bool level_set = false;
1453 if (op->data.send_initial_metadata.maybe_compression_level.is_set) {
David Garcia Quintas749367f2016-05-17 19:15:24 -07001454 effective_compression_level =
David Garcia Quintas8ba42be2016-06-07 17:30:20 -07001455 op->data.send_initial_metadata.maybe_compression_level.level;
David Garcia Quintas749367f2016-05-17 19:15:24 -07001456 level_set = true;
1457 } else {
David Garcia Quintasac094472016-05-18 20:25:57 -07001458 const grpc_compression_options copts =
1459 grpc_channel_compression_options(call->channel);
1460 level_set = copts.default_level.is_set;
1461 if (level_set) {
1462 effective_compression_level = copts.default_level.level;
1463 }
David Garcia Quintas749367f2016-05-17 19:15:24 -07001464 }
David Garcia Quintas3e4f49f2016-05-18 23:59:02 -07001465 if (level_set && !call->is_client) {
David Garcia Quintasa301eaa2016-05-06 16:59:03 -07001466 const grpc_compression_algorithm calgo =
1467 compression_algorithm_for_level_locked(
David Garcia Quintas749367f2016-05-17 19:15:24 -07001468 call, effective_compression_level);
David Garcia Quintas3e4f49f2016-05-18 23:59:02 -07001469 // the following will be picked up by the compress filter and used as
1470 // the call's compression algorithm.
Craig Tiller68208fe2016-11-14 14:35:02 -08001471 compression_md.key = GRPC_MDSTR_GRPC_INTERNAL_ENCODING_REQUEST;
1472 compression_md.value = grpc_compression_algorithm_slice(calgo);
David Garcia Quintasa301eaa2016-05-06 16:59:03 -07001473 additional_metadata_count++;
1474 }
1475
1476 if (op->data.send_initial_metadata.count + additional_metadata_count >
1477 INT_MAX) {
Craig Tillerc7e1a2a2015-11-02 14:17:32 -08001478 error = GRPC_CALL_ERROR_INVALID_METADATA;
1479 goto done_with_error;
1480 }
Craig Tiller9c1ec542017-03-02 08:42:54 -08001481 stream_op->send_initial_metadata = true;
Craig Tillerea54b8c2017-03-01 16:58:28 -08001482 call->sent_initial_metadata = true;
Craig Tillerc7e1a2a2015-11-02 14:17:32 -08001483 if (!prepare_application_metadata(
Craig Tillera59c16c2016-10-31 07:25:01 -07001484 exec_ctx, call, (int)op->data.send_initial_metadata.count,
David Garcia Quintasa301eaa2016-05-06 16:59:03 -07001485 op->data.send_initial_metadata.metadata, 0, call->is_client,
1486 &compression_md, (int)additional_metadata_count)) {
Craig Tillerc7e1a2a2015-11-02 14:17:32 -08001487 error = GRPC_CALL_ERROR_INVALID_METADATA;
1488 goto done_with_error;
1489 }
1490 /* TODO(ctiller): just make these the same variable? */
1491 call->metadata_batch[0][0].deadline = call->send_deadline;
Craig Tiller9c1ec542017-03-02 08:42:54 -08001492 stream_op_payload->send_initial_metadata.send_initial_metadata =
Craig Tillerc7e1a2a2015-11-02 14:17:32 -08001493 &call->metadata_batch[0 /* is_receiving */][0 /* is_trailing */];
Craig Tiller9c1ec542017-03-02 08:42:54 -08001494 stream_op_payload->send_initial_metadata.send_initial_metadata_flags =
1495 op->flags;
Craig Tillera82950e2015-09-22 12:33:20 -07001496 break;
1497 case GRPC_OP_SEND_MESSAGE:
1498 if (!are_write_flags_valid(op->flags)) {
1499 error = GRPC_CALL_ERROR_INVALID_FLAGS;
Craig Tillerc7e1a2a2015-11-02 14:17:32 -08001500 goto done_with_error;
Craig Tillera82950e2015-09-22 12:33:20 -07001501 }
Mark D. Roth448c1f02017-01-25 10:44:30 -08001502 if (op->data.send_message.send_message == NULL) {
Craig Tillera82950e2015-09-22 12:33:20 -07001503 error = GRPC_CALL_ERROR_INVALID_MESSAGE;
Craig Tillerc7e1a2a2015-11-02 14:17:32 -08001504 goto done_with_error;
Craig Tillera82950e2015-09-22 12:33:20 -07001505 }
Craig Tillerc7e1a2a2015-11-02 14:17:32 -08001506 if (call->sending_message) {
1507 error = GRPC_CALL_ERROR_TOO_MANY_OPERATIONS;
1508 goto done_with_error;
Craig Tillera82950e2015-09-22 12:33:20 -07001509 }
Craig Tiller9c1ec542017-03-02 08:42:54 -08001510 stream_op->send_message = true;
Craig Tillerea54b8c2017-03-01 16:58:28 -08001511 call->sending_message = true;
Craig Tillerc7e1a2a2015-11-02 14:17:32 -08001512 grpc_slice_buffer_stream_init(
1513 &call->sending_stream,
Mark D. Roth448c1f02017-01-25 10:44:30 -08001514 &op->data.send_message.send_message->data.raw.slice_buffer,
1515 op->flags);
Lizan Zhou61f09732016-10-26 14:09:52 -07001516 /* If the outgoing buffer is already compressed, mark it as so in the
1517 flags. These will be picked up by the compression filter and further
1518 (wasteful) attempts at compression skipped. */
Mark D. Roth9d76dbe2017-01-25 15:02:56 -08001519 if (op->data.send_message.send_message->data.raw.compression >
1520 GRPC_COMPRESS_NONE) {
Lizan Zhou61f09732016-10-26 14:09:52 -07001521 call->sending_stream.base.flags |= GRPC_WRITE_INTERNAL_COMPRESS;
1522 }
Craig Tiller9c1ec542017-03-02 08:42:54 -08001523 stream_op_payload->send_message.send_message =
1524 &call->sending_stream.base;
Craig Tillera82950e2015-09-22 12:33:20 -07001525 break;
1526 case GRPC_OP_SEND_CLOSE_FROM_CLIENT:
1527 /* Flag validation: currently allow no flags */
1528 if (op->flags != 0) {
1529 error = GRPC_CALL_ERROR_INVALID_FLAGS;
Craig Tillerc7e1a2a2015-11-02 14:17:32 -08001530 goto done_with_error;
Craig Tillera82950e2015-09-22 12:33:20 -07001531 }
1532 if (!call->is_client) {
1533 error = GRPC_CALL_ERROR_NOT_ON_SERVER;
Craig Tillerc7e1a2a2015-11-02 14:17:32 -08001534 goto done_with_error;
Craig Tillera82950e2015-09-22 12:33:20 -07001535 }
Craig Tillerc7e1a2a2015-11-02 14:17:32 -08001536 if (call->sent_final_op) {
1537 error = GRPC_CALL_ERROR_TOO_MANY_OPERATIONS;
1538 goto done_with_error;
Craig Tillera82950e2015-09-22 12:33:20 -07001539 }
Craig Tiller9c1ec542017-03-02 08:42:54 -08001540 stream_op->send_trailing_metadata = true;
Craig Tillere198b712017-03-31 15:29:33 -07001541 call->sent_final_op = true;
Craig Tiller2d43fbf2017-03-13 16:10:05 -07001542 stream_op_payload->send_trailing_metadata.send_trailing_metadata =
Craig Tillerc7e1a2a2015-11-02 14:17:32 -08001543 &call->metadata_batch[0 /* is_receiving */][1 /* is_trailing */];
Craig Tillera82950e2015-09-22 12:33:20 -07001544 break;
1545 case GRPC_OP_SEND_STATUS_FROM_SERVER:
1546 /* Flag validation: currently allow no flags */
1547 if (op->flags != 0) {
1548 error = GRPC_CALL_ERROR_INVALID_FLAGS;
Craig Tillerc7e1a2a2015-11-02 14:17:32 -08001549 goto done_with_error;
Craig Tillera82950e2015-09-22 12:33:20 -07001550 }
1551 if (call->is_client) {
1552 error = GRPC_CALL_ERROR_NOT_ON_CLIENT;
Craig Tillerc7e1a2a2015-11-02 14:17:32 -08001553 goto done_with_error;
Craig Tillera82950e2015-09-22 12:33:20 -07001554 }
Craig Tillerc7e1a2a2015-11-02 14:17:32 -08001555 if (call->sent_final_op) {
1556 error = GRPC_CALL_ERROR_TOO_MANY_OPERATIONS;
1557 goto done_with_error;
Craig Tillera82950e2015-09-22 12:33:20 -07001558 }
Craig Tillerc7e1a2a2015-11-02 14:17:32 -08001559 if (op->data.send_status_from_server.trailing_metadata_count >
1560 INT_MAX) {
1561 error = GRPC_CALL_ERROR_INVALID_METADATA;
1562 goto done_with_error;
Craig Tillera82950e2015-09-22 12:33:20 -07001563 }
Craig Tiller9c1ec542017-03-02 08:42:54 -08001564 stream_op->send_trailing_metadata = true;
Craig Tillere198b712017-03-31 15:29:33 -07001565 call->sent_final_op = true;
Craig Tiller93727aa2017-02-06 13:05:39 -08001566 GPR_ASSERT(call->send_extra_metadata_count == 0);
Craig Tillerc7e1a2a2015-11-02 14:17:32 -08001567 call->send_extra_metadata_count = 1;
1568 call->send_extra_metadata[0].md = grpc_channel_get_reffed_status_elem(
Craig Tillera59c16c2016-10-31 07:25:01 -07001569 exec_ctx, call->channel, op->data.send_status_from_server.status);
Craig Tiller841a99d2016-12-12 16:58:57 -08001570 {
1571 grpc_error *override_error = GRPC_ERROR_NONE;
1572 if (op->data.send_status_from_server.status != GRPC_STATUS_OK) {
ncteisen4b36a3d2017-03-13 19:08:06 -07001573 override_error = GRPC_ERROR_CREATE_FROM_STATIC_STRING(
1574 "Error from server send status");
Craig Tiller841a99d2016-12-12 16:58:57 -08001575 }
1576 if (op->data.send_status_from_server.status_details != NULL) {
1577 call->send_extra_metadata[1].md = grpc_mdelem_from_slices(
1578 exec_ctx, GRPC_MDSTR_GRPC_MESSAGE,
1579 grpc_slice_ref_internal(
1580 *op->data.send_status_from_server.status_details));
1581 call->send_extra_metadata_count++;
1582 char *msg = grpc_slice_to_c_string(
1583 GRPC_MDVALUE(call->send_extra_metadata[1].md));
ncteisen4b36a3d2017-03-13 19:08:06 -07001584 override_error =
1585 grpc_error_set_str(override_error, GRPC_ERROR_STR_GRPC_MESSAGE,
1586 grpc_slice_from_copied_string(msg));
Craig Tiller841a99d2016-12-12 16:58:57 -08001587 gpr_free(msg);
1588 }
1589 set_status_from_error(exec_ctx, call, STATUS_FROM_API_OVERRIDE,
1590 override_error);
Craig Tiller69a1f662016-09-28 10:24:21 -07001591 }
Craig Tillerc7e1a2a2015-11-02 14:17:32 -08001592 if (!prepare_application_metadata(
Craig Tillera59c16c2016-10-31 07:25:01 -07001593 exec_ctx, call,
Craig Tillerc7e1a2a2015-11-02 14:17:32 -08001594 (int)op->data.send_status_from_server.trailing_metadata_count,
David Garcia Quintasa301eaa2016-05-06 16:59:03 -07001595 op->data.send_status_from_server.trailing_metadata, 1, 1, NULL,
1596 0)) {
Craig Tiller93727aa2017-02-06 13:05:39 -08001597 for (int n = 0; n < call->send_extra_metadata_count; n++) {
1598 GRPC_MDELEM_UNREF(exec_ctx, call->send_extra_metadata[n].md);
1599 }
1600 call->send_extra_metadata_count = 0;
Craig Tillerc7e1a2a2015-11-02 14:17:32 -08001601 error = GRPC_CALL_ERROR_INVALID_METADATA;
1602 goto done_with_error;
1603 }
Craig Tiller9c1ec542017-03-02 08:42:54 -08001604 stream_op_payload->send_trailing_metadata.send_trailing_metadata =
Craig Tillerc7e1a2a2015-11-02 14:17:32 -08001605 &call->metadata_batch[0 /* is_receiving */][1 /* is_trailing */];
Craig Tillera82950e2015-09-22 12:33:20 -07001606 break;
1607 case GRPC_OP_RECV_INITIAL_METADATA:
1608 /* Flag validation: currently allow no flags */
1609 if (op->flags != 0) {
1610 error = GRPC_CALL_ERROR_INVALID_FLAGS;
Craig Tillerc7e1a2a2015-11-02 14:17:32 -08001611 goto done_with_error;
Craig Tillera82950e2015-09-22 12:33:20 -07001612 }
Craig Tillerc7e1a2a2015-11-02 14:17:32 -08001613 if (call->received_initial_metadata) {
1614 error = GRPC_CALL_ERROR_TOO_MANY_OPERATIONS;
1615 goto done_with_error;
Craig Tillera82950e2015-09-22 12:33:20 -07001616 }
Craig Tiller9c5318a2016-12-05 15:07:04 -08001617 /* IF this is a server, then GRPC_OP_RECV_INITIAL_METADATA *must* come
1618 from server.c. In that case, it's coming from accept_stream, and in
1619 that case we're not necessarily covered by a poller. */
1620 stream_op->covered_by_poller = call->is_client;
Craig Tillere198b712017-03-31 15:29:33 -07001621 call->received_initial_metadata = true;
Mark D. Roth448c1f02017-01-25 10:44:30 -08001622 call->buffered_metadata[0] =
1623 op->data.recv_initial_metadata.recv_initial_metadata;
Craig Tillera44cbfc2016-02-03 16:02:49 -08001624 grpc_closure_init(&call->receiving_initial_metadata_ready,
Craig Tiller91031da2016-12-28 15:44:25 -08001625 receiving_initial_metadata_ready, bctl,
1626 grpc_schedule_on_exec_ctx);
Craig Tiller9c1ec542017-03-02 08:42:54 -08001627 stream_op->recv_initial_metadata = true;
1628 stream_op_payload->recv_initial_metadata.recv_initial_metadata =
Craig Tillerc7e1a2a2015-11-02 14:17:32 -08001629 &call->metadata_batch[1 /* is_receiving */][0 /* is_trailing */];
Craig Tiller9c1ec542017-03-02 08:42:54 -08001630 stream_op_payload->recv_initial_metadata.recv_initial_metadata_ready =
Craig Tillera44cbfc2016-02-03 16:02:49 -08001631 &call->receiving_initial_metadata_ready;
1632 num_completion_callbacks_needed++;
Craig Tillera82950e2015-09-22 12:33:20 -07001633 break;
1634 case GRPC_OP_RECV_MESSAGE:
1635 /* Flag validation: currently allow no flags */
1636 if (op->flags != 0) {
1637 error = GRPC_CALL_ERROR_INVALID_FLAGS;
Craig Tillerc7e1a2a2015-11-02 14:17:32 -08001638 goto done_with_error;
Craig Tillera82950e2015-09-22 12:33:20 -07001639 }
Craig Tillerc7e1a2a2015-11-02 14:17:32 -08001640 if (call->receiving_message) {
1641 error = GRPC_CALL_ERROR_TOO_MANY_OPERATIONS;
yang-g48f3a712015-12-07 11:23:50 -08001642 goto done_with_error;
Craig Tillera82950e2015-09-22 12:33:20 -07001643 }
Craig Tiller9c1ec542017-03-02 08:42:54 -08001644 call->receiving_message = true;
1645 stream_op->recv_message = true;
Mark D. Roth448c1f02017-01-25 10:44:30 -08001646 call->receiving_buffer = op->data.recv_message.recv_message;
Craig Tiller9c1ec542017-03-02 08:42:54 -08001647 stream_op_payload->recv_message.recv_message = &call->receiving_stream;
Craig Tillerc7e1a2a2015-11-02 14:17:32 -08001648 grpc_closure_init(&call->receiving_stream_ready, receiving_stream_ready,
Craig Tiller91031da2016-12-28 15:44:25 -08001649 bctl, grpc_schedule_on_exec_ctx);
Craig Tiller9c1ec542017-03-02 08:42:54 -08001650 stream_op_payload->recv_message.recv_message_ready =
1651 &call->receiving_stream_ready;
Craig Tillerc7e1a2a2015-11-02 14:17:32 -08001652 num_completion_callbacks_needed++;
Craig Tillera82950e2015-09-22 12:33:20 -07001653 break;
1654 case GRPC_OP_RECV_STATUS_ON_CLIENT:
1655 /* Flag validation: currently allow no flags */
1656 if (op->flags != 0) {
1657 error = GRPC_CALL_ERROR_INVALID_FLAGS;
Craig Tillerc7e1a2a2015-11-02 14:17:32 -08001658 goto done_with_error;
Craig Tillera82950e2015-09-22 12:33:20 -07001659 }
1660 if (!call->is_client) {
1661 error = GRPC_CALL_ERROR_NOT_ON_SERVER;
Craig Tillerc7e1a2a2015-11-02 14:17:32 -08001662 goto done_with_error;
Craig Tillera82950e2015-09-22 12:33:20 -07001663 }
Craig Tiller1cbf5762016-04-22 16:02:55 -07001664 if (call->requested_final_op) {
Craig Tillerc7e1a2a2015-11-02 14:17:32 -08001665 error = GRPC_CALL_ERROR_TOO_MANY_OPERATIONS;
1666 goto done_with_error;
Craig Tillera82950e2015-09-22 12:33:20 -07001667 }
Craig Tillere198b712017-03-31 15:29:33 -07001668 call->requested_final_op = true;
Craig Tillerc7e1a2a2015-11-02 14:17:32 -08001669 call->buffered_metadata[1] =
Craig Tillera82950e2015-09-22 12:33:20 -07001670 op->data.recv_status_on_client.trailing_metadata;
Craig Tillerc7e1a2a2015-11-02 14:17:32 -08001671 call->final_op.client.status = op->data.recv_status_on_client.status;
1672 call->final_op.client.status_details =
1673 op->data.recv_status_on_client.status_details;
Craig Tiller9c1ec542017-03-02 08:42:54 -08001674 stream_op->recv_trailing_metadata = true;
1675 stream_op->collect_stats = true;
1676 stream_op_payload->recv_trailing_metadata.recv_trailing_metadata =
Craig Tillerc7e1a2a2015-11-02 14:17:32 -08001677 &call->metadata_batch[1 /* is_receiving */][1 /* is_trailing */];
Craig Tiller9c1ec542017-03-02 08:42:54 -08001678 stream_op_payload->collect_stats.collect_stats =
David Garcia Quintas5dde14c2016-07-28 17:29:27 -07001679 &call->final_info.stats.transport_stream_stats;
Craig Tillera82950e2015-09-22 12:33:20 -07001680 break;
1681 case GRPC_OP_RECV_CLOSE_ON_SERVER:
1682 /* Flag validation: currently allow no flags */
1683 if (op->flags != 0) {
1684 error = GRPC_CALL_ERROR_INVALID_FLAGS;
Craig Tillerc7e1a2a2015-11-02 14:17:32 -08001685 goto done_with_error;
Craig Tillera82950e2015-09-22 12:33:20 -07001686 }
Craig Tillerc7e1a2a2015-11-02 14:17:32 -08001687 if (call->is_client) {
1688 error = GRPC_CALL_ERROR_NOT_ON_CLIENT;
1689 goto done_with_error;
Craig Tillera82950e2015-09-22 12:33:20 -07001690 }
Craig Tiller1cbf5762016-04-22 16:02:55 -07001691 if (call->requested_final_op) {
Craig Tillerc7e1a2a2015-11-02 14:17:32 -08001692 error = GRPC_CALL_ERROR_TOO_MANY_OPERATIONS;
1693 goto done_with_error;
1694 }
Craig Tillere198b712017-03-31 15:29:33 -07001695 call->requested_final_op = true;
Craig Tillerc7e1a2a2015-11-02 14:17:32 -08001696 call->final_op.server.cancelled =
Craig Tillera82950e2015-09-22 12:33:20 -07001697 op->data.recv_close_on_server.cancelled;
Craig Tiller9c1ec542017-03-02 08:42:54 -08001698 stream_op->recv_trailing_metadata = true;
1699 stream_op->collect_stats = true;
1700 stream_op_payload->recv_trailing_metadata.recv_trailing_metadata =
Craig Tillerc7e1a2a2015-11-02 14:17:32 -08001701 &call->metadata_batch[1 /* is_receiving */][1 /* is_trailing */];
Craig Tiller9c1ec542017-03-02 08:42:54 -08001702 stream_op_payload->collect_stats.collect_stats =
David Garcia Quintas5dde14c2016-07-28 17:29:27 -07001703 &call->final_info.stats.transport_stream_stats;
Craig Tillera82950e2015-09-22 12:33:20 -07001704 break;
Craig Tillerfb189f82015-02-03 12:07:07 -08001705 }
Craig Tillera82950e2015-09-22 12:33:20 -07001706 }
Craig Tillerfb189f82015-02-03 12:07:07 -08001707
Craig Tillera82950e2015-09-22 12:33:20 -07001708 GRPC_CALL_INTERNAL_REF(call, "completion");
Craig Tillerc7e1a2a2015-11-02 14:17:32 -08001709 if (!is_notify_tag_closure) {
Craig Tiller4bf29282015-12-14 11:25:48 -08001710 grpc_cq_begin_op(call->cq, notify_tag);
Craig Tillerc7e1a2a2015-11-02 14:17:32 -08001711 }
1712 gpr_ref_init(&bctl->steps_to_complete, num_completion_callbacks_needed);
Craig Tillerfb189f82015-02-03 12:07:07 -08001713
Craig Tiller91031da2016-12-28 15:44:25 -08001714 grpc_closure_init(&bctl->finish_batch, finish_batch, bctl,
1715 grpc_schedule_on_exec_ctx);
Craig Tiller6e7b45e2016-07-08 17:25:49 -07001716 stream_op->on_complete = &bctl->finish_batch;
Craig Tillerb597dcf2017-03-09 07:02:11 -08001717 gpr_atm_rel_store(&call->any_ops_sent_atm, 1);
Craig Tillerc7e1a2a2015-11-02 14:17:32 -08001718
Craig Tiller6e7b45e2016-07-08 17:25:49 -07001719 execute_op(exec_ctx, call, stream_op);
Craig Tillerc7e1a2a2015-11-02 14:17:32 -08001720
Craig Tiller3ffd8222015-09-21 08:21:57 -07001721done:
Craig Tiller0ba432d2015-10-09 16:57:11 -07001722 GPR_TIMER_END("grpc_call_start_batch", 0);
Craig Tiller3ffd8222015-09-21 08:21:57 -07001723 return error;
Craig Tillerc7e1a2a2015-11-02 14:17:32 -08001724
1725done_with_error:
1726 /* reverse any mutations that occurred */
Craig Tiller9c1ec542017-03-02 08:42:54 -08001727 if (stream_op->send_initial_metadata) {
Craig Tillere198b712017-03-31 15:29:33 -07001728 call->sent_initial_metadata = false;
Craig Tillera59c16c2016-10-31 07:25:01 -07001729 grpc_metadata_batch_clear(exec_ctx, &call->metadata_batch[0][0]);
Craig Tillerc7e1a2a2015-11-02 14:17:32 -08001730 }
Craig Tiller9c1ec542017-03-02 08:42:54 -08001731 if (stream_op->send_message) {
Craig Tillere198b712017-03-31 15:29:33 -07001732 call->sending_message = false;
Craig Tiller3b66ab92015-12-09 19:42:22 -08001733 grpc_byte_stream_destroy(exec_ctx, &call->sending_stream.base);
Craig Tillerc7e1a2a2015-11-02 14:17:32 -08001734 }
Craig Tiller9c1ec542017-03-02 08:42:54 -08001735 if (stream_op->send_trailing_metadata) {
Craig Tillere198b712017-03-31 15:29:33 -07001736 call->sent_final_op = false;
Craig Tillera59c16c2016-10-31 07:25:01 -07001737 grpc_metadata_batch_clear(exec_ctx, &call->metadata_batch[0][1]);
Craig Tillerc7e1a2a2015-11-02 14:17:32 -08001738 }
Craig Tiller9c1ec542017-03-02 08:42:54 -08001739 if (stream_op->recv_initial_metadata) {
Craig Tillere198b712017-03-31 15:29:33 -07001740 call->received_initial_metadata = false;
Craig Tillerc7e1a2a2015-11-02 14:17:32 -08001741 }
Craig Tiller9c1ec542017-03-02 08:42:54 -08001742 if (stream_op->recv_message) {
Craig Tillere198b712017-03-31 15:29:33 -07001743 call->receiving_message = false;
Craig Tillerc7e1a2a2015-11-02 14:17:32 -08001744 }
Craig Tiller9c1ec542017-03-02 08:42:54 -08001745 if (stream_op->recv_trailing_metadata) {
Craig Tillere198b712017-03-31 15:29:33 -07001746 call->requested_final_op = false;
Craig Tillerc7e1a2a2015-11-02 14:17:32 -08001747 }
Craig Tillerc7e1a2a2015-11-02 14:17:32 -08001748 goto done;
1749}
1750
1751grpc_call_error grpc_call_start_batch(grpc_call *call, const grpc_op *ops,
1752 size_t nops, void *tag, void *reserved) {
1753 grpc_exec_ctx exec_ctx = GRPC_EXEC_CTX_INIT;
1754 grpc_call_error err;
1755
1756 GRPC_API_TRACE(
David Garcia Quintas46123372016-05-09 15:28:42 -07001757 "grpc_call_start_batch(call=%p, ops=%p, nops=%lu, tag=%p, "
1758 "reserved=%p)",
Craig Tillerc7e1a2a2015-11-02 14:17:32 -08001759 5, (call, ops, (unsigned long)nops, tag, reserved));
1760
1761 if (reserved != NULL) {
1762 err = GRPC_CALL_ERROR;
1763 } else {
1764 err = call_start_batch(&exec_ctx, call, ops, nops, tag, 0);
1765 }
1766
1767 grpc_exec_ctx_finish(&exec_ctx);
1768 return err;
1769}
1770
1771grpc_call_error grpc_call_start_batch_and_execute(grpc_exec_ctx *exec_ctx,
1772 grpc_call *call,
1773 const grpc_op *ops,
1774 size_t nops,
1775 grpc_closure *closure) {
1776 return call_start_batch(exec_ctx, call, ops, nops, closure, 1);
Craig Tillerfb189f82015-02-03 12:07:07 -08001777}
Craig Tiller935cf422015-05-01 14:10:46 -07001778
Craig Tillera82950e2015-09-22 12:33:20 -07001779void grpc_call_context_set(grpc_call *call, grpc_context_index elem,
1780 void *value, void (*destroy)(void *value)) {
1781 if (call->context[elem].destroy) {
1782 call->context[elem].destroy(call->context[elem].value);
1783 }
Julien Boeuf83b02972015-05-20 22:50:34 -07001784 call->context[elem].value = value;
1785 call->context[elem].destroy = destroy;
Craig Tiller935cf422015-05-01 14:10:46 -07001786}
1787
Craig Tillera82950e2015-09-22 12:33:20 -07001788void *grpc_call_context_get(grpc_call *call, grpc_context_index elem) {
Julien Boeuf83b02972015-05-20 22:50:34 -07001789 return call->context[elem].value;
Craig Tiller935cf422015-05-01 14:10:46 -07001790}
Julien Boeuf9f218dd2015-04-23 10:24:02 -07001791
Craig Tiller7536af02015-12-22 13:49:30 -08001792uint8_t grpc_call_is_client(grpc_call *call) { return call->is_client; }
David Garcia Quintas13c2f6e2016-03-17 22:51:52 -07001793
1794grpc_compression_algorithm grpc_call_compression_for_level(
1795 grpc_call *call, grpc_compression_level level) {
David Garcia Quintasa301eaa2016-05-06 16:59:03 -07001796 grpc_compression_algorithm algo =
1797 compression_algorithm_for_level_locked(call, level);
David Garcia Quintasa301eaa2016-05-06 16:59:03 -07001798 return algo;
David Garcia Quintas13c2f6e2016-03-17 22:51:52 -07001799}
Yuchen Zeng2e7d9572016-04-15 17:29:57 -07001800
1801const char *grpc_call_error_to_string(grpc_call_error error) {
1802 switch (error) {
1803 case GRPC_CALL_ERROR:
1804 return "GRPC_CALL_ERROR";
1805 case GRPC_CALL_ERROR_ALREADY_ACCEPTED:
1806 return "GRPC_CALL_ERROR_ALREADY_ACCEPTED";
1807 case GRPC_CALL_ERROR_ALREADY_FINISHED:
1808 return "GRPC_CALL_ERROR_ALREADY_FINISHED";
1809 case GRPC_CALL_ERROR_ALREADY_INVOKED:
1810 return "GRPC_CALL_ERROR_ALREADY_INVOKED";
1811 case GRPC_CALL_ERROR_BATCH_TOO_BIG:
1812 return "GRPC_CALL_ERROR_BATCH_TOO_BIG";
1813 case GRPC_CALL_ERROR_INVALID_FLAGS:
1814 return "GRPC_CALL_ERROR_INVALID_FLAGS";
1815 case GRPC_CALL_ERROR_INVALID_MESSAGE:
1816 return "GRPC_CALL_ERROR_INVALID_MESSAGE";
1817 case GRPC_CALL_ERROR_INVALID_METADATA:
1818 return "GRPC_CALL_ERROR_INVALID_METADATA";
1819 case GRPC_CALL_ERROR_NOT_INVOKED:
1820 return "GRPC_CALL_ERROR_NOT_INVOKED";
1821 case GRPC_CALL_ERROR_NOT_ON_CLIENT:
1822 return "GRPC_CALL_ERROR_NOT_ON_CLIENT";
1823 case GRPC_CALL_ERROR_NOT_ON_SERVER:
1824 return "GRPC_CALL_ERROR_NOT_ON_SERVER";
1825 case GRPC_CALL_ERROR_NOT_SERVER_COMPLETION_QUEUE:
1826 return "GRPC_CALL_ERROR_NOT_SERVER_COMPLETION_QUEUE";
1827 case GRPC_CALL_ERROR_PAYLOAD_TYPE_MISMATCH:
1828 return "GRPC_CALL_ERROR_PAYLOAD_TYPE_MISMATCH";
1829 case GRPC_CALL_ERROR_TOO_MANY_OPERATIONS:
1830 return "GRPC_CALL_ERROR_TOO_MANY_OPERATIONS";
1831 case GRPC_CALL_OK:
1832 return "GRPC_CALL_OK";
Yuchen Zeng2e7d9572016-04-15 17:29:57 -07001833 }
Yuchen Zengf02bada2016-04-19 14:12:27 -07001834 GPR_UNREACHABLE_CODE(return "GRPC_CALL_ERROR_UNKNOW");
Yuchen Zeng2e7d9572016-04-15 17:29:57 -07001835}