blob: 9a17a7083f9b72d239637e1e6583c10f2bb58e0b [file] [log] [blame]
ejona07d3f6a2014-05-14 11:26:57 -07001package com.google.net.stubby.http2.netty;
2
3import com.google.net.stubby.AbstractOperation;
4import com.google.net.stubby.Operation;
5import com.google.net.stubby.Status;
6import com.google.net.stubby.transport.Framer;
7import com.google.net.stubby.transport.Transport;
8
ejona07d3f6a2014-05-14 11:26:57 -07009import java.io.InputStream;
10import java.nio.ByteBuffer;
11
lryan0cf7d4c2014-06-11 11:50:49 -070012import io.netty.buffer.Unpooled;
13import io.netty.channel.ChannelFuture;
14
ejona07d3f6a2014-05-14 11:26:57 -070015/**
16 * Base implementation of {@link Operation} that writes HTTP2 frames
17 */
18abstract class Http2Operation extends AbstractOperation implements Framer.Sink {
19
lryan0cf7d4c2014-06-11 11:50:49 -070020 private final Framer framer;
21 private final Http2Codec.Http2Writer writer;
ejona07d3f6a2014-05-14 11:26:57 -070022
lryan0cf7d4c2014-06-11 11:50:49 -070023 Http2Operation(int streamId, Http2Codec.Http2Writer writer, Framer framer) {
ejona07d3f6a2014-05-14 11:26:57 -070024 super(streamId);
lryan0cf7d4c2014-06-11 11:50:49 -070025 this.writer = writer;
ejona07d3f6a2014-05-14 11:26:57 -070026 this.framer = framer;
27 }
28
29 @Override
30 public Operation addContext(String type, InputStream message, Phase nextPhase) {
31 super.addContext(type, message, nextPhase);
32 framer.writeContext(type, message, getPhase() == Phase.CLOSED, this);
33 return this;
34 }
35
36 @Override
37 public Operation addPayload(InputStream payload, Phase nextPhase) {
38 super.addPayload(payload, nextPhase);
39 framer.writePayload(payload, getPhase() == Phase.CLOSED, this);
40 return this;
41 }
42
43 @Override
lryan2ce84462014-06-02 14:43:36 -070044 public Operation close(Status status) {
45 boolean alreadyClosed = getPhase() == Phase.CLOSED;
46 super.close(status);
47 if (!alreadyClosed) {
48 framer.writeStatus(status, true, this);
49 }
50 return this;
51 }
52
53 @Override
ejona07d3f6a2014-05-14 11:26:57 -070054 public void deliverFrame(ByteBuffer frame, boolean endOfMessage) {
55 boolean closed = getPhase() == Phase.CLOSED;
lryan0cf7d4c2014-06-11 11:50:49 -070056
ejona07d3f6a2014-05-14 11:26:57 -070057 try {
lryan0cf7d4c2014-06-11 11:50:49 -070058 ChannelFuture channelFuture = writer.writeData(getId(),
nathanmittler42af07c2014-08-19 12:26:11 -070059 Unpooled.wrappedBuffer(frame), closed);
ejona07d3f6a2014-05-14 11:26:57 -070060 if (!closed) {
61 // Sync for all except the last frame to prevent buffer corruption.
62 channelFuture.get();
63 }
64 } catch (Exception e) {
65 close(new Status(Transport.Code.INTERNAL, e));
66 } finally {
67 if (closed) {
68 framer.close();
69 }
70 }
71 }
72}