blob: 65940b2e6796b69ea566d24565504a918f6a021a [file] [log] [blame]
simonma77d97062014-07-18 10:22:35 -07001package com.google.net.stubby.newtransport.okhttp;
2
simonma7bf17dc2014-07-29 09:52:20 -07003import com.google.common.annotations.VisibleForTesting;
simonma77d97062014-07-18 10:22:35 -07004import com.google.common.base.Preconditions;
simonma77d97062014-07-18 10:22:35 -07005import com.google.common.io.ByteStreams;
nathanmittler43d2fcc2014-08-29 21:58:54 -07006import com.google.common.util.concurrent.ListenableFuture;
lryane4bd1c72014-09-08 14:03:35 -07007import com.google.net.stubby.Metadata;
simonma77d97062014-07-18 10:22:35 -07008import com.google.net.stubby.MethodDescriptor;
9import com.google.net.stubby.Status;
zhangkun048649e2014-08-28 15:52:03 -070010import com.google.net.stubby.newtransport.AbstractClientStream;
simonma77d97062014-07-18 10:22:35 -070011import com.google.net.stubby.newtransport.AbstractClientTransport;
simonma77d97062014-07-18 10:22:35 -070012import com.google.net.stubby.newtransport.ClientStream;
ejona6fc356b2014-09-22 12:49:20 -070013import com.google.net.stubby.newtransport.ClientStreamListener;
simonma77d97062014-07-18 10:22:35 -070014import com.google.net.stubby.newtransport.ClientTransport;
15import com.google.net.stubby.newtransport.InputStreamDeframer;
simonma1966d5b2014-08-08 14:51:01 -070016import com.google.net.stubby.newtransport.StreamState;
simonma77d97062014-07-18 10:22:35 -070017import com.google.net.stubby.transport.Transport;
18import com.google.net.stubby.transport.Transport.Code;
19
20import com.squareup.okhttp.internal.spdy.ErrorCode;
21import com.squareup.okhttp.internal.spdy.FrameReader;
22import com.squareup.okhttp.internal.spdy.Header;
23import com.squareup.okhttp.internal.spdy.HeadersMode;
brettmorgan56f5ec32014-07-18 16:54:50 -070024import com.squareup.okhttp.internal.spdy.Http20Draft12;
simonma77d97062014-07-18 10:22:35 -070025import com.squareup.okhttp.internal.spdy.Settings;
26import com.squareup.okhttp.internal.spdy.Variant;
27
zhangkun048649e2014-08-28 15:52:03 -070028import okio.Buffer;
simonma77d97062014-07-18 10:22:35 -070029import okio.BufferedSink;
30import okio.BufferedSource;
zhangkun048649e2014-08-28 15:52:03 -070031import okio.ByteString;
simonma77d97062014-07-18 10:22:35 -070032import okio.Okio;
simonma77d97062014-07-18 10:22:35 -070033
34import java.io.IOException;
nathanmittler23fbc7c2014-09-11 12:50:16 -070035import java.net.InetSocketAddress;
simonma77d97062014-07-18 10:22:35 -070036import java.net.Socket;
37import java.nio.ByteBuffer;
simonma7bf17dc2014-07-29 09:52:20 -070038import java.util.ArrayList;
simonma77d97062014-07-18 10:22:35 -070039import java.util.Collections;
40import java.util.HashMap;
simonma7bf17dc2014-07-29 09:52:20 -070041import java.util.Iterator;
simonma77d97062014-07-18 10:22:35 -070042import java.util.List;
43import java.util.Map;
simonmaf2c4c852014-08-06 10:00:56 -070044import java.util.concurrent.Executor;
simonma77d97062014-07-18 10:22:35 -070045
46import javax.annotation.concurrent.GuardedBy;
47
48/**
49 * A okhttp-based {@link ClientTransport} implementation.
50 */
51public class OkHttpClientTransport extends AbstractClientTransport {
52 /** The default initial window size in HTTP/2 is 64 KiB for the stream and connection. */
simonma7bf17dc2014-07-29 09:52:20 -070053 @VisibleForTesting
simonma77d97062014-07-18 10:22:35 -070054 static final int DEFAULT_INITIAL_WINDOW_SIZE = 64 * 1024;
55
simonma8f3e9ee2014-09-05 11:02:52 -070056 private static final Map<ErrorCode, Status> ERROR_CODE_TO_STATUS;
57 static {
58 Map<ErrorCode, Status> errorToStatus = new HashMap<ErrorCode, Status>();
59 errorToStatus.put(ErrorCode.NO_ERROR, Status.OK);
60 errorToStatus.put(ErrorCode.PROTOCOL_ERROR,
61 new Status(Transport.Code.INTERNAL, "Protocol error"));
62 errorToStatus.put(ErrorCode.INVALID_STREAM,
63 new Status(Transport.Code.INTERNAL, "Invalid stream"));
64 errorToStatus.put(ErrorCode.UNSUPPORTED_VERSION,
65 new Status(Transport.Code.INTERNAL, "Unsupported version"));
66 errorToStatus.put(ErrorCode.STREAM_IN_USE,
67 new Status(Transport.Code.INTERNAL, "Stream in use"));
68 errorToStatus.put(ErrorCode.STREAM_ALREADY_CLOSED,
69 new Status(Transport.Code.INTERNAL, "Stream already closed"));
70 errorToStatus.put(ErrorCode.INTERNAL_ERROR,
71 new Status(Transport.Code.INTERNAL, "Internal error"));
72 errorToStatus.put(ErrorCode.FLOW_CONTROL_ERROR,
73 new Status(Transport.Code.INTERNAL, "Flow control error"));
74 errorToStatus.put(ErrorCode.STREAM_CLOSED,
75 new Status(Transport.Code.INTERNAL, "Stream closed"));
76 errorToStatus.put(ErrorCode.FRAME_TOO_LARGE,
77 new Status(Transport.Code.INTERNAL, "Frame too large"));
78 errorToStatus.put(ErrorCode.REFUSED_STREAM,
79 new Status(Transport.Code.INTERNAL, "Refused stream"));
80 errorToStatus.put(ErrorCode.CANCEL, new Status(Transport.Code.CANCELLED, "Cancelled"));
81 errorToStatus.put(ErrorCode.COMPRESSION_ERROR,
82 new Status(Transport.Code.INTERNAL, "Compression error"));
83 errorToStatus.put(ErrorCode.INVALID_CREDENTIALS,
84 new Status(Transport.Code.PERMISSION_DENIED, "Invalid credentials"));
85 ERROR_CODE_TO_STATUS = Collections.unmodifiableMap(errorToStatus);
86 }
simonma77d97062014-07-18 10:22:35 -070087
nathanmittler23fbc7c2014-09-11 12:50:16 -070088 private final InetSocketAddress address;
89 private final String defaultAuthority;
simonma77d97062014-07-18 10:22:35 -070090 private FrameReader frameReader;
91 private AsyncFrameWriter frameWriter;
lryane4bd1c72014-09-08 14:03:35 -070092 private final Object lock = new Object();
simonma7bf17dc2014-07-29 09:52:20 -070093 @GuardedBy("lock")
simonma77d97062014-07-18 10:22:35 -070094 private int nextStreamId;
95 private final Map<Integer, OkHttpClientStream> streams =
96 Collections.synchronizedMap(new HashMap<Integer, OkHttpClientStream>());
simonmaf2c4c852014-08-06 10:00:56 -070097 private final Executor executor;
simonma77d97062014-07-18 10:22:35 -070098 private int unacknowledgedBytesRead;
simonma7bf17dc2014-07-29 09:52:20 -070099 private ClientFrameHandler clientFrameHandler;
100 // The status used to finish all active streams when the transport is closed.
101 @GuardedBy("lock")
102 private boolean goAway;
103 @GuardedBy("lock")
104 private Status goAwayStatus;
simonma77d97062014-07-18 10:22:35 -0700105
nathanmittler23fbc7c2014-09-11 12:50:16 -0700106 OkHttpClientTransport(InetSocketAddress address, Executor executor) {
107 this.address = Preconditions.checkNotNull(address);
108 defaultAuthority = address.getHostString() + ":" + address.getPort();
simonmaf2c4c852014-08-06 10:00:56 -0700109 this.executor = Preconditions.checkNotNull(executor);
simonma77d97062014-07-18 10:22:35 -0700110 // Client initiated streams are odd, server initiated ones are even. Server should not need to
111 // use it. We start clients at 3 to avoid conflicting with HTTP negotiation.
112 nextStreamId = 3;
113 }
114
simonma7bf17dc2014-07-29 09:52:20 -0700115 /**
116 * Create a transport connected to a fake peer for test.
117 */
118 @VisibleForTesting
simonmaf2c4c852014-08-06 10:00:56 -0700119 OkHttpClientTransport(Executor executor, FrameReader frameReader, AsyncFrameWriter frameWriter,
120 int nextStreamId) {
nathanmittler23fbc7c2014-09-11 12:50:16 -0700121 address = null;
122 defaultAuthority = "notarealauthority:80";
simonmaf2c4c852014-08-06 10:00:56 -0700123 this.executor = Preconditions.checkNotNull(executor);
124 this.frameReader = Preconditions.checkNotNull(frameReader);
125 this.frameWriter = Preconditions.checkNotNull(frameWriter);
simonma7bf17dc2014-07-29 09:52:20 -0700126 this.nextStreamId = nextStreamId;
simonma7bf17dc2014-07-29 09:52:20 -0700127 }
128
simonma77d97062014-07-18 10:22:35 -0700129 @Override
lryane4bd1c72014-09-08 14:03:35 -0700130 protected ClientStream newStreamInternal(MethodDescriptor<?, ?> method,
131 Metadata.Headers headers,
ejona6fc356b2014-09-22 12:49:20 -0700132 ClientStreamListener listener) {
nathanmittler23fbc7c2014-09-11 12:50:16 -0700133 return new OkHttpClientStream(method, headers, listener);
simonma77d97062014-07-18 10:22:35 -0700134 }
135
136 @Override
137 protected void doStart() {
simonma7bf17dc2014-07-29 09:52:20 -0700138 // We set host to null for test.
nathanmittler23fbc7c2014-09-11 12:50:16 -0700139 if (address != null) {
simonma7bf17dc2014-07-29 09:52:20 -0700140 BufferedSource source;
141 BufferedSink sink;
142 try {
nathanmittler23fbc7c2014-09-11 12:50:16 -0700143 Socket socket = new Socket(address.getAddress(), address.getPort());
simonma7bf17dc2014-07-29 09:52:20 -0700144 source = Okio.buffer(Okio.source(socket));
145 sink = Okio.buffer(Okio.sink(socket));
146 } catch (IOException e) {
147 throw new RuntimeException(e);
148 }
149 Variant variant = new Http20Draft12();
150 frameReader = variant.newReader(source, true);
151 frameWriter = new AsyncFrameWriter(variant.newWriter(sink, true), this, executor);
simonma77d97062014-07-18 10:22:35 -0700152 }
simonma77d97062014-07-18 10:22:35 -0700153
simonma77d97062014-07-18 10:22:35 -0700154 notifyStarted();
simonma7bf17dc2014-07-29 09:52:20 -0700155 clientFrameHandler = new ClientFrameHandler();
156 executor.execute(clientFrameHandler);
simonma77d97062014-07-18 10:22:35 -0700157 }
158
159 @Override
160 protected void doStop() {
simonma7bf17dc2014-07-29 09:52:20 -0700161 boolean normalClose;
162 synchronized (lock) {
163 normalClose = !goAway;
simonma77d97062014-07-18 10:22:35 -0700164 }
simonma7bf17dc2014-07-29 09:52:20 -0700165 if (normalClose) {
166 abort(new Status(Code.INTERNAL, "Transport stopped"));
167 // Send GOAWAY with lastGoodStreamId of 0, since we don't expect any server-initiated streams.
168 // The GOAWAY is part of graceful shutdown.
simonmab645b382014-08-26 10:58:45 -0700169 frameWriter.goAway(0, ErrorCode.NO_ERROR, new byte[0]);
simonma7bf17dc2014-07-29 09:52:20 -0700170 }
171 stopIfNecessary();
172 }
173
174 @VisibleForTesting
175 ClientFrameHandler getHandler() {
176 return clientFrameHandler;
177 }
178
179 @VisibleForTesting
180 Map<Integer, OkHttpClientStream> getStreams() {
181 return streams;
simonma77d97062014-07-18 10:22:35 -0700182 }
183
184 /**
simonma7bf17dc2014-07-29 09:52:20 -0700185 * Finish all active streams with given status, then close the transport.
simonma77d97062014-07-18 10:22:35 -0700186 */
simonma7bf17dc2014-07-29 09:52:20 -0700187 void abort(Status status) {
188 onGoAway(-1, status);
189 }
190
191 private void onGoAway(int lastKnownStreamId, Status status) {
192 ArrayList<OkHttpClientStream> goAwayStreams = new ArrayList<OkHttpClientStream>();
193 synchronized (lock) {
194 goAway = true;
195 goAwayStatus = status;
196 Iterator<Map.Entry<Integer, OkHttpClientStream>> it = streams.entrySet().iterator();
197 while (it.hasNext()) {
198 Map.Entry<Integer, OkHttpClientStream> entry = it.next();
199 if (entry.getKey() > lastKnownStreamId) {
200 goAwayStreams.add(entry.getValue());
201 it.remove();
202 }
203 }
simonma77d97062014-07-18 10:22:35 -0700204 }
simonma7bf17dc2014-07-29 09:52:20 -0700205
206 // Starting stop, go into STOPPING state so that Channel know this Transport should not be used
207 // further, will become STOPPED once all streams are complete.
208 stopAsync();
209
210 for (OkHttpClientStream stream : goAwayStreams) {
lryane4bd1c72014-09-08 14:03:35 -0700211 stream.setStatus(status, new Metadata.Trailers());
simonma77d97062014-07-18 10:22:35 -0700212 }
213 }
214
215 /**
simonma7bf17dc2014-07-29 09:52:20 -0700216 * Called when a stream is closed.
simonma77d97062014-07-18 10:22:35 -0700217 *
218 * <p> Return false if the stream has already finished.
219 */
220 private boolean finishStream(int streamId, Status status) {
221 OkHttpClientStream stream;
222 stream = streams.remove(streamId);
223 if (stream != null) {
224 // This is mainly for failed streams, for successfully finished streams, it's a no-op.
lryane4bd1c72014-09-08 14:03:35 -0700225 stream.setStatus(status, new Metadata.Trailers());
simonma77d97062014-07-18 10:22:35 -0700226 return true;
227 }
228 return false;
229 }
230
231 /**
simonma7bf17dc2014-07-29 09:52:20 -0700232 * When the transport is in goAway states, we should stop it once all active streams finish.
233 */
234 private void stopIfNecessary() {
235 boolean shouldStop;
236 synchronized (lock) {
237 shouldStop = (goAway && streams.size() == 0);
238 }
239 if (shouldStop) {
240 frameWriter.close();
241 try {
242 frameReader.close();
243 } catch (IOException e) {
244 throw new RuntimeException(e);
245 }
simonma7bf17dc2014-07-29 09:52:20 -0700246 notifyStopped();
247 }
248 }
249
250 /**
251 * Returns a Grpc status corresponding to the given ErrorCode.
252 */
253 @VisibleForTesting
254 static Status toGrpcStatus(ErrorCode code) {
255 return ERROR_CODE_TO_STATUS.get(code);
256 }
257
258 /**
simonma77d97062014-07-18 10:22:35 -0700259 * Runnable which reads frames and dispatches them to in flight calls
260 */
simonma7bf17dc2014-07-29 09:52:20 -0700261 @VisibleForTesting
262 class ClientFrameHandler implements FrameReader.Handler, Runnable {
263 ClientFrameHandler() {}
simonma77d97062014-07-18 10:22:35 -0700264
265 @Override
266 public void run() {
267 String threadName = Thread.currentThread().getName();
268 Thread.currentThread().setName("OkHttpClientTransport");
269 try {
270 // Read until the underlying socket closes.
271 while (frameReader.nextFrame(this)) {
272 }
273 } catch (IOException ioe) {
simonma7bf17dc2014-07-29 09:52:20 -0700274 abort(Status.fromThrowable(ioe));
simonma77d97062014-07-18 10:22:35 -0700275 } finally {
276 // Restore the original thread name.
277 Thread.currentThread().setName(threadName);
278 }
279 }
280
281 /**
282 * Handle a HTTP2 DATA frame
283 */
284 @Override
285 public void data(boolean inFinished, int streamId, BufferedSource in, int length)
286 throws IOException {
287 final OkHttpClientStream stream;
288 stream = streams.get(streamId);
289 if (stream == null) {
290 frameWriter.rstStream(streamId, ErrorCode.INVALID_STREAM);
291 return;
292 }
293 InputStreamDeframer deframer = stream.getDeframer();
294
295 // Wait until the frame is complete.
296 in.require(length);
297
298 deframer.deliverFrame(ByteStreams.limit(in.inputStream(), length), inFinished);
299 unacknowledgedBytesRead += length;
300 stream.unacknowledgedBytesRead += length;
301 if (unacknowledgedBytesRead >= DEFAULT_INITIAL_WINDOW_SIZE / 2) {
302 frameWriter.windowUpdate(0, unacknowledgedBytesRead);
303 unacknowledgedBytesRead = 0;
304 }
305 if (stream.unacknowledgedBytesRead >= DEFAULT_INITIAL_WINDOW_SIZE / 2) {
306 frameWriter.windowUpdate(streamId, stream.unacknowledgedBytesRead);
307 stream.unacknowledgedBytesRead = 0;
308 }
309 if (inFinished) {
simonma7bf17dc2014-07-29 09:52:20 -0700310 if (finishStream(streamId, Status.OK)) {
311 stopIfNecessary();
312 }
simonma77d97062014-07-18 10:22:35 -0700313 }
314 }
315
316 /**
317 * Handle HTTP2 HEADER and CONTINUATION frames
318 */
319 @Override
320 public void headers(boolean outFinished,
321 boolean inFinished,
322 int streamId,
323 int associatedStreamId,
brettmorgan56f5ec32014-07-18 16:54:50 -0700324 List<Header> headerBlock,
simonma77d97062014-07-18 10:22:35 -0700325 HeadersMode headersMode) {
326 // TODO(user): handle received headers.
327 }
328
329 @Override
330 public void rstStream(int streamId, ErrorCode errorCode) {
simonma7bf17dc2014-07-29 09:52:20 -0700331 if (finishStream(streamId, toGrpcStatus(errorCode))) {
332 stopIfNecessary();
333 }
simonma77d97062014-07-18 10:22:35 -0700334 }
335
336 @Override
337 public void settings(boolean clearPrevious, Settings settings) {
338 // not impl
339 frameWriter.ackSettings();
340 }
341
342 @Override
343 public void ping(boolean ack, int payload1, int payload2) {
344 if (!ack) {
345 frameWriter.ping(true, payload1, payload2);
346 }
347 }
348
349 @Override
350 public void ackSettings() {
351 // Do nothing currently.
352 }
353
354 @Override
355 public void goAway(int lastGoodStreamId, ErrorCode errorCode, ByteString debugData) {
simonma7bf17dc2014-07-29 09:52:20 -0700356 onGoAway(lastGoodStreamId, new Status(Code.UNAVAILABLE, "Go away"));
simonma77d97062014-07-18 10:22:35 -0700357 }
358
359 @Override
360 public void pushPromise(int streamId, int promisedStreamId, List<Header> requestHeaders)
361 throws IOException {
simonma7bf17dc2014-07-29 09:52:20 -0700362 // We don't accept server initiated stream.
363 frameWriter.rstStream(streamId, ErrorCode.PROTOCOL_ERROR);
simonma77d97062014-07-18 10:22:35 -0700364 }
365
366 @Override
367 public void windowUpdate(int arg0, long arg1) {
368 // TODO(user): flow control.
369 }
370
371 @Override
brettmorgan56f5ec32014-07-18 16:54:50 -0700372 public void priority(int streamId, int streamDependency, int weight, boolean exclusive) {
simonma77d97062014-07-18 10:22:35 -0700373 // Ignore priority change.
374 // TODO(user): log
375 }
brettmorgan56f5ec32014-07-18 16:54:50 -0700376
377 @Override
378 public void alternateService(int streamId, String origin, ByteString protocol, String host,
379 int port, long maxAge) {
380 // TODO(user): Deal with alternateService propagation
381 }
simonma77d97062014-07-18 10:22:35 -0700382 }
383
simonma7bf17dc2014-07-29 09:52:20 -0700384 @GuardedBy("lock")
385 private void assignStreamId(OkHttpClientStream stream) {
386 Preconditions.checkState(stream.streamId == 0, "StreamId already assigned");
387 stream.streamId = nextStreamId;
388 streams.put(stream.streamId, stream);
389 if (nextStreamId >= Integer.MAX_VALUE - 2) {
390 onGoAway(Integer.MAX_VALUE, new Status(Code.INTERNAL, "Stream id exhaust"));
391 } else {
392 nextStreamId += 2;
393 }
394 }
395
simonma77d97062014-07-18 10:22:35 -0700396 /**
397 * Client stream for the okhttp transport.
398 */
simonma7bf17dc2014-07-29 09:52:20 -0700399 @VisibleForTesting
zhangkun048649e2014-08-28 15:52:03 -0700400 class OkHttpClientStream extends AbstractClientStream {
simonma77d97062014-07-18 10:22:35 -0700401 int streamId;
402 final InputStreamDeframer deframer;
403 int unacknowledgedBytesRead;
404
nathanmittler23fbc7c2014-09-11 12:50:16 -0700405 OkHttpClientStream(MethodDescriptor<?, ?> method, Metadata.Headers headers,
ejona6fc356b2014-09-22 12:49:20 -0700406 ClientStreamListener listener) {
simonma77d97062014-07-18 10:22:35 -0700407 super(listener);
simonma77d97062014-07-18 10:22:35 -0700408 deframer = new InputStreamDeframer(inboundMessageHandler());
simonma7bf17dc2014-07-29 09:52:20 -0700409 synchronized (lock) {
410 if (goAway) {
lryane4bd1c72014-09-08 14:03:35 -0700411 setStatus(goAwayStatus, new Metadata.Trailers());
simonma7bf17dc2014-07-29 09:52:20 -0700412 return;
413 }
414 assignStreamId(this);
415 }
nathanmittler23fbc7c2014-09-11 12:50:16 -0700416 String defaultPath = "/" + method.getName();
simonma7bf17dc2014-07-29 09:52:20 -0700417 frameWriter.synStream(false, false, streamId, 0,
nathanmittler23fbc7c2014-09-11 12:50:16 -0700418 Headers.createRequestHeaders(headers, defaultPath, defaultAuthority));
simonma77d97062014-07-18 10:22:35 -0700419 }
420
simonma7bf17dc2014-07-29 09:52:20 -0700421 InputStreamDeframer getDeframer() {
simonma77d97062014-07-18 10:22:35 -0700422 return deframer;
423 }
424
425 @Override
426 protected void sendFrame(ByteBuffer frame, boolean endOfStream) {
427 Preconditions.checkState(streamId != 0, "streamId should be set");
simonma18eb63b2014-09-04 13:16:28 -0700428 Buffer buffer = new Buffer();
429 // Read the data into a buffer.
430 // TODO(user): swap to NIO buffers or zero-copy if/when okhttp/okio supports it
431 buffer.write(frame.array(), frame.arrayOffset(), frame.remaining());
simonma77d97062014-07-18 10:22:35 -0700432 // Write the data to the remote endpoint.
433 frameWriter.data(endOfStream, streamId, buffer);
434 frameWriter.flush();
435 }
436
437 @Override
nathanmittler43d2fcc2014-08-29 21:58:54 -0700438 protected void disableWindowUpdate(ListenableFuture<Void> processingFuture) {
439 // TODO(user): implement inbound flow control.
440 }
441
442 @Override
simonma77d97062014-07-18 10:22:35 -0700443 public void cancel() {
simonma1966d5b2014-08-08 14:51:01 -0700444 if (streamId == 0) {
445 // This should only happens when the stream was failed in constructor.
446 Preconditions.checkState(state() == StreamState.CLOSED, "A unclosed stream has no id");
447 }
simonma77d97062014-07-18 10:22:35 -0700448 outboundPhase = Phase.STATUS;
simonma7bf17dc2014-07-29 09:52:20 -0700449 if (finishStream(streamId, toGrpcStatus(ErrorCode.CANCEL))) {
simonma77d97062014-07-18 10:22:35 -0700450 frameWriter.rstStream(streamId, ErrorCode.CANCEL);
simonma7bf17dc2014-07-29 09:52:20 -0700451 stopIfNecessary();
simonma77d97062014-07-18 10:22:35 -0700452 }
453 }
454 }
455}