/*
 * Copyright 2015, Google Inc. All rights reserved.
 *
 * Redistribution and use in source and binary forms, with or without
 * modification, are permitted provided that the following conditions are
 * met:
 *
 *    * Redistributions of source code must retain the above copyright
 * notice, this list of conditions and the following disclaimer.
 *    * Redistributions in binary form must reproduce the above
 * copyright notice, this list of conditions and the following disclaimer
 * in the documentation and/or other materials provided with the
 * distribution.
 *
 *    * Neither the name of Google Inc. nor the names of its
 * contributors may be used to endorse or promote products derived from
 * this software without specific prior written permission.
 *
 * THIS SOFTWARE IS PROVIDED BY THE COPYRIGHT HOLDERS AND CONTRIBUTORS
 * "AS IS" AND ANY EXPRESS OR IMPLIED WARRANTIES, INCLUDING, BUT NOT
 * LIMITED TO, THE IMPLIED WARRANTIES OF MERCHANTABILITY AND FITNESS FOR
 * A PARTICULAR PURPOSE ARE DISCLAIMED. IN NO EVENT SHALL THE COPYRIGHT
 * OWNER OR CONTRIBUTORS BE LIABLE FOR ANY DIRECT, INDIRECT, INCIDENTAL,
 * SPECIAL, EXEMPLARY, OR CONSEQUENTIAL DAMAGES (INCLUDING, BUT NOT
 * LIMITED TO, PROCUREMENT OF SUBSTITUTE GOODS OR SERVICES; LOSS OF USE,
 * DATA, OR PROFITS; OR BUSINESS INTERRUPTION) HOWEVER CAUSED AND ON ANY
 * THEORY OF LIABILITY, WHETHER IN CONTRACT, STRICT LIABILITY, OR TORT
 * (INCLUDING NEGLIGENCE OR OTHERWISE) ARISING IN ANY WAY OUT OF THE USE
 * OF THIS SOFTWARE, EVEN IF ADVISED OF THE POSSIBILITY OF SUCH DAMAGE.
 */
31
32package io.grpc.internal;
33
34import static io.grpc.ConnectivityState.CONNECTING;
35import static io.grpc.ConnectivityState.READY;
36import static io.grpc.ConnectivityState.TRANSIENT_FAILURE;
37import static junit.framework.TestCase.assertNotSame;
38import static org.junit.Assert.assertEquals;
39import static org.junit.Assert.assertFalse;
40import static org.junit.Assert.assertNotNull;
41import static org.junit.Assert.assertNull;
42import static org.junit.Assert.assertSame;
43import static org.junit.Assert.assertTrue;
44import static org.mockito.Matchers.any;
45import static org.mockito.Matchers.anyObject;
46import static org.mockito.Matchers.anyString;
47import static org.mockito.Matchers.eq;
48import static org.mockito.Matchers.same;
49import static org.mockito.Mockito.atLeast;
50import static org.mockito.Mockito.doAnswer;
51import static org.mockito.Mockito.doThrow;
52import static org.mockito.Mockito.inOrder;
53import static org.mockito.Mockito.mock;
54import static org.mockito.Mockito.never;
55import static org.mockito.Mockito.times;
56import static org.mockito.Mockito.verify;
57import static org.mockito.Mockito.verifyNoMoreInteractions;
58import static org.mockito.Mockito.when;
59
60import com.google.common.collect.ImmutableList;
61
62import io.grpc.Attributes;
Kun Zhang7cb04972017-01-09 14:44:10 -080063import io.grpc.CallCredentials;
Carl Mastrangeloefbcd1f2017-01-23 12:29:35 -080064import io.grpc.CallCredentials.MetadataApplier;
Kun Zhang7cb04972017-01-09 14:44:10 -080065import io.grpc.CallOptions;
66import io.grpc.Channel;
67import io.grpc.ClientCall;
68import io.grpc.ClientInterceptor;
69import io.grpc.CompressorRegistry;
70import io.grpc.ConnectivityStateInfo;
71import io.grpc.Context;
72import io.grpc.DecompressorRegistry;
73import io.grpc.EquivalentAddressGroup;
74import io.grpc.IntegerMarshaller;
75import io.grpc.LoadBalancer2;
76import io.grpc.LoadBalancer2.Helper;
77import io.grpc.LoadBalancer2.PickResult;
78import io.grpc.LoadBalancer2.Subchannel;
79import io.grpc.LoadBalancer2.SubchannelPicker;
80import io.grpc.ManagedChannel;
81import io.grpc.Metadata;
82import io.grpc.MethodDescriptor;
Carl Mastrangeloefbcd1f2017-01-23 12:29:35 -080083import io.grpc.MethodDescriptor.MethodType;
Kun Zhang7cb04972017-01-09 14:44:10 -080084import io.grpc.NameResolver;
85import io.grpc.ResolvedServerInfo;
86import io.grpc.ResolvedServerInfoGroup;
87import io.grpc.SecurityLevel;
88import io.grpc.Status;
89import io.grpc.StringMarshaller;
90import io.grpc.internal.TestUtils.MockClientTransportInfo;
91import io.grpc.internal.testing.StatsTestUtils.FakeStatsContextFactory;
92
93import org.junit.After;
94import org.junit.Before;
95import org.junit.Rule;
96import org.junit.Test;
97import org.junit.rules.ExpectedException;
98import org.junit.runner.RunWith;
99import org.junit.runners.JUnit4;
100import org.mockito.ArgumentCaptor;
101import org.mockito.Captor;
102import org.mockito.InOrder;
103import org.mockito.Matchers;
104import org.mockito.Mock;
105import org.mockito.MockitoAnnotations;
106import org.mockito.invocation.InvocationOnMock;
107import org.mockito.stubbing.Answer;
108
109import java.net.SocketAddress;
110import java.net.URI;
111import java.util.ArrayList;
112import java.util.Arrays;
113import java.util.Collections;
114import java.util.LinkedList;
115import java.util.List;
116import java.util.concurrent.BlockingQueue;
117import java.util.concurrent.Executor;
118import java.util.concurrent.ScheduledExecutorService;
119import java.util.concurrent.TimeUnit;
120import java.util.concurrent.atomic.AtomicLong;
121
122/** Unit tests for {@link ManagedChannelImpl2}. */
123@RunWith(JUnit4.class)
124public class ManagedChannelImpl2Test {
125 private static final List<ClientInterceptor> NO_INTERCEPTOR =
126 Collections.<ClientInterceptor>emptyList();
127 private static final Attributes NAME_RESOLVER_PARAMS =
128 Attributes.newBuilder().set(NameResolver.Factory.PARAMS_DEFAULT_PORT, 447).build();
Carl Mastrangeloefbcd1f2017-01-23 12:29:35 -0800129 private static final MethodDescriptor<String, Integer> method =
130 MethodDescriptor.<String, Integer>newBuilder()
131 .setType(MethodType.UNKNOWN)
132 .setFullMethodName("/service/method")
133 .setRequestMarshaller(new StringMarshaller())
134 .setResponseMarshaller(new IntegerMarshaller())
135 .build();
Kun Zhang7cb04972017-01-09 14:44:10 -0800136 private static final Attributes.Key<String> SUBCHANNEL_ATTR_KEY =
137 Attributes.Key.of("subchannel-attr-key");
138 private final String serviceName = "fake.example.com";
139 private final String authority = serviceName;
140 private final String userAgent = "userAgent";
141 private final String target = "fake://" + serviceName;
142 private URI expectedUri;
143 private final SocketAddress socketAddress = new SocketAddress() {};
144 private final EquivalentAddressGroup addressGroup = new EquivalentAddressGroup(socketAddress);
145 private final ResolvedServerInfo server = new ResolvedServerInfo(socketAddress, Attributes.EMPTY);
146 private final FakeClock timer = new FakeClock();
147 private final FakeClock executor = new FakeClock();
148 private final FakeClock oobExecutor = new FakeClock();
149 private final FakeStatsContextFactory statsCtxFactory = new FakeStatsContextFactory();
150
151 @Rule public final ExpectedException thrown = ExpectedException.none();
152
153 private ManagedChannelImpl2 channel;
154 private Helper helper;
155 @Captor
156 private ArgumentCaptor<Status> statusCaptor;
157 @Captor
158 private ArgumentCaptor<StatsTraceContext> statsTraceCtxCaptor;
159 @Mock
160 private LoadBalancer2.Factory mockLoadBalancerFactory;
161 @Mock
162 private LoadBalancer2 mockLoadBalancer;
163 @Captor
164 private ArgumentCaptor<ConnectivityStateInfo> stateInfoCaptor;
165 @Mock
166 private SubchannelPicker mockPicker;
167 @Mock
168 private ClientTransportFactory mockTransportFactory;
169 @Mock
170 private ClientCall.Listener<Integer> mockCallListener;
171 @Mock
172 private ClientCall.Listener<Integer> mockCallListener2;
173 @Mock
174 private ClientCall.Listener<Integer> mockCallListener3;
175 @Mock
176 private ClientCall.Listener<Integer> mockCallListener4;
177 @Mock
178 private ClientCall.Listener<Integer> mockCallListener5;
179 @Mock
180 private ObjectPool<ScheduledExecutorService> timerServicePool;
181 @Mock
182 private ObjectPool<Executor> executorPool;
183 @Mock
184 private ObjectPool<Executor> oobExecutorPool;
185 @Mock
186 private CallCredentials creds;
187 private BlockingQueue<MockClientTransportInfo> transports;
188
189 private ArgumentCaptor<ClientStreamListener> streamListenerCaptor =
190 ArgumentCaptor.forClass(ClientStreamListener.class);
191
192 private void createChannel(
193 NameResolver.Factory nameResolverFactory, List<ClientInterceptor> interceptors) {
194 channel = new ManagedChannelImpl2(target, new FakeBackoffPolicyProvider(),
195 nameResolverFactory, NAME_RESOLVER_PARAMS, mockLoadBalancerFactory,
196 mockTransportFactory, DecompressorRegistry.getDefaultInstance(),
197 CompressorRegistry.getDefaultInstance(), timerServicePool, executorPool, oobExecutorPool,
198 timer.getStopwatchSupplier(), ManagedChannelImpl2.IDLE_TIMEOUT_MILLIS_DISABLE, userAgent,
199 interceptors, statsCtxFactory);
200 // Force-exit the initial idle-mode
201 channel.exitIdleMode();
202 assertEquals(0, timer.numPendingTasks());
203
204 ArgumentCaptor<Helper> helperCaptor = ArgumentCaptor.forClass(null);
205 verify(mockLoadBalancerFactory).newLoadBalancer(helperCaptor.capture());
206 helper = helperCaptor.getValue();
207 }
208
209 @Before
210 public void setUp() throws Exception {
211 MockitoAnnotations.initMocks(this);
212 expectedUri = new URI(target);
213 when(mockLoadBalancerFactory.newLoadBalancer(any(Helper.class))).thenReturn(mockLoadBalancer);
214 transports = TestUtils.captureTransports(mockTransportFactory);
215 when(timerServicePool.getObject()).thenReturn(timer.getScheduledExecutorService());
216 when(executorPool.getObject()).thenReturn(executor.getScheduledExecutorService());
217 when(oobExecutorPool.getObject()).thenReturn(oobExecutor.getScheduledExecutorService());
218 }
219
220 @After
221 public void allPendingTasksAreRun() throws Exception {
222 // The "never" verifications in the tests only hold up if all due tasks are done.
223 // As for timer, although there may be scheduled tasks in a future time, since we don't test
224 // any time-related behavior in this test suite, we only care the tasks that are due. This
225 // would ignore any time-sensitive tasks, e.g., back-off and the idle timer.
226 assertTrue(timer.getDueTasks() + " should be empty", timer.getDueTasks().isEmpty());
227 assertEquals(executor.getPendingTasks() + " should be empty", 0, executor.numPendingTasks());
228 }
229
230 /**
231 * The counterpart of {@link ManagedChannelImpl2IdlenessTest#enterIdleModeAfterForceExit}.
232 */
233 @Test
234 @SuppressWarnings("unchecked")
235 public void idleModeDisabled() {
236 createChannel(new FakeNameResolverFactory(true), NO_INTERCEPTOR);
237
238 // In this test suite, the channel is always created with idle mode disabled.
239 // No task is scheduled to enter idle mode
240 assertEquals(0, timer.numPendingTasks());
241 assertEquals(0, executor.numPendingTasks());
242 }
243
244 @Test
245 public void immediateDeadlineExceeded() {
246 createChannel(new FakeNameResolverFactory(true), NO_INTERCEPTOR);
247 ClientCall<String, Integer> call =
248 channel.newCall(method, CallOptions.DEFAULT.withDeadlineAfter(0, TimeUnit.NANOSECONDS));
249 call.start(mockCallListener, new Metadata());
250 assertEquals(1, executor.runDueTasks());
251
252 verify(mockCallListener).onClose(statusCaptor.capture(), any(Metadata.class));
253 Status status = statusCaptor.getValue();
254 assertSame(Status.DEADLINE_EXCEEDED.getCode(), status.getCode());
255 }
256
257 @Test
258 public void shutdownWithNoTransportsEverCreated() {
259 createChannel(new FakeNameResolverFactory(true), NO_INTERCEPTOR);
260 verify(executorPool).getObject();
261 verify(timerServicePool).getObject();
262 verify(executorPool, never()).returnObject(anyObject());
263 verify(timerServicePool, never()).returnObject(anyObject());
264 verifyNoMoreInteractions(mockTransportFactory);
265 channel.shutdown();
266 assertTrue(channel.isShutdown());
267 assertTrue(channel.isTerminated());
268 verify(executorPool).returnObject(executor.getScheduledExecutorService());
269 verify(timerServicePool).returnObject(timer.getScheduledExecutorService());
270 }
271
272 @Test
273 public void callsAndShutdown() {
274 subtestCallsAndShutdown(false, false);
275 }
276
277 @Test
278 public void callsAndShutdownNow() {
279 subtestCallsAndShutdown(true, false);
280 }
281
282 /** Make sure shutdownNow() after shutdown() has an effect. */
283 @Test
284 public void callsAndShutdownAndShutdownNow() {
285 subtestCallsAndShutdown(false, true);
286 }
287
288 private void subtestCallsAndShutdown(boolean shutdownNow, boolean shutdownNowAfterShutdown) {
289 FakeNameResolverFactory nameResolverFactory = new FakeNameResolverFactory(true);
290 createChannel(nameResolverFactory, NO_INTERCEPTOR);
291 verify(executorPool).getObject();
292 verify(timerServicePool).getObject();
293 ClientStream mockStream = mock(ClientStream.class);
294 ClientStream mockStream2 = mock(ClientStream.class);
295 Metadata headers = new Metadata();
296 Metadata headers2 = new Metadata();
297
298 // Configure the picker so that first RPC goes to delayed transport, and second RPC goes to
299 // real transport.
300 Subchannel subchannel = helper.createSubchannel(addressGroup, Attributes.EMPTY);
301 subchannel.requestConnection();
302 verify(mockTransportFactory).newClientTransport(
303 any(SocketAddress.class), any(String.class), any(String.class));
304 MockClientTransportInfo transportInfo = transports.poll();
305 ConnectionClientTransport mockTransport = transportInfo.transport;
306 verify(mockTransport).start(any(ManagedClientTransport.Listener.class));
307 ManagedClientTransport.Listener transportListener = transportInfo.listener;
308 when(mockTransport.newStream(same(method), same(headers), same(CallOptions.DEFAULT),
309 any(StatsTraceContext.class)))
310 .thenReturn(mockStream);
311 when(mockTransport.newStream(same(method), same(headers2), same(CallOptions.DEFAULT),
312 any(StatsTraceContext.class)))
313 .thenReturn(mockStream2);
314 transportListener.transportReady();
315 when(mockPicker.pickSubchannel(any(Attributes.class), same(headers))).thenReturn(
316 PickResult.withNoResult());
317 when(mockPicker.pickSubchannel(any(Attributes.class), same(headers2))).thenReturn(
318 PickResult.withSubchannel(subchannel));
319 helper.updatePicker(mockPicker);
320
321 // First RPC, will be pending
322 ClientCall<String, Integer> call = channel.newCall(method, CallOptions.DEFAULT);
323 verifyNoMoreInteractions(mockTransportFactory);
324 call.start(mockCallListener, headers);
325
326 verify(mockTransport, never()).newStream(same(method), same(headers), same(CallOptions.DEFAULT),
327 any(StatsTraceContext.class));
328 statsCtxFactory.pollContextOrFail();
329
330 // Second RPC, will be assigned to the real transport
331 ClientCall<String, Integer> call2 = channel.newCall(method, CallOptions.DEFAULT);
332 call2.start(mockCallListener2, headers2);
333 verify(mockTransport).newStream(same(method), same(headers2), same(CallOptions.DEFAULT),
334 statsTraceCtxCaptor.capture());
335 assertEquals(statsCtxFactory.pollContextOrFail(),
336 statsTraceCtxCaptor.getValue().getStatsContext());
337 verify(mockTransport).newStream(same(method), same(headers2), same(CallOptions.DEFAULT),
338 statsTraceCtxCaptor.capture());
339 verify(mockStream2).start(any(ClientStreamListener.class));
340
341 // Shutdown
342 if (shutdownNow) {
343 channel.shutdownNow();
344 } else {
345 channel.shutdown();
346 if (shutdownNowAfterShutdown) {
347 channel.shutdownNow();
348 shutdownNow = true;
349 }
350 }
351 assertTrue(channel.isShutdown());
352 assertFalse(channel.isTerminated());
353 assertEquals(1, nameResolverFactory.resolvers.size());
354 verify(mockLoadBalancerFactory).newLoadBalancer(any(Helper.class));
355
356 // Further calls should fail without going to the transport
357 ClientCall<String, Integer> call3 = channel.newCall(method, CallOptions.DEFAULT);
358 call3.start(mockCallListener3, headers2);
359 timer.runDueTasks();
360 executor.runDueTasks();
361
362 verify(mockCallListener3).onClose(statusCaptor.capture(), any(Metadata.class));
363 assertSame(Status.Code.UNAVAILABLE, statusCaptor.getValue().getCode());
364
365 if (shutdownNow) {
366 // LoadBalancer and NameResolver are shut down as soon as delayed transport is terminated.
367 verify(mockLoadBalancer).shutdown();
368 assertTrue(nameResolverFactory.resolvers.get(0).shutdown);
369 // call should have been aborted by delayed transport
370 executor.runDueTasks();
371 verify(mockCallListener).onClose(same(ManagedChannelImpl2.SHUTDOWN_NOW_STATUS),
372 any(Metadata.class));
373 } else {
374 // LoadBalancer and NameResolver are still running.
375 verify(mockLoadBalancer, never()).shutdown();
376 assertFalse(nameResolverFactory.resolvers.get(0).shutdown);
377 // call and call2 are still alive, and can still be assigned to a real transport
378 SubchannelPicker picker2 = mock(SubchannelPicker.class);
379 when(picker2.pickSubchannel(any(Attributes.class), same(headers))).thenReturn(
380 PickResult.withSubchannel(subchannel));
381 helper.updatePicker(picker2);
382 executor.runDueTasks();
383 verify(mockTransport).newStream(same(method), same(headers), same(CallOptions.DEFAULT),
384 any(StatsTraceContext.class));
385 verify(mockStream).start(any(ClientStreamListener.class));
386 }
387
388 // After call is moved out of delayed transport, LoadBalancer, NameResolver and the transports
389 // will be shutdown.
390 verify(mockLoadBalancer).shutdown();
391 assertTrue(nameResolverFactory.resolvers.get(0).shutdown);
392
393 if (shutdownNow) {
394 // Channel shutdownNow() all subchannels after shutting down LoadBalancer
395 verify(mockTransport).shutdownNow(ManagedChannelImpl2.SHUTDOWN_NOW_STATUS);
396 } else {
397 verify(mockTransport, never()).shutdownNow(any(Status.class));
398 }
399 // LoadBalancer should shutdown the subchannel
400 subchannel.shutdown();
401 verify(mockTransport).shutdown();
402
403 // Killing the remaining real transport will terminate the channel
404 transportListener.transportShutdown(Status.UNAVAILABLE);
405 assertFalse(channel.isTerminated());
406 verify(executorPool, never()).returnObject(anyObject());
407 verify(timerServicePool, never()).returnObject(anyObject());
408 transportListener.transportTerminated();
409 assertTrue(channel.isTerminated());
410 verify(executorPool).returnObject(executor.getScheduledExecutorService());
411 verify(timerServicePool).returnObject(timer.getScheduledExecutorService());
412 verifyNoMoreInteractions(oobExecutorPool);
413
414 verify(mockTransportFactory).close();
415 verifyNoMoreInteractions(mockTransportFactory);
416 verify(mockTransport, atLeast(0)).getLogId();
417 verifyNoMoreInteractions(mockTransport);
418 }
419
420 @Test
421 public void shutdownNowWithMultipleOobChannels() {
422 }
423
424 @Test
425 public void interceptor() throws Exception {
426 final AtomicLong atomic = new AtomicLong();
427 ClientInterceptor interceptor = new ClientInterceptor() {
428 @Override
429 public <RequestT, ResponseT> ClientCall<RequestT, ResponseT> interceptCall(
430 MethodDescriptor<RequestT, ResponseT> method, CallOptions callOptions,
431 Channel next) {
432 atomic.set(1);
433 return next.newCall(method, callOptions);
434 }
435 };
436 createChannel(new FakeNameResolverFactory(true), Arrays.asList(interceptor));
437 assertNotNull(channel.newCall(method, CallOptions.DEFAULT));
438 assertEquals(1, atomic.get());
439 }
440
441 @Test
442 public void callOptionsExecutor() {
443 Metadata headers = new Metadata();
444 ClientStream mockStream = mock(ClientStream.class);
445 FakeClock callExecutor = new FakeClock();
446 createChannel(new FakeNameResolverFactory(true), NO_INTERCEPTOR);
447
448 // Start a call with a call executor
449 CallOptions options =
450 CallOptions.DEFAULT.withExecutor(callExecutor.getScheduledExecutorService());
451 ClientCall<String, Integer> call = channel.newCall(method, options);
452 call.start(mockCallListener, headers);
453
454 // Make the transport available
455 Subchannel subchannel = helper.createSubchannel(addressGroup, Attributes.EMPTY);
456 verify(mockTransportFactory, never()).newClientTransport(
457 any(SocketAddress.class), any(String.class), any(String.class));
458 subchannel.requestConnection();
459 verify(mockTransportFactory).newClientTransport(
460 any(SocketAddress.class), any(String.class), any(String.class));
461 MockClientTransportInfo transportInfo = transports.poll();
462 ConnectionClientTransport mockTransport = transportInfo.transport;
463 ManagedClientTransport.Listener transportListener = transportInfo.listener;
464 when(mockTransport.newStream(same(method), same(headers), any(CallOptions.class),
465 any(StatsTraceContext.class)))
466 .thenReturn(mockStream);
467 transportListener.transportReady();
468 when(mockPicker.pickSubchannel(any(Attributes.class), any(Metadata.class)))
469 .thenReturn(PickResult.withSubchannel(subchannel));
470 assertEquals(0, callExecutor.numPendingTasks());
471 helper.updatePicker(mockPicker);
472
473 // Real streams are started in the call executor if they were previously buffered.
474 assertEquals(1, callExecutor.runDueTasks());
475 verify(mockTransport).newStream(same(method), same(headers), same(options),
476 any(StatsTraceContext.class));
477 verify(mockStream).start(streamListenerCaptor.capture());
478
479 // Call listener callbacks are also run in the call executor
480 ClientStreamListener streamListener = streamListenerCaptor.getValue();
481 Metadata trailers = new Metadata();
482 assertEquals(0, callExecutor.numPendingTasks());
483 streamListener.closed(Status.CANCELLED, trailers);
484 verify(mockCallListener, never()).onClose(same(Status.CANCELLED), same(trailers));
485 assertEquals(1, callExecutor.runDueTasks());
486 verify(mockCallListener).onClose(same(Status.CANCELLED), same(trailers));
487 }
488
489 @Test
490 public void nameResolutionFailed() {
491 Status error = Status.UNAVAILABLE.withCause(new Throwable("fake name resolution error"));
492
493 // Name resolution is started as soon as channel is created.
494 createChannel(new FailingNameResolverFactory(error), NO_INTERCEPTOR);
495 verify(mockLoadBalancer).handleNameResolutionError(same(error));
496 }
497
498 @Test
499 public void nameResolverReturnsEmptySubLists() {
500 String errorDescription = "NameResolver returned an empty list";
501
502 // Pass a FakeNameResolverFactory with an empty list
503 createChannel(new FakeNameResolverFactory(), NO_INTERCEPTOR);
504
505 // LoadBalancer received the error
506 verify(mockLoadBalancerFactory).newLoadBalancer(any(Helper.class));
507 verify(mockLoadBalancer).handleNameResolutionError(statusCaptor.capture());
508 Status status = statusCaptor.getValue();
509 assertSame(Status.Code.UNAVAILABLE, status.getCode());
510 assertEquals(errorDescription, status.getDescription());
511 }
512
513 @Test
514 public void loadBalancerThrowsInHandleResolvedAddresses() {
515 RuntimeException ex = new RuntimeException("simulated");
516 // Delay the success of name resolution until allResolved() is called
517 FakeNameResolverFactory nameResolverFactory = new FakeNameResolverFactory(false);
518 createChannel(nameResolverFactory, NO_INTERCEPTOR);
519
520 verify(mockLoadBalancerFactory).newLoadBalancer(any(Helper.class));
521 doThrow(ex).when(mockLoadBalancer).handleResolvedAddresses(
522 Matchers.<List<ResolvedServerInfoGroup>>anyObject(), any(Attributes.class));
523
524 // NameResolver returns addresses.
525 nameResolverFactory.allResolved();
526
527 // The LoadBalancer will receive the error that it has thrown.
528 verify(mockLoadBalancer).handleNameResolutionError(statusCaptor.capture());
529 Status status = statusCaptor.getValue();
530 assertSame(Status.Code.INTERNAL, status.getCode());
531 assertSame(ex, status.getCause());
532 }
533
534 @Test
535 public void nameResolvedAfterChannelShutdown() {
536 // Delay the success of name resolution until allResolved() is called.
537 FakeNameResolverFactory nameResolverFactory = new FakeNameResolverFactory(false);
538 createChannel(nameResolverFactory, NO_INTERCEPTOR);
539
540 channel.shutdown();
541
542 assertTrue(channel.isShutdown());
543 assertTrue(channel.isTerminated());
544 verify(mockLoadBalancer).shutdown();
545 // Name resolved after the channel is shut down, which is possible if the name resolution takes
546 // time and is not cancellable. The resolved address will be dropped.
547 nameResolverFactory.allResolved();
548 verifyNoMoreInteractions(mockLoadBalancer);
549 }
550
551 /**
552 * Verify that if the first resolved address points to a server that cannot be connected, the call
553 * will end up with the second address which works.
554 */
555 @Test
556 public void firstResolvedServerFailedToConnect() throws Exception {
557 final SocketAddress goodAddress = new SocketAddress() {
558 @Override public String toString() {
559 return "goodAddress";
560 }
561 };
562 final SocketAddress badAddress = new SocketAddress() {
563 @Override public String toString() {
564 return "badAddress";
565 }
566 };
567 final ResolvedServerInfo goodServer = new ResolvedServerInfo(goodAddress, Attributes.EMPTY);
568 final ResolvedServerInfo badServer = new ResolvedServerInfo(badAddress, Attributes.EMPTY);
569 InOrder inOrder = inOrder(mockLoadBalancer);
570
571 ResolvedServerInfoGroup serverInfoGroup = ResolvedServerInfoGroup.builder()
572 .add(badServer)
573 .add(goodServer)
574 .build();
575 FakeNameResolverFactory nameResolverFactory =
576 new FakeNameResolverFactory(serverInfoGroup.getResolvedServerInfoList());
577 createChannel(nameResolverFactory, NO_INTERCEPTOR);
578
579 // Start the call
580 ClientCall<String, Integer> call = channel.newCall(method, CallOptions.DEFAULT);
581 Metadata headers = new Metadata();
582 call.start(mockCallListener, headers);
583 executor.runDueTasks();
584
585 // Simulate name resolution results
586 inOrder.verify(mockLoadBalancer).handleResolvedAddresses(
587 eq(Arrays.asList(serverInfoGroup)), eq(Attributes.EMPTY));
588 Subchannel subchannel = helper.createSubchannel(
589 serverInfoGroup.toEquivalentAddressGroup(), Attributes.EMPTY);
590 when(mockPicker.pickSubchannel(any(Attributes.class), any(Metadata.class)))
591 .thenReturn(PickResult.withSubchannel(subchannel));
592 subchannel.requestConnection();
593 inOrder.verify(mockLoadBalancer).handleSubchannelState(
594 same(subchannel), stateInfoCaptor.capture());
595 assertEquals(CONNECTING, stateInfoCaptor.getValue().getState());
596
597 // The channel will starts with the first address (badAddress)
598 verify(mockTransportFactory)
599 .newClientTransport(same(badAddress), any(String.class), any(String.class));
600 verify(mockTransportFactory, times(0))
601 .newClientTransport(same(goodAddress), any(String.class), any(String.class));
602
603 MockClientTransportInfo badTransportInfo = transports.poll();
604 // Which failed to connect
605 badTransportInfo.listener.transportShutdown(Status.UNAVAILABLE);
606 inOrder.verifyNoMoreInteractions();
607
608 // The channel then try the second address (goodAddress)
609 verify(mockTransportFactory)
610 .newClientTransport(same(goodAddress), any(String.class), any(String.class));
611 MockClientTransportInfo goodTransportInfo = transports.poll();
612 when(goodTransportInfo.transport.newStream(
613 any(MethodDescriptor.class), any(Metadata.class), any(CallOptions.class),
614 any(StatsTraceContext.class)))
615 .thenReturn(mock(ClientStream.class));
Carl Mastrangeloefbcd1f2017-01-23 12:29:35 -0800616
Kun Zhang7cb04972017-01-09 14:44:10 -0800617 goodTransportInfo.listener.transportReady();
618 inOrder.verify(mockLoadBalancer).handleSubchannelState(
619 same(subchannel), stateInfoCaptor.capture());
620 assertEquals(READY, stateInfoCaptor.getValue().getState());
621
622 // A typical LoadBalancer will call this once the subchannel becomes READY
623 helper.updatePicker(mockPicker);
624 // Delayed transport uses the app executor to create real streams.
625 executor.runDueTasks();
626
627 verify(goodTransportInfo.transport).newStream(same(method), same(headers),
628 same(CallOptions.DEFAULT), any(StatsTraceContext.class));
629 // The bad transport was never used.
630 verify(badTransportInfo.transport, times(0)).newStream(any(MethodDescriptor.class),
631 any(Metadata.class), any(CallOptions.class), any(StatsTraceContext.class));
632 }
633
634 /**
635 * Verify that if all resolved addresses failed to connect, a fail-fast call will fail, while a
636 * wait-for-ready call will still be buffered.
637 */
638 @Test
639 public void allServersFailedToConnect() throws Exception {
640 final SocketAddress addr1 = new SocketAddress() {
641 @Override public String toString() {
642 return "addr1";
643 }
644 };
645 final SocketAddress addr2 = new SocketAddress() {
646 @Override public String toString() {
647 return "addr2";
648 }
649 };
650 final ResolvedServerInfo server1 = new ResolvedServerInfo(addr1, Attributes.EMPTY);
651 final ResolvedServerInfo server2 = new ResolvedServerInfo(addr2, Attributes.EMPTY);
652 InOrder inOrder = inOrder(mockLoadBalancer);
653
654 ResolvedServerInfoGroup serverInfoGroup = ResolvedServerInfoGroup.builder()
655 .add(server1)
656 .add(server2)
657 .build();
658
659 FakeNameResolverFactory nameResolverFactory =
660 new FakeNameResolverFactory(serverInfoGroup.getResolvedServerInfoList());
661 createChannel(nameResolverFactory, NO_INTERCEPTOR);
662
663 // Start a wait-for-ready call
664 ClientCall<String, Integer> call =
665 channel.newCall(method, CallOptions.DEFAULT.withWaitForReady());
666 Metadata headers = new Metadata();
667 call.start(mockCallListener, headers);
668 // ... and a fail-fast call
669 ClientCall<String, Integer> call2 =
670 channel.newCall(method, CallOptions.DEFAULT.withoutWaitForReady());
671 call2.start(mockCallListener2, headers);
672 executor.runDueTasks();
673
674 // Simulate name resolution results
675 inOrder.verify(mockLoadBalancer).handleResolvedAddresses(
676 eq(Arrays.asList(serverInfoGroup)), eq(Attributes.EMPTY));
677 Subchannel subchannel = helper.createSubchannel(
678 serverInfoGroup.toEquivalentAddressGroup(), Attributes.EMPTY);
679 when(mockPicker.pickSubchannel(any(Attributes.class), any(Metadata.class)))
680 .thenReturn(PickResult.withSubchannel(subchannel));
681 subchannel.requestConnection();
682 inOrder.verify(mockLoadBalancer).handleSubchannelState(
683 same(subchannel), stateInfoCaptor.capture());
684 assertEquals(CONNECTING, stateInfoCaptor.getValue().getState());
685
686 // Connecting to server1, which will fail
687 verify(mockTransportFactory)
688 .newClientTransport(same(addr1), any(String.class), any(String.class));
689 verify(mockTransportFactory, times(0))
690 .newClientTransport(same(addr2), any(String.class), any(String.class));
691 MockClientTransportInfo transportInfo1 = transports.poll();
692 transportInfo1.listener.transportShutdown(Status.UNAVAILABLE);
693
694 // Connecting to server2, which will fail too
695 verify(mockTransportFactory)
696 .newClientTransport(same(addr2), any(String.class), any(String.class));
697 MockClientTransportInfo transportInfo2 = transports.poll();
698 Status server2Error = Status.UNAVAILABLE.withDescription("Server2 failed to connect");
699 transportInfo2.listener.transportShutdown(server2Error);
700
701 // ... which makes the subchannel enter TRANSIENT_FAILURE. The last error Status is propagated
702 // to LoadBalancer.
703 inOrder.verify(mockLoadBalancer).handleSubchannelState(
704 same(subchannel), stateInfoCaptor.capture());
705 assertEquals(TRANSIENT_FAILURE, stateInfoCaptor.getValue().getState());
706 assertSame(server2Error, stateInfoCaptor.getValue().getStatus());
707
708 // A typical LoadBalancer would create a picker with error
709 SubchannelPicker picker2 = mock(SubchannelPicker.class);
710 when(picker2.pickSubchannel(any(Attributes.class), any(Metadata.class)))
711 .thenReturn(PickResult.withError(server2Error));
712 helper.updatePicker(picker2);
713 executor.runDueTasks();
714
715 // ... which fails the fail-fast call
716 verify(mockCallListener2).onClose(same(server2Error), any(Metadata.class));
717 // ... while the wait-for-ready call stays
718 verifyNoMoreInteractions(mockCallListener);
719 // No real stream was ever created
720 verify(transportInfo1.transport, times(0))
721 .newStream(any(MethodDescriptor.class), any(Metadata.class));
722 verify(transportInfo2.transport, times(0))
723 .newStream(any(MethodDescriptor.class), any(Metadata.class));
724 }
725
726 @Test
727 public void subchannels() {
728 createChannel(new FakeNameResolverFactory(true), NO_INTERCEPTOR);
729
730 // createSubchannel() always return a new Subchannel
731 Attributes attrs1 = Attributes.newBuilder().set(SUBCHANNEL_ATTR_KEY, "attr1").build();
732 Attributes attrs2 = Attributes.newBuilder().set(SUBCHANNEL_ATTR_KEY, "attr2").build();
733 Subchannel sub1 = helper.createSubchannel(addressGroup, attrs1);
734 Subchannel sub2 = helper.createSubchannel(addressGroup, attrs2);
735 assertNotSame(sub1, sub2);
736 assertNotSame(attrs1, attrs2);
737 assertSame(attrs1, sub1.getAttributes());
738 assertSame(attrs2, sub2.getAttributes());
739 assertSame(addressGroup, sub1.getAddresses());
740 assertSame(addressGroup, sub2.getAddresses());
741
742 // requestConnection()
743 verify(mockTransportFactory, never()).newClientTransport(
744 any(SocketAddress.class), any(String.class), any(String.class));
745 sub1.requestConnection();
746 verify(mockTransportFactory).newClientTransport(socketAddress, authority, userAgent);
747 MockClientTransportInfo transportInfo1 = transports.poll();
748 assertNotNull(transportInfo1);
749
750 sub2.requestConnection();
751 verify(mockTransportFactory, times(2)).newClientTransport(socketAddress, authority, userAgent);
752 MockClientTransportInfo transportInfo2 = transports.poll();
753 assertNotNull(transportInfo2);
754
755 sub1.requestConnection();
756 sub2.requestConnection();
757 verify(mockTransportFactory, times(2)).newClientTransport(socketAddress, authority, userAgent);
758
759 // shutdown() has a delay
760 sub1.shutdown();
761 timer.forwardTime(ManagedChannelImpl2.SUBCHANNEL_SHUTDOWN_DELAY_SECONDS - 1, TimeUnit.SECONDS);
762 sub1.shutdown();
763 verify(transportInfo1.transport, never()).shutdown();
764 timer.forwardTime(1, TimeUnit.SECONDS);
765 verify(transportInfo1.transport).shutdown();
766
767 // ... but not after Channel is terminating
768 verify(mockLoadBalancer, never()).shutdown();
769 channel.shutdown();
770 verify(mockLoadBalancer).shutdown();
771 verify(transportInfo2.transport, never()).shutdown();
772
773 sub2.shutdown();
774 verify(transportInfo2.transport).shutdown();
775 }
776
777 @Test
778 public void subchannelsWhenChannelShutdownNow() {
779 createChannel(new FakeNameResolverFactory(true), NO_INTERCEPTOR);
780 Subchannel sub1 = helper.createSubchannel(addressGroup, Attributes.EMPTY);
781 Subchannel sub2 = helper.createSubchannel(addressGroup, Attributes.EMPTY);
782 sub1.requestConnection();
783 sub2.requestConnection();
784
785 assertEquals(2, transports.size());
786 MockClientTransportInfo ti1 = transports.poll();
787 MockClientTransportInfo ti2 = transports.poll();
788
789 ti1.listener.transportReady();
790 ti2.listener.transportReady();
791
792 channel.shutdownNow();
793 verify(ti1.transport).shutdownNow(any(Status.class));
794 verify(ti2.transport).shutdownNow(any(Status.class));
795
796 ti1.listener.transportShutdown(Status.UNAVAILABLE.withDescription("shutdown now"));
797 ti2.listener.transportShutdown(Status.UNAVAILABLE.withDescription("shutdown now"));
798 ti1.listener.transportTerminated();
799
800 assertFalse(channel.isTerminated());
801 ti2.listener.transportTerminated();
802 assertTrue(channel.isTerminated());
803 }
804
805 @Test
806 public void subchannelsNoConnectionShutdown() {
807 createChannel(new FakeNameResolverFactory(true), NO_INTERCEPTOR);
808 Subchannel sub1 = helper.createSubchannel(addressGroup, Attributes.EMPTY);
809 Subchannel sub2 = helper.createSubchannel(addressGroup, Attributes.EMPTY);
810
811 channel.shutdown();
812 verify(mockLoadBalancer).shutdown();
813 sub1.shutdown();
814 assertFalse(channel.isTerminated());
815 sub2.shutdown();
816 assertTrue(channel.isTerminated());
817 verify(mockTransportFactory, never()).newClientTransport(any(SocketAddress.class), anyString(),
818 anyString());
819 }
820
821 @Test
822 public void subchannelsNoConnectionShutdownNow() {
823 createChannel(new FakeNameResolverFactory(true), NO_INTERCEPTOR);
Eric Anderson1e99b292017-01-19 12:16:05 -0800824 helper.createSubchannel(addressGroup, Attributes.EMPTY);
825 helper.createSubchannel(addressGroup, Attributes.EMPTY);
Kun Zhang7cb04972017-01-09 14:44:10 -0800826 channel.shutdownNow();
827
828 verify(mockLoadBalancer).shutdown();
829 // Channel's shutdownNow() will call shutdownNow() on all subchannels and oobchannels.
830 // Therefore, channel is terminated without relying on LoadBalancer to shutdown subchannels.
831 assertTrue(channel.isTerminated());
832 verify(mockTransportFactory, never()).newClientTransport(any(SocketAddress.class), anyString(),
833 anyString());
834 }
835
836 @Test
837 public void oobchannels() {
838 createChannel(new FakeNameResolverFactory(true), NO_INTERCEPTOR);
839
840 ManagedChannel oob1 = helper.createOobChannel(addressGroup, "oob1authority");
841 ManagedChannel oob2 = helper.createOobChannel(addressGroup, "oob2authority");
842 verify(oobExecutorPool, times(2)).getObject();
843
844 assertEquals("oob1authority", oob1.authority());
845 assertEquals("oob2authority", oob2.authority());
846
847 // OOB channels create connections lazily. A new call will initiate the connection.
848 Metadata headers = new Metadata();
849 ClientCall<String, Integer> call = oob1.newCall(method, CallOptions.DEFAULT);
850 call.start(mockCallListener, headers);
851 verify(mockTransportFactory).newClientTransport(socketAddress, "oob1authority", userAgent);
852 MockClientTransportInfo transportInfo = transports.poll();
853 assertNotNull(transportInfo);
854
855 assertEquals(0, oobExecutor.numPendingTasks());
856 transportInfo.listener.transportReady();
857 assertEquals(1, oobExecutor.runDueTasks());
858 verify(transportInfo.transport).newStream(same(method), same(headers),
859 same(CallOptions.DEFAULT), any(StatsTraceContext.class));
860
861 // The transport goes away
862 transportInfo.listener.transportShutdown(Status.UNAVAILABLE);
863 transportInfo.listener.transportTerminated();
864
865 // A new call will trigger a new transport
866 ClientCall<String, Integer> call2 = oob1.newCall(method, CallOptions.DEFAULT);
867 call2.start(mockCallListener2, headers);
868 ClientCall<String, Integer> call3 =
869 oob1.newCall(method, CallOptions.DEFAULT.withWaitForReady());
870 call3.start(mockCallListener3, headers);
871 verify(mockTransportFactory, times(2)).newClientTransport(
872 socketAddress, "oob1authority", userAgent);
873 transportInfo = transports.poll();
874 assertNotNull(transportInfo);
875
876 // This transport fails
877 Status transportError = Status.UNAVAILABLE.withDescription("Connection refused");
878 assertEquals(0, oobExecutor.numPendingTasks());
879 transportInfo.listener.transportShutdown(transportError);
880 assertTrue(oobExecutor.runDueTasks() > 0);
881
882 // Fail-fast RPC will fail, while wait-for-ready RPC will still be pending
883 verify(mockCallListener2).onClose(same(transportError), any(Metadata.class));
884 verify(mockCallListener3, never()).onClose(any(Status.class), any(Metadata.class));
885
886 // Shutdown
887 assertFalse(oob1.isShutdown());
888 assertFalse(oob2.isShutdown());
889 oob1.shutdown();
890 verify(oobExecutorPool, never()).returnObject(anyObject());
891 oob2.shutdownNow();
892 assertTrue(oob1.isShutdown());
893 assertTrue(oob2.isShutdown());
894 assertTrue(oob2.isTerminated());
895 verify(oobExecutorPool).returnObject(oobExecutor.getScheduledExecutorService());
896
897 // New RPCs will be rejected.
898 assertEquals(0, oobExecutor.numPendingTasks());
899 ClientCall<String, Integer> call4 = oob1.newCall(method, CallOptions.DEFAULT);
900 ClientCall<String, Integer> call5 = oob2.newCall(method, CallOptions.DEFAULT);
901 call4.start(mockCallListener4, headers);
902 call5.start(mockCallListener5, headers);
903 assertTrue(oobExecutor.runDueTasks() > 0);
904 verify(mockCallListener4).onClose(statusCaptor.capture(), any(Metadata.class));
905 Status status4 = statusCaptor.getValue();
906 assertEquals(Status.Code.UNAVAILABLE, status4.getCode());
907 verify(mockCallListener5).onClose(statusCaptor.capture(), any(Metadata.class));
908 Status status5 = statusCaptor.getValue();
909 assertEquals(Status.Code.UNAVAILABLE, status5.getCode());
910
911 // The pending RPC will still be pending
912 verify(mockCallListener3, never()).onClose(any(Status.class), any(Metadata.class));
913
914 // This will shutdownNow() the delayed transport, terminating the pending RPC
915 assertEquals(0, oobExecutor.numPendingTasks());
916 oob1.shutdownNow();
917 assertTrue(oobExecutor.runDueTasks() > 0);
918 verify(mockCallListener3).onClose(any(Status.class), any(Metadata.class));
919
920 // Shut down the channel, and it will not terminated because OOB channel has not.
921 channel.shutdown();
922 assertFalse(channel.isTerminated());
923 // Delayed transport has already terminated. Terminating the transport terminates the
924 // subchannel, which in turn terimates the OOB channel, which terminates the channel.
925 assertFalse(oob1.isTerminated());
926 verify(oobExecutorPool).returnObject(oobExecutor.getScheduledExecutorService());
927 transportInfo.listener.transportTerminated();
928 assertTrue(oob1.isTerminated());
929 assertTrue(channel.isTerminated());
930 verify(oobExecutorPool, times(2)).returnObject(oobExecutor.getScheduledExecutorService());
931 }
932
933 @Test
934 public void oobChannelsWhenChannelShutdownNow() {
935 createChannel(new FakeNameResolverFactory(true), NO_INTERCEPTOR);
936 ManagedChannel oob1 = helper.createOobChannel(addressGroup, "oob1Authority");
937 ManagedChannel oob2 = helper.createOobChannel(addressGroup, "oob2Authority");
938
939 oob1.newCall(method, CallOptions.DEFAULT).start(mockCallListener, new Metadata());
940 oob2.newCall(method, CallOptions.DEFAULT).start(mockCallListener2, new Metadata());
941
942 assertEquals(2, transports.size());
943 MockClientTransportInfo ti1 = transports.poll();
944 MockClientTransportInfo ti2 = transports.poll();
945
946 ti1.listener.transportReady();
947 ti2.listener.transportReady();
948
949 channel.shutdownNow();
950 verify(ti1.transport).shutdownNow(any(Status.class));
951 verify(ti2.transport).shutdownNow(any(Status.class));
952
953 ti1.listener.transportShutdown(Status.UNAVAILABLE.withDescription("shutdown now"));
954 ti2.listener.transportShutdown(Status.UNAVAILABLE.withDescription("shutdown now"));
955 ti1.listener.transportTerminated();
956
957 assertFalse(channel.isTerminated());
958 ti2.listener.transportTerminated();
959 assertTrue(channel.isTerminated());
960 }
961
962 @Test
963 public void oobChannelsNoConnectionShutdown() {
964 createChannel(new FakeNameResolverFactory(true), NO_INTERCEPTOR);
965 ManagedChannel oob1 = helper.createOobChannel(addressGroup, "oob1Authority");
966 ManagedChannel oob2 = helper.createOobChannel(addressGroup, "oob2Authority");
967 channel.shutdown();
968
969 verify(mockLoadBalancer).shutdown();
970 oob1.shutdown();
971 assertTrue(oob1.isTerminated());
972 assertFalse(channel.isTerminated());
973 oob2.shutdown();
974 assertTrue(oob2.isTerminated());
975 assertTrue(channel.isTerminated());
976 verify(mockTransportFactory, never()).newClientTransport(any(SocketAddress.class), anyString(),
977 anyString());
978 }
979
980 @Test
981 public void oobChannelsNoConnectionShutdownNow() {
982 createChannel(new FakeNameResolverFactory(true), NO_INTERCEPTOR);
Eric Anderson1e99b292017-01-19 12:16:05 -0800983 helper.createOobChannel(addressGroup, "oob1Authority");
984 helper.createOobChannel(addressGroup, "oob2Authority");
Kun Zhang7cb04972017-01-09 14:44:10 -0800985 channel.shutdownNow();
986
987 verify(mockLoadBalancer).shutdown();
988 assertTrue(channel.isTerminated());
989 // Channel's shutdownNow() will call shutdownNow() on all subchannels and oobchannels.
990 // Therefore, channel is terminated without relying on LoadBalancer to shutdown oobchannels.
991 verify(mockTransportFactory, never()).newClientTransport(any(SocketAddress.class), anyString(),
992 anyString());
993 }
994
995 @Test
996 public void uriPattern() {
997 assertTrue(ManagedChannelImpl2.URI_PATTERN.matcher("a:/").matches());
998 assertTrue(ManagedChannelImpl2.URI_PATTERN.matcher("Z019+-.:/!@ #~ ").matches());
999 assertFalse(ManagedChannelImpl2.URI_PATTERN.matcher("a/:").matches()); // "/:" not matched
1000 assertFalse(ManagedChannelImpl2.URI_PATTERN.matcher("0a:/").matches()); // '0' not matched
1001 assertFalse(ManagedChannelImpl2.URI_PATTERN.matcher("a,:/").matches()); // ',' not matched
1002 assertFalse(ManagedChannelImpl2.URI_PATTERN.matcher(" a:/").matches()); // space not matched
1003 }
1004
1005 /**
1006 * Test that information such as the Call's context, MethodDescriptor, authority, executor are
1007 * propagated to newStream() and applyRequestMetadata().
1008 */
1009 @Test
1010 public void informationPropagatedToNewStreamAndCallCredentials() {
1011 ResolvedServerInfoGroup serverInfoGroup = ResolvedServerInfoGroup.builder()
1012 .add(server).build();
1013 createChannel(new FakeNameResolverFactory(true), NO_INTERCEPTOR);
1014 CallOptions callOptions = CallOptions.DEFAULT.withCallCredentials(creds);
1015 final Context.Key<String> testKey = Context.key("testing");
1016 Context ctx = Context.current().withValue(testKey, "testValue");
1017 final LinkedList<Context> credsApplyContexts = new LinkedList<Context>();
1018 final LinkedList<Context> newStreamContexts = new LinkedList<Context>();
1019 doAnswer(new Answer<Void>() {
1020 @Override
1021 public Void answer(InvocationOnMock in) throws Throwable {
1022 credsApplyContexts.add(Context.current());
1023 return null;
1024 }
1025 }).when(creds).applyRequestMetadata(
1026 any(MethodDescriptor.class), any(Attributes.class), any(Executor.class),
1027 any(MetadataApplier.class));
1028
1029 // First call will be on delayed transport. Only newCall() is run within the expected context,
1030 // so that we can verify that the context is explicitly attached before calling newStream() and
1031 // applyRequestMetadata(), which happens after we detach the context from the thread.
1032 Context origCtx = ctx.attach();
1033 assertEquals("testValue", testKey.get());
1034 ClientCall<String, Integer> call = channel.newCall(method, callOptions);
1035 ctx.detach(origCtx);
1036 assertNull(testKey.get());
1037 call.start(mockCallListener, new Metadata());
1038
1039 // Simulate name resolution results
1040 Subchannel subchannel = helper.createSubchannel(
1041 serverInfoGroup.toEquivalentAddressGroup(), Attributes.EMPTY);
1042 subchannel.requestConnection();
1043 verify(mockTransportFactory).newClientTransport(
1044 same(socketAddress), eq(authority), eq(userAgent));
1045 MockClientTransportInfo transportInfo = transports.poll();
1046 final ConnectionClientTransport transport = transportInfo.transport;
Lukasz Strzalkowskib33d3cb2017-01-18 10:32:45 -08001047 when(transport.getAttributes()).thenReturn(Attributes.EMPTY);
Kun Zhang7cb04972017-01-09 14:44:10 -08001048 doAnswer(new Answer<ClientStream>() {
1049 @Override
1050 public ClientStream answer(InvocationOnMock in) throws Throwable {
1051 newStreamContexts.add(Context.current());
1052 return mock(ClientStream.class);
1053 }
1054 }).when(transport).newStream(
1055 any(MethodDescriptor.class), any(Metadata.class), any(CallOptions.class),
1056 any(StatsTraceContext.class));
1057
1058 verify(creds, never()).applyRequestMetadata(
1059 any(MethodDescriptor.class), any(Attributes.class), any(Executor.class),
1060 any(MetadataApplier.class));
1061
1062 // applyRequestMetadata() is called after the transport becomes ready.
1063 transportInfo.listener.transportReady();
1064 when(mockPicker.pickSubchannel(any(Attributes.class), any(Metadata.class)))
1065 .thenReturn(PickResult.withSubchannel(subchannel));
1066 helper.updatePicker(mockPicker);
1067 executor.runDueTasks();
1068 ArgumentCaptor<Attributes> attrsCaptor = ArgumentCaptor.forClass(Attributes.class);
1069 ArgumentCaptor<MetadataApplier> applierCaptor = ArgumentCaptor.forClass(MetadataApplier.class);
1070 verify(creds).applyRequestMetadata(same(method), attrsCaptor.capture(),
1071 same(executor.getScheduledExecutorService()), applierCaptor.capture());
1072 assertEquals("testValue", testKey.get(credsApplyContexts.poll()));
1073 assertEquals(authority, attrsCaptor.getValue().get(CallCredentials.ATTR_AUTHORITY));
1074 assertEquals(SecurityLevel.NONE,
1075 attrsCaptor.getValue().get(CallCredentials.ATTR_SECURITY_LEVEL));
1076 verify(transport, never()).newStream(
1077 any(MethodDescriptor.class), any(Metadata.class), any(CallOptions.class),
1078 any(StatsTraceContext.class));
1079
1080 // newStream() is called after apply() is called
1081 applierCaptor.getValue().apply(new Metadata());
1082 verify(transport).newStream(same(method), any(Metadata.class), same(callOptions),
1083 any(StatsTraceContext.class));
1084 assertEquals("testValue", testKey.get(newStreamContexts.poll()));
1085 // The context should not live beyond the scope of newStream() and applyRequestMetadata()
1086 assertNull(testKey.get());
1087
1088
1089 // Second call will not be on delayed transport
1090 origCtx = ctx.attach();
1091 call = channel.newCall(method, callOptions);
1092 ctx.detach(origCtx);
1093 call.start(mockCallListener, new Metadata());
1094
1095 verify(creds, times(2)).applyRequestMetadata(same(method), attrsCaptor.capture(),
1096 same(executor.getScheduledExecutorService()), applierCaptor.capture());
1097 assertEquals("testValue", testKey.get(credsApplyContexts.poll()));
1098 assertEquals(authority, attrsCaptor.getValue().get(CallCredentials.ATTR_AUTHORITY));
1099 assertEquals(SecurityLevel.NONE,
1100 attrsCaptor.getValue().get(CallCredentials.ATTR_SECURITY_LEVEL));
1101 // This is from the first call
1102 verify(transport).newStream(
1103 any(MethodDescriptor.class), any(Metadata.class), any(CallOptions.class),
1104 any(StatsTraceContext.class));
1105
1106 // Still, newStream() is called after apply() is called
1107 applierCaptor.getValue().apply(new Metadata());
1108 verify(transport, times(2)).newStream(same(method), any(Metadata.class), same(callOptions),
1109 any(StatsTraceContext.class));
1110 assertEquals("testValue", testKey.get(newStreamContexts.poll()));
1111
1112 assertNull(testKey.get());
1113 }
1114
1115 private static class FakeBackoffPolicyProvider implements BackoffPolicy.Provider {
1116 @Override
1117 public BackoffPolicy get() {
1118 return new BackoffPolicy() {
1119 @Override
1120 public long nextBackoffMillis() {
1121 return 1;
1122 }
1123 };
1124 }
1125 }
1126
1127 private class FakeNameResolverFactory extends NameResolver.Factory {
1128 final List<ResolvedServerInfoGroup> servers;
1129 final boolean resolvedAtStart;
1130 final ArrayList<FakeNameResolver> resolvers = new ArrayList<FakeNameResolver>();
1131
1132 FakeNameResolverFactory(boolean resolvedAtStart) {
1133 this.resolvedAtStart = resolvedAtStart;
1134 servers = Collections.singletonList(ResolvedServerInfoGroup.builder().add(server).build());
1135 }
1136
1137 FakeNameResolverFactory(List<ResolvedServerInfo> servers) {
1138 resolvedAtStart = true;
1139 this.servers = Collections.singletonList(
1140 ResolvedServerInfoGroup.builder().addAll(servers).build());
1141 }
1142
1143 public FakeNameResolverFactory() {
1144 resolvedAtStart = true;
1145 this.servers = ImmutableList.of();
1146 }
1147
1148 @Override
1149 public NameResolver newNameResolver(final URI targetUri, Attributes params) {
1150 if (!expectedUri.equals(targetUri)) {
1151 return null;
1152 }
1153 assertSame(NAME_RESOLVER_PARAMS, params);
1154 FakeNameResolver resolver = new FakeNameResolver();
1155 resolvers.add(resolver);
1156 return resolver;
1157 }
1158
1159 @Override
1160 public String getDefaultScheme() {
1161 return "fake";
1162 }
1163
1164 void allResolved() {
1165 for (FakeNameResolver resolver : resolvers) {
1166 resolver.resolved();
1167 }
1168 }
1169
1170 private class FakeNameResolver extends NameResolver {
1171 Listener listener;
1172 boolean shutdown;
1173
1174 @Override public String getServiceAuthority() {
1175 return expectedUri.getAuthority();
1176 }
1177
1178 @Override public void start(final Listener listener) {
1179 this.listener = listener;
1180 if (resolvedAtStart) {
1181 resolved();
1182 }
1183 }
1184
1185 void resolved() {
1186 listener.onUpdate(servers, Attributes.EMPTY);
1187 }
1188
1189 @Override public void shutdown() {
1190 shutdown = true;
1191 }
1192 }
1193 }
1194
1195 private static class FailingNameResolverFactory extends NameResolver.Factory {
1196 final Status error;
1197
1198 FailingNameResolverFactory(Status error) {
1199 this.error = error;
1200 }
1201
1202 @Override
1203 public NameResolver newNameResolver(URI notUsedUri, Attributes params) {
1204 return new NameResolver() {
1205 @Override public String getServiceAuthority() {
1206 return "irrelevant-authority";
1207 }
1208
1209 @Override public void start(final Listener listener) {
1210 listener.onError(error);
1211 }
1212
1213 @Override public void shutdown() {}
1214 };
1215 }
1216
1217 @Override
1218 public String getDefaultScheme() {
1219 return "fake";
1220 }
1221 }
1222}