[1/3] Move AbstractTransportTest to third_party.

This required making it no longer depend on GrpcClient. Instead, we now
use the builders that make using GrpcClient almost completely obsolete.
-------------
Created by MOE: http://code.google.com/p/moe-java
MOE_MIGRATED_REVID=78860648
diff --git a/integration-testing/src/main/java/com/google/net/stubby/testing/integration/AbstractTransportTest.java b/integration-testing/src/main/java/com/google/net/stubby/testing/integration/AbstractTransportTest.java
new file mode 100644
index 0000000..6804620
--- /dev/null
+++ b/integration-testing/src/main/java/com/google/net/stubby/testing/integration/AbstractTransportTest.java
@@ -0,0 +1,382 @@
+package com.google.net.stubby.testing.integration;
+
+import static com.google.net.stubby.testing.integration.Messages.PayloadType.COMPRESSABLE;
+import static org.junit.Assert.assertEquals;
+import static org.junit.Assert.assertNotNull;
+
+import com.google.common.util.concurrent.ListenableFuture;
+import com.google.common.util.concurrent.SettableFuture;
+import com.google.common.util.concurrent.Uninterruptibles;
+import com.google.net.stubby.Call;
+import com.google.net.stubby.ChannelImpl;
+import com.google.net.stubby.Metadata;
+import com.google.net.stubby.Status;
+import com.google.net.stubby.proto.ProtoUtils;
+import com.google.net.stubby.stub.MetadataUtils;
+import com.google.net.stubby.stub.StreamObserver;
+import com.google.net.stubby.stub.StreamRecorder;
+import com.google.net.stubby.testing.integration.Messages.PayloadType;
+import com.google.net.stubby.testing.integration.Messages.SimpleRequest;
+import com.google.net.stubby.testing.integration.Messages.SimpleResponse;
+import com.google.net.stubby.testing.integration.Messages.StreamingInputCallRequest;
+import com.google.net.stubby.testing.integration.Messages.StreamingInputCallResponse;
+import com.google.net.stubby.testing.integration.Messages.StreamingOutputCallRequest;
+import com.google.net.stubby.testing.integration.Messages.StreamingOutputCallResponse;
+import com.google.net.stubby.transport.AbstractStream;
+import com.google.protobuf.ByteString;
+import com.google.protobuf.EmptyProtos.Empty;
+
+import org.junit.After;
+import org.junit.Assert;
+import org.junit.Assume;
+import org.junit.Before;
+import org.junit.Test;
+
+import java.util.Arrays;
+import java.util.ArrayList;
+import java.util.Collections;
+import java.util.LinkedList;
+import java.util.List;
+import java.util.Random;
+import java.util.concurrent.TimeUnit;
+import java.util.concurrent.atomic.AtomicReference;
+
+/**
+ * Abstract base class for all GRPC transport tests.
+ */
+public abstract class AbstractTransportTest {
+
+  public static final Metadata.Key<Messages.SimpleContext> METADATA_KEY =
+      ProtoUtils.keyForProto(Messages.SimpleContext.getDefaultInstance());
+
+  protected ChannelImpl channel;
+  protected TestServiceGrpc.TestServiceBlockingStub blockingStub;
+  protected TestServiceGrpc.TestService asyncStub;
+
+  /**
+   * Must be called by the subclass setup method.
+   */
+  @Before
+  public void setup() throws Exception {
+    channel = createChannel();
+    channel.startAsync();
+    channel.awaitRunning();
+    blockingStub = TestServiceGrpc.newBlockingStub(channel);
+    asyncStub = TestServiceGrpc.newStub(channel);
+  }
+
+  @After
+  public void teardown() throws Exception {
+    if (channel != null) {
+      channel.stopAsync();
+    }
+  }
+
+  protected abstract ChannelImpl createChannel();
+
+  @Test
+  public void emptyShouldSucceed() throws Exception {
+    Empty response = blockingStub.emptyCall(Empty.getDefaultInstance());
+    assertNotNull(response);
+  }
+
+  @Test
+  public void unaryCallShouldSucceed() throws Exception {
+    // Create the request.
+    SimpleResponse response = blockingStub.unaryCall(unaryRequest());
+    assertNotNull(response);
+    assertEquals(COMPRESSABLE, response.getPayload().getType());
+    assertEquals(10, response.getPayload().getBody().size());
+  }
+
+  @Test
+  public void streamingOutputCallShouldSucceed() throws Exception {
+    // Build the request.
+    List<Integer> responseSizes = Arrays.asList(50, 100, 150, 200);
+    StreamingOutputCallRequest.Builder streamingOutputBuilder =
+        StreamingOutputCallRequest.newBuilder();
+    streamingOutputBuilder.setResponseType(COMPRESSABLE);
+    for (Integer size : responseSizes) {
+      streamingOutputBuilder.addResponseParametersBuilder().setSize(size).setIntervalUs(0);
+    }
+    StreamingOutputCallRequest request = streamingOutputBuilder.build();
+
+    StreamRecorder<StreamingOutputCallResponse> recorder = StreamRecorder.create();
+    asyncStub.streamingOutputCall(request, recorder);
+    recorder.awaitCompletion();
+    assertSuccess(recorder);
+    assertEquals(responseSizes.size(), recorder.getValues().size());
+    for (int ix = 0; ix < recorder.getValues().size(); ++ix) {
+      StreamingOutputCallResponse response = recorder.getValues().get(ix);
+      assertEquals(COMPRESSABLE, response.getPayload().getType());
+      int length = response.getPayload().getBody().size();
+      assertEquals("comparison failed at index " + ix, responseSizes.get(ix).intValue(), length);
+    }
+  }
+
+  @Test
+  public void streamingInputCallShouldSucceed() throws Exception {
+    // Build the request.
+    String message = "hello world!";
+    StreamingInputCallRequest.Builder streamingInputBuilder =
+        StreamingInputCallRequest.newBuilder();
+    streamingInputBuilder.getPayloadBuilder().setType(PayloadType.COMPRESSABLE)
+        .setBody(ByteString.copyFromUtf8(message));
+    final StreamingInputCallRequest request = streamingInputBuilder.build();
+
+    StreamRecorder<StreamingInputCallResponse> recorder = StreamRecorder.create();
+    StreamObserver<StreamingInputCallRequest> requestStream =
+        asyncStub.streamingInputCall(recorder);
+    for (int ix = 10; ix > 0; --ix) {
+      // Send the request and close the request stream.
+      requestStream.onValue(request);
+    }
+    requestStream.onCompleted();
+    recorder.awaitCompletion();
+    assertSuccess(recorder);
+    assertEquals(1, recorder.getValues().size());
+    assertEquals(10 * message.length(), recorder.getValues().get(0).getAggregatedPayloadSize());
+  }
+
+  @Test
+  public void fullDuplexCallShouldSucceed() throws Exception {
+    // Build the request.
+    List<Integer> responseSizes = Arrays.asList(50, 100, 150, 200);
+    StreamingOutputCallRequest.Builder streamingOutputBuilder =
+        StreamingOutputCallRequest.newBuilder();
+    streamingOutputBuilder.setResponseType(COMPRESSABLE);
+    for (Integer size : responseSizes) {
+      streamingOutputBuilder.addResponseParametersBuilder().setSize(size).setIntervalUs(0);
+    }
+    final StreamingOutputCallRequest request = streamingOutputBuilder.build();
+
+    StreamRecorder<StreamingOutputCallResponse> recorder = StreamRecorder.create();
+    StreamObserver<StreamingOutputCallRequest> requestStream =
+        blockingStub.fullDuplexCall(recorder);
+
+    final int numRequests = 10;
+    for (int ix = numRequests; ix > 0; --ix) {
+      requestStream.onValue(request);
+    }
+    requestStream.onCompleted();
+    recorder.awaitCompletion();
+    assertSuccess(recorder);
+    assertEquals(responseSizes.size() * numRequests, recorder.getValues().size());
+    for (int ix = 0; ix < recorder.getValues().size(); ++ix) {
+      StreamingOutputCallResponse response = recorder.getValues().get(ix);
+      assertEquals(COMPRESSABLE, response.getPayload().getType());
+      int length = response.getPayload().getBody().size();
+      int expectedSize = responseSizes.get(ix % responseSizes.size());
+      assertEquals("comparison failed at index " + ix, expectedSize, length);
+    }
+  }
+
+  @Test
+  public void halfDuplexCallShouldSucceed() throws Exception {
+    // Build the request.
+    List<Integer> responseSizes = Arrays.asList(50, 100, 150, 200);
+    StreamingOutputCallRequest.Builder streamingOutputBuilder =
+        StreamingOutputCallRequest.newBuilder();
+    streamingOutputBuilder.setResponseType(COMPRESSABLE);
+    for (Integer size : responseSizes) {
+      streamingOutputBuilder.addResponseParametersBuilder().setSize(size).setIntervalUs(0);
+    }
+    final StreamingOutputCallRequest request = streamingOutputBuilder.build();
+
+    StreamRecorder<StreamingOutputCallResponse> recorder = StreamRecorder.create();
+    StreamObserver<StreamingOutputCallRequest> requestStream = asyncStub.halfDuplexCall(recorder);
+
+    final int numRequests = 10;
+    for (int ix = numRequests; ix > 0; --ix) {
+      requestStream.onValue(request);
+    }
+    requestStream.onCompleted();
+    recorder.awaitCompletion();
+    assertSuccess(recorder);
+    assertEquals(responseSizes.size() * numRequests, recorder.getValues().size());
+    for (int ix = 0; ix < recorder.getValues().size(); ++ix) {
+      StreamingOutputCallResponse response = recorder.getValues().get(ix);
+      assertEquals(COMPRESSABLE, response.getPayload().getType());
+      int length = response.getPayload().getBody().size();
+      int expectedSize = responseSizes.get(ix % responseSizes.size());
+      assertEquals("comparison failed at index " + ix, expectedSize, length);
+    }
+  }
+
+  @Test
+  public void streamingOutputShouldBeFlowControlled() throws Exception {
+    // Create the call object.
+    Call<StreamingOutputCallRequest, StreamingOutputCallResponse> call =
+        channel.newCall(TestServiceGrpc.CONFIG.streamingOutputCall);
+
+    // Build the request.
+    List<Integer> responseSizes = Arrays.asList(50, 100, 150, 200);
+    StreamingOutputCallRequest.Builder streamingOutputBuilder =
+        StreamingOutputCallRequest.newBuilder();
+    streamingOutputBuilder.setResponseType(COMPRESSABLE);
+    for (Integer size : responseSizes) {
+      streamingOutputBuilder.addResponseParametersBuilder().setSize(size).setIntervalUs(0);
+    }
+    StreamingOutputCallRequest request = streamingOutputBuilder.build();
+
+    // Start the call and prepare capture of results.
+    final List<StreamingOutputCallResponse> results =
+        Collections.synchronizedList(new ArrayList<StreamingOutputCallResponse>());
+    final List<SettableFuture<Void>> processedFutures =
+        Collections.synchronizedList(new LinkedList<SettableFuture<Void>>());
+    final SettableFuture<Void> completionFuture = SettableFuture.create();
+    call.start(new Call.Listener<StreamingOutputCallResponse>() {
+
+      @Override
+      public ListenableFuture<Void> onHeaders(Metadata.Headers headers) {
+        return null;
+      }
+
+      @Override
+      public ListenableFuture<Void> onPayload(final StreamingOutputCallResponse payload) {
+        SettableFuture<Void> processedFuture = SettableFuture.create();
+        results.add(payload);
+        processedFutures.add(processedFuture);
+        return processedFuture;
+      }
+
+      @Override
+      public void onClose(Status status, Metadata.Trailers trailers) {
+        if (status.isOk()) {
+          completionFuture.set(null);
+        } else {
+          completionFuture.setException(status.asException());
+        }
+      }
+    }, new Metadata.Headers());
+
+    // Send the request.
+    call.sendPayload(request);
+    call.halfClose();
+
+    // Slowly set completion on all of the futures.
+    int expectedResults = responseSizes.size();
+    int count = 0;
+    while (count < expectedResults) {
+      if (!processedFutures.isEmpty()) {
+        assertEquals(1, processedFutures.size());
+        assertEquals(count + 1, results.size());
+        count++;
+
+        // Remove and set the first future to allow receipt of additional messages
+        // from flow control.
+        processedFutures.remove(0).set(null);
+      }
+
+      // Sleep a bit.
+      Uninterruptibles.sleepUninterruptibly(1, TimeUnit.SECONDS);
+    }
+
+    // Wait for successful completion of the response.
+    completionFuture.get();
+
+    assertEquals(responseSizes.size(), results.size());
+    for (int ix = 0; ix < results.size(); ++ix) {
+      StreamingOutputCallResponse response = results.get(ix);
+      assertEquals(COMPRESSABLE, response.getPayload().getType());
+      int length = response.getPayload().getBody().size();
+      assertEquals("comparison failed at index " + ix, responseSizes.get(ix).intValue(), length);
+    }
+  }
+
+  @org.junit.Test
+  public void exchangeContextUnaryCall() throws Exception {
+    Assume.assumeTrue(AbstractStream.GRPC_V2_PROTOCOL);
+    TestServiceGrpc.TestServiceBlockingStub stub =
+        TestServiceGrpc.newBlockingStub(channel);
+
+    // Capture the context exchange
+    Metadata.Headers fixedHeaders = new Metadata.Headers();
+    // Send a context proto (as it's in the default extension registry)
+    Messages.SimpleContext contextValue =
+        Messages.SimpleContext.newBuilder().setValue("dog").build();
+    fixedHeaders.put(METADATA_KEY, contextValue);
+    stub = MetadataUtils.attachHeaders(stub, fixedHeaders);
+    // .. and expect it to be echoed back in trailers
+    AtomicReference<Metadata.Trailers> trailersCapture = new AtomicReference<>();
+    AtomicReference<Metadata.Headers> headersCapture = new AtomicReference<>();
+    stub = MetadataUtils.captureMetadata(stub, headersCapture, trailersCapture);
+
+    Assert.assertNotNull(stub.unaryCall(unaryRequest()));
+
+    // Assert that our side channel object is echoed back in both headers and trailers
+    Assert.assertEquals(contextValue, headersCapture.get().get(METADATA_KEY));
+    if (AbstractStream.GRPC_V2_PROTOCOL) {
+      Assert.assertEquals(contextValue, trailersCapture.get().get(METADATA_KEY));
+    }
+  }
+
+  @org.junit.Test
+  public void exchangeContextStreamingCall() throws Exception {
+    Assume.assumeTrue(AbstractStream.GRPC_V2_PROTOCOL);
+    TestServiceGrpc.TestServiceBlockingStub stub =
+        TestServiceGrpc.newBlockingStub(channel);
+
+    // Capture the context exchange
+    Metadata.Headers fixedHeaders = new Metadata.Headers();
+    // Send a context proto (as it's in the default extension registry)
+    Messages.SimpleContext contextValue =
+        Messages.SimpleContext.newBuilder().setValue("dog").build();
+    fixedHeaders.put(METADATA_KEY, contextValue);
+    stub = MetadataUtils.attachHeaders(stub, fixedHeaders);
+    // .. and expect it to be echoed back in trailers
+    AtomicReference<Metadata.Trailers> trailersCapture = new AtomicReference<>();
+    AtomicReference<Metadata.Headers> headersCapture = new AtomicReference<>();
+    stub = MetadataUtils.captureMetadata(stub, headersCapture, trailersCapture);
+
+    List<Integer> responseSizes = Arrays.asList(50, 100, 150, 200);
+    Messages.StreamingOutputCallRequest.Builder streamingOutputBuilder =
+        Messages.StreamingOutputCallRequest.newBuilder();
+    streamingOutputBuilder.setResponseType(COMPRESSABLE);
+    for (Integer size : responseSizes) {
+      streamingOutputBuilder.addResponseParametersBuilder().setSize(size).setIntervalUs(0);
+    }
+    final Messages.StreamingOutputCallRequest request = streamingOutputBuilder.build();
+
+    StreamRecorder<Messages.StreamingOutputCallResponse> recorder = StreamRecorder.create();
+    StreamObserver<Messages.StreamingOutputCallRequest> requestStream =
+        stub.fullDuplexCall(recorder);
+
+    final int numRequests = 10;
+    for (int ix = numRequests; ix > 0; --ix) {
+      requestStream.onValue(request);
+    }
+    requestStream.onCompleted();
+    recorder.awaitCompletion();
+    assertSuccess(recorder);
+    org.junit.Assert.assertEquals(responseSizes.size() * numRequests, recorder.getValues().size());
+
+    // Assert that our side channel object is echoed back in both headers and trailers
+    Assert.assertEquals(contextValue, headersCapture.get().get(METADATA_KEY));
+    if (AbstractStream.GRPC_V2_PROTOCOL) {
+      Assert.assertEquals(contextValue, trailersCapture.get().get(METADATA_KEY));
+    }
+  }
+
+
+  protected int unaryPayloadLength() {
+    // 10MiB.
+    return 10485760;
+  }
+
+  protected SimpleRequest unaryRequest() {
+    SimpleRequest.Builder unaryBuilder = SimpleRequest.newBuilder();
+    unaryBuilder.getPayloadBuilder().setType(PayloadType.COMPRESSABLE);
+    byte[] data = new byte[unaryPayloadLength()];
+    new Random().nextBytes(data);
+    unaryBuilder.getPayloadBuilder().setBody(ByteString.copyFrom(data));
+    unaryBuilder.setResponseSize(10).setResponseType(PayloadType.COMPRESSABLE);
+    return unaryBuilder.build();
+  }
+
+  protected static void assertSuccess(StreamRecorder<?> recorder) {
+    if (recorder.getError() != null) {
+      throw new AssertionError("Error in stream", recorder.getError());
+    }
+  }
+}
diff --git a/okhttp/src/main/java/com/google/net/stubby/transport/okhttp/OkHttpChannelBuilder.java b/okhttp/src/main/java/com/google/net/stubby/transport/okhttp/OkHttpChannelBuilder.java
new file mode 100644
index 0000000..0eab849
--- /dev/null
+++ b/okhttp/src/main/java/com/google/net/stubby/transport/okhttp/OkHttpChannelBuilder.java
@@ -0,0 +1,78 @@
+package com.google.net.stubby.transport.okhttp;
+
+import com.google.common.base.Preconditions;
+import com.google.common.util.concurrent.Service;
+import com.google.common.util.concurrent.ThreadFactoryBuilder;
+import com.google.net.stubby.AbstractChannelBuilder;
+import com.google.net.stubby.SharedResourceHolder;
+import com.google.net.stubby.SharedResourceHolder.Resource;
+import com.google.net.stubby.transport.ClientTransportFactory;
+
+import java.net.InetSocketAddress;
+import java.util.concurrent.Executors;
+import java.util.concurrent.ExecutorService;
+
+/** Convenience class for building channels with the OkHttp transport. */
+public final class OkHttpChannelBuilder extends AbstractChannelBuilder<OkHttpChannelBuilder> {
+  private static final Resource<ExecutorService> DEFAULT_TRANSPORT_THREAD_POOL
+      = new Resource<ExecutorService>() {
+        @Override
+        public ExecutorService create() {
+          return Executors.newCachedThreadPool(new ThreadFactoryBuilder()
+              .setNameFormat("grpc-okhttp-%d")
+              .build());
+        }
+
+        @Override
+        public void close(ExecutorService executor) {
+          executor.shutdown();
+        }
+      };
+
+  /** Creates a new builder for the given server address. */
+  public static OkHttpChannelBuilder forAddress(InetSocketAddress serverAddress) {
+    return new OkHttpChannelBuilder(serverAddress);
+  }
+
+  /** Creates a new builder for the given server host and port. */
+  public static OkHttpChannelBuilder forAddress(String host, int port) {
+    return forAddress(new InetSocketAddress(host, port));
+  }
+
+  private final InetSocketAddress serverAddress;
+  private ExecutorService transportExecutor;
+
+  private OkHttpChannelBuilder(InetSocketAddress serverAddress) {
+    this.serverAddress = Preconditions.checkNotNull(serverAddress, "serverAddress");
+  }
+
+  /**
+   * Override the default executor necessary for internal transport use.
+   *
+   * <p>The channel does not take ownership of the given executor. It is the caller' responsibility
+   * to shutdown the executor when appropriate.
+   */
+  public OkHttpChannelBuilder transportExecutor(ExecutorService executor) {
+    this.transportExecutor = executor;
+    return this;
+  }
+
+  @Override
+  protected ChannelEssentials buildEssentials() {
+    final ExecutorService executor = (transportExecutor == null)
+        ? SharedResourceHolder.get(DEFAULT_TRANSPORT_THREAD_POOL) : transportExecutor;
+    ClientTransportFactory transportFactory
+        = new OkHttpClientTransportFactory(serverAddress, executor);
+    Service.Listener listener = null;
+    // We shut down the executor only if we created it.
+    if (transportExecutor == null) {
+      listener = new ClosureHook() {
+        @Override
+        protected void onClosed() {
+          SharedResourceHolder.release(DEFAULT_TRANSPORT_THREAD_POOL, executor);
+        }
+      };
+    }
+    return new ChannelEssentials(transportFactory, listener);
+  }
+}