/*
 * Copyright 2014, Google Inc. All rights reserved.
 *
 * Redistribution and use in source and binary forms, with or without
 * modification, are permitted provided that the following conditions are
 * met:
 *
 *    * Redistributions of source code must retain the above copyright
 * notice, this list of conditions and the following disclaimer.
 *    * Redistributions in binary form must reproduce the above
 * copyright notice, this list of conditions and the following disclaimer
 * in the documentation and/or other materials provided with the
 * distribution.
 *
 *    * Neither the name of Google Inc. nor the names of its
 * contributors may be used to endorse or promote products derived from
 * this software without specific prior written permission.
 *
 * THIS SOFTWARE IS PROVIDED BY THE COPYRIGHT HOLDERS AND CONTRIBUTORS
 * "AS IS" AND ANY EXPRESS OR IMPLIED WARRANTIES, INCLUDING, BUT NOT
 * LIMITED TO, THE IMPLIED WARRANTIES OF MERCHANTABILITY AND FITNESS FOR
 * A PARTICULAR PURPOSE ARE DISCLAIMED. IN NO EVENT SHALL THE COPYRIGHT
 * OWNER OR CONTRIBUTORS BE LIABLE FOR ANY DIRECT, INDIRECT, INCIDENTAL,
 * SPECIAL, EXEMPLARY, OR CONSEQUENTIAL DAMAGES (INCLUDING, BUT NOT
 * LIMITED TO, PROCUREMENT OF SUBSTITUTE GOODS OR SERVICES; LOSS OF USE,
 * DATA, OR PROFITS; OR BUSINESS INTERRUPTION) HOWEVER CAUSED AND ON ANY
 * THEORY OF LIABILITY, WHETHER IN CONTRACT, STRICT LIABILITY, OR TORT
 * (INCLUDING NEGLIGENCE OR OTHERWISE) ARISING IN ANY WAY OUT OF THE USE
 * OF THIS SOFTWARE, EVEN IF ADVISED OF THE POSSIBILITY OF SUCH DAMAGE.
 */
| 31 | |
nathanmittler | 29cbef1 | 2014-10-27 11:33:19 -0700 | [diff] [blame] | 32 | package com.google.net.stubby.transport; |
zhangkun | 048649e | 2014-08-28 15:52:03 -0700 | [diff] [blame] | 33 | |
zhangkun | 048649e | 2014-08-28 15:52:03 -0700 | [diff] [blame] | 34 | import com.google.common.base.Preconditions; |
lryan | e4bd1c7 | 2014-09-08 14:03:35 -0700 | [diff] [blame] | 35 | import com.google.net.stubby.Metadata; |
zhangkun | 048649e | 2014-08-28 15:52:03 -0700 | [diff] [blame] | 36 | import com.google.net.stubby.Status; |
zhangkun | 048649e | 2014-08-28 15:52:03 -0700 | [diff] [blame] | 37 | |
ejona | 9d50299 | 2014-09-22 12:23:19 -0700 | [diff] [blame] | 38 | import java.io.InputStream; |
| 39 | import java.nio.ByteBuffer; |
ejona | 0f56c47 | 2014-11-13 15:16:51 -0800 | [diff] [blame] | 40 | import java.util.logging.Level; |
| 41 | import java.util.logging.Logger; |
ejona | 9d50299 | 2014-09-22 12:23:19 -0700 | [diff] [blame] | 42 | |
lryan | 28497e3 | 2014-10-17 16:14:38 -0700 | [diff] [blame] | 43 | import javax.annotation.Nullable; |
ejona | ef2129c | 2014-09-18 10:12:52 -0700 | [diff] [blame] | 44 | |
zhangkun | 048649e | 2014-08-28 15:52:03 -0700 | [diff] [blame] | 45 | /** |
| 46 | * Abstract base class for {@link ServerStream} implementations. |
| 47 | */ |
lryan | 669724a | 2014-11-10 10:21:45 -0800 | [diff] [blame] | 48 | public abstract class AbstractServerStream<IdT> extends AbstractStream<IdT> |
| 49 | implements ServerStream { |
ejona | 0f56c47 | 2014-11-13 15:16:51 -0800 | [diff] [blame] | 50 | private static final Logger log = Logger.getLogger(AbstractServerStream.class.getName()); |
zhangkun | 048649e | 2014-08-28 15:52:03 -0700 | [diff] [blame] | 51 | |
lryan | c5e70c2 | 2014-11-24 16:41:02 -0800 | [diff] [blame] | 52 | /** Whether listener.closed() has been called. */ |
| 53 | private boolean listenerClosed; |
ejona | ef2129c | 2014-09-18 10:12:52 -0700 | [diff] [blame] | 54 | private ServerStreamListener listener; |
zhangkun | 048649e | 2014-08-28 15:52:03 -0700 | [diff] [blame] | 55 | |
lryan | 28497e3 | 2014-10-17 16:14:38 -0700 | [diff] [blame] | 56 | private boolean headersSent = false; |
lryan | 28497e3 | 2014-10-17 16:14:38 -0700 | [diff] [blame] | 57 | /** |
| 58 | * Whether the stream was closed gracefully by the application (vs. a transport-level failure). |
| 59 | */ |
ejona | 6fc356b | 2014-09-22 12:49:20 -0700 | [diff] [blame] | 60 | private boolean gracefulClose; |
ejona | 9d50299 | 2014-09-22 12:23:19 -0700 | [diff] [blame] | 61 | /** Saved trailers from close() that need to be sent once the framer has sent all messages. */ |
| 62 | private Metadata.Trailers stashedTrailers; |
zhangkun | 048649e | 2014-08-28 15:52:03 -0700 | [diff] [blame] | 63 | |
nmittler | de3a131 | 2015-01-16 11:54:24 -0800 | [diff] [blame^] | 64 | protected AbstractServerStream(IdT id) { |
lryan | 669724a | 2014-11-10 10:21:45 -0800 | [diff] [blame] | 65 | id(id); |
| 66 | } |
| 67 | |
ejona | ef2129c | 2014-09-18 10:12:52 -0700 | [diff] [blame] | 68 | public final void setListener(ServerStreamListener listener) { |
zhangkun | 048649e | 2014-08-28 15:52:03 -0700 | [diff] [blame] | 69 | this.listener = Preconditions.checkNotNull(listener, "listener"); |
| 70 | } |
| 71 | |
| 72 | @Override |
nmittler | de3a131 | 2015-01-16 11:54:24 -0800 | [diff] [blame^] | 73 | protected void receiveMessage(InputStream is, int length) { |
ejona | 9d50299 | 2014-09-22 12:23:19 -0700 | [diff] [blame] | 74 | inboundPhase(Phase.MESSAGE); |
nmittler | de3a131 | 2015-01-16 11:54:24 -0800 | [diff] [blame^] | 75 | listener.messageRead(is, length); |
ejona | 9d50299 | 2014-09-22 12:23:19 -0700 | [diff] [blame] | 76 | } |
| 77 | |
ejona | 9d50299 | 2014-09-22 12:23:19 -0700 | [diff] [blame] | 78 | @Override |
lryan | 28497e3 | 2014-10-17 16:14:38 -0700 | [diff] [blame] | 79 | public void writeHeaders(Metadata.Headers headers) { |
| 80 | Preconditions.checkNotNull(headers, "headers"); |
| 81 | outboundPhase(Phase.HEADERS); |
| 82 | headersSent = true; |
| 83 | internalSendHeaders(headers); |
| 84 | outboundPhase(Phase.MESSAGE); |
| 85 | } |
| 86 | |
| 87 | @Override |
| 88 | public final void writeMessage(InputStream message, int length, @Nullable Runnable accepted) { |
| 89 | if (!headersSent) { |
| 90 | writeHeaders(new Metadata.Headers()); |
| 91 | headersSent = true; |
| 92 | } |
| 93 | super.writeMessage(message, length, accepted); |
| 94 | } |
| 95 | |
| 96 | @Override |
lryan | e4bd1c7 | 2014-09-08 14:03:35 -0700 | [diff] [blame] | 97 | public final void close(Status status, Metadata.Trailers trailers) { |
ejona | ef2129c | 2014-09-18 10:12:52 -0700 | [diff] [blame] | 98 | Preconditions.checkNotNull(status, "status"); |
| 99 | Preconditions.checkNotNull(trailers, "trailers"); |
lryan | c5e70c2 | 2014-11-24 16:41:02 -0800 | [diff] [blame] | 100 | if (outboundPhase(Phase.STATUS) != Phase.STATUS) { |
| 101 | gracefulClose = true; |
| 102 | this.stashedTrailers = trailers; |
| 103 | writeStatusToTrailers(status); |
ejona | c0f4192 | 2014-12-23 12:16:17 -0800 | [diff] [blame] | 104 | closeFramer(); |
zhangkun | 048649e | 2014-08-28 15:52:03 -0700 | [diff] [blame] | 105 | } |
zhangkun | 048649e | 2014-08-28 15:52:03 -0700 | [diff] [blame] | 106 | } |
| 107 | |
lryan | 1f2a186 | 2014-11-12 14:55:51 -0800 | [diff] [blame] | 108 | private void writeStatusToTrailers(Status status) { |
| 109 | stashedTrailers.removeAll(Status.CODE_KEY); |
| 110 | stashedTrailers.removeAll(Status.MESSAGE_KEY); |
| 111 | stashedTrailers.put(Status.CODE_KEY, status); |
| 112 | if (status.getDescription() != null) { |
| 113 | stashedTrailers.put(Status.MESSAGE_KEY, status.getDescription()); |
| 114 | } |
| 115 | } |
| 116 | |
lryan | 669724a | 2014-11-10 10:21:45 -0800 | [diff] [blame] | 117 | /** |
| 118 | * Called in the network thread to process the content of an inbound DATA frame from the client. |
| 119 | * |
| 120 | * @param frame the inbound HTTP/2 DATA frame. If this buffer is not used immediately, it must |
| 121 | * be retained. |
| 122 | */ |
| 123 | public void inboundDataReceived(Buffer frame, boolean endOfStream) { |
lryan | c5e70c2 | 2014-11-24 16:41:02 -0800 | [diff] [blame] | 124 | if (inboundPhase() == Phase.STATUS) { |
lryan | 669724a | 2014-11-10 10:21:45 -0800 | [diff] [blame] | 125 | frame.close(); |
| 126 | return; |
| 127 | } |
ejona | 4de2026 | 2015-01-16 16:17:35 -0800 | [diff] [blame] | 128 | // TODO(zhangkun): It sounds sub-optimal to deframe in the network thread. That means |
lryan | 669724a | 2014-11-10 10:21:45 -0800 | [diff] [blame] | 129 | // decompression is serialized. |
nathanmittler | 5d953e8 | 2014-11-18 13:12:42 -0800 | [diff] [blame] | 130 | deframe(frame, endOfStream); |
| 131 | } |
| 132 | |
| 133 | @Override |
| 134 | protected final void deframeFailed(Throwable cause) { |
| 135 | log.log(Level.WARNING, "Exception processing message", cause); |
| 136 | abortStream(Status.fromThrowable(cause), true); |
lryan | 669724a | 2014-11-10 10:21:45 -0800 | [diff] [blame] | 137 | } |
| 138 | |
ejona | 9d50299 | 2014-09-22 12:23:19 -0700 | [diff] [blame] | 139 | @Override |
| 140 | protected final void internalSendFrame(ByteBuffer frame, boolean endOfStream) { |
ejona | c0f4192 | 2014-12-23 12:16:17 -0800 | [diff] [blame] | 141 | if (frame.hasRemaining()) { |
| 142 | sendFrame(frame, false); |
| 143 | } |
| 144 | if (endOfStream) { |
| 145 | sendTrailers(stashedTrailers, headersSent); |
| 146 | headersSent = true; |
| 147 | stashedTrailers = null; |
ejona | 9d50299 | 2014-09-22 12:23:19 -0700 | [diff] [blame] | 148 | } |
| 149 | } |
| 150 | |
| 151 | /** |
lryan | 28497e3 | 2014-10-17 16:14:38 -0700 | [diff] [blame] | 152 | * Sends response headers to the remote end points. |
| 153 | * @param headers to be sent to client. |
| 154 | */ |
| 155 | protected abstract void internalSendHeaders(Metadata.Headers headers); |
| 156 | |
| 157 | /** |
ejona | 9d50299 | 2014-09-22 12:23:19 -0700 | [diff] [blame] | 158 | * Sends an outbound frame to the remote end point. |
| 159 | * |
| 160 | * @param frame a buffer containing the chunk of data to be sent. |
| 161 | * @param endOfStream if {@code true} indicates that no more data will be sent on the stream by |
| 162 | * this endpoint. |
| 163 | */ |
| 164 | protected abstract void sendFrame(ByteBuffer frame, boolean endOfStream); |
| 165 | |
| 166 | /** |
| 167 | * Sends trailers to the remote end point. This call implies end of stream. |
| 168 | * |
| 169 | * @param trailers metadata to be sent to end point |
lryan | 28497e3 | 2014-10-17 16:14:38 -0700 | [diff] [blame] | 170 | * @param headersSent true if response headers have already been sent. |
ejona | 9d50299 | 2014-09-22 12:23:19 -0700 | [diff] [blame] | 171 | */ |
lryan | 28497e3 | 2014-10-17 16:14:38 -0700 | [diff] [blame] | 172 | protected abstract void sendTrailers(Metadata.Trailers trailers, boolean headersSent); |
ejona | 9d50299 | 2014-09-22 12:23:19 -0700 | [diff] [blame] | 173 | |
ejona | ef2129c | 2014-09-18 10:12:52 -0700 | [diff] [blame] | 174 | /** |
| 175 | * The Stream is considered completely closed and there is no further opportunity for error. It |
| 176 | * calls the listener's {@code closed()} if it was not already done by {@link #abortStream}. Note |
| 177 | * that it is expected that either {@code closed()} or {@code abortStream()} was previously |
ejona | 6fc356b | 2014-09-22 12:49:20 -0700 | [diff] [blame] | 178 | * called, since {@code closed()} is required for a normal stream closure and {@code |
| 179 | * abortStream()} for abnormal. |
ejona | ef2129c | 2014-09-18 10:12:52 -0700 | [diff] [blame] | 180 | */ |
| 181 | public void complete() { |
lryan | c5e70c2 | 2014-11-24 16:41:02 -0800 | [diff] [blame] | 182 | listenerClosed = true; |
ejona | 6fc356b | 2014-09-22 12:49:20 -0700 | [diff] [blame] | 183 | if (!gracefulClose) { |
lryan | 71e4a92 | 2014-09-25 18:25:54 -0700 | [diff] [blame] | 184 | listener.closed(Status.INTERNAL.withDescription("successful complete() without close()")); |
ejona | ef2129c | 2014-09-18 10:12:52 -0700 | [diff] [blame] | 185 | throw new IllegalStateException("successful complete() without close()"); |
| 186 | } |
ejona | 6fc356b | 2014-09-22 12:49:20 -0700 | [diff] [blame] | 187 | listener.closed(Status.OK); |
ejona | ef2129c | 2014-09-18 10:12:52 -0700 | [diff] [blame] | 188 | } |
| 189 | |
zhangkun | 048649e | 2014-08-28 15:52:03 -0700 | [diff] [blame] | 190 | /** |
| 191 | * Called when the remote end half-closes the stream. |
| 192 | */ |
ejona | 9d50299 | 2014-09-22 12:23:19 -0700 | [diff] [blame] | 193 | @Override |
| 194 | protected final void remoteEndClosed() { |
lryan | c5e70c2 | 2014-11-24 16:41:02 -0800 | [diff] [blame] | 195 | if (inboundPhase(Phase.STATUS) != Phase.STATUS) { |
| 196 | listener.halfClosed(); |
zhangkun | 048649e | 2014-08-28 15:52:03 -0700 | [diff] [blame] | 197 | } |
zhangkun | 048649e | 2014-08-28 15:52:03 -0700 | [diff] [blame] | 198 | } |
| 199 | |
| 200 | /** |
| 201 | * Aborts the stream with an error status, cleans up resources and notifies the listener if |
| 202 | * necessary. |
| 203 | * |
lryan | e4bd1c7 | 2014-09-08 14:03:35 -0700 | [diff] [blame] | 204 | * <p>Unlike {@link #close(Status, Metadata.Trailers)}, this method is only called from the |
ejona | ef2129c | 2014-09-18 10:12:52 -0700 | [diff] [blame] | 205 | * transport. The transport should use this method instead of {@code close(Status)} for internal |
| 206 | * errors to prevent exposing unexpected states and exceptions to the application. |
zhangkun | 048649e | 2014-08-28 15:52:03 -0700 | [diff] [blame] | 207 | * |
| 208 | * @param status the error status. Must not be Status.OK. |
| 209 | * @param notifyClient true if the stream is still writable and you want to notify the client |
| 210 | * about stream closure and send the status |
| 211 | */ |
| 212 | public final void abortStream(Status status, boolean notifyClient) { |
ejona | 4de2026 | 2015-01-16 16:17:35 -0800 | [diff] [blame] | 213 | // TODO(lryan): Investigate whether we can remove the notification to the client |
lryan | c5e70c2 | 2014-11-24 16:41:02 -0800 | [diff] [blame] | 214 | // and rely on a transport layer stream reset instead. |
zhangkun | 048649e | 2014-08-28 15:52:03 -0700 | [diff] [blame] | 215 | Preconditions.checkArgument(!status.isOk(), "status must not be OK"); |
lryan | c5e70c2 | 2014-11-24 16:41:02 -0800 | [diff] [blame] | 216 | if (!listenerClosed) { |
ejona | ef2129c | 2014-09-18 10:12:52 -0700 | [diff] [blame] | 217 | listenerClosed = true; |
lryan | c5e70c2 | 2014-11-24 16:41:02 -0800 | [diff] [blame] | 218 | listener.closed(status); |
zhangkun | 048649e | 2014-08-28 15:52:03 -0700 | [diff] [blame] | 219 | } |
lryan | c5e70c2 | 2014-11-24 16:41:02 -0800 | [diff] [blame] | 220 | if (notifyClient) { |
ejona | 4de2026 | 2015-01-16 16:17:35 -0800 | [diff] [blame] | 221 | // TODO(lryan): Remove |
lryan | c5e70c2 | 2014-11-24 16:41:02 -0800 | [diff] [blame] | 222 | if (stashedTrailers == null) { |
| 223 | stashedTrailers = new Metadata.Trailers(); |
ejona | ef2129c | 2014-09-18 10:12:52 -0700 | [diff] [blame] | 224 | } |
lryan | c5e70c2 | 2014-11-24 16:41:02 -0800 | [diff] [blame] | 225 | writeStatusToTrailers(status); |
ejona | c0f4192 | 2014-12-23 12:16:17 -0800 | [diff] [blame] | 226 | closeFramer(); |
lryan | c5e70c2 | 2014-11-24 16:41:02 -0800 | [diff] [blame] | 227 | } else { |
ejona | ef2129c | 2014-09-18 10:12:52 -0700 | [diff] [blame] | 228 | dispose(); |
zhangkun | 048649e | 2014-08-28 15:52:03 -0700 | [diff] [blame] | 229 | } |
zhangkun | 048649e | 2014-08-28 15:52:03 -0700 | [diff] [blame] | 230 | } |
lryan | c5e70c2 | 2014-11-24 16:41:02 -0800 | [diff] [blame] | 231 | |
| 232 | @Override |
| 233 | public boolean isClosed() { |
| 234 | return super.isClosed() || listenerClosed; |
| 235 | } |
zhangkun | 048649e | 2014-08-28 15:52:03 -0700 | [diff] [blame] | 236 | } |