Add simple server timeout support
Reintroduce throws
Add timeoutExecutor shutdown
Use a default future
Move timeout cancellation
Cancel the timeout in error cases
diff --git a/core/src/main/java/io/grpc/ChannelImpl.java b/core/src/main/java/io/grpc/ChannelImpl.java
index 5a19266..03387a4 100644
--- a/core/src/main/java/io/grpc/ChannelImpl.java
+++ b/core/src/main/java/io/grpc/ChannelImpl.java
@@ -306,7 +306,7 @@
}
}
- private class CallImpl<ReqT, RespT> extends ClientCall<ReqT, RespT> {
+ private final class CallImpl<ReqT, RespT> extends ClientCall<ReqT, RespT> {
private final MethodDescriptor<ReqT, RespT> method;
private final SerializingExecutor callExecutor;
private final boolean unaryRequest;
@@ -314,7 +314,7 @@
private ClientStream stream;
private volatile ScheduledFuture<?> deadlineCancellationFuture;
- public CallImpl(MethodDescriptor<ReqT, RespT> method, SerializingExecutor executor,
+ private CallImpl(MethodDescriptor<ReqT, RespT> method, SerializingExecutor executor,
CallOptions callOptions) {
this.method = method;
this.callExecutor = executor;
@@ -403,6 +403,7 @@
stream.writeMessage(payloadIs);
failed = false;
} finally {
+ // TODO(notcarl): Find out if payloadIs needs to be closed.
if (failed) {
cancel();
}
diff --git a/core/src/main/java/io/grpc/Metadata.java b/core/src/main/java/io/grpc/Metadata.java
index 342c0e4..cba3d1a 100644
--- a/core/src/main/java/io/grpc/Metadata.java
+++ b/core/src/main/java/io/grpc/Metadata.java
@@ -42,7 +42,6 @@
import java.util.Arrays;
import java.util.List;
-import java.util.Map;
import java.util.Set;
import javax.annotation.concurrent.NotThreadSafe;
@@ -204,9 +203,9 @@
Preconditions.checkState(serializable, "Can't serialize raw metadata");
byte[][] serialized = new byte[store.size() * 2][];
int i = 0;
- for (Map.Entry<String, MetadataEntry> entry : store.entries()) {
- serialized[i++] = entry.getValue().key.asciiName();
- serialized[i++] = entry.getValue().getSerialized();
+ for (MetadataEntry entry : store.values()) {
+ serialized[i++] = entry.key.asciiName();
+ serialized[i++] = entry.getSerialized();
}
return serialized;
}
diff --git a/core/src/main/java/io/grpc/ServerImpl.java b/core/src/main/java/io/grpc/ServerImpl.java
index cccdbb0..32b5e14 100644
--- a/core/src/main/java/io/grpc/ServerImpl.java
+++ b/core/src/main/java/io/grpc/ServerImpl.java
@@ -33,6 +33,7 @@
import com.google.common.base.Preconditions;
import com.google.common.base.Throwables;
+import com.google.common.util.concurrent.Futures;
import io.grpc.transport.ServerListener;
import io.grpc.transport.ServerStream;
@@ -42,9 +43,13 @@
import java.io.IOException;
import java.io.InputStream;
+import java.util.ArrayList;
import java.util.Collection;
import java.util.HashSet;
import java.util.concurrent.Executor;
+import java.util.concurrent.Executors;
+import java.util.concurrent.Future;
+import java.util.concurrent.ScheduledExecutorService;
import java.util.concurrent.TimeUnit;
/**
@@ -64,6 +69,8 @@
public final class ServerImpl extends Server {
private static final ServerStreamListener NOOP_LISTENER = new NoopListener();
+ private static final Future<?> DEFAULT_TIMEOUT_FUTURE = Futures.immediateCancelledFuture();
+
/** Executor for application processing. */
private final Executor executor;
private final HandlerRegistry registry;
@@ -77,6 +84,8 @@
/** {@code transportServer} and services encapsulating something similar to a TCP connection. */
private final Collection<ServerTransport> transports = new HashSet<ServerTransport>();
+ private final ScheduledExecutorService timeoutService;
+
/**
* Construct a server.
*
@@ -88,6 +97,8 @@
this.executor = Preconditions.checkNotNull(executor, "executor");
this.registry = Preconditions.checkNotNull(registry, "registry");
this.transportServer = Preconditions.checkNotNull(transportServer, "transportServer");
+ // TODO(carl-mastrangelo): replace this with the shared scheduler once PR #576 is merged.
+ this.timeoutService = Executors.newScheduledThreadPool(1);
}
/** Hack to allow executors to auto-shutdown. Not for general use. */
@@ -122,6 +133,7 @@
}
transportServer.shutdown();
shutdown = true;
+ timeoutService.shutdown();
return this;
}
@@ -224,8 +236,7 @@
synchronized (ServerImpl.this) {
// transports collection can be modified during shutdown(), even if we hold the lock, due
// to reentrancy.
- for (ServerTransport transport
- : transports.toArray(new ServerTransport[transports.size()])) {
+ for (ServerTransport transport : new ArrayList<ServerTransport>(transports)) {
transport.shutdown();
}
transportServerTerminated = true;
@@ -249,6 +260,7 @@
@Override
public ServerStreamListener streamCreated(final ServerStream stream, final String methodName,
final Metadata.Headers headers) {
+ final Future<?> timeout = scheduleTimeout(stream, headers);
SerializingExecutor serializingExecutor = new SerializingExecutor(executor);
final JumpToApplicationThreadServerStreamListener jumpListener
= new JumpToApplicationThreadServerStreamListener(serializingExecutor, stream);
@@ -256,32 +268,53 @@
// are delivered, including any errors. Callbacks can still be triggered, but they will be
// queued.
serializingExecutor.execute(new Runnable() {
- @Override
- public void run() {
- ServerStreamListener listener = NOOP_LISTENER;
- try {
- HandlerRegistry.Method method = registry.lookupMethod(methodName);
- if (method == null) {
- stream.close(
- Status.UNIMPLEMENTED.withDescription("Method not found: " + methodName),
- new Metadata.Trailers());
- return;
- }
- listener = startCall(stream, methodName, method.getMethodDefinition(), headers);
- } catch (Throwable t) {
- stream.close(Status.fromThrowable(t), new Metadata.Trailers());
- throw Throwables.propagate(t);
- } finally {
- jumpListener.setListener(listener);
+ @Override
+ public void run() {
+ ServerStreamListener listener = NOOP_LISTENER;
+ try {
+ HandlerRegistry.Method method = registry.lookupMethod(methodName);
+ if (method == null) {
+ stream.close(
+ Status.UNIMPLEMENTED.withDescription("Method not found: " + methodName),
+ new Metadata.Trailers());
+ timeout.cancel(true);
+ return;
}
+ listener = startCall(stream, methodName, method.getMethodDefinition(), timeout,
+ headers);
+ } catch (Throwable t) {
+ stream.close(Status.fromThrowable(t), new Metadata.Trailers());
+ timeout.cancel(true);
+ throw Throwables.propagate(t);
+ } finally {
+ jumpListener.setListener(listener);
}
- });
+ }
+ });
return jumpListener;
}
+ private Future<?> scheduleTimeout(final ServerStream stream, Metadata.Headers headers) {
+ Long timeoutMicros = headers.get(ChannelImpl.TIMEOUT_KEY);
+ if (timeoutMicros == null) {
+ return DEFAULT_TIMEOUT_FUTURE;
+ }
+ return timeoutService.schedule(new Runnable() {
+ @Override
+ public void run() {
+ // This should rarely get run, since the client will likely cancel the stream before
+ // the timeout is reached.
+ stream.cancel(Status.DEADLINE_EXCEEDED);
+ }
+ },
+ timeoutMicros,
+ TimeUnit.MICROSECONDS);
+ }
+
/** Never returns {@code null}. */
private <ReqT, RespT> ServerStreamListener startCall(ServerStream stream, String fullMethodName,
- ServerMethodDefinition<ReqT, RespT> methodDef, Metadata.Headers headers) {
+ ServerMethodDefinition<ReqT, RespT> methodDef, Future<?> timeout,
+ Metadata.Headers headers) {
// TODO(ejona86): should we update fullMethodName to have the canonical path of the method?
final ServerCallImpl<ReqT, RespT> call = new ServerCallImpl<ReqT, RespT>(
stream, methodDef.getMethodDescriptor());
@@ -291,7 +324,7 @@
throw new NullPointerException(
"startCall() returned a null listener for method " + fullMethodName);
}
- return call.newServerStreamListener(listener);
+ return call.newServerStreamListener(listener, timeout);
}
}
@@ -403,7 +436,7 @@
}
}
- private class ServerCallImpl<ReqT, RespT> extends ServerCall<RespT> {
+ private static class ServerCallImpl<ReqT, RespT> extends ServerCall<RespT> {
private final ServerStream stream;
private final MethodDescriptor<ReqT, RespT> method;
private volatile boolean cancelled;
@@ -450,8 +483,9 @@
return cancelled;
}
- private ServerStreamListenerImpl newServerStreamListener(ServerCall.Listener<ReqT> listener) {
- return new ServerStreamListenerImpl(listener);
+ private ServerStreamListenerImpl newServerStreamListener(ServerCall.Listener<ReqT> listener,
+ Future<?> timeout) {
+ return new ServerStreamListenerImpl(listener, timeout);
}
/**
@@ -460,9 +494,11 @@
*/
private class ServerStreamListenerImpl implements ServerStreamListener {
private final ServerCall.Listener<ReqT> listener;
+ private final Future<?> timeout;
- public ServerStreamListenerImpl(ServerCall.Listener<ReqT> listener) {
+ public ServerStreamListenerImpl(ServerCall.Listener<ReqT> listener, Future<?> timeout) {
this.listener = Preconditions.checkNotNull(listener, "listener must not be null");
+ this.timeout = timeout;
}
@Override
@@ -493,6 +529,7 @@
@Override
public void closed(Status status) {
+ timeout.cancel(true);
if (status.isOk()) {
listener.onComplete();
} else {
diff --git a/core/src/main/java/io/grpc/transport/ClientTransport.java b/core/src/main/java/io/grpc/transport/ClientTransport.java
index e9edc69..ed23ea5 100644
--- a/core/src/main/java/io/grpc/transport/ClientTransport.java
+++ b/core/src/main/java/io/grpc/transport/ClientTransport.java
@@ -63,7 +63,7 @@
ClientStreamListener listener);
/**
- * Starts transport. Implementations must not call {@code listener} until after {@code start()}
+ * Starts transport. Implementations must not call {@code listener} until after {@link #start}
* returns.
*
* @param listener non-{@code null} listener of transport events
@@ -81,7 +81,7 @@
/**
* Initiates an orderly shutdown of the transport. Existing streams continue, but new streams will
- * fail (once {@link Listener#transportShutdown()} callback called).
+ * fail (once {@link Listener#transportShutdown} callback called).
*/
void shutdown();
diff --git a/core/src/main/java/io/grpc/transport/ServerStream.java b/core/src/main/java/io/grpc/transport/ServerStream.java
index 454db1d..5fc62f2 100644
--- a/core/src/main/java/io/grpc/transport/ServerStream.java
+++ b/core/src/main/java/io/grpc/transport/ServerStream.java
@@ -41,8 +41,8 @@
/**
* Writes custom metadata as headers on the response stream sent to the client. This method may
- * only be called once and cannot be called after calls to {@code Stream#writePayload}
- * or {@code #close}.
+ * only be called once and cannot be called after calls to {@link Stream#writeMessage}
+ * or {@link #close}.
*
* @param headers to send to client.
*/
@@ -57,4 +57,10 @@
* @param trailers an additional block of metadata to pass to the client on stream closure.
*/
void close(Status status, Metadata.Trailers trailers);
+
+
+ /**
+ * Tears down the stream, typically in the event of a timeout.
+ */
+ public void cancel(Status status);
}
diff --git a/netty/src/main/java/io/grpc/transport/netty/NettyServerStream.java b/netty/src/main/java/io/grpc/transport/netty/NettyServerStream.java
index 099b572..e46c198 100644
--- a/netty/src/main/java/io/grpc/transport/netty/NettyServerStream.java
+++ b/netty/src/main/java/io/grpc/transport/netty/NettyServerStream.java
@@ -34,6 +34,7 @@
import static com.google.common.base.Preconditions.checkNotNull;
import io.grpc.Metadata;
+import io.grpc.Status;
import io.grpc.transport.AbstractServerStream;
import io.grpc.transport.WritableBuffer;
import io.netty.buffer.ByteBuf;
@@ -121,4 +122,9 @@
handler.returnProcessedBytes(http2Stream, processedBytes);
writeQueue.scheduleFlush();
}
+
+ @Override
+ public void cancel(Status status) {
+ // TODO(carl-mastrangelo): implement this
+ }
}