blob: b8720250e775d539035e0959d4c8c5b7b0ada2df [file] [log] [blame]
/*
*
* Copyright 2015 gRPC authors.
*
* Licensed under the Apache License, Version 2.0 (the "License");
* you may not use this file except in compliance with the License.
* You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*
*/
#include "src/core/lib/transport/byte_stream.h"
#include <stdlib.h>
#include <string.h>
#include <grpc/support/log.h>
#include "src/core/lib/slice/slice_internal.h"
bool grpc_byte_stream_next(grpc_exec_ctx* exec_ctx,
grpc_byte_stream* byte_stream, size_t max_size_hint,
grpc_closure* on_complete) {
return byte_stream->vtable->next(exec_ctx, byte_stream, max_size_hint,
on_complete);
}
grpc_error* grpc_byte_stream_pull(grpc_exec_ctx* exec_ctx,
grpc_byte_stream* byte_stream,
grpc_slice* slice) {
return byte_stream->vtable->pull(exec_ctx, byte_stream, slice);
}
void grpc_byte_stream_shutdown(grpc_exec_ctx* exec_ctx,
grpc_byte_stream* byte_stream,
grpc_error* error) {
byte_stream->vtable->shutdown(exec_ctx, byte_stream, error);
}
void grpc_byte_stream_destroy(grpc_exec_ctx* exec_ctx,
grpc_byte_stream* byte_stream) {
byte_stream->vtable->destroy(exec_ctx, byte_stream);
}
// grpc_slice_buffer_stream
static bool slice_buffer_stream_next(grpc_exec_ctx* exec_ctx,
grpc_byte_stream* byte_stream,
size_t max_size_hint,
grpc_closure* on_complete) {
grpc_slice_buffer_stream* stream = (grpc_slice_buffer_stream*)byte_stream;
GPR_ASSERT(stream->cursor < stream->backing_buffer->count);
return true;
}
static grpc_error* slice_buffer_stream_pull(grpc_exec_ctx* exec_ctx,
grpc_byte_stream* byte_stream,
grpc_slice* slice) {
grpc_slice_buffer_stream* stream = (grpc_slice_buffer_stream*)byte_stream;
if (stream->shutdown_error != GRPC_ERROR_NONE) {
return GRPC_ERROR_REF(stream->shutdown_error);
}
GPR_ASSERT(stream->cursor < stream->backing_buffer->count);
*slice =
grpc_slice_ref_internal(stream->backing_buffer->slices[stream->cursor]);
stream->cursor++;
return GRPC_ERROR_NONE;
}
static void slice_buffer_stream_shutdown(grpc_exec_ctx* exec_ctx,
grpc_byte_stream* byte_stream,
grpc_error* error) {
grpc_slice_buffer_stream* stream = (grpc_slice_buffer_stream*)byte_stream;
GRPC_ERROR_UNREF(stream->shutdown_error);
stream->shutdown_error = error;
}
static void slice_buffer_stream_destroy(grpc_exec_ctx* exec_ctx,
grpc_byte_stream* byte_stream) {
grpc_slice_buffer_stream* stream = (grpc_slice_buffer_stream*)byte_stream;
grpc_slice_buffer_reset_and_unref_internal(exec_ctx, stream->backing_buffer);
GRPC_ERROR_UNREF(stream->shutdown_error);
}
static const grpc_byte_stream_vtable slice_buffer_stream_vtable = {
slice_buffer_stream_next, slice_buffer_stream_pull,
slice_buffer_stream_shutdown, slice_buffer_stream_destroy};
void grpc_slice_buffer_stream_init(grpc_slice_buffer_stream* stream,
grpc_slice_buffer* slice_buffer,
uint32_t flags) {
GPR_ASSERT(slice_buffer->length <= UINT32_MAX);
stream->base.length = (uint32_t)slice_buffer->length;
stream->base.flags = flags;
stream->base.vtable = &slice_buffer_stream_vtable;
stream->backing_buffer = slice_buffer;
stream->cursor = 0;
stream->shutdown_error = GRPC_ERROR_NONE;
}
// grpc_caching_byte_stream
void grpc_byte_stream_cache_init(grpc_byte_stream_cache* cache,
grpc_byte_stream* underlying_stream) {
cache->underlying_stream = underlying_stream;
grpc_slice_buffer_init(&cache->cache_buffer);
}
void grpc_byte_stream_cache_destroy(grpc_exec_ctx* exec_ctx,
grpc_byte_stream_cache* cache) {
grpc_byte_stream_destroy(exec_ctx, cache->underlying_stream);
grpc_slice_buffer_destroy_internal(exec_ctx, &cache->cache_buffer);
}
static bool caching_byte_stream_next(grpc_exec_ctx* exec_ctx,
grpc_byte_stream* byte_stream,
size_t max_size_hint,
grpc_closure* on_complete) {
grpc_caching_byte_stream* stream = (grpc_caching_byte_stream*)byte_stream;
if (stream->shutdown_error != GRPC_ERROR_NONE) return true;
if (stream->cursor < stream->cache->cache_buffer.count) return true;
return grpc_byte_stream_next(exec_ctx, stream->cache->underlying_stream,
max_size_hint, on_complete);
}
static grpc_error* caching_byte_stream_pull(grpc_exec_ctx* exec_ctx,
grpc_byte_stream* byte_stream,
grpc_slice* slice) {
grpc_caching_byte_stream* stream = (grpc_caching_byte_stream*)byte_stream;
if (stream->shutdown_error != GRPC_ERROR_NONE) {
return GRPC_ERROR_REF(stream->shutdown_error);
}
if (stream->cursor < stream->cache->cache_buffer.count) {
*slice = grpc_slice_ref_internal(
stream->cache->cache_buffer.slices[stream->cursor]);
++stream->cursor;
return GRPC_ERROR_NONE;
}
grpc_error* error =
grpc_byte_stream_pull(exec_ctx, stream->cache->underlying_stream, slice);
if (error == GRPC_ERROR_NONE) {
++stream->cursor;
grpc_slice_buffer_add(&stream->cache->cache_buffer,
grpc_slice_ref_internal(*slice));
}
return error;
}
static void caching_byte_stream_shutdown(grpc_exec_ctx* exec_ctx,
grpc_byte_stream* byte_stream,
grpc_error* error) {
grpc_caching_byte_stream* stream = (grpc_caching_byte_stream*)byte_stream;
GRPC_ERROR_UNREF(stream->shutdown_error);
stream->shutdown_error = GRPC_ERROR_REF(error);
grpc_byte_stream_shutdown(exec_ctx, stream->cache->underlying_stream, error);
}
static void caching_byte_stream_destroy(grpc_exec_ctx* exec_ctx,
grpc_byte_stream* byte_stream) {
grpc_caching_byte_stream* stream = (grpc_caching_byte_stream*)byte_stream;
GRPC_ERROR_UNREF(stream->shutdown_error);
}
static const grpc_byte_stream_vtable caching_byte_stream_vtable = {
caching_byte_stream_next, caching_byte_stream_pull,
caching_byte_stream_shutdown, caching_byte_stream_destroy};
void grpc_caching_byte_stream_init(grpc_caching_byte_stream* stream,
grpc_byte_stream_cache* cache) {
memset(stream, 0, sizeof(*stream));
stream->base.length = cache->underlying_stream->length;
stream->base.flags = cache->underlying_stream->flags;
stream->base.vtable = &caching_byte_stream_vtable;
stream->cache = cache;
stream->shutdown_error = GRPC_ERROR_NONE;
}
void grpc_caching_byte_stream_reset(grpc_caching_byte_stream* stream) {
stream->cursor = 0;
}