blob: 8bca0a48c5a0d4d405550a15af84ed4679b16919 [file] [log] [blame]
Kun Zhang7cb04972017-01-09 14:44:10 -08001/*
2 * Copyright 2015, Google Inc. All rights reserved.
3 *
4 * Redistribution and use in source and binary forms, with or without
5 * modification, are permitted provided that the following conditions are
6 * met:
7 *
8 * * Redistributions of source code must retain the above copyright
9 * notice, this list of conditions and the following disclaimer.
10 * * Redistributions in binary form must reproduce the above
11 * copyright notice, this list of conditions and the following disclaimer
12 * in the documentation and/or other materials provided with the
13 * distribution.
14 *
15 * * Neither the name of Google Inc. nor the names of its
16 * contributors may be used to endorse or promote products derived from
17 * this software without specific prior written permission.
18 *
19 * THIS SOFTWARE IS PROVIDED BY THE COPYRIGHT HOLDERS AND CONTRIBUTORS
20 * "AS IS" AND ANY EXPRESS OR IMPLIED WARRANTIES, INCLUDING, BUT NOT
21 * LIMITED TO, THE IMPLIED WARRANTIES OF MERCHANTABILITY AND FITNESS FOR
22 * A PARTICULAR PURPOSE ARE DISCLAIMED. IN NO EVENT SHALL THE COPYRIGHT
23 * OWNER OR CONTRIBUTORS BE LIABLE FOR ANY DIRECT, INDIRECT, INCIDENTAL,
24 * SPECIAL, EXEMPLARY, OR CONSEQUENTIAL DAMAGES (INCLUDING, BUT NOT
25 * LIMITED TO, PROCUREMENT OF SUBSTITUTE GOODS OR SERVICES; LOSS OF USE,
26 * DATA, OR PROFITS; OR BUSINESS INTERRUPTION) HOWEVER CAUSED AND ON ANY
27 * THEORY OF LIABILITY, WHETHER IN CONTRACT, STRICT LIABILITY, OR TORT
28 * (INCLUDING NEGLIGENCE OR OTHERWISE) ARISING IN ANY WAY OUT OF THE USE
29 * OF THIS SOFTWARE, EVEN IF ADVISED OF THE POSSIBILITY OF SUCH DAMAGE.
30 */
31
32package io.grpc.internal;
33
34import static io.grpc.ConnectivityState.CONNECTING;
35import static io.grpc.ConnectivityState.READY;
36import static io.grpc.ConnectivityState.TRANSIENT_FAILURE;
37import static junit.framework.TestCase.assertNotSame;
38import static org.junit.Assert.assertEquals;
39import static org.junit.Assert.assertFalse;
40import static org.junit.Assert.assertNotNull;
41import static org.junit.Assert.assertNull;
42import static org.junit.Assert.assertSame;
43import static org.junit.Assert.assertTrue;
44import static org.mockito.Matchers.any;
45import static org.mockito.Matchers.anyObject;
46import static org.mockito.Matchers.anyString;
47import static org.mockito.Matchers.eq;
48import static org.mockito.Matchers.same;
49import static org.mockito.Mockito.atLeast;
50import static org.mockito.Mockito.doAnswer;
51import static org.mockito.Mockito.doThrow;
52import static org.mockito.Mockito.inOrder;
53import static org.mockito.Mockito.mock;
54import static org.mockito.Mockito.never;
55import static org.mockito.Mockito.times;
56import static org.mockito.Mockito.verify;
57import static org.mockito.Mockito.verifyNoMoreInteractions;
58import static org.mockito.Mockito.when;
59
60import com.google.common.collect.ImmutableList;
Kun Zhang7cb04972017-01-09 14:44:10 -080061import io.grpc.Attributes;
Kun Zhang7cb04972017-01-09 14:44:10 -080062import io.grpc.CallCredentials;
Carl Mastrangeloefbcd1f2017-01-23 12:29:35 -080063import io.grpc.CallCredentials.MetadataApplier;
Kun Zhang7cb04972017-01-09 14:44:10 -080064import io.grpc.CallOptions;
65import io.grpc.Channel;
66import io.grpc.ClientCall;
67import io.grpc.ClientInterceptor;
68import io.grpc.CompressorRegistry;
69import io.grpc.ConnectivityStateInfo;
70import io.grpc.Context;
71import io.grpc.DecompressorRegistry;
72import io.grpc.EquivalentAddressGroup;
73import io.grpc.IntegerMarshaller;
Kun Zhanga9bd9472017-02-21 17:11:03 -080074import io.grpc.LoadBalancer;
75import io.grpc.LoadBalancer.Helper;
76import io.grpc.LoadBalancer.PickResult;
77import io.grpc.LoadBalancer.Subchannel;
78import io.grpc.LoadBalancer.SubchannelPicker;
Kun Zhang7cb04972017-01-09 14:44:10 -080079import io.grpc.ManagedChannel;
80import io.grpc.Metadata;
81import io.grpc.MethodDescriptor;
Carl Mastrangeloefbcd1f2017-01-23 12:29:35 -080082import io.grpc.MethodDescriptor.MethodType;
Kun Zhang7cb04972017-01-09 14:44:10 -080083import io.grpc.NameResolver;
84import io.grpc.ResolvedServerInfo;
85import io.grpc.ResolvedServerInfoGroup;
86import io.grpc.SecurityLevel;
87import io.grpc.Status;
88import io.grpc.StringMarshaller;
89import io.grpc.internal.TestUtils.MockClientTransportInfo;
90import io.grpc.internal.testing.StatsTestUtils.FakeStatsContextFactory;
Carl Mastrangelo89bc2cd2017-01-26 13:43:06 -080091import java.net.SocketAddress;
92import java.net.URI;
93import java.util.ArrayList;
94import java.util.Arrays;
95import java.util.Collections;
96import java.util.LinkedList;
97import java.util.List;
98import java.util.concurrent.BlockingQueue;
99import java.util.concurrent.Executor;
100import java.util.concurrent.ScheduledExecutorService;
101import java.util.concurrent.TimeUnit;
102import java.util.concurrent.atomic.AtomicLong;
Kun Zhang7cb04972017-01-09 14:44:10 -0800103import org.junit.After;
104import org.junit.Before;
105import org.junit.Rule;
106import org.junit.Test;
107import org.junit.rules.ExpectedException;
108import org.junit.runner.RunWith;
109import org.junit.runners.JUnit4;
110import org.mockito.ArgumentCaptor;
111import org.mockito.Captor;
112import org.mockito.InOrder;
113import org.mockito.Matchers;
114import org.mockito.Mock;
115import org.mockito.MockitoAnnotations;
116import org.mockito.invocation.InvocationOnMock;
117import org.mockito.stubbing.Answer;
118
Kun Zhanga9bd9472017-02-21 17:11:03 -0800119/** Unit tests for {@link ManagedChannelImpl}. */
Kun Zhang7cb04972017-01-09 14:44:10 -0800120@RunWith(JUnit4.class)
Kun Zhanga9bd9472017-02-21 17:11:03 -0800121public class ManagedChannelImplTest {
// Empty interceptor list, passed to createChannel() by tests that need no interception.
private static final List<ClientInterceptor> NO_INTERCEPTOR =
    Collections.<ClientInterceptor>emptyList();
// Name-resolver parameters handed to the channel constructor; only the default port is set.
private static final Attributes NAME_RESOLVER_PARAMS =
    Attributes.newBuilder().set(NameResolver.Factory.PARAMS_DEFAULT_PORT, 447).build();
// Method descriptor used for the calls started by these tests (String request, Integer response).
private static final MethodDescriptor<String, Integer> method =
    MethodDescriptor.<String, Integer>newBuilder()
        .setType(MethodType.UNKNOWN)
        .setFullMethodName("/service/method")
        .setRequestMarshaller(new StringMarshaller())
        .setResponseMarshaller(new IntegerMarshaller())
        .build();
// Attribute key used to give subchannels distinguishable attributes.
private static final Attributes.Key<String> SUBCHANNEL_ATTR_KEY =
    Attributes.Key.of("subchannel-attr-key");
// Target pieces: the channel is created against "fake://" + serviceName.
private final String serviceName = "fake.example.com";
private final String authority = serviceName;
private final String userAgent = "userAgent";
private final String target = "fake://" + serviceName;
// Assigned in setUp() from target.
private URI expectedUri;
// A single opaque address and the address group / server info wrapping it.
private final SocketAddress socketAddress = new SocketAddress() {};
private final EquivalentAddressGroup addressGroup = new EquivalentAddressGroup(socketAddress);
private final ResolvedServerInfo server = new ResolvedServerInfo(socketAddress, Attributes.EMPTY);
// Fake clocks backing the timer, application executor and OOB executor pools (see setUp()).
private final FakeClock timer = new FakeClock();
private final FakeClock executor = new FakeClock();
private final FakeClock oobExecutor = new FakeClock();
private final FakeStatsContextFactory statsCtxFactory = new FakeStatsContextFactory();

@Rule public final ExpectedException thrown = ExpectedException.none();

// The channel under test and the Helper it passed to the LoadBalancer factory; both are set by
// createChannel().
private ManagedChannelImpl channel;
private Helper helper;
@Captor
private ArgumentCaptor<Status> statusCaptor;
@Captor
private ArgumentCaptor<StatsTraceContext> statsTraceCtxCaptor;
@Mock
private LoadBalancer.Factory mockLoadBalancerFactory;
@Mock
private LoadBalancer mockLoadBalancer;
@Captor
private ArgumentCaptor<ConnectivityStateInfo> stateInfoCaptor;
@Mock
private SubchannelPicker mockPicker;
@Mock
private ClientTransportFactory mockTransportFactory;
@Mock
private ClientCall.Listener<Integer> mockCallListener;
@Mock
private ClientCall.Listener<Integer> mockCallListener2;
@Mock
private ClientCall.Listener<Integer> mockCallListener3;
@Mock
private ClientCall.Listener<Integer> mockCallListener4;
@Mock
private ClientCall.Listener<Integer> mockCallListener5;
// Pools stubbed in setUp() to hand out the fake clocks' executor services.
@Mock
private ObjectPool<ScheduledExecutorService> timerServicePool;
@Mock
private ObjectPool<Executor> executorPool;
@Mock
private ObjectPool<Executor> oobExecutorPool;
@Mock
private CallCredentials creds;
// Transports created through mockTransportFactory, captured in setUp().
private BlockingQueue<MockClientTransportInfo> transports;

private ArgumentCaptor<ClientStreamListener> streamListenerCaptor =
    ArgumentCaptor.forClass(ClientStreamListener.class);
188
189 private void createChannel(
190 NameResolver.Factory nameResolverFactory, List<ClientInterceptor> interceptors) {
Kun Zhanga9bd9472017-02-21 17:11:03 -0800191 channel = new ManagedChannelImpl(target, new FakeBackoffPolicyProvider(),
Kun Zhang7cb04972017-01-09 14:44:10 -0800192 nameResolverFactory, NAME_RESOLVER_PARAMS, mockLoadBalancerFactory,
193 mockTransportFactory, DecompressorRegistry.getDefaultInstance(),
194 CompressorRegistry.getDefaultInstance(), timerServicePool, executorPool, oobExecutorPool,
Kun Zhanga9bd9472017-02-21 17:11:03 -0800195 timer.getStopwatchSupplier(), ManagedChannelImpl.IDLE_TIMEOUT_MILLIS_DISABLE, userAgent,
Kun Zhang7cb04972017-01-09 14:44:10 -0800196 interceptors, statsCtxFactory);
197 // Force-exit the initial idle-mode
198 channel.exitIdleMode();
199 assertEquals(0, timer.numPendingTasks());
200
201 ArgumentCaptor<Helper> helperCaptor = ArgumentCaptor.forClass(null);
202 verify(mockLoadBalancerFactory).newLoadBalancer(helperCaptor.capture());
203 helper = helperCaptor.getValue();
204 }
205
206 @Before
207 public void setUp() throws Exception {
208 MockitoAnnotations.initMocks(this);
209 expectedUri = new URI(target);
210 when(mockLoadBalancerFactory.newLoadBalancer(any(Helper.class))).thenReturn(mockLoadBalancer);
211 transports = TestUtils.captureTransports(mockTransportFactory);
212 when(timerServicePool.getObject()).thenReturn(timer.getScheduledExecutorService());
213 when(executorPool.getObject()).thenReturn(executor.getScheduledExecutorService());
214 when(oobExecutorPool.getObject()).thenReturn(oobExecutor.getScheduledExecutorService());
215 }
216
217 @After
218 public void allPendingTasksAreRun() throws Exception {
219 // The "never" verifications in the tests only hold up if all due tasks are done.
220 // As for timer, although there may be scheduled tasks in a future time, since we don't test
221 // any time-related behavior in this test suite, we only care the tasks that are due. This
222 // would ignore any time-sensitive tasks, e.g., back-off and the idle timer.
223 assertTrue(timer.getDueTasks() + " should be empty", timer.getDueTasks().isEmpty());
224 assertEquals(executor.getPendingTasks() + " should be empty", 0, executor.numPendingTasks());
225 }
226
227 /**
Łukasz Strzałkowski63a77652017-02-21 09:48:25 -0800228 * The counterpart of {@link ManagedChannelImplIdlenessTest#enterIdleModeAfterForceExit}.
Kun Zhang7cb04972017-01-09 14:44:10 -0800229 */
230 @Test
231 @SuppressWarnings("unchecked")
232 public void idleModeDisabled() {
233 createChannel(new FakeNameResolverFactory(true), NO_INTERCEPTOR);
234
235 // In this test suite, the channel is always created with idle mode disabled.
236 // No task is scheduled to enter idle mode
237 assertEquals(0, timer.numPendingTasks());
238 assertEquals(0, executor.numPendingTasks());
239 }
240
241 @Test
242 public void immediateDeadlineExceeded() {
243 createChannel(new FakeNameResolverFactory(true), NO_INTERCEPTOR);
244 ClientCall<String, Integer> call =
245 channel.newCall(method, CallOptions.DEFAULT.withDeadlineAfter(0, TimeUnit.NANOSECONDS));
246 call.start(mockCallListener, new Metadata());
247 assertEquals(1, executor.runDueTasks());
248
249 verify(mockCallListener).onClose(statusCaptor.capture(), any(Metadata.class));
250 Status status = statusCaptor.getValue();
251 assertSame(Status.DEADLINE_EXCEEDED.getCode(), status.getCode());
252 }
253
254 @Test
255 public void shutdownWithNoTransportsEverCreated() {
256 createChannel(new FakeNameResolverFactory(true), NO_INTERCEPTOR);
257 verify(executorPool).getObject();
258 verify(timerServicePool).getObject();
259 verify(executorPool, never()).returnObject(anyObject());
260 verify(timerServicePool, never()).returnObject(anyObject());
261 verifyNoMoreInteractions(mockTransportFactory);
262 channel.shutdown();
263 assertTrue(channel.isShutdown());
264 assertTrue(channel.isTerminated());
265 verify(executorPool).returnObject(executor.getScheduledExecutorService());
266 verify(timerServicePool).returnObject(timer.getScheduledExecutorService());
267 }
268
269 @Test
270 public void callsAndShutdown() {
271 subtestCallsAndShutdown(false, false);
272 }
273
274 @Test
275 public void callsAndShutdownNow() {
276 subtestCallsAndShutdown(true, false);
277 }
278
279 /** Make sure shutdownNow() after shutdown() has an effect. */
280 @Test
281 public void callsAndShutdownAndShutdownNow() {
282 subtestCallsAndShutdown(false, true);
283 }
284
285 private void subtestCallsAndShutdown(boolean shutdownNow, boolean shutdownNowAfterShutdown) {
286 FakeNameResolverFactory nameResolverFactory = new FakeNameResolverFactory(true);
287 createChannel(nameResolverFactory, NO_INTERCEPTOR);
288 verify(executorPool).getObject();
289 verify(timerServicePool).getObject();
290 ClientStream mockStream = mock(ClientStream.class);
291 ClientStream mockStream2 = mock(ClientStream.class);
292 Metadata headers = new Metadata();
293 Metadata headers2 = new Metadata();
294
295 // Configure the picker so that first RPC goes to delayed transport, and second RPC goes to
296 // real transport.
297 Subchannel subchannel = helper.createSubchannel(addressGroup, Attributes.EMPTY);
298 subchannel.requestConnection();
299 verify(mockTransportFactory).newClientTransport(
300 any(SocketAddress.class), any(String.class), any(String.class));
301 MockClientTransportInfo transportInfo = transports.poll();
302 ConnectionClientTransport mockTransport = transportInfo.transport;
303 verify(mockTransport).start(any(ManagedClientTransport.Listener.class));
304 ManagedClientTransport.Listener transportListener = transportInfo.listener;
305 when(mockTransport.newStream(same(method), same(headers), same(CallOptions.DEFAULT),
306 any(StatsTraceContext.class)))
307 .thenReturn(mockStream);
308 when(mockTransport.newStream(same(method), same(headers2), same(CallOptions.DEFAULT),
309 any(StatsTraceContext.class)))
310 .thenReturn(mockStream2);
311 transportListener.transportReady();
312 when(mockPicker.pickSubchannel(any(Attributes.class), same(headers))).thenReturn(
313 PickResult.withNoResult());
314 when(mockPicker.pickSubchannel(any(Attributes.class), same(headers2))).thenReturn(
315 PickResult.withSubchannel(subchannel));
316 helper.updatePicker(mockPicker);
317
318 // First RPC, will be pending
319 ClientCall<String, Integer> call = channel.newCall(method, CallOptions.DEFAULT);
320 verifyNoMoreInteractions(mockTransportFactory);
321 call.start(mockCallListener, headers);
322
323 verify(mockTransport, never()).newStream(same(method), same(headers), same(CallOptions.DEFAULT),
324 any(StatsTraceContext.class));
325 statsCtxFactory.pollContextOrFail();
326
327 // Second RPC, will be assigned to the real transport
328 ClientCall<String, Integer> call2 = channel.newCall(method, CallOptions.DEFAULT);
329 call2.start(mockCallListener2, headers2);
330 verify(mockTransport).newStream(same(method), same(headers2), same(CallOptions.DEFAULT),
331 statsTraceCtxCaptor.capture());
332 assertEquals(statsCtxFactory.pollContextOrFail(),
333 statsTraceCtxCaptor.getValue().getStatsContext());
334 verify(mockTransport).newStream(same(method), same(headers2), same(CallOptions.DEFAULT),
335 statsTraceCtxCaptor.capture());
336 verify(mockStream2).start(any(ClientStreamListener.class));
337
338 // Shutdown
339 if (shutdownNow) {
340 channel.shutdownNow();
341 } else {
342 channel.shutdown();
343 if (shutdownNowAfterShutdown) {
344 channel.shutdownNow();
345 shutdownNow = true;
346 }
347 }
348 assertTrue(channel.isShutdown());
349 assertFalse(channel.isTerminated());
350 assertEquals(1, nameResolverFactory.resolvers.size());
351 verify(mockLoadBalancerFactory).newLoadBalancer(any(Helper.class));
352
353 // Further calls should fail without going to the transport
354 ClientCall<String, Integer> call3 = channel.newCall(method, CallOptions.DEFAULT);
355 call3.start(mockCallListener3, headers2);
356 timer.runDueTasks();
357 executor.runDueTasks();
358
359 verify(mockCallListener3).onClose(statusCaptor.capture(), any(Metadata.class));
360 assertSame(Status.Code.UNAVAILABLE, statusCaptor.getValue().getCode());
361
362 if (shutdownNow) {
363 // LoadBalancer and NameResolver are shut down as soon as delayed transport is terminated.
364 verify(mockLoadBalancer).shutdown();
365 assertTrue(nameResolverFactory.resolvers.get(0).shutdown);
366 // call should have been aborted by delayed transport
367 executor.runDueTasks();
Kun Zhanga9bd9472017-02-21 17:11:03 -0800368 verify(mockCallListener).onClose(same(ManagedChannelImpl.SHUTDOWN_NOW_STATUS),
Kun Zhang7cb04972017-01-09 14:44:10 -0800369 any(Metadata.class));
370 } else {
371 // LoadBalancer and NameResolver are still running.
372 verify(mockLoadBalancer, never()).shutdown();
373 assertFalse(nameResolverFactory.resolvers.get(0).shutdown);
374 // call and call2 are still alive, and can still be assigned to a real transport
375 SubchannelPicker picker2 = mock(SubchannelPicker.class);
376 when(picker2.pickSubchannel(any(Attributes.class), same(headers))).thenReturn(
377 PickResult.withSubchannel(subchannel));
378 helper.updatePicker(picker2);
379 executor.runDueTasks();
380 verify(mockTransport).newStream(same(method), same(headers), same(CallOptions.DEFAULT),
381 any(StatsTraceContext.class));
382 verify(mockStream).start(any(ClientStreamListener.class));
383 }
384
385 // After call is moved out of delayed transport, LoadBalancer, NameResolver and the transports
386 // will be shutdown.
387 verify(mockLoadBalancer).shutdown();
388 assertTrue(nameResolverFactory.resolvers.get(0).shutdown);
389
390 if (shutdownNow) {
391 // Channel shutdownNow() all subchannels after shutting down LoadBalancer
Kun Zhanga9bd9472017-02-21 17:11:03 -0800392 verify(mockTransport).shutdownNow(ManagedChannelImpl.SHUTDOWN_NOW_STATUS);
Kun Zhang7cb04972017-01-09 14:44:10 -0800393 } else {
394 verify(mockTransport, never()).shutdownNow(any(Status.class));
395 }
396 // LoadBalancer should shutdown the subchannel
397 subchannel.shutdown();
398 verify(mockTransport).shutdown();
399
400 // Killing the remaining real transport will terminate the channel
401 transportListener.transportShutdown(Status.UNAVAILABLE);
402 assertFalse(channel.isTerminated());
403 verify(executorPool, never()).returnObject(anyObject());
404 verify(timerServicePool, never()).returnObject(anyObject());
405 transportListener.transportTerminated();
406 assertTrue(channel.isTerminated());
407 verify(executorPool).returnObject(executor.getScheduledExecutorService());
408 verify(timerServicePool).returnObject(timer.getScheduledExecutorService());
409 verifyNoMoreInteractions(oobExecutorPool);
410
411 verify(mockTransportFactory).close();
412 verifyNoMoreInteractions(mockTransportFactory);
413 verify(mockTransport, atLeast(0)).getLogId();
414 verifyNoMoreInteractions(mockTransport);
415 }
416
// TODO(review): this test has an empty body, so it always passes without exercising anything.
// Presumably a placeholder for shutdownNow() while several OOB channels exist -- confirm, then
// implement or remove it.
@Test
public void shutdownNowWithMultipleOobChannels() {
}
420
421 @Test
422 public void interceptor() throws Exception {
423 final AtomicLong atomic = new AtomicLong();
424 ClientInterceptor interceptor = new ClientInterceptor() {
425 @Override
426 public <RequestT, ResponseT> ClientCall<RequestT, ResponseT> interceptCall(
427 MethodDescriptor<RequestT, ResponseT> method, CallOptions callOptions,
428 Channel next) {
429 atomic.set(1);
430 return next.newCall(method, callOptions);
431 }
432 };
433 createChannel(new FakeNameResolverFactory(true), Arrays.asList(interceptor));
434 assertNotNull(channel.newCall(method, CallOptions.DEFAULT));
435 assertEquals(1, atomic.get());
436 }
437
438 @Test
439 public void callOptionsExecutor() {
440 Metadata headers = new Metadata();
441 ClientStream mockStream = mock(ClientStream.class);
442 FakeClock callExecutor = new FakeClock();
443 createChannel(new FakeNameResolverFactory(true), NO_INTERCEPTOR);
444
445 // Start a call with a call executor
446 CallOptions options =
447 CallOptions.DEFAULT.withExecutor(callExecutor.getScheduledExecutorService());
448 ClientCall<String, Integer> call = channel.newCall(method, options);
449 call.start(mockCallListener, headers);
450
451 // Make the transport available
452 Subchannel subchannel = helper.createSubchannel(addressGroup, Attributes.EMPTY);
453 verify(mockTransportFactory, never()).newClientTransport(
454 any(SocketAddress.class), any(String.class), any(String.class));
455 subchannel.requestConnection();
456 verify(mockTransportFactory).newClientTransport(
457 any(SocketAddress.class), any(String.class), any(String.class));
458 MockClientTransportInfo transportInfo = transports.poll();
459 ConnectionClientTransport mockTransport = transportInfo.transport;
460 ManagedClientTransport.Listener transportListener = transportInfo.listener;
461 when(mockTransport.newStream(same(method), same(headers), any(CallOptions.class),
462 any(StatsTraceContext.class)))
463 .thenReturn(mockStream);
464 transportListener.transportReady();
465 when(mockPicker.pickSubchannel(any(Attributes.class), any(Metadata.class)))
466 .thenReturn(PickResult.withSubchannel(subchannel));
467 assertEquals(0, callExecutor.numPendingTasks());
468 helper.updatePicker(mockPicker);
469
470 // Real streams are started in the call executor if they were previously buffered.
471 assertEquals(1, callExecutor.runDueTasks());
472 verify(mockTransport).newStream(same(method), same(headers), same(options),
473 any(StatsTraceContext.class));
474 verify(mockStream).start(streamListenerCaptor.capture());
475
476 // Call listener callbacks are also run in the call executor
477 ClientStreamListener streamListener = streamListenerCaptor.getValue();
478 Metadata trailers = new Metadata();
479 assertEquals(0, callExecutor.numPendingTasks());
480 streamListener.closed(Status.CANCELLED, trailers);
481 verify(mockCallListener, never()).onClose(same(Status.CANCELLED), same(trailers));
482 assertEquals(1, callExecutor.runDueTasks());
483 verify(mockCallListener).onClose(same(Status.CANCELLED), same(trailers));
484 }
485
486 @Test
487 public void nameResolutionFailed() {
488 Status error = Status.UNAVAILABLE.withCause(new Throwable("fake name resolution error"));
489
490 // Name resolution is started as soon as channel is created.
491 createChannel(new FailingNameResolverFactory(error), NO_INTERCEPTOR);
492 verify(mockLoadBalancer).handleNameResolutionError(same(error));
493 }
494
495 @Test
496 public void nameResolverReturnsEmptySubLists() {
497 String errorDescription = "NameResolver returned an empty list";
498
499 // Pass a FakeNameResolverFactory with an empty list
500 createChannel(new FakeNameResolverFactory(), NO_INTERCEPTOR);
501
502 // LoadBalancer received the error
503 verify(mockLoadBalancerFactory).newLoadBalancer(any(Helper.class));
504 verify(mockLoadBalancer).handleNameResolutionError(statusCaptor.capture());
505 Status status = statusCaptor.getValue();
506 assertSame(Status.Code.UNAVAILABLE, status.getCode());
507 assertEquals(errorDescription, status.getDescription());
508 }
509
510 @Test
511 public void loadBalancerThrowsInHandleResolvedAddresses() {
512 RuntimeException ex = new RuntimeException("simulated");
513 // Delay the success of name resolution until allResolved() is called
514 FakeNameResolverFactory nameResolverFactory = new FakeNameResolverFactory(false);
515 createChannel(nameResolverFactory, NO_INTERCEPTOR);
516
517 verify(mockLoadBalancerFactory).newLoadBalancer(any(Helper.class));
518 doThrow(ex).when(mockLoadBalancer).handleResolvedAddresses(
519 Matchers.<List<ResolvedServerInfoGroup>>anyObject(), any(Attributes.class));
520
521 // NameResolver returns addresses.
522 nameResolverFactory.allResolved();
523
524 // The LoadBalancer will receive the error that it has thrown.
525 verify(mockLoadBalancer).handleNameResolutionError(statusCaptor.capture());
526 Status status = statusCaptor.getValue();
527 assertSame(Status.Code.INTERNAL, status.getCode());
528 assertSame(ex, status.getCause());
529 }
530
531 @Test
532 public void nameResolvedAfterChannelShutdown() {
533 // Delay the success of name resolution until allResolved() is called.
534 FakeNameResolverFactory nameResolverFactory = new FakeNameResolverFactory(false);
535 createChannel(nameResolverFactory, NO_INTERCEPTOR);
536
537 channel.shutdown();
538
539 assertTrue(channel.isShutdown());
540 assertTrue(channel.isTerminated());
541 verify(mockLoadBalancer).shutdown();
542 // Name resolved after the channel is shut down, which is possible if the name resolution takes
543 // time and is not cancellable. The resolved address will be dropped.
544 nameResolverFactory.allResolved();
545 verifyNoMoreInteractions(mockLoadBalancer);
546 }
547
548 /**
549 * Verify that if the first resolved address points to a server that cannot be connected, the call
550 * will end up with the second address which works.
551 */
552 @Test
553 public void firstResolvedServerFailedToConnect() throws Exception {
554 final SocketAddress goodAddress = new SocketAddress() {
555 @Override public String toString() {
556 return "goodAddress";
557 }
558 };
559 final SocketAddress badAddress = new SocketAddress() {
560 @Override public String toString() {
561 return "badAddress";
562 }
563 };
564 final ResolvedServerInfo goodServer = new ResolvedServerInfo(goodAddress, Attributes.EMPTY);
565 final ResolvedServerInfo badServer = new ResolvedServerInfo(badAddress, Attributes.EMPTY);
566 InOrder inOrder = inOrder(mockLoadBalancer);
567
568 ResolvedServerInfoGroup serverInfoGroup = ResolvedServerInfoGroup.builder()
569 .add(badServer)
570 .add(goodServer)
571 .build();
572 FakeNameResolverFactory nameResolverFactory =
573 new FakeNameResolverFactory(serverInfoGroup.getResolvedServerInfoList());
574 createChannel(nameResolverFactory, NO_INTERCEPTOR);
575
576 // Start the call
577 ClientCall<String, Integer> call = channel.newCall(method, CallOptions.DEFAULT);
578 Metadata headers = new Metadata();
579 call.start(mockCallListener, headers);
580 executor.runDueTasks();
581
582 // Simulate name resolution results
583 inOrder.verify(mockLoadBalancer).handleResolvedAddresses(
584 eq(Arrays.asList(serverInfoGroup)), eq(Attributes.EMPTY));
585 Subchannel subchannel = helper.createSubchannel(
586 serverInfoGroup.toEquivalentAddressGroup(), Attributes.EMPTY);
587 when(mockPicker.pickSubchannel(any(Attributes.class), any(Metadata.class)))
588 .thenReturn(PickResult.withSubchannel(subchannel));
589 subchannel.requestConnection();
590 inOrder.verify(mockLoadBalancer).handleSubchannelState(
591 same(subchannel), stateInfoCaptor.capture());
592 assertEquals(CONNECTING, stateInfoCaptor.getValue().getState());
593
594 // The channel will starts with the first address (badAddress)
595 verify(mockTransportFactory)
596 .newClientTransport(same(badAddress), any(String.class), any(String.class));
597 verify(mockTransportFactory, times(0))
598 .newClientTransport(same(goodAddress), any(String.class), any(String.class));
599
600 MockClientTransportInfo badTransportInfo = transports.poll();
601 // Which failed to connect
602 badTransportInfo.listener.transportShutdown(Status.UNAVAILABLE);
603 inOrder.verifyNoMoreInteractions();
604
605 // The channel then try the second address (goodAddress)
606 verify(mockTransportFactory)
607 .newClientTransport(same(goodAddress), any(String.class), any(String.class));
608 MockClientTransportInfo goodTransportInfo = transports.poll();
609 when(goodTransportInfo.transport.newStream(
610 any(MethodDescriptor.class), any(Metadata.class), any(CallOptions.class),
611 any(StatsTraceContext.class)))
612 .thenReturn(mock(ClientStream.class));
Carl Mastrangeloefbcd1f2017-01-23 12:29:35 -0800613
Kun Zhang7cb04972017-01-09 14:44:10 -0800614 goodTransportInfo.listener.transportReady();
615 inOrder.verify(mockLoadBalancer).handleSubchannelState(
616 same(subchannel), stateInfoCaptor.capture());
617 assertEquals(READY, stateInfoCaptor.getValue().getState());
618
619 // A typical LoadBalancer will call this once the subchannel becomes READY
620 helper.updatePicker(mockPicker);
621 // Delayed transport uses the app executor to create real streams.
622 executor.runDueTasks();
623
624 verify(goodTransportInfo.transport).newStream(same(method), same(headers),
625 same(CallOptions.DEFAULT), any(StatsTraceContext.class));
626 // The bad transport was never used.
627 verify(badTransportInfo.transport, times(0)).newStream(any(MethodDescriptor.class),
628 any(Metadata.class), any(CallOptions.class), any(StatsTraceContext.class));
629 }
630
631 /**
632 * Verify that if all resolved addresses failed to connect, a fail-fast call will fail, while a
633 * wait-for-ready call will still be buffered.
634 */
635 @Test
636 public void allServersFailedToConnect() throws Exception {
637 final SocketAddress addr1 = new SocketAddress() {
638 @Override public String toString() {
639 return "addr1";
640 }
641 };
642 final SocketAddress addr2 = new SocketAddress() {
643 @Override public String toString() {
644 return "addr2";
645 }
646 };
647 final ResolvedServerInfo server1 = new ResolvedServerInfo(addr1, Attributes.EMPTY);
648 final ResolvedServerInfo server2 = new ResolvedServerInfo(addr2, Attributes.EMPTY);
649 InOrder inOrder = inOrder(mockLoadBalancer);
650
651 ResolvedServerInfoGroup serverInfoGroup = ResolvedServerInfoGroup.builder()
652 .add(server1)
653 .add(server2)
654 .build();
655
656 FakeNameResolverFactory nameResolverFactory =
657 new FakeNameResolverFactory(serverInfoGroup.getResolvedServerInfoList());
658 createChannel(nameResolverFactory, NO_INTERCEPTOR);
659
660 // Start a wait-for-ready call
661 ClientCall<String, Integer> call =
662 channel.newCall(method, CallOptions.DEFAULT.withWaitForReady());
663 Metadata headers = new Metadata();
664 call.start(mockCallListener, headers);
665 // ... and a fail-fast call
666 ClientCall<String, Integer> call2 =
667 channel.newCall(method, CallOptions.DEFAULT.withoutWaitForReady());
668 call2.start(mockCallListener2, headers);
669 executor.runDueTasks();
670
671 // Simulate name resolution results
672 inOrder.verify(mockLoadBalancer).handleResolvedAddresses(
673 eq(Arrays.asList(serverInfoGroup)), eq(Attributes.EMPTY));
674 Subchannel subchannel = helper.createSubchannel(
675 serverInfoGroup.toEquivalentAddressGroup(), Attributes.EMPTY);
676 when(mockPicker.pickSubchannel(any(Attributes.class), any(Metadata.class)))
677 .thenReturn(PickResult.withSubchannel(subchannel));
678 subchannel.requestConnection();
679 inOrder.verify(mockLoadBalancer).handleSubchannelState(
680 same(subchannel), stateInfoCaptor.capture());
681 assertEquals(CONNECTING, stateInfoCaptor.getValue().getState());
682
683 // Connecting to server1, which will fail
684 verify(mockTransportFactory)
685 .newClientTransport(same(addr1), any(String.class), any(String.class));
686 verify(mockTransportFactory, times(0))
687 .newClientTransport(same(addr2), any(String.class), any(String.class));
688 MockClientTransportInfo transportInfo1 = transports.poll();
689 transportInfo1.listener.transportShutdown(Status.UNAVAILABLE);
690
691 // Connecting to server2, which will fail too
692 verify(mockTransportFactory)
693 .newClientTransport(same(addr2), any(String.class), any(String.class));
694 MockClientTransportInfo transportInfo2 = transports.poll();
695 Status server2Error = Status.UNAVAILABLE.withDescription("Server2 failed to connect");
696 transportInfo2.listener.transportShutdown(server2Error);
697
698 // ... which makes the subchannel enter TRANSIENT_FAILURE. The last error Status is propagated
699 // to LoadBalancer.
700 inOrder.verify(mockLoadBalancer).handleSubchannelState(
701 same(subchannel), stateInfoCaptor.capture());
702 assertEquals(TRANSIENT_FAILURE, stateInfoCaptor.getValue().getState());
703 assertSame(server2Error, stateInfoCaptor.getValue().getStatus());
704
705 // A typical LoadBalancer would create a picker with error
706 SubchannelPicker picker2 = mock(SubchannelPicker.class);
707 when(picker2.pickSubchannel(any(Attributes.class), any(Metadata.class)))
708 .thenReturn(PickResult.withError(server2Error));
709 helper.updatePicker(picker2);
710 executor.runDueTasks();
711
712 // ... which fails the fail-fast call
713 verify(mockCallListener2).onClose(same(server2Error), any(Metadata.class));
714 // ... while the wait-for-ready call stays
715 verifyNoMoreInteractions(mockCallListener);
716 // No real stream was ever created
717 verify(transportInfo1.transport, times(0))
718 .newStream(any(MethodDescriptor.class), any(Metadata.class));
719 verify(transportInfo2.transport, times(0))
720 .newStream(any(MethodDescriptor.class), any(Metadata.class));
721 }
722
723 @Test
724 public void subchannels() {
725 createChannel(new FakeNameResolverFactory(true), NO_INTERCEPTOR);
726
727 // createSubchannel() always return a new Subchannel
728 Attributes attrs1 = Attributes.newBuilder().set(SUBCHANNEL_ATTR_KEY, "attr1").build();
729 Attributes attrs2 = Attributes.newBuilder().set(SUBCHANNEL_ATTR_KEY, "attr2").build();
730 Subchannel sub1 = helper.createSubchannel(addressGroup, attrs1);
731 Subchannel sub2 = helper.createSubchannel(addressGroup, attrs2);
732 assertNotSame(sub1, sub2);
733 assertNotSame(attrs1, attrs2);
734 assertSame(attrs1, sub1.getAttributes());
735 assertSame(attrs2, sub2.getAttributes());
736 assertSame(addressGroup, sub1.getAddresses());
737 assertSame(addressGroup, sub2.getAddresses());
738
739 // requestConnection()
740 verify(mockTransportFactory, never()).newClientTransport(
741 any(SocketAddress.class), any(String.class), any(String.class));
742 sub1.requestConnection();
743 verify(mockTransportFactory).newClientTransport(socketAddress, authority, userAgent);
744 MockClientTransportInfo transportInfo1 = transports.poll();
745 assertNotNull(transportInfo1);
746
747 sub2.requestConnection();
748 verify(mockTransportFactory, times(2)).newClientTransport(socketAddress, authority, userAgent);
749 MockClientTransportInfo transportInfo2 = transports.poll();
750 assertNotNull(transportInfo2);
751
752 sub1.requestConnection();
753 sub2.requestConnection();
754 verify(mockTransportFactory, times(2)).newClientTransport(socketAddress, authority, userAgent);
755
756 // shutdown() has a delay
757 sub1.shutdown();
Kun Zhanga9bd9472017-02-21 17:11:03 -0800758 timer.forwardTime(ManagedChannelImpl.SUBCHANNEL_SHUTDOWN_DELAY_SECONDS - 1, TimeUnit.SECONDS);
Kun Zhang7cb04972017-01-09 14:44:10 -0800759 sub1.shutdown();
760 verify(transportInfo1.transport, never()).shutdown();
761 timer.forwardTime(1, TimeUnit.SECONDS);
762 verify(transportInfo1.transport).shutdown();
763
764 // ... but not after Channel is terminating
765 verify(mockLoadBalancer, never()).shutdown();
766 channel.shutdown();
767 verify(mockLoadBalancer).shutdown();
768 verify(transportInfo2.transport, never()).shutdown();
769
770 sub2.shutdown();
771 verify(transportInfo2.transport).shutdown();
772 }
773
774 @Test
775 public void subchannelsWhenChannelShutdownNow() {
776 createChannel(new FakeNameResolverFactory(true), NO_INTERCEPTOR);
777 Subchannel sub1 = helper.createSubchannel(addressGroup, Attributes.EMPTY);
778 Subchannel sub2 = helper.createSubchannel(addressGroup, Attributes.EMPTY);
779 sub1.requestConnection();
780 sub2.requestConnection();
781
782 assertEquals(2, transports.size());
783 MockClientTransportInfo ti1 = transports.poll();
784 MockClientTransportInfo ti2 = transports.poll();
785
786 ti1.listener.transportReady();
787 ti2.listener.transportReady();
788
789 channel.shutdownNow();
790 verify(ti1.transport).shutdownNow(any(Status.class));
791 verify(ti2.transport).shutdownNow(any(Status.class));
792
793 ti1.listener.transportShutdown(Status.UNAVAILABLE.withDescription("shutdown now"));
794 ti2.listener.transportShutdown(Status.UNAVAILABLE.withDescription("shutdown now"));
795 ti1.listener.transportTerminated();
796
797 assertFalse(channel.isTerminated());
798 ti2.listener.transportTerminated();
799 assertTrue(channel.isTerminated());
800 }
801
802 @Test
803 public void subchannelsNoConnectionShutdown() {
804 createChannel(new FakeNameResolverFactory(true), NO_INTERCEPTOR);
805 Subchannel sub1 = helper.createSubchannel(addressGroup, Attributes.EMPTY);
806 Subchannel sub2 = helper.createSubchannel(addressGroup, Attributes.EMPTY);
807
808 channel.shutdown();
809 verify(mockLoadBalancer).shutdown();
810 sub1.shutdown();
811 assertFalse(channel.isTerminated());
812 sub2.shutdown();
813 assertTrue(channel.isTerminated());
814 verify(mockTransportFactory, never()).newClientTransport(any(SocketAddress.class), anyString(),
815 anyString());
816 }
817
818 @Test
819 public void subchannelsNoConnectionShutdownNow() {
820 createChannel(new FakeNameResolverFactory(true), NO_INTERCEPTOR);
Eric Anderson1e99b292017-01-19 12:16:05 -0800821 helper.createSubchannel(addressGroup, Attributes.EMPTY);
822 helper.createSubchannel(addressGroup, Attributes.EMPTY);
Kun Zhang7cb04972017-01-09 14:44:10 -0800823 channel.shutdownNow();
824
825 verify(mockLoadBalancer).shutdown();
826 // Channel's shutdownNow() will call shutdownNow() on all subchannels and oobchannels.
827 // Therefore, channel is terminated without relying on LoadBalancer to shutdown subchannels.
828 assertTrue(channel.isTerminated());
829 verify(mockTransportFactory, never()).newClientTransport(any(SocketAddress.class), anyString(),
830 anyString());
831 }
832
833 @Test
834 public void oobchannels() {
835 createChannel(new FakeNameResolverFactory(true), NO_INTERCEPTOR);
836
837 ManagedChannel oob1 = helper.createOobChannel(addressGroup, "oob1authority");
838 ManagedChannel oob2 = helper.createOobChannel(addressGroup, "oob2authority");
839 verify(oobExecutorPool, times(2)).getObject();
840
841 assertEquals("oob1authority", oob1.authority());
842 assertEquals("oob2authority", oob2.authority());
843
844 // OOB channels create connections lazily. A new call will initiate the connection.
845 Metadata headers = new Metadata();
846 ClientCall<String, Integer> call = oob1.newCall(method, CallOptions.DEFAULT);
847 call.start(mockCallListener, headers);
848 verify(mockTransportFactory).newClientTransport(socketAddress, "oob1authority", userAgent);
849 MockClientTransportInfo transportInfo = transports.poll();
850 assertNotNull(transportInfo);
851
852 assertEquals(0, oobExecutor.numPendingTasks());
853 transportInfo.listener.transportReady();
854 assertEquals(1, oobExecutor.runDueTasks());
855 verify(transportInfo.transport).newStream(same(method), same(headers),
856 same(CallOptions.DEFAULT), any(StatsTraceContext.class));
857
858 // The transport goes away
859 transportInfo.listener.transportShutdown(Status.UNAVAILABLE);
860 transportInfo.listener.transportTerminated();
861
862 // A new call will trigger a new transport
863 ClientCall<String, Integer> call2 = oob1.newCall(method, CallOptions.DEFAULT);
864 call2.start(mockCallListener2, headers);
865 ClientCall<String, Integer> call3 =
866 oob1.newCall(method, CallOptions.DEFAULT.withWaitForReady());
867 call3.start(mockCallListener3, headers);
868 verify(mockTransportFactory, times(2)).newClientTransport(
869 socketAddress, "oob1authority", userAgent);
870 transportInfo = transports.poll();
871 assertNotNull(transportInfo);
872
873 // This transport fails
874 Status transportError = Status.UNAVAILABLE.withDescription("Connection refused");
875 assertEquals(0, oobExecutor.numPendingTasks());
876 transportInfo.listener.transportShutdown(transportError);
877 assertTrue(oobExecutor.runDueTasks() > 0);
878
879 // Fail-fast RPC will fail, while wait-for-ready RPC will still be pending
880 verify(mockCallListener2).onClose(same(transportError), any(Metadata.class));
881 verify(mockCallListener3, never()).onClose(any(Status.class), any(Metadata.class));
882
883 // Shutdown
884 assertFalse(oob1.isShutdown());
885 assertFalse(oob2.isShutdown());
886 oob1.shutdown();
887 verify(oobExecutorPool, never()).returnObject(anyObject());
888 oob2.shutdownNow();
889 assertTrue(oob1.isShutdown());
890 assertTrue(oob2.isShutdown());
891 assertTrue(oob2.isTerminated());
892 verify(oobExecutorPool).returnObject(oobExecutor.getScheduledExecutorService());
893
894 // New RPCs will be rejected.
895 assertEquals(0, oobExecutor.numPendingTasks());
896 ClientCall<String, Integer> call4 = oob1.newCall(method, CallOptions.DEFAULT);
897 ClientCall<String, Integer> call5 = oob2.newCall(method, CallOptions.DEFAULT);
898 call4.start(mockCallListener4, headers);
899 call5.start(mockCallListener5, headers);
900 assertTrue(oobExecutor.runDueTasks() > 0);
901 verify(mockCallListener4).onClose(statusCaptor.capture(), any(Metadata.class));
902 Status status4 = statusCaptor.getValue();
903 assertEquals(Status.Code.UNAVAILABLE, status4.getCode());
904 verify(mockCallListener5).onClose(statusCaptor.capture(), any(Metadata.class));
905 Status status5 = statusCaptor.getValue();
906 assertEquals(Status.Code.UNAVAILABLE, status5.getCode());
907
908 // The pending RPC will still be pending
909 verify(mockCallListener3, never()).onClose(any(Status.class), any(Metadata.class));
910
911 // This will shutdownNow() the delayed transport, terminating the pending RPC
912 assertEquals(0, oobExecutor.numPendingTasks());
913 oob1.shutdownNow();
914 assertTrue(oobExecutor.runDueTasks() > 0);
915 verify(mockCallListener3).onClose(any(Status.class), any(Metadata.class));
916
917 // Shut down the channel, and it will not terminated because OOB channel has not.
918 channel.shutdown();
919 assertFalse(channel.isTerminated());
920 // Delayed transport has already terminated. Terminating the transport terminates the
921 // subchannel, which in turn terimates the OOB channel, which terminates the channel.
922 assertFalse(oob1.isTerminated());
923 verify(oobExecutorPool).returnObject(oobExecutor.getScheduledExecutorService());
924 transportInfo.listener.transportTerminated();
925 assertTrue(oob1.isTerminated());
926 assertTrue(channel.isTerminated());
927 verify(oobExecutorPool, times(2)).returnObject(oobExecutor.getScheduledExecutorService());
928 }
929
930 @Test
931 public void oobChannelsWhenChannelShutdownNow() {
932 createChannel(new FakeNameResolverFactory(true), NO_INTERCEPTOR);
933 ManagedChannel oob1 = helper.createOobChannel(addressGroup, "oob1Authority");
934 ManagedChannel oob2 = helper.createOobChannel(addressGroup, "oob2Authority");
935
936 oob1.newCall(method, CallOptions.DEFAULT).start(mockCallListener, new Metadata());
937 oob2.newCall(method, CallOptions.DEFAULT).start(mockCallListener2, new Metadata());
938
939 assertEquals(2, transports.size());
940 MockClientTransportInfo ti1 = transports.poll();
941 MockClientTransportInfo ti2 = transports.poll();
942
943 ti1.listener.transportReady();
944 ti2.listener.transportReady();
945
946 channel.shutdownNow();
947 verify(ti1.transport).shutdownNow(any(Status.class));
948 verify(ti2.transport).shutdownNow(any(Status.class));
949
950 ti1.listener.transportShutdown(Status.UNAVAILABLE.withDescription("shutdown now"));
951 ti2.listener.transportShutdown(Status.UNAVAILABLE.withDescription("shutdown now"));
952 ti1.listener.transportTerminated();
953
954 assertFalse(channel.isTerminated());
955 ti2.listener.transportTerminated();
956 assertTrue(channel.isTerminated());
957 }
958
959 @Test
960 public void oobChannelsNoConnectionShutdown() {
961 createChannel(new FakeNameResolverFactory(true), NO_INTERCEPTOR);
962 ManagedChannel oob1 = helper.createOobChannel(addressGroup, "oob1Authority");
963 ManagedChannel oob2 = helper.createOobChannel(addressGroup, "oob2Authority");
964 channel.shutdown();
965
966 verify(mockLoadBalancer).shutdown();
967 oob1.shutdown();
968 assertTrue(oob1.isTerminated());
969 assertFalse(channel.isTerminated());
970 oob2.shutdown();
971 assertTrue(oob2.isTerminated());
972 assertTrue(channel.isTerminated());
973 verify(mockTransportFactory, never()).newClientTransport(any(SocketAddress.class), anyString(),
974 anyString());
975 }
976
977 @Test
978 public void oobChannelsNoConnectionShutdownNow() {
979 createChannel(new FakeNameResolverFactory(true), NO_INTERCEPTOR);
Eric Anderson1e99b292017-01-19 12:16:05 -0800980 helper.createOobChannel(addressGroup, "oob1Authority");
981 helper.createOobChannel(addressGroup, "oob2Authority");
Kun Zhang7cb04972017-01-09 14:44:10 -0800982 channel.shutdownNow();
983
984 verify(mockLoadBalancer).shutdown();
985 assertTrue(channel.isTerminated());
986 // Channel's shutdownNow() will call shutdownNow() on all subchannels and oobchannels.
987 // Therefore, channel is terminated without relying on LoadBalancer to shutdown oobchannels.
988 verify(mockTransportFactory, never()).newClientTransport(any(SocketAddress.class), anyString(),
989 anyString());
990 }
991
992 @Test
993 public void uriPattern() {
Kun Zhanga9bd9472017-02-21 17:11:03 -0800994 assertTrue(ManagedChannelImpl.URI_PATTERN.matcher("a:/").matches());
995 assertTrue(ManagedChannelImpl.URI_PATTERN.matcher("Z019+-.:/!@ #~ ").matches());
996 assertFalse(ManagedChannelImpl.URI_PATTERN.matcher("a/:").matches()); // "/:" not matched
997 assertFalse(ManagedChannelImpl.URI_PATTERN.matcher("0a:/").matches()); // '0' not matched
998 assertFalse(ManagedChannelImpl.URI_PATTERN.matcher("a,:/").matches()); // ',' not matched
999 assertFalse(ManagedChannelImpl.URI_PATTERN.matcher(" a:/").matches()); // space not matched
Kun Zhang7cb04972017-01-09 14:44:10 -08001000 }
1001
1002 /**
1003 * Test that information such as the Call's context, MethodDescriptor, authority, executor are
1004 * propagated to newStream() and applyRequestMetadata().
1005 */
1006 @Test
1007 public void informationPropagatedToNewStreamAndCallCredentials() {
1008 ResolvedServerInfoGroup serverInfoGroup = ResolvedServerInfoGroup.builder()
1009 .add(server).build();
1010 createChannel(new FakeNameResolverFactory(true), NO_INTERCEPTOR);
1011 CallOptions callOptions = CallOptions.DEFAULT.withCallCredentials(creds);
1012 final Context.Key<String> testKey = Context.key("testing");
1013 Context ctx = Context.current().withValue(testKey, "testValue");
1014 final LinkedList<Context> credsApplyContexts = new LinkedList<Context>();
1015 final LinkedList<Context> newStreamContexts = new LinkedList<Context>();
1016 doAnswer(new Answer<Void>() {
1017 @Override
1018 public Void answer(InvocationOnMock in) throws Throwable {
1019 credsApplyContexts.add(Context.current());
1020 return null;
1021 }
1022 }).when(creds).applyRequestMetadata(
1023 any(MethodDescriptor.class), any(Attributes.class), any(Executor.class),
1024 any(MetadataApplier.class));
1025
1026 // First call will be on delayed transport. Only newCall() is run within the expected context,
1027 // so that we can verify that the context is explicitly attached before calling newStream() and
1028 // applyRequestMetadata(), which happens after we detach the context from the thread.
1029 Context origCtx = ctx.attach();
1030 assertEquals("testValue", testKey.get());
1031 ClientCall<String, Integer> call = channel.newCall(method, callOptions);
1032 ctx.detach(origCtx);
1033 assertNull(testKey.get());
1034 call.start(mockCallListener, new Metadata());
1035
1036 // Simulate name resolution results
1037 Subchannel subchannel = helper.createSubchannel(
1038 serverInfoGroup.toEquivalentAddressGroup(), Attributes.EMPTY);
1039 subchannel.requestConnection();
1040 verify(mockTransportFactory).newClientTransport(
1041 same(socketAddress), eq(authority), eq(userAgent));
1042 MockClientTransportInfo transportInfo = transports.poll();
1043 final ConnectionClientTransport transport = transportInfo.transport;
Lukasz Strzalkowskib33d3cb2017-01-18 10:32:45 -08001044 when(transport.getAttributes()).thenReturn(Attributes.EMPTY);
Kun Zhang7cb04972017-01-09 14:44:10 -08001045 doAnswer(new Answer<ClientStream>() {
1046 @Override
1047 public ClientStream answer(InvocationOnMock in) throws Throwable {
1048 newStreamContexts.add(Context.current());
1049 return mock(ClientStream.class);
1050 }
1051 }).when(transport).newStream(
1052 any(MethodDescriptor.class), any(Metadata.class), any(CallOptions.class),
1053 any(StatsTraceContext.class));
1054
1055 verify(creds, never()).applyRequestMetadata(
1056 any(MethodDescriptor.class), any(Attributes.class), any(Executor.class),
1057 any(MetadataApplier.class));
1058
1059 // applyRequestMetadata() is called after the transport becomes ready.
1060 transportInfo.listener.transportReady();
1061 when(mockPicker.pickSubchannel(any(Attributes.class), any(Metadata.class)))
1062 .thenReturn(PickResult.withSubchannel(subchannel));
1063 helper.updatePicker(mockPicker);
1064 executor.runDueTasks();
1065 ArgumentCaptor<Attributes> attrsCaptor = ArgumentCaptor.forClass(Attributes.class);
1066 ArgumentCaptor<MetadataApplier> applierCaptor = ArgumentCaptor.forClass(MetadataApplier.class);
1067 verify(creds).applyRequestMetadata(same(method), attrsCaptor.capture(),
1068 same(executor.getScheduledExecutorService()), applierCaptor.capture());
1069 assertEquals("testValue", testKey.get(credsApplyContexts.poll()));
1070 assertEquals(authority, attrsCaptor.getValue().get(CallCredentials.ATTR_AUTHORITY));
1071 assertEquals(SecurityLevel.NONE,
1072 attrsCaptor.getValue().get(CallCredentials.ATTR_SECURITY_LEVEL));
1073 verify(transport, never()).newStream(
1074 any(MethodDescriptor.class), any(Metadata.class), any(CallOptions.class),
1075 any(StatsTraceContext.class));
1076
1077 // newStream() is called after apply() is called
1078 applierCaptor.getValue().apply(new Metadata());
1079 verify(transport).newStream(same(method), any(Metadata.class), same(callOptions),
1080 any(StatsTraceContext.class));
1081 assertEquals("testValue", testKey.get(newStreamContexts.poll()));
1082 // The context should not live beyond the scope of newStream() and applyRequestMetadata()
1083 assertNull(testKey.get());
1084
1085
1086 // Second call will not be on delayed transport
1087 origCtx = ctx.attach();
1088 call = channel.newCall(method, callOptions);
1089 ctx.detach(origCtx);
1090 call.start(mockCallListener, new Metadata());
1091
1092 verify(creds, times(2)).applyRequestMetadata(same(method), attrsCaptor.capture(),
1093 same(executor.getScheduledExecutorService()), applierCaptor.capture());
1094 assertEquals("testValue", testKey.get(credsApplyContexts.poll()));
1095 assertEquals(authority, attrsCaptor.getValue().get(CallCredentials.ATTR_AUTHORITY));
1096 assertEquals(SecurityLevel.NONE,
1097 attrsCaptor.getValue().get(CallCredentials.ATTR_SECURITY_LEVEL));
1098 // This is from the first call
1099 verify(transport).newStream(
1100 any(MethodDescriptor.class), any(Metadata.class), any(CallOptions.class),
1101 any(StatsTraceContext.class));
1102
1103 // Still, newStream() is called after apply() is called
1104 applierCaptor.getValue().apply(new Metadata());
1105 verify(transport, times(2)).newStream(same(method), any(Metadata.class), same(callOptions),
1106 any(StatsTraceContext.class));
1107 assertEquals("testValue", testKey.get(newStreamContexts.poll()));
1108
1109 assertNull(testKey.get());
1110 }
1111
1112 private static class FakeBackoffPolicyProvider implements BackoffPolicy.Provider {
1113 @Override
1114 public BackoffPolicy get() {
1115 return new BackoffPolicy() {
1116 @Override
1117 public long nextBackoffMillis() {
1118 return 1;
1119 }
1120 };
1121 }
1122 }
1123
1124 private class FakeNameResolverFactory extends NameResolver.Factory {
1125 final List<ResolvedServerInfoGroup> servers;
1126 final boolean resolvedAtStart;
1127 final ArrayList<FakeNameResolver> resolvers = new ArrayList<FakeNameResolver>();
1128
1129 FakeNameResolverFactory(boolean resolvedAtStart) {
1130 this.resolvedAtStart = resolvedAtStart;
1131 servers = Collections.singletonList(ResolvedServerInfoGroup.builder().add(server).build());
1132 }
1133
1134 FakeNameResolverFactory(List<ResolvedServerInfo> servers) {
1135 resolvedAtStart = true;
1136 this.servers = Collections.singletonList(
1137 ResolvedServerInfoGroup.builder().addAll(servers).build());
1138 }
1139
1140 public FakeNameResolverFactory() {
1141 resolvedAtStart = true;
1142 this.servers = ImmutableList.of();
1143 }
1144
1145 @Override
1146 public NameResolver newNameResolver(final URI targetUri, Attributes params) {
1147 if (!expectedUri.equals(targetUri)) {
1148 return null;
1149 }
1150 assertSame(NAME_RESOLVER_PARAMS, params);
1151 FakeNameResolver resolver = new FakeNameResolver();
1152 resolvers.add(resolver);
1153 return resolver;
1154 }
1155
1156 @Override
1157 public String getDefaultScheme() {
1158 return "fake";
1159 }
1160
1161 void allResolved() {
1162 for (FakeNameResolver resolver : resolvers) {
1163 resolver.resolved();
1164 }
1165 }
1166
1167 private class FakeNameResolver extends NameResolver {
1168 Listener listener;
1169 boolean shutdown;
1170
1171 @Override public String getServiceAuthority() {
1172 return expectedUri.getAuthority();
1173 }
1174
1175 @Override public void start(final Listener listener) {
1176 this.listener = listener;
1177 if (resolvedAtStart) {
1178 resolved();
1179 }
1180 }
1181
1182 void resolved() {
1183 listener.onUpdate(servers, Attributes.EMPTY);
1184 }
1185
1186 @Override public void shutdown() {
1187 shutdown = true;
1188 }
1189 }
1190 }
1191
1192 private static class FailingNameResolverFactory extends NameResolver.Factory {
1193 final Status error;
1194
1195 FailingNameResolverFactory(Status error) {
1196 this.error = error;
1197 }
1198
1199 @Override
1200 public NameResolver newNameResolver(URI notUsedUri, Attributes params) {
1201 return new NameResolver() {
1202 @Override public String getServiceAuthority() {
1203 return "irrelevant-authority";
1204 }
1205
1206 @Override public void start(final Listener listener) {
1207 listener.onError(error);
1208 }
1209
1210 @Override public void shutdown() {}
1211 };
1212 }
1213
1214 @Override
1215 public String getDefaultScheme() {
1216 return "fake";
1217 }
1218 }
1219}