Phase 1 of converting to new Headers mechanism for side-channel.
Introduces Header and uses it for propagating text-only header values over existing transports
Leaves Context & wire format otherwise unchanged

Next phases
- Remove context from interfaces
- Switch the wire format (ESF needs to be done in near lock-step)

Interface changes are relatively light
Headers class is functional but not optimal
All serialization is done as string until transports expose interface for binary headers
-------------
Created by MOE: http://code.google.com/p/moe-java
MOE_MIGRATED_REVID=75050265
diff --git a/auth/src/main/java/com/google/net/stubby/auth/OAuth2ChannelInterceptor.java b/auth/src/main/java/com/google/net/stubby/auth/OAuth2ChannelInterceptor.java
index 3df0f04..1459fd7 100644
--- a/auth/src/main/java/com/google/net/stubby/auth/OAuth2ChannelInterceptor.java
+++ b/auth/src/main/java/com/google/net/stubby/auth/OAuth2ChannelInterceptor.java
@@ -1,18 +1,21 @@
 package com.google.net.stubby.auth;
 
 import com.google.api.client.auth.oauth2.Credential;
-import com.google.common.base.Preconditions;
 import com.google.net.stubby.Call;
 import com.google.net.stubby.Channel;
+import com.google.net.stubby.Metadata;
 import com.google.net.stubby.MethodDescriptor;
+import com.google.net.stubby.context.ForwardingChannel;
 
 import java.util.concurrent.Executor;
 
 import javax.inject.Provider;
 
 /** Channel wrapper that authenticates all calls with OAuth2. */
-public class OAuth2ChannelInterceptor implements Channel {
-  private final Channel delegate;
+public class OAuth2ChannelInterceptor extends ForwardingChannel {
+  private static final Metadata.Key<String> AUTHORIZATION =
+      new Metadata.Key<String>("Authorization", Metadata.STRING_MARSHALLER);
+
   private final OAuth2AccessTokenProvider accessTokenProvider;
   private final Provider<String> authorizationHeaderProvider
       = new Provider<String>() {
@@ -23,7 +26,7 @@
       };
 
   public OAuth2ChannelInterceptor(Channel delegate, Credential credential, Executor executor) {
-    this.delegate = Preconditions.checkNotNull(delegate);
+    super(delegate);
     this.accessTokenProvider = new OAuth2AccessTokenProvider(credential, executor);
   }
 
@@ -31,6 +34,12 @@
   public <ReqT, RespT> Call<ReqT, RespT> newCall(MethodDescriptor<ReqT, RespT> method) {
     // TODO(user): If the call fails for Auth reasons, this does not properly propagate info that
     // would be in WWW-Authenticate, because it does not yet have access to the header.
-    return delegate.newCall(method.withHeader("Authorization", authorizationHeaderProvider));
+    return new ForwardingCall<ReqT, RespT>(delegate.newCall(method)) {
+      @Override
+      public void start(Listener<RespT> responseListener, Metadata.Headers headers) {
+        headers.put(AUTHORIZATION, authorizationHeaderProvider.get());
+        super.start(responseListener, headers);
+      }
+    };
   }
 }
diff --git a/core/src/main/java/com/google/net/stubby/Call.java b/core/src/main/java/com/google/net/stubby/Call.java
index 554049b..6be24a8 100644
--- a/core/src/main/java/com/google/net/stubby/Call.java
+++ b/core/src/main/java/com/google/net/stubby/Call.java
@@ -15,9 +15,9 @@
  *
  * <p>{@link #start} is required to be the first of any methods called.
  *
- * <p>Any contexts must be sent before any payloads, which must be sent before half closing.
+ * <p>Any headers must be sent before any payloads, which must be sent before half closing.
  *
- * <p>No generic method for determining message receipt or providing acknowlegement is provided.
+ * <p>No generic method for determining message receipt or providing acknowledgement is provided.
  * Applications are expected to utilize normal payload messages for such signals, as a response
  * natually acknowledges its request.
  *
@@ -27,13 +27,22 @@
   /**
    * Callbacks for consuming incoming RPC messages.
    *
-   * <p>Any contexts are guaranteed to arrive before any payloads, which are guaranteed before
+   * <p>Response headers are guaranteed to arrive before any payloads, which are guaranteed
+   * to arrive before close. An additional block of headers called 'trailers' can be delivered with
    * close.
    *
    * <p>Implementations are free to block for extended periods of time. Implementations are not
    * required to be thread-safe.
    */
   public abstract static class Listener<T> {
+
+    /**
+     * The response headers have been received. Headers always precede payloads.
+     * This method is always called, if no headers were received then an empty {@link Metadata}
+     * is passed.
+     */
+    public abstract ListenableFuture<Void> onHeaders(Metadata.Headers headers);
+
     /**
      * A response context has been received. Any context messages will precede payload messages.
      *
@@ -42,6 +51,7 @@
      * method.
      */
     @Nullable
+    @Deprecated
     public abstract ListenableFuture<Void> onContext(String name, InputStream value);
 
     /**
@@ -52,33 +62,24 @@
     public abstract ListenableFuture<Void> onPayload(T payload);
 
     /**
-     * The Call has been closed. No further sending or receiving will occur. If {@code status} is
-     * not equal to {@link Status#OK}, then the call failed.
+     * The Call has been closed. No further sending or receiving can occur. If {@code status} is
+     * not equal to {@link Status#OK}, then the call failed. An additional block of headers may be
+     * received at the end of the call from the server. An empty {@link Metadata} object is passed
+     * if no trailers are received.
      */
-    public abstract void onClose(Status status);
+    public abstract void onClose(Status status, Metadata.Trailers trailers);
   }
 
   /**
    * Start a call, using {@code responseListener} for processing response messages.
    *
    * @param responseListener receives response messages
+   * @param headers which can contain extra information like authentication.
    * @throws IllegalStateException if call is already started
    */
-  public abstract void start(Listener<ResponseT> responseListener);
+  // TODO(user): Might be better to put into Channel#newCall, might reduce decoration burden
+  public abstract void start(Listener<ResponseT> responseListener, Metadata.Headers headers);
 
-  /**
-   * Prevent any further processing for this Call. No further messages may be sent or will be
-   * received. The server is informed of cancellations, but may not stop processing the call.
-   * Cancellation is permitted even if previously {@code cancel()}ed or {@link #halfClose}d.
-   */
-  public abstract void cancel();
-
-  /**
-   * Close call for message sending. Incoming messages are unaffected.
-   *
-   * @throws IllegalStateException if call is already {@code halfClose()}d or {@link #cancel}ed
-   */
-  public abstract void halfClose();
 
   /**
    * Send a context message. Context messages are intended for side-channel information like
@@ -89,6 +90,7 @@
    * @throws IllegalStateException if call is {@link #halfClose}d or explicitly {@link #cancel}ed,
    *     or after {@link #sendPayload}
    */
+  @Deprecated
   public void sendContext(String name, InputStream value) {
     sendContext(name, value, null);
   }
@@ -108,8 +110,23 @@
    * @throws IllegalStateException if call is {@link #halfClose}d or explicitly {@link #cancel}ed,
    *     or after {@link #sendPayload}
    */
+  @Deprecated
   public abstract void sendContext(String name, InputStream value,
-      @Nullable SettableFuture<Void> accepted);
+                                   @Nullable SettableFuture<Void> accepted);
+
+  /**
+   * Prevent any further processing for this Call. No further messages may be sent or will be
+   * received. The server is informed of cancellations, but may not stop processing the call.
+   * Cancellation is permitted even if previously {@code cancel()}ed or {@link #halfClose}d.
+   */
+  public abstract void cancel();
+
+  /**
+   * Close call for message sending. Incoming messages are unaffected.
+   *
+   * @throws IllegalStateException if call is already {@code halfClose()}d or {@link #cancel}ed
+   */
+  public abstract void halfClose();
 
   /**
    * Send a payload message. Payload messages are the primary form of communication associated with
diff --git a/core/src/main/java/com/google/net/stubby/ChannelImpl.java b/core/src/main/java/com/google/net/stubby/ChannelImpl.java
index 9717495..41f0b22 100644
--- a/core/src/main/java/com/google/net/stubby/ChannelImpl.java
+++ b/core/src/main/java/com/google/net/stubby/ChannelImpl.java
@@ -11,8 +11,8 @@
 import com.google.net.stubby.newtransport.ClientTransportFactory;
 import com.google.net.stubby.newtransport.StreamListener;
 
-import java.io.InputStream;
 import java.io.IOException;
+import java.io.InputStream;
 import java.util.ArrayList;
 import java.util.Collection;
 import java.util.Collections;
@@ -136,9 +136,12 @@
     }
 
     @Override
-    public void start(Listener<RespT> observer) {
+    public void start(Listener<RespT> observer, Metadata.Headers headers) {
       Preconditions.checkState(stream == null, "Already started");
-      stream = obtainActiveTransport().newStream(method, new StreamListenerImpl(observer));
+      headers.setPath(method.getName());
+      headers.setAuthority("fixme");
+      stream = obtainActiveTransport().newStream(method, headers,
+          new StreamListenerImpl(observer));
     }
 
     @Override
@@ -239,6 +242,16 @@
       }
 
       @Override
+      public ListenableFuture<Void> headersRead(final Metadata.Headers headers) {
+        return dispatchCallable(new Callable<ListenableFuture<Void>>() {
+          @Override
+          public ListenableFuture<Void> call() throws Exception {
+            return observer.onHeaders(headers);
+          }
+        });
+      }
+
+      @Override
       public ListenableFuture<Void> contextRead(final String name, final InputStream value,
           final int length) {
         return dispatchCallable(new Callable<ListenableFuture<Void>>() {
@@ -260,7 +273,7 @@
       }
 
       @Override
-      public void closed(final Status status) {
+      public void closed(final Status status, final Metadata.Trailers trailers) {
         for (SettableFuture<Void> future : inProcessFutures) {
           future.cancel(false);
         }
@@ -268,7 +281,7 @@
         callExecutor.execute(new Runnable() {
           @Override
           public void run() {
-            observer.onClose(status);
+            observer.onClose(status, trailers);
           }
         });
       }
diff --git a/core/src/main/java/com/google/net/stubby/Metadata.java b/core/src/main/java/com/google/net/stubby/Metadata.java
new file mode 100644
index 0000000..ca83e0c
--- /dev/null
+++ b/core/src/main/java/com/google/net/stubby/Metadata.java
@@ -0,0 +1,472 @@
+package com.google.net.stubby;
+
+import static java.nio.charset.StandardCharsets.UTF_8;
+
+import com.google.common.annotations.VisibleForTesting;
+import com.google.common.base.Function;
+import com.google.common.base.Preconditions;
+import com.google.common.collect.Iterables;
+import com.google.common.collect.Iterators;
+import com.google.common.collect.LinkedListMultimap;
+import com.google.common.collect.ListMultimap;
+import com.google.common.collect.Lists;
+
+import java.nio.charset.StandardCharsets;
+import java.util.Iterator;
+import java.util.List;
+import java.util.Map;
+import java.util.Set;
+
+import javax.annotation.concurrent.NotThreadSafe;
+
+/**
+ * Provides access to read and write metadata values to be exchanged during a call.
+ * <p>
+ * This class is not thread safe, implementations should ensure that header reads and writes
+ * do not occur in multiple threads concurrently.
+ * </p>
+ */
+@NotThreadSafe
+public abstract class Metadata<S extends Metadata> {
+
+  /**
+   * Interleave keys and values into a single iterator.
+   */
+  private static Iterator<String> fromMapEntries(Iterable<Map.Entry<String, String>> entries) {
+    final Iterator<Map.Entry<String, String>> iterator = entries.iterator();
+    return new Iterator<String>() {
+      Map.Entry<String, String> last;
+      @Override
+      public boolean hasNext() {
+        return last != null || iterator.hasNext();
+      }
+
+      @Override
+      public String next() {
+        if (last == null) {
+          last = iterator.next();
+          return last.getKey();
+        } else {
+          String val = last.getValue();
+          last = null;
+          return val;
+        }
+      }
+
+      @Override
+      public void remove() {
+        throw new UnsupportedOperationException();
+      }
+    };
+  }
+
+  /**
+   * Simple metadata marshaller that encodes strings as either UTF-8 or ASCII bytes.
+   */
+  public static final Marshaller<String> STRING_MARSHALLER =
+      new Marshaller<String>() {
+
+    @Override
+    public byte[] toBytes(String value) {
+      return value.getBytes(UTF_8);
+    }
+
+    @Override
+    public String toAscii(String value) {
+      return value;
+    }
+
+    @Override
+    public String parseBytes(byte[] serialized) {
+      return new String(serialized, UTF_8);
+    }
+
+    @Override
+    public String parseAscii(String ascii) {
+      return ascii;
+    }
+  };
+
+  private final ListMultimap<String, MetadataEntry> store;
+  private final boolean serializable;
+
+  /**
+   * Constructor called by the transport layer when it receives binary metadata.
+   */
+  // TODO(user): Convert to use ByteString so we can cache transformations
+  private Metadata(byte[]... binaryValues) {
+    store = LinkedListMultimap.create();
+    for (int i = 0; i < binaryValues.length; i++) {
+      String name = new String(binaryValues[i], StandardCharsets.US_ASCII);
+      store.put(name, new MetadataEntry(binaryValues[++i]));
+    }
+    this.serializable = false;
+  }
+
+  /**
+   * Constructor called by the transport layer when it receives ASCII metadata.
+   */
+  private Metadata(String... asciiValues) {
+    store = LinkedListMultimap.create();
+    for (int i = 0; i < asciiValues.length; i++) {
+      store.put(asciiValues[i], new MetadataEntry(asciiValues[++i]));
+    }
+    this.serializable = false;
+  }
+
+  /**
+   * Constructor called by the application layer when it wants to send metadata.
+   */
+  private Metadata() {
+    store = LinkedListMultimap.create();
+    this.serializable = true;
+  }
+
+  /**
+   * Returns true if a value is defined for the given key.
+   */
+  public <T> boolean containsKey(Key key) {
+    return store.containsKey(key.name);
+  }
+
+  /**
+   * Returns the last metadata entry added with the name 'name' parsed as T.
+   * @return the parsed metadata entry or null if not defined
+   */
+  public <T> T get(Key<T> key) {
+    MetadataEntry metadataEntry = Iterables.getLast(store.get(key.name()));
+    return metadataEntry == null ? null : metadataEntry.getParsed(key);
+  }
+
+  /**
+   * Returns all the metadata entries named 'name', in the order they were received,
+   * parsed as T.
+   */
+  public <T> Iterable<T> getAll(final Key<T> key) {
+    return Iterables.transform(
+        store.get(key.name()),
+        new Function<MetadataEntry, T>() {
+          @Override
+          public T apply(MetadataEntry entry) {
+            return entry.getParsed(key);
+          }
+        });
+  }
+
+  public <T> void put(Key<T> key, T value) {
+    store.put(key.name(), new MetadataEntry(key, value));
+  }
+
+  /**
+   * Remove a specific value.
+   */
+  public <T> boolean remove(Key<T> key, T value) {
+    return store.remove(key.name(), value);
+  }
+
+  /**
+   * Remove all values for the given key.
+   */
+  public <T> List<T> removeAll(final Key<T> key) {
+    return Lists.transform(store.removeAll(key.name()), new Function<MetadataEntry, T>() {
+      @Override
+      public T apply(MetadataEntry metadataEntry) {
+        return metadataEntry.getParsed(key);
+      }
+    });
+  }
+
+  /**
+   * Can this metadata be serialized. Metadata constructed from raw binary or ascii values
+   * cannot be serialized without merging it into a serializable instance using
+   * {@link #merge(Metadata, java.util.Set)}
+   */
+  public boolean isSerializable() {
+    return serializable;
+  }
+
+  /**
+   * Serialize all the metadata entries
+   */
+  public byte[][] serialize() {
+    Preconditions.checkState(serializable, "Can't serialize raw metadata");
+    byte[][] serialized = new byte[store.size() * 2][];
+    int i = 0;
+    for (Map.Entry<String, MetadataEntry> entry : store.entries()) {
+      serialized[i++] = entry.getValue().key.asciiName();
+      serialized[i++] = entry.getValue().getSerialized();
+    }
+    return serialized;
+  }
+
+  /**
+   * Serialize all the metadata entries
+   */
+  public String[] serializeAscii() {
+    Preconditions.checkState(serializable, "Can't serialize received metadata");
+    String[] serialized = new String[store.size() * 2];
+    int i = 0;
+    for (Map.Entry<String, MetadataEntry> entry : store.entries()) {
+      serialized[i++] = entry.getValue().key.name();
+      serialized[i++] = entry.getValue().getSerializedAscii();
+    }
+    return serialized;
+  }
+
+  /**
+   * Perform a simple merge of two sets of metadata.
+   * <p>
+   * Note that we can't merge non-serializable metadata into serializable.
+   * </p>
+   */
+  public void merge(Metadata other) {
+    Preconditions.checkNotNull(other);
+    if (this.serializable) {
+      if (!other.serializable) {
+        throw new IllegalArgumentException(
+            "Cannot merge non-serializable metadata into serializable metadata without keys");
+      }
+    }
+    store.putAll(other.store);
+  }
+
+  /**
+   * Merge values for the given set of keys into this set of metadata.
+   */
+  public void merge(Metadata other, Set<Key> keys) {
+    Preconditions.checkNotNull(other);
+    for (Key key : keys) {
+      if (other.containsKey(key)) {
+        Iterable values = other.getAll(key);
+        for (Object value : values) {
+          put(key, value);
+        }
+      }
+    }
+  }
+
+  /**
+   * Concrete instance for metadata attached to the start of a call.
+   */
+  public static class Headers extends Metadata<Headers> {
+    private String path;
+    private String authority;
+
+    /**
+     * Called by the transport layer to create headers from their binary serialized values.
+     */
+    public Headers(byte[]... headers) {
+      super(headers);
+    }
+
+    /**
+     * Called by the transport layer to create headers from their ASCII serialized values.
+     */
+    public Headers(String... asciiValues) {
+      super(asciiValues);
+    }
+
+    /**
+     * Called by the transport layer to create headers from their ASCII serialized values.
+     */
+    public Headers(Iterable<Map.Entry<String, String>> mapEntries) {
+      super(Iterators.toArray(fromMapEntries(mapEntries), String.class));
+    }
+
+    /**
+     * Called by the application layer to construct headers prior to passing them to the
+     * transport for serialization.
+     */
+    public Headers() {
+    }
+
+    /**
+     * The path for the operation.
+     */
+    public String getPath() {
+      return path;
+    }
+
+    public void setPath(String path) {
+      this.path = path;
+    }
+
+    /**
+     * The serving authority for the operation.
+     */
+    public String getAuthority() {
+      return authority;
+    }
+
+    public void setAuthority(String authority) {
+      this.authority = authority;
+    }
+  }
+
+  /**
+   * Concrete instance for metadata attached to the end of the call. Only provided by
+   * servers.
+   */
+  public static class Trailers extends Metadata<Headers> {
+    /**
+     * Called by the transport layer to create trailers from their binary serialized values.
+     */
+    public Trailers(byte[]... headers) {
+      super(headers);
+    }
+
+    /**
+     * Called by the transport layer to create trailers from their ASCII serialized values.
+     */
+    public Trailers(String... asciiValues) {
+      super(asciiValues);
+    }
+
+    /**
+     * Called by the transport layer to create headers from their ASCII serialized values.
+     */
+    public Trailers(Iterable<Map.Entry<String, String>> mapEntries) {
+      super(Iterators.toArray(fromMapEntries(mapEntries), String.class));
+    }
+
+    /**
+     * Called by the application layer to construct trailers prior to passing them to the
+     * transport for serialization.
+     */
+    public Trailers() {
+    }
+  }
+
+
+  /**
+   * Marshaller for metadata values.
+   */
+  public static interface Marshaller<T> {
+    /**
+     * Serialize a metadata value to bytes.
+     * @param value to serialize
+     * @return serialized version of value, or null if value cannot be transmitted.
+     */
+    public byte[] toBytes(T value);
+
+    /**
+     * Serialize a metadata value to an ASCII string
+     * @param value to serialize
+     * @return serialized ascii version of value, or null if value cannot be transmitted.
+     */
+    public String toAscii(T value);
+
+    /**
+     * Parse a serialized metadata value from bytes.
+     * @param serialized value of metadata to parse
+     * @return a parsed instance of type T
+     */
+    public T parseBytes(byte[] serialized);
+
+    /**
+     * Parse a serialized metadata value from an ascii string.
+     * @param ascii string value of metadata to parse
+     * @return a parsed instance of type T
+     */
+    public T parseAscii(String ascii);
+  }
+
+  /**
+   * Key for metadata entries. Allows for parsing and serialization of metadata.
+   */
+  public static class Key<T> {
+
+    private final String name;
+    private final byte[] asciiName;
+    private final Marshaller<T> marshaller;
+
+    /**
+     * Keys have a name and a marshaller used for serialization.
+     */
+    public Key(String name, Marshaller<T> marshaller) {
+      this.name = Preconditions.checkNotNull(name, "name").intern();
+      this.asciiName = name.getBytes(StandardCharsets.US_ASCII);
+      this.marshaller = Preconditions.checkNotNull(marshaller);
+    }
+
+    public String name() {
+      return name;
+    }
+
+    @VisibleForTesting
+    byte[] asciiName() {
+      return asciiName;
+    }
+
+    public Marshaller<T> getMarshaller() {
+      return marshaller;
+    }
+  }
+
+  private static class MetadataEntry {
+    Object parsed;
+    Key key;
+    byte[] serializedBinary;
+    String serializedAscii;
+
+    /**
+     * Constructor used when application layer adds a parsed value.
+     */
+    private MetadataEntry(Key key, Object parsed) {
+      this.parsed = Preconditions.checkNotNull(parsed);
+      this.key = Preconditions.checkNotNull(key);
+    }
+
+    /**
+     * Constructor used when reading a value from the transport.
+     */
+    private MetadataEntry(byte[] serialized) {
+      Preconditions.checkNotNull(serialized);
+      this.serializedBinary = serialized;
+    }
+
+    /**
+     * Constructor used when reading a value from the transport.
+     */
+    private MetadataEntry(String serializedAscii) {
+      this.serializedAscii = Preconditions.checkNotNull(serializedAscii);
+    }
+
+    public <T> T getParsed(Key<T> key) {
+      @SuppressWarnings("unchecked")
+      T value = (T) parsed;
+      if (value != null) {
+        if (this.key != key) {
+          // Keys don't match so serialize using the old key
+          serializedBinary = this.key.getMarshaller().toBytes(value);
+        } else {
+          return value;
+        }
+      }
+      this.key = key;
+      if (serializedBinary != null) {
+        value = key.getMarshaller().parseBytes(serializedBinary);
+      } else if (serializedAscii != null) {
+        value = key.getMarshaller().parseAscii(serializedAscii);
+      }
+      parsed = value;
+      return value;
+    }
+
+    @SuppressWarnings("unchecked")
+    public byte[] getSerialized() {
+      return serializedBinary =
+          serializedBinary == null
+              ? key.getMarshaller().toBytes(parsed) :
+              serializedBinary;
+    }
+
+    @SuppressWarnings("unchecked")
+    public String getSerializedAscii() {
+      return serializedAscii =
+          serializedAscii == null
+              ? key.getMarshaller().toAscii(parsed) :
+              serializedAscii;
+    }
+  }
+}
diff --git a/core/src/main/java/com/google/net/stubby/MethodDescriptor.java b/core/src/main/java/com/google/net/stubby/MethodDescriptor.java
index d5671fa..a81e4ac 100644
--- a/core/src/main/java/com/google/net/stubby/MethodDescriptor.java
+++ b/core/src/main/java/com/google/net/stubby/MethodDescriptor.java
@@ -10,7 +10,6 @@
 import java.util.concurrent.TimeUnit;
 
 import javax.annotation.concurrent.Immutable;
-import javax.inject.Provider;
 
 /**
  * Descriptor for a single operation, used by Channel to execute a call.
@@ -31,28 +30,24 @@
   private final Marshaller<RequestT> requestMarshaller;
   private final Marshaller<ResponseT> responseMarshaller;
   private final long timeoutMicros;
-  private final Map<String, Provider<String>> headers;
 
   public static <RequestT, ResponseT> MethodDescriptor<RequestT, ResponseT> create(
       Type type, String name, long timeout, TimeUnit timeoutUnit,
       Marshaller<RequestT> requestMarshaller,
       Marshaller<ResponseT> responseMarshaller) {
     return new MethodDescriptor<RequestT, ResponseT>(
-        type, name, timeoutUnit.toMicros(timeout), requestMarshaller, responseMarshaller,
-        Collections.<String, Provider<String>>emptyMap());
+        type, name, timeoutUnit.toMicros(timeout), requestMarshaller, responseMarshaller);
   }
 
   private MethodDescriptor(Type type, String name, long timeoutMicros,
                            Marshaller<RequestT> requestMarshaller,
-                           Marshaller<ResponseT> responseMarshaller,
-                           Map<String, Provider<String>> headers) {
+                           Marshaller<ResponseT> responseMarshaller) {
     this.type = Preconditions.checkNotNull(type);
     this.name = name;
     Preconditions.checkArgument(timeoutMicros > 0);
     this.timeoutMicros = timeoutMicros;
     this.requestMarshaller = requestMarshaller;
     this.responseMarshaller = responseMarshaller;
-    this.headers = Collections.unmodifiableMap(headers);
   }
 
   /**
@@ -77,20 +72,6 @@
   }
 
   /**
-   * Return a snapshot of the headers.
-   */
-  public Map<String, String> getHeaders() {
-    if (headers.isEmpty()) {
-      return Collections.emptyMap();
-    }
-    Map<String, String> snapshot = new HashMap<String, String>();
-    for (Entry<String, Provider<String>> entry : headers.entrySet()) {
-      snapshot.put(entry.getKey(), entry.getValue().get());
-    }
-    return Collections.unmodifiableMap(snapshot);
-  }
-
-  /**
    * Parse a response payload from the given {@link InputStream}
    */
   public ResponseT parseResponse(InputStream input) {
@@ -109,28 +90,6 @@
    */
   public MethodDescriptor<RequestT, ResponseT> withTimeout(long timeout, TimeUnit unit) {
     return new MethodDescriptor<RequestT, ResponseT>(type, name, unit.toMicros(timeout),
-        requestMarshaller, responseMarshaller, headers);
-  }
-
-  /**
-   * Create a new descriptor with an additional bound header.
-   */
-  public MethodDescriptor<RequestT, ResponseT> withHeader(String headerName,
-      Provider<String> headerValueProvider) {
-    Map<String, Provider<String>> newHeaders = new HashMap<String, Provider<String>>(headers);
-    newHeaders.put(headerName, headerValueProvider);
-    return new MethodDescriptor<RequestT, ResponseT>(type, name, timeoutMicros,
-        requestMarshaller, responseMarshaller, newHeaders);
-  }
-
-  /**
-   * Creates a new descriptor with additional bound headers.
-   */
-  public MethodDescriptor<RequestT, ResponseT> withHeaders(
-      Map<String, Provider<String>> additionalHeaders) {
-    Map<String, Provider<String>> newHeaders = new HashMap<String, Provider<String>>(headers);
-    newHeaders.putAll(additionalHeaders);
-    return new MethodDescriptor<RequestT, ResponseT>(type, name, timeoutMicros,
-        requestMarshaller, responseMarshaller, newHeaders);
+        requestMarshaller, responseMarshaller);
   }
 }
diff --git a/core/src/main/java/com/google/net/stubby/ServerCall.java b/core/src/main/java/com/google/net/stubby/ServerCall.java
index 043b249..5e9183a 100644
--- a/core/src/main/java/com/google/net/stubby/ServerCall.java
+++ b/core/src/main/java/com/google/net/stubby/ServerCall.java
@@ -3,6 +3,7 @@
 import com.google.common.util.concurrent.ListenableFuture;
 
 import java.io.InputStream;
+
 import javax.annotation.Nullable;
 
 /**
@@ -11,9 +12,9 @@
  * response is most common. This API is generally intended for use generated handlers, but advanced
  * applications may have need for it.
  *
- * <p>Any contexts must be sent before any payloads, which must be sent before closing.
+ * <p>Headers must be sent before any payloads, which must be sent before closing.
  *
- * <p>No generic method for determining message receipt or providing acknowlegement is provided.
+ * <p>No generic method for determining message receipt or providing acknowledgement is provided.
  * Applications are expected to utilize normal payload messages for such signals, as a response
  * natually acknowledges its request.
  *
@@ -34,6 +35,19 @@
   // a case then we either get to generate a half close or purposefully omit it.
   public abstract static class Listener<RequestT> {
     /**
+     * Called upon receiving all header information from the remote end-point.
+     * <p>This method should return quickly, as the same thread may be used to process other
+     * streams.
+     *
+     * @param headers the fully buffered received headers.
+     * @return a processing completion future, or {@code null} to indicate that processing of the
+     *         headers is immediately complete.
+     */
+    @Nullable
+    public abstract ListenableFuture<Void> headersRead(Metadata.Headers headers);
+
+
+    /**
      * A request context has been received. Any context messages will precede payload messages.
      *
      * <p>The {@code value} {@link InputStream} will be closed when the returned future completes.
@@ -41,6 +55,7 @@
      * method.
      */
     @Nullable
+    @Deprecated
     public abstract ListenableFuture<Void> onContext(String name, InputStream value);
 
     /**
@@ -83,7 +98,7 @@
    *
    * @throws IllegalStateException if call is already {@code close}d
    */
-  public abstract void close(Status status);
+  public abstract void close(Status status, Metadata.Trailers trailers);
 
   /**
    * Send a context message. Context messages are intended for side-channel information like
@@ -93,6 +108,7 @@
    * @param value context value bytes
    * @throws IllegalStateException if call is {@link #close}d, or after {@link #sendPayload}
    */
+  @Deprecated
   public abstract void sendContext(String name, InputStream value);
 
   /**
diff --git a/core/src/main/java/com/google/net/stubby/Session.java b/core/src/main/java/com/google/net/stubby/Session.java
index 1b70dc2..d7ae015 100644
--- a/core/src/main/java/com/google/net/stubby/Session.java
+++ b/core/src/main/java/com/google/net/stubby/Session.java
@@ -1,7 +1,5 @@
 package com.google.net.stubby;
 
-import java.util.Map;
-
 /**
  * Session interface to be bound to the transport layer which is used by the higher-level
  * layers to dispatch calls.
@@ -18,6 +16,6 @@
    * Start a request in the context of this session.
    */
   public Request startRequest(String operationName,
-                              Map<String, String> headers,
+                              Metadata.Headers headers,
                               Response.ResponseBuilder responseBuilder);
 }
diff --git a/core/src/main/java/com/google/net/stubby/SessionClientStream.java b/core/src/main/java/com/google/net/stubby/SessionClientStream.java
index 04f8a2f..20b6922 100644
--- a/core/src/main/java/com/google/net/stubby/SessionClientStream.java
+++ b/core/src/main/java/com/google/net/stubby/SessionClientStream.java
@@ -1,18 +1,12 @@
 package com.google.net.stubby;
 
-import com.google.net.stubby.AbstractResponse;
-import com.google.net.stubby.Operation;
-import com.google.net.stubby.Request;
-import com.google.net.stubby.Response;
-import com.google.net.stubby.Session;
-import com.google.net.stubby.Status;
 import com.google.net.stubby.newtransport.ClientStream;
 import com.google.net.stubby.newtransport.StreamListener;
 import com.google.net.stubby.newtransport.StreamState;
 import com.google.net.stubby.transport.Transport;
 
-import java.io.InputStream;
 import java.io.IOException;
+import java.io.InputStream;
 
 /**
  * A temporary shim layer between the new (Channel) and the old (Session). Will go away when the
@@ -151,7 +145,7 @@
     }
 
     private void propagateClosed() {
-      listener.closed(getStatus());
+      listener.closed(getStatus(), new Metadata.Trailers());
     }
   }
 }
diff --git a/core/src/main/java/com/google/net/stubby/SessionClientTransport.java b/core/src/main/java/com/google/net/stubby/SessionClientTransport.java
index 799ca56..e92e96d 100644
--- a/core/src/main/java/com/google/net/stubby/SessionClientTransport.java
+++ b/core/src/main/java/com/google/net/stubby/SessionClientTransport.java
@@ -27,9 +27,10 @@
 
   @Override
   public ClientStream newStream(MethodDescriptor<?, ?> method,
+                                Metadata.Headers headers,
                                 StreamListener listener) {
     final SessionClientStream stream = new SessionClientStream(listener);
-    Request request = session.startRequest(method.getName(), method.getHeaders(),
+    Request request = session.startRequest(method.getName(), headers,
         stream.responseBuilder());
     stream.start(request);
     return stream;
diff --git a/core/src/main/java/com/google/net/stubby/context/ContextExchangeChannel.java b/core/src/main/java/com/google/net/stubby/context/ContextExchangeChannel.java
deleted file mode 100644
index 3e24a37..0000000
--- a/core/src/main/java/com/google/net/stubby/context/ContextExchangeChannel.java
+++ /dev/null
@@ -1,114 +0,0 @@
-package com.google.net.stubby.context;
-
-import com.google.common.collect.Maps;
-import com.google.common.util.concurrent.ListenableFuture;
-import com.google.net.stubby.Call;
-import com.google.net.stubby.Channel;
-import com.google.net.stubby.Marshaller;
-import com.google.net.stubby.MethodDescriptor;
-
-import java.io.InputStream;
-import java.util.Map;
-
-import javax.annotation.concurrent.NotThreadSafe;
-import javax.inject.Provider;
-
-/**
- * A channel implementation that sends bound context values and records received context.
- * Unlike {@Channel} this class is not thread-safe so it is recommended to create an instance
- * per thread.
- */
-@NotThreadSafe
-public class ContextExchangeChannel extends ForwardingChannel {
-
-  private Map<String, Object> captured;
-  private Map<String, Provider<InputStream>> provided;
-
-  public ContextExchangeChannel(Channel channel) {
-    super(channel);
-    // builder?
-    captured = Maps.newTreeMap();
-    provided = Maps.newTreeMap();
-  }
-
-  @SuppressWarnings("unchecked")
-  public <T> Provider<T> receive(final String name, final Marshaller<T> m) {
-    synchronized (captured) {
-      captured.put(name, null);
-    }
-    return new Provider<T>() {
-      @Override
-      public T get() {
-        synchronized (captured) {
-          Object o = captured.get(name);
-          if (o instanceof InputStream) {
-            o = m.parse((InputStream) o);
-            captured.put(name, o);
-          }
-          return (T) o;
-        }
-      }
-    };
-  }
-
-  public <T> void send(final String name, final T value, final Marshaller<T> m) {
-    synchronized (provided) {
-      provided.put(name, new Provider<InputStream>() {
-        @Override
-        public InputStream get() {
-          return m.stream(value);
-        }
-      });
-    }
-  }
-
-  /**
-   * Clear all received values and allow another call
-   */
-  public void clearLastReceived() {
-    synchronized (captured) {
-      for (Map.Entry<String, Object> entry : captured.entrySet()) {
-        entry.setValue(null);
-      }
-    }
-  }
-
-
-  @Override
-  public <ReqT, RespT> Call<ReqT, RespT> newCall(MethodDescriptor<ReqT, RespT> method) {
-    return new CallImpl<ReqT, RespT>(delegate.newCall(method));
-  }
-
-  private class CallImpl<ReqT, RespT> extends ForwardingCall<ReqT, RespT> {
-    private CallImpl(Call<ReqT, RespT> delegate) {
-      super(delegate);
-    }
-
-    @Override
-    public void start(Listener<RespT> responseListener) {
-      super.start(new ListenerImpl<RespT>(responseListener));
-      synchronized (provided) {
-        for (Map.Entry<String, Provider<InputStream>> entry : provided.entrySet()) {
-          sendContext(entry.getKey(), entry.getValue().get());
-        }
-      }
-    }
-  }
-
-  private class ListenerImpl<T> extends ForwardingListener<T> {
-    private ListenerImpl(Call.Listener<T> delegate) {
-      super(delegate);
-    }
-
-    @Override
-    public ListenableFuture<Void> onContext(String name, InputStream value) {
-      synchronized (captured) {
-        if (captured.containsKey(name)) {
-          captured.put(name, value);
-          return null;
-        }
-      }
-      return super.onContext(name, value);
-    }
-  }
-}
diff --git a/core/src/main/java/com/google/net/stubby/context/ForwardingChannel.java b/core/src/main/java/com/google/net/stubby/context/ForwardingChannel.java
index 8f299e4..fd7ab6d 100644
--- a/core/src/main/java/com/google/net/stubby/context/ForwardingChannel.java
+++ b/core/src/main/java/com/google/net/stubby/context/ForwardingChannel.java
@@ -4,6 +4,7 @@
 import com.google.common.util.concurrent.SettableFuture;
 import com.google.net.stubby.Call;
 import com.google.net.stubby.Channel;
+import com.google.net.stubby.Metadata;
 import com.google.net.stubby.Status;
 
 import java.io.InputStream;
@@ -35,8 +36,8 @@
     }
 
     @Override
-    public void start(Listener<ResponseT> responseListener) {
-      this.delegate.start(responseListener);
+    public void start(Listener<ResponseT> responseListener, Metadata.Headers headers) {
+      this.delegate.start(responseListener, headers);
     }
 
     @Override
@@ -72,6 +73,11 @@
     }
 
     @Override
+    public ListenableFuture<Void> onHeaders(Metadata.Headers headers) {
+      return delegate.onHeaders(headers);
+    }
+
+    @Override
     public ListenableFuture<Void> onContext(String name, InputStream value) {
       return delegate.onContext(name, value);
     }
@@ -82,8 +88,8 @@
     }
 
     @Override
-    public void onClose(Status status) {
-      delegate.onClose(status);
+    public void onClose(Status status, Metadata.Trailers trailers) {
+      delegate.onClose(status, trailers);
     }
   }
 }
diff --git a/core/src/main/java/com/google/net/stubby/http/ServletSession.java b/core/src/main/java/com/google/net/stubby/http/ServletSession.java
index 5781f7d..2307487 100644
--- a/core/src/main/java/com/google/net/stubby/http/ServletSession.java
+++ b/core/src/main/java/com/google/net/stubby/http/ServletSession.java
@@ -1,10 +1,10 @@
 package com.google.net.stubby.http;
 
-import com.google.common.collect.ImmutableMap;
 import com.google.common.io.ByteBuffers;
 import com.google.common.util.concurrent.ListenableFuture;
 import com.google.common.util.concurrent.SettableFuture;
 import com.google.net.stubby.AbstractResponse;
+import com.google.net.stubby.Metadata;
 import com.google.net.stubby.Operation;
 import com.google.net.stubby.Request;
 import com.google.net.stubby.Response;
@@ -20,7 +20,9 @@
 import java.io.InputStream;
 import java.io.OutputStream;
 import java.nio.ByteBuffer;
+import java.util.ArrayList;
 import java.util.Enumeration;
+import java.util.List;
 import java.util.concurrent.Executor;
 
 import javax.servlet.ServletException;
@@ -97,15 +99,17 @@
       return null;
     }
 
-    ImmutableMap.Builder<String, String> headerMapBuilder = ImmutableMap.builder();
+    List<String> headerList = new ArrayList<>();
     Enumeration headerNames = req.getHeaderNames();
     while (headerNames.hasMoreElements()) {
       String name = headerNames.nextElement().toString();
-      headerMapBuilder.put(name, req.getHeader(name));
+      headerList.add(name);
+      headerList.add(req.getHeader(name));
     }
+    Metadata.Headers headers = new Metadata.Headers(headerList.toArray(new String[]{}));
 
     // Create the operation and bind an HTTP response operation
-    Request op = session.startRequest(operationName, headerMapBuilder.build(),
+    Request op = session.startRequest(operationName, headers,
         HttpResponseOperation.builder(responseStream));
     if (op == null) {
       // TODO(user): Unify error handling once spec finalized
diff --git a/core/src/main/java/com/google/net/stubby/http/UrlConnectionClientSession.java b/core/src/main/java/com/google/net/stubby/http/UrlConnectionClientSession.java
index 22794e4..cb32870 100644
--- a/core/src/main/java/com/google/net/stubby/http/UrlConnectionClientSession.java
+++ b/core/src/main/java/com/google/net/stubby/http/UrlConnectionClientSession.java
@@ -2,6 +2,7 @@
 
 import com.google.common.io.ByteBuffers;
 import com.google.net.stubby.AbstractRequest;
+import com.google.net.stubby.Metadata;
 import com.google.net.stubby.Operation;
 import com.google.net.stubby.Response;
 import com.google.net.stubby.Session;
@@ -16,7 +17,6 @@
 import java.net.HttpURLConnection;
 import java.net.URI;
 import java.nio.ByteBuffer;
-import java.util.Map;
 
 /**
  * Implementation of {@link Session} using {@link HttpURLConnection} for clients. Services
@@ -31,9 +31,10 @@
   }
 
   @Override
-  public Request startRequest(String operationName, Map<String, String> headers,
+  public Request startRequest(String operationName, Metadata.Headers headers,
                               Response.ResponseBuilder responseBuilder) {
-    return new Request(base.resolve(operationName), headers, responseBuilder.build());
+    return new Request(base.resolve(operationName), headers,
+        responseBuilder.build());
   }
 
   private class Request extends AbstractRequest implements Framer.Sink {
@@ -42,7 +43,7 @@
     private final DataOutputStream outputStream;
     private final MessageFramer framer;
 
-    private Request(URI uri, Map<String, String> headers, Response response) {
+    private Request(URI uri, Metadata.Headers headers, Response response) {
       super(response);
       try {
         connection = (HttpURLConnection) uri.toURL().openConnection();
@@ -50,8 +51,11 @@
         connection.setDoInput(true);
         connection.setRequestMethod("POST");
         connection.setRequestProperty("Content-Type", "application/protorpc");
-        for (Map.Entry<String, String> header : headers.entrySet()) {
-          connection.setRequestProperty(header.getKey(), header.getValue());
+        String[] serialized = headers.serializeAscii();
+        for (int i = 0; i < serialized.length; i++) {
+          connection.setRequestProperty(
+              serialized[i],
+              serialized[++i]);
         }
         outputStream = new DataOutputStream(connection.getOutputStream());
       } catch (IOException t) {
diff --git a/core/src/main/java/com/google/net/stubby/http2/netty/Http2Codec.java b/core/src/main/java/com/google/net/stubby/http2/netty/Http2Codec.java
index 6dc46a8..873a880 100644
--- a/core/src/main/java/com/google/net/stubby/http2/netty/Http2Codec.java
+++ b/core/src/main/java/com/google/net/stubby/http2/netty/Http2Codec.java
@@ -1,6 +1,6 @@
 package com.google.net.stubby.http2.netty;
 
-import com.google.common.collect.ImmutableMap;
+import com.google.net.stubby.Metadata;
 import com.google.net.stubby.NoOpRequest;
 import com.google.net.stubby.Operation;
 import com.google.net.stubby.Operation.Phase;
@@ -23,8 +23,6 @@
 import io.netty.handler.codec.http2.Http2Headers;
 import io.netty.handler.codec.http2.Http2Settings;
 
-import java.util.Map;
-
 /**
  * Codec used by clients and servers to interpret HTTP2 frames in the context of an ongoing
  * request-response dialog
@@ -212,12 +210,9 @@
     if (operationName == null) {
       return null;
     }
-    ImmutableMap.Builder<String, String> headerMap = ImmutableMap.builder();
-    for (Map.Entry<String, String> header : headers) {
-      headerMap.put(header);
-    }
+    Metadata.Headers grpcHeaders = new Metadata.Headers(headers);
     // Create the operation and bind a HTTP2 response operation
-    Request op = session.startRequest(operationName, headerMap.build(),
+    Request op = session.startRequest(operationName, grpcHeaders,
         createResponse(new Http2Writer(ctx), streamId));
     if (op == null) {
       return null;
diff --git a/core/src/main/java/com/google/net/stubby/http2/netty/Http2Request.java b/core/src/main/java/com/google/net/stubby/http2/netty/Http2Request.java
index 0d6d2d5..10bf6e4 100644
--- a/core/src/main/java/com/google/net/stubby/http2/netty/Http2Request.java
+++ b/core/src/main/java/com/google/net/stubby/http2/netty/Http2Request.java
@@ -1,14 +1,14 @@
 package com.google.net.stubby.http2.netty;
 
+import com.google.net.stubby.Metadata;
 import com.google.net.stubby.Request;
 import com.google.net.stubby.Response;
 import com.google.net.stubby.transport.Framer;
 
+import io.netty.handler.codec.http2.DefaultHttp2Headers;
+
 import java.net.InetAddress;
 import java.net.UnknownHostException;
-import java.util.Map;
-
-import io.netty.handler.codec.http2.DefaultHttp2Headers;
 
 /**
  * A HTTP2 based implementation of {@link Request}
@@ -30,12 +30,15 @@
   private final Response response;
 
   public Http2Request(Response response, String operationName,
-                      Map<String, String> headers,
+                      Metadata.Headers headers,
                       Http2Codec.Http2Writer writer, Framer framer) {
     super(response.getId(), writer, framer);
     DefaultHttp2Headers.Builder headersBuilder = DefaultHttp2Headers.newBuilder();
-    for (Map.Entry<String, String> entry : headers.entrySet()) {
-      headersBuilder.add(entry.getKey(), entry.getValue());
+    // TODO(user) Switch the ASCII requirement to false once Netty supports binary
+    // headers.
+    String[] headerValues = headers.serializeAscii();
+    for (int i = 0; i < headerValues.length; i++) {
+      headersBuilder.add(headerValues[i], headerValues[++i]);
     }
     headersBuilder.method("POST")
         .path("/" + operationName)
diff --git a/core/src/main/java/com/google/net/stubby/http2/netty/Http2Session.java b/core/src/main/java/com/google/net/stubby/http2/netty/Http2Session.java
index 7e22f60..8a05312 100644
--- a/core/src/main/java/com/google/net/stubby/http2/netty/Http2Session.java
+++ b/core/src/main/java/com/google/net/stubby/http2/netty/Http2Session.java
@@ -1,12 +1,12 @@
 package com.google.net.stubby.http2.netty;
 
+import com.google.net.stubby.Metadata;
 import com.google.net.stubby.Request;
 import com.google.net.stubby.RequestRegistry;
 import com.google.net.stubby.Response;
 import com.google.net.stubby.Session;
 import com.google.net.stubby.transport.MessageFramer;
 
-import java.util.Map;
 import java.util.concurrent.atomic.AtomicInteger;
 
 /**
@@ -33,7 +33,7 @@
   }
 
   @Override
-  public Request startRequest(String operationName, Map<String, String> headers,
+  public Request startRequest(String operationName, Metadata.Headers headers,
                               Response.ResponseBuilder response) {
     int nextSessionId = getNextStreamId();
     Request operation = new Http2Request(response.build(nextSessionId), operationName,
diff --git a/core/src/main/java/com/google/net/stubby/http2/okhttp/Http2Request.java b/core/src/main/java/com/google/net/stubby/http2/okhttp/Http2Request.java
index 5f8d47c..afaa921 100644
--- a/core/src/main/java/com/google/net/stubby/http2/okhttp/Http2Request.java
+++ b/core/src/main/java/com/google/net/stubby/http2/okhttp/Http2Request.java
@@ -1,5 +1,6 @@
 package com.google.net.stubby.http2.okhttp;
 
+import com.google.net.stubby.Metadata;
 import com.google.net.stubby.Request;
 import com.google.net.stubby.RequestRegistry;
 import com.google.net.stubby.Response;
@@ -13,7 +14,6 @@
 
 import java.io.IOException;
 import java.util.List;
-import java.util.Map;
 
 /**
  * A HTTP2 based implementation of {@link Request}
@@ -22,7 +22,7 @@
   private final Response response;
 
   public Http2Request(FrameWriter frameWriter, String operationName,
-                     Map<String, String> headers,
+                     Metadata.Headers headers,
                      Response response, RequestRegistry requestRegistry,
                      Framer framer) {
     super(response.getId(), frameWriter, framer);
@@ -31,10 +31,8 @@
       // Register this request.
       requestRegistry.register(this);
 
-      List<Header> requestHeaders = Headers.createRequestHeaders(operationName);
-      for (Map.Entry<String, String> entry : headers.entrySet()) {
-        requestHeaders.add(new Header(entry.getKey(), entry.getValue()));
-      }
+      List<Header> requestHeaders = Headers.createRequestHeaders(operationName,
+          headers.serialize());
       frameWriter.synStream(false, false, getId(), 0, requestHeaders);
     } catch (IOException ioe) {
       close(new Status(Transport.Code.UNKNOWN, ioe));
diff --git a/core/src/main/java/com/google/net/stubby/http2/okhttp/OkHttpSession.java b/core/src/main/java/com/google/net/stubby/http2/okhttp/OkHttpSession.java
index 8fd7c2e..c8f9d39 100644
--- a/core/src/main/java/com/google/net/stubby/http2/okhttp/OkHttpSession.java
+++ b/core/src/main/java/com/google/net/stubby/http2/okhttp/OkHttpSession.java
@@ -4,6 +4,7 @@
 import com.google.common.io.ByteStreams;
 import com.google.common.io.CountingInputStream;
 import com.google.common.io.CountingOutputStream;
+import com.google.net.stubby.Metadata;
 import com.google.net.stubby.Operation;
 import com.google.net.stubby.Operation.Phase;
 import com.google.net.stubby.Request;
@@ -25,18 +26,17 @@
 import com.squareup.okhttp.internal.spdy.Settings;
 import com.squareup.okhttp.internal.spdy.Variant;
 
-import java.io.IOException;
-import java.net.Socket;
-import java.util.List;
-import java.util.Map;
-import java.util.concurrent.Executor;
-import java.util.concurrent.atomic.AtomicInteger;
-
 import okio.BufferedSink;
 import okio.BufferedSource;
 import okio.ByteString;
 import okio.Okio;
 
+import java.io.IOException;
+import java.net.Socket;
+import java.util.List;
+import java.util.concurrent.Executor;
+import java.util.concurrent.atomic.AtomicInteger;
+
 /**
  * Basic implementation of {@link Session} using OkHttp
  */
@@ -147,7 +147,8 @@
   }
 
   @Override
-  public Request startRequest(String operationName, Map<String, String> headers,
+  public Request startRequest(String operationName,
+                              Metadata.Headers headers,
                               Response.ResponseBuilder responseBuilder) {
     int nextStreamId = getNextStreamId();
     Response response = responseBuilder.build(nextStreamId);
@@ -256,18 +257,21 @@
         HeadersMode headersMode) {
       Operation op = getOperation(streamId);
 
-      ImmutableMap.Builder<String, String> mapBuilder = ImmutableMap.builder();
-      for (Header header : headers) {
-        mapBuilder.put(header.name.utf8(), header.value.utf8());
-      }
-      Map<String, String> headersMap = mapBuilder.build();
-
       // Start an Operation for SYN_STREAM
       if (op == null && headersMode == HeadersMode.HTTP_20_HEADERS) {
-        String path = headersMap.get(Header.TARGET_PATH.utf8());
+        String path = findReservedHeader(Header.TARGET_PATH.utf8(), headers);
+        byte[][] binaryHeaders = new byte[headers.size() * 2][];
+        for (int i = 0; i < headers.size(); i++) {
+          Header header = headers.get(i);
+          binaryHeaders[i * 2] = header.name.toByteArray();
+          binaryHeaders[(i * 2) + 1] = header.value.toByteArray();
+        }
+        Metadata.Headers grpcHeaders = new Metadata.Headers(binaryHeaders);
+        grpcHeaders.setPath(path);
+        grpcHeaders.setAuthority(findReservedHeader(Header.TARGET_AUTHORITY.utf8(), headers));
         if (path != null) {
           Request request = serverSession.startRequest(path,
-              headersMap,
+              grpcHeaders,
               Http2Response.builder(streamId, frameWriter, new MessageFramer(4096)));
           requestRegistry.register(request);
           op = request;
@@ -283,6 +287,20 @@
       }
     }
 
+    private String findReservedHeader(String name, List<Header> headers) {
+      for (Header header : headers) {
+        // Reserved headers must come before non-reserved headers, so we can exit the loop
+        // early if we see a non-reserved header.
+        if (!header.name.utf8().startsWith(":")) {
+          return null;
+        }
+        if (header.name.utf8().equals(name)) {
+          return header.value.utf8();
+        }
+      }
+      return null;
+    }
+
     @Override
     public void rstStream(int streamId, ErrorCode errorCode) {
       try {
diff --git a/core/src/main/java/com/google/net/stubby/newtransport/AbstractClientStream.java b/core/src/main/java/com/google/net/stubby/newtransport/AbstractClientStream.java
index 9d11921..99508f8 100644
--- a/core/src/main/java/com/google/net/stubby/newtransport/AbstractClientStream.java
+++ b/core/src/main/java/com/google/net/stubby/newtransport/AbstractClientStream.java
@@ -5,6 +5,7 @@
 import static com.google.net.stubby.newtransport.StreamState.READ_ONLY;
 
 import com.google.common.base.Preconditions;
+import com.google.net.stubby.Metadata;
 import com.google.net.stubby.Status;
 
 /**
@@ -37,9 +38,10 @@
     // Wraps the base handler to get status update.
     return new ForwardingStreamListener(super.inboundMessageHandler()) {
       @Override
-      public void closed(Status status) {
+      public void closed(Status status, Metadata.Trailers trailers) {
         inboundPhase(Phase.STATUS);
-        setStatus(status);
+        // TODO(user): Fix once we switch the wire format to express status in trailers
+        setStatus(status, new Metadata.Trailers());
       }
     };
   }
@@ -51,7 +53,7 @@
    * @param newStatus the new status to set
    * @return {@code} true if the status was not already set.
    */
-  public boolean setStatus(final Status newStatus) {
+  public boolean setStatus(final Status newStatus, Metadata.Trailers trailers) {
     Preconditions.checkNotNull(newStatus, "newStatus");
     synchronized (stateLock) {
       if (status != null) {
@@ -64,7 +66,7 @@
     }
 
     // Invoke the observer callback.
-    listener.closed(newStatus);
+    listener.closed(newStatus, trailers);
 
     // Free any resources.
     dispose();
diff --git a/core/src/main/java/com/google/net/stubby/newtransport/AbstractClientTransport.java b/core/src/main/java/com/google/net/stubby/newtransport/AbstractClientTransport.java
index 71d48f3..ebd235c 100644
--- a/core/src/main/java/com/google/net/stubby/newtransport/AbstractClientTransport.java
+++ b/core/src/main/java/com/google/net/stubby/newtransport/AbstractClientTransport.java
@@ -2,6 +2,7 @@
 
 import com.google.common.base.Preconditions;
 import com.google.common.util.concurrent.AbstractService;
+import com.google.net.stubby.Metadata;
 import com.google.net.stubby.MethodDescriptor;
 
 /**
@@ -12,7 +13,9 @@
 public abstract class AbstractClientTransport extends AbstractService implements ClientTransport {
 
   @Override
-  public final ClientStream newStream(MethodDescriptor<?, ?> method, StreamListener listener) {
+  public final ClientStream newStream(MethodDescriptor<?, ?> method,
+                                      Metadata.Headers headers,
+                                      StreamListener listener) {
     Preconditions.checkNotNull(method, "method");
     Preconditions.checkNotNull(listener, "listener");
     if (state() == State.STARTING) {
@@ -25,7 +28,7 @@
     }
 
     // Create the stream.
-    return newStreamInternal(method, listener);
+    return newStreamInternal(method, headers, listener);
   }
 
   /**
@@ -38,5 +41,6 @@
    * @return the new stream.
    */
   protected abstract ClientStream newStreamInternal(MethodDescriptor<?, ?> method,
+      Metadata.Headers headers,
       StreamListener listener);
 }
diff --git a/core/src/main/java/com/google/net/stubby/newtransport/AbstractServerStream.java b/core/src/main/java/com/google/net/stubby/newtransport/AbstractServerStream.java
index 5856c5a..84dbfa8 100644
--- a/core/src/main/java/com/google/net/stubby/newtransport/AbstractServerStream.java
+++ b/core/src/main/java/com/google/net/stubby/newtransport/AbstractServerStream.java
@@ -5,6 +5,7 @@
 import static com.google.net.stubby.newtransport.StreamState.WRITE_ONLY;
 
 import com.google.common.base.Preconditions;
+import com.google.net.stubby.Metadata;
 import com.google.net.stubby.Status;
 import com.google.net.stubby.transport.Transport;
 
@@ -28,7 +29,7 @@
   }
 
   @Override
-  public final void close(Status status) {
+  public final void close(Status status, Metadata.Trailers trailers) {
     synchronized (stateLock) {
       Preconditions.checkState(!status.isOk() || state == WRITE_ONLY,
           "Cannot close with OK before client half-closes");
@@ -57,7 +58,7 @@
     }
     if (previousState == OPEN) {
       inboundPhase(Phase.STATUS);
-      listener.closed(Status.OK);
+      listener.closed(Status.OK, new Metadata.Trailers());
     } else {
       abortStream(
           new Status(Transport.Code.FAILED_PRECONDITION, "Client-end of the stream already closed"),
@@ -69,8 +70,8 @@
    * Aborts the stream with an error status, cleans up resources and notifies the listener if
    * necessary.
    *
-   * <p>Unlike {@link #close(Status)}, this method is only called from the gRPC framework, so that
-   * we need to call closed() on the listener if it has not been called.
+   * <p>Unlike {@link #close(Status, Metadata.Trailers)}, this method is only called from the
+   * gRPC framework, so that we need to call closed() on the listener if it has not been called.
    *
    * @param status the error status. Must not be Status.OK.
    * @param notifyClient true if the stream is still writable and you want to notify the client
@@ -88,7 +89,7 @@
     }
 
     if (previousState == OPEN) {
-      listener.closed(status);
+      listener.closed(status, new Metadata.Trailers());
     }  // Otherwise, previousState is WRITE_ONLY thus closed() has already been called.
 
     outboundPhase(Phase.STATUS);
diff --git a/core/src/main/java/com/google/net/stubby/newtransport/AbstractStream.java b/core/src/main/java/com/google/net/stubby/newtransport/AbstractStream.java
index 0cef2be..10687c9 100644
--- a/core/src/main/java/com/google/net/stubby/newtransport/AbstractStream.java
+++ b/core/src/main/java/com/google/net/stubby/newtransport/AbstractStream.java
@@ -4,6 +4,7 @@
 import com.google.common.io.Closeables;
 import com.google.common.util.concurrent.ListenableFuture;
 import com.google.common.util.concurrent.MoreExecutors;
+import com.google.net.stubby.Metadata;
 import com.google.net.stubby.Status;
 
 import java.io.InputStream;
@@ -42,6 +43,15 @@
    * Internal handler for Deframer output. Informs the {@link #listener()} of inbound messages.
    */
   private final StreamListener inboundMessageHandler = new StreamListener() {
+
+    @Override
+    public ListenableFuture<Void> headersRead(Metadata.Headers headers) {
+      inboundPhase(Phase.CONTEXT);
+      ListenableFuture<Void> future = listener().headersRead(headers);
+      disableWindowUpdate(future);
+      return future;
+    }
+
     @Override
     public ListenableFuture<Void> contextRead(String name, InputStream value, int length) {
       ListenableFuture<Void> future = null;
@@ -69,9 +79,9 @@
     }
 
     @Override
-    public void closed(Status status) {
+    public void closed(Status status, Metadata.Trailers trailers) {
       inboundPhase(Phase.STATUS);
-      listener().closed(status);
+      listener().closed(status, trailers);
     }
   };
 
diff --git a/core/src/main/java/com/google/net/stubby/newtransport/ClientTransport.java b/core/src/main/java/com/google/net/stubby/newtransport/ClientTransport.java
index e32b9e8..a596c6d 100644
--- a/core/src/main/java/com/google/net/stubby/newtransport/ClientTransport.java
+++ b/core/src/main/java/com/google/net/stubby/newtransport/ClientTransport.java
@@ -1,6 +1,7 @@
 package com.google.net.stubby.newtransport;
 
 import com.google.common.util.concurrent.Service;
+import com.google.net.stubby.Metadata;
 import com.google.net.stubby.MethodDescriptor;
 
 /**
@@ -23,8 +24,11 @@
    * completed (either successfully or unsuccessfully).
    *
    * @param method the descriptor of the remote method to be called for this stream.
+   * @param headers to send at the beginning of the call
    * @param listener the listener for the newly created stream.
    * @return the newly created stream.
    */
-  ClientStream newStream(MethodDescriptor<?, ?> method, StreamListener listener);
+  ClientStream newStream(MethodDescriptor<?, ?> method,
+                         Metadata.Headers headers,
+                         StreamListener listener);
 }
diff --git a/core/src/main/java/com/google/net/stubby/newtransport/Deframer.java b/core/src/main/java/com/google/net/stubby/newtransport/Deframer.java
index f709a5d..72e44c8 100644
--- a/core/src/main/java/com/google/net/stubby/newtransport/Deframer.java
+++ b/core/src/main/java/com/google/net/stubby/newtransport/Deframer.java
@@ -2,6 +2,7 @@
 
 import com.google.common.io.ByteStreams;
 import com.google.net.stubby.GrpcFramingUtil;
+import com.google.net.stubby.Metadata;
 import com.google.net.stubby.Operation;
 import com.google.net.stubby.Status;
 import com.google.net.stubby.transport.Transport;
@@ -154,7 +155,7 @@
   }
 
   private void writeStatus(Status status) {
-    target.closed(status);
+    target.closed(status, new Metadata.Trailers());
     statusDelivered = true;
   }
 
diff --git a/core/src/main/java/com/google/net/stubby/newtransport/ForwardingStreamListener.java b/core/src/main/java/com/google/net/stubby/newtransport/ForwardingStreamListener.java
index 5cd9b2d..42dbf4b 100644
--- a/core/src/main/java/com/google/net/stubby/newtransport/ForwardingStreamListener.java
+++ b/core/src/main/java/com/google/net/stubby/newtransport/ForwardingStreamListener.java
@@ -1,6 +1,7 @@
 package com.google.net.stubby.newtransport;
 
 import com.google.common.util.concurrent.ListenableFuture;
+import com.google.net.stubby.Metadata;
 import com.google.net.stubby.Status;
 
 import java.io.InputStream;
@@ -17,6 +18,11 @@
   }
 
   @Override
+  public ListenableFuture<Void> headersRead(Metadata.Headers headers) {
+    return delegate.headersRead(headers);
+  }
+
+  @Override
   public ListenableFuture<Void> contextRead(String name, InputStream value, int length) {
     return delegate.contextRead(name, value, length);
   }
@@ -27,7 +33,7 @@
   }
 
   @Override
-  public void closed(Status status) {
-    delegate.closed(status);
+  public void closed(Status status, Metadata.Trailers trailers) {
+    delegate.closed(status, trailers);
   }
 }
diff --git a/core/src/main/java/com/google/net/stubby/newtransport/GrpcDeframer.java b/core/src/main/java/com/google/net/stubby/newtransport/GrpcDeframer.java
index c08fa81..512ac90 100644
--- a/core/src/main/java/com/google/net/stubby/newtransport/GrpcDeframer.java
+++ b/core/src/main/java/com/google/net/stubby/newtransport/GrpcDeframer.java
@@ -9,6 +9,7 @@
 
 import com.google.common.base.Preconditions;
 import com.google.common.util.concurrent.ListenableFuture;
+import com.google.net.stubby.Metadata;
 import com.google.net.stubby.Status;
 import com.google.net.stubby.transport.Transport;
 
@@ -255,6 +256,6 @@
    */
   private void notifyStatus(Status status) {
     statusNotified = true;
-    listener.closed(status);
+    listener.closed(status, new Metadata.Trailers());
   }
 }
diff --git a/core/src/main/java/com/google/net/stubby/newtransport/ServerStream.java b/core/src/main/java/com/google/net/stubby/newtransport/ServerStream.java
index 1e629bb..987400c 100644
--- a/core/src/main/java/com/google/net/stubby/newtransport/ServerStream.java
+++ b/core/src/main/java/com/google/net/stubby/newtransport/ServerStream.java
@@ -1,8 +1,8 @@
 package com.google.net.stubby.newtransport;
 
+import com.google.net.stubby.Metadata;
 import com.google.net.stubby.Status;
 
-
 /**
  * Extension of {@link Stream} to support server-side termination semantics.
  */
@@ -14,6 +14,7 @@
    * local side of the stream (i.e. half-closed). Any other value implies abnormal termination.
    *
    * @param status details for the closure of the local-side of this stream.
+   * @param trailers an additional block of headers to pass to the client on stream closure.
    */
-  void close(Status status);
+  void close(Status status, Metadata.Trailers trailers);
 }
diff --git a/core/src/main/java/com/google/net/stubby/newtransport/ServerTransportListener.java b/core/src/main/java/com/google/net/stubby/newtransport/ServerTransportListener.java
index 7453968..6edfa8a 100644
--- a/core/src/main/java/com/google/net/stubby/newtransport/ServerTransportListener.java
+++ b/core/src/main/java/com/google/net/stubby/newtransport/ServerTransportListener.java
@@ -1,5 +1,6 @@
 package com.google.net.stubby.newtransport;
 
+import com.google.net.stubby.Metadata;
 import com.google.net.stubby.MethodDescriptor;
 
 /**
@@ -12,7 +13,9 @@
    *
    * @param stream the newly created stream.
    * @param method the full method name being called on the server.
+   * @param headers containing metadata for the call.
    * @return a listener for events on the new stream.
    */
-  StreamListener streamCreated(ServerStream stream, String method);
+  StreamListener streamCreated(ServerStream stream, String method,
+                               Metadata.Headers headers);
 }
diff --git a/core/src/main/java/com/google/net/stubby/newtransport/Stream.java b/core/src/main/java/com/google/net/stubby/newtransport/Stream.java
index 77dc1cc..e2853fe 100644
--- a/core/src/main/java/com/google/net/stubby/newtransport/Stream.java
+++ b/core/src/main/java/com/google/net/stubby/newtransport/Stream.java
@@ -33,6 +33,7 @@
    * @param length the length of the {@link InputStream}.
    * @param accepted an optional callback for when the transport has accepted the write.
    */
+  @Deprecated
   void writeContext(String name, InputStream value, int length, @Nullable Runnable accepted);
 
   /**
diff --git a/core/src/main/java/com/google/net/stubby/newtransport/StreamListener.java b/core/src/main/java/com/google/net/stubby/newtransport/StreamListener.java
index 559c368..7a3d71f 100644
--- a/core/src/main/java/com/google/net/stubby/newtransport/StreamListener.java
+++ b/core/src/main/java/com/google/net/stubby/newtransport/StreamListener.java
@@ -1,6 +1,7 @@
 package com.google.net.stubby.newtransport;
 
 import com.google.common.util.concurrent.ListenableFuture;
+import com.google.net.stubby.Metadata;
 import com.google.net.stubby.Status;
 
 import java.io.InputStream;
@@ -14,6 +15,17 @@
 public interface StreamListener {
 
   /**
+   * Called upon receiving all header information from the remote end-point.
+   * <p>This method should return quickly, as the same thread may be used to process other streams.
+   *
+   * @param headers the fully buffered received headers.
+   * @return a processing completion future, or {@code null} to indicate that processing of the
+   *         headers is immediately complete.
+   */
+  @Nullable
+  ListenableFuture<Void> headersRead(Metadata.Headers headers);
+
+  /**
    * Called upon receiving context information from the remote end-point. The {@link InputStream} is
    * non-blocking and contains the entire context.
    *
@@ -33,6 +45,7 @@
    *         context is immediately complete.
    */
   @Nullable
+  @Deprecated
   ListenableFuture<Void> contextRead(String name, InputStream value, int length);
 
   /**
@@ -58,15 +71,9 @@
   ListenableFuture<Void> messageRead(InputStream message, int length);
 
   /**
-   * Called when the remote side of the transport closed. A status code of
-   * {@link com.google.net.stubby.transport.Transport.Code#OK} implies normal termination of the
-   * remote side of the stream (i.e. half-closed). Any other value implies abnormal termination. If
-   * the remote end-point was abnormally terminated, no further messages will be received on the
-   * stream.
-   *
-   * <p>This method should return quickly, as the same thread may be used to process other streams.
-   *
-   * @param status details of the remote stream closure.
+   * Called when the remote side of the transport closed.
+   * @param status details about the remote closure
+   * @param trailers that may optionally include status information about call closure.
    */
-  void closed(Status status);
+  void closed(Status status, Metadata.Trailers trailers);
 }
diff --git a/core/src/main/java/com/google/net/stubby/newtransport/http/HttpClientTransport.java b/core/src/main/java/com/google/net/stubby/newtransport/http/HttpClientTransport.java
index f446166..399840f 100644
--- a/core/src/main/java/com/google/net/stubby/newtransport/http/HttpClientTransport.java
+++ b/core/src/main/java/com/google/net/stubby/newtransport/http/HttpClientTransport.java
@@ -7,6 +7,7 @@
 
 import com.google.common.base.Preconditions;
 import com.google.common.util.concurrent.ListenableFuture;
+import com.google.net.stubby.Metadata;
 import com.google.net.stubby.MethodDescriptor;
 import com.google.net.stubby.Status;
 import com.google.net.stubby.newtransport.AbstractClientStream;
@@ -41,9 +42,11 @@
   }
 
   @Override
-  protected ClientStream newStreamInternal(MethodDescriptor<?, ?> method, StreamListener listener) {
+  protected ClientStream newStreamInternal(MethodDescriptor<?, ?> method,
+                                           Metadata.Headers headers,
+                                           StreamListener listener) {
     URI uri = baseUri.resolve(method.getName());
-    HttpClientStream stream = new HttpClientStream(uri, listener);
+    HttpClientStream stream = new HttpClientStream(uri, headers.serializeAscii(), listener);
     synchronized (streams) {
       // Check for RUNNING to deal with race condition of this being executed right after doStop
       // cancels all the streams.
@@ -80,7 +83,7 @@
     final DataOutputStream outputStream;
     boolean connected;
 
-    HttpClientStream(URI uri, StreamListener listener) {
+    HttpClientStream(URI uri, String[] headers, StreamListener listener) {
       super(listener);
 
       try {
@@ -89,6 +92,9 @@
         connection.setDoInput(true);
         connection.setRequestMethod(HTTP_METHOD);
         connection.setRequestProperty(CONTENT_TYPE_HEADER, CONTENT_TYPE_PROTORPC);
+        for (int i = 0; i < headers.length; i++) {
+          connection.setRequestProperty(headers[i], headers[++i]);
+        }
         outputStream = new DataOutputStream(connection.getOutputStream());
         connected = true;
       } catch (IOException e) {
@@ -99,7 +105,7 @@
     @Override
     public void cancel() {
       outboundPhase = Phase.STATUS;
-      if (setStatus(CANCELLED)) {
+      if (setStatus(CANCELLED, new Metadata.Trailers())) {
         disconnect();
       }
     }
@@ -136,7 +142,7 @@
           }
         }
       } catch (IOException ioe) {
-        setStatus(new Status(Transport.Code.INTERNAL, ioe));
+        setStatus(new Status(Transport.Code.INTERNAL, ioe), new Metadata.Trailers());
       }
     }
 
diff --git a/core/src/main/java/com/google/net/stubby/newtransport/netty/CreateStreamCommand.java b/core/src/main/java/com/google/net/stubby/newtransport/netty/CreateStreamCommand.java
index 2107796..98fea67 100644
--- a/core/src/main/java/com/google/net/stubby/newtransport/netty/CreateStreamCommand.java
+++ b/core/src/main/java/com/google/net/stubby/newtransport/netty/CreateStreamCommand.java
@@ -8,12 +8,15 @@
  * {@link NettyClientHandler} for processing in the Channel thread.
  */
 class CreateStreamCommand {
-  final MethodDescriptor<?, ?> method;
-  final NettyClientStream stream;
+  private final MethodDescriptor<?, ?> method;
+  private final String[] headers;
+  private final NettyClientStream stream;
 
-  CreateStreamCommand(MethodDescriptor<?, ?> method, NettyClientStream stream) {
+  CreateStreamCommand(MethodDescriptor<?, ?> method, String[] headers,
+                      NettyClientStream stream) {
     this.method = Preconditions.checkNotNull(method, "method");
     this.stream = Preconditions.checkNotNull(stream, "stream");
+    this.headers = Preconditions.checkNotNull(headers, "headers");
   }
 
   MethodDescriptor<?, ?> method() {
@@ -23,4 +26,8 @@
   NettyClientStream stream() {
     return stream;
   }
+
+  String[] headers() {
+    return headers;
+  }
 }
diff --git a/core/src/main/java/com/google/net/stubby/newtransport/netty/NettyClientHandler.java b/core/src/main/java/com/google/net/stubby/newtransport/netty/NettyClientHandler.java
index 567ece4..cada74e 100644
--- a/core/src/main/java/com/google/net/stubby/newtransport/netty/NettyClientHandler.java
+++ b/core/src/main/java/com/google/net/stubby/newtransport/netty/NettyClientHandler.java
@@ -6,6 +6,7 @@
 import static com.google.net.stubby.newtransport.netty.NettyClientStream.PENDING_STREAM_ID;
 
 import com.google.common.base.Preconditions;
+import com.google.net.stubby.Metadata;
 import com.google.net.stubby.MethodDescriptor;
 import com.google.net.stubby.Status;
 import com.google.net.stubby.transport.Transport;
@@ -32,7 +33,6 @@
 import java.util.ArrayDeque;
 import java.util.Deque;
 import java.util.Iterator;
-import java.util.Map;
 
 /**
  * Client-side Netty handler for GRPC processing. All event handlers are executed entirely within
@@ -46,11 +46,13 @@
    */
   private final class PendingStream {
     private final MethodDescriptor<?, ?> method;
+    private final String[] headers;
     private final NettyClientStream stream;
     private final ChannelPromise promise;
 
     public PendingStream(CreateStreamCommand command, ChannelPromise promise) {
       method = command.method();
+      headers = command.headers();
       stream = command.stream();
       this.promise = promise;
     }
@@ -155,7 +157,7 @@
     // TODO(user): do something with errorCode?
     Http2Stream http2Stream = connection().requireStream(streamId);
     NettyClientStream stream = clientStream(http2Stream);
-    stream.setStatus(new Status(Transport.Code.UNKNOWN));
+    stream.setStatus(new Status(Transport.Code.UNKNOWN), new Metadata.Trailers());
   }
 
   /**
@@ -170,7 +172,7 @@
 
     // Any streams that are still active must be closed.
     for (Http2Stream stream : http2Streams()) {
-      clientStream(stream).setStatus(goAwayStatus);
+      clientStream(stream).setStatus(goAwayStatus, new Metadata.Trailers());
     }
   }
 
@@ -194,7 +196,7 @@
     // Close the stream with a status that contains the cause.
     Http2Stream stream = connection().stream(cause.streamId());
     if (stream != null) {
-      clientStream(stream).setStatus(Status.fromThrowable(cause));
+      clientStream(stream).setStatus(Status.fromThrowable(cause), new Metadata.Trailers());
     }
     super.onStreamError(ctx, cause);
   }
@@ -217,7 +219,7 @@
   private void cancelStream(ChannelHandlerContext ctx, CancelStreamCommand cmd,
       ChannelPromise promise) throws Http2Exception {
     NettyClientStream stream = cmd.stream();
-    stream.setStatus(Status.CANCELLED);
+    stream.setStatus(Status.CANCELLED, new Metadata.Trailers());
 
     // No need to set the stream status for a cancellation. It should already have been
     // set prior to sending the command.
@@ -272,7 +274,7 @@
       int lastKnownStream = connection().local().lastKnownStream();
       for (Http2Stream stream : http2Streams()) {
         if (lastKnownStream < stream.id()) {
-          clientStream(stream).setStatus(goAwayStatus);
+          clientStream(stream).setStatus(goAwayStatus, new Metadata.Trailers());
           stream.close();
         }
       }
@@ -319,10 +321,12 @@
       // Finish creation of the stream by writing a headers frame.
       final PendingStream pendingStream = pendingStreams.remove();
       // TODO(user): Change Netty to not send priority, just use default.
+      // TODO(user): Switch to binary headers when Netty supports it.
       DefaultHttp2Headers.Builder headersBuilder = DefaultHttp2Headers.newBuilder();
-      // Add custom headers from the method descriptor
-      for (Map.Entry<String, String> entry : pendingStream.method.getHeaders().entrySet()) {
-        headersBuilder.add(entry.getKey(), entry.getValue());
+      for (int i = 0; i < pendingStream.headers.length; i++) {
+        headersBuilder.add(
+            pendingStream.headers[i],
+            pendingStream.headers[++i]);
       }
       headersBuilder
           .method(HTTP_METHOD)
@@ -410,7 +414,8 @@
       case RESERVED_REMOTE:
         // Disallowed state, terminate the stream.
         clientStream(stream).setStatus(
-            new Status(Transport.Code.INTERNAL, "Stream in invalid state: " + stream.state()));
+            new Status(Transport.Code.INTERNAL, "Stream in invalid state: " + stream.state()),
+            new Metadata.Trailers());
         writeRstStream(ctx(), stream.id(), Http2Error.INTERNAL_ERROR.code(), ctx().newPromise());
         break;
       default:
diff --git a/core/src/main/java/com/google/net/stubby/newtransport/netty/NettyClientStream.java b/core/src/main/java/com/google/net/stubby/newtransport/netty/NettyClientStream.java
index 1460ee5..62b7196 100644
--- a/core/src/main/java/com/google/net/stubby/newtransport/netty/NettyClientStream.java
+++ b/core/src/main/java/com/google/net/stubby/newtransport/netty/NettyClientStream.java
@@ -5,6 +5,7 @@
 
 import com.google.common.base.Preconditions;
 import com.google.common.util.concurrent.ListenableFuture;
+import com.google.net.stubby.Metadata;
 import com.google.net.stubby.Status;
 import com.google.net.stubby.newtransport.AbstractClientStream;
 import com.google.net.stubby.newtransport.GrpcDeframer;
@@ -73,7 +74,7 @@
     responseCode = responseCode(headers);
     isGrpcResponse = isGrpcResponse(headers, responseCode);
     if (!isGrpcResponse && endOfStream) {
-      setStatus(new Status(responseCode));
+      setStatus(new Status(responseCode), new Metadata.Trailers());
     }
   }
 
@@ -102,7 +103,7 @@
 
       if (endOfStream) {
         String msg = nonGrpcErrorMessage.toString();
-        setStatus(new Status(responseCode, msg));
+        setStatus(new Status(responseCode, msg), new Metadata.Trailers());
       }
     }
   }
diff --git a/core/src/main/java/com/google/net/stubby/newtransport/netty/NettyClientTransport.java b/core/src/main/java/com/google/net/stubby/newtransport/netty/NettyClientTransport.java
index 80c34ff..b23dd5d 100644
--- a/core/src/main/java/com/google/net/stubby/newtransport/netty/NettyClientTransport.java
+++ b/core/src/main/java/com/google/net/stubby/newtransport/netty/NettyClientTransport.java
@@ -3,6 +3,7 @@
 import static io.netty.channel.ChannelOption.SO_KEEPALIVE;
 
 import com.google.common.base.Preconditions;
+import com.google.net.stubby.Metadata;
 import com.google.net.stubby.MethodDescriptor;
 import com.google.net.stubby.newtransport.AbstractClientTransport;
 import com.google.net.stubby.newtransport.ClientStream;
@@ -11,11 +12,6 @@
 import com.google.net.stubby.newtransport.netty.NettyClientTransportFactory.NegotiationType;
 import com.google.net.stubby.testing.utils.ssl.SslContextFactory;
 
-import io.netty.handler.codec.http2.Http2OutboundFrameLogger;
-
-import io.netty.handler.codec.http2.Http2InboundFrameLogger;
-import io.netty.util.internal.logging.InternalLogLevel;
-import io.netty.handler.codec.http2.Http2FrameLogger;
 import io.netty.bootstrap.Bootstrap;
 import io.netty.channel.Channel;
 import io.netty.channel.ChannelFuture;
@@ -30,9 +26,13 @@
 import io.netty.handler.codec.http2.DefaultHttp2OutboundFlowController;
 import io.netty.handler.codec.http2.DefaultHttp2StreamRemovalPolicy;
 import io.netty.handler.codec.http2.Http2Connection;
+import io.netty.handler.codec.http2.Http2FrameLogger;
 import io.netty.handler.codec.http2.Http2FrameReader;
 import io.netty.handler.codec.http2.Http2FrameWriter;
+import io.netty.handler.codec.http2.Http2InboundFrameLogger;
 import io.netty.handler.codec.http2.Http2OutboundFlowController;
+import io.netty.handler.codec.http2.Http2OutboundFrameLogger;
+import io.netty.util.internal.logging.InternalLogLevel;
 
 import java.util.concurrent.ExecutionException;
 
@@ -83,13 +83,17 @@
   }
 
   @Override
-  protected ClientStream newStreamInternal(MethodDescriptor<?, ?> method, StreamListener listener) {
+  protected ClientStream newStreamInternal(MethodDescriptor<?, ?> method,
+                                           Metadata.Headers headers,
+                                           StreamListener listener) {
     // Create the stream.
     NettyClientStream stream = new NettyClientStream(listener, channel, handler.inboundFlow());
 
     try {
       // Write the request and await creation of the stream.
-      channel.writeAndFlush(new CreateStreamCommand(method, stream)).get();
+      channel.writeAndFlush(new CreateStreamCommand(method,
+          headers.serializeAscii(),
+          stream)).get();
     } catch (InterruptedException e) {
       // Restore the interrupt.
       Thread.currentThread().interrupt();
diff --git a/core/src/main/java/com/google/net/stubby/newtransport/netty/NettyServerHandler.java b/core/src/main/java/com/google/net/stubby/newtransport/netty/NettyServerHandler.java
index e857a6c..fddaee0 100644
--- a/core/src/main/java/com/google/net/stubby/newtransport/netty/NettyServerHandler.java
+++ b/core/src/main/java/com/google/net/stubby/newtransport/netty/NettyServerHandler.java
@@ -5,27 +5,27 @@
 import static com.google.net.stubby.newtransport.HttpUtil.HTTP_METHOD;
 
 import com.google.common.base.Preconditions;
-import com.google.net.stubby.MethodDescriptor;
+import com.google.net.stubby.Metadata;
 import com.google.net.stubby.Status;
 import com.google.net.stubby.newtransport.ServerTransportListener;
 import com.google.net.stubby.newtransport.StreamListener;
 import com.google.net.stubby.newtransport.TransportFrameUtil;
 import com.google.net.stubby.transport.Transport;
 
-import io.netty.handler.codec.http2.DefaultHttp2InboundFlowController;
-import io.netty.handler.codec.http2.Http2FrameReader;
-import io.netty.handler.codec.http2.Http2FrameWriter;
-import io.netty.handler.codec.http2.Http2OutboundFlowController;
 import io.netty.buffer.ByteBuf;
 import io.netty.channel.ChannelHandlerContext;
 import io.netty.channel.ChannelPromise;
 import io.netty.handler.codec.http2.AbstractHttp2ConnectionHandler;
 import io.netty.handler.codec.http2.DefaultHttp2Headers;
+import io.netty.handler.codec.http2.DefaultHttp2InboundFlowController;
 import io.netty.handler.codec.http2.Http2Connection;
 import io.netty.handler.codec.http2.Http2ConnectionAdapter;
 import io.netty.handler.codec.http2.Http2Error;
 import io.netty.handler.codec.http2.Http2Exception;
+import io.netty.handler.codec.http2.Http2FrameReader;
+import io.netty.handler.codec.http2.Http2FrameWriter;
 import io.netty.handler.codec.http2.Http2Headers;
+import io.netty.handler.codec.http2.Http2OutboundFlowController;
 import io.netty.handler.codec.http2.Http2Stream;
 import io.netty.handler.codec.http2.Http2StreamException;
 import io.netty.util.ReferenceCountUtil;
@@ -83,7 +83,8 @@
       Http2Stream http2Stream = connection().requireStream(streamId);
       http2Stream.data(stream);
       String method = determineMethod(streamId, headers);
-      StreamListener listener = transportListener.streamCreated(stream, method);
+      StreamListener listener = transportListener.streamCreated(stream, method,
+          new Metadata.Headers(headers));
       stream.setListener(listener);
     } catch (Http2Exception e) {
       throw e;
diff --git a/core/src/main/java/com/google/net/stubby/newtransport/okhttp/Headers.java b/core/src/main/java/com/google/net/stubby/newtransport/okhttp/Headers.java
index 1775f7b..f63640a 100644
--- a/core/src/main/java/com/google/net/stubby/newtransport/okhttp/Headers.java
+++ b/core/src/main/java/com/google/net/stubby/newtransport/okhttp/Headers.java
@@ -4,6 +4,8 @@
 
 import com.squareup.okhttp.internal.spdy.Header;
 
+import okio.ByteString;
+
 import java.util.List;
 
 /**
@@ -15,12 +17,15 @@
       new Header("content-type", "application/protorpc");
   public static final Header RESPONSE_STATUS_OK = new Header(Header.RESPONSE_STATUS, "200");
 
-  public static List<Header> createRequestHeaders(String operationName) {
-    List<Header> headers = Lists.newArrayListWithCapacity(6);
-    headers.add(new Header(Header.TARGET_PATH, operationName));
-    headers.add(SCHEME_HEADER);
-    headers.add(CONTENT_TYPE_HEADER);
-    return headers;
+  public static List<Header> createRequestHeaders(String operationName, byte[][] headers) {
+    List<Header> okhttpHeaders = Lists.newArrayListWithCapacity(6);
+    okhttpHeaders.add(new Header(Header.TARGET_PATH, operationName));
+    okhttpHeaders.add(SCHEME_HEADER);
+    okhttpHeaders.add(CONTENT_TYPE_HEADER);
+    for (int i = 0; i < headers.length; i++) {
+      okhttpHeaders.add(new Header(ByteString.of(headers[i]), ByteString.of(headers[++i])));
+    }
+    return okhttpHeaders;
   }
 
   public static List<Header> createResponseHeaders() {
diff --git a/core/src/main/java/com/google/net/stubby/newtransport/okhttp/OkHttpClientTransport.java b/core/src/main/java/com/google/net/stubby/newtransport/okhttp/OkHttpClientTransport.java
index 22350b6..4d94ac1 100644
--- a/core/src/main/java/com/google/net/stubby/newtransport/okhttp/OkHttpClientTransport.java
+++ b/core/src/main/java/com/google/net/stubby/newtransport/okhttp/OkHttpClientTransport.java
@@ -4,6 +4,7 @@
 import com.google.common.base.Preconditions;
 import com.google.common.io.ByteStreams;
 import com.google.common.util.concurrent.ListenableFuture;
+import com.google.net.stubby.Metadata;
 import com.google.net.stubby.MethodDescriptor;
 import com.google.net.stubby.Status;
 import com.google.net.stubby.newtransport.AbstractClientStream;
@@ -87,7 +88,7 @@
   private final int port;
   private FrameReader frameReader;
   private AsyncFrameWriter frameWriter;
-  private Object lock = new Object();
+  private final Object lock = new Object();
   @GuardedBy("lock")
   private int nextStreamId;
   private final Map<Integer, OkHttpClientStream> streams =
@@ -125,8 +126,10 @@
   }
 
   @Override
-  protected ClientStream newStreamInternal(MethodDescriptor<?, ?> method, StreamListener listener) {
-    return new OkHttpClientStream(method, listener);
+  protected ClientStream newStreamInternal(MethodDescriptor<?, ?> method,
+                                           Metadata.Headers headers,
+                                           StreamListener listener) {
+    return new OkHttpClientStream(method, headers.serialize(), listener);
   }
 
   @Override
@@ -204,7 +207,7 @@
     stopAsync();
 
     for (OkHttpClientStream stream : goAwayStreams) {
-      stream.setStatus(status);
+      stream.setStatus(status, new Metadata.Trailers());
     }
   }
 
@@ -218,7 +221,7 @@
     stream = streams.remove(streamId);
     if (stream != null) {
       // This is mainly for failed streams, for successfully finished streams, it's a no-op.
-      stream.setStatus(status);
+      stream.setStatus(status, new Metadata.Trailers());
       return true;
     }
     return false;
@@ -398,18 +401,18 @@
     final InputStreamDeframer deframer;
     int unacknowledgedBytesRead;
 
-    OkHttpClientStream(MethodDescriptor<?, ?> method, StreamListener listener) {
+    OkHttpClientStream(MethodDescriptor<?, ?> method, byte[][] headers, StreamListener listener) {
       super(listener);
       deframer = new InputStreamDeframer(inboundMessageHandler());
       synchronized (lock) {
         if (goAway) {
-          setStatus(goAwayStatus);
+          setStatus(goAwayStatus, new Metadata.Trailers());
           return;
         }
         assignStreamId(this);
       }
       frameWriter.synStream(false, false, streamId, 0,
-          Headers.createRequestHeaders(method.getName()));
+          Headers.createRequestHeaders(method.getName(), headers));
     }
 
     InputStreamDeframer getDeframer() {
diff --git a/core/src/test/java/com/google/net/stubby/MetadataTest.java b/core/src/test/java/com/google/net/stubby/MetadataTest.java
new file mode 100644
index 0000000..168c539
--- /dev/null
+++ b/core/src/test/java/com/google/net/stubby/MetadataTest.java
@@ -0,0 +1,123 @@
+package com.google.net.stubby;
+
+import static org.junit.Assert.assertArrayEquals;
+import static org.junit.Assert.assertEquals;
+import static org.junit.Assert.assertFalse;
+import static org.junit.Assert.assertSame;
+import static org.junit.Assert.assertTrue;
+import static org.junit.Assert.fail;
+
+import org.junit.Test;
+import org.junit.runner.RunWith;
+import org.junit.runners.JUnit4;
+
+import java.nio.charset.StandardCharsets;
+import java.util.Iterator;
+
+/**
+ * Tests for {@link Metadata}
+ */
+@RunWith(JUnit4.class)
+public class MetadataTest {
+
+  private static final Metadata.Marshaller<Fish> FISH_MARSHALLER =
+      new Metadata.Marshaller<Fish>() {
+    @Override
+    public byte[] toBytes(Fish fish) {
+      return fish.name.getBytes(StandardCharsets.UTF_8);
+    }
+
+    @Override
+    public String toAscii(Fish value) {
+      return value.name;
+    }
+
+        @Override
+    public Fish parseBytes(byte[] serialized) {
+      return new Fish(new String(serialized, StandardCharsets.UTF_8));
+    }
+
+    @Override
+    public Fish parseAscii(String ascii) {
+      return new Fish(ascii);
+    }
+  };
+
+  private static final String LANCE = "lance";
+  private static final byte[] LANCE_BYTES = LANCE.getBytes(StandardCharsets.US_ASCII);
+  private static final Metadata.Key<Fish> KEY = new Metadata.Key<Fish>("test", FISH_MARSHALLER);
+
+  @Test
+  public void testWriteParsed() {
+    Fish lance = new Fish(LANCE);
+    Metadata.Headers metadata = new Metadata.Headers();
+    metadata.put(KEY, lance);
+    // Should be able to read same instance out
+    assertSame(lance, metadata.get(KEY));
+    Iterator<Fish> fishes = metadata.<Fish>getAll(KEY).iterator();
+    assertTrue(fishes.hasNext());
+    assertSame(fishes.next(), lance);
+    assertFalse(fishes.hasNext());
+    byte[][] serialized = metadata.serialize();
+    assertEquals(2, serialized.length);
+    assertEquals(new String(serialized[0], StandardCharsets.US_ASCII), "test");
+    assertArrayEquals(LANCE_BYTES, serialized[1]);
+    assertSame(lance, metadata.get(KEY));
+    // Serialized instance should be cached too
+    assertSame(serialized[0], metadata.serialize()[0]);
+    assertSame(serialized[1], metadata.serialize()[1]);
+  }
+
+  @Test
+  public void testWriteRaw() {
+    Metadata.Headers raw = new Metadata.Headers(
+        KEY.asciiName(), LANCE_BYTES);
+    Fish lance = raw.get(KEY);
+    assertEquals(lance, new Fish(LANCE));
+    // Reading again should return the same parsed instance
+    assertSame(lance, raw.get(KEY));
+  }
+
+  @Test
+  public void testFailSerializeRaw() {
+    Metadata.Headers raw = new Metadata.Headers(
+        KEY.asciiName(), LANCE_BYTES);
+
+    try {
+      raw.serialize();
+      fail("Can't serialize raw metadata");
+    } catch (IllegalStateException ise) {
+      // Success
+    }
+  }
+
+  @Test
+  public void testFailMergeRawIntoSerializable() {
+    Metadata.Headers raw = new Metadata.Headers(
+        KEY.asciiName(), LANCE_BYTES);
+    Metadata.Headers serializable = new Metadata.Headers();
+    try {
+      serializable.merge(raw);
+      fail("Can't serialize raw metadata");
+    } catch (IllegalArgumentException iae) {
+      // Success
+    }
+  }
+
+  private static class Fish {
+    private String name;
+
+    private Fish(String name) {
+      this.name = name;
+    }
+
+    @Override
+    public boolean equals(Object o) {
+      if (this == o) return true;
+      if (o == null || getClass() != o.getClass()) return false;
+      Fish fish = (Fish) o;
+      if (name != null ? !name.equals(fish.name) : fish.name != null) return false;
+      return true;
+    }
+  }
+}
diff --git a/core/src/test/java/com/google/net/stubby/context/ContextExchangeChannelTest.java b/core/src/test/java/com/google/net/stubby/context/ContextExchangeChannelTest.java
deleted file mode 100644
index 58b8a74..0000000
--- a/core/src/test/java/com/google/net/stubby/context/ContextExchangeChannelTest.java
+++ /dev/null
@@ -1,133 +0,0 @@
-package com.google.net.stubby.context;
-
-import static org.junit.Assert.assertEquals;
-import static org.junit.Assert.assertNull;
-import static org.mockito.Mockito.*;
-
-import com.google.common.util.concurrent.SettableFuture;
-import com.google.net.stubby.Call;
-import com.google.net.stubby.Channel;
-import com.google.net.stubby.MethodDescriptor;
-import com.google.net.stubby.Status;
-import com.google.net.stubby.stub.Marshallers;
-import com.google.net.stubby.testing.integration.Messages.Payload;
-import com.google.net.stubby.testing.integration.Messages.PayloadType;
-import com.google.net.stubby.testing.integration.Messages.SimpleRequest;
-import com.google.net.stubby.testing.integration.Messages.SimpleResponse;
-import com.google.net.stubby.testing.integration.TestServiceGrpc;
-import com.google.protobuf.ByteString;
-
-import org.hamcrest.BaseMatcher;
-import org.hamcrest.Description;
-import org.junit.Before;
-import org.junit.Test;
-import org.junit.runner.RunWith;
-import org.junit.runners.JUnit4;
-import org.mockito.ArgumentCaptor;
-import org.mockito.Mock;
-import org.mockito.Mockito;
-import org.mockito.MockitoAnnotations;
-
-import java.io.IOException;
-import java.io.InputStream;
-import java.util.concurrent.Executors;
-import java.util.concurrent.Future;
-
-import javax.inject.Provider;
-
-/**
- * Tests for {@link ContextExchangeChannel}
- */
-@RunWith(JUnit4.class)
-public class ContextExchangeChannelTest {
-
-  private static final SimpleRequest REQ = SimpleRequest.newBuilder().setPayload(
-      Payload.newBuilder().setPayloadCompressable("mary").
-          setPayloadType(PayloadType.COMPRESSABLE).build())
-      .build();
-
-  private static final SimpleResponse RESP = SimpleResponse.newBuilder().setPayload(
-      Payload.newBuilder().setPayloadCompressable("bob").
-          setPayloadType(PayloadType.COMPRESSABLE).build())
-      .build();
-
-  @Mock
-  Channel channel;
-
-  @Mock
-  Call call;
-
-  @Before @SuppressWarnings("unchecked")
-  public void setup() {
-    MockitoAnnotations.initMocks(this);
-    when(channel.newCall(Mockito.any(MethodDescriptor.class))).thenReturn(call);
-  }
-
-  @Test
-  public void testReceive() throws Exception {
-    ContextExchangeChannel exchange = new ContextExchangeChannel(channel);
-    Provider<SimpleResponse> auth =
-        exchange.receive("auth", Marshallers.forProto(SimpleResponse.PARSER));
-    // Should be null, nothing has happened
-    assertNull(auth.get());
-    TestServiceGrpc.TestServiceBlockingStub stub =
-        TestServiceGrpc.newBlockingStub(exchange);
-    callStub(stub);
-    assertEquals(RESP, auth.get());
-    exchange.clearLastReceived();
-    assertNull(auth.get());
-  }
-
-  @Test @SuppressWarnings("unchecked")
-  public void testSend() throws Exception {
-    ContextExchangeChannel exchange = new ContextExchangeChannel(channel);
-    exchange.send("auth", RESP, Marshallers.forProto(SimpleResponse.PARSER));
-    TestServiceGrpc.TestServiceBlockingStub stub =
-        TestServiceGrpc.newBlockingStub(exchange);
-    callStub(stub);
-    verify(call).sendContext(eq("auth"),
-        argThat(new BaseMatcher<InputStream>() {
-          @Override
-          public boolean matches(Object o) {
-            try {
-              // Just check the length, consuming the stream will fail the test and Mockito
-              // calls this more than once.
-              return ((InputStream) o).available() == RESP.getSerializedSize();
-            } catch (IOException ioe) {
-              throw new RuntimeException(ioe);
-            }
-          }
-
-          @Override
-          public void describeTo(Description description) {
-          }
-        }), (SettableFuture<Void>) isNull());
-  }
-
-  @SuppressWarnings("unchecked")
-  private void callStub(final TestServiceGrpc.TestServiceBlockingStub stub) throws Exception {
-    when(channel.newCall(Mockito.<MethodDescriptor>any())).thenReturn(call);
-
-    // execute the call in another thread so we don't deadlock waiting for the
-    // listener.onClose
-    Future<?> pending = Executors.newSingleThreadExecutor().submit(new Runnable() {
-      @Override
-      public void run() {
-        stub.unaryCall(REQ);
-      }
-    });
-    ArgumentCaptor<Call.Listener> listenerCapture = ArgumentCaptor.forClass(Call.Listener.class);
-    // Wait for the call to start to capture the listener
-    verify(call, timeout(1000)).start(listenerCapture.capture());
-
-    ByteString response = RESP.toByteString();
-    Call.Listener listener = listenerCapture.getValue();
-    // Respond with a context-value
-    listener.onContext("auth", response.newInput());
-    listener.onContext("something-else", response.newInput());
-    // .. and single payload
-    listener.onPayload(RESP);
-    listener.onClose(Status.OK);
-    pending.get();
-  }
-}
diff --git a/core/src/test/java/com/google/net/stubby/newtransport/GrpcDeframerTest.java b/core/src/test/java/com/google/net/stubby/newtransport/GrpcDeframerTest.java
index e39d80f..24d4f0d 100644
--- a/core/src/test/java/com/google/net/stubby/newtransport/GrpcDeframerTest.java
+++ b/core/src/test/java/com/google/net/stubby/newtransport/GrpcDeframerTest.java
@@ -9,6 +9,7 @@
 import static org.mockito.Matchers.anyInt;
 import static org.mockito.Matchers.anyString;
 import static org.mockito.Matchers.eq;
+import static org.mockito.Matchers.notNull;
 import static org.mockito.Mockito.never;
 import static org.mockito.Mockito.verify;
 import static org.mockito.Mockito.when;
@@ -16,6 +17,7 @@
 import com.google.common.io.ByteStreams;
 import com.google.common.util.concurrent.MoreExecutors;
 import com.google.common.util.concurrent.SettableFuture;
+import com.google.net.stubby.Metadata;
 import com.google.net.stubby.Status;
 import com.google.net.stubby.transport.Transport;
 import com.google.protobuf.ByteString;
@@ -228,7 +230,7 @@
 
   private void verifyStatus(Transport.Code code) {
     ArgumentCaptor<Status> captor = ArgumentCaptor.forClass(Status.class);
-    verify(listener).closed(captor.capture());
+    verify(listener).closed(captor.capture(), notNull(Metadata.Trailers.class));
     assertEquals(code, captor.getValue().getCode());
   }
 
@@ -241,7 +243,7 @@
   }
 
   private void verifyNoStatus() {
-    verify(listener, never()).closed(any(Status.class));
+    verify(listener, never()).closed(any(Status.class), notNull(Metadata.Trailers.class));
   }
 
   private byte[] contextFrame() throws IOException {
diff --git a/core/src/test/java/com/google/net/stubby/newtransport/netty/NettyClientHandlerTest.java b/core/src/test/java/com/google/net/stubby/newtransport/netty/NettyClientHandlerTest.java
index 33946b5..9c7ce95 100644
--- a/core/src/test/java/com/google/net/stubby/newtransport/netty/NettyClientHandlerTest.java
+++ b/core/src/test/java/com/google/net/stubby/newtransport/netty/NettyClientHandlerTest.java
@@ -7,28 +7,20 @@
 import static org.junit.Assert.assertEquals;
 import static org.mockito.Matchers.any;
 import static org.mockito.Matchers.eq;
+import static org.mockito.Matchers.notNull;
 import static org.mockito.Mockito.calls;
 import static org.mockito.Mockito.inOrder;
 import static org.mockito.Mockito.never;
 import static org.mockito.Mockito.verify;
 import static org.mockito.Mockito.when;
 
-import com.google.common.collect.ImmutableMap;
+import com.google.net.stubby.Metadata;
 import com.google.net.stubby.MethodDescriptor;
 import com.google.net.stubby.Status;
 import com.google.net.stubby.newtransport.HttpUtil;
 import com.google.net.stubby.newtransport.StreamState;
 import com.google.net.stubby.transport.Transport;
 
-import org.junit.Before;
-import org.junit.Test;
-import org.junit.runner.RunWith;
-import org.junit.runners.JUnit4;
-import org.mockito.ArgumentCaptor;
-import org.mockito.InOrder;
-import org.mockito.Mock;
-import org.mockito.MockitoAnnotations;
-
 import io.netty.buffer.ByteBuf;
 import io.netty.buffer.Unpooled;
 import io.netty.channel.ChannelHandlerContext;
@@ -47,6 +39,15 @@
 import io.netty.handler.codec.http2.Http2OutboundFlowController;
 import io.netty.handler.codec.http2.Http2Settings;
 
+import org.junit.Before;
+import org.junit.Test;
+import org.junit.runner.RunWith;
+import org.junit.runners.JUnit4;
+import org.mockito.ArgumentCaptor;
+import org.mockito.InOrder;
+import org.mockito.Mock;
+import org.mockito.MockitoAnnotations;
+
 /**
  * Tests for {@link NettyClientHandler}.
  */
@@ -63,6 +64,7 @@
   @Mock
   private MethodDescriptor<?, ?> method;
   private ByteBuf content;
+  private Metadata.Headers grpcHeaders;
 
   @Before
   public void setup() throws Exception {
@@ -77,8 +79,11 @@
     mockContext();
     mockFuture(true);
 
+    Metadata.Key key = new Metadata.Key("auth", Metadata.STRING_MARSHALLER);
+    grpcHeaders = new Metadata.Headers();
+    grpcHeaders.put(key, "sometoken");
+
     when(method.getName()).thenReturn("fakemethod");
-    when(method.getHeaders()).thenReturn(ImmutableMap.of("auth", "sometoken"));
     when(stream.state()).thenReturn(StreamState.OPEN);
 
     // Simulate activation of the handler to force writing of the initial settings
@@ -95,7 +100,8 @@
 
   @Test
   public void createStreamShouldSucceed() throws Exception {
-    handler.write(ctx, new CreateStreamCommand(method, stream), promise);
+    handler.write(ctx, new CreateStreamCommand(method, grpcHeaders.serializeAscii(), stream),
+        promise);
     verify(promise).setSuccess();
     verify(stream).id(eq(3));
 
@@ -184,7 +190,8 @@
   public void createShouldQueueStream() throws Exception {
     // Disallow stream creation to force the stream to get added to the pending queue.
     setMaxConcurrentStreams(0);
-    handler.write(ctx, new CreateStreamCommand(method, stream), promise);
+    handler.write(ctx, new CreateStreamCommand(method, grpcHeaders.serializeAscii(), stream),
+        promise);
 
     // Make sure the write never occurred.
     verify(frameListener, never()).onHeadersRead(eq(ctx),
@@ -201,7 +208,8 @@
   public void receivedGoAwayShouldFailQueuedStreams() throws Exception {
     // Force a stream to get added to the pending queue.
     setMaxConcurrentStreams(0);
-    handler.write(ctx, new CreateStreamCommand(method, stream), promise);
+    handler.write(ctx, new CreateStreamCommand(method, grpcHeaders.serializeAscii(), stream),
+        promise);
 
     handler.channelRead(ctx, goAwayFrame(0));
     verify(promise).setFailure(any(Throwable.class));
@@ -210,13 +218,14 @@
   @Test
   public void receivedGoAwayShouldFailUnknownStreams() throws Exception {
     // Force a stream to get added to the pending queue.
-    handler.write(ctx, new CreateStreamCommand(method, stream), promise);
+    handler.write(ctx, new CreateStreamCommand(method, grpcHeaders.serializeAscii(), stream),
+        promise);
 
     // Read a GOAWAY that indicates our stream was never processed by the server.
     handler.channelRead(ctx, goAwayFrame(0));
     ArgumentCaptor<Status> captor = ArgumentCaptor.forClass(Status.class);
     InOrder inOrder = inOrder(stream);
-    inOrder.verify(stream, calls(1)).setStatus(captor.capture());
+    inOrder.verify(stream, calls(1)).setStatus(captor.capture(), notNull(Metadata.Trailers.class));
     assertEquals(Transport.Code.UNAVAILABLE, captor.getValue().getCode());
   }
 
@@ -237,7 +246,8 @@
 
   private void createStream() throws Exception {
     // Create the stream.
-    handler.write(ctx, new CreateStreamCommand(method, stream), promise);
+    handler.write(ctx, new CreateStreamCommand(method, grpcHeaders.serializeAscii(), stream),
+        promise);
     when(stream.id()).thenReturn(3);
     // Reset the context mock to clear recording of sent headers frame.
     mockContext();
diff --git a/core/src/test/java/com/google/net/stubby/newtransport/netty/NettyClientStreamTest.java b/core/src/test/java/com/google/net/stubby/newtransport/netty/NettyClientStreamTest.java
index 9df1d69..1e7f767 100644
--- a/core/src/test/java/com/google/net/stubby/newtransport/netty/NettyClientStreamTest.java
+++ b/core/src/test/java/com/google/net/stubby/newtransport/netty/NettyClientStreamTest.java
@@ -4,22 +4,24 @@
 import static org.junit.Assert.assertEquals;
 import static org.mockito.Matchers.any;
 import static org.mockito.Matchers.eq;
+import static org.mockito.Matchers.same;
 import static org.mockito.Mockito.verify;
 
+import com.google.net.stubby.Metadata;
 import com.google.net.stubby.Status;
 import com.google.net.stubby.newtransport.HttpUtil;
 import com.google.net.stubby.newtransport.StreamState;
 import com.google.net.stubby.transport.Transport;
 
+import io.netty.buffer.Unpooled;
+import io.netty.handler.codec.http2.DefaultHttp2Headers;
+import io.netty.handler.codec.http2.Http2Headers;
+
 import org.junit.Test;
 import org.junit.runner.RunWith;
 import org.junit.runners.JUnit4;
 import org.mockito.ArgumentCaptor;
 
-import io.netty.buffer.Unpooled;
-import io.netty.handler.codec.http2.DefaultHttp2Headers;
-import io.netty.handler.codec.http2.Http2Headers;
-
 /**
  * Tests for {@link NettyClientStream}.
  */
@@ -63,34 +65,35 @@
   @Test
   public void setStatusWithOkShouldCloseStream() {
     stream().id(1);
-    stream().setStatus(Status.OK);
-    verify(listener).closed(Status.OK);
+    stream().setStatus(Status.OK, new Metadata.Trailers());
+    verify(listener).closed(same(Status.OK), any(Metadata.Trailers.class));
     assertEquals(StreamState.CLOSED, stream.state());
   }
 
   @Test
   public void setStatusWithErrorShouldCloseStream() {
     Status errorStatus = new Status(Transport.Code.INTERNAL);
-    stream().setStatus(errorStatus);
-    verify(listener).closed(eq(errorStatus));
+    stream().setStatus(errorStatus, new Metadata.Trailers());
+    verify(listener).closed(eq(errorStatus), any(Metadata.Trailers.class));
     assertEquals(StreamState.CLOSED, stream.state());
   }
 
   @Test
   public void setStatusWithOkShouldNotOverrideError() {
     Status errorStatus = new Status(Transport.Code.INTERNAL);
-    stream().setStatus(errorStatus);
-    stream().setStatus(Status.OK);
-    verify(listener).closed(any(Status.class));
+    stream().setStatus(errorStatus, new Metadata.Trailers());
+    stream().setStatus(Status.OK, new Metadata.Trailers());
+    verify(listener).closed(any(Status.class), any(Metadata.Trailers.class));
     assertEquals(StreamState.CLOSED, stream.state());
   }
 
   @Test
   public void setStatusWithErrorShouldNotOverridePreviousError() {
     Status errorStatus = new Status(Transport.Code.INTERNAL);
-    stream().setStatus(errorStatus);
-    stream().setStatus(Status.fromThrowable(new RuntimeException("fake")));
-    verify(listener).closed(any(Status.class));
+    stream().setStatus(errorStatus, new Metadata.Trailers());
+    stream().setStatus(Status.fromThrowable(new RuntimeException("fake")),
+        new Metadata.Trailers());
+    verify(listener).closed(any(Status.class), any(Metadata.Trailers.class));
     assertEquals(StreamState.CLOSED, stream.state());
   }
 
@@ -123,7 +126,7 @@
 
     stream.inboundDataReceived(statusFrame(new Status(Transport.Code.INTERNAL)), false);
     ArgumentCaptor<Status> captor = ArgumentCaptor.forClass(Status.class);
-    verify(listener).closed(captor.capture());
+    verify(listener).closed(captor.capture(), any(Metadata.Trailers.class));
     assertEquals(Transport.Code.INTERNAL, captor.getValue().getCode());
     assertEquals(StreamState.CLOSED, stream.state());
   }
@@ -132,7 +135,7 @@
   public void nonGrpcResponseShouldSetStatus() throws Exception {
     stream.inboundDataReceived(Unpooled.copiedBuffer(MESSAGE, UTF_8), true);
     ArgumentCaptor<Status> captor = ArgumentCaptor.forClass(Status.class);
-    verify(listener).closed(captor.capture());
+    verify(listener).closed(captor.capture(), any(Metadata.Trailers.class));
     assertEquals(MESSAGE, captor.getValue().getDescription());
   }
 
diff --git a/core/src/test/java/com/google/net/stubby/newtransport/netty/NettyServerHandlerTest.java b/core/src/test/java/com/google/net/stubby/newtransport/netty/NettyServerHandlerTest.java
index 1aa20dc..75ed68f 100644
--- a/core/src/test/java/com/google/net/stubby/newtransport/netty/NettyServerHandlerTest.java
+++ b/core/src/test/java/com/google/net/stubby/newtransport/netty/NettyServerHandlerTest.java
@@ -6,12 +6,15 @@
 import static org.mockito.Matchers.any;
 import static org.mockito.Matchers.anyInt;
 import static org.mockito.Matchers.eq;
+import static org.mockito.Matchers.notNull;
 import static org.mockito.Mockito.never;
 import static org.mockito.Mockito.verify;
 import static org.mockito.Mockito.verifyNoMoreInteractions;
 import static org.mockito.Mockito.when;
 
 import com.google.common.io.ByteStreams;
+import com.google.net.stubby.Metadata;
+import com.google.net.stubby.MethodDescriptor;
 import com.google.net.stubby.Status;
 import com.google.net.stubby.newtransport.Framer;
 import com.google.net.stubby.newtransport.HttpUtil;
@@ -28,9 +31,15 @@
 import io.netty.handler.codec.http2.DefaultHttp2FrameReader;
 import io.netty.handler.codec.http2.DefaultHttp2FrameWriter;
 import io.netty.handler.codec.http2.DefaultHttp2Headers;
+import io.netty.handler.codec.http2.DefaultHttp2InboundFlowController;
+import io.netty.handler.codec.http2.DefaultHttp2OutboundFlowController;
 import io.netty.handler.codec.http2.Http2CodecUtil;
+import io.netty.handler.codec.http2.Http2Connection;
 import io.netty.handler.codec.http2.Http2Error;
+import io.netty.handler.codec.http2.Http2FrameReader;
+import io.netty.handler.codec.http2.Http2FrameWriter;
 import io.netty.handler.codec.http2.Http2Headers;
+import io.netty.handler.codec.http2.Http2OutboundFlowController;
 import io.netty.handler.codec.http2.Http2Settings;
 
 import org.junit.Before;
@@ -41,13 +50,6 @@
 import org.mockito.Mock;
 import org.mockito.MockitoAnnotations;
 
-import io.netty.handler.codec.http2.DefaultHttp2InboundFlowController;
-import io.netty.handler.codec.http2.DefaultHttp2OutboundFlowController;
-import io.netty.handler.codec.http2.Http2Connection;
-import io.netty.handler.codec.http2.Http2FrameReader;
-import io.netty.handler.codec.http2.Http2FrameWriter;
-import io.netty.handler.codec.http2.Http2OutboundFlowController;
-
 import java.io.ByteArrayInputStream;
 import java.io.InputStream;
 import java.nio.ByteBuffer;
@@ -73,7 +75,9 @@
   public void setup() throws Exception {
     MockitoAnnotations.initMocks(this);
 
-    when(transportListener.streamCreated(any(ServerStream.class), any(String.class)))
+    when(transportListener.streamCreated(any(ServerStream.class),
+        any(String.class),
+        any(Metadata.Headers.class)))
         .thenReturn(streamListener);
     handler = newHandler(transportListener);
     frameWriter = new DefaultHttp2FrameWriter();
@@ -132,7 +136,7 @@
     assertArrayEquals(CONTENT, ByteStreams.toByteArray(captor.getValue()));
 
     if (endStream) {
-      verify(streamListener).closed(eq(Status.OK));
+      verify(streamListener).closed(eq(Status.OK), notNull(Metadata.Trailers.class));
     }
     verifyNoMoreInteractions(streamListener);
   }
@@ -143,7 +147,7 @@
 
     handler.channelRead(ctx, emptyDataFrame(STREAM_ID, true));
     verify(streamListener, never()).messageRead(any(InputStream.class), anyInt());
-    verify(streamListener).closed(eq(Status.OK));
+    verify(streamListener).closed(eq(Status.OK), notNull(Metadata.Trailers.class));
     verifyNoMoreInteractions(streamListener);
   }
 
@@ -153,7 +157,7 @@
 
     handler.channelRead(ctx, rstStreamFrame(STREAM_ID, Http2Error.CANCEL.code()));
     verify(streamListener, never()).messageRead(any(InputStream.class), anyInt());
-    verify(streamListener).closed(eq(Status.CANCELLED));
+    verify(streamListener).closed(eq(Status.CANCELLED), notNull(Metadata.Trailers.class));
     verifyNoMoreInteractions(streamListener);
   }
 
@@ -169,7 +173,8 @@
         ArgumentCaptor.forClass(NettyServerStream.class);
     @SuppressWarnings("rawtypes")
     ArgumentCaptor<String> methodCaptor = ArgumentCaptor.forClass(String.class);
-    verify(transportListener).streamCreated(streamCaptor.capture(), methodCaptor.capture());
+    verify(transportListener).streamCreated(streamCaptor.capture(), methodCaptor.capture(),
+        any(Metadata.Headers.class));
     stream = streamCaptor.getValue();
   }
 
diff --git a/core/src/test/java/com/google/net/stubby/newtransport/netty/NettyServerStreamTest.java b/core/src/test/java/com/google/net/stubby/newtransport/netty/NettyServerStreamTest.java
index 980c54b..52871b0 100644
--- a/core/src/test/java/com/google/net/stubby/newtransport/netty/NettyServerStreamTest.java
+++ b/core/src/test/java/com/google/net/stubby/newtransport/netty/NettyServerStreamTest.java
@@ -2,11 +2,15 @@
 
 import static org.junit.Assert.assertEquals;
 import static org.junit.Assert.fail;
+import static org.mockito.Matchers.eq;
+import static org.mockito.Matchers.notNull;
+import static org.mockito.Matchers.same;
 import static org.mockito.Mockito.never;
 import static org.mockito.Mockito.verify;
 import static org.mockito.Mockito.verifyNoMoreInteractions;
 import static org.mockito.Mockito.verifyZeroInteractions;
 
+import com.google.net.stubby.Metadata;
 import com.google.net.stubby.Status;
 import com.google.net.stubby.newtransport.StreamState;
 import com.google.net.stubby.transport.Transport;
@@ -40,7 +44,7 @@
   @Test
   public void closeBeforeClientHalfCloseShouldFail() {
     try {
-      stream().close(Status.OK);
+      stream().close(Status.OK, new Metadata.Trailers());
       fail("Should throw exception");
     } catch (IllegalStateException expected) {
     }
@@ -50,7 +54,7 @@
 
   @Test
   public void closeWithErrorBeforeClientHalfCloseShouldSucceed() throws Exception {
-    stream().close(Status.CANCELLED);
+    stream().close(Status.CANCELLED, new Metadata.Trailers());
     assertEquals(StreamState.CLOSED, stream.state());
     verify(channel).writeAndFlush(
         new SendGrpcFrameCommand(STREAM_ID, statusFrame(Status.CANCELLED), true));
@@ -62,9 +66,9 @@
     // Client half-closes. Listener gets closed()
     stream().remoteEndClosed();
     assertEquals(StreamState.WRITE_ONLY, stream.state());
-    verify(listener).closed(Status.OK);
+    verify(listener).closed(eq(Status.OK), notNull(Metadata.Trailers.class));
     // Server closes. Status sent.
-    stream().close(Status.OK);
+    stream().close(Status.OK, new Metadata.Trailers());
     assertEquals(StreamState.CLOSED, stream.state());
     verify(channel).writeAndFlush(
         new SendGrpcFrameCommand(STREAM_ID, statusFrame(Status.OK), true));
@@ -76,7 +80,7 @@
     // Client half-closes. Listener gets closed()
     stream().remoteEndClosed();
     assertEquals(StreamState.WRITE_ONLY, stream.state());
-    verify(listener).closed(Status.OK);
+    verify(listener).closed(eq(Status.OK), notNull(Metadata.Trailers.class));
     // Client half-closes again. Stream will be aborted with an error.
     stream().remoteEndClosed();
     assertEquals(StreamState.CLOSED, stream.state());
@@ -91,7 +95,7 @@
     Status status = new Status(Transport.Code.INTERNAL, new Throwable());
     stream().abortStream(status, true);
     assertEquals(StreamState.CLOSED, stream.state());
-    verify(listener).closed(status);
+    verify(listener).closed(same(status), notNull(Metadata.Trailers.class));
     verify(channel).writeAndFlush(new SendGrpcFrameCommand(STREAM_ID, statusFrame(status), true));
     verifyNoMoreInteractions(listener);
   }
@@ -101,7 +105,7 @@
     Status status = new Status(Transport.Code.INTERNAL, new Throwable());
     stream().abortStream(status, false);
     assertEquals(StreamState.CLOSED, stream.state());
-    verify(listener).closed(status);
+    verify(listener).closed(same(status), notNull(Metadata.Trailers.class));
     verify(channel, never()).writeAndFlush(
         new SendGrpcFrameCommand(STREAM_ID, statusFrame(status), true));
     verifyNoMoreInteractions(listener);
@@ -113,7 +117,7 @@
     // Client half-closes. Listener gets closed()
     stream().remoteEndClosed();
     assertEquals(StreamState.WRITE_ONLY, stream.state());
-    verify(listener).closed(Status.OK);
+    verify(listener).closed(same(Status.OK), notNull(Metadata.Trailers.class));
     // Abort
     stream().abortStream(status, true);
     assertEquals(StreamState.CLOSED, stream.state());
diff --git a/core/src/test/java/com/google/net/stubby/newtransport/okhttp/OkHttpClientTransportTest.java b/core/src/test/java/com/google/net/stubby/newtransport/okhttp/OkHttpClientTransportTest.java
index e0465ff..1407b07 100644
--- a/core/src/test/java/com/google/net/stubby/newtransport/okhttp/OkHttpClientTransportTest.java
+++ b/core/src/test/java/com/google/net/stubby/newtransport/okhttp/OkHttpClientTransportTest.java
@@ -12,6 +12,7 @@
 
 import com.google.common.util.concurrent.ListenableFuture;
 import com.google.common.util.concurrent.Service;
+import com.google.net.stubby.Metadata;
 import com.google.net.stubby.MethodDescriptor;
 import com.google.net.stubby.Status;
 import com.google.net.stubby.newtransport.StreamListener;
@@ -104,8 +105,8 @@
   public void nextFrameThrowIOException() throws Exception {
     MockStreamListener listener1 = new MockStreamListener();
     MockStreamListener listener2 = new MockStreamListener();
-    clientTransport.newStream(method, listener1);
-    clientTransport.newStream(method, listener2);
+    clientTransport.newStream(method, new Metadata.Headers(), listener1);
+    clientTransport.newStream(method, new Metadata.Headers(), listener2);
     assertEquals(2, streams.size());
     assertTrue(streams.containsKey(3));
     assertTrue(streams.containsKey(5));
@@ -126,7 +127,7 @@
     final int numMessages = 10;
     final String message = "Hello Client";
     MockStreamListener listener = new MockStreamListener();
-    clientTransport.newStream(method, listener);
+    clientTransport.newStream(method, new Metadata.Headers(), listener);
     assertTrue(streams.containsKey(3));
     for (int i = 0; i < numMessages; i++) {
       BufferedSource source = mock(BufferedSource.class);
@@ -148,7 +149,7 @@
     final String key = "KEY";
     final String value = "value";
     MockStreamListener listener = new MockStreamListener();
-    clientTransport.newStream(method, listener);
+    clientTransport.newStream(method,new Metadata.Headers(), listener);
     assertTrue(streams.containsKey(3));
     for (int i = 0; i < numContexts; i++) {
       BufferedSource source = mock(BufferedSource.class);
@@ -169,7 +170,7 @@
   @Test
   public void readStatus() throws Exception {
     MockStreamListener listener = new MockStreamListener();
-    clientTransport.newStream(method, listener);
+    clientTransport.newStream(method,new Metadata.Headers(), listener);
     assertTrue(streams.containsKey(3));
     BufferedSource source = mock(BufferedSource.class);
     InputStream inputStream = createStatusFrame((short) Transport.Code.UNAVAILABLE.getNumber());
@@ -182,7 +183,7 @@
   @Test
   public void receiveReset() throws Exception {
     MockStreamListener listener = new MockStreamListener();
-    clientTransport.newStream(method, listener);
+    clientTransport.newStream(method,new Metadata.Headers(), listener);
     assertTrue(streams.containsKey(3));
     frameHandler.rstStream(3, ErrorCode.PROTOCOL_ERROR);
     listener.waitUntilStreamClosed();
@@ -192,7 +193,7 @@
   @Test
   public void cancelStream() throws Exception {
     MockStreamListener listener = new MockStreamListener();
-    clientTransport.newStream(method, listener);
+    clientTransport.newStream(method,new Metadata.Headers(), listener);
     OkHttpClientStream stream = streams.get(3);
     assertNotNull(stream);
     stream.cancel();
@@ -205,7 +206,7 @@
   public void writeMessage() throws Exception {
     final String message = "Hello Server";
     MockStreamListener listener = new MockStreamListener();
-    clientTransport.newStream(method, listener);
+    clientTransport.newStream(method,new Metadata.Headers(), listener);
     OkHttpClientStream stream = streams.get(3);
     InputStream input = new ByteArrayInputStream(message.getBytes(StandardCharsets.UTF_8));
     stream.writeMessage(input, input.available(), null);
@@ -222,7 +223,7 @@
     final String key = "KEY";
     final String value = "VALUE";
     MockStreamListener listener = new MockStreamListener();
-    clientTransport.newStream(method, listener);
+    clientTransport.newStream(method,new Metadata.Headers(), listener);
     OkHttpClientStream stream = streams.get(3);
     InputStream input = new ByteArrayInputStream(value.getBytes(StandardCharsets.UTF_8));
     stream.writeContext(key, input, input.available(), null);
@@ -240,8 +241,8 @@
   public void windowUpdate() throws Exception {
     MockStreamListener listener1 = new MockStreamListener();
     MockStreamListener listener2 = new MockStreamListener();
-    clientTransport.newStream(method, listener1);
-    clientTransport.newStream(method, listener2);
+    clientTransport.newStream(method,new Metadata.Headers(), listener1);
+    clientTransport.newStream(method,new Metadata.Headers(), listener2);
     assertEquals(2, streams.size());
     OkHttpClientStream stream1 = streams.get(3);
     OkHttpClientStream stream2 = streams.get(5);
@@ -300,8 +301,8 @@
   public void stopNormally() throws Exception {
     MockStreamListener listener1 = new MockStreamListener();
     MockStreamListener listener2 = new MockStreamListener();
-    clientTransport.newStream(method, listener1);
-    clientTransport.newStream(method, listener2);
+    clientTransport.newStream(method,new Metadata.Headers(), listener1);
+    clientTransport.newStream(method,new Metadata.Headers(), listener2);
     assertEquals(2, streams.size());
     clientTransport.stopAsync();
     listener1.waitUntilStreamClosed();
@@ -318,8 +319,8 @@
     // start 2 streams.
     MockStreamListener listener1 = new MockStreamListener();
     MockStreamListener listener2 = new MockStreamListener();
-    clientTransport.newStream(method, listener1);
-    clientTransport.newStream(method, listener2);
+    clientTransport.newStream(method,new Metadata.Headers(), listener1);
+    clientTransport.newStream(method,new Metadata.Headers(), listener2);
     assertEquals(2, streams.size());
 
     // Receive goAway, max good id is 3.
@@ -336,7 +337,7 @@
     // New stream should be failed.
     MockStreamListener listener3 = new MockStreamListener();
     try {
-      clientTransport.newStream(method, listener3);
+      clientTransport.newStream(method,new Metadata.Headers(), listener3);
       fail("new stream should no be accepted by a go-away transport.");
     } catch (IllegalStateException ex) {
       // expected.
@@ -380,10 +381,10 @@
     streams = transport.getStreams();
 
     MockStreamListener listener1 = new MockStreamListener();
-    transport.newStream(method, listener1);
+    transport.newStream(method,new Metadata.Headers(), listener1);
 
     try {
-      transport.newStream(method, new MockStreamListener());
+      transport.newStream(method, new Metadata.Headers(), new MockStreamListener());
       fail("new stream should not be accepted by a go-away transport.");
     } catch (IllegalStateException ex) {
       // expected.
@@ -526,6 +527,11 @@
     }
 
     @Override
+    public ListenableFuture<Void> headersRead(Metadata.Headers headers) {
+      return null;
+    }
+
+    @Override
     public ListenableFuture<Void> messageRead(InputStream message, int length) {
       String msg = getContent(message);
       if (msg != null) {
@@ -535,7 +541,7 @@
     }
 
     @Override
-    public void closed(Status status) {
+    public void closed(Status status, Metadata.Trailers trailers) {
       this.status = status;
       closed.countDown();
     }
diff --git a/stub/src/main/java/com/google/net/stubby/stub/AbstractStub.java b/stub/src/main/java/com/google/net/stubby/stub/AbstractStub.java
index f073263..df97c0c 100644
--- a/stub/src/main/java/com/google/net/stubby/stub/AbstractStub.java
+++ b/stub/src/main/java/com/google/net/stubby/stub/AbstractStub.java
@@ -7,8 +7,6 @@
 import java.util.Map;
 import java.util.concurrent.TimeUnit;
 
-import javax.inject.Provider;
-
 /**
  * Common base type for stub implementations. Allows for reconfiguration.
  */
@@ -61,16 +59,6 @@
     }
 
     /**
-     * Set a header provider for all methods in the stub.
-     */
-    public StubConfigBuilder setHeader(String headerName, Provider<String> headerValueProvider) {
-      for (Map.Entry<String, MethodDescriptor> entry : methodMap.entrySet()) {
-        entry.setValue(entry.getValue().withHeader(headerName, headerValueProvider));
-      }
-      return this;
-    }
-
-    /**
      * Set a timeout for all methods in the stub.
      */
     public StubConfigBuilder setTimeout(long timeout, TimeUnit unit) {
diff --git a/stub/src/main/java/com/google/net/stubby/stub/Calls.java b/stub/src/main/java/com/google/net/stubby/stub/Calls.java
index bd6849e..bba5c57 100644
--- a/stub/src/main/java/com/google/net/stubby/stub/Calls.java
+++ b/stub/src/main/java/com/google/net/stubby/stub/Calls.java
@@ -7,6 +7,7 @@
 import com.google.common.util.concurrent.SettableFuture;
 import com.google.common.util.concurrent.UncheckedExecutionException;
 import com.google.net.stubby.Call;
+import com.google.net.stubby.Metadata;
 import com.google.net.stubby.Status;
 import com.google.net.stubby.transport.Transport;
 
@@ -121,7 +122,7 @@
       Call<ReqT, RespT> call,
       ReqT param,
       Call.Listener<RespT> responseListener) {
-    call.start(responseListener);
+    call.start(responseListener, new Metadata.Headers());
     try {
       call.sendPayload(param);
       call.halfClose();
@@ -139,7 +140,7 @@
       Call<ReqT, RespT> call,
       Iterator<ReqT> clientStream) {
     SettableFuture<RespT> responseFuture = SettableFuture.create();
-    call.start(new UnaryStreamToFuture<RespT>(responseFuture));
+    call.start(new UnaryStreamToFuture<RespT>(responseFuture), new Metadata.Headers());
     try {
       while (clientStream.hasNext()) {
         call.sendPayload(clientStream.next());
@@ -173,7 +174,8 @@
    */
   public static <ReqT, RespT> StreamObserver<ReqT> duplexStreamingCall(
       Call<ReqT, RespT> call, StreamObserver<RespT> responseObserver) {
-    call.start(new StreamObserverToCallListenerAdapter<RespT>(responseObserver));
+    call.start(new StreamObserverToCallListenerAdapter<RespT>(responseObserver),
+        new Metadata.Headers());
     return new CallToStreamObserverAdapter<ReqT>(call);
   }
 
@@ -209,6 +211,11 @@
     }
 
     @Override
+    public ListenableFuture<Void> onHeaders(Metadata.Headers headers) {
+      return null;
+    }
+
+    @Override
     public ListenableFuture<Void> onContext(String name, InputStream value) {
       // StreamObservers don't receive contexts.
       return null;
@@ -221,7 +228,7 @@
     }
 
     @Override
-    public void onClose(Status status) {
+    public void onClose(Status status, Metadata.Trailers trailers) {
       if (status.isOk()) {
         observer.onCompleted();
       } else {
@@ -242,6 +249,11 @@
     }
 
     @Override
+    public ListenableFuture<Void> onHeaders(Metadata.Headers headers) {
+      return null;
+    }
+
+    @Override
     public ListenableFuture<Void> onContext(String name, InputStream value) {
       // Don't care about contexts.
       return null;
@@ -258,7 +270,7 @@
     }
 
     @Override
-    public void onClose(Status status) {
+    public void onClose(Status status, Metadata.Trailers trailers) {
       if (status.isOk()) {
         if (value == null) {
           // No value received so mark the future as an error
@@ -333,6 +345,11 @@
       }
 
       @Override
+      public ListenableFuture<Void> onHeaders(Metadata.Headers headers) {
+        return null;
+      }
+
+      @Override
       public ListenableFuture<Void> onPayload(T value) {
         Preconditions.checkState(!done, "Call already closed");
         SettableFuture<Void> future = SettableFuture.create();
@@ -341,7 +358,7 @@
       }
 
       @Override
-      public void onClose(Status status) {
+      public void onClose(Status status, Metadata.Trailers trailers) {
         Preconditions.checkState(!done, "Call already closed");
         if (status.isOk()) {
           buffer.add(this);
diff --git a/stub/src/main/java/com/google/net/stubby/stub/HeadersInterceptor.java b/stub/src/main/java/com/google/net/stubby/stub/HeadersInterceptor.java
new file mode 100644
index 0000000..6a02c51
--- /dev/null
+++ b/stub/src/main/java/com/google/net/stubby/stub/HeadersInterceptor.java
@@ -0,0 +1,37 @@
+package com.google.net.stubby.stub;
+
+import com.google.net.stubby.Call;
+import com.google.net.stubby.Metadata;
+import com.google.net.stubby.MethodDescriptor;
+import com.google.net.stubby.context.ForwardingChannel;
+
+/**
+ * Utility functions for binding and receiving headers
+ */
+public class HeadersInterceptor {
+
+  /**
+   * Attach a set of request headers to a stub.
+   * @param stub to bind the headers to.
+   * @param extraHeaders the headers to be passed by each call on the returned stub.
+   * @return an implementation of the stub with extraHeaders bound to each call.
+   */
+  @SuppressWarnings("unchecked")
+  public static <T extends AbstractStub> T intercept(
+      T stub,
+      final Metadata.Headers extraHeaders) {
+    return (T) stub.configureNewStub().setChannel(
+      new ForwardingChannel(stub.getChannel()) {
+        @Override
+        public <ReqT, RespT> Call<ReqT, RespT> newCall(MethodDescriptor<ReqT, RespT> method) {
+          return new ForwardingCall<ReqT, RespT>(delegate.newCall(method)) {
+            @Override
+            public void start(Listener<RespT> responseListener, Metadata.Headers headers) {
+              headers.merge(extraHeaders);
+              delegate.start(responseListener, headers);
+            }
+          };
+        }
+      }).build();
+  }
+}