blob: f709a5d287d366f65b0d4c8c9f21dff434d66199 [file] [log] [blame]
ejona7f23d5c2014-06-18 13:23:32 -07001package com.google.net.stubby.newtransport;
2
3import com.google.common.io.ByteStreams;
4import com.google.net.stubby.GrpcFramingUtil;
5import com.google.net.stubby.Operation;
6import com.google.net.stubby.Status;
7import com.google.net.stubby.transport.Transport;
nathanmittlercc7cdb12014-07-11 12:00:32 -07008import com.google.protobuf.ByteString;
ejona7f23d5c2014-06-18 13:23:32 -07009
10import java.io.ByteArrayInputStream;
11import java.io.DataInputStream;
12import java.io.IOException;
13import java.io.InputStream;
14
15/**
16 * Base implementation that joins a sequence of framed GRPC data produced by a {@link Framer},
17 * reconstructs their messages and hands them off to a receiving {@link Operation}
18 */
19public abstract class Deframer<F> implements Framer.Sink<F> {
20
21 /**
22 * Unset frame length
23 */
24 private static final int LENGTH_NOT_SET = -1;
25
nathanmittler43d2fcc2014-08-29 21:58:54 -070026 private final StreamListener target;
ejona7f23d5c2014-06-18 13:23:32 -070027 private boolean inFrame;
28 private byte currentFlags;
29 private int currentLength = LENGTH_NOT_SET;
nathanmittlercc7cdb12014-07-11 12:00:32 -070030 private boolean statusDelivered;
ejona7f23d5c2014-06-18 13:23:32 -070031
nathanmittler43d2fcc2014-08-29 21:58:54 -070032 public Deframer(StreamListener target) {
ejona7f23d5c2014-06-18 13:23:32 -070033 this.target = target;
34 }
35
36 @Override
37 public void deliverFrame(F frame, boolean endOfStream) {
38 int remaining = internalDeliverFrame(frame);
nathanmittlercc7cdb12014-07-11 12:00:32 -070039 if (endOfStream) {
40 if (remaining > 0) {
41 writeStatus(new Status(Transport.Code.UNKNOWN, "EOF on incomplete frame"));
42 } else if (!statusDelivered) {
43 writeStatus(Status.OK);
44 }
ejona7f23d5c2014-06-18 13:23:32 -070045 }
46 }
47
48 /**
49 * Consume a frame of bytes provided by the transport. Note that transport framing is not
50 * aligned on GRPC frame boundaries so this code needs to do bounds checking and buffering
51 * across transport frame boundaries.
52 *
53 * @return the number of unconsumed bytes remaining in the buffer
54 */
55 private int internalDeliverFrame(F frame) {
56 try {
57 frame = decompress(frame);
58 DataInputStream grpcStream = prefix(frame);
59 // Loop until no more GRPC frames can be fully decoded
60 while (true) {
61 if (!inFrame) {
62 // Not in frame so attempt to read flags
63 if (!ensure(grpcStream, GrpcFramingUtil.FRAME_TYPE_LENGTH)) {
64 return consolidate();
65 }
66 currentFlags = grpcStream.readByte();
67 inFrame = true;
68 }
69 if (currentLength == LENGTH_NOT_SET) {
70 // Read the frame length
71 if (!ensure(grpcStream, GrpcFramingUtil.FRAME_LENGTH)) {
72 return consolidate();
73 }
74 currentLength = grpcStream.readInt();
75 }
76 // Ensure that the entire frame length is available to read
77 InputStream framedChunk = ensureMessage(grpcStream, currentLength);
78 if (framedChunk == null) {
79 // Insufficient bytes available
80 return consolidate();
81 }
82 if (GrpcFramingUtil.isPayloadFrame(currentFlags)) {
83 // Advance stream now, because target.addPayload() may not or may process the frame on
84 // another thread.
85 framedChunk = new ByteArrayInputStream(ByteStreams.toByteArray(framedChunk));
86 try {
87 // Report payload to the receiving operation
nathanmittler43d2fcc2014-08-29 21:58:54 -070088 target.messageRead(framedChunk, currentLength);
ejona7f23d5c2014-06-18 13:23:32 -070089 } finally {
90 currentLength = LENGTH_NOT_SET;
91 inFrame = false;
92 }
93 } else if (GrpcFramingUtil.isContextValueFrame(currentFlags)) {
94 // Not clear if using proto encoding here is of any benefit.
95 // Using ContextValue.parseFrom requires copying out of the framed chunk
96 // Writing a custom parser would have to do varint handling and potentially
97 // deal with out-of-order tags etc.
98 Transport.ContextValue contextValue = Transport.ContextValue.parseFrom(framedChunk);
99 try {
nathanmittlercc7cdb12014-07-11 12:00:32 -0700100 ByteString value = contextValue.getValue();
nathanmittler43d2fcc2014-08-29 21:58:54 -0700101 target.contextRead(contextValue.getKey(), value.newInput(), value.size());
ejona7f23d5c2014-06-18 13:23:32 -0700102 } finally {
103 currentLength = LENGTH_NOT_SET;
104 inFrame = false;
105 }
106 } else if (GrpcFramingUtil.isStatusFrame(currentFlags)) {
107 int status = framedChunk.read() << 8 | framedChunk.read();
108 Transport.Code code = Transport.Code.valueOf(status);
109 // TODO(user): Resolve what to do with remainder of framedChunk
110 try {
111 if (code == null) {
112 // Log for unknown code
nathanmittlercc7cdb12014-07-11 12:00:32 -0700113 writeStatus(new Status(Transport.Code.UNKNOWN, "Unknown status code " + status));
ejona7f23d5c2014-06-18 13:23:32 -0700114 } else {
nathanmittlercc7cdb12014-07-11 12:00:32 -0700115 writeStatus(new Status(code));
ejona7f23d5c2014-06-18 13:23:32 -0700116 }
117 } finally {
118 currentLength = LENGTH_NOT_SET;
119 inFrame = false;
120 }
121 }
122 if (grpcStream.available() == 0) {
123 // We've processed all the data so consolidate the underlying buffers
124 return consolidate();
125 }
126 }
127 } catch (IOException ioe) {
128 Status status = new Status(Transport.Code.UNKNOWN, ioe);
nathanmittlercc7cdb12014-07-11 12:00:32 -0700129 writeStatus(status);
ejona7f23d5c2014-06-18 13:23:32 -0700130 throw status.asRuntimeException();
131 }
132 }
133
134 /**
135 * Return a stream view over the current buffer prefixed to the input frame
136 */
137 protected abstract DataInputStream prefix(F frame) throws IOException;
138
139 /**
140 * Consolidate the underlying buffers and return the number of buffered bytes remaining
141 */
142 protected abstract int consolidate() throws IOException;
143
144 /**
145 * Decompress the raw frame buffer prior to prefixing it.
146 */
147 protected abstract F decompress(F frame) throws IOException;
148
149 /**
150 * Ensure that {@code len} bytes are available in the buffer and frame
151 */
152 private boolean ensure(InputStream input, int len) throws IOException {
153 return (input.available() >= len);
154 }
155
nathanmittlercc7cdb12014-07-11 12:00:32 -0700156 private void writeStatus(Status status) {
nathanmittler43d2fcc2014-08-29 21:58:54 -0700157 target.closed(status);
nathanmittlercc7cdb12014-07-11 12:00:32 -0700158 statusDelivered = true;
159 }
160
ejona7f23d5c2014-06-18 13:23:32 -0700161 /**
162 * Return a message of {@code len} bytes than can be read from the buffer. If sufficient
163 * bytes are unavailable then buffer the available bytes and return null.
164 */
165 private InputStream ensureMessage(InputStream input, int len)
166 throws IOException {
167 if (input.available() < len) {
168 return null;
169 }
170 return ByteStreams.limit(input, len);
171 }
172}