blob: cc6ae462e26505b4ff63c4317e8a9579b184f234 [file] [log] [blame]
/*
 *
 * Copyright 2015, Google Inc.
 * All rights reserved.
 *
 * Redistribution and use in source and binary forms, with or without
 * modification, are permitted provided that the following conditions are
 * met:
 *
 *     * Redistributions of source code must retain the above copyright
 * notice, this list of conditions and the following disclaimer.
 *     * Redistributions in binary form must reproduce the above
 * copyright notice, this list of conditions and the following disclaimer
 * in the documentation and/or other materials provided with the
 * distribution.
 *     * Neither the name of Google Inc. nor the names of its
 * contributors may be used to endorse or promote products derived from
 * this software without specific prior written permission.
 *
 * THIS SOFTWARE IS PROVIDED BY THE COPYRIGHT HOLDERS AND CONTRIBUTORS
 * "AS IS" AND ANY EXPRESS OR IMPLIED WARRANTIES, INCLUDING, BUT NOT
 * LIMITED TO, THE IMPLIED WARRANTIES OF MERCHANTABILITY AND FITNESS FOR
 * A PARTICULAR PURPOSE ARE DISCLAIMED. IN NO EVENT SHALL THE COPYRIGHT
 * OWNER OR CONTRIBUTORS BE LIABLE FOR ANY DIRECT, INDIRECT, INCIDENTAL,
 * SPECIAL, EXEMPLARY, OR CONSEQUENTIAL DAMAGES (INCLUDING, BUT NOT
 * LIMITED TO, PROCUREMENT OF SUBSTITUTE GOODS OR SERVICES; LOSS OF USE,
 * DATA, OR PROFITS; OR BUSINESS INTERRUPTION) HOWEVER CAUSED AND ON ANY
 * THEORY OF LIABILITY, WHETHER IN CONTRACT, STRICT LIABILITY, OR TORT
 * (INCLUDING NEGLIGENCE OR OTHERWISE) ARISING IN ANY WAY OUT OF THE USE
 * OF THIS SOFTWARE, EVEN IF ADVISED OF THE POSSIBILITY OF SUCH DAMAGE.
 *
 */
33
34#include "src/core/surface/call.h"
35#include "src/core/channel/channel_stack.h"
ctiller18b49ab2014-12-09 14:39:16 -080036#include "src/core/iomgr/alarm.h"
Craig Tiller50968492015-04-28 17:05:49 -070037#include "src/core/profiling/timers.h"
Craig Tiller485d7762015-01-23 12:54:05 -080038#include "src/core/support/string.h"
Craig Tiller1e0d4c42015-01-30 16:17:29 -080039#include "src/core/surface/byte_buffer_queue.h"
ctiller18b49ab2014-12-09 14:39:16 -080040#include "src/core/surface/channel.h"
41#include "src/core/surface/completion_queue.h"
Nicolas Nobleb7ebd3b2014-11-26 16:33:03 -080042#include <grpc/support/alloc.h>
43#include <grpc/support/log.h>
Craig Tiller6902ad22015-04-16 08:01:49 -070044#include <assert.h>
Nicolas Nobleb7ebd3b2014-11-26 16:33:03 -080045
46#include <stdio.h>
47#include <stdlib.h>
48#include <string.h>
49
/* Progress of a single request: not yet started, ready, or finished.
   REQ_INITIAL is pinned to 0 so zero-initialised storage starts there. */
typedef enum { REQ_INITIAL = 0, REQ_READY, REQ_DONE } req_state;
51
/* The kinds of send step that can be selected for a call. */
typedef enum {
  SEND_NOTHING,
  SEND_INITIAL_METADATA,
  SEND_BUFFERED_INITIAL_METADATA,
  SEND_MESSAGE,
  SEND_BUFFERED_MESSAGE,
  SEND_TRAILING_METADATA_AND_FINISH,
  SEND_FINISH
} send_action;
61
62typedef struct {
63 grpc_ioreq_completion_func on_complete;
64 void *user_data;
65 grpc_op_error status;
66} completed_request;
67
/* See request_set in grpc_call below for a description */
#define REQSET_EMPTY 'X'
#define REQSET_DONE 'Y'

/* Capacity of grpc_call.send_initial_metadata */
#define MAX_SEND_INITIAL_METADATA_COUNT 3
Craig Tiller1e0d4c42015-01-30 16:17:29 -080073
Craig Tillerc18c56e2015-02-02 15:59:13 -080074typedef struct {
Craig Tillerdaceea82015-02-02 16:15:53 -080075 /* Overall status of the operation: starts OK, may degrade to
76 non-OK */
Craig Tiller1e0d4c42015-01-30 16:17:29 -080077 grpc_op_error status;
Craig Tillerdaceea82015-02-02 16:15:53 -080078 /* Completion function to call at the end of the operation */
Craig Tillercce17ac2015-01-20 09:29:28 -080079 grpc_ioreq_completion_func on_complete;
Nicolas Nobleb7ebd3b2014-11-26 16:33:03 -080080 void *user_data;
Craig Tillerd642dcf2015-02-03 20:39:09 -080081 /* a bit mask of which request ops are needed (1u << opid) */
Craig Tiller58ce3f02015-04-22 07:54:24 -070082 gpr_uint16 need_mask;
Craig Tillerdaceea82015-02-02 16:15:53 -080083 /* a bit mask of which request ops are now completed */
Craig Tiller58ce3f02015-04-22 07:54:24 -070084 gpr_uint16 complete_mask;
Craig Tillerc18c56e2015-02-02 15:59:13 -080085} reqinfo_master;
Nicolas Nobleb7ebd3b2014-11-26 16:33:03 -080086
/* Status data for a request can come from several sources; this
   enumerates them all, and acts as a priority sorting for which
   status to return to the application - earlier entries override
   later ones */
typedef enum {
  /* Status came from the application layer overriding whatever
     the wire says */
  STATUS_FROM_API_OVERRIDE = 0,
  /* Status was created by some internal channel stack operation */
  STATUS_FROM_CORE,
  /* Status came from 'the wire' - or somewhere below the surface
     layer */
  STATUS_FROM_WIRE,
  /* Number of sources; used to size per-source arrays */
  STATUS_SOURCE_COUNT
} status_source;
102
103typedef struct {
Craig Tillerdaceea82015-02-02 16:15:53 -0800104 gpr_uint8 is_set;
Craig Tiller68752722015-01-29 14:59:54 -0800105 grpc_status_code code;
106 grpc_mdstr *details;
107} received_status;
108
/* How far through the GRPC stream have we read? */
typedef enum {
  /* We are still waiting for initial metadata to complete */
  READ_STATE_INITIAL = 0,
  /* We have gotten initial metadata, and are reading either
     messages or trailing metadata */
  READ_STATE_GOT_INITIAL_METADATA,
  /* The stream is closed for reading */
  READ_STATE_READ_CLOSED,
  /* The stream is closed for reading & writing */
  READ_STATE_STREAM_CLOSED
} read_state;
121
/* How far through the GRPC stream have we written? */
typedef enum {
  WRITE_STATE_INITIAL = 0,
  WRITE_STATE_STARTED,
  WRITE_STATE_WRITE_CLOSED
} write_state;
127
Nicolas Nobleb7ebd3b2014-11-26 16:33:03 -0800128struct grpc_call {
129 grpc_completion_queue *cq;
130 grpc_channel *channel;
131 grpc_mdctx *metadata_context;
Craig Tillercce17ac2015-01-20 09:29:28 -0800132 /* TODO(ctiller): share with cq if possible? */
133 gpr_mu mu;
Nicolas Nobleb7ebd3b2014-11-26 16:33:03 -0800134
Craig Tillere5d683c2015-02-03 16:37:36 -0800135 /* how far through the stream have we read? */
Craig Tillerdaceea82015-02-02 16:15:53 -0800136 read_state read_state;
Craig Tillere5d683c2015-02-03 16:37:36 -0800137 /* how far through the stream have we written? */
Craig Tillerc12fee62015-02-03 11:55:50 -0800138 write_state write_state;
Craig Tillere5d683c2015-02-03 16:37:36 -0800139 /* client or server call */
140 gpr_uint8 is_client;
141 /* is the alarm set */
Nicolas Nobleb7ebd3b2014-11-26 16:33:03 -0800142 gpr_uint8 have_alarm;
Craig Tillere5d683c2015-02-03 16:37:36 -0800143 /* are we currently performing a send operation */
Craig Tiller8eb9d472015-01-27 17:00:03 -0800144 gpr_uint8 sending;
Craig Tiller629b0ed2015-04-22 11:14:26 -0700145 /* are we currently performing a recv operation */
146 gpr_uint8 receiving;
Craig Tiller991ca9f2015-03-03 09:59:22 -0800147 /* are we currently completing requests */
148 gpr_uint8 completing;
Craig Tillere5d683c2015-02-03 16:37:36 -0800149 /* pairs with completed_requests */
Craig Tiller8eb9d472015-01-27 17:00:03 -0800150 gpr_uint8 num_completed_requests;
Craig Tiller629b0ed2015-04-22 11:14:26 -0700151 /* are we currently reading a message? */
152 gpr_uint8 reading_message;
Craig Tiller83f88d92015-04-21 16:02:05 -0700153 /* flags with bits corresponding to write states allowing us to determine
154 what was sent */
Craig Tiller58ce3f02015-04-22 07:54:24 -0700155 gpr_uint16 last_send_contains;
Nicolas Nobleb7ebd3b2014-11-26 16:33:03 -0800156
Craig Tillere5d683c2015-02-03 16:37:36 -0800157 /* Active ioreqs.
158 request_set and request_data contain one element per active ioreq
159 operation.
Craig Tillerebf94bf2015-02-05 08:48:46 -0800160
Craig Tillere5d683c2015-02-03 16:37:36 -0800161 request_set[op] is an integer specifying a set of operations to which
162 the request belongs:
Craig Tillerebf94bf2015-02-05 08:48:46 -0800163 - if it is < GRPC_IOREQ_OP_COUNT, then this operation is pending
Craig Tillere5d683c2015-02-03 16:37:36 -0800164 completion, and the integer represents to which group of operations
165 the ioreq belongs. Each group is represented by one master, and the
166 integer in request_set is an index into masters to find the master
167 data.
168 - if it is REQSET_EMPTY, the ioreq op is inactive and available to be
169 started
170 - finally, if request_set[op] is REQSET_DONE, then the operation is
171 complete and unavailable to be started again
Craig Tillerebf94bf2015-02-05 08:48:46 -0800172
Craig Tillere5d683c2015-02-03 16:37:36 -0800173 request_data[op] is the request data as supplied by the initiator of
174 a request, and is valid iff request_set[op] <= GRPC_IOREQ_OP_COUNT.
175 The set fields are as per the request type specified by op.
176
Craig Tillerd6731622015-02-03 22:44:13 -0800177 Finally, one element of masters is set per active _set_ of ioreq
Craig Tillere5d683c2015-02-03 16:37:36 -0800178 operations. It describes work left outstanding, result status, and
179 what work to perform upon operation completion. As one ioreq of each
180 op type can be active at once, by convention we choose the first element
Craig Tillerd6731622015-02-03 22:44:13 -0800181 of the group to be the master -- ie the master of in-progress operation
182 op is masters[request_set[op]]. This allows constant time allocation
Craig Tillere5d683c2015-02-03 16:37:36 -0800183 and a strong upper bound of a count of masters to be calculated. */
Craig Tillerc12fee62015-02-03 11:55:50 -0800184 gpr_uint8 request_set[GRPC_IOREQ_OP_COUNT];
185 grpc_ioreq_data request_data[GRPC_IOREQ_OP_COUNT];
Craig Tillerc18c56e2015-02-02 15:59:13 -0800186 reqinfo_master masters[GRPC_IOREQ_OP_COUNT];
Craig Tillere5d683c2015-02-03 16:37:36 -0800187
188 /* Dynamic array of ioreq's that have completed: the count of
189 elements is queued in num_completed_requests.
190 This list is built up under lock(), and flushed entirely during
191 unlock().
192 We know the upper bound of the number of elements as we can only
193 have one ioreq of each type active at once. */
Craig Tiller8eb9d472015-01-27 17:00:03 -0800194 completed_request completed_requests[GRPC_IOREQ_OP_COUNT];
Craig Tillere5d683c2015-02-03 16:37:36 -0800195 /* Incoming buffer of messages */
Craig Tiller1e0d4c42015-01-30 16:17:29 -0800196 grpc_byte_buffer_queue incoming_queue;
Craig Tillere5d683c2015-02-03 16:37:36 -0800197 /* Buffered read metadata waiting to be returned to the application.
198 Element 0 is initial metadata, element 1 is trailing metadata. */
Craig Tillerc12fee62015-02-03 11:55:50 -0800199 grpc_metadata_array buffered_metadata[2];
Craig Tillere5d683c2015-02-03 16:37:36 -0800200 /* All metadata received - unreffed at once at the end of the call */
Craig Tiller3a4749f2015-01-30 07:51:45 -0800201 grpc_mdelem **owned_metadata;
202 size_t owned_metadata_count;
203 size_t owned_metadata_capacity;
Nicolas Nobleb7ebd3b2014-11-26 16:33:03 -0800204
Craig Tillere5d683c2015-02-03 16:37:36 -0800205 /* Received call statuses from various sources */
Craig Tiller68752722015-01-29 14:59:54 -0800206 received_status status[STATUS_SOURCE_COUNT];
Nicolas Nobleb7ebd3b2014-11-26 16:33:03 -0800207
Craig Tiller935cf422015-05-01 14:10:46 -0700208 void *context[GRPC_CONTEXT_COUNT];
209 void (*destroy_context[GRPC_CONTEXT_COUNT])(void *);
210
Craig Tillere5d683c2015-02-03 16:37:36 -0800211 /* Deadline alarm - if have_alarm is non-zero */
Craig Tillercce17ac2015-01-20 09:29:28 -0800212 grpc_alarm alarm;
213
Craig Tillere5d683c2015-02-03 16:37:36 -0800214 /* Call refcount - to keep the call alive during asynchronous operations */
Nicolas Nobleb7ebd3b2014-11-26 16:33:03 -0800215 gpr_refcount internal_refcount;
Craig Tillercce17ac2015-01-20 09:29:28 -0800216
Craig Tiller6902ad22015-04-16 08:01:49 -0700217 grpc_linked_mdelem send_initial_metadata[MAX_SEND_INITIAL_METADATA_COUNT];
218 grpc_linked_mdelem status_link;
219 grpc_linked_mdelem details_link;
220 size_t send_initial_metadata_count;
221 gpr_timespec send_deadline;
222
Craig Tiller83f88d92015-04-21 16:02:05 -0700223 grpc_stream_op_buffer send_ops;
224 grpc_stream_op_buffer recv_ops;
225 grpc_stream_state recv_state;
226
Craig Tiller629b0ed2015-04-22 11:14:26 -0700227 gpr_slice_buffer incoming_message;
228 gpr_uint32 incoming_message_length;
Nicolas Nobleb7ebd3b2014-11-26 16:33:03 -0800229};
230
/* The call stack lives in the same allocation as the grpc_call, directly
   after it; these macros convert between the two views. */
#define CALL_STACK_FROM_CALL(call) ((grpc_call_stack *)((call) + 1))
#define CALL_FROM_CALL_STACK(call_stack) (((grpc_call *)(call_stack)) - 1)
#define CALL_ELEM_FROM_CALL(call, idx) \
  grpc_call_stack_element(CALL_STACK_FROM_CALL(call), idx)
#define CALL_FROM_TOP_ELEM(top_elem) \
  CALL_FROM_CALL_STACK(grpc_call_stack_from_top_element(top_elem))
237
238static void do_nothing(void *ignored, grpc_op_error also_ignored) {}
Craig Tiller6902ad22015-04-16 08:01:49 -0700239static void set_deadline_alarm(grpc_call *call, gpr_timespec deadline);
Craig Tiller83f88d92015-04-21 16:02:05 -0700240static void call_on_done_recv(void *call, int success);
241static void call_on_done_send(void *call, int success);
242static int fill_send_ops(grpc_call *call, grpc_transport_op *op);
243static void execute_op(grpc_call *call, grpc_transport_op *op);
Craig Tiller629b0ed2015-04-22 11:14:26 -0700244static void recv_metadata(grpc_call *call, grpc_metadata_batch *metadata);
245static void finish_read_ops(grpc_call *call);
Yang Gao172791a2015-05-04 16:33:53 -0700246static grpc_call_error cancel_with_status(
Yang Gaoc71a9d22015-05-04 00:22:12 -0700247 grpc_call *c, grpc_status_code status, const char *description,
248 gpr_uint8 locked);
Nicolas Nobleb7ebd3b2014-11-26 16:33:03 -0800249
Craig Tillerfb189f82015-02-03 12:07:07 -0800250grpc_call *grpc_call_create(grpc_channel *channel, grpc_completion_queue *cq,
Craig Tiller87d5b192015-04-16 14:37:57 -0700251 const void *server_transport_data,
252 grpc_mdelem **add_initial_metadata,
253 size_t add_initial_metadata_count,
254 gpr_timespec send_deadline) {
Craig Tiller1e0d4c42015-01-30 16:17:29 -0800255 size_t i;
Craig Tiller7e8489a2015-04-23 12:41:16 -0700256 grpc_transport_op initial_op;
257 grpc_transport_op *initial_op_ptr = NULL;
Nicolas Nobleb7ebd3b2014-11-26 16:33:03 -0800258 grpc_channel_stack *channel_stack = grpc_channel_get_channel_stack(channel);
259 grpc_call *call =
260 gpr_malloc(sizeof(grpc_call) + channel_stack->call_stack_size);
Craig Tillercce17ac2015-01-20 09:29:28 -0800261 memset(call, 0, sizeof(grpc_call));
262 gpr_mu_init(&call->mu);
Nicolas Nobleb7ebd3b2014-11-26 16:33:03 -0800263 call->channel = channel;
Craig Tillerfb189f82015-02-03 12:07:07 -0800264 call->cq = cq;
Craig Tillercce17ac2015-01-20 09:29:28 -0800265 call->is_client = server_transport_data == NULL;
Craig Tiller1e0d4c42015-01-30 16:17:29 -0800266 for (i = 0; i < GRPC_IOREQ_OP_COUNT; i++) {
Craig Tillerc12fee62015-02-03 11:55:50 -0800267 call->request_set[i] = REQSET_EMPTY;
Craig Tiller1e0d4c42015-01-30 16:17:29 -0800268 }
Craig Tiller23aa6c42015-01-27 17:16:12 -0800269 if (call->is_client) {
Craig Tillerc12fee62015-02-03 11:55:50 -0800270 call->request_set[GRPC_IOREQ_SEND_TRAILING_METADATA] = REQSET_DONE;
271 call->request_set[GRPC_IOREQ_SEND_STATUS] = REQSET_DONE;
Craig Tiller23aa6c42015-01-27 17:16:12 -0800272 }
Craig Tiller6902ad22015-04-16 08:01:49 -0700273 GPR_ASSERT(add_initial_metadata_count < MAX_SEND_INITIAL_METADATA_COUNT);
274 for (i = 0; i < add_initial_metadata_count; i++) {
275 call->send_initial_metadata[i].md = add_initial_metadata[i];
276 }
277 call->send_initial_metadata_count = add_initial_metadata_count;
278 call->send_deadline = send_deadline;
Nicolas Nobleb7ebd3b2014-11-26 16:33:03 -0800279 grpc_channel_internal_ref(channel);
280 call->metadata_context = grpc_channel_get_metadata_context(channel);
Craig Tillerfbf5be22015-04-22 16:17:09 -0700281 grpc_sopb_init(&call->send_ops);
282 grpc_sopb_init(&call->recv_ops);
Craig Tillerfa4f9942015-04-23 15:22:09 -0700283 gpr_slice_buffer_init(&call->incoming_message);
Craig Tiller9c71b6f2015-04-24 16:02:00 -0700284 /* dropped in destroy */
285 gpr_ref_init(&call->internal_refcount, 1);
Craig Tiller7e8489a2015-04-23 12:41:16 -0700286 /* server hack: start reads immediately so we can get initial metadata.
287 TODO(ctiller): figure out a cleaner solution */
288 if (!call->is_client) {
289 memset(&initial_op, 0, sizeof(initial_op));
290 initial_op.recv_ops = &call->recv_ops;
291 initial_op.recv_state = &call->recv_state;
292 initial_op.on_done_recv = call_on_done_recv;
293 initial_op.recv_user_data = call;
Craig Tiller935cf422015-05-01 14:10:46 -0700294 initial_op.context = call->context;
Craig Tiller7e8489a2015-04-23 12:41:16 -0700295 call->receiving = 1;
Craig Tiller4df412b2015-04-28 07:57:54 -0700296 GRPC_CALL_INTERNAL_REF(call, "receiving");
Craig Tiller7e8489a2015-04-23 12:41:16 -0700297 initial_op_ptr = &initial_op;
298 }
299 grpc_call_stack_init(channel_stack, server_transport_data, initial_op_ptr,
Nicolas Nobleb7ebd3b2014-11-26 16:33:03 -0800300 CALL_STACK_FROM_CALL(call));
Craig Tiller6902ad22015-04-16 08:01:49 -0700301 if (gpr_time_cmp(send_deadline, gpr_inf_future) != 0) {
302 set_deadline_alarm(call, send_deadline);
303 }
Nicolas Nobleb7ebd3b2014-11-26 16:33:03 -0800304 return call;
305}
306
Craig Tiller166e2502015-02-03 20:14:41 -0800307void grpc_call_set_completion_queue(grpc_call *call,
308 grpc_completion_queue *cq) {
309 call->cq = cq;
310}
311
Craig Tiller24be0f72015-02-10 14:04:22 -0800312grpc_completion_queue *grpc_call_get_completion_queue(grpc_call *call) {
313 return call->cq;
314}
315
Craig Tiller4df412b2015-04-28 07:57:54 -0700316#ifdef GRPC_CALL_REF_COUNT_DEBUG
317void grpc_call_internal_ref(grpc_call *c, const char *reason) {
318 gpr_log(GPR_DEBUG, "CALL: ref %p %d -> %d [%s]", c,
319 c->internal_refcount.count, c->internal_refcount.count + 1, reason);
320#else
321void grpc_call_internal_ref(grpc_call *c) {
322#endif
323 gpr_ref(&c->internal_refcount);
324}
Nicolas Nobleb7ebd3b2014-11-26 16:33:03 -0800325
Craig Tilleraef25da2015-01-29 17:19:45 -0800326static void destroy_call(void *call, int ignored_success) {
Craig Tiller566316f2015-02-02 15:25:32 -0800327 size_t i;
Craig Tilleraef25da2015-01-29 17:19:45 -0800328 grpc_call *c = call;
Craig Tillera4541102015-01-29 11:46:11 -0800329 grpc_call_stack_destroy(CALL_STACK_FROM_CALL(c));
330 grpc_channel_internal_unref(c->channel);
331 gpr_mu_destroy(&c->mu);
Craig Tiller68752722015-01-29 14:59:54 -0800332 for (i = 0; i < STATUS_SOURCE_COUNT; i++) {
333 if (c->status[i].details) {
334 grpc_mdstr_unref(c->status[i].details);
335 }
Craig Tillera4541102015-01-29 11:46:11 -0800336 }
Craig Tiller3a4749f2015-01-30 07:51:45 -0800337 for (i = 0; i < c->owned_metadata_count; i++) {
338 grpc_mdelem_unref(c->owned_metadata[i]);
339 }
340 gpr_free(c->owned_metadata);
Craig Tillerc12fee62015-02-03 11:55:50 -0800341 for (i = 0; i < GPR_ARRAY_SIZE(c->buffered_metadata); i++) {
342 gpr_free(c->buffered_metadata[i].metadata);
343 }
Craig Tillereb40a532015-04-17 16:46:20 -0700344 for (i = 0; i < c->send_initial_metadata_count; i++) {
345 grpc_mdelem_unref(c->send_initial_metadata[i].md);
346 }
Craig Tiller935cf422015-05-01 14:10:46 -0700347 for (i = 0; i < GRPC_CONTEXT_COUNT; i++) {
348 if (c->destroy_context[i]) {
349 c->destroy_context[i](c->context[i]);
350 }
351 }
Craig Tillerb56ca8d2015-04-24 17:16:22 -0700352 grpc_sopb_destroy(&c->send_ops);
353 grpc_sopb_destroy(&c->recv_ops);
Craig Tiller37bbead2015-02-05 08:43:49 -0800354 grpc_bbq_destroy(&c->incoming_queue);
Craig Tillerfa4f9942015-04-23 15:22:09 -0700355 gpr_slice_buffer_destroy(&c->incoming_message);
Craig Tillera4541102015-01-29 11:46:11 -0800356 gpr_free(c);
357}
358
Craig Tiller4df412b2015-04-28 07:57:54 -0700359#ifdef GRPC_CALL_REF_COUNT_DEBUG
360void grpc_call_internal_unref(grpc_call *c, const char *reason,
361 int allow_immediate_deletion) {
362 gpr_log(GPR_DEBUG, "CALL: unref %p %d -> %d [%s]", c,
363 c->internal_refcount.count, c->internal_refcount.count - 1, reason);
364#else
Craig Tiller64bc3fd2015-04-24 17:07:12 -0700365void grpc_call_internal_unref(grpc_call *c, int allow_immediate_deletion) {
Craig Tiller4df412b2015-04-28 07:57:54 -0700366#endif
Nicolas Nobleb7ebd3b2014-11-26 16:33:03 -0800367 if (gpr_unref(&c->internal_refcount)) {
Craig Tilleraef25da2015-01-29 17:19:45 -0800368 if (allow_immediate_deletion) {
369 destroy_call(c, 1);
370 } else {
371 grpc_iomgr_add_callback(destroy_call, c);
372 }
Nicolas Nobleb7ebd3b2014-11-26 16:33:03 -0800373 }
374}
375
Craig Tiller928fbc82015-01-29 15:06:42 -0800376static void set_status_code(grpc_call *call, status_source source,
377 gpr_uint32 status) {
Craig Tillerdaceea82015-02-02 16:15:53 -0800378 call->status[source].is_set = 1;
Craig Tiller68752722015-01-29 14:59:54 -0800379 call->status[source].code = status;
Craig Tiller30547562015-02-05 17:04:51 -0800380
Craig Tillerd1abc812015-05-06 14:35:19 -0700381 if (status != GRPC_STATUS_OK && !grpc_bbq_empty(&call->incoming_queue)) {
Craig Tiller30547562015-02-05 17:04:51 -0800382 grpc_bbq_flush(&call->incoming_queue);
383 }
Craig Tiller68752722015-01-29 14:59:54 -0800384}
385
Craig Tiller928fbc82015-01-29 15:06:42 -0800386static void set_status_details(grpc_call *call, status_source source,
387 grpc_mdstr *status) {
Craig Tiller68752722015-01-29 14:59:54 -0800388 if (call->status[source].details != NULL) {
389 grpc_mdstr_unref(call->status[source].details);
390 }
391 call->status[source].details = status;
392}
393
Craig Tillerc12fee62015-02-03 11:55:50 -0800394static int is_op_live(grpc_call *call, grpc_ioreq_op op) {
395 gpr_uint8 set = call->request_set[op];
396 reqinfo_master *master;
397 if (set >= GRPC_IOREQ_OP_COUNT) return 0;
398 master = &call->masters[set];
Craig Tillerd642dcf2015-02-03 20:39:09 -0800399 return (master->complete_mask & (1u << op)) == 0;
Craig Tillerc12fee62015-02-03 11:55:50 -0800400}
401
Craig Tiller8eb9d472015-01-27 17:00:03 -0800402static void lock(grpc_call *call) { gpr_mu_lock(&call->mu); }
Craig Tillercce17ac2015-01-20 09:29:28 -0800403
Craig Tiller629b0ed2015-04-22 11:14:26 -0700404static int need_more_data(grpc_call *call) {
405 return is_op_live(call, GRPC_IOREQ_RECV_INITIAL_METADATA) ||
Craig Tiller10b9cb52015-04-28 10:00:15 -0700406 (is_op_live(call, GRPC_IOREQ_RECV_MESSAGE) && grpc_bbq_empty(&call->incoming_queue)) ||
Craig Tiller629b0ed2015-04-22 11:14:26 -0700407 is_op_live(call, GRPC_IOREQ_RECV_TRAILING_METADATA) ||
408 is_op_live(call, GRPC_IOREQ_RECV_STATUS) ||
409 is_op_live(call, GRPC_IOREQ_RECV_STATUS_DETAILS) ||
Craig Tiller4df412b2015-04-28 07:57:54 -0700410 (is_op_live(call, GRPC_IOREQ_RECV_CLOSE) &&
411 grpc_bbq_empty(&call->incoming_queue)) ||
412 (call->write_state == WRITE_STATE_INITIAL && !call->is_client &&
Craig Tiller10b9cb52015-04-28 10:00:15 -0700413 call->read_state < READ_STATE_GOT_INITIAL_METADATA);
Craig Tiller629b0ed2015-04-22 11:14:26 -0700414}
415
Craig Tiller8eb9d472015-01-27 17:00:03 -0800416static void unlock(grpc_call *call) {
Craig Tiller83f88d92015-04-21 16:02:05 -0700417 grpc_transport_op op;
Craig Tiller8eb9d472015-01-27 17:00:03 -0800418 completed_request completed_requests[GRPC_IOREQ_OP_COUNT];
Craig Tiller991ca9f2015-03-03 09:59:22 -0800419 int completing_requests = 0;
Craig Tiller83f88d92015-04-21 16:02:05 -0700420 int start_op = 0;
Craig Tiller8eb9d472015-01-27 17:00:03 -0800421 int i;
422
Craig Tiller83f88d92015-04-21 16:02:05 -0700423 memset(&op, 0, sizeof(op));
424
Craig Tiller1a727fd2015-04-24 13:21:22 -0700425 if (!call->receiving && need_more_data(call)) {
Craig Tiller83f88d92015-04-21 16:02:05 -0700426 op.recv_ops = &call->recv_ops;
427 op.recv_state = &call->recv_state;
428 op.on_done_recv = call_on_done_recv;
429 op.recv_user_data = call;
Craig Tiller629b0ed2015-04-22 11:14:26 -0700430 call->receiving = 1;
Craig Tiller4df412b2015-04-28 07:57:54 -0700431 GRPC_CALL_INTERNAL_REF(call, "receiving");
Craig Tiller83f88d92015-04-21 16:02:05 -0700432 start_op = 1;
433 }
434
435 if (!call->sending) {
436 if (fill_send_ops(call, &op)) {
437 call->sending = 1;
Craig Tiller4df412b2015-04-28 07:57:54 -0700438 GRPC_CALL_INTERNAL_REF(call, "sending");
Craig Tiller83f88d92015-04-21 16:02:05 -0700439 start_op = 1;
440 }
Craig Tillercffbcb72015-01-29 23:16:33 -0800441 }
Craig Tiller2e103572015-01-29 14:12:07 -0800442
Craig Tiller991ca9f2015-03-03 09:59:22 -0800443 if (!call->completing && call->num_completed_requests != 0) {
444 completing_requests = call->num_completed_requests;
Craig Tiller8eb9d472015-01-27 17:00:03 -0800445 memcpy(completed_requests, call->completed_requests,
446 sizeof(completed_requests));
447 call->num_completed_requests = 0;
Craig Tiller991ca9f2015-03-03 09:59:22 -0800448 call->completing = 1;
Craig Tiller4df412b2015-04-28 07:57:54 -0700449 GRPC_CALL_INTERNAL_REF(call, "completing");
Craig Tiller8eb9d472015-01-27 17:00:03 -0800450 }
451
Craig Tiller8eb9d472015-01-27 17:00:03 -0800452 gpr_mu_unlock(&call->mu);
453
Craig Tiller83f88d92015-04-21 16:02:05 -0700454 if (start_op) {
455 execute_op(call, &op);
Craig Tiller8eb9d472015-01-27 17:00:03 -0800456 }
457
Craig Tiller991ca9f2015-03-03 09:59:22 -0800458 if (completing_requests > 0) {
459 for (i = 0; i < completing_requests; i++) {
460 completed_requests[i].on_complete(call, completed_requests[i].status,
461 completed_requests[i].user_data);
462 }
463 lock(call);
464 call->completing = 0;
465 unlock(call);
Craig Tiller4df412b2015-04-28 07:57:54 -0700466 GRPC_CALL_INTERNAL_UNREF(call, "completing", 0);
Craig Tiller8eb9d472015-01-27 17:00:03 -0800467 }
468}
Craig Tillercce17ac2015-01-20 09:29:28 -0800469
Craig Tillerfb189f82015-02-03 12:07:07 -0800470static void get_final_status(grpc_call *call, grpc_ioreq_data out) {
Craig Tiller68752722015-01-29 14:59:54 -0800471 int i;
472 for (i = 0; i < STATUS_SOURCE_COUNT; i++) {
Craig Tillerdaceea82015-02-02 16:15:53 -0800473 if (call->status[i].is_set) {
Craig Tillerfb189f82015-02-03 12:07:07 -0800474 out.recv_status.set_value(call->status[i].code,
475 out.recv_status.user_data);
476 return;
477 }
478 }
Craig Tillerde343162015-02-09 23:37:22 -0800479 if (call->is_client) {
480 out.recv_status.set_value(GRPC_STATUS_UNKNOWN, out.recv_status.user_data);
481 } else {
482 out.recv_status.set_value(GRPC_STATUS_OK, out.recv_status.user_data);
483 }
Craig Tillerfb189f82015-02-03 12:07:07 -0800484}
485
486static void get_final_details(grpc_call *call, grpc_ioreq_data out) {
487 int i;
488 for (i = 0; i < STATUS_SOURCE_COUNT; i++) {
489 if (call->status[i].is_set) {
Craig Tiller1e0d4c42015-01-30 16:17:29 -0800490 if (call->status[i].details) {
491 gpr_slice details = call->status[i].details->slice;
492 size_t len = GPR_SLICE_LENGTH(details);
Craig Tillerfb189f82015-02-03 12:07:07 -0800493 if (len + 1 > *out.recv_status_details.details_capacity) {
494 *out.recv_status_details.details_capacity = GPR_MAX(
495 len + 1, *out.recv_status_details.details_capacity * 3 / 2);
496 *out.recv_status_details.details =
497 gpr_realloc(*out.recv_status_details.details,
498 *out.recv_status_details.details_capacity);
Craig Tiller1e0d4c42015-01-30 16:17:29 -0800499 }
Craig Tillerfb189f82015-02-03 12:07:07 -0800500 memcpy(*out.recv_status_details.details, GPR_SLICE_START_PTR(details),
501 len);
502 (*out.recv_status_details.details)[len] = 0;
Craig Tiller1e0d4c42015-01-30 16:17:29 -0800503 } else {
504 goto no_details;
505 }
Craig Tiller68752722015-01-29 14:59:54 -0800506 return;
507 }
508 }
Craig Tiller1e0d4c42015-01-30 16:17:29 -0800509
510no_details:
Craig Tillerfb189f82015-02-03 12:07:07 -0800511 if (0 == *out.recv_status_details.details_capacity) {
512 *out.recv_status_details.details_capacity = 8;
513 *out.recv_status_details.details =
514 gpr_malloc(*out.recv_status_details.details_capacity);
Craig Tiller1e0d4c42015-01-30 16:17:29 -0800515 }
Craig Tillerfb189f82015-02-03 12:07:07 -0800516 **out.recv_status_details.details = 0;
Craig Tiller68752722015-01-29 14:59:54 -0800517}
518
Craig Tillerc12fee62015-02-03 11:55:50 -0800519static void finish_live_ioreq_op(grpc_call *call, grpc_ioreq_op op,
520 grpc_op_error status) {
521 completed_request *cr;
522 gpr_uint8 master_set = call->request_set[op];
523 reqinfo_master *master;
524 size_t i;
525 /* ioreq is live: we need to do something */
526 master = &call->masters[master_set];
Craig Tillerd642dcf2015-02-03 20:39:09 -0800527 master->complete_mask |= 1u << op;
Craig Tillerc12fee62015-02-03 11:55:50 -0800528 if (status != GRPC_OP_OK) {
529 master->status = status;
Craig Tillerc12fee62015-02-03 11:55:50 -0800530 }
531 if (master->complete_mask == master->need_mask) {
532 for (i = 0; i < GRPC_IOREQ_OP_COUNT; i++) {
533 if (call->request_set[i] != master_set) {
534 continue;
535 }
536 call->request_set[i] = REQSET_DONE;
537 switch ((grpc_ioreq_op)i) {
538 case GRPC_IOREQ_RECV_MESSAGE:
539 case GRPC_IOREQ_SEND_MESSAGE:
540 if (master->status == GRPC_OP_OK) {
541 call->request_set[i] = REQSET_EMPTY;
542 } else {
543 call->write_state = WRITE_STATE_WRITE_CLOSED;
544 }
545 break;
546 case GRPC_IOREQ_RECV_CLOSE:
547 case GRPC_IOREQ_SEND_INITIAL_METADATA:
548 case GRPC_IOREQ_SEND_TRAILING_METADATA:
549 case GRPC_IOREQ_SEND_STATUS:
550 case GRPC_IOREQ_SEND_CLOSE:
551 break;
552 case GRPC_IOREQ_RECV_STATUS:
Craig Tillerfb189f82015-02-03 12:07:07 -0800553 get_final_status(call, call->request_data[GRPC_IOREQ_RECV_STATUS]);
554 break;
555 case GRPC_IOREQ_RECV_STATUS_DETAILS:
556 get_final_details(call,
557 call->request_data[GRPC_IOREQ_RECV_STATUS_DETAILS]);
Craig Tillerc12fee62015-02-03 11:55:50 -0800558 break;
559 case GRPC_IOREQ_RECV_INITIAL_METADATA:
Craig Tillerbae41c82015-04-28 13:22:25 -0700560 GPR_SWAP(grpc_metadata_array, call->buffered_metadata[0],
Craig Tillerc12fee62015-02-03 11:55:50 -0800561 *call->request_data[GRPC_IOREQ_RECV_INITIAL_METADATA]
562 .recv_metadata);
563 break;
564 case GRPC_IOREQ_RECV_TRAILING_METADATA:
Craig Tillerbae41c82015-04-28 13:22:25 -0700565 GPR_SWAP(grpc_metadata_array, call->buffered_metadata[1],
Craig Tillerc12fee62015-02-03 11:55:50 -0800566 *call->request_data[GRPC_IOREQ_RECV_TRAILING_METADATA]
567 .recv_metadata);
568 break;
569 case GRPC_IOREQ_OP_COUNT:
570 abort();
571 break;
572 }
573 }
574 cr = &call->completed_requests[call->num_completed_requests++];
575 cr->status = master->status;
576 cr->on_complete = master->on_complete;
577 cr->user_data = master->user_data;
578 }
579}
580
Craig Tillercce17ac2015-01-20 09:29:28 -0800581static void finish_ioreq_op(grpc_call *call, grpc_ioreq_op op,
582 grpc_op_error status) {
Craig Tillerc12fee62015-02-03 11:55:50 -0800583 if (is_op_live(call, op)) {
584 finish_live_ioreq_op(call, op, status);
Craig Tillercce17ac2015-01-20 09:29:28 -0800585 }
586}
587
Craig Tiller58ce3f02015-04-22 07:54:24 -0700588static void call_on_done_send(void *pc, int success) {
589 grpc_call *call = pc;
590 grpc_op_error error = success ? GRPC_OP_OK : GRPC_OP_ERROR;
Craig Tiller1e0d4c42015-01-30 16:17:29 -0800591 lock(call);
Craig Tiller58ce3f02015-04-22 07:54:24 -0700592 if (call->last_send_contains & (1 << GRPC_IOREQ_SEND_INITIAL_METADATA)) {
593 finish_ioreq_op(call, GRPC_IOREQ_SEND_INITIAL_METADATA, error);
594 }
595 if (call->last_send_contains & (1 << GRPC_IOREQ_SEND_MESSAGE)) {
596 finish_ioreq_op(call, GRPC_IOREQ_SEND_MESSAGE, error);
597 }
598 if (call->last_send_contains & (1 << GRPC_IOREQ_SEND_CLOSE)) {
Craig Tiller6e84aba2015-04-23 15:08:17 -0700599 finish_ioreq_op(call, GRPC_IOREQ_SEND_TRAILING_METADATA, error);
600 finish_ioreq_op(call, GRPC_IOREQ_SEND_STATUS, error);
Craig Tiller48b9fde2015-04-24 08:04:59 -0700601 finish_ioreq_op(call, GRPC_IOREQ_SEND_CLOSE, GRPC_OP_OK);
Craig Tiller58ce3f02015-04-22 07:54:24 -0700602 }
Craig Tiller3928c7a2015-04-23 16:00:47 -0700603 call->last_send_contains = 0;
Craig Tiller1e0d4c42015-01-30 16:17:29 -0800604 call->sending = 0;
605 unlock(call);
Craig Tiller4df412b2015-04-28 07:57:54 -0700606 GRPC_CALL_INTERNAL_UNREF(call, "sending", 0);
Craig Tiller1e0d4c42015-01-30 16:17:29 -0800607}
608
Craig Tiller629b0ed2015-04-22 11:14:26 -0700609static void finish_message(grpc_call *call) {
610 /* TODO(ctiller): this could be a lot faster if coded directly */
611 grpc_byte_buffer *byte_buffer = grpc_byte_buffer_create(
612 call->incoming_message.slices, call->incoming_message.count);
613 gpr_slice_buffer_reset_and_unref(&call->incoming_message);
614
615 grpc_bbq_push(&call->incoming_queue, byte_buffer);
616
617 GPR_ASSERT(call->incoming_message.count == 0);
618 call->reading_message = 0;
619}
620
621static int begin_message(grpc_call *call, grpc_begin_message msg) {
622 /* can't begin a message when we're still reading a message */
623 if (call->reading_message) {
624 char *message = NULL;
625 gpr_asprintf(
626 &message, "Message terminated early; read %d bytes, expected %d",
627 (int)call->incoming_message.length, (int)call->incoming_message_length);
Yang Gao172791a2015-05-04 16:33:53 -0700628 cancel_with_status(call, GRPC_STATUS_INVALID_ARGUMENT, message, 1);
Craig Tiller629b0ed2015-04-22 11:14:26 -0700629 gpr_free(message);
630 return 0;
631 }
632 /* stash away parameters, and prepare for incoming slices */
633 if (msg.length > grpc_channel_get_max_message_length(call->channel)) {
634 char *message = NULL;
635 gpr_asprintf(
636 &message,
637 "Maximum message length of %d exceeded by a message of length %d",
638 grpc_channel_get_max_message_length(call->channel), msg.length);
Yang Gao172791a2015-05-04 16:33:53 -0700639 cancel_with_status(call, GRPC_STATUS_INVALID_ARGUMENT, message, 1);
Craig Tiller629b0ed2015-04-22 11:14:26 -0700640 gpr_free(message);
641 return 0;
642 } else if (msg.length > 0) {
643 call->reading_message = 1;
644 call->incoming_message_length = msg.length;
645 return 1;
646 } else {
647 finish_message(call);
648 return 1;
649 }
650}
651
652static int add_slice_to_message(grpc_call *call, gpr_slice slice) {
653 if (GPR_SLICE_LENGTH(slice) == 0) {
654 gpr_slice_unref(slice);
655 return 1;
656 }
657 /* we have to be reading a message to know what to do here */
658 if (!call->reading_message) {
Yang Gao172791a2015-05-04 16:33:53 -0700659 cancel_with_status(
Craig Tiller629b0ed2015-04-22 11:14:26 -0700660 call, GRPC_STATUS_INVALID_ARGUMENT,
Yang Gaoff30f8e2015-05-04 00:13:39 -0700661 "Received payload data while not reading a message", 1);
Craig Tiller629b0ed2015-04-22 11:14:26 -0700662 return 0;
663 }
664 /* append the slice to the incoming buffer */
665 gpr_slice_buffer_add(&call->incoming_message, slice);
666 if (call->incoming_message.length > call->incoming_message_length) {
667 /* if we got too many bytes, complain */
668 char *message = NULL;
669 gpr_asprintf(
670 &message, "Receiving message overflow; read %d bytes, expected %d",
671 (int)call->incoming_message.length, (int)call->incoming_message_length);
Yang Gao172791a2015-05-04 16:33:53 -0700672 cancel_with_status(call, GRPC_STATUS_INVALID_ARGUMENT, message, 1);
Craig Tiller629b0ed2015-04-22 11:14:26 -0700673 gpr_free(message);
674 return 0;
675 } else if (call->incoming_message.length == call->incoming_message_length) {
676 finish_message(call);
677 return 1;
678 } else {
679 return 1;
680 }
681}
682
683static void call_on_done_recv(void *pc, int success) {
684 grpc_call *call = pc;
685 size_t i;
David Garcia Quintasf667f1b2015-05-04 13:15:46 -0700686 GRPC_TIMER_BEGIN(GRPC_PTAG_CALL_ON_DONE_RECV, 0);
Craig Tiller629b0ed2015-04-22 11:14:26 -0700687 lock(call);
Craig Tiller6e84aba2015-04-23 15:08:17 -0700688 call->receiving = 0;
Craig Tiller48b9fde2015-04-24 08:04:59 -0700689 if (success) {
690 for (i = 0; success && i < call->recv_ops.nops; i++) {
691 grpc_stream_op *op = &call->recv_ops.ops[i];
692 switch (op->type) {
693 case GRPC_NO_OP:
694 break;
695 case GRPC_OP_METADATA:
696 recv_metadata(call, &op->data.metadata);
697 break;
698 case GRPC_OP_BEGIN_MESSAGE:
699 success = begin_message(call, op->data.begin_message);
700 break;
701 case GRPC_OP_SLICE:
702 success = add_slice_to_message(call, op->data.slice);
703 break;
704 }
Craig Tiller629b0ed2015-04-22 11:14:26 -0700705 }
Craig Tiller48b9fde2015-04-24 08:04:59 -0700706 if (call->recv_state == GRPC_STREAM_RECV_CLOSED) {
707 GPR_ASSERT(call->read_state <= READ_STATE_READ_CLOSED);
708 call->read_state = READ_STATE_READ_CLOSED;
709 }
710 if (call->recv_state == GRPC_STREAM_CLOSED) {
711 GPR_ASSERT(call->read_state <= READ_STATE_STREAM_CLOSED);
712 call->read_state = READ_STATE_STREAM_CLOSED;
Craig Tiller67bfefd2015-04-30 09:32:37 -0700713 if (call->have_alarm) {
714 grpc_alarm_cancel(&call->alarm);
715 call->have_alarm = 0;
716 }
Craig Tiller48b9fde2015-04-24 08:04:59 -0700717 }
718 finish_read_ops(call);
719 } else {
720 finish_ioreq_op(call, GRPC_IOREQ_RECV_MESSAGE, GRPC_OP_ERROR);
721 finish_ioreq_op(call, GRPC_IOREQ_RECV_STATUS, GRPC_OP_ERROR);
722 finish_ioreq_op(call, GRPC_IOREQ_RECV_CLOSE, GRPC_OP_ERROR);
723 finish_ioreq_op(call, GRPC_IOREQ_RECV_TRAILING_METADATA, GRPC_OP_ERROR);
724 finish_ioreq_op(call, GRPC_IOREQ_RECV_INITIAL_METADATA, GRPC_OP_ERROR);
725 finish_ioreq_op(call, GRPC_IOREQ_RECV_STATUS_DETAILS, GRPC_OP_ERROR);
Craig Tiller629b0ed2015-04-22 11:14:26 -0700726 }
Craig Tillerc1f75602015-04-24 11:44:53 -0700727 call->recv_ops.nops = 0;
Craig Tiller629b0ed2015-04-22 11:14:26 -0700728 unlock(call);
729
Craig Tiller4df412b2015-04-28 07:57:54 -0700730 GRPC_CALL_INTERNAL_UNREF(call, "receiving", 0);
David Garcia Quintasf667f1b2015-05-04 13:15:46 -0700731 GRPC_TIMER_BEGIN(GRPC_PTAG_CALL_ON_DONE_RECV, 0);
Craig Tiller629b0ed2015-04-22 11:14:26 -0700732}
733
Craig Tiller83f88d92015-04-21 16:02:05 -0700734static grpc_mdelem_list chain_metadata_from_app(grpc_call *call, size_t count,
735 grpc_metadata *metadata) {
736 size_t i;
737 grpc_mdelem_list out;
738 if (count == 0) {
739 out.head = out.tail = NULL;
740 return out;
741 }
742 for (i = 0; i < count; i++) {
743 grpc_metadata *md = &metadata[i];
744 grpc_metadata *next_md = (i == count - 1) ? NULL : &metadata[i + 1];
745 grpc_metadata *prev_md = (i == 0) ? NULL : &metadata[i - 1];
746 grpc_linked_mdelem *l = (grpc_linked_mdelem *)&md->internal_data;
Craig Tillerd2b11fa2015-04-21 13:45:19 -0700747 GPR_ASSERT(sizeof(grpc_linked_mdelem) == sizeof(md->internal_data));
Craig Tiller83f88d92015-04-21 16:02:05 -0700748 l->md = grpc_mdelem_from_string_and_buffer(call->metadata_context, md->key,
749 (const gpr_uint8 *)md->value,
750 md->value_length);
751 l->next = next_md ? (grpc_linked_mdelem *)&next_md->internal_data : NULL;
752 l->prev = prev_md ? (grpc_linked_mdelem *)&prev_md->internal_data : NULL;
753 }
754 out.head = (grpc_linked_mdelem *)&(metadata[0].internal_data);
755 out.tail = (grpc_linked_mdelem *)&(metadata[count - 1].internal_data);
756 return out;
757}
758
Craig Tiller629b0ed2015-04-22 11:14:26 -0700759/* Copy the contents of a byte buffer into stream ops */
760static void copy_byte_buffer_to_stream_ops(grpc_byte_buffer *byte_buffer,
761 grpc_stream_op_buffer *sopb) {
762 size_t i;
763
764 switch (byte_buffer->type) {
765 case GRPC_BB_SLICE_BUFFER:
766 for (i = 0; i < byte_buffer->data.slice_buffer.count; i++) {
767 gpr_slice slice = byte_buffer->data.slice_buffer.slices[i];
768 gpr_slice_ref(slice);
769 grpc_sopb_add_slice(sopb, slice);
770 }
771 break;
772 }
773}
774
Craig Tiller83f88d92015-04-21 16:02:05 -0700775static int fill_send_ops(grpc_call *call, grpc_transport_op *op) {
776 grpc_ioreq_data data;
777 grpc_metadata_batch mdb;
778 size_t i;
Craig Tiller58ce3f02015-04-22 07:54:24 -0700779 char status_str[GPR_LTOA_MIN_BUFSIZE];
Craig Tiller83f88d92015-04-21 16:02:05 -0700780 GPR_ASSERT(op->send_ops == NULL);
781
782 switch (call->write_state) {
783 case WRITE_STATE_INITIAL:
784 if (!is_op_live(call, GRPC_IOREQ_SEND_INITIAL_METADATA)) {
785 break;
786 }
787 data = call->request_data[GRPC_IOREQ_SEND_INITIAL_METADATA];
Craig Tiller629b0ed2015-04-22 11:14:26 -0700788 mdb.list = chain_metadata_from_app(call, data.send_metadata.count,
789 data.send_metadata.metadata);
Craig Tiller83f88d92015-04-21 16:02:05 -0700790 mdb.garbage.head = mdb.garbage.tail = NULL;
791 mdb.deadline = call->send_deadline;
792 for (i = 0; i < call->send_initial_metadata_count; i++) {
Craig Tiller629b0ed2015-04-22 11:14:26 -0700793 grpc_metadata_batch_link_head(&mdb, &call->send_initial_metadata[i]);
Craig Tiller83f88d92015-04-21 16:02:05 -0700794 }
795 grpc_sopb_add_metadata(&call->send_ops, mdb);
796 op->send_ops = &call->send_ops;
797 op->bind_pollset = grpc_cq_pollset(call->cq);
Craig Tiller58ce3f02015-04-22 07:54:24 -0700798 call->last_send_contains |= 1 << GRPC_IOREQ_SEND_INITIAL_METADATA;
799 call->write_state = WRITE_STATE_STARTED;
Craig Tillerc1f75602015-04-24 11:44:53 -0700800 call->send_initial_metadata_count = 0;
Craig Tiller629b0ed2015-04-22 11:14:26 -0700801 /* fall through intended */
Craig Tiller83f88d92015-04-21 16:02:05 -0700802 case WRITE_STATE_STARTED:
803 if (is_op_live(call, GRPC_IOREQ_SEND_MESSAGE)) {
804 data = call->request_data[GRPC_IOREQ_SEND_MESSAGE];
Craig Tiller629b0ed2015-04-22 11:14:26 -0700805 grpc_sopb_add_begin_message(
806 &call->send_ops, grpc_byte_buffer_length(data.send_message), 0);
807 copy_byte_buffer_to_stream_ops(data.send_message, &call->send_ops);
Craig Tiller58ce3f02015-04-22 07:54:24 -0700808 op->send_ops = &call->send_ops;
809 call->last_send_contains |= 1 << GRPC_IOREQ_SEND_MESSAGE;
810 }
811 if (is_op_live(call, GRPC_IOREQ_SEND_CLOSE)) {
812 op->is_last_send = 1;
813 op->send_ops = &call->send_ops;
814 call->last_send_contains |= 1 << GRPC_IOREQ_SEND_CLOSE;
815 call->write_state = WRITE_STATE_WRITE_CLOSED;
816 if (!call->is_client) {
817 /* send trailing metadata */
818 data = call->request_data[GRPC_IOREQ_SEND_TRAILING_METADATA];
Craig Tiller629b0ed2015-04-22 11:14:26 -0700819 mdb.list = chain_metadata_from_app(call, data.send_metadata.count,
820 data.send_metadata.metadata);
Craig Tiller58ce3f02015-04-22 07:54:24 -0700821 mdb.garbage.head = mdb.garbage.tail = NULL;
Craig Tillerd393f102015-04-23 16:11:51 -0700822 mdb.deadline = gpr_inf_future;
Craig Tiller58ce3f02015-04-22 07:54:24 -0700823 /* send status */
824 /* TODO(ctiller): cache common status values */
825 data = call->request_data[GRPC_IOREQ_SEND_STATUS];
826 gpr_ltoa(data.send_status.code, status_str);
827 grpc_metadata_batch_add_tail(
828 &mdb, &call->status_link,
829 grpc_mdelem_from_metadata_strings(
830 call->metadata_context,
831 grpc_mdstr_ref(grpc_channel_get_status_string(call->channel)),
832 grpc_mdstr_from_string(call->metadata_context, status_str)));
833 if (data.send_status.details) {
834 grpc_metadata_batch_add_tail(
835 &mdb, &call->details_link,
836 grpc_mdelem_from_metadata_strings(
837 call->metadata_context,
Craig Tiller629b0ed2015-04-22 11:14:26 -0700838 grpc_mdstr_ref(
839 grpc_channel_get_message_string(call->channel)),
Craig Tiller58ce3f02015-04-22 07:54:24 -0700840 grpc_mdstr_from_string(call->metadata_context,
841 data.send_status.details)));
842 }
Craig Tiller7e8489a2015-04-23 12:41:16 -0700843 grpc_sopb_add_metadata(&call->send_ops, mdb);
Craig Tillerde648622015-02-05 09:32:10 -0800844 }
Craig Tillerc12fee62015-02-03 11:55:50 -0800845 }
Craig Tiller58ce3f02015-04-22 07:54:24 -0700846 break;
Craig Tillerc12fee62015-02-03 11:55:50 -0800847 case WRITE_STATE_WRITE_CLOSED:
Craig Tiller8eb9d472015-01-27 17:00:03 -0800848 break;
Craig Tiller62ac1552015-01-27 15:41:44 -0800849 }
Craig Tiller58ce3f02015-04-22 07:54:24 -0700850 if (op->send_ops) {
851 op->on_done_send = call_on_done_send;
852 op->send_user_data = call;
853 }
854 return op->send_ops != NULL;
Craig Tillercce17ac2015-01-20 09:29:28 -0800855}
856
857static grpc_call_error start_ioreq_error(grpc_call *call,
858 gpr_uint32 mutated_ops,
859 grpc_call_error ret) {
860 size_t i;
861 for (i = 0; i < GRPC_IOREQ_OP_COUNT; i++) {
Craig Tillerd642dcf2015-02-03 20:39:09 -0800862 if (mutated_ops & (1u << i)) {
Craig Tillerc12fee62015-02-03 11:55:50 -0800863 call->request_set[i] = REQSET_EMPTY;
Craig Tillercce17ac2015-01-20 09:29:28 -0800864 }
865 }
Craig Tillercce17ac2015-01-20 09:29:28 -0800866 return ret;
867}
868
Craig Tillerc12fee62015-02-03 11:55:50 -0800869static void finish_read_ops(grpc_call *call) {
870 int empty;
871
872 if (is_op_live(call, GRPC_IOREQ_RECV_MESSAGE)) {
873 empty =
874 (NULL == (*call->request_data[GRPC_IOREQ_RECV_MESSAGE].recv_message =
875 grpc_bbq_pop(&call->incoming_queue)));
876 if (!empty) {
877 finish_live_ioreq_op(call, GRPC_IOREQ_RECV_MESSAGE, GRPC_OP_OK);
878 empty = grpc_bbq_empty(&call->incoming_queue);
879 }
880 } else {
881 empty = grpc_bbq_empty(&call->incoming_queue);
882 }
883
884 switch (call->read_state) {
885 case READ_STATE_STREAM_CLOSED:
886 if (empty) {
887 finish_ioreq_op(call, GRPC_IOREQ_RECV_CLOSE, GRPC_OP_OK);
888 }
889 /* fallthrough */
890 case READ_STATE_READ_CLOSED:
891 if (empty) {
892 finish_ioreq_op(call, GRPC_IOREQ_RECV_MESSAGE, GRPC_OP_OK);
893 }
894 finish_ioreq_op(call, GRPC_IOREQ_RECV_STATUS, GRPC_OP_OK);
Craig Tillerfb189f82015-02-03 12:07:07 -0800895 finish_ioreq_op(call, GRPC_IOREQ_RECV_STATUS_DETAILS, GRPC_OP_OK);
Craig Tillerc12fee62015-02-03 11:55:50 -0800896 finish_ioreq_op(call, GRPC_IOREQ_RECV_TRAILING_METADATA, GRPC_OP_OK);
897 /* fallthrough */
898 case READ_STATE_GOT_INITIAL_METADATA:
899 finish_ioreq_op(call, GRPC_IOREQ_RECV_INITIAL_METADATA, GRPC_OP_OK);
900 /* fallthrough */
901 case READ_STATE_INITIAL:
902 /* do nothing */
903 break;
904 }
905}
906
907static void early_out_write_ops(grpc_call *call) {
908 switch (call->write_state) {
909 case WRITE_STATE_WRITE_CLOSED:
910 finish_ioreq_op(call, GRPC_IOREQ_SEND_MESSAGE, GRPC_OP_ERROR);
911 finish_ioreq_op(call, GRPC_IOREQ_SEND_STATUS, GRPC_OP_ERROR);
912 finish_ioreq_op(call, GRPC_IOREQ_SEND_TRAILING_METADATA, GRPC_OP_ERROR);
913 finish_ioreq_op(call, GRPC_IOREQ_SEND_CLOSE, GRPC_OP_OK);
914 /* fallthrough */
915 case WRITE_STATE_STARTED:
916 finish_ioreq_op(call, GRPC_IOREQ_SEND_INITIAL_METADATA, GRPC_OP_ERROR);
917 /* fallthrough */
918 case WRITE_STATE_INITIAL:
919 /* do nothing */
920 break;
921 }
922}
923
Craig Tiller8eb9d472015-01-27 17:00:03 -0800924static grpc_call_error start_ioreq(grpc_call *call, const grpc_ioreq *reqs,
925 size_t nreqs,
926 grpc_ioreq_completion_func completion,
Craig Tiller9cc61412015-02-02 14:02:52 -0800927 void *user_data) {
Craig Tillercce17ac2015-01-20 09:29:28 -0800928 size_t i;
929 gpr_uint32 have_ops = 0;
Craig Tillercce17ac2015-01-20 09:29:28 -0800930 grpc_ioreq_op op;
Craig Tillerc18c56e2015-02-02 15:59:13 -0800931 reqinfo_master *master;
Craig Tillercce17ac2015-01-20 09:29:28 -0800932 grpc_ioreq_data data;
Craig Tiller1e0d4c42015-01-30 16:17:29 -0800933 gpr_uint8 set;
934
935 if (nreqs == 0) {
936 return GRPC_CALL_OK;
937 }
938
939 set = reqs[0].op;
Craig Tillercce17ac2015-01-20 09:29:28 -0800940
941 for (i = 0; i < nreqs; i++) {
942 op = reqs[i].op;
Craig Tillerc12fee62015-02-03 11:55:50 -0800943 if (call->request_set[op] < GRPC_IOREQ_OP_COUNT) {
Craig Tillercce17ac2015-01-20 09:29:28 -0800944 return start_ioreq_error(call, have_ops,
945 GRPC_CALL_ERROR_TOO_MANY_OPERATIONS);
Craig Tillerc12fee62015-02-03 11:55:50 -0800946 } else if (call->request_set[op] == REQSET_DONE) {
Craig Tiller1c141902015-01-31 08:51:54 -0800947 return start_ioreq_error(call, have_ops, GRPC_CALL_ERROR_ALREADY_INVOKED);
Craig Tillercce17ac2015-01-20 09:29:28 -0800948 }
Craig Tillerd642dcf2015-02-03 20:39:09 -0800949 have_ops |= 1u << op;
Craig Tillercce17ac2015-01-20 09:29:28 -0800950 data = reqs[i].data;
951
Craig Tillerc12fee62015-02-03 11:55:50 -0800952 call->request_data[op] = data;
953 call->request_set[op] = set;
Craig Tillercce17ac2015-01-20 09:29:28 -0800954 }
955
Craig Tillerc18c56e2015-02-02 15:59:13 -0800956 master = &call->masters[set];
957 master->status = GRPC_OP_OK;
Craig Tillercce17ac2015-01-20 09:29:28 -0800958 master->need_mask = have_ops;
Craig Tiller1b409442015-01-29 14:36:17 -0800959 master->complete_mask = 0;
Craig Tillercce17ac2015-01-20 09:29:28 -0800960 master->on_complete = completion;
961 master->user_data = user_data;
962
Craig Tillerc12fee62015-02-03 11:55:50 -0800963 finish_read_ops(call);
964 early_out_write_ops(call);
965
Craig Tillercce17ac2015-01-20 09:29:28 -0800966 return GRPC_CALL_OK;
967}
968
Craig Tillercce17ac2015-01-20 09:29:28 -0800969grpc_call_error grpc_call_start_ioreq_and_call_back(
970 grpc_call *call, const grpc_ioreq *reqs, size_t nreqs,
971 grpc_ioreq_completion_func on_complete, void *user_data) {
Craig Tiller8eb9d472015-01-27 17:00:03 -0800972 grpc_call_error err;
973 lock(call);
Craig Tiller9cc61412015-02-02 14:02:52 -0800974 err = start_ioreq(call, reqs, nreqs, on_complete, user_data);
Craig Tiller8eb9d472015-01-27 17:00:03 -0800975 unlock(call);
976 return err;
Craig Tillercce17ac2015-01-20 09:29:28 -0800977}
978
Nicolas Nobleb7ebd3b2014-11-26 16:33:03 -0800979void grpc_call_destroy(grpc_call *c) {
ctillerc6d61c42014-12-15 14:52:08 -0800980 int cancel;
Craig Tiller9724de82015-01-28 17:06:29 -0800981 lock(c);
Nicolas Nobleb7ebd3b2014-11-26 16:33:03 -0800982 if (c->have_alarm) {
ctiller18b49ab2014-12-09 14:39:16 -0800983 grpc_alarm_cancel(&c->alarm);
Nicolas Nobleb7ebd3b2014-11-26 16:33:03 -0800984 c->have_alarm = 0;
985 }
Craig Tillerdaceea82015-02-02 16:15:53 -0800986 cancel = c->read_state != READ_STATE_STREAM_CLOSED;
Craig Tiller9724de82015-01-28 17:06:29 -0800987 unlock(c);
ctillerc6d61c42014-12-15 14:52:08 -0800988 if (cancel) grpc_call_cancel(c);
Craig Tiller4df412b2015-04-28 07:57:54 -0700989 GRPC_CALL_INTERNAL_UNREF(c, "destroy", 1);
Nicolas Nobleb7ebd3b2014-11-26 16:33:03 -0800990}
991
Craig Tiller58ce3f02015-04-22 07:54:24 -0700992grpc_call_error grpc_call_cancel(grpc_call *call) {
Craig Tiller48b9fde2015-04-24 08:04:59 -0700993 return grpc_call_cancel_with_status(call, GRPC_STATUS_CANCELLED, "Cancelled");
Nicolas Nobleb7ebd3b2014-11-26 16:33:03 -0800994}
995
Craig Tiller6046dc32015-01-14 12:55:45 -0800996grpc_call_error grpc_call_cancel_with_status(grpc_call *c,
997 grpc_status_code status,
998 const char *description) {
Yang Gao172791a2015-05-04 16:33:53 -0700999 return cancel_with_status(c, status, description, 0);
Yang Gaoff30f8e2015-05-04 00:13:39 -07001000}
1001
Yang Gao172791a2015-05-04 16:33:53 -07001002static grpc_call_error cancel_with_status(
Yang Gaoc71a9d22015-05-04 00:22:12 -07001003 grpc_call *c, grpc_status_code status, const char *description,
1004 gpr_uint8 locked) {
Craig Tiller48b9fde2015-04-24 08:04:59 -07001005 grpc_transport_op op;
Craig Tiller6046dc32015-01-14 12:55:45 -08001006 grpc_mdstr *details =
1007 description ? grpc_mdstr_from_string(c->metadata_context, description)
1008 : NULL;
Craig Tiller48b9fde2015-04-24 08:04:59 -07001009 memset(&op, 0, sizeof(op));
1010 op.cancel_with_status = status;
1011
Yang Gaoff30f8e2015-05-04 00:13:39 -07001012 if (locked == 0) {
1013 lock(c);
1014 }
Craig Tiller68752722015-01-29 14:59:54 -08001015 set_status_code(c, STATUS_FROM_API_OVERRIDE, status);
1016 set_status_details(c, STATUS_FROM_API_OVERRIDE, details);
Yang Gaoff30f8e2015-05-04 00:13:39 -07001017 if (locked == 0) {
1018 unlock(c);
1019 }
Craig Tiller48b9fde2015-04-24 08:04:59 -07001020
1021 execute_op(c, &op);
1022
1023 return GRPC_CALL_OK;
Craig Tillerd248c242015-01-14 11:49:12 -08001024}
1025
Craig Tiller58ce3f02015-04-22 07:54:24 -07001026static void execute_op(grpc_call *call, grpc_transport_op *op) {
Nicolas Nobleb7ebd3b2014-11-26 16:33:03 -08001027 grpc_call_element *elem;
Nicolas Nobleb7ebd3b2014-11-26 16:33:03 -08001028 elem = CALL_ELEM_FROM_CALL(call, 0);
Craig Tiller935cf422015-05-01 14:10:46 -07001029 op->context = call->context;
Craig Tiller58ce3f02015-04-22 07:54:24 -07001030 elem->filter->start_transport_op(elem, op);
Nicolas Nobleb7ebd3b2014-11-26 16:33:03 -08001031}
1032
Craig Tiller566316f2015-02-02 15:25:32 -08001033grpc_call *grpc_call_from_top_element(grpc_call_element *elem) {
1034 return CALL_FROM_TOP_ELEM(elem);
1035}
1036
1037static void call_alarm(void *arg, int success) {
1038 grpc_call *call = arg;
1039 if (success) {
1040 if (call->is_client) {
Yang Gao172791a2015-05-04 16:33:53 -07001041 cancel_with_status(call, GRPC_STATUS_DEADLINE_EXCEEDED,
1042 "Deadline Exceeded", 0);
Craig Tiller566316f2015-02-02 15:25:32 -08001043 } else {
1044 grpc_call_cancel(call);
1045 }
1046 }
Craig Tiller4df412b2015-04-28 07:57:54 -07001047 GRPC_CALL_INTERNAL_UNREF(call, "alarm", 1);
Craig Tiller566316f2015-02-02 15:25:32 -08001048}
1049
Craig Tiller6902ad22015-04-16 08:01:49 -07001050static void set_deadline_alarm(grpc_call *call, gpr_timespec deadline) {
Craig Tiller566316f2015-02-02 15:25:32 -08001051 if (call->have_alarm) {
1052 gpr_log(GPR_ERROR, "Attempt to set deadline alarm twice");
Craig Tillerfa4f9942015-04-23 15:22:09 -07001053 assert(0);
Craig Tiller7e8489a2015-04-23 12:41:16 -07001054 return;
Craig Tiller566316f2015-02-02 15:25:32 -08001055 }
Craig Tiller4df412b2015-04-28 07:57:54 -07001056 GRPC_CALL_INTERNAL_REF(call, "alarm");
Craig Tiller566316f2015-02-02 15:25:32 -08001057 call->have_alarm = 1;
1058 grpc_alarm_init(&call->alarm, deadline, call_alarm, call, gpr_now());
1059}
1060
/* we offset status by a small amount when storing it into transport metadata
   as metadata cannot store a 0 value (which is used as OK for
   grpc_status_codes) */
1064#define STATUS_OFFSET 1
/* No-op destructor handed to grpc_mdelem_get_user_data and
   grpc_mdelem_set_user_data by decode_status: the cached status is an
   integer stored in the pointer itself, so there is nothing to free. */
static void destroy_status(void *ignored) { (void)ignored; }
1066
1067static gpr_uint32 decode_status(grpc_mdelem *md) {
1068 gpr_uint32 status;
1069 void *user_data = grpc_mdelem_get_user_data(md, destroy_status);
1070 if (user_data) {
Craig Tiller87d5b192015-04-16 14:37:57 -07001071 status = ((gpr_uint32)(gpr_intptr)user_data) - STATUS_OFFSET;
Craig Tiller566316f2015-02-02 15:25:32 -08001072 } else {
1073 if (!gpr_parse_bytes_to_uint32(grpc_mdstr_as_c_string(md->value),
1074 GPR_SLICE_LENGTH(md->value->slice),
1075 &status)) {
1076 status = GRPC_STATUS_UNKNOWN; /* could not parse status code */
1077 }
1078 grpc_mdelem_set_user_data(md, destroy_status,
1079 (void *)(gpr_intptr)(status + STATUS_OFFSET));
1080 }
1081 return status;
1082}
1083
Craig Tiller629b0ed2015-04-22 11:14:26 -07001084static void recv_metadata(grpc_call *call, grpc_metadata_batch *md) {
Craig Tiller6902ad22015-04-16 08:01:49 -07001085 grpc_linked_mdelem *l;
Craig Tiller566316f2015-02-02 15:25:32 -08001086 grpc_metadata_array *dest;
1087 grpc_metadata *mdusr;
Craig Tiller6902ad22015-04-16 08:01:49 -07001088 int is_trailing;
Craig Tiller8b282cb2015-04-17 14:57:44 -07001089 grpc_mdctx *mdctx = call->metadata_context;
Craig Tiller566316f2015-02-02 15:25:32 -08001090
Craig Tiller6902ad22015-04-16 08:01:49 -07001091 is_trailing = call->read_state >= READ_STATE_GOT_INITIAL_METADATA;
Craig Tiller48b02ec2015-04-21 13:58:36 -07001092 for (l = md->list.head; l != NULL; l = l->next) {
Craig Tiller6902ad22015-04-16 08:01:49 -07001093 grpc_mdelem *md = l->md;
1094 grpc_mdstr *key = md->key;
1095 if (key == grpc_channel_get_status_string(call->channel)) {
1096 set_status_code(call, STATUS_FROM_WIRE, decode_status(md));
Craig Tiller6902ad22015-04-16 08:01:49 -07001097 } else if (key == grpc_channel_get_message_string(call->channel)) {
1098 set_status_details(call, STATUS_FROM_WIRE, grpc_mdstr_ref(md->value));
Craig Tiller6902ad22015-04-16 08:01:49 -07001099 } else {
1100 dest = &call->buffered_metadata[is_trailing];
1101 if (dest->count == dest->capacity) {
1102 dest->capacity = GPR_MAX(dest->capacity + 8, dest->capacity * 2);
1103 dest->metadata =
1104 gpr_realloc(dest->metadata, sizeof(grpc_metadata) * dest->capacity);
1105 }
1106 mdusr = &dest->metadata[dest->count++];
1107 mdusr->key = grpc_mdstr_as_c_string(md->key);
1108 mdusr->value = grpc_mdstr_as_c_string(md->value);
1109 mdusr->value_length = GPR_SLICE_LENGTH(md->value->slice);
1110 if (call->owned_metadata_count == call->owned_metadata_capacity) {
Craig Tiller87d5b192015-04-16 14:37:57 -07001111 call->owned_metadata_capacity =
1112 GPR_MAX(call->owned_metadata_capacity + 8,
1113 call->owned_metadata_capacity * 2);
Craig Tiller6902ad22015-04-16 08:01:49 -07001114 call->owned_metadata =
1115 gpr_realloc(call->owned_metadata,
1116 sizeof(grpc_mdelem *) * call->owned_metadata_capacity);
1117 }
1118 call->owned_metadata[call->owned_metadata_count++] = md;
Craig Tiller8b282cb2015-04-17 14:57:44 -07001119 l->md = 0;
Craig Tiller566316f2015-02-02 15:25:32 -08001120 }
Craig Tiller6902ad22015-04-16 08:01:49 -07001121 }
1122 if (gpr_time_cmp(md->deadline, gpr_inf_future) != 0) {
1123 set_deadline_alarm(call, md->deadline);
1124 }
1125 if (!is_trailing) {
Craig Tiller629b0ed2015-04-22 11:14:26 -07001126 call->read_state = READ_STATE_GOT_INITIAL_METADATA;
Craig Tiller566316f2015-02-02 15:25:32 -08001127 }
Craig Tiller6902ad22015-04-16 08:01:49 -07001128
Craig Tiller8b282cb2015-04-17 14:57:44 -07001129 grpc_mdctx_lock(mdctx);
1130 for (l = md->list.head; l; l = l->next) {
1131 if (l->md) grpc_mdctx_locked_mdelem_unref(mdctx, l->md);
1132 }
1133 for (l = md->garbage.head; l; l = l->next) {
1134 grpc_mdctx_locked_mdelem_unref(mdctx, l->md);
1135 }
1136 grpc_mdctx_unlock(mdctx);
Craig Tiller629b0ed2015-04-22 11:14:26 -07001137}
Craig Tiller8b282cb2015-04-17 14:57:44 -07001138
Craig Tiller566316f2015-02-02 15:25:32 -08001139grpc_call_stack *grpc_call_get_call_stack(grpc_call *call) {
1140 return CALL_STACK_FROM_CALL(call);
1141}
1142
1143/*
Craig Tillerfb189f82015-02-03 12:07:07 -08001144 * BATCH API IMPLEMENTATION
1145 */
1146
1147static void set_status_value_directly(grpc_status_code status, void *dest) {
1148 *(grpc_status_code *)dest = status;
1149}
1150
1151static void set_cancelled_value(grpc_status_code status, void *dest) {
1152 *(grpc_status_code *)dest = (status != GRPC_STATUS_OK);
1153}
1154
Craig Tiller166e2502015-02-03 20:14:41 -08001155static void finish_batch(grpc_call *call, grpc_op_error result, void *tag) {
Craig Tiller900e4512015-04-29 14:17:10 -07001156 grpc_cq_end_op(call->cq, tag, call, do_nothing, NULL, GRPC_OP_OK);
Craig Tiller166e2502015-02-03 20:14:41 -08001157}
Craig Tillerfb189f82015-02-03 12:07:07 -08001158
1159grpc_call_error grpc_call_start_batch(grpc_call *call, const grpc_op *ops,
1160 size_t nops, void *tag) {
1161 grpc_ioreq reqs[GRPC_IOREQ_OP_COUNT];
1162 size_t in;
1163 size_t out;
1164 const grpc_op *op;
1165 grpc_ioreq *req;
1166
murgatroid99d47946b2015-03-09 14:27:07 -07001167 GRPC_CALL_LOG_BATCH(GPR_INFO, call, ops, nops, tag);
1168
murgatroid99a8c21e82015-02-12 13:55:53 -08001169 if (nops == 0) {
1170 grpc_cq_begin_op(call->cq, call, GRPC_OP_COMPLETE);
Craig Tiller900e4512015-04-29 14:17:10 -07001171 grpc_cq_end_op(call->cq, tag, call, do_nothing, NULL, GRPC_OP_OK);
murgatroid99a8c21e82015-02-12 13:55:53 -08001172 return GRPC_CALL_OK;
1173 }
1174
Craig Tillerfb189f82015-02-03 12:07:07 -08001175 /* rewrite batch ops into ioreq ops */
1176 for (in = 0, out = 0; in < nops; in++) {
1177 op = &ops[in];
1178 switch (op->op) {
1179 case GRPC_OP_SEND_INITIAL_METADATA:
1180 req = &reqs[out++];
1181 req->op = GRPC_IOREQ_SEND_INITIAL_METADATA;
1182 req->data.send_metadata.count = op->data.send_initial_metadata.count;
1183 req->data.send_metadata.metadata =
1184 op->data.send_initial_metadata.metadata;
1185 break;
1186 case GRPC_OP_SEND_MESSAGE:
1187 req = &reqs[out++];
1188 req->op = GRPC_IOREQ_SEND_MESSAGE;
1189 req->data.send_message = op->data.send_message;
1190 break;
1191 case GRPC_OP_SEND_CLOSE_FROM_CLIENT:
1192 if (!call->is_client) {
1193 return GRPC_CALL_ERROR_NOT_ON_SERVER;
1194 }
1195 req = &reqs[out++];
1196 req->op = GRPC_IOREQ_SEND_CLOSE;
1197 break;
1198 case GRPC_OP_SEND_STATUS_FROM_SERVER:
1199 if (call->is_client) {
1200 return GRPC_CALL_ERROR_NOT_ON_CLIENT;
1201 }
1202 req = &reqs[out++];
1203 req->op = GRPC_IOREQ_SEND_TRAILING_METADATA;
1204 req->data.send_metadata.count =
1205 op->data.send_status_from_server.trailing_metadata_count;
1206 req->data.send_metadata.metadata =
1207 op->data.send_status_from_server.trailing_metadata;
1208 req = &reqs[out++];
1209 req->op = GRPC_IOREQ_SEND_STATUS;
1210 req->data.send_status.code = op->data.send_status_from_server.status;
1211 req->data.send_status.details =
1212 op->data.send_status_from_server.status_details;
1213 req = &reqs[out++];
1214 req->op = GRPC_IOREQ_SEND_CLOSE;
1215 break;
1216 case GRPC_OP_RECV_INITIAL_METADATA:
1217 if (!call->is_client) {
1218 return GRPC_CALL_ERROR_NOT_ON_SERVER;
1219 }
1220 req = &reqs[out++];
1221 req->op = GRPC_IOREQ_RECV_INITIAL_METADATA;
1222 req->data.recv_metadata = op->data.recv_initial_metadata;
1223 break;
1224 case GRPC_OP_RECV_MESSAGE:
1225 req = &reqs[out++];
1226 req->op = GRPC_IOREQ_RECV_MESSAGE;
1227 req->data.recv_message = op->data.recv_message;
1228 break;
1229 case GRPC_OP_RECV_STATUS_ON_CLIENT:
1230 if (!call->is_client) {
1231 return GRPC_CALL_ERROR_NOT_ON_SERVER;
1232 }
1233 req = &reqs[out++];
1234 req->op = GRPC_IOREQ_RECV_STATUS;
1235 req->data.recv_status.set_value = set_status_value_directly;
1236 req->data.recv_status.user_data = op->data.recv_status_on_client.status;
1237 req = &reqs[out++];
1238 req->op = GRPC_IOREQ_RECV_STATUS_DETAILS;
1239 req->data.recv_status_details.details =
1240 op->data.recv_status_on_client.status_details;
1241 req->data.recv_status_details.details_capacity =
1242 op->data.recv_status_on_client.status_details_capacity;
1243 req = &reqs[out++];
1244 req->op = GRPC_IOREQ_RECV_TRAILING_METADATA;
1245 req->data.recv_metadata =
1246 op->data.recv_status_on_client.trailing_metadata;
1247 req = &reqs[out++];
1248 req->op = GRPC_IOREQ_RECV_CLOSE;
1249 break;
1250 case GRPC_OP_RECV_CLOSE_ON_SERVER:
1251 req = &reqs[out++];
1252 req->op = GRPC_IOREQ_RECV_STATUS;
1253 req->data.recv_status.set_value = set_cancelled_value;
1254 req->data.recv_status.user_data =
1255 op->data.recv_close_on_server.cancelled;
1256 req = &reqs[out++];
1257 req->op = GRPC_IOREQ_RECV_CLOSE;
1258 break;
1259 }
1260 }
1261
1262 grpc_cq_begin_op(call->cq, call, GRPC_OP_COMPLETE);
1263
1264 return grpc_call_start_ioreq_and_call_back(call, reqs, out, finish_batch,
1265 tag);
1266}
Craig Tiller935cf422015-05-01 14:10:46 -07001267
1268void grpc_call_context_set(grpc_call *call, grpc_context_index elem, void *value,
1269 void (*destroy)(void *value)) {
1270 if (call->destroy_context[elem]) {
1271 call->destroy_context[elem](value);
1272 }
1273 call->context[elem] = value;
1274 call->destroy_context[elem] = destroy;
1275}
1276
1277void *grpc_call_context_get(grpc_call *call, grpc_context_index elem) {
1278 return call->context[elem];
1279}