isReady() should return false until stream allocated

isReady() can provide pushback while the call is in progress, but it
can also provide the pushback necessary when the client creates more
streams than permitted by MAX_CONCURRENT_STREAMS.

As part of this commit, OkHttp is now calling onReady() after call
creation (it previously never called onReady()).
diff --git a/core/src/main/java/io/grpc/transport/AbstractServerStream.java b/core/src/main/java/io/grpc/transport/AbstractServerStream.java
index e5a267e..44f36f6 100644
--- a/core/src/main/java/io/grpc/transport/AbstractServerStream.java
+++ b/core/src/main/java/io/grpc/transport/AbstractServerStream.java
@@ -72,7 +72,7 @@
 
     // Now that the stream has actually been initialized, call the listener's onReady callback if
     // appropriate.
-    notifyIfReady();
+    onStreamAllocated();
   }
 
   @Override
diff --git a/core/src/main/java/io/grpc/transport/AbstractStream.java b/core/src/main/java/io/grpc/transport/AbstractStream.java
index e2ba6de..9ff3894 100644
--- a/core/src/main/java/io/grpc/transport/AbstractStream.java
+++ b/core/src/main/java/io/grpc/transport/AbstractStream.java
@@ -87,7 +87,14 @@
    * Indicates whether the listener is currently eligible for notification of
    * {@link StreamListener#onReady()}.
    */
+  @GuardedBy("onReadyLock")
   private boolean shouldNotifyOnReady = true;
+  /**
+   * Indicates the stream has been created on the connection. This implies that the stream is no
+   * longer limited by MAX_CONCURRENT_STREAMS.
+   */
+  @GuardedBy("onReadyLock")
+  private boolean allocated;
 
   private final Object onReadyLock = new Object();
 
@@ -144,11 +151,19 @@
    * Sets the number of queued bytes for a given stream, below which
    * {@link StreamListener#onReady()} will be called. If not called, defaults to
    * {@link #DEFAULT_ONREADY_THRESHOLD}.
+   *
+   * <p>This must be called from the transport thread, since a listener may be called back directly.
    */
   public void setOnReadyThreshold(int onReadyThreshold) {
     checkArgument(onReadyThreshold > 0, "onReadyThreshold must be > 0");
-    this.onReadyThreshold = onReadyThreshold;
-    notifyIfReady();
+    boolean doNotify;
+    synchronized (onReadyLock) {
+      this.onReadyThreshold = onReadyThreshold;
+      doNotify = needToNotifyOnReady();
+    }
+    if (doNotify) {
+      listener().onReady();
+    }
   }
 
   @Override
@@ -172,7 +187,7 @@
   public final boolean isReady() {
     if (listener() != null && outboundPhase() != Phase.STATUS) {
       synchronized (onReadyLock) {
-        if (numSentBytesQueued < onReadyThreshold) {
+        if (allocated && numSentBytesQueued < onReadyThreshold) {
           return true;
         }
       }
@@ -289,6 +304,26 @@
   }
 
   /**
+   * Event handler to be called by the subclass when the stream's headers have passed any connection
+   * flow control (i.e., MAX_CONCURRENT_STREAMS). It may call the listener's {@link
+   * StreamListener#onReady()} handler if appropriate. This must be called from the transport
+   * thread, since the listener may be called back directly.
+   */
+  protected void onStreamAllocated() {
+    boolean doNotify;
+    synchronized (onReadyLock) {
+      if (allocated) {
+        throw new IllegalStateException("Already allocated");
+      }
+      allocated = true;
+      doNotify = needToNotifyOnReady();
+    }
+    if (doNotify) {
+      listener().onReady();
+    }
+  }
+
+  /**
    * Event handler to be called by the subclass when a number of bytes are being queued for sending
    * to the remote endpoint.
    *
@@ -322,21 +357,6 @@
   }
 
   /**
-   * Utility method for subclasses which calls back the listener's {@link StreamListener#onReady()}
-   * handler if appropriate. This must be called from the transport thread, since the listener
-   * is called back directly.
-   */
-  protected final void notifyIfReady() {
-    boolean doNotify;
-    synchronized (onReadyLock) {
-      doNotify = needToNotifyOnReady();
-    }
-    if (doNotify) {
-      listener().onReady();
-    }
-  }
-
-  /**
    * Determines whether or not we need to call the {@link StreamListener#onReady()} handler now.
    * Calling this method has the side-effect of unsetting {@link #shouldNotifyOnReady} so the
    * handler should always be invoked immediately after calling this method.
diff --git a/netty/src/main/java/io/grpc/transport/netty/NettyClientStream.java b/netty/src/main/java/io/grpc/transport/netty/NettyClientStream.java
index 4301012..948c4e5 100644
--- a/netty/src/main/java/io/grpc/transport/netty/NettyClientStream.java
+++ b/netty/src/main/java/io/grpc/transport/netty/NettyClientStream.java
@@ -92,7 +92,7 @@
 
     // Now that the stream has actually been initialized, call the listener's onReady callback if
     // appropriate.
-    notifyIfReady();
+    onStreamAllocated();
   }
 
   /**
diff --git a/netty/src/test/java/io/grpc/transport/netty/NettyClientStreamTest.java b/netty/src/test/java/io/grpc/transport/netty/NettyClientStreamTest.java
index da02a62..bb426d0 100644
--- a/netty/src/test/java/io/grpc/transport/netty/NettyClientStreamTest.java
+++ b/netty/src/test/java/io/grpc/transport/netty/NettyClientStreamTest.java
@@ -42,7 +42,9 @@
 import static org.mockito.Matchers.any;
 import static org.mockito.Matchers.eq;
 import static org.mockito.Matchers.same;
+import static org.mockito.Mockito.mock;
 import static org.mockito.Mockito.never;
+import static org.mockito.Mockito.reset;
 import static org.mockito.Mockito.verify;
 
 import io.grpc.Metadata;
@@ -276,10 +278,14 @@
 
   @Test
   public void setHttp2StreamShouldNotifyReady() {
+    listener = mock(ClientStreamListener.class);
+    stream = new NettyClientStream(listener, channel, handler);
     stream().id(STREAM_ID);
     verify(listener, never()).onReady();
+    assertFalse(stream.isReady());
     stream().setHttp2Stream(http2Stream);
     verify(listener).onReady();
+    assertTrue(stream.isReady());
   }
 
   @Override
@@ -288,6 +294,8 @@
     assertTrue(stream.canSend());
     assertTrue(stream.canReceive());
     stream.id(STREAM_ID);
+    stream.setHttp2Stream(http2Stream);
+    reset(listener);
     return stream;
   }
 
diff --git a/okhttp/src/main/java/io/grpc/transport/okhttp/OkHttpClientStream.java b/okhttp/src/main/java/io/grpc/transport/okhttp/OkHttpClientStream.java
index 71cbf5e..718c47d 100644
--- a/okhttp/src/main/java/io/grpc/transport/okhttp/OkHttpClientStream.java
+++ b/okhttp/src/main/java/io/grpc/transport/okhttp/OkHttpClientStream.java
@@ -126,6 +126,16 @@
     this.id = id;
   }
 
+  /**
+   * Notification that this stream was allocated for the connection. This means the stream has
+   * passed through any delay caused by MAX_CONCURRENT_STREAMS.
+   */
+  public void allocated() {
+    // Now that the stream has actually been initialized, call the listener's onReady callback if
+    // appropriate.
+    onStreamAllocated();
+  }
+
   public void transportHeadersReceived(List<Header> headers, boolean endOfStream) {
     synchronized (lock) {
       if (endOfStream) {
diff --git a/okhttp/src/main/java/io/grpc/transport/okhttp/OkHttpClientTransport.java b/okhttp/src/main/java/io/grpc/transport/okhttp/OkHttpClientTransport.java
index 09527b6..9c46bb6 100644
--- a/okhttp/src/main/java/io/grpc/transport/okhttp/OkHttpClientTransport.java
+++ b/okhttp/src/main/java/io/grpc/transport/okhttp/OkHttpClientTransport.java
@@ -251,6 +251,7 @@
     stream.id(nextStreamId);
     streams.put(stream.id(), stream);
     frameWriter.synStream(false, false, stream.id(), 0, requestHeaders);
+    stream.allocated();
     // For unary and server streaming, there will be a data frame soon, no need to flush the header.
     if (stream.getType() != MethodType.UNARY
         && stream.getType() != MethodType.SERVER_STREAMING) {