blob: 6c051f5b473fb904816ebdf84214ed05420b130d [file] [log] [blame]
/*
*
* Copyright 2015, Google Inc.
* All rights reserved.
*
* Redistribution and use in source and binary forms, with or without
* modification, are permitted provided that the following conditions are
* met:
*
* * Redistributions of source code must retain the above copyright
* notice, this list of conditions and the following disclaimer.
* * Redistributions in binary form must reproduce the above
* copyright notice, this list of conditions and the following disclaimer
* in the documentation and/or other materials provided with the
* distribution.
* * Neither the name of Google Inc. nor the names of its
* contributors may be used to endorse or promote products derived from
* this software without specific prior written permission.
*
* THIS SOFTWARE IS PROVIDED BY THE COPYRIGHT HOLDERS AND CONTRIBUTORS
* "AS IS" AND ANY EXPRESS OR IMPLIED WARRANTIES, INCLUDING, BUT NOT
* LIMITED TO, THE IMPLIED WARRANTIES OF MERCHANTABILITY AND FITNESS FOR
* A PARTICULAR PURPOSE ARE DISCLAIMED. IN NO EVENT SHALL THE COPYRIGHT
* OWNER OR CONTRIBUTORS BE LIABLE FOR ANY DIRECT, INDIRECT, INCIDENTAL,
* SPECIAL, EXEMPLARY, OR CONSEQUENTIAL DAMAGES (INCLUDING, BUT NOT
* LIMITED TO, PROCUREMENT OF SUBSTITUTE GOODS OR SERVICES; LOSS OF USE,
* DATA, OR PROFITS; OR BUSINESS INTERRUPTION) HOWEVER CAUSED AND ON ANY
* THEORY OF LIABILITY, WHETHER IN CONTRACT, STRICT LIABILITY, OR TORT
* (INCLUDING NEGLIGENCE OR OTHERWISE) ARISING IN ANY WAY OUT OF THE USE
* OF THIS SOFTWARE, EVEN IF ADVISED OF THE POSSIBILITY OF SUCH DAMAGE.
*
*/
#include "src/core/transport/chttp2/internal.h"
#include <limits.h>
#include <grpc/support/log.h>
#include "src/core/transport/chttp2/http2_errors.h"
static void finalize_outbuf (grpc_chttp2_transport_writing * transport_writing);
int
grpc_chttp2_unlocking_check_writes (grpc_chttp2_transport_global * transport_global, grpc_chttp2_transport_writing * transport_writing)
{
grpc_chttp2_stream_global *stream_global;
grpc_chttp2_stream_writing *stream_writing;
grpc_chttp2_stream_global *first_reinserted_stream = NULL;
gpr_uint32 window_delta;
/* simple writes are queued to qbuf, and flushed here */
gpr_slice_buffer_swap (&transport_global->qbuf, &transport_writing->outbuf);
GPR_ASSERT (transport_global->qbuf.count == 0);
if (transport_global->dirtied_local_settings && !transport_global->sent_local_settings)
{
gpr_slice_buffer_add (&transport_writing->outbuf, grpc_chttp2_settings_create (transport_global->settings[GRPC_SENT_SETTINGS], transport_global->settings[GRPC_LOCAL_SETTINGS], transport_global->force_send_settings, GRPC_CHTTP2_NUM_SETTINGS));
transport_global->force_send_settings = 0;
transport_global->dirtied_local_settings = 0;
transport_global->sent_local_settings = 1;
}
/* for each grpc_chttp2_stream that's become writable, frame it's data
(according to available window sizes) and add to the output buffer */
while (grpc_chttp2_list_pop_writable_stream (transport_global, transport_writing, &stream_global, &stream_writing))
{
if (stream_global == first_reinserted_stream)
{
/* prevent infinite loop */
grpc_chttp2_list_add_first_writable_stream (transport_global, stream_global);
break;
}
stream_writing->id = stream_global->id;
stream_writing->send_closed = GRPC_DONT_SEND_CLOSED;
if (stream_global->outgoing_sopb)
{
window_delta = grpc_chttp2_preencode (stream_global->outgoing_sopb->ops, &stream_global->outgoing_sopb->nops, (gpr_uint32) GPR_MIN (GPR_MIN (transport_global->outgoing_window, stream_global->outgoing_window), GPR_UINT32_MAX), &stream_writing->sopb);
GRPC_CHTTP2_FLOWCTL_TRACE_TRANSPORT ("write", transport_global, outgoing_window, -(gpr_int64) window_delta);
GRPC_CHTTP2_FLOWCTL_TRACE_STREAM ("write", transport_global, stream_global, outgoing_window, -(gpr_int64) window_delta);
transport_global->outgoing_window -= window_delta;
stream_global->outgoing_window -= window_delta;
if (stream_global->write_state == GRPC_WRITE_STATE_QUEUED_CLOSE && stream_global->outgoing_sopb->nops == 0)
{
if (!transport_global->is_client && !stream_global->read_closed)
{
stream_writing->send_closed = GRPC_SEND_CLOSED_WITH_RST_STREAM;
}
else
{
stream_writing->send_closed = GRPC_SEND_CLOSED;
}
}
if (stream_global->outgoing_window > 0 && stream_global->outgoing_sopb->nops != 0)
{
grpc_chttp2_list_add_writable_stream (transport_global, stream_global);
if (first_reinserted_stream == NULL && transport_global->outgoing_window == 0)
{
first_reinserted_stream = stream_global;
}
}
}
if (!stream_global->read_closed && stream_global->unannounced_incoming_window > 0)
{
GPR_ASSERT (stream_writing->announce_window == 0);
GRPC_CHTTP2_FLOWCTL_TRACE_STREAM ("write", transport_writing, stream_writing, announce_window, stream_global->unannounced_incoming_window);
stream_writing->announce_window = stream_global->unannounced_incoming_window;
GRPC_CHTTP2_FLOWCTL_TRACE_STREAM ("write", transport_global, stream_global, incoming_window, stream_global->unannounced_incoming_window);
GRPC_CHTTP2_FLOWCTL_TRACE_STREAM ("write", transport_global, stream_global, unannounced_incoming_window, -(gpr_int64) stream_global->unannounced_incoming_window);
stream_global->incoming_window += stream_global->unannounced_incoming_window;
stream_global->unannounced_incoming_window = 0;
grpc_chttp2_list_add_incoming_window_updated (transport_global, stream_global);
stream_global->writing_now |= GRPC_CHTTP2_WRITING_WINDOW;
}
if (stream_writing->sopb.nops > 0 || stream_writing->send_closed != GRPC_DONT_SEND_CLOSED)
{
stream_global->writing_now |= GRPC_CHTTP2_WRITING_DATA;
}
if (stream_global->writing_now != 0)
{
grpc_chttp2_list_add_writing_stream (transport_writing, stream_writing);
}
}
/* if the grpc_chttp2_transport is ready to send a window update, do so here
also; 3/4 is a magic number that will likely get tuned soon */
if (transport_global->incoming_window < transport_global->connection_window_target * 3 / 4)
{
window_delta = transport_global->connection_window_target - transport_global->incoming_window;
gpr_slice_buffer_add (&transport_writing->outbuf, grpc_chttp2_window_update_create (0, window_delta));
GRPC_CHTTP2_FLOWCTL_TRACE_TRANSPORT ("write", transport_global, incoming_window, window_delta);
transport_global->incoming_window += window_delta;
}
return transport_writing->outbuf.count > 0 || grpc_chttp2_list_have_writing_streams (transport_writing);
}
void
grpc_chttp2_perform_writes (grpc_exec_ctx * exec_ctx, grpc_chttp2_transport_writing * transport_writing, grpc_endpoint * endpoint)
{
GPR_ASSERT (transport_writing->outbuf.count > 0 || grpc_chttp2_list_have_writing_streams (transport_writing));
finalize_outbuf (transport_writing);
GPR_ASSERT (transport_writing->outbuf.count > 0);
GPR_ASSERT (endpoint);
grpc_endpoint_write (exec_ctx, endpoint, &transport_writing->outbuf, &transport_writing->done_cb);
}
static void
finalize_outbuf (grpc_chttp2_transport_writing * transport_writing)
{
grpc_chttp2_stream_writing *stream_writing;
while (grpc_chttp2_list_pop_writing_stream (transport_writing, &stream_writing))
{
if (stream_writing->sopb.nops > 0 || stream_writing->send_closed != GRPC_DONT_SEND_CLOSED)
{
grpc_chttp2_encode (stream_writing->sopb.ops, stream_writing->sopb.nops, stream_writing->send_closed != GRPC_DONT_SEND_CLOSED, stream_writing->id, &transport_writing->hpack_compressor, &transport_writing->outbuf);
stream_writing->sopb.nops = 0;
}
if (stream_writing->announce_window > 0)
{
gpr_slice_buffer_add (&transport_writing->outbuf, grpc_chttp2_window_update_create (stream_writing->id, stream_writing->announce_window));
GRPC_CHTTP2_FLOWCTL_TRACE_STREAM ("write", transport_writing, stream_writing, announce_window, -(gpr_int64) stream_writing->announce_window);
stream_writing->announce_window = 0;
}
if (stream_writing->send_closed == GRPC_SEND_CLOSED_WITH_RST_STREAM)
{
gpr_slice_buffer_add (&transport_writing->outbuf, grpc_chttp2_rst_stream_create (stream_writing->id, GRPC_CHTTP2_NO_ERROR));
}
grpc_chttp2_list_add_written_stream (transport_writing, stream_writing);
}
}
void
grpc_chttp2_cleanup_writing (grpc_exec_ctx * exec_ctx, grpc_chttp2_transport_global * transport_global, grpc_chttp2_transport_writing * transport_writing)
{
grpc_chttp2_stream_writing *stream_writing;
grpc_chttp2_stream_global *stream_global;
while (grpc_chttp2_list_pop_written_stream (transport_global, transport_writing, &stream_global, &stream_writing))
{
GPR_ASSERT (stream_global->writing_now != 0);
if (stream_writing->send_closed != GRPC_DONT_SEND_CLOSED)
{
stream_global->write_state = GRPC_WRITE_STATE_SENT_CLOSE;
if (!transport_global->is_client)
{
stream_global->read_closed = 1;
}
}
if (stream_global->writing_now & GRPC_CHTTP2_WRITING_DATA)
{
if (stream_global->outgoing_sopb != NULL && stream_global->outgoing_sopb->nops == 0)
{
GPR_ASSERT (stream_global->write_state != GRPC_WRITE_STATE_QUEUED_CLOSE);
stream_global->outgoing_sopb = NULL;
grpc_closure_list_add (closure_list, stream_global->send_done_closure, 1);
}
}
stream_global->writing_now = 0;
grpc_chttp2_list_add_read_write_state_changed (transport_global, stream_global);
}
gpr_slice_buffer_reset_and_unref (&transport_writing->outbuf);
}