blob: 2307487ae67173133be89c3c2d0d4c3b78e6008e [file] [log] [blame]
package com.google.net.stubby.http;
import com.google.common.io.ByteBuffers;
import com.google.common.util.concurrent.ListenableFuture;
import com.google.common.util.concurrent.SettableFuture;
import com.google.net.stubby.AbstractResponse;
import com.google.net.stubby.Metadata;
import com.google.net.stubby.Operation;
import com.google.net.stubby.Request;
import com.google.net.stubby.Response;
import com.google.net.stubby.Session;
import com.google.net.stubby.Status;
import com.google.net.stubby.transport.Framer;
import com.google.net.stubby.transport.MessageFramer;
import com.google.net.stubby.transport.Transport;
import com.google.net.stubby.transport.TransportFrameUtil;
import java.io.ByteArrayOutputStream;
import java.io.IOException;
import java.io.InputStream;
import java.io.OutputStream;
import java.nio.ByteBuffer;
import java.util.ArrayList;
import java.util.Enumeration;
import java.util.List;
import java.util.concurrent.Executor;
import javax.servlet.ServletException;
import javax.servlet.http.HttpServlet;
import javax.servlet.http.HttpServletRequest;
import javax.servlet.http.HttpServletResponse;
/**
* A server-only session to be used with a servlet engine. The wrapped session MUST use a
* same-thread executor for work-dispatch
*/
// TODO(user) Support a more flexible threading model than same-thread
// TODO(user) Investigate Servlet3 compliance, in particular thread detaching
public class ServletSession extends HttpServlet {
public static final String PROTORPC = "application/protorpc";
public static final String CONTENT_TYPE = "content-type";
private final Session session;
private final Executor executor;
public ServletSession(Session session, Executor executor) {
this.session = session;
this.executor = executor;
}
@Override
public String getServletName() {
return "gRPCServlet";
}
@Override
protected void doPost(HttpServletRequest req, HttpServletResponse resp) throws ServletException,
IOException {
try {
final SettableFuture<Void> requestCompleteFuture = SettableFuture.create();
final ResponseStream responseStream = new ResponseStream(resp, requestCompleteFuture);
final Request request = startRequest(req, resp, responseStream);
if (request == null) {
return;
}
// Deframe the request and begin the response processing.
new HttpStreamDeframer().deframe(req.getInputStream(), request);
request.close(Status.OK);
// Notify the response processing that the request is complete.
requestCompleteFuture.set(null);
// Block until the response is complete.
responseStream.getResponseCompleteFuture().get();
} catch (Exception e) {
throw new RuntimeException(e);
}
}
/**
* Start the Request operation on the server
*/
@SuppressWarnings("unchecked")
private Request startRequest(HttpServletRequest req, HttpServletResponse resp,
ResponseStream responseStream) throws IOException {
// TODO(user): Move into shared utility
if (!PROTORPC.equals(req.getHeader(CONTENT_TYPE))) {
resp.sendError(HttpServletResponse.SC_UNSUPPORTED_MEDIA_TYPE,
"The only supported content-type is " + PROTORPC);
return null;
}
// Use Path to specify the operation
String operationName = normalizeOperationName(req.getPathInfo());
if (operationName == null) {
resp.sendError(HttpServletResponse.SC_NOT_FOUND);
return null;
}
List<String> headerList = new ArrayList<>();
Enumeration headerNames = req.getHeaderNames();
while (headerNames.hasMoreElements()) {
String name = headerNames.nextElement().toString();
headerList.add(name);
headerList.add(req.getHeader(name));
}
Metadata.Headers headers = new Metadata.Headers(headerList.toArray(new String[]{}));
// Create the operation and bind an HTTP response operation
Request op = session.startRequest(operationName, headers,
HttpResponseOperation.builder(responseStream));
if (op == null) {
// TODO(user): Unify error handling once spec finalized
resp.sendError(HttpServletResponse.SC_NOT_FOUND, "Unknown RPC operation");
return null;
}
return op;
}
private String normalizeOperationName(String path) {
// TODO(user): This is where we would add path-namespacing of different implementations
// of services so they do not collide. For the moment this is not supported.
return path.substring(1);
}
/**
* Implementation of {@link Response}
*/
private static class HttpResponseOperation extends AbstractResponse implements Framer.Sink {
static ResponseBuilder builder(final ResponseStream responseStream) {
return new ResponseBuilder() {
@Override
public Response build(int id) {
return new HttpResponseOperation(id, responseStream);
}
@Override
public Response build() {
return new HttpResponseOperation(-1, responseStream);
}
};
}
private final MessageFramer framer;
private final ResponseStream responseStream;
private HttpResponseOperation(int id, ResponseStream responseStream) {
super(id);
this.responseStream = responseStream;
// Always use no compression framing and treat the stream as one large frame
framer = new MessageFramer(4096);
framer.setAllowCompression(false);
try {
responseStream.write(TransportFrameUtil.NO_COMPRESS_FLAG);
} catch (IOException ioe) {
close(new Status(Transport.Code.INTERNAL, ioe));
}
}
@Override
public Operation addContext(String type, InputStream message, Phase nextPhase) {
super.addContext(type, message, nextPhase);
framer.writeContext(type, message, getPhase() == Phase.CLOSED, this);
return this;
}
@Override
public Operation addPayload(InputStream payload, Phase nextPhase) {
super.addPayload(payload, Phase.PAYLOAD);
framer.writePayload(payload, false, this);
if (nextPhase == Phase.CLOSED) {
close(Status.OK);
}
return this;
}
@Override
public Operation close(Status status) {
boolean alreadyClosed = getPhase() == Phase.CLOSED;
super.close(status);
if (!alreadyClosed) {
framer.writeStatus(status, true, this);
}
return this;
}
@Override
public void deliverFrame(ByteBuffer frame, boolean endOfMessage) {
boolean closed = getPhase() == Phase.CLOSED;
try {
// Skip the frame flag as we don't care about it for streaming output
frame.position(1);
ByteBuffers.asByteSource(frame).copyTo(responseStream);
} catch (Throwable t) {
close(new Status(Transport.Code.INTERNAL, t));
} finally {
if (closed && endOfMessage) {
framer.close();
responseStream.close();
}
}
}
}
/**
* Wraps the HTTP response {@link OutputStream}. Will buffer bytes until the request is
* complete. It will then flush its buffer to the output stream and all subsequent writes
* will go directly to the HTTP response.
*/
private class ResponseStream extends OutputStream {
private final SettableFuture<Void> responseCompleteFuture = SettableFuture.create();
private final ByteArrayOutputStream buffer = new ByteArrayOutputStream();
private final HttpServletResponse httpResponse;
private volatile boolean requestComplete;
private volatile boolean closed;
public ResponseStream(HttpServletResponse httpResponse,
SettableFuture<Void> requestCompleteFuture) {
this.httpResponse = httpResponse;
httpResponse.setHeader("Content-Type", PROTORPC);
requestCompleteFuture.addListener(new Runnable() {
@Override
public void run() {
onRequestComplete();
}
}, executor);
}
public ListenableFuture<Void> getResponseCompleteFuture() {
return responseCompleteFuture;
}
@Override
public void close() {
synchronized (buffer) {
closed = true;
// If all the data has been written to the output stream, finish the response.
if (buffer.size() == 0) {
finish();
}
}
}
@Override
public void write(int b) throws IOException {
write(new byte[] {(byte) b}, 0, 1);
}
@Override
public void write(byte[] b) throws IOException {
write(b, 0, b.length);
}
@Override
public void write(byte[] b, int off, int len) throws IOException {
// This loop will execute at most 2 times.
while (true) {
if (requestComplete) {
// The request already completed, just write directly to the response output stream.
httpResponse.getOutputStream().write(b, off, len);
return;
}
synchronized (buffer) {
if (requestComplete) {
// Handle the case that we completed the request just after the first check
// above. Just go back to the top of the loop and write directly to the response.
continue;
}
// Request hasn't completed yet, buffer the data for now.
buffer.write(b, off, len);
return;
}
}
}
private void onRequestComplete() {
try {
// Write the content of the buffer to the HTTP response.
synchronized (buffer) {
if (buffer.size() > 0) {
httpResponse.getOutputStream().write(buffer.toByteArray());
buffer.reset();
}
requestComplete = true;
if (closed) {
// The response is complete, finish the response.
finish();
}
}
} catch (IOException e) {
throw new RuntimeException(e);
}
}
/**
* Closes the HTTP response and sets the future response completion future.
*/
private void finish() {
try {
httpResponse.getOutputStream().close();
responseCompleteFuture.set(null);
} catch (IOException e) {
throw new RuntimeException(e);
}
}
}
}