Encode binary headers with Base64 on the wire, and requires all binary headers
have "-bin" suffix in their names.

Split Metadata.Marshaller into BinaryMarshaller and AsciiMarshaller.
-------------
Created by MOE: http://code.google.com/p/moe-java
MOE_MIGRATED_REVID=81306135
diff --git a/core/src/main/java/com/google/net/stubby/Metadata.java b/core/src/main/java/com/google/net/stubby/Metadata.java
index 5c69c19..4f32e18 100644
--- a/core/src/main/java/com/google/net/stubby/Metadata.java
+++ b/core/src/main/java/com/google/net/stubby/Metadata.java
@@ -1,12 +1,10 @@
 package com.google.net.stubby;
 
 import static com.google.common.base.Charsets.US_ASCII;
-import static com.google.common.base.Charsets.UTF_8;
 
 import com.google.common.base.Function;
 import com.google.common.base.Preconditions;
 import com.google.common.collect.Iterables;
-import com.google.common.collect.Iterators;
 import com.google.common.collect.LinkedListMultimap;
 import com.google.common.collect.ListMultimap;
 import com.google.common.collect.Lists;
@@ -29,6 +27,11 @@
 public abstract class Metadata {
 
   /**
+   * All binary headers should have this suffix in their names. Vice versa.
+   */
+  public static final String BINARY_HEADER_SUFFIX = "-bin";
+
+  /**
    * Interleave keys and values into a single iterator.
    */
   private static Iterator<String> fromMapEntries(Iterable<Map.Entry<String, String>> entries) {
@@ -60,65 +63,38 @@
   }
 
   /**
-   * Simple metadata marshaller that encodes strings as either UTF-8 or ASCII bytes.
+   * Simple metadata marshaller that encodes strings as is.
+   *
+   * <p>This should be used with ASCII strings that only contain printable characters and space.
+   * Otherwise the output may be considered invalid and discarded by the transport.
    */
-  public static final Marshaller<String> STRING_MARSHALLER =
-      new Marshaller<String>() {
+  public static final AsciiMarshaller<String> ASCII_STRING_MARSHALLER =
+      new AsciiMarshaller<String>() {
 
     @Override
-    public byte[] toBytes(String value) {
-      return value.getBytes(UTF_8);
-    }
-
-    @Override
-    public String toAscii(String value) {
+    public String toAsciiString(String value) {
       return value;
     }
 
     @Override
-    public String parseBytes(byte[] serialized) {
-      return new String(serialized, UTF_8);
-    }
-
-    @Override
-    public String parseAscii(String ascii) {
-      return ascii;
+    public String parseAsciiString(String serialized) {
+      return serialized;
     }
   };
 
   /**
-   * Simple metadata marshaller that encodes an integer as a signed decimal string or as big endian
-   * binary with four bytes.
+   * Simple metadata marshaller that encodes an integer as a signed decimal string.
    */
-  public static final Marshaller<Integer> INTEGER_MARSHALLER = new Marshaller<Integer>() {
-    @Override
-    public byte[] toBytes(Integer value) {
-      return new byte[] {
-        (byte) (value >>> 24),
-        (byte) (value >>> 16),
-        (byte) (value >>> 8),
-        (byte) (value >>> 0)};
-    }
+  public static final AsciiMarshaller<Integer> INTEGER_MARSHALLER = new AsciiMarshaller<Integer>() {
 
     @Override
-    public String toAscii(Integer value) {
+    public String toAsciiString(Integer value) {
       return value.toString();
     }
 
     @Override
-    public Integer parseBytes(byte[] serialized) {
-      if (serialized.length != 4) {
-        throw new IllegalArgumentException("Can only deserialize 4 bytes into an integer");
-      }
-      return (serialized[0] << 24)
-          |  (serialized[1] << 16)
-          |  (serialized[2] << 8)
-          |   serialized[3];
-    }
-
-    @Override
-    public Integer parseAscii(String ascii) {
-      return Integer.valueOf(ascii);
+    public Integer parseAsciiString(String serialized) {
+      return Integer.parseInt(serialized);
     }
   };
 
@@ -139,17 +115,6 @@
   }
 
   /**
-   * Constructor called by the transport layer when it receives ASCII metadata.
-   */
-  private Metadata(String... asciiValues) {
-    store = LinkedListMultimap.create();
-    for (int i = 0; i < asciiValues.length; i++) {
-      store.put(asciiValues[i], new MetadataEntry(asciiValues[++i]));
-    }
-    this.serializable = false;
-  }
-
-  /**
    * Constructor called by the application layer when it wants to send metadata.
    */
   private Metadata() {
@@ -227,7 +192,13 @@
   }
 
   /**
-   * Serialize all the metadata entries
+   * Serialize all the metadata entries.
+   *
+   * <p>It produces serialized names and values interleaved. result[i*2] are names, while
+   * result[i*2+1] are values.
+   *
+   * <p>Names are ASCII string bytes. If the name ends with "-bin", the value can be raw binary.
+   * Otherwise, the value must be printable ASCII characters or space.
    */
   public byte[][] serialize() {
     Preconditions.checkState(serializable, "Can't serialize raw metadata");
@@ -241,20 +212,6 @@
   }
 
   /**
-   * Serialize all the metadata entries
-   */
-  public String[] serializeAscii() {
-    Preconditions.checkState(serializable, "Can't serialize received metadata");
-    String[] serialized = new String[store.size() * 2];
-    int i = 0;
-    for (Map.Entry<String, MetadataEntry> entry : store.entries()) {
-      serialized[i++] = entry.getValue().key.name();
-      serialized[i++] = entry.getValue().getSerializedAscii();
-    }
-    return serialized;
-  }
-
-  /**
    * Perform a simple merge of two sets of metadata.
    * <p>
    * Note that we can't merge non-serializable metadata into serializable.
@@ -302,20 +259,6 @@
     }
 
     /**
-     * Called by the transport layer to create headers from their ASCII serialized values.
-     */
-    public Headers(String... asciiValues) {
-      super(asciiValues);
-    }
-
-    /**
-     * Called by the transport layer to create headers from their ASCII serialized values.
-     */
-    public Headers(Iterable<Map.Entry<String, String>> mapEntries) {
-      super(Iterators.toArray(fromMapEntries(mapEntries), String.class));
-    }
-
-    /**
      * Called by the application layer to construct headers prior to passing them to the
      * transport for serialization.
      */
@@ -378,20 +321,6 @@
     }
 
     /**
-     * Called by the transport layer to create trailers from their ASCII serialized values.
-     */
-    public Trailers(String... asciiValues) {
-      super(asciiValues);
-    }
-
-    /**
-     * Called by the transport layer to create headers from their ASCII serialized values.
-     */
-    public Trailers(Iterable<Map.Entry<String, String>> mapEntries) {
-      super(Iterators.toArray(fromMapEntries(mapEntries), String.class));
-    }
-
-    /**
      * Called by the application layer to construct trailers prior to passing them to the
      * transport for serialization.
      */
@@ -401,57 +330,75 @@
 
 
   /**
-   * Marshaller for metadata values.
+   * Marshaller for metadata values that are serialized into raw binary.
    */
-  public static interface Marshaller<T> {
+  public static interface BinaryMarshaller<T> {
     /**
      * Serialize a metadata value to bytes.
      * @param value to serialize
-     * @return serialized version of value, or null if value cannot be transmitted.
+     * @return serialized version of value
      */
     public byte[] toBytes(T value);
 
     /**
-     * Serialize a metadata value to an ASCII string
-     * @param value to serialize
-     * @return serialized ascii version of value, or null if value cannot be transmitted.
-     */
-    public String toAscii(T value);
-
-    /**
      * Parse a serialized metadata value from bytes.
      * @param serialized value of metadata to parse
      * @return a parsed instance of type T
      */
     public T parseBytes(byte[] serialized);
+  }
+
+  /**
+   * Marshaller for metadata values that are serialized into ASCII strings that contain only
+   * printable characters and space.
+   */
+  public static interface AsciiMarshaller<T> {
+    /**
+     * Serialize a metadata value to a ASCII string that contains only printable characters and
+     * space.
+     *
+     * @param value to serialize
+     * @return serialized version of value, or null if value cannot be transmitted.
+     */
+    public String toAsciiString(T value);
 
     /**
-     * Parse a serialized metadata value from an ascii string.
-     * @param ascii string value of metadata to parse
+     * Parse a serialized metadata value from an ASCII string.
+     * @param serialized value of metadata to parse
      * @return a parsed instance of type T
      */
-    public T parseAscii(String ascii);
+    public T parseAsciiString(String serialized);
   }
 
   /**
    * Key for metadata entries. Allows for parsing and serialization of metadata.
    */
-  public static class Key<T> {
-    public static <T> Key<T> of(String name, Marshaller<T> marshaller) {
-      return new Key<T>(name, marshaller);
+  public abstract static class Key<T> {
+
+    /**
+     * Creates a key for a binary header.
+     *
+     * @param name must end with {@link BINARY_HEADER_SUFFIX}
+     */
+    public static <T> Key<T> of(String name, BinaryMarshaller<T> marshaller) {
+      return new BinaryKey<T>(name, marshaller);
+    }
+
+    /**
+     * Creates a key for a ASCII header.
+     *
+     * @param name must not end with {@link BINARY_HEADER_SUFFIX}
+     */
+    public static <T> Key<T> of(String name, AsciiMarshaller<T> marshaller) {
+      return new AsciiKey<T>(name, marshaller);
     }
 
     private final String name;
     private final byte[] asciiName;
-    private final Marshaller<T> marshaller;
 
-    /**
-     * Keys have a name and a marshaller used for serialization.
-     */
-    private Key(String name, Marshaller<T> marshaller) {
+    private Key(String name) {
       this.name = Preconditions.checkNotNull(name, "name").toLowerCase().intern();
       this.asciiName = this.name.getBytes(US_ASCII);
-      this.marshaller = Preconditions.checkNotNull(marshaller);
     }
 
     public String name() {
@@ -463,10 +410,6 @@
       return asciiName;
     }
 
-    public Marshaller<T> getMarshaller() {
-      return marshaller;
-    }
-
     @Override
     public boolean equals(Object o) {
       if (this == o) return true;
@@ -484,6 +427,68 @@
     public String toString() {
       return "Key{name='" + name + "'}";
     }
+
+    /**
+     * Serialize a metadata value to bytes.
+     * @param value to serialize
+     * @return serialized version of value
+     */
+    abstract byte[] toBytes(T value);
+
+    /**
+     * Parse a serialized metadata value from bytes.
+     * @param serialized value of metadata to parse
+     * @return a parsed instance of type T
+     */
+    abstract T parseBytes(byte[] serialized);
+  }
+
+  private static class BinaryKey<T> extends Key<T> {
+    private final BinaryMarshaller<T> marshaller;
+
+    /**
+     * Keys have a name and a binary marshaller used for serialization.
+     */
+    private BinaryKey(String name, BinaryMarshaller<T> marshaller) {
+      super(name);
+      Preconditions.checkArgument(name.endsWith(BINARY_HEADER_SUFFIX),
+          "Binary header is named " + name + ". It must end with " + BINARY_HEADER_SUFFIX);
+      this.marshaller = Preconditions.checkNotNull(marshaller);
+    }
+
+    @Override
+    byte[] toBytes(T value) {
+      return marshaller.toBytes(value);
+    }
+
+    @Override
+    T parseBytes(byte[] serialized) {
+      return marshaller.parseBytes(serialized);
+    }
+  }
+
+  private static class AsciiKey<T> extends Key<T> {
+    private final AsciiMarshaller<T> marshaller;
+
+    /**
+     * Keys have a name and an ASCII marshaller used for serialization.
+     */
+    private AsciiKey(String name, AsciiMarshaller<T> marshaller) {
+      super(name);
+      Preconditions.checkArgument(!name.endsWith(BINARY_HEADER_SUFFIX),
+          "ASCII header is named " + name + ". It must not end with " + BINARY_HEADER_SUFFIX);
+      this.marshaller = Preconditions.checkNotNull(marshaller);
+    }
+
+    @Override
+    byte[] toBytes(T value) {
+      return marshaller.toAsciiString(value).getBytes(US_ASCII);
+    }
+
+    @Override
+    T parseBytes(byte[] serialized) {
+      return marshaller.parseAsciiString(new String(serialized, US_ASCII));
+    }
   }
 
   private static class MetadataEntry {
@@ -492,7 +497,6 @@
     @SuppressWarnings("rawtypes")
     Key key;
     byte[] serializedBinary;
-    String serializedAscii;
 
     /**
      * Constructor used when application layer adds a parsed value.
@@ -510,29 +514,20 @@
       this.serializedBinary = serialized;
     }
 
-    /**
-     * Constructor used when reading a value from the transport.
-     */
-    private MetadataEntry(String serializedAscii) {
-      this.serializedAscii = Preconditions.checkNotNull(serializedAscii);
-    }
-
     @SuppressWarnings("unchecked")
     public <T> T getParsed(Key<T> key) {
       T value = (T) parsed;
       if (value != null) {
         if (this.key != key) {
           // Keys don't match so serialize using the old key
-          serializedBinary = this.key.getMarshaller().toBytes(value);
+          serializedBinary = this.key.toBytes(value);
         } else {
           return value;
         }
       }
       this.key = key;
       if (serializedBinary != null) {
-        value = key.getMarshaller().parseBytes(serializedBinary);
-      } else if (serializedAscii != null) {
-        value = key.getMarshaller().parseAscii(serializedAscii);
+        value = key.parseBytes(serializedBinary);
       }
       parsed = value;
       return value;
@@ -542,16 +537,7 @@
     public byte[] getSerialized() {
       return serializedBinary =
           serializedBinary == null
-              ? key.getMarshaller().toBytes(parsed) :
-              serializedBinary;
-    }
-
-    @SuppressWarnings("unchecked")
-    public String getSerializedAscii() {
-      return serializedAscii =
-          serializedAscii == null
-              ? key.getMarshaller().toAscii(parsed) :
-              serializedAscii;
+              ? key.toBytes(parsed) : serializedBinary;
     }
   }
 }
diff --git a/core/src/main/java/com/google/net/stubby/Status.java b/core/src/main/java/com/google/net/stubby/Status.java
index fe5318e..4209938 100644
--- a/core/src/main/java/com/google/net/stubby/Status.java
+++ b/core/src/main/java/com/google/net/stubby/Status.java
@@ -1,7 +1,5 @@
 package com.google.net.stubby;
 
-import static com.google.common.base.Charsets.US_ASCII;
-
 import com.google.common.base.MoreObjects;
 import com.google.common.base.Objects;
 import com.google.common.base.Preconditions;
@@ -210,7 +208,7 @@
    * Key to bind status message to trailers.
    */
   public static final Metadata.Key<String> MESSAGE_KEY
-      = Metadata.Key.of("grpc-message", Metadata.STRING_MARSHALLER);
+      = Metadata.Key.of("grpc-message", Metadata.ASCII_STRING_MARSHALLER);
 
   /**
    * Extract an error {@link Status} from the causal chain of a {@link Throwable}.
@@ -354,25 +352,15 @@
         .toString();
   }
 
-  private static class StatusCodeMarshaller implements Metadata.Marshaller<Status> {
+  private static class StatusCodeMarshaller implements Metadata.AsciiMarshaller<Status> {
     @Override
-    public byte[] toBytes(Status status) {
-      return toAscii(status).getBytes(US_ASCII);
-    }
-
-    @Override
-    public String toAscii(Status status) {
+    public String toAsciiString(Status status) {
       return status.getCode().valueAscii();
     }
 
     @Override
-    public Status parseBytes(byte[] serialized) {
-      return parseAscii(new String(serialized, US_ASCII));
-    }
-
-    @Override
-    public Status parseAscii(String ascii) {
-      return fromCodeValue(Integer.valueOf(ascii));
+    public Status parseAsciiString(String serialized) {
+      return fromCodeValue(Integer.valueOf(serialized));
     }
   }
 }
diff --git a/core/src/main/java/com/google/net/stubby/proto/ProtoUtils.java b/core/src/main/java/com/google/net/stubby/proto/ProtoUtils.java
index ae19766..892c0f4 100644
--- a/core/src/main/java/com/google/net/stubby/proto/ProtoUtils.java
+++ b/core/src/main/java/com/google/net/stubby/proto/ProtoUtils.java
@@ -1,6 +1,5 @@
 package com.google.net.stubby.proto;
 
-import com.google.common.io.BaseEncoding;
 import com.google.net.stubby.Marshaller;
 import com.google.net.stubby.Metadata;
 import com.google.net.stubby.Status;
@@ -39,19 +38,15 @@
    * Produce a metadata key for a generated protobuf type.
    */
   public static <T extends GeneratedMessage> Metadata.Key<T> keyForProto(final T instance) {
-    return Metadata.Key.of(instance.getDescriptorForType().getFullName(),
-        new  Metadata.Marshaller<T>() {
+    return Metadata.Key.of(
+        instance.getDescriptorForType().getFullName() + Metadata.BINARY_HEADER_SUFFIX,
+        new Metadata.BinaryMarshaller<T>() {
           @Override
           public byte[] toBytes(T value) {
             return value.toByteArray();
           }
 
           @Override
-          public String toAscii(T value) {
-            return BaseEncoding.base64().encode(value.toByteArray());
-          }
-
-          @Override
           @SuppressWarnings("unchecked")
           public T parseBytes(byte[] serialized) {
             try {
@@ -60,11 +55,6 @@
               throw new IllegalArgumentException(ipbe);
             }
           }
-
-          @Override
-          public T parseAscii(String ascii) {
-            return parseBytes(BaseEncoding.base64().decode(ascii));
-          }
         });
   }
 
diff --git a/core/src/main/java/com/google/net/stubby/transport/Http2ClientStream.java b/core/src/main/java/com/google/net/stubby/transport/Http2ClientStream.java
index f4b9c1a..38bf876 100644
--- a/core/src/main/java/com/google/net/stubby/transport/Http2ClientStream.java
+++ b/core/src/main/java/com/google/net/stubby/transport/Http2ClientStream.java
@@ -19,26 +19,16 @@
   /**
    * Metadata marshaller for HTTP status lines.
    */
-  private static final Metadata.Marshaller<Integer> HTTP_STATUS_LINE_MARSHALLER =
-      new Metadata.Marshaller<Integer>() {
+  private static final Metadata.AsciiMarshaller<Integer> HTTP_STATUS_LINE_MARSHALLER =
+      new Metadata.AsciiMarshaller<Integer>() {
         @Override
-        public byte[] toBytes(Integer value) {
-          return value.toString().getBytes(Charsets.US_ASCII);
-        }
-
-        @Override
-        public String toAscii(Integer value) {
+        public String toAsciiString(Integer value) {
           return value.toString();
         }
 
         @Override
-        public Integer parseBytes(byte[] serialized) {
-          return parseAscii(new String(serialized, Charsets.US_ASCII));
-        }
-
-        @Override
-        public Integer parseAscii(String ascii) {
-          return Integer.parseInt(ascii.split(" ", 2)[0]);
+        public Integer parseAsciiString(String serialized) {
+          return Integer.parseInt(serialized.split(" ", 2)[0]);
         }
       };
 
diff --git a/core/src/main/java/com/google/net/stubby/transport/HttpUtil.java b/core/src/main/java/com/google/net/stubby/transport/HttpUtil.java
index cced608..49dc6fc 100644
--- a/core/src/main/java/com/google/net/stubby/transport/HttpUtil.java
+++ b/core/src/main/java/com/google/net/stubby/transport/HttpUtil.java
@@ -14,7 +14,7 @@
    * spec.
    */
   public static final Metadata.Key<String> CONTENT_TYPE =
-      Metadata.Key.of("content-type", Metadata.STRING_MARSHALLER);
+      Metadata.Key.of("content-type", Metadata.ASCII_STRING_MARSHALLER);
 
   /**
    * Content-Type used for GRPC-over-HTTP/2.
@@ -29,7 +29,8 @@
   /**
    * The TE header name. Defined here since it is not explicitly defined by the HTTP/2 spec.
    */
-  public static final Metadata.Key<String> TE = Metadata.Key.of("te", Metadata.STRING_MARSHALLER);
+  public static final Metadata.Key<String> TE = Metadata.Key.of("te",
+      Metadata.ASCII_STRING_MARSHALLER);
 
   /**
    * The TE (transport encoding) header for requests over HTTP/2
diff --git a/core/src/main/java/com/google/net/stubby/transport/TransportFrameUtil.java b/core/src/main/java/com/google/net/stubby/transport/TransportFrameUtil.java
index 021bee8..a11f6bf 100644
--- a/core/src/main/java/com/google/net/stubby/transport/TransportFrameUtil.java
+++ b/core/src/main/java/com/google/net/stubby/transport/TransportFrameUtil.java
@@ -1,5 +1,14 @@
 package com.google.net.stubby.transport;
 
+import static com.google.common.base.Charsets.US_ASCII;
+
+import com.google.common.io.BaseEncoding;
+import com.google.net.stubby.Metadata;
+
+import java.util.ArrayList;
+import java.util.Arrays;
+import java.util.logging.Logger;
+
 import javax.annotation.Nullable;
 
 /**
@@ -10,6 +19,12 @@
  */
 public final class TransportFrameUtil {
 
+  private static final Logger logger = Logger.getLogger(TransportFrameUtil.class.getName());
+
+  private static final byte[] binaryHeaderSuffixBytes =
+      Metadata.BINARY_HEADER_SUFFIX.getBytes(US_ASCII);
+
+
   // Compression modes (lowest order 3 bits of frame flags)
   public static final byte NO_COMPRESS_FLAG = 0x0;
   public static final byte FLATE_FLAG = 0x1;
@@ -57,5 +72,88 @@
     return path;
   }
 
+  /**
+   * Transform the given headers to a format where only spec-compliant ASCII characters are allowed.
+   * Binary header values are encoded by Base64 in the result.
+   *
+   * @return the interleaved keys and values.
+   */
+  public static byte[][] toHttp2Headers(Metadata headers) {
+    byte[][] serializedHeaders = headers.serialize();
+    ArrayList<byte[]> result = new ArrayList<byte[]>();
+    for (int i = 0; i < serializedHeaders.length; i += 2) {
+      byte[] key = serializedHeaders[i];
+      byte[] value = serializedHeaders[i + 1];
+      if (endsWith(key, binaryHeaderSuffixBytes)) {
+        // Binary header.
+        result.add(key);
+        result.add(BaseEncoding.base64().encode(value).getBytes(US_ASCII));
+      } else {
+        // Non-binary header.
+        // Filter out headers that contain non-spec-compliant ASCII characters.
+        // TODO(user): only do such check in development mode since it's expensive
+        if (isSpecCompliantAscii(value)) {
+          result.add(key);
+          result.add(value);
+        } else {
+          String keyString = new String(key, US_ASCII);
+          logger.warning("Metadata key=" + keyString + ", value=" + Arrays.toString(value)
+              + " contains invalid ASCII characters");
+        }
+      }
+    }
+    return result.toArray(new byte[result.size()][]);
+  }
+
+  /**
+   * Transform HTTP/2-compliant headers to the raw serialized format which can be deserialized by
+   * metadata marshallers. It decodes the Base64-encoded binary headers.
+   */
+  public static byte[][] toRawSerializedHeaders(byte[][] http2Headers) {
+    byte[][] result = new byte[http2Headers.length][];
+    for (int i = 0; i < http2Headers.length; i += 2) {
+      byte[] key = http2Headers[i];
+      byte[] value = http2Headers[i + 1];
+      result[i] = key;
+      if (endsWith(key, binaryHeaderSuffixBytes)) {
+        // Binary header
+        result[i + 1] = BaseEncoding.base64().decode(new String(value, US_ASCII));
+      } else {
+        // Non-binary header
+        result[i + 1] = value;
+      }
+    }
+    return result;
+  }
+
+  /**
+   * Returns true if <b>subject</b> ends with <b>suffix</b>.
+   */
+  private static boolean endsWith(byte[] subject, byte[] suffix) {
+    int start = subject.length - suffix.length;
+    if (start < 0) {
+      return false;
+    }
+    for (int i = start; i < subject.length; i++) {
+      if (subject[i] != suffix[i - start]) {
+        return false;
+      }
+    }
+    return true;
+  }
+
+  /**
+   * Returns true if <b>subject</b> contains only bytes that are spec-compliant ASCII characters and
+   * space.
+   */
+  private static boolean isSpecCompliantAscii(byte[] subject) {
+    for (byte b : subject) {
+      if (b < 32 || b > 126) {
+        return false;
+      }
+    }
+    return true;
+  }
+
   private TransportFrameUtil() {}
 }
diff --git a/core/src/test/java/com/google/net/stubby/ClientInterceptorsTest.java b/core/src/test/java/com/google/net/stubby/ClientInterceptorsTest.java
index 990f365..63f3442 100644
--- a/core/src/test/java/com/google/net/stubby/ClientInterceptorsTest.java
+++ b/core/src/test/java/com/google/net/stubby/ClientInterceptorsTest.java
@@ -128,7 +128,7 @@
 
   @Test
   public void addOutboundHeaders() {
-    final Metadata.Key<String> credKey = Metadata.Key.of("Cred", Metadata.STRING_MARSHALLER);
+    final Metadata.Key<String> credKey = Metadata.Key.of("Cred", Metadata.ASCII_STRING_MARSHALLER);
     ClientInterceptor interceptor = new ClientInterceptor() {
       @Override
       public <ReqT, RespT> Call<ReqT, RespT> interceptCall(MethodDescriptor<ReqT, RespT> method,
diff --git a/core/src/test/java/com/google/net/stubby/MetadataTest.java b/core/src/test/java/com/google/net/stubby/MetadataTest.java
index 158f03c..566cd26 100644
--- a/core/src/test/java/com/google/net/stubby/MetadataTest.java
+++ b/core/src/test/java/com/google/net/stubby/MetadataTest.java
@@ -7,8 +7,6 @@
 import static org.junit.Assert.assertSame;
 import static org.junit.Assert.assertTrue;
 
-import com.google.common.primitives.Bytes;
-
 import org.junit.Test;
 import org.junit.runner.RunWith;
 import org.junit.runners.JUnit4;
@@ -22,31 +20,22 @@
 @RunWith(JUnit4.class)
 public class MetadataTest {
 
-  private static final Metadata.Marshaller<Fish> FISH_MARSHALLER = new Metadata.Marshaller<Fish>() {
+  private static final Metadata.BinaryMarshaller<Fish> FISH_MARSHALLER =
+      new Metadata.BinaryMarshaller<Fish>() {
     @Override
     public byte[] toBytes(Fish fish) {
       return fish.name.getBytes(UTF_8);
     }
 
     @Override
-    public String toAscii(Fish value) {
-      return value.name;
-    }
-
-    @Override
     public Fish parseBytes(byte[] serialized) {
       return new Fish(new String(serialized, UTF_8));
     }
-
-    @Override
-    public Fish parseAscii(String ascii) {
-      return new Fish(ascii);
-    }
   };
 
   private static final String LANCE = "lance";
   private static final byte[] LANCE_BYTES = LANCE.getBytes(StandardCharsets.US_ASCII);
-  private static final Metadata.Key<Fish> KEY = Metadata.Key.of("test", FISH_MARSHALLER);
+  private static final Metadata.Key<Fish> KEY = Metadata.Key.of("test-bin", FISH_MARSHALLER);
 
   @Test
   public void testWriteParsed() {
@@ -61,7 +50,7 @@
     assertFalse(fishes.hasNext());
     byte[][] serialized = metadata.serialize();
     assertEquals(2, serialized.length);
-    assertEquals(new String(serialized[0], StandardCharsets.US_ASCII), "test");
+    assertEquals(new String(serialized[0], StandardCharsets.US_ASCII), "test-bin");
     assertArrayEquals(LANCE_BYTES, serialized[1]);
     assertSame(lance, metadata.get(KEY));
     // Serialized instance should be cached too
@@ -112,14 +101,8 @@
   }
 
   @Test
-  public void integerMarshallerBytesIsBigEndian() {
-    assertEquals(Bytes.asList(new byte[] {0x12, 0x34, 0x56, 0x78}),
-          Bytes.asList(Metadata.INTEGER_MARSHALLER.toBytes(0x12345678)));
-  }
-
-  @Test
-  public void integerMarshallerAsciiIsDecimal() {
-    assertEquals("12345678", Metadata.INTEGER_MARSHALLER.toAscii(12345678));
+  public void integerMarshallerIsDecimal() {
+    assertEquals("12345678", Metadata.INTEGER_MARSHALLER.toAsciiString(12345678));
   }
 
   @Test
@@ -132,8 +115,8 @@
   }
 
   private void roundTripInteger(Integer i) {
-    assertEquals(i, Metadata.INTEGER_MARSHALLER.parseBytes(Metadata.INTEGER_MARSHALLER.toBytes(i)));
-    assertEquals(i, Metadata.INTEGER_MARSHALLER.parseAscii(Metadata.INTEGER_MARSHALLER.toAscii(i)));
+    assertEquals(i, Metadata.INTEGER_MARSHALLER.parseAsciiString(
+        Metadata.INTEGER_MARSHALLER.toAsciiString(i)));
   }
 
   private static class Fish {
diff --git a/core/src/test/java/com/google/net/stubby/transport/TransportFrameUtilTest.java b/core/src/test/java/com/google/net/stubby/transport/TransportFrameUtilTest.java
new file mode 100644
index 0000000..56a1f7e
--- /dev/null
+++ b/core/src/test/java/com/google/net/stubby/transport/TransportFrameUtilTest.java
@@ -0,0 +1,102 @@
+package com.google.net.stubby.transport;
+
+import static com.google.common.base.Charsets.US_ASCII;
+import static com.google.common.base.Charsets.UTF_8;
+import static com.google.net.stubby.Metadata.ASCII_STRING_MARSHALLER;
+import static org.junit.Assert.assertArrayEquals;
+import static org.junit.Assert.assertEquals;
+import static org.junit.Assert.assertNull;
+import static org.junit.Assert.fail;
+
+import com.google.common.io.BaseEncoding;
+import com.google.net.stubby.Metadata.BinaryMarshaller;
+import com.google.net.stubby.Metadata.Headers;
+import com.google.net.stubby.Metadata.Key;
+
+import org.junit.Test;
+import org.junit.runner.RunWith;
+import org.junit.runners.JUnit4;
+
+import java.util.Arrays;
+
+/** Unit tests for {@link TransportFrameUtil}. */
+@RunWith(JUnit4.class)
+public class TransportFrameUtilTest {
+
+  private static final String NONCOMPLIANT_ASCII_STRING = new String(new char[]{1, 2, 3});
+
+  private static final String COMPLIANT_ASCII_STRING = "Kyle";
+
+  private static final BinaryMarshaller<String> UTF8_STRING_MARSHALLER =
+      new BinaryMarshaller<String>() {
+    @Override
+    public byte[] toBytes(String value) {
+      return value.getBytes(UTF_8);
+    }
+
+    @Override
+    public String parseBytes(byte[] serialized) {
+      return new String(serialized, UTF_8);
+    }
+  };
+
+  private static final Key<String> PLAIN_STRING = Key.of("plainstring", ASCII_STRING_MARSHALLER);
+  private static final Key<String> BINARY_STRING = Key.of("string-bin", UTF8_STRING_MARSHALLER);
+  private static final Key<String> BINARY_STRING_WITHOUT_SUFFIX =
+      Key.of("string", ASCII_STRING_MARSHALLER);
+
+  @Test
+  public void testToHttp2Headers() {
+    Headers headers = new Headers();
+    headers.put(PLAIN_STRING, COMPLIANT_ASCII_STRING);
+    headers.put(BINARY_STRING, NONCOMPLIANT_ASCII_STRING);
+    headers.put(BINARY_STRING_WITHOUT_SUFFIX, NONCOMPLIANT_ASCII_STRING);
+    byte[][] http2Headers = TransportFrameUtil.toHttp2Headers(headers);
+    // BINARY_STRING_WITHOUT_SUFFIX should not get in because it contains non-compliant ASCII
+    // characters but doesn't have "-bin" in the name.
+    byte[][] answer = new byte[][] {
+        "plainstring".getBytes(US_ASCII), COMPLIANT_ASCII_STRING.getBytes(US_ASCII),
+        "string-bin".getBytes(US_ASCII),
+        base64Encode(NONCOMPLIANT_ASCII_STRING.getBytes(US_ASCII))};
+    assertEquals(answer.length, http2Headers.length);
+    // http2Headers may re-sort the keys, so we cannot compare it with the answer side-by-side.
+    for (int i = 0; i < answer.length; i += 2) {
+      assertContains(http2Headers, answer[i], answer[i + 1]);
+    }
+  }
+
+  @Test(expected = IllegalArgumentException.class)
+  public void binaryHeaderWithoutSuffix() {
+    Key.of("plainstring", UTF8_STRING_MARSHALLER);
+  }
+
+  @Test
+  public void testToAndFromHttp2Headers() {
+    Headers headers = new Headers();
+    headers.put(PLAIN_STRING, COMPLIANT_ASCII_STRING);
+    headers.put(BINARY_STRING, NONCOMPLIANT_ASCII_STRING);
+    headers.put(BINARY_STRING_WITHOUT_SUFFIX, NONCOMPLIANT_ASCII_STRING);
+    byte[][] http2Headers = TransportFrameUtil.toHttp2Headers(headers);
+    byte[][] rawSerialized = TransportFrameUtil.toRawSerializedHeaders(http2Headers);
+    Headers recoveredHeaders = new Headers(rawSerialized);
+    assertEquals(COMPLIANT_ASCII_STRING, recoveredHeaders.get(PLAIN_STRING));
+    assertEquals(NONCOMPLIANT_ASCII_STRING, recoveredHeaders.get(BINARY_STRING));
+    assertNull(recoveredHeaders.get(BINARY_STRING_WITHOUT_SUFFIX));
+  }
+
+  private static void assertContains(byte[][] headers, byte[] key, byte[] value) {
+    String keyString = new String(key, US_ASCII);
+    for (int i = 0; i < headers.length; i += 2) {
+      if (Arrays.equals(headers[i], key)) {
+        assertArrayEquals("value for key=" + keyString, value, headers[i + 1]);
+        return;
+      }
+    }
+    fail("key=" + keyString + " not found");
+  }
+
+  private static byte[] base64Encode(byte[] input) {
+    return BaseEncoding.base64().encode(input).getBytes(US_ASCII);
+  }
+
+}