| /* |
| * Copyright 2014, gRPC Authors All rights reserved. |
| * |
| * Licensed under the Apache License, Version 2.0 (the "License"); |
| * you may not use this file except in compliance with the License. |
| * You may obtain a copy of the License at |
| * |
| * http://www.apache.org/licenses/LICENSE-2.0 |
| * |
| * Unless required by applicable law or agreed to in writing, software |
| * distributed under the License is distributed on an "AS IS" BASIS, |
| * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. |
| * See the License for the specific language governing permissions and |
| * limitations under the License. |
| */ |
| |
| package io.grpc.internal; |
| |
| import static com.google.common.base.Preconditions.checkState; |
| import static com.google.common.util.concurrent.MoreExecutors.directExecutor; |
| import static io.grpc.Contexts.statusFromCancelled; |
| import static io.grpc.Status.DEADLINE_EXCEEDED; |
| import static io.grpc.internal.GrpcUtil.MESSAGE_ENCODING_KEY; |
| import static io.grpc.internal.GrpcUtil.TIMEOUT_KEY; |
| import static java.util.concurrent.TimeUnit.NANOSECONDS; |
| |
| import com.google.common.annotations.VisibleForTesting; |
| import com.google.common.base.Preconditions; |
| import io.grpc.Attributes; |
| import io.grpc.CompressorRegistry; |
| import io.grpc.Context; |
| import io.grpc.Decompressor; |
| import io.grpc.DecompressorRegistry; |
| import io.grpc.HandlerRegistry; |
| import io.grpc.InternalServerInterceptors; |
| import io.grpc.Metadata; |
| import io.grpc.ServerCall; |
| import io.grpc.ServerCallHandler; |
| import io.grpc.ServerInterceptor; |
| import io.grpc.ServerMethodDefinition; |
| import io.grpc.ServerServiceDefinition; |
| import io.grpc.ServerTransportFilter; |
| import io.grpc.Status; |
| import java.io.IOException; |
| import java.io.InputStream; |
| import java.util.ArrayList; |
| import java.util.Collection; |
| import java.util.Collections; |
| import java.util.HashSet; |
| import java.util.List; |
| import java.util.concurrent.Executor; |
| import java.util.concurrent.ScheduledExecutorService; |
| import java.util.concurrent.TimeUnit; |
| import javax.annotation.concurrent.GuardedBy; |
| |
| /** |
| * Default implementation of {@link io.grpc.Server}, for creation by transports. |
| * |
| * <p>Expected usage (by a theoretical TCP transport): |
| * <pre><code>public class TcpTransportServerFactory { |
| * public static Server newServer(Executor executor, HandlerRegistry registry, |
| * String configuration) { |
| * return new ServerImpl(executor, registry, new TcpTransportServer(configuration)); |
| * } |
| * }</code></pre> |
| * |
| * <p>Starting the server starts the underlying transport for servicing requests. Stopping the |
| * server stops servicing new requests and waits for all connections to terminate. |
| */ |
| public final class ServerImpl extends io.grpc.Server implements WithLogId { |
| private static final ServerStreamListener NOOP_LISTENER = new NoopListener(); |
| |
| private final LogId logId = LogId.allocate(getClass().getName()); |
| private final ObjectPool<? extends Executor> executorPool; |
| /** Executor for application processing. Safe to read after {@link #start()}. */ |
| private Executor executor; |
| private final InternalHandlerRegistry registry; |
| private final HandlerRegistry fallbackRegistry; |
| private final List<ServerTransportFilter> transportFilters; |
| // This is iterated on a per-call basis. Use an array instead of a Collection to avoid iterator |
| // creations. |
| private final ServerInterceptor[] interceptors; |
| @GuardedBy("lock") private boolean started; |
| @GuardedBy("lock") private boolean shutdown; |
| /** non-{@code null} if immediate shutdown has been requested. */ |
| @GuardedBy("lock") private Status shutdownNowStatus; |
| /** {@code true} if ServerListenerImpl.serverShutdown() was called. */ |
| @GuardedBy("lock") private boolean serverShutdownCallbackInvoked; |
| @GuardedBy("lock") private boolean terminated; |
| /** Service encapsulating something similar to an accept() socket. */ |
| private final InternalServer transportServer; |
| private final Object lock = new Object(); |
| @GuardedBy("lock") private boolean transportServerTerminated; |
| /** {@code transportServer} and services encapsulating something similar to a TCP connection. */ |
| @GuardedBy("lock") private final Collection<ServerTransport> transports = |
| new HashSet<ServerTransport>(); |
| |
| private final ObjectPool<ScheduledExecutorService> timeoutServicePool; |
| private ScheduledExecutorService timeoutService; |
| private final Context rootContext; |
| |
| private final DecompressorRegistry decompressorRegistry; |
| private final CompressorRegistry compressorRegistry; |
| |
| /** |
| * Construct a server. |
| * |
| * @param executorPool provides an executor to call methods on behalf of remote clients |
| * @param registry the primary method registry |
| * @param fallbackRegistry the secondary method registry, used only if the primary registry |
| * doesn't have the method |
| */ |
| ServerImpl( |
| AbstractServerImplBuilder<?> builder, |
| ObjectPool<ScheduledExecutorService> timeoutServicePool, |
| InternalServer transportServer, |
| Context rootContext) { |
| this.executorPool = Preconditions.checkNotNull(builder.executorPool, "executorPool"); |
| this.timeoutServicePool = Preconditions.checkNotNull(timeoutServicePool, "timeoutServicePool"); |
| this.registry = Preconditions.checkNotNull(builder.registryBuilder.build(), "registryBuilder"); |
| this.fallbackRegistry = |
| Preconditions.checkNotNull(builder.fallbackRegistry, "fallbackRegistry"); |
| this.transportServer = Preconditions.checkNotNull(transportServer, "transportServer"); |
| // Fork from the passed in context so that it does not propagate cancellation, it only |
| // inherits values. |
| this.rootContext = Preconditions.checkNotNull(rootContext, "rootContext").fork(); |
| this.decompressorRegistry = builder.decompressorRegistry; |
| this.compressorRegistry = builder.compressorRegistry; |
| this.transportFilters = Collections.unmodifiableList( |
| new ArrayList<ServerTransportFilter>(builder.transportFilters)); |
| this.interceptors = |
| builder.interceptors.toArray(new ServerInterceptor[builder.interceptors.size()]); |
| } |
| |
| /** |
| * Bind and start the server. |
| * |
| * @return {@code this} object |
| * @throws IllegalStateException if already started |
| * @throws IOException if unable to bind |
| */ |
| @Override |
| public ServerImpl start() throws IOException { |
| synchronized (lock) { |
| checkState(!started, "Already started"); |
| checkState(!shutdown, "Shutting down"); |
| // Start and wait for any port to actually be bound. |
| transportServer.start(new ServerListenerImpl()); |
| timeoutService = Preconditions.checkNotNull(timeoutServicePool.getObject(), "timeoutService"); |
| executor = Preconditions.checkNotNull(executorPool.getObject(), "executor"); |
| started = true; |
| return this; |
| } |
| } |
| |
| @Override |
| public int getPort() { |
| synchronized (lock) { |
| checkState(started, "Not started"); |
| checkState(!terminated, "Already terminated"); |
| return transportServer.getPort(); |
| } |
| } |
| |
| @Override |
| public List<ServerServiceDefinition> getServices() { |
| List<ServerServiceDefinition> fallbackServices = fallbackRegistry.getServices(); |
| if (fallbackServices.isEmpty()) { |
| return registry.getServices(); |
| } else { |
| List<ServerServiceDefinition> registryServices = registry.getServices(); |
| int servicesCount = registryServices.size() + fallbackServices.size(); |
| List<ServerServiceDefinition> services = |
| new ArrayList<ServerServiceDefinition>(servicesCount); |
| services.addAll(registryServices); |
| services.addAll(fallbackServices); |
| return Collections.unmodifiableList(services); |
| } |
| } |
| |
| @Override |
| public List<ServerServiceDefinition> getImmutableServices() { |
| return registry.getServices(); |
| } |
| |
| @Override |
| public List<ServerServiceDefinition> getMutableServices() { |
| return Collections.unmodifiableList(fallbackRegistry.getServices()); |
| } |
| |
| /** |
| * Initiates an orderly shutdown in which preexisting calls continue but new calls are rejected. |
| */ |
| @Override |
| public ServerImpl shutdown() { |
| boolean shutdownTransportServer; |
| synchronized (lock) { |
| if (shutdown) { |
| return this; |
| } |
| shutdown = true; |
| shutdownTransportServer = started; |
| if (!shutdownTransportServer) { |
| transportServerTerminated = true; |
| checkForTermination(); |
| } |
| } |
| if (shutdownTransportServer) { |
| transportServer.shutdown(); |
| } |
| return this; |
| } |
| |
| @Override |
| public ServerImpl shutdownNow() { |
| shutdown(); |
| Collection<ServerTransport> transportsCopy; |
| Status nowStatus = Status.UNAVAILABLE.withDescription("Server shutdownNow invoked"); |
| boolean savedServerShutdownCallbackInvoked; |
| synchronized (lock) { |
| // Short-circuiting not strictly necessary, but prevents transports from needing to handle |
| // multiple shutdownNow invocations if shutdownNow is called multiple times. |
| if (shutdownNowStatus != null) { |
| return this; |
| } |
| shutdownNowStatus = nowStatus; |
| transportsCopy = new ArrayList<ServerTransport>(transports); |
| savedServerShutdownCallbackInvoked = serverShutdownCallbackInvoked; |
| } |
| // Short-circuiting not strictly necessary, but prevents transports from needing to handle |
| // multiple shutdownNow invocations, between here and the serverShutdown callback. |
| if (savedServerShutdownCallbackInvoked) { |
| // Have to call shutdownNow, because serverShutdown callback only called shutdown, not |
| // shutdownNow |
| for (ServerTransport transport : transportsCopy) { |
| transport.shutdownNow(nowStatus); |
| } |
| } |
| return this; |
| } |
| |
| @Override |
| public boolean isShutdown() { |
| synchronized (lock) { |
| return shutdown; |
| } |
| } |
| |
| @Override |
| public boolean awaitTermination(long timeout, TimeUnit unit) throws InterruptedException { |
| synchronized (lock) { |
| long timeoutNanos = unit.toNanos(timeout); |
| long endTimeNanos = System.nanoTime() + timeoutNanos; |
| while (!terminated && (timeoutNanos = endTimeNanos - System.nanoTime()) > 0) { |
| NANOSECONDS.timedWait(lock, timeoutNanos); |
| } |
| return terminated; |
| } |
| } |
| |
| @Override |
| public void awaitTermination() throws InterruptedException { |
| synchronized (lock) { |
| while (!terminated) { |
| lock.wait(); |
| } |
| } |
| } |
| |
| @Override |
| public boolean isTerminated() { |
| synchronized (lock) { |
| return terminated; |
| } |
| } |
| |
| /** |
| * Remove transport service from accounting collection and notify of complete shutdown if |
| * necessary. |
| * |
| * @param transport service to remove |
| */ |
| private void transportClosed(ServerTransport transport) { |
| synchronized (lock) { |
| if (!transports.remove(transport)) { |
| throw new AssertionError("Transport already removed"); |
| } |
| checkForTermination(); |
| } |
| } |
| |
| /** Notify of complete shutdown if necessary. */ |
| private void checkForTermination() { |
| synchronized (lock) { |
| if (shutdown && transports.isEmpty() && transportServerTerminated) { |
| if (terminated) { |
| throw new AssertionError("Server already terminated"); |
| } |
| terminated = true; |
| if (timeoutService != null) { |
| timeoutService = timeoutServicePool.returnObject(timeoutService); |
| } |
| if (executor != null) { |
| executor = executorPool.returnObject(executor); |
| } |
| // TODO(carl-mastrangelo): move this outside the synchronized block. |
| lock.notifyAll(); |
| } |
| } |
| } |
| |
| private class ServerListenerImpl implements ServerListener { |
| @Override |
| public ServerTransportListener transportCreated(ServerTransport transport) { |
| synchronized (lock) { |
| transports.add(transport); |
| } |
| return new ServerTransportListenerImpl(transport); |
| } |
| |
| @Override |
| public void serverShutdown() { |
| ArrayList<ServerTransport> copiedTransports; |
| Status shutdownNowStatusCopy; |
| synchronized (lock) { |
| // transports collection can be modified during shutdown(), even if we hold the lock, due |
| // to reentrancy. |
| copiedTransports = new ArrayList<ServerTransport>(transports); |
| shutdownNowStatusCopy = shutdownNowStatus; |
| serverShutdownCallbackInvoked = true; |
| } |
| for (ServerTransport transport : copiedTransports) { |
| if (shutdownNowStatusCopy == null) { |
| transport.shutdown(); |
| } else { |
| transport.shutdownNow(shutdownNowStatusCopy); |
| } |
| } |
| synchronized (lock) { |
| transportServerTerminated = true; |
| checkForTermination(); |
| } |
| } |
| } |
| |
| private class ServerTransportListenerImpl implements ServerTransportListener { |
/** The transport whose events this listener handles. */
private final ServerTransport transport;
/** Attributes produced by {@code transportReady}; not assigned before that callback. */
private Attributes attributes;

/**
 * Creates a listener bound to the given transport.
 *
 * @param transport the transport this listener serves
 */
public ServerTransportListenerImpl(ServerTransport transport) {
  this.transport = transport;
}
| |
| @Override |
| public Attributes transportReady(Attributes attributes) { |
| for (ServerTransportFilter filter : transportFilters) { |
| attributes = Preconditions.checkNotNull(filter.transportReady(attributes), |
| "Filter %s returned null", filter); |
| } |
| this.attributes = attributes; |
| return attributes; |
| } |
| |
| @Override |
| public void transportTerminated() { |
| for (ServerTransportFilter filter : transportFilters) { |
| filter.transportTerminated(attributes); |
| } |
| transportClosed(transport); |
| } |
| |
| @Override |
| public void streamCreated( |
| final ServerStream stream, final String methodName, final Metadata headers) { |
| if (headers.containsKey(MESSAGE_ENCODING_KEY)) { |
| String encoding = headers.get(MESSAGE_ENCODING_KEY); |
| Decompressor decompressor = decompressorRegistry.lookupDecompressor(encoding); |
| if (decompressor == null) { |
| stream.close( |
| Status.UNIMPLEMENTED.withDescription( |
| String.format("Can't find decompressor for %s", encoding)), |
| new Metadata()); |
| return; |
| } |
| stream.setDecompressor(decompressor); |
| } |
| |
| final StatsTraceContext statsTraceCtx = Preconditions.checkNotNull( |
| stream.statsTraceContext(), "statsTraceCtx not present from stream"); |
| |
| final Context.CancellableContext context = createContext(stream, headers, statsTraceCtx); |
| final Executor wrappedExecutor; |
| // This is a performance optimization that avoids the synchronization and queuing overhead |
| // that comes with SerializingExecutor. |
| if (executor == directExecutor()) { |
| wrappedExecutor = new SerializeReentrantCallsDirectExecutor(); |
| } else { |
| wrappedExecutor = new SerializingExecutor(executor); |
| } |
| |
| final JumpToApplicationThreadServerStreamListener jumpListener |
| = new JumpToApplicationThreadServerStreamListener( |
| wrappedExecutor, executor, stream, context); |
| stream.setListener(jumpListener); |
| // Run in wrappedExecutor so jumpListener.setListener() is called before any callbacks |
| // are delivered, including any errors. Callbacks can still be triggered, but they will be |
| // queued. |
| wrappedExecutor.execute(new ContextRunnable(context) { |
| @Override |
| public void runInContext() { |
| ServerStreamListener listener = NOOP_LISTENER; |
| try { |
| ServerMethodDefinition<?, ?> method = registry.lookupMethod(methodName); |
| if (method == null) { |
| method = fallbackRegistry.lookupMethod(methodName, stream.getAuthority()); |
| } |
| if (method == null) { |
| Status status = Status.UNIMPLEMENTED.withDescription( |
| "Method not found: " + methodName); |
| // TODO(zhangkun83): this error may be recorded by the tracer, and if it's kept in |
| // memory as a map whose key is the method name, this would allow a misbehaving |
| // client to blow up the server in-memory stats storage by sending large number of |
| // distinct unimplemented method |
| // names. (https://github.com/grpc/grpc-java/issues/2285) |
| stream.close(status, new Metadata()); |
| context.cancel(null); |
| return; |
| } |
| listener = startCall(stream, methodName, method, headers, context, statsTraceCtx); |
| } catch (RuntimeException e) { |
| stream.close(Status.fromThrowable(e), new Metadata()); |
| context.cancel(null); |
| throw e; |
| } catch (Error e) { |
| stream.close(Status.fromThrowable(e), new Metadata()); |
| context.cancel(null); |
| throw e; |
| } finally { |
| jumpListener.setListener(listener); |
| } |
| } |
| }); |
| } |
| |
| private Context.CancellableContext createContext( |
| final ServerStream stream, Metadata headers, StatsTraceContext statsTraceCtx) { |
| Long timeoutNanos = headers.get(TIMEOUT_KEY); |
| |
| Context baseContext = statsTraceCtx.serverFilterContext(rootContext); |
| |
| if (timeoutNanos == null) { |
| return baseContext.withCancellation(); |
| } |
| |
| Context.CancellableContext context = |
| baseContext.withDeadlineAfter(timeoutNanos, NANOSECONDS, timeoutService); |
| context.addListener(new Context.CancellationListener() { |
| @Override |
| public void cancelled(Context context) { |
| Status status = statusFromCancelled(context); |
| if (DEADLINE_EXCEEDED.getCode().equals(status.getCode())) { |
| // This should rarely get run, since the client will likely cancel the stream before |
| // the timeout is reached. |
| stream.cancel(status); |
| } |
| } |
| }, directExecutor()); |
| |
| return context; |
| } |
| |
| /** Never returns {@code null}. */ |
| private <ReqT, RespT> ServerStreamListener startCall(ServerStream stream, String fullMethodName, |
| ServerMethodDefinition<ReqT, RespT> methodDef, Metadata headers, |
| Context.CancellableContext context, StatsTraceContext statsTraceCtx) { |
| // TODO(ejona86): should we update fullMethodName to have the canonical path of the method? |
| ServerCallImpl<ReqT, RespT> call = new ServerCallImpl<ReqT, RespT>( |
| stream, methodDef.getMethodDescriptor(), headers, context, |
| decompressorRegistry, compressorRegistry); |
| ServerCallHandler<ReqT, RespT> callHandler = methodDef.getServerCallHandler(); |
| statsTraceCtx.serverCallStarted(call); |
| for (ServerInterceptor interceptor : interceptors) { |
| callHandler = InternalServerInterceptors.interceptCallHandler(interceptor, callHandler); |
| } |
| ServerCall.Listener<ReqT> listener = callHandler.startCall(call, headers); |
| if (listener == null) { |
| throw new NullPointerException( |
| "startCall() returned a null listener for method " + fullMethodName); |
| } |
| return call.newServerStreamListener(listener); |
| } |
| } |
| |
| @Override |
| public LogId getLogId() { |
| return logId; |
| } |
| |
| private static class NoopListener implements ServerStreamListener { |
| @Override |
| public void messageRead(InputStream value) { |
| try { |
| value.close(); |
| } catch (IOException e) { |
| throw new RuntimeException(e); |
| } |
| } |
| |
| @Override |
| public void halfClosed() {} |
| |
| @Override |
| public void closed(Status status) {} |
| |
| @Override |
| public void onReady() {} |
| } |
| |
| /** |
| * Dispatches callbacks onto an application-provided executor and correctly propagates |
| * exceptions. |
| */ |
| @VisibleForTesting |
| static class JumpToApplicationThreadServerStreamListener implements ServerStreamListener { |
| private final Executor callExecutor; |
| private final Executor cancelExecutor; |
| private final Context.CancellableContext context; |
| private final ServerStream stream; |
| // Only accessed from callExecutor. |
| private ServerStreamListener listener; |
| |
| public JumpToApplicationThreadServerStreamListener(Executor executor, |
| Executor cancelExecutor, ServerStream stream, Context.CancellableContext context) { |
| this.callExecutor = executor; |
| this.cancelExecutor = cancelExecutor; |
| this.stream = stream; |
| this.context = context; |
| } |
| |
| /** |
| * This call MUST be serialized on callExecutor to avoid races. |
| */ |
| private ServerStreamListener getListener() { |
| if (listener == null) { |
| throw new IllegalStateException("listener unset"); |
| } |
| return listener; |
| } |
| |
| @VisibleForTesting |
| void setListener(ServerStreamListener listener) { |
| Preconditions.checkNotNull(listener, "listener must not be null"); |
| Preconditions.checkState(this.listener == null, "Listener already set"); |
| this.listener = listener; |
| } |
| |
| /** |
| * Like {@link ServerCall#close(Status, Metadata)}, but thread-safe for internal use. |
| */ |
| private void internalClose() { |
| // TODO(ejona86): this is not thread-safe :) |
| stream.close(Status.UNKNOWN, new Metadata()); |
| } |
| |
| @Override |
| public void messageRead(final InputStream message) { |
| callExecutor.execute(new ContextRunnable(context) { |
| @Override |
| public void runInContext() { |
| try { |
| getListener().messageRead(message); |
| } catch (RuntimeException e) { |
| internalClose(); |
| throw e; |
| } catch (Error e) { |
| internalClose(); |
| throw e; |
| } |
| } |
| }); |
| } |
| |
| @Override |
| public void halfClosed() { |
| callExecutor.execute(new ContextRunnable(context) { |
| @Override |
| public void runInContext() { |
| try { |
| getListener().halfClosed(); |
| } catch (RuntimeException e) { |
| internalClose(); |
| throw e; |
| } catch (Error e) { |
| internalClose(); |
| throw e; |
| } |
| } |
| }); |
| } |
| |
| @Override |
| public void closed(final Status status) { |
| // For cancellations, promptly inform any users of the context that their work should be |
| // aborted. Otherwise, we can wait until pending work is done. |
| if (!status.isOk()) { |
| // The callExecutor might be busy doing user work. To avoid waiting, use an executor that |
| // is not serializing. |
| cancelExecutor.execute(new ContextCloser(context, status.getCause())); |
| } |
| callExecutor.execute(new ContextRunnable(context) { |
| @Override |
| public void runInContext() { |
| getListener().closed(status); |
| } |
| }); |
| } |
| |
| @Override |
| public void onReady() { |
| callExecutor.execute(new ContextRunnable(context) { |
| @Override |
| public void runInContext() { |
| try { |
| getListener().onReady(); |
| } catch (RuntimeException e) { |
| internalClose(); |
| throw e; |
| } catch (Error e) { |
| internalClose(); |
| throw e; |
| } |
| } |
| }); |
| } |
| } |
| |
| @VisibleForTesting |
| static class ContextCloser implements Runnable { |
| private final Context.CancellableContext context; |
| private final Throwable cause; |
| |
| ContextCloser(Context.CancellableContext context, Throwable cause) { |
| this.context = context; |
| this.cause = cause; |
| } |
| |
| @Override |
| public void run() { |
| context.cancel(cause); |
| } |
| } |
| } |