blob: 3cf17ef71fe37e65b0ac540d6dbc6e45d090b98d [file] [log] [blame]
/*
 * Copyright 2014, Google Inc. All rights reserved.
 *
 * Redistribution and use in source and binary forms, with or without
 * modification, are permitted provided that the following conditions are
 * met:
 *
 *    * Redistributions of source code must retain the above copyright
 * notice, this list of conditions and the following disclaimer.
 *    * Redistributions in binary form must reproduce the above
 * copyright notice, this list of conditions and the following disclaimer
 * in the documentation and/or other materials provided with the
 * distribution.
 *
 *    * Neither the name of Google Inc. nor the names of its
 * contributors may be used to endorse or promote products derived from
 * this software without specific prior written permission.
 *
 * THIS SOFTWARE IS PROVIDED BY THE COPYRIGHT HOLDERS AND CONTRIBUTORS
 * "AS IS" AND ANY EXPRESS OR IMPLIED WARRANTIES, INCLUDING, BUT NOT
 * LIMITED TO, THE IMPLIED WARRANTIES OF MERCHANTABILITY AND FITNESS FOR
 * A PARTICULAR PURPOSE ARE DISCLAIMED. IN NO EVENT SHALL THE COPYRIGHT
 * OWNER OR CONTRIBUTORS BE LIABLE FOR ANY DIRECT, INDIRECT, INCIDENTAL,
 * SPECIAL, EXEMPLARY, OR CONSEQUENTIAL DAMAGES (INCLUDING, BUT NOT
 * LIMITED TO, PROCUREMENT OF SUBSTITUTE GOODS OR SERVICES; LOSS OF USE,
 * DATA, OR PROFITS; OR BUSINESS INTERRUPTION) HOWEVER CAUSED AND ON ANY
 * THEORY OF LIABILITY, WHETHER IN CONTRACT, STRICT LIABILITY, OR TORT
 * (INCLUDING NEGLIGENCE OR OTHERWISE) ARISING IN ANY WAY OUT OF THE USE
 * OF THIS SOFTWARE, EVEN IF ADVISED OF THE POSSIBILITY OF SUCH DAMAGE.
 */
31
nathanmittler29cbef12014-10-27 11:33:19 -070032package com.google.net.stubby.transport;
zhangkun048649e2014-08-28 15:52:03 -070033
lryanc5e70c22014-11-24 16:41:02 -080034import com.google.common.base.MoreObjects;
zhangkun048649e2014-08-28 15:52:03 -070035import com.google.common.base.Preconditions;
lryane4bd1c72014-09-08 14:03:35 -070036import com.google.net.stubby.Metadata;
zhangkun048649e2014-08-28 15:52:03 -070037import com.google.net.stubby.Status;
38
ejona9d502992014-09-22 12:23:19 -070039import java.io.InputStream;
40import java.nio.ByteBuffer;
lryan669724a2014-11-10 10:21:45 -080041import java.util.logging.Level;
42import java.util.logging.Logger;
ejona9d502992014-09-22 12:23:19 -070043
lryan28497e32014-10-17 16:14:38 -070044import javax.annotation.Nullable;
ejona9d502992014-09-22 12:23:19 -070045
zhangkun048649e2014-08-28 15:52:03 -070046/**
47 * The abstract base class for {@link ClientStream} implementations.
48 */
lryan669724a2014-11-10 10:21:45 -080049public abstract class AbstractClientStream<IdT> extends AbstractStream<IdT>
50 implements ClientStream {
51
52 private static final Logger log = Logger.getLogger(AbstractClientStream.class.getName());
zhangkun048649e2014-08-28 15:52:03 -070053
ejona6fc356b2014-09-22 12:49:20 -070054 private final ClientStreamListener listener;
lryanc5e70c22014-11-24 16:41:02 -080055 private boolean listenerClosed;
zhangkun048649e2014-08-28 15:52:03 -070056
lryanc5e70c22014-11-24 16:41:02 -080057 // Stored status & trailers to report when deframer completes or
58 // transportReportStatus is directly called.
zhangkun048649e2014-08-28 15:52:03 -070059 private Status status;
lryanc5e70c22014-11-24 16:41:02 -080060 private Metadata.Trailers trailers;
nathanmittler11c363a2015-01-09 11:22:19 -080061 private Runnable closeListenerTask;
zhangkun048649e2014-08-28 15:52:03 -070062
ejona9d502992014-09-22 12:23:19 -070063
nmittlerde3a1312015-01-16 11:54:24 -080064 protected AbstractClientStream(ClientStreamListener listener) {
zhangkun048649e2014-08-28 15:52:03 -070065 this.listener = Preconditions.checkNotNull(listener);
66 }
67
68 @Override
nmittlerde3a1312015-01-16 11:54:24 -080069 protected void receiveMessage(InputStream is, int length) {
70 if (!listenerClosed) {
71 listener.messageRead(is, length);
nathanmittler11c363a2015-01-09 11:22:19 -080072 }
ejona9d502992014-09-22 12:23:19 -070073 }
74
lryan28497e32014-10-17 16:14:38 -070075 @Override
76 public final void writeMessage(InputStream message, int length, @Nullable Runnable accepted) {
77 super.writeMessage(message, length, accepted);
78 }
79
lryan669724a2014-11-10 10:21:45 -080080 /**
81 * The transport implementation has detected a protocol error on the stream. Transports are
82 * responsible for properly closing streams when protocol errors occur.
83 *
84 * @param errorStatus the error to report
85 */
86 protected void inboundTransportError(Status errorStatus) {
lryanc5e70c22014-11-24 16:41:02 -080087 if (inboundPhase() == Phase.STATUS) {
lryan669724a2014-11-10 10:21:45 -080088 log.log(Level.INFO, "Received transport error on closed stream {0} {1}",
89 new Object[]{id(), errorStatus});
90 return;
91 }
lryan669724a2014-11-10 10:21:45 -080092 // For transport errors we immediately report status to the application layer
93 // and do not wait for additional payloads.
nathanmittler11c363a2015-01-09 11:22:19 -080094 transportReportStatus(errorStatus, false, new Metadata.Trailers());
lryan669724a2014-11-10 10:21:45 -080095 }
96
97 /**
98 * Called by transport implementations when they receive headers. When receiving headers
99 * a transport may determine that there is an error in the protocol at this phase which is
100 * why this method takes an error {@link Status}. If a transport reports an
nathanmittler11c363a2015-01-09 11:22:19 -0800101 * {@link com.google.net.stubby.Status.Code#INTERNAL} error
lryan669724a2014-11-10 10:21:45 -0800102 *
103 * @param headers the parsed headers
104 */
105 protected void inboundHeadersReceived(Metadata.Headers headers) {
lryanc5e70c22014-11-24 16:41:02 -0800106 if (inboundPhase() == Phase.STATUS) {
lryan669724a2014-11-10 10:21:45 -0800107 log.log(Level.INFO, "Received headers on closed stream {0} {1}",
108 new Object[]{id(), headers});
109 }
110 inboundPhase(Phase.MESSAGE);
nmittlerde3a1312015-01-16 11:54:24 -0800111 listener.headersRead(headers);
lryan669724a2014-11-10 10:21:45 -0800112 }
113
114 /**
115 * Process the contents of a received data frame from the server.
116 */
117 protected void inboundDataReceived(Buffer frame) {
118 Preconditions.checkNotNull(frame, "frame");
lryanc5e70c22014-11-24 16:41:02 -0800119 if (inboundPhase() == Phase.STATUS) {
lryan669724a2014-11-10 10:21:45 -0800120 frame.close();
121 return;
122 }
lryanc5e70c22014-11-24 16:41:02 -0800123 if (inboundPhase() == Phase.HEADERS) {
lryan669724a2014-11-10 10:21:45 -0800124 // Have not received headers yet so error
125 inboundTransportError(Status.INTERNAL.withDescription("headers not received before payload"));
126 frame.close();
127 return;
128 }
129 inboundPhase(Phase.MESSAGE);
nathanmittler5d953e82014-11-18 13:12:42 -0800130
131 deframe(frame, false);
132 }
133
134 @Override
nathanmittler11c363a2015-01-09 11:22:19 -0800135 protected void inboundDeliveryPaused() {
136 runCloseListenerTask();
137 }
138
139 @Override
nathanmittler5d953e82014-11-18 13:12:42 -0800140 protected final void deframeFailed(Throwable cause) {
141 log.log(Level.WARNING, "Exception processing message", cause);
142 cancel();
lryan669724a2014-11-10 10:21:45 -0800143 }
144
145 /**
146 * Called by transport implementations when they receive trailers.
147 */
148 protected void inboundTrailersReceived(Metadata.Trailers trailers, Status status) {
lryanc5e70c22014-11-24 16:41:02 -0800149 Preconditions.checkNotNull(trailers, "trailers");
150 if (inboundPhase() == Phase.STATUS) {
lryan669724a2014-11-10 10:21:45 -0800151 log.log(Level.INFO, "Received trailers on closed stream {0}\n {1}\n {3}",
152 new Object[]{id(), status, trailers});
153 }
lryan669724a2014-11-10 10:21:45 -0800154 // Stash the status & trailers so they can be delivered by the deframer calls
155 // remoteEndClosed
lryanc5e70c22014-11-24 16:41:02 -0800156 this.status = status;
ejonac0f41922014-12-23 12:16:17 -0800157 this.trailers = trailers;
nathanmittler5d953e82014-11-18 13:12:42 -0800158 deframe(Buffers.empty(), true);
lryan669724a2014-11-10 10:21:45 -0800159 }
160
ejona9d502992014-09-22 12:23:19 -0700161 @Override
162 protected void remoteEndClosed() {
nathanmittler11c363a2015-01-09 11:22:19 -0800163 transportReportStatus(status, true, trailers);
ejona9d502992014-09-22 12:23:19 -0700164 }
165
166 @Override
167 protected final void internalSendFrame(ByteBuffer frame, boolean endOfStream) {
168 sendFrame(frame, endOfStream);
169 }
170
171 /**
172 * Sends an outbound frame to the remote end point.
173 *
174 * @param frame a buffer containing the chunk of data to be sent.
175 * @param endOfStream if {@code true} indicates that no more data will be sent on the stream by
176 * this endpoint.
177 */
178 protected abstract void sendFrame(ByteBuffer frame, boolean endOfStream);
179
zhangkun048649e2014-08-28 15:52:03 -0700180 /**
nathanmittler11c363a2015-01-09 11:22:19 -0800181 * Report stream closure with status to the application layer if not already reported. This method
182 * must be called from the transport thread.
zhangkun048649e2014-08-28 15:52:03 -0700183 *
184 * @param newStatus the new status to set
nathanmittler11c363a2015-01-09 11:22:19 -0800185 * @param stopDelivery if {@code true}, interrupts any further delivery of inbound messages that
186 * may already be queued up in the deframer. If {@code false}, the listener will be
187 * notified immediately after all currently completed messages in the deframer have been
188 * delivered to the application.
zhangkun048649e2014-08-28 15:52:03 -0700189 */
nathanmittler11c363a2015-01-09 11:22:19 -0800190 public void transportReportStatus(final Status newStatus, boolean stopDelivery,
191 final Metadata.Trailers trailers) {
zhangkun048649e2014-08-28 15:52:03 -0700192 Preconditions.checkNotNull(newStatus, "newStatus");
nathanmittler11c363a2015-01-09 11:22:19 -0800193
194 boolean closingLater = closeListenerTask != null && !stopDelivery;
195 if (listenerClosed || closingLater) {
196 // We already closed (or are about to close) the listener.
197 return;
198 }
199
lryanc5e70c22014-11-24 16:41:02 -0800200 inboundPhase(Phase.STATUS);
201 status = newStatus;
nathanmittler11c363a2015-01-09 11:22:19 -0800202 closeListenerTask = null;
203
204 // Determine if the deframer is stalled (i.e. currently has no complete messages to deliver).
nmittlerde3a1312015-01-16 11:54:24 -0800205 boolean deliveryStalled = deframer.isStalled();
nathanmittler11c363a2015-01-09 11:22:19 -0800206
207 if (stopDelivery || deliveryStalled) {
208 // Close the listener immediately.
lryanc5e70c22014-11-24 16:41:02 -0800209 listenerClosed = true;
210 listener.closed(newStatus, trailers);
nathanmittler11c363a2015-01-09 11:22:19 -0800211 } else {
212 // Delay close until inboundDeliveryStalled()
213 closeListenerTask = newCloseListenerTask(newStatus, trailers);
zhangkun048649e2014-08-28 15:52:03 -0700214 }
nathanmittler11c363a2015-01-09 11:22:19 -0800215 }
216
217 /**
218 * Creates a new {@link Runnable} to close the listener with the given status/trailers.
219 */
220 private Runnable newCloseListenerTask(final Status status, final Metadata.Trailers trailers) {
221 return new Runnable() {
222 @Override
223 public void run() {
224 if (!listenerClosed) {
225 // Status has not been reported to the application layer
226 listenerClosed = true;
227 listener.closed(status, trailers);
228 }
229 }
230 };
231 }
232
233 /**
234 * Executes the pending listener close task, if one exists.
235 */
236 private void runCloseListenerTask() {
237 if (closeListenerTask != null) {
238 closeListenerTask.run();
239 closeListenerTask = null;
240 }
zhangkun048649e2014-08-28 15:52:03 -0700241 }
242
243 @Override
244 public final void halfClose() {
lryanc5e70c22014-11-24 16:41:02 -0800245 if (outboundPhase(Phase.STATUS) != Phase.STATUS) {
ejonac0f41922014-12-23 12:16:17 -0800246 closeFramer();
lryan669724a2014-11-10 10:21:45 -0800247 }
248 }
249
250 /**
lryanc5e70c22014-11-24 16:41:02 -0800251 * Cancel the stream. Called by the application layer, never called by the transport.
252 */
253 @Override
254 public void cancel() {
255 outboundPhase(Phase.STATUS);
256 if (id() != null) {
257 // Only send a cancellation to remote side if we have actually been allocated
258 // a stream id and we are not already closed. i.e. the server side is aware of the stream.
259 sendCancel();
260 }
261 dispose();
262 }
263
264 /**
265 * Send a stream cancellation message to the remote server. Can be called by either the
266 * application or transport layers.
lryan669724a2014-11-10 10:21:45 -0800267 */
268 protected abstract void sendCancel();
lryanc5e70c22014-11-24 16:41:02 -0800269
270 @Override
271 protected MoreObjects.ToStringHelper toStringHelper() {
272 MoreObjects.ToStringHelper toStringHelper = super.toStringHelper();
273 if (status != null) {
274 toStringHelper.add("status", status);
275 }
276 return toStringHelper;
277 }
278
279 @Override
280 public boolean isClosed() {
281 return super.isClosed() || listenerClosed;
282 }
zhangkun048649e2014-08-28 15:52:03 -0700283}