blob: 96c5b7d1010fe186035c7e263f577e54470a46ed [file] [log] [blame]
ejona07d3f6a2014-05-14 11:26:57 -07001package com.google.net.stubby.http;
2
3import com.google.common.io.ByteBuffers;
4import com.google.net.stubby.AbstractRequest;
5import com.google.net.stubby.Operation;
6import com.google.net.stubby.Response;
7import com.google.net.stubby.Session;
8import com.google.net.stubby.Status;
9import com.google.net.stubby.transport.Framer;
10import com.google.net.stubby.transport.MessageFramer;
11import com.google.net.stubby.transport.Transport;
12
13import java.io.DataOutputStream;
14import java.io.IOException;
15import java.io.InputStream;
16import java.net.HttpURLConnection;
17import java.net.URI;
18import java.nio.ByteBuffer;
19
20/**
21 * Implementation of {@link Session} using {@link HttpURLConnection} for clients. Services
22 * are dispatched relative to a base URI.
23 */
24public class UrlConnectionClientSession implements Session {
25
26 private final URI base;
27
28 public UrlConnectionClientSession(URI base) {
29 this.base = base;
30 }
31
32 @Override
33 public Request startRequest(String operationName, Response.ResponseBuilder responseBuilder) {
34 return new Request(base.resolve(operationName), responseBuilder.build());
35 }
36
37 private class Request extends AbstractRequest implements Framer.Sink {
38
39 private final HttpURLConnection connection;
40 private final DataOutputStream outputStream;
41 private final MessageFramer framer;
42
43 private Request(URI uri, Response response) {
44 super(response);
45 try {
46 connection = (HttpURLConnection) uri.toURL().openConnection();
47 connection.setDoOutput(true);
48 connection.setDoInput(true);
49 connection.setRequestMethod("POST");
50 connection.setRequestProperty("Content-Type", "application/protorpc");
51 outputStream = new DataOutputStream(connection.getOutputStream());
52 } catch (IOException t) {
53 throw new RuntimeException(t);
54 }
55 // No compression when framing over HTTP for the moment
56 framer = new MessageFramer(4096);
57 framer.setAllowCompression(false);
58 }
59
60 @Override
61 public Operation addContext(String type, InputStream message, Phase nextPhase) {
62 super.addContext(type, message, nextPhase);
63 framer.writeContext(type, message, getPhase() == Phase.CLOSED, this);
64 return this;
65 }
66
67 @Override
68 public Operation addPayload(InputStream payload, Phase nextPhase) {
69 super.addPayload(payload, nextPhase);
70 framer.writePayload(payload, getPhase() == Phase.CLOSED, this);
71 return this;
72 }
73
74 @Override
75 public void deliverFrame(ByteBuffer frame, boolean endOfMessage) {
76 boolean closed = getPhase() == Phase.CLOSED;
77
78 try {
79 ByteBuffers.asByteSource(frame).copyTo(outputStream);
80 if (closed && endOfMessage) {
81 connection.getOutputStream().close();
82 // The request has completed so now process the response. Must do this in the same
83 // thread as URLConnection has threading issues.
84 new HttpStreamDeframer().deframe(connection.getInputStream(), getResponse());
85 connection.getInputStream().close();
86 connection.disconnect();
87 }
88 } catch (IOException ioe) {
89 close(new Status(Transport.Code.INTERNAL, ioe));
90 } finally {
91 if (closed && endOfMessage) {
92 framer.close();
93 }
94 }
95 }
96 }
97}
98