blob: e08273e451cb1ad9ca3745c098f669996639f888 [file] [log] [blame]
Nicolas Nobleb7ebd3b2014-11-26 16:33:03 -08001/*
2 *
Craig Tiller06059952015-02-18 08:34:56 -08003 * Copyright 2015, Google Inc.
Nicolas Nobleb7ebd3b2014-11-26 16:33:03 -08004 * All rights reserved.
5 *
6 * Redistribution and use in source and binary forms, with or without
7 * modification, are permitted provided that the following conditions are
8 * met:
9 *
10 * * Redistributions of source code must retain the above copyright
11 * notice, this list of conditions and the following disclaimer.
12 * * Redistributions in binary form must reproduce the above
13 * copyright notice, this list of conditions and the following disclaimer
14 * in the documentation and/or other materials provided with the
15 * distribution.
16 * * Neither the name of Google Inc. nor the names of its
17 * contributors may be used to endorse or promote products derived from
18 * this software without specific prior written permission.
19 *
20 * THIS SOFTWARE IS PROVIDED BY THE COPYRIGHT HOLDERS AND CONTRIBUTORS
21 * "AS IS" AND ANY EXPRESS OR IMPLIED WARRANTIES, INCLUDING, BUT NOT
22 * LIMITED TO, THE IMPLIED WARRANTIES OF MERCHANTABILITY AND FITNESS FOR
23 * A PARTICULAR PURPOSE ARE DISCLAIMED. IN NO EVENT SHALL THE COPYRIGHT
24 * OWNER OR CONTRIBUTORS BE LIABLE FOR ANY DIRECT, INDIRECT, INCIDENTAL,
25 * SPECIAL, EXEMPLARY, OR CONSEQUENTIAL DAMAGES (INCLUDING, BUT NOT
26 * LIMITED TO, PROCUREMENT OF SUBSTITUTE GOODS OR SERVICES; LOSS OF USE,
27 * DATA, OR PROFITS; OR BUSINESS INTERRUPTION) HOWEVER CAUSED AND ON ANY
28 * THEORY OF LIABILITY, WHETHER IN CONTRACT, STRICT LIABILITY, OR TORT
29 * (INCLUDING NEGLIGENCE OR OTHERWISE) ARISING IN ANY WAY OUT OF THE USE
30 * OF THIS SOFTWARE, EVEN IF ADVISED OF THE POSSIBILITY OF SUCH DAMAGE.
31 *
32 */
David Garcia Quintasf74a49e2015-06-18 17:22:45 -070033#include <assert.h>
34#include <stdio.h>
35#include <stdlib.h>
36#include <string.h>
37
38#include <grpc/compression.h>
39#include <grpc/support/alloc.h>
40#include <grpc/support/log.h>
41#include <grpc/support/string_util.h>
Nicolas Nobleb7ebd3b2014-11-26 16:33:03 -080042
Alistair Veitch9686dab2015-05-26 14:26:47 -070043#include "src/core/census/grpc_context.h"
Nicolas Nobleb7ebd3b2014-11-26 16:33:03 -080044#include "src/core/channel/channel_stack.h"
ctiller18b49ab2014-12-09 14:39:16 -080045#include "src/core/iomgr/alarm.h"
Craig Tiller50968492015-04-28 17:05:49 -070046#include "src/core/profiling/timers.h"
Craig Tiller485d7762015-01-23 12:54:05 -080047#include "src/core/support/string.h"
Craig Tiller1e0d4c42015-01-30 16:17:29 -080048#include "src/core/surface/byte_buffer_queue.h"
David Garcia Quintasf74a49e2015-06-18 17:22:45 -070049#include "src/core/surface/call.h"
ctiller18b49ab2014-12-09 14:39:16 -080050#include "src/core/surface/channel.h"
51#include "src/core/surface/completion_queue.h"
Nicolas Nobleb7ebd3b2014-11-26 16:33:03 -080052
Craig Tiller1b011672015-07-10 10:41:44 -070053/** The maximum number of completions possible.
54 Based upon the maximum number of individually queueable ops in the batch
55 api:
56 - initial metadata send
57 - message send
58 - status/close send (depending on client/server)
59 - initial metadata recv
60 - message recv
61 - status/close recv (depending on client/server) */
62#define MAX_CONCURRENT_COMPLETIONS 6
63
Craig Tiller8eb9d472015-01-27 17:00:03 -080064typedef enum { REQ_INITIAL = 0, REQ_READY, REQ_DONE } req_state;
65
66typedef enum {
67 SEND_NOTHING,
68 SEND_INITIAL_METADATA,
Craig Tiller7bd9b992015-02-04 08:38:02 -080069 SEND_BUFFERED_INITIAL_METADATA,
Craig Tiller8eb9d472015-01-27 17:00:03 -080070 SEND_MESSAGE,
Craig Tiller7bd9b992015-02-04 08:38:02 -080071 SEND_BUFFERED_MESSAGE,
Craig Tiller1c141902015-01-31 08:51:54 -080072 SEND_TRAILING_METADATA_AND_FINISH,
Craig Tiller8eb9d472015-01-27 17:00:03 -080073 SEND_FINISH
74} send_action;
75
76typedef struct {
77 grpc_ioreq_completion_func on_complete;
78 void *user_data;
Craig Tiller64be9f72015-05-04 14:53:51 -070079 int success;
Craig Tiller8eb9d472015-01-27 17:00:03 -080080} completed_request;
81
Craig Tillerc12fee62015-02-03 11:55:50 -080082/* See request_set in grpc_call below for a description */
Craig Tiller6902ad22015-04-16 08:01:49 -070083#define REQSET_EMPTY 'X'
84#define REQSET_DONE 'Y'
85
86#define MAX_SEND_INITIAL_METADATA_COUNT 3
Craig Tiller1e0d4c42015-01-30 16:17:29 -080087
Craig Tillerc18c56e2015-02-02 15:59:13 -080088typedef struct {
Craig Tillerdaceea82015-02-02 16:15:53 -080089 /* Overall status of the operation: starts OK, may degrade to
90 non-OK */
Craig Tillercae5bf52015-07-02 08:46:59 -070091 gpr_uint8 success;
Craig Tillerd642dcf2015-02-03 20:39:09 -080092 /* a bit mask of which request ops are needed (1u << opid) */
Craig Tiller58ce3f02015-04-22 07:54:24 -070093 gpr_uint16 need_mask;
Craig Tillerdaceea82015-02-02 16:15:53 -080094 /* a bit mask of which request ops are now completed */
Craig Tiller58ce3f02015-04-22 07:54:24 -070095 gpr_uint16 complete_mask;
Craig Tillercae5bf52015-07-02 08:46:59 -070096 /* Completion function to call at the end of the operation */
97 grpc_ioreq_completion_func on_complete;
98 void *user_data;
Craig Tillerc18c56e2015-02-02 15:59:13 -080099} reqinfo_master;
Nicolas Nobleb7ebd3b2014-11-26 16:33:03 -0800100
Craig Tillerdaceea82015-02-02 16:15:53 -0800101/* Status data for a request can come from several sources; this
102 enumerates them all, and acts as a priority sorting for which
103 status to return to the application - earlier entries override
104 later ones */
Craig Tiller68752722015-01-29 14:59:54 -0800105typedef enum {
Craig Tillerdaceea82015-02-02 16:15:53 -0800106 /* Status came from the application layer overriding whatever
107 the wire says */
Craig Tiller68752722015-01-29 14:59:54 -0800108 STATUS_FROM_API_OVERRIDE = 0,
Craig Tiller8b282cb2015-04-17 14:57:44 -0700109 /* Status was created by some internal channel stack operation */
110 STATUS_FROM_CORE,
Craig Tillerdaceea82015-02-02 16:15:53 -0800111 /* Status came from 'the wire' - or somewhere below the surface
112 layer */
Craig Tiller68752722015-01-29 14:59:54 -0800113 STATUS_FROM_WIRE,
Craig Tilleraea081f2015-06-11 14:19:33 -0700114 /* Status came from the server sending status */
115 STATUS_FROM_SERVER_STATUS,
Craig Tiller68752722015-01-29 14:59:54 -0800116 STATUS_SOURCE_COUNT
117} status_source;
118
119typedef struct {
Craig Tillerdaceea82015-02-02 16:15:53 -0800120 gpr_uint8 is_set;
Craig Tiller68752722015-01-29 14:59:54 -0800121 grpc_status_code code;
122 grpc_mdstr *details;
123} received_status;
124
Craig Tillerdaceea82015-02-02 16:15:53 -0800125/* How far through the GRPC stream have we read? */
126typedef enum {
127 /* We are still waiting for initial metadata to complete */
Craig Tillerc12fee62015-02-03 11:55:50 -0800128 READ_STATE_INITIAL = 0,
Craig Tillerdaceea82015-02-02 16:15:53 -0800129 /* We have gotten initial metadata, and are reading either
130 messages or trailing metadata */
131 READ_STATE_GOT_INITIAL_METADATA,
132 /* The stream is closed for reading */
133 READ_STATE_READ_CLOSED,
134 /* The stream is closed for reading & writing */
135 READ_STATE_STREAM_CLOSED
136} read_state;
137
Craig Tillerc12fee62015-02-03 11:55:50 -0800138typedef enum {
139 WRITE_STATE_INITIAL = 0,
140 WRITE_STATE_STARTED,
141 WRITE_STATE_WRITE_CLOSED
142} write_state;
143
Nicolas Nobleb7ebd3b2014-11-26 16:33:03 -0800144struct grpc_call {
145 grpc_completion_queue *cq;
146 grpc_channel *channel;
147 grpc_mdctx *metadata_context;
Craig Tillercce17ac2015-01-20 09:29:28 -0800148 /* TODO(ctiller): share with cq if possible? */
149 gpr_mu mu;
Craig Tiller396fab22015-07-09 14:14:09 -0700150 gpr_mu completion_mu;
Nicolas Nobleb7ebd3b2014-11-26 16:33:03 -0800151
Craig Tillere5d683c2015-02-03 16:37:36 -0800152 /* how far through the stream have we read? */
Craig Tillerdaceea82015-02-02 16:15:53 -0800153 read_state read_state;
Craig Tillere5d683c2015-02-03 16:37:36 -0800154 /* how far through the stream have we written? */
Craig Tillerc12fee62015-02-03 11:55:50 -0800155 write_state write_state;
Craig Tillere5d683c2015-02-03 16:37:36 -0800156 /* client or server call */
157 gpr_uint8 is_client;
158 /* is the alarm set */
Nicolas Nobleb7ebd3b2014-11-26 16:33:03 -0800159 gpr_uint8 have_alarm;
Craig Tillere5d683c2015-02-03 16:37:36 -0800160 /* are we currently performing a send operation */
Craig Tiller8eb9d472015-01-27 17:00:03 -0800161 gpr_uint8 sending;
Craig Tiller629b0ed2015-04-22 11:14:26 -0700162 /* are we currently performing a recv operation */
163 gpr_uint8 receiving;
Craig Tiller991ca9f2015-03-03 09:59:22 -0800164 /* are we currently completing requests */
165 gpr_uint8 completing;
Craig Tillerf3fba742015-06-11 09:36:33 -0700166 /** has grpc_call_destroy been called */
167 gpr_uint8 destroy_called;
Craig Tillere5d683c2015-02-03 16:37:36 -0800168 /* pairs with completed_requests */
Craig Tiller8eb9d472015-01-27 17:00:03 -0800169 gpr_uint8 num_completed_requests;
Craig Tiller629b0ed2015-04-22 11:14:26 -0700170 /* are we currently reading a message? */
171 gpr_uint8 reading_message;
Craig Tillerbac41422015-05-29 16:32:28 -0700172 /* have we bound a pollset yet? */
173 gpr_uint8 bound_pollset;
yang-g0b6ad7d2015-06-25 14:39:01 -0700174 /* is an error status set */
175 gpr_uint8 error_status_set;
Craig Tiller77f04612015-07-01 13:39:45 -0700176 /** should the alarm be cancelled */
177 gpr_uint8 cancel_alarm;
Craig Tiller97fc6a32015-07-08 15:31:35 -0700178 /** bitmask of allocated completion events in completions */
179 gpr_uint8 allocated_completions;
yang-g0b6ad7d2015-06-25 14:39:01 -0700180
Craig Tiller83f88d92015-04-21 16:02:05 -0700181 /* flags with bits corresponding to write states allowing us to determine
182 what was sent */
Craig Tiller58ce3f02015-04-22 07:54:24 -0700183 gpr_uint16 last_send_contains;
Craig Tiller5dde66e2015-06-02 09:05:23 -0700184 /* cancel with this status on the next outgoing transport op */
185 grpc_status_code cancel_with_status;
Nicolas Nobleb7ebd3b2014-11-26 16:33:03 -0800186
Craig Tillere5d683c2015-02-03 16:37:36 -0800187 /* Active ioreqs.
188 request_set and request_data contain one element per active ioreq
189 operation.
Craig Tillerebf94bf2015-02-05 08:48:46 -0800190
Craig Tillere5d683c2015-02-03 16:37:36 -0800191 request_set[op] is an integer specifying a set of operations to which
192 the request belongs:
Craig Tillerebf94bf2015-02-05 08:48:46 -0800193 - if it is < GRPC_IOREQ_OP_COUNT, then this operation is pending
Craig Tillere5d683c2015-02-03 16:37:36 -0800194 completion, and the integer represents to which group of operations
195 the ioreq belongs. Each group is represented by one master, and the
196 integer in request_set is an index into masters to find the master
197 data.
198 - if it is REQSET_EMPTY, the ioreq op is inactive and available to be
199 started
200 - finally, if request_set[op] is REQSET_DONE, then the operation is
201 complete and unavailable to be started again
Craig Tillerebf94bf2015-02-05 08:48:46 -0800202
Craig Tillere5d683c2015-02-03 16:37:36 -0800203 request_data[op] is the request data as supplied by the initiator of
204 a request, and is valid iff request_set[op] <= GRPC_IOREQ_OP_COUNT.
205 The set fields are as per the request type specified by op.
206
Craig Tillerd6731622015-02-03 22:44:13 -0800207 Finally, one element of masters is set per active _set_ of ioreq
Craig Tillere5d683c2015-02-03 16:37:36 -0800208 operations. It describes work left outstanding, result status, and
209 what work to perform upon operation completion. As one ioreq of each
210 op type can be active at once, by convention we choose the first element
Craig Tillerd6731622015-02-03 22:44:13 -0800211 of the group to be the master -- ie the master of in-progress operation
212 op is masters[request_set[op]]. This allows constant time allocation
Craig Tillere5d683c2015-02-03 16:37:36 -0800213 and a strong upper bound of a count of masters to be calculated. */
Craig Tillerc12fee62015-02-03 11:55:50 -0800214 gpr_uint8 request_set[GRPC_IOREQ_OP_COUNT];
215 grpc_ioreq_data request_data[GRPC_IOREQ_OP_COUNT];
David Garcia Quintas1d5aca52015-06-14 14:42:04 -0700216 gpr_uint32 request_flags[GRPC_IOREQ_OP_COUNT];
Craig Tillerc18c56e2015-02-02 15:59:13 -0800217 reqinfo_master masters[GRPC_IOREQ_OP_COUNT];
Craig Tillere5d683c2015-02-03 16:37:36 -0800218
219 /* Dynamic array of ioreq's that have completed: the count of
220 elements is queued in num_completed_requests.
221 This list is built up under lock(), and flushed entirely during
222 unlock().
223 We know the upper bound of the number of elements as we can only
224 have one ioreq of each type active at once. */
Craig Tiller8eb9d472015-01-27 17:00:03 -0800225 completed_request completed_requests[GRPC_IOREQ_OP_COUNT];
Craig Tillere5d683c2015-02-03 16:37:36 -0800226 /* Incoming buffer of messages */
Craig Tiller1e0d4c42015-01-30 16:17:29 -0800227 grpc_byte_buffer_queue incoming_queue;
Craig Tillere5d683c2015-02-03 16:37:36 -0800228 /* Buffered read metadata waiting to be returned to the application.
229 Element 0 is initial metadata, element 1 is trailing metadata. */
Craig Tillerc12fee62015-02-03 11:55:50 -0800230 grpc_metadata_array buffered_metadata[2];
Craig Tillere5d683c2015-02-03 16:37:36 -0800231 /* All metadata received - unreffed at once at the end of the call */
Craig Tiller3a4749f2015-01-30 07:51:45 -0800232 grpc_mdelem **owned_metadata;
233 size_t owned_metadata_count;
234 size_t owned_metadata_capacity;
Nicolas Nobleb7ebd3b2014-11-26 16:33:03 -0800235
Craig Tillere5d683c2015-02-03 16:37:36 -0800236 /* Received call statuses from various sources */
Craig Tiller68752722015-01-29 14:59:54 -0800237 received_status status[STATUS_SOURCE_COUNT];
Nicolas Nobleb7ebd3b2014-11-26 16:33:03 -0800238
David Garcia Quintas8a187092015-07-01 14:52:44 -0700239 /* Compression algorithm for the call */
David Garcia Quintasd16af0e2015-06-22 22:39:21 -0700240 grpc_compression_algorithm compression_algorithm;
David Garcia Quintasdb94b272015-06-15 18:37:01 -0700241
Julien Boeufc6f8d0a2015-05-11 22:40:02 -0700242 /* Contexts for various subsystems (security, tracing, ...). */
Julien Boeuf83b02972015-05-20 22:50:34 -0700243 grpc_call_context_element context[GRPC_CONTEXT_COUNT];
Craig Tiller935cf422015-05-01 14:10:46 -0700244
Craig Tillere5d683c2015-02-03 16:37:36 -0800245 /* Deadline alarm - if have_alarm is non-zero */
Craig Tillercce17ac2015-01-20 09:29:28 -0800246 grpc_alarm alarm;
247
Craig Tillere5d683c2015-02-03 16:37:36 -0800248 /* Call refcount - to keep the call alive during asynchronous operations */
Nicolas Nobleb7ebd3b2014-11-26 16:33:03 -0800249 gpr_refcount internal_refcount;
Craig Tillercce17ac2015-01-20 09:29:28 -0800250
Craig Tiller6902ad22015-04-16 08:01:49 -0700251 grpc_linked_mdelem send_initial_metadata[MAX_SEND_INITIAL_METADATA_COUNT];
252 grpc_linked_mdelem status_link;
253 grpc_linked_mdelem details_link;
254 size_t send_initial_metadata_count;
255 gpr_timespec send_deadline;
256
Craig Tiller83f88d92015-04-21 16:02:05 -0700257 grpc_stream_op_buffer send_ops;
258 grpc_stream_op_buffer recv_ops;
259 grpc_stream_state recv_state;
260
Craig Tiller629b0ed2015-04-22 11:14:26 -0700261 gpr_slice_buffer incoming_message;
262 gpr_uint32 incoming_message_length;
David Garcia Quintas1d5aca52015-06-14 14:42:04 -0700263 gpr_uint32 incoming_message_flags;
David Garcia Quintas284488b2015-05-28 16:27:39 -0700264 grpc_iomgr_closure destroy_closure;
Craig Tiller1e6facb2015-06-11 22:47:11 -0700265 grpc_iomgr_closure on_done_recv;
266 grpc_iomgr_closure on_done_send;
267 grpc_iomgr_closure on_done_bind;
Craig Tiller97fc6a32015-07-08 15:31:35 -0700268
269 /** completion events - for completion queue use */
Craig Tiller1b011672015-07-10 10:41:44 -0700270 grpc_cq_completion completions[MAX_CONCURRENT_COMPLETIONS];
Nicolas Nobleb7ebd3b2014-11-26 16:33:03 -0800271};
272
Craig Tiller87d5b192015-04-16 14:37:57 -0700273#define CALL_STACK_FROM_CALL(call) ((grpc_call_stack *)((call) + 1))
Nicolas Nobleb7ebd3b2014-11-26 16:33:03 -0800274#define CALL_FROM_CALL_STACK(call_stack) (((grpc_call *)(call_stack)) - 1)
275#define CALL_ELEM_FROM_CALL(call, idx) \
276 grpc_call_stack_element(CALL_STACK_FROM_CALL(call), idx)
277#define CALL_FROM_TOP_ELEM(top_elem) \
278 CALL_FROM_CALL_STACK(grpc_call_stack_from_top_element(top_elem))
279
Craig Tiller6902ad22015-04-16 08:01:49 -0700280static void set_deadline_alarm(grpc_call *call, gpr_timespec deadline);
Craig Tiller83f88d92015-04-21 16:02:05 -0700281static void call_on_done_recv(void *call, int success);
282static void call_on_done_send(void *call, int success);
Craig Tillerb7959a02015-06-25 08:50:54 -0700283static int fill_send_ops(grpc_call *call, grpc_transport_stream_op *op);
284static void execute_op(grpc_call *call, grpc_transport_stream_op *op);
Craig Tiller629b0ed2015-04-22 11:14:26 -0700285static void recv_metadata(grpc_call *call, grpc_metadata_batch *metadata);
286static void finish_read_ops(grpc_call *call);
Craig Tillerf9fae982015-05-29 23:02:18 -0700287static grpc_call_error cancel_with_status(grpc_call *c, grpc_status_code status,
Craig Tiller5dde66e2015-06-02 09:05:23 -0700288 const char *description);
Craig Tiller1e6facb2015-06-11 22:47:11 -0700289static void finished_loose_op(void *call, int success);
Nicolas Nobleb7ebd3b2014-11-26 16:33:03 -0800290
Craig Tillerbac41422015-05-29 16:32:28 -0700291static void lock(grpc_call *call);
292static void unlock(grpc_call *call);
293
Craig Tillerfb189f82015-02-03 12:07:07 -0800294grpc_call *grpc_call_create(grpc_channel *channel, grpc_completion_queue *cq,
Craig Tiller87d5b192015-04-16 14:37:57 -0700295 const void *server_transport_data,
296 grpc_mdelem **add_initial_metadata,
297 size_t add_initial_metadata_count,
298 gpr_timespec send_deadline) {
Craig Tiller1e0d4c42015-01-30 16:17:29 -0800299 size_t i;
Craig Tillerb7959a02015-06-25 08:50:54 -0700300 grpc_transport_stream_op initial_op;
301 grpc_transport_stream_op *initial_op_ptr = NULL;
Nicolas Nobleb7ebd3b2014-11-26 16:33:03 -0800302 grpc_channel_stack *channel_stack = grpc_channel_get_channel_stack(channel);
303 grpc_call *call =
304 gpr_malloc(sizeof(grpc_call) + channel_stack->call_stack_size);
Craig Tillercce17ac2015-01-20 09:29:28 -0800305 memset(call, 0, sizeof(grpc_call));
306 gpr_mu_init(&call->mu);
Craig Tiller396fab22015-07-09 14:14:09 -0700307 gpr_mu_init(&call->completion_mu);
Nicolas Nobleb7ebd3b2014-11-26 16:33:03 -0800308 call->channel = channel;
Craig Tillerfb189f82015-02-03 12:07:07 -0800309 call->cq = cq;
Craig Tiller09f2bdc2015-06-03 00:50:56 -0700310 if (cq) {
Craig Tiller7ae5a382015-06-03 07:58:58 -0700311 GRPC_CQ_INTERNAL_REF(cq, "bind");
Craig Tiller09f2bdc2015-06-03 00:50:56 -0700312 }
Craig Tillercce17ac2015-01-20 09:29:28 -0800313 call->is_client = server_transport_data == NULL;
Craig Tiller1e0d4c42015-01-30 16:17:29 -0800314 for (i = 0; i < GRPC_IOREQ_OP_COUNT; i++) {
Craig Tillerc12fee62015-02-03 11:55:50 -0800315 call->request_set[i] = REQSET_EMPTY;
Craig Tiller1e0d4c42015-01-30 16:17:29 -0800316 }
Craig Tiller23aa6c42015-01-27 17:16:12 -0800317 if (call->is_client) {
Craig Tillerc12fee62015-02-03 11:55:50 -0800318 call->request_set[GRPC_IOREQ_SEND_TRAILING_METADATA] = REQSET_DONE;
319 call->request_set[GRPC_IOREQ_SEND_STATUS] = REQSET_DONE;
Craig Tiller23aa6c42015-01-27 17:16:12 -0800320 }
Craig Tiller6902ad22015-04-16 08:01:49 -0700321 GPR_ASSERT(add_initial_metadata_count < MAX_SEND_INITIAL_METADATA_COUNT);
322 for (i = 0; i < add_initial_metadata_count; i++) {
323 call->send_initial_metadata[i].md = add_initial_metadata[i];
324 }
325 call->send_initial_metadata_count = add_initial_metadata_count;
326 call->send_deadline = send_deadline;
Craig Tiller9ec2a522015-05-29 22:46:54 -0700327 GRPC_CHANNEL_INTERNAL_REF(channel, "call");
Nicolas Nobleb7ebd3b2014-11-26 16:33:03 -0800328 call->metadata_context = grpc_channel_get_metadata_context(channel);
Craig Tillerfbf5be22015-04-22 16:17:09 -0700329 grpc_sopb_init(&call->send_ops);
330 grpc_sopb_init(&call->recv_ops);
Craig Tillerfa4f9942015-04-23 15:22:09 -0700331 gpr_slice_buffer_init(&call->incoming_message);
Craig Tiller1e6facb2015-06-11 22:47:11 -0700332 grpc_iomgr_closure_init(&call->on_done_recv, call_on_done_recv, call);
333 grpc_iomgr_closure_init(&call->on_done_send, call_on_done_send, call);
334 grpc_iomgr_closure_init(&call->on_done_bind, finished_loose_op, call);
Craig Tillerf3fba742015-06-11 09:36:33 -0700335 /* dropped in destroy and when READ_STATE_STREAM_CLOSED received */
336 gpr_ref_init(&call->internal_refcount, 2);
Craig Tiller7e8489a2015-04-23 12:41:16 -0700337 /* server hack: start reads immediately so we can get initial metadata.
338 TODO(ctiller): figure out a cleaner solution */
339 if (!call->is_client) {
340 memset(&initial_op, 0, sizeof(initial_op));
341 initial_op.recv_ops = &call->recv_ops;
342 initial_op.recv_state = &call->recv_state;
Craig Tiller1e6facb2015-06-11 22:47:11 -0700343 initial_op.on_done_recv = &call->on_done_recv;
Craig Tiller935cf422015-05-01 14:10:46 -0700344 initial_op.context = call->context;
Craig Tiller7e8489a2015-04-23 12:41:16 -0700345 call->receiving = 1;
Craig Tiller4df412b2015-04-28 07:57:54 -0700346 GRPC_CALL_INTERNAL_REF(call, "receiving");
Craig Tiller7e8489a2015-04-23 12:41:16 -0700347 initial_op_ptr = &initial_op;
348 }
349 grpc_call_stack_init(channel_stack, server_transport_data, initial_op_ptr,
Nicolas Nobleb7ebd3b2014-11-26 16:33:03 -0800350 CALL_STACK_FROM_CALL(call));
Craig Tiller6a7626c2015-07-19 22:21:41 -0700351 if (gpr_time_cmp(send_deadline, gpr_inf_future(send_deadline.clock_type)) != 0) {
Craig Tiller6902ad22015-04-16 08:01:49 -0700352 set_deadline_alarm(call, send_deadline);
353 }
Nicolas Nobleb7ebd3b2014-11-26 16:33:03 -0800354 return call;
355}
356
Craig Tiller166e2502015-02-03 20:14:41 -0800357void grpc_call_set_completion_queue(grpc_call *call,
358 grpc_completion_queue *cq) {
Craig Tillerbac41422015-05-29 16:32:28 -0700359 lock(call);
Craig Tiller166e2502015-02-03 20:14:41 -0800360 call->cq = cq;
Craig Tiller09f2bdc2015-06-03 00:50:56 -0700361 if (cq) {
Craig Tiller7ae5a382015-06-03 07:58:58 -0700362 GRPC_CQ_INTERNAL_REF(cq, "bind");
Craig Tiller09f2bdc2015-06-03 00:50:56 -0700363 }
Craig Tillerbac41422015-05-29 16:32:28 -0700364 unlock(call);
Craig Tiller166e2502015-02-03 20:14:41 -0800365}
366
Craig Tiller24be0f72015-02-10 14:04:22 -0800367grpc_completion_queue *grpc_call_get_completion_queue(grpc_call *call) {
368 return call->cq;
369}
370
Craig Tillerb4580d42015-07-10 10:42:34 -0700371static grpc_cq_completion *allocate_completion(grpc_call *call) {
Craig Tiller97fc6a32015-07-08 15:31:35 -0700372 gpr_uint8 i;
Craig Tiller396fab22015-07-09 14:14:09 -0700373 gpr_mu_lock(&call->completion_mu);
Craig Tiller97fc6a32015-07-08 15:31:35 -0700374 for (i = 0; i < GPR_ARRAY_SIZE(call->completions); i++) {
375 if (call->allocated_completions & (1u << i)) {
376 continue;
377 }
378 call->allocated_completions |= 1u << i;
Craig Tiller396fab22015-07-09 14:14:09 -0700379 gpr_mu_unlock(&call->completion_mu);
Craig Tiller97fc6a32015-07-08 15:31:35 -0700380 return &call->completions[i];
381 }
382 gpr_log(GPR_ERROR, "should never reach here");
383 abort();
384}
385
Craig Tillerb4580d42015-07-10 10:42:34 -0700386static void done_completion(void *call, grpc_cq_completion *completion) {
Craig Tiller97fc6a32015-07-08 15:31:35 -0700387 grpc_call *c = call;
Craig Tiller396fab22015-07-09 14:14:09 -0700388 gpr_mu_lock(&c->completion_mu);
Craig Tiller97fc6a32015-07-08 15:31:35 -0700389 c->allocated_completions &= ~(1u << (completion - c->completions));
Craig Tiller396fab22015-07-09 14:14:09 -0700390 gpr_mu_unlock(&c->completion_mu);
Craig Tiller97fc6a32015-07-08 15:31:35 -0700391 GRPC_CALL_INTERNAL_UNREF(c, "completion", 1);
392}
393
Craig Tiller4df412b2015-04-28 07:57:54 -0700394#ifdef GRPC_CALL_REF_COUNT_DEBUG
395void grpc_call_internal_ref(grpc_call *c, const char *reason) {
396 gpr_log(GPR_DEBUG, "CALL: ref %p %d -> %d [%s]", c,
397 c->internal_refcount.count, c->internal_refcount.count + 1, reason);
398#else
399void grpc_call_internal_ref(grpc_call *c) {
400#endif
401 gpr_ref(&c->internal_refcount);
402}
Nicolas Nobleb7ebd3b2014-11-26 16:33:03 -0800403
Craig Tilleraef25da2015-01-29 17:19:45 -0800404static void destroy_call(void *call, int ignored_success) {
Craig Tiller566316f2015-02-02 15:25:32 -0800405 size_t i;
Craig Tilleraef25da2015-01-29 17:19:45 -0800406 grpc_call *c = call;
Craig Tillera4541102015-01-29 11:46:11 -0800407 grpc_call_stack_destroy(CALL_STACK_FROM_CALL(c));
Craig Tiller9ec2a522015-05-29 22:46:54 -0700408 GRPC_CHANNEL_INTERNAL_UNREF(c->channel, "call");
Craig Tillera4541102015-01-29 11:46:11 -0800409 gpr_mu_destroy(&c->mu);
Craig Tiller396fab22015-07-09 14:14:09 -0700410 gpr_mu_destroy(&c->completion_mu);
Craig Tiller68752722015-01-29 14:59:54 -0800411 for (i = 0; i < STATUS_SOURCE_COUNT; i++) {
412 if (c->status[i].details) {
Craig Tiller1a65a232015-07-06 10:22:32 -0700413 GRPC_MDSTR_UNREF(c->status[i].details);
Craig Tiller68752722015-01-29 14:59:54 -0800414 }
Craig Tillera4541102015-01-29 11:46:11 -0800415 }
Craig Tiller3a4749f2015-01-30 07:51:45 -0800416 for (i = 0; i < c->owned_metadata_count; i++) {
Craig Tiller1a65a232015-07-06 10:22:32 -0700417 GRPC_MDELEM_UNREF(c->owned_metadata[i]);
Craig Tiller3a4749f2015-01-30 07:51:45 -0800418 }
419 gpr_free(c->owned_metadata);
Craig Tillerc12fee62015-02-03 11:55:50 -0800420 for (i = 0; i < GPR_ARRAY_SIZE(c->buffered_metadata); i++) {
421 gpr_free(c->buffered_metadata[i].metadata);
422 }
Craig Tillereb40a532015-04-17 16:46:20 -0700423 for (i = 0; i < c->send_initial_metadata_count; i++) {
Craig Tiller1a65a232015-07-06 10:22:32 -0700424 GRPC_MDELEM_UNREF(c->send_initial_metadata[i].md);
Craig Tillereb40a532015-04-17 16:46:20 -0700425 }
Craig Tiller935cf422015-05-01 14:10:46 -0700426 for (i = 0; i < GRPC_CONTEXT_COUNT; i++) {
Julien Boeuf83b02972015-05-20 22:50:34 -0700427 if (c->context[i].destroy) {
428 c->context[i].destroy(c->context[i].value);
Craig Tiller935cf422015-05-01 14:10:46 -0700429 }
430 }
Craig Tillerb56ca8d2015-04-24 17:16:22 -0700431 grpc_sopb_destroy(&c->send_ops);
432 grpc_sopb_destroy(&c->recv_ops);
Craig Tiller37bbead2015-02-05 08:43:49 -0800433 grpc_bbq_destroy(&c->incoming_queue);
Craig Tillerfa4f9942015-04-23 15:22:09 -0700434 gpr_slice_buffer_destroy(&c->incoming_message);
Craig Tiller09f2bdc2015-06-03 00:50:56 -0700435 if (c->cq) {
Craig Tiller7ae5a382015-06-03 07:58:58 -0700436 GRPC_CQ_INTERNAL_UNREF(c->cq, "bind");
Craig Tiller09f2bdc2015-06-03 00:50:56 -0700437 }
Craig Tillera4541102015-01-29 11:46:11 -0800438 gpr_free(c);
439}
440
Craig Tiller4df412b2015-04-28 07:57:54 -0700441#ifdef GRPC_CALL_REF_COUNT_DEBUG
442void grpc_call_internal_unref(grpc_call *c, const char *reason,
443 int allow_immediate_deletion) {
444 gpr_log(GPR_DEBUG, "CALL: unref %p %d -> %d [%s]", c,
445 c->internal_refcount.count, c->internal_refcount.count - 1, reason);
446#else
Craig Tiller64bc3fd2015-04-24 17:07:12 -0700447void grpc_call_internal_unref(grpc_call *c, int allow_immediate_deletion) {
Craig Tiller4df412b2015-04-28 07:57:54 -0700448#endif
Nicolas Nobleb7ebd3b2014-11-26 16:33:03 -0800449 if (gpr_unref(&c->internal_refcount)) {
Craig Tilleraef25da2015-01-29 17:19:45 -0800450 if (allow_immediate_deletion) {
451 destroy_call(c, 1);
452 } else {
David Garcia Quintas284488b2015-05-28 16:27:39 -0700453 c->destroy_closure.cb = destroy_call;
454 c->destroy_closure.cb_arg = c;
455 grpc_iomgr_add_callback(&c->destroy_closure);
Craig Tilleraef25da2015-01-29 17:19:45 -0800456 }
Nicolas Nobleb7ebd3b2014-11-26 16:33:03 -0800457 }
458}
459
Craig Tiller928fbc82015-01-29 15:06:42 -0800460static void set_status_code(grpc_call *call, status_source source,
461 gpr_uint32 status) {
Craig Tillerb8d3a312015-06-19 17:27:53 -0700462 if (call->status[source].is_set) return;
463
Craig Tillerdaceea82015-02-02 16:15:53 -0800464 call->status[source].is_set = 1;
Craig Tiller68752722015-01-29 14:59:54 -0800465 call->status[source].code = status;
yang-g0b6ad7d2015-06-25 14:39:01 -0700466 call->error_status_set = status != GRPC_STATUS_OK;
Craig Tiller30547562015-02-05 17:04:51 -0800467
Craig Tillerd1abc812015-05-06 14:35:19 -0700468 if (status != GRPC_STATUS_OK && !grpc_bbq_empty(&call->incoming_queue)) {
Craig Tiller30547562015-02-05 17:04:51 -0800469 grpc_bbq_flush(&call->incoming_queue);
470 }
Craig Tiller68752722015-01-29 14:59:54 -0800471}
472
David Garcia Quintasd16af0e2015-06-22 22:39:21 -0700473static void set_compression_algorithm(grpc_call *call,
474 grpc_compression_algorithm algo) {
475 call->compression_algorithm = algo;
David Garcia Quintasdb94b272015-06-15 18:37:01 -0700476}
477
David Garcia Quintasd317e752015-07-15 00:09:27 -0700478grpc_compression_algorithm grpc_call_get_compression_algorithm(
479 const grpc_call *call) {
480 return call->compression_algorithm;
Craig Tiller68752722015-01-29 14:59:54 -0800481}
482
Craig Tiller928fbc82015-01-29 15:06:42 -0800483static void set_status_details(grpc_call *call, status_source source,
484 grpc_mdstr *status) {
Craig Tiller68752722015-01-29 14:59:54 -0800485 if (call->status[source].details != NULL) {
Craig Tiller1a65a232015-07-06 10:22:32 -0700486 GRPC_MDSTR_UNREF(call->status[source].details);
Craig Tiller68752722015-01-29 14:59:54 -0800487 }
488 call->status[source].details = status;
489}
490
Craig Tillerc12fee62015-02-03 11:55:50 -0800491static int is_op_live(grpc_call *call, grpc_ioreq_op op) {
492 gpr_uint8 set = call->request_set[op];
493 reqinfo_master *master;
494 if (set >= GRPC_IOREQ_OP_COUNT) return 0;
495 master = &call->masters[set];
Craig Tillerd642dcf2015-02-03 20:39:09 -0800496 return (master->complete_mask & (1u << op)) == 0;
Craig Tillerc12fee62015-02-03 11:55:50 -0800497}
498
Craig Tiller8eb9d472015-01-27 17:00:03 -0800499static void lock(grpc_call *call) { gpr_mu_lock(&call->mu); }
Craig Tillercce17ac2015-01-20 09:29:28 -0800500
Craig Tiller629b0ed2015-04-22 11:14:26 -0700501static int need_more_data(grpc_call *call) {
Craig Tillerd4919d02015-05-20 16:08:18 -0700502 if (call->read_state == READ_STATE_STREAM_CLOSED) return 0;
Craig Tiller5dde66e2015-06-02 09:05:23 -0700503 /* TODO(ctiller): this needs some serious cleanup */
Craig Tiller629b0ed2015-04-22 11:14:26 -0700504 return is_op_live(call, GRPC_IOREQ_RECV_INITIAL_METADATA) ||
Craig Tillerf9fae982015-05-29 23:02:18 -0700505 (is_op_live(call, GRPC_IOREQ_RECV_MESSAGE) &&
506 grpc_bbq_empty(&call->incoming_queue)) ||
Craig Tiller629b0ed2015-04-22 11:14:26 -0700507 is_op_live(call, GRPC_IOREQ_RECV_TRAILING_METADATA) ||
508 is_op_live(call, GRPC_IOREQ_RECV_STATUS) ||
509 is_op_live(call, GRPC_IOREQ_RECV_STATUS_DETAILS) ||
Craig Tiller4df412b2015-04-28 07:57:54 -0700510 (is_op_live(call, GRPC_IOREQ_RECV_CLOSE) &&
511 grpc_bbq_empty(&call->incoming_queue)) ||
Craig Tiller5dde66e2015-06-02 09:05:23 -0700512 (call->write_state == WRITE_STATE_INITIAL && !call->is_client) ||
Craig Tillerf1bff012015-07-06 11:20:50 -0700513 (call->cancel_with_status != GRPC_STATUS_OK) || call->destroy_called;
Craig Tiller629b0ed2015-04-22 11:14:26 -0700514}
515
Craig Tiller8eb9d472015-01-27 17:00:03 -0800516static void unlock(grpc_call *call) {
Craig Tillerb7959a02015-06-25 08:50:54 -0700517 grpc_transport_stream_op op;
Craig Tiller8eb9d472015-01-27 17:00:03 -0800518 completed_request completed_requests[GRPC_IOREQ_OP_COUNT];
Craig Tiller991ca9f2015-03-03 09:59:22 -0800519 int completing_requests = 0;
Craig Tiller83f88d92015-04-21 16:02:05 -0700520 int start_op = 0;
Craig Tiller8eb9d472015-01-27 17:00:03 -0800521 int i;
Craig Tiller4efb6962015-06-03 09:32:41 -0700522 const gpr_uint32 MAX_RECV_PEEK_AHEAD = 65536;
523 size_t buffered_bytes;
Craig Tiller77f04612015-07-01 13:39:45 -0700524 int cancel_alarm = 0;
Craig Tiller8eb9d472015-01-27 17:00:03 -0800525
Craig Tiller83f88d92015-04-21 16:02:05 -0700526 memset(&op, 0, sizeof(op));
527
Craig Tiller5dde66e2015-06-02 09:05:23 -0700528 op.cancel_with_status = call->cancel_with_status;
529 start_op = op.cancel_with_status != GRPC_STATUS_OK;
530 call->cancel_with_status = GRPC_STATUS_OK; /* reset */
Craig Tillerbac41422015-05-29 16:32:28 -0700531
Craig Tiller77f04612015-07-01 13:39:45 -0700532 cancel_alarm = call->cancel_alarm;
533 call->cancel_alarm = 0;
534
Craig Tiller1a727fd2015-04-24 13:21:22 -0700535 if (!call->receiving && need_more_data(call)) {
Craig Tiller83f88d92015-04-21 16:02:05 -0700536 op.recv_ops = &call->recv_ops;
537 op.recv_state = &call->recv_state;
Craig Tiller1e6facb2015-06-11 22:47:11 -0700538 op.on_done_recv = &call->on_done_recv;
Craig Tiller4efb6962015-06-03 09:32:41 -0700539 if (grpc_bbq_empty(&call->incoming_queue) && call->reading_message) {
540 op.max_recv_bytes = call->incoming_message_length -
541 call->incoming_message.length + MAX_RECV_PEEK_AHEAD;
542 } else {
543 buffered_bytes = grpc_bbq_bytes(&call->incoming_queue);
544 if (buffered_bytes > MAX_RECV_PEEK_AHEAD) {
545 op.max_recv_bytes = 0;
546 } else {
547 op.max_recv_bytes = MAX_RECV_PEEK_AHEAD - buffered_bytes;
548 }
549 }
Craig Tiller629b0ed2015-04-22 11:14:26 -0700550 call->receiving = 1;
Craig Tiller4df412b2015-04-28 07:57:54 -0700551 GRPC_CALL_INTERNAL_REF(call, "receiving");
Craig Tiller83f88d92015-04-21 16:02:05 -0700552 start_op = 1;
553 }
554
555 if (!call->sending) {
556 if (fill_send_ops(call, &op)) {
557 call->sending = 1;
Craig Tiller4df412b2015-04-28 07:57:54 -0700558 GRPC_CALL_INTERNAL_REF(call, "sending");
Craig Tiller83f88d92015-04-21 16:02:05 -0700559 start_op = 1;
560 }
Craig Tillercffbcb72015-01-29 23:16:33 -0800561 }
Craig Tiller2e103572015-01-29 14:12:07 -0800562
Craig Tiller5dde66e2015-06-02 09:05:23 -0700563 if (!call->bound_pollset && call->cq && (!call->is_client || start_op)) {
564 call->bound_pollset = 1;
565 op.bind_pollset = grpc_cq_pollset(call->cq);
566 start_op = 1;
567 }
568
Craig Tiller991ca9f2015-03-03 09:59:22 -0800569 if (!call->completing && call->num_completed_requests != 0) {
570 completing_requests = call->num_completed_requests;
Craig Tiller8eb9d472015-01-27 17:00:03 -0800571 memcpy(completed_requests, call->completed_requests,
572 sizeof(completed_requests));
573 call->num_completed_requests = 0;
Craig Tiller991ca9f2015-03-03 09:59:22 -0800574 call->completing = 1;
Craig Tiller4df412b2015-04-28 07:57:54 -0700575 GRPC_CALL_INTERNAL_REF(call, "completing");
Craig Tiller8eb9d472015-01-27 17:00:03 -0800576 }
577
Craig Tiller8eb9d472015-01-27 17:00:03 -0800578 gpr_mu_unlock(&call->mu);
579
Craig Tiller77f04612015-07-01 13:39:45 -0700580 if (cancel_alarm) {
581 grpc_alarm_cancel(&call->alarm);
582 }
583
Craig Tiller83f88d92015-04-21 16:02:05 -0700584 if (start_op) {
585 execute_op(call, &op);
Craig Tiller8eb9d472015-01-27 17:00:03 -0800586 }
587
Craig Tiller991ca9f2015-03-03 09:59:22 -0800588 if (completing_requests > 0) {
589 for (i = 0; i < completing_requests; i++) {
Craig Tiller64be9f72015-05-04 14:53:51 -0700590 completed_requests[i].on_complete(call, completed_requests[i].success,
Craig Tiller991ca9f2015-03-03 09:59:22 -0800591 completed_requests[i].user_data);
592 }
593 lock(call);
594 call->completing = 0;
595 unlock(call);
Craig Tiller4df412b2015-04-28 07:57:54 -0700596 GRPC_CALL_INTERNAL_UNREF(call, "completing", 0);
Craig Tiller8eb9d472015-01-27 17:00:03 -0800597 }
598}
Craig Tillercce17ac2015-01-20 09:29:28 -0800599
Craig Tillerfb189f82015-02-03 12:07:07 -0800600static void get_final_status(grpc_call *call, grpc_ioreq_data out) {
Craig Tiller68752722015-01-29 14:59:54 -0800601 int i;
602 for (i = 0; i < STATUS_SOURCE_COUNT; i++) {
Craig Tillerdaceea82015-02-02 16:15:53 -0800603 if (call->status[i].is_set) {
Craig Tillerfb189f82015-02-03 12:07:07 -0800604 out.recv_status.set_value(call->status[i].code,
605 out.recv_status.user_data);
606 return;
607 }
608 }
Craig Tillerde343162015-02-09 23:37:22 -0800609 if (call->is_client) {
610 out.recv_status.set_value(GRPC_STATUS_UNKNOWN, out.recv_status.user_data);
611 } else {
612 out.recv_status.set_value(GRPC_STATUS_OK, out.recv_status.user_data);
613 }
Craig Tillerfb189f82015-02-03 12:07:07 -0800614}
615
616static void get_final_details(grpc_call *call, grpc_ioreq_data out) {
617 int i;
618 for (i = 0; i < STATUS_SOURCE_COUNT; i++) {
619 if (call->status[i].is_set) {
Craig Tiller1e0d4c42015-01-30 16:17:29 -0800620 if (call->status[i].details) {
621 gpr_slice details = call->status[i].details->slice;
622 size_t len = GPR_SLICE_LENGTH(details);
Craig Tillerfb189f82015-02-03 12:07:07 -0800623 if (len + 1 > *out.recv_status_details.details_capacity) {
624 *out.recv_status_details.details_capacity = GPR_MAX(
625 len + 1, *out.recv_status_details.details_capacity * 3 / 2);
626 *out.recv_status_details.details =
627 gpr_realloc(*out.recv_status_details.details,
628 *out.recv_status_details.details_capacity);
Craig Tiller1e0d4c42015-01-30 16:17:29 -0800629 }
Craig Tillerfb189f82015-02-03 12:07:07 -0800630 memcpy(*out.recv_status_details.details, GPR_SLICE_START_PTR(details),
631 len);
632 (*out.recv_status_details.details)[len] = 0;
Craig Tiller1e0d4c42015-01-30 16:17:29 -0800633 } else {
634 goto no_details;
635 }
Craig Tiller68752722015-01-29 14:59:54 -0800636 return;
637 }
638 }
Craig Tiller1e0d4c42015-01-30 16:17:29 -0800639
640no_details:
Craig Tillerfb189f82015-02-03 12:07:07 -0800641 if (0 == *out.recv_status_details.details_capacity) {
642 *out.recv_status_details.details_capacity = 8;
643 *out.recv_status_details.details =
644 gpr_malloc(*out.recv_status_details.details_capacity);
Craig Tiller1e0d4c42015-01-30 16:17:29 -0800645 }
Craig Tillerfb189f82015-02-03 12:07:07 -0800646 **out.recv_status_details.details = 0;
Craig Tiller68752722015-01-29 14:59:54 -0800647}
648
Craig Tillerc12fee62015-02-03 11:55:50 -0800649static void finish_live_ioreq_op(grpc_call *call, grpc_ioreq_op op,
Craig Tiller64be9f72015-05-04 14:53:51 -0700650 int success) {
Craig Tillerc12fee62015-02-03 11:55:50 -0800651 completed_request *cr;
652 gpr_uint8 master_set = call->request_set[op];
653 reqinfo_master *master;
654 size_t i;
655 /* ioreq is live: we need to do something */
656 master = &call->masters[master_set];
Craig Tillerd642dcf2015-02-03 20:39:09 -0800657 master->complete_mask |= 1u << op;
Craig Tiller64be9f72015-05-04 14:53:51 -0700658 if (!success) {
659 master->success = 0;
Craig Tillerc12fee62015-02-03 11:55:50 -0800660 }
661 if (master->complete_mask == master->need_mask) {
662 for (i = 0; i < GRPC_IOREQ_OP_COUNT; i++) {
663 if (call->request_set[i] != master_set) {
664 continue;
665 }
666 call->request_set[i] = REQSET_DONE;
667 switch ((grpc_ioreq_op)i) {
668 case GRPC_IOREQ_RECV_MESSAGE:
669 case GRPC_IOREQ_SEND_MESSAGE:
Craig Tillerfd7166d2015-05-19 10:23:03 -0700670 call->request_set[i] = REQSET_EMPTY;
671 if (!master->success) {
Craig Tillerc12fee62015-02-03 11:55:50 -0800672 call->write_state = WRITE_STATE_WRITE_CLOSED;
673 }
674 break;
Craig Tilleraea081f2015-06-11 14:19:33 -0700675 case GRPC_IOREQ_SEND_STATUS:
676 if (call->request_data[GRPC_IOREQ_SEND_STATUS].send_status.details !=
677 NULL) {
Craig Tiller1a65a232015-07-06 10:22:32 -0700678 GRPC_MDSTR_UNREF(
Craig Tilleraea081f2015-06-11 14:19:33 -0700679 call->request_data[GRPC_IOREQ_SEND_STATUS].send_status.details);
680 call->request_data[GRPC_IOREQ_SEND_STATUS].send_status.details =
681 NULL;
682 }
683 break;
Craig Tillerc12fee62015-02-03 11:55:50 -0800684 case GRPC_IOREQ_RECV_CLOSE:
685 case GRPC_IOREQ_SEND_INITIAL_METADATA:
686 case GRPC_IOREQ_SEND_TRAILING_METADATA:
Craig Tillerc12fee62015-02-03 11:55:50 -0800687 case GRPC_IOREQ_SEND_CLOSE:
688 break;
689 case GRPC_IOREQ_RECV_STATUS:
Craig Tillerfb189f82015-02-03 12:07:07 -0800690 get_final_status(call, call->request_data[GRPC_IOREQ_RECV_STATUS]);
691 break;
692 case GRPC_IOREQ_RECV_STATUS_DETAILS:
693 get_final_details(call,
694 call->request_data[GRPC_IOREQ_RECV_STATUS_DETAILS]);
Craig Tillerc12fee62015-02-03 11:55:50 -0800695 break;
696 case GRPC_IOREQ_RECV_INITIAL_METADATA:
Craig Tillerbae41c82015-04-28 13:22:25 -0700697 GPR_SWAP(grpc_metadata_array, call->buffered_metadata[0],
Craig Tillerf9fae982015-05-29 23:02:18 -0700698 *call->request_data[GRPC_IOREQ_RECV_INITIAL_METADATA]
699 .recv_metadata);
Craig Tillerc12fee62015-02-03 11:55:50 -0800700 break;
701 case GRPC_IOREQ_RECV_TRAILING_METADATA:
Craig Tillerbae41c82015-04-28 13:22:25 -0700702 GPR_SWAP(grpc_metadata_array, call->buffered_metadata[1],
Craig Tillerf9fae982015-05-29 23:02:18 -0700703 *call->request_data[GRPC_IOREQ_RECV_TRAILING_METADATA]
704 .recv_metadata);
Craig Tillerc12fee62015-02-03 11:55:50 -0800705 break;
706 case GRPC_IOREQ_OP_COUNT:
707 abort();
708 break;
709 }
710 }
711 cr = &call->completed_requests[call->num_completed_requests++];
Craig Tiller64be9f72015-05-04 14:53:51 -0700712 cr->success = master->success;
Craig Tillerc12fee62015-02-03 11:55:50 -0800713 cr->on_complete = master->on_complete;
714 cr->user_data = master->user_data;
715 }
716}
717
Craig Tiller64be9f72015-05-04 14:53:51 -0700718static void finish_ioreq_op(grpc_call *call, grpc_ioreq_op op, int success) {
Craig Tillerc12fee62015-02-03 11:55:50 -0800719 if (is_op_live(call, op)) {
Craig Tiller64be9f72015-05-04 14:53:51 -0700720 finish_live_ioreq_op(call, op, success);
Craig Tillercce17ac2015-01-20 09:29:28 -0800721 }
722}
723
Craig Tiller27e14f42015-05-21 14:37:39 -0700724static void early_out_write_ops(grpc_call *call) {
725 switch (call->write_state) {
726 case WRITE_STATE_WRITE_CLOSED:
727 finish_ioreq_op(call, GRPC_IOREQ_SEND_MESSAGE, 0);
728 finish_ioreq_op(call, GRPC_IOREQ_SEND_STATUS, 0);
729 finish_ioreq_op(call, GRPC_IOREQ_SEND_TRAILING_METADATA, 0);
730 finish_ioreq_op(call, GRPC_IOREQ_SEND_CLOSE, 1);
731 /* fallthrough */
732 case WRITE_STATE_STARTED:
733 finish_ioreq_op(call, GRPC_IOREQ_SEND_INITIAL_METADATA, 0);
734 /* fallthrough */
735 case WRITE_STATE_INITIAL:
736 /* do nothing */
737 break;
738 }
739}
740
Craig Tiller58ce3f02015-04-22 07:54:24 -0700741static void call_on_done_send(void *pc, int success) {
742 grpc_call *call = pc;
Craig Tiller1e0d4c42015-01-30 16:17:29 -0800743 lock(call);
Craig Tiller58ce3f02015-04-22 07:54:24 -0700744 if (call->last_send_contains & (1 << GRPC_IOREQ_SEND_INITIAL_METADATA)) {
Craig Tiller64be9f72015-05-04 14:53:51 -0700745 finish_ioreq_op(call, GRPC_IOREQ_SEND_INITIAL_METADATA, success);
Craig Tiller134e92f2015-05-11 14:24:34 -0700746 call->write_state = WRITE_STATE_STARTED;
Craig Tiller58ce3f02015-04-22 07:54:24 -0700747 }
748 if (call->last_send_contains & (1 << GRPC_IOREQ_SEND_MESSAGE)) {
Craig Tiller64be9f72015-05-04 14:53:51 -0700749 finish_ioreq_op(call, GRPC_IOREQ_SEND_MESSAGE, success);
Craig Tiller58ce3f02015-04-22 07:54:24 -0700750 }
751 if (call->last_send_contains & (1 << GRPC_IOREQ_SEND_CLOSE)) {
Craig Tiller64be9f72015-05-04 14:53:51 -0700752 finish_ioreq_op(call, GRPC_IOREQ_SEND_TRAILING_METADATA, success);
753 finish_ioreq_op(call, GRPC_IOREQ_SEND_STATUS, success);
754 finish_ioreq_op(call, GRPC_IOREQ_SEND_CLOSE, 1);
Craig Tiller134e92f2015-05-11 14:24:34 -0700755 call->write_state = WRITE_STATE_WRITE_CLOSED;
756 }
757 if (!success) {
758 call->write_state = WRITE_STATE_WRITE_CLOSED;
Craig Tiller27e14f42015-05-21 14:37:39 -0700759 early_out_write_ops(call);
Craig Tiller58ce3f02015-04-22 07:54:24 -0700760 }
Craig Tiller644cad62015-05-20 17:57:54 -0700761 call->send_ops.nops = 0;
Craig Tiller3928c7a2015-04-23 16:00:47 -0700762 call->last_send_contains = 0;
Craig Tiller1e0d4c42015-01-30 16:17:29 -0800763 call->sending = 0;
764 unlock(call);
Craig Tiller4df412b2015-04-28 07:57:54 -0700765 GRPC_CALL_INTERNAL_UNREF(call, "sending", 0);
Craig Tiller1e0d4c42015-01-30 16:17:29 -0800766}
767
Craig Tiller629b0ed2015-04-22 11:14:26 -0700768static void finish_message(grpc_call *call) {
yang-g0b6ad7d2015-06-25 14:39:01 -0700769 if (call->error_status_set == 0) {
770 /* TODO(ctiller): this could be a lot faster if coded directly */
David Garcia Quintas8a187092015-07-01 14:52:44 -0700771 grpc_byte_buffer *byte_buffer;
772 /* some aliases for readability */
773 gpr_slice *slices = call->incoming_message.slices;
774 const size_t nslices = call->incoming_message.count;
David Garcia Quintas658b6082015-07-15 13:46:00 -0700775
David Garcia Quintasd317e752015-07-15 00:09:27 -0700776 if ((call->incoming_message_flags & GRPC_WRITE_INTERNAL_COMPRESS) &&
777 (call->compression_algorithm > GRPC_COMPRESS_NONE)) {
David Garcia Quintas8a187092015-07-01 14:52:44 -0700778 byte_buffer = grpc_raw_compressed_byte_buffer_create(
779 slices, nslices, call->compression_algorithm);
780 } else {
781 byte_buffer = grpc_raw_byte_buffer_create(slices, nslices);
782 }
yang-g0b6ad7d2015-06-25 14:39:01 -0700783 grpc_bbq_push(&call->incoming_queue, byte_buffer);
784 }
Craig Tiller629b0ed2015-04-22 11:14:26 -0700785 gpr_slice_buffer_reset_and_unref(&call->incoming_message);
Craig Tiller629b0ed2015-04-22 11:14:26 -0700786 GPR_ASSERT(call->incoming_message.count == 0);
787 call->reading_message = 0;
788}
789
790static int begin_message(grpc_call *call, grpc_begin_message msg) {
791 /* can't begin a message when we're still reading a message */
792 if (call->reading_message) {
793 char *message = NULL;
794 gpr_asprintf(
795 &message, "Message terminated early; read %d bytes, expected %d",
796 (int)call->incoming_message.length, (int)call->incoming_message_length);
Craig Tiller5dde66e2015-06-02 09:05:23 -0700797 cancel_with_status(call, GRPC_STATUS_INVALID_ARGUMENT, message);
Craig Tiller629b0ed2015-04-22 11:14:26 -0700798 gpr_free(message);
799 return 0;
800 }
David Garcia Quintasf74a49e2015-06-18 17:22:45 -0700801 /* sanity check: if message flags indicate a compressed message, the
802 * compression level should already be present in the call, as parsed off its
803 * corresponding metadata. */
804 if ((msg.flags & GRPC_WRITE_INTERNAL_COMPRESS) &&
David Garcia Quintasd16af0e2015-06-22 22:39:21 -0700805 (call->compression_algorithm == GRPC_COMPRESS_NONE)) {
David Garcia Quintasf74a49e2015-06-18 17:22:45 -0700806 char *message = NULL;
David Garcia Quintasfc0fa332015-06-25 18:11:07 -0700807 char *alg_name;
David Garcia Quintasd317e752015-07-15 00:09:27 -0700808 if (!grpc_compression_algorithm_name(call->compression_algorithm,
809 &alg_name)) {
David Garcia Quintasfc0fa332015-06-25 18:11:07 -0700810 /* This shouldn't happen, other than due to data corruption */
811 alg_name = "<unknown>";
812 }
813 gpr_asprintf(&message,
814 "Invalid compression algorithm (%s) for compressed message.",
815 alg_name);
David Garcia Quintasd317e752015-07-15 00:09:27 -0700816 cancel_with_status(call, GRPC_STATUS_INTERNAL, message);
David Garcia Quintas45b477e2015-07-15 22:13:34 -0700817 gpr_free(message);
818 return 0;
David Garcia Quintasf74a49e2015-06-18 17:22:45 -0700819 }
Craig Tiller629b0ed2015-04-22 11:14:26 -0700820 /* stash away parameters, and prepare for incoming slices */
821 if (msg.length > grpc_channel_get_max_message_length(call->channel)) {
822 char *message = NULL;
823 gpr_asprintf(
824 &message,
825 "Maximum message length of %d exceeded by a message of length %d",
826 grpc_channel_get_max_message_length(call->channel), msg.length);
Craig Tiller5dde66e2015-06-02 09:05:23 -0700827 cancel_with_status(call, GRPC_STATUS_INVALID_ARGUMENT, message);
Craig Tiller629b0ed2015-04-22 11:14:26 -0700828 gpr_free(message);
829 return 0;
830 } else if (msg.length > 0) {
831 call->reading_message = 1;
832 call->incoming_message_length = msg.length;
David Garcia Quintas1d5aca52015-06-14 14:42:04 -0700833 call->incoming_message_flags = msg.flags;
Craig Tiller629b0ed2015-04-22 11:14:26 -0700834 return 1;
835 } else {
836 finish_message(call);
837 return 1;
838 }
839}
840
841static int add_slice_to_message(grpc_call *call, gpr_slice slice) {
842 if (GPR_SLICE_LENGTH(slice) == 0) {
843 gpr_slice_unref(slice);
844 return 1;
845 }
846 /* we have to be reading a message to know what to do here */
847 if (!call->reading_message) {
Craig Tillerf9fae982015-05-29 23:02:18 -0700848 cancel_with_status(call, GRPC_STATUS_INVALID_ARGUMENT,
Craig Tiller5dde66e2015-06-02 09:05:23 -0700849 "Received payload data while not reading a message");
Craig Tiller629b0ed2015-04-22 11:14:26 -0700850 return 0;
851 }
852 /* append the slice to the incoming buffer */
853 gpr_slice_buffer_add(&call->incoming_message, slice);
854 if (call->incoming_message.length > call->incoming_message_length) {
855 /* if we got too many bytes, complain */
856 char *message = NULL;
857 gpr_asprintf(
858 &message, "Receiving message overflow; read %d bytes, expected %d",
859 (int)call->incoming_message.length, (int)call->incoming_message_length);
Craig Tiller5dde66e2015-06-02 09:05:23 -0700860 cancel_with_status(call, GRPC_STATUS_INVALID_ARGUMENT, message);
Craig Tiller629b0ed2015-04-22 11:14:26 -0700861 gpr_free(message);
862 return 0;
863 } else if (call->incoming_message.length == call->incoming_message_length) {
864 finish_message(call);
865 return 1;
866 } else {
867 return 1;
868 }
869}
870
871static void call_on_done_recv(void *pc, int success) {
872 grpc_call *call = pc;
873 size_t i;
David Garcia Quintasf667f1b2015-05-04 13:15:46 -0700874 GRPC_TIMER_BEGIN(GRPC_PTAG_CALL_ON_DONE_RECV, 0);
Craig Tiller629b0ed2015-04-22 11:14:26 -0700875 lock(call);
Craig Tiller6e84aba2015-04-23 15:08:17 -0700876 call->receiving = 0;
Craig Tiller48b9fde2015-04-24 08:04:59 -0700877 if (success) {
878 for (i = 0; success && i < call->recv_ops.nops; i++) {
879 grpc_stream_op *op = &call->recv_ops.ops[i];
880 switch (op->type) {
881 case GRPC_NO_OP:
882 break;
883 case GRPC_OP_METADATA:
884 recv_metadata(call, &op->data.metadata);
885 break;
886 case GRPC_OP_BEGIN_MESSAGE:
887 success = begin_message(call, op->data.begin_message);
888 break;
889 case GRPC_OP_SLICE:
890 success = add_slice_to_message(call, op->data.slice);
891 break;
892 }
Craig Tiller629b0ed2015-04-22 11:14:26 -0700893 }
Craig Tillerc5f3e262015-05-07 10:15:00 -0700894 if (!success) {
895 grpc_stream_ops_unref_owned_objects(&call->recv_ops.ops[i],
896 call->recv_ops.nops - i);
897 }
Craig Tiller48b9fde2015-04-24 08:04:59 -0700898 if (call->recv_state == GRPC_STREAM_RECV_CLOSED) {
899 GPR_ASSERT(call->read_state <= READ_STATE_READ_CLOSED);
900 call->read_state = READ_STATE_READ_CLOSED;
901 }
902 if (call->recv_state == GRPC_STREAM_CLOSED) {
903 GPR_ASSERT(call->read_state <= READ_STATE_STREAM_CLOSED);
904 call->read_state = READ_STATE_STREAM_CLOSED;
Craig Tiller77f04612015-07-01 13:39:45 -0700905 call->cancel_alarm |= call->have_alarm;
Craig Tillerf3fba742015-06-11 09:36:33 -0700906 GRPC_CALL_INTERNAL_UNREF(call, "closed", 0);
Craig Tiller48b9fde2015-04-24 08:04:59 -0700907 }
908 finish_read_ops(call);
909 } else {
Craig Tiller64be9f72015-05-04 14:53:51 -0700910 finish_ioreq_op(call, GRPC_IOREQ_RECV_MESSAGE, 0);
911 finish_ioreq_op(call, GRPC_IOREQ_RECV_STATUS, 0);
912 finish_ioreq_op(call, GRPC_IOREQ_RECV_CLOSE, 0);
913 finish_ioreq_op(call, GRPC_IOREQ_RECV_TRAILING_METADATA, 0);
914 finish_ioreq_op(call, GRPC_IOREQ_RECV_INITIAL_METADATA, 0);
915 finish_ioreq_op(call, GRPC_IOREQ_RECV_STATUS_DETAILS, 0);
Craig Tiller629b0ed2015-04-22 11:14:26 -0700916 }
Craig Tillerc1f75602015-04-24 11:44:53 -0700917 call->recv_ops.nops = 0;
Craig Tiller629b0ed2015-04-22 11:14:26 -0700918 unlock(call);
919
Craig Tiller4df412b2015-04-28 07:57:54 -0700920 GRPC_CALL_INTERNAL_UNREF(call, "receiving", 0);
Yu Zhoua7c10622015-05-29 16:22:26 -0700921 GRPC_TIMER_END(GRPC_PTAG_CALL_ON_DONE_RECV, 0);
Craig Tiller629b0ed2015-04-22 11:14:26 -0700922}
923
Craig Tillerb96d0012015-05-06 15:33:23 -0700924static int prepare_application_metadata(grpc_call *call, size_t count,
925 grpc_metadata *metadata) {
Craig Tiller83f88d92015-04-21 16:02:05 -0700926 size_t i;
Craig Tiller83f88d92015-04-21 16:02:05 -0700927 for (i = 0; i < count; i++) {
928 grpc_metadata *md = &metadata[i];
929 grpc_metadata *next_md = (i == count - 1) ? NULL : &metadata[i + 1];
930 grpc_metadata *prev_md = (i == 0) ? NULL : &metadata[i - 1];
931 grpc_linked_mdelem *l = (grpc_linked_mdelem *)&md->internal_data;
Craig Tillerd2b11fa2015-04-21 13:45:19 -0700932 GPR_ASSERT(sizeof(grpc_linked_mdelem) == sizeof(md->internal_data));
Craig Tiller83f88d92015-04-21 16:02:05 -0700933 l->md = grpc_mdelem_from_string_and_buffer(call->metadata_context, md->key,
934 (const gpr_uint8 *)md->value,
935 md->value_length);
Craig Tillerb96d0012015-05-06 15:33:23 -0700936 if (!grpc_mdstr_is_legal_header(l->md->key)) {
937 gpr_log(GPR_ERROR, "attempt to send invalid metadata key");
938 return 0;
939 } else if (!grpc_mdstr_is_bin_suffixed(l->md->key) &&
940 !grpc_mdstr_is_legal_header(l->md->value)) {
941 gpr_log(GPR_ERROR, "attempt to send invalid metadata value");
942 return 0;
943 }
Craig Tiller83f88d92015-04-21 16:02:05 -0700944 l->next = next_md ? (grpc_linked_mdelem *)&next_md->internal_data : NULL;
945 l->prev = prev_md ? (grpc_linked_mdelem *)&prev_md->internal_data : NULL;
946 }
Craig Tillerb96d0012015-05-06 15:33:23 -0700947 return 1;
948}
949
950static grpc_mdelem_list chain_metadata_from_app(grpc_call *call, size_t count,
951 grpc_metadata *metadata) {
952 grpc_mdelem_list out;
953 if (count == 0) {
954 out.head = out.tail = NULL;
955 return out;
956 }
Craig Tiller83f88d92015-04-21 16:02:05 -0700957 out.head = (grpc_linked_mdelem *)&(metadata[0].internal_data);
958 out.tail = (grpc_linked_mdelem *)&(metadata[count - 1].internal_data);
959 return out;
960}
961
Craig Tiller629b0ed2015-04-22 11:14:26 -0700962/* Copy the contents of a byte buffer into stream ops */
963static void copy_byte_buffer_to_stream_ops(grpc_byte_buffer *byte_buffer,
964 grpc_stream_op_buffer *sopb) {
965 size_t i;
966
967 switch (byte_buffer->type) {
David Garcia Quintas59f905d2015-06-08 16:31:19 -0700968 case GRPC_BB_RAW:
969 for (i = 0; i < byte_buffer->data.raw.slice_buffer.count; i++) {
970 gpr_slice slice = byte_buffer->data.raw.slice_buffer.slices[i];
Craig Tiller629b0ed2015-04-22 11:14:26 -0700971 gpr_slice_ref(slice);
972 grpc_sopb_add_slice(sopb, slice);
973 }
974 break;
975 }
976}
977
Craig Tillerb7959a02015-06-25 08:50:54 -0700978static int fill_send_ops(grpc_call *call, grpc_transport_stream_op *op) {
Craig Tiller83f88d92015-04-21 16:02:05 -0700979 grpc_ioreq_data data;
David Garcia Quintas1d5aca52015-06-14 14:42:04 -0700980 gpr_uint32 flags;
Craig Tiller83f88d92015-04-21 16:02:05 -0700981 grpc_metadata_batch mdb;
982 size_t i;
983 GPR_ASSERT(op->send_ops == NULL);
984
985 switch (call->write_state) {
986 case WRITE_STATE_INITIAL:
987 if (!is_op_live(call, GRPC_IOREQ_SEND_INITIAL_METADATA)) {
988 break;
989 }
990 data = call->request_data[GRPC_IOREQ_SEND_INITIAL_METADATA];
Craig Tiller629b0ed2015-04-22 11:14:26 -0700991 mdb.list = chain_metadata_from_app(call, data.send_metadata.count,
992 data.send_metadata.metadata);
Craig Tiller83f88d92015-04-21 16:02:05 -0700993 mdb.garbage.head = mdb.garbage.tail = NULL;
994 mdb.deadline = call->send_deadline;
995 for (i = 0; i < call->send_initial_metadata_count; i++) {
Craig Tiller629b0ed2015-04-22 11:14:26 -0700996 grpc_metadata_batch_link_head(&mdb, &call->send_initial_metadata[i]);
Craig Tiller83f88d92015-04-21 16:02:05 -0700997 }
998 grpc_sopb_add_metadata(&call->send_ops, mdb);
999 op->send_ops = &call->send_ops;
Craig Tiller58ce3f02015-04-22 07:54:24 -07001000 call->last_send_contains |= 1 << GRPC_IOREQ_SEND_INITIAL_METADATA;
Craig Tillerc1f75602015-04-24 11:44:53 -07001001 call->send_initial_metadata_count = 0;
Craig Tiller629b0ed2015-04-22 11:14:26 -07001002 /* fall through intended */
Craig Tiller83f88d92015-04-21 16:02:05 -07001003 case WRITE_STATE_STARTED:
1004 if (is_op_live(call, GRPC_IOREQ_SEND_MESSAGE)) {
1005 data = call->request_data[GRPC_IOREQ_SEND_MESSAGE];
David Garcia Quintas1d5aca52015-06-14 14:42:04 -07001006 flags = call->request_flags[GRPC_IOREQ_SEND_MESSAGE];
Craig Tiller629b0ed2015-04-22 11:14:26 -07001007 grpc_sopb_add_begin_message(
David Garcia Quintas1d5aca52015-06-14 14:42:04 -07001008 &call->send_ops, grpc_byte_buffer_length(data.send_message), flags);
Craig Tiller629b0ed2015-04-22 11:14:26 -07001009 copy_byte_buffer_to_stream_ops(data.send_message, &call->send_ops);
Craig Tiller58ce3f02015-04-22 07:54:24 -07001010 op->send_ops = &call->send_ops;
1011 call->last_send_contains |= 1 << GRPC_IOREQ_SEND_MESSAGE;
1012 }
1013 if (is_op_live(call, GRPC_IOREQ_SEND_CLOSE)) {
1014 op->is_last_send = 1;
1015 op->send_ops = &call->send_ops;
1016 call->last_send_contains |= 1 << GRPC_IOREQ_SEND_CLOSE;
Craig Tiller58ce3f02015-04-22 07:54:24 -07001017 if (!call->is_client) {
1018 /* send trailing metadata */
1019 data = call->request_data[GRPC_IOREQ_SEND_TRAILING_METADATA];
Craig Tiller629b0ed2015-04-22 11:14:26 -07001020 mdb.list = chain_metadata_from_app(call, data.send_metadata.count,
1021 data.send_metadata.metadata);
Craig Tiller58ce3f02015-04-22 07:54:24 -07001022 mdb.garbage.head = mdb.garbage.tail = NULL;
Craig Tiller143e7bf2015-07-13 08:41:49 -07001023 mdb.deadline = gpr_inf_future(GPR_CLOCK_REALTIME);
Craig Tiller58ce3f02015-04-22 07:54:24 -07001024 /* send status */
1025 /* TODO(ctiller): cache common status values */
1026 data = call->request_data[GRPC_IOREQ_SEND_STATUS];
Craig Tiller58ce3f02015-04-22 07:54:24 -07001027 grpc_metadata_batch_add_tail(
1028 &mdb, &call->status_link,
Craig Tiller3fc8e822015-06-08 16:31:28 -07001029 grpc_channel_get_reffed_status_elem(call->channel,
1030 data.send_status.code));
Craig Tiller58ce3f02015-04-22 07:54:24 -07001031 if (data.send_status.details) {
1032 grpc_metadata_batch_add_tail(
1033 &mdb, &call->details_link,
1034 grpc_mdelem_from_metadata_strings(
1035 call->metadata_context,
Craig Tiller1a65a232015-07-06 10:22:32 -07001036 GRPC_MDSTR_REF(
Craig Tiller629b0ed2015-04-22 11:14:26 -07001037 grpc_channel_get_message_string(call->channel)),
Craig Tilleraea081f2015-06-11 14:19:33 -07001038 data.send_status.details));
1039 call->request_data[GRPC_IOREQ_SEND_STATUS].send_status.details =
1040 NULL;
Craig Tiller58ce3f02015-04-22 07:54:24 -07001041 }
Craig Tiller7e8489a2015-04-23 12:41:16 -07001042 grpc_sopb_add_metadata(&call->send_ops, mdb);
Craig Tillerde648622015-02-05 09:32:10 -08001043 }
Craig Tillerc12fee62015-02-03 11:55:50 -08001044 }
Craig Tiller58ce3f02015-04-22 07:54:24 -07001045 break;
Craig Tillerc12fee62015-02-03 11:55:50 -08001046 case WRITE_STATE_WRITE_CLOSED:
Craig Tiller8eb9d472015-01-27 17:00:03 -08001047 break;
Craig Tiller62ac1552015-01-27 15:41:44 -08001048 }
Craig Tiller58ce3f02015-04-22 07:54:24 -07001049 if (op->send_ops) {
Craig Tiller1e6facb2015-06-11 22:47:11 -07001050 op->on_done_send = &call->on_done_send;
Craig Tiller58ce3f02015-04-22 07:54:24 -07001051 }
1052 return op->send_ops != NULL;
Craig Tillercce17ac2015-01-20 09:29:28 -08001053}
1054
1055static grpc_call_error start_ioreq_error(grpc_call *call,
1056 gpr_uint32 mutated_ops,
1057 grpc_call_error ret) {
1058 size_t i;
1059 for (i = 0; i < GRPC_IOREQ_OP_COUNT; i++) {
Craig Tillerd642dcf2015-02-03 20:39:09 -08001060 if (mutated_ops & (1u << i)) {
Craig Tillerc12fee62015-02-03 11:55:50 -08001061 call->request_set[i] = REQSET_EMPTY;
Craig Tillercce17ac2015-01-20 09:29:28 -08001062 }
1063 }
Craig Tillercce17ac2015-01-20 09:29:28 -08001064 return ret;
1065}
1066
Craig Tillerc12fee62015-02-03 11:55:50 -08001067static void finish_read_ops(grpc_call *call) {
1068 int empty;
1069
1070 if (is_op_live(call, GRPC_IOREQ_RECV_MESSAGE)) {
1071 empty =
1072 (NULL == (*call->request_data[GRPC_IOREQ_RECV_MESSAGE].recv_message =
1073 grpc_bbq_pop(&call->incoming_queue)));
1074 if (!empty) {
Craig Tiller64be9f72015-05-04 14:53:51 -07001075 finish_live_ioreq_op(call, GRPC_IOREQ_RECV_MESSAGE, 1);
Craig Tillerc12fee62015-02-03 11:55:50 -08001076 empty = grpc_bbq_empty(&call->incoming_queue);
1077 }
1078 } else {
1079 empty = grpc_bbq_empty(&call->incoming_queue);
1080 }
1081
1082 switch (call->read_state) {
1083 case READ_STATE_STREAM_CLOSED:
Craig Tiller77f04612015-07-01 13:39:45 -07001084 if (empty && !call->have_alarm) {
Craig Tiller64be9f72015-05-04 14:53:51 -07001085 finish_ioreq_op(call, GRPC_IOREQ_RECV_CLOSE, 1);
Craig Tillerc12fee62015-02-03 11:55:50 -08001086 }
1087 /* fallthrough */
1088 case READ_STATE_READ_CLOSED:
1089 if (empty) {
Craig Tiller64be9f72015-05-04 14:53:51 -07001090 finish_ioreq_op(call, GRPC_IOREQ_RECV_MESSAGE, 1);
Craig Tillerc12fee62015-02-03 11:55:50 -08001091 }
Craig Tiller64be9f72015-05-04 14:53:51 -07001092 finish_ioreq_op(call, GRPC_IOREQ_RECV_STATUS, 1);
1093 finish_ioreq_op(call, GRPC_IOREQ_RECV_STATUS_DETAILS, 1);
1094 finish_ioreq_op(call, GRPC_IOREQ_RECV_TRAILING_METADATA, 1);
Craig Tillerc12fee62015-02-03 11:55:50 -08001095 /* fallthrough */
1096 case READ_STATE_GOT_INITIAL_METADATA:
Craig Tiller64be9f72015-05-04 14:53:51 -07001097 finish_ioreq_op(call, GRPC_IOREQ_RECV_INITIAL_METADATA, 1);
Craig Tillerc12fee62015-02-03 11:55:50 -08001098 /* fallthrough */
1099 case READ_STATE_INITIAL:
1100 /* do nothing */
1101 break;
1102 }
1103}
1104
Craig Tiller8eb9d472015-01-27 17:00:03 -08001105static grpc_call_error start_ioreq(grpc_call *call, const grpc_ioreq *reqs,
1106 size_t nreqs,
1107 grpc_ioreq_completion_func completion,
Craig Tiller9cc61412015-02-02 14:02:52 -08001108 void *user_data) {
Craig Tillercce17ac2015-01-20 09:29:28 -08001109 size_t i;
1110 gpr_uint32 have_ops = 0;
Craig Tillercce17ac2015-01-20 09:29:28 -08001111 grpc_ioreq_op op;
Craig Tillerc18c56e2015-02-02 15:59:13 -08001112 reqinfo_master *master;
Craig Tillercce17ac2015-01-20 09:29:28 -08001113 grpc_ioreq_data data;
Craig Tiller1e0d4c42015-01-30 16:17:29 -08001114 gpr_uint8 set;
1115
1116 if (nreqs == 0) {
1117 return GRPC_CALL_OK;
1118 }
1119
1120 set = reqs[0].op;
Craig Tillercce17ac2015-01-20 09:29:28 -08001121
1122 for (i = 0; i < nreqs; i++) {
1123 op = reqs[i].op;
Craig Tillerc12fee62015-02-03 11:55:50 -08001124 if (call->request_set[op] < GRPC_IOREQ_OP_COUNT) {
Craig Tillercce17ac2015-01-20 09:29:28 -08001125 return start_ioreq_error(call, have_ops,
1126 GRPC_CALL_ERROR_TOO_MANY_OPERATIONS);
Craig Tillerc12fee62015-02-03 11:55:50 -08001127 } else if (call->request_set[op] == REQSET_DONE) {
Craig Tiller1c141902015-01-31 08:51:54 -08001128 return start_ioreq_error(call, have_ops, GRPC_CALL_ERROR_ALREADY_INVOKED);
Craig Tillercce17ac2015-01-20 09:29:28 -08001129 }
Craig Tillercce17ac2015-01-20 09:29:28 -08001130 data = reqs[i].data;
Craig Tillerb96d0012015-05-06 15:33:23 -07001131 if (op == GRPC_IOREQ_SEND_INITIAL_METADATA ||
1132 op == GRPC_IOREQ_SEND_TRAILING_METADATA) {
1133 if (!prepare_application_metadata(call, data.send_metadata.count,
1134 data.send_metadata.metadata)) {
1135 return start_ioreq_error(call, have_ops,
1136 GRPC_CALL_ERROR_INVALID_METADATA);
1137 }
1138 }
Craig Tilleraea081f2015-06-11 14:19:33 -07001139 if (op == GRPC_IOREQ_SEND_STATUS) {
1140 set_status_code(call, STATUS_FROM_SERVER_STATUS,
1141 reqs[i].data.send_status.code);
1142 if (reqs[i].data.send_status.details) {
1143 set_status_details(call, STATUS_FROM_SERVER_STATUS,
Craig Tiller1a65a232015-07-06 10:22:32 -07001144 GRPC_MDSTR_REF(reqs[i].data.send_status.details));
Craig Tilleraea081f2015-06-11 14:19:33 -07001145 }
1146 }
Craig Tillerb96d0012015-05-06 15:33:23 -07001147 have_ops |= 1u << op;
Craig Tillercce17ac2015-01-20 09:29:28 -08001148
Craig Tillerc12fee62015-02-03 11:55:50 -08001149 call->request_data[op] = data;
David Garcia Quintas1d5aca52015-06-14 14:42:04 -07001150 call->request_flags[op] = reqs[i].flags;
Craig Tillerc12fee62015-02-03 11:55:50 -08001151 call->request_set[op] = set;
Craig Tillercce17ac2015-01-20 09:29:28 -08001152 }
1153
Craig Tillerc18c56e2015-02-02 15:59:13 -08001154 master = &call->masters[set];
Craig Tiller64be9f72015-05-04 14:53:51 -07001155 master->success = 1;
Craig Tillercce17ac2015-01-20 09:29:28 -08001156 master->need_mask = have_ops;
Craig Tiller1b409442015-01-29 14:36:17 -08001157 master->complete_mask = 0;
Craig Tillercce17ac2015-01-20 09:29:28 -08001158 master->on_complete = completion;
1159 master->user_data = user_data;
1160
Craig Tillerc12fee62015-02-03 11:55:50 -08001161 finish_read_ops(call);
1162 early_out_write_ops(call);
1163
Craig Tillercce17ac2015-01-20 09:29:28 -08001164 return GRPC_CALL_OK;
1165}
1166
Craig Tillercce17ac2015-01-20 09:29:28 -08001167grpc_call_error grpc_call_start_ioreq_and_call_back(
1168 grpc_call *call, const grpc_ioreq *reqs, size_t nreqs,
1169 grpc_ioreq_completion_func on_complete, void *user_data) {
Craig Tiller8eb9d472015-01-27 17:00:03 -08001170 grpc_call_error err;
1171 lock(call);
Craig Tiller9cc61412015-02-02 14:02:52 -08001172 err = start_ioreq(call, reqs, nreqs, on_complete, user_data);
Craig Tiller8eb9d472015-01-27 17:00:03 -08001173 unlock(call);
1174 return err;
Craig Tillercce17ac2015-01-20 09:29:28 -08001175}
1176
Nicolas Nobleb7ebd3b2014-11-26 16:33:03 -08001177void grpc_call_destroy(grpc_call *c) {
ctillerc6d61c42014-12-15 14:52:08 -08001178 int cancel;
Craig Tiller9724de82015-01-28 17:06:29 -08001179 lock(c);
Craig Tillerf3fba742015-06-11 09:36:33 -07001180 GPR_ASSERT(!c->destroy_called);
1181 c->destroy_called = 1;
Craig Tiller77f04612015-07-01 13:39:45 -07001182 c->cancel_alarm |= c->have_alarm;
Craig Tillerdaceea82015-02-02 16:15:53 -08001183 cancel = c->read_state != READ_STATE_STREAM_CLOSED;
Craig Tiller9724de82015-01-28 17:06:29 -08001184 unlock(c);
ctillerc6d61c42014-12-15 14:52:08 -08001185 if (cancel) grpc_call_cancel(c);
Craig Tiller4df412b2015-04-28 07:57:54 -07001186 GRPC_CALL_INTERNAL_UNREF(c, "destroy", 1);
Nicolas Nobleb7ebd3b2014-11-26 16:33:03 -08001187}
1188
Craig Tiller58ce3f02015-04-22 07:54:24 -07001189grpc_call_error grpc_call_cancel(grpc_call *call) {
Craig Tiller48b9fde2015-04-24 08:04:59 -07001190 return grpc_call_cancel_with_status(call, GRPC_STATUS_CANCELLED, "Cancelled");
Nicolas Nobleb7ebd3b2014-11-26 16:33:03 -08001191}
1192
Craig Tiller6046dc32015-01-14 12:55:45 -08001193grpc_call_error grpc_call_cancel_with_status(grpc_call *c,
1194 grpc_status_code status,
1195 const char *description) {
Craig Tiller5dde66e2015-06-02 09:05:23 -07001196 grpc_call_error r;
1197 lock(c);
1198 r = cancel_with_status(c, status, description);
1199 unlock(c);
1200 return r;
Yang Gaoff30f8e2015-05-04 00:13:39 -07001201}
1202
Craig Tillerf9fae982015-05-29 23:02:18 -07001203static grpc_call_error cancel_with_status(grpc_call *c, grpc_status_code status,
Craig Tiller5dde66e2015-06-02 09:05:23 -07001204 const char *description) {
Craig Tiller6046dc32015-01-14 12:55:45 -08001205 grpc_mdstr *details =
1206 description ? grpc_mdstr_from_string(c->metadata_context, description)
1207 : NULL;
Craig Tiller48b9fde2015-04-24 08:04:59 -07001208
Craig Tiller5dde66e2015-06-02 09:05:23 -07001209 GPR_ASSERT(status != GRPC_STATUS_OK);
1210
Craig Tiller68752722015-01-29 14:59:54 -08001211 set_status_code(c, STATUS_FROM_API_OVERRIDE, status);
1212 set_status_details(c, STATUS_FROM_API_OVERRIDE, details);
Craig Tiller48b9fde2015-04-24 08:04:59 -07001213
Craig Tiller5dde66e2015-06-02 09:05:23 -07001214 c->cancel_with_status = status;
Craig Tiller48b9fde2015-04-24 08:04:59 -07001215
1216 return GRPC_CALL_OK;
Craig Tillerd248c242015-01-14 11:49:12 -08001217}
1218
Craig Tiller5dde66e2015-06-02 09:05:23 -07001219static void finished_loose_op(void *call, int success_ignored) {
1220 GRPC_CALL_INTERNAL_UNREF(call, "loose-op", 0);
1221}
1222
Craig Tiller1e6facb2015-06-11 22:47:11 -07001223typedef struct {
1224 grpc_call *call;
1225 grpc_iomgr_closure closure;
1226} finished_loose_op_allocated_args;
1227
1228static void finished_loose_op_allocated(void *alloc, int success) {
1229 finished_loose_op_allocated_args *args = alloc;
1230 finished_loose_op(args->call, success);
1231 gpr_free(args);
1232}
1233
Craig Tillerb7959a02015-06-25 08:50:54 -07001234static void execute_op(grpc_call *call, grpc_transport_stream_op *op) {
Nicolas Nobleb7ebd3b2014-11-26 16:33:03 -08001235 grpc_call_element *elem;
Craig Tiller5dde66e2015-06-02 09:05:23 -07001236
1237 GPR_ASSERT(op->on_consumed == NULL);
1238 if (op->cancel_with_status != GRPC_STATUS_OK || op->bind_pollset) {
1239 GRPC_CALL_INTERNAL_REF(call, "loose-op");
Craig Tiller1e6facb2015-06-11 22:47:11 -07001240 if (op->bind_pollset) {
1241 op->on_consumed = &call->on_done_bind;
1242 } else {
1243 finished_loose_op_allocated_args *args = gpr_malloc(sizeof(*args));
1244 args->call = call;
Craig Tillerf1bff012015-07-06 11:20:50 -07001245 grpc_iomgr_closure_init(&args->closure, finished_loose_op_allocated,
1246 args);
Craig Tiller1e6facb2015-06-11 22:47:11 -07001247 op->on_consumed = &args->closure;
1248 }
Craig Tiller5dde66e2015-06-02 09:05:23 -07001249 }
1250
Nicolas Nobleb7ebd3b2014-11-26 16:33:03 -08001251 elem = CALL_ELEM_FROM_CALL(call, 0);
Craig Tiller935cf422015-05-01 14:10:46 -07001252 op->context = call->context;
Craig Tillere039f032015-06-25 12:54:23 -07001253 elem->filter->start_transport_stream_op(elem, op);
Nicolas Nobleb7ebd3b2014-11-26 16:33:03 -08001254}
1255
Craig Tiller566316f2015-02-02 15:25:32 -08001256grpc_call *grpc_call_from_top_element(grpc_call_element *elem) {
1257 return CALL_FROM_TOP_ELEM(elem);
1258}
1259
1260static void call_alarm(void *arg, int success) {
1261 grpc_call *call = arg;
Craig Tiller77f04612015-07-01 13:39:45 -07001262 lock(call);
1263 call->have_alarm = 0;
Craig Tiller566316f2015-02-02 15:25:32 -08001264 if (success) {
Craig Tiller5dde66e2015-06-02 09:05:23 -07001265 cancel_with_status(call, GRPC_STATUS_DEADLINE_EXCEEDED,
1266 "Deadline Exceeded");
Craig Tiller566316f2015-02-02 15:25:32 -08001267 }
Craig Tiller77f04612015-07-01 13:39:45 -07001268 finish_read_ops(call);
1269 unlock(call);
Craig Tiller4df412b2015-04-28 07:57:54 -07001270 GRPC_CALL_INTERNAL_UNREF(call, "alarm", 1);
Craig Tiller566316f2015-02-02 15:25:32 -08001271}
1272
Craig Tiller6902ad22015-04-16 08:01:49 -07001273static void set_deadline_alarm(grpc_call *call, gpr_timespec deadline) {
Craig Tiller566316f2015-02-02 15:25:32 -08001274 if (call->have_alarm) {
1275 gpr_log(GPR_ERROR, "Attempt to set deadline alarm twice");
Craig Tillerfa4f9942015-04-23 15:22:09 -07001276 assert(0);
Craig Tiller7e8489a2015-04-23 12:41:16 -07001277 return;
Craig Tiller566316f2015-02-02 15:25:32 -08001278 }
Craig Tiller4df412b2015-04-28 07:57:54 -07001279 GRPC_CALL_INTERNAL_REF(call, "alarm");
Craig Tiller566316f2015-02-02 15:25:32 -08001280 call->have_alarm = 1;
Craig Tiller6a7626c2015-07-19 22:21:41 -07001281 grpc_alarm_init(&call->alarm, gpr_convert_clock_type(deadline, GPR_CLOCK_MONOTONIC), call_alarm, call,
1282 gpr_now(GPR_CLOCK_MONOTONIC));
Craig Tiller566316f2015-02-02 15:25:32 -08001283}
1284
Craig Tiller566316f2015-02-02 15:25:32 -08001285/* we offset status by a small amount when storing it into transport metadata
1286 as metadata cannot store a 0 value (which is used as OK for grpc_status_codes
1287 */
1288#define STATUS_OFFSET 1
1289static void destroy_status(void *ignored) {}
1290
1291static gpr_uint32 decode_status(grpc_mdelem *md) {
1292 gpr_uint32 status;
1293 void *user_data = grpc_mdelem_get_user_data(md, destroy_status);
1294 if (user_data) {
Craig Tiller87d5b192015-04-16 14:37:57 -07001295 status = ((gpr_uint32)(gpr_intptr)user_data) - STATUS_OFFSET;
Craig Tiller566316f2015-02-02 15:25:32 -08001296 } else {
1297 if (!gpr_parse_bytes_to_uint32(grpc_mdstr_as_c_string(md->value),
1298 GPR_SLICE_LENGTH(md->value->slice),
1299 &status)) {
1300 status = GRPC_STATUS_UNKNOWN; /* could not parse status code */
1301 }
1302 grpc_mdelem_set_user_data(md, destroy_status,
1303 (void *)(gpr_intptr)(status + STATUS_OFFSET));
1304 }
1305 return status;
1306}
1307
David Garcia Quintasdb94b272015-06-15 18:37:01 -07001308/* just as for status above, we need to offset: metadata userdata can't hold a
1309 * zero (null), which in this case is used to signal no compression */
1310#define COMPRESS_OFFSET 1
1311static void destroy_compression(void *ignored) {}
1312
1313static gpr_uint32 decode_compression(grpc_mdelem *md) {
David Garcia Quintasfc0fa332015-06-25 18:11:07 -07001314 grpc_compression_algorithm algorithm;
David Garcia Quintas92ce5882015-06-23 17:07:12 -07001315 void *user_data = grpc_mdelem_get_user_data(md, destroy_compression);
David Garcia Quintasdb94b272015-06-15 18:37:01 -07001316 if (user_data) {
David Garcia Quintasfc0fa332015-06-25 18:11:07 -07001317 algorithm = ((grpc_compression_level)(gpr_intptr)user_data) - COMPRESS_OFFSET;
David Garcia Quintasdb94b272015-06-15 18:37:01 -07001318 } else {
David Garcia Quintasfc0fa332015-06-25 18:11:07 -07001319 const char *md_c_str = grpc_mdstr_as_c_string(md->value);
1320 if (!grpc_compression_algorithm_parse(md_c_str, &algorithm)) {
1321 gpr_log(GPR_ERROR, "Invalid compression algorithm: '%s'", md_c_str);
1322 assert(0);
David Garcia Quintasdb94b272015-06-15 18:37:01 -07001323 }
1324 grpc_mdelem_set_user_data(md, destroy_compression,
David Garcia Quintasfc0fa332015-06-25 18:11:07 -07001325 (void *)(gpr_intptr)(algorithm + COMPRESS_OFFSET));
David Garcia Quintasdb94b272015-06-15 18:37:01 -07001326 }
David Garcia Quintasfc0fa332015-06-25 18:11:07 -07001327 return algorithm;
David Garcia Quintasdb94b272015-06-15 18:37:01 -07001328}
1329
Craig Tiller629b0ed2015-04-22 11:14:26 -07001330static void recv_metadata(grpc_call *call, grpc_metadata_batch *md) {
Craig Tiller6902ad22015-04-16 08:01:49 -07001331 grpc_linked_mdelem *l;
Craig Tiller566316f2015-02-02 15:25:32 -08001332 grpc_metadata_array *dest;
1333 grpc_metadata *mdusr;
Craig Tiller6902ad22015-04-16 08:01:49 -07001334 int is_trailing;
Craig Tiller8b282cb2015-04-17 14:57:44 -07001335 grpc_mdctx *mdctx = call->metadata_context;
Craig Tiller566316f2015-02-02 15:25:32 -08001336
Craig Tiller6902ad22015-04-16 08:01:49 -07001337 is_trailing = call->read_state >= READ_STATE_GOT_INITIAL_METADATA;
Craig Tiller48b02ec2015-04-21 13:58:36 -07001338 for (l = md->list.head; l != NULL; l = l->next) {
Craig Tiller6902ad22015-04-16 08:01:49 -07001339 grpc_mdelem *md = l->md;
1340 grpc_mdstr *key = md->key;
1341 if (key == grpc_channel_get_status_string(call->channel)) {
1342 set_status_code(call, STATUS_FROM_WIRE, decode_status(md));
Craig Tiller6902ad22015-04-16 08:01:49 -07001343 } else if (key == grpc_channel_get_message_string(call->channel)) {
Craig Tiller1a65a232015-07-06 10:22:32 -07001344 set_status_details(call, STATUS_FROM_WIRE, GRPC_MDSTR_REF(md->value));
Craig Tillerf1bff012015-07-06 11:20:50 -07001345 } else if (key ==
David Garcia Quintasd7d9ce22015-06-30 23:29:03 -07001346 grpc_channel_get_compression_algorithm_string(call->channel)) {
David Garcia Quintasd16af0e2015-06-22 22:39:21 -07001347 set_compression_algorithm(call, decode_compression(md));
Craig Tiller6902ad22015-04-16 08:01:49 -07001348 } else {
1349 dest = &call->buffered_metadata[is_trailing];
1350 if (dest->count == dest->capacity) {
1351 dest->capacity = GPR_MAX(dest->capacity + 8, dest->capacity * 2);
1352 dest->metadata =
1353 gpr_realloc(dest->metadata, sizeof(grpc_metadata) * dest->capacity);
1354 }
1355 mdusr = &dest->metadata[dest->count++];
1356 mdusr->key = grpc_mdstr_as_c_string(md->key);
1357 mdusr->value = grpc_mdstr_as_c_string(md->value);
1358 mdusr->value_length = GPR_SLICE_LENGTH(md->value->slice);
1359 if (call->owned_metadata_count == call->owned_metadata_capacity) {
Craig Tiller87d5b192015-04-16 14:37:57 -07001360 call->owned_metadata_capacity =
1361 GPR_MAX(call->owned_metadata_capacity + 8,
1362 call->owned_metadata_capacity * 2);
Craig Tiller6902ad22015-04-16 08:01:49 -07001363 call->owned_metadata =
1364 gpr_realloc(call->owned_metadata,
1365 sizeof(grpc_mdelem *) * call->owned_metadata_capacity);
1366 }
1367 call->owned_metadata[call->owned_metadata_count++] = md;
Craig Tiller8b282cb2015-04-17 14:57:44 -07001368 l->md = 0;
Craig Tiller566316f2015-02-02 15:25:32 -08001369 }
Craig Tiller6902ad22015-04-16 08:01:49 -07001370 }
Craig Tiller143e7bf2015-07-13 08:41:49 -07001371 if (gpr_time_cmp(md->deadline, gpr_inf_future(GPR_CLOCK_REALTIME)) != 0) {
Craig Tiller6902ad22015-04-16 08:01:49 -07001372 set_deadline_alarm(call, md->deadline);
1373 }
1374 if (!is_trailing) {
Craig Tiller629b0ed2015-04-22 11:14:26 -07001375 call->read_state = READ_STATE_GOT_INITIAL_METADATA;
Craig Tiller566316f2015-02-02 15:25:32 -08001376 }
Craig Tiller6902ad22015-04-16 08:01:49 -07001377
Craig Tiller8b282cb2015-04-17 14:57:44 -07001378 grpc_mdctx_lock(mdctx);
1379 for (l = md->list.head; l; l = l->next) {
Craig Tiller1a65a232015-07-06 10:22:32 -07001380 if (l->md) GRPC_MDCTX_LOCKED_MDELEM_UNREF(mdctx, l->md);
Craig Tiller8b282cb2015-04-17 14:57:44 -07001381 }
1382 for (l = md->garbage.head; l; l = l->next) {
Craig Tiller1a65a232015-07-06 10:22:32 -07001383 GRPC_MDCTX_LOCKED_MDELEM_UNREF(mdctx, l->md);
Craig Tiller8b282cb2015-04-17 14:57:44 -07001384 }
1385 grpc_mdctx_unlock(mdctx);
Craig Tiller629b0ed2015-04-22 11:14:26 -07001386}
Craig Tiller8b282cb2015-04-17 14:57:44 -07001387
Craig Tiller566316f2015-02-02 15:25:32 -08001388grpc_call_stack *grpc_call_get_call_stack(grpc_call *call) {
1389 return CALL_STACK_FROM_CALL(call);
1390}
1391
1392/*
Craig Tillerfb189f82015-02-03 12:07:07 -08001393 * BATCH API IMPLEMENTATION
1394 */
1395
1396static void set_status_value_directly(grpc_status_code status, void *dest) {
1397 *(grpc_status_code *)dest = status;
1398}
1399
1400static void set_cancelled_value(grpc_status_code status, void *dest) {
1401 *(grpc_status_code *)dest = (status != GRPC_STATUS_OK);
1402}
1403
Craig Tiller64be9f72015-05-04 14:53:51 -07001404static void finish_batch(grpc_call *call, int success, void *tag) {
Craig Tiller12cf5372015-07-09 13:48:11 -07001405 grpc_cq_end_op(call->cq, tag, success, done_completion, call,
1406 allocate_completion(call));
Craig Tillerfbac5f12015-05-15 14:20:44 -07001407}
1408
1409static void finish_batch_with_close(grpc_call *call, int success, void *tag) {
Craig Tiller12cf5372015-07-09 13:48:11 -07001410 grpc_cq_end_op(call->cq, tag, 1, done_completion, call,
1411 allocate_completion(call));
Craig Tiller166e2502015-02-03 20:14:41 -08001412}
Craig Tillerfb189f82015-02-03 12:07:07 -08001413
David Garcia Quintas1d5aca52015-06-14 14:42:04 -07001414static int are_write_flags_valid(gpr_uint32 flags) {
1415 /* check that only bits in GRPC_WRITE_(INTERNAL?)_USED_MASK are set */
1416 const gpr_uint32 allowed_write_positions =
1417 (GRPC_WRITE_USED_MASK | GRPC_WRITE_INTERNAL_USED_MASK);
1418 const gpr_uint32 invalid_positions = ~allowed_write_positions;
1419 return !(flags & invalid_positions);
1420}
1421
Craig Tillerfb189f82015-02-03 12:07:07 -08001422grpc_call_error grpc_call_start_batch(grpc_call *call, const grpc_op *ops,
1423 size_t nops, void *tag) {
1424 grpc_ioreq reqs[GRPC_IOREQ_OP_COUNT];
1425 size_t in;
1426 size_t out;
1427 const grpc_op *op;
1428 grpc_ioreq *req;
Craig Tillerfbac5f12015-05-15 14:20:44 -07001429 void (*finish_func)(grpc_call *, int, void *) = finish_batch;
Craig Tillerfb189f82015-02-03 12:07:07 -08001430
murgatroid99d47946b2015-03-09 14:27:07 -07001431 GRPC_CALL_LOG_BATCH(GPR_INFO, call, ops, nops, tag);
1432
murgatroid99a8c21e82015-02-12 13:55:53 -08001433 if (nops == 0) {
Craig Tiller97fc6a32015-07-08 15:31:35 -07001434 grpc_cq_begin_op(call->cq);
1435 GRPC_CALL_INTERNAL_REF(call, "completion");
Craig Tiller12cf5372015-07-09 13:48:11 -07001436 grpc_cq_end_op(call->cq, tag, 1, done_completion, call,
1437 allocate_completion(call));
murgatroid99a8c21e82015-02-12 13:55:53 -08001438 return GRPC_CALL_OK;
1439 }
1440
Craig Tillerfb189f82015-02-03 12:07:07 -08001441 /* rewrite batch ops into ioreq ops */
1442 for (in = 0, out = 0; in < nops; in++) {
1443 op = &ops[in];
1444 switch (op->op) {
1445 case GRPC_OP_SEND_INITIAL_METADATA:
David Garcia Quintas1d5aca52015-06-14 14:42:04 -07001446 /* Flag validation: currently allow no flags */
David Garcia Quintasde526252015-06-15 13:15:26 -07001447 if (op->flags != 0) return GRPC_CALL_ERROR_INVALID_FLAGS;
Craig Tillerfb189f82015-02-03 12:07:07 -08001448 req = &reqs[out++];
1449 req->op = GRPC_IOREQ_SEND_INITIAL_METADATA;
1450 req->data.send_metadata.count = op->data.send_initial_metadata.count;
1451 req->data.send_metadata.metadata =
1452 op->data.send_initial_metadata.metadata;
David Garcia Quintas1d5aca52015-06-14 14:42:04 -07001453 req->flags = op->flags;
Craig Tillerfb189f82015-02-03 12:07:07 -08001454 break;
1455 case GRPC_OP_SEND_MESSAGE:
Craig Tiller9a576332015-06-17 10:21:49 -07001456 if (!are_write_flags_valid(op->flags)) {
David Garcia Quintas1d5aca52015-06-14 14:42:04 -07001457 return GRPC_CALL_ERROR_INVALID_FLAGS;
1458 }
Craig Tillerfb189f82015-02-03 12:07:07 -08001459 req = &reqs[out++];
1460 req->op = GRPC_IOREQ_SEND_MESSAGE;
1461 req->data.send_message = op->data.send_message;
David Garcia Quintas5927aec2015-06-18 17:24:44 -07001462 req->flags = op->flags;
Craig Tillerfb189f82015-02-03 12:07:07 -08001463 break;
1464 case GRPC_OP_SEND_CLOSE_FROM_CLIENT:
David Garcia Quintas1d5aca52015-06-14 14:42:04 -07001465 /* Flag validation: currently allow no flags */
David Garcia Quintasde526252015-06-15 13:15:26 -07001466 if (op->flags != 0) return GRPC_CALL_ERROR_INVALID_FLAGS;
Craig Tillerfb189f82015-02-03 12:07:07 -08001467 if (!call->is_client) {
1468 return GRPC_CALL_ERROR_NOT_ON_SERVER;
1469 }
1470 req = &reqs[out++];
1471 req->op = GRPC_IOREQ_SEND_CLOSE;
David Garcia Quintas1d5aca52015-06-14 14:42:04 -07001472 req->flags = op->flags;
Craig Tillerfb189f82015-02-03 12:07:07 -08001473 break;
1474 case GRPC_OP_SEND_STATUS_FROM_SERVER:
David Garcia Quintas1d5aca52015-06-14 14:42:04 -07001475 /* Flag validation: currently allow no flags */
David Garcia Quintasde526252015-06-15 13:15:26 -07001476 if (op->flags != 0) return GRPC_CALL_ERROR_INVALID_FLAGS;
Craig Tillerfb189f82015-02-03 12:07:07 -08001477 if (call->is_client) {
1478 return GRPC_CALL_ERROR_NOT_ON_CLIENT;
1479 }
1480 req = &reqs[out++];
1481 req->op = GRPC_IOREQ_SEND_TRAILING_METADATA;
David Garcia Quintas1d5aca52015-06-14 14:42:04 -07001482 req->flags = op->flags;
Craig Tillerfb189f82015-02-03 12:07:07 -08001483 req->data.send_metadata.count =
1484 op->data.send_status_from_server.trailing_metadata_count;
1485 req->data.send_metadata.metadata =
1486 op->data.send_status_from_server.trailing_metadata;
1487 req = &reqs[out++];
1488 req->op = GRPC_IOREQ_SEND_STATUS;
1489 req->data.send_status.code = op->data.send_status_from_server.status;
1490 req->data.send_status.details =
Craig Tilleraea081f2015-06-11 14:19:33 -07001491 op->data.send_status_from_server.status_details != NULL
1492 ? grpc_mdstr_from_string(
1493 call->metadata_context,
1494 op->data.send_status_from_server.status_details)
1495 : NULL;
Craig Tillerfb189f82015-02-03 12:07:07 -08001496 req = &reqs[out++];
1497 req->op = GRPC_IOREQ_SEND_CLOSE;
1498 break;
1499 case GRPC_OP_RECV_INITIAL_METADATA:
David Garcia Quintas1d5aca52015-06-14 14:42:04 -07001500 /* Flag validation: currently allow no flags */
David Garcia Quintasde526252015-06-15 13:15:26 -07001501 if (op->flags != 0) return GRPC_CALL_ERROR_INVALID_FLAGS;
Craig Tillerfb189f82015-02-03 12:07:07 -08001502 if (!call->is_client) {
1503 return GRPC_CALL_ERROR_NOT_ON_SERVER;
1504 }
1505 req = &reqs[out++];
1506 req->op = GRPC_IOREQ_RECV_INITIAL_METADATA;
1507 req->data.recv_metadata = op->data.recv_initial_metadata;
David Garcia Quintas1d5aca52015-06-14 14:42:04 -07001508 req->flags = op->flags;
Craig Tillerfb189f82015-02-03 12:07:07 -08001509 break;
1510 case GRPC_OP_RECV_MESSAGE:
David Garcia Quintas1d5aca52015-06-14 14:42:04 -07001511 /* Flag validation: currently allow no flags */
David Garcia Quintasde526252015-06-15 13:15:26 -07001512 if (op->flags != 0) return GRPC_CALL_ERROR_INVALID_FLAGS;
Craig Tillerfb189f82015-02-03 12:07:07 -08001513 req = &reqs[out++];
1514 req->op = GRPC_IOREQ_RECV_MESSAGE;
1515 req->data.recv_message = op->data.recv_message;
David Garcia Quintas1d5aca52015-06-14 14:42:04 -07001516 req->flags = op->flags;
Craig Tillerfb189f82015-02-03 12:07:07 -08001517 break;
1518 case GRPC_OP_RECV_STATUS_ON_CLIENT:
David Garcia Quintas1d5aca52015-06-14 14:42:04 -07001519 /* Flag validation: currently allow no flags */
David Garcia Quintasde526252015-06-15 13:15:26 -07001520 if (op->flags != 0) return GRPC_CALL_ERROR_INVALID_FLAGS;
Craig Tillerfb189f82015-02-03 12:07:07 -08001521 if (!call->is_client) {
1522 return GRPC_CALL_ERROR_NOT_ON_SERVER;
1523 }
1524 req = &reqs[out++];
1525 req->op = GRPC_IOREQ_RECV_STATUS;
David Garcia Quintas1d5aca52015-06-14 14:42:04 -07001526 req->flags = op->flags;
Craig Tillerfb189f82015-02-03 12:07:07 -08001527 req->data.recv_status.set_value = set_status_value_directly;
1528 req->data.recv_status.user_data = op->data.recv_status_on_client.status;
1529 req = &reqs[out++];
1530 req->op = GRPC_IOREQ_RECV_STATUS_DETAILS;
1531 req->data.recv_status_details.details =
1532 op->data.recv_status_on_client.status_details;
1533 req->data.recv_status_details.details_capacity =
1534 op->data.recv_status_on_client.status_details_capacity;
1535 req = &reqs[out++];
1536 req->op = GRPC_IOREQ_RECV_TRAILING_METADATA;
1537 req->data.recv_metadata =
1538 op->data.recv_status_on_client.trailing_metadata;
1539 req = &reqs[out++];
1540 req->op = GRPC_IOREQ_RECV_CLOSE;
Craig Tillerfbac5f12015-05-15 14:20:44 -07001541 finish_func = finish_batch_with_close;
Craig Tillerfb189f82015-02-03 12:07:07 -08001542 break;
1543 case GRPC_OP_RECV_CLOSE_ON_SERVER:
David Garcia Quintas1d5aca52015-06-14 14:42:04 -07001544 /* Flag validation: currently allow no flags */
David Garcia Quintasde526252015-06-15 13:15:26 -07001545 if (op->flags != 0) return GRPC_CALL_ERROR_INVALID_FLAGS;
Craig Tillerfb189f82015-02-03 12:07:07 -08001546 req = &reqs[out++];
1547 req->op = GRPC_IOREQ_RECV_STATUS;
David Garcia Quintas1d5aca52015-06-14 14:42:04 -07001548 req->flags = op->flags;
Craig Tillerfb189f82015-02-03 12:07:07 -08001549 req->data.recv_status.set_value = set_cancelled_value;
1550 req->data.recv_status.user_data =
1551 op->data.recv_close_on_server.cancelled;
1552 req = &reqs[out++];
1553 req->op = GRPC_IOREQ_RECV_CLOSE;
Craig Tillerfbac5f12015-05-15 14:20:44 -07001554 finish_func = finish_batch_with_close;
Craig Tillerfb189f82015-02-03 12:07:07 -08001555 break;
1556 }
1557 }
1558
Craig Tiller97fc6a32015-07-08 15:31:35 -07001559 GRPC_CALL_INTERNAL_REF(call, "completion");
1560 grpc_cq_begin_op(call->cq);
Craig Tillerfb189f82015-02-03 12:07:07 -08001561
Craig Tillerf9fae982015-05-29 23:02:18 -07001562 return grpc_call_start_ioreq_and_call_back(call, reqs, out, finish_func, tag);
Craig Tillerfb189f82015-02-03 12:07:07 -08001563}
Craig Tiller935cf422015-05-01 14:10:46 -07001564
Alistair Veitch9686dab2015-05-26 14:26:47 -07001565void grpc_call_context_set(grpc_call *call, grpc_context_index elem,
1566 void *value, void (*destroy)(void *value)) {
Julien Boeuf83b02972015-05-20 22:50:34 -07001567 if (call->context[elem].destroy) {
1568 call->context[elem].destroy(call->context[elem].value);
Craig Tiller935cf422015-05-01 14:10:46 -07001569 }
Julien Boeuf83b02972015-05-20 22:50:34 -07001570 call->context[elem].value = value;
1571 call->context[elem].destroy = destroy;
Craig Tiller935cf422015-05-01 14:10:46 -07001572}
1573
1574void *grpc_call_context_get(grpc_call *call, grpc_context_index elem) {
Julien Boeuf83b02972015-05-20 22:50:34 -07001575 return call->context[elem].value;
Craig Tiller935cf422015-05-01 14:10:46 -07001576}
Julien Boeuf9f218dd2015-04-23 10:24:02 -07001577
1578gpr_uint8 grpc_call_is_client(grpc_call *call) { return call->is_client; }