Convenient builders for channels and servers.
Right now only netty-based builders are implemented. OkHttp based
builders will be created separately.
Minimal example of creating a stub:
ChannelImpl channel = NettyChannelBuilder
.newBuilder(new InetSocketAddress("localhost", 8980))
.buildAndWaitForRunning();
StockBlockingStub stub = StockGrpc.newStub(channel);
Minimal example of creating and starting a server:
ServerImpl server = NettyServerBuilder.newBuilder(8980)
.addService(StockGrpc.bindService(new StockServer()))
.buildAndWaitForRunning();
-------------
Created by MOE: http://code.google.com/p/moe-java
MOE_MIGRATED_REVID=77787502
diff --git a/core/src/main/java/com/google/net/stubby/AbstractChannelBuilder.java b/core/src/main/java/com/google/net/stubby/AbstractChannelBuilder.java
new file mode 100644
index 0000000..7f74d45
--- /dev/null
+++ b/core/src/main/java/com/google/net/stubby/AbstractChannelBuilder.java
@@ -0,0 +1,47 @@
+package com.google.net.stubby;
+
+import com.google.common.base.Preconditions;
+import com.google.common.util.concurrent.MoreExecutors;
+import com.google.common.util.concurrent.Service;
+import com.google.net.stubby.newtransport.ClientTransportFactory;
+
+import java.util.concurrent.ExecutorService;
+
+import javax.annotation.Nullable;
+
+/**
+ * The base class for channel builders.
+ */
+public abstract class AbstractChannelBuilder<BuilderT extends AbstractChannelBuilder>
+ extends AbstractServiceBuilder<ChannelImpl, BuilderT> {
+
+ @Override
+ protected final ChannelImpl buildImpl(ExecutorService executor) {
+ ChannelEssentials essentials = buildEssentials();
+ ChannelImpl channel = new ChannelImpl(essentials.transportFactory, executor);
+ if (essentials.listener != null) {
+ channel.addListener(essentials.listener, MoreExecutors.directExecutor());
+ }
+ return channel;
+ }
+
+ /**
+ * The essentials required for creating a channel.
+ */
+ protected static class ChannelEssentials {
+ final ClientTransportFactory transportFactory;
+ @Nullable final Service.Listener listener;
+
+ /**
+ * @param transportFactory the created channel uses this factory to create transports
+ * @param listener will be called at the channel's life-cycle events
+ */
+ public ChannelEssentials(ClientTransportFactory transportFactory,
+ @Nullable Service.Listener listener) {
+ this.transportFactory = Preconditions.checkNotNull(transportFactory);
+ this.listener = listener;
+ }
+ }
+
+ protected abstract ChannelEssentials buildEssentials();
+}
diff --git a/core/src/main/java/com/google/net/stubby/AbstractServerBuilder.java b/core/src/main/java/com/google/net/stubby/AbstractServerBuilder.java
new file mode 100644
index 0000000..11d805a
--- /dev/null
+++ b/core/src/main/java/com/google/net/stubby/AbstractServerBuilder.java
@@ -0,0 +1,53 @@
+package com.google.net.stubby;
+
+import com.google.common.base.Preconditions;
+import com.google.common.util.concurrent.Service;
+import com.google.net.stubby.newtransport.ServerListener;
+
+import java.util.concurrent.ExecutorService;
+
+/**
+ * The base class for server builders.
+ */
+public abstract class AbstractServerBuilder<BuilderT extends AbstractServerBuilder>
+ extends AbstractServiceBuilder<ServerImpl, BuilderT> {
+
+ private final HandlerRegistry registry;
+
+ /**
+ * Constructs using a given handler registry.
+ */
+ protected AbstractServerBuilder(HandlerRegistry registry) {
+ this.registry = Preconditions.checkNotNull(registry);
+ }
+
+ /**
+ * Constructs with a MutableHandlerRegistry created internally.
+ */
+ protected AbstractServerBuilder() {
+ this.registry = new MutableHandlerRegistryImpl();
+ }
+
+ /**
+ * Adds a service implementation to the handler registry.
+ *
+ * <p>This is supported only if the user didn't provide a handler registry, or the provided one is
+ * a {@link MutableHandlerRegistry}. Otherwise it throws an UnsupportedOperationException.
+ */
+ public final BuilderT addService(ServerServiceDefinition service) {
+ if (registry instanceof MutableHandlerRegistry) {
+ ((MutableHandlerRegistry) registry).addService(service);
+ return (BuilderT) this;
+ }
+ throw new UnsupportedOperationException("Underlying HandlerRegistry is not mutable");
+ }
+
+ @Override
+ protected final ServerImpl buildImpl(ExecutorService executor) {
+ ServerImpl server = new ServerImpl(executor, registry);
+ server.setTransportServer(buildTransportServer(server.serverListener()));
+ return server;
+ }
+
+ protected abstract Service buildTransportServer(ServerListener serverListener);
+}
diff --git a/core/src/main/java/com/google/net/stubby/AbstractServiceBuilder.java b/core/src/main/java/com/google/net/stubby/AbstractServiceBuilder.java
new file mode 100644
index 0000000..3b7616a
--- /dev/null
+++ b/core/src/main/java/com/google/net/stubby/AbstractServiceBuilder.java
@@ -0,0 +1,137 @@
+package com.google.net.stubby;
+
+import com.google.common.base.Throwables;
+import com.google.common.util.concurrent.MoreExecutors;
+import com.google.common.util.concurrent.Service;
+
+import java.util.concurrent.ExecutorService;
+import java.util.concurrent.Executors;
+import java.util.concurrent.TimeUnit;
+import java.util.concurrent.TimeoutException;
+
+import javax.annotation.Nullable;
+
+/**
+ * Base class for channel builders and server builders.
+ *
+ * <p>The ownership rule: a builder generally does not take ownership of any objects passed to it.
+ * The caller is responsible for closing them if needed. The builder is only responsible for the
+ * life-cycle of objects created inside.
+ */
+abstract class AbstractServiceBuilder<ProductT extends Service,
+ BuilderT extends AbstractServiceBuilder> {
+
+ @Nullable
+ private ExecutorService userExecutor;
+
+ /**
+ * Provides a custom executor.
+ *
+ * <p>It's an optional parameter. If the user has not provided an executor when the service is
+ * built, the builder will create a cached thread-pool executor.
+ *
+ * <p>The service won't take ownership of the given executor. It's caller's responsibility to
+ * shut down the executor when it's desired.
+ */
+ public final BuilderT executor(ExecutorService executor) {
+ userExecutor = executor;
+ return (BuilderT) this;
+ }
+
+ /**
+ * Builds a service using the given parameters.
+ *
+ * <p>The returned service has not been started at this point. You will need to start it by
+ * yourself or use {@link #buildAndStart()}.
+ */
+ private ProductT build() {
+ final ExecutorService executor = (userExecutor == null)
+ ? Executors.newCachedThreadPool() : userExecutor;
+ ProductT service = buildImpl(executor);
+ // We shut down the executor only if we created it.
+ if (userExecutor == null) {
+ service.addListener(new ClosureHook() {
+ @Override
+ protected void onClosed() {
+ executor.shutdown();
+ }
+ }, MoreExecutors.directExecutor());
+ }
+ return service;
+ }
+
+ /**
+ * Builds and starts a service.
+ *
+ * <p>The service may not be running when this method returns. If you want to wait until it's up
+ * and running, either use {@link Service#awaitRunning()} or {@link #buildAndWaitForRunning()}.
+ *
+ * @return the service that has just been built and started
+ */
+ public final ProductT buildAndStart() {
+ ProductT service = build();
+ service.startAsync();
+ return service;
+ }
+
+ /**
+ * Builds and starts a service, and wait until it's up and running.
+ *
+ * @return the service that has just been built and is now running.
+ */
+ public final ProductT buildAndWaitForRunning() {
+ ProductT service = buildAndStart();
+ try {
+ service.awaitRunning();
+ } catch (Exception e) {
+ service.stopAsync();
+ throw Throwables.propagate(e);
+ }
+ return service;
+ }
+
+ /**
+ * Builds and starts a service, and wait until it's up and running, with a timeout.
+ *
+ * @return the service that has just been built and is now running.
+ * @throws TimeoutException if the service didn't become running within the given timeout.
+ */
+ public final ProductT buildAndWaitForRunning(long timeout, TimeUnit unit)
+ throws TimeoutException {
+ ProductT service = buildAndStart();
+ try {
+ service.awaitRunning(timeout, unit);
+ } catch (Exception e) {
+ service.stopAsync();
+ if (e instanceof TimeoutException) {
+ throw (TimeoutException) e;
+ } else {
+ throw Throwables.propagate(e);
+ }
+ }
+ return service;
+ }
+
+ /**
+ * Subclasses may use this as a convenient listener for cleaning up after the built service.
+ */
+ protected abstract static class ClosureHook extends Service.Listener {
+ protected abstract void onClosed();
+
+ @Override
+ public void terminated(Service.State from) {
+ onClosed();
+ }
+
+ @Override
+ public void failed(Service.State from, Throwable failure) {
+ onClosed();
+ }
+ }
+
+ /**
+ * Implemented by subclasses to build the actual service object. The given executor is owned by
+ * this base class.
+ */
+ protected abstract ProductT buildImpl(ExecutorService executor);
+}
diff --git a/core/src/main/java/com/google/net/stubby/newtransport/netty/NettyChannelBuilder.java b/core/src/main/java/com/google/net/stubby/newtransport/netty/NettyChannelBuilder.java
new file mode 100644
index 0000000..c84737f
--- /dev/null
+++ b/core/src/main/java/com/google/net/stubby/newtransport/netty/NettyChannelBuilder.java
@@ -0,0 +1,76 @@
+package com.google.net.stubby.newtransport.netty;
+
+import com.google.common.util.concurrent.Service;
+import com.google.net.stubby.AbstractChannelBuilder;
+import com.google.net.stubby.newtransport.ClientTransportFactory;
+import com.google.net.stubby.newtransport.netty.NettyClientTransportFactory.NegotiationType;
+
+import io.netty.channel.EventLoopGroup;
+import io.netty.channel.nio.NioEventLoopGroup;
+
+import java.net.InetSocketAddress;
+
+/**
+ * Convenient class for building channels with the netty transport.
+ */
+public final class NettyChannelBuilder extends AbstractChannelBuilder<NettyChannelBuilder> {
+
+ private final InetSocketAddress serverAddress;
+
+ private NegotiationType negotiationType = NegotiationType.TLS;
+ private EventLoopGroup userEventLoopGroup;
+
+ /**
+ * Creates a new builder with the given server address.
+ */
+ public static NettyChannelBuilder forAddress(InetSocketAddress serverAddress) {
+ return new NettyChannelBuilder(serverAddress);
+ }
+
+ private NettyChannelBuilder(InetSocketAddress serverAddress) {
+ this.serverAddress = serverAddress;
+ }
+
+ /**
+ * Sets the negotiation type for the HTTP/2 connection.
+ *
+ * <p>Default: <code>TLS</code>
+ */
+ public NettyChannelBuilder negotiationType(NegotiationType type) {
+ negotiationType = type;
+ return this;
+ }
+
+ /**
+ * Provides an EventGroupLoop to be used by the netty transport.
+ *
+ * <p>It's an optional parameter. If the user has not provided an EventGroupLoop when the channel
+ * is built, the builder will create one.
+ *
+ * <p>The channel won't take ownership of the given EventLoopGroup. It's caller's responsibility
+ * to shut it down when it's desired.
+ */
+ public NettyChannelBuilder eventLoopGroup(EventLoopGroup group) {
+ userEventLoopGroup = group;
+ return this;
+ }
+
+ @Override
+ protected ChannelEssentials buildEssentials() {
+ final EventLoopGroup group = (userEventLoopGroup == null)
+ ? new NioEventLoopGroup() : userEventLoopGroup;
+ ClientTransportFactory transportFactory = new NettyClientTransportFactory(
+ serverAddress, negotiationType, group);
+ Service.Listener listener = null;
+ // We shut down the EventLoopGroup only if we created it.
+ if (userEventLoopGroup == null) {
+ listener = new ClosureHook() {
+ @Override
+ protected void onClosed() {
+ group.shutdownGracefully();
+ }
+ };
+ }
+ return new ChannelEssentials(transportFactory, listener);
+ }
+}
diff --git a/core/src/main/java/com/google/net/stubby/newtransport/netty/NettyServerBuilder.java b/core/src/main/java/com/google/net/stubby/newtransport/netty/NettyServerBuilder.java
new file mode 100644
index 0000000..7a5bdab
--- /dev/null
+++ b/core/src/main/java/com/google/net/stubby/newtransport/netty/NettyServerBuilder.java
@@ -0,0 +1,92 @@
+package com.google.net.stubby.newtransport.netty;
+
+import com.google.common.util.concurrent.MoreExecutors;
+import com.google.common.util.concurrent.Service;
+import com.google.net.stubby.AbstractServerBuilder;
+import com.google.net.stubby.HandlerRegistry;
+import com.google.net.stubby.newtransport.ServerListener;
+
+import io.netty.channel.EventLoopGroup;
+import io.netty.channel.nio.NioEventLoopGroup;
+
+/**
+ * The convenient builder for a netty-based GRPC server.
+ */
+public final class NettyServerBuilder extends AbstractServerBuilder<NettyServerBuilder> {
+
+ private final int port;
+
+ private EventLoopGroup userBossEventLoopGroup;
+ private EventLoopGroup userWorkerEventLoopGroup;
+
+ public static NettyServerBuilder forPort(int port) {
+ return new NettyServerBuilder(port);
+ }
+
+ public static NettyServerBuilder forRegistryAndPort(HandlerRegistry registry, int port) {
+ return new NettyServerBuilder(registry, port);
+ }
+
+ private NettyServerBuilder(int port) {
+ this.port = port;
+ }
+
+ private NettyServerBuilder(HandlerRegistry registry, int port) {
+ super(registry);
+ this.port = port;
+ }
+
+ /**
+ * Provides the boss EventGroupLoop to the server.
+ *
+ * <p>It's an optional parameter. If the user has not provided one when the server is built, the
+ * builder will create one.
+ *
+ * <p>The server won't take ownership of the given EventLoopGroup. It's caller's responsibility
+ * to shut it down when it's desired.
+ */
+ public NettyServerBuilder userBossEventLoopGroup(EventLoopGroup group) {
+ this.userBossEventLoopGroup = group;
+ return this;
+ }
+
+ /**
+ * Provides the worker EventGroupLoop to the server.
+ *
+ * <p>It's an optional parameter. If the user has not provided one when the server is built, the
+ * builder will create one.
+ *
+ * <p>The server won't take ownership of the given EventLoopGroup. It's caller's responsibility
+ * to shut it down when it's desired.
+ */
+ public NettyServerBuilder workerEventLoopGroup(EventLoopGroup group) {
+ this.userWorkerEventLoopGroup = group;
+ return this;
+ }
+
+ @Override
+ protected Service buildTransportServer(ServerListener serverListener) {
+ final EventLoopGroup bossEventLoopGroup = (userBossEventLoopGroup == null)
+ ? new NioEventLoopGroup() : userBossEventLoopGroup;
+ final EventLoopGroup workerEventLoopGroup = (userWorkerEventLoopGroup == null)
+ ? new NioEventLoopGroup() : userWorkerEventLoopGroup;
+ NettyServer server = new NettyServer(serverListener, port);
+ if (userBossEventLoopGroup == null) {
+ server.addListener(new ClosureHook() {
+ @Override
+ protected void onClosed() {
+ bossEventLoopGroup.shutdownGracefully();
+ }
+ }, MoreExecutors.directExecutor());
+ }
+ if (userWorkerEventLoopGroup == null) {
+ server.addListener(new ClosureHook() {
+ @Override
+ protected void onClosed() {
+ workerEventLoopGroup.shutdownGracefully();
+ }
+ }, MoreExecutors.directExecutor());
+ }
+ return server;
+ }
+}