blob: dd7d80fa1b8043dbab86ea95bd70bde7657db3d3 [file] [log] [blame]
ejona7f23d5c2014-06-18 13:23:32 -07001package com.google.net.stubby.newtransport;
2
3import com.google.common.io.ByteStreams;
4import com.google.net.stubby.GrpcFramingUtil;
5import com.google.net.stubby.Operation;
6import com.google.net.stubby.Status;
7import com.google.net.stubby.transport.Transport;
8
9import java.io.ByteArrayInputStream;
10import java.io.DataInputStream;
11import java.io.IOException;
12import java.io.InputStream;
13
14/**
15 * Base implementation that joins a sequence of framed GRPC data produced by a {@link Framer},
16 * reconstructs their messages and hands them off to a receiving {@link Operation}
17 */
18public abstract class Deframer<F> implements Framer.Sink<F> {
19
20 /**
21 * Unset frame length
22 */
23 private static final int LENGTH_NOT_SET = -1;
24
ejona9d502992014-09-22 12:23:19 -070025 private final GrpcDeframer.Sink target;
ejona7f23d5c2014-06-18 13:23:32 -070026 private boolean inFrame;
27 private byte currentFlags;
28 private int currentLength = LENGTH_NOT_SET;
nathanmittlercc7cdb12014-07-11 12:00:32 -070029 private boolean statusDelivered;
ejona7f23d5c2014-06-18 13:23:32 -070030
ejona9d502992014-09-22 12:23:19 -070031 public Deframer(GrpcDeframer.Sink target) {
ejona7f23d5c2014-06-18 13:23:32 -070032 this.target = target;
33 }
34
35 @Override
36 public void deliverFrame(F frame, boolean endOfStream) {
37 int remaining = internalDeliverFrame(frame);
nathanmittlercc7cdb12014-07-11 12:00:32 -070038 if (endOfStream) {
39 if (remaining > 0) {
40 writeStatus(new Status(Transport.Code.UNKNOWN, "EOF on incomplete frame"));
41 } else if (!statusDelivered) {
42 writeStatus(Status.OK);
43 }
ejona7f23d5c2014-06-18 13:23:32 -070044 }
45 }
46
47 /**
48 * Consume a frame of bytes provided by the transport. Note that transport framing is not
49 * aligned on GRPC frame boundaries so this code needs to do bounds checking and buffering
50 * across transport frame boundaries.
51 *
52 * @return the number of unconsumed bytes remaining in the buffer
53 */
54 private int internalDeliverFrame(F frame) {
55 try {
56 frame = decompress(frame);
57 DataInputStream grpcStream = prefix(frame);
58 // Loop until no more GRPC frames can be fully decoded
59 while (true) {
60 if (!inFrame) {
61 // Not in frame so attempt to read flags
62 if (!ensure(grpcStream, GrpcFramingUtil.FRAME_TYPE_LENGTH)) {
63 return consolidate();
64 }
65 currentFlags = grpcStream.readByte();
66 inFrame = true;
67 }
68 if (currentLength == LENGTH_NOT_SET) {
69 // Read the frame length
70 if (!ensure(grpcStream, GrpcFramingUtil.FRAME_LENGTH)) {
71 return consolidate();
72 }
73 currentLength = grpcStream.readInt();
74 }
75 // Ensure that the entire frame length is available to read
76 InputStream framedChunk = ensureMessage(grpcStream, currentLength);
77 if (framedChunk == null) {
78 // Insufficient bytes available
79 return consolidate();
80 }
81 if (GrpcFramingUtil.isPayloadFrame(currentFlags)) {
82 // Advance stream now, because target.addPayload() may not or may process the frame on
83 // another thread.
84 framedChunk = new ByteArrayInputStream(ByteStreams.toByteArray(framedChunk));
85 try {
86 // Report payload to the receiving operation
nathanmittler43d2fcc2014-08-29 21:58:54 -070087 target.messageRead(framedChunk, currentLength);
ejona7f23d5c2014-06-18 13:23:32 -070088 } finally {
89 currentLength = LENGTH_NOT_SET;
90 inFrame = false;
91 }
ejona7f23d5c2014-06-18 13:23:32 -070092 } else if (GrpcFramingUtil.isStatusFrame(currentFlags)) {
93 int status = framedChunk.read() << 8 | framedChunk.read();
94 Transport.Code code = Transport.Code.valueOf(status);
95 // TODO(user): Resolve what to do with remainder of framedChunk
96 try {
97 if (code == null) {
98 // Log for unknown code
nathanmittlercc7cdb12014-07-11 12:00:32 -070099 writeStatus(new Status(Transport.Code.UNKNOWN, "Unknown status code " + status));
ejona7f23d5c2014-06-18 13:23:32 -0700100 } else {
nathanmittlercc7cdb12014-07-11 12:00:32 -0700101 writeStatus(new Status(code));
ejona7f23d5c2014-06-18 13:23:32 -0700102 }
103 } finally {
104 currentLength = LENGTH_NOT_SET;
105 inFrame = false;
106 }
107 }
108 if (grpcStream.available() == 0) {
109 // We've processed all the data so consolidate the underlying buffers
110 return consolidate();
111 }
112 }
113 } catch (IOException ioe) {
114 Status status = new Status(Transport.Code.UNKNOWN, ioe);
nathanmittlercc7cdb12014-07-11 12:00:32 -0700115 writeStatus(status);
ejona7f23d5c2014-06-18 13:23:32 -0700116 throw status.asRuntimeException();
117 }
118 }
119
120 /**
121 * Return a stream view over the current buffer prefixed to the input frame
122 */
123 protected abstract DataInputStream prefix(F frame) throws IOException;
124
125 /**
126 * Consolidate the underlying buffers and return the number of buffered bytes remaining
127 */
128 protected abstract int consolidate() throws IOException;
129
130 /**
131 * Decompress the raw frame buffer prior to prefixing it.
132 */
133 protected abstract F decompress(F frame) throws IOException;
134
135 /**
136 * Ensure that {@code len} bytes are available in the buffer and frame
137 */
138 private boolean ensure(InputStream input, int len) throws IOException {
139 return (input.available() >= len);
140 }
141
nathanmittlercc7cdb12014-07-11 12:00:32 -0700142 private void writeStatus(Status status) {
ejona9d502992014-09-22 12:23:19 -0700143 target.statusRead(status);
144 target.endOfStream();
nathanmittlercc7cdb12014-07-11 12:00:32 -0700145 statusDelivered = true;
146 }
147
ejona7f23d5c2014-06-18 13:23:32 -0700148 /**
149 * Return a message of {@code len} bytes than can be read from the buffer. If sufficient
150 * bytes are unavailable then buffer the available bytes and return null.
151 */
152 private InputStream ensureMessage(InputStream input, int len)
153 throws IOException {
154 if (input.available() < len) {
155 return null;
156 }
157 return ByteStreams.limit(input, len);
158 }
159}