lryan | 56e307f | 2014-12-05 13:25:08 -0800 | [diff] [blame] | 1 | /* |
| 2 | * Copyright 2014, Google Inc. All rights reserved. |
| 3 | * |
| 4 | * Redistribution and use in source and binary forms, with or without |
| 5 | * modification, are permitted provided that the following conditions are |
| 6 | * met: |
| 7 | * |
| 8 | * * Redistributions of source code must retain the above copyright |
| 9 | * notice, this list of conditions and the following disclaimer. |
| 10 | * * Redistributions in binary form must reproduce the above |
| 11 | * copyright notice, this list of conditions and the following disclaimer |
| 12 | * in the documentation and/or other materials provided with the |
| 13 | * distribution. |
| 14 | * |
| 15 | * * Neither the name of Google Inc. nor the names of its |
| 16 | * contributors may be used to endorse or promote products derived from |
| 17 | * this software without specific prior written permission. |
| 18 | * |
| 19 | * THIS SOFTWARE IS PROVIDED BY THE COPYRIGHT HOLDERS AND CONTRIBUTORS |
| 20 | * "AS IS" AND ANY EXPRESS OR IMPLIED WARRANTIES, INCLUDING, BUT NOT |
| 21 | * LIMITED TO, THE IMPLIED WARRANTIES OF MERCHANTABILITY AND FITNESS FOR |
| 22 | * A PARTICULAR PURPOSE ARE DISCLAIMED. IN NO EVENT SHALL THE COPYRIGHT |
| 23 | * OWNER OR CONTRIBUTORS BE LIABLE FOR ANY DIRECT, INDIRECT, INCIDENTAL, |
| 24 | * SPECIAL, EXEMPLARY, OR CONSEQUENTIAL DAMAGES (INCLUDING, BUT NOT |
| 25 | * LIMITED TO, PROCUREMENT OF SUBSTITUTE GOODS OR SERVICES; LOSS OF USE, |
| 26 | * DATA, OR PROFITS; OR BUSINESS INTERRUPTION) HOWEVER CAUSED AND ON ANY |
| 27 | * THEORY OF LIABILITY, WHETHER IN CONTRACT, STRICT LIABILITY, OR TORT |
| 28 | * (INCLUDING NEGLIGENCE OR OTHERWISE) ARISING IN ANY WAY OUT OF THE USE |
| 29 | * OF THIS SOFTWARE, EVEN IF ADVISED OF THE POSSIBILITY OF SUCH DAMAGE. |
| 30 | */ |
| 31 | |
nathanmittler | 29cbef1 | 2014-10-27 11:33:19 -0700 | [diff] [blame] | 32 | package com.google.net.stubby.transport; |
nathanmittler | c0a0681 | 2014-07-11 14:57:47 -0700 | [diff] [blame] | 33 | |
lryan | c5e70c2 | 2014-11-24 16:41:02 -0800 | [diff] [blame] | 34 | import com.google.common.annotations.VisibleForTesting; |
| 35 | import com.google.common.base.MoreObjects; |
nathanmittler | c0a0681 | 2014-07-11 14:57:47 -0700 | [diff] [blame] | 36 | import com.google.common.base.Preconditions; |
nathanmittler | c0a0681 | 2014-07-11 14:57:47 -0700 | [diff] [blame] | 37 | |
| 38 | import java.io.InputStream; |
| 39 | import java.nio.ByteBuffer; |
| 40 | |
| 41 | import javax.annotation.Nullable; |
| 42 | |
| 43 | /** |
| 44 | * Abstract base class for {@link Stream} implementations. |
| 45 | */ |
lryan | 669724a | 2014-11-10 10:21:45 -0800 | [diff] [blame] | 46 | public abstract class AbstractStream<IdT> implements Stream { |
ejona | 913de47 | 2014-09-18 07:48:43 -0700 | [diff] [blame] | 47 | /** |
nathanmittler | c0a0681 | 2014-07-11 14:57:47 -0700 | [diff] [blame] | 48 | * Indicates the phase of the GRPC stream in one direction. |
| 49 | */ |
| 50 | protected enum Phase { |
lryan | a7d735e | 2014-09-09 19:25:43 -0700 | [diff] [blame] | 51 | HEADERS, MESSAGE, STATUS |
nathanmittler | c0a0681 | 2014-07-11 14:57:47 -0700 | [diff] [blame] | 52 | } |
| 53 | |
lryan | 669724a | 2014-11-10 10:21:45 -0800 | [diff] [blame] | 54 | private volatile IdT id; |
simonma | f589a4a | 2015-01-15 16:23:32 -0800 | [diff] [blame] | 55 | private final MessageFramer framer; |
nathanmittler | 5d953e8 | 2014-11-18 13:12:42 -0800 | [diff] [blame] | 56 | |
simonma | f589a4a | 2015-01-15 16:23:32 -0800 | [diff] [blame] | 57 | final MessageDeframer deframer; |
lryan | c5e70c2 | 2014-11-24 16:41:02 -0800 | [diff] [blame] | 58 | |
| 59 | /** |
| 60 | * Inbound phase is exclusively written to by the transport thread. |
| 61 | */ |
| 62 | private Phase inboundPhase = Phase.HEADERS; |
| 63 | |
| 64 | /** |
| 65 | * Outbound phase is exclusively written to by the application thread. |
| 66 | */ |
| 67 | private Phase outboundPhase = Phase.HEADERS; |
nathanmittler | c0a0681 | 2014-07-11 14:57:47 -0700 | [diff] [blame] | 68 | |
nmittler | de3a131 | 2015-01-16 11:54:24 -0800 | [diff] [blame^] | 69 | AbstractStream() { |
simonma | f589a4a | 2015-01-15 16:23:32 -0800 | [diff] [blame] | 70 | MessageDeframer.Listener inboundMessageHandler = new MessageDeframer.Listener() { |
nathanmittler | 11c363a | 2015-01-09 11:22:19 -0800 | [diff] [blame] | 71 | @Override |
| 72 | public void bytesRead(int numBytes) { |
| 73 | returnProcessedBytes(numBytes); |
| 74 | } |
| 75 | |
lryan | 669724a | 2014-11-10 10:21:45 -0800 | [diff] [blame] | 76 | @Override |
nmittler | de3a131 | 2015-01-16 11:54:24 -0800 | [diff] [blame^] | 77 | public void messageRead(InputStream input, final int length) { |
| 78 | receiveMessage(input, length); |
nathanmittler | c0a0681 | 2014-07-11 14:57:47 -0700 | [diff] [blame] | 79 | } |
nathanmittler | c0a0681 | 2014-07-11 14:57:47 -0700 | [diff] [blame] | 80 | |
lryan | 669724a | 2014-11-10 10:21:45 -0800 | [diff] [blame] | 81 | @Override |
nathanmittler | 11c363a | 2015-01-09 11:22:19 -0800 | [diff] [blame] | 82 | public void deliveryStalled() { |
| 83 | inboundDeliveryPaused(); |
| 84 | } |
| 85 | |
| 86 | @Override |
lryan | 669724a | 2014-11-10 10:21:45 -0800 | [diff] [blame] | 87 | public void endOfStream() { |
| 88 | remoteEndClosed(); |
| 89 | } |
| 90 | }; |
simonma | f589a4a | 2015-01-15 16:23:32 -0800 | [diff] [blame] | 91 | MessageFramer.Sink<ByteBuffer> outboundFrameHandler = new MessageFramer.Sink<ByteBuffer>() { |
lryan | 669724a | 2014-11-10 10:21:45 -0800 | [diff] [blame] | 92 | @Override |
| 93 | public void deliverFrame(ByteBuffer frame, boolean endOfStream) { |
| 94 | internalSendFrame(frame, endOfStream); |
| 95 | } |
| 96 | }; |
nathanmittler | 5d953e8 | 2014-11-18 13:12:42 -0800 | [diff] [blame] | 97 | |
simonma | f589a4a | 2015-01-15 16:23:32 -0800 | [diff] [blame] | 98 | framer = new MessageFramer(outboundFrameHandler, 4096); |
nmittler | de3a131 | 2015-01-16 11:54:24 -0800 | [diff] [blame^] | 99 | this.deframer = new MessageDeframer(inboundMessageHandler); |
nathanmittler | c0a0681 | 2014-07-11 14:57:47 -0700 | [diff] [blame] | 100 | } |
| 101 | |
nathanmittler | c0a0681 | 2014-07-11 14:57:47 -0700 | [diff] [blame] | 102 | /** |
lryan | 669724a | 2014-11-10 10:21:45 -0800 | [diff] [blame] | 103 | * Returns the internal id for this stream. Note that Id can be {@code null} for client streams |
| 104 | * as the transport may defer creating the stream to the remote side until is has payload or |
| 105 | * metadata to send. |
| 106 | */ |
| 107 | @Nullable |
| 108 | public IdT id() { |
| 109 | return id; |
| 110 | } |
| 111 | |
| 112 | /** |
| 113 | * Set the internal id for this stream |
| 114 | */ |
| 115 | public void id(IdT id) { |
| 116 | Preconditions.checkState(id != null, "Can only set id once"); |
| 117 | this.id = id; |
| 118 | } |
| 119 | |
nathanmittler | c0a0681 | 2014-07-11 14:57:47 -0700 | [diff] [blame] | 120 | @Override |
lryan | 28497e3 | 2014-10-17 16:14:38 -0700 | [diff] [blame] | 121 | public void writeMessage(InputStream message, int length, @Nullable Runnable accepted) { |
nathanmittler | c0a0681 | 2014-07-11 14:57:47 -0700 | [diff] [blame] | 122 | Preconditions.checkNotNull(message, "message"); |
| 123 | Preconditions.checkArgument(length >= 0, "length must be >= 0"); |
| 124 | outboundPhase(Phase.MESSAGE); |
lryan | c5e70c2 | 2014-11-24 16:41:02 -0800 | [diff] [blame] | 125 | if (!framer.isClosed()) { |
| 126 | framer.writePayload(message, length); |
nathanmittler | c0a0681 | 2014-07-11 14:57:47 -0700 | [diff] [blame] | 127 | } |
| 128 | |
ejona | 4de2026 | 2015-01-16 16:17:35 -0800 | [diff] [blame] | 129 | // TODO(nathanmittler): add flow control. |
nathanmittler | c0a0681 | 2014-07-11 14:57:47 -0700 | [diff] [blame] | 130 | if (accepted != null) { |
| 131 | accepted.run(); |
| 132 | } |
| 133 | } |
| 134 | |
| 135 | @Override |
| 136 | public final void flush() { |
lryan | c5e70c2 | 2014-11-24 16:41:02 -0800 | [diff] [blame] | 137 | if (!framer.isClosed()) { |
| 138 | framer.flush(); |
nathanmittler | c0a0681 | 2014-07-11 14:57:47 -0700 | [diff] [blame] | 139 | } |
| 140 | } |
| 141 | |
| 142 | /** |
lryan | c5e70c2 | 2014-11-24 16:41:02 -0800 | [diff] [blame] | 143 | * Closes the underlying framer. |
| 144 | * |
| 145 | * <p>No-op if the framer has already been closed. |
lryan | c5e70c2 | 2014-11-24 16:41:02 -0800 | [diff] [blame] | 146 | */ |
ejona | c0f4192 | 2014-12-23 12:16:17 -0800 | [diff] [blame] | 147 | final void closeFramer() { |
lryan | c5e70c2 | 2014-11-24 16:41:02 -0800 | [diff] [blame] | 148 | if (!framer.isClosed()) { |
lryan | c5e70c2 | 2014-11-24 16:41:02 -0800 | [diff] [blame] | 149 | framer.close(); |
| 150 | } |
| 151 | } |
| 152 | |
| 153 | /** |
| 154 | * Free any resources associated with this stream. Subclass implementations must call this |
| 155 | * version. |
| 156 | * <p> |
| 157 | * NOTE. Can be called by both the transport thread and the application thread. Transport |
| 158 | * threads need to dispose when the remote side has terminated the stream. Application threads |
| 159 | * will dispose when the application decides to close the stream as part of normal processing. |
| 160 | * </p> |
| 161 | */ |
| 162 | public void dispose() { |
| 163 | framer.dispose(); |
| 164 | } |
| 165 | |
| 166 | /** |
zhangkun | 048649e | 2014-08-28 15:52:03 -0700 | [diff] [blame] | 167 | * Sends an outbound frame to the remote end point. |
nathanmittler | c0a0681 | 2014-07-11 14:57:47 -0700 | [diff] [blame] | 168 | * |
| 169 | * @param frame a buffer containing the chunk of data to be sent. |
| 170 | * @param endOfStream if {@code true} indicates that no more data will be sent on the stream by |
| 171 | * this endpoint. |
| 172 | */ |
ejona | 9d50299 | 2014-09-22 12:23:19 -0700 | [diff] [blame] | 173 | protected abstract void internalSendFrame(ByteBuffer frame, boolean endOfStream); |
nathanmittler | c0a0681 | 2014-07-11 14:57:47 -0700 | [diff] [blame] | 174 | |
ejona | 9d50299 | 2014-09-22 12:23:19 -0700 | [diff] [blame] | 175 | /** A message was deframed. */ |
nmittler | de3a131 | 2015-01-16 11:54:24 -0800 | [diff] [blame^] | 176 | protected abstract void receiveMessage(InputStream is, int length); |
ejona | 9d50299 | 2014-09-22 12:23:19 -0700 | [diff] [blame] | 177 | |
nathanmittler | 11c363a | 2015-01-09 11:22:19 -0800 | [diff] [blame] | 178 | /** Deframer has no pending deliveries. */ |
| 179 | protected abstract void inboundDeliveryPaused(); |
| 180 | |
ejona | 9d50299 | 2014-09-22 12:23:19 -0700 | [diff] [blame] | 181 | /** Deframer reached end of stream. */ |
| 182 | protected abstract void remoteEndClosed(); |
zhangkun | 048649e | 2014-08-28 15:52:03 -0700 | [diff] [blame] | 183 | |
| 184 | /** |
nathanmittler | 5d953e8 | 2014-11-18 13:12:42 -0800 | [diff] [blame] | 185 | * Returns the given number of processed bytes back to inbound flow control to enable receipt of |
| 186 | * more data. |
nathanmittler | 43d2fcc | 2014-08-29 21:58:54 -0700 | [diff] [blame] | 187 | */ |
nathanmittler | 5d953e8 | 2014-11-18 13:12:42 -0800 | [diff] [blame] | 188 | protected abstract void returnProcessedBytes(int processedBytes); |
| 189 | |
| 190 | /** |
| 191 | * Called when a {@link #deframe(Buffer, boolean)} operation failed. |
| 192 | */ |
| 193 | protected abstract void deframeFailed(Throwable cause); |
| 194 | |
| 195 | /** |
| 196 | * Called to parse a received frame and attempt delivery of any completed |
nmittler | de3a131 | 2015-01-16 11:54:24 -0800 | [diff] [blame^] | 197 | * messages. Must be called from the transport thread. |
nathanmittler | 5d953e8 | 2014-11-18 13:12:42 -0800 | [diff] [blame] | 198 | */ |
| 199 | protected final void deframe(Buffer frame, boolean endOfStream) { |
nmittler | de3a131 | 2015-01-16 11:54:24 -0800 | [diff] [blame^] | 200 | try { |
| 201 | deframer.deframe(frame, endOfStream); |
| 202 | } catch (Throwable t) { |
| 203 | deframeFailed(t); |
nathanmittler | 5d953e8 | 2014-11-18 13:12:42 -0800 | [diff] [blame] | 204 | } |
| 205 | } |
| 206 | |
| 207 | /** |
nmittler | de3a131 | 2015-01-16 11:54:24 -0800 | [diff] [blame^] | 208 | * Called to request the given number of messages from the deframer. Must be called |
| 209 | * from the transport thread. |
nathanmittler | 5d953e8 | 2014-11-18 13:12:42 -0800 | [diff] [blame] | 210 | */ |
nmittler | de3a131 | 2015-01-16 11:54:24 -0800 | [diff] [blame^] | 211 | protected final void requestMessagesFromDeframer(int numMessages) { |
| 212 | try { |
| 213 | deframer.request(numMessages); |
| 214 | } catch (Throwable t) { |
| 215 | deframeFailed(t); |
nathanmittler | 5d953e8 | 2014-11-18 13:12:42 -0800 | [diff] [blame] | 216 | } |
| 217 | } |
nathanmittler | 43d2fcc | 2014-08-29 21:58:54 -0700 | [diff] [blame] | 218 | |
lryan | c5e70c2 | 2014-11-24 16:41:02 -0800 | [diff] [blame] | 219 | final Phase inboundPhase() { |
| 220 | return inboundPhase; |
| 221 | } |
| 222 | |
nathanmittler | 43d2fcc | 2014-08-29 21:58:54 -0700 | [diff] [blame] | 223 | /** |
lryan | c5e70c2 | 2014-11-24 16:41:02 -0800 | [diff] [blame] | 224 | * Transitions the inbound phase to the given phase and returns the previous phase. |
| 225 | * If the transition is disallowed, throws an {@link IllegalStateException}. |
nathanmittler | c0a0681 | 2014-07-11 14:57:47 -0700 | [diff] [blame] | 226 | */ |
lryan | c5e70c2 | 2014-11-24 16:41:02 -0800 | [diff] [blame] | 227 | final Phase inboundPhase(Phase nextPhase) { |
| 228 | Phase tmp = inboundPhase; |
nathanmittler | c0a0681 | 2014-07-11 14:57:47 -0700 | [diff] [blame] | 229 | inboundPhase = verifyNextPhase(inboundPhase, nextPhase); |
lryan | c5e70c2 | 2014-11-24 16:41:02 -0800 | [diff] [blame] | 230 | return tmp; |
| 231 | } |
| 232 | |
| 233 | final Phase outboundPhase() { |
| 234 | return outboundPhase; |
nathanmittler | c0a0681 | 2014-07-11 14:57:47 -0700 | [diff] [blame] | 235 | } |
| 236 | |
| 237 | /** |
lryan | c5e70c2 | 2014-11-24 16:41:02 -0800 | [diff] [blame] | 238 | * Transitions the outbound phase to the given phase and returns the previous phase. |
| 239 | * If the transition is disallowed, throws an {@link IllegalStateException}. |
nathanmittler | c0a0681 | 2014-07-11 14:57:47 -0700 | [diff] [blame] | 240 | */ |
lryan | c5e70c2 | 2014-11-24 16:41:02 -0800 | [diff] [blame] | 241 | final Phase outboundPhase(Phase nextPhase) { |
| 242 | Phase tmp = outboundPhase; |
nathanmittler | c0a0681 | 2014-07-11 14:57:47 -0700 | [diff] [blame] | 243 | outboundPhase = verifyNextPhase(outboundPhase, nextPhase); |
lryan | c5e70c2 | 2014-11-24 16:41:02 -0800 | [diff] [blame] | 244 | return tmp; |
zhangkun | 048649e | 2014-08-28 15:52:03 -0700 | [diff] [blame] | 245 | } |
| 246 | |
nathanmittler | c0a0681 | 2014-07-11 14:57:47 -0700 | [diff] [blame] | 247 | private Phase verifyNextPhase(Phase currentPhase, Phase nextPhase) { |
lryan | c5e70c2 | 2014-11-24 16:41:02 -0800 | [diff] [blame] | 248 | if (nextPhase.ordinal() < currentPhase.ordinal()) { |
nathanmittler | c0a0681 | 2014-07-11 14:57:47 -0700 | [diff] [blame] | 249 | throw new IllegalStateException( |
| 250 | String.format("Cannot transition phase from %s to %s", currentPhase, nextPhase)); |
| 251 | } |
| 252 | return nextPhase; |
| 253 | } |
| 254 | |
| 255 | /** |
lryan | c5e70c2 | 2014-11-24 16:41:02 -0800 | [diff] [blame] | 256 | * Can the stream receive data from its remote peer. |
| 257 | */ |
| 258 | public boolean canReceive() { |
| 259 | return inboundPhase() != Phase.STATUS; |
| 260 | } |
| 261 | |
| 262 | /** |
| 263 | * Can the stream send data to its remote peer. |
| 264 | */ |
| 265 | public boolean canSend() { |
| 266 | return outboundPhase() != Phase.STATUS; |
| 267 | } |
| 268 | |
| 269 | /** |
| 270 | * Is the stream fully closed. Note that this method is not thread-safe as inboundPhase and |
| 271 | * outboundPhase are mutated in different threads. Tests must account for thread coordination |
| 272 | * when calling. |
| 273 | */ |
| 274 | @VisibleForTesting |
| 275 | public boolean isClosed() { |
| 276 | return inboundPhase() == Phase.STATUS && outboundPhase() == Phase.STATUS; |
| 277 | } |
| 278 | |
nathanmittler | 11c363a | 2015-01-09 11:22:19 -0800 | [diff] [blame] | 279 | @Override |
lryan | c5e70c2 | 2014-11-24 16:41:02 -0800 | [diff] [blame] | 280 | public String toString() { |
| 281 | return toStringHelper().toString(); |
| 282 | } |
| 283 | |
| 284 | protected MoreObjects.ToStringHelper toStringHelper() { |
| 285 | return MoreObjects.toStringHelper(this) |
| 286 | .add("id", id()) |
| 287 | .add("inboundPhase", inboundPhase().name()) |
| 288 | .add("outboundPhase", outboundPhase().name()); |
| 289 | |
| 290 | } |
nathanmittler | c0a0681 | 2014-07-11 14:57:47 -0700 | [diff] [blame] | 291 | } |