[2/2] Swap Operation wrapping from Channel to Transport.

Session is now (properly) implementing transport API, so ChannelImpl has some
testing.
-------------
Created by MOE: http://code.google.com/p/moe-java
MOE_MIGRATED_REVID=69074094
diff --git a/core/src/main/java/com/google/net/stubby/ChannelImpl.java b/core/src/main/java/com/google/net/stubby/ChannelImpl.java
index acf4861..013d3c4 100644
--- a/core/src/main/java/com/google/net/stubby/ChannelImpl.java
+++ b/core/src/main/java/com/google/net/stubby/ChannelImpl.java
@@ -43,6 +43,8 @@
   public ChannelImpl(ClientTransportFactory transportFactory, ExecutorService executor) {
     this.transportFactory = transportFactory;
     this.executor = executor;
+    // FIXME(ejona): Remove once we have our top-level lifecycle.
+    startAsync();
   }
 
   @Override
diff --git a/core/src/main/java/com/google/net/stubby/SessionClientStream.java b/core/src/main/java/com/google/net/stubby/SessionClientStream.java
new file mode 100644
index 0000000..94f8f2e
--- /dev/null
+++ b/core/src/main/java/com/google/net/stubby/SessionClientStream.java
@@ -0,0 +1,157 @@
+package com.google.net.stubby;
+
+import com.google.net.stubby.AbstractResponse;
+import com.google.net.stubby.Operation;
+import com.google.net.stubby.Request;
+import com.google.net.stubby.Response;
+import com.google.net.stubby.Session;
+import com.google.net.stubby.Status;
+import com.google.net.stubby.newtransport.ClientStream;
+import com.google.net.stubby.newtransport.StreamListener;
+import com.google.net.stubby.newtransport.StreamState;
+import com.google.net.stubby.transport.Transport;
+
+import java.io.InputStream;
+import java.io.IOException;
+
+/**
+ * A temporary shim layer between the new (Channel) and the old (Session). Will go away when the
+ * new transport layer is created.
+ */
+// TODO(user): Delete this class when new transport interfaces are introduced
+class SessionClientStream implements ClientStream {
+  private final StreamListener listener;
+  /**
+   * The {@link Request} used by the stub to dispatch the call
+   */
+  private Request request;
+  private Response response;
+
+  public SessionClientStream(StreamListener listener) {
+    this.listener = listener;
+  }
+
+  public void start(Request request) {
+    this.request = request;
+  }
+
+  public Response.ResponseBuilder responseBuilder() {
+    return new Response.ResponseBuilder() {
+      @Override
+      public Response build(int id) {
+        response = new SessionResponse(id);
+        return response;
+      }
+
+      @Override
+      public Response build() {
+        response = new SessionResponse(-1);
+        return response;
+      }
+    };
+  }
+
+  @Override
+  public StreamState state() {
+    boolean requestOpen = request.getPhase() != Operation.Phase.CLOSED;
+    boolean responseOpen = response.getPhase() != Operation.Phase.CLOSED;
+    if (requestOpen && responseOpen) {
+      return StreamState.OPEN;
+    } else if (requestOpen) {
+      return StreamState.WRITE_ONLY;
+    } else if (responseOpen) {
+      return StreamState.READ_ONLY;
+    } else {
+      return StreamState.CLOSED;
+    }
+  }
+
+  @Override
+  public void close() {
+    request.close(Status.OK);
+  }
+
+  @Override
+  public void writeContext(String name, InputStream value, int length, Runnable accepted) {
+    request.addContext(name, value, Operation.Phase.HEADERS);
+    if (accepted != null) {
+      accepted.run();
+    }
+  }
+
+  @Override
+  public void writeMessage(InputStream message, int length, Runnable accepted) {
+    request.addPayload(message, Operation.Phase.PAYLOAD);
+    if (accepted != null) {
+      accepted.run();
+    }
+  }
+
+  @Override
+  public void flush() {}
+
+  /**
+   * An error occurred while producing the request output. Cancel the request
+   * and close the response stream.
+   */
+  @Override
+  public void cancel() {
+    request.close(new Status(Transport.Code.CANCELLED));
+  }
+
+  /**
+   * Adapts the transport layer response to calls on the response observer or
+   * recorded context state.
+   */
+  private class SessionResponse extends AbstractResponse {
+
+    private SessionResponse(int id) {
+      super(id);
+    }
+
+    private int available(InputStream is) {
+      try {
+        return is.available();
+      } catch (IOException ex) {
+        throw new RuntimeException(ex);
+      }
+    }
+
+    @Override
+    public Operation addContext(String type, InputStream message, Phase nextPhase) {
+      try {
+        listener.contextRead(type, message, available(message));
+        return super.addContext(type, message, nextPhase);
+      } finally {
+        if (getPhase() == Phase.CLOSED) {
+          propagateClosed();
+        }
+      }
+    }
+
+    @Override
+    public Operation addPayload(InputStream payload, Phase nextPhase) {
+      try {
+        listener.messageRead(payload, available(payload));
+        return super.addPayload(payload, nextPhase);
+      } finally {
+        if (getPhase() == Phase.CLOSED) {
+          propagateClosed();
+        }
+      }
+    }
+
+    @Override
+    public Operation close(Status status) {
+      try {
+        return super.close(status);
+      } finally {
+        propagateClosed();
+      }
+    }
+
+    private void propagateClosed() {
+      listener.closed(getStatus());
+    }
+  }
+}
diff --git a/core/src/main/java/com/google/net/stubby/SessionClientTransportFactory.java b/core/src/main/java/com/google/net/stubby/SessionClientTransportFactory.java
new file mode 100644
index 0000000..9c32b77
--- /dev/null
+++ b/core/src/main/java/com/google/net/stubby/SessionClientTransportFactory.java
@@ -0,0 +1,48 @@
+package com.google.net.stubby;
+
+import com.google.common.util.concurrent.AbstractService;
+import com.google.net.stubby.MethodDescriptor;
+import com.google.net.stubby.newtransport.ClientStream;
+import com.google.net.stubby.newtransport.ClientTransport;
+import com.google.net.stubby.newtransport.ClientTransportFactory;
+import com.google.net.stubby.newtransport.StreamListener;
+
+/**
+ * Shim between Session and Channel. Will be removed when Session is removed.
+ *
+ * <p>This factory always returns the same instance, which does not adhere to the API.
+ */
+public class SessionClientTransportFactory implements ClientTransportFactory {
+  private final SessionClientTransport transport;
+
+  public SessionClientTransportFactory(Session session) {
+    transport = new SessionClientTransport(session);
+  }
+
+  @Override
+  public ClientTransport newClientTransport() {
+    return transport;
+  }
+
+  private static class SessionClientTransport extends AbstractService implements ClientTransport {
+    private final Session session;
+
+    public SessionClientTransport(Session session) {
+      this.session = session;
+    }
+
+    @Override
+    protected void doStart() {}
+
+    @Override
+    public void doStop() {}
+
+    @Override
+    public ClientStream newStream(MethodDescriptor<?, ?> method, StreamListener listener) {
+      final SessionClientStream stream = new SessionClientStream(listener);
+      Request request = session.startRequest(method.getName(), stream.responseBuilder());
+      stream.start(request);
+      return stream;
+    }
+  }
+}
diff --git a/stub/src/main/java/com/google/net/stubby/stub/SessionCall.java b/stub/src/main/java/com/google/net/stubby/stub/SessionCall.java
deleted file mode 100644
index 6ea9381..0000000
--- a/stub/src/main/java/com/google/net/stubby/stub/SessionCall.java
+++ /dev/null
@@ -1,131 +0,0 @@
-package com.google.net.stubby.stub;
-
-import com.google.common.util.concurrent.SettableFuture;
-import com.google.net.stubby.AbstractResponse;
-import com.google.net.stubby.Call;
-import com.google.net.stubby.MethodDescriptor;
-import com.google.net.stubby.Operation;
-import com.google.net.stubby.Request;
-import com.google.net.stubby.Response;
-import com.google.net.stubby.Session;
-import com.google.net.stubby.Status;
-import com.google.net.stubby.transport.Transport;
-
-import java.io.InputStream;
-
-/**
- * A temporary shim layer between the new (Channel) and the old (Session). Will go away when the
- * new transport layer is created.
- */
-// TODO(user): Delete this class when new transport interfaces are introduced
-public class SessionCall<RequestT, ResponseT> extends Call<RequestT, ResponseT> {
-  /**
-   * The {@link Request} used by the stub to dispatch the call
-   */
-  private Request request;
-
-  private Listener<ResponseT> responseListener;
-
-  private final MethodDescriptor<RequestT, ResponseT> methodDescriptor;
-  private final Session session;
-
-  protected SessionCall(MethodDescriptor<RequestT, ResponseT> methodDescriptor, Session session) {
-    // This will go away when we introduce new transport API.... nothing to see here
-    this.methodDescriptor = methodDescriptor;
-    this.session = session;
-  }
-
-  @Override
-  public void start(Listener<ResponseT> responseListener) {
-    request = session.startRequest(methodDescriptor.getName(), new Response.ResponseBuilder() {
-      @Override
-      public Response build(int id) {
-        return new CallResponse(id);
-      }
-
-      @Override
-      public Response build() {
-        return new CallResponse(-1);
-      }
-    });
-    this.responseListener = responseListener;
-  }
-
-  @Override
-  public void sendPayload(RequestT value, SettableFuture<Void> future) {
-    request.addPayload(methodDescriptor.streamRequest(value), Operation.Phase.PAYLOAD);
-    if (future != null) {
-      future.set(null);
-    }
-  }
-
-  @Override
-  public void sendContext(String name, InputStream value, SettableFuture<Void> future) {
-    request.addContext(name, value, Operation.Phase.HEADERS);
-    if (future != null) {
-      future.set(null);
-    }
-  }
-
-  /**
-   * An error occurred while producing the request output. Cancel the request
-   * and close the response stream.
-   */
-  @Override
-  public void cancel() {
-    request.close(new Status(Transport.Code.CANCELLED));
-  }
-
-  @Override
-  public void halfClose() {
-    request.close(Status.OK);
-  }
-
-  /**
-   * Adapts the transport layer response to calls on the response observer or
-   * recorded context state.
-   */
-  private class CallResponse extends AbstractResponse {
-
-    private CallResponse(int id) {
-      super(id);
-    }
-
-    @Override
-    public Operation addContext(String type, InputStream message, Phase nextPhase) {
-      try {
-        responseListener.onContext(type, message);
-        return super.addContext(type, message, nextPhase);
-      } finally {
-        if (getPhase() == Phase.CLOSED) {
-          propagateClosed();
-        }
-      }
-    }
-
-    @Override
-    public Operation addPayload(InputStream payload, Phase nextPhase) {
-      try {
-        responseListener.onPayload(methodDescriptor.parseResponse(payload));
-        return super.addPayload(payload, nextPhase);
-      } finally {
-        if (getPhase() == Phase.CLOSED) {
-          propagateClosed();
-        }
-      }
-    }
-
-    @Override
-    public Operation close(Status status) {
-      try {
-        return super.close(status);
-      } finally {
-        propagateClosed();
-      }
-    }
-
-    private void propagateClosed() {
-      responseListener.onClose(getStatus());
-    }
-  }
-}
diff --git a/stub/src/main/java/com/google/net/stubby/stub/SessionChannel.java b/stub/src/main/java/com/google/net/stubby/stub/SessionChannel.java
deleted file mode 100644
index 088418e..0000000
--- a/stub/src/main/java/com/google/net/stubby/stub/SessionChannel.java
+++ /dev/null
@@ -1,22 +0,0 @@
-package com.google.net.stubby.stub;
-
-import com.google.net.stubby.Channel;
-import com.google.net.stubby.MethodDescriptor;
-import com.google.net.stubby.Session;
-
-/**
- * This class is a shim between Session &amp; Channel. Will be removed when the new transport
- * API is introduced.
- */
-public class SessionChannel implements Channel {
-  private final Session session;
-
-  public SessionChannel(Session session) {
-    this.session = session;
-  }
-
-  @Override
-  public <ReqT, RespT> SessionCall<ReqT, RespT> newCall(MethodDescriptor<ReqT, RespT> method) {
-    return new SessionCall<ReqT, RespT>(method, session);
-  }
-}