Add simple server timeout support

Reintroduce throws

Add timeoutExecutor shutdown

Use a default future

Move timeout cancellation

Cancel the timeout in error cases
diff --git a/core/src/main/java/io/grpc/ChannelImpl.java b/core/src/main/java/io/grpc/ChannelImpl.java
index 5a19266..03387a4 100644
--- a/core/src/main/java/io/grpc/ChannelImpl.java
+++ b/core/src/main/java/io/grpc/ChannelImpl.java
@@ -306,7 +306,7 @@
     }
   }
 
-  private class CallImpl<ReqT, RespT> extends ClientCall<ReqT, RespT> {
+  private final class CallImpl<ReqT, RespT> extends ClientCall<ReqT, RespT> {
     private final MethodDescriptor<ReqT, RespT> method;
     private final SerializingExecutor callExecutor;
     private final boolean unaryRequest;
@@ -314,7 +314,7 @@
     private ClientStream stream;
     private volatile ScheduledFuture<?> deadlineCancellationFuture;
 
-    public CallImpl(MethodDescriptor<ReqT, RespT> method, SerializingExecutor executor,
+    private CallImpl(MethodDescriptor<ReqT, RespT> method, SerializingExecutor executor,
         CallOptions callOptions) {
       this.method = method;
       this.callExecutor = executor;
@@ -403,6 +403,7 @@
         stream.writeMessage(payloadIs);
         failed = false;
       } finally {
+        // TODO(notcarl): Find out if payloadIs needs to be closed.
         if (failed) {
           cancel();
         }
diff --git a/core/src/main/java/io/grpc/Metadata.java b/core/src/main/java/io/grpc/Metadata.java
index 342c0e4..cba3d1a 100644
--- a/core/src/main/java/io/grpc/Metadata.java
+++ b/core/src/main/java/io/grpc/Metadata.java
@@ -42,7 +42,6 @@
 
 import java.util.Arrays;
 import java.util.List;
-import java.util.Map;
 import java.util.Set;
 
 import javax.annotation.concurrent.NotThreadSafe;
@@ -204,9 +203,9 @@
     Preconditions.checkState(serializable, "Can't serialize raw metadata");
     byte[][] serialized = new byte[store.size() * 2][];
     int i = 0;
-    for (Map.Entry<String, MetadataEntry> entry : store.entries()) {
-      serialized[i++] = entry.getValue().key.asciiName();
-      serialized[i++] = entry.getValue().getSerialized();
+    for (MetadataEntry entry : store.values()) {
+      serialized[i++] = entry.key.asciiName();
+      serialized[i++] = entry.getSerialized();
     }
     return serialized;
   }
diff --git a/core/src/main/java/io/grpc/ServerImpl.java b/core/src/main/java/io/grpc/ServerImpl.java
index cccdbb0..32b5e14 100644
--- a/core/src/main/java/io/grpc/ServerImpl.java
+++ b/core/src/main/java/io/grpc/ServerImpl.java
@@ -33,6 +33,7 @@
 
 import com.google.common.base.Preconditions;
 import com.google.common.base.Throwables;
+import com.google.common.util.concurrent.Futures;
 
 import io.grpc.transport.ServerListener;
 import io.grpc.transport.ServerStream;
@@ -42,9 +43,13 @@
 
 import java.io.IOException;
 import java.io.InputStream;
+import java.util.ArrayList;
 import java.util.Collection;
 import java.util.HashSet;
 import java.util.concurrent.Executor;
+import java.util.concurrent.Executors;
+import java.util.concurrent.Future;
+import java.util.concurrent.ScheduledExecutorService;
 import java.util.concurrent.TimeUnit;
 
 /**
@@ -64,6 +69,8 @@
 public final class ServerImpl extends Server {
   private static final ServerStreamListener NOOP_LISTENER = new NoopListener();
 
+  private static final Future<?> DEFAULT_TIMEOUT_FUTURE = Futures.immediateCancelledFuture();
+
   /** Executor for application processing. */
   private final Executor executor;
   private final HandlerRegistry registry;
@@ -77,6 +84,8 @@
   /** {@code transportServer} and services encapsulating something similar to a TCP connection. */
   private final Collection<ServerTransport> transports = new HashSet<ServerTransport>();
 
+  private final ScheduledExecutorService timeoutService;
+
   /**
    * Construct a server.
    *
@@ -88,6 +97,8 @@
     this.executor = Preconditions.checkNotNull(executor, "executor");
     this.registry = Preconditions.checkNotNull(registry, "registry");
     this.transportServer = Preconditions.checkNotNull(transportServer, "transportServer");
+    // TODO(carl-mastrangelo): replace this with the shared scheduler once PR #576 is merged.
+    this.timeoutService = Executors.newScheduledThreadPool(1);
   }
 
   /** Hack to allow executors to auto-shutdown. Not for general use. */
@@ -122,6 +133,7 @@
     }
     transportServer.shutdown();
     shutdown = true;
+    timeoutService.shutdown();
     return this;
   }
 
@@ -224,8 +236,7 @@
       synchronized (ServerImpl.this) {
         // transports collection can be modified during shutdown(), even if we hold the lock, due
         // to reentrancy.
-        for (ServerTransport transport
-            : transports.toArray(new ServerTransport[transports.size()])) {
+        for (ServerTransport transport : new ArrayList<ServerTransport>(transports)) {
           transport.shutdown();
         }
         transportServerTerminated = true;
@@ -249,6 +260,7 @@
     @Override
     public ServerStreamListener streamCreated(final ServerStream stream, final String methodName,
         final Metadata.Headers headers) {
+      final Future<?> timeout = scheduleTimeout(stream, headers);
       SerializingExecutor serializingExecutor = new SerializingExecutor(executor);
       final JumpToApplicationThreadServerStreamListener jumpListener
           = new JumpToApplicationThreadServerStreamListener(serializingExecutor, stream);
@@ -256,32 +268,53 @@
       // are delivered, including any errors. Callbacks can still be triggered, but they will be
       // queued.
       serializingExecutor.execute(new Runnable() {
-            @Override
-            public void run() {
-              ServerStreamListener listener = NOOP_LISTENER;
-              try {
-                HandlerRegistry.Method method = registry.lookupMethod(methodName);
-                if (method == null) {
-                  stream.close(
-                      Status.UNIMPLEMENTED.withDescription("Method not found: " + methodName),
-                      new Metadata.Trailers());
-                  return;
-                }
-                listener = startCall(stream, methodName, method.getMethodDefinition(), headers);
-              } catch (Throwable t) {
-                stream.close(Status.fromThrowable(t), new Metadata.Trailers());
-                throw Throwables.propagate(t);
-              } finally {
-                jumpListener.setListener(listener);
+          @Override
+          public void run() {
+            ServerStreamListener listener = NOOP_LISTENER;
+            try {
+              HandlerRegistry.Method method = registry.lookupMethod(methodName);
+              if (method == null) {
+                stream.close(
+                    Status.UNIMPLEMENTED.withDescription("Method not found: " + methodName),
+                    new Metadata.Trailers());
+                timeout.cancel(true);
+                return;
               }
+              listener = startCall(stream, methodName, method.getMethodDefinition(), timeout,
+                  headers);
+            } catch (Throwable t) {
+              stream.close(Status.fromThrowable(t), new Metadata.Trailers());
+              timeout.cancel(true);
+              throw Throwables.propagate(t);
+            } finally {
+              jumpListener.setListener(listener);
             }
-          });
+          }
+        });
       return jumpListener;
     }
 
+    private Future<?> scheduleTimeout(final ServerStream stream, Metadata.Headers headers) {
+      Long timeoutMicros = headers.get(ChannelImpl.TIMEOUT_KEY);
+      if (timeoutMicros == null) {
+        return DEFAULT_TIMEOUT_FUTURE;
+      }
+      return timeoutService.schedule(new Runnable() {
+          @Override
+          public void run() {
+            // This should rarely get run, since the client will likely cancel the stream before
+            // the timeout is reached.
+            stream.cancel(Status.DEADLINE_EXCEEDED);
+          }
+        },
+        timeoutMicros,
+        TimeUnit.MICROSECONDS);
+    }
+
     /** Never returns {@code null}. */
     private <ReqT, RespT> ServerStreamListener startCall(ServerStream stream, String fullMethodName,
-        ServerMethodDefinition<ReqT, RespT> methodDef, Metadata.Headers headers) {
+        ServerMethodDefinition<ReqT, RespT> methodDef, Future<?> timeout,
+        Metadata.Headers headers) {
       // TODO(ejona86): should we update fullMethodName to have the canonical path of the method?
       final ServerCallImpl<ReqT, RespT> call = new ServerCallImpl<ReqT, RespT>(
           stream, methodDef.getMethodDescriptor());
@@ -291,7 +324,7 @@
         throw new NullPointerException(
             "startCall() returned a null listener for method " + fullMethodName);
       }
-      return call.newServerStreamListener(listener);
+      return call.newServerStreamListener(listener, timeout);
     }
   }
 
@@ -403,7 +436,7 @@
     }
   }
 
-  private class ServerCallImpl<ReqT, RespT> extends ServerCall<RespT> {
+  private static class ServerCallImpl<ReqT, RespT> extends ServerCall<RespT> {
     private final ServerStream stream;
     private final MethodDescriptor<ReqT, RespT> method;
     private volatile boolean cancelled;
@@ -450,8 +483,9 @@
       return cancelled;
     }
 
-    private ServerStreamListenerImpl newServerStreamListener(ServerCall.Listener<ReqT> listener) {
-      return new ServerStreamListenerImpl(listener);
+    private ServerStreamListenerImpl newServerStreamListener(ServerCall.Listener<ReqT> listener,
+        Future<?> timeout) {
+      return new ServerStreamListenerImpl(listener, timeout);
     }
 
     /**
@@ -460,9 +494,11 @@
      */
     private class ServerStreamListenerImpl implements ServerStreamListener {
       private final ServerCall.Listener<ReqT> listener;
+      private final Future<?> timeout;
 
-      public ServerStreamListenerImpl(ServerCall.Listener<ReqT> listener) {
+      public ServerStreamListenerImpl(ServerCall.Listener<ReqT> listener, Future<?> timeout) {
         this.listener = Preconditions.checkNotNull(listener, "listener must not be null");
+        this.timeout = timeout;
       }
 
       @Override
@@ -493,6 +529,7 @@
 
       @Override
       public void closed(Status status) {
+        timeout.cancel(true);
         if (status.isOk()) {
           listener.onComplete();
         } else {
diff --git a/core/src/main/java/io/grpc/transport/ClientTransport.java b/core/src/main/java/io/grpc/transport/ClientTransport.java
index e9edc69..ed23ea5 100644
--- a/core/src/main/java/io/grpc/transport/ClientTransport.java
+++ b/core/src/main/java/io/grpc/transport/ClientTransport.java
@@ -63,7 +63,7 @@
                          ClientStreamListener listener);
 
   /**
-   * Starts transport. Implementations must not call {@code listener} until after {@code start()}
+   * Starts transport. Implementations must not call {@code listener} until after {@link #start}
    * returns.
    *
    * @param listener non-{@code null} listener of transport events
@@ -81,7 +81,7 @@
 
   /**
    * Initiates an orderly shutdown of the transport. Existing streams continue, but new streams will
-   * fail (once {@link Listener#transportShutdown()} callback called).
+   * fail (once {@link Listener#transportShutdown} callback called).
    */
   void shutdown();
 
diff --git a/core/src/main/java/io/grpc/transport/ServerStream.java b/core/src/main/java/io/grpc/transport/ServerStream.java
index 454db1d..5fc62f2 100644
--- a/core/src/main/java/io/grpc/transport/ServerStream.java
+++ b/core/src/main/java/io/grpc/transport/ServerStream.java
@@ -41,8 +41,8 @@
 
   /**
    * Writes custom metadata as headers on the response stream sent to the client. This method may
-   * only be called once and cannot be called after calls to {@code Stream#writePayload}
-   * or {@code #close}.
+   * only be called once and cannot be called after calls to {@link Stream#writeMessage}
+   * or {@link #close}.
    *
    * @param headers to send to client.
    */
@@ -57,4 +57,10 @@
    * @param trailers an additional block of metadata to pass to the client on stream closure.
    */
   void close(Status status, Metadata.Trailers trailers);
+
+
+  /**
+   * Tears down the stream, typically in the event of a timeout.
+   */
+  public void cancel(Status status);
 }
diff --git a/netty/src/main/java/io/grpc/transport/netty/NettyServerStream.java b/netty/src/main/java/io/grpc/transport/netty/NettyServerStream.java
index 099b572..e46c198 100644
--- a/netty/src/main/java/io/grpc/transport/netty/NettyServerStream.java
+++ b/netty/src/main/java/io/grpc/transport/netty/NettyServerStream.java
@@ -34,6 +34,7 @@
 import static com.google.common.base.Preconditions.checkNotNull;
 
 import io.grpc.Metadata;
+import io.grpc.Status;
 import io.grpc.transport.AbstractServerStream;
 import io.grpc.transport.WritableBuffer;
 import io.netty.buffer.ByteBuf;
@@ -121,4 +122,9 @@
     handler.returnProcessedBytes(http2Stream, processedBytes);
     writeQueue.scheduleFlush();
   }
+
+  @Override
+  public void cancel(Status status) {
+    // TODO(carl-mastrangelo): implement this
+  }
 }