Phase 1 of converting to new Headers mechanism for side-channel.
Introduces Header and uses it for propagating text-only header values over existing transports
Leaves Context & wire format otherwise unchanged
Next phases
- Remove context from interfaces
- Switch the wire format (ESF needs to be done in near lock-step)
Interface changes are relatively light
Headers class is functional but not optimal
All serialization is done as string until transports expose interface for binary headers
-------------
Created by MOE: http://code.google.com/p/moe-java
MOE_MIGRATED_REVID=75050265
diff --git a/auth/src/main/java/com/google/net/stubby/auth/OAuth2ChannelInterceptor.java b/auth/src/main/java/com/google/net/stubby/auth/OAuth2ChannelInterceptor.java
index 3df0f04..1459fd7 100644
--- a/auth/src/main/java/com/google/net/stubby/auth/OAuth2ChannelInterceptor.java
+++ b/auth/src/main/java/com/google/net/stubby/auth/OAuth2ChannelInterceptor.java
@@ -1,18 +1,21 @@
package com.google.net.stubby.auth;
import com.google.api.client.auth.oauth2.Credential;
-import com.google.common.base.Preconditions;
import com.google.net.stubby.Call;
import com.google.net.stubby.Channel;
+import com.google.net.stubby.Metadata;
import com.google.net.stubby.MethodDescriptor;
+import com.google.net.stubby.context.ForwardingChannel;
import java.util.concurrent.Executor;
import javax.inject.Provider;
/** Channel wrapper that authenticates all calls with OAuth2. */
-public class OAuth2ChannelInterceptor implements Channel {
- private final Channel delegate;
+public class OAuth2ChannelInterceptor extends ForwardingChannel {
+ private static final Metadata.Key<String> AUTHORIZATION =
+ new Metadata.Key<String>("Authorization", Metadata.STRING_MARSHALLER);
+
private final OAuth2AccessTokenProvider accessTokenProvider;
private final Provider<String> authorizationHeaderProvider
= new Provider<String>() {
@@ -23,7 +26,7 @@
};
public OAuth2ChannelInterceptor(Channel delegate, Credential credential, Executor executor) {
- this.delegate = Preconditions.checkNotNull(delegate);
+ super(delegate);
this.accessTokenProvider = new OAuth2AccessTokenProvider(credential, executor);
}
@@ -31,6 +34,12 @@
public <ReqT, RespT> Call<ReqT, RespT> newCall(MethodDescriptor<ReqT, RespT> method) {
// TODO(user): If the call fails for Auth reasons, this does not properly propagate info that
// would be in WWW-Authenticate, because it does not yet have access to the header.
- return delegate.newCall(method.withHeader("Authorization", authorizationHeaderProvider));
+ return new ForwardingCall<ReqT, RespT>(delegate.newCall(method)) {
+ @Override
+ public void start(Listener<RespT> responseListener, Metadata.Headers headers) {
+ headers.put(AUTHORIZATION, authorizationHeaderProvider.get());
+ super.start(responseListener, headers);
+ }
+ };
}
}
diff --git a/core/src/main/java/com/google/net/stubby/Call.java b/core/src/main/java/com/google/net/stubby/Call.java
index 554049b..6be24a8 100644
--- a/core/src/main/java/com/google/net/stubby/Call.java
+++ b/core/src/main/java/com/google/net/stubby/Call.java
@@ -15,9 +15,9 @@
*
* <p>{@link #start} is required to be the first of any methods called.
*
- * <p>Any contexts must be sent before any payloads, which must be sent before half closing.
+ * <p>Any headers must be sent before any payloads, which must be sent before half closing.
*
- * <p>No generic method for determining message receipt or providing acknowlegement is provided.
+ * <p>No generic method for determining message receipt or providing acknowledgement is provided.
* Applications are expected to utilize normal payload messages for such signals, as a response
* natually acknowledges its request.
*
@@ -27,13 +27,22 @@
/**
* Callbacks for consuming incoming RPC messages.
*
- * <p>Any contexts are guaranteed to arrive before any payloads, which are guaranteed before
+ * <p>Response headers are guaranteed to arrive before any payloads, which are guaranteed
+ * to arrive before close. An additional block of headers called 'trailers' can be delivered with
* close.
*
* <p>Implementations are free to block for extended periods of time. Implementations are not
* required to be thread-safe.
*/
public abstract static class Listener<T> {
+
+ /**
+ * The response headers have been received. Headers always precede payloads.
+ * This method is always called, if no headers were received then an empty {@link Metadata}
+ * is passed.
+ */
+ public abstract ListenableFuture<Void> onHeaders(Metadata.Headers headers);
+
/**
* A response context has been received. Any context messages will precede payload messages.
*
@@ -42,6 +51,7 @@
* method.
*/
@Nullable
+ @Deprecated
public abstract ListenableFuture<Void> onContext(String name, InputStream value);
/**
@@ -52,33 +62,24 @@
public abstract ListenableFuture<Void> onPayload(T payload);
/**
- * The Call has been closed. No further sending or receiving will occur. If {@code status} is
- * not equal to {@link Status#OK}, then the call failed.
+ * The Call has been closed. No further sending or receiving can occur. If {@code status} is
+ * not equal to {@link Status#OK}, then the call failed. An additional block of headers may be
+ * received at the end of the call from the server. An empty {@link Metadata} object is passed
+ * if no trailers are received.
*/
- public abstract void onClose(Status status);
+ public abstract void onClose(Status status, Metadata.Trailers trailers);
}
/**
* Start a call, using {@code responseListener} for processing response messages.
*
* @param responseListener receives response messages
+ * @param headers which can contain extra information like authentication.
* @throws IllegalStateException if call is already started
*/
- public abstract void start(Listener<ResponseT> responseListener);
+ // TODO(user): Might be better to put into Channel#newCall, might reduce decoration burden
+ public abstract void start(Listener<ResponseT> responseListener, Metadata.Headers headers);
- /**
- * Prevent any further processing for this Call. No further messages may be sent or will be
- * received. The server is informed of cancellations, but may not stop processing the call.
- * Cancellation is permitted even if previously {@code cancel()}ed or {@link #halfClose}d.
- */
- public abstract void cancel();
-
- /**
- * Close call for message sending. Incoming messages are unaffected.
- *
- * @throws IllegalStateException if call is already {@code halfClose()}d or {@link #cancel}ed
- */
- public abstract void halfClose();
/**
* Send a context message. Context messages are intended for side-channel information like
@@ -89,6 +90,7 @@
* @throws IllegalStateException if call is {@link #halfClose}d or explicitly {@link #cancel}ed,
* or after {@link #sendPayload}
*/
+ @Deprecated
public void sendContext(String name, InputStream value) {
sendContext(name, value, null);
}
@@ -108,8 +110,23 @@
* @throws IllegalStateException if call is {@link #halfClose}d or explicitly {@link #cancel}ed,
* or after {@link #sendPayload}
*/
+ @Deprecated
public abstract void sendContext(String name, InputStream value,
- @Nullable SettableFuture<Void> accepted);
+ @Nullable SettableFuture<Void> accepted);
+
+ /**
+ * Prevent any further processing for this Call. No further messages may be sent or will be
+ * received. The server is informed of cancellations, but may not stop processing the call.
+ * Cancellation is permitted even if previously {@code cancel()}ed or {@link #halfClose}d.
+ */
+ public abstract void cancel();
+
+ /**
+ * Close call for message sending. Incoming messages are unaffected.
+ *
+ * @throws IllegalStateException if call is already {@code halfClose()}d or {@link #cancel}ed
+ */
+ public abstract void halfClose();
/**
* Send a payload message. Payload messages are the primary form of communication associated with
diff --git a/core/src/main/java/com/google/net/stubby/ChannelImpl.java b/core/src/main/java/com/google/net/stubby/ChannelImpl.java
index 9717495..41f0b22 100644
--- a/core/src/main/java/com/google/net/stubby/ChannelImpl.java
+++ b/core/src/main/java/com/google/net/stubby/ChannelImpl.java
@@ -11,8 +11,8 @@
import com.google.net.stubby.newtransport.ClientTransportFactory;
import com.google.net.stubby.newtransport.StreamListener;
-import java.io.InputStream;
import java.io.IOException;
+import java.io.InputStream;
import java.util.ArrayList;
import java.util.Collection;
import java.util.Collections;
@@ -136,9 +136,12 @@
}
@Override
- public void start(Listener<RespT> observer) {
+ public void start(Listener<RespT> observer, Metadata.Headers headers) {
Preconditions.checkState(stream == null, "Already started");
- stream = obtainActiveTransport().newStream(method, new StreamListenerImpl(observer));
+ headers.setPath(method.getName());
+ headers.setAuthority("fixme");
+ stream = obtainActiveTransport().newStream(method, headers,
+ new StreamListenerImpl(observer));
}
@Override
@@ -239,6 +242,16 @@
}
@Override
+ public ListenableFuture<Void> headersRead(final Metadata.Headers headers) {
+ return dispatchCallable(new Callable<ListenableFuture<Void>>() {
+ @Override
+ public ListenableFuture<Void> call() throws Exception {
+ return observer.onHeaders(headers);
+ }
+ });
+ }
+
+ @Override
public ListenableFuture<Void> contextRead(final String name, final InputStream value,
final int length) {
return dispatchCallable(new Callable<ListenableFuture<Void>>() {
@@ -260,7 +273,7 @@
}
@Override
- public void closed(final Status status) {
+ public void closed(final Status status, final Metadata.Trailers trailers) {
for (SettableFuture<Void> future : inProcessFutures) {
future.cancel(false);
}
@@ -268,7 +281,7 @@
callExecutor.execute(new Runnable() {
@Override
public void run() {
- observer.onClose(status);
+ observer.onClose(status, trailers);
}
});
}
diff --git a/core/src/main/java/com/google/net/stubby/Metadata.java b/core/src/main/java/com/google/net/stubby/Metadata.java
new file mode 100644
index 0000000..ca83e0c
--- /dev/null
+++ b/core/src/main/java/com/google/net/stubby/Metadata.java
@@ -0,0 +1,472 @@
+package com.google.net.stubby;
+
+import static java.nio.charset.StandardCharsets.UTF_8;
+
+import com.google.common.annotations.VisibleForTesting;
+import com.google.common.base.Function;
+import com.google.common.base.Preconditions;
+import com.google.common.collect.Iterables;
+import com.google.common.collect.Iterators;
+import com.google.common.collect.LinkedListMultimap;
+import com.google.common.collect.ListMultimap;
+import com.google.common.collect.Lists;
+
+import java.nio.charset.StandardCharsets;
+import java.util.Iterator;
+import java.util.List;
+import java.util.Map;
+import java.util.Set;
+
+import javax.annotation.concurrent.NotThreadSafe;
+
+/**
+ * Provides access to read and write metadata values to be exchanged during a call.
+ * <p>
+ * This class is not thread safe, implementations should ensure that header reads and writes
+ * do not occur in multiple threads concurrently.
+ * </p>
+ */
+@NotThreadSafe
+public abstract class Metadata<S extends Metadata> {
+
+ /**
+ * Interleave keys and values into a single iterator.
+ */
+ private static Iterator<String> fromMapEntries(Iterable<Map.Entry<String, String>> entries) {
+ final Iterator<Map.Entry<String, String>> iterator = entries.iterator();
+ return new Iterator<String>() {
+ Map.Entry<String, String> last;
+ @Override
+ public boolean hasNext() {
+ return last != null || iterator.hasNext();
+ }
+
+ @Override
+ public String next() {
+ if (last == null) {
+ last = iterator.next();
+ return last.getKey();
+ } else {
+ String val = last.getValue();
+ last = null;
+ return val;
+ }
+ }
+
+ @Override
+ public void remove() {
+ throw new UnsupportedOperationException();
+ }
+ };
+ }
+
+ /**
+ * Simple metadata marshaller that encodes strings as either UTF-8 or ASCII bytes.
+ */
+ public static final Marshaller<String> STRING_MARSHALLER =
+ new Marshaller<String>() {
+
+ @Override
+ public byte[] toBytes(String value) {
+ return value.getBytes(UTF_8);
+ }
+
+ @Override
+ public String toAscii(String value) {
+ return value;
+ }
+
+ @Override
+ public String parseBytes(byte[] serialized) {
+ return new String(serialized, UTF_8);
+ }
+
+ @Override
+ public String parseAscii(String ascii) {
+ return ascii;
+ }
+ };
+
+ private final ListMultimap<String, MetadataEntry> store;
+ private final boolean serializable;
+
+ /**
+ * Constructor called by the transport layer when it receives binary metadata.
+ */
+ // TODO(user): Convert to use ByteString so we can cache transformations
+ private Metadata(byte[]... binaryValues) {
+ store = LinkedListMultimap.create();
+ for (int i = 0; i < binaryValues.length; i++) {
+ String name = new String(binaryValues[i], StandardCharsets.US_ASCII);
+ store.put(name, new MetadataEntry(binaryValues[++i]));
+ }
+ this.serializable = false;
+ }
+
+ /**
+ * Constructor called by the transport layer when it receives ASCII metadata.
+ */
+ private Metadata(String... asciiValues) {
+ store = LinkedListMultimap.create();
+ for (int i = 0; i < asciiValues.length; i++) {
+ store.put(asciiValues[i], new MetadataEntry(asciiValues[++i]));
+ }
+ this.serializable = false;
+ }
+
+ /**
+ * Constructor called by the application layer when it wants to send metadata.
+ */
+ private Metadata() {
+ store = LinkedListMultimap.create();
+ this.serializable = true;
+ }
+
+ /**
+ * Returns true if a value is defined for the given key.
+ */
+ public <T> boolean containsKey(Key key) {
+ return store.containsKey(key.name);
+ }
+
+ /**
+ * Returns the last metadata entry added with the name 'name' parsed as T.
+ * @return the parsed metadata entry or null if not defined
+ */
+ public <T> T get(Key<T> key) {
+ MetadataEntry metadataEntry = Iterables.getLast(store.get(key.name()));
+ return metadataEntry == null ? null : metadataEntry.getParsed(key);
+ }
+
+ /**
+ * Returns all the metadata entries named 'name', in the order they were received,
+ * parsed as T.
+ */
+ public <T> Iterable<T> getAll(final Key<T> key) {
+ return Iterables.transform(
+ store.get(key.name()),
+ new Function<MetadataEntry, T>() {
+ @Override
+ public T apply(MetadataEntry entry) {
+ return entry.getParsed(key);
+ }
+ });
+ }
+
+ public <T> void put(Key<T> key, T value) {
+ store.put(key.name(), new MetadataEntry(key, value));
+ }
+
+ /**
+ * Remove a specific value.
+ */
+ public <T> boolean remove(Key<T> key, T value) {
+ return store.remove(key.name(), value);
+ }
+
+ /**
+ * Remove all values for the given key.
+ */
+ public <T> List<T> removeAll(final Key<T> key) {
+ return Lists.transform(store.removeAll(key.name()), new Function<MetadataEntry, T>() {
+ @Override
+ public T apply(MetadataEntry metadataEntry) {
+ return metadataEntry.getParsed(key);
+ }
+ });
+ }
+
+ /**
+ * Can this metadata be serialized. Metadata constructed from raw binary or ascii values
+ * cannot be serialized without merging it into a serializable instance using
+ * {@link #merge(Metadata, java.util.Set)}
+ */
+ public boolean isSerializable() {
+ return serializable;
+ }
+
+ /**
+ * Serialize all the metadata entries
+ */
+ public byte[][] serialize() {
+ Preconditions.checkState(serializable, "Can't serialize raw metadata");
+ byte[][] serialized = new byte[store.size() * 2][];
+ int i = 0;
+ for (Map.Entry<String, MetadataEntry> entry : store.entries()) {
+ serialized[i++] = entry.getValue().key.asciiName();
+ serialized[i++] = entry.getValue().getSerialized();
+ }
+ return serialized;
+ }
+
+ /**
+ * Serialize all the metadata entries
+ */
+ public String[] serializeAscii() {
+ Preconditions.checkState(serializable, "Can't serialize received metadata");
+ String[] serialized = new String[store.size() * 2];
+ int i = 0;
+ for (Map.Entry<String, MetadataEntry> entry : store.entries()) {
+ serialized[i++] = entry.getValue().key.name();
+ serialized[i++] = entry.getValue().getSerializedAscii();
+ }
+ return serialized;
+ }
+
+ /**
+ * Perform a simple merge of two sets of metadata.
+ * <p>
+ * Note that we can't merge non-serializable metadata into serializable.
+ * </p>
+ */
+ public void merge(Metadata other) {
+ Preconditions.checkNotNull(other);
+ if (this.serializable) {
+ if (!other.serializable) {
+ throw new IllegalArgumentException(
+ "Cannot merge non-serializable metadata into serializable metadata without keys");
+ }
+ }
+ store.putAll(other.store);
+ }
+
+ /**
+ * Merge values for the given set of keys into this set of metadata.
+ */
+ public void merge(Metadata other, Set<Key> keys) {
+ Preconditions.checkNotNull(other);
+ for (Key key : keys) {
+ if (other.containsKey(key)) {
+ Iterable values = other.getAll(key);
+ for (Object value : values) {
+ put(key, value);
+ }
+ }
+ }
+ }
+
+ /**
+ * Concrete instance for metadata attached to the start of a call.
+ */
+ public static class Headers extends Metadata<Headers> {
+ private String path;
+ private String authority;
+
+ /**
+ * Called by the transport layer to create headers from their binary serialized values.
+ */
+ public Headers(byte[]... headers) {
+ super(headers);
+ }
+
+ /**
+ * Called by the transport layer to create headers from their ASCII serialized values.
+ */
+ public Headers(String... asciiValues) {
+ super(asciiValues);
+ }
+
+ /**
+ * Called by the transport layer to create headers from their ASCII serialized values.
+ */
+ public Headers(Iterable<Map.Entry<String, String>> mapEntries) {
+ super(Iterators.toArray(fromMapEntries(mapEntries), String.class));
+ }
+
+ /**
+ * Called by the application layer to construct headers prior to passing them to the
+ * transport for serialization.
+ */
+ public Headers() {
+ }
+
+ /**
+ * The path for the operation.
+ */
+ public String getPath() {
+ return path;
+ }
+
+ public void setPath(String path) {
+ this.path = path;
+ }
+
+ /**
+ * The serving authority for the operation.
+ */
+ public String getAuthority() {
+ return authority;
+ }
+
+ public void setAuthority(String authority) {
+ this.authority = authority;
+ }
+ }
+
+ /**
+ * Concrete instance for metadata attached to the end of the call. Only provided by
+ * servers.
+ */
+ public static class Trailers extends Metadata<Headers> {
+ /**
+ * Called by the transport layer to create trailers from their binary serialized values.
+ */
+ public Trailers(byte[]... headers) {
+ super(headers);
+ }
+
+ /**
+ * Called by the transport layer to create trailers from their ASCII serialized values.
+ */
+ public Trailers(String... asciiValues) {
+ super(asciiValues);
+ }
+
+ /**
+ * Called by the transport layer to create headers from their ASCII serialized values.
+ */
+ public Trailers(Iterable<Map.Entry<String, String>> mapEntries) {
+ super(Iterators.toArray(fromMapEntries(mapEntries), String.class));
+ }
+
+ /**
+ * Called by the application layer to construct trailers prior to passing them to the
+ * transport for serialization.
+ */
+ public Trailers() {
+ }
+ }
+
+
+ /**
+ * Marshaller for metadata values.
+ */
+ public static interface Marshaller<T> {
+ /**
+ * Serialize a metadata value to bytes.
+ * @param value to serialize
+ * @return serialized version of value, or null if value cannot be transmitted.
+ */
+ public byte[] toBytes(T value);
+
+ /**
+ * Serialize a metadata value to an ASCII string
+ * @param value to serialize
+ * @return serialized ascii version of value, or null if value cannot be transmitted.
+ */
+ public String toAscii(T value);
+
+ /**
+ * Parse a serialized metadata value from bytes.
+ * @param serialized value of metadata to parse
+ * @return a parsed instance of type T
+ */
+ public T parseBytes(byte[] serialized);
+
+ /**
+ * Parse a serialized metadata value from an ascii string.
+ * @param ascii string value of metadata to parse
+ * @return a parsed instance of type T
+ */
+ public T parseAscii(String ascii);
+ }
+
+ /**
+ * Key for metadata entries. Allows for parsing and serialization of metadata.
+ */
+ public static class Key<T> {
+
+ private final String name;
+ private final byte[] asciiName;
+ private final Marshaller<T> marshaller;
+
+ /**
+ * Keys have a name and a marshaller used for serialization.
+ */
+ public Key(String name, Marshaller<T> marshaller) {
+ this.name = Preconditions.checkNotNull(name, "name").intern();
+ this.asciiName = name.getBytes(StandardCharsets.US_ASCII);
+ this.marshaller = Preconditions.checkNotNull(marshaller);
+ }
+
+ public String name() {
+ return name;
+ }
+
+ @VisibleForTesting
+ byte[] asciiName() {
+ return asciiName;
+ }
+
+ public Marshaller<T> getMarshaller() {
+ return marshaller;
+ }
+ }
+
+ private static class MetadataEntry {
+ Object parsed;
+ Key key;
+ byte[] serializedBinary;
+ String serializedAscii;
+
+ /**
+ * Constructor used when application layer adds a parsed value.
+ */
+ private MetadataEntry(Key key, Object parsed) {
+ this.parsed = Preconditions.checkNotNull(parsed);
+ this.key = Preconditions.checkNotNull(key);
+ }
+
+ /**
+ * Constructor used when reading a value from the transport.
+ */
+ private MetadataEntry(byte[] serialized) {
+ Preconditions.checkNotNull(serialized);
+ this.serializedBinary = serialized;
+ }
+
+ /**
+ * Constructor used when reading a value from the transport.
+ */
+ private MetadataEntry(String serializedAscii) {
+ this.serializedAscii = Preconditions.checkNotNull(serializedAscii);
+ }
+
+ public <T> T getParsed(Key<T> key) {
+ @SuppressWarnings("unchecked")
+ T value = (T) parsed;
+ if (value != null) {
+ if (this.key != key) {
+ // Keys don't match so serialize using the old key
+ serializedBinary = this.key.getMarshaller().toBytes(value);
+ } else {
+ return value;
+ }
+ }
+ this.key = key;
+ if (serializedBinary != null) {
+ value = key.getMarshaller().parseBytes(serializedBinary);
+ } else if (serializedAscii != null) {
+ value = key.getMarshaller().parseAscii(serializedAscii);
+ }
+ parsed = value;
+ return value;
+ }
+
+ @SuppressWarnings("unchecked")
+ public byte[] getSerialized() {
+ return serializedBinary =
+ serializedBinary == null
+ ? key.getMarshaller().toBytes(parsed) :
+ serializedBinary;
+ }
+
+ @SuppressWarnings("unchecked")
+ public String getSerializedAscii() {
+ return serializedAscii =
+ serializedAscii == null
+ ? key.getMarshaller().toAscii(parsed) :
+ serializedAscii;
+ }
+ }
+}
diff --git a/core/src/main/java/com/google/net/stubby/MethodDescriptor.java b/core/src/main/java/com/google/net/stubby/MethodDescriptor.java
index d5671fa..a81e4ac 100644
--- a/core/src/main/java/com/google/net/stubby/MethodDescriptor.java
+++ b/core/src/main/java/com/google/net/stubby/MethodDescriptor.java
@@ -10,7 +10,6 @@
import java.util.concurrent.TimeUnit;
import javax.annotation.concurrent.Immutable;
-import javax.inject.Provider;
/**
* Descriptor for a single operation, used by Channel to execute a call.
@@ -31,28 +30,24 @@
private final Marshaller<RequestT> requestMarshaller;
private final Marshaller<ResponseT> responseMarshaller;
private final long timeoutMicros;
- private final Map<String, Provider<String>> headers;
public static <RequestT, ResponseT> MethodDescriptor<RequestT, ResponseT> create(
Type type, String name, long timeout, TimeUnit timeoutUnit,
Marshaller<RequestT> requestMarshaller,
Marshaller<ResponseT> responseMarshaller) {
return new MethodDescriptor<RequestT, ResponseT>(
- type, name, timeoutUnit.toMicros(timeout), requestMarshaller, responseMarshaller,
- Collections.<String, Provider<String>>emptyMap());
+ type, name, timeoutUnit.toMicros(timeout), requestMarshaller, responseMarshaller);
}
private MethodDescriptor(Type type, String name, long timeoutMicros,
Marshaller<RequestT> requestMarshaller,
- Marshaller<ResponseT> responseMarshaller,
- Map<String, Provider<String>> headers) {
+ Marshaller<ResponseT> responseMarshaller) {
this.type = Preconditions.checkNotNull(type);
this.name = name;
Preconditions.checkArgument(timeoutMicros > 0);
this.timeoutMicros = timeoutMicros;
this.requestMarshaller = requestMarshaller;
this.responseMarshaller = responseMarshaller;
- this.headers = Collections.unmodifiableMap(headers);
}
/**
@@ -77,20 +72,6 @@
}
/**
- * Return a snapshot of the headers.
- */
- public Map<String, String> getHeaders() {
- if (headers.isEmpty()) {
- return Collections.emptyMap();
- }
- Map<String, String> snapshot = new HashMap<String, String>();
- for (Entry<String, Provider<String>> entry : headers.entrySet()) {
- snapshot.put(entry.getKey(), entry.getValue().get());
- }
- return Collections.unmodifiableMap(snapshot);
- }
-
- /**
* Parse a response payload from the given {@link InputStream}
*/
public ResponseT parseResponse(InputStream input) {
@@ -109,28 +90,6 @@
*/
public MethodDescriptor<RequestT, ResponseT> withTimeout(long timeout, TimeUnit unit) {
return new MethodDescriptor<RequestT, ResponseT>(type, name, unit.toMicros(timeout),
- requestMarshaller, responseMarshaller, headers);
- }
-
- /**
- * Create a new descriptor with an additional bound header.
- */
- public MethodDescriptor<RequestT, ResponseT> withHeader(String headerName,
- Provider<String> headerValueProvider) {
- Map<String, Provider<String>> newHeaders = new HashMap<String, Provider<String>>(headers);
- newHeaders.put(headerName, headerValueProvider);
- return new MethodDescriptor<RequestT, ResponseT>(type, name, timeoutMicros,
- requestMarshaller, responseMarshaller, newHeaders);
- }
-
- /**
- * Creates a new descriptor with additional bound headers.
- */
- public MethodDescriptor<RequestT, ResponseT> withHeaders(
- Map<String, Provider<String>> additionalHeaders) {
- Map<String, Provider<String>> newHeaders = new HashMap<String, Provider<String>>(headers);
- newHeaders.putAll(additionalHeaders);
- return new MethodDescriptor<RequestT, ResponseT>(type, name, timeoutMicros,
- requestMarshaller, responseMarshaller, newHeaders);
+ requestMarshaller, responseMarshaller);
}
}
diff --git a/core/src/main/java/com/google/net/stubby/ServerCall.java b/core/src/main/java/com/google/net/stubby/ServerCall.java
index 043b249..5e9183a 100644
--- a/core/src/main/java/com/google/net/stubby/ServerCall.java
+++ b/core/src/main/java/com/google/net/stubby/ServerCall.java
@@ -3,6 +3,7 @@
import com.google.common.util.concurrent.ListenableFuture;
import java.io.InputStream;
+
import javax.annotation.Nullable;
/**
@@ -11,9 +12,9 @@
* response is most common. This API is generally intended for use generated handlers, but advanced
* applications may have need for it.
*
- * <p>Any contexts must be sent before any payloads, which must be sent before closing.
+ * <p>Headers must be sent before any payloads, which must be sent before closing.
*
- * <p>No generic method for determining message receipt or providing acknowlegement is provided.
+ * <p>No generic method for determining message receipt or providing acknowledgement is provided.
* Applications are expected to utilize normal payload messages for such signals, as a response
* natually acknowledges its request.
*
@@ -34,6 +35,19 @@
// a case then we either get to generate a half close or purposefully omit it.
public abstract static class Listener<RequestT> {
/**
+ * Called upon receiving all header information from the remote end-point.
+ * <p>This method should return quickly, as the same thread may be used to process other
+ * streams.
+ *
+ * @param headers the fully buffered received headers.
+ * @return a processing completion future, or {@code null} to indicate that processing of the
+ * headers is immediately complete.
+ */
+ @Nullable
+ public abstract ListenableFuture<Void> headersRead(Metadata.Headers headers);
+
+
+ /**
* A request context has been received. Any context messages will precede payload messages.
*
* <p>The {@code value} {@link InputStream} will be closed when the returned future completes.
@@ -41,6 +55,7 @@
* method.
*/
@Nullable
+ @Deprecated
public abstract ListenableFuture<Void> onContext(String name, InputStream value);
/**
@@ -83,7 +98,7 @@
*
* @throws IllegalStateException if call is already {@code close}d
*/
- public abstract void close(Status status);
+ public abstract void close(Status status, Metadata.Trailers trailers);
/**
* Send a context message. Context messages are intended for side-channel information like
@@ -93,6 +108,7 @@
* @param value context value bytes
* @throws IllegalStateException if call is {@link #close}d, or after {@link #sendPayload}
*/
+ @Deprecated
public abstract void sendContext(String name, InputStream value);
/**
diff --git a/core/src/main/java/com/google/net/stubby/Session.java b/core/src/main/java/com/google/net/stubby/Session.java
index 1b70dc2..d7ae015 100644
--- a/core/src/main/java/com/google/net/stubby/Session.java
+++ b/core/src/main/java/com/google/net/stubby/Session.java
@@ -1,7 +1,5 @@
package com.google.net.stubby;
-import java.util.Map;
-
/**
* Session interface to be bound to the transport layer which is used by the higher-level
* layers to dispatch calls.
@@ -18,6 +16,6 @@
* Start a request in the context of this session.
*/
public Request startRequest(String operationName,
- Map<String, String> headers,
+ Metadata.Headers headers,
Response.ResponseBuilder responseBuilder);
}
diff --git a/core/src/main/java/com/google/net/stubby/SessionClientStream.java b/core/src/main/java/com/google/net/stubby/SessionClientStream.java
index 04f8a2f..20b6922 100644
--- a/core/src/main/java/com/google/net/stubby/SessionClientStream.java
+++ b/core/src/main/java/com/google/net/stubby/SessionClientStream.java
@@ -1,18 +1,12 @@
package com.google.net.stubby;
-import com.google.net.stubby.AbstractResponse;
-import com.google.net.stubby.Operation;
-import com.google.net.stubby.Request;
-import com.google.net.stubby.Response;
-import com.google.net.stubby.Session;
-import com.google.net.stubby.Status;
import com.google.net.stubby.newtransport.ClientStream;
import com.google.net.stubby.newtransport.StreamListener;
import com.google.net.stubby.newtransport.StreamState;
import com.google.net.stubby.transport.Transport;
-import java.io.InputStream;
import java.io.IOException;
+import java.io.InputStream;
/**
* A temporary shim layer between the new (Channel) and the old (Session). Will go away when the
@@ -151,7 +145,7 @@
}
private void propagateClosed() {
- listener.closed(getStatus());
+ listener.closed(getStatus(), new Metadata.Trailers());
}
}
}
diff --git a/core/src/main/java/com/google/net/stubby/SessionClientTransport.java b/core/src/main/java/com/google/net/stubby/SessionClientTransport.java
index 799ca56..e92e96d 100644
--- a/core/src/main/java/com/google/net/stubby/SessionClientTransport.java
+++ b/core/src/main/java/com/google/net/stubby/SessionClientTransport.java
@@ -27,9 +27,10 @@
@Override
public ClientStream newStream(MethodDescriptor<?, ?> method,
+ Metadata.Headers headers,
StreamListener listener) {
final SessionClientStream stream = new SessionClientStream(listener);
- Request request = session.startRequest(method.getName(), method.getHeaders(),
+ Request request = session.startRequest(method.getName(), headers,
stream.responseBuilder());
stream.start(request);
return stream;
diff --git a/core/src/main/java/com/google/net/stubby/context/ContextExchangeChannel.java b/core/src/main/java/com/google/net/stubby/context/ContextExchangeChannel.java
deleted file mode 100644
index 3e24a37..0000000
--- a/core/src/main/java/com/google/net/stubby/context/ContextExchangeChannel.java
+++ /dev/null
@@ -1,114 +0,0 @@
-package com.google.net.stubby.context;
-
-import com.google.common.collect.Maps;
-import com.google.common.util.concurrent.ListenableFuture;
-import com.google.net.stubby.Call;
-import com.google.net.stubby.Channel;
-import com.google.net.stubby.Marshaller;
-import com.google.net.stubby.MethodDescriptor;
-
-import java.io.InputStream;
-import java.util.Map;
-
-import javax.annotation.concurrent.NotThreadSafe;
-import javax.inject.Provider;
-
-/**
- * A channel implementation that sends bound context values and records received context.
- * Unlike {@Channel} this class is not thread-safe so it is recommended to create an instance
- * per thread.
- */
-@NotThreadSafe
-public class ContextExchangeChannel extends ForwardingChannel {
-
- private Map<String, Object> captured;
- private Map<String, Provider<InputStream>> provided;
-
- public ContextExchangeChannel(Channel channel) {
- super(channel);
- // builder?
- captured = Maps.newTreeMap();
- provided = Maps.newTreeMap();
- }
-
- @SuppressWarnings("unchecked")
- public <T> Provider<T> receive(final String name, final Marshaller<T> m) {
- synchronized (captured) {
- captured.put(name, null);
- }
- return new Provider<T>() {
- @Override
- public T get() {
- synchronized (captured) {
- Object o = captured.get(name);
- if (o instanceof InputStream) {
- o = m.parse((InputStream) o);
- captured.put(name, o);
- }
- return (T) o;
- }
- }
- };
- }
-
- public <T> void send(final String name, final T value, final Marshaller<T> m) {
- synchronized (provided) {
- provided.put(name, new Provider<InputStream>() {
- @Override
- public InputStream get() {
- return m.stream(value);
- }
- });
- }
- }
-
- /**
- * Clear all received values and allow another call
- */
- public void clearLastReceived() {
- synchronized (captured) {
- for (Map.Entry<String, Object> entry : captured.entrySet()) {
- entry.setValue(null);
- }
- }
- }
-
-
- @Override
- public <ReqT, RespT> Call<ReqT, RespT> newCall(MethodDescriptor<ReqT, RespT> method) {
- return new CallImpl<ReqT, RespT>(delegate.newCall(method));
- }
-
- private class CallImpl<ReqT, RespT> extends ForwardingCall<ReqT, RespT> {
- private CallImpl(Call<ReqT, RespT> delegate) {
- super(delegate);
- }
-
- @Override
- public void start(Listener<RespT> responseListener) {
- super.start(new ListenerImpl<RespT>(responseListener));
- synchronized (provided) {
- for (Map.Entry<String, Provider<InputStream>> entry : provided.entrySet()) {
- sendContext(entry.getKey(), entry.getValue().get());
- }
- }
- }
- }
-
- private class ListenerImpl<T> extends ForwardingListener<T> {
- private ListenerImpl(Call.Listener<T> delegate) {
- super(delegate);
- }
-
- @Override
- public ListenableFuture<Void> onContext(String name, InputStream value) {
- synchronized (captured) {
- if (captured.containsKey(name)) {
- captured.put(name, value);
- return null;
- }
- }
- return super.onContext(name, value);
- }
- }
-}
diff --git a/core/src/main/java/com/google/net/stubby/context/ForwardingChannel.java b/core/src/main/java/com/google/net/stubby/context/ForwardingChannel.java
index 8f299e4..fd7ab6d 100644
--- a/core/src/main/java/com/google/net/stubby/context/ForwardingChannel.java
+++ b/core/src/main/java/com/google/net/stubby/context/ForwardingChannel.java
@@ -4,6 +4,7 @@
import com.google.common.util.concurrent.SettableFuture;
import com.google.net.stubby.Call;
import com.google.net.stubby.Channel;
+import com.google.net.stubby.Metadata;
import com.google.net.stubby.Status;
import java.io.InputStream;
@@ -35,8 +36,8 @@
}
@Override
- public void start(Listener<ResponseT> responseListener) {
- this.delegate.start(responseListener);
+ public void start(Listener<ResponseT> responseListener, Metadata.Headers headers) {
+ this.delegate.start(responseListener, headers);
}
@Override
@@ -72,6 +73,11 @@
}
@Override
+ public ListenableFuture<Void> onHeaders(Metadata.Headers headers) {
+ return delegate.onHeaders(headers);
+ }
+
+ @Override
public ListenableFuture<Void> onContext(String name, InputStream value) {
return delegate.onContext(name, value);
}
@@ -82,8 +88,8 @@
}
@Override
- public void onClose(Status status) {
- delegate.onClose(status);
+ public void onClose(Status status, Metadata.Trailers trailers) {
+ delegate.onClose(status, trailers);
}
}
}
diff --git a/core/src/main/java/com/google/net/stubby/http/ServletSession.java b/core/src/main/java/com/google/net/stubby/http/ServletSession.java
index 5781f7d..2307487 100644
--- a/core/src/main/java/com/google/net/stubby/http/ServletSession.java
+++ b/core/src/main/java/com/google/net/stubby/http/ServletSession.java
@@ -1,10 +1,10 @@
package com.google.net.stubby.http;
-import com.google.common.collect.ImmutableMap;
import com.google.common.io.ByteBuffers;
import com.google.common.util.concurrent.ListenableFuture;
import com.google.common.util.concurrent.SettableFuture;
import com.google.net.stubby.AbstractResponse;
+import com.google.net.stubby.Metadata;
import com.google.net.stubby.Operation;
import com.google.net.stubby.Request;
import com.google.net.stubby.Response;
@@ -20,7 +20,9 @@
import java.io.InputStream;
import java.io.OutputStream;
import java.nio.ByteBuffer;
+import java.util.ArrayList;
import java.util.Enumeration;
+import java.util.List;
import java.util.concurrent.Executor;
import javax.servlet.ServletException;
@@ -97,15 +99,17 @@
return null;
}
- ImmutableMap.Builder<String, String> headerMapBuilder = ImmutableMap.builder();
+ List<String> headerList = new ArrayList<>();
Enumeration headerNames = req.getHeaderNames();
while (headerNames.hasMoreElements()) {
String name = headerNames.nextElement().toString();
- headerMapBuilder.put(name, req.getHeader(name));
+ headerList.add(name);
+ headerList.add(req.getHeader(name));
}
+ Metadata.Headers headers = new Metadata.Headers(headerList.toArray(new String[]{}));
// Create the operation and bind an HTTP response operation
- Request op = session.startRequest(operationName, headerMapBuilder.build(),
+ Request op = session.startRequest(operationName, headers,
HttpResponseOperation.builder(responseStream));
if (op == null) {
// TODO(user): Unify error handling once spec finalized
diff --git a/core/src/main/java/com/google/net/stubby/http/UrlConnectionClientSession.java b/core/src/main/java/com/google/net/stubby/http/UrlConnectionClientSession.java
index 22794e4..cb32870 100644
--- a/core/src/main/java/com/google/net/stubby/http/UrlConnectionClientSession.java
+++ b/core/src/main/java/com/google/net/stubby/http/UrlConnectionClientSession.java
@@ -2,6 +2,7 @@
import com.google.common.io.ByteBuffers;
import com.google.net.stubby.AbstractRequest;
+import com.google.net.stubby.Metadata;
import com.google.net.stubby.Operation;
import com.google.net.stubby.Response;
import com.google.net.stubby.Session;
@@ -16,7 +17,6 @@
import java.net.HttpURLConnection;
import java.net.URI;
import java.nio.ByteBuffer;
-import java.util.Map;
/**
* Implementation of {@link Session} using {@link HttpURLConnection} for clients. Services
@@ -31,9 +31,10 @@
}
@Override
- public Request startRequest(String operationName, Map<String, String> headers,
+ public Request startRequest(String operationName, Metadata.Headers headers,
Response.ResponseBuilder responseBuilder) {
- return new Request(base.resolve(operationName), headers, responseBuilder.build());
+ return new Request(base.resolve(operationName), headers,
+ responseBuilder.build());
}
private class Request extends AbstractRequest implements Framer.Sink {
@@ -42,7 +43,7 @@
private final DataOutputStream outputStream;
private final MessageFramer framer;
- private Request(URI uri, Map<String, String> headers, Response response) {
+ private Request(URI uri, Metadata.Headers headers, Response response) {
super(response);
try {
connection = (HttpURLConnection) uri.toURL().openConnection();
@@ -50,8 +51,11 @@
connection.setDoInput(true);
connection.setRequestMethod("POST");
connection.setRequestProperty("Content-Type", "application/protorpc");
- for (Map.Entry<String, String> header : headers.entrySet()) {
- connection.setRequestProperty(header.getKey(), header.getValue());
+ String[] serialized = headers.serializeAscii();
+ for (int i = 0; i < serialized.length; i++) {
+ connection.setRequestProperty(
+ serialized[i],
+ serialized[++i]);
}
outputStream = new DataOutputStream(connection.getOutputStream());
} catch (IOException t) {
diff --git a/core/src/main/java/com/google/net/stubby/http2/netty/Http2Codec.java b/core/src/main/java/com/google/net/stubby/http2/netty/Http2Codec.java
index 6dc46a8..873a880 100644
--- a/core/src/main/java/com/google/net/stubby/http2/netty/Http2Codec.java
+++ b/core/src/main/java/com/google/net/stubby/http2/netty/Http2Codec.java
@@ -1,6 +1,6 @@
package com.google.net.stubby.http2.netty;
-import com.google.common.collect.ImmutableMap;
+import com.google.net.stubby.Metadata;
import com.google.net.stubby.NoOpRequest;
import com.google.net.stubby.Operation;
import com.google.net.stubby.Operation.Phase;
@@ -23,8 +23,6 @@
import io.netty.handler.codec.http2.Http2Headers;
import io.netty.handler.codec.http2.Http2Settings;
-import java.util.Map;
-
/**
* Codec used by clients and servers to interpret HTTP2 frames in the context of an ongoing
* request-response dialog
@@ -212,12 +210,9 @@
if (operationName == null) {
return null;
}
- ImmutableMap.Builder<String, String> headerMap = ImmutableMap.builder();
- for (Map.Entry<String, String> header : headers) {
- headerMap.put(header);
- }
+ Metadata.Headers grpcHeaders = new Metadata.Headers(headers);
// Create the operation and bind a HTTP2 response operation
- Request op = session.startRequest(operationName, headerMap.build(),
+ Request op = session.startRequest(operationName, grpcHeaders,
createResponse(new Http2Writer(ctx), streamId));
if (op == null) {
return null;
diff --git a/core/src/main/java/com/google/net/stubby/http2/netty/Http2Request.java b/core/src/main/java/com/google/net/stubby/http2/netty/Http2Request.java
index 0d6d2d5..10bf6e4 100644
--- a/core/src/main/java/com/google/net/stubby/http2/netty/Http2Request.java
+++ b/core/src/main/java/com/google/net/stubby/http2/netty/Http2Request.java
@@ -1,14 +1,14 @@
package com.google.net.stubby.http2.netty;
+import com.google.net.stubby.Metadata;
import com.google.net.stubby.Request;
import com.google.net.stubby.Response;
import com.google.net.stubby.transport.Framer;
+import io.netty.handler.codec.http2.DefaultHttp2Headers;
+
import java.net.InetAddress;
import java.net.UnknownHostException;
-import java.util.Map;
-
-import io.netty.handler.codec.http2.DefaultHttp2Headers;
/**
* A HTTP2 based implementation of {@link Request}
@@ -30,12 +30,15 @@
private final Response response;
public Http2Request(Response response, String operationName,
- Map<String, String> headers,
+ Metadata.Headers headers,
Http2Codec.Http2Writer writer, Framer framer) {
super(response.getId(), writer, framer);
DefaultHttp2Headers.Builder headersBuilder = DefaultHttp2Headers.newBuilder();
- for (Map.Entry<String, String> entry : headers.entrySet()) {
- headersBuilder.add(entry.getKey(), entry.getValue());
+ // TODO(user) Switch the ASCII requirement to false once Netty supports binary
+ // headers.
+ String[] headerValues = headers.serializeAscii();
+ for (int i = 0; i < headerValues.length; i++) {
+ headersBuilder.add(headerValues[i], headerValues[++i]);
}
headersBuilder.method("POST")
.path("/" + operationName)
diff --git a/core/src/main/java/com/google/net/stubby/http2/netty/Http2Session.java b/core/src/main/java/com/google/net/stubby/http2/netty/Http2Session.java
index 7e22f60..8a05312 100644
--- a/core/src/main/java/com/google/net/stubby/http2/netty/Http2Session.java
+++ b/core/src/main/java/com/google/net/stubby/http2/netty/Http2Session.java
@@ -1,12 +1,12 @@
package com.google.net.stubby.http2.netty;
+import com.google.net.stubby.Metadata;
import com.google.net.stubby.Request;
import com.google.net.stubby.RequestRegistry;
import com.google.net.stubby.Response;
import com.google.net.stubby.Session;
import com.google.net.stubby.transport.MessageFramer;
-import java.util.Map;
import java.util.concurrent.atomic.AtomicInteger;
/**
@@ -33,7 +33,7 @@
}
@Override
- public Request startRequest(String operationName, Map<String, String> headers,
+ public Request startRequest(String operationName, Metadata.Headers headers,
Response.ResponseBuilder response) {
int nextSessionId = getNextStreamId();
Request operation = new Http2Request(response.build(nextSessionId), operationName,
diff --git a/core/src/main/java/com/google/net/stubby/http2/okhttp/Http2Request.java b/core/src/main/java/com/google/net/stubby/http2/okhttp/Http2Request.java
index 5f8d47c..afaa921 100644
--- a/core/src/main/java/com/google/net/stubby/http2/okhttp/Http2Request.java
+++ b/core/src/main/java/com/google/net/stubby/http2/okhttp/Http2Request.java
@@ -1,5 +1,6 @@
package com.google.net.stubby.http2.okhttp;
+import com.google.net.stubby.Metadata;
import com.google.net.stubby.Request;
import com.google.net.stubby.RequestRegistry;
import com.google.net.stubby.Response;
@@ -13,7 +14,6 @@
import java.io.IOException;
import java.util.List;
-import java.util.Map;
/**
* A HTTP2 based implementation of {@link Request}
@@ -22,7 +22,7 @@
private final Response response;
public Http2Request(FrameWriter frameWriter, String operationName,
- Map<String, String> headers,
+ Metadata.Headers headers,
Response response, RequestRegistry requestRegistry,
Framer framer) {
super(response.getId(), frameWriter, framer);
@@ -31,10 +31,8 @@
// Register this request.
requestRegistry.register(this);
- List<Header> requestHeaders = Headers.createRequestHeaders(operationName);
- for (Map.Entry<String, String> entry : headers.entrySet()) {
- requestHeaders.add(new Header(entry.getKey(), entry.getValue()));
- }
+ List<Header> requestHeaders = Headers.createRequestHeaders(operationName,
+ headers.serialize());
frameWriter.synStream(false, false, getId(), 0, requestHeaders);
} catch (IOException ioe) {
close(new Status(Transport.Code.UNKNOWN, ioe));
diff --git a/core/src/main/java/com/google/net/stubby/http2/okhttp/OkHttpSession.java b/core/src/main/java/com/google/net/stubby/http2/okhttp/OkHttpSession.java
index 8fd7c2e..c8f9d39 100644
--- a/core/src/main/java/com/google/net/stubby/http2/okhttp/OkHttpSession.java
+++ b/core/src/main/java/com/google/net/stubby/http2/okhttp/OkHttpSession.java
@@ -4,6 +4,7 @@
import com.google.common.io.ByteStreams;
import com.google.common.io.CountingInputStream;
import com.google.common.io.CountingOutputStream;
+import com.google.net.stubby.Metadata;
import com.google.net.stubby.Operation;
import com.google.net.stubby.Operation.Phase;
import com.google.net.stubby.Request;
@@ -25,18 +26,17 @@
import com.squareup.okhttp.internal.spdy.Settings;
import com.squareup.okhttp.internal.spdy.Variant;
-import java.io.IOException;
-import java.net.Socket;
-import java.util.List;
-import java.util.Map;
-import java.util.concurrent.Executor;
-import java.util.concurrent.atomic.AtomicInteger;
-
import okio.BufferedSink;
import okio.BufferedSource;
import okio.ByteString;
import okio.Okio;
+import java.io.IOException;
+import java.net.Socket;
+import java.util.List;
+import java.util.concurrent.Executor;
+import java.util.concurrent.atomic.AtomicInteger;
+
/**
* Basic implementation of {@link Session} using OkHttp
*/
@@ -147,7 +147,8 @@
}
@Override
- public Request startRequest(String operationName, Map<String, String> headers,
+ public Request startRequest(String operationName,
+ Metadata.Headers headers,
Response.ResponseBuilder responseBuilder) {
int nextStreamId = getNextStreamId();
Response response = responseBuilder.build(nextStreamId);
@@ -256,18 +257,21 @@
HeadersMode headersMode) {
Operation op = getOperation(streamId);
- ImmutableMap.Builder<String, String> mapBuilder = ImmutableMap.builder();
- for (Header header : headers) {
- mapBuilder.put(header.name.utf8(), header.value.utf8());
- }
- Map<String, String> headersMap = mapBuilder.build();
-
// Start an Operation for SYN_STREAM
if (op == null && headersMode == HeadersMode.HTTP_20_HEADERS) {
- String path = headersMap.get(Header.TARGET_PATH.utf8());
+ String path = findReservedHeader(Header.TARGET_PATH.utf8(), headers);
+ byte[][] binaryHeaders = new byte[headers.size() * 2][];
+ for (int i = 0; i < headers.size(); i++) {
+ Header header = headers.get(i);
+ binaryHeaders[i * 2] = header.name.toByteArray();
+ binaryHeaders[(i * 2) + 1] = header.value.toByteArray();
+ }
+ Metadata.Headers grpcHeaders = new Metadata.Headers(binaryHeaders);
+ grpcHeaders.setPath(path);
+ grpcHeaders.setAuthority(findReservedHeader(Header.TARGET_AUTHORITY.utf8(), headers));
if (path != null) {
Request request = serverSession.startRequest(path,
- headersMap,
+ grpcHeaders,
Http2Response.builder(streamId, frameWriter, new MessageFramer(4096)));
requestRegistry.register(request);
op = request;
@@ -283,6 +287,20 @@
}
}
+ private String findReservedHeader(String name, List<Header> headers) {
+ for (Header header : headers) {
+ // Reserved headers must come before non-reserved headers, so we can exit the loop
+ // early if we see a non-reserved header.
+ if (!header.name.utf8().startsWith(":")) {
+ return null;
+ }
+ if (header.name.utf8().equals(name)) {
+ return header.value.utf8();
+ }
+ }
+ return null;
+ }
+
@Override
public void rstStream(int streamId, ErrorCode errorCode) {
try {
diff --git a/core/src/main/java/com/google/net/stubby/newtransport/AbstractClientStream.java b/core/src/main/java/com/google/net/stubby/newtransport/AbstractClientStream.java
index 9d11921..99508f8 100644
--- a/core/src/main/java/com/google/net/stubby/newtransport/AbstractClientStream.java
+++ b/core/src/main/java/com/google/net/stubby/newtransport/AbstractClientStream.java
@@ -5,6 +5,7 @@
import static com.google.net.stubby.newtransport.StreamState.READ_ONLY;
import com.google.common.base.Preconditions;
+import com.google.net.stubby.Metadata;
import com.google.net.stubby.Status;
/**
@@ -37,9 +38,10 @@
// Wraps the base handler to get status update.
return new ForwardingStreamListener(super.inboundMessageHandler()) {
@Override
- public void closed(Status status) {
+ public void closed(Status status, Metadata.Trailers trailers) {
inboundPhase(Phase.STATUS);
- setStatus(status);
+ // TODO(user): Fix once we switch the wire format to express status in trailers
+ setStatus(status, new Metadata.Trailers());
}
};
}
@@ -51,7 +53,7 @@
* @param newStatus the new status to set
* @return {@code} true if the status was not already set.
*/
- public boolean setStatus(final Status newStatus) {
+ public boolean setStatus(final Status newStatus, Metadata.Trailers trailers) {
Preconditions.checkNotNull(newStatus, "newStatus");
synchronized (stateLock) {
if (status != null) {
@@ -64,7 +66,7 @@
}
// Invoke the observer callback.
- listener.closed(newStatus);
+ listener.closed(newStatus, trailers);
// Free any resources.
dispose();
diff --git a/core/src/main/java/com/google/net/stubby/newtransport/AbstractClientTransport.java b/core/src/main/java/com/google/net/stubby/newtransport/AbstractClientTransport.java
index 71d48f3..ebd235c 100644
--- a/core/src/main/java/com/google/net/stubby/newtransport/AbstractClientTransport.java
+++ b/core/src/main/java/com/google/net/stubby/newtransport/AbstractClientTransport.java
@@ -2,6 +2,7 @@
import com.google.common.base.Preconditions;
import com.google.common.util.concurrent.AbstractService;
+import com.google.net.stubby.Metadata;
import com.google.net.stubby.MethodDescriptor;
/**
@@ -12,7 +13,9 @@
public abstract class AbstractClientTransport extends AbstractService implements ClientTransport {
@Override
- public final ClientStream newStream(MethodDescriptor<?, ?> method, StreamListener listener) {
+ public final ClientStream newStream(MethodDescriptor<?, ?> method,
+ Metadata.Headers headers,
+ StreamListener listener) {
Preconditions.checkNotNull(method, "method");
Preconditions.checkNotNull(listener, "listener");
if (state() == State.STARTING) {
@@ -25,7 +28,7 @@
}
// Create the stream.
- return newStreamInternal(method, listener);
+ return newStreamInternal(method, headers, listener);
}
/**
@@ -38,5 +41,6 @@
* @return the new stream.
*/
protected abstract ClientStream newStreamInternal(MethodDescriptor<?, ?> method,
+ Metadata.Headers headers,
StreamListener listener);
}
diff --git a/core/src/main/java/com/google/net/stubby/newtransport/AbstractServerStream.java b/core/src/main/java/com/google/net/stubby/newtransport/AbstractServerStream.java
index 5856c5a..84dbfa8 100644
--- a/core/src/main/java/com/google/net/stubby/newtransport/AbstractServerStream.java
+++ b/core/src/main/java/com/google/net/stubby/newtransport/AbstractServerStream.java
@@ -5,6 +5,7 @@
import static com.google.net.stubby.newtransport.StreamState.WRITE_ONLY;
import com.google.common.base.Preconditions;
+import com.google.net.stubby.Metadata;
import com.google.net.stubby.Status;
import com.google.net.stubby.transport.Transport;
@@ -28,7 +29,7 @@
}
@Override
- public final void close(Status status) {
+ public final void close(Status status, Metadata.Trailers trailers) {
synchronized (stateLock) {
Preconditions.checkState(!status.isOk() || state == WRITE_ONLY,
"Cannot close with OK before client half-closes");
@@ -57,7 +58,7 @@
}
if (previousState == OPEN) {
inboundPhase(Phase.STATUS);
- listener.closed(Status.OK);
+ listener.closed(Status.OK, new Metadata.Trailers());
} else {
abortStream(
new Status(Transport.Code.FAILED_PRECONDITION, "Client-end of the stream already closed"),
@@ -69,8 +70,8 @@
* Aborts the stream with an error status, cleans up resources and notifies the listener if
* necessary.
*
- * <p>Unlike {@link #close(Status)}, this method is only called from the gRPC framework, so that
- * we need to call closed() on the listener if it has not been called.
+ * <p>Unlike {@link #close(Status, Metadata.Trailers)}, this method is only called from the
+ * gRPC framework, so that we need to call closed() on the listener if it has not been called.
*
* @param status the error status. Must not be Status.OK.
* @param notifyClient true if the stream is still writable and you want to notify the client
@@ -88,7 +89,7 @@
}
if (previousState == OPEN) {
- listener.closed(status);
+ listener.closed(status, new Metadata.Trailers());
} // Otherwise, previousState is WRITE_ONLY thus closed() has already been called.
outboundPhase(Phase.STATUS);
diff --git a/core/src/main/java/com/google/net/stubby/newtransport/AbstractStream.java b/core/src/main/java/com/google/net/stubby/newtransport/AbstractStream.java
index 0cef2be..10687c9 100644
--- a/core/src/main/java/com/google/net/stubby/newtransport/AbstractStream.java
+++ b/core/src/main/java/com/google/net/stubby/newtransport/AbstractStream.java
@@ -4,6 +4,7 @@
import com.google.common.io.Closeables;
import com.google.common.util.concurrent.ListenableFuture;
import com.google.common.util.concurrent.MoreExecutors;
+import com.google.net.stubby.Metadata;
import com.google.net.stubby.Status;
import java.io.InputStream;
@@ -42,6 +43,15 @@
* Internal handler for Deframer output. Informs the {@link #listener()} of inbound messages.
*/
private final StreamListener inboundMessageHandler = new StreamListener() {
+
+ @Override
+ public ListenableFuture<Void> headersRead(Metadata.Headers headers) {
+ inboundPhase(Phase.CONTEXT);
+ ListenableFuture<Void> future = listener().headersRead(headers);
+ disableWindowUpdate(future);
+ return future;
+ }
+
@Override
public ListenableFuture<Void> contextRead(String name, InputStream value, int length) {
ListenableFuture<Void> future = null;
@@ -69,9 +79,9 @@
}
@Override
- public void closed(Status status) {
+ public void closed(Status status, Metadata.Trailers trailers) {
inboundPhase(Phase.STATUS);
- listener().closed(status);
+ listener().closed(status, trailers);
}
};
diff --git a/core/src/main/java/com/google/net/stubby/newtransport/ClientTransport.java b/core/src/main/java/com/google/net/stubby/newtransport/ClientTransport.java
index e32b9e8..a596c6d 100644
--- a/core/src/main/java/com/google/net/stubby/newtransport/ClientTransport.java
+++ b/core/src/main/java/com/google/net/stubby/newtransport/ClientTransport.java
@@ -1,6 +1,7 @@
package com.google.net.stubby.newtransport;
import com.google.common.util.concurrent.Service;
+import com.google.net.stubby.Metadata;
import com.google.net.stubby.MethodDescriptor;
/**
@@ -23,8 +24,11 @@
* completed (either successfully or unsuccessfully).
*
* @param method the descriptor of the remote method to be called for this stream.
+ * @param headers to send at the beginning of the call
* @param listener the listener for the newly created stream.
* @return the newly created stream.
*/
- ClientStream newStream(MethodDescriptor<?, ?> method, StreamListener listener);
+ ClientStream newStream(MethodDescriptor<?, ?> method,
+ Metadata.Headers headers,
+ StreamListener listener);
}
diff --git a/core/src/main/java/com/google/net/stubby/newtransport/Deframer.java b/core/src/main/java/com/google/net/stubby/newtransport/Deframer.java
index f709a5d..72e44c8 100644
--- a/core/src/main/java/com/google/net/stubby/newtransport/Deframer.java
+++ b/core/src/main/java/com/google/net/stubby/newtransport/Deframer.java
@@ -2,6 +2,7 @@
import com.google.common.io.ByteStreams;
import com.google.net.stubby.GrpcFramingUtil;
+import com.google.net.stubby.Metadata;
import com.google.net.stubby.Operation;
import com.google.net.stubby.Status;
import com.google.net.stubby.transport.Transport;
@@ -154,7 +155,7 @@
}
private void writeStatus(Status status) {
- target.closed(status);
+ target.closed(status, new Metadata.Trailers());
statusDelivered = true;
}
diff --git a/core/src/main/java/com/google/net/stubby/newtransport/ForwardingStreamListener.java b/core/src/main/java/com/google/net/stubby/newtransport/ForwardingStreamListener.java
index 5cd9b2d..42dbf4b 100644
--- a/core/src/main/java/com/google/net/stubby/newtransport/ForwardingStreamListener.java
+++ b/core/src/main/java/com/google/net/stubby/newtransport/ForwardingStreamListener.java
@@ -1,6 +1,7 @@
package com.google.net.stubby.newtransport;
import com.google.common.util.concurrent.ListenableFuture;
+import com.google.net.stubby.Metadata;
import com.google.net.stubby.Status;
import java.io.InputStream;
@@ -17,6 +18,11 @@
}
@Override
+ public ListenableFuture<Void> headersRead(Metadata.Headers headers) {
+ return delegate.headersRead(headers);
+ }
+
+ @Override
public ListenableFuture<Void> contextRead(String name, InputStream value, int length) {
return delegate.contextRead(name, value, length);
}
@@ -27,7 +33,7 @@
}
@Override
- public void closed(Status status) {
- delegate.closed(status);
+ public void closed(Status status, Metadata.Trailers trailers) {
+ delegate.closed(status, trailers);
}
}
diff --git a/core/src/main/java/com/google/net/stubby/newtransport/GrpcDeframer.java b/core/src/main/java/com/google/net/stubby/newtransport/GrpcDeframer.java
index c08fa81..512ac90 100644
--- a/core/src/main/java/com/google/net/stubby/newtransport/GrpcDeframer.java
+++ b/core/src/main/java/com/google/net/stubby/newtransport/GrpcDeframer.java
@@ -9,6 +9,7 @@
import com.google.common.base.Preconditions;
import com.google.common.util.concurrent.ListenableFuture;
+import com.google.net.stubby.Metadata;
import com.google.net.stubby.Status;
import com.google.net.stubby.transport.Transport;
@@ -255,6 +256,6 @@
*/
private void notifyStatus(Status status) {
statusNotified = true;
- listener.closed(status);
+ listener.closed(status, new Metadata.Trailers());
}
}
diff --git a/core/src/main/java/com/google/net/stubby/newtransport/ServerStream.java b/core/src/main/java/com/google/net/stubby/newtransport/ServerStream.java
index 1e629bb..987400c 100644
--- a/core/src/main/java/com/google/net/stubby/newtransport/ServerStream.java
+++ b/core/src/main/java/com/google/net/stubby/newtransport/ServerStream.java
@@ -1,8 +1,8 @@
package com.google.net.stubby.newtransport;
+import com.google.net.stubby.Metadata;
import com.google.net.stubby.Status;
-
/**
* Extension of {@link Stream} to support server-side termination semantics.
*/
@@ -14,6 +14,7 @@
* local side of the stream (i.e. half-closed). Any other value implies abnormal termination.
*
* @param status details for the closure of the local-side of this stream.
+ * @param trailers an additional block of headers to pass to the client on stream closure.
*/
- void close(Status status);
+ void close(Status status, Metadata.Trailers trailers);
}
diff --git a/core/src/main/java/com/google/net/stubby/newtransport/ServerTransportListener.java b/core/src/main/java/com/google/net/stubby/newtransport/ServerTransportListener.java
index 7453968..6edfa8a 100644
--- a/core/src/main/java/com/google/net/stubby/newtransport/ServerTransportListener.java
+++ b/core/src/main/java/com/google/net/stubby/newtransport/ServerTransportListener.java
@@ -1,5 +1,6 @@
package com.google.net.stubby.newtransport;
+import com.google.net.stubby.Metadata;
import com.google.net.stubby.MethodDescriptor;
/**
@@ -12,7 +13,9 @@
*
* @param stream the newly created stream.
* @param method the full method name being called on the server.
+ * @param headers containing metadata for the call.
* @return a listener for events on the new stream.
*/
- StreamListener streamCreated(ServerStream stream, String method);
+ StreamListener streamCreated(ServerStream stream, String method,
+ Metadata.Headers headers);
}
diff --git a/core/src/main/java/com/google/net/stubby/newtransport/Stream.java b/core/src/main/java/com/google/net/stubby/newtransport/Stream.java
index 77dc1cc..e2853fe 100644
--- a/core/src/main/java/com/google/net/stubby/newtransport/Stream.java
+++ b/core/src/main/java/com/google/net/stubby/newtransport/Stream.java
@@ -33,6 +33,7 @@
* @param length the length of the {@link InputStream}.
* @param accepted an optional callback for when the transport has accepted the write.
*/
+ @Deprecated
void writeContext(String name, InputStream value, int length, @Nullable Runnable accepted);
/**
diff --git a/core/src/main/java/com/google/net/stubby/newtransport/StreamListener.java b/core/src/main/java/com/google/net/stubby/newtransport/StreamListener.java
index 559c368..7a3d71f 100644
--- a/core/src/main/java/com/google/net/stubby/newtransport/StreamListener.java
+++ b/core/src/main/java/com/google/net/stubby/newtransport/StreamListener.java
@@ -1,6 +1,7 @@
package com.google.net.stubby.newtransport;
import com.google.common.util.concurrent.ListenableFuture;
+import com.google.net.stubby.Metadata;
import com.google.net.stubby.Status;
import java.io.InputStream;
@@ -14,6 +15,17 @@
public interface StreamListener {
/**
+ * Called upon receiving all header information from the remote end-point.
+ * <p>This method should return quickly, as the same thread may be used to process other streams.
+ *
+ * @param headers the fully buffered received headers.
+ * @return a processing completion future, or {@code null} to indicate that processing of the
+ * headers is immediately complete.
+ */
+ @Nullable
+ ListenableFuture<Void> headersRead(Metadata.Headers headers);
+
+ /**
* Called upon receiving context information from the remote end-point. The {@link InputStream} is
* non-blocking and contains the entire context.
*
@@ -33,6 +45,7 @@
* context is immediately complete.
*/
@Nullable
+ @Deprecated
ListenableFuture<Void> contextRead(String name, InputStream value, int length);
/**
@@ -58,15 +71,9 @@
ListenableFuture<Void> messageRead(InputStream message, int length);
/**
- * Called when the remote side of the transport closed. A status code of
- * {@link com.google.net.stubby.transport.Transport.Code#OK} implies normal termination of the
- * remote side of the stream (i.e. half-closed). Any other value implies abnormal termination. If
- * the remote end-point was abnormally terminated, no further messages will be received on the
- * stream.
- *
- * <p>This method should return quickly, as the same thread may be used to process other streams.
- *
- * @param status details of the remote stream closure.
+ * Called when the remote side of the transport closed.
+ * @param status details about the remote closure
+ * @param trailers that may optionally include status information about call closure.
*/
- void closed(Status status);
+ void closed(Status status, Metadata.Trailers trailers);
}
diff --git a/core/src/main/java/com/google/net/stubby/newtransport/http/HttpClientTransport.java b/core/src/main/java/com/google/net/stubby/newtransport/http/HttpClientTransport.java
index f446166..399840f 100644
--- a/core/src/main/java/com/google/net/stubby/newtransport/http/HttpClientTransport.java
+++ b/core/src/main/java/com/google/net/stubby/newtransport/http/HttpClientTransport.java
@@ -7,6 +7,7 @@
import com.google.common.base.Preconditions;
import com.google.common.util.concurrent.ListenableFuture;
+import com.google.net.stubby.Metadata;
import com.google.net.stubby.MethodDescriptor;
import com.google.net.stubby.Status;
import com.google.net.stubby.newtransport.AbstractClientStream;
@@ -41,9 +42,11 @@
}
@Override
- protected ClientStream newStreamInternal(MethodDescriptor<?, ?> method, StreamListener listener) {
+ protected ClientStream newStreamInternal(MethodDescriptor<?, ?> method,
+ Metadata.Headers headers,
+ StreamListener listener) {
URI uri = baseUri.resolve(method.getName());
- HttpClientStream stream = new HttpClientStream(uri, listener);
+ HttpClientStream stream = new HttpClientStream(uri, headers.serializeAscii(), listener);
synchronized (streams) {
// Check for RUNNING to deal with race condition of this being executed right after doStop
// cancels all the streams.
@@ -80,7 +83,7 @@
final DataOutputStream outputStream;
boolean connected;
- HttpClientStream(URI uri, StreamListener listener) {
+ HttpClientStream(URI uri, String[] headers, StreamListener listener) {
super(listener);
try {
@@ -89,6 +92,9 @@
connection.setDoInput(true);
connection.setRequestMethod(HTTP_METHOD);
connection.setRequestProperty(CONTENT_TYPE_HEADER, CONTENT_TYPE_PROTORPC);
+ for (int i = 0; i < headers.length; i++) {
+ connection.setRequestProperty(headers[i], headers[++i]);
+ }
outputStream = new DataOutputStream(connection.getOutputStream());
connected = true;
} catch (IOException e) {
@@ -99,7 +105,7 @@
@Override
public void cancel() {
outboundPhase = Phase.STATUS;
- if (setStatus(CANCELLED)) {
+ if (setStatus(CANCELLED, new Metadata.Trailers())) {
disconnect();
}
}
@@ -136,7 +142,7 @@
}
}
} catch (IOException ioe) {
- setStatus(new Status(Transport.Code.INTERNAL, ioe));
+ setStatus(new Status(Transport.Code.INTERNAL, ioe), new Metadata.Trailers());
}
}
diff --git a/core/src/main/java/com/google/net/stubby/newtransport/netty/CreateStreamCommand.java b/core/src/main/java/com/google/net/stubby/newtransport/netty/CreateStreamCommand.java
index 2107796..98fea67 100644
--- a/core/src/main/java/com/google/net/stubby/newtransport/netty/CreateStreamCommand.java
+++ b/core/src/main/java/com/google/net/stubby/newtransport/netty/CreateStreamCommand.java
@@ -8,12 +8,15 @@
* {@link NettyClientHandler} for processing in the Channel thread.
*/
class CreateStreamCommand {
- final MethodDescriptor<?, ?> method;
- final NettyClientStream stream;
+ private final MethodDescriptor<?, ?> method;
+ private final String[] headers;
+ private final NettyClientStream stream;
- CreateStreamCommand(MethodDescriptor<?, ?> method, NettyClientStream stream) {
+ CreateStreamCommand(MethodDescriptor<?, ?> method, String[] headers,
+ NettyClientStream stream) {
this.method = Preconditions.checkNotNull(method, "method");
this.stream = Preconditions.checkNotNull(stream, "stream");
+ this.headers = Preconditions.checkNotNull(headers, "headers");
}
MethodDescriptor<?, ?> method() {
@@ -23,4 +26,8 @@
NettyClientStream stream() {
return stream;
}
+
+ String[] headers() {
+ return headers;
+ }
}
diff --git a/core/src/main/java/com/google/net/stubby/newtransport/netty/NettyClientHandler.java b/core/src/main/java/com/google/net/stubby/newtransport/netty/NettyClientHandler.java
index 567ece4..cada74e 100644
--- a/core/src/main/java/com/google/net/stubby/newtransport/netty/NettyClientHandler.java
+++ b/core/src/main/java/com/google/net/stubby/newtransport/netty/NettyClientHandler.java
@@ -6,6 +6,7 @@
import static com.google.net.stubby.newtransport.netty.NettyClientStream.PENDING_STREAM_ID;
import com.google.common.base.Preconditions;
+import com.google.net.stubby.Metadata;
import com.google.net.stubby.MethodDescriptor;
import com.google.net.stubby.Status;
import com.google.net.stubby.transport.Transport;
@@ -32,7 +33,6 @@
import java.util.ArrayDeque;
import java.util.Deque;
import java.util.Iterator;
-import java.util.Map;
/**
* Client-side Netty handler for GRPC processing. All event handlers are executed entirely within
@@ -46,11 +46,13 @@
*/
private final class PendingStream {
private final MethodDescriptor<?, ?> method;
+ private final String[] headers;
private final NettyClientStream stream;
private final ChannelPromise promise;
public PendingStream(CreateStreamCommand command, ChannelPromise promise) {
method = command.method();
+ headers = command.headers();
stream = command.stream();
this.promise = promise;
}
@@ -155,7 +157,7 @@
// TODO(user): do something with errorCode?
Http2Stream http2Stream = connection().requireStream(streamId);
NettyClientStream stream = clientStream(http2Stream);
- stream.setStatus(new Status(Transport.Code.UNKNOWN));
+ stream.setStatus(new Status(Transport.Code.UNKNOWN), new Metadata.Trailers());
}
/**
@@ -170,7 +172,7 @@
// Any streams that are still active must be closed.
for (Http2Stream stream : http2Streams()) {
- clientStream(stream).setStatus(goAwayStatus);
+ clientStream(stream).setStatus(goAwayStatus, new Metadata.Trailers());
}
}
@@ -194,7 +196,7 @@
// Close the stream with a status that contains the cause.
Http2Stream stream = connection().stream(cause.streamId());
if (stream != null) {
- clientStream(stream).setStatus(Status.fromThrowable(cause));
+ clientStream(stream).setStatus(Status.fromThrowable(cause), new Metadata.Trailers());
}
super.onStreamError(ctx, cause);
}
@@ -217,7 +219,7 @@
private void cancelStream(ChannelHandlerContext ctx, CancelStreamCommand cmd,
ChannelPromise promise) throws Http2Exception {
NettyClientStream stream = cmd.stream();
- stream.setStatus(Status.CANCELLED);
+ stream.setStatus(Status.CANCELLED, new Metadata.Trailers());
// No need to set the stream status for a cancellation. It should already have been
// set prior to sending the command.
@@ -272,7 +274,7 @@
int lastKnownStream = connection().local().lastKnownStream();
for (Http2Stream stream : http2Streams()) {
if (lastKnownStream < stream.id()) {
- clientStream(stream).setStatus(goAwayStatus);
+ clientStream(stream).setStatus(goAwayStatus, new Metadata.Trailers());
stream.close();
}
}
@@ -319,10 +321,12 @@
// Finish creation of the stream by writing a headers frame.
final PendingStream pendingStream = pendingStreams.remove();
// TODO(user): Change Netty to not send priority, just use default.
+ // TODO(user): Switch to binary headers when Netty supports it.
DefaultHttp2Headers.Builder headersBuilder = DefaultHttp2Headers.newBuilder();
- // Add custom headers from the method descriptor
- for (Map.Entry<String, String> entry : pendingStream.method.getHeaders().entrySet()) {
- headersBuilder.add(entry.getKey(), entry.getValue());
+ for (int i = 0; i < pendingStream.headers.length; i++) {
+ headersBuilder.add(
+ pendingStream.headers[i],
+ pendingStream.headers[++i]);
}
headersBuilder
.method(HTTP_METHOD)
@@ -410,7 +414,8 @@
case RESERVED_REMOTE:
// Disallowed state, terminate the stream.
clientStream(stream).setStatus(
- new Status(Transport.Code.INTERNAL, "Stream in invalid state: " + stream.state()));
+ new Status(Transport.Code.INTERNAL, "Stream in invalid state: " + stream.state()),
+ new Metadata.Trailers());
writeRstStream(ctx(), stream.id(), Http2Error.INTERNAL_ERROR.code(), ctx().newPromise());
break;
default:
diff --git a/core/src/main/java/com/google/net/stubby/newtransport/netty/NettyClientStream.java b/core/src/main/java/com/google/net/stubby/newtransport/netty/NettyClientStream.java
index 1460ee5..62b7196 100644
--- a/core/src/main/java/com/google/net/stubby/newtransport/netty/NettyClientStream.java
+++ b/core/src/main/java/com/google/net/stubby/newtransport/netty/NettyClientStream.java
@@ -5,6 +5,7 @@
import com.google.common.base.Preconditions;
import com.google.common.util.concurrent.ListenableFuture;
+import com.google.net.stubby.Metadata;
import com.google.net.stubby.Status;
import com.google.net.stubby.newtransport.AbstractClientStream;
import com.google.net.stubby.newtransport.GrpcDeframer;
@@ -73,7 +74,7 @@
responseCode = responseCode(headers);
isGrpcResponse = isGrpcResponse(headers, responseCode);
if (!isGrpcResponse && endOfStream) {
- setStatus(new Status(responseCode));
+ setStatus(new Status(responseCode), new Metadata.Trailers());
}
}
@@ -102,7 +103,7 @@
if (endOfStream) {
String msg = nonGrpcErrorMessage.toString();
- setStatus(new Status(responseCode, msg));
+ setStatus(new Status(responseCode, msg), new Metadata.Trailers());
}
}
}
diff --git a/core/src/main/java/com/google/net/stubby/newtransport/netty/NettyClientTransport.java b/core/src/main/java/com/google/net/stubby/newtransport/netty/NettyClientTransport.java
index 80c34ff..b23dd5d 100644
--- a/core/src/main/java/com/google/net/stubby/newtransport/netty/NettyClientTransport.java
+++ b/core/src/main/java/com/google/net/stubby/newtransport/netty/NettyClientTransport.java
@@ -3,6 +3,7 @@
import static io.netty.channel.ChannelOption.SO_KEEPALIVE;
import com.google.common.base.Preconditions;
+import com.google.net.stubby.Metadata;
import com.google.net.stubby.MethodDescriptor;
import com.google.net.stubby.newtransport.AbstractClientTransport;
import com.google.net.stubby.newtransport.ClientStream;
@@ -11,11 +12,6 @@
import com.google.net.stubby.newtransport.netty.NettyClientTransportFactory.NegotiationType;
import com.google.net.stubby.testing.utils.ssl.SslContextFactory;
-import io.netty.handler.codec.http2.Http2OutboundFrameLogger;
-
-import io.netty.handler.codec.http2.Http2InboundFrameLogger;
-import io.netty.util.internal.logging.InternalLogLevel;
-import io.netty.handler.codec.http2.Http2FrameLogger;
import io.netty.bootstrap.Bootstrap;
import io.netty.channel.Channel;
import io.netty.channel.ChannelFuture;
@@ -30,9 +26,13 @@
import io.netty.handler.codec.http2.DefaultHttp2OutboundFlowController;
import io.netty.handler.codec.http2.DefaultHttp2StreamRemovalPolicy;
import io.netty.handler.codec.http2.Http2Connection;
+import io.netty.handler.codec.http2.Http2FrameLogger;
import io.netty.handler.codec.http2.Http2FrameReader;
import io.netty.handler.codec.http2.Http2FrameWriter;
+import io.netty.handler.codec.http2.Http2InboundFrameLogger;
import io.netty.handler.codec.http2.Http2OutboundFlowController;
+import io.netty.handler.codec.http2.Http2OutboundFrameLogger;
+import io.netty.util.internal.logging.InternalLogLevel;
import java.util.concurrent.ExecutionException;
@@ -83,13 +83,17 @@
}
@Override
- protected ClientStream newStreamInternal(MethodDescriptor<?, ?> method, StreamListener listener) {
+ protected ClientStream newStreamInternal(MethodDescriptor<?, ?> method,
+ Metadata.Headers headers,
+ StreamListener listener) {
// Create the stream.
NettyClientStream stream = new NettyClientStream(listener, channel, handler.inboundFlow());
try {
// Write the request and await creation of the stream.
- channel.writeAndFlush(new CreateStreamCommand(method, stream)).get();
+ channel.writeAndFlush(new CreateStreamCommand(method,
+ headers.serializeAscii(),
+ stream)).get();
} catch (InterruptedException e) {
// Restore the interrupt.
Thread.currentThread().interrupt();
diff --git a/core/src/main/java/com/google/net/stubby/newtransport/netty/NettyServerHandler.java b/core/src/main/java/com/google/net/stubby/newtransport/netty/NettyServerHandler.java
index e857a6c..fddaee0 100644
--- a/core/src/main/java/com/google/net/stubby/newtransport/netty/NettyServerHandler.java
+++ b/core/src/main/java/com/google/net/stubby/newtransport/netty/NettyServerHandler.java
@@ -5,27 +5,27 @@
import static com.google.net.stubby.newtransport.HttpUtil.HTTP_METHOD;
import com.google.common.base.Preconditions;
-import com.google.net.stubby.MethodDescriptor;
+import com.google.net.stubby.Metadata;
import com.google.net.stubby.Status;
import com.google.net.stubby.newtransport.ServerTransportListener;
import com.google.net.stubby.newtransport.StreamListener;
import com.google.net.stubby.newtransport.TransportFrameUtil;
import com.google.net.stubby.transport.Transport;
-import io.netty.handler.codec.http2.DefaultHttp2InboundFlowController;
-import io.netty.handler.codec.http2.Http2FrameReader;
-import io.netty.handler.codec.http2.Http2FrameWriter;
-import io.netty.handler.codec.http2.Http2OutboundFlowController;
import io.netty.buffer.ByteBuf;
import io.netty.channel.ChannelHandlerContext;
import io.netty.channel.ChannelPromise;
import io.netty.handler.codec.http2.AbstractHttp2ConnectionHandler;
import io.netty.handler.codec.http2.DefaultHttp2Headers;
+import io.netty.handler.codec.http2.DefaultHttp2InboundFlowController;
import io.netty.handler.codec.http2.Http2Connection;
import io.netty.handler.codec.http2.Http2ConnectionAdapter;
import io.netty.handler.codec.http2.Http2Error;
import io.netty.handler.codec.http2.Http2Exception;
+import io.netty.handler.codec.http2.Http2FrameReader;
+import io.netty.handler.codec.http2.Http2FrameWriter;
import io.netty.handler.codec.http2.Http2Headers;
+import io.netty.handler.codec.http2.Http2OutboundFlowController;
import io.netty.handler.codec.http2.Http2Stream;
import io.netty.handler.codec.http2.Http2StreamException;
import io.netty.util.ReferenceCountUtil;
@@ -83,7 +83,8 @@
Http2Stream http2Stream = connection().requireStream(streamId);
http2Stream.data(stream);
String method = determineMethod(streamId, headers);
- StreamListener listener = transportListener.streamCreated(stream, method);
+ StreamListener listener = transportListener.streamCreated(stream, method,
+ new Metadata.Headers(headers));
stream.setListener(listener);
} catch (Http2Exception e) {
throw e;
diff --git a/core/src/main/java/com/google/net/stubby/newtransport/okhttp/Headers.java b/core/src/main/java/com/google/net/stubby/newtransport/okhttp/Headers.java
index 1775f7b..f63640a 100644
--- a/core/src/main/java/com/google/net/stubby/newtransport/okhttp/Headers.java
+++ b/core/src/main/java/com/google/net/stubby/newtransport/okhttp/Headers.java
@@ -4,6 +4,8 @@
import com.squareup.okhttp.internal.spdy.Header;
+import okio.ByteString;
+
import java.util.List;
/**
@@ -15,12 +17,15 @@
new Header("content-type", "application/protorpc");
public static final Header RESPONSE_STATUS_OK = new Header(Header.RESPONSE_STATUS, "200");
- public static List<Header> createRequestHeaders(String operationName) {
- List<Header> headers = Lists.newArrayListWithCapacity(6);
- headers.add(new Header(Header.TARGET_PATH, operationName));
- headers.add(SCHEME_HEADER);
- headers.add(CONTENT_TYPE_HEADER);
- return headers;
+ public static List<Header> createRequestHeaders(String operationName, byte[][] headers) {
+ List<Header> okhttpHeaders = Lists.newArrayListWithCapacity(6);
+ okhttpHeaders.add(new Header(Header.TARGET_PATH, operationName));
+ okhttpHeaders.add(SCHEME_HEADER);
+ okhttpHeaders.add(CONTENT_TYPE_HEADER);
+ for (int i = 0; i < headers.length; i++) {
+ okhttpHeaders.add(new Header(ByteString.of(headers[i]), ByteString.of(headers[++i])));
+ }
+ return okhttpHeaders;
}
public static List<Header> createResponseHeaders() {
diff --git a/core/src/main/java/com/google/net/stubby/newtransport/okhttp/OkHttpClientTransport.java b/core/src/main/java/com/google/net/stubby/newtransport/okhttp/OkHttpClientTransport.java
index 22350b6..4d94ac1 100644
--- a/core/src/main/java/com/google/net/stubby/newtransport/okhttp/OkHttpClientTransport.java
+++ b/core/src/main/java/com/google/net/stubby/newtransport/okhttp/OkHttpClientTransport.java
@@ -4,6 +4,7 @@
import com.google.common.base.Preconditions;
import com.google.common.io.ByteStreams;
import com.google.common.util.concurrent.ListenableFuture;
+import com.google.net.stubby.Metadata;
import com.google.net.stubby.MethodDescriptor;
import com.google.net.stubby.Status;
import com.google.net.stubby.newtransport.AbstractClientStream;
@@ -87,7 +88,7 @@
private final int port;
private FrameReader frameReader;
private AsyncFrameWriter frameWriter;
- private Object lock = new Object();
+ private final Object lock = new Object();
@GuardedBy("lock")
private int nextStreamId;
private final Map<Integer, OkHttpClientStream> streams =
@@ -125,8 +126,10 @@
}
@Override
- protected ClientStream newStreamInternal(MethodDescriptor<?, ?> method, StreamListener listener) {
- return new OkHttpClientStream(method, listener);
+ protected ClientStream newStreamInternal(MethodDescriptor<?, ?> method,
+ Metadata.Headers headers,
+ StreamListener listener) {
+ return new OkHttpClientStream(method, headers.serialize(), listener);
}
@Override
@@ -204,7 +207,7 @@
stopAsync();
for (OkHttpClientStream stream : goAwayStreams) {
- stream.setStatus(status);
+ stream.setStatus(status, new Metadata.Trailers());
}
}
@@ -218,7 +221,7 @@
stream = streams.remove(streamId);
if (stream != null) {
// This is mainly for failed streams, for successfully finished streams, it's a no-op.
- stream.setStatus(status);
+ stream.setStatus(status, new Metadata.Trailers());
return true;
}
return false;
@@ -398,18 +401,18 @@
final InputStreamDeframer deframer;
int unacknowledgedBytesRead;
- OkHttpClientStream(MethodDescriptor<?, ?> method, StreamListener listener) {
+ OkHttpClientStream(MethodDescriptor<?, ?> method, byte[][] headers, StreamListener listener) {
super(listener);
deframer = new InputStreamDeframer(inboundMessageHandler());
synchronized (lock) {
if (goAway) {
- setStatus(goAwayStatus);
+ setStatus(goAwayStatus, new Metadata.Trailers());
return;
}
assignStreamId(this);
}
frameWriter.synStream(false, false, streamId, 0,
- Headers.createRequestHeaders(method.getName()));
+ Headers.createRequestHeaders(method.getName(), headers));
}
InputStreamDeframer getDeframer() {
diff --git a/core/src/test/java/com/google/net/stubby/MetadataTest.java b/core/src/test/java/com/google/net/stubby/MetadataTest.java
new file mode 100644
index 0000000..168c539
--- /dev/null
+++ b/core/src/test/java/com/google/net/stubby/MetadataTest.java
@@ -0,0 +1,123 @@
+package com.google.net.stubby;
+
+import static org.junit.Assert.assertArrayEquals;
+import static org.junit.Assert.assertEquals;
+import static org.junit.Assert.assertFalse;
+import static org.junit.Assert.assertSame;
+import static org.junit.Assert.assertTrue;
+import static org.junit.Assert.fail;
+
+import org.junit.Test;
+import org.junit.runner.RunWith;
+import org.junit.runners.JUnit4;
+
+import java.nio.charset.StandardCharsets;
+import java.util.Iterator;
+
+/**
+ * Tests for {@link Metadata}
+ */
+@RunWith(JUnit4.class)
+public class MetadataTest {
+
+ private static final Metadata.Marshaller<Fish> FISH_MARSHALLER =
+ new Metadata.Marshaller<Fish>() {
+ @Override
+ public byte[] toBytes(Fish fish) {
+ return fish.name.getBytes(StandardCharsets.UTF_8);
+ }
+
+ @Override
+ public String toAscii(Fish value) {
+ return value.name;
+ }
+
+ @Override
+ public Fish parseBytes(byte[] serialized) {
+ return new Fish(new String(serialized, StandardCharsets.UTF_8));
+ }
+
+ @Override
+ public Fish parseAscii(String ascii) {
+ return new Fish(ascii);
+ }
+ };
+
+ private static final String LANCE = "lance";
+ private static final byte[] LANCE_BYTES = LANCE.getBytes(StandardCharsets.US_ASCII);
+ private static final Metadata.Key<Fish> KEY = new Metadata.Key<Fish>("test", FISH_MARSHALLER);
+
+ @Test
+ public void testWriteParsed() {
+ Fish lance = new Fish(LANCE);
+ Metadata.Headers metadata = new Metadata.Headers();
+ metadata.put(KEY, lance);
+ // Should be able to read same instance out
+ assertSame(lance, metadata.get(KEY));
+ Iterator<Fish> fishes = metadata.<Fish>getAll(KEY).iterator();
+ assertTrue(fishes.hasNext());
+ assertSame(fishes.next(), lance);
+ assertFalse(fishes.hasNext());
+ byte[][] serialized = metadata.serialize();
+ assertEquals(2, serialized.length);
+ assertEquals(new String(serialized[0], StandardCharsets.US_ASCII), "test");
+ assertArrayEquals(LANCE_BYTES, serialized[1]);
+ assertSame(lance, metadata.get(KEY));
+ // Serialized instance should be cached too
+ assertSame(serialized[0], metadata.serialize()[0]);
+ assertSame(serialized[1], metadata.serialize()[1]);
+ }
+
+ @Test
+ public void testWriteRaw() {
+ Metadata.Headers raw = new Metadata.Headers(
+ KEY.asciiName(), LANCE_BYTES);
+ Fish lance = raw.get(KEY);
+ assertEquals(lance, new Fish(LANCE));
+ // Reading again should return the same parsed instance
+ assertSame(lance, raw.get(KEY));
+ }
+
+ @Test
+ public void testFailSerializeRaw() {
+ Metadata.Headers raw = new Metadata.Headers(
+ KEY.asciiName(), LANCE_BYTES);
+
+ try {
+ raw.serialize();
+ fail("Can't serialize raw metadata");
+ } catch (IllegalStateException ise) {
+ // Success
+ }
+ }
+
+ @Test
+ public void testFailMergeRawIntoSerializable() {
+ Metadata.Headers raw = new Metadata.Headers(
+ KEY.asciiName(), LANCE_BYTES);
+ Metadata.Headers serializable = new Metadata.Headers();
+ try {
+ serializable.merge(raw);
+ fail("Can't serialize raw metadata");
+ } catch (IllegalArgumentException iae) {
+ // Success
+ }
+ }
+
+ private static class Fish {
+ private String name;
+
+ private Fish(String name) {
+ this.name = name;
+ }
+
+ @Override
+ public boolean equals(Object o) {
+ if (this == o) return true;
+ if (o == null || getClass() != o.getClass()) return false;
+ Fish fish = (Fish) o;
+ if (name != null ? !name.equals(fish.name) : fish.name != null) return false;
+ return true;
+ }
+ }
+}
diff --git a/core/src/test/java/com/google/net/stubby/context/ContextExchangeChannelTest.java b/core/src/test/java/com/google/net/stubby/context/ContextExchangeChannelTest.java
deleted file mode 100644
index 58b8a74..0000000
--- a/core/src/test/java/com/google/net/stubby/context/ContextExchangeChannelTest.java
+++ /dev/null
@@ -1,133 +0,0 @@
-package com.google.net.stubby.context;
-
-import static org.junit.Assert.assertEquals;
-import static org.junit.Assert.assertNull;
-import static org.mockito.Mockito.*;
-
-import com.google.common.util.concurrent.SettableFuture;
-import com.google.net.stubby.Call;
-import com.google.net.stubby.Channel;
-import com.google.net.stubby.MethodDescriptor;
-import com.google.net.stubby.Status;
-import com.google.net.stubby.stub.Marshallers;
-import com.google.net.stubby.testing.integration.Messages.Payload;
-import com.google.net.stubby.testing.integration.Messages.PayloadType;
-import com.google.net.stubby.testing.integration.Messages.SimpleRequest;
-import com.google.net.stubby.testing.integration.Messages.SimpleResponse;
-import com.google.net.stubby.testing.integration.TestServiceGrpc;
-import com.google.protobuf.ByteString;
-
-import org.hamcrest.BaseMatcher;
-import org.hamcrest.Description;
-import org.junit.Before;
-import org.junit.Test;
-import org.junit.runner.RunWith;
-import org.junit.runners.JUnit4;
-import org.mockito.ArgumentCaptor;
-import org.mockito.Mock;
-import org.mockito.Mockito;
-import org.mockito.MockitoAnnotations;
-
-import java.io.IOException;
-import java.io.InputStream;
-import java.util.concurrent.Executors;
-import java.util.concurrent.Future;
-
-import javax.inject.Provider;
-
-/**
- * Tests for {@link ContextExchangeChannel}
- */
-@RunWith(JUnit4.class)
-public class ContextExchangeChannelTest {
-
- private static final SimpleRequest REQ = SimpleRequest.newBuilder().setPayload(
- Payload.newBuilder().setPayloadCompressable("mary").
- setPayloadType(PayloadType.COMPRESSABLE).build())
- .build();
-
- private static final SimpleResponse RESP = SimpleResponse.newBuilder().setPayload(
- Payload.newBuilder().setPayloadCompressable("bob").
- setPayloadType(PayloadType.COMPRESSABLE).build())
- .build();
-
- @Mock
- Channel channel;
-
- @Mock
- Call call;
-
- @Before @SuppressWarnings("unchecked")
- public void setup() {
- MockitoAnnotations.initMocks(this);
- when(channel.newCall(Mockito.any(MethodDescriptor.class))).thenReturn(call);
- }
-
- @Test
- public void testReceive() throws Exception {
- ContextExchangeChannel exchange = new ContextExchangeChannel(channel);
- Provider<SimpleResponse> auth =
- exchange.receive("auth", Marshallers.forProto(SimpleResponse.PARSER));
- // Should be null, nothing has happened
- assertNull(auth.get());
- TestServiceGrpc.TestServiceBlockingStub stub =
- TestServiceGrpc.newBlockingStub(exchange);
- callStub(stub);
- assertEquals(RESP, auth.get());
- exchange.clearLastReceived();
- assertNull(auth.get());
- }
-
- @Test @SuppressWarnings("unchecked")
- public void testSend() throws Exception {
- ContextExchangeChannel exchange = new ContextExchangeChannel(channel);
- exchange.send("auth", RESP, Marshallers.forProto(SimpleResponse.PARSER));
- TestServiceGrpc.TestServiceBlockingStub stub =
- TestServiceGrpc.newBlockingStub(exchange);
- callStub(stub);
- verify(call).sendContext(eq("auth"),
- argThat(new BaseMatcher<InputStream>() {
- @Override
- public boolean matches(Object o) {
- try {
- // Just check the length, consuming the stream will fail the test and Mockito
- // calls this more than once.
- return ((InputStream) o).available() == RESP.getSerializedSize();
- } catch (IOException ioe) {
- throw new RuntimeException(ioe);
- }
- }
-
- @Override
- public void describeTo(Description description) {
- }
- }), (SettableFuture<Void>) isNull());
- }
-
- @SuppressWarnings("unchecked")
- private void callStub(final TestServiceGrpc.TestServiceBlockingStub stub) throws Exception {
- when(channel.newCall(Mockito.<MethodDescriptor>any())).thenReturn(call);
-
- // execute the call in another thread so we don't deadlock waiting for the
- // listener.onClose
- Future<?> pending = Executors.newSingleThreadExecutor().submit(new Runnable() {
- @Override
- public void run() {
- stub.unaryCall(REQ);
- }
- });
- ArgumentCaptor<Call.Listener> listenerCapture = ArgumentCaptor.forClass(Call.Listener.class);
- // Wait for the call to start to capture the listener
- verify(call, timeout(1000)).start(listenerCapture.capture());
-
- ByteString response = RESP.toByteString();
- Call.Listener listener = listenerCapture.getValue();
- // Respond with a context-value
- listener.onContext("auth", response.newInput());
- listener.onContext("something-else", response.newInput());
- // .. and single payload
- listener.onPayload(RESP);
- listener.onClose(Status.OK);
- pending.get();
- }
-}
diff --git a/core/src/test/java/com/google/net/stubby/newtransport/GrpcDeframerTest.java b/core/src/test/java/com/google/net/stubby/newtransport/GrpcDeframerTest.java
index e39d80f..24d4f0d 100644
--- a/core/src/test/java/com/google/net/stubby/newtransport/GrpcDeframerTest.java
+++ b/core/src/test/java/com/google/net/stubby/newtransport/GrpcDeframerTest.java
@@ -9,6 +9,7 @@
import static org.mockito.Matchers.anyInt;
import static org.mockito.Matchers.anyString;
import static org.mockito.Matchers.eq;
+import static org.mockito.Matchers.notNull;
import static org.mockito.Mockito.never;
import static org.mockito.Mockito.verify;
import static org.mockito.Mockito.when;
@@ -16,6 +17,7 @@
import com.google.common.io.ByteStreams;
import com.google.common.util.concurrent.MoreExecutors;
import com.google.common.util.concurrent.SettableFuture;
+import com.google.net.stubby.Metadata;
import com.google.net.stubby.Status;
import com.google.net.stubby.transport.Transport;
import com.google.protobuf.ByteString;
@@ -228,7 +230,7 @@
private void verifyStatus(Transport.Code code) {
ArgumentCaptor<Status> captor = ArgumentCaptor.forClass(Status.class);
- verify(listener).closed(captor.capture());
+ verify(listener).closed(captor.capture(), notNull(Metadata.Trailers.class));
assertEquals(code, captor.getValue().getCode());
}
@@ -241,7 +243,7 @@
}
private void verifyNoStatus() {
- verify(listener, never()).closed(any(Status.class));
+ verify(listener, never()).closed(any(Status.class), notNull(Metadata.Trailers.class));
}
private byte[] contextFrame() throws IOException {
diff --git a/core/src/test/java/com/google/net/stubby/newtransport/netty/NettyClientHandlerTest.java b/core/src/test/java/com/google/net/stubby/newtransport/netty/NettyClientHandlerTest.java
index 33946b5..9c7ce95 100644
--- a/core/src/test/java/com/google/net/stubby/newtransport/netty/NettyClientHandlerTest.java
+++ b/core/src/test/java/com/google/net/stubby/newtransport/netty/NettyClientHandlerTest.java
@@ -7,28 +7,20 @@
import static org.junit.Assert.assertEquals;
import static org.mockito.Matchers.any;
import static org.mockito.Matchers.eq;
+import static org.mockito.Matchers.notNull;
import static org.mockito.Mockito.calls;
import static org.mockito.Mockito.inOrder;
import static org.mockito.Mockito.never;
import static org.mockito.Mockito.verify;
import static org.mockito.Mockito.when;
-import com.google.common.collect.ImmutableMap;
+import com.google.net.stubby.Metadata;
import com.google.net.stubby.MethodDescriptor;
import com.google.net.stubby.Status;
import com.google.net.stubby.newtransport.HttpUtil;
import com.google.net.stubby.newtransport.StreamState;
import com.google.net.stubby.transport.Transport;
-import org.junit.Before;
-import org.junit.Test;
-import org.junit.runner.RunWith;
-import org.junit.runners.JUnit4;
-import org.mockito.ArgumentCaptor;
-import org.mockito.InOrder;
-import org.mockito.Mock;
-import org.mockito.MockitoAnnotations;
-
import io.netty.buffer.ByteBuf;
import io.netty.buffer.Unpooled;
import io.netty.channel.ChannelHandlerContext;
@@ -47,6 +39,15 @@
import io.netty.handler.codec.http2.Http2OutboundFlowController;
import io.netty.handler.codec.http2.Http2Settings;
+import org.junit.Before;
+import org.junit.Test;
+import org.junit.runner.RunWith;
+import org.junit.runners.JUnit4;
+import org.mockito.ArgumentCaptor;
+import org.mockito.InOrder;
+import org.mockito.Mock;
+import org.mockito.MockitoAnnotations;
+
/**
* Tests for {@link NettyClientHandler}.
*/
@@ -63,6 +64,7 @@
@Mock
private MethodDescriptor<?, ?> method;
private ByteBuf content;
+ private Metadata.Headers grpcHeaders;
@Before
public void setup() throws Exception {
@@ -77,8 +79,11 @@
mockContext();
mockFuture(true);
+ Metadata.Key key = new Metadata.Key("auth", Metadata.STRING_MARSHALLER);
+ grpcHeaders = new Metadata.Headers();
+ grpcHeaders.put(key, "sometoken");
+
when(method.getName()).thenReturn("fakemethod");
- when(method.getHeaders()).thenReturn(ImmutableMap.of("auth", "sometoken"));
when(stream.state()).thenReturn(StreamState.OPEN);
// Simulate activation of the handler to force writing of the initial settings
@@ -95,7 +100,8 @@
@Test
public void createStreamShouldSucceed() throws Exception {
- handler.write(ctx, new CreateStreamCommand(method, stream), promise);
+ handler.write(ctx, new CreateStreamCommand(method, grpcHeaders.serializeAscii(), stream),
+ promise);
verify(promise).setSuccess();
verify(stream).id(eq(3));
@@ -184,7 +190,8 @@
public void createShouldQueueStream() throws Exception {
// Disallow stream creation to force the stream to get added to the pending queue.
setMaxConcurrentStreams(0);
- handler.write(ctx, new CreateStreamCommand(method, stream), promise);
+ handler.write(ctx, new CreateStreamCommand(method, grpcHeaders.serializeAscii(), stream),
+ promise);
// Make sure the write never occurred.
verify(frameListener, never()).onHeadersRead(eq(ctx),
@@ -201,7 +208,8 @@
public void receivedGoAwayShouldFailQueuedStreams() throws Exception {
// Force a stream to get added to the pending queue.
setMaxConcurrentStreams(0);
- handler.write(ctx, new CreateStreamCommand(method, stream), promise);
+ handler.write(ctx, new CreateStreamCommand(method, grpcHeaders.serializeAscii(), stream),
+ promise);
handler.channelRead(ctx, goAwayFrame(0));
verify(promise).setFailure(any(Throwable.class));
@@ -210,13 +218,14 @@
@Test
public void receivedGoAwayShouldFailUnknownStreams() throws Exception {
// Force a stream to get added to the pending queue.
- handler.write(ctx, new CreateStreamCommand(method, stream), promise);
+ handler.write(ctx, new CreateStreamCommand(method, grpcHeaders.serializeAscii(), stream),
+ promise);
// Read a GOAWAY that indicates our stream was never processed by the server.
handler.channelRead(ctx, goAwayFrame(0));
ArgumentCaptor<Status> captor = ArgumentCaptor.forClass(Status.class);
InOrder inOrder = inOrder(stream);
- inOrder.verify(stream, calls(1)).setStatus(captor.capture());
+ inOrder.verify(stream, calls(1)).setStatus(captor.capture(), notNull(Metadata.Trailers.class));
assertEquals(Transport.Code.UNAVAILABLE, captor.getValue().getCode());
}
@@ -237,7 +246,8 @@
private void createStream() throws Exception {
// Create the stream.
- handler.write(ctx, new CreateStreamCommand(method, stream), promise);
+ handler.write(ctx, new CreateStreamCommand(method, grpcHeaders.serializeAscii(), stream),
+ promise);
when(stream.id()).thenReturn(3);
// Reset the context mock to clear recording of sent headers frame.
mockContext();
diff --git a/core/src/test/java/com/google/net/stubby/newtransport/netty/NettyClientStreamTest.java b/core/src/test/java/com/google/net/stubby/newtransport/netty/NettyClientStreamTest.java
index 9df1d69..1e7f767 100644
--- a/core/src/test/java/com/google/net/stubby/newtransport/netty/NettyClientStreamTest.java
+++ b/core/src/test/java/com/google/net/stubby/newtransport/netty/NettyClientStreamTest.java
@@ -4,22 +4,24 @@
import static org.junit.Assert.assertEquals;
import static org.mockito.Matchers.any;
import static org.mockito.Matchers.eq;
+import static org.mockito.Matchers.same;
import static org.mockito.Mockito.verify;
+import com.google.net.stubby.Metadata;
import com.google.net.stubby.Status;
import com.google.net.stubby.newtransport.HttpUtil;
import com.google.net.stubby.newtransport.StreamState;
import com.google.net.stubby.transport.Transport;
+import io.netty.buffer.Unpooled;
+import io.netty.handler.codec.http2.DefaultHttp2Headers;
+import io.netty.handler.codec.http2.Http2Headers;
+
import org.junit.Test;
import org.junit.runner.RunWith;
import org.junit.runners.JUnit4;
import org.mockito.ArgumentCaptor;
-import io.netty.buffer.Unpooled;
-import io.netty.handler.codec.http2.DefaultHttp2Headers;
-import io.netty.handler.codec.http2.Http2Headers;
-
/**
* Tests for {@link NettyClientStream}.
*/
@@ -63,34 +65,35 @@
@Test
public void setStatusWithOkShouldCloseStream() {
stream().id(1);
- stream().setStatus(Status.OK);
- verify(listener).closed(Status.OK);
+ stream().setStatus(Status.OK, new Metadata.Trailers());
+ verify(listener).closed(same(Status.OK), any(Metadata.Trailers.class));
assertEquals(StreamState.CLOSED, stream.state());
}
@Test
public void setStatusWithErrorShouldCloseStream() {
Status errorStatus = new Status(Transport.Code.INTERNAL);
- stream().setStatus(errorStatus);
- verify(listener).closed(eq(errorStatus));
+ stream().setStatus(errorStatus, new Metadata.Trailers());
+ verify(listener).closed(eq(errorStatus), any(Metadata.Trailers.class));
assertEquals(StreamState.CLOSED, stream.state());
}
@Test
public void setStatusWithOkShouldNotOverrideError() {
Status errorStatus = new Status(Transport.Code.INTERNAL);
- stream().setStatus(errorStatus);
- stream().setStatus(Status.OK);
- verify(listener).closed(any(Status.class));
+ stream().setStatus(errorStatus, new Metadata.Trailers());
+ stream().setStatus(Status.OK, new Metadata.Trailers());
+ verify(listener).closed(any(Status.class), any(Metadata.Trailers.class));
assertEquals(StreamState.CLOSED, stream.state());
}
@Test
public void setStatusWithErrorShouldNotOverridePreviousError() {
Status errorStatus = new Status(Transport.Code.INTERNAL);
- stream().setStatus(errorStatus);
- stream().setStatus(Status.fromThrowable(new RuntimeException("fake")));
- verify(listener).closed(any(Status.class));
+ stream().setStatus(errorStatus, new Metadata.Trailers());
+ stream().setStatus(Status.fromThrowable(new RuntimeException("fake")),
+ new Metadata.Trailers());
+ verify(listener).closed(any(Status.class), any(Metadata.Trailers.class));
assertEquals(StreamState.CLOSED, stream.state());
}
@@ -123,7 +126,7 @@
stream.inboundDataReceived(statusFrame(new Status(Transport.Code.INTERNAL)), false);
ArgumentCaptor<Status> captor = ArgumentCaptor.forClass(Status.class);
- verify(listener).closed(captor.capture());
+ verify(listener).closed(captor.capture(), any(Metadata.Trailers.class));
assertEquals(Transport.Code.INTERNAL, captor.getValue().getCode());
assertEquals(StreamState.CLOSED, stream.state());
}
@@ -132,7 +135,7 @@
public void nonGrpcResponseShouldSetStatus() throws Exception {
stream.inboundDataReceived(Unpooled.copiedBuffer(MESSAGE, UTF_8), true);
ArgumentCaptor<Status> captor = ArgumentCaptor.forClass(Status.class);
- verify(listener).closed(captor.capture());
+ verify(listener).closed(captor.capture(), any(Metadata.Trailers.class));
assertEquals(MESSAGE, captor.getValue().getDescription());
}
diff --git a/core/src/test/java/com/google/net/stubby/newtransport/netty/NettyServerHandlerTest.java b/core/src/test/java/com/google/net/stubby/newtransport/netty/NettyServerHandlerTest.java
index 1aa20dc..75ed68f 100644
--- a/core/src/test/java/com/google/net/stubby/newtransport/netty/NettyServerHandlerTest.java
+++ b/core/src/test/java/com/google/net/stubby/newtransport/netty/NettyServerHandlerTest.java
@@ -6,12 +6,15 @@
import static org.mockito.Matchers.any;
import static org.mockito.Matchers.anyInt;
import static org.mockito.Matchers.eq;
+import static org.mockito.Matchers.notNull;
import static org.mockito.Mockito.never;
import static org.mockito.Mockito.verify;
import static org.mockito.Mockito.verifyNoMoreInteractions;
import static org.mockito.Mockito.when;
import com.google.common.io.ByteStreams;
+import com.google.net.stubby.Metadata;
+import com.google.net.stubby.MethodDescriptor;
import com.google.net.stubby.Status;
import com.google.net.stubby.newtransport.Framer;
import com.google.net.stubby.newtransport.HttpUtil;
@@ -28,9 +31,15 @@
import io.netty.handler.codec.http2.DefaultHttp2FrameReader;
import io.netty.handler.codec.http2.DefaultHttp2FrameWriter;
import io.netty.handler.codec.http2.DefaultHttp2Headers;
+import io.netty.handler.codec.http2.DefaultHttp2InboundFlowController;
+import io.netty.handler.codec.http2.DefaultHttp2OutboundFlowController;
import io.netty.handler.codec.http2.Http2CodecUtil;
+import io.netty.handler.codec.http2.Http2Connection;
import io.netty.handler.codec.http2.Http2Error;
+import io.netty.handler.codec.http2.Http2FrameReader;
+import io.netty.handler.codec.http2.Http2FrameWriter;
import io.netty.handler.codec.http2.Http2Headers;
+import io.netty.handler.codec.http2.Http2OutboundFlowController;
import io.netty.handler.codec.http2.Http2Settings;
import org.junit.Before;
@@ -41,13 +50,6 @@
import org.mockito.Mock;
import org.mockito.MockitoAnnotations;
-import io.netty.handler.codec.http2.DefaultHttp2InboundFlowController;
-import io.netty.handler.codec.http2.DefaultHttp2OutboundFlowController;
-import io.netty.handler.codec.http2.Http2Connection;
-import io.netty.handler.codec.http2.Http2FrameReader;
-import io.netty.handler.codec.http2.Http2FrameWriter;
-import io.netty.handler.codec.http2.Http2OutboundFlowController;
-
import java.io.ByteArrayInputStream;
import java.io.InputStream;
import java.nio.ByteBuffer;
@@ -73,7 +75,9 @@
public void setup() throws Exception {
MockitoAnnotations.initMocks(this);
- when(transportListener.streamCreated(any(ServerStream.class), any(String.class)))
+ when(transportListener.streamCreated(any(ServerStream.class),
+ any(String.class),
+ any(Metadata.Headers.class)))
.thenReturn(streamListener);
handler = newHandler(transportListener);
frameWriter = new DefaultHttp2FrameWriter();
@@ -132,7 +136,7 @@
assertArrayEquals(CONTENT, ByteStreams.toByteArray(captor.getValue()));
if (endStream) {
- verify(streamListener).closed(eq(Status.OK));
+ verify(streamListener).closed(eq(Status.OK), notNull(Metadata.Trailers.class));
}
verifyNoMoreInteractions(streamListener);
}
@@ -143,7 +147,7 @@
handler.channelRead(ctx, emptyDataFrame(STREAM_ID, true));
verify(streamListener, never()).messageRead(any(InputStream.class), anyInt());
- verify(streamListener).closed(eq(Status.OK));
+ verify(streamListener).closed(eq(Status.OK), notNull(Metadata.Trailers.class));
verifyNoMoreInteractions(streamListener);
}
@@ -153,7 +157,7 @@
handler.channelRead(ctx, rstStreamFrame(STREAM_ID, Http2Error.CANCEL.code()));
verify(streamListener, never()).messageRead(any(InputStream.class), anyInt());
- verify(streamListener).closed(eq(Status.CANCELLED));
+ verify(streamListener).closed(eq(Status.CANCELLED), notNull(Metadata.Trailers.class));
verifyNoMoreInteractions(streamListener);
}
@@ -169,7 +173,8 @@
ArgumentCaptor.forClass(NettyServerStream.class);
@SuppressWarnings("rawtypes")
ArgumentCaptor<String> methodCaptor = ArgumentCaptor.forClass(String.class);
- verify(transportListener).streamCreated(streamCaptor.capture(), methodCaptor.capture());
+ verify(transportListener).streamCreated(streamCaptor.capture(), methodCaptor.capture(),
+ any(Metadata.Headers.class));
stream = streamCaptor.getValue();
}
diff --git a/core/src/test/java/com/google/net/stubby/newtransport/netty/NettyServerStreamTest.java b/core/src/test/java/com/google/net/stubby/newtransport/netty/NettyServerStreamTest.java
index 980c54b..52871b0 100644
--- a/core/src/test/java/com/google/net/stubby/newtransport/netty/NettyServerStreamTest.java
+++ b/core/src/test/java/com/google/net/stubby/newtransport/netty/NettyServerStreamTest.java
@@ -2,11 +2,15 @@
import static org.junit.Assert.assertEquals;
import static org.junit.Assert.fail;
+import static org.mockito.Matchers.eq;
+import static org.mockito.Matchers.notNull;
+import static org.mockito.Matchers.same;
import static org.mockito.Mockito.never;
import static org.mockito.Mockito.verify;
import static org.mockito.Mockito.verifyNoMoreInteractions;
import static org.mockito.Mockito.verifyZeroInteractions;
+import com.google.net.stubby.Metadata;
import com.google.net.stubby.Status;
import com.google.net.stubby.newtransport.StreamState;
import com.google.net.stubby.transport.Transport;
@@ -40,7 +44,7 @@
@Test
public void closeBeforeClientHalfCloseShouldFail() {
try {
- stream().close(Status.OK);
+ stream().close(Status.OK, new Metadata.Trailers());
fail("Should throw exception");
} catch (IllegalStateException expected) {
}
@@ -50,7 +54,7 @@
@Test
public void closeWithErrorBeforeClientHalfCloseShouldSucceed() throws Exception {
- stream().close(Status.CANCELLED);
+ stream().close(Status.CANCELLED, new Metadata.Trailers());
assertEquals(StreamState.CLOSED, stream.state());
verify(channel).writeAndFlush(
new SendGrpcFrameCommand(STREAM_ID, statusFrame(Status.CANCELLED), true));
@@ -62,9 +66,9 @@
// Client half-closes. Listener gets closed()
stream().remoteEndClosed();
assertEquals(StreamState.WRITE_ONLY, stream.state());
- verify(listener).closed(Status.OK);
+ verify(listener).closed(eq(Status.OK), notNull(Metadata.Trailers.class));
// Server closes. Status sent.
- stream().close(Status.OK);
+ stream().close(Status.OK, new Metadata.Trailers());
assertEquals(StreamState.CLOSED, stream.state());
verify(channel).writeAndFlush(
new SendGrpcFrameCommand(STREAM_ID, statusFrame(Status.OK), true));
@@ -76,7 +80,7 @@
// Client half-closes. Listener gets closed()
stream().remoteEndClosed();
assertEquals(StreamState.WRITE_ONLY, stream.state());
- verify(listener).closed(Status.OK);
+ verify(listener).closed(eq(Status.OK), notNull(Metadata.Trailers.class));
// Client half-closes again. Stream will be aborted with an error.
stream().remoteEndClosed();
assertEquals(StreamState.CLOSED, stream.state());
@@ -91,7 +95,7 @@
Status status = new Status(Transport.Code.INTERNAL, new Throwable());
stream().abortStream(status, true);
assertEquals(StreamState.CLOSED, stream.state());
- verify(listener).closed(status);
+ verify(listener).closed(same(status), notNull(Metadata.Trailers.class));
verify(channel).writeAndFlush(new SendGrpcFrameCommand(STREAM_ID, statusFrame(status), true));
verifyNoMoreInteractions(listener);
}
@@ -101,7 +105,7 @@
Status status = new Status(Transport.Code.INTERNAL, new Throwable());
stream().abortStream(status, false);
assertEquals(StreamState.CLOSED, stream.state());
- verify(listener).closed(status);
+ verify(listener).closed(same(status), notNull(Metadata.Trailers.class));
verify(channel, never()).writeAndFlush(
new SendGrpcFrameCommand(STREAM_ID, statusFrame(status), true));
verifyNoMoreInteractions(listener);
@@ -113,7 +117,7 @@
// Client half-closes. Listener gets closed()
stream().remoteEndClosed();
assertEquals(StreamState.WRITE_ONLY, stream.state());
- verify(listener).closed(Status.OK);
+ verify(listener).closed(same(Status.OK), notNull(Metadata.Trailers.class));
// Abort
stream().abortStream(status, true);
assertEquals(StreamState.CLOSED, stream.state());
diff --git a/core/src/test/java/com/google/net/stubby/newtransport/okhttp/OkHttpClientTransportTest.java b/core/src/test/java/com/google/net/stubby/newtransport/okhttp/OkHttpClientTransportTest.java
index e0465ff..1407b07 100644
--- a/core/src/test/java/com/google/net/stubby/newtransport/okhttp/OkHttpClientTransportTest.java
+++ b/core/src/test/java/com/google/net/stubby/newtransport/okhttp/OkHttpClientTransportTest.java
@@ -12,6 +12,7 @@
import com.google.common.util.concurrent.ListenableFuture;
import com.google.common.util.concurrent.Service;
+import com.google.net.stubby.Metadata;
import com.google.net.stubby.MethodDescriptor;
import com.google.net.stubby.Status;
import com.google.net.stubby.newtransport.StreamListener;
@@ -104,8 +105,8 @@
public void nextFrameThrowIOException() throws Exception {
MockStreamListener listener1 = new MockStreamListener();
MockStreamListener listener2 = new MockStreamListener();
- clientTransport.newStream(method, listener1);
- clientTransport.newStream(method, listener2);
+ clientTransport.newStream(method, new Metadata.Headers(), listener1);
+ clientTransport.newStream(method, new Metadata.Headers(), listener2);
assertEquals(2, streams.size());
assertTrue(streams.containsKey(3));
assertTrue(streams.containsKey(5));
@@ -126,7 +127,7 @@
final int numMessages = 10;
final String message = "Hello Client";
MockStreamListener listener = new MockStreamListener();
- clientTransport.newStream(method, listener);
+ clientTransport.newStream(method, new Metadata.Headers(), listener);
assertTrue(streams.containsKey(3));
for (int i = 0; i < numMessages; i++) {
BufferedSource source = mock(BufferedSource.class);
@@ -148,7 +149,7 @@
final String key = "KEY";
final String value = "value";
MockStreamListener listener = new MockStreamListener();
- clientTransport.newStream(method, listener);
+ clientTransport.newStream(method,new Metadata.Headers(), listener);
assertTrue(streams.containsKey(3));
for (int i = 0; i < numContexts; i++) {
BufferedSource source = mock(BufferedSource.class);
@@ -169,7 +170,7 @@
@Test
public void readStatus() throws Exception {
MockStreamListener listener = new MockStreamListener();
- clientTransport.newStream(method, listener);
+ clientTransport.newStream(method,new Metadata.Headers(), listener);
assertTrue(streams.containsKey(3));
BufferedSource source = mock(BufferedSource.class);
InputStream inputStream = createStatusFrame((short) Transport.Code.UNAVAILABLE.getNumber());
@@ -182,7 +183,7 @@
@Test
public void receiveReset() throws Exception {
MockStreamListener listener = new MockStreamListener();
- clientTransport.newStream(method, listener);
+ clientTransport.newStream(method,new Metadata.Headers(), listener);
assertTrue(streams.containsKey(3));
frameHandler.rstStream(3, ErrorCode.PROTOCOL_ERROR);
listener.waitUntilStreamClosed();
@@ -192,7 +193,7 @@
@Test
public void cancelStream() throws Exception {
MockStreamListener listener = new MockStreamListener();
- clientTransport.newStream(method, listener);
+ clientTransport.newStream(method,new Metadata.Headers(), listener);
OkHttpClientStream stream = streams.get(3);
assertNotNull(stream);
stream.cancel();
@@ -205,7 +206,7 @@
public void writeMessage() throws Exception {
final String message = "Hello Server";
MockStreamListener listener = new MockStreamListener();
- clientTransport.newStream(method, listener);
+ clientTransport.newStream(method,new Metadata.Headers(), listener);
OkHttpClientStream stream = streams.get(3);
InputStream input = new ByteArrayInputStream(message.getBytes(StandardCharsets.UTF_8));
stream.writeMessage(input, input.available(), null);
@@ -222,7 +223,7 @@
final String key = "KEY";
final String value = "VALUE";
MockStreamListener listener = new MockStreamListener();
- clientTransport.newStream(method, listener);
+ clientTransport.newStream(method,new Metadata.Headers(), listener);
OkHttpClientStream stream = streams.get(3);
InputStream input = new ByteArrayInputStream(value.getBytes(StandardCharsets.UTF_8));
stream.writeContext(key, input, input.available(), null);
@@ -240,8 +241,8 @@
public void windowUpdate() throws Exception {
MockStreamListener listener1 = new MockStreamListener();
MockStreamListener listener2 = new MockStreamListener();
- clientTransport.newStream(method, listener1);
- clientTransport.newStream(method, listener2);
+ clientTransport.newStream(method,new Metadata.Headers(), listener1);
+ clientTransport.newStream(method,new Metadata.Headers(), listener2);
assertEquals(2, streams.size());
OkHttpClientStream stream1 = streams.get(3);
OkHttpClientStream stream2 = streams.get(5);
@@ -300,8 +301,8 @@
public void stopNormally() throws Exception {
MockStreamListener listener1 = new MockStreamListener();
MockStreamListener listener2 = new MockStreamListener();
- clientTransport.newStream(method, listener1);
- clientTransport.newStream(method, listener2);
+ clientTransport.newStream(method,new Metadata.Headers(), listener1);
+ clientTransport.newStream(method,new Metadata.Headers(), listener2);
assertEquals(2, streams.size());
clientTransport.stopAsync();
listener1.waitUntilStreamClosed();
@@ -318,8 +319,8 @@
// start 2 streams.
MockStreamListener listener1 = new MockStreamListener();
MockStreamListener listener2 = new MockStreamListener();
- clientTransport.newStream(method, listener1);
- clientTransport.newStream(method, listener2);
+ clientTransport.newStream(method,new Metadata.Headers(), listener1);
+ clientTransport.newStream(method,new Metadata.Headers(), listener2);
assertEquals(2, streams.size());
// Receive goAway, max good id is 3.
@@ -336,7 +337,7 @@
// New stream should be failed.
MockStreamListener listener3 = new MockStreamListener();
try {
- clientTransport.newStream(method, listener3);
+ clientTransport.newStream(method,new Metadata.Headers(), listener3);
fail("new stream should no be accepted by a go-away transport.");
} catch (IllegalStateException ex) {
// expected.
@@ -380,10 +381,10 @@
streams = transport.getStreams();
MockStreamListener listener1 = new MockStreamListener();
- transport.newStream(method, listener1);
+ transport.newStream(method,new Metadata.Headers(), listener1);
try {
- transport.newStream(method, new MockStreamListener());
+ transport.newStream(method, new Metadata.Headers(), new MockStreamListener());
fail("new stream should not be accepted by a go-away transport.");
} catch (IllegalStateException ex) {
// expected.
@@ -526,6 +527,11 @@
}
@Override
+ public ListenableFuture<Void> headersRead(Metadata.Headers headers) {
+ return null;
+ }
+
+ @Override
public ListenableFuture<Void> messageRead(InputStream message, int length) {
String msg = getContent(message);
if (msg != null) {
@@ -535,7 +541,7 @@
}
@Override
- public void closed(Status status) {
+ public void closed(Status status, Metadata.Trailers trailers) {
this.status = status;
closed.countDown();
}
diff --git a/stub/src/main/java/com/google/net/stubby/stub/AbstractStub.java b/stub/src/main/java/com/google/net/stubby/stub/AbstractStub.java
index f073263..df97c0c 100644
--- a/stub/src/main/java/com/google/net/stubby/stub/AbstractStub.java
+++ b/stub/src/main/java/com/google/net/stubby/stub/AbstractStub.java
@@ -7,8 +7,6 @@
import java.util.Map;
import java.util.concurrent.TimeUnit;
-import javax.inject.Provider;
-
/**
* Common base type for stub implementations. Allows for reconfiguration.
*/
@@ -61,16 +59,6 @@
}
/**
- * Set a header provider for all methods in the stub.
- */
- public StubConfigBuilder setHeader(String headerName, Provider<String> headerValueProvider) {
- for (Map.Entry<String, MethodDescriptor> entry : methodMap.entrySet()) {
- entry.setValue(entry.getValue().withHeader(headerName, headerValueProvider));
- }
- return this;
- }
-
- /**
* Set a timeout for all methods in the stub.
*/
public StubConfigBuilder setTimeout(long timeout, TimeUnit unit) {
diff --git a/stub/src/main/java/com/google/net/stubby/stub/Calls.java b/stub/src/main/java/com/google/net/stubby/stub/Calls.java
index bd6849e..bba5c57 100644
--- a/stub/src/main/java/com/google/net/stubby/stub/Calls.java
+++ b/stub/src/main/java/com/google/net/stubby/stub/Calls.java
@@ -7,6 +7,7 @@
import com.google.common.util.concurrent.SettableFuture;
import com.google.common.util.concurrent.UncheckedExecutionException;
import com.google.net.stubby.Call;
+import com.google.net.stubby.Metadata;
import com.google.net.stubby.Status;
import com.google.net.stubby.transport.Transport;
@@ -121,7 +122,7 @@
Call<ReqT, RespT> call,
ReqT param,
Call.Listener<RespT> responseListener) {
- call.start(responseListener);
+ call.start(responseListener, new Metadata.Headers());
try {
call.sendPayload(param);
call.halfClose();
@@ -139,7 +140,7 @@
Call<ReqT, RespT> call,
Iterator<ReqT> clientStream) {
SettableFuture<RespT> responseFuture = SettableFuture.create();
- call.start(new UnaryStreamToFuture<RespT>(responseFuture));
+ call.start(new UnaryStreamToFuture<RespT>(responseFuture), new Metadata.Headers());
try {
while (clientStream.hasNext()) {
call.sendPayload(clientStream.next());
@@ -173,7 +174,8 @@
*/
public static <ReqT, RespT> StreamObserver<ReqT> duplexStreamingCall(
Call<ReqT, RespT> call, StreamObserver<RespT> responseObserver) {
- call.start(new StreamObserverToCallListenerAdapter<RespT>(responseObserver));
+ call.start(new StreamObserverToCallListenerAdapter<RespT>(responseObserver),
+ new Metadata.Headers());
return new CallToStreamObserverAdapter<ReqT>(call);
}
@@ -209,6 +211,11 @@
}
@Override
+ public ListenableFuture<Void> onHeaders(Metadata.Headers headers) {
+ return null;
+ }
+
+ @Override
public ListenableFuture<Void> onContext(String name, InputStream value) {
// StreamObservers don't receive contexts.
return null;
@@ -221,7 +228,7 @@
}
@Override
- public void onClose(Status status) {
+ public void onClose(Status status, Metadata.Trailers trailers) {
if (status.isOk()) {
observer.onCompleted();
} else {
@@ -242,6 +249,11 @@
}
@Override
+ public ListenableFuture<Void> onHeaders(Metadata.Headers headers) {
+ return null;
+ }
+
+ @Override
public ListenableFuture<Void> onContext(String name, InputStream value) {
// Don't care about contexts.
return null;
@@ -258,7 +270,7 @@
}
@Override
- public void onClose(Status status) {
+ public void onClose(Status status, Metadata.Trailers trailers) {
if (status.isOk()) {
if (value == null) {
// No value received so mark the future as an error
@@ -333,6 +345,11 @@
}
@Override
+ public ListenableFuture<Void> onHeaders(Metadata.Headers headers) {
+ return null;
+ }
+
+ @Override
public ListenableFuture<Void> onPayload(T value) {
Preconditions.checkState(!done, "Call already closed");
SettableFuture<Void> future = SettableFuture.create();
@@ -341,7 +358,7 @@
}
@Override
- public void onClose(Status status) {
+ public void onClose(Status status, Metadata.Trailers trailers) {
Preconditions.checkState(!done, "Call already closed");
if (status.isOk()) {
buffer.add(this);
diff --git a/stub/src/main/java/com/google/net/stubby/stub/HeadersInterceptor.java b/stub/src/main/java/com/google/net/stubby/stub/HeadersInterceptor.java
new file mode 100644
index 0000000..6a02c51
--- /dev/null
+++ b/stub/src/main/java/com/google/net/stubby/stub/HeadersInterceptor.java
@@ -0,0 +1,37 @@
+package com.google.net.stubby.stub;
+
+import com.google.net.stubby.Call;
+import com.google.net.stubby.Metadata;
+import com.google.net.stubby.MethodDescriptor;
+import com.google.net.stubby.context.ForwardingChannel;
+
+/**
+ * Utility functions for binding and receiving headers
+ */
+public class HeadersInterceptor {
+
+ /**
+ * Attach a set of request headers to a stub.
+ * @param stub to bind the headers to.
+ * @param extraHeaders the headers to be passed by each call on the returned stub.
+ * @return an implementation of the stub with extraHeaders bound to each call.
+ */
+ @SuppressWarnings("unchecked")
+ public static <T extends AbstractStub> T intercept(
+ T stub,
+ final Metadata.Headers extraHeaders) {
+ return (T) stub.configureNewStub().setChannel(
+ new ForwardingChannel(stub.getChannel()) {
+ @Override
+ public <ReqT, RespT> Call<ReqT, RespT> newCall(MethodDescriptor<ReqT, RespT> method) {
+ return new ForwardingCall<ReqT, RespT>(delegate.newCall(method)) {
+ @Override
+ public void start(Listener<RespT> responseListener, Metadata.Headers headers) {
+ headers.merge(extraHeaders);
+ delegate.start(responseListener, headers);
+ }
+ };
+ }
+ }).build();
+ }
+}