blob: 09b107cf6b6dde6a3bc835ae30f77b40fbaedb27 [file] [log] [blame]
Nicolas Nobleb7ebd3b2014-11-26 16:33:03 -08001/*
2 *
Jan Tattermusch7897ae92017-06-07 22:57:36 +02003 * Copyright 2015 gRPC authors.
Nicolas Nobleb7ebd3b2014-11-26 16:33:03 -08004 *
Jan Tattermusch7897ae92017-06-07 22:57:36 +02005 * Licensed under the Apache License, Version 2.0 (the "License");
6 * you may not use this file except in compliance with the License.
7 * You may obtain a copy of the License at
Nicolas Nobleb7ebd3b2014-11-26 16:33:03 -08008 *
Jan Tattermusch7897ae92017-06-07 22:57:36 +02009 * http://www.apache.org/licenses/LICENSE-2.0
Nicolas Nobleb7ebd3b2014-11-26 16:33:03 -080010 *
Jan Tattermusch7897ae92017-06-07 22:57:36 +020011 * Unless required by applicable law or agreed to in writing, software
12 * distributed under the License is distributed on an "AS IS" BASIS,
13 * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
14 * See the License for the specific language governing permissions and
15 * limitations under the License.
Nicolas Nobleb7ebd3b2014-11-26 16:33:03 -080016 *
17 */
murgatroid999030c812016-09-16 13:25:08 -070018
David Garcia Quintasf74a49e2015-06-18 17:22:45 -070019#include <assert.h>
Craig Tillerc7e1a2a2015-11-02 14:17:32 -080020#include <limits.h>
David Garcia Quintasf74a49e2015-06-18 17:22:45 -070021#include <stdio.h>
22#include <stdlib.h>
23#include <string.h>
24
25#include <grpc/compression.h>
murgatroid99c3910ca2016-01-06 13:14:23 -080026#include <grpc/grpc.h>
Craig Tiller0f310802016-10-26 16:25:56 -070027#include <grpc/slice.h>
David Garcia Quintasf74a49e2015-06-18 17:22:45 -070028#include <grpc/support/alloc.h>
29#include <grpc/support/log.h>
30#include <grpc/support/string_util.h>
David Garcia Quintase091af82015-07-15 21:37:02 -070031#include <grpc/support/useful.h>
Nicolas Nobleb7ebd3b2014-11-26 16:33:03 -080032
Craig Tiller9533d042016-03-25 17:11:06 -070033#include "src/core/lib/channel/channel_stack.h"
34#include "src/core/lib/compression/algorithm_metadata.h"
Craig Tiller28086682017-07-18 14:22:19 -070035#include "src/core/lib/debug/stats.h"
Craig Tiller9533d042016-03-25 17:11:06 -070036#include "src/core/lib/iomgr/timer.h"
37#include "src/core/lib/profiling/timers.h"
Craig Tillera59c16c2016-10-31 07:25:01 -070038#include "src/core/lib/slice/slice_internal.h"
Craig Tiller0f310802016-10-26 16:25:56 -070039#include "src/core/lib/slice/slice_string_helpers.h"
Craig Tillere7a17022017-03-13 10:20:38 -070040#include "src/core/lib/support/arena.h"
Craig Tiller9533d042016-03-25 17:11:06 -070041#include "src/core/lib/support/string.h"
42#include "src/core/lib/surface/api_trace.h"
43#include "src/core/lib/surface/call.h"
44#include "src/core/lib/surface/channel.h"
45#include "src/core/lib/surface/completion_queue.h"
Craig Tillerf2b5b7e2017-01-10 08:28:59 -080046#include "src/core/lib/surface/validate_metadata.h"
Craig Tiller732351f2016-12-13 16:40:38 -080047#include "src/core/lib/transport/error_utils.h"
David Garcia Quintas73dcbda2016-04-23 00:17:05 -070048#include "src/core/lib/transport/metadata.h"
Craig Tiller9533d042016-03-25 17:11:06 -070049#include "src/core/lib/transport/static_metadata.h"
David Garcia Quintas73dcbda2016-04-23 00:17:05 -070050#include "src/core/lib/transport/transport.h"
Nicolas Nobleb7ebd3b2014-11-26 16:33:03 -080051
/** Upper bound on the number of batches that can be in flight at once.
    There is one slot for each individually queueable op in the batch api:
      - initial metadata send
      - message send
      - status/close send (depending on client/server)
      - initial metadata recv
      - message recv
      - status/close recv (depending on client/server) */
#define MAX_CONCURRENT_BATCHES 6

/** Capacity of grpc_call::send_extra_metadata. */
#define MAX_SEND_EXTRA_METADATA_COUNT 3
Nicolas Nobleb7ebd3b2014-11-26 16:33:03 -080064
/* A request's status can originate from several places. This enum lists
   every such origin and doubles as a priority order when choosing which
   status to hand back to the application: an entry listed earlier
   overrides any entry listed after it. */
typedef enum {
  /* The application layer supplied the status, overriding whatever the
     wire says */
  STATUS_FROM_API_OVERRIDE = 0,
  /* The status arrived from 'the wire' - or from somewhere below the
     surface layer */
  STATUS_FROM_WIRE,
  /* An internal channel stack operation produced the status: must come via
     add_batch_error */
  STATUS_FROM_CORE,
  /* A surface error produced the status */
  STATUS_FROM_SURFACE,
  /* The server sent the status */
  STATUS_FROM_SERVER_STATUS,
  /* Number of sources above; not a source itself */
  STATUS_SOURCE_COUNT
} status_source;
85
Craig Tillera82950e2015-09-22 12:33:20 -070086typedef struct {
Craig Tiller841a99d2016-12-12 16:58:57 -080087 bool is_set;
88 grpc_error *error;
Craig Tiller68752722015-01-29 14:59:54 -080089} received_status;
90
Craig Tiller4bab9462017-02-22 08:56:02 -080091static gpr_atm pack_received_status(received_status r) {
92 return r.is_set ? (1 | (gpr_atm)r.error) : 0;
93}
94
95static received_status unpack_received_status(gpr_atm atm) {
96 return (atm & 1) == 0
97 ? (received_status){.is_set = false, .error = GRPC_ERROR_NONE}
98 : (received_status){.is_set = true,
99 .error = (grpc_error *)(atm & ~(gpr_atm)1)};
100}
101
/* Capacity of batch_control::errors. */
#define MAX_ERRORS_PER_BATCH 4
Craig Tiller94903892016-10-11 15:43:35 -0700103
Craig Tillerc7e1a2a2015-11-02 14:17:32 -0800104typedef struct batch_control {
105 grpc_call *call;
Craig Tillere198b712017-03-31 15:29:33 -0700106 /* Share memory for cq_completion and notify_tag as they are never needed
107 simultaneously. Each byte used in this data structure count as six bytes
Craig Tiller7a8232d2017-04-03 10:59:42 -0700108 per call, so any savings we can make are worthwhile,
109
110 We use notify_tag to determine whether or not to send notification to the
111 completion queue. Once we've made that determination, we can reuse the
112 memory for cq_completion. */
Craig Tillerea54b8c2017-03-01 16:58:28 -0800113 union {
114 grpc_cq_completion cq_completion;
115 struct {
Craig Tillere198b712017-03-31 15:29:33 -0700116 /* Any given op indicates completion by either (a) calling a closure or
117 (b) sending a notification on the call's completion queue. If
118 \a is_closure is true, \a tag indicates a closure to be invoked;
119 otherwise, \a tag indicates the tag to be used in the notification to
120 be sent to the completion queue. */
Craig Tillerea54b8c2017-03-01 16:58:28 -0800121 void *tag;
122 bool is_closure;
123 } notify_tag;
124 } completion_data;
Mark D. Roth764cf042017-09-01 09:00:06 -0700125 grpc_closure start_batch;
Craig Tillerc7e1a2a2015-11-02 14:17:32 -0800126 grpc_closure finish_batch;
Craig Tillerc7e1a2a2015-11-02 14:17:32 -0800127 gpr_refcount steps_to_complete;
Craig Tiller94903892016-10-11 15:43:35 -0700128
129 grpc_error *errors[MAX_ERRORS_PER_BATCH];
130 gpr_atm num_errors;
Craig Tillerc7e1a2a2015-11-02 14:17:32 -0800131
Craig Tillera0f3abd2017-03-31 15:42:16 -0700132 grpc_transport_stream_op_batch op;
Craig Tillerc7e1a2a2015-11-02 14:17:32 -0800133} batch_control;
134
Craig Tiller1c10a7b2017-03-29 14:35:16 -0700135typedef struct {
136 gpr_mu child_list_mu;
137 grpc_call *first_child;
Yash Tibrewalb2a54ac2017-09-13 10:18:07 -0700138} parent_call;
Craig Tiller1c10a7b2017-03-29 14:35:16 -0700139
140typedef struct {
141 grpc_call *parent;
142 /** siblings: children of the same parent form a list, and this list is
143 protected under
144 parent->mu */
145 grpc_call *sibling_next;
146 grpc_call *sibling_prev;
Yash Tibrewalb2a54ac2017-09-13 10:18:07 -0700147} child_call;
Craig Tiller1c10a7b2017-03-29 14:35:16 -0700148
/* Sentinel values for grpc_call::recv_state; any other value stored there
   is a batch_control pointer (see the comment on that field). */
#define RECV_NONE ((gpr_atm)0)
#define RECV_INITIAL_METADATA_FIRST ((gpr_atm)1)
151
Craig Tillera82950e2015-09-22 12:33:20 -0700152struct grpc_call {
Craig Tillerdd36b152017-03-31 08:27:28 -0700153 gpr_refcount ext_ref;
Craig Tillere7a17022017-03-13 10:20:38 -0700154 gpr_arena *arena;
Mark D. Roth764cf042017-09-01 09:00:06 -0700155 grpc_call_combiner call_combiner;
Nicolas Nobleb7ebd3b2014-11-26 16:33:03 -0800156 grpc_completion_queue *cq;
David Garcia Quintas2a50dfe2016-05-31 15:09:12 -0700157 grpc_polling_entity pollent;
Nicolas Nobleb7ebd3b2014-11-26 16:33:03 -0800158 grpc_channel *channel;
Mark D. Roth3d883412016-11-07 13:42:54 -0800159 gpr_timespec start_time;
Yash Tibrewalb2a54ac2017-09-13 10:18:07 -0700160 /* parent_call* */ gpr_atm parent_call_atm;
161 child_call *child;
Nicolas Nobleb7ebd3b2014-11-26 16:33:03 -0800162
Craig Tillere5d683c2015-02-03 16:37:36 -0800163 /* client or server call */
Craig Tiller1cbf5762016-04-22 16:02:55 -0700164 bool is_client;
Craig Tillerdd36b152017-03-31 08:27:28 -0700165 /** has grpc_call_unref been called */
Craig Tiller1cbf5762016-04-22 16:02:55 -0700166 bool destroy_called;
Craig Tillerc7df0df2015-08-03 08:06:50 -0700167 /** flag indicating that cancellation is inherited */
Craig Tiller1cbf5762016-04-22 16:02:55 -0700168 bool cancellation_is_inherited;
Craig Tillerc7e1a2a2015-11-02 14:17:32 -0800169 /** which ops are in-flight */
Craig Tiller1cbf5762016-04-22 16:02:55 -0700170 bool sent_initial_metadata;
171 bool sending_message;
172 bool sent_final_op;
173 bool received_initial_metadata;
174 bool receiving_message;
175 bool requested_final_op;
Craig Tillerb597dcf2017-03-09 07:02:11 -0800176 gpr_atm any_ops_sent_atm;
177 gpr_atm received_final_op_atm;
yang-g0b6ad7d2015-06-25 14:39:01 -0700178
Craig Tillerb58de722017-03-29 14:15:12 -0700179 batch_control *active_batches[MAX_CONCURRENT_BATCHES];
Craig Tillera0f3abd2017-03-31 15:42:16 -0700180 grpc_transport_stream_op_batch_payload stream_op_payload;
Nicolas Nobleb7ebd3b2014-11-26 16:33:03 -0800181
Craig Tillerc7e1a2a2015-11-02 14:17:32 -0800182 /* first idx: is_receiving, second idx: is_trailing */
183 grpc_metadata_batch metadata_batch[2][2];
Craig Tillerebf94bf2015-02-05 08:48:46 -0800184
Craig Tillere5d683c2015-02-03 16:37:36 -0800185 /* Buffered read metadata waiting to be returned to the application.
186 Element 0 is initial metadata, element 1 is trailing metadata. */
Craig Tillerc7e1a2a2015-11-02 14:17:32 -0800187 grpc_metadata_array *buffered_metadata[2];
Nicolas Nobleb7ebd3b2014-11-26 16:33:03 -0800188
Mark D. Roth764cf042017-09-01 09:00:06 -0700189 grpc_metadata compression_md;
190
191 // A char* indicating the peer name.
192 gpr_atm peer_string;
193
Craig Tiller4bab9462017-02-22 08:56:02 -0800194 /* Packed received call statuses from various sources */
195 gpr_atm status[STATUS_SOURCE_COUNT];
Nicolas Nobleb7ebd3b2014-11-26 16:33:03 -0800196
David Garcia Quintas01c4d992016-07-07 20:11:27 -0700197 /* Call data useful used for reporting. Only valid after the call has
198 * completed */
199 grpc_call_final_info final_info;
Craig Tiller466129e2016-03-09 14:43:18 -0800200
David Garcia Quintas749367f2016-05-17 19:15:24 -0700201 /* Compression algorithm for *incoming* data */
202 grpc_compression_algorithm incoming_compression_algorithm;
Muxi Yan68a0fd52017-07-21 09:26:04 -0700203 /* Stream compression algorithm for *incoming* data */
204 grpc_stream_compression_algorithm incoming_stream_compression_algorithm;
David Garcia Quintase091af82015-07-15 21:37:02 -0700205 /* Supported encodings (compression algorithms), a bitset */
Craig Tiller7536af02015-12-22 13:49:30 -0800206 uint32_t encodings_accepted_by_peer;
Muxi Yan68a0fd52017-07-21 09:26:04 -0700207 /* Supported stream encodings (stream compression algorithms), a bitset */
208 uint32_t stream_encodings_accepted_by_peer;
David Garcia Quintasb8edf7e2015-07-08 20:18:57 -0700209
Julien Boeufc6f8d0a2015-05-11 22:40:02 -0700210 /* Contexts for various subsystems (security, tracing, ...). */
Julien Boeuf83b02972015-05-20 22:50:34 -0700211 grpc_call_context_element context[GRPC_CONTEXT_COUNT];
Craig Tiller935cf422015-05-01 14:10:46 -0700212
Craig Tillerc7e1a2a2015-11-02 14:17:32 -0800213 /* for the client, extra metadata is initial metadata; for the
214 server, it's trailing metadata */
215 grpc_linked_mdelem send_extra_metadata[MAX_SEND_EXTRA_METADATA_COUNT];
216 int send_extra_metadata_count;
Craig Tiller6902ad22015-04-16 08:01:49 -0700217 gpr_timespec send_deadline;
218
Craig Tillerc7e1a2a2015-11-02 14:17:32 -0800219 grpc_slice_buffer_stream sending_stream;
Craig Tiller94903892016-10-11 15:43:35 -0700220
Craig Tillerc7e1a2a2015-11-02 14:17:32 -0800221 grpc_byte_stream *receiving_stream;
222 grpc_byte_buffer **receiving_buffer;
Craig Tillerd41a4a72016-10-26 16:16:06 -0700223 grpc_slice receiving_slice;
Craig Tillerc7e1a2a2015-11-02 14:17:32 -0800224 grpc_closure receiving_slice_ready;
225 grpc_closure receiving_stream_ready;
Craig Tillera44cbfc2016-02-03 16:02:49 -0800226 grpc_closure receiving_initial_metadata_ready;
Craig Tiller7536af02015-12-22 13:49:30 -0800227 uint32_t test_only_last_message_flags;
Craig Tillerc7e1a2a2015-11-02 14:17:32 -0800228
Craig Tillere7a17022017-03-13 10:20:38 -0700229 grpc_closure release_call;
230
Craig Tillerc7e1a2a2015-11-02 14:17:32 -0800231 union {
232 struct {
233 grpc_status_code *status;
Craig Tiller68208fe2016-11-14 14:35:02 -0800234 grpc_slice *status_details;
Craig Tillerc7e1a2a2015-11-02 14:17:32 -0800235 } client;
236 struct {
237 int *cancelled;
238 } server;
239 } final_op;
Craig Tillera44cbfc2016-02-03 16:02:49 -0800240
Yuchen Zeng6eb505b2017-08-25 16:05:29 -0700241 /* recv_state can contain one of the following values:
242 RECV_NONE : : no initial metadata and messages received
243 RECV_INITIAL_METADATA_FIRST : received initial metadata first
244 a batch_control* : received messages first
245
246 +------1------RECV_NONE------3-----+
247 | |
248 | |
249 v v
250 RECV_INITIAL_METADATA_FIRST receiving_stream_ready_bctlp
251 | ^ | ^
252 | | | |
253 +-----2-----+ +-----4-----+
254
255 For 1, 4: See receiving_initial_metadata_ready() function
256 For 2, 3: See receiving_stream_ready() function */
257 gpr_atm recv_state;
Nicolas Nobleb7ebd3b2014-11-26 16:33:03 -0800258};
259
ncteisen7712c7c2017-07-12 23:11:27 -0700260grpc_tracer_flag grpc_call_error_trace =
261 GRPC_TRACER_INITIALIZER(false, "call_error");
262grpc_tracer_flag grpc_compression_trace =
263 GRPC_TRACER_INITIALIZER(false, "compression");
Craig Tiller58b30cd2017-01-31 17:07:36 -0800264
/* The call stack is stored immediately after the grpc_call in the same
   allocation, so each can be reached from the other with pointer
   arithmetic. */
#define CALL_STACK_FROM_CALL(call) ((grpc_call_stack *)((call) + 1))
#define CALL_FROM_CALL_STACK(call_stack) (((grpc_call *)(call_stack)) - 1)
/* Element number \a idx of the call stack that belongs to \a call. */
#define CALL_ELEM_FROM_CALL(call, idx) \
  grpc_call_stack_element(CALL_STACK_FROM_CALL(call), idx)
/* The grpc_call that owns the call stack whose top element is \a top_elem. */
#define CALL_FROM_TOP_ELEM(top_elem) \
  CALL_FROM_CALL_STACK(grpc_call_stack_from_top_element(top_elem))
271
Mark D. Roth764cf042017-09-01 09:00:06 -0700272static void execute_batch(grpc_exec_ctx *exec_ctx, grpc_call *call,
273 grpc_transport_stream_op_batch *op,
274 grpc_closure *start_batch_closure);
Craig Tiller2dc32ea2017-01-31 15:32:34 -0800275static void cancel_with_status(grpc_exec_ctx *exec_ctx, grpc_call *c,
276 status_source source, grpc_status_code status,
277 const char *description);
Craig Tiller255edaa2016-12-13 09:04:55 -0800278static void cancel_with_error(grpc_exec_ctx *exec_ctx, grpc_call *c,
Craig Tiller2dc32ea2017-01-31 15:32:34 -0800279 status_source source, grpc_error *error);
Craig Tillerc7e1a2a2015-11-02 14:17:32 -0800280static void destroy_call(grpc_exec_ctx *exec_ctx, void *call_stack,
Craig Tillerc027e772016-05-03 16:27:00 -0700281 grpc_error *error);
Craig Tillerc7e1a2a2015-11-02 14:17:32 -0800282static void receiving_slice_ready(grpc_exec_ctx *exec_ctx, void *bctlp,
Craig Tillerc027e772016-05-03 16:27:00 -0700283 grpc_error *error);
Craig Tiller841a99d2016-12-12 16:58:57 -0800284static void get_final_status(grpc_call *call,
285 void (*set_value)(grpc_status_code code,
286 void *user_data),
287 void *set_value_user_data, grpc_slice *details);
288static void set_status_value_directly(grpc_status_code status, void *dest);
289static void set_status_from_error(grpc_exec_ctx *exec_ctx, grpc_call *call,
290 status_source source, grpc_error *error);
291static void process_data_after_md(grpc_exec_ctx *exec_ctx, batch_control *bctl);
292static void post_batch_completion(grpc_exec_ctx *exec_ctx, batch_control *bctl);
293static void add_batch_error(grpc_exec_ctx *exec_ctx, batch_control *bctl,
yang-g23f777d2017-02-22 23:32:26 -0800294 grpc_error *error, bool has_cancelled);
Craig Tillerbac41422015-05-29 16:32:28 -0700295
Yash Tibrewal52778c42017-09-11 15:00:11 -0700296static void add_init_error(grpc_error **composite, grpc_error *new_err) {
297 if (new_err == GRPC_ERROR_NONE) return;
Craig Tillerf4484cd2017-02-01 08:28:40 -0800298 if (*composite == GRPC_ERROR_NONE)
ncteisen4b36a3d2017-03-13 19:08:06 -0700299 *composite = GRPC_ERROR_CREATE_FROM_STATIC_STRING("Call creation failed");
Yash Tibrewal52778c42017-09-11 15:00:11 -0700300 *composite = grpc_error_add_child(*composite, new_err);
Craig Tillerf4484cd2017-02-01 08:28:40 -0800301}
302
Craig Tiller58450912017-03-16 09:42:43 -0700303void *grpc_call_arena_alloc(grpc_call *call, size_t size) {
304 return gpr_arena_alloc(call->arena, size);
305}
306
Yash Tibrewalb2a54ac2017-09-13 10:18:07 -0700307static parent_call *get_or_create_parent_call(grpc_call *call) {
308 parent_call *p = (parent_call *)gpr_atm_acq_load(&call->parent_call_atm);
Craig Tiller1c10a7b2017-03-29 14:35:16 -0700309 if (p == NULL) {
Yash Tibrewalb2a54ac2017-09-13 10:18:07 -0700310 p = (parent_call *)gpr_arena_alloc(call->arena, sizeof(*p));
Craig Tiller1c10a7b2017-03-29 14:35:16 -0700311 gpr_mu_init(&p->child_list_mu);
312 if (!gpr_atm_rel_cas(&call->parent_call_atm, (gpr_atm)NULL, (gpr_atm)p)) {
313 gpr_mu_destroy(&p->child_list_mu);
Yash Tibrewalb2a54ac2017-09-13 10:18:07 -0700314 p = (parent_call *)gpr_atm_acq_load(&call->parent_call_atm);
Craig Tiller1c10a7b2017-03-29 14:35:16 -0700315 }
316 }
317 return p;
318}
319
Yash Tibrewalb2a54ac2017-09-13 10:18:07 -0700320static parent_call *get_parent_call(grpc_call *call) {
321 return (parent_call *)gpr_atm_acq_load(&call->parent_call_atm);
Craig Tiller1c10a7b2017-03-29 14:35:16 -0700322}
323
Craig Tillera59c16c2016-10-31 07:25:01 -0700324grpc_error *grpc_call_create(grpc_exec_ctx *exec_ctx,
325 const grpc_call_create_args *args,
Craig Tiller8e214652016-08-19 09:54:31 -0700326 grpc_call **out_call) {
Craig Tillerc7e1a2a2015-11-02 14:17:32 -0800327 size_t i, j;
Craig Tillerf4484cd2017-02-01 08:28:40 -0800328 grpc_error *error = GRPC_ERROR_NONE;
Craig Tiller8e214652016-08-19 09:54:31 -0700329 grpc_channel_stack *channel_stack =
330 grpc_channel_get_channel_stack(args->channel);
Craig Tiller1f41b6b2015-10-09 15:07:02 -0700331 grpc_call *call;
Craig Tiller0ba432d2015-10-09 16:57:11 -0700332 GPR_TIMER_BEGIN("grpc_call_create", 0);
Craig Tillera6bec8f2017-03-14 08:26:04 -0700333 gpr_arena *arena =
334 gpr_arena_create(grpc_channel_get_call_size_estimate(args->channel));
Yash Tibrewalca3c1c02017-09-07 22:47:16 -0700335 call = (grpc_call *)gpr_arena_alloc(
336 arena, sizeof(grpc_call) + channel_stack->call_stack_size);
Craig Tillerdd36b152017-03-31 08:27:28 -0700337 gpr_ref_init(&call->ext_ref, 1);
Craig Tillere7a17022017-03-13 10:20:38 -0700338 call->arena = arena;
Mark D. Roth764cf042017-09-01 09:00:06 -0700339 grpc_call_combiner_init(&call->call_combiner);
Craig Tiller0eaed722016-09-21 10:44:18 -0700340 *out_call = call;
Craig Tiller8e214652016-08-19 09:54:31 -0700341 call->channel = args->channel;
342 call->cq = args->cq;
Mark D. Roth3d883412016-11-07 13:42:54 -0800343 call->start_time = gpr_now(GPR_CLOCK_MONOTONIC);
David Garcia Quintas46123372016-05-09 15:28:42 -0700344 /* Always support no compression */
345 GPR_BITSET(&call->encodings_accepted_by_peer, GRPC_COMPRESS_NONE);
Craig Tiller8e214652016-08-19 09:54:31 -0700346 call->is_client = args->server_transport_data == NULL;
Craig Tiller28086682017-07-18 14:22:19 -0700347 if (call->is_client) {
348 GRPC_STATS_INC_CLIENT_CALLS_CREATED(exec_ctx);
349 } else {
350 GRPC_STATS_INC_SERVER_CALLS_CREATED(exec_ctx);
351 }
Craig Tillerea54b8c2017-03-01 16:58:28 -0800352 call->stream_op_payload.context = call->context;
Craig Tiller4eecdde2016-11-14 08:21:17 -0800353 grpc_slice path = grpc_empty_slice();
Craig Tillera82950e2015-09-22 12:33:20 -0700354 if (call->is_client) {
Craig Tiller8e214652016-08-19 09:54:31 -0700355 GPR_ASSERT(args->add_initial_metadata_count <
356 MAX_SEND_EXTRA_METADATA_COUNT);
357 for (i = 0; i < args->add_initial_metadata_count; i++) {
358 call->send_extra_metadata[i].md = args->add_initial_metadata[i];
Craig Tiller3b05e1d2016-11-21 13:46:31 -0800359 if (grpc_slice_eq(GRPC_MDKEY(args->add_initial_metadata[i]),
360 GRPC_MDSTR_PATH)) {
Craig Tiller0160de92016-11-18 08:46:46 -0800361 path = grpc_slice_ref_internal(
362 GRPC_MDVALUE(args->add_initial_metadata[i]));
Mark D. Rothaa850a72016-09-26 13:38:02 -0700363 }
Craig Tillerc7e1a2a2015-11-02 14:17:32 -0800364 }
Craig Tiller8e214652016-08-19 09:54:31 -0700365 call->send_extra_metadata_count = (int)args->add_initial_metadata_count;
Craig Tillerc7e1a2a2015-11-02 14:17:32 -0800366 } else {
Craig Tiller8e214652016-08-19 09:54:31 -0700367 GPR_ASSERT(args->add_initial_metadata_count == 0);
Craig Tillerc7e1a2a2015-11-02 14:17:32 -0800368 call->send_extra_metadata_count = 0;
Craig Tillera82950e2015-09-22 12:33:20 -0700369 }
Craig Tillerc7e1a2a2015-11-02 14:17:32 -0800370 for (i = 0; i < 2; i++) {
371 for (j = 0; j < 2; j++) {
372 call->metadata_batch[i][j].deadline = gpr_inf_future(GPR_CLOCK_MONOTONIC);
373 }
Craig Tillera82950e2015-09-22 12:33:20 -0700374 }
Craig Tillerca3451d2016-09-29 10:27:44 -0700375 gpr_timespec send_deadline =
Craig Tiller8e214652016-08-19 09:54:31 -0700376 gpr_convert_clock_type(args->send_deadline, GPR_CLOCK_MONOTONIC);
Mark D. Rothf28763c2016-09-14 15:18:40 -0700377
Craig Tillerbea92ba2017-04-19 08:33:31 -0700378 bool immediately_cancel = false;
379
Yash Tibrewalb2a54ac2017-09-13 10:18:07 -0700380 if (args->parent != NULL) {
381 child_call *cc = call->child =
382 (child_call *)gpr_arena_alloc(arena, sizeof(child_call));
383 call->child->parent = args->parent;
Craig Tiller1c10a7b2017-03-29 14:35:16 -0700384
Yash Tibrewalb2a54ac2017-09-13 10:18:07 -0700385 GRPC_CALL_INTERNAL_REF(args->parent, "child");
Craig Tillera82950e2015-09-22 12:33:20 -0700386 GPR_ASSERT(call->is_client);
Yash Tibrewalb2a54ac2017-09-13 10:18:07 -0700387 GPR_ASSERT(!args->parent->is_client);
Craig Tillera82950e2015-09-22 12:33:20 -0700388
Yash Tibrewalb2a54ac2017-09-13 10:18:07 -0700389 parent_call *pc = get_or_create_parent_call(args->parent);
Craig Tiller1c10a7b2017-03-29 14:35:16 -0700390
391 gpr_mu_lock(&pc->child_list_mu);
Craig Tillera82950e2015-09-22 12:33:20 -0700392
Craig Tiller8e214652016-08-19 09:54:31 -0700393 if (args->propagation_mask & GRPC_PROPAGATE_DEADLINE) {
Craig Tillera82950e2015-09-22 12:33:20 -0700394 send_deadline = gpr_time_min(
395 gpr_convert_clock_type(send_deadline,
Yash Tibrewalb2a54ac2017-09-13 10:18:07 -0700396 args->parent->send_deadline.clock_type),
397 args->parent->send_deadline);
Craig Tillerc7df0df2015-08-03 08:06:50 -0700398 }
Craig Tillera82950e2015-09-22 12:33:20 -0700399 /* for now GRPC_PROPAGATE_TRACING_CONTEXT *MUST* be passed with
400 * GRPC_PROPAGATE_STATS_CONTEXT */
401 /* TODO(ctiller): This should change to use the appropriate census start_op
402 * call. */
Craig Tiller8e214652016-08-19 09:54:31 -0700403 if (args->propagation_mask & GRPC_PROPAGATE_CENSUS_TRACING_CONTEXT) {
Craig Tiller239af8b2017-02-01 10:21:42 -0800404 if (0 == (args->propagation_mask & GRPC_PROPAGATE_CENSUS_STATS_CONTEXT)) {
ncteisen4b36a3d2017-03-13 19:08:06 -0700405 add_init_error(&error, GRPC_ERROR_CREATE_FROM_STATIC_STRING(
406 "Census tracing propagation requested "
407 "without Census context propagation"));
Craig Tiller239af8b2017-02-01 10:21:42 -0800408 }
Yash Tibrewalb2a54ac2017-09-13 10:18:07 -0700409 grpc_call_context_set(call, GRPC_CONTEXT_TRACING,
410 args->parent->context[GRPC_CONTEXT_TRACING].value,
411 NULL);
Craig Tillerf20d3072017-02-01 10:39:26 -0800412 } else if (args->propagation_mask & GRPC_PROPAGATE_CENSUS_STATS_CONTEXT) {
ncteisen4b36a3d2017-03-13 19:08:06 -0700413 add_init_error(&error, GRPC_ERROR_CREATE_FROM_STATIC_STRING(
414 "Census context propagation requested "
415 "without Census tracing propagation"));
Craig Tiller45724b32015-09-22 10:42:19 -0700416 }
Craig Tiller8e214652016-08-19 09:54:31 -0700417 if (args->propagation_mask & GRPC_PROPAGATE_CANCELLATION) {
Craig Tillera82950e2015-09-22 12:33:20 -0700418 call->cancellation_is_inherited = 1;
Yash Tibrewalb2a54ac2017-09-13 10:18:07 -0700419 if (gpr_atm_acq_load(&args->parent->received_final_op_atm)) {
Craig Tillerbea92ba2017-04-19 08:33:31 -0700420 immediately_cancel = true;
Craig Tiller123c72b2017-03-10 07:33:27 -0800421 }
Craig Tiller45724b32015-09-22 10:42:19 -0700422 }
Craig Tillera82950e2015-09-22 12:33:20 -0700423
Craig Tiller1c10a7b2017-03-29 14:35:16 -0700424 if (pc->first_child == NULL) {
425 pc->first_child = call;
426 cc->sibling_next = cc->sibling_prev = call;
Craig Tillera82950e2015-09-22 12:33:20 -0700427 } else {
Craig Tiller1c10a7b2017-03-29 14:35:16 -0700428 cc->sibling_next = pc->first_child;
Yash Tibrewalb2a54ac2017-09-13 10:18:07 -0700429 cc->sibling_prev = pc->first_child->child->sibling_prev;
430 cc->sibling_next->child->sibling_prev =
431 cc->sibling_prev->child->sibling_next = call;
Craig Tillera82950e2015-09-22 12:33:20 -0700432 }
433
Craig Tiller1c10a7b2017-03-29 14:35:16 -0700434 gpr_mu_unlock(&pc->child_list_mu);
Craig Tillera82950e2015-09-22 12:33:20 -0700435 }
Mark D. Rothf28763c2016-09-14 15:18:40 -0700436
Mark D. Roth14c072c2016-08-26 08:31:34 -0700437 call->send_deadline = send_deadline;
Mark D. Rothf28763c2016-09-14 15:18:40 -0700438
Craig Tillerca3451d2016-09-29 10:27:44 -0700439 GRPC_CHANNEL_INTERNAL_REF(args->channel, "call");
Craig Tillerdd36b152017-03-31 08:27:28 -0700440 /* initial refcount dropped by grpc_call_unref */
Craig Tillerd426cac2017-03-13 12:30:45 -0700441 grpc_call_element_args call_args = {
442 .call_stack = CALL_STACK_FROM_CALL(call),
443 .server_transport_data = args->server_transport_data,
444 .context = call->context,
445 .path = path,
446 .start_time = call->start_time,
447 .deadline = send_deadline,
Mark D. Roth764cf042017-09-01 09:00:06 -0700448 .arena = call->arena,
449 .call_combiner = &call->call_combiner};
Craig Tillerf4484cd2017-02-01 08:28:40 -0800450 add_init_error(&error, grpc_call_stack_init(exec_ctx, channel_stack, 1,
Craig Tillerd426cac2017-03-13 12:30:45 -0700451 destroy_call, call, &call_args));
Mark D. Rothf28763c2016-09-14 15:18:40 -0700452 if (error != GRPC_ERROR_NONE) {
Craig Tiller58b30cd2017-01-31 17:07:36 -0800453 cancel_with_error(exec_ctx, call, STATUS_FROM_SURFACE,
454 GRPC_ERROR_REF(error));
Craig Tillera82950e2015-09-22 12:33:20 -0700455 }
Craig Tillerbea92ba2017-04-19 08:33:31 -0700456 if (immediately_cancel) {
457 cancel_with_error(exec_ctx, call, STATUS_FROM_API_OVERRIDE,
458 GRPC_ERROR_CANCELLED);
459 }
Craig Tillerca3451d2016-09-29 10:27:44 -0700460 if (args->cq != NULL) {
Mark D. Rothf28763c2016-09-14 15:18:40 -0700461 GPR_ASSERT(
Craig Tillerca3451d2016-09-29 10:27:44 -0700462 args->pollset_set_alternative == NULL &&
Mark D. Rothf28763c2016-09-14 15:18:40 -0700463 "Only one of 'cq' and 'pollset_set_alternative' should be non-NULL.");
Craig Tillerca3451d2016-09-29 10:27:44 -0700464 GRPC_CQ_INTERNAL_REF(args->cq, "bind");
Mark D. Rothf28763c2016-09-14 15:18:40 -0700465 call->pollent =
Craig Tillerca3451d2016-09-29 10:27:44 -0700466 grpc_polling_entity_create_from_pollset(grpc_cq_pollset(args->cq));
Mark D. Rothf28763c2016-09-14 15:18:40 -0700467 }
Craig Tillerca3451d2016-09-29 10:27:44 -0700468 if (args->pollset_set_alternative != NULL) {
469 call->pollent = grpc_polling_entity_create_from_pollset_set(
470 args->pollset_set_alternative);
Mark D. Rothf28763c2016-09-14 15:18:40 -0700471 }
472 if (!grpc_polling_entity_is_empty(&call->pollent)) {
473 grpc_call_stack_set_pollset_or_pollset_set(
Craig Tillera59c16c2016-10-31 07:25:01 -0700474 exec_ctx, CALL_STACK_FROM_CALL(call), &call->pollent);
Mark D. Rothf28763c2016-09-14 15:18:40 -0700475 }
476
Craig Tiller4eecdde2016-11-14 08:21:17 -0800477 grpc_slice_unref_internal(exec_ctx, path);
Mark D. Rothaa850a72016-09-26 13:38:02 -0700478
Craig Tiller0ba432d2015-10-09 16:57:11 -0700479 GPR_TIMER_END("grpc_call_create", 0);
Craig Tiller8e214652016-08-19 09:54:31 -0700480 return error;
Nicolas Nobleb7ebd3b2014-11-26 16:33:03 -0800481}
482
Craig Tillera82950e2015-09-22 12:33:20 -0700483void grpc_call_set_completion_queue(grpc_exec_ctx *exec_ctx, grpc_call *call,
484 grpc_completion_queue *cq) {
Craig Tillerc7e1a2a2015-11-02 14:17:32 -0800485 GPR_ASSERT(cq);
David Garcia Quintasf72eb972016-05-03 18:28:09 -0700486
David Garcia Quintasc4d51122016-06-06 14:56:02 -0700487 if (grpc_polling_entity_pollset_set(&call->pollent) != NULL) {
David Garcia Quintasf72eb972016-05-03 18:28:09 -0700488 gpr_log(GPR_ERROR, "A pollset_set is already registered for this call.");
489 abort();
490 }
Craig Tiller166e2502015-02-03 20:14:41 -0800491 call->cq = cq;
Craig Tillerc7e1a2a2015-11-02 14:17:32 -0800492 GRPC_CQ_INTERNAL_REF(cq, "bind");
David Garcia Quintasc4d51122016-06-06 14:56:02 -0700493 call->pollent = grpc_polling_entity_create_from_pollset(grpc_cq_pollset(cq));
David Garcia Quintas4afce7e2016-04-18 16:25:17 -0700494 grpc_call_stack_set_pollset_or_pollset_set(
David Garcia Quintas2a50dfe2016-05-31 15:09:12 -0700495 exec_ctx, CALL_STACK_FROM_CALL(call), &call->pollent);
Craig Tiller166e2502015-02-03 20:14:41 -0800496}
497
ncteisen9c43fc02017-06-08 16:06:23 -0700498#ifndef NDEBUG
Craig Tiller7b435612015-11-24 08:15:05 -0800499#define REF_REASON reason
500#define REF_ARG , const char *reason
Craig Tiller4df412b2015-04-28 07:57:54 -0700501#else
Craig Tiller7b435612015-11-24 08:15:05 -0800502#define REF_REASON ""
503#define REF_ARG
Craig Tillerc7e1a2a2015-11-02 14:17:32 -0800504#endif
Craig Tiller7b435612015-11-24 08:15:05 -0800505void grpc_call_internal_ref(grpc_call *c REF_ARG) {
506 GRPC_CALL_STACK_REF(CALL_STACK_FROM_CALL(c), REF_REASON);
507}
508void grpc_call_internal_unref(grpc_exec_ctx *exec_ctx, grpc_call *c REF_ARG) {
509 GRPC_CALL_STACK_UNREF(exec_ctx, CALL_STACK_FROM_CALL(c), REF_REASON);
510}
Nicolas Nobleb7ebd3b2014-11-26 16:33:03 -0800511
Craig Tillere7a17022017-03-13 10:20:38 -0700512static void release_call(grpc_exec_ctx *exec_ctx, void *call,
513 grpc_error *error) {
Yash Tibrewalca3c1c02017-09-07 22:47:16 -0700514 grpc_call *c = (grpc_call *)call;
Craig Tiller51006fe2017-03-15 08:07:02 -0700515 grpc_channel *channel = c->channel;
Mark D. Roth764cf042017-09-01 09:00:06 -0700516 grpc_call_combiner_destroy(&c->call_combiner);
517 gpr_free((char *)c->peer_string);
Craig Tillerdbad3702017-03-15 08:21:19 -0700518 grpc_channel_update_call_size_estimate(channel, gpr_arena_destroy(c->arena));
Craig Tiller51006fe2017-03-15 08:07:02 -0700519 GRPC_CHANNEL_INTERNAL_UNREF(exec_ctx, channel, "call");
Craig Tillere7a17022017-03-13 10:20:38 -0700520}
521
David Garcia Quintas01c4d992016-07-07 20:11:27 -0700522static void set_status_value_directly(grpc_status_code status, void *dest);
Craig Tillerc027e772016-05-03 16:27:00 -0700523static void destroy_call(grpc_exec_ctx *exec_ctx, void *call,
524 grpc_error *error) {
Craig Tiller566316f2015-02-02 15:25:32 -0800525 size_t i;
Craig Tillerc7e1a2a2015-11-02 14:17:32 -0800526 int ii;
Yash Tibrewalca3c1c02017-09-07 22:47:16 -0700527 grpc_call *c = (grpc_call *)call;
Craig Tiller0ba432d2015-10-09 16:57:11 -0700528 GPR_TIMER_BEGIN("destroy_call", 0);
Craig Tillerc7e1a2a2015-11-02 14:17:32 -0800529 for (i = 0; i < 2; i++) {
530 grpc_metadata_batch_destroy(
Craig Tillera59c16c2016-10-31 07:25:01 -0700531 exec_ctx, &c->metadata_batch[1 /* is_receiving */][i /* is_initial */]);
Craig Tillerc7e1a2a2015-11-02 14:17:32 -0800532 }
533 if (c->receiving_stream != NULL) {
Craig Tiller3b66ab92015-12-09 19:42:22 -0800534 grpc_byte_stream_destroy(exec_ctx, c->receiving_stream);
Craig Tillerc7e1a2a2015-11-02 14:17:32 -0800535 }
Yash Tibrewalb2a54ac2017-09-13 10:18:07 -0700536 parent_call *pc = get_parent_call(c);
Craig Tiller1c10a7b2017-03-29 14:35:16 -0700537 if (pc != NULL) {
538 gpr_mu_destroy(&pc->child_list_mu);
539 }
Craig Tillerc7e1a2a2015-11-02 14:17:32 -0800540 for (ii = 0; ii < c->send_extra_metadata_count; ii++) {
Craig Tillera59c16c2016-10-31 07:25:01 -0700541 GRPC_MDELEM_UNREF(exec_ctx, c->send_extra_metadata[ii].md);
Craig Tillera82950e2015-09-22 12:33:20 -0700542 }
543 for (i = 0; i < GRPC_CONTEXT_COUNT; i++) {
544 if (c->context[i].destroy) {
545 c->context[i].destroy(c->context[i].value);
Craig Tiller935cf422015-05-01 14:10:46 -0700546 }
Craig Tillera82950e2015-09-22 12:33:20 -0700547 }
Craig Tillera82950e2015-09-22 12:33:20 -0700548 if (c->cq) {
Craig Tillerf8401102017-04-17 09:47:28 -0700549 GRPC_CQ_INTERNAL_UNREF(exec_ctx, c->cq, "bind");
Craig Tillera82950e2015-09-22 12:33:20 -0700550 }
David Garcia Quintas01c4d992016-07-07 20:11:27 -0700551
Yash Tibrewal090aca52017-09-12 12:14:13 -0700552 get_final_status(c, set_status_value_directly, &c->final_info.final_status,
553 NULL);
Mark D. Roth3d883412016-11-07 13:42:54 -0800554 c->final_info.stats.latency =
555 gpr_time_sub(gpr_now(GPR_CLOCK_MONOTONIC), c->start_time);
David Garcia Quintas01c4d992016-07-07 20:11:27 -0700556
Craig Tiller841a99d2016-12-12 16:58:57 -0800557 for (i = 0; i < STATUS_SOURCE_COUNT; i++) {
Craig Tiller4bab9462017-02-22 08:56:02 -0800558 GRPC_ERROR_UNREF(
Craig Tillerb597dcf2017-03-09 07:02:11 -0800559 unpack_received_status(gpr_atm_acq_load(&c->status[i])).error);
Craig Tiller841a99d2016-12-12 16:58:57 -0800560 }
561
Craig Tillere7a17022017-03-13 10:20:38 -0700562 grpc_call_stack_destroy(exec_ctx, CALL_STACK_FROM_CALL(c), &c->final_info,
ncteisen969b46e2017-06-08 14:57:11 -0700563 GRPC_CLOSURE_INIT(&c->release_call, release_call, c,
Craig Tillere7a17022017-03-13 10:20:38 -0700564 grpc_schedule_on_exec_ctx));
Craig Tiller0ba432d2015-10-09 16:57:11 -0700565 GPR_TIMER_END("destroy_call", 0);
Craig Tillera4541102015-01-29 11:46:11 -0800566}
567
Craig Tillerdd36b152017-03-31 08:27:28 -0700568void grpc_call_ref(grpc_call *c) { gpr_ref(&c->ext_ref); }
569
570void grpc_call_unref(grpc_call *c) {
571 if (!gpr_unref(&c->ext_ref)) return;
572
Yash Tibrewalb2a54ac2017-09-13 10:18:07 -0700573 child_call *cc = c->child;
Craig Tiller841a99d2016-12-12 16:58:57 -0800574 grpc_exec_ctx exec_ctx = GRPC_EXEC_CTX_INIT;
Craig Tillerb8d3a312015-06-19 17:27:53 -0700575
Craig Tillerdd36b152017-03-31 08:27:28 -0700576 GPR_TIMER_BEGIN("grpc_call_unref", 0);
577 GRPC_API_TRACE("grpc_call_unref(c=%p)", 1, (c));
Craig Tiller841a99d2016-12-12 16:58:57 -0800578
Craig Tiller9fd9a442017-04-05 16:52:34 -0700579 if (cc) {
Yash Tibrewalb2a54ac2017-09-13 10:18:07 -0700580 parent_call *pc = get_parent_call(cc->parent);
Craig Tiller1c10a7b2017-03-29 14:35:16 -0700581 gpr_mu_lock(&pc->child_list_mu);
582 if (c == pc->first_child) {
583 pc->first_child = cc->sibling_next;
584 if (c == pc->first_child) {
585 pc->first_child = NULL;
Craig Tiller841a99d2016-12-12 16:58:57 -0800586 }
Craig Tiller841a99d2016-12-12 16:58:57 -0800587 }
Yash Tibrewalb2a54ac2017-09-13 10:18:07 -0700588 cc->sibling_prev->child->sibling_next = cc->sibling_next;
589 cc->sibling_next->child->sibling_prev = cc->sibling_prev;
Craig Tiller1c10a7b2017-03-29 14:35:16 -0700590 gpr_mu_unlock(&pc->child_list_mu);
591 GRPC_CALL_INTERNAL_UNREF(&exec_ctx, cc->parent, "child");
Craig Tiller841a99d2016-12-12 16:58:57 -0800592 }
593
Craig Tiller841a99d2016-12-12 16:58:57 -0800594 GPR_ASSERT(!c->destroy_called);
595 c->destroy_called = 1;
Craig Tiller1c10a7b2017-03-29 14:35:16 -0700596 bool cancel = gpr_atm_acq_load(&c->any_ops_sent_atm) != 0 &&
597 gpr_atm_acq_load(&c->received_final_op_atm) == 0;
Craig Tiller37cbc3f2017-02-16 14:54:55 -0800598 if (cancel) {
599 cancel_with_error(&exec_ctx, c, STATUS_FROM_API_OVERRIDE,
600 GRPC_ERROR_CANCELLED);
Mark D. Roth180c6b12017-09-05 13:46:41 -0700601 } else {
602 // Unset the call combiner cancellation closure. This has the
603 // effect of scheduling the previously set cancellation closure, if
604 // any, so that it can release any internal references it may be
605 // holding to the call stack.
606 grpc_call_combiner_set_notify_on_cancel(&exec_ctx, &c->call_combiner, NULL);
Craig Tiller37cbc3f2017-02-16 14:54:55 -0800607 }
Craig Tiller841a99d2016-12-12 16:58:57 -0800608 GRPC_CALL_INTERNAL_UNREF(&exec_ctx, c, "destroy");
609 grpc_exec_ctx_finish(&exec_ctx);
Craig Tillerdd36b152017-03-31 08:27:28 -0700610 GPR_TIMER_END("grpc_call_unref", 0);
Craig Tillerf0f70a82016-06-23 13:55:06 -0700611}
Craig Tiller30547562015-02-05 17:04:51 -0800612
Craig Tiller841a99d2016-12-12 16:58:57 -0800613grpc_call_error grpc_call_cancel(grpc_call *call, void *reserved) {
614 GRPC_API_TRACE("grpc_call_cancel(call=%p, reserved=%p)", 2, (call, reserved));
615 GPR_ASSERT(!reserved);
Craig Tiller37cbc3f2017-02-16 14:54:55 -0800616 grpc_exec_ctx exec_ctx = GRPC_EXEC_CTX_INIT;
617 cancel_with_error(&exec_ctx, call, STATUS_FROM_API_OVERRIDE,
618 GRPC_ERROR_CANCELLED);
619 grpc_exec_ctx_finish(&exec_ctx);
620 return GRPC_CALL_OK;
Craig Tiller841a99d2016-12-12 16:58:57 -0800621}
622
Mark D. Roth764cf042017-09-01 09:00:06 -0700623// This is called via the call combiner to start sending a batch down
624// the filter stack.
625static void execute_batch_in_call_combiner(grpc_exec_ctx *exec_ctx, void *arg,
626 grpc_error *ignored) {
Yash Tibrewalca3c1c02017-09-07 22:47:16 -0700627 grpc_transport_stream_op_batch *batch = (grpc_transport_stream_op_batch *)arg;
628 grpc_call *call = (grpc_call *)batch->handler_private.extra_arg;
Mark D. Roth764cf042017-09-01 09:00:06 -0700629 GPR_TIMER_BEGIN("execute_batch", 0);
630 grpc_call_element *elem = CALL_ELEM_FROM_CALL(call, 0);
631 GRPC_CALL_LOG_OP(GPR_INFO, elem, batch);
632 elem->filter->start_transport_stream_op_batch(exec_ctx, elem, batch);
633 GPR_TIMER_END("execute_batch", 0);
634}
Craig Tiller841a99d2016-12-12 16:58:57 -0800635
Mark D. Roth764cf042017-09-01 09:00:06 -0700636// start_batch_closure points to a caller-allocated closure to be used
637// for entering the call combiner.
638static void execute_batch(grpc_exec_ctx *exec_ctx, grpc_call *call,
639 grpc_transport_stream_op_batch *batch,
640 grpc_closure *start_batch_closure) {
641 batch->handler_private.extra_arg = call;
642 GRPC_CLOSURE_INIT(start_batch_closure, execute_batch_in_call_combiner, batch,
643 grpc_schedule_on_exec_ctx);
644 GRPC_CALL_COMBINER_START(exec_ctx, &call->call_combiner, start_batch_closure,
645 GRPC_ERROR_NONE, "executing batch");
Craig Tiller841a99d2016-12-12 16:58:57 -0800646}
647
648char *grpc_call_get_peer(grpc_call *call) {
Mark D. Roth764cf042017-09-01 09:00:06 -0700649 char *peer_string = (char *)gpr_atm_acq_load(&call->peer_string);
650 if (peer_string != NULL) return gpr_strdup(peer_string);
651 peer_string = grpc_channel_get_target(call->channel);
652 if (peer_string != NULL) return peer_string;
653 return gpr_strdup("unknown");
Craig Tiller841a99d2016-12-12 16:58:57 -0800654}
655
656grpc_call *grpc_call_from_top_element(grpc_call_element *elem) {
657 return CALL_FROM_TOP_ELEM(elem);
658}
659
660/*******************************************************************************
661 * CANCELLATION
662 */
663
664grpc_call_error grpc_call_cancel_with_status(grpc_call *c,
665 grpc_status_code status,
666 const char *description,
667 void *reserved) {
Craig Tiller841a99d2016-12-12 16:58:57 -0800668 grpc_exec_ctx exec_ctx = GRPC_EXEC_CTX_INIT;
669 GRPC_API_TRACE(
670 "grpc_call_cancel_with_status("
671 "c=%p, status=%d, description=%s, reserved=%p)",
672 4, (c, (int)status, description, reserved));
673 GPR_ASSERT(reserved == NULL);
Craig Tiller2dc32ea2017-01-31 15:32:34 -0800674 cancel_with_status(&exec_ctx, c, STATUS_FROM_API_OVERRIDE, status,
675 description);
Craig Tiller841a99d2016-12-12 16:58:57 -0800676 grpc_exec_ctx_finish(&exec_ctx);
Craig Tiller2dc32ea2017-01-31 15:32:34 -0800677 return GRPC_CALL_OK;
Craig Tiller841a99d2016-12-12 16:58:57 -0800678}
679
Mark D. Roth764cf042017-09-01 09:00:06 -0700680typedef struct {
681 grpc_call *call;
682 grpc_closure start_batch;
683 grpc_closure finish_batch;
684} cancel_state;
685
686// The on_complete callback used when sending a cancel_stream batch down
687// the filter stack. Yields the call combiner when the batch is done.
688static void done_termination(grpc_exec_ctx *exec_ctx, void *arg,
Craig Tiller841a99d2016-12-12 16:58:57 -0800689 grpc_error *error) {
Mark D. Roth764cf042017-09-01 09:00:06 -0700690 cancel_state *state = (cancel_state *)arg;
691 GRPC_CALL_COMBINER_STOP(exec_ctx, &state->call->call_combiner,
692 "on_complete for cancel_stream op");
693 GRPC_CALL_INTERNAL_UNREF(exec_ctx, state->call, "termination");
694 gpr_free(state);
Craig Tiller841a99d2016-12-12 16:58:57 -0800695}
696
Craig Tiller255edaa2016-12-13 09:04:55 -0800697static void cancel_with_error(grpc_exec_ctx *exec_ctx, grpc_call *c,
Craig Tiller2dc32ea2017-01-31 15:32:34 -0800698 status_source source, grpc_error *error) {
Craig Tillerc5b90df2017-03-10 16:11:08 -0800699 GRPC_CALL_INTERNAL_REF(c, "termination");
Mark D. Roth764cf042017-09-01 09:00:06 -0700700 // Inform the call combiner of the cancellation, so that it can cancel
701 // any in-flight asynchronous actions that may be holding the call
702 // combiner. This ensures that the cancel_stream batch can be sent
703 // down the filter stack in a timely manner.
704 grpc_call_combiner_cancel(exec_ctx, &c->call_combiner, GRPC_ERROR_REF(error));
Craig Tiller2dc32ea2017-01-31 15:32:34 -0800705 set_status_from_error(exec_ctx, c, source, GRPC_ERROR_REF(error));
Mark D. Roth764cf042017-09-01 09:00:06 -0700706 cancel_state *state = (cancel_state *)gpr_malloc(sizeof(*state));
707 state->call = c;
708 GRPC_CLOSURE_INIT(&state->finish_batch, done_termination, state,
709 grpc_schedule_on_exec_ctx);
710 grpc_transport_stream_op_batch *op =
711 grpc_make_transport_stream_op(&state->finish_batch);
Craig Tiller22b182b2017-03-16 10:56:58 -0700712 op->cancel_stream = true;
713 op->payload->cancel_stream.cancel_error = error;
Mark D. Roth764cf042017-09-01 09:00:06 -0700714 execute_batch(exec_ctx, c, op, &state->start_batch);
Craig Tiller255edaa2016-12-13 09:04:55 -0800715}
716
Craig Tiller841a99d2016-12-12 16:58:57 -0800717static grpc_error *error_from_status(grpc_status_code status,
718 const char *description) {
Alexander Polcyn088e85c2017-07-28 14:53:20 -0700719 // copying 'description' is needed to ensure the grpc_call_cancel_with_status
720 // guarantee that can be short-lived.
Craig Tiller841a99d2016-12-12 16:58:57 -0800721 return grpc_error_set_int(
ncteisen4b36a3d2017-03-13 19:08:06 -0700722 grpc_error_set_str(GRPC_ERROR_CREATE_FROM_COPIED_STRING(description),
723 GRPC_ERROR_STR_GRPC_MESSAGE,
724 grpc_slice_from_copied_string(description)),
Craig Tiller841a99d2016-12-12 16:58:57 -0800725 GRPC_ERROR_INT_GRPC_STATUS, status);
726}
727
Craig Tiller2dc32ea2017-01-31 15:32:34 -0800728static void cancel_with_status(grpc_exec_ctx *exec_ctx, grpc_call *c,
729 status_source source, grpc_status_code status,
730 const char *description) {
731 cancel_with_error(exec_ctx, c, source,
732 error_from_status(status, description));
Craig Tiller841a99d2016-12-12 16:58:57 -0800733}
734
735/*******************************************************************************
736 * FINAL STATUS CODE MANIPULATION
737 */
738
Craig Tiller58b30cd2017-01-31 17:07:36 -0800739static bool get_final_status_from(
Craig Tiller4bab9462017-02-22 08:56:02 -0800740 grpc_call *call, grpc_error *error, bool allow_ok_status,
Craig Tiller58b30cd2017-01-31 17:07:36 -0800741 void (*set_value)(grpc_status_code code, void *user_data),
742 void *set_value_user_data, grpc_slice *details) {
Craig Tiller737b6252017-01-09 15:25:15 -0800743 grpc_status_code code;
ncteiseneb2b1152017-03-28 15:27:27 -0700744 grpc_slice slice = grpc_empty_slice();
ncteisenbbb38012017-03-10 14:58:43 -0800745 grpc_error_get_status(error, call->send_deadline, &code, &slice, NULL);
Craig Tiller58b30cd2017-01-31 17:07:36 -0800746 if (code == GRPC_STATUS_OK && !allow_ok_status) {
747 return false;
748 }
Craig Tiller737b6252017-01-09 15:25:15 -0800749
750 set_value(code, set_value_user_data);
751 if (details != NULL) {
ncteisenbbb38012017-03-10 14:58:43 -0800752 *details = grpc_slice_ref_internal(slice);
Craig Tiller737b6252017-01-09 15:25:15 -0800753 }
Craig Tiller58b30cd2017-01-31 17:07:36 -0800754 return true;
Craig Tiller737b6252017-01-09 15:25:15 -0800755}
756
Craig Tiller841a99d2016-12-12 16:58:57 -0800757static void get_final_status(grpc_call *call,
758 void (*set_value)(grpc_status_code code,
759 void *user_data),
760 void *set_value_user_data, grpc_slice *details) {
761 int i;
Craig Tiller4bab9462017-02-22 08:56:02 -0800762 received_status status[STATUS_SOURCE_COUNT];
763 for (i = 0; i < STATUS_SOURCE_COUNT; i++) {
764 status[i] = unpack_received_status(gpr_atm_acq_load(&call->status[i]));
765 }
Craig Tiller84f75d42017-05-03 13:06:35 -0700766 if (GRPC_TRACER_ON(grpc_call_error_trace)) {
Craig Tiller58b30cd2017-01-31 17:07:36 -0800767 gpr_log(GPR_DEBUG, "get_final_status %s", call->is_client ? "CLI" : "SVR");
768 for (i = 0; i < STATUS_SOURCE_COUNT; i++) {
Craig Tiller4bab9462017-02-22 08:56:02 -0800769 if (status[i].is_set) {
770 gpr_log(GPR_DEBUG, " %d: %s", i, grpc_error_string(status[i].error));
Craig Tiller58b30cd2017-01-31 17:07:36 -0800771 }
Craig Tiller841a99d2016-12-12 16:58:57 -0800772 }
773 }
Craig Tiller58b30cd2017-01-31 17:07:36 -0800774 /* first search through ignoring "OK" statuses: if something went wrong,
775 * ensure we report it */
776 for (int allow_ok_status = 0; allow_ok_status < 2; allow_ok_status++) {
777 /* search for the best status we can present: ideally the error we use has a
778 clearly defined grpc-status, and we'll prefer that. */
779 for (i = 0; i < STATUS_SOURCE_COUNT; i++) {
Craig Tiller4bab9462017-02-22 08:56:02 -0800780 if (status[i].is_set &&
781 grpc_error_has_clear_grpc_status(status[i].error)) {
782 if (get_final_status_from(call, status[i].error, allow_ok_status != 0,
Craig Tiller58b30cd2017-01-31 17:07:36 -0800783 set_value, set_value_user_data, details)) {
784 return;
785 }
786 }
787 }
788 /* If no clearly defined status exists, search for 'anything' */
789 for (i = 0; i < STATUS_SOURCE_COUNT; i++) {
Craig Tiller4bab9462017-02-22 08:56:02 -0800790 if (status[i].is_set) {
791 if (get_final_status_from(call, status[i].error, allow_ok_status != 0,
Craig Tiller58b30cd2017-01-31 17:07:36 -0800792 set_value, set_value_user_data, details)) {
793 return;
794 }
795 }
Craig Tiller737b6252017-01-09 15:25:15 -0800796 }
797 }
798 /* If nothing exists, set some default */
Craig Tiller841a99d2016-12-12 16:58:57 -0800799 if (call->is_client) {
800 set_value(GRPC_STATUS_UNKNOWN, set_value_user_data);
Craig Tillerbe1b9a72016-06-24 13:22:11 -0700801 } else {
Craig Tiller841a99d2016-12-12 16:58:57 -0800802 set_value(GRPC_STATUS_OK, set_value_user_data);
Craig Tillerf0f70a82016-06-23 13:55:06 -0700803 }
Craig Tillerf0f70a82016-06-23 13:55:06 -0700804}
805
Craig Tillera59c16c2016-10-31 07:25:01 -0700806static void set_status_from_error(grpc_exec_ctx *exec_ctx, grpc_call *call,
807 status_source source, grpc_error *error) {
Craig Tiller4bab9462017-02-22 08:56:02 -0800808 if (!gpr_atm_rel_cas(&call->status[source],
809 pack_received_status((received_status){
810 .is_set = false, .error = GRPC_ERROR_NONE}),
811 pack_received_status((received_status){
812 .is_set = true, .error = error}))) {
Craig Tiller841a99d2016-12-12 16:58:57 -0800813 GRPC_ERROR_UNREF(error);
Craig Tiller841a99d2016-12-12 16:58:57 -0800814 }
Craig Tiller68752722015-01-29 14:59:54 -0800815}
816
Craig Tiller841a99d2016-12-12 16:58:57 -0800817/*******************************************************************************
818 * COMPRESSION
819 */
820
David Garcia Quintasac094472016-05-18 20:25:57 -0700821static void set_incoming_compression_algorithm(
822 grpc_call *call, grpc_compression_algorithm algo) {
David Garcia Quintas303d3082016-05-05 18:25:34 -0700823 GPR_ASSERT(algo < GRPC_COMPRESS_ALGORITHMS_COUNT);
David Garcia Quintas749367f2016-05-17 19:15:24 -0700824 call->incoming_compression_algorithm = algo;
David Garcia Quintasdb94b272015-06-15 18:37:01 -0700825}
826
Muxi Yan68a0fd52017-07-21 09:26:04 -0700827static void set_incoming_stream_compression_algorithm(
828 grpc_call *call, grpc_stream_compression_algorithm algo) {
829 GPR_ASSERT(algo < GRPC_STREAM_COMPRESS_ALGORITHMS_COUNT);
830 call->incoming_stream_compression_algorithm = algo;
831}
832
David Garcia Quintas0c331882015-10-08 14:51:54 -0700833grpc_compression_algorithm grpc_call_test_only_get_compression_algorithm(
David Garcia Quintas64824be2015-10-06 19:45:36 -0700834 grpc_call *call) {
835 grpc_compression_algorithm algorithm;
David Garcia Quintas749367f2016-05-17 19:15:24 -0700836 algorithm = call->incoming_compression_algorithm;
David Garcia Quintas64824be2015-10-06 19:45:36 -0700837 return algorithm;
David Garcia Quintas7c0d9142015-07-23 04:58:20 -0700838}
839
David Garcia Quintasa301eaa2016-05-06 16:59:03 -0700840static grpc_compression_algorithm compression_algorithm_for_level_locked(
841 grpc_call *call, grpc_compression_level level) {
David Garcia Quintasac094472016-05-18 20:25:57 -0700842 return grpc_compression_algorithm_for_level(level,
843 call->encodings_accepted_by_peer);
David Garcia Quintasa301eaa2016-05-06 16:59:03 -0700844}
845
Muxi Yan68a0fd52017-07-21 09:26:04 -0700846static grpc_stream_compression_algorithm
847stream_compression_algorithm_for_level_locked(
848 grpc_call *call, grpc_stream_compression_level level) {
849 return grpc_stream_compression_algorithm_for_level(
850 level, call->stream_encodings_accepted_by_peer);
851}
852
Craig Tiller7536af02015-12-22 13:49:30 -0800853uint32_t grpc_call_test_only_get_message_flags(grpc_call *call) {
854 uint32_t flags;
Craig Tillerc7e1a2a2015-11-02 14:17:32 -0800855 flags = call->test_only_last_message_flags;
Craig Tillerc7e1a2a2015-11-02 14:17:32 -0800856 return flags;
857}
858
/* Intentional no-op.  Passed as the destroy function / key when caching the
 * accepted-encodings bitset as mdelem user data. */
static void destroy_encodings_accepted_by_peer(void *p) {
  (void)p;
}
860
Craig Tillera59c16c2016-10-31 07:25:01 -0700861static void set_encodings_accepted_by_peer(grpc_exec_ctx *exec_ctx,
Craig Tiller0160de92016-11-18 08:46:46 -0800862 grpc_call *call, grpc_mdelem mdel) {
David Garcia Quintasb8edf7e2015-07-08 20:18:57 -0700863 size_t i;
864 grpc_compression_algorithm algorithm;
Craig Tillerd41a4a72016-10-26 16:16:06 -0700865 grpc_slice_buffer accept_encoding_parts;
866 grpc_slice accept_encoding_slice;
Craig Tiller3ff27542015-10-09 15:39:44 -0700867 void *accepted_user_data;
David Garcia Quintasb8edf7e2015-07-08 20:18:57 -0700868
Craig Tiller3ff27542015-10-09 15:39:44 -0700869 accepted_user_data =
870 grpc_mdelem_get_user_data(mdel, destroy_encodings_accepted_by_peer);
871 if (accepted_user_data != NULL) {
872 call->encodings_accepted_by_peer =
Craig Tiller7536af02015-12-22 13:49:30 -0800873 (uint32_t)(((uintptr_t)accepted_user_data) - 1);
Craig Tiller3ff27542015-10-09 15:39:44 -0700874 return;
875 }
876
Craig Tiller0160de92016-11-18 08:46:46 -0800877 accept_encoding_slice = GRPC_MDVALUE(mdel);
Craig Tillerd41a4a72016-10-26 16:16:06 -0700878 grpc_slice_buffer_init(&accept_encoding_parts);
879 grpc_slice_split(accept_encoding_slice, ",", &accept_encoding_parts);
David Garcia Quintasb8edf7e2015-07-08 20:18:57 -0700880
David Garcia Quintase091af82015-07-15 21:37:02 -0700881 /* No need to zero call->encodings_accepted_by_peer: grpc_call_create already
882 * zeroes the whole grpc_call */
David Garcia Quintasb1866bd2015-07-08 22:37:01 -0700883 /* Always support no compression */
Craig Tillera82950e2015-09-22 12:33:20 -0700884 GPR_BITSET(&call->encodings_accepted_by_peer, GRPC_COMPRESS_NONE);
885 for (i = 0; i < accept_encoding_parts.count; i++) {
Craig Tiller68208fe2016-11-14 14:35:02 -0800886 grpc_slice accept_encoding_entry_slice = accept_encoding_parts.slices[i];
887 if (grpc_compression_algorithm_parse(accept_encoding_entry_slice,
888 &algorithm)) {
Craig Tillera82950e2015-09-22 12:33:20 -0700889 GPR_BITSET(&call->encodings_accepted_by_peer, algorithm);
890 } else {
891 char *accept_encoding_entry_str =
Craig Tillerb4aa70e2016-12-09 09:40:11 -0800892 grpc_slice_to_c_string(accept_encoding_entry_slice);
Craig Tillera82950e2015-09-22 12:33:20 -0700893 gpr_log(GPR_ERROR,
894 "Invalid entry in accept encoding metadata: '%s'. Ignoring.",
895 accept_encoding_entry_str);
896 gpr_free(accept_encoding_entry_str);
David Garcia Quintasb8edf7e2015-07-08 20:18:57 -0700897 }
Craig Tillera82950e2015-09-22 12:33:20 -0700898 }
Craig Tiller3ff27542015-10-09 15:39:44 -0700899
Craig Tillera59c16c2016-10-31 07:25:01 -0700900 grpc_slice_buffer_destroy_internal(exec_ctx, &accept_encoding_parts);
Craig Tiller3ff27542015-10-09 15:39:44 -0700901
902 grpc_mdelem_set_user_data(
903 mdel, destroy_encodings_accepted_by_peer,
Craig Tiller7536af02015-12-22 13:49:30 -0800904 (void *)(((uintptr_t)call->encodings_accepted_by_peer) + 1));
David Garcia Quintasb8edf7e2015-07-08 20:18:57 -0700905}
906
Muxi Yan68a0fd52017-07-21 09:26:04 -0700907static void set_stream_encodings_accepted_by_peer(grpc_exec_ctx *exec_ctx,
908 grpc_call *call,
909 grpc_mdelem mdel) {
910 size_t i;
911 grpc_stream_compression_algorithm algorithm;
912 grpc_slice_buffer accept_encoding_parts;
913 grpc_slice accept_encoding_slice;
914 void *accepted_user_data;
915
916 accepted_user_data =
917 grpc_mdelem_get_user_data(mdel, destroy_encodings_accepted_by_peer);
918 if (accepted_user_data != NULL) {
919 call->stream_encodings_accepted_by_peer =
920 (uint32_t)(((uintptr_t)accepted_user_data) - 1);
921 return;
922 }
923
924 accept_encoding_slice = GRPC_MDVALUE(mdel);
925 grpc_slice_buffer_init(&accept_encoding_parts);
926 grpc_slice_split(accept_encoding_slice, ",", &accept_encoding_parts);
927
928 /* Always support no compression */
929 GPR_BITSET(&call->stream_encodings_accepted_by_peer,
930 GRPC_STREAM_COMPRESS_NONE);
931 for (i = 0; i < accept_encoding_parts.count; i++) {
932 grpc_slice accept_encoding_entry_slice = accept_encoding_parts.slices[i];
933 if (grpc_stream_compression_algorithm_parse(accept_encoding_entry_slice,
934 &algorithm)) {
935 GPR_BITSET(&call->stream_encodings_accepted_by_peer, algorithm);
936 } else {
937 char *accept_encoding_entry_str =
938 grpc_slice_to_c_string(accept_encoding_entry_slice);
939 gpr_log(GPR_ERROR,
940 "Invalid entry in accept encoding metadata: '%s'. Ignoring.",
941 accept_encoding_entry_str);
942 gpr_free(accept_encoding_entry_str);
943 }
944 }
945
946 grpc_slice_buffer_destroy_internal(exec_ctx, &accept_encoding_parts);
947
948 grpc_mdelem_set_user_data(
949 mdel, destroy_encodings_accepted_by_peer,
950 (void *)(((uintptr_t)call->stream_encodings_accepted_by_peer) + 1));
951}
952
Craig Tiller7536af02015-12-22 13:49:30 -0800953uint32_t grpc_call_test_only_get_encodings_accepted_by_peer(grpc_call *call) {
954 uint32_t encodings_accepted_by_peer;
David Garcia Quintas0c331882015-10-08 14:51:54 -0700955 encodings_accepted_by_peer = call->encodings_accepted_by_peer;
David Garcia Quintas0c331882015-10-08 14:51:54 -0700956 return encodings_accepted_by_peer;
Craig Tiller68752722015-01-29 14:59:54 -0800957}
958
Muxi Yan68a0fd52017-07-21 09:26:04 -0700959uint32_t grpc_call_test_only_get_stream_encodings_accepted_by_peer(
960 grpc_call *call) {
961 uint32_t stream_encodings_accepted_by_peer;
962 stream_encodings_accepted_by_peer = call->stream_encodings_accepted_by_peer;
963 return stream_encodings_accepted_by_peer;
964}
965
966grpc_stream_compression_algorithm
967grpc_call_test_only_get_incoming_stream_encodings(grpc_call *call) {
968 return call->incoming_stream_compression_algorithm;
969}
970
Noah Eisene6432bf2017-07-31 16:37:15 -0700971static grpc_linked_mdelem *linked_from_md(const grpc_metadata *md) {
Craig Tillerc7e1a2a2015-11-02 14:17:32 -0800972 return (grpc_linked_mdelem *)&md->internal_data;
Craig Tillerc12fee62015-02-03 11:55:50 -0800973}
974
David Garcia Quintasa301eaa2016-05-06 16:59:03 -0700975static grpc_metadata *get_md_elem(grpc_metadata *metadata,
976 grpc_metadata *additional_metadata, int i,
977 int count) {
978 grpc_metadata *res =
979 i < count ? &metadata[i] : &additional_metadata[i - count];
980 GPR_ASSERT(res);
981 return res;
982}
983
Craig Tillera59c16c2016-10-31 07:25:01 -0700984static int prepare_application_metadata(
985 grpc_exec_ctx *exec_ctx, grpc_call *call, int count,
986 grpc_metadata *metadata, int is_trailing, int prepend_extra_metadata,
987 grpc_metadata *additional_metadata, int additional_metadata_count) {
David Garcia Quintasa301eaa2016-05-06 16:59:03 -0700988 int total_count = count + additional_metadata_count;
Craig Tillerc7e1a2a2015-11-02 14:17:32 -0800989 int i;
990 grpc_metadata_batch *batch =
991 &call->metadata_batch[0 /* is_receiving */][is_trailing];
David Garcia Quintasa301eaa2016-05-06 16:59:03 -0700992 for (i = 0; i < total_count; i++) {
993 const grpc_metadata *md =
994 get_md_elem(metadata, additional_metadata, i, count);
Noah Eisene6432bf2017-07-31 16:37:15 -0700995 grpc_linked_mdelem *l = linked_from_md(md);
Craig Tillerb42445c2016-04-22 13:11:44 -0700996 GPR_ASSERT(sizeof(grpc_linked_mdelem) == sizeof(md->internal_data));
Craig Tillerf2b5b7e2017-01-10 08:28:59 -0800997 if (!GRPC_LOG_IF_ERROR("validate_metadata",
998 grpc_validate_header_key_is_legal(md->key))) {
Craig Tillerb42445c2016-04-22 13:11:44 -0700999 break;
Craig Tillerdf2d9222016-11-18 16:38:57 -08001000 } else if (!grpc_is_binary_header(md->key) &&
Craig Tillerf2b5b7e2017-01-10 08:28:59 -08001001 !GRPC_LOG_IF_ERROR(
1002 "validate_metadata",
1003 grpc_validate_header_nonbin_value_is_legal(md->value))) {
Craig Tillerb42445c2016-04-22 13:11:44 -07001004 break;
1005 }
Craig Tiller1282a672016-11-18 14:57:53 -08001006 l->md = grpc_mdelem_from_grpc_metadata(exec_ctx, (grpc_metadata *)md);
Craig Tillerb42445c2016-04-22 13:11:44 -07001007 }
David Garcia Quintasa301eaa2016-05-06 16:59:03 -07001008 if (i != total_count) {
Craig Tiller5ae3ffb2016-11-18 14:58:32 -08001009 for (int j = 0; j < i; j++) {
David Garcia Quintasa301eaa2016-05-06 16:59:03 -07001010 const grpc_metadata *md =
1011 get_md_elem(metadata, additional_metadata, j, count);
Noah Eisene6432bf2017-07-31 16:37:15 -07001012 grpc_linked_mdelem *l = linked_from_md(md);
Craig Tillera59c16c2016-10-31 07:25:01 -07001013 GRPC_MDELEM_UNREF(exec_ctx, l->md);
Craig Tillerb42445c2016-04-22 13:11:44 -07001014 }
1015 return 0;
1016 }
Craig Tillerc7e1a2a2015-11-02 14:17:32 -08001017 if (prepend_extra_metadata) {
1018 if (call->send_extra_metadata_count == 0) {
1019 prepend_extra_metadata = 0;
Craig Tillera82950e2015-09-22 12:33:20 -07001020 } else {
Craig Tiller09608182016-11-22 15:43:56 -08001021 for (i = 0; i < call->send_extra_metadata_count; i++) {
1022 GRPC_LOG_IF_ERROR("prepare_application_metadata",
1023 grpc_metadata_batch_link_tail(
Craig Tiller9277aa72017-01-11 14:15:38 -08001024 exec_ctx, batch, &call->send_extra_metadata[i]));
Craig Tillera82950e2015-09-22 12:33:20 -07001025 }
Craig Tiller629b0ed2015-04-22 11:14:26 -07001026 }
Craig Tillera82950e2015-09-22 12:33:20 -07001027 }
Craig Tiller09608182016-11-22 15:43:56 -08001028 for (i = 0; i < total_count; i++) {
David Garcia Quintasa301eaa2016-05-06 16:59:03 -07001029 grpc_metadata *md = get_md_elem(metadata, additional_metadata, i, count);
Noah Eisene6432bf2017-07-31 16:37:15 -07001030 grpc_linked_mdelem *l = linked_from_md(md);
1031 grpc_error *error = grpc_metadata_batch_link_tail(exec_ctx, batch, l);
1032 if (error != GRPC_ERROR_NONE) {
1033 GRPC_MDELEM_UNREF(exec_ctx, l->md);
1034 }
1035 GRPC_LOG_IF_ERROR("prepare_application_metadata", error);
Craig Tillerc7e1a2a2015-11-02 14:17:32 -08001036 }
Craig Tiller09608182016-11-22 15:43:56 -08001037 call->send_extra_metadata_count = 0;
Craig Tillerc7e1a2a2015-11-02 14:17:32 -08001038
Craig Tillerb96d0012015-05-06 15:33:23 -07001039 return 1;
1040}
1041
/* we offset status by a small amount when storing it into transport metadata
   as metadata cannot store a 0 value (which is used as OK for
   grpc_status_codes) */
#define STATUS_OFFSET 1
/* Intentional no-op.  Passed as the destroy function / key when caching a
 * decoded status as mdelem user data. */
static void destroy_status(void *ignored) {
  (void)ignored;
}
Craig Tiller566316f2015-02-02 15:25:32 -08001047
Craig Tiller0160de92016-11-18 08:46:46 -08001048static uint32_t decode_status(grpc_mdelem md) {
Craig Tiller7536af02015-12-22 13:49:30 -08001049 uint32_t status;
Craig Tillerebdef9d2015-11-19 17:09:49 -08001050 void *user_data;
Craig Tiller0160de92016-11-18 08:46:46 -08001051 if (grpc_mdelem_eq(md, GRPC_MDELEM_GRPC_STATUS_0)) return 0;
1052 if (grpc_mdelem_eq(md, GRPC_MDELEM_GRPC_STATUS_1)) return 1;
1053 if (grpc_mdelem_eq(md, GRPC_MDELEM_GRPC_STATUS_2)) return 2;
Craig Tillerebdef9d2015-11-19 17:09:49 -08001054 user_data = grpc_mdelem_get_user_data(md, destroy_status);
1055 if (user_data != NULL) {
Craig Tiller7536af02015-12-22 13:49:30 -08001056 status = ((uint32_t)(intptr_t)user_data) - STATUS_OFFSET;
Craig Tillera82950e2015-09-22 12:33:20 -07001057 } else {
Craig Tiller0160de92016-11-18 08:46:46 -08001058 if (!grpc_parse_slice_to_uint32(GRPC_MDVALUE(md), &status)) {
Craig Tillera82950e2015-09-22 12:33:20 -07001059 status = GRPC_STATUS_UNKNOWN; /* could not parse status code */
Craig Tiller566316f2015-02-02 15:25:32 -08001060 }
Craig Tillera82950e2015-09-22 12:33:20 -07001061 grpc_mdelem_set_user_data(md, destroy_status,
Craig Tiller7536af02015-12-22 13:49:30 -08001062 (void *)(intptr_t)(status + STATUS_OFFSET));
Craig Tillera82950e2015-09-22 12:33:20 -07001063 }
Craig Tiller566316f2015-02-02 15:25:32 -08001064 return status;
1065}
1066
Craig Tiller0160de92016-11-18 08:46:46 -08001067static grpc_compression_algorithm decode_compression(grpc_mdelem md) {
Craig Tillerebdef9d2015-11-19 17:09:49 -08001068 grpc_compression_algorithm algorithm =
Craig Tiller0160de92016-11-18 08:46:46 -08001069 grpc_compression_algorithm_from_slice(GRPC_MDVALUE(md));
Craig Tillerebdef9d2015-11-19 17:09:49 -08001070 if (algorithm == GRPC_COMPRESS_ALGORITHMS_COUNT) {
Craig Tillerb4aa70e2016-12-09 09:40:11 -08001071 char *md_c_str = grpc_slice_to_c_string(GRPC_MDVALUE(md));
David Garcia Quintas303d3082016-05-05 18:25:34 -07001072 gpr_log(GPR_ERROR,
1073 "Invalid incoming compression algorithm: '%s'. Interpreting "
1074 "incoming data as uncompressed.",
1075 md_c_str);
Craig Tiller68208fe2016-11-14 14:35:02 -08001076 gpr_free(md_c_str);
David Garcia Quintas303d3082016-05-05 18:25:34 -07001077 return GRPC_COMPRESS_NONE;
Craig Tillera82950e2015-09-22 12:33:20 -07001078 }
David Garcia Quintasfc0fa332015-06-25 18:11:07 -07001079 return algorithm;
David Garcia Quintasdb94b272015-06-15 18:37:01 -07001080}
1081
Muxi Yan68a0fd52017-07-21 09:26:04 -07001082static grpc_stream_compression_algorithm decode_stream_compression(
1083 grpc_mdelem md) {
1084 grpc_stream_compression_algorithm algorithm =
1085 grpc_stream_compression_algorithm_from_slice(GRPC_MDVALUE(md));
1086 if (algorithm == GRPC_STREAM_COMPRESS_ALGORITHMS_COUNT) {
1087 char *md_c_str = grpc_slice_to_c_string(GRPC_MDVALUE(md));
1088 gpr_log(GPR_ERROR,
1089 "Invalid incoming stream compression algorithm: '%s'. Interpreting "
1090 "incoming data as uncompressed.",
1091 md_c_str);
1092 gpr_free(md_c_str);
1093 return GRPC_STREAM_COMPRESS_NONE;
1094 }
1095 return algorithm;
1096}
1097
Craig Tillera7d37a32016-11-22 14:37:16 -08001098static void publish_app_metadata(grpc_call *call, grpc_metadata_batch *b,
1099 int is_trailing) {
Craig Tiller09608182016-11-22 15:43:56 -08001100 if (b->list.count == 0) return;
Craig Tillera7d37a32016-11-22 14:37:16 -08001101 GPR_TIMER_BEGIN("publish_app_metadata", 0);
Craig Tiller566316f2015-02-02 15:25:32 -08001102 grpc_metadata_array *dest;
1103 grpc_metadata *mdusr;
Craig Tillerc7e1a2a2015-11-02 14:17:32 -08001104 dest = call->buffered_metadata[is_trailing];
Craig Tillerb0f3bca2016-11-22 14:54:10 -08001105 if (dest->count + b->list.count > dest->capacity) {
1106 dest->capacity =
1107 GPR_MAX(dest->capacity + b->list.count, dest->capacity * 3 / 2);
Yash Tibrewalca3c1c02017-09-07 22:47:16 -07001108 dest->metadata = (grpc_metadata *)gpr_realloc(
1109 dest->metadata, sizeof(grpc_metadata) * dest->capacity);
Craig Tillerc7e1a2a2015-11-02 14:17:32 -08001110 }
Craig Tillera7d37a32016-11-22 14:37:16 -08001111 for (grpc_linked_mdelem *l = b->list.head; l != NULL; l = l->next) {
1112 mdusr = &dest->metadata[dest->count++];
Craig Tillercf0a2022016-11-23 11:36:21 -08001113 /* we pass back borrowed slices that are valid whilst the call is valid */
1114 mdusr->key = GRPC_MDKEY(l->md);
1115 mdusr->value = GRPC_MDVALUE(l->md);
Craig Tillera7d37a32016-11-22 14:37:16 -08001116 }
Craig Tillerc7e1a2a2015-11-02 14:17:32 -08001117 GPR_TIMER_END("publish_app_metadata", 0);
Craig Tillerc7e1a2a2015-11-02 14:17:32 -08001118}
Craig Tiller566316f2015-02-02 15:25:32 -08001119
Craig Tillera7d37a32016-11-22 14:37:16 -08001120static void recv_initial_filter(grpc_exec_ctx *exec_ctx, grpc_call *call,
1121 grpc_metadata_batch *b) {
Muxi Yan68a0fd52017-07-21 09:26:04 -07001122 if (b->idx.named.content_encoding != NULL) {
1123 if (b->idx.named.grpc_encoding != NULL) {
1124 gpr_log(GPR_ERROR,
1125 "Received both content-encoding and grpc-encoding header. "
1126 "Ignoring grpc-encoding.");
1127 grpc_metadata_batch_remove(exec_ctx, b, b->idx.named.grpc_encoding);
1128 }
1129 GPR_TIMER_BEGIN("incoming_stream_compression_algorithm", 0);
1130 set_incoming_stream_compression_algorithm(
1131 call, decode_stream_compression(b->idx.named.content_encoding->md));
1132 GPR_TIMER_END("incoming_stream_compression_algorithm", 0);
1133 grpc_metadata_batch_remove(exec_ctx, b, b->idx.named.content_encoding);
1134 } else if (b->idx.named.grpc_encoding != NULL) {
David Garcia Quintas749367f2016-05-17 19:15:24 -07001135 GPR_TIMER_BEGIN("incoming_compression_algorithm", 0);
Craig Tillera7d37a32016-11-22 14:37:16 -08001136 set_incoming_compression_algorithm(
1137 call, decode_compression(b->idx.named.grpc_encoding->md));
David Garcia Quintas749367f2016-05-17 19:15:24 -07001138 GPR_TIMER_END("incoming_compression_algorithm", 0);
Craig Tillerde7b4672016-11-23 11:13:46 -08001139 grpc_metadata_batch_remove(exec_ctx, b, b->idx.named.grpc_encoding);
Craig Tillera82950e2015-09-22 12:33:20 -07001140 }
Craig Tillera7d37a32016-11-22 14:37:16 -08001141 if (b->idx.named.grpc_accept_encoding != NULL) {
1142 GPR_TIMER_BEGIN("encodings_accepted_by_peer", 0);
1143 set_encodings_accepted_by_peer(exec_ctx, call,
1144 b->idx.named.grpc_accept_encoding->md);
Craig Tillerde7b4672016-11-23 11:13:46 -08001145 grpc_metadata_batch_remove(exec_ctx, b, b->idx.named.grpc_accept_encoding);
Craig Tillera7d37a32016-11-22 14:37:16 -08001146 GPR_TIMER_END("encodings_accepted_by_peer", 0);
1147 }
Muxi Yan68a0fd52017-07-21 09:26:04 -07001148 if (b->idx.named.accept_encoding != NULL) {
1149 GPR_TIMER_BEGIN("stream_encodings_accepted_by_peer", 0);
1150 set_stream_encodings_accepted_by_peer(exec_ctx, call,
1151 b->idx.named.accept_encoding->md);
1152 grpc_metadata_batch_remove(exec_ctx, b, b->idx.named.accept_encoding);
1153 GPR_TIMER_END("stream_encodings_accepted_by_peer", 0);
1154 }
Craig Tillera7d37a32016-11-22 14:37:16 -08001155 publish_app_metadata(call, b, false);
Craig Tillerc7e1a2a2015-11-02 14:17:32 -08001156}
Craig Tiller6902ad22015-04-16 08:01:49 -07001157
Craig Tillera7d37a32016-11-22 14:37:16 -08001158static void recv_trailing_filter(grpc_exec_ctx *exec_ctx, void *args,
1159 grpc_metadata_batch *b) {
Yash Tibrewalca3c1c02017-09-07 22:47:16 -07001160 grpc_call *call = (grpc_call *)args;
Mark D. Rothbd3b93b2017-06-22 10:53:01 -07001161 if (b->idx.named.grpc_status != NULL) {
1162 uint32_t status_code = decode_status(b->idx.named.grpc_status->md);
1163 grpc_error *error =
1164 status_code == GRPC_STATUS_OK
1165 ? GRPC_ERROR_NONE
1166 : grpc_error_set_int(GRPC_ERROR_CREATE_FROM_STATIC_STRING(
1167 "Error received from peer"),
1168 GRPC_ERROR_INT_GRPC_STATUS,
1169 (intptr_t)status_code);
1170 if (b->idx.named.grpc_message != NULL) {
1171 error = grpc_error_set_str(
1172 error, GRPC_ERROR_STR_GRPC_MESSAGE,
1173 grpc_slice_ref_internal(GRPC_MDVALUE(b->idx.named.grpc_message->md)));
1174 grpc_metadata_batch_remove(exec_ctx, b, b->idx.named.grpc_message);
1175 } else if (error != GRPC_ERROR_NONE) {
1176 error = grpc_error_set_str(error, GRPC_ERROR_STR_GRPC_MESSAGE,
1177 grpc_empty_slice());
1178 }
1179 set_status_from_error(exec_ctx, call, STATUS_FROM_WIRE, error);
1180 grpc_metadata_batch_remove(exec_ctx, b, b->idx.named.grpc_status);
1181 }
Craig Tillera7d37a32016-11-22 14:37:16 -08001182 publish_app_metadata(call, b, true);
Craig Tiller629b0ed2015-04-22 11:14:26 -07001183}
Craig Tiller8b282cb2015-04-17 14:57:44 -07001184
Craig Tillera82950e2015-09-22 12:33:20 -07001185grpc_call_stack *grpc_call_get_call_stack(grpc_call *call) {
1186 return CALL_STACK_FROM_CALL(call);
Craig Tiller566316f2015-02-02 15:25:32 -08001187}
1188
Craig Tiller255edaa2016-12-13 09:04:55 -08001189/*******************************************************************************
Craig Tillerfb189f82015-02-03 12:07:07 -08001190 * BATCH API IMPLEMENTATION
1191 */
1192
Craig Tillera82950e2015-09-22 12:33:20 -07001193static void set_status_value_directly(grpc_status_code status, void *dest) {
1194 *(grpc_status_code *)dest = status;
Craig Tillerfb189f82015-02-03 12:07:07 -08001195}
1196
Craig Tillera82950e2015-09-22 12:33:20 -07001197static void set_cancelled_value(grpc_status_code status, void *dest) {
Craig Tillerc7e1a2a2015-11-02 14:17:32 -08001198 *(int *)dest = (status != GRPC_STATUS_OK);
Craig Tiller166e2502015-02-03 20:14:41 -08001199}
Craig Tillerfb189f82015-02-03 12:07:07 -08001200
Craig Tillerc6549762016-03-09 17:10:43 -08001201static bool are_write_flags_valid(uint32_t flags) {
David Garcia Quintas1d5aca52015-06-14 14:42:04 -07001202 /* check that only bits in GRPC_WRITE_(INTERNAL?)_USED_MASK are set */
Craig Tiller7536af02015-12-22 13:49:30 -08001203 const uint32_t allowed_write_positions =
Craig Tillera82950e2015-09-22 12:33:20 -07001204 (GRPC_WRITE_USED_MASK | GRPC_WRITE_INTERNAL_USED_MASK);
Craig Tiller7536af02015-12-22 13:49:30 -08001205 const uint32_t invalid_positions = ~allowed_write_positions;
David Garcia Quintas1d5aca52015-06-14 14:42:04 -07001206 return !(flags & invalid_positions);
1207}
1208
Craig Tillerc6549762016-03-09 17:10:43 -08001209static bool are_initial_metadata_flags_valid(uint32_t flags, bool is_client) {
1210 /* check that only bits in GRPC_WRITE_(INTERNAL?)_USED_MASK are set */
1211 uint32_t invalid_positions = ~GRPC_INITIAL_METADATA_USED_MASK;
1212 if (!is_client) {
1213 invalid_positions |= GRPC_INITIAL_METADATA_IDEMPOTENT_REQUEST;
1214 }
1215 return !(flags & invalid_positions);
1216}
1217
Craig Tiller2a11ad12017-02-08 17:09:02 -08001218static int batch_slot_for_op(grpc_op_type type) {
1219 switch (type) {
1220 case GRPC_OP_SEND_INITIAL_METADATA:
1221 return 0;
1222 case GRPC_OP_SEND_MESSAGE:
1223 return 1;
1224 case GRPC_OP_SEND_CLOSE_FROM_CLIENT:
1225 case GRPC_OP_SEND_STATUS_FROM_SERVER:
1226 return 2;
1227 case GRPC_OP_RECV_INITIAL_METADATA:
1228 return 3;
1229 case GRPC_OP_RECV_MESSAGE:
1230 return 4;
1231 case GRPC_OP_RECV_CLOSE_ON_SERVER:
1232 case GRPC_OP_RECV_STATUS_ON_CLIENT:
1233 return 5;
Craig Tillerc7e1a2a2015-11-02 14:17:32 -08001234 }
Craig Tillerc869da02017-02-08 17:11:17 -08001235 GPR_UNREACHABLE_CODE(return 123456789);
Craig Tiller2a11ad12017-02-08 17:09:02 -08001236}
Craig Tiller89d33792017-02-08 16:39:16 -08001237
1238static batch_control *allocate_batch_control(grpc_call *call,
1239 const grpc_op *ops,
1240 size_t num_ops) {
1241 int slot = batch_slot_for_op(ops[0].op);
Craig Tillerb58de722017-03-29 14:15:12 -07001242 batch_control **pslot = &call->active_batches[slot];
1243 if (*pslot == NULL) {
Yash Tibrewalca3c1c02017-09-07 22:47:16 -07001244 *pslot =
1245 (batch_control *)gpr_arena_alloc(call->arena, sizeof(batch_control));
Craig Tillerb58de722017-03-29 14:15:12 -07001246 }
1247 batch_control *bctl = *pslot;
Craig Tiller5e5ef302017-02-09 08:46:49 -08001248 if (bctl->call != NULL) {
1249 return NULL;
1250 }
1251 memset(bctl, 0, sizeof(*bctl));
1252 bctl->call = call;
Craig Tiller3fddcb42017-03-10 11:01:48 -08001253 bctl->op.payload = &call->stream_op_payload;
Craig Tiller5e5ef302017-02-09 08:46:49 -08001254 return bctl;
Craig Tillerc7e1a2a2015-11-02 14:17:32 -08001255}
1256
1257static void finish_batch_completion(grpc_exec_ctx *exec_ctx, void *user_data,
1258 grpc_cq_completion *storage) {
Yash Tibrewalca3c1c02017-09-07 22:47:16 -07001259 batch_control *bctl = (batch_control *)user_data;
Craig Tillerc7e1a2a2015-11-02 14:17:32 -08001260 grpc_call *call = bctl->call;
Craig Tiller5e5ef302017-02-09 08:46:49 -08001261 bctl->call = NULL;
Craig Tillerc7e1a2a2015-11-02 14:17:32 -08001262 GRPC_CALL_INTERNAL_UNREF(exec_ctx, call, "completion");
1263}
1264
Craig Tiller94903892016-10-11 15:43:35 -07001265static grpc_error *consolidate_batch_errors(batch_control *bctl) {
Craig Tillerb597dcf2017-03-09 07:02:11 -08001266 size_t n = (size_t)gpr_atm_acq_load(&bctl->num_errors);
Craig Tiller94903892016-10-11 15:43:35 -07001267 if (n == 0) {
1268 return GRPC_ERROR_NONE;
1269 } else if (n == 1) {
Craig Tillera78da602017-01-27 08:16:23 -08001270 /* Skip creating a composite error in the case that only one error was
1271 logged */
Craig Tillerad980e32017-01-23 07:46:25 -08001272 grpc_error *e = bctl->errors[0];
1273 bctl->errors[0] = NULL;
1274 return e;
Craig Tiller94903892016-10-11 15:43:35 -07001275 } else {
ncteisen4b36a3d2017-03-13 19:08:06 -07001276 grpc_error *error = GRPC_ERROR_CREATE_REFERENCING_FROM_STATIC_STRING(
1277 "Call batch failed", bctl->errors, n);
Craig Tiller1c4775c2017-01-06 16:07:45 -08001278 for (size_t i = 0; i < n; i++) {
1279 GRPC_ERROR_UNREF(bctl->errors[i]);
Craig Tillerad980e32017-01-23 07:46:25 -08001280 bctl->errors[i] = NULL;
Craig Tiller1c4775c2017-01-06 16:07:45 -08001281 }
1282 return error;
Craig Tiller94903892016-10-11 15:43:35 -07001283 }
1284}
1285
Craig Tillerc7e1a2a2015-11-02 14:17:32 -08001286static void post_batch_completion(grpc_exec_ctx *exec_ctx,
1287 batch_control *bctl) {
Craig Tiller94903892016-10-11 15:43:35 -07001288 grpc_call *next_child_call;
Craig Tillerc7e1a2a2015-11-02 14:17:32 -08001289 grpc_call *call = bctl->call;
Craig Tiller94903892016-10-11 15:43:35 -07001290 grpc_error *error = consolidate_batch_errors(bctl);
1291
Craig Tillerea54b8c2017-03-01 16:58:28 -08001292 if (bctl->op.send_initial_metadata) {
Craig Tiller94903892016-10-11 15:43:35 -07001293 grpc_metadata_batch_destroy(
1294 exec_ctx,
1295 &call->metadata_batch[0 /* is_receiving */][0 /* is_trailing */]);
1296 }
Craig Tillerea54b8c2017-03-01 16:58:28 -08001297 if (bctl->op.send_message) {
Craig Tillerf927ad12017-01-06 15:27:31 -08001298 call->sending_message = false;
1299 }
Craig Tillerea54b8c2017-03-01 16:58:28 -08001300 if (bctl->op.send_trailing_metadata) {
Craig Tiller94903892016-10-11 15:43:35 -07001301 grpc_metadata_batch_destroy(
1302 exec_ctx,
1303 &call->metadata_batch[0 /* is_receiving */][1 /* is_trailing */]);
1304 }
Craig Tillerea54b8c2017-03-01 16:58:28 -08001305 if (bctl->op.recv_trailing_metadata) {
Craig Tiller94903892016-10-11 15:43:35 -07001306 grpc_metadata_batch *md =
1307 &call->metadata_batch[1 /* is_receiving */][1 /* is_trailing */];
1308 recv_trailing_filter(exec_ctx, call, md);
1309
Craig Tiller94903892016-10-11 15:43:35 -07001310 /* propagate cancellation to any interested children */
Craig Tillerb18c8ba2017-03-13 15:51:37 -07001311 gpr_atm_rel_store(&call->received_final_op_atm, 1);
Yash Tibrewalb2a54ac2017-09-13 10:18:07 -07001312 parent_call *pc = get_parent_call(call);
Craig Tiller1c10a7b2017-03-29 14:35:16 -07001313 if (pc != NULL) {
1314 grpc_call *child;
1315 gpr_mu_lock(&pc->child_list_mu);
1316 child = pc->first_child;
1317 if (child != NULL) {
1318 do {
Yash Tibrewalb2a54ac2017-09-13 10:18:07 -07001319 next_child_call = child->child->sibling_next;
Craig Tiller1c10a7b2017-03-29 14:35:16 -07001320 if (child->cancellation_is_inherited) {
1321 GRPC_CALL_INTERNAL_REF(child, "propagate_cancel");
1322 cancel_with_error(exec_ctx, child, STATUS_FROM_API_OVERRIDE,
1323 GRPC_ERROR_CANCELLED);
1324 GRPC_CALL_INTERNAL_UNREF(exec_ctx, child, "propagate_cancel");
1325 }
1326 child = next_child_call;
1327 } while (child != pc->first_child);
1328 }
1329 gpr_mu_unlock(&pc->child_list_mu);
Craig Tiller94903892016-10-11 15:43:35 -07001330 }
1331
1332 if (call->is_client) {
1333 get_final_status(call, set_status_value_directly,
Craig Tiller841a99d2016-12-12 16:58:57 -08001334 call->final_op.client.status,
1335 call->final_op.client.status_details);
Craig Tiller94903892016-10-11 15:43:35 -07001336 } else {
1337 get_final_status(call, set_cancelled_value,
Craig Tiller841a99d2016-12-12 16:58:57 -08001338 call->final_op.server.cancelled, NULL);
Craig Tiller94903892016-10-11 15:43:35 -07001339 }
1340
Craig Tiller02b87cd2016-09-02 09:50:08 -07001341 GRPC_ERROR_UNREF(error);
1342 error = GRPC_ERROR_NONE;
1343 }
Craig Tiller94903892016-10-11 15:43:35 -07001344
Craig Tillerea54b8c2017-03-01 16:58:28 -08001345 if (bctl->completion_data.notify_tag.is_closure) {
Craig Tillerb08fa492016-05-10 14:56:05 -07001346 /* unrefs bctl->error */
Craig Tiller2db5bda2017-02-09 10:30:55 -08001347 bctl->call = NULL;
Yash Tibrewal52778c42017-09-11 15:00:11 -07001348 GRPC_CLOSURE_RUN(
1349 exec_ctx, (grpc_closure *)bctl->completion_data.notify_tag.tag, error);
Craig Tillerc7e1a2a2015-11-02 14:17:32 -08001350 GRPC_CALL_INTERNAL_UNREF(exec_ctx, call, "completion");
1351 } else {
Craig Tillerb08fa492016-05-10 14:56:05 -07001352 /* unrefs bctl->error */
Craig Tiller9c1ec542017-03-02 08:42:54 -08001353 grpc_cq_end_op(
1354 exec_ctx, bctl->call->cq, bctl->completion_data.notify_tag.tag, error,
1355 finish_batch_completion, bctl, &bctl->completion_data.cq_completion);
Craig Tillerc7e1a2a2015-11-02 14:17:32 -08001356 }
1357}
1358
Craig Tillerd04dc4d2017-01-09 14:40:28 -08001359static void finish_batch_step(grpc_exec_ctx *exec_ctx, batch_control *bctl) {
Craig Tiller065b1392017-01-09 14:05:07 -08001360 if (gpr_unref(&bctl->steps_to_complete)) {
1361 post_batch_completion(exec_ctx, bctl);
1362 }
1363}
1364
Craig Tillerc7e1a2a2015-11-02 14:17:32 -08001365static void continue_receiving_slices(grpc_exec_ctx *exec_ctx,
1366 batch_control *bctl) {
Muxi Yan29723ee2017-04-12 20:24:42 -07001367 grpc_error *error;
Craig Tillerc7e1a2a2015-11-02 14:17:32 -08001368 grpc_call *call = bctl->call;
1369 for (;;) {
1370 size_t remaining = call->receiving_stream->length -
1371 (*call->receiving_buffer)->data.raw.slice_buffer.length;
1372 if (remaining == 0) {
1373 call->receiving_message = 0;
Craig Tiller3b66ab92015-12-09 19:42:22 -08001374 grpc_byte_stream_destroy(exec_ctx, call->receiving_stream);
Craig Tillerc7e1a2a2015-11-02 14:17:32 -08001375 call->receiving_stream = NULL;
Craig Tillerd04dc4d2017-01-09 14:40:28 -08001376 finish_batch_step(exec_ctx, bctl);
Craig Tillerc7e1a2a2015-11-02 14:17:32 -08001377 return;
1378 }
Muxi Yan29723ee2017-04-12 20:24:42 -07001379 if (grpc_byte_stream_next(exec_ctx, call->receiving_stream, remaining,
Craig Tillerc7e1a2a2015-11-02 14:17:32 -08001380 &call->receiving_slice_ready)) {
Muxi Yan29723ee2017-04-12 20:24:42 -07001381 error = grpc_byte_stream_pull(exec_ctx, call->receiving_stream,
1382 &call->receiving_slice);
1383 if (error == GRPC_ERROR_NONE) {
1384 grpc_slice_buffer_add(&(*call->receiving_buffer)->data.raw.slice_buffer,
1385 call->receiving_slice);
1386 } else {
1387 grpc_byte_stream_destroy(exec_ctx, call->receiving_stream);
1388 call->receiving_stream = NULL;
1389 grpc_byte_buffer_destroy(*call->receiving_buffer);
1390 *call->receiving_buffer = NULL;
1391 call->receiving_message = 0;
1392 finish_batch_step(exec_ctx, bctl);
1393 return;
1394 }
Craig Tillerc7e1a2a2015-11-02 14:17:32 -08001395 } else {
1396 return;
1397 }
1398 }
1399}
1400
1401static void receiving_slice_ready(grpc_exec_ctx *exec_ctx, void *bctlp,
Craig Tillerc027e772016-05-03 16:27:00 -07001402 grpc_error *error) {
Yash Tibrewalca3c1c02017-09-07 22:47:16 -07001403 batch_control *bctl = (batch_control *)bctlp;
Craig Tillerc7e1a2a2015-11-02 14:17:32 -08001404 grpc_call *call = bctl->call;
Muxi Yan29723ee2017-04-12 20:24:42 -07001405 grpc_byte_stream *bs = call->receiving_stream;
1406 bool release_error = false;
Craig Tillerc7e1a2a2015-11-02 14:17:32 -08001407
Craig Tillerc027e772016-05-03 16:27:00 -07001408 if (error == GRPC_ERROR_NONE) {
Muxi Yan29723ee2017-04-12 20:24:42 -07001409 grpc_slice slice;
1410 error = grpc_byte_stream_pull(exec_ctx, bs, &slice);
1411 if (error == GRPC_ERROR_NONE) {
1412 grpc_slice_buffer_add(&(*call->receiving_buffer)->data.raw.slice_buffer,
1413 slice);
1414 continue_receiving_slices(exec_ctx, bctl);
1415 } else {
1416 /* Error returned by grpc_byte_stream_pull needs to be released manually
1417 */
1418 release_error = true;
1419 }
1420 }
1421
1422 if (error != GRPC_ERROR_NONE) {
Craig Tiller84f75d42017-05-03 13:06:35 -07001423 if (GRPC_TRACER_ON(grpc_trace_operation_failures)) {
Craig Tillera286b042016-06-13 15:20:39 +00001424 GRPC_LOG_IF_ERROR("receiving_slice_ready", GRPC_ERROR_REF(error));
1425 }
Craig Tillere1b8c2b2015-12-16 19:27:52 -08001426 grpc_byte_stream_destroy(exec_ctx, call->receiving_stream);
Craig Tiller38edec62015-12-14 15:01:29 -08001427 call->receiving_stream = NULL;
1428 grpc_byte_buffer_destroy(*call->receiving_buffer);
1429 *call->receiving_buffer = NULL;
Muxi Yan29723ee2017-04-12 20:24:42 -07001430 call->receiving_message = 0;
Craig Tillerd04dc4d2017-01-09 14:40:28 -08001431 finish_batch_step(exec_ctx, bctl);
Muxi Yan29723ee2017-04-12 20:24:42 -07001432 if (release_error) {
1433 GRPC_ERROR_UNREF(error);
1434 }
Craig Tiller38edec62015-12-14 15:01:29 -08001435 }
Craig Tillerc7e1a2a2015-11-02 14:17:32 -08001436}
1437
Mark D. Roth274c8ed2016-10-04 09:21:42 -07001438static void process_data_after_md(grpc_exec_ctx *exec_ctx,
1439 batch_control *bctl) {
Craig Tillera44cbfc2016-02-03 16:02:49 -08001440 grpc_call *call = bctl->call;
1441 if (call->receiving_stream == NULL) {
1442 *call->receiving_buffer = NULL;
1443 call->receiving_message = 0;
Craig Tillerd04dc4d2017-01-09 14:40:28 -08001444 finish_batch_step(exec_ctx, bctl);
Craig Tillera44cbfc2016-02-03 16:02:49 -08001445 } else {
1446 call->test_only_last_message_flags = call->receiving_stream->flags;
1447 if ((call->receiving_stream->flags & GRPC_WRITE_INTERNAL_COMPRESS) &&
David Garcia Quintas749367f2016-05-17 19:15:24 -07001448 (call->incoming_compression_algorithm > GRPC_COMPRESS_NONE)) {
Craig Tillera44cbfc2016-02-03 16:02:49 -08001449 *call->receiving_buffer = grpc_raw_compressed_byte_buffer_create(
David Garcia Quintas749367f2016-05-17 19:15:24 -07001450 NULL, 0, call->incoming_compression_algorithm);
Craig Tillera44cbfc2016-02-03 16:02:49 -08001451 } else {
1452 *call->receiving_buffer = grpc_raw_byte_buffer_create(NULL, 0);
1453 }
ncteisen969b46e2017-06-08 14:57:11 -07001454 GRPC_CLOSURE_INIT(&call->receiving_slice_ready, receiving_slice_ready, bctl,
Craig Tiller91031da2016-12-28 15:44:25 -08001455 grpc_schedule_on_exec_ctx);
Craig Tillera44cbfc2016-02-03 16:02:49 -08001456 continue_receiving_slices(exec_ctx, bctl);
Craig Tillera44cbfc2016-02-03 16:02:49 -08001457 }
1458}
1459
1460static void receiving_stream_ready(grpc_exec_ctx *exec_ctx, void *bctlp,
Craig Tillerc027e772016-05-03 16:27:00 -07001461 grpc_error *error) {
Yash Tibrewalca3c1c02017-09-07 22:47:16 -07001462 batch_control *bctl = (batch_control *)bctlp;
Craig Tillera44cbfc2016-02-03 16:02:49 -08001463 grpc_call *call = bctl->call;
Mark D. Roth274c8ed2016-10-04 09:21:42 -07001464 if (error != GRPC_ERROR_NONE) {
yang-g23f777d2017-02-22 23:32:26 -08001465 if (call->receiving_stream != NULL) {
1466 grpc_byte_stream_destroy(exec_ctx, call->receiving_stream);
1467 call->receiving_stream = NULL;
1468 }
1469 add_batch_error(exec_ctx, bctl, GRPC_ERROR_REF(error), true);
Craig Tiller58b30cd2017-01-31 17:07:36 -08001470 cancel_with_error(exec_ctx, call, STATUS_FROM_SURFACE,
1471 GRPC_ERROR_REF(error));
Mark D. Roth274c8ed2016-10-04 09:21:42 -07001472 }
Yuchen Zeng6eb505b2017-08-25 16:05:29 -07001473 /* If recv_state is RECV_NONE, we will save the batch_control
Yuchen Zenge40e2592017-08-15 15:50:49 -07001474 * object with rel_cas, and will not use it after the cas. Its corresponding
1475 * acq_load is in receiving_initial_metadata_ready() */
Yuchen Zeng873bb702017-08-09 19:42:18 -07001476 if (error != GRPC_ERROR_NONE || call->receiving_stream == NULL ||
Yuchen Zeng6eb505b2017-08-25 16:05:29 -07001477 !gpr_atm_rel_cas(&call->recv_state, RECV_NONE, (gpr_atm)bctlp)) {
Yash Tibrewal090aca52017-09-12 12:14:13 -07001478 process_data_after_md(exec_ctx, bctl);
Craig Tillera44cbfc2016-02-03 16:02:49 -08001479 }
1480}
1481
Mark D. Roth764cf042017-09-01 09:00:06 -07001482// The recv_message_ready callback used when sending a batch containing
1483// a recv_message op down the filter stack. Yields the call combiner
1484// before processing the received message.
1485static void receiving_stream_ready_in_call_combiner(grpc_exec_ctx *exec_ctx,
1486 void *bctlp,
1487 grpc_error *error) {
Yash Tibrewalca3c1c02017-09-07 22:47:16 -07001488 batch_control *bctl = (batch_control *)bctlp;
Mark D. Roth764cf042017-09-01 09:00:06 -07001489 grpc_call *call = bctl->call;
1490 GRPC_CALL_COMBINER_STOP(exec_ctx, &call->call_combiner, "recv_message_ready");
1491 receiving_stream_ready(exec_ctx, bctlp, error);
1492}
1493
David Garcia Quintas3e71f772016-05-18 10:14:32 -07001494static void validate_filtered_metadata(grpc_exec_ctx *exec_ctx,
1495 batch_control *bctl) {
1496 grpc_call *call = bctl->call;
Muxi Yan68a0fd52017-07-21 09:26:04 -07001497 /* validate compression algorithms */
1498 if (call->incoming_stream_compression_algorithm !=
1499 GRPC_STREAM_COMPRESS_NONE) {
1500 const grpc_stream_compression_algorithm algo =
1501 call->incoming_stream_compression_algorithm;
1502 char *error_msg = NULL;
1503 const grpc_compression_options compression_options =
1504 grpc_channel_compression_options(call->channel);
1505 if (algo >= GRPC_STREAM_COMPRESS_ALGORITHMS_COUNT) {
1506 gpr_asprintf(&error_msg,
1507 "Invalid stream compression algorithm value '%d'.", algo);
1508 gpr_log(GPR_ERROR, "%s", error_msg);
1509 cancel_with_status(exec_ctx, call, STATUS_FROM_SURFACE,
1510 GRPC_STATUS_UNIMPLEMENTED, error_msg);
1511 } else if (grpc_compression_options_is_stream_compression_algorithm_enabled(
1512 &compression_options, algo) == 0) {
1513 /* check if algorithm is supported by current channel config */
Yash Tibrewal9eb86722017-09-17 23:43:30 -07001514 const char *algo_name = NULL;
Muxi Yan68a0fd52017-07-21 09:26:04 -07001515 grpc_stream_compression_algorithm_name(algo, &algo_name);
1516 gpr_asprintf(&error_msg, "Stream compression algorithm '%s' is disabled.",
1517 algo_name);
1518 gpr_log(GPR_ERROR, "%s", error_msg);
1519 cancel_with_status(exec_ctx, call, STATUS_FROM_SURFACE,
1520 GRPC_STATUS_UNIMPLEMENTED, error_msg);
1521 }
1522 gpr_free(error_msg);
1523
1524 GPR_ASSERT(call->stream_encodings_accepted_by_peer != 0);
1525 if (!GPR_BITGET(call->stream_encodings_accepted_by_peer,
1526 call->incoming_stream_compression_algorithm)) {
1527 if (GRPC_TRACER_ON(grpc_compression_trace)) {
Yash Tibrewal9eb86722017-09-17 23:43:30 -07001528 const char *algo_name = NULL;
Muxi Yan68a0fd52017-07-21 09:26:04 -07001529 grpc_stream_compression_algorithm_name(
1530 call->incoming_stream_compression_algorithm, &algo_name);
1531 gpr_log(
1532 GPR_ERROR,
1533 "Stream compression algorithm (content-encoding = '%s') not "
1534 "present in the bitset of accepted encodings (accept-encodings: "
1535 "'0x%x')",
1536 algo_name, call->stream_encodings_accepted_by_peer);
1537 }
1538 }
1539 } else if (call->incoming_compression_algorithm != GRPC_COMPRESS_NONE) {
David Garcia Quintasac094472016-05-18 20:25:57 -07001540 const grpc_compression_algorithm algo =
1541 call->incoming_compression_algorithm;
David Garcia Quintas3e71f772016-05-18 10:14:32 -07001542 char *error_msg = NULL;
1543 const grpc_compression_options compression_options =
David Garcia Quintasac094472016-05-18 20:25:57 -07001544 grpc_channel_compression_options(call->channel);
David Garcia Quintas3e71f772016-05-18 10:14:32 -07001545 /* check if algorithm is known */
1546 if (algo >= GRPC_COMPRESS_ALGORITHMS_COUNT) {
1547 gpr_asprintf(&error_msg, "Invalid compression algorithm value '%d'.",
1548 algo);
Yuchen Zeng64c0e8d2016-06-10 11:19:51 -07001549 gpr_log(GPR_ERROR, "%s", error_msg);
Craig Tiller2dc32ea2017-01-31 15:32:34 -08001550 cancel_with_status(exec_ctx, call, STATUS_FROM_SURFACE,
1551 GRPC_STATUS_UNIMPLEMENTED, error_msg);
David Garcia Quintas3e71f772016-05-18 10:14:32 -07001552 } else if (grpc_compression_options_is_algorithm_enabled(
1553 &compression_options, algo) == 0) {
1554 /* check if algorithm is supported by current channel config */
Yash Tibrewal9eb86722017-09-17 23:43:30 -07001555 const char *algo_name = NULL;
David Garcia Quintas3e71f772016-05-18 10:14:32 -07001556 grpc_compression_algorithm_name(algo, &algo_name);
1557 gpr_asprintf(&error_msg, "Compression algorithm '%s' is disabled.",
1558 algo_name);
Yuchen Zeng64c0e8d2016-06-10 11:19:51 -07001559 gpr_log(GPR_ERROR, "%s", error_msg);
Craig Tiller2dc32ea2017-01-31 15:32:34 -08001560 cancel_with_status(exec_ctx, call, STATUS_FROM_SURFACE,
1561 GRPC_STATUS_UNIMPLEMENTED, error_msg);
David Garcia Quintas3e71f772016-05-18 10:14:32 -07001562 } else {
David Garcia Quintasac094472016-05-18 20:25:57 -07001563 call->incoming_compression_algorithm = algo;
David Garcia Quintas3e71f772016-05-18 10:14:32 -07001564 }
1565 gpr_free(error_msg);
David Garcia Quintasf1945f22016-05-18 10:53:14 -07001566
Muxi Yan68a0fd52017-07-21 09:26:04 -07001567 GPR_ASSERT(call->encodings_accepted_by_peer != 0);
1568 if (!GPR_BITGET(call->encodings_accepted_by_peer,
1569 call->incoming_compression_algorithm)) {
1570 if (GRPC_TRACER_ON(grpc_compression_trace)) {
Yash Tibrewal9eb86722017-09-17 23:43:30 -07001571 const char *algo_name = NULL;
Muxi Yan68a0fd52017-07-21 09:26:04 -07001572 grpc_compression_algorithm_name(call->incoming_compression_algorithm,
1573 &algo_name);
1574 gpr_log(GPR_ERROR,
1575 "Compression algorithm (grpc-encoding = '%s') not present in "
1576 "the bitset of accepted encodings (grpc-accept-encodings: "
1577 "'0x%x')",
1578 algo_name, call->encodings_accepted_by_peer);
1579 }
David Garcia Quintasf1945f22016-05-18 10:53:14 -07001580 }
1581 }
David Garcia Quintas3e71f772016-05-18 10:14:32 -07001582}
1583
Craig Tiller3ba16e42016-12-08 16:46:18 -08001584static void add_batch_error(grpc_exec_ctx *exec_ctx, batch_control *bctl,
yang-g23f777d2017-02-22 23:32:26 -08001585 grpc_error *error, bool has_cancelled) {
Craig Tiller452422e2016-09-01 15:54:56 -07001586 if (error == GRPC_ERROR_NONE) return;
Craig Tillerb597dcf2017-03-09 07:02:11 -08001587 int idx = (int)gpr_atm_full_fetch_add(&bctl->num_errors, 1);
yang-g23f777d2017-02-22 23:32:26 -08001588 if (idx == 0 && !has_cancelled) {
Craig Tiller2dc32ea2017-01-31 15:32:34 -08001589 cancel_with_error(exec_ctx, bctl->call, STATUS_FROM_CORE,
1590 GRPC_ERROR_REF(error));
1591 }
Craig Tiller94903892016-10-11 15:43:35 -07001592 bctl->errors[idx] = error;
Craig Tiller452422e2016-09-01 15:54:56 -07001593}
1594
Craig Tillera44cbfc2016-02-03 16:02:49 -08001595static void receiving_initial_metadata_ready(grpc_exec_ctx *exec_ctx,
Craig Tillerc027e772016-05-03 16:27:00 -07001596 void *bctlp, grpc_error *error) {
Yash Tibrewalca3c1c02017-09-07 22:47:16 -07001597 batch_control *bctl = (batch_control *)bctlp;
Craig Tillera44cbfc2016-02-03 16:02:49 -08001598 grpc_call *call = bctl->call;
1599
Mark D. Roth764cf042017-09-01 09:00:06 -07001600 GRPC_CALL_COMBINER_STOP(exec_ctx, &call->call_combiner,
1601 "recv_initial_metadata_ready");
1602
yang-g23f777d2017-02-22 23:32:26 -08001603 add_batch_error(exec_ctx, bctl, GRPC_ERROR_REF(error), false);
Craig Tiller452422e2016-09-01 15:54:56 -07001604 if (error == GRPC_ERROR_NONE) {
Craig Tillerc48ca712016-04-04 13:42:04 -07001605 grpc_metadata_batch *md =
1606 &call->metadata_batch[1 /* is_receiving */][0 /* is_trailing */];
Craig Tillera7d37a32016-11-22 14:37:16 -08001607 recv_initial_filter(exec_ctx, call, md);
Craig Tillera44cbfc2016-02-03 16:02:49 -08001608
Craig Tillera7d37a32016-11-22 14:37:16 -08001609 /* TODO(ctiller): this could be moved into recv_initial_filter now */
David Garcia Quintas3e71f772016-05-18 10:14:32 -07001610 GPR_TIMER_BEGIN("validate_filtered_metadata", 0);
1611 validate_filtered_metadata(exec_ctx, bctl);
1612 GPR_TIMER_END("validate_filtered_metadata", 0);
David Garcia Quintas46123372016-05-09 15:28:42 -07001613
Craig Tillerc48ca712016-04-04 13:42:04 -07001614 if (gpr_time_cmp(md->deadline, gpr_inf_future(md->deadline.clock_type)) !=
1615 0 &&
1616 !call->is_client) {
Mark D. Rothf28763c2016-09-14 15:18:40 -07001617 call->send_deadline =
1618 gpr_convert_clock_type(md->deadline, GPR_CLOCK_MONOTONIC);
Craig Tillerc48ca712016-04-04 13:42:04 -07001619 }
Craig Tillera44cbfc2016-02-03 16:02:49 -08001620 }
1621
Yuchen Zeng0ce19a22017-08-15 14:24:00 -07001622 grpc_closure *saved_rsr_closure = NULL;
1623 while (true) {
Yuchen Zeng6eb505b2017-08-25 16:05:29 -07001624 gpr_atm rsr_bctlp = gpr_atm_acq_load(&call->recv_state);
Yuchen Zeng0ce19a22017-08-15 14:24:00 -07001625 /* Should only receive initial metadata once */
1626 GPR_ASSERT(rsr_bctlp != 1);
1627 if (rsr_bctlp == 0) {
Yuchen Zenge40e2592017-08-15 15:50:49 -07001628 /* We haven't seen initial metadata and messages before, thus initial
1629 * metadata is received first.
1630 * no_barrier_cas is used, as this function won't access the batch_control
1631 * object saved by receiving_stream_ready() if the initial metadata is
1632 * received first. */
Yuchen Zeng6eb505b2017-08-25 16:05:29 -07001633 if (gpr_atm_no_barrier_cas(&call->recv_state, RECV_NONE,
1634 RECV_INITIAL_METADATA_FIRST)) {
Yuchen Zeng0ce19a22017-08-15 14:24:00 -07001635 break;
1636 }
1637 } else {
1638 /* Already received messages */
1639 saved_rsr_closure = GRPC_CLOSURE_CREATE(receiving_stream_ready,
1640 (batch_control *)rsr_bctlp,
1641 grpc_schedule_on_exec_ctx);
Yuchen Zeng6eb505b2017-08-25 16:05:29 -07001642 /* No need to modify recv_state */
Yuchen Zeng0ce19a22017-08-15 14:24:00 -07001643 break;
1644 }
1645 }
1646 if (saved_rsr_closure != NULL) {
ncteisen969b46e2017-06-08 14:57:11 -07001647 GRPC_CLOSURE_RUN(exec_ctx, saved_rsr_closure, GRPC_ERROR_REF(error));
Craig Tillera44cbfc2016-02-03 16:02:49 -08001648 }
1649
Craig Tillerd04dc4d2017-01-09 14:40:28 -08001650 finish_batch_step(exec_ctx, bctl);
Craig Tillera44cbfc2016-02-03 16:02:49 -08001651}
1652
Craig Tillerc027e772016-05-03 16:27:00 -07001653static void finish_batch(grpc_exec_ctx *exec_ctx, void *bctlp,
1654 grpc_error *error) {
Yash Tibrewalca3c1c02017-09-07 22:47:16 -07001655 batch_control *bctl = (batch_control *)bctlp;
Mark D. Roth764cf042017-09-01 09:00:06 -07001656 grpc_call *call = bctl->call;
1657 GRPC_CALL_COMBINER_STOP(exec_ctx, &call->call_combiner, "on_complete");
yang-g23f777d2017-02-22 23:32:26 -08001658 add_batch_error(exec_ctx, bctl, GRPC_ERROR_REF(error), false);
Craig Tillerd04dc4d2017-01-09 14:40:28 -08001659 finish_batch_step(exec_ctx, bctl);
Craig Tillerc7e1a2a2015-11-02 14:17:32 -08001660}
1661
Craig Tiller89d33792017-02-08 16:39:16 -08001662static void free_no_op_completion(grpc_exec_ctx *exec_ctx, void *p,
1663 grpc_cq_completion *completion) {
1664 gpr_free(completion);
1665}
1666
Craig Tillerc7e1a2a2015-11-02 14:17:32 -08001667static grpc_call_error call_start_batch(grpc_exec_ctx *exec_ctx,
1668 grpc_call *call, const grpc_op *ops,
1669 size_t nops, void *notify_tag,
1670 int is_notify_tag_closure) {
Craig Tillerc7e1a2a2015-11-02 14:17:32 -08001671 size_t i;
Craig Tillerfb189f82015-02-03 12:07:07 -08001672 const grpc_op *op;
Craig Tillerc7e1a2a2015-11-02 14:17:32 -08001673 batch_control *bctl;
1674 int num_completion_callbacks_needed = 1;
1675 grpc_call_error error = GRPC_CALL_OK;
Yash Tibrewal533d1182017-09-18 10:48:22 -07001676 grpc_transport_stream_op_batch *stream_op;
1677 grpc_transport_stream_op_batch_payload *stream_op_payload;
Craig Tiller9928d392015-08-18 09:40:24 -07001678
Craig Tiller0ba432d2015-10-09 16:57:11 -07001679 GPR_TIMER_BEGIN("grpc_call_start_batch", 0);
Craig Tillerc7e1a2a2015-11-02 14:17:32 -08001680 GRPC_CALL_LOG_BATCH(GPR_INFO, call, ops, nops, notify_tag);
Masood Malekghassemi76c3d742015-08-19 18:22:53 -07001681
Craig Tillera82950e2015-09-22 12:33:20 -07001682 if (nops == 0) {
Craig Tillerc7e1a2a2015-11-02 14:17:32 -08001683 if (!is_notify_tag_closure) {
yang-g7d6b9142017-07-13 11:48:56 -07001684 GPR_ASSERT(grpc_cq_begin_op(call->cq, notify_tag));
Yash Tibrewal52778c42017-09-11 15:00:11 -07001685 grpc_cq_end_op(
1686 exec_ctx, call->cq, notify_tag, GRPC_ERROR_NONE,
1687 free_no_op_completion, NULL,
1688 (grpc_cq_completion *)gpr_malloc(sizeof(grpc_cq_completion)));
Craig Tiller2db5bda2017-02-09 10:30:55 -08001689 } else {
Yash Tibrewal52778c42017-09-11 15:00:11 -07001690 GRPC_CLOSURE_SCHED(exec_ctx, (grpc_closure *)notify_tag, GRPC_ERROR_NONE);
Craig Tillerc7e1a2a2015-11-02 14:17:32 -08001691 }
Craig Tillerea50b902015-12-15 07:05:25 -08001692 error = GRPC_CALL_OK;
1693 goto done;
Craig Tillera82950e2015-09-22 12:33:20 -07001694 }
1695
Craig Tiller89d33792017-02-08 16:39:16 -08001696 bctl = allocate_batch_control(call, ops, nops);
Craig Tiller5e5ef302017-02-09 08:46:49 -08001697 if (bctl == NULL) {
1698 return GRPC_CALL_ERROR_TOO_MANY_OPERATIONS;
1699 }
Craig Tillerea54b8c2017-03-01 16:58:28 -08001700 bctl->completion_data.notify_tag.tag = notify_tag;
Craig Tiller9c1ec542017-03-02 08:42:54 -08001701 bctl->completion_data.notify_tag.is_closure =
1702 (uint8_t)(is_notify_tag_closure != 0);
Craig Tillera82950e2015-09-22 12:33:20 -07001703
Yash Tibrewal533d1182017-09-18 10:48:22 -07001704 stream_op = &bctl->op;
1705 stream_op_payload = &call->stream_op_payload;
Craig Tillera82950e2015-09-22 12:33:20 -07001706
Craig Tillerc7e1a2a2015-11-02 14:17:32 -08001707 /* rewrite batch ops into a transport op */
1708 for (i = 0; i < nops; i++) {
1709 op = &ops[i];
Craig Tillera82950e2015-09-22 12:33:20 -07001710 if (op->reserved != NULL) {
Craig Tiller3ffd8222015-09-21 08:21:57 -07001711 error = GRPC_CALL_ERROR;
Craig Tillerc7e1a2a2015-11-02 14:17:32 -08001712 goto done_with_error;
Craig Tiller3ffd8222015-09-21 08:21:57 -07001713 }
Craig Tillera82950e2015-09-22 12:33:20 -07001714 switch (op->op) {
Yash Tibrewal533d1182017-09-18 10:48:22 -07001715 case GRPC_OP_SEND_INITIAL_METADATA: {
Craig Tillera82950e2015-09-22 12:33:20 -07001716 /* Flag validation: currently allow no flags */
Craig Tillerc6549762016-03-09 17:10:43 -08001717 if (!are_initial_metadata_flags_valid(op->flags, call->is_client)) {
Craig Tillera82950e2015-09-22 12:33:20 -07001718 error = GRPC_CALL_ERROR_INVALID_FLAGS;
Craig Tillerc7e1a2a2015-11-02 14:17:32 -08001719 goto done_with_error;
Craig Tillera82950e2015-09-22 12:33:20 -07001720 }
Craig Tillerc7e1a2a2015-11-02 14:17:32 -08001721 if (call->sent_initial_metadata) {
1722 error = GRPC_CALL_ERROR_TOO_MANY_OPERATIONS;
1723 goto done_with_error;
Craig Tillera82950e2015-09-22 12:33:20 -07001724 }
David Garcia Quintasa301eaa2016-05-06 16:59:03 -07001725 /* process compression level */
Mark D. Roth764cf042017-09-01 09:00:06 -07001726 memset(&call->compression_md, 0, sizeof(call->compression_md));
David Garcia Quintasa301eaa2016-05-06 16:59:03 -07001727 size_t additional_metadata_count = 0;
Muxi Yandf174cc2017-07-26 16:01:48 -07001728 grpc_compression_level effective_compression_level =
1729 GRPC_COMPRESS_LEVEL_NONE;
Muxi Yan68a0fd52017-07-21 09:26:04 -07001730 grpc_stream_compression_level effective_stream_compression_level =
Muxi Yandf174cc2017-07-26 16:01:48 -07001731 GRPC_STREAM_COMPRESS_LEVEL_NONE;
David Garcia Quintas749367f2016-05-17 19:15:24 -07001732 bool level_set = false;
Muxi Yan68a0fd52017-07-21 09:26:04 -07001733 bool stream_compression = false;
1734 if (op->data.send_initial_metadata.maybe_stream_compression_level
1735 .is_set) {
Muxi Yandf174cc2017-07-26 16:01:48 -07001736 effective_stream_compression_level =
Muxi Yan68a0fd52017-07-21 09:26:04 -07001737 op->data.send_initial_metadata.maybe_stream_compression_level
1738 .level;
1739 level_set = true;
1740 stream_compression = true;
1741 } else if (op->data.send_initial_metadata.maybe_compression_level
1742 .is_set) {
David Garcia Quintas749367f2016-05-17 19:15:24 -07001743 effective_compression_level =
David Garcia Quintas8ba42be2016-06-07 17:30:20 -07001744 op->data.send_initial_metadata.maybe_compression_level.level;
David Garcia Quintas749367f2016-05-17 19:15:24 -07001745 level_set = true;
1746 } else {
David Garcia Quintasac094472016-05-18 20:25:57 -07001747 const grpc_compression_options copts =
1748 grpc_channel_compression_options(call->channel);
Muxi Yan68a0fd52017-07-21 09:26:04 -07001749 if (copts.default_stream_compression_level.is_set) {
1750 level_set = true;
1751 effective_stream_compression_level =
1752 copts.default_stream_compression_level.level;
1753 stream_compression = true;
1754 } else if (copts.default_level.is_set) {
1755 level_set = true;
David Garcia Quintasac094472016-05-18 20:25:57 -07001756 effective_compression_level = copts.default_level.level;
1757 }
David Garcia Quintas749367f2016-05-17 19:15:24 -07001758 }
David Garcia Quintas3e4f49f2016-05-18 23:59:02 -07001759 if (level_set && !call->is_client) {
Muxi Yan68a0fd52017-07-21 09:26:04 -07001760 if (stream_compression) {
1761 const grpc_stream_compression_algorithm calgo =
1762 stream_compression_algorithm_for_level_locked(
1763 call, effective_stream_compression_level);
Mark D. Roth764cf042017-09-01 09:00:06 -07001764 call->compression_md.key =
Muxi Yan68a0fd52017-07-21 09:26:04 -07001765 GRPC_MDSTR_GRPC_INTERNAL_STREAM_ENCODING_REQUEST;
Mark D. Roth764cf042017-09-01 09:00:06 -07001766 call->compression_md.value =
Muxi Yan68a0fd52017-07-21 09:26:04 -07001767 grpc_stream_compression_algorithm_slice(calgo);
1768 } else {
1769 const grpc_compression_algorithm calgo =
1770 compression_algorithm_for_level_locked(
1771 call, effective_compression_level);
Muxi Yan04b58032017-08-14 11:17:56 -07001772 /* the following will be picked up by the compress filter and used
1773 * as the call's compression algorithm. */
Mark D. Roth764cf042017-09-01 09:00:06 -07001774 call->compression_md.key =
1775 GRPC_MDSTR_GRPC_INTERNAL_ENCODING_REQUEST;
1776 call->compression_md.value =
1777 grpc_compression_algorithm_slice(calgo);
Muxi Yan68a0fd52017-07-21 09:26:04 -07001778 additional_metadata_count++;
1779 }
David Garcia Quintasa301eaa2016-05-06 16:59:03 -07001780 }
1781
1782 if (op->data.send_initial_metadata.count + additional_metadata_count >
1783 INT_MAX) {
Craig Tillerc7e1a2a2015-11-02 14:17:32 -08001784 error = GRPC_CALL_ERROR_INVALID_METADATA;
1785 goto done_with_error;
1786 }
Craig Tiller9c1ec542017-03-02 08:42:54 -08001787 stream_op->send_initial_metadata = true;
Craig Tillerea54b8c2017-03-01 16:58:28 -08001788 call->sent_initial_metadata = true;
Craig Tillerc7e1a2a2015-11-02 14:17:32 -08001789 if (!prepare_application_metadata(
Craig Tillera59c16c2016-10-31 07:25:01 -07001790 exec_ctx, call, (int)op->data.send_initial_metadata.count,
David Garcia Quintasa301eaa2016-05-06 16:59:03 -07001791 op->data.send_initial_metadata.metadata, 0, call->is_client,
Mark D. Roth764cf042017-09-01 09:00:06 -07001792 &call->compression_md, (int)additional_metadata_count)) {
Craig Tillerc7e1a2a2015-11-02 14:17:32 -08001793 error = GRPC_CALL_ERROR_INVALID_METADATA;
1794 goto done_with_error;
1795 }
1796 /* TODO(ctiller): just make these the same variable? */
Craig Tiller095a2202017-05-17 09:02:44 -07001797 if (call->is_client) {
1798 call->metadata_batch[0][0].deadline = call->send_deadline;
1799 }
Craig Tiller9c1ec542017-03-02 08:42:54 -08001800 stream_op_payload->send_initial_metadata.send_initial_metadata =
Craig Tillerc7e1a2a2015-11-02 14:17:32 -08001801 &call->metadata_batch[0 /* is_receiving */][0 /* is_trailing */];
Craig Tiller9c1ec542017-03-02 08:42:54 -08001802 stream_op_payload->send_initial_metadata.send_initial_metadata_flags =
1803 op->flags;
Mark D. Roth764cf042017-09-01 09:00:06 -07001804 if (call->is_client) {
1805 stream_op_payload->send_initial_metadata.peer_string =
1806 &call->peer_string;
1807 }
Craig Tillera82950e2015-09-22 12:33:20 -07001808 break;
Yash Tibrewal533d1182017-09-18 10:48:22 -07001809 }
1810 case GRPC_OP_SEND_MESSAGE: {
Craig Tillera82950e2015-09-22 12:33:20 -07001811 if (!are_write_flags_valid(op->flags)) {
1812 error = GRPC_CALL_ERROR_INVALID_FLAGS;
Craig Tillerc7e1a2a2015-11-02 14:17:32 -08001813 goto done_with_error;
Craig Tillera82950e2015-09-22 12:33:20 -07001814 }
Mark D. Roth448c1f02017-01-25 10:44:30 -08001815 if (op->data.send_message.send_message == NULL) {
Craig Tillera82950e2015-09-22 12:33:20 -07001816 error = GRPC_CALL_ERROR_INVALID_MESSAGE;
Craig Tillerc7e1a2a2015-11-02 14:17:32 -08001817 goto done_with_error;
Craig Tillera82950e2015-09-22 12:33:20 -07001818 }
Craig Tillerc7e1a2a2015-11-02 14:17:32 -08001819 if (call->sending_message) {
1820 error = GRPC_CALL_ERROR_TOO_MANY_OPERATIONS;
1821 goto done_with_error;
Craig Tillera82950e2015-09-22 12:33:20 -07001822 }
Craig Tiller9c1ec542017-03-02 08:42:54 -08001823 stream_op->send_message = true;
Craig Tillerea54b8c2017-03-01 16:58:28 -08001824 call->sending_message = true;
Craig Tillerc7e1a2a2015-11-02 14:17:32 -08001825 grpc_slice_buffer_stream_init(
1826 &call->sending_stream,
Mark D. Roth448c1f02017-01-25 10:44:30 -08001827 &op->data.send_message.send_message->data.raw.slice_buffer,
1828 op->flags);
Lizan Zhou61f09732016-10-26 14:09:52 -07001829 /* If the outgoing buffer is already compressed, mark it as so in the
1830 flags. These will be picked up by the compression filter and further
1831 (wasteful) attempts at compression skipped. */
Mark D. Roth9d76dbe2017-01-25 15:02:56 -08001832 if (op->data.send_message.send_message->data.raw.compression >
1833 GRPC_COMPRESS_NONE) {
Lizan Zhou61f09732016-10-26 14:09:52 -07001834 call->sending_stream.base.flags |= GRPC_WRITE_INTERNAL_COMPRESS;
1835 }
Craig Tiller9c1ec542017-03-02 08:42:54 -08001836 stream_op_payload->send_message.send_message =
1837 &call->sending_stream.base;
Craig Tillera82950e2015-09-22 12:33:20 -07001838 break;
Yash Tibrewal533d1182017-09-18 10:48:22 -07001839 }
1840 case GRPC_OP_SEND_CLOSE_FROM_CLIENT: {
Craig Tillera82950e2015-09-22 12:33:20 -07001841 /* Flag validation: currently allow no flags */
1842 if (op->flags != 0) {
1843 error = GRPC_CALL_ERROR_INVALID_FLAGS;
Craig Tillerc7e1a2a2015-11-02 14:17:32 -08001844 goto done_with_error;
Craig Tillera82950e2015-09-22 12:33:20 -07001845 }
1846 if (!call->is_client) {
1847 error = GRPC_CALL_ERROR_NOT_ON_SERVER;
Craig Tillerc7e1a2a2015-11-02 14:17:32 -08001848 goto done_with_error;
Craig Tillera82950e2015-09-22 12:33:20 -07001849 }
Craig Tillerc7e1a2a2015-11-02 14:17:32 -08001850 if (call->sent_final_op) {
1851 error = GRPC_CALL_ERROR_TOO_MANY_OPERATIONS;
1852 goto done_with_error;
Craig Tillera82950e2015-09-22 12:33:20 -07001853 }
Craig Tiller9c1ec542017-03-02 08:42:54 -08001854 stream_op->send_trailing_metadata = true;
Craig Tillere198b712017-03-31 15:29:33 -07001855 call->sent_final_op = true;
Craig Tiller2d43fbf2017-03-13 16:10:05 -07001856 stream_op_payload->send_trailing_metadata.send_trailing_metadata =
Craig Tillerc7e1a2a2015-11-02 14:17:32 -08001857 &call->metadata_batch[0 /* is_receiving */][1 /* is_trailing */];
Craig Tillera82950e2015-09-22 12:33:20 -07001858 break;
Yash Tibrewal533d1182017-09-18 10:48:22 -07001859 }
1860 case GRPC_OP_SEND_STATUS_FROM_SERVER: {
Craig Tillera82950e2015-09-22 12:33:20 -07001861 /* Flag validation: currently allow no flags */
1862 if (op->flags != 0) {
1863 error = GRPC_CALL_ERROR_INVALID_FLAGS;
Craig Tillerc7e1a2a2015-11-02 14:17:32 -08001864 goto done_with_error;
Craig Tillera82950e2015-09-22 12:33:20 -07001865 }
1866 if (call->is_client) {
1867 error = GRPC_CALL_ERROR_NOT_ON_CLIENT;
Craig Tillerc7e1a2a2015-11-02 14:17:32 -08001868 goto done_with_error;
Craig Tillera82950e2015-09-22 12:33:20 -07001869 }
Craig Tillerc7e1a2a2015-11-02 14:17:32 -08001870 if (call->sent_final_op) {
1871 error = GRPC_CALL_ERROR_TOO_MANY_OPERATIONS;
1872 goto done_with_error;
Craig Tillera82950e2015-09-22 12:33:20 -07001873 }
Craig Tillerc7e1a2a2015-11-02 14:17:32 -08001874 if (op->data.send_status_from_server.trailing_metadata_count >
1875 INT_MAX) {
1876 error = GRPC_CALL_ERROR_INVALID_METADATA;
1877 goto done_with_error;
Craig Tillera82950e2015-09-22 12:33:20 -07001878 }
Craig Tiller9c1ec542017-03-02 08:42:54 -08001879 stream_op->send_trailing_metadata = true;
Craig Tillere198b712017-03-31 15:29:33 -07001880 call->sent_final_op = true;
Craig Tiller93727aa2017-02-06 13:05:39 -08001881 GPR_ASSERT(call->send_extra_metadata_count == 0);
Craig Tillerc7e1a2a2015-11-02 14:17:32 -08001882 call->send_extra_metadata_count = 1;
1883 call->send_extra_metadata[0].md = grpc_channel_get_reffed_status_elem(
Craig Tillera59c16c2016-10-31 07:25:01 -07001884 exec_ctx, call->channel, op->data.send_status_from_server.status);
Craig Tiller841a99d2016-12-12 16:58:57 -08001885 {
1886 grpc_error *override_error = GRPC_ERROR_NONE;
1887 if (op->data.send_status_from_server.status != GRPC_STATUS_OK) {
ncteisen4b36a3d2017-03-13 19:08:06 -07001888 override_error = GRPC_ERROR_CREATE_FROM_STATIC_STRING(
1889 "Error from server send status");
Craig Tiller841a99d2016-12-12 16:58:57 -08001890 }
1891 if (op->data.send_status_from_server.status_details != NULL) {
1892 call->send_extra_metadata[1].md = grpc_mdelem_from_slices(
1893 exec_ctx, GRPC_MDSTR_GRPC_MESSAGE,
1894 grpc_slice_ref_internal(
1895 *op->data.send_status_from_server.status_details));
1896 call->send_extra_metadata_count++;
1897 char *msg = grpc_slice_to_c_string(
1898 GRPC_MDVALUE(call->send_extra_metadata[1].md));
ncteisen4b36a3d2017-03-13 19:08:06 -07001899 override_error =
1900 grpc_error_set_str(override_error, GRPC_ERROR_STR_GRPC_MESSAGE,
1901 grpc_slice_from_copied_string(msg));
Craig Tiller841a99d2016-12-12 16:58:57 -08001902 gpr_free(msg);
1903 }
1904 set_status_from_error(exec_ctx, call, STATUS_FROM_API_OVERRIDE,
1905 override_error);
Craig Tiller69a1f662016-09-28 10:24:21 -07001906 }
Craig Tillerc7e1a2a2015-11-02 14:17:32 -08001907 if (!prepare_application_metadata(
Craig Tillera59c16c2016-10-31 07:25:01 -07001908 exec_ctx, call,
Craig Tillerc7e1a2a2015-11-02 14:17:32 -08001909 (int)op->data.send_status_from_server.trailing_metadata_count,
David Garcia Quintasa301eaa2016-05-06 16:59:03 -07001910 op->data.send_status_from_server.trailing_metadata, 1, 1, NULL,
1911 0)) {
Craig Tiller93727aa2017-02-06 13:05:39 -08001912 for (int n = 0; n < call->send_extra_metadata_count; n++) {
1913 GRPC_MDELEM_UNREF(exec_ctx, call->send_extra_metadata[n].md);
1914 }
1915 call->send_extra_metadata_count = 0;
Craig Tillerc7e1a2a2015-11-02 14:17:32 -08001916 error = GRPC_CALL_ERROR_INVALID_METADATA;
1917 goto done_with_error;
1918 }
Craig Tiller9c1ec542017-03-02 08:42:54 -08001919 stream_op_payload->send_trailing_metadata.send_trailing_metadata =
Craig Tillerc7e1a2a2015-11-02 14:17:32 -08001920 &call->metadata_batch[0 /* is_receiving */][1 /* is_trailing */];
Craig Tillera82950e2015-09-22 12:33:20 -07001921 break;
Yash Tibrewal533d1182017-09-18 10:48:22 -07001922 }
1923 case GRPC_OP_RECV_INITIAL_METADATA: {
Craig Tillera82950e2015-09-22 12:33:20 -07001924 /* Flag validation: currently allow no flags */
1925 if (op->flags != 0) {
1926 error = GRPC_CALL_ERROR_INVALID_FLAGS;
Craig Tillerc7e1a2a2015-11-02 14:17:32 -08001927 goto done_with_error;
Craig Tillera82950e2015-09-22 12:33:20 -07001928 }
Craig Tillerc7e1a2a2015-11-02 14:17:32 -08001929 if (call->received_initial_metadata) {
1930 error = GRPC_CALL_ERROR_TOO_MANY_OPERATIONS;
1931 goto done_with_error;
Craig Tillera82950e2015-09-22 12:33:20 -07001932 }
Craig Tillere198b712017-03-31 15:29:33 -07001933 call->received_initial_metadata = true;
Mark D. Roth448c1f02017-01-25 10:44:30 -08001934 call->buffered_metadata[0] =
1935 op->data.recv_initial_metadata.recv_initial_metadata;
ncteisen969b46e2017-06-08 14:57:11 -07001936 GRPC_CLOSURE_INIT(&call->receiving_initial_metadata_ready,
Craig Tiller91031da2016-12-28 15:44:25 -08001937 receiving_initial_metadata_ready, bctl,
1938 grpc_schedule_on_exec_ctx);
Craig Tiller9c1ec542017-03-02 08:42:54 -08001939 stream_op->recv_initial_metadata = true;
1940 stream_op_payload->recv_initial_metadata.recv_initial_metadata =
Craig Tillerc7e1a2a2015-11-02 14:17:32 -08001941 &call->metadata_batch[1 /* is_receiving */][0 /* is_trailing */];
Craig Tiller9c1ec542017-03-02 08:42:54 -08001942 stream_op_payload->recv_initial_metadata.recv_initial_metadata_ready =
Craig Tillera44cbfc2016-02-03 16:02:49 -08001943 &call->receiving_initial_metadata_ready;
Mark D. Roth764cf042017-09-01 09:00:06 -07001944 if (!call->is_client) {
1945 stream_op_payload->recv_initial_metadata.peer_string =
1946 &call->peer_string;
1947 }
Craig Tillera44cbfc2016-02-03 16:02:49 -08001948 num_completion_callbacks_needed++;
Craig Tillera82950e2015-09-22 12:33:20 -07001949 break;
Yash Tibrewal533d1182017-09-18 10:48:22 -07001950 }
1951 case GRPC_OP_RECV_MESSAGE: {
Craig Tillera82950e2015-09-22 12:33:20 -07001952 /* Flag validation: currently allow no flags */
1953 if (op->flags != 0) {
1954 error = GRPC_CALL_ERROR_INVALID_FLAGS;
Craig Tillerc7e1a2a2015-11-02 14:17:32 -08001955 goto done_with_error;
Craig Tillera82950e2015-09-22 12:33:20 -07001956 }
Craig Tillerc7e1a2a2015-11-02 14:17:32 -08001957 if (call->receiving_message) {
1958 error = GRPC_CALL_ERROR_TOO_MANY_OPERATIONS;
yang-g48f3a712015-12-07 11:23:50 -08001959 goto done_with_error;
Craig Tillera82950e2015-09-22 12:33:20 -07001960 }
Craig Tiller9c1ec542017-03-02 08:42:54 -08001961 call->receiving_message = true;
1962 stream_op->recv_message = true;
Mark D. Roth448c1f02017-01-25 10:44:30 -08001963 call->receiving_buffer = op->data.recv_message.recv_message;
Craig Tiller9c1ec542017-03-02 08:42:54 -08001964 stream_op_payload->recv_message.recv_message = &call->receiving_stream;
Mark D. Roth764cf042017-09-01 09:00:06 -07001965 GRPC_CLOSURE_INIT(&call->receiving_stream_ready,
1966 receiving_stream_ready_in_call_combiner, bctl,
1967 grpc_schedule_on_exec_ctx);
Craig Tiller9c1ec542017-03-02 08:42:54 -08001968 stream_op_payload->recv_message.recv_message_ready =
1969 &call->receiving_stream_ready;
Craig Tillerc7e1a2a2015-11-02 14:17:32 -08001970 num_completion_callbacks_needed++;
Craig Tillera82950e2015-09-22 12:33:20 -07001971 break;
Yash Tibrewal533d1182017-09-18 10:48:22 -07001972 }
1973 case GRPC_OP_RECV_STATUS_ON_CLIENT: {
Craig Tillera82950e2015-09-22 12:33:20 -07001974 /* Flag validation: currently allow no flags */
1975 if (op->flags != 0) {
1976 error = GRPC_CALL_ERROR_INVALID_FLAGS;
Craig Tillerc7e1a2a2015-11-02 14:17:32 -08001977 goto done_with_error;
Craig Tillera82950e2015-09-22 12:33:20 -07001978 }
1979 if (!call->is_client) {
1980 error = GRPC_CALL_ERROR_NOT_ON_SERVER;
Craig Tillerc7e1a2a2015-11-02 14:17:32 -08001981 goto done_with_error;
Craig Tillera82950e2015-09-22 12:33:20 -07001982 }
Craig Tiller1cbf5762016-04-22 16:02:55 -07001983 if (call->requested_final_op) {
Craig Tillerc7e1a2a2015-11-02 14:17:32 -08001984 error = GRPC_CALL_ERROR_TOO_MANY_OPERATIONS;
1985 goto done_with_error;
Craig Tillera82950e2015-09-22 12:33:20 -07001986 }
Craig Tillere198b712017-03-31 15:29:33 -07001987 call->requested_final_op = true;
Craig Tillerc7e1a2a2015-11-02 14:17:32 -08001988 call->buffered_metadata[1] =
Craig Tillera82950e2015-09-22 12:33:20 -07001989 op->data.recv_status_on_client.trailing_metadata;
Craig Tillerc7e1a2a2015-11-02 14:17:32 -08001990 call->final_op.client.status = op->data.recv_status_on_client.status;
1991 call->final_op.client.status_details =
1992 op->data.recv_status_on_client.status_details;
Craig Tiller9c1ec542017-03-02 08:42:54 -08001993 stream_op->recv_trailing_metadata = true;
1994 stream_op->collect_stats = true;
1995 stream_op_payload->recv_trailing_metadata.recv_trailing_metadata =
Craig Tillerc7e1a2a2015-11-02 14:17:32 -08001996 &call->metadata_batch[1 /* is_receiving */][1 /* is_trailing */];
Craig Tiller9c1ec542017-03-02 08:42:54 -08001997 stream_op_payload->collect_stats.collect_stats =
David Garcia Quintas5dde14c2016-07-28 17:29:27 -07001998 &call->final_info.stats.transport_stream_stats;
Craig Tillera82950e2015-09-22 12:33:20 -07001999 break;
Yash Tibrewal533d1182017-09-18 10:48:22 -07002000 }
2001 case GRPC_OP_RECV_CLOSE_ON_SERVER: {
Craig Tillera82950e2015-09-22 12:33:20 -07002002 /* Flag validation: currently allow no flags */
2003 if (op->flags != 0) {
2004 error = GRPC_CALL_ERROR_INVALID_FLAGS;
Craig Tillerc7e1a2a2015-11-02 14:17:32 -08002005 goto done_with_error;
Craig Tillera82950e2015-09-22 12:33:20 -07002006 }
Craig Tillerc7e1a2a2015-11-02 14:17:32 -08002007 if (call->is_client) {
2008 error = GRPC_CALL_ERROR_NOT_ON_CLIENT;
2009 goto done_with_error;
Craig Tillera82950e2015-09-22 12:33:20 -07002010 }
Craig Tiller1cbf5762016-04-22 16:02:55 -07002011 if (call->requested_final_op) {
Craig Tillerc7e1a2a2015-11-02 14:17:32 -08002012 error = GRPC_CALL_ERROR_TOO_MANY_OPERATIONS;
2013 goto done_with_error;
2014 }
Craig Tillere198b712017-03-31 15:29:33 -07002015 call->requested_final_op = true;
Craig Tillerc7e1a2a2015-11-02 14:17:32 -08002016 call->final_op.server.cancelled =
Craig Tillera82950e2015-09-22 12:33:20 -07002017 op->data.recv_close_on_server.cancelled;
Craig Tiller9c1ec542017-03-02 08:42:54 -08002018 stream_op->recv_trailing_metadata = true;
2019 stream_op->collect_stats = true;
2020 stream_op_payload->recv_trailing_metadata.recv_trailing_metadata =
Craig Tillerc7e1a2a2015-11-02 14:17:32 -08002021 &call->metadata_batch[1 /* is_receiving */][1 /* is_trailing */];
Craig Tiller9c1ec542017-03-02 08:42:54 -08002022 stream_op_payload->collect_stats.collect_stats =
David Garcia Quintas5dde14c2016-07-28 17:29:27 -07002023 &call->final_info.stats.transport_stream_stats;
Craig Tillera82950e2015-09-22 12:33:20 -07002024 break;
Yash Tibrewal533d1182017-09-18 10:48:22 -07002025 }
Craig Tillerfb189f82015-02-03 12:07:07 -08002026 }
Craig Tillera82950e2015-09-22 12:33:20 -07002027 }
Craig Tillerfb189f82015-02-03 12:07:07 -08002028
Craig Tillera82950e2015-09-22 12:33:20 -07002029 GRPC_CALL_INTERNAL_REF(call, "completion");
Craig Tillerc7e1a2a2015-11-02 14:17:32 -08002030 if (!is_notify_tag_closure) {
yang-g7d6b9142017-07-13 11:48:56 -07002031 GPR_ASSERT(grpc_cq_begin_op(call->cq, notify_tag));
Craig Tillerc7e1a2a2015-11-02 14:17:32 -08002032 }
2033 gpr_ref_init(&bctl->steps_to_complete, num_completion_callbacks_needed);
Craig Tillerfb189f82015-02-03 12:07:07 -08002034
ncteisen969b46e2017-06-08 14:57:11 -07002035 GRPC_CLOSURE_INIT(&bctl->finish_batch, finish_batch, bctl,
Craig Tiller91031da2016-12-28 15:44:25 -08002036 grpc_schedule_on_exec_ctx);
Craig Tiller6e7b45e2016-07-08 17:25:49 -07002037 stream_op->on_complete = &bctl->finish_batch;
Craig Tillerb597dcf2017-03-09 07:02:11 -08002038 gpr_atm_rel_store(&call->any_ops_sent_atm, 1);
Craig Tillerc7e1a2a2015-11-02 14:17:32 -08002039
Mark D. Roth764cf042017-09-01 09:00:06 -07002040 execute_batch(exec_ctx, call, stream_op, &bctl->start_batch);
Craig Tillerc7e1a2a2015-11-02 14:17:32 -08002041
Craig Tiller3ffd8222015-09-21 08:21:57 -07002042done:
Craig Tiller0ba432d2015-10-09 16:57:11 -07002043 GPR_TIMER_END("grpc_call_start_batch", 0);
Craig Tiller3ffd8222015-09-21 08:21:57 -07002044 return error;
Craig Tillerc7e1a2a2015-11-02 14:17:32 -08002045
2046done_with_error:
2047 /* reverse any mutations that occured */
Craig Tiller9c1ec542017-03-02 08:42:54 -08002048 if (stream_op->send_initial_metadata) {
Craig Tillere198b712017-03-31 15:29:33 -07002049 call->sent_initial_metadata = false;
Craig Tillera59c16c2016-10-31 07:25:01 -07002050 grpc_metadata_batch_clear(exec_ctx, &call->metadata_batch[0][0]);
Craig Tillerc7e1a2a2015-11-02 14:17:32 -08002051 }
Craig Tiller9c1ec542017-03-02 08:42:54 -08002052 if (stream_op->send_message) {
Craig Tillere198b712017-03-31 15:29:33 -07002053 call->sending_message = false;
Craig Tiller3b66ab92015-12-09 19:42:22 -08002054 grpc_byte_stream_destroy(exec_ctx, &call->sending_stream.base);
Craig Tillerc7e1a2a2015-11-02 14:17:32 -08002055 }
Craig Tiller9c1ec542017-03-02 08:42:54 -08002056 if (stream_op->send_trailing_metadata) {
Craig Tillere198b712017-03-31 15:29:33 -07002057 call->sent_final_op = false;
Craig Tillera59c16c2016-10-31 07:25:01 -07002058 grpc_metadata_batch_clear(exec_ctx, &call->metadata_batch[0][1]);
Craig Tillerc7e1a2a2015-11-02 14:17:32 -08002059 }
Craig Tiller9c1ec542017-03-02 08:42:54 -08002060 if (stream_op->recv_initial_metadata) {
Craig Tillere198b712017-03-31 15:29:33 -07002061 call->received_initial_metadata = false;
Craig Tillerc7e1a2a2015-11-02 14:17:32 -08002062 }
Craig Tiller9c1ec542017-03-02 08:42:54 -08002063 if (stream_op->recv_message) {
Craig Tillere198b712017-03-31 15:29:33 -07002064 call->receiving_message = false;
Craig Tillerc7e1a2a2015-11-02 14:17:32 -08002065 }
Craig Tiller9c1ec542017-03-02 08:42:54 -08002066 if (stream_op->recv_trailing_metadata) {
Craig Tillere198b712017-03-31 15:29:33 -07002067 call->requested_final_op = false;
Craig Tillerc7e1a2a2015-11-02 14:17:32 -08002068 }
Craig Tillerc7e1a2a2015-11-02 14:17:32 -08002069 goto done;
2070}
2071
2072grpc_call_error grpc_call_start_batch(grpc_call *call, const grpc_op *ops,
2073 size_t nops, void *tag, void *reserved) {
2074 grpc_exec_ctx exec_ctx = GRPC_EXEC_CTX_INIT;
2075 grpc_call_error err;
2076
2077 GRPC_API_TRACE(
David Garcia Quintas46123372016-05-09 15:28:42 -07002078 "grpc_call_start_batch(call=%p, ops=%p, nops=%lu, tag=%p, "
2079 "reserved=%p)",
Craig Tillerc7e1a2a2015-11-02 14:17:32 -08002080 5, (call, ops, (unsigned long)nops, tag, reserved));
2081
2082 if (reserved != NULL) {
2083 err = GRPC_CALL_ERROR;
2084 } else {
2085 err = call_start_batch(&exec_ctx, call, ops, nops, tag, 0);
2086 }
2087
2088 grpc_exec_ctx_finish(&exec_ctx);
2089 return err;
2090}
2091
2092grpc_call_error grpc_call_start_batch_and_execute(grpc_exec_ctx *exec_ctx,
2093 grpc_call *call,
2094 const grpc_op *ops,
2095 size_t nops,
2096 grpc_closure *closure) {
2097 return call_start_batch(exec_ctx, call, ops, nops, closure, 1);
Craig Tillerfb189f82015-02-03 12:07:07 -08002098}
Craig Tiller935cf422015-05-01 14:10:46 -07002099
Craig Tillera82950e2015-09-22 12:33:20 -07002100void grpc_call_context_set(grpc_call *call, grpc_context_index elem,
2101 void *value, void (*destroy)(void *value)) {
2102 if (call->context[elem].destroy) {
2103 call->context[elem].destroy(call->context[elem].value);
2104 }
Julien Boeuf83b02972015-05-20 22:50:34 -07002105 call->context[elem].value = value;
2106 call->context[elem].destroy = destroy;
Craig Tiller935cf422015-05-01 14:10:46 -07002107}
2108
Craig Tillera82950e2015-09-22 12:33:20 -07002109void *grpc_call_context_get(grpc_call *call, grpc_context_index elem) {
Julien Boeuf83b02972015-05-20 22:50:34 -07002110 return call->context[elem].value;
Craig Tiller935cf422015-05-01 14:10:46 -07002111}
Julien Boeuf9f218dd2015-04-23 10:24:02 -07002112
Craig Tiller7536af02015-12-22 13:49:30 -08002113uint8_t grpc_call_is_client(grpc_call *call) { return call->is_client; }
David Garcia Quintas13c2f6e2016-03-17 22:51:52 -07002114
2115grpc_compression_algorithm grpc_call_compression_for_level(
2116 grpc_call *call, grpc_compression_level level) {
David Garcia Quintasa301eaa2016-05-06 16:59:03 -07002117 grpc_compression_algorithm algo =
2118 compression_algorithm_for_level_locked(call, level);
David Garcia Quintasa301eaa2016-05-06 16:59:03 -07002119 return algo;
David Garcia Quintas13c2f6e2016-03-17 22:51:52 -07002120}
Yuchen Zeng2e7d9572016-04-15 17:29:57 -07002121
2122const char *grpc_call_error_to_string(grpc_call_error error) {
2123 switch (error) {
2124 case GRPC_CALL_ERROR:
2125 return "GRPC_CALL_ERROR";
2126 case GRPC_CALL_ERROR_ALREADY_ACCEPTED:
2127 return "GRPC_CALL_ERROR_ALREADY_ACCEPTED";
2128 case GRPC_CALL_ERROR_ALREADY_FINISHED:
2129 return "GRPC_CALL_ERROR_ALREADY_FINISHED";
2130 case GRPC_CALL_ERROR_ALREADY_INVOKED:
2131 return "GRPC_CALL_ERROR_ALREADY_INVOKED";
2132 case GRPC_CALL_ERROR_BATCH_TOO_BIG:
2133 return "GRPC_CALL_ERROR_BATCH_TOO_BIG";
2134 case GRPC_CALL_ERROR_INVALID_FLAGS:
2135 return "GRPC_CALL_ERROR_INVALID_FLAGS";
2136 case GRPC_CALL_ERROR_INVALID_MESSAGE:
2137 return "GRPC_CALL_ERROR_INVALID_MESSAGE";
2138 case GRPC_CALL_ERROR_INVALID_METADATA:
2139 return "GRPC_CALL_ERROR_INVALID_METADATA";
2140 case GRPC_CALL_ERROR_NOT_INVOKED:
2141 return "GRPC_CALL_ERROR_NOT_INVOKED";
2142 case GRPC_CALL_ERROR_NOT_ON_CLIENT:
2143 return "GRPC_CALL_ERROR_NOT_ON_CLIENT";
2144 case GRPC_CALL_ERROR_NOT_ON_SERVER:
2145 return "GRPC_CALL_ERROR_NOT_ON_SERVER";
2146 case GRPC_CALL_ERROR_NOT_SERVER_COMPLETION_QUEUE:
2147 return "GRPC_CALL_ERROR_NOT_SERVER_COMPLETION_QUEUE";
2148 case GRPC_CALL_ERROR_PAYLOAD_TYPE_MISMATCH:
2149 return "GRPC_CALL_ERROR_PAYLOAD_TYPE_MISMATCH";
2150 case GRPC_CALL_ERROR_TOO_MANY_OPERATIONS:
2151 return "GRPC_CALL_ERROR_TOO_MANY_OPERATIONS";
yang-g0eaf7de2017-07-05 16:50:51 -07002152 case GRPC_CALL_ERROR_COMPLETION_QUEUE_SHUTDOWN:
2153 return "GRPC_CALL_ERROR_COMPLETION_QUEUE_SHUTDOWN";
Yuchen Zeng2e7d9572016-04-15 17:29:57 -07002154 case GRPC_CALL_OK:
2155 return "GRPC_CALL_OK";
Yuchen Zeng2e7d9572016-04-15 17:29:57 -07002156 }
Yuchen Zengf02bada2016-04-19 14:12:27 -07002157 GPR_UNREACHABLE_CODE(return "GRPC_CALL_ERROR_UNKNOW");
Yuchen Zeng2e7d9572016-04-15 17:29:57 -07002158}