blob: ab1c9a639dfed773d3ec5f3d87e78882a2edc11e [file] [log] [blame]
lryan56e307f2014-12-05 13:25:08 -08001/*
2 * Copyright 2014, Google Inc. All rights reserved.
3 *
4 * Redistribution and use in source and binary forms, with or without
5 * modification, are permitted provided that the following conditions are
6 * met:
7 *
8 * * Redistributions of source code must retain the above copyright
9 * notice, this list of conditions and the following disclaimer.
10 * * Redistributions in binary form must reproduce the above
11 * copyright notice, this list of conditions and the following disclaimer
12 * in the documentation and/or other materials provided with the
13 * distribution.
14 *
15 * * Neither the name of Google Inc. nor the names of its
16 * contributors may be used to endorse or promote products derived from
17 * this software without specific prior written permission.
18 *
19 * THIS SOFTWARE IS PROVIDED BY THE COPYRIGHT HOLDERS AND CONTRIBUTORS
20 * "AS IS" AND ANY EXPRESS OR IMPLIED WARRANTIES, INCLUDING, BUT NOT
21 * LIMITED TO, THE IMPLIED WARRANTIES OF MERCHANTABILITY AND FITNESS FOR
22 * A PARTICULAR PURPOSE ARE DISCLAIMED. IN NO EVENT SHALL THE COPYRIGHT
23 * OWNER OR CONTRIBUTORS BE LIABLE FOR ANY DIRECT, INDIRECT, INCIDENTAL,
24 * SPECIAL, EXEMPLARY, OR CONSEQUENTIAL DAMAGES (INCLUDING, BUT NOT
25 * LIMITED TO, PROCUREMENT OF SUBSTITUTE GOODS OR SERVICES; LOSS OF USE,
26 * DATA, OR PROFITS; OR BUSINESS INTERRUPTION) HOWEVER CAUSED AND ON ANY
27 * THEORY OF LIABILITY, WHETHER IN CONTRACT, STRICT LIABILITY, OR TORT
28 * (INCLUDING NEGLIGENCE OR OTHERWISE) ARISING IN ANY WAY OUT OF THE USE
29 * OF THIS SOFTWARE, EVEN IF ADVISED OF THE POSSIBILITY OF SUCH DAMAGE.
30 */
31
nathanmittler29cbef12014-10-27 11:33:19 -070032package com.google.net.stubby.transport;
zhangkun048649e2014-08-28 15:52:03 -070033
zhangkun048649e2014-08-28 15:52:03 -070034import com.google.common.base.Preconditions;
lryane4bd1c72014-09-08 14:03:35 -070035import com.google.net.stubby.Metadata;
zhangkun048649e2014-08-28 15:52:03 -070036import com.google.net.stubby.Status;
zhangkun048649e2014-08-28 15:52:03 -070037
ejona9d502992014-09-22 12:23:19 -070038import java.io.InputStream;
39import java.nio.ByteBuffer;
ejona0f56c472014-11-13 15:16:51 -080040import java.util.logging.Level;
41import java.util.logging.Logger;
ejona9d502992014-09-22 12:23:19 -070042
lryan28497e32014-10-17 16:14:38 -070043import javax.annotation.Nullable;
ejonaef2129c2014-09-18 10:12:52 -070044
zhangkun048649e2014-08-28 15:52:03 -070045/**
46 * Abstract base class for {@link ServerStream} implementations.
47 */
lryan669724a2014-11-10 10:21:45 -080048public abstract class AbstractServerStream<IdT> extends AbstractStream<IdT>
49 implements ServerStream {
ejona0f56c472014-11-13 15:16:51 -080050 private static final Logger log = Logger.getLogger(AbstractServerStream.class.getName());
zhangkun048649e2014-08-28 15:52:03 -070051
lryanc5e70c22014-11-24 16:41:02 -080052 /** Whether listener.closed() has been called. */
53 private boolean listenerClosed;
ejonaef2129c2014-09-18 10:12:52 -070054 private ServerStreamListener listener;
zhangkun048649e2014-08-28 15:52:03 -070055
lryan28497e32014-10-17 16:14:38 -070056 private boolean headersSent = false;
lryan28497e32014-10-17 16:14:38 -070057 /**
58 * Whether the stream was closed gracefully by the application (vs. a transport-level failure).
59 */
ejona6fc356b2014-09-22 12:49:20 -070060 private boolean gracefulClose;
ejona9d502992014-09-22 12:23:19 -070061 /** Saved trailers from close() that need to be sent once the framer has sent all messages. */
62 private Metadata.Trailers stashedTrailers;
zhangkun048649e2014-08-28 15:52:03 -070063
nmittlerde3a1312015-01-16 11:54:24 -080064 protected AbstractServerStream(IdT id) {
lryan669724a2014-11-10 10:21:45 -080065 id(id);
66 }
67
ejonaef2129c2014-09-18 10:12:52 -070068 public final void setListener(ServerStreamListener listener) {
zhangkun048649e2014-08-28 15:52:03 -070069 this.listener = Preconditions.checkNotNull(listener, "listener");
70 }
71
72 @Override
nmittlerde3a1312015-01-16 11:54:24 -080073 protected void receiveMessage(InputStream is, int length) {
ejona9d502992014-09-22 12:23:19 -070074 inboundPhase(Phase.MESSAGE);
nmittlerde3a1312015-01-16 11:54:24 -080075 listener.messageRead(is, length);
ejona9d502992014-09-22 12:23:19 -070076 }
77
ejona9d502992014-09-22 12:23:19 -070078 @Override
lryan28497e32014-10-17 16:14:38 -070079 public void writeHeaders(Metadata.Headers headers) {
80 Preconditions.checkNotNull(headers, "headers");
81 outboundPhase(Phase.HEADERS);
82 headersSent = true;
83 internalSendHeaders(headers);
84 outboundPhase(Phase.MESSAGE);
85 }
86
87 @Override
88 public final void writeMessage(InputStream message, int length, @Nullable Runnable accepted) {
89 if (!headersSent) {
90 writeHeaders(new Metadata.Headers());
91 headersSent = true;
92 }
93 super.writeMessage(message, length, accepted);
94 }
95
96 @Override
lryane4bd1c72014-09-08 14:03:35 -070097 public final void close(Status status, Metadata.Trailers trailers) {
ejonaef2129c2014-09-18 10:12:52 -070098 Preconditions.checkNotNull(status, "status");
99 Preconditions.checkNotNull(trailers, "trailers");
lryanc5e70c22014-11-24 16:41:02 -0800100 if (outboundPhase(Phase.STATUS) != Phase.STATUS) {
101 gracefulClose = true;
102 this.stashedTrailers = trailers;
103 writeStatusToTrailers(status);
ejonac0f41922014-12-23 12:16:17 -0800104 closeFramer();
zhangkun048649e2014-08-28 15:52:03 -0700105 }
zhangkun048649e2014-08-28 15:52:03 -0700106 }
107
lryan1f2a1862014-11-12 14:55:51 -0800108 private void writeStatusToTrailers(Status status) {
109 stashedTrailers.removeAll(Status.CODE_KEY);
110 stashedTrailers.removeAll(Status.MESSAGE_KEY);
111 stashedTrailers.put(Status.CODE_KEY, status);
112 if (status.getDescription() != null) {
113 stashedTrailers.put(Status.MESSAGE_KEY, status.getDescription());
114 }
115 }
116
lryan669724a2014-11-10 10:21:45 -0800117 /**
118 * Called in the network thread to process the content of an inbound DATA frame from the client.
119 *
120 * @param frame the inbound HTTP/2 DATA frame. If this buffer is not used immediately, it must
121 * be retained.
122 */
123 public void inboundDataReceived(Buffer frame, boolean endOfStream) {
lryanc5e70c22014-11-24 16:41:02 -0800124 if (inboundPhase() == Phase.STATUS) {
lryan669724a2014-11-10 10:21:45 -0800125 frame.close();
126 return;
127 }
ejona4de20262015-01-16 16:17:35 -0800128 // TODO(zhangkun): It sounds sub-optimal to deframe in the network thread. That means
lryan669724a2014-11-10 10:21:45 -0800129 // decompression is serialized.
nathanmittler5d953e82014-11-18 13:12:42 -0800130 deframe(frame, endOfStream);
131 }
132
133 @Override
134 protected final void deframeFailed(Throwable cause) {
135 log.log(Level.WARNING, "Exception processing message", cause);
136 abortStream(Status.fromThrowable(cause), true);
lryan669724a2014-11-10 10:21:45 -0800137 }
138
ejona9d502992014-09-22 12:23:19 -0700139 @Override
140 protected final void internalSendFrame(ByteBuffer frame, boolean endOfStream) {
ejonac0f41922014-12-23 12:16:17 -0800141 if (frame.hasRemaining()) {
142 sendFrame(frame, false);
143 }
144 if (endOfStream) {
145 sendTrailers(stashedTrailers, headersSent);
146 headersSent = true;
147 stashedTrailers = null;
ejona9d502992014-09-22 12:23:19 -0700148 }
149 }
150
151 /**
lryan28497e32014-10-17 16:14:38 -0700152 * Sends response headers to the remote end points.
153 * @param headers to be sent to client.
154 */
155 protected abstract void internalSendHeaders(Metadata.Headers headers);
156
157 /**
ejona9d502992014-09-22 12:23:19 -0700158 * Sends an outbound frame to the remote end point.
159 *
160 * @param frame a buffer containing the chunk of data to be sent.
161 * @param endOfStream if {@code true} indicates that no more data will be sent on the stream by
162 * this endpoint.
163 */
164 protected abstract void sendFrame(ByteBuffer frame, boolean endOfStream);
165
166 /**
167 * Sends trailers to the remote end point. This call implies end of stream.
168 *
169 * @param trailers metadata to be sent to end point
lryan28497e32014-10-17 16:14:38 -0700170 * @param headersSent true if response headers have already been sent.
ejona9d502992014-09-22 12:23:19 -0700171 */
lryan28497e32014-10-17 16:14:38 -0700172 protected abstract void sendTrailers(Metadata.Trailers trailers, boolean headersSent);
ejona9d502992014-09-22 12:23:19 -0700173
ejonaef2129c2014-09-18 10:12:52 -0700174 /**
175 * The Stream is considered completely closed and there is no further opportunity for error. It
176 * calls the listener's {@code closed()} if it was not already done by {@link #abortStream}. Note
177 * that it is expected that either {@code closed()} or {@code abortStream()} was previously
ejona6fc356b2014-09-22 12:49:20 -0700178 * called, since {@code closed()} is required for a normal stream closure and {@code
179 * abortStream()} for abnormal.
ejonaef2129c2014-09-18 10:12:52 -0700180 */
181 public void complete() {
lryanc5e70c22014-11-24 16:41:02 -0800182 listenerClosed = true;
ejona6fc356b2014-09-22 12:49:20 -0700183 if (!gracefulClose) {
lryan71e4a922014-09-25 18:25:54 -0700184 listener.closed(Status.INTERNAL.withDescription("successful complete() without close()"));
ejonaef2129c2014-09-18 10:12:52 -0700185 throw new IllegalStateException("successful complete() without close()");
186 }
ejona6fc356b2014-09-22 12:49:20 -0700187 listener.closed(Status.OK);
ejonaef2129c2014-09-18 10:12:52 -0700188 }
189
zhangkun048649e2014-08-28 15:52:03 -0700190 /**
191 * Called when the remote end half-closes the stream.
192 */
ejona9d502992014-09-22 12:23:19 -0700193 @Override
194 protected final void remoteEndClosed() {
lryanc5e70c22014-11-24 16:41:02 -0800195 if (inboundPhase(Phase.STATUS) != Phase.STATUS) {
196 listener.halfClosed();
zhangkun048649e2014-08-28 15:52:03 -0700197 }
zhangkun048649e2014-08-28 15:52:03 -0700198 }
199
200 /**
201 * Aborts the stream with an error status, cleans up resources and notifies the listener if
202 * necessary.
203 *
lryane4bd1c72014-09-08 14:03:35 -0700204 * <p>Unlike {@link #close(Status, Metadata.Trailers)}, this method is only called from the
ejonaef2129c2014-09-18 10:12:52 -0700205 * transport. The transport should use this method instead of {@code close(Status)} for internal
206 * errors to prevent exposing unexpected states and exceptions to the application.
zhangkun048649e2014-08-28 15:52:03 -0700207 *
208 * @param status the error status. Must not be Status.OK.
209 * @param notifyClient true if the stream is still writable and you want to notify the client
210 * about stream closure and send the status
211 */
212 public final void abortStream(Status status, boolean notifyClient) {
ejona4de20262015-01-16 16:17:35 -0800213 // TODO(lryan): Investigate whether we can remove the notification to the client
lryanc5e70c22014-11-24 16:41:02 -0800214 // and rely on a transport layer stream reset instead.
zhangkun048649e2014-08-28 15:52:03 -0700215 Preconditions.checkArgument(!status.isOk(), "status must not be OK");
lryanc5e70c22014-11-24 16:41:02 -0800216 if (!listenerClosed) {
ejonaef2129c2014-09-18 10:12:52 -0700217 listenerClosed = true;
lryanc5e70c22014-11-24 16:41:02 -0800218 listener.closed(status);
zhangkun048649e2014-08-28 15:52:03 -0700219 }
lryanc5e70c22014-11-24 16:41:02 -0800220 if (notifyClient) {
ejona4de20262015-01-16 16:17:35 -0800221 // TODO(lryan): Remove
lryanc5e70c22014-11-24 16:41:02 -0800222 if (stashedTrailers == null) {
223 stashedTrailers = new Metadata.Trailers();
ejonaef2129c2014-09-18 10:12:52 -0700224 }
lryanc5e70c22014-11-24 16:41:02 -0800225 writeStatusToTrailers(status);
ejonac0f41922014-12-23 12:16:17 -0800226 closeFramer();
lryanc5e70c22014-11-24 16:41:02 -0800227 } else {
ejonaef2129c2014-09-18 10:12:52 -0700228 dispose();
zhangkun048649e2014-08-28 15:52:03 -0700229 }
zhangkun048649e2014-08-28 15:52:03 -0700230 }
lryanc5e70c22014-11-24 16:41:02 -0800231
232 @Override
233 public boolean isClosed() {
234 return super.isClosed() || listenerClosed;
235 }
zhangkun048649e2014-08-28 15:52:03 -0700236}