blob: bcfbd8eb0d0171b7dc0615cbc1ee5aeef03f0bd0 [file] [log] [blame]
package com.google.net.stubby.spdy.netty;
import com.google.net.stubby.AbstractOperation;
import com.google.net.stubby.Operation;
import com.google.net.stubby.Status;
import com.google.net.stubby.transport.Framer;
import com.google.net.stubby.transport.Transport;
import io.netty.buffer.Unpooled;
import io.netty.channel.Channel;
import io.netty.channel.ChannelFuture;
import io.netty.handler.codec.spdy.DefaultSpdyDataFrame;
import io.netty.handler.codec.spdy.SpdyHeadersFrame;
import java.io.InputStream;
import java.nio.ByteBuffer;
/**
* Base implementation of {@link Operation} that writes SPDY frames
*/
abstract class SpdyOperation extends AbstractOperation implements Framer.Sink {
protected final Framer framer;
private final Channel channel;
SpdyOperation(SpdyHeadersFrame headersFrame, Channel channel, Framer framer) {
super(headersFrame.getStreamId());
this.channel = channel;
this.framer = framer;
channel.write(headersFrame);
}
@Override
public Operation addContext(String type, InputStream message, Phase nextPhase) {
super.addContext(type, message, nextPhase);
framer.writeContext(type, message, getPhase() == Phase.CLOSED, this);
return this;
}
@Override
public Operation addPayload(InputStream payload, Phase nextPhase) {
super.addPayload(payload, nextPhase);
framer.writePayload(payload, getPhase() == Phase.CLOSED, this);
return this;
}
@Override
public void deliverFrame(ByteBuffer frame, boolean endOfMessage) {
boolean closed = getPhase() == Phase.CLOSED;
DefaultSpdyDataFrame dataFrame = new DefaultSpdyDataFrame(getId(),
Unpooled.wrappedBuffer(frame));
boolean streamClosed = closed && endOfMessage;
dataFrame.setLast(streamClosed);
try {
ChannelFuture channelFuture = channel.writeAndFlush(dataFrame);
if (!streamClosed) {
// Sync for all except the last frame to prevent buffer corruption.
channelFuture.get();
}
} catch (Exception e) {
close(new Status(Transport.Code.INTERNAL, e));
} finally {
if (streamClosed) {
framer.close();
}
}
}
}