| /* |
| * |
| * Copyright 2016, Google Inc. |
| * All rights reserved. |
| * |
| * Redistribution and use in source and binary forms, with or without |
| * modification, are permitted provided that the following conditions are |
| * met: |
| * |
| * * Redistributions of source code must retain the above copyright |
| * notice, this list of conditions and the following disclaimer. |
| * * Redistributions in binary form must reproduce the above |
| * copyright notice, this list of conditions and the following disclaimer |
| * in the documentation and/or other materials provided with the |
| * distribution. |
| * * Neither the name of Google Inc. nor the names of its |
| * contributors may be used to endorse or promote products derived from |
| * this software without specific prior written permission. |
| * |
| * THIS SOFTWARE IS PROVIDED BY THE COPYRIGHT HOLDERS AND CONTRIBUTORS |
| * "AS IS" AND ANY EXPRESS OR IMPLIED WARRANTIES, INCLUDING, BUT NOT |
| * LIMITED TO, THE IMPLIED WARRANTIES OF MERCHANTABILITY AND FITNESS FOR |
| * A PARTICULAR PURPOSE ARE DISCLAIMED. IN NO EVENT SHALL THE COPYRIGHT |
| * OWNER OR CONTRIBUTORS BE LIABLE FOR ANY DIRECT, INDIRECT, INCIDENTAL, |
| * SPECIAL, EXEMPLARY, OR CONSEQUENTIAL DAMAGES (INCLUDING, BUT NOT |
| * LIMITED TO, PROCUREMENT OF SUBSTITUTE GOODS OR SERVICES; LOSS OF USE, |
| * DATA, OR PROFITS; OR BUSINESS INTERRUPTION) HOWEVER CAUSED AND ON ANY |
| * THEORY OF LIABILITY, WHETHER IN CONTRACT, STRICT LIABILITY, OR TORT |
| * (INCLUDING NEGLIGENCE OR OTHERWISE) ARISING IN ANY WAY OUT OF THE USE |
| * OF THIS SOFTWARE, EVEN IF ADVISED OF THE POSSIBILITY OF SUCH DAMAGE. |
| * |
| */ |
| |
#include <string.h>

#include <grpc/impl/codegen/port_platform.h>
#include <grpc/slice.h>
#include <grpc/slice_buffer.h>
#include <grpc/support/alloc.h>
#include <grpc/support/host_port.h>
#include <grpc/support/log.h>
#include <grpc/support/string_util.h>
#include <grpc/support/useful.h>

#include "src/core/ext/transport/chttp2/transport/incoming_metadata.h"
#include "src/core/lib/iomgr/exec_ctx.h"
#include "src/core/lib/support/string.h"
#include "src/core/lib/surface/channel.h"
#include "src/core/lib/transport/metadata_batch.h"
#include "src/core/lib/transport/static_metadata.h"
#include "src/core/lib/transport/transport_impl.h"
#include "third_party/objective_c/Cronet/cronet_c_for_grpc.h"
| |
/* Size in bytes of the header placed in front of every gRPC message. */
#define GRPC_HEADER_SIZE_IN_BYTES 5

/* Non-zero enables CRONET_LOG output.
   TODO (makdharma): Hook up into the wider tracing mechanism */
int grpc_cronet_trace = 0;

/* Forwards its arguments to gpr_log() only while grpc_cronet_trace is set. */
#define CRONET_LOG(...)        \
  do {                         \
    if (grpc_cronet_trace) {   \
      gpr_log(__VA_ARGS__);    \
    }                          \
  } while (0)
| |
/* Outcome of one attempt to execute a stream op. */
enum e_op_result {
  /* An action was started; a cronet callback will follow. */
  ACTION_TAKEN_WITH_CALLBACK = 0,
  /* An action was taken and no cronet callback is expected for it. */
  ACTION_TAKEN_NO_CALLBACK = 1,
  /* Nothing could be done for this op right now. */
  NO_ACTION_POSSIBLE = 2
};
| |
/* Indices into the state_op_done / state_callback_received arrays of
   struct op_state. */
enum e_op_id {
  /* Parts of a grpc_transport_stream_op */
  OP_SEND_INITIAL_METADATA = 0,
  OP_SEND_MESSAGE = 1,
  OP_SEND_TRAILING_METADATA = 2,
  OP_RECV_MESSAGE = 3,
  OP_RECV_INITIAL_METADATA = 4,
  OP_RECV_TRAILING_METADATA = 5,
  OP_CANCEL_ERROR = 6,
  OP_ON_COMPLETE = 7,
  /* Terminal cronet callbacks (on_failed / on_succeeded / on_canceled) */
  OP_FAILED = 8,
  OP_SUCCEEDED = 9,
  OP_CANCELED = 10,
  OP_RECV_MESSAGE_AND_ON_COMPLETE = 11,
  /* At least one read request has been made */
  OP_READ_REQ_MADE = 12,
  /* Number of entries above; used to size the state arrays */
  OP_NUM_OPS = 13
};
| |
| /* Cronet callbacks. See cronet_c_for_grpc.h for documentation for each. */ |
| |
| static void on_request_headers_sent(cronet_bidirectional_stream *); |
| static void on_response_headers_received( |
| cronet_bidirectional_stream *, |
| const cronet_bidirectional_stream_header_array *, const char *); |
| static void on_write_completed(cronet_bidirectional_stream *, const char *); |
| static void on_read_completed(cronet_bidirectional_stream *, char *, int); |
| static void on_response_trailers_received( |
| cronet_bidirectional_stream *, |
| const cronet_bidirectional_stream_header_array *); |
| static void on_succeeded(cronet_bidirectional_stream *); |
| static void on_failed(cronet_bidirectional_stream *, int); |
| static void on_canceled(cronet_bidirectional_stream *); |
| static cronet_bidirectional_stream_callback cronet_callbacks = { |
| on_request_headers_sent, |
| on_response_headers_received, |
| on_read_completed, |
| on_write_completed, |
| on_response_trailers_received, |
| on_succeeded, |
| on_failed, |
| on_canceled}; |
| |
| /* Cronet transport object */ |
| struct grpc_cronet_transport { |
| grpc_transport base; /* must be first element in this structure */ |
| cronet_engine *engine; |
| char *host; |
| }; |
| typedef struct grpc_cronet_transport grpc_cronet_transport; |
| |
| /* TODO (makdharma): reorder structure for memory efficiency per |
| http://www.catb.org/esr/structure-packing/#_structure_reordering: */ |
| struct read_state { |
| /* vars to store data coming from server */ |
| char *read_buffer; |
| bool length_field_received; |
| int received_bytes; |
| int remaining_bytes; |
| int length_field; |
| char grpc_header_bytes[GRPC_HEADER_SIZE_IN_BYTES]; |
| char *payload_field; |
| bool read_stream_closed; |
| |
| /* vars for holding data destined for the application */ |
| struct grpc_slice_buffer_stream sbs; |
| grpc_slice_buffer read_slice_buffer; |
| |
| /* vars for trailing metadata */ |
| grpc_chttp2_incoming_metadata_buffer trailing_metadata; |
| bool trailing_metadata_valid; |
| |
| /* vars for initial metadata */ |
| grpc_chttp2_incoming_metadata_buffer initial_metadata; |
| }; |
| |
/* State of the send side of a stream. */
struct write_state {
  /* Frame currently handed to cronet for writing; NULL when none is
     outstanding. */
  char *write_buffer;
};
| |
| /* track state of one stream op */ |
| struct op_state { |
| bool state_op_done[OP_NUM_OPS]; |
| bool state_callback_received[OP_NUM_OPS]; |
| /* data structure for storing data coming from server */ |
| struct read_state rs; |
| /* data structure for storing data going to the server */ |
| struct write_state ws; |
| }; |
| |
| struct op_and_state { |
| grpc_transport_stream_op op; |
| struct op_state state; |
| bool done; |
| struct stream_obj *s; /* Pointer back to the stream object */ |
| struct op_and_state *next; /* next op_and_state in the linked list */ |
| }; |
| |
/* Linked list of the ops pending on a stream. */
struct op_storage {
  int num_pending_ops;       /* number of nodes reachable from head */
  struct op_and_state *head; /* most recently added op, or NULL if empty */
};
| |
| struct stream_obj { |
| struct op_and_state *oas; |
| grpc_transport_stream_op *curr_op; |
| grpc_cronet_transport curr_ct; |
| grpc_stream *curr_gs; |
| cronet_bidirectional_stream *cbs; |
| cronet_bidirectional_stream_header_array header_array; |
| |
| /* Stream level state. Some state will be tracked both at stream and stream_op |
| * level */ |
| struct op_state state; |
| |
| /* OP storage */ |
| struct op_storage storage; |
| |
| /* Mutex to protect storage */ |
| gpr_mu mu; |
| }; |
| typedef struct stream_obj stream_obj; |
| |
| static enum e_op_result execute_stream_op(grpc_exec_ctx *exec_ctx, |
| struct op_and_state *oas); |
| |
| /* |
| Utility function to translate enum into string for printing |
| */ |
| static const char *op_result_string(enum e_op_result i) { |
| switch (i) { |
| case ACTION_TAKEN_WITH_CALLBACK: |
| return "ACTION_TAKEN_WITH_CALLBACK"; |
| case ACTION_TAKEN_NO_CALLBACK: |
| return "ACTION_TAKEN_NO_CALLBACK"; |
| case NO_ACTION_POSSIBLE: |
| return "NO_ACTION_POSSIBLE"; |
| } |
| GPR_UNREACHABLE_CODE(return "UNKNOWN"); |
| } |
| |
| static const char *op_id_string(enum e_op_id i) { |
| switch (i) { |
| case OP_SEND_INITIAL_METADATA: |
| return "OP_SEND_INITIAL_METADATA"; |
| case OP_SEND_MESSAGE: |
| return "OP_SEND_MESSAGE"; |
| case OP_SEND_TRAILING_METADATA: |
| return "OP_SEND_TRAILING_METADATA"; |
| case OP_RECV_MESSAGE: |
| return "OP_RECV_MESSAGE"; |
| case OP_RECV_INITIAL_METADATA: |
| return "OP_RECV_INITIAL_METADATA"; |
| case OP_RECV_TRAILING_METADATA: |
| return "OP_RECV_TRAILING_METADATA"; |
| case OP_CANCEL_ERROR: |
| return "OP_CANCEL_ERROR"; |
| case OP_ON_COMPLETE: |
| return "OP_ON_COMPLETE"; |
| case OP_FAILED: |
| return "OP_FAILED"; |
| case OP_SUCCEEDED: |
| return "OP_SUCCEEDED"; |
| case OP_CANCELED: |
| return "OP_CANCELED"; |
| case OP_RECV_MESSAGE_AND_ON_COMPLETE: |
| return "OP_RECV_MESSAGE_AND_ON_COMPLETE"; |
| case OP_READ_REQ_MADE: |
| return "OP_READ_REQ_MADE"; |
| case OP_NUM_OPS: |
| return "OP_NUM_OPS"; |
| } |
| return "UNKNOWN"; |
| } |
| |
| static void free_read_buffer(stream_obj *s) { |
| if (s->state.rs.read_buffer && |
| s->state.rs.read_buffer != s->state.rs.grpc_header_bytes) { |
| gpr_free(s->state.rs.read_buffer); |
| s->state.rs.read_buffer = NULL; |
| } |
| } |
| |
| /* |
| Add a new stream op to op storage. |
| */ |
| static void add_to_storage(struct stream_obj *s, grpc_transport_stream_op *op) { |
| struct op_storage *storage = &s->storage; |
| /* add new op at the beginning of the linked list. The memory is freed |
| in remove_from_storage */ |
| struct op_and_state *new_op = gpr_malloc(sizeof(struct op_and_state)); |
| memcpy(&new_op->op, op, sizeof(grpc_transport_stream_op)); |
| memset(&new_op->state, 0, sizeof(new_op->state)); |
| new_op->s = s; |
| new_op->done = false; |
| gpr_mu_lock(&s->mu); |
| new_op->next = storage->head; |
| storage->head = new_op; |
| storage->num_pending_ops++; |
| CRONET_LOG(GPR_DEBUG, "adding new op %p. %d in the queue.", new_op, |
| storage->num_pending_ops); |
| gpr_mu_unlock(&s->mu); |
| } |
| |
| /* |
| Traverse the linked list and delete op and free memory |
| */ |
| static void remove_from_storage(struct stream_obj *s, |
| struct op_and_state *oas) { |
| struct op_and_state *curr; |
| if (s->storage.head == NULL || oas == NULL) { |
| return; |
| } |
| if (s->storage.head == oas) { |
| s->storage.head = oas->next; |
| gpr_free(oas); |
| s->storage.num_pending_ops--; |
| CRONET_LOG(GPR_DEBUG, "Freed %p. Now %d in the queue", oas, |
| s->storage.num_pending_ops); |
| } else { |
| for (curr = s->storage.head; curr != NULL; curr = curr->next) { |
| if (curr->next == oas) { |
| curr->next = oas->next; |
| s->storage.num_pending_ops--; |
| CRONET_LOG(GPR_DEBUG, "Freed %p. Now %d in the queue", oas, |
| s->storage.num_pending_ops); |
| gpr_free(oas); |
| break; |
| } else if (curr->next == NULL) { |
| CRONET_LOG(GPR_ERROR, "Reached end of LL and did not find op to free"); |
| } |
| } |
| } |
| } |
| |
| /* |
| Cycle through ops and try to take next action. Break when either |
| an action with callback is taken, or no action is possible. |
| This can get executed from the Cronet network thread via cronet callback |
| or on the application supplied thread via the perform_stream_op function. |
| */ |
| static void execute_from_storage(stream_obj *s) { |
| grpc_exec_ctx exec_ctx = GRPC_EXEC_CTX_INIT; |
| gpr_mu_lock(&s->mu); |
| for (struct op_and_state *curr = s->storage.head; curr != NULL;) { |
| CRONET_LOG(GPR_DEBUG, "calling op at %p. done = %d", curr, curr->done); |
| GPR_ASSERT(curr->done == 0); |
| enum e_op_result result = execute_stream_op(&exec_ctx, curr); |
| CRONET_LOG(GPR_DEBUG, "execute_stream_op[%p] returns %s", curr, |
| op_result_string(result)); |
| /* if this op is done, then remove it and free memory */ |
| if (curr->done) { |
| struct op_and_state *next = curr->next; |
| remove_from_storage(s, curr); |
| curr = next; |
| } |
| /* continue processing the same op if ACTION_TAKEN_WITHOUT_CALLBACK */ |
| if (result == NO_ACTION_POSSIBLE) { |
| curr = curr->next; |
| } else if (result == ACTION_TAKEN_WITH_CALLBACK) { |
| break; |
| } |
| } |
| gpr_mu_unlock(&s->mu); |
| grpc_exec_ctx_finish(&exec_ctx); |
| } |
| |
| /* |
| Cronet callback |
| */ |
| static void on_failed(cronet_bidirectional_stream *stream, int net_error) { |
| CRONET_LOG(GPR_DEBUG, "on_failed(%p, %d)", stream, net_error); |
| stream_obj *s = (stream_obj *)stream->annotation; |
| gpr_mu_lock(&s->mu); |
| cronet_bidirectional_stream_destroy(s->cbs); |
| s->state.state_callback_received[OP_FAILED] = true; |
| s->cbs = NULL; |
| if (s->header_array.headers) { |
| gpr_free(s->header_array.headers); |
| s->header_array.headers = NULL; |
| } |
| if (s->state.ws.write_buffer) { |
| gpr_free(s->state.ws.write_buffer); |
| s->state.ws.write_buffer = NULL; |
| } |
| free_read_buffer(s); |
| gpr_mu_unlock(&s->mu); |
| execute_from_storage(s); |
| } |
| |
| /* |
| Cronet callback |
| */ |
| static void on_canceled(cronet_bidirectional_stream *stream) { |
| CRONET_LOG(GPR_DEBUG, "on_canceled(%p)", stream); |
| stream_obj *s = (stream_obj *)stream->annotation; |
| gpr_mu_lock(&s->mu); |
| cronet_bidirectional_stream_destroy(s->cbs); |
| s->state.state_callback_received[OP_CANCELED] = true; |
| s->cbs = NULL; |
| if (s->header_array.headers) { |
| gpr_free(s->header_array.headers); |
| s->header_array.headers = NULL; |
| } |
| if (s->state.ws.write_buffer) { |
| gpr_free(s->state.ws.write_buffer); |
| s->state.ws.write_buffer = NULL; |
| } |
| free_read_buffer(s); |
| gpr_mu_unlock(&s->mu); |
| execute_from_storage(s); |
| } |
| |
| /* |
| Cronet callback |
| */ |
| static void on_succeeded(cronet_bidirectional_stream *stream) { |
| CRONET_LOG(GPR_DEBUG, "on_succeeded(%p)", stream); |
| stream_obj *s = (stream_obj *)stream->annotation; |
| gpr_mu_lock(&s->mu); |
| cronet_bidirectional_stream_destroy(s->cbs); |
| s->state.state_callback_received[OP_SUCCEEDED] = true; |
| s->cbs = NULL; |
| free_read_buffer(s); |
| gpr_mu_unlock(&s->mu); |
| execute_from_storage(s); |
| } |
| |
| /* |
| Cronet callback |
| */ |
| static void on_request_headers_sent(cronet_bidirectional_stream *stream) { |
| CRONET_LOG(GPR_DEBUG, "W: on_request_headers_sent(%p)", stream); |
| stream_obj *s = (stream_obj *)stream->annotation; |
| gpr_mu_lock(&s->mu); |
| s->state.state_op_done[OP_SEND_INITIAL_METADATA] = true; |
| s->state.state_callback_received[OP_SEND_INITIAL_METADATA] = true; |
| /* Free the memory allocated for headers */ |
| if (s->header_array.headers) { |
| gpr_free(s->header_array.headers); |
| s->header_array.headers = NULL; |
| } |
| gpr_mu_unlock(&s->mu); |
| execute_from_storage(s); |
| } |
| |
| /* |
| Cronet callback |
| */ |
| static void on_response_headers_received( |
| cronet_bidirectional_stream *stream, |
| const cronet_bidirectional_stream_header_array *headers, |
| const char *negotiated_protocol) { |
| CRONET_LOG(GPR_DEBUG, "R: on_response_headers_received(%p, %p, %s)", stream, |
| headers, negotiated_protocol); |
| stream_obj *s = (stream_obj *)stream->annotation; |
| gpr_mu_lock(&s->mu); |
| memset(&s->state.rs.initial_metadata, 0, |
| sizeof(s->state.rs.initial_metadata)); |
| grpc_chttp2_incoming_metadata_buffer_init(&s->state.rs.initial_metadata); |
| for (size_t i = 0; i < headers->count; i++) { |
| grpc_chttp2_incoming_metadata_buffer_add( |
| &s->state.rs.initial_metadata, |
| grpc_mdelem_from_metadata_strings( |
| grpc_mdstr_from_string(headers->headers[i].key), |
| grpc_mdstr_from_string(headers->headers[i].value))); |
| } |
| s->state.state_callback_received[OP_RECV_INITIAL_METADATA] = true; |
| gpr_mu_unlock(&s->mu); |
| execute_from_storage(s); |
| } |
| |
| /* |
| Cronet callback |
| */ |
| static void on_write_completed(cronet_bidirectional_stream *stream, |
| const char *data) { |
| stream_obj *s = (stream_obj *)stream->annotation; |
| CRONET_LOG(GPR_DEBUG, "W: on_write_completed(%p, %s)", stream, data); |
| gpr_mu_lock(&s->mu); |
| if (s->state.ws.write_buffer) { |
| gpr_free(s->state.ws.write_buffer); |
| s->state.ws.write_buffer = NULL; |
| } |
| s->state.state_callback_received[OP_SEND_MESSAGE] = true; |
| gpr_mu_unlock(&s->mu); |
| execute_from_storage(s); |
| } |
| |
| /* |
| Cronet callback |
| */ |
| static void on_read_completed(cronet_bidirectional_stream *stream, char *data, |
| int count) { |
| stream_obj *s = (stream_obj *)stream->annotation; |
| CRONET_LOG(GPR_DEBUG, "R: on_read_completed(%p, %p, %d)", stream, data, |
| count); |
| gpr_mu_lock(&s->mu); |
| s->state.state_callback_received[OP_RECV_MESSAGE] = true; |
| if (count > 0) { |
| s->state.rs.received_bytes += count; |
| s->state.rs.remaining_bytes -= count; |
| if (s->state.rs.remaining_bytes > 0) { |
| CRONET_LOG(GPR_DEBUG, "cronet_bidirectional_stream_read(%p)", s->cbs); |
| s->state.state_op_done[OP_READ_REQ_MADE] = true; |
| cronet_bidirectional_stream_read( |
| s->cbs, s->state.rs.read_buffer + s->state.rs.received_bytes, |
| s->state.rs.remaining_bytes); |
| gpr_mu_unlock(&s->mu); |
| } else { |
| gpr_mu_unlock(&s->mu); |
| execute_from_storage(s); |
| } |
| } else { |
| s->state.rs.read_stream_closed = true; |
| gpr_mu_unlock(&s->mu); |
| execute_from_storage(s); |
| } |
| } |
| |
| /* |
| Cronet callback |
| */ |
| static void on_response_trailers_received( |
| cronet_bidirectional_stream *stream, |
| const cronet_bidirectional_stream_header_array *trailers) { |
| CRONET_LOG(GPR_DEBUG, "R: on_response_trailers_received(%p,%p)", stream, |
| trailers); |
| stream_obj *s = (stream_obj *)stream->annotation; |
| gpr_mu_lock(&s->mu); |
| memset(&s->state.rs.trailing_metadata, 0, |
| sizeof(s->state.rs.trailing_metadata)); |
| s->state.rs.trailing_metadata_valid = false; |
| grpc_chttp2_incoming_metadata_buffer_init(&s->state.rs.trailing_metadata); |
| for (size_t i = 0; i < trailers->count; i++) { |
| CRONET_LOG(GPR_DEBUG, "trailer key=%s, value=%s", trailers->headers[i].key, |
| trailers->headers[i].value); |
| grpc_chttp2_incoming_metadata_buffer_add( |
| &s->state.rs.trailing_metadata, |
| grpc_mdelem_from_metadata_strings( |
| grpc_mdstr_from_string(trailers->headers[i].key), |
| grpc_mdstr_from_string(trailers->headers[i].value))); |
| s->state.rs.trailing_metadata_valid = true; |
| } |
| s->state.state_callback_received[OP_RECV_TRAILING_METADATA] = true; |
| gpr_mu_unlock(&s->mu); |
| execute_from_storage(s); |
| } |
| |
| /* |
| Utility function that takes the data from s->write_slice_buffer and assembles |
| into a contiguous byte stream with 5 byte gRPC header prepended. |
| */ |
| static void create_grpc_frame(grpc_slice_buffer *write_slice_buffer, |
| char **pp_write_buffer, |
| size_t *p_write_buffer_size) { |
| grpc_slice slice = grpc_slice_buffer_take_first(write_slice_buffer); |
| size_t length = GRPC_SLICE_LENGTH(slice); |
| *p_write_buffer_size = length + GRPC_HEADER_SIZE_IN_BYTES; |
| /* This is freed in the on_write_completed callback */ |
| char *write_buffer = gpr_malloc(length + GRPC_HEADER_SIZE_IN_BYTES); |
| *pp_write_buffer = write_buffer; |
| uint8_t *p = (uint8_t *)write_buffer; |
| /* Append 5 byte header */ |
| *p++ = 0; |
| *p++ = (uint8_t)(length >> 24); |
| *p++ = (uint8_t)(length >> 16); |
| *p++ = (uint8_t)(length >> 8); |
| *p++ = (uint8_t)(length); |
| /* append actual data */ |
| memcpy(p, GRPC_SLICE_START_PTR(slice), length); |
| } |
| |
| /* |
| Convert metadata in a format that Cronet can consume |
| */ |
| static void convert_metadata_to_cronet_headers( |
| grpc_linked_mdelem *head, const char *host, char **pp_url, |
| cronet_bidirectional_stream_header **pp_headers, size_t *p_num_headers, |
| const char **method) { |
| grpc_linked_mdelem *curr = head; |
| /* Walk the linked list and get number of header fields */ |
| size_t num_headers_available = 0; |
| while (curr != NULL) { |
| curr = curr->next; |
| num_headers_available++; |
| } |
| /* Allocate enough memory. It is freed in the on_request_headers_sent callback |
| */ |
| cronet_bidirectional_stream_header *headers = |
| (cronet_bidirectional_stream_header *)gpr_malloc( |
| sizeof(cronet_bidirectional_stream_header) * num_headers_available); |
| *pp_headers = headers; |
| |
| /* Walk the linked list again, this time copying the header fields. |
| s->num_headers can be less than num_headers_available, as some headers |
| are not used for cronet. |
| TODO (makdharma): Eliminate need to traverse the LL second time for perf. |
| */ |
| curr = head; |
| size_t num_headers = 0; |
| while (num_headers < num_headers_available) { |
| grpc_mdelem *mdelem = curr->md; |
| curr = curr->next; |
| const char *key = grpc_mdstr_as_c_string(mdelem->key); |
| const char *value = grpc_mdstr_as_c_string(mdelem->value); |
| if (mdelem->key == GRPC_MDSTR_SCHEME || |
| mdelem->key == GRPC_MDSTR_AUTHORITY) { |
| /* Cronet populates these fields on its own */ |
| continue; |
| } |
| if (mdelem->key == GRPC_MDSTR_METHOD) { |
| if (mdelem->value == GRPC_MDSTR_PUT) { |
| *method = "PUT"; |
| } else { |
| /* POST method in default*/ |
| *method = "POST"; |
| } |
| continue; |
| } |
| if (mdelem->key == GRPC_MDSTR_PATH) { |
| /* Create URL by appending :path value to the hostname */ |
| gpr_asprintf(pp_url, "https://%s%s", host, value); |
| continue; |
| } |
| CRONET_LOG(GPR_DEBUG, "header %s = %s", key, value); |
| headers[num_headers].key = key; |
| headers[num_headers].value = value; |
| num_headers++; |
| if (curr == NULL) { |
| break; |
| } |
| } |
| *p_num_headers = (size_t)num_headers; |
| } |
| |
/*
  Extracts the message length from a 5 byte gRPC header.

  data - points to at least GRPC_HEADER_SIZE_IN_BYTES (5) readable bytes.
         data[0] is skipped; data[1..4] hold the length, most significant
         byte first.

  Returns the length as an int. The value is assembled in unsigned 32 bit
  arithmetic so that a top byte >= 0x80 does not shift into the sign bit of an
  int (undefined behavior); such oversized lengths convert to a negative
  result on the usual two's complement targets.
*/
static int parse_grpc_header(const uint8_t *data) {
  const uint32_t length = ((uint32_t)data[1] << 24) |
                          ((uint32_t)data[2] << 16) |
                          ((uint32_t)data[3] << 8) | (uint32_t)data[4];
  return (int)length;
}
| |
| /* |
| Op Execution: Decide if one of the actions contained in the stream op can be |
| executed. This is the heart of the state machine. |
| */ |
| static bool op_can_be_run(grpc_transport_stream_op *curr_op, |
| struct op_state *stream_state, |
| struct op_state *op_state, enum e_op_id op_id) { |
| bool result = true; |
| /* When call is canceled, every op can be run, except under following |
| conditions |
| */ |
| bool is_canceled_of_failed = stream_state->state_op_done[OP_CANCEL_ERROR] || |
| stream_state->state_callback_received[OP_FAILED]; |
| if (is_canceled_of_failed) { |
| if (op_id == OP_SEND_INITIAL_METADATA) result = false; |
| if (op_id == OP_SEND_MESSAGE) result = false; |
| if (op_id == OP_SEND_TRAILING_METADATA) result = false; |
| if (op_id == OP_CANCEL_ERROR) result = false; |
| /* already executed */ |
| if (op_id == OP_RECV_INITIAL_METADATA && |
| stream_state->state_op_done[OP_RECV_INITIAL_METADATA]) |
| result = false; |
| if (op_id == OP_RECV_MESSAGE && |
| stream_state->state_op_done[OP_RECV_MESSAGE]) |
| result = false; |
| if (op_id == OP_RECV_TRAILING_METADATA && |
| stream_state->state_op_done[OP_RECV_TRAILING_METADATA]) |
| result = false; |
| } else if (op_id == OP_SEND_INITIAL_METADATA) { |
| /* already executed */ |
| if (stream_state->state_op_done[OP_SEND_INITIAL_METADATA]) result = false; |
| } else if (op_id == OP_RECV_INITIAL_METADATA) { |
| /* already executed */ |
| if (stream_state->state_op_done[OP_RECV_INITIAL_METADATA]) result = false; |
| /* we haven't sent headers yet. */ |
| else if (!stream_state->state_callback_received[OP_SEND_INITIAL_METADATA]) |
| result = false; |
| /* we haven't received headers yet. */ |
| else if (!stream_state->state_callback_received[OP_RECV_INITIAL_METADATA]) |
| result = false; |
| } else if (op_id == OP_SEND_MESSAGE) { |
| /* already executed (note we're checking op specific state, not stream |
| state) */ |
| if (op_state->state_op_done[OP_SEND_MESSAGE]) result = false; |
| /* we haven't sent headers yet. */ |
| else if (!stream_state->state_callback_received[OP_SEND_INITIAL_METADATA]) |
| result = false; |
| } else if (op_id == OP_RECV_MESSAGE) { |
| /* already executed */ |
| if (op_state->state_op_done[OP_RECV_MESSAGE]) result = false; |
| /* we haven't received headers yet. */ |
| else if (!stream_state->state_callback_received[OP_RECV_INITIAL_METADATA]) |
| result = false; |
| } else if (op_id == OP_RECV_TRAILING_METADATA) { |
| /* already executed */ |
| if (stream_state->state_op_done[OP_RECV_TRAILING_METADATA]) result = false; |
| /* we have asked for but haven't received message yet. */ |
| else if (stream_state->state_op_done[OP_READ_REQ_MADE] && |
| !stream_state->state_op_done[OP_RECV_MESSAGE]) |
| result = false; |
| /* we haven't received trailers yet. */ |
| else if (!stream_state->state_callback_received[OP_RECV_TRAILING_METADATA]) |
| result = false; |
| /* we haven't received on_succeeded yet. */ |
| else if (!stream_state->state_callback_received[OP_SUCCEEDED]) |
| result = false; |
| } else if (op_id == OP_SEND_TRAILING_METADATA) { |
| /* already executed */ |
| if (stream_state->state_op_done[OP_SEND_TRAILING_METADATA]) result = false; |
| /* we haven't sent initial metadata yet */ |
| else if (!stream_state->state_callback_received[OP_SEND_INITIAL_METADATA]) |
| result = false; |
| /* we haven't sent message yet */ |
| else if (curr_op->send_message && |
| !stream_state->state_op_done[OP_SEND_MESSAGE]) |
| result = false; |
| /* we haven't got on_write_completed for the send yet */ |
| else if (stream_state->state_op_done[OP_SEND_MESSAGE] && |
| !stream_state->state_callback_received[OP_SEND_MESSAGE]) |
| result = false; |
| } else if (op_id == OP_CANCEL_ERROR) { |
| /* already executed */ |
| if (stream_state->state_op_done[OP_CANCEL_ERROR]) result = false; |
| } else if (op_id == OP_ON_COMPLETE) { |
| /* already executed (note we're checking op specific state, not stream |
| state) */ |
| if (op_state->state_op_done[OP_ON_COMPLETE]) { |
| CRONET_LOG(GPR_DEBUG, "Because"); |
| result = false; |
| } |
| /* Check if every op that was asked for is done. */ |
| else if (curr_op->send_initial_metadata && |
| !stream_state->state_callback_received[OP_SEND_INITIAL_METADATA]) { |
| CRONET_LOG(GPR_DEBUG, "Because"); |
| result = false; |
| } else if (curr_op->send_message && |
| !op_state->state_op_done[OP_SEND_MESSAGE]) { |
| CRONET_LOG(GPR_DEBUG, "Because"); |
| result = false; |
| } else if (curr_op->send_message && |
| !stream_state->state_callback_received[OP_SEND_MESSAGE]) { |
| CRONET_LOG(GPR_DEBUG, "Because"); |
| result = false; |
| } else if (curr_op->send_trailing_metadata && |
| !stream_state->state_op_done[OP_SEND_TRAILING_METADATA]) { |
| CRONET_LOG(GPR_DEBUG, "Because"); |
| result = false; |
| } else if (curr_op->recv_initial_metadata && |
| !stream_state->state_op_done[OP_RECV_INITIAL_METADATA]) { |
| CRONET_LOG(GPR_DEBUG, "Because"); |
| result = false; |
| } else if (curr_op->recv_message && |
| !stream_state->state_op_done[OP_RECV_MESSAGE]) { |
| CRONET_LOG(GPR_DEBUG, "Because"); |
| result = false; |
| } else if (curr_op->recv_trailing_metadata) { |
| /* We aren't done with trailing metadata yet */ |
| if (!stream_state->state_op_done[OP_RECV_TRAILING_METADATA]) { |
| CRONET_LOG(GPR_DEBUG, "Because"); |
| result = false; |
| } |
| /* We've asked for actual message in an earlier op, and it hasn't been |
| delivered yet. */ |
| else if (stream_state->state_op_done[OP_READ_REQ_MADE]) { |
| /* If this op is not the one asking for read, (which means some earlier |
| op has asked), and the read hasn't been delivered. */ |
| if (!curr_op->recv_message && |
| !stream_state->state_callback_received[OP_SUCCEEDED]) { |
| CRONET_LOG(GPR_DEBUG, "Because"); |
| result = false; |
| } |
| } |
| } |
| /* We should see at least one on_write_completed for the trailers that we |
| sent */ |
| else if (curr_op->send_trailing_metadata && |
| !stream_state->state_callback_received[OP_SEND_MESSAGE]) |
| result = false; |
| } |
| CRONET_LOG(GPR_DEBUG, "op_can_be_run %s : %s", op_id_string(op_id), |
| result ? "YES" : "NO"); |
| return result; |
| } |
| |
| /* |
| TODO (makdharma): Break down this function in smaller chunks for readability. |
| */ |
| static enum e_op_result execute_stream_op(grpc_exec_ctx *exec_ctx, |
| struct op_and_state *oas) { |
| grpc_transport_stream_op *stream_op = &oas->op; |
| struct stream_obj *s = oas->s; |
| struct op_state *stream_state = &s->state; |
| enum e_op_result result = NO_ACTION_POSSIBLE; |
| if (stream_op->send_initial_metadata && |
| op_can_be_run(stream_op, stream_state, &oas->state, |
| OP_SEND_INITIAL_METADATA)) { |
| CRONET_LOG(GPR_DEBUG, "running: %p OP_SEND_INITIAL_METADATA", oas); |
| /* This OP is the beginning. Reset various states */ |
| memset(&s->header_array, 0, sizeof(s->header_array)); |
| memset(&stream_state->rs, 0, sizeof(stream_state->rs)); |
| memset(&stream_state->ws, 0, sizeof(stream_state->ws)); |
| memset(stream_state->state_op_done, 0, sizeof(stream_state->state_op_done)); |
| memset(stream_state->state_callback_received, 0, |
| sizeof(stream_state->state_callback_received)); |
| /* Start new cronet stream. It is destroyed in on_succeeded, on_canceled, |
| * on_failed */ |
| GPR_ASSERT(s->cbs == NULL); |
| s->cbs = cronet_bidirectional_stream_create(s->curr_ct.engine, s->curr_gs, |
| &cronet_callbacks); |
| CRONET_LOG(GPR_DEBUG, "%p = cronet_bidirectional_stream_create()", s->cbs); |
| char *url = NULL; |
| const char *method = "POST"; |
| s->header_array.headers = NULL; |
| convert_metadata_to_cronet_headers( |
| stream_op->send_initial_metadata->list.head, s->curr_ct.host, &url, |
| &s->header_array.headers, &s->header_array.count, &method); |
| s->header_array.capacity = s->header_array.count; |
| CRONET_LOG(GPR_DEBUG, "cronet_bidirectional_stream_start(%p, %s)", s->cbs, |
| url); |
| cronet_bidirectional_stream_start(s->cbs, url, 0, method, &s->header_array, |
| false); |
| stream_state->state_op_done[OP_SEND_INITIAL_METADATA] = true; |
| result = ACTION_TAKEN_WITH_CALLBACK; |
| } else if (stream_op->recv_initial_metadata && |
| op_can_be_run(stream_op, stream_state, &oas->state, |
| OP_RECV_INITIAL_METADATA)) { |
| CRONET_LOG(GPR_DEBUG, "running: %p OP_RECV_INITIAL_METADATA", oas); |
| if (stream_state->state_op_done[OP_CANCEL_ERROR] || |
| stream_state->state_callback_received[OP_FAILED]) { |
| grpc_exec_ctx_sched(exec_ctx, stream_op->recv_initial_metadata_ready, |
| GRPC_ERROR_CANCELLED, NULL); |
| } else { |
| grpc_chttp2_incoming_metadata_buffer_publish( |
| &oas->s->state.rs.initial_metadata, stream_op->recv_initial_metadata); |
| grpc_exec_ctx_sched(exec_ctx, stream_op->recv_initial_metadata_ready, |
| GRPC_ERROR_NONE, NULL); |
| } |
| stream_state->state_op_done[OP_RECV_INITIAL_METADATA] = true; |
| result = ACTION_TAKEN_NO_CALLBACK; |
| } else if (stream_op->send_message && |
| op_can_be_run(stream_op, stream_state, &oas->state, |
| OP_SEND_MESSAGE)) { |
| CRONET_LOG(GPR_DEBUG, "running: %p OP_SEND_MESSAGE", oas); |
| if (stream_state->state_callback_received[OP_FAILED]) { |
| result = NO_ACTION_POSSIBLE; |
| CRONET_LOG(GPR_DEBUG, "Stream is either cancelled or failed."); |
| } else { |
| grpc_slice_buffer write_slice_buffer; |
| grpc_slice slice; |
| grpc_slice_buffer_init(&write_slice_buffer); |
| grpc_byte_stream_next(NULL, stream_op->send_message, &slice, |
| stream_op->send_message->length, NULL); |
| /* Check that compression flag is OFF. We don't support compression yet. |
| */ |
| if (stream_op->send_message->flags != 0) { |
| gpr_log(GPR_ERROR, "Compression is not supported"); |
| GPR_ASSERT(stream_op->send_message->flags == 0); |
| } |
| grpc_slice_buffer_add(&write_slice_buffer, slice); |
| if (write_slice_buffer.count != 1) { |
| /* Empty request not handled yet */ |
| gpr_log(GPR_ERROR, "Empty request is not supported"); |
| GPR_ASSERT(write_slice_buffer.count == 1); |
| } |
| if (write_slice_buffer.count > 0) { |
| size_t write_buffer_size; |
| create_grpc_frame(&write_slice_buffer, &stream_state->ws.write_buffer, |
| &write_buffer_size); |
| CRONET_LOG(GPR_DEBUG, "cronet_bidirectional_stream_write (%p, %p)", |
| s->cbs, stream_state->ws.write_buffer); |
| stream_state->state_callback_received[OP_SEND_MESSAGE] = false; |
| cronet_bidirectional_stream_write(s->cbs, stream_state->ws.write_buffer, |
| (int)write_buffer_size, false); |
| result = ACTION_TAKEN_WITH_CALLBACK; |
| } else { |
| result = NO_ACTION_POSSIBLE; |
| } |
| } |
| stream_state->state_op_done[OP_SEND_MESSAGE] = true; |
| oas->state.state_op_done[OP_SEND_MESSAGE] = true; |
| } else if (stream_op->recv_message && |
| op_can_be_run(stream_op, stream_state, &oas->state, |
| OP_RECV_MESSAGE)) { |
| CRONET_LOG(GPR_DEBUG, "running: %p OP_RECV_MESSAGE", oas); |
| if (stream_state->state_op_done[OP_CANCEL_ERROR] || |
| stream_state->state_callback_received[OP_FAILED]) { |
| CRONET_LOG(GPR_DEBUG, "Stream is either cancelled or failed."); |
| grpc_exec_ctx_sched(exec_ctx, stream_op->recv_message_ready, |
| GRPC_ERROR_CANCELLED, NULL); |
| stream_state->state_op_done[OP_RECV_MESSAGE] = true; |
| } else if (stream_state->rs.read_stream_closed == true) { |
| /* No more data will be received */ |
| CRONET_LOG(GPR_DEBUG, "read stream closed"); |
| grpc_exec_ctx_sched(exec_ctx, stream_op->recv_message_ready, |
| GRPC_ERROR_NONE, NULL); |
| stream_state->state_op_done[OP_RECV_MESSAGE] = true; |
| oas->state.state_op_done[OP_RECV_MESSAGE] = true; |
| } else if (stream_state->rs.length_field_received == false) { |
| if (stream_state->rs.received_bytes == GRPC_HEADER_SIZE_IN_BYTES && |
| stream_state->rs.remaining_bytes == 0) { |
| /* Start a read operation for data */ |
| stream_state->rs.length_field_received = true; |
| stream_state->rs.length_field = stream_state->rs.remaining_bytes = |
| parse_grpc_header((const uint8_t *)stream_state->rs.read_buffer); |
| CRONET_LOG(GPR_DEBUG, "length field = %d", |
| stream_state->rs.length_field); |
| if (stream_state->rs.length_field > 0) { |
| stream_state->rs.read_buffer = |
| gpr_malloc((size_t)stream_state->rs.length_field); |
| GPR_ASSERT(stream_state->rs.read_buffer); |
| stream_state->rs.remaining_bytes = stream_state->rs.length_field; |
| stream_state->rs.received_bytes = 0; |
| CRONET_LOG(GPR_DEBUG, "cronet_bidirectional_stream_read(%p)", s->cbs); |
| stream_state->state_op_done[OP_READ_REQ_MADE] = |
| true; /* Indicates that at least one read request has been made */ |
| cronet_bidirectional_stream_read(s->cbs, stream_state->rs.read_buffer, |
| stream_state->rs.remaining_bytes); |
| result = ACTION_TAKEN_WITH_CALLBACK; |
| } else { |
| stream_state->rs.remaining_bytes = 0; |
| CRONET_LOG(GPR_DEBUG, "read operation complete. Empty response."); |
| grpc_slice_buffer_init(&stream_state->rs.read_slice_buffer); |
| grpc_slice_buffer_stream_init(&stream_state->rs.sbs, |
| &stream_state->rs.read_slice_buffer, 0); |
| *((grpc_byte_buffer **)stream_op->recv_message) = |
| (grpc_byte_buffer *)&stream_state->rs.sbs; |
| grpc_exec_ctx_sched(exec_ctx, stream_op->recv_message_ready, |
| GRPC_ERROR_NONE, NULL); |
| stream_state->state_op_done[OP_RECV_MESSAGE] = true; |
| oas->state.state_op_done[OP_RECV_MESSAGE] = true; |
| result = ACTION_TAKEN_NO_CALLBACK; |
| } |
| } else if (stream_state->rs.remaining_bytes == 0) { |
| /* Start a read operation for first 5 bytes (GRPC header) */ |
| stream_state->rs.read_buffer = stream_state->rs.grpc_header_bytes; |
| stream_state->rs.remaining_bytes = GRPC_HEADER_SIZE_IN_BYTES; |
| stream_state->rs.received_bytes = 0; |
| CRONET_LOG(GPR_DEBUG, "cronet_bidirectional_stream_read(%p)", s->cbs); |
| stream_state->state_op_done[OP_READ_REQ_MADE] = |
| true; /* Indicates that at least one read request has been made */ |
| cronet_bidirectional_stream_read(s->cbs, stream_state->rs.read_buffer, |
| stream_state->rs.remaining_bytes); |
| result = ACTION_TAKEN_WITH_CALLBACK; |
| } else { |
| result = NO_ACTION_POSSIBLE; |
| } |
| } else if (stream_state->rs.remaining_bytes == 0) { |
| CRONET_LOG(GPR_DEBUG, "read operation complete"); |
| grpc_slice read_data_slice = |
| grpc_slice_malloc((uint32_t)stream_state->rs.length_field); |
| uint8_t *dst_p = GRPC_SLICE_START_PTR(read_data_slice); |
| memcpy(dst_p, stream_state->rs.read_buffer, |
| (size_t)stream_state->rs.length_field); |
| free_read_buffer(s); |
| grpc_slice_buffer_init(&stream_state->rs.read_slice_buffer); |
| grpc_slice_buffer_add(&stream_state->rs.read_slice_buffer, |
| read_data_slice); |
| grpc_slice_buffer_stream_init(&stream_state->rs.sbs, |
| &stream_state->rs.read_slice_buffer, 0); |
| *((grpc_byte_buffer **)stream_op->recv_message) = |
| (grpc_byte_buffer *)&stream_state->rs.sbs; |
| grpc_exec_ctx_sched(exec_ctx, stream_op->recv_message_ready, |
| GRPC_ERROR_NONE, NULL); |
| stream_state->state_op_done[OP_RECV_MESSAGE] = true; |
| oas->state.state_op_done[OP_RECV_MESSAGE] = true; |
| /* Clear read state of the stream, so next read op (if it were to come) |
| * will work */ |
| stream_state->rs.received_bytes = stream_state->rs.remaining_bytes = |
| stream_state->rs.length_field_received = 0; |
| result = ACTION_TAKEN_NO_CALLBACK; |
| } |
| } else if (stream_op->recv_trailing_metadata && |
| op_can_be_run(stream_op, stream_state, &oas->state, |
| OP_RECV_TRAILING_METADATA)) { |
| CRONET_LOG(GPR_DEBUG, "running: %p OP_RECV_TRAILING_METADATA", oas); |
| if (oas->s->state.rs.trailing_metadata_valid) { |
| grpc_chttp2_incoming_metadata_buffer_publish( |
| &oas->s->state.rs.trailing_metadata, |
| stream_op->recv_trailing_metadata); |
| stream_state->rs.trailing_metadata_valid = false; |
| } |
| stream_state->state_op_done[OP_RECV_TRAILING_METADATA] = true; |
| result = ACTION_TAKEN_NO_CALLBACK; |
| } else if (stream_op->send_trailing_metadata && |
| op_can_be_run(stream_op, stream_state, &oas->state, |
| OP_SEND_TRAILING_METADATA)) { |
| CRONET_LOG(GPR_DEBUG, "running: %p OP_SEND_TRAILING_METADATA", oas); |
| if (stream_state->state_callback_received[OP_FAILED]) { |
| result = NO_ACTION_POSSIBLE; |
| CRONET_LOG(GPR_DEBUG, "Stream is either cancelled or failed."); |
| } else { |
| CRONET_LOG(GPR_DEBUG, "cronet_bidirectional_stream_write (%p, 0)", |
| s->cbs); |
| stream_state->state_callback_received[OP_SEND_MESSAGE] = false; |
| cronet_bidirectional_stream_write(s->cbs, "", 0, true); |
| result = ACTION_TAKEN_WITH_CALLBACK; |
| } |
| stream_state->state_op_done[OP_SEND_TRAILING_METADATA] = true; |
| } else if (stream_op->cancel_error && |
| op_can_be_run(stream_op, stream_state, &oas->state, |
| OP_CANCEL_ERROR)) { |
| CRONET_LOG(GPR_DEBUG, "running: %p OP_CANCEL_ERROR", oas); |
| CRONET_LOG(GPR_DEBUG, "W: cronet_bidirectional_stream_cancel(%p)", s->cbs); |
| if (s->cbs) { |
| cronet_bidirectional_stream_cancel(s->cbs); |
| } |
| stream_state->state_op_done[OP_CANCEL_ERROR] = true; |
| result = ACTION_TAKEN_WITH_CALLBACK; |
| } else if (stream_op->on_complete && |
| op_can_be_run(stream_op, stream_state, &oas->state, |
| OP_ON_COMPLETE)) { |
| /* All actions in this stream_op are complete. Call the on_complete callback |
| */ |
| CRONET_LOG(GPR_DEBUG, "running: %p OP_ON_COMPLETE", oas); |
| grpc_exec_ctx_sched(exec_ctx, stream_op->on_complete, GRPC_ERROR_NONE, |
| NULL); |
| oas->state.state_op_done[OP_ON_COMPLETE] = true; |
| oas->done = true; |
| /* reset any send message state, only if this ON_COMPLETE is about a send. |
| */ |
| if (stream_op->send_message) { |
| stream_state->state_callback_received[OP_SEND_MESSAGE] = false; |
| stream_state->state_op_done[OP_SEND_MESSAGE] = false; |
| } |
| result = ACTION_TAKEN_NO_CALLBACK; |
| /* If this is the on_complete callback being called for a received message - |
| make a note */ |
| if (stream_op->recv_message) |
| stream_state->state_op_done[OP_RECV_MESSAGE_AND_ON_COMPLETE] = true; |
| } else { |
| result = NO_ACTION_POSSIBLE; |
| } |
| return result; |
| } |
| |
| /* |
| Functions used by upper layers to access transport functionality. |
| */ |
| |
| static int init_stream(grpc_exec_ctx *exec_ctx, grpc_transport *gt, |
| grpc_stream *gs, grpc_stream_refcount *refcount, |
| const void *server_data) { |
| stream_obj *s = (stream_obj *)gs; |
| memset(&s->storage, 0, sizeof(s->storage)); |
| s->storage.head = NULL; |
| memset(&s->state, 0, sizeof(s->state)); |
| s->curr_op = NULL; |
| s->cbs = NULL; |
| memset(&s->header_array, 0, sizeof(s->header_array)); |
| memset(&s->state.rs, 0, sizeof(s->state.rs)); |
| memset(&s->state.ws, 0, sizeof(s->state.ws)); |
| memset(s->state.state_op_done, 0, sizeof(s->state.state_op_done)); |
| memset(s->state.state_callback_received, 0, |
| sizeof(s->state.state_callback_received)); |
| gpr_mu_init(&s->mu); |
| return 0; |
| } |
| |
| static void set_pollset_do_nothing(grpc_exec_ctx *exec_ctx, grpc_transport *gt, |
| grpc_stream *gs, grpc_pollset *pollset) {} |
| |
| static void set_pollset_set_do_nothing(grpc_exec_ctx *exec_ctx, |
| grpc_transport *gt, grpc_stream *gs, |
| grpc_pollset_set *pollset_set) {} |
| |
| static void perform_stream_op(grpc_exec_ctx *exec_ctx, grpc_transport *gt, |
| grpc_stream *gs, grpc_transport_stream_op *op) { |
| CRONET_LOG(GPR_DEBUG, "perform_stream_op"); |
| stream_obj *s = (stream_obj *)gs; |
| s->curr_gs = gs; |
| memcpy(&s->curr_ct, gt, sizeof(grpc_cronet_transport)); |
| add_to_storage(s, op); |
| execute_from_storage(s); |
| } |
| |
| static void destroy_stream(grpc_exec_ctx *exec_ctx, grpc_transport *gt, |
| grpc_stream *gs, void *and_free_memory) {} |
| |
| static void destroy_transport(grpc_exec_ctx *exec_ctx, grpc_transport *gt) {} |
| |
| static char *get_peer(grpc_exec_ctx *exec_ctx, grpc_transport *gt) { |
| return NULL; |
| } |
| |
| static void perform_op(grpc_exec_ctx *exec_ctx, grpc_transport *gt, |
| grpc_transport_op *op) {} |
| |
| const grpc_transport_vtable grpc_cronet_vtable = {sizeof(stream_obj), |
| "cronet_http", |
| init_stream, |
| set_pollset_do_nothing, |
| set_pollset_set_do_nothing, |
| perform_stream_op, |
| perform_op, |
| destroy_stream, |
| destroy_transport, |
| get_peer}; |