blob: bcfbd8eb0d0171b7dc0615cbc1ee5aeef03f0bd0 [file] [log] [blame]
ejona07d3f6a2014-05-14 11:26:57 -07001package com.google.net.stubby.spdy.netty;
2
3import com.google.net.stubby.AbstractOperation;
4import com.google.net.stubby.Operation;
5import com.google.net.stubby.Status;
6import com.google.net.stubby.transport.Framer;
7import com.google.net.stubby.transport.Transport;
8
9import io.netty.buffer.Unpooled;
10import io.netty.channel.Channel;
11import io.netty.channel.ChannelFuture;
12import io.netty.handler.codec.spdy.DefaultSpdyDataFrame;
13import io.netty.handler.codec.spdy.SpdyHeadersFrame;
14
15import java.io.InputStream;
16import java.nio.ByteBuffer;
17
18/**
19 * Base implementation of {@link Operation} that writes SPDY frames
20 */
21abstract class SpdyOperation extends AbstractOperation implements Framer.Sink {
22
23 protected final Framer framer;
24 private final Channel channel;
25
26 SpdyOperation(SpdyHeadersFrame headersFrame, Channel channel, Framer framer) {
27 super(headersFrame.getStreamId());
28 this.channel = channel;
29 this.framer = framer;
30 channel.write(headersFrame);
31 }
32
33 @Override
34 public Operation addContext(String type, InputStream message, Phase nextPhase) {
35 super.addContext(type, message, nextPhase);
36 framer.writeContext(type, message, getPhase() == Phase.CLOSED, this);
37 return this;
38 }
39
40 @Override
41 public Operation addPayload(InputStream payload, Phase nextPhase) {
42 super.addPayload(payload, nextPhase);
43 framer.writePayload(payload, getPhase() == Phase.CLOSED, this);
44 return this;
45 }
46
47 @Override
48 public void deliverFrame(ByteBuffer frame, boolean endOfMessage) {
49 boolean closed = getPhase() == Phase.CLOSED;
50 DefaultSpdyDataFrame dataFrame = new DefaultSpdyDataFrame(getId(),
51 Unpooled.wrappedBuffer(frame));
52 boolean streamClosed = closed && endOfMessage;
53 dataFrame.setLast(streamClosed);
54 try {
55 ChannelFuture channelFuture = channel.writeAndFlush(dataFrame);
56 if (!streamClosed) {
57 // Sync for all except the last frame to prevent buffer corruption.
58 channelFuture.get();
59 }
60 } catch (Exception e) {
61 close(new Status(Transport.Code.INTERNAL, e));
62 } finally {
63 if (streamClosed) {
64 framer.close();
65 }
66 }
67 }
68}