| /* |
| * Copyright 2017, gRPC Authors All rights reserved. |
| * |
| * Licensed under the Apache License, Version 2.0 (the "License"); |
| * you may not use this file except in compliance with the License. |
| * You may obtain a copy of the License at |
| * |
| * http://www.apache.org/licenses/LICENSE-2.0 |
| * |
| * Unless required by applicable law or agreed to in writing, software |
| * distributed under the License is distributed on an "AS IS" BASIS, |
| * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. |
| * See the License for the specific language governing permissions and |
| * limitations under the License. |
| */ |
| |
| package io.grpc.internal; |
| |
| import static com.google.instrumentation.stats.ContextUtils.STATS_CONTEXT_KEY; |
| import static java.util.concurrent.TimeUnit.MILLISECONDS; |
| import static org.junit.Assert.assertEquals; |
| import static org.junit.Assert.assertFalse; |
| import static org.junit.Assert.assertNotNull; |
| import static org.junit.Assert.assertNotSame; |
| import static org.junit.Assert.assertNull; |
| import static org.junit.Assert.assertSame; |
| import static org.junit.Assert.assertTrue; |
| import static org.junit.Assert.fail; |
| import static org.mockito.Matchers.any; |
| import static org.mockito.Matchers.anyString; |
| import static org.mockito.Matchers.argThat; |
| import static org.mockito.Matchers.eq; |
| import static org.mockito.Matchers.isNull; |
| import static org.mockito.Matchers.same; |
| import static org.mockito.Mockito.never; |
| import static org.mockito.Mockito.spy; |
| import static org.mockito.Mockito.verify; |
| import static org.mockito.Mockito.verifyNoMoreInteractions; |
| import static org.mockito.Mockito.verifyZeroInteractions; |
| import static org.mockito.Mockito.when; |
| |
| import com.google.instrumentation.stats.RpcConstants; |
| import com.google.instrumentation.stats.StatsContext; |
| import com.google.instrumentation.stats.TagValue; |
| import com.google.instrumentation.trace.Annotation; |
| import com.google.instrumentation.trace.AttributeValue; |
| import com.google.instrumentation.trace.BinaryPropagationHandler; |
| import com.google.instrumentation.trace.ContextUtils; |
| import com.google.instrumentation.trace.EndSpanOptions; |
| import com.google.instrumentation.trace.Link; |
| import com.google.instrumentation.trace.NetworkEvent; |
| import com.google.instrumentation.trace.Span; |
| import com.google.instrumentation.trace.SpanContext; |
| import com.google.instrumentation.trace.SpanFactory; |
| import com.google.instrumentation.trace.SpanId; |
| import com.google.instrumentation.trace.StartSpanOptions; |
| import com.google.instrumentation.trace.TraceId; |
| import com.google.instrumentation.trace.TraceOptions; |
| import com.google.instrumentation.trace.Tracer; |
| import io.grpc.CallOptions; |
| import io.grpc.Channel; |
| import io.grpc.ClientCall; |
| import io.grpc.ClientInterceptor; |
| import io.grpc.ClientInterceptors; |
| import io.grpc.ClientStreamTracer; |
| import io.grpc.Context; |
| import io.grpc.Metadata; |
| import io.grpc.MethodDescriptor; |
| import io.grpc.ServerCall; |
| import io.grpc.ServerCallHandler; |
| import io.grpc.ServerServiceDefinition; |
| import io.grpc.ServerStreamTracer; |
| import io.grpc.Status; |
| import io.grpc.internal.testing.StatsTestUtils; |
| import io.grpc.internal.testing.StatsTestUtils.FakeStatsContextFactory; |
| import io.grpc.testing.GrpcServerRule; |
| import java.io.ByteArrayInputStream; |
| import java.io.InputStream; |
| import java.text.ParseException; |
| import java.util.Map; |
| import java.util.Random; |
| import java.util.concurrent.atomic.AtomicReference; |
| import javax.annotation.Nullable; |
| import org.junit.After; |
| import org.junit.Before; |
| import org.junit.Rule; |
| import org.junit.Test; |
| import org.junit.runner.RunWith; |
| import org.junit.runners.JUnit4; |
| import org.mockito.ArgumentCaptor; |
| import org.mockito.ArgumentMatcher; |
| import org.mockito.Captor; |
| import org.mockito.Mock; |
| import org.mockito.MockitoAnnotations; |
| |
| /** |
| * Test for {@link CensusStatsModule} and {@link CensusTracingModule}. |
| */ |
| @RunWith(JUnit4.class) |
| public class CensusModulesTest { |
| private static final CallOptions.Key<String> CUSTOM_OPTION = |
| CallOptions.Key.of("option1", "default"); |
| private static final CallOptions CALL_OPTIONS = |
| CallOptions.DEFAULT.withOption(CUSTOM_OPTION, "customvalue"); |
| |
| private static class StringInputStream extends InputStream { |
| final String string; |
| |
| StringInputStream(String string) { |
| this.string = string; |
| } |
| |
| @Override |
| public int read() { |
| // InProcessTransport doesn't actually read bytes from the InputStream. The InputStream is |
| // passed to the InProcess server and consumed by MARSHALLER.parse(). |
| throw new UnsupportedOperationException("Should not be called"); |
| } |
| } |
| |
| private static final MethodDescriptor.Marshaller<String> MARSHALLER = |
| new MethodDescriptor.Marshaller<String>() { |
| @Override |
| public InputStream stream(String value) { |
| return new StringInputStream(value); |
| } |
| |
| @Override |
| public String parse(InputStream stream) { |
| return ((StringInputStream) stream).string; |
| } |
| }; |
| |
| private final MethodDescriptor<String, String> method = |
| MethodDescriptor.<String, String>newBuilder() |
| .setType(MethodDescriptor.MethodType.UNKNOWN) |
| .setRequestMarshaller(MARSHALLER) |
| .setResponseMarshaller(MARSHALLER) |
| .setFullMethodName("package1.service2/method3") |
| .build(); |
| private final FakeClock fakeClock = new FakeClock(); |
| private final FakeStatsContextFactory statsCtxFactory = new FakeStatsContextFactory(); |
| private final Random random = new Random(0); |
| private final SpanContext fakeClientSpanContext = |
| SpanContext.create( |
| TraceId.generateRandomId(random), SpanId.generateRandomId(random), |
| TraceOptions.builder().build()); |
| private final SpanContext fakeClientParentSpanContext = |
| SpanContext.create( |
| TraceId.generateRandomId(random), SpanId.generateRandomId(random), |
| TraceOptions.builder().build()); |
| private final SpanContext fakeServerSpanContext = |
| SpanContext.create( |
| TraceId.generateRandomId(random), SpanId.generateRandomId(random), |
| TraceOptions.builder().build()); |
| private final SpanContext fakeServerParentSpanContext = |
| SpanContext.create( |
| TraceId.generateRandomId(random), SpanId.generateRandomId(random), |
| TraceOptions.builder().build()); |
| private final Span fakeClientSpan = new FakeSpan(fakeClientSpanContext); |
| private final Span fakeServerSpan = new FakeSpan(fakeServerSpanContext); |
| private final Span fakeClientParentSpan = new FakeSpan(fakeClientParentSpanContext); |
| private final Span fakeServerParentSpan = new FakeSpan(fakeServerParentSpanContext); |
| private final Span spyClientSpan = spy(fakeClientSpan); |
| private final Span spyServerSpan = spy(fakeServerSpan); |
| private final byte[] binarySpanContext = new byte[]{3, 1, 5}; |
| private final ArgumentMatcher<StartSpanOptions> startSpanOptionsMatcher = |
| new ArgumentMatcher<StartSpanOptions>() { |
| @Override |
| public boolean matches(Object argument) { |
| return Boolean.TRUE.equals(((StartSpanOptions) argument).getRecordEvents()); |
| } |
| }; |
| |
| @Rule |
| public final GrpcServerRule grpcServerRule = new GrpcServerRule().directExecutor(); |
| |
| @Mock |
| private AccessibleSpanFactory mockSpanFactory; |
| @Mock |
| private BinaryPropagationHandler mockTracingPropagationHandler; |
| @Mock |
| private ClientCall.Listener<String> mockClientCallListener; |
| @Mock |
| private ServerCall.Listener<String> mockServerCallListener; |
| @Captor |
| private ArgumentCaptor<CallOptions> callOptionsCaptor; |
| @Captor |
| private ArgumentCaptor<ClientCall.Listener<String>> clientCallListenerCaptor; |
| @Captor |
| private ArgumentCaptor<Status> statusCaptor; |
| |
| private Tracer tracer; |
| private CensusStatsModule censusStats; |
| private CensusTracingModule censusTracing; |
| |
| @Before |
| @SuppressWarnings("unchecked") |
| public void setUp() throws Exception { |
| MockitoAnnotations.initMocks(this); |
| when(mockSpanFactory.startSpan(any(Span.class), anyString(), any(StartSpanOptions.class))) |
| .thenReturn(spyClientSpan); |
| when( |
| mockSpanFactory.startSpanWithRemoteParent( |
| any(SpanContext.class), anyString(), any(StartSpanOptions.class))) |
| .thenReturn(spyServerSpan); |
| when(mockTracingPropagationHandler.toBinaryValue(any(SpanContext.class))) |
| .thenReturn(binarySpanContext); |
| when(mockTracingPropagationHandler.fromBinaryValue(any(byte[].class))) |
| .thenReturn(fakeServerParentSpanContext); |
| tracer = new Tracer(mockSpanFactory) {}; |
| censusStats = new CensusStatsModule(statsCtxFactory, fakeClock.getStopwatchSupplier(), true); |
| censusTracing = new CensusTracingModule(tracer, mockTracingPropagationHandler); |
| } |
| |
| @After |
| public void wrapUp() { |
| assertNull(statsCtxFactory.pollRecord()); |
| } |
| |
| @Test |
| public void clientInterceptorNoCustomTag() { |
| testClientInterceptors(false); |
| } |
| |
| @Test |
| public void clientInterceptorCustomTag() { |
| testClientInterceptors(true); |
| } |
| |
| // Test that Census ClientInterceptors uses the StatsContext and Span out of the current Context |
| // to create the ClientCallTracer, and that it intercepts ClientCall.Listener.onClose() to call |
| // ClientCallTracer.callEnded(). |
| private void testClientInterceptors(boolean nonDefaultContext) { |
| grpcServerRule.getServiceRegistry().addService( |
| ServerServiceDefinition.builder("package1.service2").addMethod( |
| method, new ServerCallHandler<String, String>() { |
| @Override |
| public ServerCall.Listener<String> startCall( |
| ServerCall<String, String> call, Metadata headers) { |
| call.sendHeaders(new Metadata()); |
| call.sendMessage("Hello"); |
| call.close( |
| Status.PERMISSION_DENIED.withDescription("No you don't"), new Metadata()); |
| return mockServerCallListener; |
| } |
| }).build()); |
| |
| final AtomicReference<CallOptions> capturedCallOptions = new AtomicReference<CallOptions>(); |
| ClientInterceptor callOptionsCaptureInterceptor = new ClientInterceptor() { |
| @Override |
| public <ReqT, RespT> ClientCall<ReqT, RespT> interceptCall( |
| MethodDescriptor<ReqT, RespT> method, CallOptions callOptions, Channel next) { |
| capturedCallOptions.set(callOptions); |
| return next.newCall(method, callOptions); |
| } |
| }; |
| Channel interceptedChannel = |
| ClientInterceptors.intercept( |
| grpcServerRule.getChannel(), callOptionsCaptureInterceptor, |
| censusStats.getClientInterceptor(), censusTracing.getClientInterceptor()); |
| ClientCall<String, String> call; |
| if (nonDefaultContext) { |
| Context ctx = |
| Context.ROOT.withValues( |
| STATS_CONTEXT_KEY, |
| statsCtxFactory.getDefault().with( |
| StatsTestUtils.EXTRA_TAG, TagValue.create("extra value")), |
| ContextUtils.CONTEXT_SPAN_KEY, |
| fakeClientParentSpan); |
| Context origCtx = ctx.attach(); |
| try { |
| call = interceptedChannel.newCall(method, CALL_OPTIONS); |
| } finally { |
| ctx.detach(origCtx); |
| } |
| } else { |
| assertNull(STATS_CONTEXT_KEY.get()); |
| assertNull(ContextUtils.CONTEXT_SPAN_KEY.get()); |
| call = interceptedChannel.newCall(method, CALL_OPTIONS); |
| } |
| |
| // The interceptor adds tracer factory to CallOptions |
| assertEquals("customvalue", capturedCallOptions.get().getOption(CUSTOM_OPTION)); |
| assertEquals(2, capturedCallOptions.get().getStreamTracerFactories().size()); |
| assertTrue( |
| capturedCallOptions.get().getStreamTracerFactories().get(0) |
| instanceof CensusTracingModule.ClientCallTracer); |
| assertTrue( |
| capturedCallOptions.get().getStreamTracerFactories().get(1) |
| instanceof CensusStatsModule.ClientCallTracer); |
| |
| // Make the call |
| Metadata headers = new Metadata(); |
| call.start(mockClientCallListener, headers); |
| assertNull(statsCtxFactory.pollRecord()); |
| if (nonDefaultContext) { |
| verify(mockSpanFactory).startSpan( |
| same(fakeClientParentSpan), eq("Sent.package1.service2.method3"), |
| argThat(startSpanOptionsMatcher)); |
| } else { |
| verify(mockSpanFactory).startSpan( |
| isNull(Span.class), eq("Sent.package1.service2.method3"), |
| argThat(startSpanOptionsMatcher)); |
| } |
| verify(spyClientSpan, never()).end(any(EndSpanOptions.class)); |
| |
| // End the call |
| call.halfClose(); |
| call.request(1); |
| |
| verify(mockClientCallListener).onClose(statusCaptor.capture(), any(Metadata.class)); |
| Status status = statusCaptor.getValue(); |
| assertEquals(Status.Code.PERMISSION_DENIED, status.getCode()); |
| assertEquals("No you don't", status.getDescription()); |
| |
| // The intercepting listener calls callEnded() on ClientCallTracer, which records to Census. |
| StatsTestUtils.MetricsRecord record = statsCtxFactory.pollRecord(); |
| assertNotNull(record); |
| TagValue methodTag = record.tags.get(RpcConstants.RPC_CLIENT_METHOD); |
| assertEquals(method.getFullMethodName(), methodTag.toString()); |
| TagValue statusTag = record.tags.get(RpcConstants.RPC_STATUS); |
| assertEquals(Status.Code.PERMISSION_DENIED.toString(), statusTag.toString()); |
| if (nonDefaultContext) { |
| TagValue extraTag = record.tags.get(StatsTestUtils.EXTRA_TAG); |
| assertEquals("extra value", extraTag.toString()); |
| } else { |
| assertNull(record.tags.get(StatsTestUtils.EXTRA_TAG)); |
| } |
| verify(spyClientSpan).end( |
| EndSpanOptions.builder() |
| .setStatus( |
| com.google.instrumentation.trace.Status.PERMISSION_DENIED |
| .withDescription("No you don't")) |
| .build()); |
| verify(spyClientSpan, never()).end(); |
| } |
| |
| @Test |
| public void clientBasicStatsDefaultContext() { |
| CensusStatsModule.ClientCallTracer callTracer = |
| censusStats.newClientCallTracer(statsCtxFactory.getDefault(), method.getFullMethodName()); |
| Metadata headers = new Metadata(); |
| ClientStreamTracer tracer = callTracer.newClientStreamTracer(headers); |
| |
| fakeClock.forwardTime(30, MILLISECONDS); |
| tracer.outboundHeaders(); |
| |
| fakeClock.forwardTime(100, MILLISECONDS); |
| tracer.outboundWireSize(1028); |
| tracer.outboundUncompressedSize(1128); |
| |
| fakeClock.forwardTime(16, MILLISECONDS); |
| tracer.inboundWireSize(33); |
| tracer.inboundUncompressedSize(67); |
| tracer.outboundWireSize(99); |
| tracer.outboundUncompressedSize(865); |
| |
| fakeClock.forwardTime(24, MILLISECONDS); |
| tracer.inboundWireSize(154); |
| tracer.inboundUncompressedSize(552); |
| tracer.streamClosed(Status.OK); |
| callTracer.callEnded(Status.OK); |
| |
| StatsTestUtils.MetricsRecord record = statsCtxFactory.pollRecord(); |
| assertNotNull(record); |
| assertNoServerContent(record); |
| TagValue methodTag = record.tags.get(RpcConstants.RPC_CLIENT_METHOD); |
| assertEquals(method.getFullMethodName(), methodTag.toString()); |
| TagValue statusTag = record.tags.get(RpcConstants.RPC_STATUS); |
| assertEquals(Status.Code.OK.toString(), statusTag.toString()); |
| assertNull(record.getMetric(RpcConstants.RPC_CLIENT_ERROR_COUNT)); |
| assertEquals(1028 + 99, record.getMetricAsLongOrFail(RpcConstants.RPC_CLIENT_REQUEST_BYTES)); |
| assertEquals(1128 + 865, |
| record.getMetricAsLongOrFail(RpcConstants.RPC_CLIENT_UNCOMPRESSED_REQUEST_BYTES)); |
| assertEquals(33 + 154, record.getMetricAsLongOrFail(RpcConstants.RPC_CLIENT_RESPONSE_BYTES)); |
| assertEquals(67 + 552, |
| record.getMetricAsLongOrFail(RpcConstants.RPC_CLIENT_UNCOMPRESSED_RESPONSE_BYTES)); |
| assertEquals(30 + 100 + 16 + 24, |
| record.getMetricAsLongOrFail(RpcConstants.RPC_CLIENT_ROUNDTRIP_LATENCY)); |
| } |
| |
| @Test |
| public void clientBasicTracingDefaultSpan() { |
| CensusTracingModule.ClientCallTracer callTracer = |
| censusTracing.newClientCallTracer(null, method.getFullMethodName()); |
| Metadata headers = new Metadata(); |
| ClientStreamTracer tracer = callTracer.newClientStreamTracer(headers); |
| verify(mockSpanFactory).startSpan( |
| isNull(Span.class), eq("Sent.package1.service2.method3"), argThat(startSpanOptionsMatcher)); |
| verify(spyClientSpan, never()).end(any(EndSpanOptions.class)); |
| |
| tracer.streamClosed(Status.OK); |
| callTracer.callEnded(Status.OK); |
| |
| verify(spyClientSpan).end( |
| EndSpanOptions.builder().setStatus(com.google.instrumentation.trace.Status.OK).build()); |
| verifyNoMoreInteractions(mockSpanFactory); |
| } |
| |
| @Test |
| public void clientStreamNeverCreatedStillRecordStats() { |
| CensusStatsModule.ClientCallTracer callTracer = |
| censusStats.newClientCallTracer( |
| statsCtxFactory.getDefault(), method.getFullMethodName()); |
| |
| fakeClock.forwardTime(3000, MILLISECONDS); |
| callTracer.callEnded(Status.DEADLINE_EXCEEDED.withDescription("3 seconds")); |
| |
| StatsTestUtils.MetricsRecord record = statsCtxFactory.pollRecord(); |
| assertNotNull(record); |
| assertNoServerContent(record); |
| TagValue methodTag = record.tags.get(RpcConstants.RPC_CLIENT_METHOD); |
| assertEquals(method.getFullMethodName(), methodTag.toString()); |
| TagValue statusTag = record.tags.get(RpcConstants.RPC_STATUS); |
| assertEquals(Status.Code.DEADLINE_EXCEEDED.toString(), statusTag.toString()); |
| assertEquals(1, record.getMetricAsLongOrFail(RpcConstants.RPC_CLIENT_ERROR_COUNT)); |
| assertEquals(0, record.getMetricAsLongOrFail(RpcConstants.RPC_CLIENT_REQUEST_BYTES)); |
| assertEquals(0, |
| record.getMetricAsLongOrFail(RpcConstants.RPC_CLIENT_UNCOMPRESSED_REQUEST_BYTES)); |
| assertEquals(0, record.getMetricAsLongOrFail(RpcConstants.RPC_CLIENT_RESPONSE_BYTES)); |
| assertEquals(0, |
| record.getMetricAsLongOrFail(RpcConstants.RPC_CLIENT_UNCOMPRESSED_RESPONSE_BYTES)); |
| assertEquals(3000, record.getMetricAsLongOrFail(RpcConstants.RPC_CLIENT_ROUNDTRIP_LATENCY)); |
| assertNull(record.getMetric(RpcConstants.RPC_CLIENT_SERVER_ELAPSED_TIME)); |
| } |
| |
| @Test |
| public void clientStreamNeverCreatedStillRecordTracing() { |
| CensusTracingModule.ClientCallTracer callTracer = |
| censusTracing.newClientCallTracer(fakeClientParentSpan, method.getFullMethodName()); |
| verify(mockSpanFactory).startSpan( |
| same(fakeClientParentSpan), eq("Sent.package1.service2.method3"), |
| argThat(startSpanOptionsMatcher)); |
| |
| callTracer.callEnded(Status.DEADLINE_EXCEEDED.withDescription("3 seconds")); |
| verify(spyClientSpan).end( |
| EndSpanOptions.builder() |
| .setStatus( |
| com.google.instrumentation.trace.Status.DEADLINE_EXCEEDED |
| .withDescription("3 seconds")) |
| .build()); |
| verify(spyClientSpan, never()).end(); |
| } |
| |
| @Test |
| public void statsHeadersPropagateTags() { |
| subtestStatsHeadersPropagateTags(true); |
| } |
| |
| @Test |
| public void statsHeadersNotPropagateTags() { |
| subtestStatsHeadersPropagateTags(false); |
| } |
| |
| private void subtestStatsHeadersPropagateTags(boolean propagate) { |
| // EXTRA_TAG is propagated by the FakeStatsContextFactory. Note that not all tags are |
| // propagated. The StatsContextFactory decides which tags are to propagated. gRPC facilitates |
| // the propagation by putting them in the headers. |
| StatsContext clientCtx = statsCtxFactory.getDefault().with( |
| StatsTestUtils.EXTRA_TAG, TagValue.create("extra-tag-value-897")); |
| CensusStatsModule census = |
| new CensusStatsModule(statsCtxFactory, fakeClock.getStopwatchSupplier(), propagate); |
| Metadata headers = new Metadata(); |
| CensusStatsModule.ClientCallTracer callTracer = |
| census.newClientCallTracer(clientCtx, method.getFullMethodName()); |
| // This propagates clientCtx to headers if propagates==true |
| callTracer.newClientStreamTracer(headers); |
| if (propagate) { |
| assertTrue(headers.containsKey(census.statsHeader)); |
| } else { |
| assertFalse(headers.containsKey(census.statsHeader)); |
| return; |
| } |
| |
| ServerStreamTracer serverTracer = |
| census.getServerTracerFactory().newServerStreamTracer( |
| method.getFullMethodName(), headers); |
| // Server tracer deserializes clientCtx from the headers, so that it records stats with the |
| // propagated tags. |
| Context serverContext = serverTracer.filterContext(Context.ROOT); |
| // It also put clientCtx in the Context seen by the call handler |
| assertEquals(clientCtx, STATS_CONTEXT_KEY.get(serverContext)); |
| |
| // Verifies that the server tracer records the status with the propagated tag |
| serverTracer.streamClosed(Status.OK); |
| |
| StatsTestUtils.MetricsRecord serverRecord = statsCtxFactory.pollRecord(); |
| assertNotNull(serverRecord); |
| assertNoClientContent(serverRecord); |
| TagValue serverMethodTag = serverRecord.tags.get(RpcConstants.RPC_SERVER_METHOD); |
| assertEquals(method.getFullMethodName(), serverMethodTag.toString()); |
| TagValue serverStatusTag = serverRecord.tags.get(RpcConstants.RPC_STATUS); |
| assertEquals(Status.Code.OK.toString(), serverStatusTag.toString()); |
| assertNull(serverRecord.getMetric(RpcConstants.RPC_SERVER_ERROR_COUNT)); |
| TagValue serverPropagatedTag = serverRecord.tags.get(StatsTestUtils.EXTRA_TAG); |
| assertEquals("extra-tag-value-897", serverPropagatedTag.toString()); |
| |
| // Verifies that the client tracer factory uses clientCtx, which includes the custom tags, to |
| // record stats. |
| callTracer.callEnded(Status.OK); |
| |
| StatsTestUtils.MetricsRecord clientRecord = statsCtxFactory.pollRecord(); |
| assertNotNull(clientRecord); |
| assertNoServerContent(clientRecord); |
| TagValue clientMethodTag = clientRecord.tags.get(RpcConstants.RPC_CLIENT_METHOD); |
| assertEquals(method.getFullMethodName(), clientMethodTag.toString()); |
| TagValue clientStatusTag = clientRecord.tags.get(RpcConstants.RPC_STATUS); |
| assertEquals(Status.Code.OK.toString(), clientStatusTag.toString()); |
| assertNull(clientRecord.getMetric(RpcConstants.RPC_CLIENT_ERROR_COUNT)); |
| TagValue clientPropagatedTag = clientRecord.tags.get(StatsTestUtils.EXTRA_TAG); |
| assertEquals("extra-tag-value-897", clientPropagatedTag.toString()); |
| } |
| |
| @Test |
| public void statsHeadersNotPropagateDefaultContext() { |
| CensusStatsModule.ClientCallTracer callTracer = |
| censusStats.newClientCallTracer(statsCtxFactory.getDefault(), method.getFullMethodName()); |
| Metadata headers = new Metadata(); |
| callTracer.newClientStreamTracer(headers); |
| assertFalse(headers.containsKey(censusStats.statsHeader)); |
| } |
| |
| @Test |
| public void statsHeaderMalformed() { |
| // Construct a malformed header and make sure parsing it will throw |
| byte[] statsHeaderValue = new byte[]{1}; |
| Metadata.Key<byte[]> arbitraryStatsHeader = |
| Metadata.Key.of("grpc-tags-bin", Metadata.BINARY_BYTE_MARSHALLER); |
| try { |
| statsCtxFactory.deserialize(new ByteArrayInputStream(statsHeaderValue)); |
| fail("Should have thrown"); |
| } catch (Exception e) { |
| // Expected |
| } |
| |
| // But the header key will return a default context for it |
| Metadata headers = new Metadata(); |
| assertNull(headers.get(censusStats.statsHeader)); |
| headers.put(arbitraryStatsHeader, statsHeaderValue); |
| assertSame(statsCtxFactory.getDefault(), headers.get(censusStats.statsHeader)); |
| } |
| |
| @Test |
| public void traceHeadersPropagateSpanContext() throws Exception { |
| CensusTracingModule.ClientCallTracer callTracer = |
| censusTracing.newClientCallTracer(fakeClientParentSpan, method.getFullMethodName()); |
| Metadata headers = new Metadata(); |
| callTracer.newClientStreamTracer(headers); |
| |
| verify(mockTracingPropagationHandler).toBinaryValue(same(fakeClientSpanContext)); |
| verifyNoMoreInteractions(mockTracingPropagationHandler); |
| verify(mockSpanFactory).startSpan( |
| same(fakeClientParentSpan), eq("Sent.package1.service2.method3"), |
| argThat(startSpanOptionsMatcher)); |
| verifyNoMoreInteractions(mockSpanFactory); |
| assertTrue(headers.containsKey(censusTracing.tracingHeader)); |
| |
| ServerStreamTracer serverTracer = |
| censusTracing.getServerTracerFactory().newServerStreamTracer( |
| method.getFullMethodName(), headers); |
| verify(mockTracingPropagationHandler).fromBinaryValue(same(binarySpanContext)); |
| verify(mockSpanFactory).startSpanWithRemoteParent( |
| same(fakeServerParentSpanContext), eq("Recv.package1.service2.method3"), |
| argThat(startSpanOptionsMatcher)); |
| |
| Context filteredContext = serverTracer.filterContext(Context.ROOT); |
| assertSame(spyServerSpan, ContextUtils.CONTEXT_SPAN_KEY.get(filteredContext)); |
| } |
| |
| @Test |
| public void traceHeaderMalformed() throws Exception { |
| // As comparison, normal header parsing |
| Metadata headers = new Metadata(); |
| headers.put(censusTracing.tracingHeader, fakeClientSpanContext); |
| // mockTracingPropagationHandler was stubbed to always return fakeServerParentSpanContext |
| assertSame(fakeServerParentSpanContext, headers.get(censusTracing.tracingHeader)); |
| |
| // Make BinaryPropagationHandler always throw when parsing the header |
| when(mockTracingPropagationHandler.fromBinaryValue(any(byte[].class))) |
| .thenThrow(new ParseException("Malformed header", 0)); |
| |
| headers = new Metadata(); |
| assertNull(headers.get(censusTracing.tracingHeader)); |
| headers.put(censusTracing.tracingHeader, fakeClientSpanContext); |
| assertSame(SpanContext.INVALID, headers.get(censusTracing.tracingHeader)); |
| assertNotSame(fakeServerParentSpanContext, SpanContext.INVALID); |
| |
| // A null Span is used as the parent in this case |
| censusTracing.getServerTracerFactory().newServerStreamTracer( |
| method.getFullMethodName(), headers); |
| verify(mockSpanFactory).startSpanWithRemoteParent( |
| isNull(SpanContext.class), eq("Recv.package1.service2.method3"), |
| argThat(startSpanOptionsMatcher)); |
| } |
| |
| @Test |
| public void serverBasicStatsNoHeaders() { |
| ServerStreamTracer.Factory tracerFactory = censusStats.getServerTracerFactory(); |
| ServerStreamTracer tracer = |
| tracerFactory.newServerStreamTracer(method.getFullMethodName(), new Metadata()); |
| |
| Context filteredContext = tracer.filterContext(Context.ROOT); |
| assertNull(STATS_CONTEXT_KEY.get(filteredContext)); |
| |
| tracer.inboundWireSize(34); |
| tracer.inboundUncompressedSize(67); |
| |
| fakeClock.forwardTime(100, MILLISECONDS); |
| tracer.outboundWireSize(1028); |
| tracer.outboundUncompressedSize(1128); |
| |
| fakeClock.forwardTime(16, MILLISECONDS); |
| tracer.inboundWireSize(154); |
| tracer.inboundUncompressedSize(552); |
| tracer.outboundWireSize(99); |
| tracer.outboundUncompressedSize(865); |
| |
| fakeClock.forwardTime(24, MILLISECONDS); |
| |
| tracer.streamClosed(Status.CANCELLED); |
| |
| StatsTestUtils.MetricsRecord record = statsCtxFactory.pollRecord(); |
| assertNotNull(record); |
| assertNoClientContent(record); |
| TagValue methodTag = record.tags.get(RpcConstants.RPC_SERVER_METHOD); |
| assertEquals(method.getFullMethodName(), methodTag.toString()); |
| TagValue statusTag = record.tags.get(RpcConstants.RPC_STATUS); |
| assertEquals(Status.Code.CANCELLED.toString(), statusTag.toString()); |
| assertEquals(1, record.getMetricAsLongOrFail(RpcConstants.RPC_SERVER_ERROR_COUNT)); |
| assertEquals(1028 + 99, record.getMetricAsLongOrFail(RpcConstants.RPC_SERVER_RESPONSE_BYTES)); |
| assertEquals(1128 + 865, |
| record.getMetricAsLongOrFail(RpcConstants.RPC_SERVER_UNCOMPRESSED_RESPONSE_BYTES)); |
| assertEquals(34 + 154, record.getMetricAsLongOrFail(RpcConstants.RPC_SERVER_REQUEST_BYTES)); |
| assertEquals(67 + 552, |
| record.getMetricAsLongOrFail(RpcConstants.RPC_SERVER_UNCOMPRESSED_REQUEST_BYTES)); |
| assertEquals(100 + 16 + 24, |
| record.getMetricAsLongOrFail(RpcConstants.RPC_SERVER_SERVER_LATENCY)); |
| } |
| |
| @Test |
| public void serverBasicTracingNoHeaders() { |
| ServerStreamTracer.Factory tracerFactory = censusTracing.getServerTracerFactory(); |
| ServerStreamTracer tracer = |
| tracerFactory.newServerStreamTracer(method.getFullMethodName(), new Metadata()); |
| verifyZeroInteractions(mockTracingPropagationHandler); |
| verify(mockSpanFactory).startSpanWithRemoteParent( |
| isNull(SpanContext.class), eq("Recv.package1.service2.method3"), |
| argThat(startSpanOptionsMatcher)); |
| |
| Context filteredContext = tracer.filterContext(Context.ROOT); |
| assertSame(spyServerSpan, ContextUtils.CONTEXT_SPAN_KEY.get(filteredContext)); |
| |
| verify(spyServerSpan, never()).end(any(EndSpanOptions.class)); |
| tracer.streamClosed(Status.CANCELLED); |
| |
| verify(spyServerSpan).end( |
| EndSpanOptions.builder() |
| .setStatus(com.google.instrumentation.trace.Status.CANCELLED).build()); |
| verify(spyServerSpan, never()).end(); |
| } |
| |
| @Test |
| public void convertToTracingStatus() { |
| // Without description |
| for (Status.Code grpcCode : Status.Code.values()) { |
| Status grpcStatus = Status.fromCode(grpcCode); |
| com.google.instrumentation.trace.Status tracingStatus = |
| CensusTracingModule.convertStatus(grpcStatus); |
| assertEquals(grpcCode.toString(), tracingStatus.getCanonicalCode().toString()); |
| assertNull(tracingStatus.getDescription()); |
| } |
| |
| // With description |
| for (Status.Code grpcCode : Status.Code.values()) { |
| Status grpcStatus = Status.fromCode(grpcCode).withDescription("This is my description"); |
| com.google.instrumentation.trace.Status tracingStatus = |
| CensusTracingModule.convertStatus(grpcStatus); |
| assertEquals(grpcCode.toString(), tracingStatus.getCanonicalCode().toString()); |
| assertEquals(grpcStatus.getDescription(), tracingStatus.getDescription()); |
| } |
| } |
| |
| private static void assertNoServerContent(StatsTestUtils.MetricsRecord record) { |
| assertNull(record.getMetric(RpcConstants.RPC_SERVER_ERROR_COUNT)); |
| assertNull(record.getMetric(RpcConstants.RPC_SERVER_REQUEST_BYTES)); |
| assertNull(record.getMetric(RpcConstants.RPC_SERVER_RESPONSE_BYTES)); |
| assertNull(record.getMetric(RpcConstants.RPC_SERVER_SERVER_ELAPSED_TIME)); |
| assertNull(record.getMetric(RpcConstants.RPC_SERVER_SERVER_LATENCY)); |
| assertNull(record.getMetric(RpcConstants.RPC_SERVER_UNCOMPRESSED_REQUEST_BYTES)); |
| assertNull(record.getMetric(RpcConstants.RPC_SERVER_UNCOMPRESSED_RESPONSE_BYTES)); |
| } |
| |
| private static void assertNoClientContent(StatsTestUtils.MetricsRecord record) { |
| assertNull(record.getMetric(RpcConstants.RPC_CLIENT_ERROR_COUNT)); |
| assertNull(record.getMetric(RpcConstants.RPC_CLIENT_REQUEST_BYTES)); |
| assertNull(record.getMetric(RpcConstants.RPC_CLIENT_RESPONSE_BYTES)); |
| assertNull(record.getMetric(RpcConstants.RPC_CLIENT_ROUNDTRIP_LATENCY)); |
| assertNull(record.getMetric(RpcConstants.RPC_CLIENT_SERVER_ELAPSED_TIME)); |
| assertNull(record.getMetric(RpcConstants.RPC_CLIENT_UNCOMPRESSED_REQUEST_BYTES)); |
| assertNull(record.getMetric(RpcConstants.RPC_CLIENT_UNCOMPRESSED_RESPONSE_BYTES)); |
| } |
| |
| // Promote the visibility of SpanFactory's methods to allow mocking |
| private abstract static class AccessibleSpanFactory extends SpanFactory { |
| @Override |
| public abstract Span startSpan(@Nullable Span parent, String name, StartSpanOptions options); |
| |
| @Override |
| public abstract Span startSpanWithRemoteParent( |
| @Nullable SpanContext remoteParent, String name, StartSpanOptions options); |
| } |
| |
| private static class FakeSpan extends Span { |
| FakeSpan(SpanContext ctx) { |
| super(ctx, null); |
| } |
| |
| @Override |
| public void addAttributes(Map<String, AttributeValue> attributes) { |
| } |
| |
| @Override |
| public void addAnnotation(String description, Map<String, AttributeValue> attributes) { |
| } |
| |
| @Override |
| public void addAnnotation(Annotation annotation) { |
| } |
| |
| @Override |
| public void addNetworkEvent(NetworkEvent networkEvent) { |
| } |
| |
| @Override |
| public void addLink(Link link) { |
| } |
| |
| @Override |
| public void end(EndSpanOptions options) { |
| } |
| } |
| } |