| /* |
| * Copyright 2015, gRPC Authors All rights reserved. |
| * |
| * Licensed under the Apache License, Version 2.0 (the "License"); |
| * you may not use this file except in compliance with the License. |
| * You may obtain a copy of the License at |
| * |
| * http://www.apache.org/licenses/LICENSE-2.0 |
| * |
| * Unless required by applicable law or agreed to in writing, software |
| * distributed under the License is distributed on an "AS IS" BASIS, |
| * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. |
| * See the License for the specific language governing permissions and |
| * limitations under the License. |
| */ |
| |
| package io.grpc.internal; |
| |
| import static org.junit.Assert.assertEquals; |
| import static org.junit.Assert.assertFalse; |
| import static org.junit.Assert.assertNull; |
| import static org.junit.Assert.assertSame; |
| import static org.junit.Assert.assertTrue; |
| import static org.mockito.Matchers.any; |
| import static org.mockito.Matchers.same; |
| import static org.mockito.Mockito.doAnswer; |
| import static org.mockito.Mockito.inOrder; |
| import static org.mockito.Mockito.mock; |
| import static org.mockito.Mockito.never; |
| import static org.mockito.Mockito.timeout; |
| import static org.mockito.Mockito.times; |
| import static org.mockito.Mockito.verify; |
| import static org.mockito.Mockito.verifyNoMoreInteractions; |
| import static org.mockito.Mockito.when; |
| |
| import io.grpc.CallOptions; |
| import io.grpc.IntegerMarshaller; |
| import io.grpc.LoadBalancer.PickResult; |
| import io.grpc.LoadBalancer.PickSubchannelArgs; |
| import io.grpc.LoadBalancer.SubchannelPicker; |
| import io.grpc.Metadata; |
| import io.grpc.MethodDescriptor; |
| import io.grpc.MethodDescriptor.MethodType; |
| import io.grpc.Status; |
| import io.grpc.StringMarshaller; |
| import java.util.concurrent.CyclicBarrier; |
| import java.util.concurrent.Executor; |
| import java.util.concurrent.TimeUnit; |
| import java.util.concurrent.atomic.AtomicBoolean; |
| import org.junit.After; |
| import org.junit.Before; |
| import org.junit.Test; |
| import org.junit.runner.RunWith; |
| import org.junit.runners.JUnit4; |
| import org.mockito.ArgumentCaptor; |
| import org.mockito.Captor; |
| import org.mockito.InOrder; |
| import org.mockito.Mock; |
| import org.mockito.MockitoAnnotations; |
| import org.mockito.invocation.InvocationOnMock; |
| import org.mockito.stubbing.Answer; |
| |
| /** |
| * Unit tests for {@link DelayedClientTransport}. |
| */ |
| @RunWith(JUnit4.class) |
| public class DelayedClientTransportTest { |
| // Mocks and captors; setUp() calls MockitoAnnotations.initMocks(this) to populate them. |
| @Mock private ManagedClientTransport.Listener transportListener; |
| @Mock private SubchannelPicker mockPicker; |
| @Mock private SubchannelImpl mockSubchannel; |
| @Mock private ClientTransport mockRealTransport; |
| @Mock private ClientTransport mockRealTransport2; |
| @Mock private ClientStream mockRealStream; |
| @Mock private ClientStream mockRealStream2; |
| @Mock private ClientStreamListener streamListener; |
| // NOTE(review): not referenced by any test visible in this class -- confirm it is still needed. |
| @Mock private Executor mockExecutor; |
| @Captor private ArgumentCaptor<Status> statusCaptor; |
| @Captor private ArgumentCaptor<ClientStreamListener> listenerCaptor; |
| |
| // Custom call option (default -1); reprocessSemantics() uses it to build distinct CallOptions. |
| private static final CallOptions.Key<Integer> SHARD_ID = CallOptions.Key.of("shard-id", -1); |
| |
| private final MethodDescriptor<String, Integer> method = |
| MethodDescriptor.<String, Integer>newBuilder() |
| .setType(MethodType.UNKNOWN) |
| .setFullMethodName("/service/method") |
| .setRequestMarshaller(new StringMarshaller()) |
| .setResponseMarshaller(new IntegerMarshaller()) |
| .build(); |
| // NOTE(review): method2 is built with the same full method name as `method`; the stubs in |
| // setUp() tell the two apart with same(), i.e. by identity. Confirm whether a distinct name |
| // (e.g. "/service/method2") was intended. |
| private final MethodDescriptor<String, Integer> method2 = |
| method.toBuilder().setFullMethodName("/service/method").build(); |
| private final Metadata headers = new Metadata(); |
| private final Metadata headers2 = new Metadata(); |
| |
| private final CallOptions callOptions = CallOptions.DEFAULT.withAuthority("dummy_value"); |
| private final CallOptions callOptions2 = CallOptions.DEFAULT.withAuthority("dummy_value2"); |
| |
| // Fake clock whose scheduled executor is handed to the transport under test; tests run its |
| // queued tasks explicitly with runDueTasks(). |
| private final FakeClock fakeExecutor = new FakeClock(); |
| |
| // The transport under test. |
| private final DelayedClientTransport delayedTransport = new DelayedClientTransport( |
| fakeExecutor.getScheduledExecutorService(), new ChannelExecutor()); |
| |
| @Before public void setUp() { |
| MockitoAnnotations.initMocks(this); |
| when(mockPicker.pickSubchannel(any(PickSubchannelArgs.class))) |
| .thenReturn(PickResult.withSubchannel(mockSubchannel)); |
| when(mockSubchannel.obtainActiveTransport()).thenReturn(mockRealTransport); |
| when(mockRealTransport.newStream(same(method), same(headers), same(callOptions))) |
| .thenReturn(mockRealStream); |
| when(mockRealTransport2.newStream(same(method2), same(headers2), same(callOptions2))) |
| .thenReturn(mockRealStream2); |
| delayedTransport.start(transportListener); |
| } |
| |
| @After public void noMorePendingTasks() { |
| assertEquals(0, fakeExecutor.numPendingTasks()); |
| } |
| |
| @Test public void streamStartThenAssignTransport() { |
| assertFalse(delayedTransport.hasPendingStreams()); |
| ClientStream stream = delayedTransport.newStream(method, headers, callOptions); |
| stream.start(streamListener); |
| assertEquals(1, delayedTransport.getPendingStreamsCount()); |
| assertTrue(delayedTransport.hasPendingStreams()); |
| assertTrue(stream instanceof DelayedStream); |
| assertEquals(0, fakeExecutor.numPendingTasks()); |
| delayedTransport.reprocess(mockPicker); |
| assertEquals(0, delayedTransport.getPendingStreamsCount()); |
| assertFalse(delayedTransport.hasPendingStreams()); |
| assertEquals(1, fakeExecutor.runDueTasks()); |
| verify(mockRealTransport).newStream(same(method), same(headers), same(callOptions)); |
| verify(mockRealStream).start(listenerCaptor.capture()); |
| verifyNoMoreInteractions(streamListener); |
| listenerCaptor.getValue().onReady(); |
| verify(streamListener).onReady(); |
| verifyNoMoreInteractions(streamListener); |
| } |
| |
| @Test public void newStreamThenAssignTransportThenShutdown() { |
| ClientStream stream = delayedTransport.newStream(method, headers, callOptions); |
| assertEquals(1, delayedTransport.getPendingStreamsCount()); |
| assertTrue(stream instanceof DelayedStream); |
| delayedTransport.reprocess(mockPicker); |
| assertEquals(0, delayedTransport.getPendingStreamsCount()); |
| delayedTransport.shutdown(); |
| verify(transportListener).transportShutdown(any(Status.class)); |
| verify(transportListener).transportTerminated(); |
| assertEquals(1, fakeExecutor.runDueTasks()); |
| verify(mockRealTransport).newStream(same(method), same(headers), same(callOptions)); |
| stream.start(streamListener); |
| verify(mockRealStream).start(same(streamListener)); |
| } |
| |
| /** |
| * After a shutdown with nothing pending, a later {@code reprocess()} causes no further |
| * transport-listener callbacks. |
| */ |
| @Test public void transportTerminatedThenAssignTransport() { |
| delayedTransport.shutdown(); |
| |
| // No stream is buffered, so shutdown and termination are both reported. |
| verify(transportListener).transportShutdown(any(Status.class)); |
| verify(transportListener).transportTerminated(); |
| |
| delayedTransport.reprocess(mockPicker); |
| |
| verifyNoMoreInteractions(transportListener); |
| } |
| |
| /** |
| * A stream created after {@code shutdown()} is a {@link FailingClientStream}, even though a |
| * picker was supplied earlier; the real transport is never asked for a stream. |
| */ |
| @Test public void assignTransportThenShutdownThenNewStream() { |
| delayedTransport.reprocess(mockPicker); |
| delayedTransport.shutdown(); |
| verify(transportListener).transportShutdown(any(Status.class)); |
| verify(transportListener).transportTerminated(); |
| |
| ClientStream lateStream = delayedTransport.newStream(method, headers, callOptions); |
| |
| assertEquals(0, delayedTransport.getPendingStreamsCount()); |
| assertTrue(lateStream instanceof FailingClientStream); |
| verify(mockRealTransport, never()).newStream( |
| any(MethodDescriptor.class), any(Metadata.class), any(CallOptions.class)); |
| } |
| |
| /** |
| * Same as the {@code shutdown()} variant, but using {@code shutdownNow(Status.UNAVAILABLE)}: |
| * later streams are {@link FailingClientStream}s and the real transport is never used. |
| */ |
| @Test public void assignTransportThenShutdownNowThenNewStream() { |
| delayedTransport.reprocess(mockPicker); |
| delayedTransport.shutdownNow(Status.UNAVAILABLE); |
| verify(transportListener).transportShutdown(any(Status.class)); |
| verify(transportListener).transportTerminated(); |
| |
| ClientStream lateStream = delayedTransport.newStream(method, headers, callOptions); |
| |
| assertEquals(0, delayedTransport.getPendingStreamsCount()); |
| assertTrue(lateStream instanceof FailingClientStream); |
| verify(mockRealTransport, never()).newStream( |
| any(MethodDescriptor.class), any(Metadata.class), any(CallOptions.class)); |
| } |
| |
| /** |
| * Cancelling a buffered, never-started stream removes it from the pending set without any |
| * interaction with the real transport or stream. |
| */ |
| @Test public void cancelStreamWithoutSetTransport() { |
| ClientStream buffered = delayedTransport.newStream(method, new Metadata()); |
| assertEquals(1, delayedTransport.getPendingStreamsCount()); |
| |
| buffered.cancel(Status.CANCELLED); |
| |
| assertEquals(0, delayedTransport.getPendingStreamsCount()); |
| verifyNoMoreInteractions(mockRealTransport, mockRealStream); |
| } |
| |
| /** |
| * Cancelling a buffered stream that was already started closes its listener with the |
| * cancellation status and never touches the real transport or stream. |
| */ |
| @Test public void startThenCancelStreamWithoutSetTransport() { |
| ClientStream buffered = delayedTransport.newStream(method, new Metadata()); |
| buffered.start(streamListener); |
| assertEquals(1, delayedTransport.getPendingStreamsCount()); |
| |
| buffered.cancel(Status.CANCELLED); |
| |
| assertEquals(0, delayedTransport.getPendingStreamsCount()); |
| verify(streamListener).closed(same(Status.CANCELLED), any(Metadata.class)); |
| verifyNoMoreInteractions(mockRealTransport, mockRealStream); |
| } |
| |
| /** |
| * A stream buffered before {@code shutdown()} keeps the transport from terminating, still |
| * proceeds once a picker arrives, and only then is termination reported. Streams created |
| * afterwards fail with UNAVAILABLE. |
| */ |
| @Test public void newStreamThenShutdownTransportThenAssignTransport() { |
| ClientStream pendingStream = delayedTransport.newStream(method, headers, callOptions); |
| pendingStream.start(streamListener); |
| delayedTransport.shutdown(); |
| |
| // Shutdown is reported, but the buffered stream holds back termination. |
| verify(transportListener).transportShutdown(any(Status.class)); |
| verify(transportListener, never()).transportTerminated(); |
| assertEquals(1, delayedTransport.getPendingStreamsCount()); |
| |
| // A picker arriving after shutdown still lets the buffered stream through. |
| delayedTransport.reprocess(mockPicker); |
| fakeExecutor.runDueTasks(); |
| verify(mockRealTransport).newStream(method, headers, callOptions); |
| verify(mockRealStream).start(any(ClientStreamListener.class)); |
| |
| // The last pending stream is gone, so the delayed transport is now terminated. |
| assertEquals(0, delayedTransport.getPendingStreamsCount()); |
| verify(transportListener).transportTerminated(); |
| |
| // Any stream created from here on is a failing one. |
| ClientStream lateStream = delayedTransport.newStream(method, new Metadata()); |
| verify(streamListener, never()).closed(any(Status.class), any(Metadata.class)); |
| lateStream.start(streamListener); |
| verify(streamListener).closed(statusCaptor.capture(), any(Metadata.class)); |
| Status lateStatus = statusCaptor.getValue(); |
| assertEquals(Status.Code.UNAVAILABLE, lateStatus.getCode()); |
| |
| assertEquals(0, delayedTransport.getPendingStreamsCount()); |
| verifyNoMoreInteractions(mockRealTransport, mockRealStream); |
| } |
| |
| /** |
| * After {@code shutdown()}, cancelling the only buffered stream lets the transport terminate; |
| * the real transport and stream are never involved. |
| */ |
| @Test public void newStreamThenShutdownTransportThenCancelStream() { |
| ClientStream buffered = delayedTransport.newStream(method, new Metadata()); |
| delayedTransport.shutdown(); |
| |
| // Termination is withheld while the stream is still pending. |
| verify(transportListener).transportShutdown(any(Status.class)); |
| verify(transportListener, never()).transportTerminated(); |
| assertEquals(1, delayedTransport.getPendingStreamsCount()); |
| |
| buffered.cancel(Status.CANCELLED); |
| |
| verify(transportListener).transportTerminated(); |
| assertEquals(0, delayedTransport.getPendingStreamsCount()); |
| verifyNoMoreInteractions(mockRealTransport, mockRealStream); |
| } |
| |
| /** A stream created and started after {@code shutdown()} is closed with UNAVAILABLE. */ |
| @Test public void shutdownThenNewStream() { |
| delayedTransport.shutdown(); |
| verify(transportListener).transportShutdown(any(Status.class)); |
| verify(transportListener).transportTerminated(); |
| |
| ClientStream lateStream = delayedTransport.newStream(method, new Metadata()); |
| lateStream.start(streamListener); |
| |
| verify(streamListener).closed(statusCaptor.capture(), any(Metadata.class)); |
| Status closeStatus = statusCaptor.getValue(); |
| assertEquals(Status.Code.UNAVAILABLE, closeStatus.getCode()); |
| } |
| |
| /** |
| * {@code shutdownNow(Status.UNAVAILABLE)} terminates the transport and closes an already |
| * started, still buffered stream with an UNAVAILABLE status. |
| */ |
| @Test public void startStreamThenShutdownNow() { |
| ClientStream buffered = delayedTransport.newStream(method, new Metadata()); |
| buffered.start(streamListener); |
| |
| delayedTransport.shutdownNow(Status.UNAVAILABLE); |
| |
| verify(transportListener).transportShutdown(any(Status.class)); |
| verify(transportListener).transportTerminated(); |
| verify(streamListener).closed(statusCaptor.capture(), any(Metadata.class)); |
| Status closeStatus = statusCaptor.getValue(); |
| assertEquals(Status.Code.UNAVAILABLE, closeStatus.getCode()); |
| } |
| |
| /** |
| * A stream created and started after {@code shutdownNow(Status.UNAVAILABLE)} is closed with |
| * UNAVAILABLE. |
| */ |
| @Test public void shutdownNowThenNewStream() { |
| delayedTransport.shutdownNow(Status.UNAVAILABLE); |
| verify(transportListener).transportShutdown(any(Status.class)); |
| verify(transportListener).transportTerminated(); |
| |
| ClientStream lateStream = delayedTransport.newStream(method, new Metadata()); |
| lateStream.start(streamListener); |
| |
| verify(streamListener).closed(statusCaptor.capture(), any(Metadata.class)); |
| Status closeStatus = statusCaptor.getValue(); |
| assertEquals(Status.Code.UNAVAILABLE, closeStatus.getCode()); |
| } |
| |
| /** |
| * Walks eight buffered streams (four fail-fast, four wait-for-ready) through three |
| * {@code reprocess()} calls and checks, per pick result, which streams proceed, fail or stay |
| * buffered, in which order the picker is consulted, which executor creates the real stream, |
| * and when the transport reports in-use and termination. |
| */ |
| @Test public void reprocessSemantics() { |
| CallOptions failFastCallOptions = CallOptions.DEFAULT.withOption(SHARD_ID, 1); |
| CallOptions waitForReadyCallOptions = CallOptions.DEFAULT.withOption(SHARD_ID, 2) |
| .withWaitForReady(); |
| |
| // subchannel1/2 expose a real transport; subchannel3 has no active transport (null). |
| SubchannelImpl subchannel1 = mock(SubchannelImpl.class); |
| SubchannelImpl subchannel2 = mock(SubchannelImpl.class); |
| SubchannelImpl subchannel3 = mock(SubchannelImpl.class); |
| when(mockRealTransport.newStream(any(MethodDescriptor.class), any(Metadata.class), |
| any(CallOptions.class))).thenReturn(mockRealStream); |
| when(mockRealTransport2.newStream(any(MethodDescriptor.class), any(Metadata.class), |
| any(CallOptions.class))).thenReturn(mockRealStream2); |
| when(subchannel1.obtainActiveTransport()).thenReturn(mockRealTransport); |
| when(subchannel2.obtainActiveTransport()).thenReturn(mockRealTransport2); |
| when(subchannel3.obtainActiveTransport()).thenReturn(null); |
| |
| // Fail-fast streams |
| DelayedStream ff1 = (DelayedStream) delayedTransport.newStream( |
| method, headers, failFastCallOptions); |
| PickSubchannelArgsImpl ff1args = new PickSubchannelArgsImpl(method, headers, |
| failFastCallOptions); |
| // The first buffered stream marks the transport as in use. |
| verify(transportListener).transportInUse(true); |
| DelayedStream ff2 = (DelayedStream) delayedTransport.newStream( |
| method2, headers2, failFastCallOptions); |
| PickSubchannelArgsImpl ff2args = new PickSubchannelArgsImpl(method2, headers2, |
| failFastCallOptions); |
| DelayedStream ff3 = (DelayedStream) delayedTransport.newStream( |
| method, headers, failFastCallOptions); |
| PickSubchannelArgsImpl ff3args = new PickSubchannelArgsImpl(method, headers, |
| failFastCallOptions); |
| DelayedStream ff4 = (DelayedStream) delayedTransport.newStream( |
| method2, headers2, failFastCallOptions); |
| PickSubchannelArgsImpl ff4args = new PickSubchannelArgsImpl(method2, headers2, |
| failFastCallOptions); |
| |
| // Wait-for-ready streams |
| // wfr3 carries its own executor in CallOptions, backed by a separate fake clock. |
| FakeClock wfr3Executor = new FakeClock(); |
| DelayedStream wfr1 = (DelayedStream) delayedTransport.newStream( |
| method, headers, waitForReadyCallOptions); |
| PickSubchannelArgsImpl wfr1args = new PickSubchannelArgsImpl(method, headers, |
| waitForReadyCallOptions); |
| DelayedStream wfr2 = (DelayedStream) delayedTransport.newStream( |
| method2, headers2, waitForReadyCallOptions); |
| PickSubchannelArgsImpl wfr2args = new PickSubchannelArgsImpl(method2, headers2, |
| waitForReadyCallOptions); |
| CallOptions wfr3callOptions = waitForReadyCallOptions.withExecutor( |
| wfr3Executor.getScheduledExecutorService()); |
| DelayedStream wfr3 = (DelayedStream) delayedTransport.newStream( |
| method, headers, wfr3callOptions); |
| PickSubchannelArgsImpl wfr3args = new PickSubchannelArgsImpl(method, headers, |
| wfr3callOptions); |
| DelayedStream wfr4 = (DelayedStream) delayedTransport.newStream( |
| method2, headers2, waitForReadyCallOptions); |
| PickSubchannelArgsImpl wfr4args = new PickSubchannelArgsImpl(method2, headers2, |
| waitForReadyCallOptions); |
| |
| assertEquals(8, delayedTransport.getPendingStreamsCount()); |
| |
| // First reprocess(). Some will proceed, some will fail and the rest will stay buffered. |
| // Only seven results are stubbed for eight picks: Mockito keeps returning the last stubbed |
| // value, so wfr4 also gets subchannel3 (no active transport) and stays buffered. |
| SubchannelPicker picker = mock(SubchannelPicker.class); |
| when(picker.pickSubchannel(any(PickSubchannelArgs.class))).thenReturn( |
| // For the fail-fast streams |
| PickResult.withSubchannel(subchannel1), // ff1: proceed |
| PickResult.withError(Status.UNAVAILABLE), // ff2: fail |
| PickResult.withSubchannel(subchannel3), // ff3: stay |
| PickResult.withNoResult(), // ff4: stay |
| // For the wait-for-ready streams |
| PickResult.withSubchannel(subchannel2), // wfr1: proceed |
| PickResult.withError(Status.RESOURCE_EXHAUSTED), // wfr2: stay |
| PickResult.withSubchannel(subchannel3)); // wfr3: stay |
| InOrder inOrder = inOrder(picker); |
| delayedTransport.reprocess(picker); |
| |
| // ff1, ff2 and wfr1 left the buffer; the picker was consulted in creation order. |
| assertEquals(5, delayedTransport.getPendingStreamsCount()); |
| inOrder.verify(picker).pickSubchannel(ff1args); |
| inOrder.verify(picker).pickSubchannel(ff2args); |
| inOrder.verify(picker).pickSubchannel(ff3args); |
| inOrder.verify(picker).pickSubchannel(ff4args); |
| inOrder.verify(picker).pickSubchannel(wfr1args); |
| inOrder.verify(picker).pickSubchannel(wfr2args); |
| inOrder.verify(picker).pickSubchannel(wfr3args); |
| inOrder.verify(picker).pickSubchannel(wfr4args); |
| |
| inOrder.verifyNoMoreInteractions(); |
| // Make sure that real transport creates streams in the executor |
| verify(mockRealTransport, never()).newStream( |
| any(MethodDescriptor.class), any(Metadata.class), any(CallOptions.class)); |
| verify(mockRealTransport2, never()).newStream( |
| any(MethodDescriptor.class), any(Metadata.class), any(CallOptions.class)); |
| fakeExecutor.runDueTasks(); |
| assertEquals(0, fakeExecutor.numPendingTasks()); |
| // ff1 and wfr1 went through |
| verify(mockRealTransport).newStream(method, headers, failFastCallOptions); |
| verify(mockRealTransport2).newStream(method, headers, waitForReadyCallOptions); |
| assertSame(mockRealStream, ff1.getRealStream()); |
| assertSame(mockRealStream2, wfr1.getRealStream()); |
| // The ff2 has failed due to picker returning an error |
| assertSame(Status.UNAVAILABLE, ((FailingClientStream) ff2.getRealStream()).getError()); |
| // Other streams are still buffered |
| assertNull(ff3.getRealStream()); |
| assertNull(ff4.getRealStream()); |
| assertNull(wfr2.getRealStream()); |
| assertNull(wfr3.getRealStream()); |
| assertNull(wfr4.getRealStream()); |
| |
| // Second reprocess(). All existing streams will proceed. |
| picker = mock(SubchannelPicker.class); |
| when(picker.pickSubchannel(any(PickSubchannelArgs.class))).thenReturn( |
| PickResult.withSubchannel(subchannel1), // ff3 |
| PickResult.withSubchannel(subchannel2), // ff4 |
| PickResult.withSubchannel(subchannel2), // wfr2 |
| PickResult.withSubchannel(subchannel1), // wfr3 |
| PickResult.withSubchannel(subchannel2), // wfr4 |
| PickResult.withNoResult()); // wfr5 (not yet created) |
| inOrder = inOrder(picker); |
| assertEquals(0, wfr3Executor.numPendingTasks()); |
| verify(transportListener, never()).transportInUse(false); |
| |
| delayedTransport.reprocess(picker); |
| // The buffer is empty again, so the transport reports that it is no longer in use. |
| assertEquals(0, delayedTransport.getPendingStreamsCount()); |
| verify(transportListener).transportInUse(false); |
| inOrder.verify(picker).pickSubchannel(ff3args); // ff3 |
| inOrder.verify(picker).pickSubchannel(ff4args); // ff4 |
| inOrder.verify(picker).pickSubchannel(wfr2args); // wfr2 |
| inOrder.verify(picker).pickSubchannel(wfr3args); // wfr3 |
| inOrder.verify(picker).pickSubchannel(wfr4args); // wfr4 |
| inOrder.verifyNoMoreInteractions(); |
| fakeExecutor.runDueTasks(); |
| assertEquals(0, fakeExecutor.numPendingTasks()); |
| assertSame(mockRealStream, ff3.getRealStream()); |
| assertSame(mockRealStream2, ff4.getRealStream()); |
| assertSame(mockRealStream2, wfr2.getRealStream()); |
| assertSame(mockRealStream2, wfr4.getRealStream()); |
| |
| // If there is an executor in the CallOptions, it will be used to create the real stream. |
| assertNull(wfr3.getRealStream()); |
| wfr3Executor.runDueTasks(); |
| assertSame(mockRealStream, wfr3.getRealStream()); |
| |
| // New streams will use the last picker |
| // (the sixth stubbed result, withNoResult(), keeps wfr5 buffered). |
| DelayedStream wfr5 = (DelayedStream) delayedTransport.newStream( |
| method, headers, waitForReadyCallOptions); |
| assertNull(wfr5.getRealStream()); |
| inOrder.verify(picker).pickSubchannel( |
| new PickSubchannelArgsImpl(method, headers, waitForReadyCallOptions)); |
| inOrder.verifyNoMoreInteractions(); |
| assertEquals(1, delayedTransport.getPendingStreamsCount()); |
| |
| // wfr5 will stop delayed transport from terminating |
| delayedTransport.shutdown(); |
| verify(transportListener).transportShutdown(any(Status.class)); |
| verify(transportListener, never()).transportTerminated(); |
| // ... until it's gone |
| picker = mock(SubchannelPicker.class); |
| when(picker.pickSubchannel(any(PickSubchannelArgs.class))).thenReturn( |
| PickResult.withSubchannel(subchannel1)); |
| delayedTransport.reprocess(picker); |
| verify(picker).pickSubchannel( |
| new PickSubchannelArgsImpl(method, headers, waitForReadyCallOptions)); |
| fakeExecutor.runDueTasks(); |
| assertSame(mockRealStream, wfr5.getRealStream()); |
| assertEquals(0, delayedTransport.getPendingStreamsCount()); |
| verify(transportListener).transportTerminated(); |
| } |
| |
| /** |
| * {@code reprocess()} with nothing buffered does not consult the picker or notify the transport |
| * listener, but the picker is kept and serves the next {@code newStream()} directly. |
| */ |
| @Test |
| public void reprocess_NoPendingStream() { |
| // Route: savedPicker -> readySubchannel -> mockRealTransport -> mockRealStream. |
| SubchannelImpl readySubchannel = mock(SubchannelImpl.class); |
| SubchannelPicker savedPicker = mock(SubchannelPicker.class); |
| when(mockRealTransport.newStream(any(MethodDescriptor.class), any(Metadata.class), |
| any(CallOptions.class))).thenReturn(mockRealStream); |
| when(readySubchannel.obtainActiveTransport()).thenReturn(mockRealTransport); |
| when(savedPicker.pickSubchannel(any(PickSubchannelArgs.class))).thenReturn( |
| PickResult.withSubchannel(readySubchannel)); |
| |
| delayedTransport.reprocess(savedPicker); |
| verifyNoMoreInteractions(savedPicker); |
| verifyNoMoreInteractions(transportListener); |
| |
| // The picker was not needed above, yet it is the one used for the stream created now. |
| ClientStream created = delayedTransport.newStream(method, headers, CallOptions.DEFAULT); |
| PickSubchannelArgsImpl expectedArgs = |
| new PickSubchannelArgsImpl(method, headers, CallOptions.DEFAULT); |
| verify(savedPicker).pickSubchannel(expectedArgs); |
| verify(readySubchannel).obtainActiveTransport(); |
| assertSame(mockRealStream, created); |
| } |
| |
| /** |
| * Races {@code newStream()} on a side thread against {@code reprocess()} on the test thread. |
| * |
| * <p>Phase 1 calls {@code reprocess()} again with the same picker while a {@code newStream()} |
| * is blocked inside {@code pickSubchannel()}, and expects that stream to pick a second time. |
| * Phase 2 installs a different picker while a second {@code newStream()} is blocked, and |
| * expects the second stream to be picked again with the new picker once it is released. |
| * |
| * @throws Exception if waiting on the barrier or joining a side thread fails or times out |
| */ |
| @Test |
| public void reprocess_newStreamRacesWithReprocess() throws Exception { |
| final CyclicBarrier barrier = new CyclicBarrier(2); |
| // In both phases, we only expect the first pickSubchannel() call to block on the barrier. |
| final AtomicBoolean nextPickShouldWait = new AtomicBoolean(true); |
| ///////// Phase 1: reprocess() twice with the same picker |
| SubchannelPicker picker = mock(SubchannelPicker.class); |
| |
| doAnswer(new Answer<PickResult>() { |
| @Override |
| public PickResult answer(InvocationOnMock invocation) throws Throwable { |
| if (nextPickShouldWait.compareAndSet(true, false)) { |
| try { |
| barrier.await(); |
| return PickResult.withNoResult(); |
| } catch (Exception e) { |
| // Best effort: a failed wait is only logged and the pick still returns no result. |
| e.printStackTrace(); |
| } |
| } |
| return PickResult.withNoResult(); |
| } |
| }).when(picker).pickSubchannel(any(PickSubchannelArgs.class)); |
| |
| // Because there is no pending stream yet, it will do nothing but save the picker. |
| delayedTransport.reprocess(picker); |
| verify(picker, never()).pickSubchannel(any(PickSubchannelArgs.class)); |
| |
| Thread sideThread = new Thread("sideThread") { |
| @Override |
| public void run() { |
| // Will call pickSubchannel and wait on barrier |
| delayedTransport.newStream(method, headers, callOptions); |
| } |
| }; |
| sideThread.start(); |
| |
| PickSubchannelArgsImpl args = new PickSubchannelArgsImpl(method, headers, callOptions); |
| PickSubchannelArgsImpl args2 = new PickSubchannelArgsImpl(method, headers2, callOptions); |
| |
| // Is called from sideThread |
| verify(picker, timeout(5000)).pickSubchannel(args); |
| |
| // Because stream has not been buffered (it's still stuck in newStream()), this will do nothing, |
| // but incrementing the picker version. |
| delayedTransport.reprocess(picker); |
| verify(picker).pickSubchannel(args); |
| |
| // Now let the stuck newStream() through |
| barrier.await(5, TimeUnit.SECONDS); |
| |
| sideThread.join(5000); |
| assertFalse("sideThread should've exited", sideThread.isAlive()); |
| // newStream() detects that there has been a new picker while it's stuck, thus will pick again. |
| verify(picker, times(2)).pickSubchannel(args); |
| |
| // Re-arm the barrier and the "block the first pick" flag for the second phase. |
| barrier.reset(); |
| nextPickShouldWait.set(true); |
| |
| ////////// Phase 2: reprocess() with a different picker |
| // Create the second stream |
| Thread sideThread2 = new Thread("sideThread2") { |
| @Override |
| public void run() { |
| // Will call pickSubchannel and wait on barrier |
| delayedTransport.newStream(method, headers2, callOptions); |
| } |
| }; |
| sideThread2.start(); |
| // The second stream will see the first picker |
| verify(picker, timeout(5000)).pickSubchannel(args2); |
| // While the first stream won't use the first picker any more. |
| verify(picker, times(2)).pickSubchannel(args); |
| |
| // Now use a different picker |
| SubchannelPicker picker2 = mock(SubchannelPicker.class); |
| when(picker2.pickSubchannel(any(PickSubchannelArgs.class))) |
| .thenReturn(PickResult.withNoResult()); |
| delayedTransport.reprocess(picker2); |
| // The pending first stream uses the new picker |
| verify(picker2).pickSubchannel(args); |
| // The second stream is still pending in creation, doesn't use the new picker. |
| verify(picker2, never()).pickSubchannel(args2); |
| |
| // Now let the second stream finish creation |
| barrier.await(5, TimeUnit.SECONDS); |
| |
| sideThread2.join(5000); |
| assertFalse("sideThread2 should've exited", sideThread2.isAlive()); |
| // The second stream should see the new picker |
| verify(picker2, timeout(5000)).pickSubchannel(args2); |
| |
| // Wrapping up: check the final pick counts of both pickers for both streams. |
| verify(picker, times(2)).pickSubchannel(args); |
| verify(picker).pickSubchannel(args2); |
| verify(picker2).pickSubchannel(args); |
| // Fixed: this line used to repeat the args check, so picker2/args2 was not re-checked. |
| verify(picker2).pickSubchannel(args2); |
| } |
| } |