blob: 86fad438906e5d4767b7877c05daf73c7deb5e29 [file] [log] [blame]
package com.google.net.stubby.newtransport.http;
import static com.google.net.stubby.Status.CANCELLED;
import static com.google.net.stubby.newtransport.HttpUtil.CONTENT_TYPE_HEADER;
import static com.google.net.stubby.newtransport.HttpUtil.CONTENT_TYPE_PROTORPC;
import static com.google.net.stubby.newtransport.HttpUtil.HTTP_METHOD;
import com.google.common.base.Preconditions;
import com.google.common.io.ByteBuffers;
import com.google.net.stubby.MethodDescriptor;
import com.google.net.stubby.Status;
import com.google.net.stubby.newtransport.AbstractClientTransport;
import com.google.net.stubby.newtransport.AbstractStream;
import com.google.net.stubby.newtransport.ClientStream;
import com.google.net.stubby.newtransport.InputStreamDeframer;
import com.google.net.stubby.newtransport.StreamListener;
import com.google.net.stubby.newtransport.StreamState;
import com.google.net.stubby.transport.Transport;
import java.io.DataOutputStream;
import java.io.IOException;
import java.net.HttpURLConnection;
import java.net.URI;
import java.nio.ByteBuffer;
import java.util.Collections;
import java.util.HashSet;
import java.util.Set;
/**
* A simple client-side transport for RPC-over-HTTP/1.1. All execution (including listener
* callbacks) are executed in the application thread context.
*/
public class HttpClientTransport extends AbstractClientTransport {
private final URI baseUri;
private final Set<HttpClientStream> streams =
Collections.synchronizedSet(new HashSet<HttpClientStream>());
public HttpClientTransport(URI baseUri) {
this.baseUri = Preconditions.checkNotNull(baseUri, "baseUri");
}
@Override
protected ClientStream newStreamInternal(MethodDescriptor<?, ?> method, StreamListener listener) {
URI uri = baseUri.resolve(method.getName());
HttpClientStream stream = new HttpClientStream(uri, listener);
synchronized (streams) {
// Check for RUNNING to deal with race condition of this being executed right after doStop
// cancels all the streams.
if (state() != State.RUNNING) {
throw new IllegalStateException("Invalid state for creating new stream: " + state());
}
streams.add(stream);
return stream;
}
}
@Override
protected void doStart() {
notifyStarted();
}
@Override
protected void doStop() {
// Cancel all of the streams for this transport.
synchronized (streams) {
// Guaranteed to be in the STOPPING state here.
for (HttpClientStream stream : streams.toArray(new HttpClientStream[0])) {
stream.cancel();
}
}
notifyStopped();
}
/**
* Client stream implementation for an HTTP transport.
*/
private class HttpClientStream extends AbstractStream implements ClientStream {
final HttpURLConnection connection;
final DataOutputStream outputStream;
boolean connected;
HttpClientStream(URI uri, StreamListener listener) {
super(listener);
try {
connection = (HttpURLConnection) uri.toURL().openConnection();
connection.setDoOutput(true);
connection.setDoInput(true);
connection.setRequestMethod(HTTP_METHOD);
connection.setRequestProperty(CONTENT_TYPE_HEADER, CONTENT_TYPE_PROTORPC);
outputStream = new DataOutputStream(connection.getOutputStream());
connected = true;
} catch (IOException e) {
throw new RuntimeException(e);
}
}
@Override
public void cancel() {
outboundPhase = Phase.STATUS;
if (setStatus(CANCELLED)) {
disconnect();
}
}
@Override
protected void sendFrame(ByteBuffer frame, boolean endOfStream) {
if (state() == StreamState.CLOSED) {
// Ignore outbound frames after the stream has closed.
return;
}
try {
// Synchronizing here to protect against cancellation due to the transport shutting down.
synchronized (connection) {
// Write the data to the connection output stream.
ByteBuffers.asByteSource(frame).copyTo(outputStream);
if (endOfStream) {
// Close the output stream on this connection.
connection.getOutputStream().close();
// The request has completed so now process the response. This results in the listener's
// closed() callback being invoked since we're indicating that this is the end of the
// response stream.
//
// NOTE: Must read the response in the sending thread, since URLConnection has threading
// issues.
new InputStreamDeframer(inboundMessageHandler()).deliverFrame(
connection.getInputStream(), true);
// Close the input stream and disconnect.
connection.getInputStream().close();
disconnect();
}
}
} catch (IOException ioe) {
setStatus(new Status(Transport.Code.INTERNAL, ioe));
}
}
@Override
public void dispose() {
super.dispose();
disconnect();
}
/**
* Disconnects the HTTP connection if currently connected.
*/
private void disconnect() {
// Synchronizing since this may be called for the stream (i.e. cancel or read complete) or
// due to shutting down the transport (i.e. cancel).
synchronized (connection) {
if (connected) {
connected = false;
streams.remove(this);
connection.disconnect();
}
}
}
}
}