blob: 6ce9c8c0510d5e862eebf2fd162e360af87f0a9e [file] [log] [blame]
Nicolas Nobleb7ebd3b2014-11-26 16:33:03 -08001/*
2 *
3 * Copyright 2014, Google Inc.
4 * All rights reserved.
5 *
6 * Redistribution and use in source and binary forms, with or without
7 * modification, are permitted provided that the following conditions are
8 * met:
9 *
10 * * Redistributions of source code must retain the above copyright
11 * notice, this list of conditions and the following disclaimer.
12 * * Redistributions in binary form must reproduce the above
13 * copyright notice, this list of conditions and the following disclaimer
14 * in the documentation and/or other materials provided with the
15 * distribution.
16 * * Neither the name of Google Inc. nor the names of its
17 * contributors may be used to endorse or promote products derived from
18 * this software without specific prior written permission.
19 *
20 * THIS SOFTWARE IS PROVIDED BY THE COPYRIGHT HOLDERS AND CONTRIBUTORS
21 * "AS IS" AND ANY EXPRESS OR IMPLIED WARRANTIES, INCLUDING, BUT NOT
22 * LIMITED TO, THE IMPLIED WARRANTIES OF MERCHANTABILITY AND FITNESS FOR
23 * A PARTICULAR PURPOSE ARE DISCLAIMED. IN NO EVENT SHALL THE COPYRIGHT
24 * OWNER OR CONTRIBUTORS BE LIABLE FOR ANY DIRECT, INDIRECT, INCIDENTAL,
25 * SPECIAL, EXEMPLARY, OR CONSEQUENTIAL DAMAGES (INCLUDING, BUT NOT
26 * LIMITED TO, PROCUREMENT OF SUBSTITUTE GOODS OR SERVICES; LOSS OF USE,
27 * DATA, OR PROFITS; OR BUSINESS INTERRUPTION) HOWEVER CAUSED AND ON ANY
28 * THEORY OF LIABILITY, WHETHER IN CONTRACT, STRICT LIABILITY, OR TORT
29 * (INCLUDING NEGLIGENCE OR OTHERWISE) ARISING IN ANY WAY OUT OF THE USE
30 * OF THIS SOFTWARE, EVEN IF ADVISED OF THE POSSIBILITY OF SUCH DAMAGE.
31 *
32 */
33
34#include "src/core/surface/call.h"
35#include "src/core/channel/channel_stack.h"
36#include "src/core/channel/metadata_buffer.h"
ctiller18b49ab2014-12-09 14:39:16 -080037#include "src/core/iomgr/alarm.h"
Craig Tiller485d7762015-01-23 12:54:05 -080038#include "src/core/support/string.h"
ctiller18b49ab2014-12-09 14:39:16 -080039#include "src/core/surface/channel.h"
40#include "src/core/surface/completion_queue.h"
Nicolas Nobleb7ebd3b2014-11-26 16:33:03 -080041#include <grpc/support/alloc.h>
42#include <grpc/support/log.h>
Nicolas Nobleb7ebd3b2014-11-26 16:33:03 -080043
44#include <stdio.h>
45#include <stdlib.h>
46#include <string.h>
47
Craig Tiller8eb9d472015-01-27 17:00:03 -080048#define OP_IN_MASK(op, mask) (((1 << (op)) & (mask)) != 0)
Nicolas Nobleb7ebd3b2014-11-26 16:33:03 -080049
Nicolas Nobleb7ebd3b2014-11-26 16:33:03 -080050typedef struct {
Craig Tillercce17ac2015-01-20 09:29:28 -080051 size_t md_out_count;
52 size_t md_out_capacity;
Craig Tiller62ac1552015-01-27 15:41:44 -080053 grpc_metadata *md_out;
Craig Tillercce17ac2015-01-20 09:29:28 -080054 grpc_byte_buffer *msg_out;
55
56 /* input buffers */
57 grpc_metadata_array md_in;
58 grpc_metadata_array trail_md_in;
59 grpc_recv_status status_in;
60 size_t msg_in_read_idx;
61 grpc_byte_buffer_array msg_in;
62
63 void *finished_tag;
64} legacy_state;
65
Craig Tiller8eb9d472015-01-27 17:00:03 -080066typedef enum { REQ_INITIAL = 0, REQ_READY, REQ_DONE } req_state;
67
68typedef enum {
69 SEND_NOTHING,
70 SEND_INITIAL_METADATA,
71 SEND_MESSAGE,
72 SEND_TRAILING_METADATA,
73 SEND_FINISH
74} send_action;
75
76typedef struct {
77 grpc_ioreq_completion_func on_complete;
78 void *user_data;
79 grpc_op_error status;
80} completed_request;
81
Craig Tillercce17ac2015-01-20 09:29:28 -080082typedef struct reqinfo {
Craig Tiller8eb9d472015-01-27 17:00:03 -080083 req_state state;
Craig Tillercce17ac2015-01-20 09:29:28 -080084 grpc_ioreq_data data;
85 struct reqinfo *master;
86 grpc_ioreq_completion_func on_complete;
Nicolas Nobleb7ebd3b2014-11-26 16:33:03 -080087 void *user_data;
Craig Tillercce17ac2015-01-20 09:29:28 -080088 gpr_uint32 need_mask;
89 gpr_uint32 complete_mask;
90} reqinfo;
Nicolas Nobleb7ebd3b2014-11-26 16:33:03 -080091
92struct grpc_call {
93 grpc_completion_queue *cq;
94 grpc_channel *channel;
95 grpc_mdctx *metadata_context;
Craig Tillercce17ac2015-01-20 09:29:28 -080096 /* TODO(ctiller): share with cq if possible? */
97 gpr_mu mu;
Nicolas Nobleb7ebd3b2014-11-26 16:33:03 -080098
Nicolas Nobleb7ebd3b2014-11-26 16:33:03 -080099 gpr_uint8 is_client;
Craig Tillercce17ac2015-01-20 09:29:28 -0800100 gpr_uint8 got_initial_metadata;
Nicolas Nobleb7ebd3b2014-11-26 16:33:03 -0800101 gpr_uint8 have_alarm;
Craig Tillercce17ac2015-01-20 09:29:28 -0800102 gpr_uint8 read_closed;
103 gpr_uint8 stream_closed;
Craig Tiller4de9b7d2015-01-13 17:45:03 -0800104 gpr_uint8 got_status_code;
Craig Tiller8eb9d472015-01-27 17:00:03 -0800105 gpr_uint8 sending;
106 gpr_uint8 num_completed_requests;
Nicolas Nobleb7ebd3b2014-11-26 16:33:03 -0800107
Craig Tillercce17ac2015-01-20 09:29:28 -0800108 reqinfo requests[GRPC_IOREQ_OP_COUNT];
Craig Tiller8eb9d472015-01-27 17:00:03 -0800109 completed_request completed_requests[GRPC_IOREQ_OP_COUNT];
Craig Tillercce17ac2015-01-20 09:29:28 -0800110 grpc_byte_buffer_array buffered_messages;
111 grpc_metadata_array buffered_initial_metadata;
112 grpc_metadata_array buffered_trailing_metadata;
113 size_t write_index;
Nicolas Nobleb7ebd3b2014-11-26 16:33:03 -0800114
Nicolas Nobleb7ebd3b2014-11-26 16:33:03 -0800115 grpc_status_code status_code;
116 grpc_mdstr *status_details;
117
Craig Tillercce17ac2015-01-20 09:29:28 -0800118 grpc_alarm alarm;
119
Nicolas Nobleb7ebd3b2014-11-26 16:33:03 -0800120 gpr_refcount internal_refcount;
Craig Tillercce17ac2015-01-20 09:29:28 -0800121
122 legacy_state *legacy_state;
Nicolas Nobleb7ebd3b2014-11-26 16:33:03 -0800123};
124
Craig Tillerf26370d2015-01-29 10:00:11 -0800125#define CALL_STACK_FROM_CALL(call) ((grpc_call_stack *)((call)+1))
Nicolas Nobleb7ebd3b2014-11-26 16:33:03 -0800126#define CALL_FROM_CALL_STACK(call_stack) (((grpc_call *)(call_stack)) - 1)
127#define CALL_ELEM_FROM_CALL(call, idx) \
128 grpc_call_stack_element(CALL_STACK_FROM_CALL(call), idx)
129#define CALL_FROM_TOP_ELEM(top_elem) \
130 CALL_FROM_CALL_STACK(grpc_call_stack_from_top_element(top_elem))
131
Craig Tillercce17ac2015-01-20 09:29:28 -0800132#define SWAP(type, x, y) \
133 do { \
134 type temp = x; \
135 x = y; \
136 y = temp; \
137 } while (0)
138
Nicolas Nobleb7ebd3b2014-11-26 16:33:03 -0800139static void do_nothing(void *ignored, grpc_op_error also_ignored) {}
Craig Tiller8eb9d472015-01-27 17:00:03 -0800140static send_action choose_send_action(grpc_call *call);
141static void enact_send_action(grpc_call *call, send_action sa);
Nicolas Nobleb7ebd3b2014-11-26 16:33:03 -0800142
143grpc_call *grpc_call_create(grpc_channel *channel,
144 const void *server_transport_data) {
145 grpc_channel_stack *channel_stack = grpc_channel_get_channel_stack(channel);
146 grpc_call *call =
147 gpr_malloc(sizeof(grpc_call) + channel_stack->call_stack_size);
Craig Tillercce17ac2015-01-20 09:29:28 -0800148 memset(call, 0, sizeof(grpc_call));
149 gpr_mu_init(&call->mu);
Nicolas Nobleb7ebd3b2014-11-26 16:33:03 -0800150 call->channel = channel;
Craig Tillercce17ac2015-01-20 09:29:28 -0800151 call->is_client = server_transport_data == NULL;
Craig Tiller23aa6c42015-01-27 17:16:12 -0800152 if (call->is_client) {
153 call->requests[GRPC_IOREQ_SEND_TRAILING_METADATA].state = REQ_DONE;
154 }
Nicolas Nobleb7ebd3b2014-11-26 16:33:03 -0800155 grpc_channel_internal_ref(channel);
156 call->metadata_context = grpc_channel_get_metadata_context(channel);
Nicolas Nobleb7ebd3b2014-11-26 16:33:03 -0800157 gpr_ref_init(&call->internal_refcount, 1);
158 grpc_call_stack_init(channel_stack, server_transport_data,
159 CALL_STACK_FROM_CALL(call));
Nicolas Nobleb7ebd3b2014-11-26 16:33:03 -0800160 return call;
161}
162
Craig Tillercce17ac2015-01-20 09:29:28 -0800163legacy_state *get_legacy_state(grpc_call *call) {
164 if (call->legacy_state == NULL) {
165 call->legacy_state = gpr_malloc(sizeof(legacy_state));
166 memset(call->legacy_state, 0, sizeof(legacy_state));
167 }
168 return call->legacy_state;
169}
170
Craig Tillerdddbf692015-01-29 10:25:33 -0800171void grpc_call_internal_ref(grpc_call *c, const char *reason) {
172 gpr_log(GPR_DEBUG, "ref %p %s", c, reason);
173 gpr_ref(&c->internal_refcount);
174}
Nicolas Nobleb7ebd3b2014-11-26 16:33:03 -0800175
Craig Tillerdddbf692015-01-29 10:25:33 -0800176void grpc_call_internal_unref(grpc_call *c, const char *reason) {
177 gpr_log(GPR_DEBUG, "unref %p %s", c, reason);
Nicolas Nobleb7ebd3b2014-11-26 16:33:03 -0800178 if (gpr_unref(&c->internal_refcount)) {
179 grpc_call_stack_destroy(CALL_STACK_FROM_CALL(c));
Nicolas Nobleb7ebd3b2014-11-26 16:33:03 -0800180 grpc_channel_internal_unref(c->channel);
Craig Tillercce17ac2015-01-20 09:29:28 -0800181 gpr_mu_destroy(&c->mu);
182 if (c->legacy_state) {
183 gpr_free(c->legacy_state->md_out);
184 gpr_free(c->legacy_state->md_in.metadata);
185 gpr_free(c->legacy_state->trail_md_in.metadata);
Craig Tiller9724de82015-01-28 17:06:29 -0800186 /*gpr_free(c->legacy_state->status_in.details);*/
Craig Tillercce17ac2015-01-20 09:29:28 -0800187 gpr_free(c->legacy_state);
188 }
Nicolas Nobleb7ebd3b2014-11-26 16:33:03 -0800189 gpr_free(c);
190 }
191}
192
Craig Tillercce17ac2015-01-20 09:29:28 -0800193static grpc_call_error bind_cq(grpc_call *call, grpc_completion_queue *cq) {
194 if (call->cq) return GRPC_CALL_ERROR_ALREADY_INVOKED;
195 call->cq = cq;
196 return GRPC_CALL_OK;
197}
198
199static void request_more_data(grpc_call *call) {
200 grpc_call_op op;
201
202 /* call down */
203 op.type = GRPC_REQUEST_DATA;
204 op.dir = GRPC_CALL_DOWN;
205 op.flags = 0;
206 op.done_cb = do_nothing;
207 op.user_data = NULL;
208
209 grpc_call_execute_op(call, &op);
210}
211
Craig Tiller8eb9d472015-01-27 17:00:03 -0800212static void lock(grpc_call *call) { gpr_mu_lock(&call->mu); }
Craig Tillercce17ac2015-01-20 09:29:28 -0800213
Craig Tiller8eb9d472015-01-27 17:00:03 -0800214static void unlock(grpc_call *call) {
Craig Tiller2f38be62015-01-29 10:26:22 -0800215 send_action sa = SEND_NOTHING;
Craig Tiller8eb9d472015-01-27 17:00:03 -0800216 completed_request completed_requests[GRPC_IOREQ_OP_COUNT];
217 int num_completed_requests = call->num_completed_requests;
218 int i;
219
220 if (num_completed_requests != 0) {
221 memcpy(completed_requests, call->completed_requests,
222 sizeof(completed_requests));
223 call->num_completed_requests = 0;
224 }
225
226 if (!call->sending) {
227 sa = choose_send_action(call);
228 if (sa != SEND_NOTHING) {
229 call->sending = 1;
230 }
231 }
232
233 gpr_mu_unlock(&call->mu);
234
235 if (sa != SEND_NOTHING) {
236 enact_send_action(call, sa);
237 }
238
239 for (i = 0; i < num_completed_requests; i++) {
240 completed_requests[i].on_complete(call, completed_requests[i].status,
241 completed_requests[i].user_data);
242 }
243}
Craig Tillercce17ac2015-01-20 09:29:28 -0800244
245static void finish_ioreq_op(grpc_call *call, grpc_ioreq_op op,
246 grpc_op_error status) {
247 reqinfo *master = call->requests[op].master;
Craig Tiller8eb9d472015-01-27 17:00:03 -0800248 completed_request *cr;
Craig Tillercce17ac2015-01-20 09:29:28 -0800249 size_t i;
Craig Tiller8eb9d472015-01-27 17:00:03 -0800250 switch (call->requests[op].state) {
251 case REQ_INITIAL: /* not started yet */
252 return;
253 case REQ_DONE: /* already finished */
Craig Tiller8eb9d472015-01-27 17:00:03 -0800254 return;
255 case REQ_READY:
256 master->complete_mask |= 1 << op;
257 call->requests[op].state =
258 (op == GRPC_IOREQ_SEND_MESSAGES || op == GRPC_IOREQ_RECV_MESSAGES)
259 ? REQ_INITIAL
260 : REQ_DONE;
261 if (master->complete_mask == master->need_mask ||
262 status == GRPC_OP_ERROR) {
Craig Tiller9724de82015-01-28 17:06:29 -0800263 if (OP_IN_MASK(GRPC_IOREQ_RECV_STATUS, master->need_mask)) {
Craig Tillerf26370d2015-01-29 10:00:11 -0800264 call->requests[GRPC_IOREQ_RECV_STATUS].data.recv_status->status =
265 call->status_code;
266 call->requests[GRPC_IOREQ_RECV_STATUS].data.recv_status->details =
267 call->status_details
268 ? grpc_mdstr_as_c_string(call->status_details)
269 : NULL;
Craig Tiller9724de82015-01-28 17:06:29 -0800270 }
Craig Tiller8eb9d472015-01-27 17:00:03 -0800271 for (i = 0; i < GRPC_IOREQ_OP_COUNT; i++) {
272 if (call->requests[i].master == master) {
273 call->requests[i].master = NULL;
274 }
275 }
276 cr = &call->completed_requests[call->num_completed_requests++];
277 cr->status = status;
278 cr->on_complete = master->on_complete;
279 cr->user_data = master->user_data;
Craig Tillercce17ac2015-01-20 09:29:28 -0800280 }
Craig Tillercce17ac2015-01-20 09:29:28 -0800281 }
282}
283
284static void finish_write_step(void *pc, grpc_op_error error) {
285 grpc_call *call = pc;
Craig Tiller8eb9d472015-01-27 17:00:03 -0800286 lock(call);
Craig Tillercce17ac2015-01-20 09:29:28 -0800287 if (error == GRPC_OP_OK) {
288 if (call->write_index ==
289 call->requests[GRPC_IOREQ_SEND_MESSAGES].data.send_messages.count) {
290 finish_ioreq_op(call, GRPC_IOREQ_SEND_MESSAGES, GRPC_OP_OK);
291 }
Craig Tillercce17ac2015-01-20 09:29:28 -0800292 } else {
293 finish_ioreq_op(call, GRPC_IOREQ_SEND_MESSAGES, GRPC_OP_ERROR);
Craig Tillercce17ac2015-01-20 09:29:28 -0800294 }
Craig Tiller8eb9d472015-01-27 17:00:03 -0800295 call->sending = 0;
296 unlock(call);
Craig Tillercce17ac2015-01-20 09:29:28 -0800297}
298
299static void finish_finish_step(void *pc, grpc_op_error error) {
300 grpc_call *call = pc;
Craig Tiller8eb9d472015-01-27 17:00:03 -0800301 lock(call);
Craig Tillercce17ac2015-01-20 09:29:28 -0800302 if (error == GRPC_OP_OK) {
Craig Tilleree2d7022015-01-27 14:09:59 -0800303 finish_ioreq_op(call, GRPC_IOREQ_SEND_CLOSE, GRPC_OP_OK);
Craig Tillercce17ac2015-01-20 09:29:28 -0800304 } else {
305 gpr_log(GPR_ERROR, "not implemented");
306 abort();
307 }
Craig Tiller8eb9d472015-01-27 17:00:03 -0800308 call->sending = 0;
309 unlock(call);
Craig Tillercce17ac2015-01-20 09:29:28 -0800310}
311
Craig Tiller62ac1552015-01-27 15:41:44 -0800312static void finish_start_step(void *pc, grpc_op_error error) {
313 grpc_call *call = pc;
Craig Tiller8eb9d472015-01-27 17:00:03 -0800314 lock(call);
Craig Tiller62ac1552015-01-27 15:41:44 -0800315 if (error == GRPC_OP_OK) {
316 finish_ioreq_op(call, GRPC_IOREQ_SEND_INITIAL_METADATA, GRPC_OP_OK);
Craig Tiller62ac1552015-01-27 15:41:44 -0800317 } else {
318 gpr_log(GPR_ERROR, "not implemented");
319 abort();
320 }
Craig Tiller8eb9d472015-01-27 17:00:03 -0800321 call->sending = 0;
322 unlock(call);
Craig Tiller62ac1552015-01-27 15:41:44 -0800323}
324
Craig Tiller8eb9d472015-01-27 17:00:03 -0800325static send_action choose_send_action(grpc_call *call) {
326 switch (call->requests[GRPC_IOREQ_SEND_INITIAL_METADATA].state) {
327 case REQ_INITIAL:
328 return SEND_NOTHING;
329 case REQ_READY:
330 return SEND_INITIAL_METADATA;
331 case REQ_DONE:
332 break;
333 }
334 switch (call->requests[GRPC_IOREQ_SEND_MESSAGES].state) {
335 case REQ_INITIAL:
336 return SEND_NOTHING;
337 case REQ_READY:
338 return SEND_MESSAGE;
339 case REQ_DONE:
340 break;
341 }
342 switch (call->requests[GRPC_IOREQ_SEND_TRAILING_METADATA].state) {
343 case REQ_INITIAL:
344 return SEND_NOTHING;
345 case REQ_READY:
346 finish_ioreq_op(call, GRPC_IOREQ_SEND_TRAILING_METADATA, GRPC_OP_OK);
347 return SEND_TRAILING_METADATA;
348 case REQ_DONE:
349 break;
350 }
351 switch (call->requests[GRPC_IOREQ_SEND_CLOSE].state) {
352 default:
353 return SEND_NOTHING;
354 case REQ_READY:
355 return SEND_FINISH;
356 }
357}
358
359static void enact_send_action(grpc_call *call, send_action sa) {
360 grpc_ioreq_data data;
Craig Tiller62ac1552015-01-27 15:41:44 -0800361 grpc_call_op op;
Craig Tiller8eb9d472015-01-27 17:00:03 -0800362 int i;
Craig Tiller62ac1552015-01-27 15:41:44 -0800363
Craig Tiller8eb9d472015-01-27 17:00:03 -0800364 switch (sa) {
365 case SEND_NOTHING:
366 abort();
367 break;
368 case SEND_INITIAL_METADATA:
369 data = call->requests[GRPC_IOREQ_SEND_INITIAL_METADATA].data;
370 for (i = 0; i < data.send_metadata.count; i++) {
371 const grpc_metadata *md = &data.send_metadata.metadata[i];
372 grpc_call_element_send_metadata(
373 CALL_ELEM_FROM_CALL(call, 0),
374 grpc_mdelem_from_string_and_buffer(call->metadata_context, md->key,
375 (const gpr_uint8 *)md->value,
376 md->value_length));
377 }
378 op.type = GRPC_SEND_START;
379 op.dir = GRPC_CALL_DOWN;
380 op.flags = 0;
381 op.data.start.pollset = grpc_cq_pollset(call->cq);
382 op.done_cb = finish_start_step;
383 op.user_data = call;
384 grpc_call_execute_op(call, &op);
385 break;
386 case SEND_MESSAGE:
387 data = call->requests[GRPC_IOREQ_SEND_MESSAGES].data;
388 op.type = GRPC_SEND_MESSAGE;
389 op.dir = GRPC_CALL_DOWN;
390 op.flags = 0;
391 op.data.message = data.send_messages.messages[call->write_index];
392 op.done_cb = finish_write_step;
393 op.user_data = call;
394 grpc_call_execute_op(call, &op);
395 break;
396 case SEND_TRAILING_METADATA:
397 data = call->requests[GRPC_IOREQ_SEND_TRAILING_METADATA].data;
398 for (i = 0; i < data.send_metadata.count; i++) {
399 const grpc_metadata *md = &data.send_metadata.metadata[i];
400 grpc_call_element_send_metadata(
401 CALL_ELEM_FROM_CALL(call, 0),
402 grpc_mdelem_from_string_and_buffer(call->metadata_context, md->key,
403 (const gpr_uint8 *)md->value,
404 md->value_length));
405 }
406 lock(call);
407 call->sending = 0;
408 unlock(call);
409 break;
410 case SEND_FINISH:
Craig Tillerabcf6522015-01-28 15:44:24 -0800411 if (!call->is_client) {
412 /* TODO(ctiller): cache common status values */
413 char status_str[GPR_LTOA_MIN_BUFSIZE];
Craig Tiller9724de82015-01-28 17:06:29 -0800414 data = call->requests[GRPC_IOREQ_SEND_CLOSE].data;
Craig Tillerabcf6522015-01-28 15:44:24 -0800415 gpr_ltoa(data.send_close.status, status_str);
416 grpc_call_element_send_metadata(
417 CALL_ELEM_FROM_CALL(call, 0),
418 grpc_mdelem_from_metadata_strings(
419 call->metadata_context,
420 grpc_channel_get_status_string(call->channel),
421 grpc_mdstr_from_string(call->metadata_context, status_str)));
422 if (data.send_close.details) {
423 grpc_call_element_send_metadata(
424 CALL_ELEM_FROM_CALL(call, 0),
425 grpc_mdelem_from_metadata_strings(
426 call->metadata_context,
427 grpc_channel_get_message_string(call->channel),
428 grpc_mdstr_from_string(call->metadata_context,
429 data.send_close.details)));
430 }
431 }
Craig Tiller8eb9d472015-01-27 17:00:03 -0800432 op.type = GRPC_SEND_FINISH;
433 op.dir = GRPC_CALL_DOWN;
434 op.flags = 0;
435 op.done_cb = finish_finish_step;
436 op.user_data = call;
437 grpc_call_execute_op(call, &op);
438 break;
Craig Tiller62ac1552015-01-27 15:41:44 -0800439 }
Craig Tillercce17ac2015-01-20 09:29:28 -0800440}
441
442static grpc_call_error start_ioreq_error(grpc_call *call,
443 gpr_uint32 mutated_ops,
444 grpc_call_error ret) {
445 size_t i;
446 for (i = 0; i < GRPC_IOREQ_OP_COUNT; i++) {
447 if (mutated_ops & (1 << i)) {
448 call->requests[i].master = NULL;
449 }
450 }
Craig Tillercce17ac2015-01-20 09:29:28 -0800451 return ret;
452}
453
Craig Tiller8eb9d472015-01-27 17:00:03 -0800454static grpc_call_error start_ioreq(grpc_call *call, const grpc_ioreq *reqs,
455 size_t nreqs,
456 grpc_ioreq_completion_func completion,
457 void *user_data) {
Craig Tillercce17ac2015-01-20 09:29:28 -0800458 size_t i;
459 gpr_uint32 have_ops = 0;
460 gpr_uint32 precomplete = 0;
461 grpc_ioreq_op op;
462 reqinfo *master = NULL;
463 reqinfo *requests = call->requests;
464 grpc_ioreq_data data;
Craig Tiller23aa6c42015-01-27 17:16:12 -0800465 gpr_uint8 have_send_closed = 0;
Craig Tillercce17ac2015-01-20 09:29:28 -0800466
467 for (i = 0; i < nreqs; i++) {
468 op = reqs[i].op;
469 if (requests[op].master) {
470 return start_ioreq_error(call, have_ops,
471 GRPC_CALL_ERROR_TOO_MANY_OPERATIONS);
472 }
Craig Tiller8eb9d472015-01-27 17:00:03 -0800473 switch (requests[op].state) {
474 case REQ_INITIAL:
475 break;
476 case REQ_READY:
477 return start_ioreq_error(call, have_ops,
478 GRPC_CALL_ERROR_TOO_MANY_OPERATIONS);
479 case REQ_DONE:
480 return start_ioreq_error(call, have_ops,
481 GRPC_CALL_ERROR_ALREADY_INVOKED);
482 }
Craig Tillercce17ac2015-01-20 09:29:28 -0800483 if (master == NULL) {
484 master = &requests[op];
485 }
486 have_ops |= 1 << op;
487 data = reqs[i].data;
488
489 switch (op) {
490 default:
491 break;
492 case GRPC_IOREQ_RECV_MESSAGES:
493 data.recv_messages->count = 0;
494 if (call->buffered_messages.count > 0) {
495 SWAP(grpc_byte_buffer_array, *data.recv_messages,
496 call->buffered_messages);
497 precomplete |= 1 << op;
Craig Tiller8eb9d472015-01-27 17:00:03 -0800498 abort();
Craig Tillercce17ac2015-01-20 09:29:28 -0800499 }
500 break;
501 case GRPC_IOREQ_SEND_MESSAGES:
502 call->write_index = 0;
503 break;
Craig Tiller23aa6c42015-01-27 17:16:12 -0800504 case GRPC_IOREQ_SEND_CLOSE:
505 have_send_closed = 1;
506 break;
Craig Tillercce17ac2015-01-20 09:29:28 -0800507 }
508
Craig Tiller8eb9d472015-01-27 17:00:03 -0800509 requests[op].state = REQ_READY;
Craig Tillercce17ac2015-01-20 09:29:28 -0800510 requests[op].data = data;
511 requests[op].master = master;
512 }
513
514 GPR_ASSERT(master != NULL);
515 master->need_mask = have_ops;
516 master->complete_mask = precomplete;
517 master->on_complete = completion;
518 master->user_data = user_data;
519
Craig Tiller23aa6c42015-01-27 17:16:12 -0800520 if (have_send_closed) {
521 if (requests[GRPC_IOREQ_SEND_MESSAGES].state == REQ_INITIAL) {
522 requests[GRPC_IOREQ_SEND_MESSAGES].state = REQ_DONE;
523 }
524 }
525
Craig Tillercce17ac2015-01-20 09:29:28 -0800526 if (OP_IN_MASK(GRPC_IOREQ_RECV_MESSAGES, have_ops & ~precomplete)) {
527 request_more_data(call);
528 }
529
530 return GRPC_CALL_OK;
531}
532
533static void call_start_ioreq_done(grpc_call *call, grpc_op_error status,
534 void *user_data) {
535 grpc_cq_end_ioreq(call->cq, user_data, call, do_nothing, NULL, status);
536}
537
538grpc_call_error grpc_call_start_ioreq(grpc_call *call, const grpc_ioreq *reqs,
539 size_t nreqs, void *tag) {
Craig Tiller8eb9d472015-01-27 17:00:03 -0800540 grpc_call_error err;
541 lock(call);
542 err = start_ioreq(call, reqs, nreqs, call_start_ioreq_done, tag);
543 unlock(call);
544 return err;
Craig Tillercce17ac2015-01-20 09:29:28 -0800545}
546
547grpc_call_error grpc_call_start_ioreq_and_call_back(
548 grpc_call *call, const grpc_ioreq *reqs, size_t nreqs,
549 grpc_ioreq_completion_func on_complete, void *user_data) {
Craig Tiller8eb9d472015-01-27 17:00:03 -0800550 grpc_call_error err;
551 lock(call);
552 err = start_ioreq(call, reqs, nreqs, on_complete, user_data);
553 unlock(call);
554 return err;
Craig Tillercce17ac2015-01-20 09:29:28 -0800555}
556
Nicolas Nobleb7ebd3b2014-11-26 16:33:03 -0800557void grpc_call_destroy(grpc_call *c) {
ctillerc6d61c42014-12-15 14:52:08 -0800558 int cancel;
Craig Tiller9724de82015-01-28 17:06:29 -0800559 lock(c);
Nicolas Nobleb7ebd3b2014-11-26 16:33:03 -0800560 if (c->have_alarm) {
ctiller18b49ab2014-12-09 14:39:16 -0800561 grpc_alarm_cancel(&c->alarm);
Nicolas Nobleb7ebd3b2014-11-26 16:33:03 -0800562 c->have_alarm = 0;
563 }
Craig Tillercce17ac2015-01-20 09:29:28 -0800564 cancel = !c->stream_closed;
Craig Tiller9724de82015-01-28 17:06:29 -0800565 unlock(c);
ctillerc6d61c42014-12-15 14:52:08 -0800566 if (cancel) grpc_call_cancel(c);
Craig Tillerdddbf692015-01-29 10:25:33 -0800567 grpc_call_internal_unref(c, "destroy");
Nicolas Nobleb7ebd3b2014-11-26 16:33:03 -0800568}
569
Craig Tillerd248c242015-01-14 11:49:12 -0800570static void maybe_set_status_code(grpc_call *call, gpr_uint32 status) {
Craig Tillercce17ac2015-01-20 09:29:28 -0800571 if (call->got_status_code) return;
572 call->status_code = status;
573 call->got_status_code = 1;
Craig Tillerd248c242015-01-14 11:49:12 -0800574}
575
576static void maybe_set_status_details(grpc_call *call, grpc_mdstr *status) {
Craig Tillercce17ac2015-01-20 09:29:28 -0800577 if (call->status_details != NULL) {
578 grpc_mdstr_unref(status);
579 return;
Craig Tiller6046dc32015-01-14 12:55:45 -0800580 }
Craig Tillercce17ac2015-01-20 09:29:28 -0800581 call->status_details = status;
Craig Tillerd248c242015-01-14 11:49:12 -0800582}
583
Nicolas Nobleb7ebd3b2014-11-26 16:33:03 -0800584grpc_call_error grpc_call_cancel(grpc_call *c) {
585 grpc_call_element *elem;
586 grpc_call_op op;
587
588 op.type = GRPC_CANCEL_OP;
589 op.dir = GRPC_CALL_DOWN;
590 op.flags = 0;
591 op.done_cb = do_nothing;
592 op.user_data = NULL;
593
594 elem = CALL_ELEM_FROM_CALL(c, 0);
ctillerf962f522014-12-10 15:28:27 -0800595 elem->filter->call_op(elem, NULL, &op);
Nicolas Nobleb7ebd3b2014-11-26 16:33:03 -0800596
597 return GRPC_CALL_OK;
598}
599
Craig Tiller6046dc32015-01-14 12:55:45 -0800600grpc_call_error grpc_call_cancel_with_status(grpc_call *c,
601 grpc_status_code status,
602 const char *description) {
603 grpc_mdstr *details =
604 description ? grpc_mdstr_from_string(c->metadata_context, description)
605 : NULL;
Craig Tiller9724de82015-01-28 17:06:29 -0800606 lock(c);
Craig Tillerd248c242015-01-14 11:49:12 -0800607 maybe_set_status_code(c, status);
608 if (details) {
609 maybe_set_status_details(c, details);
610 }
Craig Tiller9724de82015-01-28 17:06:29 -0800611 unlock(c);
Craig Tillerd248c242015-01-14 11:49:12 -0800612 return grpc_call_cancel(c);
613}
614
Nicolas Nobleb7ebd3b2014-11-26 16:33:03 -0800615void grpc_call_execute_op(grpc_call *call, grpc_call_op *op) {
616 grpc_call_element *elem;
617 GPR_ASSERT(op->dir == GRPC_CALL_DOWN);
618 elem = CALL_ELEM_FROM_CALL(call, 0);
ctillerf962f522014-12-10 15:28:27 -0800619 elem->filter->call_op(elem, NULL, op);
Nicolas Nobleb7ebd3b2014-11-26 16:33:03 -0800620}
621
Craig Tiller62ac1552015-01-27 15:41:44 -0800622grpc_call_error grpc_call_add_metadata(grpc_call *call, grpc_metadata *metadata,
623 gpr_uint32 flags) {
624 legacy_state *ls;
625 grpc_metadata *mdout;
626
Craig Tiller8eb9d472015-01-27 17:00:03 -0800627 lock(call);
Craig Tiller62ac1552015-01-27 15:41:44 -0800628 ls = get_legacy_state(call);
Nicolas Nobleb7ebd3b2014-11-26 16:33:03 -0800629
Craig Tillercce17ac2015-01-20 09:29:28 -0800630 if (ls->md_out_count == ls->md_out_capacity) {
631 ls->md_out_capacity =
632 GPR_MAX(ls->md_out_count * 3 / 2, ls->md_out_count + 8);
633 ls->md_out =
634 gpr_realloc(ls->md_out, sizeof(grpc_mdelem *) * ls->md_out_capacity);
635 }
Craig Tiller62ac1552015-01-27 15:41:44 -0800636 mdout = &ls->md_out[ls->md_out_count++];
637 mdout->key = gpr_strdup(metadata->key);
638 mdout->value = gpr_malloc(metadata->value_length);
639 mdout->value_length = metadata->value_length;
640 memcpy(mdout->value, metadata->value, metadata->value_length);
klempnerc463f742014-12-19 13:03:35 -0800641
Craig Tiller8eb9d472015-01-27 17:00:03 -0800642 unlock(call);
Craig Tiller62ac1552015-01-27 15:41:44 -0800643
Nicolas Nobleb7ebd3b2014-11-26 16:33:03 -0800644 return GRPC_CALL_OK;
645}
646
Craig Tillercce17ac2015-01-20 09:29:28 -0800647static void finish_status(grpc_call *call, grpc_op_error status, void *tag) {
648 legacy_state *ls;
Nicolas Nobleb7ebd3b2014-11-26 16:33:03 -0800649
Craig Tiller8eb9d472015-01-27 17:00:03 -0800650 lock(call);
Craig Tillercce17ac2015-01-20 09:29:28 -0800651 ls = get_legacy_state(call);
Craig Tiller8eb9d472015-01-27 17:00:03 -0800652 unlock(call);
Craig Tiller80fa15c2015-01-13 16:10:49 -0800653
Craig Tillercce17ac2015-01-20 09:29:28 -0800654 if (status == GRPC_OP_OK) {
655 grpc_cq_end_finished(call->cq, tag, call, do_nothing, NULL,
656 ls->status_in.status, ls->status_in.details,
657 ls->trail_md_in.metadata, ls->trail_md_in.count);
658 } else {
659 grpc_cq_end_finished(call->cq, tag, call, do_nothing, NULL,
660 GRPC_STATUS_UNKNOWN, "Read status failed", NULL, 0);
Craig Tiller80fa15c2015-01-13 16:10:49 -0800661 }
Craig Tillercce17ac2015-01-20 09:29:28 -0800662}
Craig Tiller80fa15c2015-01-13 16:10:49 -0800663
Craig Tillercce17ac2015-01-20 09:29:28 -0800664static void finish_recv_metadata(grpc_call *call, grpc_op_error status,
665 void *tag) {
Craig Tillercce17ac2015-01-20 09:29:28 -0800666 legacy_state *ls;
667
Craig Tiller8eb9d472015-01-27 17:00:03 -0800668 lock(call);
Craig Tillercce17ac2015-01-20 09:29:28 -0800669 ls = get_legacy_state(call);
670 if (status == GRPC_OP_OK) {
671 grpc_cq_end_client_metadata_read(call->cq, tag, call, do_nothing, NULL,
672 ls->md_in.count, ls->md_in.metadata);
673
Craig Tiller62ac1552015-01-27 15:41:44 -0800674 } else {
675 grpc_cq_end_client_metadata_read(call->cq, tag, call, do_nothing, NULL, 0,
676 NULL);
677 }
Craig Tiller8eb9d472015-01-27 17:00:03 -0800678 unlock(call);
Craig Tiller62ac1552015-01-27 15:41:44 -0800679}
680
681static void finish_send_metadata(grpc_call *call, grpc_op_error status,
682 void *metadata_read_tag) {
683 grpc_ioreq reqs[2];
684 legacy_state *ls;
685
Craig Tiller8eb9d472015-01-27 17:00:03 -0800686 lock(call);
Craig Tiller62ac1552015-01-27 15:41:44 -0800687 if (status == GRPC_OP_OK) {
Craig Tiller62ac1552015-01-27 15:41:44 -0800688 ls = get_legacy_state(call);
689 reqs[0].op = GRPC_IOREQ_RECV_INITIAL_METADATA;
690 reqs[0].data.recv_metadata = &ls->md_in;
Craig Tiller8eb9d472015-01-27 17:00:03 -0800691 GPR_ASSERT(GRPC_CALL_OK == start_ioreq(call, reqs, 1, finish_recv_metadata,
692 metadata_read_tag));
Craig Tiller62ac1552015-01-27 15:41:44 -0800693
Craig Tiller62ac1552015-01-27 15:41:44 -0800694 ls = get_legacy_state(call);
Craig Tillercce17ac2015-01-20 09:29:28 -0800695 reqs[0].op = GRPC_IOREQ_RECV_TRAILING_METADATA;
696 reqs[0].data.recv_metadata = &ls->trail_md_in;
697 reqs[1].op = GRPC_IOREQ_RECV_STATUS;
698 reqs[1].data.recv_status = &ls->status_in;
Craig Tiller8eb9d472015-01-27 17:00:03 -0800699 GPR_ASSERT(GRPC_CALL_OK ==
700 start_ioreq(call, reqs, 2, finish_status, ls->finished_tag));
Craig Tillercce17ac2015-01-20 09:29:28 -0800701 } else {
Craig Tiller62ac1552015-01-27 15:41:44 -0800702 ls = get_legacy_state(call);
703 grpc_cq_end_client_metadata_read(call->cq, metadata_read_tag, call,
704 do_nothing, NULL, 0, NULL);
Craig Tillercce17ac2015-01-20 09:29:28 -0800705 grpc_cq_end_finished(call->cq, ls->finished_tag, call, do_nothing, NULL,
706 GRPC_STATUS_UNKNOWN, "Failed to read initial metadata",
707 NULL, 0);
Craig Tiller80fa15c2015-01-13 16:10:49 -0800708 }
Craig Tiller8eb9d472015-01-27 17:00:03 -0800709 unlock(call);
Craig Tiller80fa15c2015-01-13 16:10:49 -0800710}
711
Craig Tiller40fc7a62015-01-13 16:11:58 -0800712grpc_call_error grpc_call_invoke(grpc_call *call, grpc_completion_queue *cq,
713 void *metadata_read_tag, void *finished_tag,
714 gpr_uint32 flags) {
Craig Tillercce17ac2015-01-20 09:29:28 -0800715 grpc_ioreq req;
716 legacy_state *ls = get_legacy_state(call);
717 grpc_call_error err;
Nicolas Nobleb7ebd3b2014-11-26 16:33:03 -0800718
Nicolas Nobleb7ebd3b2014-11-26 16:33:03 -0800719 grpc_cq_begin_op(cq, call, GRPC_CLIENT_METADATA_READ);
Craig Tillercce17ac2015-01-20 09:29:28 -0800720 grpc_cq_begin_op(cq, call, GRPC_FINISHED);
Nicolas Nobleb7ebd3b2014-11-26 16:33:03 -0800721
Craig Tiller8eb9d472015-01-27 17:00:03 -0800722 lock(call);
Craig Tillercce17ac2015-01-20 09:29:28 -0800723 err = bind_cq(call, cq);
724 if (err != GRPC_CALL_OK) return err;
Nicolas Nobleb7ebd3b2014-11-26 16:33:03 -0800725
Craig Tiller9724de82015-01-28 17:06:29 -0800726 get_legacy_state(call)->finished_tag = finished_tag;
727
Craig Tiller62ac1552015-01-27 15:41:44 -0800728 req.op = GRPC_IOREQ_SEND_INITIAL_METADATA;
729 req.data.send_metadata.count = ls->md_out_count;
730 req.data.send_metadata.metadata = ls->md_out;
Craig Tiller8eb9d472015-01-27 17:00:03 -0800731 err = start_ioreq(call, &req, 1, finish_send_metadata, metadata_read_tag);
732 unlock(call);
733 return err;
Nicolas Nobleb7ebd3b2014-11-26 16:33:03 -0800734}
735
nnoble0c475f02014-12-05 15:37:39 -0800736grpc_call_error grpc_call_server_accept(grpc_call *call,
737 grpc_completion_queue *cq,
738 void *finished_tag) {
Craig Tillercce17ac2015-01-20 09:29:28 -0800739 grpc_ioreq req;
Craig Tiller8eb9d472015-01-27 17:00:03 -0800740 grpc_call_error err;
Nicolas Nobleb7ebd3b2014-11-26 16:33:03 -0800741
742 /* inform the completion queue of an incoming operation (corresponding to
743 finished_tag) */
744 grpc_cq_begin_op(cq, call, GRPC_FINISHED);
745
Craig Tiller8eb9d472015-01-27 17:00:03 -0800746 err = bind_cq(call, cq);
747 if (err != GRPC_CALL_OK) return err;
Nicolas Nobleb7ebd3b2014-11-26 16:33:03 -0800748
Craig Tillercce17ac2015-01-20 09:29:28 -0800749 req.op = GRPC_IOREQ_RECV_STATUS;
750 req.data.recv_status = &get_legacy_state(call)->status_in;
Craig Tiller8eb9d472015-01-27 17:00:03 -0800751 err = start_ioreq(call, &req, 1, finish_status, finished_tag);
752 unlock(call);
753 return err;
nnoble0c475f02014-12-05 15:37:39 -0800754}
755
Craig Tillerabcf6522015-01-28 15:44:24 -0800756static void finish_send_initial_metadata(grpc_call *call, grpc_op_error status,
757 void *tag) {}
Craig Tiller39fd4282015-01-28 09:12:31 -0800758
nnoble0c475f02014-12-05 15:37:39 -0800759grpc_call_error grpc_call_server_end_initial_metadata(grpc_call *call,
760 gpr_uint32 flags) {
Craig Tiller39fd4282015-01-28 09:12:31 -0800761 grpc_ioreq req;
762 grpc_call_error err;
763 legacy_state *ls;
764
765 lock(call);
766 ls = get_legacy_state(call);
767 req.op = GRPC_IOREQ_SEND_INITIAL_METADATA;
768 req.data.send_metadata.count = ls->md_out_count;
769 req.data.send_metadata.metadata = ls->md_out;
770 err = start_ioreq(call, &req, 1, finish_send_initial_metadata, NULL);
771 unlock(call);
Craig Tillerabcf6522015-01-28 15:44:24 -0800772
Craig Tiller39fd4282015-01-28 09:12:31 -0800773 return err;
Nicolas Nobleb7ebd3b2014-11-26 16:33:03 -0800774}
775
Nicolas Nobleb7ebd3b2014-11-26 16:33:03 -0800776void grpc_call_client_initial_metadata_complete(
777 grpc_call_element *surface_element) {
778 grpc_call *call = grpc_call_from_top_element(surface_element);
Craig Tiller8eb9d472015-01-27 17:00:03 -0800779 lock(call);
Craig Tillercce17ac2015-01-20 09:29:28 -0800780 call->got_initial_metadata = 1;
781 finish_ioreq_op(call, GRPC_IOREQ_RECV_INITIAL_METADATA, GRPC_OP_OK);
Craig Tiller8eb9d472015-01-27 17:00:03 -0800782 unlock(call);
Nicolas Nobleb7ebd3b2014-11-26 16:33:03 -0800783}
784
Craig Tillercce17ac2015-01-20 09:29:28 -0800785static void finish_read(grpc_call *call, grpc_op_error error, void *tag) {
786 legacy_state *ls;
Craig Tiller8eb9d472015-01-27 17:00:03 -0800787 lock(call);
Craig Tillercce17ac2015-01-20 09:29:28 -0800788 ls = get_legacy_state(call);
789 if (ls->msg_in.count == 0) {
790 grpc_cq_end_read(call->cq, tag, call, do_nothing, NULL, NULL);
791 } else {
792 grpc_cq_end_read(call->cq, tag, call, do_nothing, NULL,
793 ls->msg_in.buffers[ls->msg_in_read_idx++]);
794 }
Craig Tiller8eb9d472015-01-27 17:00:03 -0800795 unlock(call);
Nicolas Nobleb7ebd3b2014-11-26 16:33:03 -0800796}
797
798grpc_call_error grpc_call_start_read(grpc_call *call, void *tag) {
Craig Tillercce17ac2015-01-20 09:29:28 -0800799 legacy_state *ls;
800 grpc_ioreq req;
Craig Tiller8eb9d472015-01-27 17:00:03 -0800801 grpc_call_error err;
Nicolas Nobleb7ebd3b2014-11-26 16:33:03 -0800802
803 grpc_cq_begin_op(call->cq, call, GRPC_READ);
804
Craig Tiller8eb9d472015-01-27 17:00:03 -0800805 lock(call);
Craig Tillercce17ac2015-01-20 09:29:28 -0800806 ls = get_legacy_state(call);
807
808 if (ls->msg_in_read_idx == ls->msg_in.count) {
809 ls->msg_in_read_idx = 0;
810 req.op = GRPC_IOREQ_RECV_MESSAGES;
811 req.data.recv_messages = &ls->msg_in;
Craig Tiller8eb9d472015-01-27 17:00:03 -0800812 err = start_ioreq(call, &req, 1, finish_read, tag);
813 } else {
814 err = GRPC_CALL_OK;
815 grpc_cq_end_read(call->cq, tag, call, do_nothing, NULL,
816 ls->msg_in.buffers[ls->msg_in_read_idx++]);
Nicolas Nobleb7ebd3b2014-11-26 16:33:03 -0800817 }
Craig Tiller8eb9d472015-01-27 17:00:03 -0800818 unlock(call);
819 return err;
Nicolas Nobleb7ebd3b2014-11-26 16:33:03 -0800820}
821
Craig Tillercce17ac2015-01-20 09:29:28 -0800822static void finish_write(grpc_call *call, grpc_op_error status, void *tag) {
823 grpc_cq_end_write_accepted(call->cq, tag, call, do_nothing, NULL, status);
824}
825
Nicolas Nobleb7ebd3b2014-11-26 16:33:03 -0800826grpc_call_error grpc_call_start_write(grpc_call *call,
827 grpc_byte_buffer *byte_buffer, void *tag,
828 gpr_uint32 flags) {
Craig Tillercce17ac2015-01-20 09:29:28 -0800829 grpc_ioreq req;
830 legacy_state *ls;
Craig Tiller8eb9d472015-01-27 17:00:03 -0800831 grpc_call_error err;
Nicolas Nobleb7ebd3b2014-11-26 16:33:03 -0800832
833 grpc_cq_begin_op(call->cq, call, GRPC_WRITE_ACCEPTED);
834
Craig Tiller8eb9d472015-01-27 17:00:03 -0800835 lock(call);
Craig Tillercce17ac2015-01-20 09:29:28 -0800836 ls = get_legacy_state(call);
837 ls->msg_out = byte_buffer;
838 req.op = GRPC_IOREQ_SEND_MESSAGES;
839 req.data.send_messages.count = 1;
840 req.data.send_messages.messages = &ls->msg_out;
Craig Tiller8eb9d472015-01-27 17:00:03 -0800841 err = start_ioreq(call, &req, 1, finish_write, tag);
842 unlock(call);
843
844 return err;
Craig Tillercce17ac2015-01-20 09:29:28 -0800845}
Nicolas Nobleb7ebd3b2014-11-26 16:33:03 -0800846
Craig Tillercce17ac2015-01-20 09:29:28 -0800847static void finish_finish(grpc_call *call, grpc_op_error status, void *tag) {
848 grpc_cq_end_finish_accepted(call->cq, tag, call, do_nothing, NULL, status);
Nicolas Nobleb7ebd3b2014-11-26 16:33:03 -0800849}
850
851grpc_call_error grpc_call_writes_done(grpc_call *call, void *tag) {
Craig Tillercce17ac2015-01-20 09:29:28 -0800852 grpc_ioreq req;
Craig Tiller8eb9d472015-01-27 17:00:03 -0800853 grpc_call_error err;
Nicolas Nobleb7ebd3b2014-11-26 16:33:03 -0800854 grpc_cq_begin_op(call->cq, call, GRPC_FINISH_ACCEPTED);
855
Craig Tiller8eb9d472015-01-27 17:00:03 -0800856 lock(call);
Craig Tillercce17ac2015-01-20 09:29:28 -0800857 req.op = GRPC_IOREQ_SEND_CLOSE;
Craig Tiller8eb9d472015-01-27 17:00:03 -0800858 err = start_ioreq(call, &req, 1, finish_finish, tag);
859 unlock(call);
860
861 return err;
Nicolas Nobleb7ebd3b2014-11-26 16:33:03 -0800862}
863
864grpc_call_error grpc_call_start_write_status(grpc_call *call,
ctiller2845cad2014-12-15 15:14:12 -0800865 grpc_status_code status,
866 const char *details, void *tag) {
Craig Tillerf31d14c2015-01-28 09:26:42 -0800867 grpc_ioreq reqs[2];
Craig Tiller8eb9d472015-01-27 17:00:03 -0800868 grpc_call_error err;
Nicolas Nobleb7ebd3b2014-11-26 16:33:03 -0800869 grpc_cq_begin_op(call->cq, call, GRPC_FINISH_ACCEPTED);
870
Craig Tiller8eb9d472015-01-27 17:00:03 -0800871 lock(call);
Craig Tillerf31d14c2015-01-28 09:26:42 -0800872 reqs[0].op = GRPC_IOREQ_SEND_TRAILING_METADATA;
873 reqs[0].data.send_metadata.count = call->legacy_state->md_out_count;
874 reqs[0].data.send_metadata.metadata = call->legacy_state->md_out;
875 reqs[1].op = GRPC_IOREQ_SEND_CLOSE;
876 reqs[1].data.send_close.status = status;
877 reqs[1].data.send_close.details = details;
878 err = start_ioreq(call, reqs, 2, finish_finish, tag);
Craig Tiller8eb9d472015-01-27 17:00:03 -0800879 unlock(call);
880
881 return err;
Nicolas Nobleb7ebd3b2014-11-26 16:33:03 -0800882}
883
884grpc_call *grpc_call_from_top_element(grpc_call_element *elem) {
885 return CALL_FROM_TOP_ELEM(elem);
886}
887
ctiller58393c22015-01-07 14:03:30 -0800888static void call_alarm(void *arg, int success) {
Nicolas Nobleb7ebd3b2014-11-26 16:33:03 -0800889 grpc_call *call = arg;
ctiller58393c22015-01-07 14:03:30 -0800890 if (success) {
Craig Tiller7b018782015-01-16 09:53:39 -0800891 if (call->is_client) {
892 grpc_call_cancel_with_status(call, GRPC_STATUS_DEADLINE_EXCEEDED,
893 "Deadline Exceeded");
894 } else {
Craig Tillerc9bbff22015-01-16 10:18:37 -0800895 grpc_call_cancel(call);
896 }
Craig Tiller7b018782015-01-16 09:53:39 -0800897 }
Craig Tillerdddbf692015-01-29 10:25:33 -0800898 grpc_call_internal_unref(call, "alarm");
Nicolas Nobleb7ebd3b2014-11-26 16:33:03 -0800899}
900
901void grpc_call_set_deadline(grpc_call_element *elem, gpr_timespec deadline) {
902 grpc_call *call = CALL_FROM_TOP_ELEM(elem);
903
904 if (call->have_alarm) {
905 gpr_log(GPR_ERROR, "Attempt to set deadline alarm twice");
906 }
Craig Tillerdddbf692015-01-29 10:25:33 -0800907 grpc_call_internal_ref(call, "alarm");
Nicolas Nobleb7ebd3b2014-11-26 16:33:03 -0800908 call->have_alarm = 1;
ctiller769b70b2014-12-16 10:44:15 -0800909 grpc_alarm_init(&call->alarm, deadline, call_alarm, call, gpr_now());
Nicolas Nobleb7ebd3b2014-11-26 16:33:03 -0800910}
Craig Tillerd631cf32015-01-27 10:35:01 -0800911
Craig Tillercce17ac2015-01-20 09:29:28 -0800912void grpc_call_read_closed(grpc_call_element *elem) {
913 grpc_call *call = CALL_FROM_TOP_ELEM(elem);
Craig Tiller8eb9d472015-01-27 17:00:03 -0800914 lock(call);
Craig Tillercce17ac2015-01-20 09:29:28 -0800915 GPR_ASSERT(!call->read_closed);
916 call->read_closed = 1;
917 finish_ioreq_op(call, GRPC_IOREQ_RECV_MESSAGES, GRPC_OP_OK);
918 finish_ioreq_op(call, GRPC_IOREQ_RECV_INITIAL_METADATA, GRPC_OP_OK);
919 finish_ioreq_op(call, GRPC_IOREQ_RECV_TRAILING_METADATA, GRPC_OP_OK);
Craig Tiller8eb9d472015-01-27 17:00:03 -0800920 unlock(call);
Craig Tillercce17ac2015-01-20 09:29:28 -0800921}
922
923void grpc_call_stream_closed(grpc_call_element *elem) {
924 grpc_call *call = CALL_FROM_TOP_ELEM(elem);
Craig Tiller8eb9d472015-01-27 17:00:03 -0800925 lock(call);
Craig Tillercce17ac2015-01-20 09:29:28 -0800926 if (!call->read_closed) {
927 call->read_closed = 1;
928 finish_ioreq_op(call, GRPC_IOREQ_RECV_MESSAGES, GRPC_OP_OK);
929 finish_ioreq_op(call, GRPC_IOREQ_RECV_INITIAL_METADATA, GRPC_OP_OK);
930 finish_ioreq_op(call, GRPC_IOREQ_RECV_TRAILING_METADATA, GRPC_OP_OK);
931 }
932 call->stream_closed = 1;
933 finish_ioreq_op(call, GRPC_IOREQ_RECV_STATUS, GRPC_OP_OK);
Craig Tiller8eb9d472015-01-27 17:00:03 -0800934 unlock(call);
Craig Tillercce17ac2015-01-20 09:29:28 -0800935}
936
937/* we offset status by a small amount when storing it into transport metadata
938 as metadata cannot store a 0 value (which is used as OK for grpc_status_codes
939 */
940#define STATUS_OFFSET 1
941static void destroy_status(void *ignored) {}
942
943static gpr_uint32 decode_status(grpc_mdelem *md) {
944 gpr_uint32 status;
945 void *user_data = grpc_mdelem_get_user_data(md, destroy_status);
946 if (user_data) {
Craig Tillerf26370d2015-01-29 10:00:11 -0800947 status = ((gpr_uint32)(gpr_intptr) user_data) - STATUS_OFFSET;
Craig Tillercce17ac2015-01-20 09:29:28 -0800948 } else {
949 if (!gpr_parse_bytes_to_uint32(grpc_mdstr_as_c_string(md->value),
950 GPR_SLICE_LENGTH(md->value->slice),
951 &status)) {
952 status = GRPC_STATUS_UNKNOWN; /* could not parse status code */
953 }
954 grpc_mdelem_set_user_data(md, destroy_status,
955 (void *)(gpr_intptr)(status + STATUS_OFFSET));
956 }
957 return status;
958}
959
960void grpc_call_recv_message(grpc_call_element *elem,
961 grpc_byte_buffer *byte_buffer) {
962 grpc_call *call = CALL_FROM_TOP_ELEM(elem);
963 grpc_byte_buffer_array *dest;
Craig Tiller8eb9d472015-01-27 17:00:03 -0800964 lock(call);
Craig Tillercce17ac2015-01-20 09:29:28 -0800965 if (call->requests[GRPC_IOREQ_RECV_MESSAGES].master != NULL) {
966 dest = call->requests[GRPC_IOREQ_RECV_MESSAGES].data.recv_messages;
967 } else {
968 dest = &call->buffered_messages;
969 }
970 if (dest->count == dest->capacity) {
971 dest->capacity = GPR_MAX(dest->capacity + 1, dest->capacity * 3 / 2);
972 dest->buffers =
973 gpr_realloc(dest->buffers, sizeof(grpc_byte_buffer *) * dest->capacity);
974 }
975 dest->buffers[dest->count++] = byte_buffer;
976 finish_ioreq_op(call, GRPC_IOREQ_RECV_MESSAGES, GRPC_OP_OK);
Craig Tiller8eb9d472015-01-27 17:00:03 -0800977 unlock(call);
Craig Tillercce17ac2015-01-20 09:29:28 -0800978}
979
980void grpc_call_recv_metadata(grpc_call_element *elem, grpc_mdelem *md) {
981 grpc_call *call = CALL_FROM_TOP_ELEM(elem);
982 grpc_mdstr *key = md->key;
983 grpc_metadata_array *dest;
984 grpc_metadata *mdusr;
985
Craig Tiller8eb9d472015-01-27 17:00:03 -0800986 lock(call);
Craig Tillercce17ac2015-01-20 09:29:28 -0800987 if (key == grpc_channel_get_status_string(call->channel)) {
988 maybe_set_status_code(call, decode_status(md));
989 grpc_mdelem_unref(md);
990 } else if (key == grpc_channel_get_message_string(call->channel)) {
991 maybe_set_status_details(call, md->value);
992 grpc_mdelem_unref(md);
993 } else {
994 if (!call->got_initial_metadata) {
Craig Tiller8eb9d472015-01-27 17:00:03 -0800995 dest = call->requests[GRPC_IOREQ_RECV_INITIAL_METADATA].state == REQ_READY
Craig Tillercce17ac2015-01-20 09:29:28 -0800996 ? call->requests[GRPC_IOREQ_RECV_INITIAL_METADATA]
997 .data.recv_metadata
998 : &call->buffered_initial_metadata;
999 } else {
Craig Tiller8eb9d472015-01-27 17:00:03 -08001000 dest = call->requests[GRPC_IOREQ_RECV_INITIAL_METADATA].state == REQ_READY
Craig Tillercce17ac2015-01-20 09:29:28 -08001001 ? call->requests[GRPC_IOREQ_RECV_INITIAL_METADATA]
1002 .data.recv_metadata
1003 : &call->buffered_trailing_metadata;
1004 }
1005 if (dest->count == dest->capacity) {
1006 dest->capacity = GPR_MAX(dest->capacity + 1, dest->capacity * 3 / 2);
1007 dest->metadata =
1008 gpr_realloc(dest->metadata, sizeof(grpc_metadata) * dest->capacity);
1009 }
1010 mdusr = &dest->metadata[dest->count++];
1011 mdusr->key = (char *)grpc_mdstr_as_c_string(md->key);
1012 mdusr->value = (char *)grpc_mdstr_as_c_string(md->value);
1013 mdusr->value_length = GPR_SLICE_LENGTH(md->value->slice);
1014 }
Craig Tiller8eb9d472015-01-27 17:00:03 -08001015 unlock(call);
Craig Tillercce17ac2015-01-20 09:29:28 -08001016}
1017
Craig Tillerd631cf32015-01-27 10:35:01 -08001018grpc_call_stack *grpc_call_get_call_stack(grpc_call *call) {
1019 return CALL_STACK_FROM_CALL(call);
1020}