blob: 921e96f4cb6cbb614d757f347589e5b1341f6732 [file] [log] [blame]
Kun Zhang7cb04972017-01-09 14:44:10 -08001/*
2 * Copyright 2016, Google Inc. All rights reserved.
3 *
4 * Redistribution and use in source and binary forms, with or without
5 * modification, are permitted provided that the following conditions are
6 * met:
7 *
8 * * Redistributions of source code must retain the above copyright
9 * notice, this list of conditions and the following disclaimer.
10 * * Redistributions in binary form must reproduce the above
11 * copyright notice, this list of conditions and the following disclaimer
12 * in the documentation and/or other materials provided with the
13 * distribution.
14 *
15 * * Neither the name of Google Inc. nor the names of its
16 * contributors may be used to endorse or promote products derived from
17 * this software without specific prior written permission.
18 *
19 * THIS SOFTWARE IS PROVIDED BY THE COPYRIGHT HOLDERS AND CONTRIBUTORS
20 * "AS IS" AND ANY EXPRESS OR IMPLIED WARRANTIES, INCLUDING, BUT NOT
21 * LIMITED TO, THE IMPLIED WARRANTIES OF MERCHANTABILITY AND FITNESS FOR
22 * A PARTICULAR PURPOSE ARE DISCLAIMED. IN NO EVENT SHALL THE COPYRIGHT
23 * OWNER OR CONTRIBUTORS BE LIABLE FOR ANY DIRECT, INDIRECT, INCIDENTAL,
24 * SPECIAL, EXEMPLARY, OR CONSEQUENTIAL DAMAGES (INCLUDING, BUT NOT
25 * LIMITED TO, PROCUREMENT OF SUBSTITUTE GOODS OR SERVICES; LOSS OF USE,
26 * DATA, OR PROFITS; OR BUSINESS INTERRUPTION) HOWEVER CAUSED AND ON ANY
27 * THEORY OF LIABILITY, WHETHER IN CONTRACT, STRICT LIABILITY, OR TORT
28 * (INCLUDING NEGLIGENCE OR OTHERWISE) ARISING IN ANY WAY OUT OF THE USE
29 * OF THIS SOFTWARE, EVEN IF ADVISED OF THE POSSIBILITY OF SUCH DAMAGE.
30 */
31
32package io.grpc.internal;
33
34import static com.google.common.base.Preconditions.checkNotNull;
35import static io.grpc.ConnectivityState.READY;
36import static io.grpc.ConnectivityState.TRANSIENT_FAILURE;
37
38import com.google.common.base.Stopwatch;
39import com.google.common.base.Supplier;
40import com.google.instrumentation.stats.StatsContextFactory;
41
42import io.grpc.Attributes;
43import io.grpc.CallOptions;
44import io.grpc.ClientCall;
45import io.grpc.ConnectivityStateInfo;
46import io.grpc.EquivalentAddressGroup;
47import io.grpc.LoadBalancer2.PickResult;
48import io.grpc.LoadBalancer2.SubchannelPicker;
49import io.grpc.ManagedChannel;
50import io.grpc.Metadata;
51import io.grpc.MethodDescriptor;
52import io.grpc.Status;
53import io.grpc.internal.ClientCallImpl.ClientTransportProvider;
54
55import java.util.concurrent.CountDownLatch;
56import java.util.concurrent.Executor;
57import java.util.concurrent.ScheduledExecutorService;
58import java.util.concurrent.TimeUnit;
59import java.util.logging.Level;
60import java.util.logging.Logger;
61import javax.annotation.concurrent.ThreadSafe;
62
63/**
64 * A ManagedChannel backed by a single {@link InternalSubchannel} and used for {@link LoadBalancer2}
65 * to its own RPC needs.
66 */
67@ThreadSafe
68final class OobChannel extends ManagedChannel implements WithLogId {
69 private static final Logger log = Logger.getLogger(OobChannel.class.getName());
70
71 private SubchannelImpl subchannelImpl;
72 private SubchannelPicker subchannelPicker;
73
74 private final LogId logId = LogId.allocate(getClass().getName());
75 private final StatsContextFactory statsFactory;
76 private final String authority;
77 private final DelayedClientTransport2 delayedTransport;
78 private final ObjectPool<? extends Executor> executorPool;
79 private final Executor executor;
80 private final ScheduledExecutorService deadlineCancellationExecutor;
81 private final Supplier<Stopwatch> stopwatchSupplier;
82 private final CountDownLatch terminatedLatch = new CountDownLatch(1);
83 private volatile boolean shutdown;
84
85 private final ClientTransportProvider transportProvider = new ClientTransportProvider() {
86 @Override
87 public ClientTransport get(CallOptions callOptions, Metadata headers) {
88 // delayed transport's newStream() always acquires a lock, but concurrent performance doesn't
89 // matter here because OOB communication should be sparse, and it's not on application RPC's
90 // critical path.
91 return delayedTransport;
92 }
93 };
94
95 OobChannel(StatsContextFactory statsFactory, String authority,
96 ObjectPool<? extends Executor> executorPool,
97 ScheduledExecutorService deadlineCancellationExecutor, Supplier<Stopwatch> stopwatchSupplier,
98 ChannelExecutor channelExecutor) {
99 this.statsFactory = checkNotNull(statsFactory, "statsFactory");
100 this.authority = checkNotNull(authority, "authority");
101 this.executorPool = checkNotNull(executorPool, "executorPool");
102 this.executor = checkNotNull(executorPool.getObject(), "executor");
103 this.deadlineCancellationExecutor = checkNotNull(
104 deadlineCancellationExecutor, "deadlineCancellationExecutor");
105 this.stopwatchSupplier = checkNotNull(stopwatchSupplier, "stopwatchSupplier");
106 this.delayedTransport = new DelayedClientTransport2(executor, channelExecutor);
107 this.delayedTransport.start(new ManagedClientTransport.Listener() {
108 @Override
109 public void transportShutdown(Status s) {
110 // Don't care
111 }
112
113 @Override
114 public void transportTerminated() {
115 subchannelImpl.shutdown();
116 }
117
118 @Override
119 public void transportReady() {
120 // Don't care
121 }
122
123 @Override
124 public void transportInUse(boolean inUse) {
125 // Don't care
126 }
127 });
128 }
129
130 // Must be called only once, right after the OobChannel is created.
131 void setSubchannel(final InternalSubchannel subchannel) {
132 log.log(Level.FINE, "[{0}] Created with [{1}]", new Object[] {this, subchannel});
133 subchannelImpl = new SubchannelImpl() {
134 @Override
135 public void shutdown() {
136 subchannel.shutdown();
137 }
138
139 @Override
140 ClientTransport obtainActiveTransport() {
141 return subchannel.obtainActiveTransport();
142 }
143
144 @Override
145 public void requestConnection() {
146 subchannel.obtainActiveTransport();
147 }
148
149 @Override
150 public EquivalentAddressGroup getAddresses() {
151 return subchannel.getAddressGroup();
152 }
153
154 @Override
155 public Attributes getAttributes() {
156 return Attributes.EMPTY;
157 }
158 };
159
160 subchannelPicker = new SubchannelPicker() {
161 final PickResult result = PickResult.withSubchannel(subchannelImpl);
162
163 @Override
164 public PickResult pickSubchannel(Attributes affinity, Metadata headers) {
165 return result;
166 }
167 };
168 delayedTransport.reprocess(subchannelPicker);
169 }
170
171 @Override
172 public <RequestT, ResponseT> ClientCall<RequestT, ResponseT> newCall(
173 MethodDescriptor<RequestT, ResponseT> methodDescriptor, CallOptions callOptions) {
174 StatsTraceContext statsTraceCtx = StatsTraceContext.newClientContext(
175 methodDescriptor.getFullMethodName(), statsFactory, stopwatchSupplier);
176 return new ClientCallImpl<RequestT, ResponseT>(methodDescriptor,
177 callOptions.getExecutor() == null ? executor : callOptions.getExecutor(),
178 callOptions, statsTraceCtx, transportProvider,
179 deadlineCancellationExecutor);
180 }
181
182 @Override
183 public String authority() {
184 return authority;
185 }
186
187 @Override
188 public LogId getLogId() {
189 return logId;
190 }
191
192 @Override
193 public boolean isTerminated() {
194 return terminatedLatch.getCount() == 0;
195 }
196
197 @Override
198 public boolean awaitTermination(long time, TimeUnit unit) throws InterruptedException {
199 return terminatedLatch.await(time, unit);
200 }
201
202 @Override
203 public ManagedChannel shutdown() {
204 shutdown = true;
205 delayedTransport.shutdown();
206 return this;
207 }
208
209 @Override
210 public boolean isShutdown() {
211 return shutdown;
212 }
213
214 @Override
215 public ManagedChannel shutdownNow() {
216 shutdown = true;
217 delayedTransport.shutdownNow(
218 Status.UNAVAILABLE.withDescription("OobChannel.shutdownNow() called"));
219 return this;
220 }
221
222 void handleSubchannelStateChange(final ConnectivityStateInfo newState) {
223 switch (newState.getState()) {
224 case READY:
225 case IDLE:
226 delayedTransport.reprocess(subchannelPicker);
227 break;
228 case TRANSIENT_FAILURE:
229 delayedTransport.reprocess(new SubchannelPicker() {
230 final PickResult errorResult = PickResult.withError(newState.getStatus());
231
232 @Override
233 public PickResult pickSubchannel(Attributes affinity, Metadata headers) {
234 return errorResult;
235 }
236 });
237 break;
238 default:
239 // Do nothing
240 }
241 }
242
243 void handleSubchannelTerminated() {
244 // When delayedTransport is terminated, it shuts down subchannel. Therefore, at this point
245 // both delayedTransport and subchannel have terminated.
246 executorPool.returnObject(executor);
247 terminatedLatch.countDown();
248 }
249}