blob: b5c24e214618259c57410ee86ba619f157c20a94 [file] [log] [blame]
nathanmittler64eae042014-08-11 13:29:43 -07001package com.google.net.stubby.newtransport;
2
3import static com.google.net.stubby.GrpcFramingUtil.CONTEXT_VALUE_FRAME;
4import static com.google.net.stubby.GrpcFramingUtil.FRAME_LENGTH;
5import static com.google.net.stubby.GrpcFramingUtil.FRAME_TYPE_LENGTH;
6import static com.google.net.stubby.GrpcFramingUtil.FRAME_TYPE_MASK;
7import static com.google.net.stubby.GrpcFramingUtil.PAYLOAD_FRAME;
8import static com.google.net.stubby.GrpcFramingUtil.STATUS_FRAME;
9
10import com.google.common.base.Preconditions;
11import com.google.net.stubby.Status;
12import com.google.net.stubby.transport.Transport;
13
14import java.io.Closeable;
15import java.io.IOException;
16
17/**
18 * Deframer for GRPC frames. Delegates deframing/decompression of the GRPC compression frame to a
19 * {@link Decompressor}.
20 */
21public class GrpcDeframer implements Closeable {
22 private enum State {
23 HEADER, BODY
24 }
25
26 private static final int HEADER_LENGTH = FRAME_TYPE_LENGTH + FRAME_LENGTH;
27 private final Decompressor decompressor;
28 private State state = State.HEADER;
29 private int requiredLength = HEADER_LENGTH;
30 private int frameType;
31 private boolean statusNotified;
32 private GrpcMessageListener listener;
33 private CompositeBuffer nextFrame;
34
35 public GrpcDeframer(Decompressor decompressor, GrpcMessageListener listener) {
36 this.decompressor = Preconditions.checkNotNull(decompressor, "decompressor");
37 this.listener = Preconditions.checkNotNull(listener, "listener");
38 }
39
40 public void deframe(Buffer data, boolean endOfStream) {
41 Preconditions.checkNotNull(data, "data");
42
43 // Add the data to the decompression buffer.
44 decompressor.decompress(data);
45
46 // Process the uncompressed bytes.
47 while (readRequiredBytes()) {
48 if (statusNotified) {
49 throw new IllegalStateException("Inbound data after receiving status frame");
50 }
51
52 switch (state) {
53 case HEADER:
54 processHeader();
55 break;
56 case BODY:
57 processBody();
58 break;
59 default:
60 throw new AssertionError("Invalid state: " + state);
61 }
62 }
63
64 // If reached the end of stream without reading a status frame, fabricate one
65 // and deliver to the target.
66 if (!statusNotified && endOfStream) {
67 notifyStatus(Status.OK);
68 }
69 }
70
71
72 @Override
73 public void close() {
74 decompressor.close();
75 if (nextFrame != null) {
76 nextFrame.close();
77 }
78 }
79
80 /**
81 * Attempts to read the required bytes into nextFrame.
82 *
83 * @returns {@code true} if all of the required bytes have been read.
84 */
85 private boolean readRequiredBytes() {
86 if (nextFrame == null) {
87 nextFrame = new CompositeBuffer();
88 }
89
90 // Read until the buffer contains all the required bytes.
91 int missingBytes;
92 while ((missingBytes = requiredLength - nextFrame.readableBytes()) > 0) {
93 Buffer buffer = decompressor.readBytes(missingBytes);
94 if (buffer == null) {
95 // No more data is available.
96 break;
97 }
98 // Add it to the composite buffer for the next frame.
99 nextFrame.addBuffer(buffer);
100 }
101
102 // Return whether or not all of the required bytes are now in the frame.
103 return nextFrame.readableBytes() == requiredLength;
104 }
105
106 /**
107 * Processes the GRPC compression header which is composed of the compression flag and the outer
108 * frame length.
109 */
110 private void processHeader() {
111 // Peek, but do not read the header.
112 frameType = nextFrame.readUnsignedByte() & FRAME_TYPE_MASK;
113
114 // Update the required length to include the length of the frame.
115 requiredLength = nextFrame.readInt();
116
117 // Continue reading the frame body.
118 state = State.BODY;
119 }
120
121 /**
122 * Processes the body of the GRPC compression frame. A single compression frame may contain
123 * several GRPC messages within it.
124 */
125 private void processBody() {
126 switch (frameType) {
127 case CONTEXT_VALUE_FRAME:
128 processContext();
129 break;
130 case PAYLOAD_FRAME:
131 processMessage();
132 break;
133 case STATUS_FRAME:
134 processStatus();
135 break;
136 default:
137 throw new AssertionError("Invalid frameType: " + frameType);
138 }
139
140 // Done with this frame, begin processing the next header.
141 state = State.HEADER;
142 requiredLength = HEADER_LENGTH;
143 }
144
145 /**
146 * Processes the payload of a context frame.
147 */
148 private void processContext() {
149 Transport.ContextValue ctx;
150 try {
151 // Not clear if using proto encoding here is of any benefit.
152 // Using ContextValue.parseFrom requires copying out of the framed chunk
153 // Writing a custom parser would have to do varint handling and potentially
154 // deal with out-of-order tags etc.
155 ctx = Transport.ContextValue.parseFrom(Buffers.openStream(nextFrame, false));
156 } catch (IOException e) {
157 throw new RuntimeException(e);
158 } finally {
159 nextFrame.close();
160 nextFrame = null;
161 }
162
163 // Call the handler.
164 Buffer ctxBuffer = Buffers.wrap(ctx.getValue());
165 listener.onContext(ctx.getKey(), Buffers.openStream(ctxBuffer, true),
166 ctxBuffer.readableBytes());
167 }
168
169 /**
170 * Processes the payload of a message frame.
171 */
172 private void processMessage() {
173 try {
174 listener.onPayload(Buffers.openStream(nextFrame, true), nextFrame.readableBytes());
175 } finally {
176 // Don't close the frame, since the listener is now responsible for the life-cycle.
177 nextFrame = null;
178 }
179 }
180
181 /**
182 * Processes the payload of a status frame.
183 */
184 private void processStatus() {
185 try {
186 int statusCode = nextFrame.readUnsignedShort();
187 Transport.Code code = Transport.Code.valueOf(statusCode);
188 notifyStatus(code != null ? new Status(code)
189 : new Status(Transport.Code.UNKNOWN, "Unknown status code " + statusCode));
190 } finally {
191 nextFrame.close();
192 nextFrame = null;
193 }
194 }
195
196 /**
197 * Delivers the status notification to the listener.
198 */
199 private void notifyStatus(Status status) {
200 statusNotified = true;
201 listener.onStatus(status);
202 }
203}