// blob: 81bd69429ee559ad998c8c825ca6e2df9a80f113
/*
* Copyright 2014, Google Inc. All rights reserved.
*
* Redistribution and use in source and binary forms, with or without
* modification, are permitted provided that the following conditions are
* met:
*
* * Redistributions of source code must retain the above copyright
* notice, this list of conditions and the following disclaimer.
* * Redistributions in binary form must reproduce the above
* copyright notice, this list of conditions and the following disclaimer
* in the documentation and/or other materials provided with the
* distribution.
*
* * Neither the name of Google Inc. nor the names of its
* contributors may be used to endorse or promote products derived from
* this software without specific prior written permission.
*
* THIS SOFTWARE IS PROVIDED BY THE COPYRIGHT HOLDERS AND CONTRIBUTORS
* "AS IS" AND ANY EXPRESS OR IMPLIED WARRANTIES, INCLUDING, BUT NOT
* LIMITED TO, THE IMPLIED WARRANTIES OF MERCHANTABILITY AND FITNESS FOR
* A PARTICULAR PURPOSE ARE DISCLAIMED. IN NO EVENT SHALL THE COPYRIGHT
* OWNER OR CONTRIBUTORS BE LIABLE FOR ANY DIRECT, INDIRECT, INCIDENTAL,
* SPECIAL, EXEMPLARY, OR CONSEQUENTIAL DAMAGES (INCLUDING, BUT NOT
* LIMITED TO, PROCUREMENT OF SUBSTITUTE GOODS OR SERVICES; LOSS OF USE,
* DATA, OR PROFITS; OR BUSINESS INTERRUPTION) HOWEVER CAUSED AND ON ANY
* THEORY OF LIABILITY, WHETHER IN CONTRACT, STRICT LIABILITY, OR TORT
* (INCLUDING NEGLIGENCE OR OTHERWISE) ARISING IN ANY WAY OUT OF THE USE
* OF THIS SOFTWARE, EVEN IF ADVISED OF THE POSSIBILITY OF SUCH DAMAGE.
*/
package io.grpc.netty;
import static com.google.common.base.Preconditions.checkNotNull;
import io.grpc.Attributes;
import io.grpc.Metadata;
import io.grpc.Status;
import io.grpc.internal.AbstractServerStream;
import io.grpc.internal.WritableBuffer;
import io.netty.buffer.ByteBuf;
import io.netty.channel.Channel;
import io.netty.channel.ChannelFuture;
import io.netty.channel.ChannelFutureListener;
import io.netty.handler.codec.http2.Http2Headers;
import io.netty.handler.codec.http2.Http2Stream;
import java.util.logging.Level;
import java.util.logging.Logger;
/**
 * Server-side stream for the Netty HTTP/2 transport. Must only be called from the sending
 * application thread.
 *
 * <p>Outbound operations issued through the {@link Sink} are wrapped in command objects and
 * enqueued on the write queue obtained from the transport state's handler; the one exception is
 * {@code request}, which calls the deframer directly when already on the channel's event loop.
 */
class NettyServerStream extends AbstractServerStream {
  private static final Logger log = Logger.getLogger(NettyServerStream.class.getName());

  private final Sink sink = new Sink();
  private final TransportState state;
  private final Channel channel;
  private final WriteQueue writeQueue;
  private final Attributes attributes;

  /**
   * Creates a stream bound to the given channel and transport state.
   *
   * @param channel the Netty channel; its allocator is used for outbound buffers and its event
   *     loop is consulted when messages are requested
   * @param state the transport-side state; the write queue is taken from its handler
   * @param transportAttrs the attributes later returned by {@link #attributes()}
   * @throws NullPointerException if any argument is {@code null}
   */
  public NettyServerStream(Channel channel, TransportState state, Attributes transportAttrs) {
    // Note: channel is dereferenced here, before any of the explicit null checks below run.
    super(new NettyWritableBufferAllocator(channel.alloc()));
    this.channel = checkNotNull(channel, "channel");
    this.state = checkNotNull(state, "transportState");
    this.writeQueue = state.handler.getWriteQueue();
    this.attributes = checkNotNull(transportAttrs);
  }

  /** Returns the transport state supplied at construction. */
  @Override
  protected TransportState transportState() {
    return state;
  }

  /** Returns the sink that turns stream operations into write-queue commands. */
  @Override
  protected Sink abstractServerStreamSink() {
    return sink;
  }

  /** Returns the attributes supplied at construction. */
  @Override
  public Attributes attributes() {
    return attributes;
  }

  /** Sink implementation that forwards stream operations to the write queue. */
  private class Sink implements AbstractServerStream.Sink {

    /**
     * Requests {@code numMessages} messages from the deframer, either directly (when already on
     * the channel's event loop) or by enqueueing a command on the write queue.
     */
    @Override
    public void request(final int numMessages) {
      if (!channel.eventLoop().inEventLoop()) {
        // Not on the event loop: hand the request over through the write queue.
        RequestMessagesCommand command =
            new RequestMessagesCommand(transportState(), numMessages);
        writeQueue.enqueue(command, true);
        return;
      }
      // Already running in the event loop, so the deframer can be called immediately.
      transportState().requestMessagesFromDeframer(numMessages);
    }

    /** Converts the response headers and enqueues them, flushing the queue. */
    @Override
    public void writeHeaders(Metadata headers) {
      SendResponseHeadersCommand command = new SendResponseHeadersCommand(
          transportState(), Utils.convertServerHeaders(headers), false);
      writeQueue.enqueue(command, true);
    }

    /**
     * Enqueues a message frame for writing. A {@code null} frame only schedules a flush.
     *
     * @param frame the frame to send, expected to be a {@code NettyWritableBuffer}, or
     *     {@code null}
     * @param flush passed through to the write queue together with the command
     */
    @Override
    public void writeFrame(WritableBuffer frame, boolean flush) {
      if (frame == null) {
        // Nothing to write; just ask the queue to flush.
        writeQueue.scheduleFlush();
        return;
      }

      ByteBuf payload = ((NettyWritableBuffer) frame).bytebuf();
      final int pendingBytes = payload.readableBytes();

      // Add the bytes to outbound flow control before the frame is queued.
      onSendingBytes(pendingBytes);

      SendGrpcFrameCommand command = new SendGrpcFrameCommand(transportState(), payload, false);
      ChannelFutureListener onWriteCompleted = new ChannelFutureListener() {
        @Override
        public void operationComplete(ChannelFuture future) throws Exception {
          // Remove the bytes from outbound flow control, optionally notifying
          // the client that they can send more bytes.
          transportState().onSentBytes(pendingBytes);
        }
      };
      writeQueue.enqueue(command, channel.newPromise().addListener(onWriteCompleted), flush);
    }

    /**
     * Converts the trailers and enqueues them, flushing the queue. The command is created with
     * its boolean flag set to {@code true} (presumably marking end of stream).
     */
    @Override
    public void writeTrailers(Metadata trailers, boolean headersSent) {
      Http2Headers http2Trailers = Utils.convertTrailers(trailers, headersSent);
      SendResponseHeadersCommand command =
          new SendResponseHeadersCommand(transportState(), http2Trailers, true);
      writeQueue.enqueue(command, true);
    }

    /** Enqueues a cancellation of this stream carrying {@code status}, flushing the queue. */
    @Override
    public void cancel(Status status) {
      CancelServerStreamCommand command =
          new CancelServerStreamCommand(transportState(), status);
      writeQueue.enqueue(command, true);
    }
  }

  /** This should only be called from the transport thread. */
  public static class TransportState extends AbstractServerStream.TransportState
      implements StreamIdHolder {
    private final Http2Stream http2Stream;
    private final NettyServerHandler handler;

    /**
     * Creates the transport state for one HTTP/2 stream.
     *
     * @param handler the server handler that owns the write queue and flow-control accounting
     * @param http2Stream the underlying HTTP/2 stream
     * @param maxMessageSize forwarded unchanged to the superclass constructor
     * @throws NullPointerException if {@code http2Stream} or {@code handler} is {@code null}
     */
    public TransportState(NettyServerHandler handler, Http2Stream http2Stream, int maxMessageSize) {
      super(maxMessageSize);
      this.http2Stream = checkNotNull(http2Stream, "http2Stream");
      this.handler = checkNotNull(handler, "handler");
    }

    /** Returns the processed bytes to the handler for this stream, then schedules a flush. */
    @Override
    public void bytesRead(int processedBytes) {
      handler.returnProcessedBytes(http2Stream, processedBytes);
      WriteQueue queue = handler.getWriteQueue();
      queue.scheduleFlush();
    }

    /**
     * Logs the failure, reports the derived status to the transport and enqueues a cancellation
     * of this stream with that status.
     */
    @Override
    protected void deframeFailed(Throwable cause) {
      log.log(Level.WARNING, "Exception processing message", cause);
      final Status failure = Status.fromThrowable(cause);
      transportReportStatus(failure);
      WriteQueue queue = handler.getWriteQueue();
      queue.enqueue(new CancelServerStreamCommand(this, failure), true);
    }

    /**
     * Passes an inbound data frame to the superclass. The frame is retained (its reference count
     * is incremented) before being wrapped in a {@code NettyReadableBuffer}.
     */
    void inboundDataReceived(ByteBuf frame, boolean endOfStream) {
      NettyReadableBuffer buffer = new NettyReadableBuffer(frame.retain());
      super.inboundDataReceived(buffer, endOfStream);
    }

    /** Returns the id of the underlying HTTP/2 stream. */
    @Override
    public int id() {
      return http2Stream.id();
    }
  }
}