blob: c08fa81d41bb9f230c5ff87fd46158b8e657e120 [file] [log] [blame]
nathanmittler64eae042014-08-11 13:29:43 -07001package com.google.net.stubby.newtransport;
2
3import static com.google.net.stubby.GrpcFramingUtil.CONTEXT_VALUE_FRAME;
4import static com.google.net.stubby.GrpcFramingUtil.FRAME_LENGTH;
5import static com.google.net.stubby.GrpcFramingUtil.FRAME_TYPE_LENGTH;
6import static com.google.net.stubby.GrpcFramingUtil.FRAME_TYPE_MASK;
7import static com.google.net.stubby.GrpcFramingUtil.PAYLOAD_FRAME;
8import static com.google.net.stubby.GrpcFramingUtil.STATUS_FRAME;
9
10import com.google.common.base.Preconditions;
nathanmittler43d2fcc2014-08-29 21:58:54 -070011import com.google.common.util.concurrent.ListenableFuture;
nathanmittler64eae042014-08-11 13:29:43 -070012import com.google.net.stubby.Status;
13import com.google.net.stubby.transport.Transport;
14
15import java.io.Closeable;
16import java.io.IOException;
nathanmittler43d2fcc2014-08-29 21:58:54 -070017import java.util.concurrent.Executor;
nathanmittler64eae042014-08-11 13:29:43 -070018
19/**
20 * Deframer for GRPC frames. Delegates deframing/decompression of the GRPC compression frame to a
21 * {@link Decompressor}.
22 */
23public class GrpcDeframer implements Closeable {
nathanmittler43d2fcc2014-08-29 21:58:54 -070024
nathanmittler64eae042014-08-11 13:29:43 -070025 private enum State {
26 HEADER, BODY
27 }
28
29 private static final int HEADER_LENGTH = FRAME_TYPE_LENGTH + FRAME_LENGTH;
30 private final Decompressor decompressor;
nathanmittler43d2fcc2014-08-29 21:58:54 -070031 private final Executor executor;
32 private final Runnable deliveryTask;
nathanmittler64eae042014-08-11 13:29:43 -070033 private State state = State.HEADER;
34 private int requiredLength = HEADER_LENGTH;
35 private int frameType;
36 private boolean statusNotified;
nathanmittler43d2fcc2014-08-29 21:58:54 -070037 private boolean endOfStream;
38 private boolean deliveryOutstanding;
39 private StreamListener listener;
nathanmittler64eae042014-08-11 13:29:43 -070040 private CompositeBuffer nextFrame;
41
nathanmittler43d2fcc2014-08-29 21:58:54 -070042 /**
43 * Constructs the deframer.
44 *
45 * @param decompressor the object used for de-framing GRPC compression frames.
46 * @param listener the listener for fully read GRPC messages.
47 * @param executor the executor to be used for delivery. All calls to
48 * {@link #deframe(Buffer, boolean)} must be made in the context of this executor. This
49 * executor must not allow concurrent access to this class, so it must be either a single
50 * thread or have sequential processing of events.
51 */
52 public GrpcDeframer(Decompressor decompressor, StreamListener listener, Executor executor) {
nathanmittler64eae042014-08-11 13:29:43 -070053 this.decompressor = Preconditions.checkNotNull(decompressor, "decompressor");
54 this.listener = Preconditions.checkNotNull(listener, "listener");
nathanmittler43d2fcc2014-08-29 21:58:54 -070055 this.executor = Preconditions.checkNotNull(executor, "executor");
56 deliveryTask = new Runnable() {
57 @Override
58 public void run() {
59 deliveryOutstanding = false;
60 deliver();
61 }
62 };
nathanmittler64eae042014-08-11 13:29:43 -070063 }
64
nathanmittler43d2fcc2014-08-29 21:58:54 -070065 /**
66 * Adds the given data to this deframer and attempts delivery to the listener.
67 */
nathanmittler64eae042014-08-11 13:29:43 -070068 public void deframe(Buffer data, boolean endOfStream) {
69 Preconditions.checkNotNull(data, "data");
70
71 // Add the data to the decompression buffer.
72 decompressor.decompress(data);
73
nathanmittler43d2fcc2014-08-29 21:58:54 -070074 // Indicate that all of the data for this stream has been received.
75 this.endOfStream = endOfStream;
76
77 // Deliver the next message if not already delivering.
78 deliver();
79 }
80
81 @Override
82 public void close() {
83 decompressor.close();
84 if (nextFrame != null) {
85 nextFrame.close();
86 }
87 }
88
89 /**
90 * If there is no outstanding delivery, attempts to read and deliver as many messages to the
91 * listener as possible. Only one outstanding delivery is allowed at a time.
92 */
93 private void deliver() {
94 if (deliveryOutstanding) {
95 // Only allow one outstanding delivery at a time.
96 return;
97 }
98
nathanmittler64eae042014-08-11 13:29:43 -070099 // Process the uncompressed bytes.
100 while (readRequiredBytes()) {
101 if (statusNotified) {
102 throw new IllegalStateException("Inbound data after receiving status frame");
103 }
104
105 switch (state) {
106 case HEADER:
107 processHeader();
108 break;
109 case BODY:
nathanmittler43d2fcc2014-08-29 21:58:54 -0700110 // Read the body and deliver the message to the listener.
111 deliveryOutstanding = true;
112 ListenableFuture<Void> processingFuture = processBody();
113 if (processingFuture != null) {
114 // A listener was returned for the completion of processing the delivered
115 // message. Once it's done, try to deliver the next message.
116 processingFuture.addListener(deliveryTask, executor);
117 return;
118 }
119
120 // No future was returned, so assume processing is complete for the delivery.
121 deliveryOutstanding = false;
nathanmittler64eae042014-08-11 13:29:43 -0700122 break;
123 default:
124 throw new AssertionError("Invalid state: " + state);
125 }
126 }
127
128 // If reached the end of stream without reading a status frame, fabricate one
129 // and deliver to the target.
130 if (!statusNotified && endOfStream) {
131 notifyStatus(Status.OK);
132 }
133 }
134
nathanmittler64eae042014-08-11 13:29:43 -0700135 /**
136 * Attempts to read the required bytes into nextFrame.
137 *
138 * @returns {@code true} if all of the required bytes have been read.
139 */
140 private boolean readRequiredBytes() {
141 if (nextFrame == null) {
142 nextFrame = new CompositeBuffer();
143 }
144
145 // Read until the buffer contains all the required bytes.
146 int missingBytes;
147 while ((missingBytes = requiredLength - nextFrame.readableBytes()) > 0) {
148 Buffer buffer = decompressor.readBytes(missingBytes);
149 if (buffer == null) {
150 // No more data is available.
151 break;
152 }
153 // Add it to the composite buffer for the next frame.
154 nextFrame.addBuffer(buffer);
155 }
156
157 // Return whether or not all of the required bytes are now in the frame.
158 return nextFrame.readableBytes() == requiredLength;
159 }
160
161 /**
162 * Processes the GRPC compression header which is composed of the compression flag and the outer
163 * frame length.
164 */
165 private void processHeader() {
166 // Peek, but do not read the header.
167 frameType = nextFrame.readUnsignedByte() & FRAME_TYPE_MASK;
168
169 // Update the required length to include the length of the frame.
170 requiredLength = nextFrame.readInt();
171
172 // Continue reading the frame body.
173 state = State.BODY;
174 }
175
176 /**
177 * Processes the body of the GRPC compression frame. A single compression frame may contain
178 * several GRPC messages within it.
179 */
nathanmittler43d2fcc2014-08-29 21:58:54 -0700180 private ListenableFuture<Void> processBody() {
181 ListenableFuture<Void> future = null;
nathanmittler64eae042014-08-11 13:29:43 -0700182 switch (frameType) {
183 case CONTEXT_VALUE_FRAME:
nathanmittler43d2fcc2014-08-29 21:58:54 -0700184 future = processContext();
nathanmittler64eae042014-08-11 13:29:43 -0700185 break;
186 case PAYLOAD_FRAME:
nathanmittler43d2fcc2014-08-29 21:58:54 -0700187 future = processMessage();
nathanmittler64eae042014-08-11 13:29:43 -0700188 break;
189 case STATUS_FRAME:
190 processStatus();
191 break;
192 default:
193 throw new AssertionError("Invalid frameType: " + frameType);
194 }
195
196 // Done with this frame, begin processing the next header.
197 state = State.HEADER;
198 requiredLength = HEADER_LENGTH;
nathanmittler43d2fcc2014-08-29 21:58:54 -0700199 return future;
nathanmittler64eae042014-08-11 13:29:43 -0700200 }
201
202 /**
203 * Processes the payload of a context frame.
204 */
nathanmittler43d2fcc2014-08-29 21:58:54 -0700205 private ListenableFuture<Void> processContext() {
nathanmittler64eae042014-08-11 13:29:43 -0700206 Transport.ContextValue ctx;
207 try {
208 // Not clear if using proto encoding here is of any benefit.
209 // Using ContextValue.parseFrom requires copying out of the framed chunk
210 // Writing a custom parser would have to do varint handling and potentially
211 // deal with out-of-order tags etc.
212 ctx = Transport.ContextValue.parseFrom(Buffers.openStream(nextFrame, false));
213 } catch (IOException e) {
214 throw new RuntimeException(e);
215 } finally {
216 nextFrame.close();
217 nextFrame = null;
218 }
219
220 // Call the handler.
221 Buffer ctxBuffer = Buffers.wrap(ctx.getValue());
nathanmittler43d2fcc2014-08-29 21:58:54 -0700222 return listener.contextRead(ctx.getKey(), Buffers.openStream(ctxBuffer, true),
nathanmittler64eae042014-08-11 13:29:43 -0700223 ctxBuffer.readableBytes());
224 }
225
226 /**
227 * Processes the payload of a message frame.
228 */
nathanmittler43d2fcc2014-08-29 21:58:54 -0700229 private ListenableFuture<Void> processMessage() {
nathanmittler64eae042014-08-11 13:29:43 -0700230 try {
nathanmittler43d2fcc2014-08-29 21:58:54 -0700231 return listener.messageRead(Buffers.openStream(nextFrame, true), nextFrame.readableBytes());
nathanmittler64eae042014-08-11 13:29:43 -0700232 } finally {
233 // Don't close the frame, since the listener is now responsible for the life-cycle.
234 nextFrame = null;
235 }
236 }
237
238 /**
239 * Processes the payload of a status frame.
240 */
241 private void processStatus() {
242 try {
243 int statusCode = nextFrame.readUnsignedShort();
244 Transport.Code code = Transport.Code.valueOf(statusCode);
245 notifyStatus(code != null ? new Status(code)
246 : new Status(Transport.Code.UNKNOWN, "Unknown status code " + statusCode));
247 } finally {
248 nextFrame.close();
249 nextFrame = null;
250 }
251 }
252
253 /**
254 * Delivers the status notification to the listener.
255 */
256 private void notifyStatus(Status status) {
257 statusNotified = true;
nathanmittler43d2fcc2014-08-29 21:58:54 -0700258 listener.closed(status);
nathanmittler64eae042014-08-11 13:29:43 -0700259 }
260}