blob: cb3287021d4e225902d8d2756c74733cbce7e42c [file] [log] [blame]
ejona07d3f6a2014-05-14 11:26:57 -07001package com.google.net.stubby.http;
2
3import com.google.common.io.ByteBuffers;
4import com.google.net.stubby.AbstractRequest;
lryane4bd1c72014-09-08 14:03:35 -07005import com.google.net.stubby.Metadata;
ejona07d3f6a2014-05-14 11:26:57 -07006import com.google.net.stubby.Operation;
7import com.google.net.stubby.Response;
8import com.google.net.stubby.Session;
9import com.google.net.stubby.Status;
10import com.google.net.stubby.transport.Framer;
11import com.google.net.stubby.transport.MessageFramer;
12import com.google.net.stubby.transport.Transport;
13
14import java.io.DataOutputStream;
15import java.io.IOException;
16import java.io.InputStream;
17import java.net.HttpURLConnection;
18import java.net.URI;
19import java.nio.ByteBuffer;
20
21/**
22 * Implementation of {@link Session} using {@link HttpURLConnection} for clients. Services
23 * are dispatched relative to a base URI.
24 */
25public class UrlConnectionClientSession implements Session {
26
27 private final URI base;
28
29 public UrlConnectionClientSession(URI base) {
30 this.base = base;
31 }
32
33 @Override
lryane4bd1c72014-09-08 14:03:35 -070034 public Request startRequest(String operationName, Metadata.Headers headers,
lryan34650402014-07-14 13:39:39 -070035 Response.ResponseBuilder responseBuilder) {
lryane4bd1c72014-09-08 14:03:35 -070036 return new Request(base.resolve(operationName), headers,
37 responseBuilder.build());
ejona07d3f6a2014-05-14 11:26:57 -070038 }
39
40 private class Request extends AbstractRequest implements Framer.Sink {
41
42 private final HttpURLConnection connection;
43 private final DataOutputStream outputStream;
44 private final MessageFramer framer;
45
lryane4bd1c72014-09-08 14:03:35 -070046 private Request(URI uri, Metadata.Headers headers, Response response) {
ejona07d3f6a2014-05-14 11:26:57 -070047 super(response);
48 try {
49 connection = (HttpURLConnection) uri.toURL().openConnection();
50 connection.setDoOutput(true);
51 connection.setDoInput(true);
52 connection.setRequestMethod("POST");
53 connection.setRequestProperty("Content-Type", "application/protorpc");
lryane4bd1c72014-09-08 14:03:35 -070054 String[] serialized = headers.serializeAscii();
55 for (int i = 0; i < serialized.length; i++) {
56 connection.setRequestProperty(
57 serialized[i],
58 serialized[++i]);
lryan34650402014-07-14 13:39:39 -070059 }
ejona07d3f6a2014-05-14 11:26:57 -070060 outputStream = new DataOutputStream(connection.getOutputStream());
61 } catch (IOException t) {
62 throw new RuntimeException(t);
63 }
64 // No compression when framing over HTTP for the moment
65 framer = new MessageFramer(4096);
66 framer.setAllowCompression(false);
67 }
68
69 @Override
70 public Operation addContext(String type, InputStream message, Phase nextPhase) {
71 super.addContext(type, message, nextPhase);
72 framer.writeContext(type, message, getPhase() == Phase.CLOSED, this);
73 return this;
74 }
75
76 @Override
77 public Operation addPayload(InputStream payload, Phase nextPhase) {
78 super.addPayload(payload, nextPhase);
79 framer.writePayload(payload, getPhase() == Phase.CLOSED, this);
80 return this;
81 }
82
83 @Override
lryan2ce84462014-06-02 14:43:36 -070084 public Operation close(Status status) {
85 // TODO(user): This is broken but necessary to get test passing with the introduction
86 // of Channel as now for most calls the close() call is decoupled from the last call to
87 // addPayload. The real fix is to remove 'nextPhase' from the Operation interface and
88 // clean up Framer. For a follow up CL.
89 boolean alreadyClosed = getPhase() == Phase.CLOSED;
90 super.close(status);
91 if (!alreadyClosed) {
92 framer.writeStatus(status, true, this);
93 }
94 return this;
95 }
96
97 @Override
ejona07d3f6a2014-05-14 11:26:57 -070098 public void deliverFrame(ByteBuffer frame, boolean endOfMessage) {
99 boolean closed = getPhase() == Phase.CLOSED;
ejona07d3f6a2014-05-14 11:26:57 -0700100 try {
101 ByteBuffers.asByteSource(frame).copyTo(outputStream);
102 if (closed && endOfMessage) {
103 connection.getOutputStream().close();
104 // The request has completed so now process the response. Must do this in the same
105 // thread as URLConnection has threading issues.
106 new HttpStreamDeframer().deframe(connection.getInputStream(), getResponse());
107 connection.getInputStream().close();
108 connection.disconnect();
109 }
110 } catch (IOException ioe) {
111 close(new Status(Transport.Code.INTERNAL, ioe));
ejona07d3f6a2014-05-14 11:26:57 -0700112 }
113 }
114 }
115}