blob: 987bca406f1e4ece0ec16f75e32b82cb7b23423a [file] [log] [blame]
simonma77d97062014-07-18 10:22:35 -07001package com.google.net.stubby.newtransport.okhttp;
2
simonma7bf17dc2014-07-29 09:52:20 -07003import com.google.common.annotations.VisibleForTesting;
simonma77d97062014-07-18 10:22:35 -07004import com.google.common.base.Preconditions;
simonma77d97062014-07-18 10:22:35 -07005import com.google.common.io.ByteStreams;
nathanmittler43d2fcc2014-08-29 21:58:54 -07006import com.google.common.util.concurrent.ListenableFuture;
lryane4bd1c72014-09-08 14:03:35 -07007import com.google.net.stubby.Metadata;
simonma77d97062014-07-18 10:22:35 -07008import com.google.net.stubby.MethodDescriptor;
9import com.google.net.stubby.Status;
zhangkun048649e2014-08-28 15:52:03 -070010import com.google.net.stubby.newtransport.AbstractClientStream;
simonma77d97062014-07-18 10:22:35 -070011import com.google.net.stubby.newtransport.AbstractClientTransport;
simonma77d97062014-07-18 10:22:35 -070012import com.google.net.stubby.newtransport.ClientStream;
ejona6fc356b2014-09-22 12:49:20 -070013import com.google.net.stubby.newtransport.ClientStreamListener;
simonma77d97062014-07-18 10:22:35 -070014import com.google.net.stubby.newtransport.ClientTransport;
15import com.google.net.stubby.newtransport.InputStreamDeframer;
simonma1966d5b2014-08-08 14:51:01 -070016import com.google.net.stubby.newtransport.StreamState;
simonma77d97062014-07-18 10:22:35 -070017
18import com.squareup.okhttp.internal.spdy.ErrorCode;
19import com.squareup.okhttp.internal.spdy.FrameReader;
20import com.squareup.okhttp.internal.spdy.Header;
21import com.squareup.okhttp.internal.spdy.HeadersMode;
brettmorgan56f5ec32014-07-18 16:54:50 -070022import com.squareup.okhttp.internal.spdy.Http20Draft12;
simonma77d97062014-07-18 10:22:35 -070023import com.squareup.okhttp.internal.spdy.Settings;
24import com.squareup.okhttp.internal.spdy.Variant;
25
zhangkun048649e2014-08-28 15:52:03 -070026import okio.Buffer;
simonma77d97062014-07-18 10:22:35 -070027import okio.BufferedSink;
28import okio.BufferedSource;
zhangkun048649e2014-08-28 15:52:03 -070029import okio.ByteString;
simonma77d97062014-07-18 10:22:35 -070030import okio.Okio;
simonma77d97062014-07-18 10:22:35 -070031
32import java.io.IOException;
nathanmittler23fbc7c2014-09-11 12:50:16 -070033import java.net.InetSocketAddress;
simonma77d97062014-07-18 10:22:35 -070034import java.net.Socket;
35import java.nio.ByteBuffer;
simonma7bf17dc2014-07-29 09:52:20 -070036import java.util.ArrayList;
simonma77d97062014-07-18 10:22:35 -070037import java.util.Collections;
38import java.util.HashMap;
simonma7bf17dc2014-07-29 09:52:20 -070039import java.util.Iterator;
simonma77d97062014-07-18 10:22:35 -070040import java.util.List;
41import java.util.Map;
simonmaf2c4c852014-08-06 10:00:56 -070042import java.util.concurrent.Executor;
simonma77d97062014-07-18 10:22:35 -070043
44import javax.annotation.concurrent.GuardedBy;
45
/**
 * An OkHttp-based {@link ClientTransport} implementation.
 */
49public class OkHttpClientTransport extends AbstractClientTransport {
50 /** The default initial window size in HTTP/2 is 64 KiB for the stream and connection. */
simonma7bf17dc2014-07-29 09:52:20 -070051 @VisibleForTesting
simonma77d97062014-07-18 10:22:35 -070052 static final int DEFAULT_INITIAL_WINDOW_SIZE = 64 * 1024;
53
simonma8f3e9ee2014-09-05 11:02:52 -070054 private static final Map<ErrorCode, Status> ERROR_CODE_TO_STATUS;
55 static {
56 Map<ErrorCode, Status> errorToStatus = new HashMap<ErrorCode, Status>();
57 errorToStatus.put(ErrorCode.NO_ERROR, Status.OK);
58 errorToStatus.put(ErrorCode.PROTOCOL_ERROR,
lryan71e4a922014-09-25 18:25:54 -070059 Status.INTERNAL.withDescription("Protocol error"));
simonma8f3e9ee2014-09-05 11:02:52 -070060 errorToStatus.put(ErrorCode.INVALID_STREAM,
lryan71e4a922014-09-25 18:25:54 -070061 Status.INTERNAL.withDescription("Invalid stream"));
simonma8f3e9ee2014-09-05 11:02:52 -070062 errorToStatus.put(ErrorCode.UNSUPPORTED_VERSION,
lryan71e4a922014-09-25 18:25:54 -070063 Status.INTERNAL.withDescription("Unsupported version"));
simonma8f3e9ee2014-09-05 11:02:52 -070064 errorToStatus.put(ErrorCode.STREAM_IN_USE,
lryan71e4a922014-09-25 18:25:54 -070065 Status.INTERNAL.withDescription("Stream in use"));
simonma8f3e9ee2014-09-05 11:02:52 -070066 errorToStatus.put(ErrorCode.STREAM_ALREADY_CLOSED,
lryan71e4a922014-09-25 18:25:54 -070067 Status.INTERNAL.withDescription("Stream already closed"));
simonma8f3e9ee2014-09-05 11:02:52 -070068 errorToStatus.put(ErrorCode.INTERNAL_ERROR,
lryan71e4a922014-09-25 18:25:54 -070069 Status.INTERNAL.withDescription("Internal error"));
simonma8f3e9ee2014-09-05 11:02:52 -070070 errorToStatus.put(ErrorCode.FLOW_CONTROL_ERROR,
lryan71e4a922014-09-25 18:25:54 -070071 Status.INTERNAL.withDescription("Flow control error"));
simonma8f3e9ee2014-09-05 11:02:52 -070072 errorToStatus.put(ErrorCode.STREAM_CLOSED,
lryan71e4a922014-09-25 18:25:54 -070073 Status.INTERNAL.withDescription("Stream closed"));
simonma8f3e9ee2014-09-05 11:02:52 -070074 errorToStatus.put(ErrorCode.FRAME_TOO_LARGE,
lryan71e4a922014-09-25 18:25:54 -070075 Status.INTERNAL.withDescription("Frame too large"));
simonma8f3e9ee2014-09-05 11:02:52 -070076 errorToStatus.put(ErrorCode.REFUSED_STREAM,
lryan71e4a922014-09-25 18:25:54 -070077 Status.INTERNAL.withDescription("Refused stream"));
78 errorToStatus.put(ErrorCode.CANCEL, Status.CANCELLED.withDescription("Cancelled"));
simonma8f3e9ee2014-09-05 11:02:52 -070079 errorToStatus.put(ErrorCode.COMPRESSION_ERROR,
lryan71e4a922014-09-25 18:25:54 -070080 Status.INTERNAL.withDescription("Compression error"));
simonma8f3e9ee2014-09-05 11:02:52 -070081 errorToStatus.put(ErrorCode.INVALID_CREDENTIALS,
lryan71e4a922014-09-25 18:25:54 -070082 Status.PERMISSION_DENIED.withDescription("Invalid credentials"));
simonma8f3e9ee2014-09-05 11:02:52 -070083 ERROR_CODE_TO_STATUS = Collections.unmodifiableMap(errorToStatus);
84 }
simonma77d97062014-07-18 10:22:35 -070085
nathanmittler23fbc7c2014-09-11 12:50:16 -070086 private final InetSocketAddress address;
87 private final String defaultAuthority;
simonma77d97062014-07-18 10:22:35 -070088 private FrameReader frameReader;
89 private AsyncFrameWriter frameWriter;
lryane4bd1c72014-09-08 14:03:35 -070090 private final Object lock = new Object();
simonma7bf17dc2014-07-29 09:52:20 -070091 @GuardedBy("lock")
simonma77d97062014-07-18 10:22:35 -070092 private int nextStreamId;
93 private final Map<Integer, OkHttpClientStream> streams =
94 Collections.synchronizedMap(new HashMap<Integer, OkHttpClientStream>());
simonmaf2c4c852014-08-06 10:00:56 -070095 private final Executor executor;
simonma77d97062014-07-18 10:22:35 -070096 private int unacknowledgedBytesRead;
simonma7bf17dc2014-07-29 09:52:20 -070097 private ClientFrameHandler clientFrameHandler;
98 // The status used to finish all active streams when the transport is closed.
99 @GuardedBy("lock")
100 private boolean goAway;
101 @GuardedBy("lock")
102 private Status goAwayStatus;
simonma77d97062014-07-18 10:22:35 -0700103
nathanmittler23fbc7c2014-09-11 12:50:16 -0700104 OkHttpClientTransport(InetSocketAddress address, Executor executor) {
105 this.address = Preconditions.checkNotNull(address);
106 defaultAuthority = address.getHostString() + ":" + address.getPort();
simonmaf2c4c852014-08-06 10:00:56 -0700107 this.executor = Preconditions.checkNotNull(executor);
simonma77d97062014-07-18 10:22:35 -0700108 // Client initiated streams are odd, server initiated ones are even. Server should not need to
109 // use it. We start clients at 3 to avoid conflicting with HTTP negotiation.
110 nextStreamId = 3;
111 }
112
simonma7bf17dc2014-07-29 09:52:20 -0700113 /**
114 * Create a transport connected to a fake peer for test.
115 */
116 @VisibleForTesting
simonmaf2c4c852014-08-06 10:00:56 -0700117 OkHttpClientTransport(Executor executor, FrameReader frameReader, AsyncFrameWriter frameWriter,
118 int nextStreamId) {
nathanmittler23fbc7c2014-09-11 12:50:16 -0700119 address = null;
120 defaultAuthority = "notarealauthority:80";
simonmaf2c4c852014-08-06 10:00:56 -0700121 this.executor = Preconditions.checkNotNull(executor);
122 this.frameReader = Preconditions.checkNotNull(frameReader);
123 this.frameWriter = Preconditions.checkNotNull(frameWriter);
simonma7bf17dc2014-07-29 09:52:20 -0700124 this.nextStreamId = nextStreamId;
simonma7bf17dc2014-07-29 09:52:20 -0700125 }
126
simonma77d97062014-07-18 10:22:35 -0700127 @Override
lryane4bd1c72014-09-08 14:03:35 -0700128 protected ClientStream newStreamInternal(MethodDescriptor<?, ?> method,
129 Metadata.Headers headers,
ejona6fc356b2014-09-22 12:49:20 -0700130 ClientStreamListener listener) {
nathanmittler23fbc7c2014-09-11 12:50:16 -0700131 return new OkHttpClientStream(method, headers, listener);
simonma77d97062014-07-18 10:22:35 -0700132 }
133
134 @Override
135 protected void doStart() {
simonma7bf17dc2014-07-29 09:52:20 -0700136 // We set host to null for test.
nathanmittler23fbc7c2014-09-11 12:50:16 -0700137 if (address != null) {
simonma7bf17dc2014-07-29 09:52:20 -0700138 BufferedSource source;
139 BufferedSink sink;
140 try {
nathanmittler23fbc7c2014-09-11 12:50:16 -0700141 Socket socket = new Socket(address.getAddress(), address.getPort());
simonma7bf17dc2014-07-29 09:52:20 -0700142 source = Okio.buffer(Okio.source(socket));
143 sink = Okio.buffer(Okio.sink(socket));
144 } catch (IOException e) {
145 throw new RuntimeException(e);
146 }
147 Variant variant = new Http20Draft12();
148 frameReader = variant.newReader(source, true);
149 frameWriter = new AsyncFrameWriter(variant.newWriter(sink, true), this, executor);
simonma77d97062014-07-18 10:22:35 -0700150 }
simonma77d97062014-07-18 10:22:35 -0700151
simonma77d97062014-07-18 10:22:35 -0700152 notifyStarted();
simonma7bf17dc2014-07-29 09:52:20 -0700153 clientFrameHandler = new ClientFrameHandler();
154 executor.execute(clientFrameHandler);
simonma77d97062014-07-18 10:22:35 -0700155 }
156
157 @Override
158 protected void doStop() {
simonma7bf17dc2014-07-29 09:52:20 -0700159 boolean normalClose;
160 synchronized (lock) {
161 normalClose = !goAway;
simonma77d97062014-07-18 10:22:35 -0700162 }
simonma7bf17dc2014-07-29 09:52:20 -0700163 if (normalClose) {
lryan71e4a922014-09-25 18:25:54 -0700164 abort(Status.INTERNAL.withDescription("Transport stopped"));
simonma7bf17dc2014-07-29 09:52:20 -0700165 // Send GOAWAY with lastGoodStreamId of 0, since we don't expect any server-initiated streams.
166 // The GOAWAY is part of graceful shutdown.
simonmab645b382014-08-26 10:58:45 -0700167 frameWriter.goAway(0, ErrorCode.NO_ERROR, new byte[0]);
simonma7bf17dc2014-07-29 09:52:20 -0700168 }
169 stopIfNecessary();
170 }
171
172 @VisibleForTesting
173 ClientFrameHandler getHandler() {
174 return clientFrameHandler;
175 }
176
177 @VisibleForTesting
178 Map<Integer, OkHttpClientStream> getStreams() {
179 return streams;
simonma77d97062014-07-18 10:22:35 -0700180 }
181
182 /**
simonma7bf17dc2014-07-29 09:52:20 -0700183 * Finish all active streams with given status, then close the transport.
simonma77d97062014-07-18 10:22:35 -0700184 */
simonma7bf17dc2014-07-29 09:52:20 -0700185 void abort(Status status) {
186 onGoAway(-1, status);
187 }
188
189 private void onGoAway(int lastKnownStreamId, Status status) {
190 ArrayList<OkHttpClientStream> goAwayStreams = new ArrayList<OkHttpClientStream>();
191 synchronized (lock) {
192 goAway = true;
193 goAwayStatus = status;
194 Iterator<Map.Entry<Integer, OkHttpClientStream>> it = streams.entrySet().iterator();
195 while (it.hasNext()) {
196 Map.Entry<Integer, OkHttpClientStream> entry = it.next();
197 if (entry.getKey() > lastKnownStreamId) {
198 goAwayStreams.add(entry.getValue());
199 it.remove();
200 }
201 }
simonma77d97062014-07-18 10:22:35 -0700202 }
simonma7bf17dc2014-07-29 09:52:20 -0700203
204 // Starting stop, go into STOPPING state so that Channel know this Transport should not be used
205 // further, will become STOPPED once all streams are complete.
206 stopAsync();
207
208 for (OkHttpClientStream stream : goAwayStreams) {
lryane4bd1c72014-09-08 14:03:35 -0700209 stream.setStatus(status, new Metadata.Trailers());
simonma77d97062014-07-18 10:22:35 -0700210 }
211 }
212
213 /**
simonma7bf17dc2014-07-29 09:52:20 -0700214 * Called when a stream is closed.
simonma77d97062014-07-18 10:22:35 -0700215 *
216 * <p> Return false if the stream has already finished.
217 */
218 private boolean finishStream(int streamId, Status status) {
219 OkHttpClientStream stream;
220 stream = streams.remove(streamId);
221 if (stream != null) {
222 // This is mainly for failed streams, for successfully finished streams, it's a no-op.
lryane4bd1c72014-09-08 14:03:35 -0700223 stream.setStatus(status, new Metadata.Trailers());
simonma77d97062014-07-18 10:22:35 -0700224 return true;
225 }
226 return false;
227 }
228
229 /**
simonma7bf17dc2014-07-29 09:52:20 -0700230 * When the transport is in goAway states, we should stop it once all active streams finish.
231 */
232 private void stopIfNecessary() {
233 boolean shouldStop;
234 synchronized (lock) {
235 shouldStop = (goAway && streams.size() == 0);
236 }
237 if (shouldStop) {
238 frameWriter.close();
239 try {
240 frameReader.close();
241 } catch (IOException e) {
242 throw new RuntimeException(e);
243 }
simonma7bf17dc2014-07-29 09:52:20 -0700244 notifyStopped();
245 }
246 }
247
248 /**
249 * Returns a Grpc status corresponding to the given ErrorCode.
250 */
251 @VisibleForTesting
252 static Status toGrpcStatus(ErrorCode code) {
253 return ERROR_CODE_TO_STATUS.get(code);
254 }
255
256 /**
simonma77d97062014-07-18 10:22:35 -0700257 * Runnable which reads frames and dispatches them to in flight calls
258 */
simonma7bf17dc2014-07-29 09:52:20 -0700259 @VisibleForTesting
260 class ClientFrameHandler implements FrameReader.Handler, Runnable {
261 ClientFrameHandler() {}
simonma77d97062014-07-18 10:22:35 -0700262
263 @Override
264 public void run() {
265 String threadName = Thread.currentThread().getName();
266 Thread.currentThread().setName("OkHttpClientTransport");
267 try {
268 // Read until the underlying socket closes.
269 while (frameReader.nextFrame(this)) {
270 }
271 } catch (IOException ioe) {
simonma7bf17dc2014-07-29 09:52:20 -0700272 abort(Status.fromThrowable(ioe));
simonma77d97062014-07-18 10:22:35 -0700273 } finally {
274 // Restore the original thread name.
275 Thread.currentThread().setName(threadName);
276 }
277 }
278
279 /**
280 * Handle a HTTP2 DATA frame
281 */
282 @Override
283 public void data(boolean inFinished, int streamId, BufferedSource in, int length)
284 throws IOException {
285 final OkHttpClientStream stream;
286 stream = streams.get(streamId);
287 if (stream == null) {
288 frameWriter.rstStream(streamId, ErrorCode.INVALID_STREAM);
289 return;
290 }
291 InputStreamDeframer deframer = stream.getDeframer();
292
293 // Wait until the frame is complete.
294 in.require(length);
295
296 deframer.deliverFrame(ByteStreams.limit(in.inputStream(), length), inFinished);
297 unacknowledgedBytesRead += length;
298 stream.unacknowledgedBytesRead += length;
299 if (unacknowledgedBytesRead >= DEFAULT_INITIAL_WINDOW_SIZE / 2) {
300 frameWriter.windowUpdate(0, unacknowledgedBytesRead);
301 unacknowledgedBytesRead = 0;
302 }
303 if (stream.unacknowledgedBytesRead >= DEFAULT_INITIAL_WINDOW_SIZE / 2) {
304 frameWriter.windowUpdate(streamId, stream.unacknowledgedBytesRead);
305 stream.unacknowledgedBytesRead = 0;
306 }
307 if (inFinished) {
simonma7bf17dc2014-07-29 09:52:20 -0700308 if (finishStream(streamId, Status.OK)) {
309 stopIfNecessary();
310 }
simonma77d97062014-07-18 10:22:35 -0700311 }
312 }
313
314 /**
315 * Handle HTTP2 HEADER and CONTINUATION frames
316 */
317 @Override
318 public void headers(boolean outFinished,
319 boolean inFinished,
320 int streamId,
321 int associatedStreamId,
brettmorgan56f5ec32014-07-18 16:54:50 -0700322 List<Header> headerBlock,
simonma77d97062014-07-18 10:22:35 -0700323 HeadersMode headersMode) {
324 // TODO(user): handle received headers.
325 }
326
327 @Override
328 public void rstStream(int streamId, ErrorCode errorCode) {
simonma7bf17dc2014-07-29 09:52:20 -0700329 if (finishStream(streamId, toGrpcStatus(errorCode))) {
330 stopIfNecessary();
331 }
simonma77d97062014-07-18 10:22:35 -0700332 }
333
334 @Override
335 public void settings(boolean clearPrevious, Settings settings) {
336 // not impl
337 frameWriter.ackSettings();
338 }
339
340 @Override
341 public void ping(boolean ack, int payload1, int payload2) {
342 if (!ack) {
343 frameWriter.ping(true, payload1, payload2);
344 }
345 }
346
347 @Override
348 public void ackSettings() {
349 // Do nothing currently.
350 }
351
352 @Override
353 public void goAway(int lastGoodStreamId, ErrorCode errorCode, ByteString debugData) {
lryan71e4a922014-09-25 18:25:54 -0700354 onGoAway(lastGoodStreamId, Status.UNAVAILABLE.withDescription("Go away"));
simonma77d97062014-07-18 10:22:35 -0700355 }
356
357 @Override
358 public void pushPromise(int streamId, int promisedStreamId, List<Header> requestHeaders)
359 throws IOException {
simonma7bf17dc2014-07-29 09:52:20 -0700360 // We don't accept server initiated stream.
361 frameWriter.rstStream(streamId, ErrorCode.PROTOCOL_ERROR);
simonma77d97062014-07-18 10:22:35 -0700362 }
363
364 @Override
365 public void windowUpdate(int arg0, long arg1) {
366 // TODO(user): flow control.
367 }
368
369 @Override
brettmorgan56f5ec32014-07-18 16:54:50 -0700370 public void priority(int streamId, int streamDependency, int weight, boolean exclusive) {
simonma77d97062014-07-18 10:22:35 -0700371 // Ignore priority change.
372 // TODO(user): log
373 }
brettmorgan56f5ec32014-07-18 16:54:50 -0700374
375 @Override
376 public void alternateService(int streamId, String origin, ByteString protocol, String host,
377 int port, long maxAge) {
378 // TODO(user): Deal with alternateService propagation
379 }
simonma77d97062014-07-18 10:22:35 -0700380 }
381
simonma7bf17dc2014-07-29 09:52:20 -0700382 @GuardedBy("lock")
383 private void assignStreamId(OkHttpClientStream stream) {
384 Preconditions.checkState(stream.streamId == 0, "StreamId already assigned");
385 stream.streamId = nextStreamId;
386 streams.put(stream.streamId, stream);
387 if (nextStreamId >= Integer.MAX_VALUE - 2) {
lryan71e4a922014-09-25 18:25:54 -0700388 onGoAway(Integer.MAX_VALUE, Status.INTERNAL.withDescription("Stream id exhaust"));
simonma7bf17dc2014-07-29 09:52:20 -0700389 } else {
390 nextStreamId += 2;
391 }
392 }
393
simonma77d97062014-07-18 10:22:35 -0700394 /**
395 * Client stream for the okhttp transport.
396 */
simonma7bf17dc2014-07-29 09:52:20 -0700397 @VisibleForTesting
zhangkun048649e2014-08-28 15:52:03 -0700398 class OkHttpClientStream extends AbstractClientStream {
simonma77d97062014-07-18 10:22:35 -0700399 int streamId;
400 final InputStreamDeframer deframer;
401 int unacknowledgedBytesRead;
402
nathanmittler23fbc7c2014-09-11 12:50:16 -0700403 OkHttpClientStream(MethodDescriptor<?, ?> method, Metadata.Headers headers,
ejona6fc356b2014-09-22 12:49:20 -0700404 ClientStreamListener listener) {
simonma77d97062014-07-18 10:22:35 -0700405 super(listener);
simonma77d97062014-07-18 10:22:35 -0700406 deframer = new InputStreamDeframer(inboundMessageHandler());
simonma7bf17dc2014-07-29 09:52:20 -0700407 synchronized (lock) {
408 if (goAway) {
lryane4bd1c72014-09-08 14:03:35 -0700409 setStatus(goAwayStatus, new Metadata.Trailers());
simonma7bf17dc2014-07-29 09:52:20 -0700410 return;
411 }
412 assignStreamId(this);
413 }
nathanmittler23fbc7c2014-09-11 12:50:16 -0700414 String defaultPath = "/" + method.getName();
simonma7bf17dc2014-07-29 09:52:20 -0700415 frameWriter.synStream(false, false, streamId, 0,
nathanmittler23fbc7c2014-09-11 12:50:16 -0700416 Headers.createRequestHeaders(headers, defaultPath, defaultAuthority));
simonma77d97062014-07-18 10:22:35 -0700417 }
418
simonma7bf17dc2014-07-29 09:52:20 -0700419 InputStreamDeframer getDeframer() {
simonma77d97062014-07-18 10:22:35 -0700420 return deframer;
421 }
422
423 @Override
424 protected void sendFrame(ByteBuffer frame, boolean endOfStream) {
425 Preconditions.checkState(streamId != 0, "streamId should be set");
simonma18eb63b2014-09-04 13:16:28 -0700426 Buffer buffer = new Buffer();
427 // Read the data into a buffer.
428 // TODO(user): swap to NIO buffers or zero-copy if/when okhttp/okio supports it
429 buffer.write(frame.array(), frame.arrayOffset(), frame.remaining());
simonma77d97062014-07-18 10:22:35 -0700430 // Write the data to the remote endpoint.
431 frameWriter.data(endOfStream, streamId, buffer);
432 frameWriter.flush();
433 }
434
435 @Override
nathanmittler43d2fcc2014-08-29 21:58:54 -0700436 protected void disableWindowUpdate(ListenableFuture<Void> processingFuture) {
437 // TODO(user): implement inbound flow control.
438 }
439
440 @Override
simonma77d97062014-07-18 10:22:35 -0700441 public void cancel() {
simonma1966d5b2014-08-08 14:51:01 -0700442 if (streamId == 0) {
443 // This should only happens when the stream was failed in constructor.
444 Preconditions.checkState(state() == StreamState.CLOSED, "A unclosed stream has no id");
445 }
simonma77d97062014-07-18 10:22:35 -0700446 outboundPhase = Phase.STATUS;
simonma7bf17dc2014-07-29 09:52:20 -0700447 if (finishStream(streamId, toGrpcStatus(ErrorCode.CANCEL))) {
simonma77d97062014-07-18 10:22:35 -0700448 frameWriter.rstStream(streamId, ErrorCode.CANCEL);
simonma7bf17dc2014-07-29 09:52:20 -0700449 stopIfNecessary();
simonma77d97062014-07-18 10:22:35 -0700450 }
451 }
452 }
453}