blob: 3726d1f44ca752592d201a2c752fd148ccddc230 [file] [log] [blame]
/*
 *
 * Copyright 2014, Google Inc.
 * All rights reserved.
 *
 * Redistribution and use in source and binary forms, with or without
 * modification, are permitted provided that the following conditions are
 * met:
 *
 *     * Redistributions of source code must retain the above copyright
 * notice, this list of conditions and the following disclaimer.
 *     * Redistributions in binary form must reproduce the above
 * copyright notice, this list of conditions and the following disclaimer
 * in the documentation and/or other materials provided with the
 * distribution.
 *     * Neither the name of Google Inc. nor the names of its
 * contributors may be used to endorse or promote products derived from
 * this software without specific prior written permission.
 *
 * THIS SOFTWARE IS PROVIDED BY THE COPYRIGHT HOLDERS AND CONTRIBUTORS
 * "AS IS" AND ANY EXPRESS OR IMPLIED WARRANTIES, INCLUDING, BUT NOT
 * LIMITED TO, THE IMPLIED WARRANTIES OF MERCHANTABILITY AND FITNESS FOR
 * A PARTICULAR PURPOSE ARE DISCLAIMED. IN NO EVENT SHALL THE COPYRIGHT
 * OWNER OR CONTRIBUTORS BE LIABLE FOR ANY DIRECT, INDIRECT, INCIDENTAL,
 * SPECIAL, EXEMPLARY, OR CONSEQUENTIAL DAMAGES (INCLUDING, BUT NOT
 * LIMITED TO, PROCUREMENT OF SUBSTITUTE GOODS OR SERVICES; LOSS OF USE,
 * DATA, OR PROFITS; OR BUSINESS INTERRUPTION) HOWEVER CAUSED AND ON ANY
 * THEORY OF LIABILITY, WHETHER IN CONTRACT, STRICT LIABILITY, OR TORT
 * (INCLUDING NEGLIGENCE OR OTHERWISE) ARISING IN ANY WAY OUT OF THE USE
 * OF THIS SOFTWARE, EVEN IF ADVISED OF THE POSSIBILITY OF SUCH DAMAGE.
 *
 */
33
34#include "src/core/surface/call.h"
35#include "src/core/channel/channel_stack.h"
36#include "src/core/channel/metadata_buffer.h"
ctiller18b49ab2014-12-09 14:39:16 -080037#include "src/core/iomgr/alarm.h"
Craig Tiller485d7762015-01-23 12:54:05 -080038#include "src/core/support/string.h"
ctiller18b49ab2014-12-09 14:39:16 -080039#include "src/core/surface/channel.h"
40#include "src/core/surface/completion_queue.h"
Nicolas Nobleb7ebd3b2014-11-26 16:33:03 -080041#include <grpc/support/alloc.h>
42#include <grpc/support/log.h>
Nicolas Nobleb7ebd3b2014-11-26 16:33:03 -080043
44#include <stdio.h>
45#include <stdlib.h>
46#include <string.h>
47
/* Evaluates to non-zero iff bit number `op` is set in the bitmask `mask`.
   The shifted constant is unsigned so the shift stays well defined for every
   bit position of a 32-bit mask (a signed `1 << 31` overflows int). */
#define OP_IN_MASK(op, mask) (((1u << (op)) & (mask)) != 0)
Nicolas Nobleb7ebd3b2014-11-26 16:33:03 -080049
Nicolas Nobleb7ebd3b2014-11-26 16:33:03 -080050typedef struct {
Craig Tillercce17ac2015-01-20 09:29:28 -080051 size_t md_out_count;
52 size_t md_out_capacity;
Craig Tiller62ac1552015-01-27 15:41:44 -080053 grpc_metadata *md_out;
Craig Tillercce17ac2015-01-20 09:29:28 -080054 grpc_byte_buffer *msg_out;
55
56 /* input buffers */
57 grpc_metadata_array md_in;
58 grpc_metadata_array trail_md_in;
59 grpc_recv_status status_in;
60 size_t msg_in_read_idx;
61 grpc_byte_buffer_array msg_in;
62
63 void *finished_tag;
64} legacy_state;
65
/* Lifecycle of a single ioreq op slot. */
typedef enum {
  REQ_INITIAL = 0, /* not started yet */
  REQ_READY,       /* started and waiting to be finished */
  REQ_DONE         /* already finished */
} req_state;
67
/* The next outbound step selected by choose_send_action() and carried out by
   enact_send_action(). */
typedef enum {
  SEND_NOTHING,           /* nothing can be sent right now */
  SEND_INITIAL_METADATA,  /* send initial metadata and start the call */
  SEND_MESSAGE,           /* send the next queued message */
  SEND_TRAILING_METADATA, /* send trailing metadata */
  SEND_FINISH             /* send status (server side only) and finish */
} send_action;
75
76typedef struct {
77 grpc_ioreq_completion_func on_complete;
78 void *user_data;
79 grpc_op_error status;
80} completed_request;
81
Craig Tillercce17ac2015-01-20 09:29:28 -080082typedef struct reqinfo {
Craig Tiller8eb9d472015-01-27 17:00:03 -080083 req_state state;
Craig Tillercce17ac2015-01-20 09:29:28 -080084 grpc_ioreq_data data;
85 struct reqinfo *master;
86 grpc_ioreq_completion_func on_complete;
Nicolas Nobleb7ebd3b2014-11-26 16:33:03 -080087 void *user_data;
Craig Tillercce17ac2015-01-20 09:29:28 -080088 gpr_uint32 need_mask;
89 gpr_uint32 complete_mask;
90} reqinfo;
Nicolas Nobleb7ebd3b2014-11-26 16:33:03 -080091
92struct grpc_call {
93 grpc_completion_queue *cq;
94 grpc_channel *channel;
95 grpc_mdctx *metadata_context;
Craig Tillercce17ac2015-01-20 09:29:28 -080096 /* TODO(ctiller): share with cq if possible? */
97 gpr_mu mu;
Nicolas Nobleb7ebd3b2014-11-26 16:33:03 -080098
Nicolas Nobleb7ebd3b2014-11-26 16:33:03 -080099 gpr_uint8 is_client;
Craig Tillercce17ac2015-01-20 09:29:28 -0800100 gpr_uint8 got_initial_metadata;
Nicolas Nobleb7ebd3b2014-11-26 16:33:03 -0800101 gpr_uint8 have_alarm;
Craig Tillercce17ac2015-01-20 09:29:28 -0800102 gpr_uint8 read_closed;
103 gpr_uint8 stream_closed;
Craig Tiller4de9b7d2015-01-13 17:45:03 -0800104 gpr_uint8 got_status_code;
Craig Tiller8eb9d472015-01-27 17:00:03 -0800105 gpr_uint8 sending;
106 gpr_uint8 num_completed_requests;
Craig Tiller2e103572015-01-29 14:12:07 -0800107 gpr_uint8 need_more_data;
Nicolas Nobleb7ebd3b2014-11-26 16:33:03 -0800108
Craig Tillercce17ac2015-01-20 09:29:28 -0800109 reqinfo requests[GRPC_IOREQ_OP_COUNT];
Craig Tiller8eb9d472015-01-27 17:00:03 -0800110 completed_request completed_requests[GRPC_IOREQ_OP_COUNT];
Craig Tillercce17ac2015-01-20 09:29:28 -0800111 grpc_byte_buffer_array buffered_messages;
112 grpc_metadata_array buffered_initial_metadata;
113 grpc_metadata_array buffered_trailing_metadata;
114 size_t write_index;
Nicolas Nobleb7ebd3b2014-11-26 16:33:03 -0800115
Nicolas Nobleb7ebd3b2014-11-26 16:33:03 -0800116 grpc_status_code status_code;
117 grpc_mdstr *status_details;
118
Craig Tillercce17ac2015-01-20 09:29:28 -0800119 grpc_alarm alarm;
120
Nicolas Nobleb7ebd3b2014-11-26 16:33:03 -0800121 gpr_refcount internal_refcount;
Craig Tillercce17ac2015-01-20 09:29:28 -0800122
123 legacy_state *legacy_state;
Nicolas Nobleb7ebd3b2014-11-26 16:33:03 -0800124};
125
/* The call stack lives immediately after the grpc_call in the same
   allocation; these macros convert between the two views. */
#define CALL_STACK_FROM_CALL(call) ((grpc_call_stack *)((call) + 1))
#define CALL_FROM_CALL_STACK(call_stack) (((grpc_call *)(call_stack)) - 1)
/* Element `idx` of the call stack that belongs to `call`. */
#define CALL_ELEM_FROM_CALL(call, idx) \
  grpc_call_stack_element(CALL_STACK_FROM_CALL(call), idx)
/* The grpc_call that owns the stack whose top element is `top_elem`. */
#define CALL_FROM_TOP_ELEM(top_elem) \
  CALL_FROM_CALL_STACK(grpc_call_stack_from_top_element(top_elem))
132
/* Exchanges the values of the two lvalues `x` and `y`, both of type `type`.
   Arguments are parenthesised and the temporary has a reserved-looking name
   so that an argument called `temp` (or an expression argument) still works.
   Each argument is evaluated twice, so avoid side effects in them. */
#define SWAP(type, x, y)  \
  do {                    \
    type swap_tmp_ = (x); \
    (x) = (y);            \
    (y) = swap_tmp_;      \
  } while (0)
139
Nicolas Nobleb7ebd3b2014-11-26 16:33:03 -0800140static void do_nothing(void *ignored, grpc_op_error also_ignored) {}
Craig Tiller8eb9d472015-01-27 17:00:03 -0800141static send_action choose_send_action(grpc_call *call);
142static void enact_send_action(grpc_call *call, send_action sa);
Nicolas Nobleb7ebd3b2014-11-26 16:33:03 -0800143
144grpc_call *grpc_call_create(grpc_channel *channel,
145 const void *server_transport_data) {
146 grpc_channel_stack *channel_stack = grpc_channel_get_channel_stack(channel);
147 grpc_call *call =
148 gpr_malloc(sizeof(grpc_call) + channel_stack->call_stack_size);
Craig Tillercce17ac2015-01-20 09:29:28 -0800149 memset(call, 0, sizeof(grpc_call));
150 gpr_mu_init(&call->mu);
Nicolas Nobleb7ebd3b2014-11-26 16:33:03 -0800151 call->channel = channel;
Craig Tillercce17ac2015-01-20 09:29:28 -0800152 call->is_client = server_transport_data == NULL;
Craig Tiller23aa6c42015-01-27 17:16:12 -0800153 if (call->is_client) {
154 call->requests[GRPC_IOREQ_SEND_TRAILING_METADATA].state = REQ_DONE;
155 }
Nicolas Nobleb7ebd3b2014-11-26 16:33:03 -0800156 grpc_channel_internal_ref(channel);
157 call->metadata_context = grpc_channel_get_metadata_context(channel);
Craig Tillera4541102015-01-29 11:46:11 -0800158 /* one ref is dropped in response to destroy, the other in
159 stream_closed */
160 gpr_ref_init(&call->internal_refcount, 2);
Nicolas Nobleb7ebd3b2014-11-26 16:33:03 -0800161 grpc_call_stack_init(channel_stack, server_transport_data,
162 CALL_STACK_FROM_CALL(call));
Nicolas Nobleb7ebd3b2014-11-26 16:33:03 -0800163 return call;
164}
165
Craig Tillercce17ac2015-01-20 09:29:28 -0800166legacy_state *get_legacy_state(grpc_call *call) {
167 if (call->legacy_state == NULL) {
168 call->legacy_state = gpr_malloc(sizeof(legacy_state));
169 memset(call->legacy_state, 0, sizeof(legacy_state));
170 }
171 return call->legacy_state;
172}
173
Craig Tillerf63fed72015-01-29 10:49:34 -0800174void grpc_call_internal_ref(grpc_call *c) {
Craig Tillerdddbf692015-01-29 10:25:33 -0800175 gpr_ref(&c->internal_refcount);
176}
Nicolas Nobleb7ebd3b2014-11-26 16:33:03 -0800177
Craig Tillera4541102015-01-29 11:46:11 -0800178static void destroy_call(grpc_call *c) {
179 grpc_call_stack_destroy(CALL_STACK_FROM_CALL(c));
180 grpc_channel_internal_unref(c->channel);
181 gpr_mu_destroy(&c->mu);
182 if (c->status_details) {
183 grpc_mdstr_unref(c->status_details);
184 }
185 if (c->legacy_state) {
186 gpr_free(c->legacy_state->md_out);
187 gpr_free(c->legacy_state->md_in.metadata);
188 gpr_free(c->legacy_state->trail_md_in.metadata);
189 /*gpr_free(c->legacy_state->status_in.details);*/
190 gpr_free(c->legacy_state);
191 }
192 gpr_free(c);
193}
194
Craig Tillerf63fed72015-01-29 10:49:34 -0800195void grpc_call_internal_unref(grpc_call *c) {
Nicolas Nobleb7ebd3b2014-11-26 16:33:03 -0800196 if (gpr_unref(&c->internal_refcount)) {
Craig Tillera4541102015-01-29 11:46:11 -0800197 destroy_call(c);
Nicolas Nobleb7ebd3b2014-11-26 16:33:03 -0800198 }
199}
200
Craig Tillercce17ac2015-01-20 09:29:28 -0800201static grpc_call_error bind_cq(grpc_call *call, grpc_completion_queue *cq) {
202 if (call->cq) return GRPC_CALL_ERROR_ALREADY_INVOKED;
203 call->cq = cq;
204 return GRPC_CALL_OK;
205}
206
207static void request_more_data(grpc_call *call) {
208 grpc_call_op op;
209
210 /* call down */
211 op.type = GRPC_REQUEST_DATA;
212 op.dir = GRPC_CALL_DOWN;
213 op.flags = 0;
214 op.done_cb = do_nothing;
215 op.user_data = NULL;
216
217 grpc_call_execute_op(call, &op);
218}
219
Craig Tiller8eb9d472015-01-27 17:00:03 -0800220static void lock(grpc_call *call) { gpr_mu_lock(&call->mu); }
Craig Tillercce17ac2015-01-20 09:29:28 -0800221
Craig Tiller8eb9d472015-01-27 17:00:03 -0800222static void unlock(grpc_call *call) {
Craig Tiller2f38be62015-01-29 10:26:22 -0800223 send_action sa = SEND_NOTHING;
Craig Tiller8eb9d472015-01-27 17:00:03 -0800224 completed_request completed_requests[GRPC_IOREQ_OP_COUNT];
225 int num_completed_requests = call->num_completed_requests;
Craig Tiller2e103572015-01-29 14:12:07 -0800226 int need_more_data = call->need_more_data;
Craig Tiller8eb9d472015-01-27 17:00:03 -0800227 int i;
228
Craig Tiller2e103572015-01-29 14:12:07 -0800229 call->need_more_data = 0;
230
Craig Tiller8eb9d472015-01-27 17:00:03 -0800231 if (num_completed_requests != 0) {
232 memcpy(completed_requests, call->completed_requests,
233 sizeof(completed_requests));
234 call->num_completed_requests = 0;
235 }
236
237 if (!call->sending) {
238 sa = choose_send_action(call);
Craig Tillera9916872015-01-29 12:02:27 -0800239 gpr_log(GPR_DEBUG, "sa=%d", sa);
Craig Tiller8eb9d472015-01-27 17:00:03 -0800240 if (sa != SEND_NOTHING) {
241 call->sending = 1;
Craig Tillera4541102015-01-29 11:46:11 -0800242 grpc_call_internal_ref(call);
Craig Tiller8eb9d472015-01-27 17:00:03 -0800243 }
244 }
245
246 gpr_mu_unlock(&call->mu);
247
Craig Tiller2e103572015-01-29 14:12:07 -0800248 if (need_more_data) {
249 request_more_data(call);
250 }
251
Craig Tiller8eb9d472015-01-27 17:00:03 -0800252 if (sa != SEND_NOTHING) {
253 enact_send_action(call, sa);
254 }
255
256 for (i = 0; i < num_completed_requests; i++) {
257 completed_requests[i].on_complete(call, completed_requests[i].status,
258 completed_requests[i].user_data);
259 }
260}
Craig Tillercce17ac2015-01-20 09:29:28 -0800261
262static void finish_ioreq_op(grpc_call *call, grpc_ioreq_op op,
263 grpc_op_error status) {
264 reqinfo *master = call->requests[op].master;
Craig Tiller8eb9d472015-01-27 17:00:03 -0800265 completed_request *cr;
Craig Tillercce17ac2015-01-20 09:29:28 -0800266 size_t i;
Craig Tillera4541102015-01-29 11:46:11 -0800267 gpr_log(GPR_DEBUG, "finish op %d refs=%d", (int)op, (int)call->internal_refcount.count);
Craig Tiller8eb9d472015-01-27 17:00:03 -0800268 switch (call->requests[op].state) {
269 case REQ_INITIAL: /* not started yet */
270 return;
271 case REQ_DONE: /* already finished */
Craig Tiller8eb9d472015-01-27 17:00:03 -0800272 return;
273 case REQ_READY:
274 master->complete_mask |= 1 << op;
275 call->requests[op].state =
276 (op == GRPC_IOREQ_SEND_MESSAGES || op == GRPC_IOREQ_RECV_MESSAGES)
277 ? REQ_INITIAL
278 : REQ_DONE;
279 if (master->complete_mask == master->need_mask ||
280 status == GRPC_OP_ERROR) {
Craig Tiller9724de82015-01-28 17:06:29 -0800281 if (OP_IN_MASK(GRPC_IOREQ_RECV_STATUS, master->need_mask)) {
Craig Tillerf26370d2015-01-29 10:00:11 -0800282 call->requests[GRPC_IOREQ_RECV_STATUS].data.recv_status->status =
283 call->status_code;
284 call->requests[GRPC_IOREQ_RECV_STATUS].data.recv_status->details =
285 call->status_details
286 ? grpc_mdstr_as_c_string(call->status_details)
287 : NULL;
Craig Tiller9724de82015-01-28 17:06:29 -0800288 }
Craig Tiller8eb9d472015-01-27 17:00:03 -0800289 for (i = 0; i < GRPC_IOREQ_OP_COUNT; i++) {
290 if (call->requests[i].master == master) {
291 call->requests[i].master = NULL;
292 }
293 }
294 cr = &call->completed_requests[call->num_completed_requests++];
295 cr->status = status;
296 cr->on_complete = master->on_complete;
297 cr->user_data = master->user_data;
Craig Tillercce17ac2015-01-20 09:29:28 -0800298 }
Craig Tillercce17ac2015-01-20 09:29:28 -0800299 }
300}
301
302static void finish_write_step(void *pc, grpc_op_error error) {
303 grpc_call *call = pc;
Craig Tiller8eb9d472015-01-27 17:00:03 -0800304 lock(call);
Craig Tillercce17ac2015-01-20 09:29:28 -0800305 if (error == GRPC_OP_OK) {
306 if (call->write_index ==
307 call->requests[GRPC_IOREQ_SEND_MESSAGES].data.send_messages.count) {
308 finish_ioreq_op(call, GRPC_IOREQ_SEND_MESSAGES, GRPC_OP_OK);
309 }
Craig Tillercce17ac2015-01-20 09:29:28 -0800310 } else {
311 finish_ioreq_op(call, GRPC_IOREQ_SEND_MESSAGES, GRPC_OP_ERROR);
Craig Tillercce17ac2015-01-20 09:29:28 -0800312 }
Craig Tiller8eb9d472015-01-27 17:00:03 -0800313 call->sending = 0;
314 unlock(call);
Craig Tillera4541102015-01-29 11:46:11 -0800315 grpc_call_internal_unref(call);
Craig Tillercce17ac2015-01-20 09:29:28 -0800316}
317
318static void finish_finish_step(void *pc, grpc_op_error error) {
319 grpc_call *call = pc;
Craig Tiller8eb9d472015-01-27 17:00:03 -0800320 lock(call);
Craig Tillercce17ac2015-01-20 09:29:28 -0800321 if (error == GRPC_OP_OK) {
Craig Tilleree2d7022015-01-27 14:09:59 -0800322 finish_ioreq_op(call, GRPC_IOREQ_SEND_CLOSE, GRPC_OP_OK);
Craig Tillercce17ac2015-01-20 09:29:28 -0800323 } else {
324 gpr_log(GPR_ERROR, "not implemented");
325 abort();
326 }
Craig Tiller8eb9d472015-01-27 17:00:03 -0800327 call->sending = 0;
328 unlock(call);
Craig Tillera4541102015-01-29 11:46:11 -0800329 grpc_call_internal_unref(call);
Craig Tillercce17ac2015-01-20 09:29:28 -0800330}
331
Craig Tiller62ac1552015-01-27 15:41:44 -0800332static void finish_start_step(void *pc, grpc_op_error error) {
333 grpc_call *call = pc;
Craig Tiller8eb9d472015-01-27 17:00:03 -0800334 lock(call);
Craig Tiller62ac1552015-01-27 15:41:44 -0800335 if (error == GRPC_OP_OK) {
336 finish_ioreq_op(call, GRPC_IOREQ_SEND_INITIAL_METADATA, GRPC_OP_OK);
Craig Tiller62ac1552015-01-27 15:41:44 -0800337 } else {
338 gpr_log(GPR_ERROR, "not implemented");
339 abort();
340 }
Craig Tiller8eb9d472015-01-27 17:00:03 -0800341 call->sending = 0;
342 unlock(call);
Craig Tillera4541102015-01-29 11:46:11 -0800343 grpc_call_internal_unref(call);
Craig Tiller62ac1552015-01-27 15:41:44 -0800344}
345
Craig Tiller8eb9d472015-01-27 17:00:03 -0800346static send_action choose_send_action(grpc_call *call) {
347 switch (call->requests[GRPC_IOREQ_SEND_INITIAL_METADATA].state) {
348 case REQ_INITIAL:
349 return SEND_NOTHING;
350 case REQ_READY:
351 return SEND_INITIAL_METADATA;
352 case REQ_DONE:
353 break;
354 }
355 switch (call->requests[GRPC_IOREQ_SEND_MESSAGES].state) {
356 case REQ_INITIAL:
357 return SEND_NOTHING;
358 case REQ_READY:
359 return SEND_MESSAGE;
360 case REQ_DONE:
361 break;
362 }
363 switch (call->requests[GRPC_IOREQ_SEND_TRAILING_METADATA].state) {
364 case REQ_INITIAL:
365 return SEND_NOTHING;
366 case REQ_READY:
367 finish_ioreq_op(call, GRPC_IOREQ_SEND_TRAILING_METADATA, GRPC_OP_OK);
368 return SEND_TRAILING_METADATA;
369 case REQ_DONE:
370 break;
371 }
372 switch (call->requests[GRPC_IOREQ_SEND_CLOSE].state) {
373 default:
374 return SEND_NOTHING;
375 case REQ_READY:
376 return SEND_FINISH;
377 }
378}
379
Craig Tillera5d4e772015-01-29 11:52:37 -0800380static void send_metadata(grpc_call *call, grpc_mdelem *elem) {
381 grpc_call_op op;
382 op.type = GRPC_SEND_METADATA;
383 op.dir = GRPC_CALL_DOWN;
384 op.flags = 0;
385 op.data.metadata = elem;
386 op.done_cb = do_nothing;
387 op.user_data = NULL;
388 grpc_call_execute_op(call, &op);
389}
390
Craig Tiller8eb9d472015-01-27 17:00:03 -0800391static void enact_send_action(grpc_call *call, send_action sa) {
392 grpc_ioreq_data data;
Craig Tiller62ac1552015-01-27 15:41:44 -0800393 grpc_call_op op;
Craig Tiller8eb9d472015-01-27 17:00:03 -0800394 int i;
Craig Tiller62ac1552015-01-27 15:41:44 -0800395
Craig Tiller8eb9d472015-01-27 17:00:03 -0800396 switch (sa) {
397 case SEND_NOTHING:
398 abort();
399 break;
400 case SEND_INITIAL_METADATA:
401 data = call->requests[GRPC_IOREQ_SEND_INITIAL_METADATA].data;
402 for (i = 0; i < data.send_metadata.count; i++) {
403 const grpc_metadata *md = &data.send_metadata.metadata[i];
Craig Tillera5d4e772015-01-29 11:52:37 -0800404 send_metadata(
405 call,
Craig Tiller8eb9d472015-01-27 17:00:03 -0800406 grpc_mdelem_from_string_and_buffer(call->metadata_context, md->key,
407 (const gpr_uint8 *)md->value,
408 md->value_length));
409 }
410 op.type = GRPC_SEND_START;
411 op.dir = GRPC_CALL_DOWN;
412 op.flags = 0;
413 op.data.start.pollset = grpc_cq_pollset(call->cq);
414 op.done_cb = finish_start_step;
415 op.user_data = call;
416 grpc_call_execute_op(call, &op);
417 break;
418 case SEND_MESSAGE:
419 data = call->requests[GRPC_IOREQ_SEND_MESSAGES].data;
420 op.type = GRPC_SEND_MESSAGE;
421 op.dir = GRPC_CALL_DOWN;
422 op.flags = 0;
Craig Tillera9916872015-01-29 12:02:27 -0800423 op.data.message = data.send_messages.messages[call->write_index++];
Craig Tiller8eb9d472015-01-27 17:00:03 -0800424 op.done_cb = finish_write_step;
425 op.user_data = call;
426 grpc_call_execute_op(call, &op);
427 break;
428 case SEND_TRAILING_METADATA:
429 data = call->requests[GRPC_IOREQ_SEND_TRAILING_METADATA].data;
430 for (i = 0; i < data.send_metadata.count; i++) {
431 const grpc_metadata *md = &data.send_metadata.metadata[i];
Craig Tillera5d4e772015-01-29 11:52:37 -0800432 send_metadata(
433 call,
Craig Tiller8eb9d472015-01-27 17:00:03 -0800434 grpc_mdelem_from_string_and_buffer(call->metadata_context, md->key,
435 (const gpr_uint8 *)md->value,
436 md->value_length));
437 }
438 lock(call);
439 call->sending = 0;
440 unlock(call);
Craig Tillera4541102015-01-29 11:46:11 -0800441 grpc_call_internal_unref(call);
Craig Tiller8eb9d472015-01-27 17:00:03 -0800442 break;
443 case SEND_FINISH:
Craig Tillerabcf6522015-01-28 15:44:24 -0800444 if (!call->is_client) {
445 /* TODO(ctiller): cache common status values */
446 char status_str[GPR_LTOA_MIN_BUFSIZE];
Craig Tiller9724de82015-01-28 17:06:29 -0800447 data = call->requests[GRPC_IOREQ_SEND_CLOSE].data;
Craig Tillerabcf6522015-01-28 15:44:24 -0800448 gpr_ltoa(data.send_close.status, status_str);
Craig Tillera5d4e772015-01-29 11:52:37 -0800449 send_metadata(
450 call,
Craig Tillerabcf6522015-01-28 15:44:24 -0800451 grpc_mdelem_from_metadata_strings(
452 call->metadata_context,
Craig Tiller8884d7f2015-01-29 10:46:45 -0800453 grpc_mdstr_ref(grpc_channel_get_status_string(call->channel)),
Craig Tillerabcf6522015-01-28 15:44:24 -0800454 grpc_mdstr_from_string(call->metadata_context, status_str)));
455 if (data.send_close.details) {
Craig Tillera5d4e772015-01-29 11:52:37 -0800456 send_metadata(
457 call,
Craig Tillerabcf6522015-01-28 15:44:24 -0800458 grpc_mdelem_from_metadata_strings(
459 call->metadata_context,
Craig Tiller8884d7f2015-01-29 10:46:45 -0800460 grpc_mdstr_ref(grpc_channel_get_message_string(call->channel)),
Craig Tillerabcf6522015-01-28 15:44:24 -0800461 grpc_mdstr_from_string(call->metadata_context,
462 data.send_close.details)));
463 }
464 }
Craig Tiller8eb9d472015-01-27 17:00:03 -0800465 op.type = GRPC_SEND_FINISH;
466 op.dir = GRPC_CALL_DOWN;
467 op.flags = 0;
468 op.done_cb = finish_finish_step;
469 op.user_data = call;
470 grpc_call_execute_op(call, &op);
471 break;
Craig Tiller62ac1552015-01-27 15:41:44 -0800472 }
Craig Tillercce17ac2015-01-20 09:29:28 -0800473}
474
475static grpc_call_error start_ioreq_error(grpc_call *call,
476 gpr_uint32 mutated_ops,
477 grpc_call_error ret) {
478 size_t i;
479 for (i = 0; i < GRPC_IOREQ_OP_COUNT; i++) {
480 if (mutated_ops & (1 << i)) {
481 call->requests[i].master = NULL;
482 }
483 }
Craig Tillercce17ac2015-01-20 09:29:28 -0800484 return ret;
485}
486
Craig Tiller8eb9d472015-01-27 17:00:03 -0800487static grpc_call_error start_ioreq(grpc_call *call, const grpc_ioreq *reqs,
488 size_t nreqs,
489 grpc_ioreq_completion_func completion,
490 void *user_data) {
Craig Tillercce17ac2015-01-20 09:29:28 -0800491 size_t i;
492 gpr_uint32 have_ops = 0;
493 gpr_uint32 precomplete = 0;
494 grpc_ioreq_op op;
495 reqinfo *master = NULL;
496 reqinfo *requests = call->requests;
497 grpc_ioreq_data data;
498
499 for (i = 0; i < nreqs; i++) {
500 op = reqs[i].op;
501 if (requests[op].master) {
502 return start_ioreq_error(call, have_ops,
503 GRPC_CALL_ERROR_TOO_MANY_OPERATIONS);
504 }
Craig Tiller8eb9d472015-01-27 17:00:03 -0800505 switch (requests[op].state) {
506 case REQ_INITIAL:
507 break;
508 case REQ_READY:
509 return start_ioreq_error(call, have_ops,
510 GRPC_CALL_ERROR_TOO_MANY_OPERATIONS);
511 case REQ_DONE:
512 return start_ioreq_error(call, have_ops,
513 GRPC_CALL_ERROR_ALREADY_INVOKED);
514 }
Craig Tillercce17ac2015-01-20 09:29:28 -0800515 if (master == NULL) {
516 master = &requests[op];
517 }
518 have_ops |= 1 << op;
519 data = reqs[i].data;
520
Craig Tiller8eb9d472015-01-27 17:00:03 -0800521 requests[op].state = REQ_READY;
Craig Tillercce17ac2015-01-20 09:29:28 -0800522 requests[op].data = data;
523 requests[op].master = master;
524 }
525
526 GPR_ASSERT(master != NULL);
527 master->need_mask = have_ops;
528 master->complete_mask = precomplete;
529 master->on_complete = completion;
530 master->user_data = user_data;
531
Craig Tiller2e103572015-01-29 14:12:07 -0800532 for (i = 0; i < nreqs; i++) {
533 op = reqs[i].op;
534 switch (op) {
535 default:
536 break;
537 case GRPC_IOREQ_RECV_MESSAGES:
538 data.recv_messages->count = 0;
539 if (call->buffered_messages.count > 0) {
540 SWAP(grpc_byte_buffer_array, *data.recv_messages,
541 call->buffered_messages);
542 finish_ioreq_op(call, GRPC_IOREQ_RECV_MESSAGES, GRPC_OP_OK);
543 }
544 break;
545 case GRPC_IOREQ_SEND_MESSAGES:
546 call->write_index = 0;
547 break;
548 case GRPC_IOREQ_SEND_CLOSE:
549 if (requests[GRPC_IOREQ_SEND_MESSAGES].state == REQ_INITIAL) {
550 requests[GRPC_IOREQ_SEND_MESSAGES].state = REQ_DONE;
551 }
552 break;
Craig Tiller23aa6c42015-01-27 17:16:12 -0800553 }
554 }
555
Craig Tillercce17ac2015-01-20 09:29:28 -0800556 if (OP_IN_MASK(GRPC_IOREQ_RECV_MESSAGES, have_ops & ~precomplete)) {
Craig Tiller2e103572015-01-29 14:12:07 -0800557 call->need_more_data = 1;
Craig Tillercce17ac2015-01-20 09:29:28 -0800558 }
559
560 return GRPC_CALL_OK;
561}
562
563static void call_start_ioreq_done(grpc_call *call, grpc_op_error status,
564 void *user_data) {
565 grpc_cq_end_ioreq(call->cq, user_data, call, do_nothing, NULL, status);
566}
567
568grpc_call_error grpc_call_start_ioreq(grpc_call *call, const grpc_ioreq *reqs,
569 size_t nreqs, void *tag) {
Craig Tiller8eb9d472015-01-27 17:00:03 -0800570 grpc_call_error err;
571 lock(call);
572 err = start_ioreq(call, reqs, nreqs, call_start_ioreq_done, tag);
573 unlock(call);
574 return err;
Craig Tillercce17ac2015-01-20 09:29:28 -0800575}
576
577grpc_call_error grpc_call_start_ioreq_and_call_back(
578 grpc_call *call, const grpc_ioreq *reqs, size_t nreqs,
579 grpc_ioreq_completion_func on_complete, void *user_data) {
Craig Tiller8eb9d472015-01-27 17:00:03 -0800580 grpc_call_error err;
581 lock(call);
582 err = start_ioreq(call, reqs, nreqs, on_complete, user_data);
583 unlock(call);
584 return err;
Craig Tillercce17ac2015-01-20 09:29:28 -0800585}
586
Nicolas Nobleb7ebd3b2014-11-26 16:33:03 -0800587void grpc_call_destroy(grpc_call *c) {
ctillerc6d61c42014-12-15 14:52:08 -0800588 int cancel;
Craig Tiller9724de82015-01-28 17:06:29 -0800589 lock(c);
Nicolas Nobleb7ebd3b2014-11-26 16:33:03 -0800590 if (c->have_alarm) {
ctiller18b49ab2014-12-09 14:39:16 -0800591 grpc_alarm_cancel(&c->alarm);
Nicolas Nobleb7ebd3b2014-11-26 16:33:03 -0800592 c->have_alarm = 0;
593 }
Craig Tillercce17ac2015-01-20 09:29:28 -0800594 cancel = !c->stream_closed;
Craig Tiller9724de82015-01-28 17:06:29 -0800595 unlock(c);
ctillerc6d61c42014-12-15 14:52:08 -0800596 if (cancel) grpc_call_cancel(c);
Craig Tillerf63fed72015-01-29 10:49:34 -0800597 grpc_call_internal_unref(c);
Nicolas Nobleb7ebd3b2014-11-26 16:33:03 -0800598}
599
Craig Tillerd248c242015-01-14 11:49:12 -0800600static void maybe_set_status_code(grpc_call *call, gpr_uint32 status) {
Craig Tillercce17ac2015-01-20 09:29:28 -0800601 if (call->got_status_code) return;
602 call->status_code = status;
603 call->got_status_code = 1;
Craig Tillerd248c242015-01-14 11:49:12 -0800604}
605
606static void maybe_set_status_details(grpc_call *call, grpc_mdstr *status) {
Craig Tillercce17ac2015-01-20 09:29:28 -0800607 if (call->status_details != NULL) {
608 grpc_mdstr_unref(status);
609 return;
Craig Tiller6046dc32015-01-14 12:55:45 -0800610 }
Craig Tillercce17ac2015-01-20 09:29:28 -0800611 call->status_details = status;
Craig Tillerd248c242015-01-14 11:49:12 -0800612}
613
Nicolas Nobleb7ebd3b2014-11-26 16:33:03 -0800614grpc_call_error grpc_call_cancel(grpc_call *c) {
615 grpc_call_element *elem;
616 grpc_call_op op;
617
618 op.type = GRPC_CANCEL_OP;
619 op.dir = GRPC_CALL_DOWN;
620 op.flags = 0;
621 op.done_cb = do_nothing;
622 op.user_data = NULL;
623
624 elem = CALL_ELEM_FROM_CALL(c, 0);
ctillerf962f522014-12-10 15:28:27 -0800625 elem->filter->call_op(elem, NULL, &op);
Nicolas Nobleb7ebd3b2014-11-26 16:33:03 -0800626
627 return GRPC_CALL_OK;
628}
629
Craig Tiller6046dc32015-01-14 12:55:45 -0800630grpc_call_error grpc_call_cancel_with_status(grpc_call *c,
631 grpc_status_code status,
632 const char *description) {
633 grpc_mdstr *details =
634 description ? grpc_mdstr_from_string(c->metadata_context, description)
635 : NULL;
Craig Tiller9724de82015-01-28 17:06:29 -0800636 lock(c);
Craig Tillerd248c242015-01-14 11:49:12 -0800637 maybe_set_status_code(c, status);
638 if (details) {
639 maybe_set_status_details(c, details);
640 }
Craig Tiller9724de82015-01-28 17:06:29 -0800641 unlock(c);
Craig Tillerd248c242015-01-14 11:49:12 -0800642 return grpc_call_cancel(c);
643}
644
Nicolas Nobleb7ebd3b2014-11-26 16:33:03 -0800645void grpc_call_execute_op(grpc_call *call, grpc_call_op *op) {
646 grpc_call_element *elem;
647 GPR_ASSERT(op->dir == GRPC_CALL_DOWN);
648 elem = CALL_ELEM_FROM_CALL(call, 0);
ctillerf962f522014-12-10 15:28:27 -0800649 elem->filter->call_op(elem, NULL, op);
Nicolas Nobleb7ebd3b2014-11-26 16:33:03 -0800650}
651
Craig Tiller62ac1552015-01-27 15:41:44 -0800652grpc_call_error grpc_call_add_metadata(grpc_call *call, grpc_metadata *metadata,
653 gpr_uint32 flags) {
654 legacy_state *ls;
655 grpc_metadata *mdout;
656
Craig Tiller8eb9d472015-01-27 17:00:03 -0800657 lock(call);
Craig Tiller62ac1552015-01-27 15:41:44 -0800658 ls = get_legacy_state(call);
Nicolas Nobleb7ebd3b2014-11-26 16:33:03 -0800659
Craig Tillercce17ac2015-01-20 09:29:28 -0800660 if (ls->md_out_count == ls->md_out_capacity) {
661 ls->md_out_capacity =
662 GPR_MAX(ls->md_out_count * 3 / 2, ls->md_out_count + 8);
663 ls->md_out =
664 gpr_realloc(ls->md_out, sizeof(grpc_mdelem *) * ls->md_out_capacity);
665 }
Craig Tiller62ac1552015-01-27 15:41:44 -0800666 mdout = &ls->md_out[ls->md_out_count++];
667 mdout->key = gpr_strdup(metadata->key);
668 mdout->value = gpr_malloc(metadata->value_length);
669 mdout->value_length = metadata->value_length;
670 memcpy(mdout->value, metadata->value, metadata->value_length);
klempnerc463f742014-12-19 13:03:35 -0800671
Craig Tiller8eb9d472015-01-27 17:00:03 -0800672 unlock(call);
Craig Tiller62ac1552015-01-27 15:41:44 -0800673
Nicolas Nobleb7ebd3b2014-11-26 16:33:03 -0800674 return GRPC_CALL_OK;
675}
676
Craig Tillercce17ac2015-01-20 09:29:28 -0800677static void finish_status(grpc_call *call, grpc_op_error status, void *tag) {
678 legacy_state *ls;
Nicolas Nobleb7ebd3b2014-11-26 16:33:03 -0800679
Craig Tiller8eb9d472015-01-27 17:00:03 -0800680 lock(call);
Craig Tillercce17ac2015-01-20 09:29:28 -0800681 ls = get_legacy_state(call);
Craig Tiller8eb9d472015-01-27 17:00:03 -0800682 unlock(call);
Craig Tiller80fa15c2015-01-13 16:10:49 -0800683
Craig Tillercce17ac2015-01-20 09:29:28 -0800684 if (status == GRPC_OP_OK) {
685 grpc_cq_end_finished(call->cq, tag, call, do_nothing, NULL,
686 ls->status_in.status, ls->status_in.details,
687 ls->trail_md_in.metadata, ls->trail_md_in.count);
688 } else {
689 grpc_cq_end_finished(call->cq, tag, call, do_nothing, NULL,
690 GRPC_STATUS_UNKNOWN, "Read status failed", NULL, 0);
Craig Tiller80fa15c2015-01-13 16:10:49 -0800691 }
Craig Tillercce17ac2015-01-20 09:29:28 -0800692}
Craig Tiller80fa15c2015-01-13 16:10:49 -0800693
Craig Tillercce17ac2015-01-20 09:29:28 -0800694static void finish_recv_metadata(grpc_call *call, grpc_op_error status,
695 void *tag) {
Craig Tillercce17ac2015-01-20 09:29:28 -0800696 legacy_state *ls;
697
Craig Tiller8eb9d472015-01-27 17:00:03 -0800698 lock(call);
Craig Tillercce17ac2015-01-20 09:29:28 -0800699 ls = get_legacy_state(call);
700 if (status == GRPC_OP_OK) {
701 grpc_cq_end_client_metadata_read(call->cq, tag, call, do_nothing, NULL,
702 ls->md_in.count, ls->md_in.metadata);
703
Craig Tiller62ac1552015-01-27 15:41:44 -0800704 } else {
705 grpc_cq_end_client_metadata_read(call->cq, tag, call, do_nothing, NULL, 0,
706 NULL);
707 }
Craig Tiller8eb9d472015-01-27 17:00:03 -0800708 unlock(call);
Craig Tiller62ac1552015-01-27 15:41:44 -0800709}
710
711static void finish_send_metadata(grpc_call *call, grpc_op_error status,
712 void *metadata_read_tag) {
713 grpc_ioreq reqs[2];
714 legacy_state *ls;
715
Craig Tiller8eb9d472015-01-27 17:00:03 -0800716 lock(call);
Craig Tiller62ac1552015-01-27 15:41:44 -0800717 if (status == GRPC_OP_OK) {
Craig Tiller62ac1552015-01-27 15:41:44 -0800718 ls = get_legacy_state(call);
719 reqs[0].op = GRPC_IOREQ_RECV_INITIAL_METADATA;
720 reqs[0].data.recv_metadata = &ls->md_in;
Craig Tiller8eb9d472015-01-27 17:00:03 -0800721 GPR_ASSERT(GRPC_CALL_OK == start_ioreq(call, reqs, 1, finish_recv_metadata,
722 metadata_read_tag));
Craig Tiller62ac1552015-01-27 15:41:44 -0800723
Craig Tiller62ac1552015-01-27 15:41:44 -0800724 ls = get_legacy_state(call);
Craig Tillercce17ac2015-01-20 09:29:28 -0800725 reqs[0].op = GRPC_IOREQ_RECV_TRAILING_METADATA;
726 reqs[0].data.recv_metadata = &ls->trail_md_in;
727 reqs[1].op = GRPC_IOREQ_RECV_STATUS;
728 reqs[1].data.recv_status = &ls->status_in;
Craig Tiller8eb9d472015-01-27 17:00:03 -0800729 GPR_ASSERT(GRPC_CALL_OK ==
730 start_ioreq(call, reqs, 2, finish_status, ls->finished_tag));
Craig Tillercce17ac2015-01-20 09:29:28 -0800731 } else {
Craig Tiller62ac1552015-01-27 15:41:44 -0800732 ls = get_legacy_state(call);
733 grpc_cq_end_client_metadata_read(call->cq, metadata_read_tag, call,
734 do_nothing, NULL, 0, NULL);
Craig Tillercce17ac2015-01-20 09:29:28 -0800735 grpc_cq_end_finished(call->cq, ls->finished_tag, call, do_nothing, NULL,
736 GRPC_STATUS_UNKNOWN, "Failed to read initial metadata",
737 NULL, 0);
Craig Tiller80fa15c2015-01-13 16:10:49 -0800738 }
Craig Tiller8eb9d472015-01-27 17:00:03 -0800739 unlock(call);
Craig Tiller80fa15c2015-01-13 16:10:49 -0800740}
741
Craig Tiller40fc7a62015-01-13 16:11:58 -0800742grpc_call_error grpc_call_invoke(grpc_call *call, grpc_completion_queue *cq,
743 void *metadata_read_tag, void *finished_tag,
744 gpr_uint32 flags) {
Craig Tillercce17ac2015-01-20 09:29:28 -0800745 grpc_ioreq req;
746 legacy_state *ls = get_legacy_state(call);
747 grpc_call_error err;
Nicolas Nobleb7ebd3b2014-11-26 16:33:03 -0800748
Nicolas Nobleb7ebd3b2014-11-26 16:33:03 -0800749 grpc_cq_begin_op(cq, call, GRPC_CLIENT_METADATA_READ);
Craig Tillercce17ac2015-01-20 09:29:28 -0800750 grpc_cq_begin_op(cq, call, GRPC_FINISHED);
Nicolas Nobleb7ebd3b2014-11-26 16:33:03 -0800751
Craig Tiller8eb9d472015-01-27 17:00:03 -0800752 lock(call);
Craig Tillercce17ac2015-01-20 09:29:28 -0800753 err = bind_cq(call, cq);
754 if (err != GRPC_CALL_OK) return err;
Nicolas Nobleb7ebd3b2014-11-26 16:33:03 -0800755
Craig Tiller9724de82015-01-28 17:06:29 -0800756 get_legacy_state(call)->finished_tag = finished_tag;
757
Craig Tiller62ac1552015-01-27 15:41:44 -0800758 req.op = GRPC_IOREQ_SEND_INITIAL_METADATA;
759 req.data.send_metadata.count = ls->md_out_count;
760 req.data.send_metadata.metadata = ls->md_out;
Craig Tiller8eb9d472015-01-27 17:00:03 -0800761 err = start_ioreq(call, &req, 1, finish_send_metadata, metadata_read_tag);
762 unlock(call);
763 return err;
Nicolas Nobleb7ebd3b2014-11-26 16:33:03 -0800764}
765
nnoble0c475f02014-12-05 15:37:39 -0800766grpc_call_error grpc_call_server_accept(grpc_call *call,
767 grpc_completion_queue *cq,
768 void *finished_tag) {
Craig Tillercce17ac2015-01-20 09:29:28 -0800769 grpc_ioreq req;
Craig Tiller8eb9d472015-01-27 17:00:03 -0800770 grpc_call_error err;
Nicolas Nobleb7ebd3b2014-11-26 16:33:03 -0800771
772 /* inform the completion queue of an incoming operation (corresponding to
773 finished_tag) */
774 grpc_cq_begin_op(cq, call, GRPC_FINISHED);
775
Craig Tiller8884d7f2015-01-29 10:46:45 -0800776 lock(call);
Craig Tiller8eb9d472015-01-27 17:00:03 -0800777 err = bind_cq(call, cq);
778 if (err != GRPC_CALL_OK) return err;
Nicolas Nobleb7ebd3b2014-11-26 16:33:03 -0800779
Craig Tillercce17ac2015-01-20 09:29:28 -0800780 req.op = GRPC_IOREQ_RECV_STATUS;
781 req.data.recv_status = &get_legacy_state(call)->status_in;
Craig Tiller8eb9d472015-01-27 17:00:03 -0800782 err = start_ioreq(call, &req, 1, finish_status, finished_tag);
783 unlock(call);
784 return err;
nnoble0c475f02014-12-05 15:37:39 -0800785}
786
Craig Tillerabcf6522015-01-28 15:44:24 -0800787static void finish_send_initial_metadata(grpc_call *call, grpc_op_error status,
788 void *tag) {}
Craig Tiller39fd4282015-01-28 09:12:31 -0800789
nnoble0c475f02014-12-05 15:37:39 -0800790grpc_call_error grpc_call_server_end_initial_metadata(grpc_call *call,
791 gpr_uint32 flags) {
Craig Tiller39fd4282015-01-28 09:12:31 -0800792 grpc_ioreq req;
793 grpc_call_error err;
794 legacy_state *ls;
795
796 lock(call);
797 ls = get_legacy_state(call);
798 req.op = GRPC_IOREQ_SEND_INITIAL_METADATA;
799 req.data.send_metadata.count = ls->md_out_count;
800 req.data.send_metadata.metadata = ls->md_out;
801 err = start_ioreq(call, &req, 1, finish_send_initial_metadata, NULL);
802 unlock(call);
Craig Tillerabcf6522015-01-28 15:44:24 -0800803
Craig Tiller39fd4282015-01-28 09:12:31 -0800804 return err;
Nicolas Nobleb7ebd3b2014-11-26 16:33:03 -0800805}
806
Craig Tiller4069b682015-01-29 14:01:19 -0800807void grpc_call_initial_metadata_complete(
Nicolas Nobleb7ebd3b2014-11-26 16:33:03 -0800808 grpc_call_element *surface_element) {
809 grpc_call *call = grpc_call_from_top_element(surface_element);
Craig Tiller8eb9d472015-01-27 17:00:03 -0800810 lock(call);
Craig Tillercce17ac2015-01-20 09:29:28 -0800811 call->got_initial_metadata = 1;
812 finish_ioreq_op(call, GRPC_IOREQ_RECV_INITIAL_METADATA, GRPC_OP_OK);
Craig Tiller8eb9d472015-01-27 17:00:03 -0800813 unlock(call);
Nicolas Nobleb7ebd3b2014-11-26 16:33:03 -0800814}
815
Craig Tillercce17ac2015-01-20 09:29:28 -0800816static void finish_read(grpc_call *call, grpc_op_error error, void *tag) {
817 legacy_state *ls;
Craig Tiller8eb9d472015-01-27 17:00:03 -0800818 lock(call);
Craig Tillercce17ac2015-01-20 09:29:28 -0800819 ls = get_legacy_state(call);
820 if (ls->msg_in.count == 0) {
821 grpc_cq_end_read(call->cq, tag, call, do_nothing, NULL, NULL);
822 } else {
823 grpc_cq_end_read(call->cq, tag, call, do_nothing, NULL,
824 ls->msg_in.buffers[ls->msg_in_read_idx++]);
825 }
Craig Tiller8eb9d472015-01-27 17:00:03 -0800826 unlock(call);
Nicolas Nobleb7ebd3b2014-11-26 16:33:03 -0800827}
828
829grpc_call_error grpc_call_start_read(grpc_call *call, void *tag) {
Craig Tillercce17ac2015-01-20 09:29:28 -0800830 legacy_state *ls;
831 grpc_ioreq req;
Craig Tiller8eb9d472015-01-27 17:00:03 -0800832 grpc_call_error err;
Nicolas Nobleb7ebd3b2014-11-26 16:33:03 -0800833
834 grpc_cq_begin_op(call->cq, call, GRPC_READ);
835
Craig Tiller8eb9d472015-01-27 17:00:03 -0800836 lock(call);
Craig Tillercce17ac2015-01-20 09:29:28 -0800837 ls = get_legacy_state(call);
838
839 if (ls->msg_in_read_idx == ls->msg_in.count) {
840 ls->msg_in_read_idx = 0;
841 req.op = GRPC_IOREQ_RECV_MESSAGES;
842 req.data.recv_messages = &ls->msg_in;
Craig Tiller8eb9d472015-01-27 17:00:03 -0800843 err = start_ioreq(call, &req, 1, finish_read, tag);
844 } else {
845 err = GRPC_CALL_OK;
846 grpc_cq_end_read(call->cq, tag, call, do_nothing, NULL,
847 ls->msg_in.buffers[ls->msg_in_read_idx++]);
Nicolas Nobleb7ebd3b2014-11-26 16:33:03 -0800848 }
Craig Tiller8eb9d472015-01-27 17:00:03 -0800849 unlock(call);
850 return err;
Nicolas Nobleb7ebd3b2014-11-26 16:33:03 -0800851}
852
Craig Tillercce17ac2015-01-20 09:29:28 -0800853static void finish_write(grpc_call *call, grpc_op_error status, void *tag) {
Craig Tillera4541102015-01-29 11:46:11 -0800854 lock(call);
855 grpc_byte_buffer_destroy(get_legacy_state(call)->msg_out);
856 unlock(call);
Craig Tillercce17ac2015-01-20 09:29:28 -0800857 grpc_cq_end_write_accepted(call->cq, tag, call, do_nothing, NULL, status);
858}
859
Nicolas Nobleb7ebd3b2014-11-26 16:33:03 -0800860grpc_call_error grpc_call_start_write(grpc_call *call,
861 grpc_byte_buffer *byte_buffer, void *tag,
862 gpr_uint32 flags) {
Craig Tillercce17ac2015-01-20 09:29:28 -0800863 grpc_ioreq req;
864 legacy_state *ls;
Craig Tiller8eb9d472015-01-27 17:00:03 -0800865 grpc_call_error err;
Nicolas Nobleb7ebd3b2014-11-26 16:33:03 -0800866
867 grpc_cq_begin_op(call->cq, call, GRPC_WRITE_ACCEPTED);
868
Craig Tiller8eb9d472015-01-27 17:00:03 -0800869 lock(call);
Craig Tillercce17ac2015-01-20 09:29:28 -0800870 ls = get_legacy_state(call);
Craig Tillera4541102015-01-29 11:46:11 -0800871 ls->msg_out = grpc_byte_buffer_copy(byte_buffer);
Craig Tillercce17ac2015-01-20 09:29:28 -0800872 req.op = GRPC_IOREQ_SEND_MESSAGES;
873 req.data.send_messages.count = 1;
874 req.data.send_messages.messages = &ls->msg_out;
Craig Tiller8eb9d472015-01-27 17:00:03 -0800875 err = start_ioreq(call, &req, 1, finish_write, tag);
876 unlock(call);
877
878 return err;
Craig Tillercce17ac2015-01-20 09:29:28 -0800879}
Nicolas Nobleb7ebd3b2014-11-26 16:33:03 -0800880
Craig Tillercce17ac2015-01-20 09:29:28 -0800881static void finish_finish(grpc_call *call, grpc_op_error status, void *tag) {
882 grpc_cq_end_finish_accepted(call->cq, tag, call, do_nothing, NULL, status);
Nicolas Nobleb7ebd3b2014-11-26 16:33:03 -0800883}
884
885grpc_call_error grpc_call_writes_done(grpc_call *call, void *tag) {
Craig Tillercce17ac2015-01-20 09:29:28 -0800886 grpc_ioreq req;
Craig Tiller8eb9d472015-01-27 17:00:03 -0800887 grpc_call_error err;
Nicolas Nobleb7ebd3b2014-11-26 16:33:03 -0800888 grpc_cq_begin_op(call->cq, call, GRPC_FINISH_ACCEPTED);
889
Craig Tiller8eb9d472015-01-27 17:00:03 -0800890 lock(call);
Craig Tillercce17ac2015-01-20 09:29:28 -0800891 req.op = GRPC_IOREQ_SEND_CLOSE;
Craig Tiller8eb9d472015-01-27 17:00:03 -0800892 err = start_ioreq(call, &req, 1, finish_finish, tag);
893 unlock(call);
894
895 return err;
Nicolas Nobleb7ebd3b2014-11-26 16:33:03 -0800896}
897
898grpc_call_error grpc_call_start_write_status(grpc_call *call,
ctiller2845cad2014-12-15 15:14:12 -0800899 grpc_status_code status,
900 const char *details, void *tag) {
Craig Tillerf31d14c2015-01-28 09:26:42 -0800901 grpc_ioreq reqs[2];
Craig Tiller8eb9d472015-01-27 17:00:03 -0800902 grpc_call_error err;
Nicolas Nobleb7ebd3b2014-11-26 16:33:03 -0800903 grpc_cq_begin_op(call->cq, call, GRPC_FINISH_ACCEPTED);
904
Craig Tiller8eb9d472015-01-27 17:00:03 -0800905 lock(call);
Craig Tillerf31d14c2015-01-28 09:26:42 -0800906 reqs[0].op = GRPC_IOREQ_SEND_TRAILING_METADATA;
907 reqs[0].data.send_metadata.count = call->legacy_state->md_out_count;
908 reqs[0].data.send_metadata.metadata = call->legacy_state->md_out;
909 reqs[1].op = GRPC_IOREQ_SEND_CLOSE;
910 reqs[1].data.send_close.status = status;
911 reqs[1].data.send_close.details = details;
912 err = start_ioreq(call, reqs, 2, finish_finish, tag);
Craig Tiller8eb9d472015-01-27 17:00:03 -0800913 unlock(call);
914
915 return err;
Nicolas Nobleb7ebd3b2014-11-26 16:33:03 -0800916}
917
918grpc_call *grpc_call_from_top_element(grpc_call_element *elem) {
919 return CALL_FROM_TOP_ELEM(elem);
920}
921
ctiller58393c22015-01-07 14:03:30 -0800922static void call_alarm(void *arg, int success) {
Nicolas Nobleb7ebd3b2014-11-26 16:33:03 -0800923 grpc_call *call = arg;
ctiller58393c22015-01-07 14:03:30 -0800924 if (success) {
Craig Tiller7b018782015-01-16 09:53:39 -0800925 if (call->is_client) {
926 grpc_call_cancel_with_status(call, GRPC_STATUS_DEADLINE_EXCEEDED,
927 "Deadline Exceeded");
928 } else {
Craig Tillerc9bbff22015-01-16 10:18:37 -0800929 grpc_call_cancel(call);
930 }
Craig Tiller7b018782015-01-16 09:53:39 -0800931 }
Craig Tillerf63fed72015-01-29 10:49:34 -0800932 grpc_call_internal_unref(call);
Nicolas Nobleb7ebd3b2014-11-26 16:33:03 -0800933}
934
935void grpc_call_set_deadline(grpc_call_element *elem, gpr_timespec deadline) {
936 grpc_call *call = CALL_FROM_TOP_ELEM(elem);
937
938 if (call->have_alarm) {
939 gpr_log(GPR_ERROR, "Attempt to set deadline alarm twice");
940 }
Craig Tillerf63fed72015-01-29 10:49:34 -0800941 grpc_call_internal_ref(call);
Nicolas Nobleb7ebd3b2014-11-26 16:33:03 -0800942 call->have_alarm = 1;
ctiller769b70b2014-12-16 10:44:15 -0800943 grpc_alarm_init(&call->alarm, deadline, call_alarm, call, gpr_now());
Nicolas Nobleb7ebd3b2014-11-26 16:33:03 -0800944}
Craig Tillerd631cf32015-01-27 10:35:01 -0800945
Craig Tillercce17ac2015-01-20 09:29:28 -0800946void grpc_call_read_closed(grpc_call_element *elem) {
947 grpc_call *call = CALL_FROM_TOP_ELEM(elem);
Craig Tiller8eb9d472015-01-27 17:00:03 -0800948 lock(call);
Craig Tillercce17ac2015-01-20 09:29:28 -0800949 GPR_ASSERT(!call->read_closed);
950 call->read_closed = 1;
951 finish_ioreq_op(call, GRPC_IOREQ_RECV_MESSAGES, GRPC_OP_OK);
952 finish_ioreq_op(call, GRPC_IOREQ_RECV_INITIAL_METADATA, GRPC_OP_OK);
953 finish_ioreq_op(call, GRPC_IOREQ_RECV_TRAILING_METADATA, GRPC_OP_OK);
Craig Tiller8eb9d472015-01-27 17:00:03 -0800954 unlock(call);
Craig Tillercce17ac2015-01-20 09:29:28 -0800955}
956
957void grpc_call_stream_closed(grpc_call_element *elem) {
958 grpc_call *call = CALL_FROM_TOP_ELEM(elem);
Craig Tiller8eb9d472015-01-27 17:00:03 -0800959 lock(call);
Craig Tillera4541102015-01-29 11:46:11 -0800960 GPR_ASSERT(!call->stream_closed);
Craig Tillercce17ac2015-01-20 09:29:28 -0800961 if (!call->read_closed) {
962 call->read_closed = 1;
963 finish_ioreq_op(call, GRPC_IOREQ_RECV_MESSAGES, GRPC_OP_OK);
964 finish_ioreq_op(call, GRPC_IOREQ_RECV_INITIAL_METADATA, GRPC_OP_OK);
965 finish_ioreq_op(call, GRPC_IOREQ_RECV_TRAILING_METADATA, GRPC_OP_OK);
966 }
967 call->stream_closed = 1;
968 finish_ioreq_op(call, GRPC_IOREQ_RECV_STATUS, GRPC_OP_OK);
Craig Tiller8eb9d472015-01-27 17:00:03 -0800969 unlock(call);
Craig Tillera4541102015-01-29 11:46:11 -0800970 grpc_call_internal_unref(call);
Craig Tillercce17ac2015-01-20 09:29:28 -0800971}
972
/* we offset status by a small amount when storing it into transport metadata
   as metadata cannot store a 0 value (which is used as OK for
   grpc_status_codes) */
/* amount added to a status code before it is stored as mdelem user data */
#define STATUS_OFFSET 1

/* Destructor passed alongside the cached status in mdelem user data. The
   cached value is an integer packed into a pointer, so there is nothing to
   free. */
static void destroy_status(void *ignored) { (void)ignored; }
978
979static gpr_uint32 decode_status(grpc_mdelem *md) {
980 gpr_uint32 status;
981 void *user_data = grpc_mdelem_get_user_data(md, destroy_status);
982 if (user_data) {
Craig Tillerf26370d2015-01-29 10:00:11 -0800983 status = ((gpr_uint32)(gpr_intptr) user_data) - STATUS_OFFSET;
Craig Tillercce17ac2015-01-20 09:29:28 -0800984 } else {
985 if (!gpr_parse_bytes_to_uint32(grpc_mdstr_as_c_string(md->value),
986 GPR_SLICE_LENGTH(md->value->slice),
987 &status)) {
988 status = GRPC_STATUS_UNKNOWN; /* could not parse status code */
989 }
990 grpc_mdelem_set_user_data(md, destroy_status,
991 (void *)(gpr_intptr)(status + STATUS_OFFSET));
992 }
993 return status;
994}
995
996void grpc_call_recv_message(grpc_call_element *elem,
997 grpc_byte_buffer *byte_buffer) {
998 grpc_call *call = CALL_FROM_TOP_ELEM(elem);
999 grpc_byte_buffer_array *dest;
Craig Tiller8eb9d472015-01-27 17:00:03 -08001000 lock(call);
Craig Tillercce17ac2015-01-20 09:29:28 -08001001 if (call->requests[GRPC_IOREQ_RECV_MESSAGES].master != NULL) {
1002 dest = call->requests[GRPC_IOREQ_RECV_MESSAGES].data.recv_messages;
1003 } else {
1004 dest = &call->buffered_messages;
1005 }
1006 if (dest->count == dest->capacity) {
1007 dest->capacity = GPR_MAX(dest->capacity + 1, dest->capacity * 3 / 2);
1008 dest->buffers =
1009 gpr_realloc(dest->buffers, sizeof(grpc_byte_buffer *) * dest->capacity);
1010 }
1011 dest->buffers[dest->count++] = byte_buffer;
1012 finish_ioreq_op(call, GRPC_IOREQ_RECV_MESSAGES, GRPC_OP_OK);
Craig Tiller8eb9d472015-01-27 17:00:03 -08001013 unlock(call);
Craig Tillercce17ac2015-01-20 09:29:28 -08001014}
1015
1016void grpc_call_recv_metadata(grpc_call_element *elem, grpc_mdelem *md) {
1017 grpc_call *call = CALL_FROM_TOP_ELEM(elem);
1018 grpc_mdstr *key = md->key;
1019 grpc_metadata_array *dest;
1020 grpc_metadata *mdusr;
1021
Craig Tiller8eb9d472015-01-27 17:00:03 -08001022 lock(call);
Craig Tillercce17ac2015-01-20 09:29:28 -08001023 if (key == grpc_channel_get_status_string(call->channel)) {
1024 maybe_set_status_code(call, decode_status(md));
1025 grpc_mdelem_unref(md);
1026 } else if (key == grpc_channel_get_message_string(call->channel)) {
Craig Tillerd0179b42015-01-29 12:32:47 -08001027 maybe_set_status_details(call, grpc_mdstr_ref(md->value));
Craig Tillercce17ac2015-01-20 09:29:28 -08001028 grpc_mdelem_unref(md);
1029 } else {
1030 if (!call->got_initial_metadata) {
Craig Tiller8eb9d472015-01-27 17:00:03 -08001031 dest = call->requests[GRPC_IOREQ_RECV_INITIAL_METADATA].state == REQ_READY
Craig Tillercce17ac2015-01-20 09:29:28 -08001032 ? call->requests[GRPC_IOREQ_RECV_INITIAL_METADATA]
1033 .data.recv_metadata
1034 : &call->buffered_initial_metadata;
1035 } else {
Craig Tiller8eb9d472015-01-27 17:00:03 -08001036 dest = call->requests[GRPC_IOREQ_RECV_INITIAL_METADATA].state == REQ_READY
Craig Tillercce17ac2015-01-20 09:29:28 -08001037 ? call->requests[GRPC_IOREQ_RECV_INITIAL_METADATA]
1038 .data.recv_metadata
1039 : &call->buffered_trailing_metadata;
1040 }
1041 if (dest->count == dest->capacity) {
Craig Tillera4541102015-01-29 11:46:11 -08001042 dest->capacity = GPR_MAX(dest->capacity + 8, dest->capacity * 2);
Craig Tillercce17ac2015-01-20 09:29:28 -08001043 dest->metadata =
1044 gpr_realloc(dest->metadata, sizeof(grpc_metadata) * dest->capacity);
1045 }
1046 mdusr = &dest->metadata[dest->count++];
1047 mdusr->key = (char *)grpc_mdstr_as_c_string(md->key);
1048 mdusr->value = (char *)grpc_mdstr_as_c_string(md->value);
1049 mdusr->value_length = GPR_SLICE_LENGTH(md->value->slice);
1050 }
Craig Tiller8eb9d472015-01-27 17:00:03 -08001051 unlock(call);
Craig Tillercce17ac2015-01-20 09:29:28 -08001052}
1053
Craig Tillerd631cf32015-01-27 10:35:01 -08001054grpc_call_stack *grpc_call_get_call_stack(grpc_call *call) {
1055 return CALL_STACK_FROM_CALL(call);
1056}