blob: 06b26e50a6511f0bcc1388f70120fb385996b2bb [file] [log] [blame]
ejona7f23d5c2014-06-18 13:23:32 -07001package com.google.net.stubby.newtransport;
2
3import com.google.common.io.ByteStreams;
4import com.google.net.stubby.GrpcFramingUtil;
5import com.google.net.stubby.Operation;
6import com.google.net.stubby.Status;
7import com.google.net.stubby.transport.Transport;
8
9import java.io.ByteArrayInputStream;
10import java.io.DataInputStream;
11import java.io.IOException;
12import java.io.InputStream;
13
14/**
15 * Base implementation that joins a sequence of framed GRPC data produced by a {@link Framer},
16 * reconstructs their messages and hands them off to a receiving {@link Operation}
17 */
18public abstract class Deframer<F> implements Framer.Sink<F> {
19
20 /**
21 * Unset frame length
22 */
23 private static final int LENGTH_NOT_SET = -1;
24
25 private final Framer target;
26 private boolean inFrame;
27 private byte currentFlags;
28 private int currentLength = LENGTH_NOT_SET;
29
30 public Deframer(Framer target) {
31 this.target = target;
32 }
33
34 @Override
35 public void deliverFrame(F frame, boolean endOfStream) {
36 int remaining = internalDeliverFrame(frame);
37 if (endOfStream && remaining > 0) {
38 target.writeStatus(new Status(Transport.Code.UNKNOWN, "EOF on incomplete frame"));
39 }
40 }
41
42 /**
43 * Consume a frame of bytes provided by the transport. Note that transport framing is not
44 * aligned on GRPC frame boundaries so this code needs to do bounds checking and buffering
45 * across transport frame boundaries.
46 *
47 * @return the number of unconsumed bytes remaining in the buffer
48 */
49 private int internalDeliverFrame(F frame) {
50 try {
51 frame = decompress(frame);
52 DataInputStream grpcStream = prefix(frame);
53 // Loop until no more GRPC frames can be fully decoded
54 while (true) {
55 if (!inFrame) {
56 // Not in frame so attempt to read flags
57 if (!ensure(grpcStream, GrpcFramingUtil.FRAME_TYPE_LENGTH)) {
58 return consolidate();
59 }
60 currentFlags = grpcStream.readByte();
61 inFrame = true;
62 }
63 if (currentLength == LENGTH_NOT_SET) {
64 // Read the frame length
65 if (!ensure(grpcStream, GrpcFramingUtil.FRAME_LENGTH)) {
66 return consolidate();
67 }
68 currentLength = grpcStream.readInt();
69 }
70 // Ensure that the entire frame length is available to read
71 InputStream framedChunk = ensureMessage(grpcStream, currentLength);
72 if (framedChunk == null) {
73 // Insufficient bytes available
74 return consolidate();
75 }
76 if (GrpcFramingUtil.isPayloadFrame(currentFlags)) {
77 // Advance stream now, because target.addPayload() may not or may process the frame on
78 // another thread.
79 framedChunk = new ByteArrayInputStream(ByteStreams.toByteArray(framedChunk));
80 try {
81 // Report payload to the receiving operation
82 target.writePayload(framedChunk, currentLength);
83 } finally {
84 currentLength = LENGTH_NOT_SET;
85 inFrame = false;
86 }
87 } else if (GrpcFramingUtil.isContextValueFrame(currentFlags)) {
88 // Not clear if using proto encoding here is of any benefit.
89 // Using ContextValue.parseFrom requires copying out of the framed chunk
90 // Writing a custom parser would have to do varint handling and potentially
91 // deal with out-of-order tags etc.
92 Transport.ContextValue contextValue = Transport.ContextValue.parseFrom(framedChunk);
93 try {
94 target.writeContext(contextValue.getKey(),
95 contextValue.getValue().newInput(), currentLength);
96 } finally {
97 currentLength = LENGTH_NOT_SET;
98 inFrame = false;
99 }
100 } else if (GrpcFramingUtil.isStatusFrame(currentFlags)) {
101 int status = framedChunk.read() << 8 | framedChunk.read();
102 Transport.Code code = Transport.Code.valueOf(status);
103 // TODO(user): Resolve what to do with remainder of framedChunk
104 try {
105 if (code == null) {
106 // Log for unknown code
107 target.writeStatus(
108 new Status(Transport.Code.UNKNOWN, "Unknown status code " + status));
109 } else {
110 target.writeStatus(new Status(code));
111 }
112 } finally {
113 currentLength = LENGTH_NOT_SET;
114 inFrame = false;
115 }
116 }
117 if (grpcStream.available() == 0) {
118 // We've processed all the data so consolidate the underlying buffers
119 return consolidate();
120 }
121 }
122 } catch (IOException ioe) {
123 Status status = new Status(Transport.Code.UNKNOWN, ioe);
124 target.writeStatus(status);
125 throw status.asRuntimeException();
126 }
127 }
128
129 /**
130 * Return a stream view over the current buffer prefixed to the input frame
131 */
132 protected abstract DataInputStream prefix(F frame) throws IOException;
133
134 /**
135 * Consolidate the underlying buffers and return the number of buffered bytes remaining
136 */
137 protected abstract int consolidate() throws IOException;
138
139 /**
140 * Decompress the raw frame buffer prior to prefixing it.
141 */
142 protected abstract F decompress(F frame) throws IOException;
143
144 /**
145 * Ensure that {@code len} bytes are available in the buffer and frame
146 */
147 private boolean ensure(InputStream input, int len) throws IOException {
148 return (input.available() >= len);
149 }
150
151 /**
152 * Return a message of {@code len} bytes than can be read from the buffer. If sufficient
153 * bytes are unavailable then buffer the available bytes and return null.
154 */
155 private InputStream ensureMessage(InputStream input, int len)
156 throws IOException {
157 if (input.available() < len) {
158 return null;
159 }
160 return ByteStreams.limit(input, len);
161 }
162}