blob: cbf43e0a1ebcd8c374de65d01fa8d49bb7de0dfd [file] [log] [blame]
/*
* Copyright 2014, gRPC Authors All rights reserved.
*
* Licensed under the Apache License, Version 2.0 (the "License");
* you may not use this file except in compliance with the License.
* You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*/
package io.grpc.internal;
import com.google.common.base.Charsets;
import com.google.common.base.Preconditions;
import io.grpc.InternalMetadata;
import io.grpc.InternalStatus;
import io.grpc.Metadata;
import io.grpc.Status;
import java.nio.charset.Charset;
import javax.annotation.Nullable;
/**
* Base implementation for client streams using HTTP2 as the transport.
*/
public abstract class Http2ClientStreamTransportState extends AbstractClientStream.TransportState {

  /**
   * Metadata marshaller for HTTP status lines. It is parse-only: serialization is unsupported.
   */
  private static final InternalMetadata.TrustedAsciiMarshaller<Integer> HTTP_STATUS_MARSHALLER =
      new InternalMetadata.TrustedAsciiMarshaller<Integer>() {
        @Override
        public byte[] toAsciiString(Integer value) {
          throw new UnsupportedOperationException();
        }

        /**
         * RFC 7231 says status codes are 3 digits long.
         *
         * <p>Only the first three bytes are decoded. They are not checked to be ASCII digits and
         * any bytes after the third are ignored.
         *
         * @throws NumberFormatException if fewer than three bytes are supplied
         * @see <a href="https://tools.ietf.org/html/rfc7231#section-6">RFC 7231</a>
         */
        @Override
        public Integer parseAsciiString(byte[] serialized) {
          if (serialized.length >= 3) {
            return (serialized[0] - '0') * 100 + (serialized[1] - '0') * 10 + (serialized[2] - '0');
          }
          throw new NumberFormatException(
              "Malformed status code " + new String(serialized, InternalMetadata.US_ASCII));
        }
      };

  /** Key used to read the HTTP/2 {@code :status} pseudo-header out of received metadata. */
  private static final Metadata.Key<Integer> HTTP2_STATUS = InternalMetadata.keyOf(":status",
      HTTP_STATUS_MARSHALLER);

  /** When non-{@code null}, {@link #transportErrorMetadata} must also be non-{@code null}. */
  private Status transportError;

  /** Metadata reported alongside {@link #transportError}. */
  private Metadata transportErrorMetadata;

  /** Charset used to decode DATA frames into the error description; defaults to UTF-8. */
  private Charset errorCharset = Charsets.UTF_8;

  /** Set once a header block that is not a 1xx informational response has been received. */
  private boolean headersReceived;

  /**
   * Creates the transport state. Both arguments are passed through to the superclass
   * constructor.
   */
  protected Http2ClientStreamTransportState(int maxMessageSize, StatsTraceContext statsTraceCtx) {
    super(maxMessageSize, statsTraceCtx);
  }

  /**
   * Called to process a failure in HTTP/2 processing. It should notify the transport to cancel the
   * stream and call {@code transportReportStatus()}.
   */
  protected abstract void http2ProcessingFailed(Status status, Metadata trailers);

  /**
   * Called by subclasses whenever {@code Headers} are received from the transport.
   *
   * <p>If a transport error is already pending, the headers are only appended to its description.
   * Otherwise a second (non-1xx) header block is recorded as an {@code INTERNAL} error, 1xx
   * responses are ignored, and valid headers are stripped of transport details and passed to
   * {@code inboundHeadersReceived}. Errors detected here are recorded, not reported.
   *
   * @param headers the received headers
   */
  protected void transportHeadersReceived(Metadata headers) {
    Preconditions.checkNotNull(headers, "headers");
    if (transportError != null) {
      // Already received a transport error so just augment it. Something is really, really strange.
      transportError = transportError.augmentDescription("headers: " + headers);
      return;
    }
    try {
      if (headersReceived) {
        transportError = Status.INTERNAL.withDescription("Received headers twice");
        return;
      }
      Integer httpStatus = headers.get(HTTP2_STATUS);
      if (httpStatus != null && httpStatus >= 100 && httpStatus < 200) {
        // Ignore the headers. See RFC 7540 §8.1
        return;
      }
      headersReceived = true;

      transportError = validateInitialMetadata(headers);
      if (transportError != null) {
        return;
      }

      stripTransportDetails(headers);
      inboundHeadersReceived(headers);
    } finally {
      if (transportError != null) {
        // Note we don't immediately report the transport error, instead we wait for more data on
        // the stream so we can accumulate more detail into the error before reporting it.
        transportError = transportError.augmentDescription("headers: " + headers);
        transportErrorMetadata = headers;
        errorCharset = extractCharset(headers);
      }
    }
  }

  /**
   * Called by subclasses whenever a data frame is received from the transport.
   *
   * <p>While a transport error is pending, the frame's content is decoded with the error charset
   * and appended to the error description; the error is reported once the description exceeds
   * 1000 characters or the stream ends. Without a pending error, data arriving before headers is
   * reported as an {@code INTERNAL} failure; otherwise the frame is passed to
   * {@code inboundDataReceived}.
   *
   * @param frame the received data frame
   * @param endOfStream {@code true} if there will be no more data received for this stream
   */
  protected void transportDataReceived(ReadableBuffer frame, boolean endOfStream) {
    if (transportError != null) {
      // We've already detected a transport error and now we're just accumulating more detail
      // for it.
      transportError = transportError.augmentDescription("DATA-----------------------------\n"
          + ReadableBuffers.readAsString(frame, errorCharset));
      frame.close();
      if (transportError.getDescription().length() > 1000 || endOfStream) {
        http2ProcessingFailed(transportError, transportErrorMetadata);
      }
    } else {
      if (!headersReceived) {
        // The frame is neither read nor handed to inboundDataReceived on this path, so release
        // it here (as the error-accumulation branch above does) instead of dropping it unclosed.
        frame.close();
        http2ProcessingFailed(
            Status.INTERNAL.withDescription("headers not received before payload"),
            new Metadata());
        return;
      }
      inboundDataReceived(frame);
      if (endOfStream) {
        // This is a protocol violation as we expect to receive trailers.
        transportError =
            Status.INTERNAL.withDescription("Received unexpected EOS on DATA frame from server.");
        transportErrorMetadata = new Metadata();
        transportReportStatus(transportError, false, transportErrorMetadata);
      }
    }
  }

  /**
   * Called by subclasses for the terminal trailer metadata on a stream.
   *
   * <p>If no headers were received beforehand, the trailers are validated as if they were the
   * initial headers. Any pending or newly detected transport error is augmented with the trailers
   * and reported through {@link #http2ProcessingFailed}; otherwise the status is extracted from
   * the trailers and delivered through {@code inboundTrailersReceived}.
   *
   * @param trailers the received terminal trailer metadata
   */
  protected void transportTrailersReceived(Metadata trailers) {
    Preconditions.checkNotNull(trailers, "trailers");
    if (transportError == null && !headersReceived) {
      transportError = validateInitialMetadata(trailers);
      if (transportError != null) {
        transportErrorMetadata = trailers;
      }
    }
    if (transportError != null) {
      transportError = transportError.augmentDescription("trailers: " + trailers);
      http2ProcessingFailed(transportError, transportErrorMetadata);
    } else {
      Status status = statusFromTrailers(trailers);
      stripTransportDetails(trailers);
      inboundTrailersReceived(trailers, status);
    }
  }

  /**
   * Extract the response status from trailers.
   *
   * <p>Falls back to {@code UNKNOWN} when headers were received but the trailers carry no gRPC
   * status, and otherwise to a status derived from the HTTP status code (or {@code INTERNAL} when
   * that is missing too).
   */
  private Status statusFromTrailers(Metadata trailers) {
    Status status = trailers.get(InternalStatus.CODE_KEY);
    if (status != null) {
      return status.withDescription(trailers.get(InternalStatus.MESSAGE_KEY));
    }
    // No status; something is broken. Try to provide a reasonable error.
    if (headersReceived) {
      return Status.UNKNOWN.withDescription("missing GRPC status in response");
    }
    Integer httpStatus = trailers.get(HTTP2_STATUS);
    if (httpStatus != null) {
      status = GrpcUtil.httpStatusToGrpcStatus(httpStatus);
    } else {
      status = Status.INTERNAL.withDescription("missing HTTP status code");
    }
    return status.augmentDescription(
        "missing GRPC status, inferred error from HTTP status code");
  }

  /**
   * Inspect initial headers to make sure they conform to HTTP and gRPC, returning a {@code Status}
   * on failure.
   *
   * @return status with description of failure, or {@code null} when valid
   */
  @Nullable
  private Status validateInitialMetadata(Metadata headers) {
    Integer httpStatus = headers.get(HTTP2_STATUS);
    if (httpStatus == null) {
      return Status.INTERNAL.withDescription("Missing HTTP status code");
    }
    String contentType = headers.get(GrpcUtil.CONTENT_TYPE_KEY);
    if (!GrpcUtil.isGrpcContentType(contentType)) {
      return GrpcUtil.httpStatusToGrpcStatus(httpStatus)
          .augmentDescription("invalid content-type: " + contentType);
    }
    return null;
  }

  /**
   * Inspect the raw metadata and figure out what charset is being used.
   *
   * <p>The text after the last {@code charset=} in the content-type (or the whole value when no
   * such marker is present) is looked up as a charset name.
   *
   * @return the named charset, or UTF-8 when there is no content-type or the name is not a
   *     legal, supported charset
   */
  private static Charset extractCharset(Metadata headers) {
    String contentType = headers.get(GrpcUtil.CONTENT_TYPE_KEY);
    if (contentType != null) {
      String[] split = contentType.split("charset=");
      // String.split drops trailing empty strings, so a value made up solely of the delimiter
      // (e.g. "charset=") yields an empty array; treat that as "no charset given".
      if (split.length > 0) {
        try {
          return Charset.forName(split[split.length - 1].trim());
        } catch (IllegalArgumentException ignored) {
          // Illegal or unsupported charset name: ignore and assume UTF-8
        }
      }
    }
    return Charsets.UTF_8;
  }

  /**
   * Strip HTTP transport implementation details so they don't leak via metadata into
   * the application layer.
   */
  private static void stripTransportDetails(Metadata metadata) {
    metadata.discardAll(HTTP2_STATUS);
    metadata.discardAll(InternalStatus.CODE_KEY);
    metadata.discardAll(InternalStatus.MESSAGE_KEY);
  }
}