| /* |
| * Copyright 2014, Google Inc. All rights reserved. |
| * |
| * Redistribution and use in source and binary forms, with or without |
| * modification, are permitted provided that the following conditions are |
| * met: |
| * |
| * * Redistributions of source code must retain the above copyright |
| * notice, this list of conditions and the following disclaimer. |
| * * Redistributions in binary form must reproduce the above |
| * copyright notice, this list of conditions and the following disclaimer |
| * in the documentation and/or other materials provided with the |
| * distribution. |
| * |
| * * Neither the name of Google Inc. nor the names of its |
| * contributors may be used to endorse or promote products derived from |
| * this software without specific prior written permission. |
| * |
| * THIS SOFTWARE IS PROVIDED BY THE COPYRIGHT HOLDERS AND CONTRIBUTORS |
| * "AS IS" AND ANY EXPRESS OR IMPLIED WARRANTIES, INCLUDING, BUT NOT |
| * LIMITED TO, THE IMPLIED WARRANTIES OF MERCHANTABILITY AND FITNESS FOR |
| * A PARTICULAR PURPOSE ARE DISCLAIMED. IN NO EVENT SHALL THE COPYRIGHT |
| * OWNER OR CONTRIBUTORS BE LIABLE FOR ANY DIRECT, INDIRECT, INCIDENTAL, |
| * SPECIAL, EXEMPLARY, OR CONSEQUENTIAL DAMAGES (INCLUDING, BUT NOT |
| * LIMITED TO, PROCUREMENT OF SUBSTITUTE GOODS OR SERVICES; LOSS OF USE, |
| * DATA, OR PROFITS; OR BUSINESS INTERRUPTION) HOWEVER CAUSED AND ON ANY |
| * THEORY OF LIABILITY, WHETHER IN CONTRACT, STRICT LIABILITY, OR TORT |
| * (INCLUDING NEGLIGENCE OR OTHERWISE) ARISING IN ANY WAY OUT OF THE USE |
| * OF THIS SOFTWARE, EVEN IF ADVISED OF THE POSSIBILITY OF SUCH DAMAGE. |
| */ |
| |
| package io.grpc.netty; |
| |
| import static com.google.common.base.Preconditions.checkNotNull; |
| |
| import io.grpc.Attributes; |
| import io.grpc.Metadata; |
| import io.grpc.Status; |
| import io.grpc.internal.AbstractServerStream; |
| import io.grpc.internal.WritableBuffer; |
| import io.netty.buffer.ByteBuf; |
| import io.netty.channel.Channel; |
| import io.netty.channel.ChannelFuture; |
| import io.netty.channel.ChannelFutureListener; |
| import io.netty.handler.codec.http2.Http2Headers; |
| import io.netty.handler.codec.http2.Http2Stream; |
| |
| import java.util.logging.Level; |
| import java.util.logging.Logger; |
| |
| /** |
| * Server stream for a Netty HTTP2 transport. Must only be called from the sending application |
| * thread. |
| */ |
| class NettyServerStream extends AbstractServerStream { |
| private static final Logger log = Logger.getLogger(NettyServerStream.class.getName()); |
| |
| private final Sink sink = new Sink(); |
| private final TransportState state; |
| private final Channel channel; |
| private final WriteQueue writeQueue; |
| private final Attributes attributes; |
| |
| public NettyServerStream(Channel channel, TransportState state, Attributes transportAttrs) { |
| super(new NettyWritableBufferAllocator(channel.alloc())); |
| this.state = checkNotNull(state, "transportState"); |
| this.channel = checkNotNull(channel, "channel"); |
| this.writeQueue = state.handler.getWriteQueue(); |
| this.attributes = checkNotNull(transportAttrs); |
| } |
| |
| @Override |
| protected TransportState transportState() { |
| return state; |
| } |
| |
| @Override |
| protected Sink abstractServerStreamSink() { |
| return sink; |
| } |
| |
| @Override public Attributes attributes() { |
| return attributes; |
| } |
| |
| private class Sink implements AbstractServerStream.Sink { |
| @Override |
| public void request(final int numMessages) { |
| if (channel.eventLoop().inEventLoop()) { |
| // Processing data read in the event loop so can call into the deframer immediately |
| transportState().requestMessagesFromDeframer(numMessages); |
| } else { |
| writeQueue.enqueue(new RequestMessagesCommand(transportState(), numMessages), true); |
| } |
| } |
| |
| @Override |
| public void writeHeaders(Metadata headers) { |
| writeQueue.enqueue(new SendResponseHeadersCommand(transportState(), |
| Utils.convertServerHeaders(headers), false), |
| true); |
| } |
| |
| @Override |
| public void writeFrame(WritableBuffer frame, boolean flush) { |
| if (frame == null) { |
| writeQueue.scheduleFlush(); |
| return; |
| } |
| ByteBuf bytebuf = ((NettyWritableBuffer) frame).bytebuf(); |
| final int numBytes = bytebuf.readableBytes(); |
| // Add the bytes to outbound flow control. |
| onSendingBytes(numBytes); |
| writeQueue.enqueue( |
| new SendGrpcFrameCommand(transportState(), bytebuf, false), |
| channel.newPromise().addListener(new ChannelFutureListener() { |
| @Override |
| public void operationComplete(ChannelFuture future) throws Exception { |
| // Remove the bytes from outbound flow control, optionally notifying |
| // the client that they can send more bytes. |
| transportState().onSentBytes(numBytes); |
| } |
| }), flush); |
| } |
| |
| @Override |
| public void writeTrailers(Metadata trailers, boolean headersSent) { |
| Http2Headers http2Trailers = Utils.convertTrailers(trailers, headersSent); |
| writeQueue.enqueue( |
| new SendResponseHeadersCommand(transportState(), http2Trailers, true), true); |
| } |
| |
| @Override |
| public void cancel(Status status) { |
| writeQueue.enqueue(new CancelServerStreamCommand(transportState(), status), true); |
| } |
| } |
| |
| /** This should only called from the transport thread. */ |
| public static class TransportState extends AbstractServerStream.TransportState |
| implements StreamIdHolder { |
| private final Http2Stream http2Stream; |
| private final NettyServerHandler handler; |
| |
| public TransportState(NettyServerHandler handler, Http2Stream http2Stream, int maxMessageSize) { |
| super(maxMessageSize); |
| this.http2Stream = checkNotNull(http2Stream, "http2Stream"); |
| this.handler = checkNotNull(handler, "handler"); |
| } |
| |
| @Override |
| public void bytesRead(int processedBytes) { |
| handler.returnProcessedBytes(http2Stream, processedBytes); |
| handler.getWriteQueue().scheduleFlush(); |
| } |
| |
| @Override |
| protected void deframeFailed(Throwable cause) { |
| log.log(Level.WARNING, "Exception processing message", cause); |
| Status status = Status.fromThrowable(cause); |
| transportReportStatus(status); |
| handler.getWriteQueue().enqueue(new CancelServerStreamCommand(this, status), true); |
| } |
| |
| void inboundDataReceived(ByteBuf frame, boolean endOfStream) { |
| super.inboundDataReceived(new NettyReadableBuffer(frame.retain()), endOfStream); |
| } |
| |
| @Override |
| public int id() { |
| return http2Stream.id(); |
| } |
| } |
| } |