blob: 292adc304a1f2645cf615d0e0109952b7b88a1f2 [file] [log] [blame]
simonma77d97062014-07-18 10:22:35 -07001package com.google.net.stubby.newtransport.okhttp;
2
simonma7bf17dc2014-07-29 09:52:20 -07003import com.google.common.annotations.VisibleForTesting;
simonma77d97062014-07-18 10:22:35 -07004import com.google.common.base.Preconditions;
5import com.google.common.collect.ImmutableMap;
6import com.google.common.io.ByteBuffers;
7import com.google.common.io.ByteStreams;
8import com.google.net.stubby.MethodDescriptor;
9import com.google.net.stubby.Status;
10import com.google.net.stubby.http2.okhttp.Headers;
11import com.google.net.stubby.newtransport.AbstractClientTransport;
12import com.google.net.stubby.newtransport.AbstractStream;
13import com.google.net.stubby.newtransport.ClientStream;
14import com.google.net.stubby.newtransport.ClientTransport;
15import com.google.net.stubby.newtransport.InputStreamDeframer;
16import com.google.net.stubby.newtransport.StreamListener;
simonma1966d5b2014-08-08 14:51:01 -070017import com.google.net.stubby.newtransport.StreamState;
simonma77d97062014-07-18 10:22:35 -070018import com.google.net.stubby.transport.Transport;
19import com.google.net.stubby.transport.Transport.Code;
20
21import com.squareup.okhttp.internal.spdy.ErrorCode;
22import com.squareup.okhttp.internal.spdy.FrameReader;
23import com.squareup.okhttp.internal.spdy.Header;
24import com.squareup.okhttp.internal.spdy.HeadersMode;
brettmorgan56f5ec32014-07-18 16:54:50 -070025import com.squareup.okhttp.internal.spdy.Http20Draft12;
simonma77d97062014-07-18 10:22:35 -070026import com.squareup.okhttp.internal.spdy.Settings;
27import com.squareup.okhttp.internal.spdy.Variant;
28
29import okio.ByteString;
30import okio.BufferedSink;
31import okio.BufferedSource;
32import okio.Okio;
33import okio.Buffer;
34
35import java.io.IOException;
36import java.net.Socket;
37import java.nio.ByteBuffer;
simonma7bf17dc2014-07-29 09:52:20 -070038import java.util.ArrayList;
simonma77d97062014-07-18 10:22:35 -070039import java.util.Collections;
40import java.util.HashMap;
simonma7bf17dc2014-07-29 09:52:20 -070041import java.util.Iterator;
simonma77d97062014-07-18 10:22:35 -070042import java.util.List;
43import java.util.Map;
simonmaf2c4c852014-08-06 10:00:56 -070044import java.util.concurrent.Executor;
simonma77d97062014-07-18 10:22:35 -070045
46import javax.annotation.concurrent.GuardedBy;
47
48/**
 * An OkHttp-based {@link ClientTransport} implementation.
50 */
51public class OkHttpClientTransport extends AbstractClientTransport {
52 /** The default initial window size in HTTP/2 is 64 KiB for the stream and connection. */
simonma7bf17dc2014-07-29 09:52:20 -070053 @VisibleForTesting
simonma77d97062014-07-18 10:22:35 -070054 static final int DEFAULT_INITIAL_WINDOW_SIZE = 64 * 1024;
55
56 private static final ImmutableMap<ErrorCode, Status> ERROR_CODE_TO_STATUS = ImmutableMap
57 .<ErrorCode, Status>builder()
58 .put(ErrorCode.NO_ERROR, Status.OK)
59 .put(ErrorCode.PROTOCOL_ERROR, new Status(Transport.Code.INTERNAL, "Protocol error"))
60 .put(ErrorCode.INVALID_STREAM, new Status(Transport.Code.INTERNAL, "Invalid stream"))
61 .put(ErrorCode.UNSUPPORTED_VERSION,
62 new Status(Transport.Code.INTERNAL, "Unsupported version"))
63 .put(ErrorCode.STREAM_IN_USE, new Status(Transport.Code.INTERNAL, "Stream in use"))
64 .put(ErrorCode.STREAM_ALREADY_CLOSED,
65 new Status(Transport.Code.INTERNAL, "Stream already closed"))
66 .put(ErrorCode.INTERNAL_ERROR, new Status(Transport.Code.INTERNAL, "Internal error"))
67 .put(ErrorCode.FLOW_CONTROL_ERROR, new Status(Transport.Code.INTERNAL, "Flow control error"))
68 .put(ErrorCode.STREAM_CLOSED, new Status(Transport.Code.INTERNAL, "Stream closed"))
69 .put(ErrorCode.FRAME_TOO_LARGE, new Status(Transport.Code.INTERNAL, "Frame too large"))
70 .put(ErrorCode.REFUSED_STREAM, new Status(Transport.Code.INTERNAL, "Refused stream"))
71 .put(ErrorCode.CANCEL, new Status(Transport.Code.CANCELLED, "Cancelled"))
72 .put(ErrorCode.COMPRESSION_ERROR, new Status(Transport.Code.INTERNAL, "Compression error"))
73 .put(ErrorCode.INVALID_CREDENTIALS,
74 new Status(Transport.Code.PERMISSION_DENIED, "Invalid credentials"))
75 .build();
76
77 private final String host;
78 private final int port;
79 private FrameReader frameReader;
80 private AsyncFrameWriter frameWriter;
simonma7bf17dc2014-07-29 09:52:20 -070081 private Object lock = new Object();
82 @GuardedBy("lock")
simonma77d97062014-07-18 10:22:35 -070083 private int nextStreamId;
84 private final Map<Integer, OkHttpClientStream> streams =
85 Collections.synchronizedMap(new HashMap<Integer, OkHttpClientStream>());
simonmaf2c4c852014-08-06 10:00:56 -070086 private final Executor executor;
simonma77d97062014-07-18 10:22:35 -070087 private int unacknowledgedBytesRead;
simonma7bf17dc2014-07-29 09:52:20 -070088 private ClientFrameHandler clientFrameHandler;
89 // The status used to finish all active streams when the transport is closed.
90 @GuardedBy("lock")
91 private boolean goAway;
92 @GuardedBy("lock")
93 private Status goAwayStatus;
simonma77d97062014-07-18 10:22:35 -070094
simonmaf2c4c852014-08-06 10:00:56 -070095 OkHttpClientTransport(String host, int port, Executor executor) {
simonma7bf17dc2014-07-29 09:52:20 -070096 this.host = Preconditions.checkNotNull(host);
simonma77d97062014-07-18 10:22:35 -070097 this.port = port;
simonmaf2c4c852014-08-06 10:00:56 -070098 this.executor = Preconditions.checkNotNull(executor);
simonma77d97062014-07-18 10:22:35 -070099 // Client initiated streams are odd, server initiated ones are even. Server should not need to
100 // use it. We start clients at 3 to avoid conflicting with HTTP negotiation.
101 nextStreamId = 3;
102 }
103
simonma7bf17dc2014-07-29 09:52:20 -0700104 /**
105 * Create a transport connected to a fake peer for test.
106 */
107 @VisibleForTesting
simonmaf2c4c852014-08-06 10:00:56 -0700108 OkHttpClientTransport(Executor executor, FrameReader frameReader, AsyncFrameWriter frameWriter,
109 int nextStreamId) {
simonma7bf17dc2014-07-29 09:52:20 -0700110 host = null;
111 port = -1;
simonmaf2c4c852014-08-06 10:00:56 -0700112 this.executor = Preconditions.checkNotNull(executor);
113 this.frameReader = Preconditions.checkNotNull(frameReader);
114 this.frameWriter = Preconditions.checkNotNull(frameWriter);
simonma7bf17dc2014-07-29 09:52:20 -0700115 this.nextStreamId = nextStreamId;
simonma7bf17dc2014-07-29 09:52:20 -0700116 }
117
simonma77d97062014-07-18 10:22:35 -0700118 @Override
119 protected ClientStream newStreamInternal(MethodDescriptor<?, ?> method, StreamListener listener) {
120 return new OkHttpClientStream(method, listener);
121 }
122
123 @Override
124 protected void doStart() {
simonma7bf17dc2014-07-29 09:52:20 -0700125 // We set host to null for test.
126 if (host != null) {
127 BufferedSource source;
128 BufferedSink sink;
129 try {
130 Socket socket = new Socket(host, port);
131 source = Okio.buffer(Okio.source(socket));
132 sink = Okio.buffer(Okio.sink(socket));
133 } catch (IOException e) {
134 throw new RuntimeException(e);
135 }
136 Variant variant = new Http20Draft12();
137 frameReader = variant.newReader(source, true);
138 frameWriter = new AsyncFrameWriter(variant.newWriter(sink, true), this, executor);
simonma77d97062014-07-18 10:22:35 -0700139 }
simonma77d97062014-07-18 10:22:35 -0700140
simonma77d97062014-07-18 10:22:35 -0700141 notifyStarted();
simonma7bf17dc2014-07-29 09:52:20 -0700142 clientFrameHandler = new ClientFrameHandler();
143 executor.execute(clientFrameHandler);
simonma77d97062014-07-18 10:22:35 -0700144 }
145
146 @Override
147 protected void doStop() {
simonma7bf17dc2014-07-29 09:52:20 -0700148 boolean normalClose;
149 synchronized (lock) {
150 normalClose = !goAway;
simonma77d97062014-07-18 10:22:35 -0700151 }
simonma7bf17dc2014-07-29 09:52:20 -0700152 if (normalClose) {
153 abort(new Status(Code.INTERNAL, "Transport stopped"));
154 // Send GOAWAY with lastGoodStreamId of 0, since we don't expect any server-initiated streams.
155 // The GOAWAY is part of graceful shutdown.
156 frameWriter.goAway(0, ErrorCode.NO_ERROR, null);
157 }
158 stopIfNecessary();
159 }
160
161 @VisibleForTesting
162 ClientFrameHandler getHandler() {
163 return clientFrameHandler;
164 }
165
166 @VisibleForTesting
167 Map<Integer, OkHttpClientStream> getStreams() {
168 return streams;
simonma77d97062014-07-18 10:22:35 -0700169 }
170
171 /**
simonma7bf17dc2014-07-29 09:52:20 -0700172 * Finish all active streams with given status, then close the transport.
simonma77d97062014-07-18 10:22:35 -0700173 */
simonma7bf17dc2014-07-29 09:52:20 -0700174 void abort(Status status) {
175 onGoAway(-1, status);
176 }
177
178 private void onGoAway(int lastKnownStreamId, Status status) {
179 ArrayList<OkHttpClientStream> goAwayStreams = new ArrayList<OkHttpClientStream>();
180 synchronized (lock) {
181 goAway = true;
182 goAwayStatus = status;
183 Iterator<Map.Entry<Integer, OkHttpClientStream>> it = streams.entrySet().iterator();
184 while (it.hasNext()) {
185 Map.Entry<Integer, OkHttpClientStream> entry = it.next();
186 if (entry.getKey() > lastKnownStreamId) {
187 goAwayStreams.add(entry.getValue());
188 it.remove();
189 }
190 }
simonma77d97062014-07-18 10:22:35 -0700191 }
simonma7bf17dc2014-07-29 09:52:20 -0700192
193 // Starting stop, go into STOPPING state so that Channel know this Transport should not be used
194 // further, will become STOPPED once all streams are complete.
195 stopAsync();
196
197 for (OkHttpClientStream stream : goAwayStreams) {
simonma77d97062014-07-18 10:22:35 -0700198 stream.setStatus(status);
199 }
200 }
201
202 /**
simonma7bf17dc2014-07-29 09:52:20 -0700203 * Called when a stream is closed.
simonma77d97062014-07-18 10:22:35 -0700204 *
205 * <p> Return false if the stream has already finished.
206 */
207 private boolean finishStream(int streamId, Status status) {
208 OkHttpClientStream stream;
209 stream = streams.remove(streamId);
210 if (stream != null) {
211 // This is mainly for failed streams, for successfully finished streams, it's a no-op.
212 stream.setStatus(status);
213 return true;
214 }
215 return false;
216 }
217
218 /**
simonma7bf17dc2014-07-29 09:52:20 -0700219 * When the transport is in goAway states, we should stop it once all active streams finish.
220 */
221 private void stopIfNecessary() {
222 boolean shouldStop;
223 synchronized (lock) {
224 shouldStop = (goAway && streams.size() == 0);
225 }
226 if (shouldStop) {
227 frameWriter.close();
228 try {
229 frameReader.close();
230 } catch (IOException e) {
231 throw new RuntimeException(e);
232 }
simonma7bf17dc2014-07-29 09:52:20 -0700233 notifyStopped();
234 }
235 }
236
237 /**
238 * Returns a Grpc status corresponding to the given ErrorCode.
239 */
240 @VisibleForTesting
241 static Status toGrpcStatus(ErrorCode code) {
242 return ERROR_CODE_TO_STATUS.get(code);
243 }
244
245 /**
simonma77d97062014-07-18 10:22:35 -0700246 * Runnable which reads frames and dispatches them to in flight calls
247 */
simonma7bf17dc2014-07-29 09:52:20 -0700248 @VisibleForTesting
249 class ClientFrameHandler implements FrameReader.Handler, Runnable {
250 ClientFrameHandler() {}
simonma77d97062014-07-18 10:22:35 -0700251
252 @Override
253 public void run() {
254 String threadName = Thread.currentThread().getName();
255 Thread.currentThread().setName("OkHttpClientTransport");
256 try {
257 // Read until the underlying socket closes.
258 while (frameReader.nextFrame(this)) {
259 }
260 } catch (IOException ioe) {
simonma7bf17dc2014-07-29 09:52:20 -0700261 abort(Status.fromThrowable(ioe));
simonma77d97062014-07-18 10:22:35 -0700262 } finally {
263 // Restore the original thread name.
264 Thread.currentThread().setName(threadName);
265 }
266 }
267
268 /**
269 * Handle a HTTP2 DATA frame
270 */
271 @Override
272 public void data(boolean inFinished, int streamId, BufferedSource in, int length)
273 throws IOException {
274 final OkHttpClientStream stream;
275 stream = streams.get(streamId);
276 if (stream == null) {
277 frameWriter.rstStream(streamId, ErrorCode.INVALID_STREAM);
278 return;
279 }
280 InputStreamDeframer deframer = stream.getDeframer();
281
282 // Wait until the frame is complete.
283 in.require(length);
284
285 deframer.deliverFrame(ByteStreams.limit(in.inputStream(), length), inFinished);
286 unacknowledgedBytesRead += length;
287 stream.unacknowledgedBytesRead += length;
288 if (unacknowledgedBytesRead >= DEFAULT_INITIAL_WINDOW_SIZE / 2) {
289 frameWriter.windowUpdate(0, unacknowledgedBytesRead);
290 unacknowledgedBytesRead = 0;
291 }
292 if (stream.unacknowledgedBytesRead >= DEFAULT_INITIAL_WINDOW_SIZE / 2) {
293 frameWriter.windowUpdate(streamId, stream.unacknowledgedBytesRead);
294 stream.unacknowledgedBytesRead = 0;
295 }
296 if (inFinished) {
simonma7bf17dc2014-07-29 09:52:20 -0700297 if (finishStream(streamId, Status.OK)) {
298 stopIfNecessary();
299 }
simonma77d97062014-07-18 10:22:35 -0700300 }
301 }
302
303 /**
304 * Handle HTTP2 HEADER and CONTINUATION frames
305 */
306 @Override
307 public void headers(boolean outFinished,
308 boolean inFinished,
309 int streamId,
310 int associatedStreamId,
brettmorgan56f5ec32014-07-18 16:54:50 -0700311 List<Header> headerBlock,
simonma77d97062014-07-18 10:22:35 -0700312 HeadersMode headersMode) {
313 // TODO(user): handle received headers.
314 }
315
316 @Override
317 public void rstStream(int streamId, ErrorCode errorCode) {
simonma7bf17dc2014-07-29 09:52:20 -0700318 if (finishStream(streamId, toGrpcStatus(errorCode))) {
319 stopIfNecessary();
320 }
simonma77d97062014-07-18 10:22:35 -0700321 }
322
323 @Override
324 public void settings(boolean clearPrevious, Settings settings) {
325 // not impl
326 frameWriter.ackSettings();
327 }
328
329 @Override
330 public void ping(boolean ack, int payload1, int payload2) {
331 if (!ack) {
332 frameWriter.ping(true, payload1, payload2);
333 }
334 }
335
336 @Override
337 public void ackSettings() {
338 // Do nothing currently.
339 }
340
341 @Override
342 public void goAway(int lastGoodStreamId, ErrorCode errorCode, ByteString debugData) {
simonma7bf17dc2014-07-29 09:52:20 -0700343 onGoAway(lastGoodStreamId, new Status(Code.UNAVAILABLE, "Go away"));
simonma77d97062014-07-18 10:22:35 -0700344 }
345
346 @Override
347 public void pushPromise(int streamId, int promisedStreamId, List<Header> requestHeaders)
348 throws IOException {
simonma7bf17dc2014-07-29 09:52:20 -0700349 // We don't accept server initiated stream.
350 frameWriter.rstStream(streamId, ErrorCode.PROTOCOL_ERROR);
simonma77d97062014-07-18 10:22:35 -0700351 }
352
353 @Override
354 public void windowUpdate(int arg0, long arg1) {
355 // TODO(user): flow control.
356 }
357
358 @Override
brettmorgan56f5ec32014-07-18 16:54:50 -0700359 public void priority(int streamId, int streamDependency, int weight, boolean exclusive) {
simonma77d97062014-07-18 10:22:35 -0700360 // Ignore priority change.
361 // TODO(user): log
362 }
brettmorgan56f5ec32014-07-18 16:54:50 -0700363
364 @Override
365 public void alternateService(int streamId, String origin, ByteString protocol, String host,
366 int port, long maxAge) {
367 // TODO(user): Deal with alternateService propagation
368 }
simonma77d97062014-07-18 10:22:35 -0700369 }
370
simonma7bf17dc2014-07-29 09:52:20 -0700371 @GuardedBy("lock")
372 private void assignStreamId(OkHttpClientStream stream) {
373 Preconditions.checkState(stream.streamId == 0, "StreamId already assigned");
374 stream.streamId = nextStreamId;
375 streams.put(stream.streamId, stream);
376 if (nextStreamId >= Integer.MAX_VALUE - 2) {
377 onGoAway(Integer.MAX_VALUE, new Status(Code.INTERNAL, "Stream id exhaust"));
378 } else {
379 nextStreamId += 2;
380 }
381 }
382
simonma77d97062014-07-18 10:22:35 -0700383 /**
384 * Client stream for the okhttp transport.
385 */
simonma7bf17dc2014-07-29 09:52:20 -0700386 @VisibleForTesting
387 class OkHttpClientStream extends AbstractStream implements ClientStream {
simonma77d97062014-07-18 10:22:35 -0700388 int streamId;
389 final InputStreamDeframer deframer;
390 int unacknowledgedBytesRead;
391
simonma7bf17dc2014-07-29 09:52:20 -0700392 OkHttpClientStream(MethodDescriptor<?, ?> method, StreamListener listener) {
simonma77d97062014-07-18 10:22:35 -0700393 super(listener);
simonma77d97062014-07-18 10:22:35 -0700394 deframer = new InputStreamDeframer(inboundMessageHandler());
simonma7bf17dc2014-07-29 09:52:20 -0700395 synchronized (lock) {
396 if (goAway) {
397 setStatus(goAwayStatus);
398 return;
399 }
400 assignStreamId(this);
401 }
402 frameWriter.synStream(false, false, streamId, 0,
403 Headers.createRequestHeaders(method.getName()));
simonma77d97062014-07-18 10:22:35 -0700404 }
405
simonma7bf17dc2014-07-29 09:52:20 -0700406 InputStreamDeframer getDeframer() {
simonma77d97062014-07-18 10:22:35 -0700407 return deframer;
408 }
409
410 @Override
411 protected void sendFrame(ByteBuffer frame, boolean endOfStream) {
412 Preconditions.checkState(streamId != 0, "streamId should be set");
413 Buffer buffer;
414 try {
415 // Read the data into a buffer.
416 // TODO(user): swap to NIO buffers or zero-copy if/when okhttp/okio supports it
417 buffer = new Buffer().readFrom(ByteBuffers.newConsumingInputStream(frame));
418 } catch (IOException e) {
419 throw new RuntimeException(e);
420 }
421
422 // Write the data to the remote endpoint.
423 frameWriter.data(endOfStream, streamId, buffer);
424 frameWriter.flush();
425 }
426
427 @Override
428 public void cancel() {
simonma1966d5b2014-08-08 14:51:01 -0700429 if (streamId == 0) {
430 // This should only happens when the stream was failed in constructor.
431 Preconditions.checkState(state() == StreamState.CLOSED, "A unclosed stream has no id");
432 }
simonma77d97062014-07-18 10:22:35 -0700433 outboundPhase = Phase.STATUS;
simonma7bf17dc2014-07-29 09:52:20 -0700434 if (finishStream(streamId, toGrpcStatus(ErrorCode.CANCEL))) {
simonma77d97062014-07-18 10:22:35 -0700435 frameWriter.rstStream(streamId, ErrorCode.CANCEL);
simonma7bf17dc2014-07-29 09:52:20 -0700436 stopIfNecessary();
simonma77d97062014-07-18 10:22:35 -0700437 }
438 }
439 }
440}