blob: b60897b082c6ccf86cfb280517a624cbb6ee297c [file] [log] [blame]
simonma77d97062014-07-18 10:22:35 -07001package com.google.net.stubby.newtransport.okhttp;
2
3import com.google.common.base.Preconditions;
4import com.google.common.collect.ImmutableMap;
5import com.google.common.io.ByteBuffers;
6import com.google.common.io.ByteStreams;
7import com.google.net.stubby.MethodDescriptor;
8import com.google.net.stubby.Status;
9import com.google.net.stubby.http2.okhttp.Headers;
10import com.google.net.stubby.newtransport.AbstractClientTransport;
11import com.google.net.stubby.newtransport.AbstractStream;
12import com.google.net.stubby.newtransport.ClientStream;
13import com.google.net.stubby.newtransport.ClientTransport;
14import com.google.net.stubby.newtransport.InputStreamDeframer;
15import com.google.net.stubby.newtransport.StreamListener;
16import com.google.net.stubby.transport.Transport;
17import com.google.net.stubby.transport.Transport.Code;
18
19import com.squareup.okhttp.internal.spdy.ErrorCode;
20import com.squareup.okhttp.internal.spdy.FrameReader;
21import com.squareup.okhttp.internal.spdy.Header;
22import com.squareup.okhttp.internal.spdy.HeadersMode;
23import com.squareup.okhttp.internal.spdy.Http20Draft10;
24import com.squareup.okhttp.internal.spdy.Settings;
25import com.squareup.okhttp.internal.spdy.Variant;
26
27import okio.ByteString;
28import okio.BufferedSink;
29import okio.BufferedSource;
30import okio.Okio;
31import okio.Buffer;
32
33import java.io.IOException;
34import java.net.Socket;
35import java.nio.ByteBuffer;
36import java.util.Collection;
37import java.util.Collections;
38import java.util.HashMap;
39import java.util.List;
40import java.util.Map;
41import java.util.concurrent.ExecutorService;
42import java.util.concurrent.Executors;
43
44import javax.annotation.concurrent.GuardedBy;
45
46/**
47 * A okhttp-based {@link ClientTransport} implementation.
48 */
49public class OkHttpClientTransport extends AbstractClientTransport {
50 /** The default initial window size in HTTP/2 is 64 KiB for the stream and connection. */
51 static final int DEFAULT_INITIAL_WINDOW_SIZE = 64 * 1024;
52
53 private static final ImmutableMap<ErrorCode, Status> ERROR_CODE_TO_STATUS = ImmutableMap
54 .<ErrorCode, Status>builder()
55 .put(ErrorCode.NO_ERROR, Status.OK)
56 .put(ErrorCode.PROTOCOL_ERROR, new Status(Transport.Code.INTERNAL, "Protocol error"))
57 .put(ErrorCode.INVALID_STREAM, new Status(Transport.Code.INTERNAL, "Invalid stream"))
58 .put(ErrorCode.UNSUPPORTED_VERSION,
59 new Status(Transport.Code.INTERNAL, "Unsupported version"))
60 .put(ErrorCode.STREAM_IN_USE, new Status(Transport.Code.INTERNAL, "Stream in use"))
61 .put(ErrorCode.STREAM_ALREADY_CLOSED,
62 new Status(Transport.Code.INTERNAL, "Stream already closed"))
63 .put(ErrorCode.INTERNAL_ERROR, new Status(Transport.Code.INTERNAL, "Internal error"))
64 .put(ErrorCode.FLOW_CONTROL_ERROR, new Status(Transport.Code.INTERNAL, "Flow control error"))
65 .put(ErrorCode.STREAM_CLOSED, new Status(Transport.Code.INTERNAL, "Stream closed"))
66 .put(ErrorCode.FRAME_TOO_LARGE, new Status(Transport.Code.INTERNAL, "Frame too large"))
67 .put(ErrorCode.REFUSED_STREAM, new Status(Transport.Code.INTERNAL, "Refused stream"))
68 .put(ErrorCode.CANCEL, new Status(Transport.Code.CANCELLED, "Cancelled"))
69 .put(ErrorCode.COMPRESSION_ERROR, new Status(Transport.Code.INTERNAL, "Compression error"))
70 .put(ErrorCode.INVALID_CREDENTIALS,
71 new Status(Transport.Code.PERMISSION_DENIED, "Invalid credentials"))
72 .build();
73
74 private final String host;
75 private final int port;
76 private FrameReader frameReader;
77 private AsyncFrameWriter frameWriter;
78 @GuardedBy("this")
79 private int nextStreamId;
80 private final Map<Integer, OkHttpClientStream> streams =
81 Collections.synchronizedMap(new HashMap<Integer, OkHttpClientStream>());
82 private final ExecutorService executor = Executors.newCachedThreadPool();
83 private int unacknowledgedBytesRead;
84
85 public OkHttpClientTransport(String host, int port) {
86 this.host = host;
87 this.port = port;
88 // Client initiated streams are odd, server initiated ones are even. Server should not need to
89 // use it. We start clients at 3 to avoid conflicting with HTTP negotiation.
90 nextStreamId = 3;
91 }
92
93 @Override
94 protected ClientStream newStreamInternal(MethodDescriptor<?, ?> method, StreamListener listener) {
95 return new OkHttpClientStream(method, listener);
96 }
97
98 @Override
99 protected void doStart() {
100 BufferedSource source;
101 BufferedSink sink;
102 try {
103 Socket socket = new Socket(host, port);
104 // TODO(user): use SpdyConnection.
105 source = Okio.buffer(Okio.source(socket));
106 sink = Okio.buffer(Okio.sink(socket));
107 } catch (IOException e) {
108 throw new RuntimeException(e);
109 }
110 Variant variant = new Http20Draft10();
111 frameReader = variant.newReader(source, true);
112 frameWriter = new AsyncFrameWriter(variant.newWriter(sink, true), this, executor);
113
114 executor.execute(new ClientFrameHandler());
115 notifyStarted();
116 }
117
118 @Override
119 protected void doStop() {
120 closeAllStreams(new Status(Code.INTERNAL, "Transport stopped"));
121 frameWriter.close();
122 try {
123 frameReader.close();
124 } catch (IOException e) {
125 throw new RuntimeException(e);
126 }
127 executor.shutdown();
128 notifyStopped();
129 }
130
131 /**
132 * Close and remove all streams.
133 */
134 private void closeAllStreams(Status status) {
135 Collection<OkHttpClientStream> streamsCopy;
136 synchronized (streams) {
137 streamsCopy = streams.values();
138 streams.clear();
139 }
140 for (OkHttpClientStream stream : streamsCopy) {
141 stream.setStatus(status);
142 }
143 }
144
145 /**
146 * Called when a HTTP2 stream is closed.
147 *
148 * <p> Return false if the stream has already finished.
149 */
150 private boolean finishStream(int streamId, Status status) {
151 OkHttpClientStream stream;
152 stream = streams.remove(streamId);
153 if (stream != null) {
154 // This is mainly for failed streams, for successfully finished streams, it's a no-op.
155 stream.setStatus(status);
156 return true;
157 }
158 return false;
159 }
160
161 /**
162 * Runnable which reads frames and dispatches them to in flight calls
163 */
164 private class ClientFrameHandler implements FrameReader.Handler, Runnable {
165 private ClientFrameHandler() {}
166
167 @Override
168 public void run() {
169 String threadName = Thread.currentThread().getName();
170 Thread.currentThread().setName("OkHttpClientTransport");
171 try {
172 // Read until the underlying socket closes.
173 while (frameReader.nextFrame(this)) {
174 }
175 } catch (IOException ioe) {
176 ioe.printStackTrace();
177 closeAllStreams(new Status(Code.INTERNAL, ioe.getMessage()));
178 } finally {
179 // Restore the original thread name.
180 Thread.currentThread().setName(threadName);
181 }
182 }
183
184 /**
185 * Handle a HTTP2 DATA frame
186 */
187 @Override
188 public void data(boolean inFinished, int streamId, BufferedSource in, int length)
189 throws IOException {
190 final OkHttpClientStream stream;
191 stream = streams.get(streamId);
192 if (stream == null) {
193 frameWriter.rstStream(streamId, ErrorCode.INVALID_STREAM);
194 return;
195 }
196 InputStreamDeframer deframer = stream.getDeframer();
197
198 // Wait until the frame is complete.
199 in.require(length);
200
201 deframer.deliverFrame(ByteStreams.limit(in.inputStream(), length), inFinished);
202 unacknowledgedBytesRead += length;
203 stream.unacknowledgedBytesRead += length;
204 if (unacknowledgedBytesRead >= DEFAULT_INITIAL_WINDOW_SIZE / 2) {
205 frameWriter.windowUpdate(0, unacknowledgedBytesRead);
206 unacknowledgedBytesRead = 0;
207 }
208 if (stream.unacknowledgedBytesRead >= DEFAULT_INITIAL_WINDOW_SIZE / 2) {
209 frameWriter.windowUpdate(streamId, stream.unacknowledgedBytesRead);
210 stream.unacknowledgedBytesRead = 0;
211 }
212 if (inFinished) {
213 finishStream(streamId, Status.OK);
214 }
215 }
216
217 /**
218 * Handle HTTP2 HEADER and CONTINUATION frames
219 */
220 @Override
221 public void headers(boolean outFinished,
222 boolean inFinished,
223 int streamId,
224 int associatedStreamId,
225 int priority,
226 List<Header> headers,
227 HeadersMode headersMode) {
228 // TODO(user): handle received headers.
229 }
230
231 @Override
232 public void rstStream(int streamId, ErrorCode errorCode) {
233 finishStream(streamId, ERROR_CODE_TO_STATUS.get(errorCode));
234 }
235
236 @Override
237 public void settings(boolean clearPrevious, Settings settings) {
238 // not impl
239 frameWriter.ackSettings();
240 }
241
242 @Override
243 public void ping(boolean ack, int payload1, int payload2) {
244 if (!ack) {
245 frameWriter.ping(true, payload1, payload2);
246 }
247 }
248
249 @Override
250 public void ackSettings() {
251 // Do nothing currently.
252 }
253
254 @Override
255 public void goAway(int lastGoodStreamId, ErrorCode errorCode, ByteString debugData) {
256 // TODO(user): Log here and implement the real Go away behavior: streams have
257 // id <= lastGoodStreamId should not be closed.
258 closeAllStreams(new Status(Code.UNAVAILABLE, "Go away"));
259 stopAsync();
260 }
261
262 @Override
263 public void pushPromise(int streamId, int promisedStreamId, List<Header> requestHeaders)
264 throws IOException {
265 // TODO(user): should send SETTINGS_ENABLE_PUSH=0, then here we should reset it with
266 // PROTOCOL_ERROR.
267 frameWriter.rstStream(streamId, ErrorCode.REFUSED_STREAM);
268 }
269
270 @Override
271 public void windowUpdate(int arg0, long arg1) {
272 // TODO(user): flow control.
273 }
274
275 @Override
276 public void priority(int streamId, int priority) {
277 // Ignore priority change.
278 // TODO(user): log
279 }
280 }
281
282 /**
283 * Client stream for the okhttp transport.
284 */
285 private class OkHttpClientStream extends AbstractStream implements ClientStream {
286 int streamId;
287 final InputStreamDeframer deframer;
288 int unacknowledgedBytesRead;
289
290 public OkHttpClientStream(MethodDescriptor<?, ?> method, StreamListener listener) {
291 super(listener);
292 Preconditions.checkState(streamId == 0, "StreamId should be 0");
293 synchronized (OkHttpClientTransport.this) {
294 streamId = nextStreamId;
295 nextStreamId += 2;
296 streams.put(streamId, this);
297 frameWriter.synStream(false, false, streamId, 0, 0, 0,
298 Headers.createRequestHeaders(method.getName()));
299 }
300 deframer = new InputStreamDeframer(inboundMessageHandler());
301 }
302
303 public InputStreamDeframer getDeframer() {
304 return deframer;
305 }
306
307 @Override
308 protected void sendFrame(ByteBuffer frame, boolean endOfStream) {
309 Preconditions.checkState(streamId != 0, "streamId should be set");
310 Buffer buffer;
311 try {
312 // Read the data into a buffer.
313 // TODO(user): swap to NIO buffers or zero-copy if/when okhttp/okio supports it
314 buffer = new Buffer().readFrom(ByteBuffers.newConsumingInputStream(frame));
315 } catch (IOException e) {
316 throw new RuntimeException(e);
317 }
318
319 // Write the data to the remote endpoint.
320 frameWriter.data(endOfStream, streamId, buffer);
321 frameWriter.flush();
322 }
323
324 @Override
325 public void cancel() {
326 Preconditions.checkState(streamId != 0, "streamId should be set");
327 outboundPhase = Phase.STATUS;
328 if (finishStream(streamId, ERROR_CODE_TO_STATUS.get(ErrorCode.CANCEL))) {
329 frameWriter.rstStream(streamId, ErrorCode.CANCEL);
330 }
331 }
332 }
333}