blob: 22350b6f7dcdaf006feb595eea3a52fb9ed994c2 [file] [log] [blame]
simonma77d97062014-07-18 10:22:35 -07001package com.google.net.stubby.newtransport.okhttp;
2
simonma7bf17dc2014-07-29 09:52:20 -07003import com.google.common.annotations.VisibleForTesting;
simonma77d97062014-07-18 10:22:35 -07004import com.google.common.base.Preconditions;
simonma77d97062014-07-18 10:22:35 -07005import com.google.common.io.ByteStreams;
nathanmittler43d2fcc2014-08-29 21:58:54 -07006import com.google.common.util.concurrent.ListenableFuture;
simonma77d97062014-07-18 10:22:35 -07007import com.google.net.stubby.MethodDescriptor;
8import com.google.net.stubby.Status;
zhangkun048649e2014-08-28 15:52:03 -07009import com.google.net.stubby.newtransport.AbstractClientStream;
simonma77d97062014-07-18 10:22:35 -070010import com.google.net.stubby.newtransport.AbstractClientTransport;
simonma77d97062014-07-18 10:22:35 -070011import com.google.net.stubby.newtransport.ClientStream;
12import com.google.net.stubby.newtransport.ClientTransport;
13import com.google.net.stubby.newtransport.InputStreamDeframer;
14import com.google.net.stubby.newtransport.StreamListener;
simonma1966d5b2014-08-08 14:51:01 -070015import com.google.net.stubby.newtransport.StreamState;
simonma77d97062014-07-18 10:22:35 -070016import com.google.net.stubby.transport.Transport;
17import com.google.net.stubby.transport.Transport.Code;
18
19import com.squareup.okhttp.internal.spdy.ErrorCode;
20import com.squareup.okhttp.internal.spdy.FrameReader;
21import com.squareup.okhttp.internal.spdy.Header;
22import com.squareup.okhttp.internal.spdy.HeadersMode;
brettmorgan56f5ec32014-07-18 16:54:50 -070023import com.squareup.okhttp.internal.spdy.Http20Draft12;
simonma77d97062014-07-18 10:22:35 -070024import com.squareup.okhttp.internal.spdy.Settings;
25import com.squareup.okhttp.internal.spdy.Variant;
26
zhangkun048649e2014-08-28 15:52:03 -070027import okio.Buffer;
simonma77d97062014-07-18 10:22:35 -070028import okio.BufferedSink;
29import okio.BufferedSource;
zhangkun048649e2014-08-28 15:52:03 -070030import okio.ByteString;
simonma77d97062014-07-18 10:22:35 -070031import okio.Okio;
simonma77d97062014-07-18 10:22:35 -070032
33import java.io.IOException;
34import java.net.Socket;
35import java.nio.ByteBuffer;
simonma7bf17dc2014-07-29 09:52:20 -070036import java.util.ArrayList;
simonma77d97062014-07-18 10:22:35 -070037import java.util.Collections;
38import java.util.HashMap;
simonma7bf17dc2014-07-29 09:52:20 -070039import java.util.Iterator;
simonma77d97062014-07-18 10:22:35 -070040import java.util.List;
41import java.util.Map;
simonmaf2c4c852014-08-06 10:00:56 -070042import java.util.concurrent.Executor;
simonma77d97062014-07-18 10:22:35 -070043
44import javax.annotation.concurrent.GuardedBy;
45
46/**
47 * A okhttp-based {@link ClientTransport} implementation.
48 */
49public class OkHttpClientTransport extends AbstractClientTransport {
50 /** The default initial window size in HTTP/2 is 64 KiB for the stream and connection. */
simonma7bf17dc2014-07-29 09:52:20 -070051 @VisibleForTesting
simonma77d97062014-07-18 10:22:35 -070052 static final int DEFAULT_INITIAL_WINDOW_SIZE = 64 * 1024;
53
simonma8f3e9ee2014-09-05 11:02:52 -070054 private static final Map<ErrorCode, Status> ERROR_CODE_TO_STATUS;
55 static {
56 Map<ErrorCode, Status> errorToStatus = new HashMap<ErrorCode, Status>();
57 errorToStatus.put(ErrorCode.NO_ERROR, Status.OK);
58 errorToStatus.put(ErrorCode.PROTOCOL_ERROR,
59 new Status(Transport.Code.INTERNAL, "Protocol error"));
60 errorToStatus.put(ErrorCode.INVALID_STREAM,
61 new Status(Transport.Code.INTERNAL, "Invalid stream"));
62 errorToStatus.put(ErrorCode.UNSUPPORTED_VERSION,
63 new Status(Transport.Code.INTERNAL, "Unsupported version"));
64 errorToStatus.put(ErrorCode.STREAM_IN_USE,
65 new Status(Transport.Code.INTERNAL, "Stream in use"));
66 errorToStatus.put(ErrorCode.STREAM_ALREADY_CLOSED,
67 new Status(Transport.Code.INTERNAL, "Stream already closed"));
68 errorToStatus.put(ErrorCode.INTERNAL_ERROR,
69 new Status(Transport.Code.INTERNAL, "Internal error"));
70 errorToStatus.put(ErrorCode.FLOW_CONTROL_ERROR,
71 new Status(Transport.Code.INTERNAL, "Flow control error"));
72 errorToStatus.put(ErrorCode.STREAM_CLOSED,
73 new Status(Transport.Code.INTERNAL, "Stream closed"));
74 errorToStatus.put(ErrorCode.FRAME_TOO_LARGE,
75 new Status(Transport.Code.INTERNAL, "Frame too large"));
76 errorToStatus.put(ErrorCode.REFUSED_STREAM,
77 new Status(Transport.Code.INTERNAL, "Refused stream"));
78 errorToStatus.put(ErrorCode.CANCEL, new Status(Transport.Code.CANCELLED, "Cancelled"));
79 errorToStatus.put(ErrorCode.COMPRESSION_ERROR,
80 new Status(Transport.Code.INTERNAL, "Compression error"));
81 errorToStatus.put(ErrorCode.INVALID_CREDENTIALS,
82 new Status(Transport.Code.PERMISSION_DENIED, "Invalid credentials"));
83 ERROR_CODE_TO_STATUS = Collections.unmodifiableMap(errorToStatus);
84 }
simonma77d97062014-07-18 10:22:35 -070085
86 private final String host;
87 private final int port;
88 private FrameReader frameReader;
89 private AsyncFrameWriter frameWriter;
simonma7bf17dc2014-07-29 09:52:20 -070090 private Object lock = new Object();
91 @GuardedBy("lock")
simonma77d97062014-07-18 10:22:35 -070092 private int nextStreamId;
93 private final Map<Integer, OkHttpClientStream> streams =
94 Collections.synchronizedMap(new HashMap<Integer, OkHttpClientStream>());
simonmaf2c4c852014-08-06 10:00:56 -070095 private final Executor executor;
simonma77d97062014-07-18 10:22:35 -070096 private int unacknowledgedBytesRead;
simonma7bf17dc2014-07-29 09:52:20 -070097 private ClientFrameHandler clientFrameHandler;
98 // The status used to finish all active streams when the transport is closed.
99 @GuardedBy("lock")
100 private boolean goAway;
101 @GuardedBy("lock")
102 private Status goAwayStatus;
simonma77d97062014-07-18 10:22:35 -0700103
/**
 * Creates a transport that connects to {@code host:port} when it is started.
 *
 * @param host remote host name, must not be null
 * @param port remote port
 * @param executor executor used to run the frame handler, must not be null
 */
OkHttpClientTransport(String host, int port, Executor executor) {
  // Client initiated streams are odd, server initiated ones are even. Server should not need to
  // use it. We start clients at 3 to avoid conflicting with HTTP negotiation.
  this.nextStreamId = 3;
  this.host = Preconditions.checkNotNull(host);
  this.executor = Preconditions.checkNotNull(executor);
  this.port = port;
}
112
/**
 * Builds a transport wired to a fake peer, for tests only. No socket is opened: {@code host} is
 * left null and {@code port} is -1, and the supplied reader and writer are used as-is.
 *
 * @param executor executor used to run the frame handler, must not be null
 * @param frameReader source of inbound frames, must not be null
 * @param frameWriter sink for outbound frames, must not be null
 * @param nextStreamId id handed to the first stream created on this transport
 */
@VisibleForTesting
OkHttpClientTransport(Executor executor, FrameReader frameReader, AsyncFrameWriter frameWriter,
    int nextStreamId) {
  this.host = null;
  this.port = -1;
  this.nextStreamId = nextStreamId;
  this.executor = Preconditions.checkNotNull(executor);
  this.frameReader = Preconditions.checkNotNull(frameReader);
  this.frameWriter = Preconditions.checkNotNull(frameWriter);
}
126
simonma77d97062014-07-18 10:22:35 -0700127 @Override
128 protected ClientStream newStreamInternal(MethodDescriptor<?, ?> method, StreamListener listener) {
129 return new OkHttpClientStream(method, listener);
130 }
131
132 @Override
133 protected void doStart() {
simonma7bf17dc2014-07-29 09:52:20 -0700134 // We set host to null for test.
135 if (host != null) {
136 BufferedSource source;
137 BufferedSink sink;
138 try {
139 Socket socket = new Socket(host, port);
140 source = Okio.buffer(Okio.source(socket));
141 sink = Okio.buffer(Okio.sink(socket));
142 } catch (IOException e) {
143 throw new RuntimeException(e);
144 }
145 Variant variant = new Http20Draft12();
146 frameReader = variant.newReader(source, true);
147 frameWriter = new AsyncFrameWriter(variant.newWriter(sink, true), this, executor);
simonma77d97062014-07-18 10:22:35 -0700148 }
simonma77d97062014-07-18 10:22:35 -0700149
simonma77d97062014-07-18 10:22:35 -0700150 notifyStarted();
simonma7bf17dc2014-07-29 09:52:20 -0700151 clientFrameHandler = new ClientFrameHandler();
152 executor.execute(clientFrameHandler);
simonma77d97062014-07-18 10:22:35 -0700153 }
154
155 @Override
156 protected void doStop() {
simonma7bf17dc2014-07-29 09:52:20 -0700157 boolean normalClose;
158 synchronized (lock) {
159 normalClose = !goAway;
simonma77d97062014-07-18 10:22:35 -0700160 }
simonma7bf17dc2014-07-29 09:52:20 -0700161 if (normalClose) {
162 abort(new Status(Code.INTERNAL, "Transport stopped"));
163 // Send GOAWAY with lastGoodStreamId of 0, since we don't expect any server-initiated streams.
164 // The GOAWAY is part of graceful shutdown.
simonmab645b382014-08-26 10:58:45 -0700165 frameWriter.goAway(0, ErrorCode.NO_ERROR, new byte[0]);
simonma7bf17dc2014-07-29 09:52:20 -0700166 }
167 stopIfNecessary();
168 }
169
170 @VisibleForTesting
171 ClientFrameHandler getHandler() {
172 return clientFrameHandler;
173 }
174
175 @VisibleForTesting
176 Map<Integer, OkHttpClientStream> getStreams() {
177 return streams;
simonma77d97062014-07-18 10:22:35 -0700178 }
179
180 /**
simonma7bf17dc2014-07-29 09:52:20 -0700181 * Finish all active streams with given status, then close the transport.
simonma77d97062014-07-18 10:22:35 -0700182 */
simonma7bf17dc2014-07-29 09:52:20 -0700183 void abort(Status status) {
184 onGoAway(-1, status);
185 }
186
187 private void onGoAway(int lastKnownStreamId, Status status) {
188 ArrayList<OkHttpClientStream> goAwayStreams = new ArrayList<OkHttpClientStream>();
189 synchronized (lock) {
190 goAway = true;
191 goAwayStatus = status;
192 Iterator<Map.Entry<Integer, OkHttpClientStream>> it = streams.entrySet().iterator();
193 while (it.hasNext()) {
194 Map.Entry<Integer, OkHttpClientStream> entry = it.next();
195 if (entry.getKey() > lastKnownStreamId) {
196 goAwayStreams.add(entry.getValue());
197 it.remove();
198 }
199 }
simonma77d97062014-07-18 10:22:35 -0700200 }
simonma7bf17dc2014-07-29 09:52:20 -0700201
202 // Starting stop, go into STOPPING state so that Channel know this Transport should not be used
203 // further, will become STOPPED once all streams are complete.
204 stopAsync();
205
206 for (OkHttpClientStream stream : goAwayStreams) {
simonma77d97062014-07-18 10:22:35 -0700207 stream.setStatus(status);
208 }
209 }
210
211 /**
simonma7bf17dc2014-07-29 09:52:20 -0700212 * Called when a stream is closed.
simonma77d97062014-07-18 10:22:35 -0700213 *
214 * <p> Return false if the stream has already finished.
215 */
216 private boolean finishStream(int streamId, Status status) {
217 OkHttpClientStream stream;
218 stream = streams.remove(streamId);
219 if (stream != null) {
220 // This is mainly for failed streams, for successfully finished streams, it's a no-op.
221 stream.setStatus(status);
222 return true;
223 }
224 return false;
225 }
226
227 /**
simonma7bf17dc2014-07-29 09:52:20 -0700228 * When the transport is in goAway states, we should stop it once all active streams finish.
229 */
230 private void stopIfNecessary() {
231 boolean shouldStop;
232 synchronized (lock) {
233 shouldStop = (goAway && streams.size() == 0);
234 }
235 if (shouldStop) {
236 frameWriter.close();
237 try {
238 frameReader.close();
239 } catch (IOException e) {
240 throw new RuntimeException(e);
241 }
simonma7bf17dc2014-07-29 09:52:20 -0700242 notifyStopped();
243 }
244 }
245
246 /**
247 * Returns a Grpc status corresponding to the given ErrorCode.
248 */
249 @VisibleForTesting
250 static Status toGrpcStatus(ErrorCode code) {
251 return ERROR_CODE_TO_STATUS.get(code);
252 }
253
254 /**
simonma77d97062014-07-18 10:22:35 -0700255 * Runnable which reads frames and dispatches them to in flight calls
256 */
simonma7bf17dc2014-07-29 09:52:20 -0700257 @VisibleForTesting
258 class ClientFrameHandler implements FrameReader.Handler, Runnable {
259 ClientFrameHandler() {}
simonma77d97062014-07-18 10:22:35 -0700260
261 @Override
262 public void run() {
263 String threadName = Thread.currentThread().getName();
264 Thread.currentThread().setName("OkHttpClientTransport");
265 try {
266 // Read until the underlying socket closes.
267 while (frameReader.nextFrame(this)) {
268 }
269 } catch (IOException ioe) {
simonma7bf17dc2014-07-29 09:52:20 -0700270 abort(Status.fromThrowable(ioe));
simonma77d97062014-07-18 10:22:35 -0700271 } finally {
272 // Restore the original thread name.
273 Thread.currentThread().setName(threadName);
274 }
275 }
276
277 /**
278 * Handle a HTTP2 DATA frame
279 */
280 @Override
281 public void data(boolean inFinished, int streamId, BufferedSource in, int length)
282 throws IOException {
283 final OkHttpClientStream stream;
284 stream = streams.get(streamId);
285 if (stream == null) {
286 frameWriter.rstStream(streamId, ErrorCode.INVALID_STREAM);
287 return;
288 }
289 InputStreamDeframer deframer = stream.getDeframer();
290
291 // Wait until the frame is complete.
292 in.require(length);
293
294 deframer.deliverFrame(ByteStreams.limit(in.inputStream(), length), inFinished);
295 unacknowledgedBytesRead += length;
296 stream.unacknowledgedBytesRead += length;
297 if (unacknowledgedBytesRead >= DEFAULT_INITIAL_WINDOW_SIZE / 2) {
298 frameWriter.windowUpdate(0, unacknowledgedBytesRead);
299 unacknowledgedBytesRead = 0;
300 }
301 if (stream.unacknowledgedBytesRead >= DEFAULT_INITIAL_WINDOW_SIZE / 2) {
302 frameWriter.windowUpdate(streamId, stream.unacknowledgedBytesRead);
303 stream.unacknowledgedBytesRead = 0;
304 }
305 if (inFinished) {
simonma7bf17dc2014-07-29 09:52:20 -0700306 if (finishStream(streamId, Status.OK)) {
307 stopIfNecessary();
308 }
simonma77d97062014-07-18 10:22:35 -0700309 }
310 }
311
312 /**
313 * Handle HTTP2 HEADER and CONTINUATION frames
314 */
315 @Override
316 public void headers(boolean outFinished,
317 boolean inFinished,
318 int streamId,
319 int associatedStreamId,
brettmorgan56f5ec32014-07-18 16:54:50 -0700320 List<Header> headerBlock,
simonma77d97062014-07-18 10:22:35 -0700321 HeadersMode headersMode) {
322 // TODO(user): handle received headers.
323 }
324
325 @Override
326 public void rstStream(int streamId, ErrorCode errorCode) {
simonma7bf17dc2014-07-29 09:52:20 -0700327 if (finishStream(streamId, toGrpcStatus(errorCode))) {
328 stopIfNecessary();
329 }
simonma77d97062014-07-18 10:22:35 -0700330 }
331
332 @Override
333 public void settings(boolean clearPrevious, Settings settings) {
334 // not impl
335 frameWriter.ackSettings();
336 }
337
338 @Override
339 public void ping(boolean ack, int payload1, int payload2) {
340 if (!ack) {
341 frameWriter.ping(true, payload1, payload2);
342 }
343 }
344
345 @Override
346 public void ackSettings() {
347 // Do nothing currently.
348 }
349
350 @Override
351 public void goAway(int lastGoodStreamId, ErrorCode errorCode, ByteString debugData) {
simonma7bf17dc2014-07-29 09:52:20 -0700352 onGoAway(lastGoodStreamId, new Status(Code.UNAVAILABLE, "Go away"));
simonma77d97062014-07-18 10:22:35 -0700353 }
354
355 @Override
356 public void pushPromise(int streamId, int promisedStreamId, List<Header> requestHeaders)
357 throws IOException {
simonma7bf17dc2014-07-29 09:52:20 -0700358 // We don't accept server initiated stream.
359 frameWriter.rstStream(streamId, ErrorCode.PROTOCOL_ERROR);
simonma77d97062014-07-18 10:22:35 -0700360 }
361
362 @Override
363 public void windowUpdate(int arg0, long arg1) {
364 // TODO(user): flow control.
365 }
366
367 @Override
brettmorgan56f5ec32014-07-18 16:54:50 -0700368 public void priority(int streamId, int streamDependency, int weight, boolean exclusive) {
simonma77d97062014-07-18 10:22:35 -0700369 // Ignore priority change.
370 // TODO(user): log
371 }
brettmorgan56f5ec32014-07-18 16:54:50 -0700372
373 @Override
374 public void alternateService(int streamId, String origin, ByteString protocol, String host,
375 int port, long maxAge) {
376 // TODO(user): Deal with alternateService propagation
377 }
simonma77d97062014-07-18 10:22:35 -0700378 }
379
simonma7bf17dc2014-07-29 09:52:20 -0700380 @GuardedBy("lock")
381 private void assignStreamId(OkHttpClientStream stream) {
382 Preconditions.checkState(stream.streamId == 0, "StreamId already assigned");
383 stream.streamId = nextStreamId;
384 streams.put(stream.streamId, stream);
385 if (nextStreamId >= Integer.MAX_VALUE - 2) {
386 onGoAway(Integer.MAX_VALUE, new Status(Code.INTERNAL, "Stream id exhaust"));
387 } else {
388 nextStreamId += 2;
389 }
390 }
391
simonma77d97062014-07-18 10:22:35 -0700392 /**
393 * Client stream for the okhttp transport.
394 */
simonma7bf17dc2014-07-29 09:52:20 -0700395 @VisibleForTesting
zhangkun048649e2014-08-28 15:52:03 -0700396 class OkHttpClientStream extends AbstractClientStream {
simonma77d97062014-07-18 10:22:35 -0700397 int streamId;
398 final InputStreamDeframer deframer;
399 int unacknowledgedBytesRead;
400
simonma7bf17dc2014-07-29 09:52:20 -0700401 OkHttpClientStream(MethodDescriptor<?, ?> method, StreamListener listener) {
simonma77d97062014-07-18 10:22:35 -0700402 super(listener);
simonma77d97062014-07-18 10:22:35 -0700403 deframer = new InputStreamDeframer(inboundMessageHandler());
simonma7bf17dc2014-07-29 09:52:20 -0700404 synchronized (lock) {
405 if (goAway) {
406 setStatus(goAwayStatus);
407 return;
408 }
409 assignStreamId(this);
410 }
411 frameWriter.synStream(false, false, streamId, 0,
412 Headers.createRequestHeaders(method.getName()));
simonma77d97062014-07-18 10:22:35 -0700413 }
414
simonma7bf17dc2014-07-29 09:52:20 -0700415 InputStreamDeframer getDeframer() {
simonma77d97062014-07-18 10:22:35 -0700416 return deframer;
417 }
418
419 @Override
420 protected void sendFrame(ByteBuffer frame, boolean endOfStream) {
421 Preconditions.checkState(streamId != 0, "streamId should be set");
simonma18eb63b2014-09-04 13:16:28 -0700422 Buffer buffer = new Buffer();
423 // Read the data into a buffer.
424 // TODO(user): swap to NIO buffers or zero-copy if/when okhttp/okio supports it
425 buffer.write(frame.array(), frame.arrayOffset(), frame.remaining());
simonma77d97062014-07-18 10:22:35 -0700426 // Write the data to the remote endpoint.
427 frameWriter.data(endOfStream, streamId, buffer);
428 frameWriter.flush();
429 }
430
431 @Override
nathanmittler43d2fcc2014-08-29 21:58:54 -0700432 protected void disableWindowUpdate(ListenableFuture<Void> processingFuture) {
433 // TODO(user): implement inbound flow control.
434 }
435
436 @Override
simonma77d97062014-07-18 10:22:35 -0700437 public void cancel() {
simonma1966d5b2014-08-08 14:51:01 -0700438 if (streamId == 0) {
439 // This should only happens when the stream was failed in constructor.
440 Preconditions.checkState(state() == StreamState.CLOSED, "A unclosed stream has no id");
441 }
simonma77d97062014-07-18 10:22:35 -0700442 outboundPhase = Phase.STATUS;
simonma7bf17dc2014-07-29 09:52:20 -0700443 if (finishStream(streamId, toGrpcStatus(ErrorCode.CANCEL))) {
simonma77d97062014-07-18 10:22:35 -0700444 frameWriter.rstStream(streamId, ErrorCode.CANCEL);
simonma7bf17dc2014-07-29 09:52:20 -0700445 stopIfNecessary();
simonma77d97062014-07-18 10:22:35 -0700446 }
447 }
448 }
449}