blob: 3c563bcc6fafb565977d0f48b62093440ab05a85 [file] [log] [blame]
Nicolas Nobleb7ebd3b2014-11-26 16:33:03 -08001/*
2 *
Craig Tiller6169d5f2016-03-31 07:46:18 -07003 * Copyright 2015, Google Inc.
Nicolas Nobleb7ebd3b2014-11-26 16:33:03 -08004 * All rights reserved.
5 *
6 * Redistribution and use in source and binary forms, with or without
7 * modification, are permitted provided that the following conditions are
8 * met:
9 *
10 * * Redistributions of source code must retain the above copyright
11 * notice, this list of conditions and the following disclaimer.
12 * * Redistributions in binary form must reproduce the above
13 * copyright notice, this list of conditions and the following disclaimer
14 * in the documentation and/or other materials provided with the
15 * distribution.
16 * * Neither the name of Google Inc. nor the names of its
17 * contributors may be used to endorse or promote products derived from
18 * this software without specific prior written permission.
19 *
20 * THIS SOFTWARE IS PROVIDED BY THE COPYRIGHT HOLDERS AND CONTRIBUTORS
21 * "AS IS" AND ANY EXPRESS OR IMPLIED WARRANTIES, INCLUDING, BUT NOT
22 * LIMITED TO, THE IMPLIED WARRANTIES OF MERCHANTABILITY AND FITNESS FOR
23 * A PARTICULAR PURPOSE ARE DISCLAIMED. IN NO EVENT SHALL THE COPYRIGHT
24 * OWNER OR CONTRIBUTORS BE LIABLE FOR ANY DIRECT, INDIRECT, INCIDENTAL,
25 * SPECIAL, EXEMPLARY, OR CONSEQUENTIAL DAMAGES (INCLUDING, BUT NOT
26 * LIMITED TO, PROCUREMENT OF SUBSTITUTE GOODS OR SERVICES; LOSS OF USE,
27 * DATA, OR PROFITS; OR BUSINESS INTERRUPTION) HOWEVER CAUSED AND ON ANY
28 * THEORY OF LIABILITY, WHETHER IN CONTRACT, STRICT LIABILITY, OR TORT
29 * (INCLUDING NEGLIGENCE OR OTHERWISE) ARISING IN ANY WAY OUT OF THE USE
30 * OF THIS SOFTWARE, EVEN IF ADVISED OF THE POSSIBILITY OF SUCH DAMAGE.
31 *
32 */
murgatroid999030c812016-09-16 13:25:08 -070033
David Garcia Quintasf74a49e2015-06-18 17:22:45 -070034#include <assert.h>
Craig Tillerc7e1a2a2015-11-02 14:17:32 -080035#include <limits.h>
David Garcia Quintasf74a49e2015-06-18 17:22:45 -070036#include <stdio.h>
37#include <stdlib.h>
38#include <string.h>
39
40#include <grpc/compression.h>
murgatroid99c3910ca2016-01-06 13:14:23 -080041#include <grpc/grpc.h>
Craig Tiller0f310802016-10-26 16:25:56 -070042#include <grpc/slice.h>
David Garcia Quintasf74a49e2015-06-18 17:22:45 -070043#include <grpc/support/alloc.h>
44#include <grpc/support/log.h>
45#include <grpc/support/string_util.h>
David Garcia Quintase091af82015-07-15 21:37:02 -070046#include <grpc/support/useful.h>
Nicolas Nobleb7ebd3b2014-11-26 16:33:03 -080047
Craig Tiller9533d042016-03-25 17:11:06 -070048#include "src/core/lib/channel/channel_stack.h"
49#include "src/core/lib/compression/algorithm_metadata.h"
50#include "src/core/lib/iomgr/timer.h"
51#include "src/core/lib/profiling/timers.h"
Craig Tillera59c16c2016-10-31 07:25:01 -070052#include "src/core/lib/slice/slice_internal.h"
Craig Tiller0f310802016-10-26 16:25:56 -070053#include "src/core/lib/slice/slice_string_helpers.h"
Craig Tiller9533d042016-03-25 17:11:06 -070054#include "src/core/lib/support/string.h"
55#include "src/core/lib/surface/api_trace.h"
56#include "src/core/lib/surface/call.h"
57#include "src/core/lib/surface/channel.h"
58#include "src/core/lib/surface/completion_queue.h"
Craig Tillerf2b5b7e2017-01-10 08:28:59 -080059#include "src/core/lib/surface/validate_metadata.h"
Craig Tiller732351f2016-12-13 16:40:38 -080060#include "src/core/lib/transport/error_utils.h"
David Garcia Quintas73dcbda2016-04-23 00:17:05 -070061#include "src/core/lib/transport/metadata.h"
Craig Tiller9533d042016-03-25 17:11:06 -070062#include "src/core/lib/transport/static_metadata.h"
David Garcia Quintas73dcbda2016-04-23 00:17:05 -070063#include "src/core/lib/transport/transport.h"
Nicolas Nobleb7ebd3b2014-11-26 16:33:03 -080064
Craig Tillerc7e1a2a2015-11-02 14:17:32 -080065/** The maximum number of concurrent batches possible.
Craig Tiller1b011672015-07-10 10:41:44 -070066 Based upon the maximum number of individually queueable ops in the batch
Craig Tiller94903892016-10-11 15:43:35 -070067 api:
Craig Tiller1b011672015-07-10 10:41:44 -070068 - initial metadata send
69 - message send
70 - status/close send (depending on client/server)
71 - initial metadata recv
72 - message recv
73 - status/close recv (depending on client/server) */
Craig Tillerc7e1a2a2015-11-02 14:17:32 -080074#define MAX_CONCURRENT_BATCHES 6
Craig Tiller1b011672015-07-10 10:41:44 -070075
Craig Tillerc7e1a2a2015-11-02 14:17:32 -080076#define MAX_SEND_EXTRA_METADATA_COUNT 3
Nicolas Nobleb7ebd3b2014-11-26 16:33:03 -080077
Craig Tillerdaceea82015-02-02 16:15:53 -080078/* Status data for a request can come from several sources; this
79 enumerates them all, and acts as a priority sorting for which
80 status to return to the application - earlier entries override
81 later ones */
Craig Tillera82950e2015-09-22 12:33:20 -070082typedef enum {
Craig Tillerdaceea82015-02-02 16:15:53 -080083 /* Status came from the application layer overriding whatever
84 the wire says */
Craig Tiller68752722015-01-29 14:59:54 -080085 STATUS_FROM_API_OVERRIDE = 0,
Craig Tillerdaceea82015-02-02 16:15:53 -080086 /* Status came from 'the wire' - or somewhere below the surface
87 layer */
Craig Tiller68752722015-01-29 14:59:54 -080088 STATUS_FROM_WIRE,
Craig Tiller2dc32ea2017-01-31 15:32:34 -080089 /* Status was created by some internal channel stack operation: must come via
90 add_batch_error */
Craig Tiller2aa03df2016-03-16 08:24:55 -070091 STATUS_FROM_CORE,
Craig Tiller2dc32ea2017-01-31 15:32:34 -080092 /* Status was created by some surface error */
93 STATUS_FROM_SURFACE,
Craig Tilleraea081f2015-06-11 14:19:33 -070094 /* Status came from the server sending status */
95 STATUS_FROM_SERVER_STATUS,
Craig Tiller68752722015-01-29 14:59:54 -080096 STATUS_SOURCE_COUNT
97} status_source;
98
Craig Tillera82950e2015-09-22 12:33:20 -070099typedef struct {
Craig Tiller841a99d2016-12-12 16:58:57 -0800100 bool is_set;
101 grpc_error *error;
Craig Tiller68752722015-01-29 14:59:54 -0800102} received_status;
103
Craig Tiller4bab9462017-02-22 08:56:02 -0800104static gpr_atm pack_received_status(received_status r) {
105 return r.is_set ? (1 | (gpr_atm)r.error) : 0;
106}
107
108static received_status unpack_received_status(gpr_atm atm) {
109 return (atm & 1) == 0
110 ? (received_status){.is_set = false, .error = GRPC_ERROR_NONE}
111 : (received_status){.is_set = true,
112 .error = (grpc_error *)(atm & ~(gpr_atm)1)};
113}
114
Craig Tiller94903892016-10-11 15:43:35 -0700115#define MAX_ERRORS_PER_BATCH 3
116
Craig Tillerc7e1a2a2015-11-02 14:17:32 -0800117typedef struct batch_control {
118 grpc_call *call;
119 grpc_cq_completion cq_completion;
120 grpc_closure finish_batch;
121 void *notify_tag;
122 gpr_refcount steps_to_complete;
Craig Tiller94903892016-10-11 15:43:35 -0700123
124 grpc_error *errors[MAX_ERRORS_PER_BATCH];
125 gpr_atm num_errors;
Craig Tillerc7e1a2a2015-11-02 14:17:32 -0800126
Craig Tiller7536af02015-12-22 13:49:30 -0800127 uint8_t send_initial_metadata;
128 uint8_t send_message;
129 uint8_t send_final_op;
130 uint8_t recv_initial_metadata;
131 uint8_t recv_message;
132 uint8_t recv_final_op;
133 uint8_t is_notify_tag_closure;
Craig Tiller6e7b45e2016-07-08 17:25:49 -0700134
135 /* TODO(ctiller): now that this is inlined, figure out how much of the above
136 state can be eliminated */
137 grpc_transport_stream_op op;
Craig Tillerc7e1a2a2015-11-02 14:17:32 -0800138} batch_control;
139
Craig Tillera82950e2015-09-22 12:33:20 -0700140struct grpc_call {
Nicolas Nobleb7ebd3b2014-11-26 16:33:03 -0800141 grpc_completion_queue *cq;
David Garcia Quintas2a50dfe2016-05-31 15:09:12 -0700142 grpc_polling_entity pollent;
Nicolas Nobleb7ebd3b2014-11-26 16:33:03 -0800143 grpc_channel *channel;
Craig Tiller3e7c6a72015-07-31 16:17:04 -0700144 grpc_call *parent;
Craig Tillerc7df0df2015-08-03 08:06:50 -0700145 grpc_call *first_child;
Mark D. Roth3d883412016-11-07 13:42:54 -0800146 gpr_timespec start_time;
Craig Tillercce17ac2015-01-20 09:29:28 -0800147 /* TODO(ctiller): share with cq if possible? */
148 gpr_mu mu;
Nicolas Nobleb7ebd3b2014-11-26 16:33:03 -0800149
Craig Tillere5d683c2015-02-03 16:37:36 -0800150 /* client or server call */
Craig Tiller1cbf5762016-04-22 16:02:55 -0700151 bool is_client;
Craig Tillerf3fba742015-06-11 09:36:33 -0700152 /** has grpc_call_destroy been called */
Craig Tiller1cbf5762016-04-22 16:02:55 -0700153 bool destroy_called;
Craig Tillerc7df0df2015-08-03 08:06:50 -0700154 /** flag indicating that cancellation is inherited */
Craig Tiller1cbf5762016-04-22 16:02:55 -0700155 bool cancellation_is_inherited;
Craig Tillerc7e1a2a2015-11-02 14:17:32 -0800156 /** which ops are in-flight */
Craig Tiller1cbf5762016-04-22 16:02:55 -0700157 bool sent_initial_metadata;
158 bool sending_message;
159 bool sent_final_op;
160 bool received_initial_metadata;
161 bool receiving_message;
162 bool requested_final_op;
163 bool received_final_op;
yang-g0b6ad7d2015-06-25 14:39:01 -0700164
Craig Tillera44cbfc2016-02-03 16:02:49 -0800165 /* have we received initial metadata */
166 bool has_initial_md_been_received;
167
Craig Tillerc7e1a2a2015-11-02 14:17:32 -0800168 batch_control active_batches[MAX_CONCURRENT_BATCHES];
Nicolas Nobleb7ebd3b2014-11-26 16:33:03 -0800169
Craig Tillerc7e1a2a2015-11-02 14:17:32 -0800170 /* first idx: is_receiving, second idx: is_trailing */
171 grpc_metadata_batch metadata_batch[2][2];
Craig Tillerebf94bf2015-02-05 08:48:46 -0800172
Craig Tillere5d683c2015-02-03 16:37:36 -0800173 /* Buffered read metadata waiting to be returned to the application.
174 Element 0 is initial metadata, element 1 is trailing metadata. */
Craig Tillerc7e1a2a2015-11-02 14:17:32 -0800175 grpc_metadata_array *buffered_metadata[2];
Nicolas Nobleb7ebd3b2014-11-26 16:33:03 -0800176
Craig Tiller4bab9462017-02-22 08:56:02 -0800177 /* Packed received call statuses from various sources */
178 gpr_atm status[STATUS_SOURCE_COUNT];
Nicolas Nobleb7ebd3b2014-11-26 16:33:03 -0800179
David Garcia Quintas01c4d992016-07-07 20:11:27 -0700180 /* Call data useful used for reporting. Only valid after the call has
181 * completed */
182 grpc_call_final_info final_info;
Craig Tiller466129e2016-03-09 14:43:18 -0800183
David Garcia Quintas749367f2016-05-17 19:15:24 -0700184 /* Compression algorithm for *incoming* data */
185 grpc_compression_algorithm incoming_compression_algorithm;
David Garcia Quintase091af82015-07-15 21:37:02 -0700186 /* Supported encodings (compression algorithms), a bitset */
Craig Tiller7536af02015-12-22 13:49:30 -0800187 uint32_t encodings_accepted_by_peer;
David Garcia Quintasb8edf7e2015-07-08 20:18:57 -0700188
Julien Boeufc6f8d0a2015-05-11 22:40:02 -0700189 /* Contexts for various subsystems (security, tracing, ...). */
Julien Boeuf83b02972015-05-20 22:50:34 -0700190 grpc_call_context_element context[GRPC_CONTEXT_COUNT];
Craig Tiller935cf422015-05-01 14:10:46 -0700191
Craig Tillerc7e1a2a2015-11-02 14:17:32 -0800192 /* for the client, extra metadata is initial metadata; for the
193 server, it's trailing metadata */
194 grpc_linked_mdelem send_extra_metadata[MAX_SEND_EXTRA_METADATA_COUNT];
195 int send_extra_metadata_count;
Craig Tiller6902ad22015-04-16 08:01:49 -0700196 gpr_timespec send_deadline;
197
Craig Tillerd6c98df2015-08-18 09:33:44 -0700198 /** siblings: children of the same parent form a list, and this list is
199 protected under
Craig Tillerc7df0df2015-08-03 08:06:50 -0700200 parent->mu */
201 grpc_call *sibling_next;
202 grpc_call *sibling_prev;
Craig Tillerc7e1a2a2015-11-02 14:17:32 -0800203
204 grpc_slice_buffer_stream sending_stream;
Craig Tiller94903892016-10-11 15:43:35 -0700205
Craig Tillerc7e1a2a2015-11-02 14:17:32 -0800206 grpc_byte_stream *receiving_stream;
207 grpc_byte_buffer **receiving_buffer;
Craig Tillerd41a4a72016-10-26 16:16:06 -0700208 grpc_slice receiving_slice;
Craig Tillerc7e1a2a2015-11-02 14:17:32 -0800209 grpc_closure receiving_slice_ready;
210 grpc_closure receiving_stream_ready;
Craig Tillera44cbfc2016-02-03 16:02:49 -0800211 grpc_closure receiving_initial_metadata_ready;
Craig Tiller7536af02015-12-22 13:49:30 -0800212 uint32_t test_only_last_message_flags;
Craig Tillerc7e1a2a2015-11-02 14:17:32 -0800213
214 union {
215 struct {
216 grpc_status_code *status;
Craig Tiller68208fe2016-11-14 14:35:02 -0800217 grpc_slice *status_details;
Craig Tillerc7e1a2a2015-11-02 14:17:32 -0800218 } client;
219 struct {
220 int *cancelled;
221 } server;
222 } final_op;
Craig Tillera44cbfc2016-02-03 16:02:49 -0800223
Craig Tiller8a677802016-04-22 15:07:53 -0700224 void *saved_receiving_stream_ready_bctlp;
Nicolas Nobleb7ebd3b2014-11-26 16:33:03 -0800225};
226
Craig Tiller58b30cd2017-01-31 17:07:36 -0800227int grpc_call_error_trace = 0;
228
Craig Tiller87d5b192015-04-16 14:37:57 -0700229#define CALL_STACK_FROM_CALL(call) ((grpc_call_stack *)((call) + 1))
Nicolas Nobleb7ebd3b2014-11-26 16:33:03 -0800230#define CALL_FROM_CALL_STACK(call_stack) (((grpc_call *)(call_stack)) - 1)
231#define CALL_ELEM_FROM_CALL(call, idx) \
232 grpc_call_stack_element(CALL_STACK_FROM_CALL(call), idx)
233#define CALL_FROM_TOP_ELEM(top_elem) \
234 CALL_FROM_CALL_STACK(grpc_call_stack_from_top_element(top_elem))
235
Craig Tillera82950e2015-09-22 12:33:20 -0700236static void execute_op(grpc_exec_ctx *exec_ctx, grpc_call *call,
237 grpc_transport_stream_op *op);
Craig Tiller2dc32ea2017-01-31 15:32:34 -0800238static void cancel_with_status(grpc_exec_ctx *exec_ctx, grpc_call *c,
239 status_source source, grpc_status_code status,
240 const char *description);
Craig Tiller255edaa2016-12-13 09:04:55 -0800241static void cancel_with_error(grpc_exec_ctx *exec_ctx, grpc_call *c,
Craig Tiller2dc32ea2017-01-31 15:32:34 -0800242 status_source source, grpc_error *error);
Craig Tillerc7e1a2a2015-11-02 14:17:32 -0800243static void destroy_call(grpc_exec_ctx *exec_ctx, void *call_stack,
Craig Tillerc027e772016-05-03 16:27:00 -0700244 grpc_error *error);
Craig Tillerc7e1a2a2015-11-02 14:17:32 -0800245static void receiving_slice_ready(grpc_exec_ctx *exec_ctx, void *bctlp,
Craig Tillerc027e772016-05-03 16:27:00 -0700246 grpc_error *error);
Craig Tiller841a99d2016-12-12 16:58:57 -0800247static void get_final_status(grpc_call *call,
248 void (*set_value)(grpc_status_code code,
249 void *user_data),
250 void *set_value_user_data, grpc_slice *details);
251static void set_status_value_directly(grpc_status_code status, void *dest);
252static void set_status_from_error(grpc_exec_ctx *exec_ctx, grpc_call *call,
253 status_source source, grpc_error *error);
254static void process_data_after_md(grpc_exec_ctx *exec_ctx, batch_control *bctl);
255static void post_batch_completion(grpc_exec_ctx *exec_ctx, batch_control *bctl);
256static void add_batch_error(grpc_exec_ctx *exec_ctx, batch_control *bctl,
257 grpc_error *error);
Craig Tillerbac41422015-05-29 16:32:28 -0700258
Craig Tillerf4484cd2017-02-01 08:28:40 -0800259static void add_init_error(grpc_error **composite, grpc_error *new) {
260 if (new == GRPC_ERROR_NONE) return;
261 if (*composite == GRPC_ERROR_NONE)
262 *composite = GRPC_ERROR_CREATE("Call creation failed");
263 *composite = grpc_error_add_child(*composite, new);
264}
265
Craig Tillera59c16c2016-10-31 07:25:01 -0700266grpc_error *grpc_call_create(grpc_exec_ctx *exec_ctx,
267 const grpc_call_create_args *args,
Craig Tiller8e214652016-08-19 09:54:31 -0700268 grpc_call **out_call) {
Craig Tillerc7e1a2a2015-11-02 14:17:32 -0800269 size_t i, j;
Craig Tillerf4484cd2017-02-01 08:28:40 -0800270 grpc_error *error = GRPC_ERROR_NONE;
Craig Tiller8e214652016-08-19 09:54:31 -0700271 grpc_channel_stack *channel_stack =
272 grpc_channel_get_channel_stack(args->channel);
Craig Tiller1f41b6b2015-10-09 15:07:02 -0700273 grpc_call *call;
Craig Tiller0ba432d2015-10-09 16:57:11 -0700274 GPR_TIMER_BEGIN("grpc_call_create", 0);
Craig Tiller0eaed722016-09-21 10:44:18 -0700275 call = gpr_malloc(sizeof(grpc_call) + channel_stack->call_stack_size);
276 *out_call = call;
Craig Tillera82950e2015-09-22 12:33:20 -0700277 memset(call, 0, sizeof(grpc_call));
278 gpr_mu_init(&call->mu);
Craig Tiller8e214652016-08-19 09:54:31 -0700279 call->channel = args->channel;
280 call->cq = args->cq;
281 call->parent = args->parent_call;
Mark D. Roth3d883412016-11-07 13:42:54 -0800282 call->start_time = gpr_now(GPR_CLOCK_MONOTONIC);
David Garcia Quintas46123372016-05-09 15:28:42 -0700283 /* Always support no compression */
284 GPR_BITSET(&call->encodings_accepted_by_peer, GRPC_COMPRESS_NONE);
Craig Tiller8e214652016-08-19 09:54:31 -0700285 call->is_client = args->server_transport_data == NULL;
Craig Tiller4eecdde2016-11-14 08:21:17 -0800286 grpc_slice path = grpc_empty_slice();
Craig Tillera82950e2015-09-22 12:33:20 -0700287 if (call->is_client) {
Craig Tiller8e214652016-08-19 09:54:31 -0700288 GPR_ASSERT(args->add_initial_metadata_count <
289 MAX_SEND_EXTRA_METADATA_COUNT);
290 for (i = 0; i < args->add_initial_metadata_count; i++) {
291 call->send_extra_metadata[i].md = args->add_initial_metadata[i];
Craig Tiller3b05e1d2016-11-21 13:46:31 -0800292 if (grpc_slice_eq(GRPC_MDKEY(args->add_initial_metadata[i]),
293 GRPC_MDSTR_PATH)) {
Craig Tiller0160de92016-11-18 08:46:46 -0800294 path = grpc_slice_ref_internal(
295 GRPC_MDVALUE(args->add_initial_metadata[i]));
Mark D. Rothaa850a72016-09-26 13:38:02 -0700296 }
Craig Tillerc7e1a2a2015-11-02 14:17:32 -0800297 }
Craig Tiller8e214652016-08-19 09:54:31 -0700298 call->send_extra_metadata_count = (int)args->add_initial_metadata_count;
Craig Tillerc7e1a2a2015-11-02 14:17:32 -0800299 } else {
Craig Tiller8e214652016-08-19 09:54:31 -0700300 GPR_ASSERT(args->add_initial_metadata_count == 0);
Craig Tillerc7e1a2a2015-11-02 14:17:32 -0800301 call->send_extra_metadata_count = 0;
Craig Tillera82950e2015-09-22 12:33:20 -0700302 }
Craig Tillerc7e1a2a2015-11-02 14:17:32 -0800303 for (i = 0; i < 2; i++) {
304 for (j = 0; j < 2; j++) {
305 call->metadata_batch[i][j].deadline = gpr_inf_future(GPR_CLOCK_MONOTONIC);
306 }
Craig Tillera82950e2015-09-22 12:33:20 -0700307 }
Craig Tillerca3451d2016-09-29 10:27:44 -0700308 gpr_timespec send_deadline =
Craig Tiller8e214652016-08-19 09:54:31 -0700309 gpr_convert_clock_type(args->send_deadline, GPR_CLOCK_MONOTONIC);
Mark D. Rothf28763c2016-09-14 15:18:40 -0700310
Craig Tiller8e214652016-08-19 09:54:31 -0700311 if (args->parent_call != NULL) {
312 GRPC_CALL_INTERNAL_REF(args->parent_call, "child");
Craig Tillera82950e2015-09-22 12:33:20 -0700313 GPR_ASSERT(call->is_client);
Craig Tiller8e214652016-08-19 09:54:31 -0700314 GPR_ASSERT(!args->parent_call->is_client);
Craig Tillera82950e2015-09-22 12:33:20 -0700315
Craig Tiller8e214652016-08-19 09:54:31 -0700316 gpr_mu_lock(&args->parent_call->mu);
Craig Tillera82950e2015-09-22 12:33:20 -0700317
Craig Tiller8e214652016-08-19 09:54:31 -0700318 if (args->propagation_mask & GRPC_PROPAGATE_DEADLINE) {
Craig Tillera82950e2015-09-22 12:33:20 -0700319 send_deadline = gpr_time_min(
320 gpr_convert_clock_type(send_deadline,
Craig Tiller8e214652016-08-19 09:54:31 -0700321 args->parent_call->send_deadline.clock_type),
322 args->parent_call->send_deadline);
Craig Tillerc7df0df2015-08-03 08:06:50 -0700323 }
Craig Tillera82950e2015-09-22 12:33:20 -0700324 /* for now GRPC_PROPAGATE_TRACING_CONTEXT *MUST* be passed with
325 * GRPC_PROPAGATE_STATS_CONTEXT */
326 /* TODO(ctiller): This should change to use the appropriate census start_op
327 * call. */
Craig Tiller8e214652016-08-19 09:54:31 -0700328 if (args->propagation_mask & GRPC_PROPAGATE_CENSUS_TRACING_CONTEXT) {
Craig Tiller239af8b2017-02-01 10:21:42 -0800329 if (0 == (args->propagation_mask & GRPC_PROPAGATE_CENSUS_STATS_CONTEXT)) {
330 add_init_error(&error,
331 GRPC_ERROR_CREATE("Census tracing propagation requested "
332 "without Census context propagation"));
333 }
Craig Tiller8e214652016-08-19 09:54:31 -0700334 grpc_call_context_set(
335 call, GRPC_CONTEXT_TRACING,
336 args->parent_call->context[GRPC_CONTEXT_TRACING].value, NULL);
Craig Tillerf20d3072017-02-01 10:39:26 -0800337 } else if (args->propagation_mask & GRPC_PROPAGATE_CENSUS_STATS_CONTEXT) {
Craig Tillerf4484cd2017-02-01 08:28:40 -0800338 add_init_error(&error,
Craig Tiller239af8b2017-02-01 10:21:42 -0800339 GRPC_ERROR_CREATE("Census context propagation requested "
340 "without Census tracing propagation"));
Craig Tiller45724b32015-09-22 10:42:19 -0700341 }
Craig Tiller8e214652016-08-19 09:54:31 -0700342 if (args->propagation_mask & GRPC_PROPAGATE_CANCELLATION) {
Craig Tillera82950e2015-09-22 12:33:20 -0700343 call->cancellation_is_inherited = 1;
Craig Tiller45724b32015-09-22 10:42:19 -0700344 }
Craig Tillera82950e2015-09-22 12:33:20 -0700345
Craig Tiller8e214652016-08-19 09:54:31 -0700346 if (args->parent_call->first_child == NULL) {
347 args->parent_call->first_child = call;
Craig Tillera82950e2015-09-22 12:33:20 -0700348 call->sibling_next = call->sibling_prev = call;
349 } else {
Craig Tiller8e214652016-08-19 09:54:31 -0700350 call->sibling_next = args->parent_call->first_child;
351 call->sibling_prev = args->parent_call->first_child->sibling_prev;
Craig Tillera82950e2015-09-22 12:33:20 -0700352 call->sibling_next->sibling_prev = call->sibling_prev->sibling_next =
353 call;
354 }
355
Craig Tiller8e214652016-08-19 09:54:31 -0700356 gpr_mu_unlock(&args->parent_call->mu);
Craig Tillera82950e2015-09-22 12:33:20 -0700357 }
Mark D. Rothf28763c2016-09-14 15:18:40 -0700358
Mark D. Roth14c072c2016-08-26 08:31:34 -0700359 call->send_deadline = send_deadline;
Mark D. Rothf28763c2016-09-14 15:18:40 -0700360
Craig Tillerca3451d2016-09-29 10:27:44 -0700361 GRPC_CHANNEL_INTERNAL_REF(args->channel, "call");
Mark D. Rothf28763c2016-09-14 15:18:40 -0700362 /* initial refcount dropped by grpc_call_destroy */
Craig Tillerf4484cd2017-02-01 08:28:40 -0800363 add_init_error(&error, grpc_call_stack_init(exec_ctx, channel_stack, 1,
364 destroy_call, call, call->context,
365 args->server_transport_data, path,
366 call->start_time, send_deadline,
367 CALL_STACK_FROM_CALL(call)));
Mark D. Rothf28763c2016-09-14 15:18:40 -0700368 if (error != GRPC_ERROR_NONE) {
Craig Tiller58b30cd2017-01-31 17:07:36 -0800369 cancel_with_error(exec_ctx, call, STATUS_FROM_SURFACE,
370 GRPC_ERROR_REF(error));
Craig Tillera82950e2015-09-22 12:33:20 -0700371 }
Craig Tillerca3451d2016-09-29 10:27:44 -0700372 if (args->cq != NULL) {
Mark D. Rothf28763c2016-09-14 15:18:40 -0700373 GPR_ASSERT(
Craig Tillerca3451d2016-09-29 10:27:44 -0700374 args->pollset_set_alternative == NULL &&
Mark D. Rothf28763c2016-09-14 15:18:40 -0700375 "Only one of 'cq' and 'pollset_set_alternative' should be non-NULL.");
Craig Tillerca3451d2016-09-29 10:27:44 -0700376 GRPC_CQ_INTERNAL_REF(args->cq, "bind");
Mark D. Rothf28763c2016-09-14 15:18:40 -0700377 call->pollent =
Craig Tillerca3451d2016-09-29 10:27:44 -0700378 grpc_polling_entity_create_from_pollset(grpc_cq_pollset(args->cq));
Mark D. Rothf28763c2016-09-14 15:18:40 -0700379 }
Craig Tillerca3451d2016-09-29 10:27:44 -0700380 if (args->pollset_set_alternative != NULL) {
381 call->pollent = grpc_polling_entity_create_from_pollset_set(
382 args->pollset_set_alternative);
Mark D. Rothf28763c2016-09-14 15:18:40 -0700383 }
384 if (!grpc_polling_entity_is_empty(&call->pollent)) {
385 grpc_call_stack_set_pollset_or_pollset_set(
Craig Tillera59c16c2016-10-31 07:25:01 -0700386 exec_ctx, CALL_STACK_FROM_CALL(call), &call->pollent);
Mark D. Rothf28763c2016-09-14 15:18:40 -0700387 }
388
Craig Tiller4eecdde2016-11-14 08:21:17 -0800389 grpc_slice_unref_internal(exec_ctx, path);
Mark D. Rothaa850a72016-09-26 13:38:02 -0700390
Craig Tiller0ba432d2015-10-09 16:57:11 -0700391 GPR_TIMER_END("grpc_call_create", 0);
Craig Tiller8e214652016-08-19 09:54:31 -0700392 return error;
Nicolas Nobleb7ebd3b2014-11-26 16:33:03 -0800393}
394
Craig Tillera82950e2015-09-22 12:33:20 -0700395void grpc_call_set_completion_queue(grpc_exec_ctx *exec_ctx, grpc_call *call,
396 grpc_completion_queue *cq) {
Craig Tillerc7e1a2a2015-11-02 14:17:32 -0800397 GPR_ASSERT(cq);
David Garcia Quintasf72eb972016-05-03 18:28:09 -0700398
David Garcia Quintasc4d51122016-06-06 14:56:02 -0700399 if (grpc_polling_entity_pollset_set(&call->pollent) != NULL) {
David Garcia Quintasf72eb972016-05-03 18:28:09 -0700400 gpr_log(GPR_ERROR, "A pollset_set is already registered for this call.");
401 abort();
402 }
Craig Tiller166e2502015-02-03 20:14:41 -0800403 call->cq = cq;
Craig Tillerc7e1a2a2015-11-02 14:17:32 -0800404 GRPC_CQ_INTERNAL_REF(cq, "bind");
David Garcia Quintasc4d51122016-06-06 14:56:02 -0700405 call->pollent = grpc_polling_entity_create_from_pollset(grpc_cq_pollset(cq));
David Garcia Quintas4afce7e2016-04-18 16:25:17 -0700406 grpc_call_stack_set_pollset_or_pollset_set(
David Garcia Quintas2a50dfe2016-05-31 15:09:12 -0700407 exec_ctx, CALL_STACK_FROM_CALL(call), &call->pollent);
Craig Tiller166e2502015-02-03 20:14:41 -0800408}
409
Craig Tillerc7e1a2a2015-11-02 14:17:32 -0800410#ifdef GRPC_STREAM_REFCOUNT_DEBUG
Craig Tiller7b435612015-11-24 08:15:05 -0800411#define REF_REASON reason
412#define REF_ARG , const char *reason
Craig Tiller4df412b2015-04-28 07:57:54 -0700413#else
Craig Tiller7b435612015-11-24 08:15:05 -0800414#define REF_REASON ""
415#define REF_ARG
Craig Tillerc7e1a2a2015-11-02 14:17:32 -0800416#endif
Craig Tiller7b435612015-11-24 08:15:05 -0800417void grpc_call_internal_ref(grpc_call *c REF_ARG) {
418 GRPC_CALL_STACK_REF(CALL_STACK_FROM_CALL(c), REF_REASON);
419}
420void grpc_call_internal_unref(grpc_exec_ctx *exec_ctx, grpc_call *c REF_ARG) {
421 GRPC_CALL_STACK_UNREF(exec_ctx, CALL_STACK_FROM_CALL(c), REF_REASON);
422}
Nicolas Nobleb7ebd3b2014-11-26 16:33:03 -0800423
David Garcia Quintas01c4d992016-07-07 20:11:27 -0700424static void set_status_value_directly(grpc_status_code status, void *dest);
Craig Tillerc027e772016-05-03 16:27:00 -0700425static void destroy_call(grpc_exec_ctx *exec_ctx, void *call,
426 grpc_error *error) {
Craig Tiller566316f2015-02-02 15:25:32 -0800427 size_t i;
Craig Tillerc7e1a2a2015-11-02 14:17:32 -0800428 int ii;
Craig Tilleraef25da2015-01-29 17:19:45 -0800429 grpc_call *c = call;
Craig Tiller0ba432d2015-10-09 16:57:11 -0700430 GPR_TIMER_BEGIN("destroy_call", 0);
Craig Tillerc7e1a2a2015-11-02 14:17:32 -0800431 for (i = 0; i < 2; i++) {
432 grpc_metadata_batch_destroy(
Craig Tillera59c16c2016-10-31 07:25:01 -0700433 exec_ctx, &c->metadata_batch[1 /* is_receiving */][i /* is_initial */]);
Craig Tillerc7e1a2a2015-11-02 14:17:32 -0800434 }
435 if (c->receiving_stream != NULL) {
Craig Tiller3b66ab92015-12-09 19:42:22 -0800436 grpc_byte_stream_destroy(exec_ctx, c->receiving_stream);
Craig Tillerc7e1a2a2015-11-02 14:17:32 -0800437 }
Craig Tillera82950e2015-09-22 12:33:20 -0700438 gpr_mu_destroy(&c->mu);
Craig Tillerc7e1a2a2015-11-02 14:17:32 -0800439 for (ii = 0; ii < c->send_extra_metadata_count; ii++) {
Craig Tillera59c16c2016-10-31 07:25:01 -0700440 GRPC_MDELEM_UNREF(exec_ctx, c->send_extra_metadata[ii].md);
Craig Tillera82950e2015-09-22 12:33:20 -0700441 }
442 for (i = 0; i < GRPC_CONTEXT_COUNT; i++) {
443 if (c->context[i].destroy) {
444 c->context[i].destroy(c->context[i].value);
Craig Tiller935cf422015-05-01 14:10:46 -0700445 }
Craig Tillera82950e2015-09-22 12:33:20 -0700446 }
Craig Tillera82950e2015-09-22 12:33:20 -0700447 if (c->cq) {
448 GRPC_CQ_INTERNAL_UNREF(c->cq, "bind");
449 }
Craig Tiller9859d8d2016-04-26 21:07:53 -0700450 grpc_channel *channel = c->channel;
David Garcia Quintas01c4d992016-07-07 20:11:27 -0700451
Craig Tiller841a99d2016-12-12 16:58:57 -0800452 get_final_status(call, set_status_value_directly, &c->final_info.final_status,
453 NULL);
Mark D. Roth3d883412016-11-07 13:42:54 -0800454 c->final_info.stats.latency =
455 gpr_time_sub(gpr_now(GPR_CLOCK_MONOTONIC), c->start_time);
David Garcia Quintas01c4d992016-07-07 20:11:27 -0700456
Craig Tiller841a99d2016-12-12 16:58:57 -0800457 for (i = 0; i < STATUS_SOURCE_COUNT; i++) {
Craig Tiller4bab9462017-02-22 08:56:02 -0800458 GRPC_ERROR_UNREF(
459 unpack_received_status(gpr_atm_no_barrier_load(&c->status[i])).error);
Craig Tiller841a99d2016-12-12 16:58:57 -0800460 }
461
David Garcia Quintas01c4d992016-07-07 20:11:27 -0700462 grpc_call_stack_destroy(exec_ctx, CALL_STACK_FROM_CALL(c), &c->final_info, c);
Craig Tiller9859d8d2016-04-26 21:07:53 -0700463 GRPC_CHANNEL_INTERNAL_UNREF(exec_ctx, channel, "call");
Craig Tiller0ba432d2015-10-09 16:57:11 -0700464 GPR_TIMER_END("destroy_call", 0);
Craig Tillera4541102015-01-29 11:46:11 -0800465}
466
Craig Tiller841a99d2016-12-12 16:58:57 -0800467void grpc_call_destroy(grpc_call *c) {
468 int cancel;
469 grpc_call *parent = c->parent;
470 grpc_exec_ctx exec_ctx = GRPC_EXEC_CTX_INIT;
Craig Tillerb8d3a312015-06-19 17:27:53 -0700471
Craig Tiller841a99d2016-12-12 16:58:57 -0800472 GPR_TIMER_BEGIN("grpc_call_destroy", 0);
473 GRPC_API_TRACE("grpc_call_destroy(c=%p)", 1, (c));
474
475 if (parent) {
476 gpr_mu_lock(&parent->mu);
477 if (c == parent->first_child) {
478 parent->first_child = c->sibling_next;
479 if (c == parent->first_child) {
480 parent->first_child = NULL;
481 }
482 c->sibling_prev->sibling_next = c->sibling_next;
483 c->sibling_next->sibling_prev = c->sibling_prev;
484 }
485 gpr_mu_unlock(&parent->mu);
486 GRPC_CALL_INTERNAL_UNREF(&exec_ctx, parent, "child");
487 }
488
489 gpr_mu_lock(&c->mu);
490 GPR_ASSERT(!c->destroy_called);
491 c->destroy_called = 1;
492 cancel = !c->received_final_op;
493 gpr_mu_unlock(&c->mu);
Craig Tiller37cbc3f2017-02-16 14:54:55 -0800494 if (cancel) {
495 cancel_with_error(&exec_ctx, c, STATUS_FROM_API_OVERRIDE,
496 GRPC_ERROR_CANCELLED);
497 }
Craig Tiller841a99d2016-12-12 16:58:57 -0800498 GRPC_CALL_INTERNAL_UNREF(&exec_ctx, c, "destroy");
499 grpc_exec_ctx_finish(&exec_ctx);
500 GPR_TIMER_END("grpc_call_destroy", 0);
Craig Tillerf0f70a82016-06-23 13:55:06 -0700501}
Craig Tiller30547562015-02-05 17:04:51 -0800502
Craig Tiller841a99d2016-12-12 16:58:57 -0800503grpc_call_error grpc_call_cancel(grpc_call *call, void *reserved) {
504 GRPC_API_TRACE("grpc_call_cancel(call=%p, reserved=%p)", 2, (call, reserved));
505 GPR_ASSERT(!reserved);
Craig Tiller37cbc3f2017-02-16 14:54:55 -0800506 grpc_exec_ctx exec_ctx = GRPC_EXEC_CTX_INIT;
507 cancel_with_error(&exec_ctx, call, STATUS_FROM_API_OVERRIDE,
508 GRPC_ERROR_CANCELLED);
509 grpc_exec_ctx_finish(&exec_ctx);
510 return GRPC_CALL_OK;
Craig Tiller841a99d2016-12-12 16:58:57 -0800511}
512
513static void execute_op(grpc_exec_ctx *exec_ctx, grpc_call *call,
514 grpc_transport_stream_op *op) {
515 grpc_call_element *elem;
516
517 GPR_TIMER_BEGIN("execute_op", 0);
518 elem = CALL_ELEM_FROM_CALL(call, 0);
519 op->context = call->context;
520 elem->filter->start_transport_stream_op(exec_ctx, elem, op);
521 GPR_TIMER_END("execute_op", 0);
522}
523
524char *grpc_call_get_peer(grpc_call *call) {
525 grpc_call_element *elem = CALL_ELEM_FROM_CALL(call, 0);
526 grpc_exec_ctx exec_ctx = GRPC_EXEC_CTX_INIT;
527 char *result;
528 GRPC_API_TRACE("grpc_call_get_peer(%p)", 1, (call));
529 result = elem->filter->get_peer(&exec_ctx, elem);
530 if (result == NULL) {
531 result = grpc_channel_get_target(call->channel);
532 }
533 if (result == NULL) {
534 result = gpr_strdup("unknown");
535 }
536 grpc_exec_ctx_finish(&exec_ctx);
537 return result;
538}
539
540grpc_call *grpc_call_from_top_element(grpc_call_element *elem) {
541 return CALL_FROM_TOP_ELEM(elem);
542}
543
544/*******************************************************************************
545 * CANCELLATION
546 */
547
548grpc_call_error grpc_call_cancel_with_status(grpc_call *c,
549 grpc_status_code status,
550 const char *description,
551 void *reserved) {
Craig Tiller841a99d2016-12-12 16:58:57 -0800552 grpc_exec_ctx exec_ctx = GRPC_EXEC_CTX_INIT;
553 GRPC_API_TRACE(
554 "grpc_call_cancel_with_status("
555 "c=%p, status=%d, description=%s, reserved=%p)",
556 4, (c, (int)status, description, reserved));
557 GPR_ASSERT(reserved == NULL);
558 gpr_mu_lock(&c->mu);
Craig Tiller2dc32ea2017-01-31 15:32:34 -0800559 cancel_with_status(&exec_ctx, c, STATUS_FROM_API_OVERRIDE, status,
560 description);
Craig Tiller841a99d2016-12-12 16:58:57 -0800561 gpr_mu_unlock(&c->mu);
562 grpc_exec_ctx_finish(&exec_ctx);
Craig Tiller2dc32ea2017-01-31 15:32:34 -0800563 return GRPC_CALL_OK;
Craig Tiller841a99d2016-12-12 16:58:57 -0800564}
565
Craig Tiller841a99d2016-12-12 16:58:57 -0800566typedef struct termination_closure {
567 grpc_closure closure;
568 grpc_call *call;
Craig Tiller841a99d2016-12-12 16:58:57 -0800569 grpc_transport_stream_op op;
570} termination_closure;
571
572static void done_termination(grpc_exec_ctx *exec_ctx, void *tcp,
573 grpc_error *error) {
574 termination_closure *tc = tcp;
575 GRPC_CALL_INTERNAL_UNREF(exec_ctx, tc->call, "termination");
Craig Tiller841a99d2016-12-12 16:58:57 -0800576 gpr_free(tc);
577}
578
579static void send_termination(grpc_exec_ctx *exec_ctx, void *tcp,
580 grpc_error *error) {
581 termination_closure *tc = tcp;
582 memset(&tc->op, 0, sizeof(tc->op));
Craig Tiller2dc32ea2017-01-31 15:32:34 -0800583 tc->op.cancel_error = GRPC_ERROR_REF(error);
Craig Tiller841a99d2016-12-12 16:58:57 -0800584 /* reuse closure to catch completion */
Craig Tiller2dc32ea2017-01-31 15:32:34 -0800585 tc->op.on_complete = grpc_closure_init(&tc->closure, done_termination, tc,
586 grpc_schedule_on_exec_ctx);
Craig Tiller841a99d2016-12-12 16:58:57 -0800587 execute_op(exec_ctx, tc->call, &tc->op);
588}
589
Craig Tiller2dc32ea2017-01-31 15:32:34 -0800590static void terminate_with_error(grpc_exec_ctx *exec_ctx, grpc_call *c,
591 grpc_error *error) {
Craig Tiller841a99d2016-12-12 16:58:57 -0800592 termination_closure *tc = gpr_malloc(sizeof(*tc));
593 memset(tc, 0, sizeof(*tc));
Craig Tiller841a99d2016-12-12 16:58:57 -0800594 tc->call = c;
Craig Tiller2dc32ea2017-01-31 15:32:34 -0800595 GRPC_CALL_INTERNAL_REF(tc->call, "termination");
596 grpc_closure_sched(exec_ctx, grpc_closure_init(&tc->closure, send_termination,
597 tc, grpc_schedule_on_exec_ctx),
598 error);
Craig Tiller841a99d2016-12-12 16:58:57 -0800599}
600
Craig Tiller255edaa2016-12-13 09:04:55 -0800601static void cancel_with_error(grpc_exec_ctx *exec_ctx, grpc_call *c,
Craig Tiller2dc32ea2017-01-31 15:32:34 -0800602 status_source source, grpc_error *error) {
603 set_status_from_error(exec_ctx, c, source, GRPC_ERROR_REF(error));
Craig Tiller255edaa2016-12-13 09:04:55 -0800604 terminate_with_error(exec_ctx, c, error);
605}
606
Craig Tiller841a99d2016-12-12 16:58:57 -0800607static grpc_error *error_from_status(grpc_status_code status,
608 const char *description) {
609 return grpc_error_set_int(
610 grpc_error_set_str(GRPC_ERROR_CREATE(description),
611 GRPC_ERROR_STR_GRPC_MESSAGE, description),
612 GRPC_ERROR_INT_GRPC_STATUS, status);
613}
614
Craig Tiller2dc32ea2017-01-31 15:32:34 -0800615static void cancel_with_status(grpc_exec_ctx *exec_ctx, grpc_call *c,
616 status_source source, grpc_status_code status,
617 const char *description) {
618 cancel_with_error(exec_ctx, c, source,
619 error_from_status(status, description));
Craig Tiller841a99d2016-12-12 16:58:57 -0800620}
621
622/*******************************************************************************
623 * FINAL STATUS CODE MANIPULATION
624 */
625
Craig Tiller58b30cd2017-01-31 17:07:36 -0800626static bool get_final_status_from(
Craig Tiller4bab9462017-02-22 08:56:02 -0800627 grpc_call *call, grpc_error *error, bool allow_ok_status,
Craig Tiller58b30cd2017-01-31 17:07:36 -0800628 void (*set_value)(grpc_status_code code, void *user_data),
629 void *set_value_user_data, grpc_slice *details) {
Craig Tiller737b6252017-01-09 15:25:15 -0800630 grpc_status_code code;
631 const char *msg = NULL;
Craig Tiller4bab9462017-02-22 08:56:02 -0800632 grpc_error_get_status(error, call->send_deadline, &code, &msg, NULL);
Craig Tiller58b30cd2017-01-31 17:07:36 -0800633 if (code == GRPC_STATUS_OK && !allow_ok_status) {
634 return false;
635 }
Craig Tiller737b6252017-01-09 15:25:15 -0800636
637 set_value(code, set_value_user_data);
638 if (details != NULL) {
Craig Tiller8f24d6a2017-01-10 06:36:46 -0800639 *details =
640 msg == NULL ? grpc_empty_slice() : grpc_slice_from_copied_string(msg);
Craig Tiller737b6252017-01-09 15:25:15 -0800641 }
Craig Tiller58b30cd2017-01-31 17:07:36 -0800642 return true;
Craig Tiller737b6252017-01-09 15:25:15 -0800643}
644
Craig Tiller841a99d2016-12-12 16:58:57 -0800645static void get_final_status(grpc_call *call,
646 void (*set_value)(grpc_status_code code,
647 void *user_data),
648 void *set_value_user_data, grpc_slice *details) {
649 int i;
Craig Tiller4bab9462017-02-22 08:56:02 -0800650 received_status status[STATUS_SOURCE_COUNT];
651 for (i = 0; i < STATUS_SOURCE_COUNT; i++) {
652 status[i] = unpack_received_status(gpr_atm_acq_load(&call->status[i]));
653 }
Craig Tiller58b30cd2017-01-31 17:07:36 -0800654 if (grpc_call_error_trace) {
655 gpr_log(GPR_DEBUG, "get_final_status %s", call->is_client ? "CLI" : "SVR");
656 for (i = 0; i < STATUS_SOURCE_COUNT; i++) {
Craig Tiller4bab9462017-02-22 08:56:02 -0800657 if (status[i].is_set) {
658 gpr_log(GPR_DEBUG, " %d: %s", i, grpc_error_string(status[i].error));
Craig Tiller58b30cd2017-01-31 17:07:36 -0800659 }
Craig Tiller841a99d2016-12-12 16:58:57 -0800660 }
661 }
Craig Tiller58b30cd2017-01-31 17:07:36 -0800662 /* first search through ignoring "OK" statuses: if something went wrong,
663 * ensure we report it */
664 for (int allow_ok_status = 0; allow_ok_status < 2; allow_ok_status++) {
665 /* search for the best status we can present: ideally the error we use has a
666 clearly defined grpc-status, and we'll prefer that. */
667 for (i = 0; i < STATUS_SOURCE_COUNT; i++) {
Craig Tiller4bab9462017-02-22 08:56:02 -0800668 if (status[i].is_set &&
669 grpc_error_has_clear_grpc_status(status[i].error)) {
670 if (get_final_status_from(call, status[i].error, allow_ok_status != 0,
Craig Tiller58b30cd2017-01-31 17:07:36 -0800671 set_value, set_value_user_data, details)) {
672 return;
673 }
674 }
675 }
676 /* If no clearly defined status exists, search for 'anything' */
677 for (i = 0; i < STATUS_SOURCE_COUNT; i++) {
Craig Tiller4bab9462017-02-22 08:56:02 -0800678 if (status[i].is_set) {
679 if (get_final_status_from(call, status[i].error, allow_ok_status != 0,
Craig Tiller58b30cd2017-01-31 17:07:36 -0800680 set_value, set_value_user_data, details)) {
681 return;
682 }
683 }
Craig Tiller737b6252017-01-09 15:25:15 -0800684 }
685 }
686 /* If nothing exists, set some default */
Craig Tiller841a99d2016-12-12 16:58:57 -0800687 if (call->is_client) {
688 set_value(GRPC_STATUS_UNKNOWN, set_value_user_data);
Craig Tillerbe1b9a72016-06-24 13:22:11 -0700689 } else {
Craig Tiller841a99d2016-12-12 16:58:57 -0800690 set_value(GRPC_STATUS_OK, set_value_user_data);
Craig Tillerf0f70a82016-06-23 13:55:06 -0700691 }
Craig Tillerf0f70a82016-06-23 13:55:06 -0700692}
693
Craig Tillera59c16c2016-10-31 07:25:01 -0700694static void set_status_from_error(grpc_exec_ctx *exec_ctx, grpc_call *call,
695 status_source source, grpc_error *error) {
Craig Tiller4bab9462017-02-22 08:56:02 -0800696 if (!gpr_atm_rel_cas(&call->status[source],
697 pack_received_status((received_status){
698 .is_set = false, .error = GRPC_ERROR_NONE}),
699 pack_received_status((received_status){
700 .is_set = true, .error = error}))) {
Craig Tiller841a99d2016-12-12 16:58:57 -0800701 GRPC_ERROR_UNREF(error);
Craig Tiller841a99d2016-12-12 16:58:57 -0800702 }
Craig Tiller68752722015-01-29 14:59:54 -0800703}
704
Craig Tiller841a99d2016-12-12 16:58:57 -0800705/*******************************************************************************
706 * COMPRESSION
707 */
708
David Garcia Quintasac094472016-05-18 20:25:57 -0700709static void set_incoming_compression_algorithm(
710 grpc_call *call, grpc_compression_algorithm algo) {
David Garcia Quintas303d3082016-05-05 18:25:34 -0700711 GPR_ASSERT(algo < GRPC_COMPRESS_ALGORITHMS_COUNT);
David Garcia Quintas749367f2016-05-17 19:15:24 -0700712 call->incoming_compression_algorithm = algo;
David Garcia Quintasdb94b272015-06-15 18:37:01 -0700713}
714
David Garcia Quintas0c331882015-10-08 14:51:54 -0700715grpc_compression_algorithm grpc_call_test_only_get_compression_algorithm(
David Garcia Quintas64824be2015-10-06 19:45:36 -0700716 grpc_call *call) {
717 grpc_compression_algorithm algorithm;
718 gpr_mu_lock(&call->mu);
David Garcia Quintas749367f2016-05-17 19:15:24 -0700719 algorithm = call->incoming_compression_algorithm;
David Garcia Quintas64824be2015-10-06 19:45:36 -0700720 gpr_mu_unlock(&call->mu);
721 return algorithm;
David Garcia Quintas7c0d9142015-07-23 04:58:20 -0700722}
723
David Garcia Quintasa301eaa2016-05-06 16:59:03 -0700724static grpc_compression_algorithm compression_algorithm_for_level_locked(
725 grpc_call *call, grpc_compression_level level) {
David Garcia Quintasac094472016-05-18 20:25:57 -0700726 return grpc_compression_algorithm_for_level(level,
727 call->encodings_accepted_by_peer);
David Garcia Quintasa301eaa2016-05-06 16:59:03 -0700728}
729
Craig Tiller7536af02015-12-22 13:49:30 -0800730uint32_t grpc_call_test_only_get_message_flags(grpc_call *call) {
731 uint32_t flags;
Craig Tillerc7e1a2a2015-11-02 14:17:32 -0800732 gpr_mu_lock(&call->mu);
733 flags = call->test_only_last_message_flags;
734 gpr_mu_unlock(&call->mu);
735 return flags;
736}
737
Craig Tiller3ff27542015-10-09 15:39:44 -0700738static void destroy_encodings_accepted_by_peer(void *p) { return; }
739
Craig Tillera59c16c2016-10-31 07:25:01 -0700740static void set_encodings_accepted_by_peer(grpc_exec_ctx *exec_ctx,
Craig Tiller0160de92016-11-18 08:46:46 -0800741 grpc_call *call, grpc_mdelem mdel) {
David Garcia Quintasb8edf7e2015-07-08 20:18:57 -0700742 size_t i;
743 grpc_compression_algorithm algorithm;
Craig Tillerd41a4a72016-10-26 16:16:06 -0700744 grpc_slice_buffer accept_encoding_parts;
745 grpc_slice accept_encoding_slice;
Craig Tiller3ff27542015-10-09 15:39:44 -0700746 void *accepted_user_data;
David Garcia Quintasb8edf7e2015-07-08 20:18:57 -0700747
Craig Tiller3ff27542015-10-09 15:39:44 -0700748 accepted_user_data =
749 grpc_mdelem_get_user_data(mdel, destroy_encodings_accepted_by_peer);
750 if (accepted_user_data != NULL) {
751 call->encodings_accepted_by_peer =
Craig Tiller7536af02015-12-22 13:49:30 -0800752 (uint32_t)(((uintptr_t)accepted_user_data) - 1);
Craig Tiller3ff27542015-10-09 15:39:44 -0700753 return;
754 }
755
Craig Tiller0160de92016-11-18 08:46:46 -0800756 accept_encoding_slice = GRPC_MDVALUE(mdel);
Craig Tillerd41a4a72016-10-26 16:16:06 -0700757 grpc_slice_buffer_init(&accept_encoding_parts);
758 grpc_slice_split(accept_encoding_slice, ",", &accept_encoding_parts);
David Garcia Quintasb8edf7e2015-07-08 20:18:57 -0700759
David Garcia Quintase091af82015-07-15 21:37:02 -0700760 /* No need to zero call->encodings_accepted_by_peer: grpc_call_create already
761 * zeroes the whole grpc_call */
David Garcia Quintasb1866bd2015-07-08 22:37:01 -0700762 /* Always support no compression */
Craig Tillera82950e2015-09-22 12:33:20 -0700763 GPR_BITSET(&call->encodings_accepted_by_peer, GRPC_COMPRESS_NONE);
764 for (i = 0; i < accept_encoding_parts.count; i++) {
Craig Tiller68208fe2016-11-14 14:35:02 -0800765 grpc_slice accept_encoding_entry_slice = accept_encoding_parts.slices[i];
766 if (grpc_compression_algorithm_parse(accept_encoding_entry_slice,
767 &algorithm)) {
Craig Tillera82950e2015-09-22 12:33:20 -0700768 GPR_BITSET(&call->encodings_accepted_by_peer, algorithm);
769 } else {
770 char *accept_encoding_entry_str =
Craig Tillerb4aa70e2016-12-09 09:40:11 -0800771 grpc_slice_to_c_string(accept_encoding_entry_slice);
Craig Tillera82950e2015-09-22 12:33:20 -0700772 gpr_log(GPR_ERROR,
773 "Invalid entry in accept encoding metadata: '%s'. Ignoring.",
774 accept_encoding_entry_str);
775 gpr_free(accept_encoding_entry_str);
David Garcia Quintasb8edf7e2015-07-08 20:18:57 -0700776 }
Craig Tillera82950e2015-09-22 12:33:20 -0700777 }
Craig Tiller3ff27542015-10-09 15:39:44 -0700778
Craig Tillera59c16c2016-10-31 07:25:01 -0700779 grpc_slice_buffer_destroy_internal(exec_ctx, &accept_encoding_parts);
Craig Tiller3ff27542015-10-09 15:39:44 -0700780
781 grpc_mdelem_set_user_data(
782 mdel, destroy_encodings_accepted_by_peer,
Craig Tiller7536af02015-12-22 13:49:30 -0800783 (void *)(((uintptr_t)call->encodings_accepted_by_peer) + 1));
David Garcia Quintasb8edf7e2015-07-08 20:18:57 -0700784}
785
Craig Tiller7536af02015-12-22 13:49:30 -0800786uint32_t grpc_call_test_only_get_encodings_accepted_by_peer(grpc_call *call) {
787 uint32_t encodings_accepted_by_peer;
David Garcia Quintas0c331882015-10-08 14:51:54 -0700788 gpr_mu_lock(&call->mu);
789 encodings_accepted_by_peer = call->encodings_accepted_by_peer;
790 gpr_mu_unlock(&call->mu);
791 return encodings_accepted_by_peer;
Craig Tiller68752722015-01-29 14:59:54 -0800792}
793
Craig Tillerc7e1a2a2015-11-02 14:17:32 -0800794static grpc_linked_mdelem *linked_from_md(grpc_metadata *md) {
795 return (grpc_linked_mdelem *)&md->internal_data;
Craig Tillerc12fee62015-02-03 11:55:50 -0800796}
797
David Garcia Quintasa301eaa2016-05-06 16:59:03 -0700798static grpc_metadata *get_md_elem(grpc_metadata *metadata,
799 grpc_metadata *additional_metadata, int i,
800 int count) {
801 grpc_metadata *res =
802 i < count ? &metadata[i] : &additional_metadata[i - count];
803 GPR_ASSERT(res);
804 return res;
805}
806
Craig Tillera59c16c2016-10-31 07:25:01 -0700807static int prepare_application_metadata(
808 grpc_exec_ctx *exec_ctx, grpc_call *call, int count,
809 grpc_metadata *metadata, int is_trailing, int prepend_extra_metadata,
810 grpc_metadata *additional_metadata, int additional_metadata_count) {
David Garcia Quintasa301eaa2016-05-06 16:59:03 -0700811 int total_count = count + additional_metadata_count;
Craig Tillerc7e1a2a2015-11-02 14:17:32 -0800812 int i;
813 grpc_metadata_batch *batch =
814 &call->metadata_batch[0 /* is_receiving */][is_trailing];
David Garcia Quintasa301eaa2016-05-06 16:59:03 -0700815 for (i = 0; i < total_count; i++) {
816 const grpc_metadata *md =
817 get_md_elem(metadata, additional_metadata, i, count);
Craig Tillerb42445c2016-04-22 13:11:44 -0700818 grpc_linked_mdelem *l = (grpc_linked_mdelem *)&md->internal_data;
819 GPR_ASSERT(sizeof(grpc_linked_mdelem) == sizeof(md->internal_data));
Craig Tillerf2b5b7e2017-01-10 08:28:59 -0800820 if (!GRPC_LOG_IF_ERROR("validate_metadata",
821 grpc_validate_header_key_is_legal(md->key))) {
Craig Tillerb42445c2016-04-22 13:11:44 -0700822 break;
Craig Tillerdf2d9222016-11-18 16:38:57 -0800823 } else if (!grpc_is_binary_header(md->key) &&
Craig Tillerf2b5b7e2017-01-10 08:28:59 -0800824 !GRPC_LOG_IF_ERROR(
825 "validate_metadata",
826 grpc_validate_header_nonbin_value_is_legal(md->value))) {
Craig Tillerb42445c2016-04-22 13:11:44 -0700827 break;
828 }
Craig Tiller1282a672016-11-18 14:57:53 -0800829 l->md = grpc_mdelem_from_grpc_metadata(exec_ctx, (grpc_metadata *)md);
Craig Tillerb42445c2016-04-22 13:11:44 -0700830 }
David Garcia Quintasa301eaa2016-05-06 16:59:03 -0700831 if (i != total_count) {
Craig Tiller5ae3ffb2016-11-18 14:58:32 -0800832 for (int j = 0; j < i; j++) {
David Garcia Quintasa301eaa2016-05-06 16:59:03 -0700833 const grpc_metadata *md =
834 get_md_elem(metadata, additional_metadata, j, count);
Craig Tillerb42445c2016-04-22 13:11:44 -0700835 grpc_linked_mdelem *l = (grpc_linked_mdelem *)&md->internal_data;
Craig Tillera59c16c2016-10-31 07:25:01 -0700836 GRPC_MDELEM_UNREF(exec_ctx, l->md);
Craig Tillerb42445c2016-04-22 13:11:44 -0700837 }
838 return 0;
839 }
Craig Tillerc7e1a2a2015-11-02 14:17:32 -0800840 if (prepend_extra_metadata) {
841 if (call->send_extra_metadata_count == 0) {
842 prepend_extra_metadata = 0;
Craig Tillera82950e2015-09-22 12:33:20 -0700843 } else {
Craig Tiller09608182016-11-22 15:43:56 -0800844 for (i = 0; i < call->send_extra_metadata_count; i++) {
845 GRPC_LOG_IF_ERROR("prepare_application_metadata",
846 grpc_metadata_batch_link_tail(
Craig Tiller9277aa72017-01-11 14:15:38 -0800847 exec_ctx, batch, &call->send_extra_metadata[i]));
Craig Tillera82950e2015-09-22 12:33:20 -0700848 }
Craig Tiller629b0ed2015-04-22 11:14:26 -0700849 }
Craig Tillera82950e2015-09-22 12:33:20 -0700850 }
Craig Tiller09608182016-11-22 15:43:56 -0800851 for (i = 0; i < total_count; i++) {
David Garcia Quintasa301eaa2016-05-06 16:59:03 -0700852 grpc_metadata *md = get_md_elem(metadata, additional_metadata, i, count);
Craig Tiller9277aa72017-01-11 14:15:38 -0800853 GRPC_LOG_IF_ERROR(
854 "prepare_application_metadata",
855 grpc_metadata_batch_link_tail(exec_ctx, batch, linked_from_md(md)));
Craig Tillerc7e1a2a2015-11-02 14:17:32 -0800856 }
Craig Tiller09608182016-11-22 15:43:56 -0800857 call->send_extra_metadata_count = 0;
Craig Tillerc7e1a2a2015-11-02 14:17:32 -0800858
Craig Tillerb96d0012015-05-06 15:33:23 -0700859 return 1;
860}
861
Craig Tiller566316f2015-02-02 15:25:32 -0800862/* we offset status by a small amount when storing it into transport metadata
863 as metadata cannot store a 0 value (which is used as OK for grpc_status_codes
864 */
865#define STATUS_OFFSET 1
Craig Tillera82950e2015-09-22 12:33:20 -0700866static void destroy_status(void *ignored) {}
Craig Tiller566316f2015-02-02 15:25:32 -0800867
Craig Tiller0160de92016-11-18 08:46:46 -0800868static uint32_t decode_status(grpc_mdelem md) {
Craig Tiller7536af02015-12-22 13:49:30 -0800869 uint32_t status;
Craig Tillerebdef9d2015-11-19 17:09:49 -0800870 void *user_data;
Craig Tiller0160de92016-11-18 08:46:46 -0800871 if (grpc_mdelem_eq(md, GRPC_MDELEM_GRPC_STATUS_0)) return 0;
872 if (grpc_mdelem_eq(md, GRPC_MDELEM_GRPC_STATUS_1)) return 1;
873 if (grpc_mdelem_eq(md, GRPC_MDELEM_GRPC_STATUS_2)) return 2;
Craig Tillerebdef9d2015-11-19 17:09:49 -0800874 user_data = grpc_mdelem_get_user_data(md, destroy_status);
875 if (user_data != NULL) {
Craig Tiller7536af02015-12-22 13:49:30 -0800876 status = ((uint32_t)(intptr_t)user_data) - STATUS_OFFSET;
Craig Tillera82950e2015-09-22 12:33:20 -0700877 } else {
Craig Tiller0160de92016-11-18 08:46:46 -0800878 if (!grpc_parse_slice_to_uint32(GRPC_MDVALUE(md), &status)) {
Craig Tillera82950e2015-09-22 12:33:20 -0700879 status = GRPC_STATUS_UNKNOWN; /* could not parse status code */
Craig Tiller566316f2015-02-02 15:25:32 -0800880 }
Craig Tillera82950e2015-09-22 12:33:20 -0700881 grpc_mdelem_set_user_data(md, destroy_status,
Craig Tiller7536af02015-12-22 13:49:30 -0800882 (void *)(intptr_t)(status + STATUS_OFFSET));
Craig Tillera82950e2015-09-22 12:33:20 -0700883 }
Craig Tiller566316f2015-02-02 15:25:32 -0800884 return status;
885}
886
Craig Tiller0160de92016-11-18 08:46:46 -0800887static grpc_compression_algorithm decode_compression(grpc_mdelem md) {
Craig Tillerebdef9d2015-11-19 17:09:49 -0800888 grpc_compression_algorithm algorithm =
Craig Tiller0160de92016-11-18 08:46:46 -0800889 grpc_compression_algorithm_from_slice(GRPC_MDVALUE(md));
Craig Tillerebdef9d2015-11-19 17:09:49 -0800890 if (algorithm == GRPC_COMPRESS_ALGORITHMS_COUNT) {
Craig Tillerb4aa70e2016-12-09 09:40:11 -0800891 char *md_c_str = grpc_slice_to_c_string(GRPC_MDVALUE(md));
David Garcia Quintas303d3082016-05-05 18:25:34 -0700892 gpr_log(GPR_ERROR,
893 "Invalid incoming compression algorithm: '%s'. Interpreting "
894 "incoming data as uncompressed.",
895 md_c_str);
Craig Tiller68208fe2016-11-14 14:35:02 -0800896 gpr_free(md_c_str);
David Garcia Quintas303d3082016-05-05 18:25:34 -0700897 return GRPC_COMPRESS_NONE;
Craig Tillera82950e2015-09-22 12:33:20 -0700898 }
David Garcia Quintasfc0fa332015-06-25 18:11:07 -0700899 return algorithm;
David Garcia Quintasdb94b272015-06-15 18:37:01 -0700900}
901
Craig Tillera7d37a32016-11-22 14:37:16 -0800902static void recv_common_filter(grpc_exec_ctx *exec_ctx, grpc_call *call,
903 grpc_metadata_batch *b) {
Craig Tillerde2508b2017-01-06 15:23:23 -0800904 if (b->idx.named.grpc_status != NULL) {
905 uint32_t status_code = decode_status(b->idx.named.grpc_status->md);
906 grpc_error *error =
907 status_code == GRPC_STATUS_OK
908 ? GRPC_ERROR_NONE
909 : grpc_error_set_int(GRPC_ERROR_CREATE("Error received from peer"),
Craig Tillercae37f32017-01-09 15:50:03 -0800910 GRPC_ERROR_INT_GRPC_STATUS,
911 (intptr_t)status_code);
Craig Tillera7d37a32016-11-22 14:37:16 -0800912
Craig Tillerde2508b2017-01-06 15:23:23 -0800913 if (b->idx.named.grpc_message != NULL) {
914 char *msg =
915 grpc_slice_to_c_string(GRPC_MDVALUE(b->idx.named.grpc_message->md));
916 error = grpc_error_set_str(error, GRPC_ERROR_STR_GRPC_MESSAGE, msg);
917 gpr_free(msg);
918 grpc_metadata_batch_remove(exec_ctx, b, b->idx.named.grpc_message);
Craig Tiller8c58a482017-02-07 14:52:59 -0800919 } else if (error != GRPC_ERROR_NONE) {
Craig Tiller8f24d6a2017-01-10 06:36:46 -0800920 error = grpc_error_set_str(error, GRPC_ERROR_STR_GRPC_MESSAGE, "");
Craig Tillerde2508b2017-01-06 15:23:23 -0800921 }
922
923 set_status_from_error(exec_ctx, call, STATUS_FROM_WIRE, error);
924 grpc_metadata_batch_remove(exec_ctx, b, b->idx.named.grpc_status);
Craig Tillera7d37a32016-11-22 14:37:16 -0800925 }
Craig Tillerc7e1a2a2015-11-02 14:17:32 -0800926}
927
Craig Tillera7d37a32016-11-22 14:37:16 -0800928static void publish_app_metadata(grpc_call *call, grpc_metadata_batch *b,
929 int is_trailing) {
Craig Tiller09608182016-11-22 15:43:56 -0800930 if (b->list.count == 0) return;
Craig Tillera7d37a32016-11-22 14:37:16 -0800931 GPR_TIMER_BEGIN("publish_app_metadata", 0);
Craig Tiller566316f2015-02-02 15:25:32 -0800932 grpc_metadata_array *dest;
933 grpc_metadata *mdusr;
Craig Tillerc7e1a2a2015-11-02 14:17:32 -0800934 dest = call->buffered_metadata[is_trailing];
Craig Tillerb0f3bca2016-11-22 14:54:10 -0800935 if (dest->count + b->list.count > dest->capacity) {
936 dest->capacity =
937 GPR_MAX(dest->capacity + b->list.count, dest->capacity * 3 / 2);
Craig Tillerc7e1a2a2015-11-02 14:17:32 -0800938 dest->metadata =
939 gpr_realloc(dest->metadata, sizeof(grpc_metadata) * dest->capacity);
940 }
Craig Tillera7d37a32016-11-22 14:37:16 -0800941 for (grpc_linked_mdelem *l = b->list.head; l != NULL; l = l->next) {
942 mdusr = &dest->metadata[dest->count++];
Craig Tillercf0a2022016-11-23 11:36:21 -0800943 /* we pass back borrowed slices that are valid whilst the call is valid */
944 mdusr->key = GRPC_MDKEY(l->md);
945 mdusr->value = GRPC_MDVALUE(l->md);
Craig Tillera7d37a32016-11-22 14:37:16 -0800946 }
Craig Tillerc7e1a2a2015-11-02 14:17:32 -0800947 GPR_TIMER_END("publish_app_metadata", 0);
Craig Tillerc7e1a2a2015-11-02 14:17:32 -0800948}
Craig Tiller566316f2015-02-02 15:25:32 -0800949
Craig Tillera7d37a32016-11-22 14:37:16 -0800950static void recv_initial_filter(grpc_exec_ctx *exec_ctx, grpc_call *call,
951 grpc_metadata_batch *b) {
952 recv_common_filter(exec_ctx, call, b);
953
954 if (b->idx.named.grpc_encoding != NULL) {
David Garcia Quintas749367f2016-05-17 19:15:24 -0700955 GPR_TIMER_BEGIN("incoming_compression_algorithm", 0);
Craig Tillera7d37a32016-11-22 14:37:16 -0800956 set_incoming_compression_algorithm(
957 call, decode_compression(b->idx.named.grpc_encoding->md));
David Garcia Quintas749367f2016-05-17 19:15:24 -0700958 GPR_TIMER_END("incoming_compression_algorithm", 0);
Craig Tillerde7b4672016-11-23 11:13:46 -0800959 grpc_metadata_batch_remove(exec_ctx, b, b->idx.named.grpc_encoding);
Craig Tillera82950e2015-09-22 12:33:20 -0700960 }
Craig Tillera7d37a32016-11-22 14:37:16 -0800961
962 if (b->idx.named.grpc_accept_encoding != NULL) {
963 GPR_TIMER_BEGIN("encodings_accepted_by_peer", 0);
964 set_encodings_accepted_by_peer(exec_ctx, call,
965 b->idx.named.grpc_accept_encoding->md);
Craig Tillerde7b4672016-11-23 11:13:46 -0800966 grpc_metadata_batch_remove(exec_ctx, b, b->idx.named.grpc_accept_encoding);
Craig Tillera7d37a32016-11-22 14:37:16 -0800967 GPR_TIMER_END("encodings_accepted_by_peer", 0);
968 }
969
970 publish_app_metadata(call, b, false);
Craig Tillerc7e1a2a2015-11-02 14:17:32 -0800971}
Craig Tiller6902ad22015-04-16 08:01:49 -0700972
Craig Tillera7d37a32016-11-22 14:37:16 -0800973static void recv_trailing_filter(grpc_exec_ctx *exec_ctx, void *args,
974 grpc_metadata_batch *b) {
Craig Tillerc5866662016-11-16 15:25:00 -0800975 grpc_call *call = args;
Craig Tillera7d37a32016-11-22 14:37:16 -0800976 recv_common_filter(exec_ctx, call, b);
977 publish_app_metadata(call, b, true);
Craig Tiller629b0ed2015-04-22 11:14:26 -0700978}
Craig Tiller8b282cb2015-04-17 14:57:44 -0700979
Craig Tillera82950e2015-09-22 12:33:20 -0700980grpc_call_stack *grpc_call_get_call_stack(grpc_call *call) {
981 return CALL_STACK_FROM_CALL(call);
Craig Tiller566316f2015-02-02 15:25:32 -0800982}
983
Craig Tiller255edaa2016-12-13 09:04:55 -0800984/*******************************************************************************
Craig Tillerfb189f82015-02-03 12:07:07 -0800985 * BATCH API IMPLEMENTATION
986 */
987
Craig Tillera82950e2015-09-22 12:33:20 -0700988static void set_status_value_directly(grpc_status_code status, void *dest) {
989 *(grpc_status_code *)dest = status;
Craig Tillerfb189f82015-02-03 12:07:07 -0800990}
991
Craig Tillera82950e2015-09-22 12:33:20 -0700992static void set_cancelled_value(grpc_status_code status, void *dest) {
Craig Tillerc7e1a2a2015-11-02 14:17:32 -0800993 *(int *)dest = (status != GRPC_STATUS_OK);
Craig Tiller166e2502015-02-03 20:14:41 -0800994}
Craig Tillerfb189f82015-02-03 12:07:07 -0800995
Craig Tillerc6549762016-03-09 17:10:43 -0800996static bool are_write_flags_valid(uint32_t flags) {
David Garcia Quintas1d5aca52015-06-14 14:42:04 -0700997 /* check that only bits in GRPC_WRITE_(INTERNAL?)_USED_MASK are set */
Craig Tiller7536af02015-12-22 13:49:30 -0800998 const uint32_t allowed_write_positions =
Craig Tillera82950e2015-09-22 12:33:20 -0700999 (GRPC_WRITE_USED_MASK | GRPC_WRITE_INTERNAL_USED_MASK);
Craig Tiller7536af02015-12-22 13:49:30 -08001000 const uint32_t invalid_positions = ~allowed_write_positions;
David Garcia Quintas1d5aca52015-06-14 14:42:04 -07001001 return !(flags & invalid_positions);
1002}
1003
Craig Tillerc6549762016-03-09 17:10:43 -08001004static bool are_initial_metadata_flags_valid(uint32_t flags, bool is_client) {
1005 /* check that only bits in GRPC_WRITE_(INTERNAL?)_USED_MASK are set */
1006 uint32_t invalid_positions = ~GRPC_INITIAL_METADATA_USED_MASK;
1007 if (!is_client) {
1008 invalid_positions |= GRPC_INITIAL_METADATA_IDEMPOTENT_REQUEST;
1009 }
1010 return !(flags & invalid_positions);
1011}
1012
Craig Tiller2a11ad12017-02-08 17:09:02 -08001013static int batch_slot_for_op(grpc_op_type type) {
1014 switch (type) {
1015 case GRPC_OP_SEND_INITIAL_METADATA:
1016 return 0;
1017 case GRPC_OP_SEND_MESSAGE:
1018 return 1;
1019 case GRPC_OP_SEND_CLOSE_FROM_CLIENT:
1020 case GRPC_OP_SEND_STATUS_FROM_SERVER:
1021 return 2;
1022 case GRPC_OP_RECV_INITIAL_METADATA:
1023 return 3;
1024 case GRPC_OP_RECV_MESSAGE:
1025 return 4;
1026 case GRPC_OP_RECV_CLOSE_ON_SERVER:
1027 case GRPC_OP_RECV_STATUS_ON_CLIENT:
1028 return 5;
Craig Tillerc7e1a2a2015-11-02 14:17:32 -08001029 }
Craig Tillerc869da02017-02-08 17:11:17 -08001030 GPR_UNREACHABLE_CODE(return 123456789);
Craig Tiller2a11ad12017-02-08 17:09:02 -08001031}
Craig Tiller89d33792017-02-08 16:39:16 -08001032
1033static batch_control *allocate_batch_control(grpc_call *call,
1034 const grpc_op *ops,
1035 size_t num_ops) {
1036 int slot = batch_slot_for_op(ops[0].op);
1037 for (size_t i = 1; i < num_ops; i++) {
1038 int op_slot = batch_slot_for_op(ops[i].op);
1039 slot = GPR_MIN(slot, op_slot);
Craig Tillerc7e1a2a2015-11-02 14:17:32 -08001040 }
Craig Tiller5e5ef302017-02-09 08:46:49 -08001041 batch_control *bctl = &call->active_batches[slot];
1042 if (bctl->call != NULL) {
1043 return NULL;
1044 }
1045 memset(bctl, 0, sizeof(*bctl));
1046 bctl->call = call;
1047 return bctl;
Craig Tillerc7e1a2a2015-11-02 14:17:32 -08001048}
1049
1050static void finish_batch_completion(grpc_exec_ctx *exec_ctx, void *user_data,
1051 grpc_cq_completion *storage) {
1052 batch_control *bctl = user_data;
1053 grpc_call *call = bctl->call;
Craig Tiller5e5ef302017-02-09 08:46:49 -08001054 bctl->call = NULL;
Craig Tillerc7e1a2a2015-11-02 14:17:32 -08001055 GRPC_CALL_INTERNAL_UNREF(exec_ctx, call, "completion");
1056}
1057
Craig Tiller94903892016-10-11 15:43:35 -07001058static grpc_error *consolidate_batch_errors(batch_control *bctl) {
1059 size_t n = (size_t)gpr_atm_no_barrier_load(&bctl->num_errors);
1060 if (n == 0) {
1061 return GRPC_ERROR_NONE;
1062 } else if (n == 1) {
Craig Tillera78da602017-01-27 08:16:23 -08001063 /* Skip creating a composite error in the case that only one error was
1064 logged */
Craig Tillerad980e32017-01-23 07:46:25 -08001065 grpc_error *e = bctl->errors[0];
1066 bctl->errors[0] = NULL;
1067 return e;
Craig Tiller94903892016-10-11 15:43:35 -07001068 } else {
Craig Tiller1c4775c2017-01-06 16:07:45 -08001069 grpc_error *error =
1070 GRPC_ERROR_CREATE_REFERENCING("Call batch failed", bctl->errors, n);
1071 for (size_t i = 0; i < n; i++) {
1072 GRPC_ERROR_UNREF(bctl->errors[i]);
Craig Tillerad980e32017-01-23 07:46:25 -08001073 bctl->errors[i] = NULL;
Craig Tiller1c4775c2017-01-06 16:07:45 -08001074 }
1075 return error;
Craig Tiller94903892016-10-11 15:43:35 -07001076 }
1077}
1078
Craig Tillerc7e1a2a2015-11-02 14:17:32 -08001079static void post_batch_completion(grpc_exec_ctx *exec_ctx,
1080 batch_control *bctl) {
Craig Tiller94903892016-10-11 15:43:35 -07001081 grpc_call *child_call;
1082 grpc_call *next_child_call;
Craig Tillerc7e1a2a2015-11-02 14:17:32 -08001083 grpc_call *call = bctl->call;
Craig Tiller94903892016-10-11 15:43:35 -07001084 grpc_error *error = consolidate_batch_errors(bctl);
1085
1086 gpr_mu_lock(&call->mu);
1087
Craig Tiller94903892016-10-11 15:43:35 -07001088 if (bctl->send_initial_metadata) {
1089 grpc_metadata_batch_destroy(
1090 exec_ctx,
1091 &call->metadata_batch[0 /* is_receiving */][0 /* is_trailing */]);
1092 }
Craig Tillerf927ad12017-01-06 15:27:31 -08001093 if (bctl->send_message) {
1094 call->sending_message = false;
1095 }
Craig Tiller94903892016-10-11 15:43:35 -07001096 if (bctl->send_final_op) {
1097 grpc_metadata_batch_destroy(
1098 exec_ctx,
1099 &call->metadata_batch[0 /* is_receiving */][1 /* is_trailing */]);
1100 }
Craig Tiller02b87cd2016-09-02 09:50:08 -07001101 if (bctl->recv_final_op) {
Craig Tiller94903892016-10-11 15:43:35 -07001102 grpc_metadata_batch *md =
1103 &call->metadata_batch[1 /* is_receiving */][1 /* is_trailing */];
1104 recv_trailing_filter(exec_ctx, call, md);
1105
1106 call->received_final_op = true;
1107 /* propagate cancellation to any interested children */
1108 child_call = call->first_child;
1109 if (child_call != NULL) {
1110 do {
1111 next_child_call = child_call->sibling_next;
1112 if (child_call->cancellation_is_inherited) {
1113 GRPC_CALL_INTERNAL_REF(child_call, "propagate_cancel");
1114 grpc_call_cancel(child_call, NULL);
1115 GRPC_CALL_INTERNAL_UNREF(exec_ctx, child_call, "propagate_cancel");
1116 }
1117 child_call = next_child_call;
1118 } while (child_call != call->first_child);
1119 }
1120
1121 if (call->is_client) {
1122 get_final_status(call, set_status_value_directly,
Craig Tiller841a99d2016-12-12 16:58:57 -08001123 call->final_op.client.status,
1124 call->final_op.client.status_details);
Craig Tiller94903892016-10-11 15:43:35 -07001125 } else {
1126 get_final_status(call, set_cancelled_value,
Craig Tiller841a99d2016-12-12 16:58:57 -08001127 call->final_op.server.cancelled, NULL);
Craig Tiller94903892016-10-11 15:43:35 -07001128 }
1129
Craig Tiller02b87cd2016-09-02 09:50:08 -07001130 GRPC_ERROR_UNREF(error);
1131 error = GRPC_ERROR_NONE;
1132 }
Craig Tiller94903892016-10-11 15:43:35 -07001133 gpr_mu_unlock(&call->mu);
1134
Craig Tillerc7e1a2a2015-11-02 14:17:32 -08001135 if (bctl->is_notify_tag_closure) {
Craig Tillerb08fa492016-05-10 14:56:05 -07001136 /* unrefs bctl->error */
Craig Tiller2db5bda2017-02-09 10:30:55 -08001137 bctl->call = NULL;
Craig Tiller02b87cd2016-09-02 09:50:08 -07001138 grpc_closure_run(exec_ctx, bctl->notify_tag, error);
Craig Tillerc7e1a2a2015-11-02 14:17:32 -08001139 GRPC_CALL_INTERNAL_UNREF(exec_ctx, call, "completion");
1140 } else {
Craig Tillerb08fa492016-05-10 14:56:05 -07001141 /* unrefs bctl->error */
Craig Tiller02b87cd2016-09-02 09:50:08 -07001142 grpc_cq_end_op(exec_ctx, bctl->call->cq, bctl->notify_tag, error,
Craig Tillerc7e1a2a2015-11-02 14:17:32 -08001143 finish_batch_completion, bctl, &bctl->cq_completion);
1144 }
1145}
1146
Craig Tillerd04dc4d2017-01-09 14:40:28 -08001147static void finish_batch_step(grpc_exec_ctx *exec_ctx, batch_control *bctl) {
Craig Tiller065b1392017-01-09 14:05:07 -08001148 if (gpr_unref(&bctl->steps_to_complete)) {
1149 post_batch_completion(exec_ctx, bctl);
1150 }
1151}
1152
Craig Tillerc7e1a2a2015-11-02 14:17:32 -08001153static void continue_receiving_slices(grpc_exec_ctx *exec_ctx,
1154 batch_control *bctl) {
1155 grpc_call *call = bctl->call;
1156 for (;;) {
1157 size_t remaining = call->receiving_stream->length -
1158 (*call->receiving_buffer)->data.raw.slice_buffer.length;
1159 if (remaining == 0) {
1160 call->receiving_message = 0;
Craig Tiller3b66ab92015-12-09 19:42:22 -08001161 grpc_byte_stream_destroy(exec_ctx, call->receiving_stream);
Craig Tillerc7e1a2a2015-11-02 14:17:32 -08001162 call->receiving_stream = NULL;
Craig Tillerd04dc4d2017-01-09 14:40:28 -08001163 finish_batch_step(exec_ctx, bctl);
Craig Tillerc7e1a2a2015-11-02 14:17:32 -08001164 return;
1165 }
1166 if (grpc_byte_stream_next(exec_ctx, call->receiving_stream,
1167 &call->receiving_slice, remaining,
1168 &call->receiving_slice_ready)) {
Craig Tillerd41a4a72016-10-26 16:16:06 -07001169 grpc_slice_buffer_add(&(*call->receiving_buffer)->data.raw.slice_buffer,
Craig Tiller0f310802016-10-26 16:25:56 -07001170 call->receiving_slice);
Craig Tillerc7e1a2a2015-11-02 14:17:32 -08001171 } else {
1172 return;
1173 }
1174 }
1175}
1176
1177static void receiving_slice_ready(grpc_exec_ctx *exec_ctx, void *bctlp,
Craig Tillerc027e772016-05-03 16:27:00 -07001178 grpc_error *error) {
Craig Tillerc7e1a2a2015-11-02 14:17:32 -08001179 batch_control *bctl = bctlp;
1180 grpc_call *call = bctl->call;
1181
Craig Tillerc027e772016-05-03 16:27:00 -07001182 if (error == GRPC_ERROR_NONE) {
Craig Tillerd41a4a72016-10-26 16:16:06 -07001183 grpc_slice_buffer_add(&(*call->receiving_buffer)->data.raw.slice_buffer,
Craig Tiller0f310802016-10-26 16:25:56 -07001184 call->receiving_slice);
Craig Tiller38edec62015-12-14 15:01:29 -08001185 continue_receiving_slices(exec_ctx, bctl);
1186 } else {
Craig Tillera286b042016-06-13 15:20:39 +00001187 if (grpc_trace_operation_failures) {
1188 GRPC_LOG_IF_ERROR("receiving_slice_ready", GRPC_ERROR_REF(error));
1189 }
Craig Tillere1b8c2b2015-12-16 19:27:52 -08001190 grpc_byte_stream_destroy(exec_ctx, call->receiving_stream);
Craig Tiller38edec62015-12-14 15:01:29 -08001191 call->receiving_stream = NULL;
1192 grpc_byte_buffer_destroy(*call->receiving_buffer);
1193 *call->receiving_buffer = NULL;
Craig Tillerd04dc4d2017-01-09 14:40:28 -08001194 finish_batch_step(exec_ctx, bctl);
Craig Tiller38edec62015-12-14 15:01:29 -08001195 }
Craig Tillerc7e1a2a2015-11-02 14:17:32 -08001196}
1197
Mark D. Roth274c8ed2016-10-04 09:21:42 -07001198static void process_data_after_md(grpc_exec_ctx *exec_ctx,
1199 batch_control *bctl) {
Craig Tillera44cbfc2016-02-03 16:02:49 -08001200 grpc_call *call = bctl->call;
1201 if (call->receiving_stream == NULL) {
1202 *call->receiving_buffer = NULL;
1203 call->receiving_message = 0;
Craig Tillerd04dc4d2017-01-09 14:40:28 -08001204 finish_batch_step(exec_ctx, bctl);
Craig Tillera44cbfc2016-02-03 16:02:49 -08001205 } else {
1206 call->test_only_last_message_flags = call->receiving_stream->flags;
1207 if ((call->receiving_stream->flags & GRPC_WRITE_INTERNAL_COMPRESS) &&
David Garcia Quintas749367f2016-05-17 19:15:24 -07001208 (call->incoming_compression_algorithm > GRPC_COMPRESS_NONE)) {
Craig Tillera44cbfc2016-02-03 16:02:49 -08001209 *call->receiving_buffer = grpc_raw_compressed_byte_buffer_create(
David Garcia Quintas749367f2016-05-17 19:15:24 -07001210 NULL, 0, call->incoming_compression_algorithm);
Craig Tillera44cbfc2016-02-03 16:02:49 -08001211 } else {
1212 *call->receiving_buffer = grpc_raw_byte_buffer_create(NULL, 0);
1213 }
Craig Tiller91031da2016-12-28 15:44:25 -08001214 grpc_closure_init(&call->receiving_slice_ready, receiving_slice_ready, bctl,
1215 grpc_schedule_on_exec_ctx);
Craig Tillera44cbfc2016-02-03 16:02:49 -08001216 continue_receiving_slices(exec_ctx, bctl);
Craig Tillera44cbfc2016-02-03 16:02:49 -08001217 }
1218}
1219
1220static void receiving_stream_ready(grpc_exec_ctx *exec_ctx, void *bctlp,
Craig Tillerc027e772016-05-03 16:27:00 -07001221 grpc_error *error) {
Craig Tillera44cbfc2016-02-03 16:02:49 -08001222 batch_control *bctl = bctlp;
1223 grpc_call *call = bctl->call;
Craig Tiller065b1392017-01-09 14:05:07 -08001224 gpr_mu_lock(&bctl->call->mu);
Mark D. Roth274c8ed2016-10-04 09:21:42 -07001225 if (error != GRPC_ERROR_NONE) {
Craig Tiller58b30cd2017-01-31 17:07:36 -08001226 cancel_with_error(exec_ctx, call, STATUS_FROM_SURFACE,
1227 GRPC_ERROR_REF(error));
Mark D. Roth274c8ed2016-10-04 09:21:42 -07001228 }
Craig Tiller065b1392017-01-09 14:05:07 -08001229 if (call->has_initial_md_been_received || error != GRPC_ERROR_NONE ||
Craig Tiller52cf8712016-04-23 22:54:21 -07001230 call->receiving_stream == NULL) {
Craig Tillera44cbfc2016-02-03 16:02:49 -08001231 gpr_mu_unlock(&bctl->call->mu);
Mark D. Roth274c8ed2016-10-04 09:21:42 -07001232 process_data_after_md(exec_ctx, bctlp);
Craig Tillera44cbfc2016-02-03 16:02:49 -08001233 } else {
Craig Tiller8a677802016-04-22 15:07:53 -07001234 call->saved_receiving_stream_ready_bctlp = bctlp;
Craig Tillera44cbfc2016-02-03 16:02:49 -08001235 gpr_mu_unlock(&bctl->call->mu);
1236 }
1237}
1238
David Garcia Quintas3e71f772016-05-18 10:14:32 -07001239static void validate_filtered_metadata(grpc_exec_ctx *exec_ctx,
1240 batch_control *bctl) {
1241 grpc_call *call = bctl->call;
David Garcia Quintasac094472016-05-18 20:25:57 -07001242 /* validate call->incoming_compression_algorithm */
1243 if (call->incoming_compression_algorithm != GRPC_COMPRESS_NONE) {
1244 const grpc_compression_algorithm algo =
1245 call->incoming_compression_algorithm;
David Garcia Quintas3e71f772016-05-18 10:14:32 -07001246 char *error_msg = NULL;
1247 const grpc_compression_options compression_options =
David Garcia Quintasac094472016-05-18 20:25:57 -07001248 grpc_channel_compression_options(call->channel);
David Garcia Quintas3e71f772016-05-18 10:14:32 -07001249 /* check if algorithm is known */
1250 if (algo >= GRPC_COMPRESS_ALGORITHMS_COUNT) {
1251 gpr_asprintf(&error_msg, "Invalid compression algorithm value '%d'.",
1252 algo);
Yuchen Zeng64c0e8d2016-06-10 11:19:51 -07001253 gpr_log(GPR_ERROR, "%s", error_msg);
Craig Tiller2dc32ea2017-01-31 15:32:34 -08001254 cancel_with_status(exec_ctx, call, STATUS_FROM_SURFACE,
1255 GRPC_STATUS_UNIMPLEMENTED, error_msg);
David Garcia Quintas3e71f772016-05-18 10:14:32 -07001256 } else if (grpc_compression_options_is_algorithm_enabled(
1257 &compression_options, algo) == 0) {
1258 /* check if algorithm is supported by current channel config */
David Garcia Quintas1ff168a2016-06-30 10:25:04 -07001259 char *algo_name = NULL;
David Garcia Quintas3e71f772016-05-18 10:14:32 -07001260 grpc_compression_algorithm_name(algo, &algo_name);
1261 gpr_asprintf(&error_msg, "Compression algorithm '%s' is disabled.",
1262 algo_name);
Yuchen Zeng64c0e8d2016-06-10 11:19:51 -07001263 gpr_log(GPR_ERROR, "%s", error_msg);
Craig Tiller2dc32ea2017-01-31 15:32:34 -08001264 cancel_with_status(exec_ctx, call, STATUS_FROM_SURFACE,
1265 GRPC_STATUS_UNIMPLEMENTED, error_msg);
David Garcia Quintas3e71f772016-05-18 10:14:32 -07001266 } else {
David Garcia Quintasac094472016-05-18 20:25:57 -07001267 call->incoming_compression_algorithm = algo;
David Garcia Quintas3e71f772016-05-18 10:14:32 -07001268 }
1269 gpr_free(error_msg);
1270 }
David Garcia Quintasf1945f22016-05-18 10:53:14 -07001271
1272 /* make sure the received grpc-encoding is amongst the ones listed in
1273 * grpc-accept-encoding */
1274 GPR_ASSERT(call->encodings_accepted_by_peer != 0);
1275 if (!GPR_BITGET(call->encodings_accepted_by_peer,
David Garcia Quintasac094472016-05-18 20:25:57 -07001276 call->incoming_compression_algorithm)) {
David Garcia Quintasf1945f22016-05-18 10:53:14 -07001277 extern int grpc_compression_trace;
1278 if (grpc_compression_trace) {
David Garcia Quintas1ff168a2016-06-30 10:25:04 -07001279 char *algo_name = NULL;
David Garcia Quintasac094472016-05-18 20:25:57 -07001280 grpc_compression_algorithm_name(call->incoming_compression_algorithm,
1281 &algo_name);
David Garcia Quintasf1945f22016-05-18 10:53:14 -07001282 gpr_log(GPR_ERROR,
1283 "Compression algorithm (grpc-encoding = '%s') not present in "
1284 "the bitset of accepted encodings (grpc-accept-encodings: "
1285 "'0x%x')",
1286 algo_name, call->encodings_accepted_by_peer);
1287 }
1288 }
David Garcia Quintas3e71f772016-05-18 10:14:32 -07001289}
1290
Craig Tiller3ba16e42016-12-08 16:46:18 -08001291static void add_batch_error(grpc_exec_ctx *exec_ctx, batch_control *bctl,
1292 grpc_error *error) {
Craig Tiller452422e2016-09-01 15:54:56 -07001293 if (error == GRPC_ERROR_NONE) return;
Craig Tiller94903892016-10-11 15:43:35 -07001294 int idx = (int)gpr_atm_no_barrier_fetch_add(&bctl->num_errors, 1);
Craig Tiller2dc32ea2017-01-31 15:32:34 -08001295 if (idx == 0) {
1296 cancel_with_error(exec_ctx, bctl->call, STATUS_FROM_CORE,
1297 GRPC_ERROR_REF(error));
1298 }
Craig Tiller94903892016-10-11 15:43:35 -07001299 bctl->errors[idx] = error;
Craig Tiller452422e2016-09-01 15:54:56 -07001300}
1301
Craig Tillera44cbfc2016-02-03 16:02:49 -08001302static void receiving_initial_metadata_ready(grpc_exec_ctx *exec_ctx,
Craig Tillerc027e772016-05-03 16:27:00 -07001303 void *bctlp, grpc_error *error) {
Craig Tillera44cbfc2016-02-03 16:02:49 -08001304 batch_control *bctl = bctlp;
1305 grpc_call *call = bctl->call;
1306
1307 gpr_mu_lock(&call->mu);
1308
Craig Tiller3ba16e42016-12-08 16:46:18 -08001309 add_batch_error(exec_ctx, bctl, GRPC_ERROR_REF(error));
Craig Tiller452422e2016-09-01 15:54:56 -07001310 if (error == GRPC_ERROR_NONE) {
Craig Tillerc48ca712016-04-04 13:42:04 -07001311 grpc_metadata_batch *md =
1312 &call->metadata_batch[1 /* is_receiving */][0 /* is_trailing */];
Craig Tillera7d37a32016-11-22 14:37:16 -08001313 recv_initial_filter(exec_ctx, call, md);
Craig Tillera44cbfc2016-02-03 16:02:49 -08001314
Craig Tillera7d37a32016-11-22 14:37:16 -08001315 /* TODO(ctiller): this could be moved into recv_initial_filter now */
David Garcia Quintas3e71f772016-05-18 10:14:32 -07001316 GPR_TIMER_BEGIN("validate_filtered_metadata", 0);
1317 validate_filtered_metadata(exec_ctx, bctl);
1318 GPR_TIMER_END("validate_filtered_metadata", 0);
David Garcia Quintas46123372016-05-09 15:28:42 -07001319
Craig Tillerc48ca712016-04-04 13:42:04 -07001320 if (gpr_time_cmp(md->deadline, gpr_inf_future(md->deadline.clock_type)) !=
1321 0 &&
1322 !call->is_client) {
Mark D. Rothf28763c2016-09-14 15:18:40 -07001323 call->send_deadline =
1324 gpr_convert_clock_type(md->deadline, GPR_CLOCK_MONOTONIC);
Craig Tillerc48ca712016-04-04 13:42:04 -07001325 }
Craig Tillera44cbfc2016-02-03 16:02:49 -08001326 }
1327
Craig Tillerc48ca712016-04-04 13:42:04 -07001328 call->has_initial_md_been_received = true;
Craig Tiller8a677802016-04-22 15:07:53 -07001329 if (call->saved_receiving_stream_ready_bctlp != NULL) {
Craig Tillera44cbfc2016-02-03 16:02:49 -08001330 grpc_closure *saved_rsr_closure = grpc_closure_create(
Craig Tiller91031da2016-12-28 15:44:25 -08001331 receiving_stream_ready, call->saved_receiving_stream_ready_bctlp,
1332 grpc_schedule_on_exec_ctx);
Craig Tiller8a677802016-04-22 15:07:53 -07001333 call->saved_receiving_stream_ready_bctlp = NULL;
Craig Tillerad980e32017-01-23 07:46:25 -08001334 grpc_closure_sched(exec_ctx, saved_rsr_closure, GRPC_ERROR_REF(error));
Craig Tillera44cbfc2016-02-03 16:02:49 -08001335 }
1336
1337 gpr_mu_unlock(&call->mu);
1338
Craig Tillerd04dc4d2017-01-09 14:40:28 -08001339 finish_batch_step(exec_ctx, bctl);
Craig Tillera44cbfc2016-02-03 16:02:49 -08001340}
1341
Craig Tillerc027e772016-05-03 16:27:00 -07001342static void finish_batch(grpc_exec_ctx *exec_ctx, void *bctlp,
1343 grpc_error *error) {
Craig Tillerc7e1a2a2015-11-02 14:17:32 -08001344 batch_control *bctl = bctlp;
Craig Tillerc7e1a2a2015-11-02 14:17:32 -08001345
Craig Tiller3ba16e42016-12-08 16:46:18 -08001346 add_batch_error(exec_ctx, bctl, GRPC_ERROR_REF(error));
Craig Tillerd04dc4d2017-01-09 14:40:28 -08001347 finish_batch_step(exec_ctx, bctl);
Craig Tillerc7e1a2a2015-11-02 14:17:32 -08001348}
1349
Craig Tiller89d33792017-02-08 16:39:16 -08001350static void free_no_op_completion(grpc_exec_ctx *exec_ctx, void *p,
1351 grpc_cq_completion *completion) {
1352 gpr_free(completion);
1353}
1354
Craig Tillerc7e1a2a2015-11-02 14:17:32 -08001355static grpc_call_error call_start_batch(grpc_exec_ctx *exec_ctx,
1356 grpc_call *call, const grpc_op *ops,
1357 size_t nops, void *notify_tag,
1358 int is_notify_tag_closure) {
Craig Tillerc7e1a2a2015-11-02 14:17:32 -08001359 size_t i;
Craig Tillerfb189f82015-02-03 12:07:07 -08001360 const grpc_op *op;
Craig Tillerc7e1a2a2015-11-02 14:17:32 -08001361 batch_control *bctl;
1362 int num_completion_callbacks_needed = 1;
1363 grpc_call_error error = GRPC_CALL_OK;
Craig Tiller9928d392015-08-18 09:40:24 -07001364
Vitaly Bukae60003d2016-08-01 19:34:51 -07001365 // sent_initial_metadata guards against variable reuse.
1366 grpc_metadata compression_md;
1367
Craig Tiller0ba432d2015-10-09 16:57:11 -07001368 GPR_TIMER_BEGIN("grpc_call_start_batch", 0);
Craig Tillerc7e1a2a2015-11-02 14:17:32 -08001369 GRPC_CALL_LOG_BATCH(GPR_INFO, call, ops, nops, notify_tag);
Masood Malekghassemi76c3d742015-08-19 18:22:53 -07001370
Craig Tillera82950e2015-09-22 12:33:20 -07001371 if (nops == 0) {
Craig Tillerc7e1a2a2015-11-02 14:17:32 -08001372 if (!is_notify_tag_closure) {
Craig Tiller4bf29282015-12-14 11:25:48 -08001373 grpc_cq_begin_op(call->cq, notify_tag);
Craig Tiller89d33792017-02-08 16:39:16 -08001374 grpc_cq_end_op(exec_ctx, call->cq, notify_tag, GRPC_ERROR_NONE,
1375 free_no_op_completion, NULL,
1376 gpr_malloc(sizeof(grpc_cq_completion)));
Craig Tiller2db5bda2017-02-09 10:30:55 -08001377 } else {
1378 grpc_closure_sched(exec_ctx, notify_tag, GRPC_ERROR_NONE);
Craig Tillerc7e1a2a2015-11-02 14:17:32 -08001379 }
Craig Tillerea50b902015-12-15 07:05:25 -08001380 error = GRPC_CALL_OK;
1381 goto done;
Craig Tillera82950e2015-09-22 12:33:20 -07001382 }
1383
Craig Tillera82950e2015-09-22 12:33:20 -07001384 /* TODO(ctiller): this feels like it could be made lock-free */
Craig Tiller89d33792017-02-08 16:39:16 -08001385 bctl = allocate_batch_control(call, ops, nops);
Craig Tiller5e5ef302017-02-09 08:46:49 -08001386 if (bctl == NULL) {
1387 return GRPC_CALL_ERROR_TOO_MANY_OPERATIONS;
1388 }
Craig Tillera82950e2015-09-22 12:33:20 -07001389 bctl->notify_tag = notify_tag;
1390 bctl->is_notify_tag_closure = (uint8_t)(is_notify_tag_closure != 0);
1391
Craig Tiller89d33792017-02-08 16:39:16 -08001392 gpr_mu_lock(&call->mu);
Craig Tillera82950e2015-09-22 12:33:20 -07001393 grpc_transport_stream_op *stream_op = &bctl->op;
1394 memset(stream_op, 0, sizeof(*stream_op));
1395 stream_op->covered_by_poller = true;
1396
Craig Tillerc7e1a2a2015-11-02 14:17:32 -08001397 /* rewrite batch ops into a transport op */
1398 for (i = 0; i < nops; i++) {
1399 op = &ops[i];
Craig Tillera82950e2015-09-22 12:33:20 -07001400 if (op->reserved != NULL) {
Craig Tiller3ffd8222015-09-21 08:21:57 -07001401 error = GRPC_CALL_ERROR;
Craig Tillerc7e1a2a2015-11-02 14:17:32 -08001402 goto done_with_error;
Craig Tiller3ffd8222015-09-21 08:21:57 -07001403 }
Craig Tillera82950e2015-09-22 12:33:20 -07001404 switch (op->op) {
1405 case GRPC_OP_SEND_INITIAL_METADATA:
1406 /* Flag validation: currently allow no flags */
Craig Tillerc6549762016-03-09 17:10:43 -08001407 if (!are_initial_metadata_flags_valid(op->flags, call->is_client)) {
Craig Tillera82950e2015-09-22 12:33:20 -07001408 error = GRPC_CALL_ERROR_INVALID_FLAGS;
Craig Tillerc7e1a2a2015-11-02 14:17:32 -08001409 goto done_with_error;
Craig Tillera82950e2015-09-22 12:33:20 -07001410 }
Craig Tillerc7e1a2a2015-11-02 14:17:32 -08001411 if (call->sent_initial_metadata) {
1412 error = GRPC_CALL_ERROR_TOO_MANY_OPERATIONS;
1413 goto done_with_error;
Craig Tillera82950e2015-09-22 12:33:20 -07001414 }
David Garcia Quintasa301eaa2016-05-06 16:59:03 -07001415 /* process compression level */
Vitaly Bukae60003d2016-08-01 19:34:51 -07001416 memset(&compression_md, 0, sizeof(compression_md));
David Garcia Quintasa301eaa2016-05-06 16:59:03 -07001417 size_t additional_metadata_count = 0;
David Garcia Quintas749367f2016-05-17 19:15:24 -07001418 grpc_compression_level effective_compression_level;
1419 bool level_set = false;
1420 if (op->data.send_initial_metadata.maybe_compression_level.is_set) {
David Garcia Quintas749367f2016-05-17 19:15:24 -07001421 effective_compression_level =
David Garcia Quintas8ba42be2016-06-07 17:30:20 -07001422 op->data.send_initial_metadata.maybe_compression_level.level;
David Garcia Quintas749367f2016-05-17 19:15:24 -07001423 level_set = true;
1424 } else {
David Garcia Quintasac094472016-05-18 20:25:57 -07001425 const grpc_compression_options copts =
1426 grpc_channel_compression_options(call->channel);
1427 level_set = copts.default_level.is_set;
1428 if (level_set) {
1429 effective_compression_level = copts.default_level.level;
1430 }
David Garcia Quintas749367f2016-05-17 19:15:24 -07001431 }
David Garcia Quintas3e4f49f2016-05-18 23:59:02 -07001432 if (level_set && !call->is_client) {
David Garcia Quintasa301eaa2016-05-06 16:59:03 -07001433 const grpc_compression_algorithm calgo =
1434 compression_algorithm_for_level_locked(
David Garcia Quintas749367f2016-05-17 19:15:24 -07001435 call, effective_compression_level);
David Garcia Quintas3e4f49f2016-05-18 23:59:02 -07001436 // the following will be picked up by the compress filter and used as
1437 // the call's compression algorithm.
Craig Tiller68208fe2016-11-14 14:35:02 -08001438 compression_md.key = GRPC_MDSTR_GRPC_INTERNAL_ENCODING_REQUEST;
1439 compression_md.value = grpc_compression_algorithm_slice(calgo);
David Garcia Quintasa301eaa2016-05-06 16:59:03 -07001440 additional_metadata_count++;
1441 }
1442
1443 if (op->data.send_initial_metadata.count + additional_metadata_count >
1444 INT_MAX) {
Craig Tillerc7e1a2a2015-11-02 14:17:32 -08001445 error = GRPC_CALL_ERROR_INVALID_METADATA;
1446 goto done_with_error;
1447 }
1448 bctl->send_initial_metadata = 1;
1449 call->sent_initial_metadata = 1;
1450 if (!prepare_application_metadata(
Craig Tillera59c16c2016-10-31 07:25:01 -07001451 exec_ctx, call, (int)op->data.send_initial_metadata.count,
David Garcia Quintasa301eaa2016-05-06 16:59:03 -07001452 op->data.send_initial_metadata.metadata, 0, call->is_client,
1453 &compression_md, (int)additional_metadata_count)) {
Craig Tillerc7e1a2a2015-11-02 14:17:32 -08001454 error = GRPC_CALL_ERROR_INVALID_METADATA;
1455 goto done_with_error;
1456 }
1457 /* TODO(ctiller): just make these the same variable? */
1458 call->metadata_batch[0][0].deadline = call->send_deadline;
Craig Tiller6e7b45e2016-07-08 17:25:49 -07001459 stream_op->send_initial_metadata =
Craig Tillerc7e1a2a2015-11-02 14:17:32 -08001460 &call->metadata_batch[0 /* is_receiving */][0 /* is_trailing */];
Craig Tiller6e7b45e2016-07-08 17:25:49 -07001461 stream_op->send_initial_metadata_flags = op->flags;
Craig Tillera82950e2015-09-22 12:33:20 -07001462 break;
1463 case GRPC_OP_SEND_MESSAGE:
1464 if (!are_write_flags_valid(op->flags)) {
1465 error = GRPC_CALL_ERROR_INVALID_FLAGS;
Craig Tillerc7e1a2a2015-11-02 14:17:32 -08001466 goto done_with_error;
Craig Tillera82950e2015-09-22 12:33:20 -07001467 }
Mark D. Roth448c1f02017-01-25 10:44:30 -08001468 if (op->data.send_message.send_message == NULL) {
Craig Tillera82950e2015-09-22 12:33:20 -07001469 error = GRPC_CALL_ERROR_INVALID_MESSAGE;
Craig Tillerc7e1a2a2015-11-02 14:17:32 -08001470 goto done_with_error;
Craig Tillera82950e2015-09-22 12:33:20 -07001471 }
Craig Tillerc7e1a2a2015-11-02 14:17:32 -08001472 if (call->sending_message) {
1473 error = GRPC_CALL_ERROR_TOO_MANY_OPERATIONS;
1474 goto done_with_error;
Craig Tillera82950e2015-09-22 12:33:20 -07001475 }
Craig Tillerc7e1a2a2015-11-02 14:17:32 -08001476 bctl->send_message = 1;
1477 call->sending_message = 1;
1478 grpc_slice_buffer_stream_init(
1479 &call->sending_stream,
Mark D. Roth448c1f02017-01-25 10:44:30 -08001480 &op->data.send_message.send_message->data.raw.slice_buffer,
1481 op->flags);
Lizan Zhou61f09732016-10-26 14:09:52 -07001482 /* If the outgoing buffer is already compressed, mark it as so in the
1483 flags. These will be picked up by the compression filter and further
1484 (wasteful) attempts at compression skipped. */
Mark D. Roth9d76dbe2017-01-25 15:02:56 -08001485 if (op->data.send_message.send_message->data.raw.compression >
1486 GRPC_COMPRESS_NONE) {
Lizan Zhou61f09732016-10-26 14:09:52 -07001487 call->sending_stream.base.flags |= GRPC_WRITE_INTERNAL_COMPRESS;
1488 }
Craig Tiller6e7b45e2016-07-08 17:25:49 -07001489 stream_op->send_message = &call->sending_stream.base;
Craig Tillera82950e2015-09-22 12:33:20 -07001490 break;
1491 case GRPC_OP_SEND_CLOSE_FROM_CLIENT:
1492 /* Flag validation: currently allow no flags */
1493 if (op->flags != 0) {
1494 error = GRPC_CALL_ERROR_INVALID_FLAGS;
Craig Tillerc7e1a2a2015-11-02 14:17:32 -08001495 goto done_with_error;
Craig Tillera82950e2015-09-22 12:33:20 -07001496 }
1497 if (!call->is_client) {
1498 error = GRPC_CALL_ERROR_NOT_ON_SERVER;
Craig Tillerc7e1a2a2015-11-02 14:17:32 -08001499 goto done_with_error;
Craig Tillera82950e2015-09-22 12:33:20 -07001500 }
Craig Tillerc7e1a2a2015-11-02 14:17:32 -08001501 if (call->sent_final_op) {
1502 error = GRPC_CALL_ERROR_TOO_MANY_OPERATIONS;
1503 goto done_with_error;
Craig Tillera82950e2015-09-22 12:33:20 -07001504 }
Craig Tillerc7e1a2a2015-11-02 14:17:32 -08001505 bctl->send_final_op = 1;
1506 call->sent_final_op = 1;
Craig Tiller6e7b45e2016-07-08 17:25:49 -07001507 stream_op->send_trailing_metadata =
Craig Tillerc7e1a2a2015-11-02 14:17:32 -08001508 &call->metadata_batch[0 /* is_receiving */][1 /* is_trailing */];
Craig Tillera82950e2015-09-22 12:33:20 -07001509 break;
1510 case GRPC_OP_SEND_STATUS_FROM_SERVER:
1511 /* Flag validation: currently allow no flags */
1512 if (op->flags != 0) {
1513 error = GRPC_CALL_ERROR_INVALID_FLAGS;
Craig Tillerc7e1a2a2015-11-02 14:17:32 -08001514 goto done_with_error;
Craig Tillera82950e2015-09-22 12:33:20 -07001515 }
1516 if (call->is_client) {
1517 error = GRPC_CALL_ERROR_NOT_ON_CLIENT;
Craig Tillerc7e1a2a2015-11-02 14:17:32 -08001518 goto done_with_error;
Craig Tillera82950e2015-09-22 12:33:20 -07001519 }
Craig Tillerc7e1a2a2015-11-02 14:17:32 -08001520 if (call->sent_final_op) {
1521 error = GRPC_CALL_ERROR_TOO_MANY_OPERATIONS;
1522 goto done_with_error;
Craig Tillera82950e2015-09-22 12:33:20 -07001523 }
Craig Tillerc7e1a2a2015-11-02 14:17:32 -08001524 if (op->data.send_status_from_server.trailing_metadata_count >
1525 INT_MAX) {
1526 error = GRPC_CALL_ERROR_INVALID_METADATA;
1527 goto done_with_error;
Craig Tillera82950e2015-09-22 12:33:20 -07001528 }
Craig Tillerc7e1a2a2015-11-02 14:17:32 -08001529 bctl->send_final_op = 1;
1530 call->sent_final_op = 1;
Craig Tiller93727aa2017-02-06 13:05:39 -08001531 GPR_ASSERT(call->send_extra_metadata_count == 0);
Craig Tillerc7e1a2a2015-11-02 14:17:32 -08001532 call->send_extra_metadata_count = 1;
1533 call->send_extra_metadata[0].md = grpc_channel_get_reffed_status_elem(
Craig Tillera59c16c2016-10-31 07:25:01 -07001534 exec_ctx, call->channel, op->data.send_status_from_server.status);
Craig Tiller841a99d2016-12-12 16:58:57 -08001535 {
1536 grpc_error *override_error = GRPC_ERROR_NONE;
1537 if (op->data.send_status_from_server.status != GRPC_STATUS_OK) {
1538 override_error = GRPC_ERROR_CREATE("Error from server send status");
1539 }
1540 if (op->data.send_status_from_server.status_details != NULL) {
1541 call->send_extra_metadata[1].md = grpc_mdelem_from_slices(
1542 exec_ctx, GRPC_MDSTR_GRPC_MESSAGE,
1543 grpc_slice_ref_internal(
1544 *op->data.send_status_from_server.status_details));
1545 call->send_extra_metadata_count++;
1546 char *msg = grpc_slice_to_c_string(
1547 GRPC_MDVALUE(call->send_extra_metadata[1].md));
1548 override_error = grpc_error_set_str(
1549 override_error, GRPC_ERROR_STR_GRPC_MESSAGE, msg);
1550 gpr_free(msg);
1551 }
1552 set_status_from_error(exec_ctx, call, STATUS_FROM_API_OVERRIDE,
1553 override_error);
Craig Tiller69a1f662016-09-28 10:24:21 -07001554 }
Craig Tillerc7e1a2a2015-11-02 14:17:32 -08001555 if (!prepare_application_metadata(
Craig Tillera59c16c2016-10-31 07:25:01 -07001556 exec_ctx, call,
Craig Tillerc7e1a2a2015-11-02 14:17:32 -08001557 (int)op->data.send_status_from_server.trailing_metadata_count,
David Garcia Quintasa301eaa2016-05-06 16:59:03 -07001558 op->data.send_status_from_server.trailing_metadata, 1, 1, NULL,
1559 0)) {
Craig Tiller93727aa2017-02-06 13:05:39 -08001560 for (int n = 0; n < call->send_extra_metadata_count; n++) {
1561 GRPC_MDELEM_UNREF(exec_ctx, call->send_extra_metadata[n].md);
1562 }
1563 call->send_extra_metadata_count = 0;
Craig Tillerc7e1a2a2015-11-02 14:17:32 -08001564 error = GRPC_CALL_ERROR_INVALID_METADATA;
1565 goto done_with_error;
1566 }
Craig Tiller6e7b45e2016-07-08 17:25:49 -07001567 stream_op->send_trailing_metadata =
Craig Tillerc7e1a2a2015-11-02 14:17:32 -08001568 &call->metadata_batch[0 /* is_receiving */][1 /* is_trailing */];
Craig Tillera82950e2015-09-22 12:33:20 -07001569 break;
1570 case GRPC_OP_RECV_INITIAL_METADATA:
1571 /* Flag validation: currently allow no flags */
1572 if (op->flags != 0) {
1573 error = GRPC_CALL_ERROR_INVALID_FLAGS;
Craig Tillerc7e1a2a2015-11-02 14:17:32 -08001574 goto done_with_error;
Craig Tillera82950e2015-09-22 12:33:20 -07001575 }
Craig Tillerc7e1a2a2015-11-02 14:17:32 -08001576 if (call->received_initial_metadata) {
1577 error = GRPC_CALL_ERROR_TOO_MANY_OPERATIONS;
1578 goto done_with_error;
Craig Tillera82950e2015-09-22 12:33:20 -07001579 }
Craig Tiller9c5318a2016-12-05 15:07:04 -08001580 /* IF this is a server, then GRPC_OP_RECV_INITIAL_METADATA *must* come
1581 from server.c. In that case, it's coming from accept_stream, and in
1582 that case we're not necessarily covered by a poller. */
1583 stream_op->covered_by_poller = call->is_client;
Craig Tillerc7e1a2a2015-11-02 14:17:32 -08001584 call->received_initial_metadata = 1;
Mark D. Roth448c1f02017-01-25 10:44:30 -08001585 call->buffered_metadata[0] =
1586 op->data.recv_initial_metadata.recv_initial_metadata;
Craig Tillera44cbfc2016-02-03 16:02:49 -08001587 grpc_closure_init(&call->receiving_initial_metadata_ready,
Craig Tiller91031da2016-12-28 15:44:25 -08001588 receiving_initial_metadata_ready, bctl,
1589 grpc_schedule_on_exec_ctx);
Craig Tillerc7e1a2a2015-11-02 14:17:32 -08001590 bctl->recv_initial_metadata = 1;
Craig Tiller6e7b45e2016-07-08 17:25:49 -07001591 stream_op->recv_initial_metadata =
Craig Tillerc7e1a2a2015-11-02 14:17:32 -08001592 &call->metadata_batch[1 /* is_receiving */][0 /* is_trailing */];
Craig Tiller6e7b45e2016-07-08 17:25:49 -07001593 stream_op->recv_initial_metadata_ready =
Craig Tillera44cbfc2016-02-03 16:02:49 -08001594 &call->receiving_initial_metadata_ready;
1595 num_completion_callbacks_needed++;
Craig Tillera82950e2015-09-22 12:33:20 -07001596 break;
1597 case GRPC_OP_RECV_MESSAGE:
1598 /* Flag validation: currently allow no flags */
1599 if (op->flags != 0) {
1600 error = GRPC_CALL_ERROR_INVALID_FLAGS;
Craig Tillerc7e1a2a2015-11-02 14:17:32 -08001601 goto done_with_error;
Craig Tillera82950e2015-09-22 12:33:20 -07001602 }
Craig Tillerc7e1a2a2015-11-02 14:17:32 -08001603 if (call->receiving_message) {
1604 error = GRPC_CALL_ERROR_TOO_MANY_OPERATIONS;
yang-g48f3a712015-12-07 11:23:50 -08001605 goto done_with_error;
Craig Tillera82950e2015-09-22 12:33:20 -07001606 }
Craig Tillerc7e1a2a2015-11-02 14:17:32 -08001607 call->receiving_message = 1;
1608 bctl->recv_message = 1;
Mark D. Roth448c1f02017-01-25 10:44:30 -08001609 call->receiving_buffer = op->data.recv_message.recv_message;
Craig Tiller6e7b45e2016-07-08 17:25:49 -07001610 stream_op->recv_message = &call->receiving_stream;
Craig Tillerc7e1a2a2015-11-02 14:17:32 -08001611 grpc_closure_init(&call->receiving_stream_ready, receiving_stream_ready,
Craig Tiller91031da2016-12-28 15:44:25 -08001612 bctl, grpc_schedule_on_exec_ctx);
Craig Tiller6e7b45e2016-07-08 17:25:49 -07001613 stream_op->recv_message_ready = &call->receiving_stream_ready;
Craig Tillerc7e1a2a2015-11-02 14:17:32 -08001614 num_completion_callbacks_needed++;
Craig Tillera82950e2015-09-22 12:33:20 -07001615 break;
1616 case GRPC_OP_RECV_STATUS_ON_CLIENT:
1617 /* Flag validation: currently allow no flags */
1618 if (op->flags != 0) {
1619 error = GRPC_CALL_ERROR_INVALID_FLAGS;
Craig Tillerc7e1a2a2015-11-02 14:17:32 -08001620 goto done_with_error;
Craig Tillera82950e2015-09-22 12:33:20 -07001621 }
1622 if (!call->is_client) {
1623 error = GRPC_CALL_ERROR_NOT_ON_SERVER;
Craig Tillerc7e1a2a2015-11-02 14:17:32 -08001624 goto done_with_error;
Craig Tillera82950e2015-09-22 12:33:20 -07001625 }
Craig Tiller1cbf5762016-04-22 16:02:55 -07001626 if (call->requested_final_op) {
Craig Tillerc7e1a2a2015-11-02 14:17:32 -08001627 error = GRPC_CALL_ERROR_TOO_MANY_OPERATIONS;
1628 goto done_with_error;
Craig Tillera82950e2015-09-22 12:33:20 -07001629 }
Craig Tiller1cbf5762016-04-22 16:02:55 -07001630 call->requested_final_op = 1;
Craig Tillerc7e1a2a2015-11-02 14:17:32 -08001631 call->buffered_metadata[1] =
Craig Tillera82950e2015-09-22 12:33:20 -07001632 op->data.recv_status_on_client.trailing_metadata;
Craig Tillerc7e1a2a2015-11-02 14:17:32 -08001633 call->final_op.client.status = op->data.recv_status_on_client.status;
1634 call->final_op.client.status_details =
1635 op->data.recv_status_on_client.status_details;
Craig Tillerc7e1a2a2015-11-02 14:17:32 -08001636 bctl->recv_final_op = 1;
Craig Tiller6e7b45e2016-07-08 17:25:49 -07001637 stream_op->recv_trailing_metadata =
Craig Tillerc7e1a2a2015-11-02 14:17:32 -08001638 &call->metadata_batch[1 /* is_receiving */][1 /* is_trailing */];
Craig Tiller106df112016-08-04 09:22:34 -07001639 stream_op->collect_stats =
David Garcia Quintas5dde14c2016-07-28 17:29:27 -07001640 &call->final_info.stats.transport_stream_stats;
Craig Tillera82950e2015-09-22 12:33:20 -07001641 break;
1642 case GRPC_OP_RECV_CLOSE_ON_SERVER:
1643 /* Flag validation: currently allow no flags */
1644 if (op->flags != 0) {
1645 error = GRPC_CALL_ERROR_INVALID_FLAGS;
Craig Tillerc7e1a2a2015-11-02 14:17:32 -08001646 goto done_with_error;
Craig Tillera82950e2015-09-22 12:33:20 -07001647 }
Craig Tillerc7e1a2a2015-11-02 14:17:32 -08001648 if (call->is_client) {
1649 error = GRPC_CALL_ERROR_NOT_ON_CLIENT;
1650 goto done_with_error;
Craig Tillera82950e2015-09-22 12:33:20 -07001651 }
Craig Tiller1cbf5762016-04-22 16:02:55 -07001652 if (call->requested_final_op) {
Craig Tillerc7e1a2a2015-11-02 14:17:32 -08001653 error = GRPC_CALL_ERROR_TOO_MANY_OPERATIONS;
1654 goto done_with_error;
1655 }
Craig Tiller1cbf5762016-04-22 16:02:55 -07001656 call->requested_final_op = 1;
Craig Tillerc7e1a2a2015-11-02 14:17:32 -08001657 call->final_op.server.cancelled =
Craig Tillera82950e2015-09-22 12:33:20 -07001658 op->data.recv_close_on_server.cancelled;
Craig Tillerc7e1a2a2015-11-02 14:17:32 -08001659 bctl->recv_final_op = 1;
Craig Tiller6e7b45e2016-07-08 17:25:49 -07001660 stream_op->recv_trailing_metadata =
Craig Tillerc7e1a2a2015-11-02 14:17:32 -08001661 &call->metadata_batch[1 /* is_receiving */][1 /* is_trailing */];
Craig Tiller106df112016-08-04 09:22:34 -07001662 stream_op->collect_stats =
David Garcia Quintas5dde14c2016-07-28 17:29:27 -07001663 &call->final_info.stats.transport_stream_stats;
Craig Tillera82950e2015-09-22 12:33:20 -07001664 break;
Craig Tillerfb189f82015-02-03 12:07:07 -08001665 }
Craig Tillera82950e2015-09-22 12:33:20 -07001666 }
Craig Tillerfb189f82015-02-03 12:07:07 -08001667
Craig Tillera82950e2015-09-22 12:33:20 -07001668 GRPC_CALL_INTERNAL_REF(call, "completion");
Craig Tillerc7e1a2a2015-11-02 14:17:32 -08001669 if (!is_notify_tag_closure) {
Craig Tiller4bf29282015-12-14 11:25:48 -08001670 grpc_cq_begin_op(call->cq, notify_tag);
Craig Tillerc7e1a2a2015-11-02 14:17:32 -08001671 }
1672 gpr_ref_init(&bctl->steps_to_complete, num_completion_callbacks_needed);
Craig Tillerfb189f82015-02-03 12:07:07 -08001673
Craig Tiller6e7b45e2016-07-08 17:25:49 -07001674 stream_op->context = call->context;
Craig Tiller91031da2016-12-28 15:44:25 -08001675 grpc_closure_init(&bctl->finish_batch, finish_batch, bctl,
1676 grpc_schedule_on_exec_ctx);
Craig Tiller6e7b45e2016-07-08 17:25:49 -07001677 stream_op->on_complete = &bctl->finish_batch;
Craig Tillerc7e1a2a2015-11-02 14:17:32 -08001678 gpr_mu_unlock(&call->mu);
1679
Craig Tiller6e7b45e2016-07-08 17:25:49 -07001680 execute_op(exec_ctx, call, stream_op);
Craig Tillerc7e1a2a2015-11-02 14:17:32 -08001681
Craig Tiller3ffd8222015-09-21 08:21:57 -07001682done:
Craig Tiller0ba432d2015-10-09 16:57:11 -07001683 GPR_TIMER_END("grpc_call_start_batch", 0);
Craig Tiller3ffd8222015-09-21 08:21:57 -07001684 return error;
Craig Tillerc7e1a2a2015-11-02 14:17:32 -08001685
1686done_with_error:
1687 /* reverse any mutations that occured */
1688 if (bctl->send_initial_metadata) {
1689 call->sent_initial_metadata = 0;
Craig Tillera59c16c2016-10-31 07:25:01 -07001690 grpc_metadata_batch_clear(exec_ctx, &call->metadata_batch[0][0]);
Craig Tillerc7e1a2a2015-11-02 14:17:32 -08001691 }
1692 if (bctl->send_message) {
1693 call->sending_message = 0;
Craig Tiller3b66ab92015-12-09 19:42:22 -08001694 grpc_byte_stream_destroy(exec_ctx, &call->sending_stream.base);
Craig Tillerc7e1a2a2015-11-02 14:17:32 -08001695 }
1696 if (bctl->send_final_op) {
1697 call->sent_final_op = 0;
Craig Tillera59c16c2016-10-31 07:25:01 -07001698 grpc_metadata_batch_clear(exec_ctx, &call->metadata_batch[0][1]);
Craig Tillerc7e1a2a2015-11-02 14:17:32 -08001699 }
1700 if (bctl->recv_initial_metadata) {
1701 call->received_initial_metadata = 0;
1702 }
1703 if (bctl->recv_message) {
1704 call->receiving_message = 0;
1705 }
1706 if (bctl->recv_final_op) {
Craig Tiller1cbf5762016-04-22 16:02:55 -07001707 call->requested_final_op = 0;
Craig Tillerc7e1a2a2015-11-02 14:17:32 -08001708 }
1709 gpr_mu_unlock(&call->mu);
1710 goto done;
1711}
1712
1713grpc_call_error grpc_call_start_batch(grpc_call *call, const grpc_op *ops,
1714 size_t nops, void *tag, void *reserved) {
1715 grpc_exec_ctx exec_ctx = GRPC_EXEC_CTX_INIT;
1716 grpc_call_error err;
1717
1718 GRPC_API_TRACE(
David Garcia Quintas46123372016-05-09 15:28:42 -07001719 "grpc_call_start_batch(call=%p, ops=%p, nops=%lu, tag=%p, "
1720 "reserved=%p)",
Craig Tillerc7e1a2a2015-11-02 14:17:32 -08001721 5, (call, ops, (unsigned long)nops, tag, reserved));
1722
1723 if (reserved != NULL) {
1724 err = GRPC_CALL_ERROR;
1725 } else {
1726 err = call_start_batch(&exec_ctx, call, ops, nops, tag, 0);
1727 }
1728
1729 grpc_exec_ctx_finish(&exec_ctx);
1730 return err;
1731}
1732
1733grpc_call_error grpc_call_start_batch_and_execute(grpc_exec_ctx *exec_ctx,
1734 grpc_call *call,
1735 const grpc_op *ops,
1736 size_t nops,
1737 grpc_closure *closure) {
1738 return call_start_batch(exec_ctx, call, ops, nops, closure, 1);
Craig Tillerfb189f82015-02-03 12:07:07 -08001739}
Craig Tiller935cf422015-05-01 14:10:46 -07001740
Craig Tillera82950e2015-09-22 12:33:20 -07001741void grpc_call_context_set(grpc_call *call, grpc_context_index elem,
1742 void *value, void (*destroy)(void *value)) {
1743 if (call->context[elem].destroy) {
1744 call->context[elem].destroy(call->context[elem].value);
1745 }
Julien Boeuf83b02972015-05-20 22:50:34 -07001746 call->context[elem].value = value;
1747 call->context[elem].destroy = destroy;
Craig Tiller935cf422015-05-01 14:10:46 -07001748}
1749
Craig Tillera82950e2015-09-22 12:33:20 -07001750void *grpc_call_context_get(grpc_call *call, grpc_context_index elem) {
Julien Boeuf83b02972015-05-20 22:50:34 -07001751 return call->context[elem].value;
Craig Tiller935cf422015-05-01 14:10:46 -07001752}
Julien Boeuf9f218dd2015-04-23 10:24:02 -07001753
Craig Tiller7536af02015-12-22 13:49:30 -08001754uint8_t grpc_call_is_client(grpc_call *call) { return call->is_client; }
David Garcia Quintas13c2f6e2016-03-17 22:51:52 -07001755
1756grpc_compression_algorithm grpc_call_compression_for_level(
1757 grpc_call *call, grpc_compression_level level) {
David Garcia Quintas7d2c7332016-03-18 11:37:14 -07001758 gpr_mu_lock(&call->mu);
David Garcia Quintasa301eaa2016-05-06 16:59:03 -07001759 grpc_compression_algorithm algo =
1760 compression_algorithm_for_level_locked(call, level);
David Garcia Quintas7d2c7332016-03-18 11:37:14 -07001761 gpr_mu_unlock(&call->mu);
David Garcia Quintasa301eaa2016-05-06 16:59:03 -07001762 return algo;
David Garcia Quintas13c2f6e2016-03-17 22:51:52 -07001763}
Yuchen Zeng2e7d9572016-04-15 17:29:57 -07001764
1765const char *grpc_call_error_to_string(grpc_call_error error) {
1766 switch (error) {
1767 case GRPC_CALL_ERROR:
1768 return "GRPC_CALL_ERROR";
1769 case GRPC_CALL_ERROR_ALREADY_ACCEPTED:
1770 return "GRPC_CALL_ERROR_ALREADY_ACCEPTED";
1771 case GRPC_CALL_ERROR_ALREADY_FINISHED:
1772 return "GRPC_CALL_ERROR_ALREADY_FINISHED";
1773 case GRPC_CALL_ERROR_ALREADY_INVOKED:
1774 return "GRPC_CALL_ERROR_ALREADY_INVOKED";
1775 case GRPC_CALL_ERROR_BATCH_TOO_BIG:
1776 return "GRPC_CALL_ERROR_BATCH_TOO_BIG";
1777 case GRPC_CALL_ERROR_INVALID_FLAGS:
1778 return "GRPC_CALL_ERROR_INVALID_FLAGS";
1779 case GRPC_CALL_ERROR_INVALID_MESSAGE:
1780 return "GRPC_CALL_ERROR_INVALID_MESSAGE";
1781 case GRPC_CALL_ERROR_INVALID_METADATA:
1782 return "GRPC_CALL_ERROR_INVALID_METADATA";
1783 case GRPC_CALL_ERROR_NOT_INVOKED:
1784 return "GRPC_CALL_ERROR_NOT_INVOKED";
1785 case GRPC_CALL_ERROR_NOT_ON_CLIENT:
1786 return "GRPC_CALL_ERROR_NOT_ON_CLIENT";
1787 case GRPC_CALL_ERROR_NOT_ON_SERVER:
1788 return "GRPC_CALL_ERROR_NOT_ON_SERVER";
1789 case GRPC_CALL_ERROR_NOT_SERVER_COMPLETION_QUEUE:
1790 return "GRPC_CALL_ERROR_NOT_SERVER_COMPLETION_QUEUE";
1791 case GRPC_CALL_ERROR_PAYLOAD_TYPE_MISMATCH:
1792 return "GRPC_CALL_ERROR_PAYLOAD_TYPE_MISMATCH";
1793 case GRPC_CALL_ERROR_TOO_MANY_OPERATIONS:
1794 return "GRPC_CALL_ERROR_TOO_MANY_OPERATIONS";
1795 case GRPC_CALL_OK:
1796 return "GRPC_CALL_OK";
Yuchen Zeng2e7d9572016-04-15 17:29:57 -07001797 }
Yuchen Zengf02bada2016-04-19 14:12:27 -07001798 GPR_UNREACHABLE_CODE(return "GRPC_CALL_ERROR_UNKNOW");
Yuchen Zeng2e7d9572016-04-15 17:29:57 -07001799}