blob: 8959408007c0001829932a7b15f7405639334c51 [file] [log] [blame]
/*
 * Copyright 2014, Google Inc. All rights reserved.
 *
 * Redistribution and use in source and binary forms, with or without
 * modification, are permitted provided that the following conditions are
 * met:
 *
 * * Redistributions of source code must retain the above copyright
 * notice, this list of conditions and the following disclaimer.
 * * Redistributions in binary form must reproduce the above
 * copyright notice, this list of conditions and the following disclaimer
 * in the documentation and/or other materials provided with the
 * distribution.
 *
 * * Neither the name of Google Inc. nor the names of its
 * contributors may be used to endorse or promote products derived from
 * this software without specific prior written permission.
 *
 * THIS SOFTWARE IS PROVIDED BY THE COPYRIGHT HOLDERS AND CONTRIBUTORS
 * "AS IS" AND ANY EXPRESS OR IMPLIED WARRANTIES, INCLUDING, BUT NOT
 * LIMITED TO, THE IMPLIED WARRANTIES OF MERCHANTABILITY AND FITNESS FOR
 * A PARTICULAR PURPOSE ARE DISCLAIMED. IN NO EVENT SHALL THE COPYRIGHT
 * OWNER OR CONTRIBUTORS BE LIABLE FOR ANY DIRECT, INDIRECT, INCIDENTAL,
 * SPECIAL, EXEMPLARY, OR CONSEQUENTIAL DAMAGES (INCLUDING, BUT NOT
 * LIMITED TO, PROCUREMENT OF SUBSTITUTE GOODS OR SERVICES; LOSS OF USE,
 * DATA, OR PROFITS; OR BUSINESS INTERRUPTION) HOWEVER CAUSED AND ON ANY
 * THEORY OF LIABILITY, WHETHER IN CONTRACT, STRICT LIABILITY, OR TORT
 * (INCLUDING NEGLIGENCE OR OTHERWISE) ARISING IN ANY WAY OUT OF THE USE
 * OF THIS SOFTWARE, EVEN IF ADVISED OF THE POSSIBILITY OF SUCH DAMAGE.
 */
31
nathanmittler29cbef12014-10-27 11:33:19 -070032package com.google.net.stubby.transport;
nathanmittlerc0a06812014-07-11 14:57:47 -070033
lryanc5e70c22014-11-24 16:41:02 -080034import com.google.common.annotations.VisibleForTesting;
35import com.google.common.base.MoreObjects;
nathanmittlerc0a06812014-07-11 14:57:47 -070036import com.google.common.base.Preconditions;
nathanmittlerc0a06812014-07-11 14:57:47 -070037
38import java.io.InputStream;
39import java.nio.ByteBuffer;
40
41import javax.annotation.Nullable;
42
43/**
44 * Abstract base class for {@link Stream} implementations.
45 */
lryan669724a2014-11-10 10:21:45 -080046public abstract class AbstractStream<IdT> implements Stream {
ejona913de472014-09-18 07:48:43 -070047 /**
nathanmittlerc0a06812014-07-11 14:57:47 -070048 * Indicates the phase of the GRPC stream in one direction.
49 */
50 protected enum Phase {
lryana7d735e2014-09-09 19:25:43 -070051 HEADERS, MESSAGE, STATUS
nathanmittlerc0a06812014-07-11 14:57:47 -070052 }
53
lryan669724a2014-11-10 10:21:45 -080054 private volatile IdT id;
simonmaf589a4a2015-01-15 16:23:32 -080055 private final MessageFramer framer;
nathanmittler5d953e82014-11-18 13:12:42 -080056
simonmaf589a4a2015-01-15 16:23:32 -080057 final MessageDeframer deframer;
lryanc5e70c22014-11-24 16:41:02 -080058
59 /**
60 * Inbound phase is exclusively written to by the transport thread.
61 */
62 private Phase inboundPhase = Phase.HEADERS;
63
64 /**
65 * Outbound phase is exclusively written to by the application thread.
66 */
67 private Phase outboundPhase = Phase.HEADERS;
nathanmittlerc0a06812014-07-11 14:57:47 -070068
nmittlerde3a1312015-01-16 11:54:24 -080069 AbstractStream() {
simonmaf589a4a2015-01-15 16:23:32 -080070 MessageDeframer.Listener inboundMessageHandler = new MessageDeframer.Listener() {
nathanmittler11c363a2015-01-09 11:22:19 -080071 @Override
72 public void bytesRead(int numBytes) {
73 returnProcessedBytes(numBytes);
74 }
75
lryan669724a2014-11-10 10:21:45 -080076 @Override
nmittlerde3a1312015-01-16 11:54:24 -080077 public void messageRead(InputStream input, final int length) {
78 receiveMessage(input, length);
nathanmittlerc0a06812014-07-11 14:57:47 -070079 }
nathanmittlerc0a06812014-07-11 14:57:47 -070080
lryan669724a2014-11-10 10:21:45 -080081 @Override
nathanmittler11c363a2015-01-09 11:22:19 -080082 public void deliveryStalled() {
83 inboundDeliveryPaused();
84 }
85
86 @Override
lryan669724a2014-11-10 10:21:45 -080087 public void endOfStream() {
88 remoteEndClosed();
89 }
90 };
simonmaf589a4a2015-01-15 16:23:32 -080091 MessageFramer.Sink<ByteBuffer> outboundFrameHandler = new MessageFramer.Sink<ByteBuffer>() {
lryan669724a2014-11-10 10:21:45 -080092 @Override
93 public void deliverFrame(ByteBuffer frame, boolean endOfStream) {
94 internalSendFrame(frame, endOfStream);
95 }
96 };
nathanmittler5d953e82014-11-18 13:12:42 -080097
simonmaf589a4a2015-01-15 16:23:32 -080098 framer = new MessageFramer(outboundFrameHandler, 4096);
nmittlerde3a1312015-01-16 11:54:24 -080099 this.deframer = new MessageDeframer(inboundMessageHandler);
nathanmittlerc0a06812014-07-11 14:57:47 -0700100 }
101
nathanmittlerc0a06812014-07-11 14:57:47 -0700102 /**
lryan669724a2014-11-10 10:21:45 -0800103 * Returns the internal id for this stream. Note that Id can be {@code null} for client streams
104 * as the transport may defer creating the stream to the remote side until is has payload or
105 * metadata to send.
106 */
107 @Nullable
108 public IdT id() {
109 return id;
110 }
111
112 /**
113 * Set the internal id for this stream
114 */
115 public void id(IdT id) {
116 Preconditions.checkState(id != null, "Can only set id once");
117 this.id = id;
118 }
119
nathanmittlerc0a06812014-07-11 14:57:47 -0700120 @Override
lryan28497e32014-10-17 16:14:38 -0700121 public void writeMessage(InputStream message, int length, @Nullable Runnable accepted) {
nathanmittlerc0a06812014-07-11 14:57:47 -0700122 Preconditions.checkNotNull(message, "message");
123 Preconditions.checkArgument(length >= 0, "length must be >= 0");
124 outboundPhase(Phase.MESSAGE);
lryanc5e70c22014-11-24 16:41:02 -0800125 if (!framer.isClosed()) {
126 framer.writePayload(message, length);
nathanmittlerc0a06812014-07-11 14:57:47 -0700127 }
128
ejona4de20262015-01-16 16:17:35 -0800129 // TODO(nathanmittler): add flow control.
nathanmittlerc0a06812014-07-11 14:57:47 -0700130 if (accepted != null) {
131 accepted.run();
132 }
133 }
134
135 @Override
136 public final void flush() {
lryanc5e70c22014-11-24 16:41:02 -0800137 if (!framer.isClosed()) {
138 framer.flush();
nathanmittlerc0a06812014-07-11 14:57:47 -0700139 }
140 }
141
142 /**
lryanc5e70c22014-11-24 16:41:02 -0800143 * Closes the underlying framer.
144 *
145 * <p>No-op if the framer has already been closed.
lryanc5e70c22014-11-24 16:41:02 -0800146 */
ejonac0f41922014-12-23 12:16:17 -0800147 final void closeFramer() {
lryanc5e70c22014-11-24 16:41:02 -0800148 if (!framer.isClosed()) {
lryanc5e70c22014-11-24 16:41:02 -0800149 framer.close();
150 }
151 }
152
153 /**
154 * Free any resources associated with this stream. Subclass implementations must call this
155 * version.
156 * <p>
157 * NOTE. Can be called by both the transport thread and the application thread. Transport
158 * threads need to dispose when the remote side has terminated the stream. Application threads
159 * will dispose when the application decides to close the stream as part of normal processing.
160 * </p>
161 */
162 public void dispose() {
163 framer.dispose();
164 }
165
166 /**
zhangkun048649e2014-08-28 15:52:03 -0700167 * Sends an outbound frame to the remote end point.
nathanmittlerc0a06812014-07-11 14:57:47 -0700168 *
169 * @param frame a buffer containing the chunk of data to be sent.
170 * @param endOfStream if {@code true} indicates that no more data will be sent on the stream by
171 * this endpoint.
172 */
ejona9d502992014-09-22 12:23:19 -0700173 protected abstract void internalSendFrame(ByteBuffer frame, boolean endOfStream);
nathanmittlerc0a06812014-07-11 14:57:47 -0700174
ejona9d502992014-09-22 12:23:19 -0700175 /** A message was deframed. */
nmittlerde3a1312015-01-16 11:54:24 -0800176 protected abstract void receiveMessage(InputStream is, int length);
ejona9d502992014-09-22 12:23:19 -0700177
nathanmittler11c363a2015-01-09 11:22:19 -0800178 /** Deframer has no pending deliveries. */
179 protected abstract void inboundDeliveryPaused();
180
ejona9d502992014-09-22 12:23:19 -0700181 /** Deframer reached end of stream. */
182 protected abstract void remoteEndClosed();
zhangkun048649e2014-08-28 15:52:03 -0700183
184 /**
nathanmittler5d953e82014-11-18 13:12:42 -0800185 * Returns the given number of processed bytes back to inbound flow control to enable receipt of
186 * more data.
nathanmittler43d2fcc2014-08-29 21:58:54 -0700187 */
nathanmittler5d953e82014-11-18 13:12:42 -0800188 protected abstract void returnProcessedBytes(int processedBytes);
189
190 /**
191 * Called when a {@link #deframe(Buffer, boolean)} operation failed.
192 */
193 protected abstract void deframeFailed(Throwable cause);
194
195 /**
196 * Called to parse a received frame and attempt delivery of any completed
nmittlerde3a1312015-01-16 11:54:24 -0800197 * messages. Must be called from the transport thread.
nathanmittler5d953e82014-11-18 13:12:42 -0800198 */
199 protected final void deframe(Buffer frame, boolean endOfStream) {
nmittlerde3a1312015-01-16 11:54:24 -0800200 try {
201 deframer.deframe(frame, endOfStream);
202 } catch (Throwable t) {
203 deframeFailed(t);
nathanmittler5d953e82014-11-18 13:12:42 -0800204 }
205 }
206
207 /**
nmittlerde3a1312015-01-16 11:54:24 -0800208 * Called to request the given number of messages from the deframer. Must be called
209 * from the transport thread.
nathanmittler5d953e82014-11-18 13:12:42 -0800210 */
nmittlerde3a1312015-01-16 11:54:24 -0800211 protected final void requestMessagesFromDeframer(int numMessages) {
212 try {
213 deframer.request(numMessages);
214 } catch (Throwable t) {
215 deframeFailed(t);
nathanmittler5d953e82014-11-18 13:12:42 -0800216 }
217 }
nathanmittler43d2fcc2014-08-29 21:58:54 -0700218
lryanc5e70c22014-11-24 16:41:02 -0800219 final Phase inboundPhase() {
220 return inboundPhase;
221 }
222
nathanmittler43d2fcc2014-08-29 21:58:54 -0700223 /**
lryanc5e70c22014-11-24 16:41:02 -0800224 * Transitions the inbound phase to the given phase and returns the previous phase.
225 * If the transition is disallowed, throws an {@link IllegalStateException}.
nathanmittlerc0a06812014-07-11 14:57:47 -0700226 */
lryanc5e70c22014-11-24 16:41:02 -0800227 final Phase inboundPhase(Phase nextPhase) {
228 Phase tmp = inboundPhase;
nathanmittlerc0a06812014-07-11 14:57:47 -0700229 inboundPhase = verifyNextPhase(inboundPhase, nextPhase);
lryanc5e70c22014-11-24 16:41:02 -0800230 return tmp;
231 }
232
233 final Phase outboundPhase() {
234 return outboundPhase;
nathanmittlerc0a06812014-07-11 14:57:47 -0700235 }
236
237 /**
lryanc5e70c22014-11-24 16:41:02 -0800238 * Transitions the outbound phase to the given phase and returns the previous phase.
239 * If the transition is disallowed, throws an {@link IllegalStateException}.
nathanmittlerc0a06812014-07-11 14:57:47 -0700240 */
lryanc5e70c22014-11-24 16:41:02 -0800241 final Phase outboundPhase(Phase nextPhase) {
242 Phase tmp = outboundPhase;
nathanmittlerc0a06812014-07-11 14:57:47 -0700243 outboundPhase = verifyNextPhase(outboundPhase, nextPhase);
lryanc5e70c22014-11-24 16:41:02 -0800244 return tmp;
zhangkun048649e2014-08-28 15:52:03 -0700245 }
246
nathanmittlerc0a06812014-07-11 14:57:47 -0700247 private Phase verifyNextPhase(Phase currentPhase, Phase nextPhase) {
lryanc5e70c22014-11-24 16:41:02 -0800248 if (nextPhase.ordinal() < currentPhase.ordinal()) {
nathanmittlerc0a06812014-07-11 14:57:47 -0700249 throw new IllegalStateException(
250 String.format("Cannot transition phase from %s to %s", currentPhase, nextPhase));
251 }
252 return nextPhase;
253 }
254
255 /**
lryanc5e70c22014-11-24 16:41:02 -0800256 * Can the stream receive data from its remote peer.
257 */
258 public boolean canReceive() {
259 return inboundPhase() != Phase.STATUS;
260 }
261
262 /**
263 * Can the stream send data to its remote peer.
264 */
265 public boolean canSend() {
266 return outboundPhase() != Phase.STATUS;
267 }
268
269 /**
270 * Is the stream fully closed. Note that this method is not thread-safe as inboundPhase and
271 * outboundPhase are mutated in different threads. Tests must account for thread coordination
272 * when calling.
273 */
274 @VisibleForTesting
275 public boolean isClosed() {
276 return inboundPhase() == Phase.STATUS && outboundPhase() == Phase.STATUS;
277 }
278
nathanmittler11c363a2015-01-09 11:22:19 -0800279 @Override
lryanc5e70c22014-11-24 16:41:02 -0800280 public String toString() {
281 return toStringHelper().toString();
282 }
283
284 protected MoreObjects.ToStringHelper toStringHelper() {
285 return MoreObjects.toStringHelper(this)
286 .add("id", id())
287 .add("inboundPhase", inboundPhase().name())
288 .add("outboundPhase", outboundPhase().name());
289
290 }
nathanmittlerc0a06812014-07-11 14:57:47 -0700291}