blob: 4d94ac1ddb17c3039862b91bf12e3d61ce62fdf0 [file] [log] [blame]
simonma77d97062014-07-18 10:22:35 -07001package com.google.net.stubby.newtransport.okhttp;
2
simonma7bf17dc2014-07-29 09:52:20 -07003import com.google.common.annotations.VisibleForTesting;
simonma77d97062014-07-18 10:22:35 -07004import com.google.common.base.Preconditions;
simonma77d97062014-07-18 10:22:35 -07005import com.google.common.io.ByteStreams;
nathanmittler43d2fcc2014-08-29 21:58:54 -07006import com.google.common.util.concurrent.ListenableFuture;
lryane4bd1c72014-09-08 14:03:35 -07007import com.google.net.stubby.Metadata;
simonma77d97062014-07-18 10:22:35 -07008import com.google.net.stubby.MethodDescriptor;
9import com.google.net.stubby.Status;
zhangkun048649e2014-08-28 15:52:03 -070010import com.google.net.stubby.newtransport.AbstractClientStream;
simonma77d97062014-07-18 10:22:35 -070011import com.google.net.stubby.newtransport.AbstractClientTransport;
simonma77d97062014-07-18 10:22:35 -070012import com.google.net.stubby.newtransport.ClientStream;
13import com.google.net.stubby.newtransport.ClientTransport;
14import com.google.net.stubby.newtransport.InputStreamDeframer;
15import com.google.net.stubby.newtransport.StreamListener;
simonma1966d5b2014-08-08 14:51:01 -070016import com.google.net.stubby.newtransport.StreamState;
simonma77d97062014-07-18 10:22:35 -070017import com.google.net.stubby.transport.Transport;
18import com.google.net.stubby.transport.Transport.Code;
19
20import com.squareup.okhttp.internal.spdy.ErrorCode;
21import com.squareup.okhttp.internal.spdy.FrameReader;
22import com.squareup.okhttp.internal.spdy.Header;
23import com.squareup.okhttp.internal.spdy.HeadersMode;
brettmorgan56f5ec32014-07-18 16:54:50 -070024import com.squareup.okhttp.internal.spdy.Http20Draft12;
simonma77d97062014-07-18 10:22:35 -070025import com.squareup.okhttp.internal.spdy.Settings;
26import com.squareup.okhttp.internal.spdy.Variant;
27
zhangkun048649e2014-08-28 15:52:03 -070028import okio.Buffer;
simonma77d97062014-07-18 10:22:35 -070029import okio.BufferedSink;
30import okio.BufferedSource;
zhangkun048649e2014-08-28 15:52:03 -070031import okio.ByteString;
simonma77d97062014-07-18 10:22:35 -070032import okio.Okio;
simonma77d97062014-07-18 10:22:35 -070033
34import java.io.IOException;
35import java.net.Socket;
36import java.nio.ByteBuffer;
simonma7bf17dc2014-07-29 09:52:20 -070037import java.util.ArrayList;
simonma77d97062014-07-18 10:22:35 -070038import java.util.Collections;
39import java.util.HashMap;
simonma7bf17dc2014-07-29 09:52:20 -070040import java.util.Iterator;
simonma77d97062014-07-18 10:22:35 -070041import java.util.List;
42import java.util.Map;
simonmaf2c4c852014-08-06 10:00:56 -070043import java.util.concurrent.Executor;
simonma77d97062014-07-18 10:22:35 -070044
45import javax.annotation.concurrent.GuardedBy;
46
47/**
48 * A okhttp-based {@link ClientTransport} implementation.
49 */
50public class OkHttpClientTransport extends AbstractClientTransport {
51 /** The default initial window size in HTTP/2 is 64 KiB for the stream and connection. */
simonma7bf17dc2014-07-29 09:52:20 -070052 @VisibleForTesting
simonma77d97062014-07-18 10:22:35 -070053 static final int DEFAULT_INITIAL_WINDOW_SIZE = 64 * 1024;
54
simonma8f3e9ee2014-09-05 11:02:52 -070055 private static final Map<ErrorCode, Status> ERROR_CODE_TO_STATUS;
56 static {
57 Map<ErrorCode, Status> errorToStatus = new HashMap<ErrorCode, Status>();
58 errorToStatus.put(ErrorCode.NO_ERROR, Status.OK);
59 errorToStatus.put(ErrorCode.PROTOCOL_ERROR,
60 new Status(Transport.Code.INTERNAL, "Protocol error"));
61 errorToStatus.put(ErrorCode.INVALID_STREAM,
62 new Status(Transport.Code.INTERNAL, "Invalid stream"));
63 errorToStatus.put(ErrorCode.UNSUPPORTED_VERSION,
64 new Status(Transport.Code.INTERNAL, "Unsupported version"));
65 errorToStatus.put(ErrorCode.STREAM_IN_USE,
66 new Status(Transport.Code.INTERNAL, "Stream in use"));
67 errorToStatus.put(ErrorCode.STREAM_ALREADY_CLOSED,
68 new Status(Transport.Code.INTERNAL, "Stream already closed"));
69 errorToStatus.put(ErrorCode.INTERNAL_ERROR,
70 new Status(Transport.Code.INTERNAL, "Internal error"));
71 errorToStatus.put(ErrorCode.FLOW_CONTROL_ERROR,
72 new Status(Transport.Code.INTERNAL, "Flow control error"));
73 errorToStatus.put(ErrorCode.STREAM_CLOSED,
74 new Status(Transport.Code.INTERNAL, "Stream closed"));
75 errorToStatus.put(ErrorCode.FRAME_TOO_LARGE,
76 new Status(Transport.Code.INTERNAL, "Frame too large"));
77 errorToStatus.put(ErrorCode.REFUSED_STREAM,
78 new Status(Transport.Code.INTERNAL, "Refused stream"));
79 errorToStatus.put(ErrorCode.CANCEL, new Status(Transport.Code.CANCELLED, "Cancelled"));
80 errorToStatus.put(ErrorCode.COMPRESSION_ERROR,
81 new Status(Transport.Code.INTERNAL, "Compression error"));
82 errorToStatus.put(ErrorCode.INVALID_CREDENTIALS,
83 new Status(Transport.Code.PERMISSION_DENIED, "Invalid credentials"));
84 ERROR_CODE_TO_STATUS = Collections.unmodifiableMap(errorToStatus);
85 }
simonma77d97062014-07-18 10:22:35 -070086
87 private final String host;
88 private final int port;
89 private FrameReader frameReader;
90 private AsyncFrameWriter frameWriter;
lryane4bd1c72014-09-08 14:03:35 -070091 private final Object lock = new Object();
simonma7bf17dc2014-07-29 09:52:20 -070092 @GuardedBy("lock")
simonma77d97062014-07-18 10:22:35 -070093 private int nextStreamId;
94 private final Map<Integer, OkHttpClientStream> streams =
95 Collections.synchronizedMap(new HashMap<Integer, OkHttpClientStream>());
simonmaf2c4c852014-08-06 10:00:56 -070096 private final Executor executor;
simonma77d97062014-07-18 10:22:35 -070097 private int unacknowledgedBytesRead;
simonma7bf17dc2014-07-29 09:52:20 -070098 private ClientFrameHandler clientFrameHandler;
99 // The status used to finish all active streams when the transport is closed.
100 @GuardedBy("lock")
101 private boolean goAway;
102 @GuardedBy("lock")
103 private Status goAwayStatus;
simonma77d97062014-07-18 10:22:35 -0700104
simonmaf2c4c852014-08-06 10:00:56 -0700105 OkHttpClientTransport(String host, int port, Executor executor) {
simonma7bf17dc2014-07-29 09:52:20 -0700106 this.host = Preconditions.checkNotNull(host);
simonma77d97062014-07-18 10:22:35 -0700107 this.port = port;
simonmaf2c4c852014-08-06 10:00:56 -0700108 this.executor = Preconditions.checkNotNull(executor);
simonma77d97062014-07-18 10:22:35 -0700109 // Client initiated streams are odd, server initiated ones are even. Server should not need to
110 // use it. We start clients at 3 to avoid conflicting with HTTP negotiation.
111 nextStreamId = 3;
112 }
113
simonma7bf17dc2014-07-29 09:52:20 -0700114 /**
115 * Create a transport connected to a fake peer for test.
116 */
117 @VisibleForTesting
simonmaf2c4c852014-08-06 10:00:56 -0700118 OkHttpClientTransport(Executor executor, FrameReader frameReader, AsyncFrameWriter frameWriter,
119 int nextStreamId) {
simonma7bf17dc2014-07-29 09:52:20 -0700120 host = null;
121 port = -1;
simonmaf2c4c852014-08-06 10:00:56 -0700122 this.executor = Preconditions.checkNotNull(executor);
123 this.frameReader = Preconditions.checkNotNull(frameReader);
124 this.frameWriter = Preconditions.checkNotNull(frameWriter);
simonma7bf17dc2014-07-29 09:52:20 -0700125 this.nextStreamId = nextStreamId;
simonma7bf17dc2014-07-29 09:52:20 -0700126 }
127
simonma77d97062014-07-18 10:22:35 -0700128 @Override
lryane4bd1c72014-09-08 14:03:35 -0700129 protected ClientStream newStreamInternal(MethodDescriptor<?, ?> method,
130 Metadata.Headers headers,
131 StreamListener listener) {
132 return new OkHttpClientStream(method, headers.serialize(), listener);
simonma77d97062014-07-18 10:22:35 -0700133 }
134
135 @Override
136 protected void doStart() {
simonma7bf17dc2014-07-29 09:52:20 -0700137 // We set host to null for test.
138 if (host != null) {
139 BufferedSource source;
140 BufferedSink sink;
141 try {
142 Socket socket = new Socket(host, port);
143 source = Okio.buffer(Okio.source(socket));
144 sink = Okio.buffer(Okio.sink(socket));
145 } catch (IOException e) {
146 throw new RuntimeException(e);
147 }
148 Variant variant = new Http20Draft12();
149 frameReader = variant.newReader(source, true);
150 frameWriter = new AsyncFrameWriter(variant.newWriter(sink, true), this, executor);
simonma77d97062014-07-18 10:22:35 -0700151 }
simonma77d97062014-07-18 10:22:35 -0700152
simonma77d97062014-07-18 10:22:35 -0700153 notifyStarted();
simonma7bf17dc2014-07-29 09:52:20 -0700154 clientFrameHandler = new ClientFrameHandler();
155 executor.execute(clientFrameHandler);
simonma77d97062014-07-18 10:22:35 -0700156 }
157
158 @Override
159 protected void doStop() {
simonma7bf17dc2014-07-29 09:52:20 -0700160 boolean normalClose;
161 synchronized (lock) {
162 normalClose = !goAway;
simonma77d97062014-07-18 10:22:35 -0700163 }
simonma7bf17dc2014-07-29 09:52:20 -0700164 if (normalClose) {
165 abort(new Status(Code.INTERNAL, "Transport stopped"));
166 // Send GOAWAY with lastGoodStreamId of 0, since we don't expect any server-initiated streams.
167 // The GOAWAY is part of graceful shutdown.
simonmab645b382014-08-26 10:58:45 -0700168 frameWriter.goAway(0, ErrorCode.NO_ERROR, new byte[0]);
simonma7bf17dc2014-07-29 09:52:20 -0700169 }
170 stopIfNecessary();
171 }
172
173 @VisibleForTesting
174 ClientFrameHandler getHandler() {
175 return clientFrameHandler;
176 }
177
178 @VisibleForTesting
179 Map<Integer, OkHttpClientStream> getStreams() {
180 return streams;
simonma77d97062014-07-18 10:22:35 -0700181 }
182
183 /**
simonma7bf17dc2014-07-29 09:52:20 -0700184 * Finish all active streams with given status, then close the transport.
simonma77d97062014-07-18 10:22:35 -0700185 */
simonma7bf17dc2014-07-29 09:52:20 -0700186 void abort(Status status) {
187 onGoAway(-1, status);
188 }
189
190 private void onGoAway(int lastKnownStreamId, Status status) {
191 ArrayList<OkHttpClientStream> goAwayStreams = new ArrayList<OkHttpClientStream>();
192 synchronized (lock) {
193 goAway = true;
194 goAwayStatus = status;
195 Iterator<Map.Entry<Integer, OkHttpClientStream>> it = streams.entrySet().iterator();
196 while (it.hasNext()) {
197 Map.Entry<Integer, OkHttpClientStream> entry = it.next();
198 if (entry.getKey() > lastKnownStreamId) {
199 goAwayStreams.add(entry.getValue());
200 it.remove();
201 }
202 }
simonma77d97062014-07-18 10:22:35 -0700203 }
simonma7bf17dc2014-07-29 09:52:20 -0700204
205 // Starting stop, go into STOPPING state so that Channel know this Transport should not be used
206 // further, will become STOPPED once all streams are complete.
207 stopAsync();
208
209 for (OkHttpClientStream stream : goAwayStreams) {
lryane4bd1c72014-09-08 14:03:35 -0700210 stream.setStatus(status, new Metadata.Trailers());
simonma77d97062014-07-18 10:22:35 -0700211 }
212 }
213
214 /**
simonma7bf17dc2014-07-29 09:52:20 -0700215 * Called when a stream is closed.
simonma77d97062014-07-18 10:22:35 -0700216 *
217 * <p> Return false if the stream has already finished.
218 */
219 private boolean finishStream(int streamId, Status status) {
220 OkHttpClientStream stream;
221 stream = streams.remove(streamId);
222 if (stream != null) {
223 // This is mainly for failed streams, for successfully finished streams, it's a no-op.
lryane4bd1c72014-09-08 14:03:35 -0700224 stream.setStatus(status, new Metadata.Trailers());
simonma77d97062014-07-18 10:22:35 -0700225 return true;
226 }
227 return false;
228 }
229
230 /**
simonma7bf17dc2014-07-29 09:52:20 -0700231 * When the transport is in goAway states, we should stop it once all active streams finish.
232 */
233 private void stopIfNecessary() {
234 boolean shouldStop;
235 synchronized (lock) {
236 shouldStop = (goAway && streams.size() == 0);
237 }
238 if (shouldStop) {
239 frameWriter.close();
240 try {
241 frameReader.close();
242 } catch (IOException e) {
243 throw new RuntimeException(e);
244 }
simonma7bf17dc2014-07-29 09:52:20 -0700245 notifyStopped();
246 }
247 }
248
249 /**
250 * Returns a Grpc status corresponding to the given ErrorCode.
251 */
252 @VisibleForTesting
253 static Status toGrpcStatus(ErrorCode code) {
254 return ERROR_CODE_TO_STATUS.get(code);
255 }
256
257 /**
simonma77d97062014-07-18 10:22:35 -0700258 * Runnable which reads frames and dispatches them to in flight calls
259 */
simonma7bf17dc2014-07-29 09:52:20 -0700260 @VisibleForTesting
261 class ClientFrameHandler implements FrameReader.Handler, Runnable {
262 ClientFrameHandler() {}
simonma77d97062014-07-18 10:22:35 -0700263
264 @Override
265 public void run() {
266 String threadName = Thread.currentThread().getName();
267 Thread.currentThread().setName("OkHttpClientTransport");
268 try {
269 // Read until the underlying socket closes.
270 while (frameReader.nextFrame(this)) {
271 }
272 } catch (IOException ioe) {
simonma7bf17dc2014-07-29 09:52:20 -0700273 abort(Status.fromThrowable(ioe));
simonma77d97062014-07-18 10:22:35 -0700274 } finally {
275 // Restore the original thread name.
276 Thread.currentThread().setName(threadName);
277 }
278 }
279
280 /**
281 * Handle a HTTP2 DATA frame
282 */
283 @Override
284 public void data(boolean inFinished, int streamId, BufferedSource in, int length)
285 throws IOException {
286 final OkHttpClientStream stream;
287 stream = streams.get(streamId);
288 if (stream == null) {
289 frameWriter.rstStream(streamId, ErrorCode.INVALID_STREAM);
290 return;
291 }
292 InputStreamDeframer deframer = stream.getDeframer();
293
294 // Wait until the frame is complete.
295 in.require(length);
296
297 deframer.deliverFrame(ByteStreams.limit(in.inputStream(), length), inFinished);
298 unacknowledgedBytesRead += length;
299 stream.unacknowledgedBytesRead += length;
300 if (unacknowledgedBytesRead >= DEFAULT_INITIAL_WINDOW_SIZE / 2) {
301 frameWriter.windowUpdate(0, unacknowledgedBytesRead);
302 unacknowledgedBytesRead = 0;
303 }
304 if (stream.unacknowledgedBytesRead >= DEFAULT_INITIAL_WINDOW_SIZE / 2) {
305 frameWriter.windowUpdate(streamId, stream.unacknowledgedBytesRead);
306 stream.unacknowledgedBytesRead = 0;
307 }
308 if (inFinished) {
simonma7bf17dc2014-07-29 09:52:20 -0700309 if (finishStream(streamId, Status.OK)) {
310 stopIfNecessary();
311 }
simonma77d97062014-07-18 10:22:35 -0700312 }
313 }
314
315 /**
316 * Handle HTTP2 HEADER and CONTINUATION frames
317 */
318 @Override
319 public void headers(boolean outFinished,
320 boolean inFinished,
321 int streamId,
322 int associatedStreamId,
brettmorgan56f5ec32014-07-18 16:54:50 -0700323 List<Header> headerBlock,
simonma77d97062014-07-18 10:22:35 -0700324 HeadersMode headersMode) {
325 // TODO(user): handle received headers.
326 }
327
328 @Override
329 public void rstStream(int streamId, ErrorCode errorCode) {
simonma7bf17dc2014-07-29 09:52:20 -0700330 if (finishStream(streamId, toGrpcStatus(errorCode))) {
331 stopIfNecessary();
332 }
simonma77d97062014-07-18 10:22:35 -0700333 }
334
335 @Override
336 public void settings(boolean clearPrevious, Settings settings) {
337 // not impl
338 frameWriter.ackSettings();
339 }
340
341 @Override
342 public void ping(boolean ack, int payload1, int payload2) {
343 if (!ack) {
344 frameWriter.ping(true, payload1, payload2);
345 }
346 }
347
348 @Override
349 public void ackSettings() {
350 // Do nothing currently.
351 }
352
353 @Override
354 public void goAway(int lastGoodStreamId, ErrorCode errorCode, ByteString debugData) {
simonma7bf17dc2014-07-29 09:52:20 -0700355 onGoAway(lastGoodStreamId, new Status(Code.UNAVAILABLE, "Go away"));
simonma77d97062014-07-18 10:22:35 -0700356 }
357
358 @Override
359 public void pushPromise(int streamId, int promisedStreamId, List<Header> requestHeaders)
360 throws IOException {
simonma7bf17dc2014-07-29 09:52:20 -0700361 // We don't accept server initiated stream.
362 frameWriter.rstStream(streamId, ErrorCode.PROTOCOL_ERROR);
simonma77d97062014-07-18 10:22:35 -0700363 }
364
365 @Override
366 public void windowUpdate(int arg0, long arg1) {
367 // TODO(user): flow control.
368 }
369
370 @Override
brettmorgan56f5ec32014-07-18 16:54:50 -0700371 public void priority(int streamId, int streamDependency, int weight, boolean exclusive) {
simonma77d97062014-07-18 10:22:35 -0700372 // Ignore priority change.
373 // TODO(user): log
374 }
brettmorgan56f5ec32014-07-18 16:54:50 -0700375
376 @Override
377 public void alternateService(int streamId, String origin, ByteString protocol, String host,
378 int port, long maxAge) {
379 // TODO(user): Deal with alternateService propagation
380 }
simonma77d97062014-07-18 10:22:35 -0700381 }
382
simonma7bf17dc2014-07-29 09:52:20 -0700383 @GuardedBy("lock")
384 private void assignStreamId(OkHttpClientStream stream) {
385 Preconditions.checkState(stream.streamId == 0, "StreamId already assigned");
386 stream.streamId = nextStreamId;
387 streams.put(stream.streamId, stream);
388 if (nextStreamId >= Integer.MAX_VALUE - 2) {
389 onGoAway(Integer.MAX_VALUE, new Status(Code.INTERNAL, "Stream id exhaust"));
390 } else {
391 nextStreamId += 2;
392 }
393 }
394
simonma77d97062014-07-18 10:22:35 -0700395 /**
396 * Client stream for the okhttp transport.
397 */
simonma7bf17dc2014-07-29 09:52:20 -0700398 @VisibleForTesting
zhangkun048649e2014-08-28 15:52:03 -0700399 class OkHttpClientStream extends AbstractClientStream {
simonma77d97062014-07-18 10:22:35 -0700400 int streamId;
401 final InputStreamDeframer deframer;
402 int unacknowledgedBytesRead;
403
lryane4bd1c72014-09-08 14:03:35 -0700404 OkHttpClientStream(MethodDescriptor<?, ?> method, byte[][] headers, StreamListener listener) {
simonma77d97062014-07-18 10:22:35 -0700405 super(listener);
simonma77d97062014-07-18 10:22:35 -0700406 deframer = new InputStreamDeframer(inboundMessageHandler());
simonma7bf17dc2014-07-29 09:52:20 -0700407 synchronized (lock) {
408 if (goAway) {
lryane4bd1c72014-09-08 14:03:35 -0700409 setStatus(goAwayStatus, new Metadata.Trailers());
simonma7bf17dc2014-07-29 09:52:20 -0700410 return;
411 }
412 assignStreamId(this);
413 }
414 frameWriter.synStream(false, false, streamId, 0,
lryane4bd1c72014-09-08 14:03:35 -0700415 Headers.createRequestHeaders(method.getName(), headers));
simonma77d97062014-07-18 10:22:35 -0700416 }
417
simonma7bf17dc2014-07-29 09:52:20 -0700418 InputStreamDeframer getDeframer() {
simonma77d97062014-07-18 10:22:35 -0700419 return deframer;
420 }
421
422 @Override
423 protected void sendFrame(ByteBuffer frame, boolean endOfStream) {
424 Preconditions.checkState(streamId != 0, "streamId should be set");
simonma18eb63b2014-09-04 13:16:28 -0700425 Buffer buffer = new Buffer();
426 // Read the data into a buffer.
427 // TODO(user): swap to NIO buffers or zero-copy if/when okhttp/okio supports it
428 buffer.write(frame.array(), frame.arrayOffset(), frame.remaining());
simonma77d97062014-07-18 10:22:35 -0700429 // Write the data to the remote endpoint.
430 frameWriter.data(endOfStream, streamId, buffer);
431 frameWriter.flush();
432 }
433
434 @Override
nathanmittler43d2fcc2014-08-29 21:58:54 -0700435 protected void disableWindowUpdate(ListenableFuture<Void> processingFuture) {
436 // TODO(user): implement inbound flow control.
437 }
438
439 @Override
simonma77d97062014-07-18 10:22:35 -0700440 public void cancel() {
simonma1966d5b2014-08-08 14:51:01 -0700441 if (streamId == 0) {
442 // This should only happens when the stream was failed in constructor.
443 Preconditions.checkState(state() == StreamState.CLOSED, "A unclosed stream has no id");
444 }
simonma77d97062014-07-18 10:22:35 -0700445 outboundPhase = Phase.STATUS;
simonma7bf17dc2014-07-29 09:52:20 -0700446 if (finishStream(streamId, toGrpcStatus(ErrorCode.CANCEL))) {
simonma77d97062014-07-18 10:22:35 -0700447 frameWriter.rstStream(streamId, ErrorCode.CANCEL);
simonma7bf17dc2014-07-29 09:52:20 -0700448 stopIfNecessary();
simonma77d97062014-07-18 10:22:35 -0700449 }
450 }
451 }
452}