The Beta API Channel
diff --git a/src/python/grpcio/grpc/_adapter/_c/types/channel.c b/src/python/grpcio/grpc/_adapter/_c/types/channel.c
index c577ac0..cf866dd 100644
--- a/src/python/grpcio/grpc/_adapter/_c/types/channel.c
+++ b/src/python/grpcio/grpc/_adapter/_c/types/channel.c
@@ -164,7 +164,7 @@
   int last_observed_state;
   CompletionQueue *completion_queue;
   char *keywords[] = {"last_observed_state", "deadline",
-                      "completion_queue", "tag"};
+                      "completion_queue", "tag", NULL};
   if (!PyArg_ParseTupleAndKeywords(
       args, kwargs, "idO!O:watch_connectivity_state", keywords,
       &last_observed_state, &deadline, &pygrpc_CompletionQueue_type,
diff --git a/src/python/grpcio/grpc/beta/__init__.py b/src/python/grpcio/grpc/beta/__init__.py
new file mode 100644
index 0000000..b893988
--- /dev/null
+++ b/src/python/grpcio/grpc/beta/__init__.py
@@ -0,0 +1,28 @@
+# Copyright 2015, Google Inc.
+# All rights reserved.
+#
+# Redistribution and use in source and binary forms, with or without
+# modification, are permitted provided that the following conditions are
+# met:
+#
+#     * Redistributions of source code must retain the above copyright
+# notice, this list of conditions and the following disclaimer.
+#     * Redistributions in binary form must reproduce the above
+# copyright notice, this list of conditions and the following disclaimer
+# in the documentation and/or other materials provided with the
+# distribution.
+#     * Neither the name of Google Inc. nor the names of its
+# contributors may be used to endorse or promote products derived from
+# this software without specific prior written permission.
+#
+# THIS SOFTWARE IS PROVIDED BY THE COPYRIGHT HOLDERS AND CONTRIBUTORS
+# "AS IS" AND ANY EXPRESS OR IMPLIED WARRANTIES, INCLUDING, BUT NOT
+# LIMITED TO, THE IMPLIED WARRANTIES OF MERCHANTABILITY AND FITNESS FOR
+# A PARTICULAR PURPOSE ARE DISCLAIMED. IN NO EVENT SHALL THE COPYRIGHT
+# OWNER OR CONTRIBUTORS BE LIABLE FOR ANY DIRECT, INDIRECT, INCIDENTAL,
+# SPECIAL, EXEMPLARY, OR CONSEQUENTIAL DAMAGES (INCLUDING, BUT NOT
+# LIMITED TO, PROCUREMENT OF SUBSTITUTE GOODS OR SERVICES; LOSS OF USE,
+# DATA, OR PROFITS; OR BUSINESS INTERRUPTION) HOWEVER CAUSED AND ON ANY
+# THEORY OF LIABILITY, WHETHER IN CONTRACT, STRICT LIABILITY, OR TORT
+# (INCLUDING NEGLIGENCE OR OTHERWISE) ARISING IN ANY WAY OUT OF THE USE
+# OF THIS SOFTWARE, EVEN IF ADVISED OF THE POSSIBILITY OF SUCH DAMAGE.
diff --git a/src/python/grpcio/grpc/beta/_connectivity_channel.py b/src/python/grpcio/grpc/beta/_connectivity_channel.py
new file mode 100644
index 0000000..457ede7
--- /dev/null
+++ b/src/python/grpcio/grpc/beta/_connectivity_channel.py
@@ -0,0 +1,148 @@
+# Copyright 2015, Google Inc.
+# All rights reserved.
+#
+# Redistribution and use in source and binary forms, with or without
+# modification, are permitted provided that the following conditions are
+# met:
+#
+#     * Redistributions of source code must retain the above copyright
+# notice, this list of conditions and the following disclaimer.
+#     * Redistributions in binary form must reproduce the above
+# copyright notice, this list of conditions and the following disclaimer
+# in the documentation and/or other materials provided with the
+# distribution.
+#     * Neither the name of Google Inc. nor the names of its
+# contributors may be used to endorse or promote products derived from
+# this software without specific prior written permission.
+#
+# THIS SOFTWARE IS PROVIDED BY THE COPYRIGHT HOLDERS AND CONTRIBUTORS
+# "AS IS" AND ANY EXPRESS OR IMPLIED WARRANTIES, INCLUDING, BUT NOT
+# LIMITED TO, THE IMPLIED WARRANTIES OF MERCHANTABILITY AND FITNESS FOR
+# A PARTICULAR PURPOSE ARE DISCLAIMED. IN NO EVENT SHALL THE COPYRIGHT
+# OWNER OR CONTRIBUTORS BE LIABLE FOR ANY DIRECT, INDIRECT, INCIDENTAL,
+# SPECIAL, EXEMPLARY, OR CONSEQUENTIAL DAMAGES (INCLUDING, BUT NOT
+# LIMITED TO, PROCUREMENT OF SUBSTITUTE GOODS OR SERVICES; LOSS OF USE,
+# DATA, OR PROFITS; OR BUSINESS INTERRUPTION) HOWEVER CAUSED AND ON ANY
+# THEORY OF LIABILITY, WHETHER IN CONTRACT, STRICT LIABILITY, OR TORT
+# (INCLUDING NEGLIGENCE OR OTHERWISE) ARISING IN ANY WAY OUT OF THE USE
+# OF THIS SOFTWARE, EVEN IF ADVISED OF THE POSSIBILITY OF SUCH DAMAGE.
+
+"""Affords a connectivity-state-listenable channel."""
+
+import threading
+import time
+
+from grpc._adapter import _low
+from grpc.framework.foundation import callable_util
+
+_CHANNEL_SUBSCRIPTION_CALLBACK_ERROR_LOG_MESSAGE = (
+    'Exception calling channel subscription callback!')
+
+
+class ConnectivityChannel(object):
+
+  def __init__(self, low_channel, mapping):
+    self._lock = threading.Lock()
+    self._low_channel = low_channel
+    self._mapping = mapping
+
+    self._polling = False
+    self._connectivity = None
+    self._try_to_connect = False
+    self._callbacks_and_connectivities = []
+    self._delivering = False
+
+  def _deliveries(self, connectivity):
+    callbacks_needing_update = []
+    for callback_and_connectivity in self._callbacks_and_connectivities:
+      callback, callback_connectivity = callback_and_connectivity
+      if callback_connectivity is not connectivity:
+        callbacks_needing_update.append(callback)
+        callback_and_connectivity[1] = connectivity
+    return callbacks_needing_update
+
+  def _deliver(self, initial_connectivity, initial_callbacks):
+    connectivity = initial_connectivity
+    callbacks = initial_callbacks
+    while True:
+      for callback in callbacks:
+        callable_util.call_logging_exceptions(
+            callback, _CHANNEL_SUBSCRIPTION_CALLBACK_ERROR_LOG_MESSAGE,
+            connectivity)
+      with self._lock:
+        callbacks = self._deliveries(self._connectivity)
+        if callbacks:
+          connectivity = self._connectivity
+        else:
+          self._delivering = False
+          return
+
+  def _spawn_delivery(self, connectivity, callbacks):
+    delivering_thread = threading.Thread(
+        target=self._deliver, args=(connectivity, callbacks,))
+    delivering_thread.start()
+    self._delivering = True
+
+  # TODO(issue 3064): Don't poll.
+  def _poll_connectivity(self, low_channel, initial_try_to_connect):
+    try_to_connect = initial_try_to_connect
+    low_connectivity = low_channel.check_connectivity_state(try_to_connect)
+    with self._lock:
+      self._connectivity = self._mapping[low_connectivity]
+      callbacks = tuple(
+          callback for callback, unused_but_known_to_be_none_connectivity
+          in self._callbacks_and_connectivities)
+      for callback_and_connectivity in self._callbacks_and_connectivities:
+        callback_and_connectivity[1] = self._connectivity
+      if callbacks:
+        self._spawn_delivery(self._connectivity, callbacks)
+    completion_queue = _low.CompletionQueue()
+    while True:
+      low_channel.watch_connectivity_state(
+          low_connectivity, time.time() + 0.2, completion_queue, None)
+      event = completion_queue.next()
+      with self._lock:
+        if not self._callbacks_and_connectivities and not self._try_to_connect:
+          self._polling = False
+          self._connectivity = None
+          completion_queue.shutdown()
+          break
+        try_to_connect = self._try_to_connect
+        self._try_to_connect = False
+      if event.success or try_to_connect:
+        low_connectivity = low_channel.check_connectivity_state(try_to_connect)
+        with self._lock:
+          self._connectivity = self._mapping[low_connectivity]
+          if not self._delivering:
+            callbacks = self._deliveries(self._connectivity)
+            if callbacks:
+              self._spawn_delivery(self._connectivity, callbacks)
+
+  def subscribe(self, callback, try_to_connect):
+    with self._lock:
+      if not self._callbacks_and_connectivities and not self._polling:
+        polling_thread = threading.Thread(
+            target=self._poll_connectivity,
+            args=(self._low_channel, bool(try_to_connect)))
+        polling_thread.start()
+        self._polling = True
+        self._callbacks_and_connectivities.append([callback, None])
+      elif not self._delivering and self._connectivity is not None:
+        self._spawn_delivery(self._connectivity, (callback,))
+        self._try_to_connect |= bool(try_to_connect)
+        self._callbacks_and_connectivities.append(
+            [callback, self._connectivity])
+      else:
+        self._try_to_connect |= bool(try_to_connect)
+        self._callbacks_and_connectivities.append([callback, None])
+
+  def unsubscribe(self, callback):
+    with self._lock:
+      for index, (subscribed_callback, unused_connectivity) in enumerate(
+          self._callbacks_and_connectivities):
+        if callback == subscribed_callback:
+          self._callbacks_and_connectivities.pop(index)
+          break
+
+  def low_channel(self):
+    return self._low_channel
diff --git a/src/python/grpcio/grpc/beta/beta.py b/src/python/grpcio/grpc/beta/beta.py
new file mode 100644
index 0000000..40cad5e
--- /dev/null
+++ b/src/python/grpcio/grpc/beta/beta.py
@@ -0,0 +1,114 @@
+# Copyright 2015, Google Inc.
+# All rights reserved.
+#
+# Redistribution and use in source and binary forms, with or without
+# modification, are permitted provided that the following conditions are
+# met:
+#
+#     * Redistributions of source code must retain the above copyright
+# notice, this list of conditions and the following disclaimer.
+#     * Redistributions in binary form must reproduce the above
+# copyright notice, this list of conditions and the following disclaimer
+# in the documentation and/or other materials provided with the
+# distribution.
+#     * Neither the name of Google Inc. nor the names of its
+# contributors may be used to endorse or promote products derived from
+# this software without specific prior written permission.
+#
+# THIS SOFTWARE IS PROVIDED BY THE COPYRIGHT HOLDERS AND CONTRIBUTORS
+# "AS IS" AND ANY EXPRESS OR IMPLIED WARRANTIES, INCLUDING, BUT NOT
+# LIMITED TO, THE IMPLIED WARRANTIES OF MERCHANTABILITY AND FITNESS FOR
+# A PARTICULAR PURPOSE ARE DISCLAIMED. IN NO EVENT SHALL THE COPYRIGHT
+# OWNER OR CONTRIBUTORS BE LIABLE FOR ANY DIRECT, INDIRECT, INCIDENTAL,
+# SPECIAL, EXEMPLARY, OR CONSEQUENTIAL DAMAGES (INCLUDING, BUT NOT
+# LIMITED TO, PROCUREMENT OF SUBSTITUTE GOODS OR SERVICES; LOSS OF USE,
+# DATA, OR PROFITS; OR BUSINESS INTERRUPTION) HOWEVER CAUSED AND ON ANY
+# THEORY OF LIABILITY, WHETHER IN CONTRACT, STRICT LIABILITY, OR TORT
+# (INCLUDING NEGLIGENCE OR OTHERWISE) ARISING IN ANY WAY OUT OF THE USE
+# OF THIS SOFTWARE, EVEN IF ADVISED OF THE POSSIBILITY OF SUCH DAMAGE.
+
+"""Entry points into gRPC Python Beta."""
+
+import enum
+
+from grpc._adapter import _low
+from grpc._adapter import _types
+from grpc.beta import _connectivity_channel
+
+_CHANNEL_SUBSCRIPTION_CALLBACK_ERROR_LOG_MESSAGE = (
+    'Exception calling channel subscription callback!')
+
+
+@enum.unique
+class ChannelConnectivity(enum.Enum):
+  """Mirrors grpc_connectivity_state in the gRPC Core.
+
+  Attributes:
+    IDLE: The channel is idle.
+    CONNECTING: The channel is connecting.
+    READY: The channel is ready to conduct RPCs.
+    TRANSIENT_FAILURE: The channel has seen a failure from which it expects to
+      recover.
+    FATAL_FAILURE: The channel has seen a failure from which it cannot recover.
+  """
+
+  IDLE = (_types.ConnectivityState.IDLE, 'idle',)
+  CONNECTING = (_types.ConnectivityState.CONNECTING, 'connecting',)
+  READY = (_types.ConnectivityState.READY, 'ready',)
+  TRANSIENT_FAILURE = (
+      _types.ConnectivityState.TRANSIENT_FAILURE, 'transient failure',)
+  FATAL_FAILURE = (_types.ConnectivityState.FATAL_FAILURE, 'fatal failure',)
+
+_LOW_CONNECTIVITY_STATE_TO_CHANNEL_CONNECTIVITY = {
+    state: connectivity for state, connectivity in zip(
+        _types.ConnectivityState, ChannelConnectivity)
+}
+
+
+class Channel(object):
+  """A channel to a remote host through which RPCs may be conducted.
+
+  Only the "subscribe" and "unsubscribe" methods are supported for application
+  use. This class' instance constructor and all other attributes are
+  unsupported.
+  """
+
+  def __init__(self, low_channel):
+    self._connectivity_channel = _connectivity_channel.ConnectivityChannel(
+        low_channel, _LOW_CONNECTIVITY_STATE_TO_CHANNEL_CONNECTIVITY)
+
+  def subscribe(self, callback, try_to_connect=None):
+    """Subscribes to this Channel's connectivity.
+
+    Args:
+      callback: A callable to be invoked and passed this Channel's connectivity.
+        The callable will be invoked immediately upon subscription and again for
+        every change to this Channel's connectivity thereafter until it is
+        unsubscribed.
+      try_to_connect: A boolean indicating whether or not this Channel should
+        attempt to connect if it is not already connected and ready to conduct
+        RPCs.
+    """
+    self._connectivity_channel.subscribe(callback, try_to_connect)
+
+  def unsubscribe(self, callback):
+    """Unsubscribes a callback from this Channel's connectivity.
+
+    Args:
+      callback: A callable previously registered with this Channel from having
+        been passed to its "subscribe" method.
+    """
+    self._connectivity_channel.unsubscribe(callback)
+
+
+def create_insecure_channel(host, port):
+  """Creates an insecure Channel to a remote host.
+
+  Args:
+    host: The name of the remote host to which to connect.
+    port: The port of the remote host to which to connect.
+
+  Returns:
+    A Channel to the remote host through which RPCs may be conducted.
+  """
+  return Channel(_low.Channel('%s:%d' % (host, port), ()))
diff --git a/src/python/grpcio/grpc/beta/utilities.py b/src/python/grpcio/grpc/beta/utilities.py
new file mode 100644
index 0000000..1b5356e
--- /dev/null
+++ b/src/python/grpcio/grpc/beta/utilities.py
@@ -0,0 +1,161 @@
+# Copyright 2015, Google Inc.
+# All rights reserved.
+#
+# Redistribution and use in source and binary forms, with or without
+# modification, are permitted provided that the following conditions are
+# met:
+#
+#     * Redistributions of source code must retain the above copyright
+# notice, this list of conditions and the following disclaimer.
+#     * Redistributions in binary form must reproduce the above
+# copyright notice, this list of conditions and the following disclaimer
+# in the documentation and/or other materials provided with the
+# distribution.
+#     * Neither the name of Google Inc. nor the names of its
+# contributors may be used to endorse or promote products derived from
+# this software without specific prior written permission.
+#
+# THIS SOFTWARE IS PROVIDED BY THE COPYRIGHT HOLDERS AND CONTRIBUTORS
+# "AS IS" AND ANY EXPRESS OR IMPLIED WARRANTIES, INCLUDING, BUT NOT
+# LIMITED TO, THE IMPLIED WARRANTIES OF MERCHANTABILITY AND FITNESS FOR
+# A PARTICULAR PURPOSE ARE DISCLAIMED. IN NO EVENT SHALL THE COPYRIGHT
+# OWNER OR CONTRIBUTORS BE LIABLE FOR ANY DIRECT, INDIRECT, INCIDENTAL,
+# SPECIAL, EXEMPLARY, OR CONSEQUENTIAL DAMAGES (INCLUDING, BUT NOT
+# LIMITED TO, PROCUREMENT OF SUBSTITUTE GOODS OR SERVICES; LOSS OF USE,
+# DATA, OR PROFITS; OR BUSINESS INTERRUPTION) HOWEVER CAUSED AND ON ANY
+# THEORY OF LIABILITY, WHETHER IN CONTRACT, STRICT LIABILITY, OR TORT
+# (INCLUDING NEGLIGENCE OR OTHERWISE) ARISING IN ANY WAY OUT OF THE USE
+# OF THIS SOFTWARE, EVEN IF ADVISED OF THE POSSIBILITY OF SUCH DAMAGE.
+
+"""Utilities for the gRPC Python Beta API."""
+
+import threading
+import time
+
+from grpc.beta import beta
+from grpc.framework.foundation import callable_util
+from grpc.framework.foundation import future
+
+_DONE_CALLBACK_EXCEPTION_LOG_MESSAGE = (
+    'Exception calling connectivity future "done" callback!')
+
+
+class _ChannelReadyFuture(future.Future):
+
+  def __init__(self, channel):
+    self._condition = threading.Condition()
+    self._channel = channel
+
+    self._matured = False
+    self._cancelled = False
+    self._done_callbacks = []
+
+  def _block(self, timeout):
+    until = None if timeout is None else time.time() + timeout
+    with self._condition:
+      while True:
+        if self._cancelled:
+          raise future.CancelledError()
+        elif self._matured:
+          return
+        else:
+          if until is None:
+            self._condition.wait()
+          else:
+            remaining = until - time.time()
+            if remaining < 0:
+              raise future.TimeoutError()
+            else:
+              self._condition.wait(timeout=remaining)
+
+  def _update(self, connectivity):
+    with self._condition:
+      if not self._cancelled and connectivity is beta.ChannelConnectivity.READY:
+        self._matured = True
+        self._channel.unsubscribe(self._update)
+        self._condition.notify_all()
+        done_callbacks = tuple(self._done_callbacks)
+        self._done_callbacks = None
+      else:
+        return
+
+    for done_callback in done_callbacks:
+      callable_util.call_logging_exceptions(
+          done_callback, _DONE_CALLBACK_EXCEPTION_LOG_MESSAGE, self)
+
+  def cancel(self):
+    with self._condition:
+      if not self._matured:
+        self._cancelled = True
+        self._channel.unsubscribe(self._update)
+        self._condition.notify_all()
+        done_callbacks = tuple(self._done_callbacks)
+        self._done_callbacks = None
+      else:
+        return False
+
+    for done_callback in done_callbacks:
+      callable_util.call_logging_exceptions(
+          done_callback, _DONE_CALLBACK_EXCEPTION_LOG_MESSAGE, self)
+
+  def cancelled(self):
+    with self._condition:
+      return self._cancelled
+
+  def running(self):
+    with self._condition:
+      return not self._cancelled and not self._matured
+
+  def done(self):
+    with self._condition:
+      return self._cancelled or self._matured
+
+  def result(self, timeout=None):
+    self._block(timeout)
+    return None
+
+  def exception(self, timeout=None):
+    self._block(timeout)
+    return None
+
+  def traceback(self, timeout=None):
+    self._block(timeout)
+    return None
+
+  def add_done_callback(self, fn):
+    with self._condition:
+      if not self._cancelled and not self._matured:
+        self._done_callbacks.append(fn)
+        return
+
+    fn(self)
+
+  def start(self):
+    with self._condition:
+      self._channel.subscribe(self._update, try_to_connect=True)
+
+  def __del__(self):
+    with self._condition:
+      if not self._cancelled and not self._matured:
+        self._channel.unsubscribe(self._update)
+
+
+def channel_ready_future(channel):
+  """Creates a future.Future that matures when a beta.Channel is ready.
+
+  Cancelling the returned future.Future does not tell the given beta.Channel to
+  abandon attempts it may have been making to connect; cancelling merely
+  deactivates the return future.Future's subscription to the given
+  beta.Channel's connectivity.
+
+  Args:
+    channel: A beta.Channel.
+
+  Returns:
+    A future.Future that matures when the given Channel has connectivity
+      beta.ChannelConnectivity.READY.
+  """
+  ready_future = _ChannelReadyFuture(channel)
+  ready_future.start()
+  return ready_future
+
diff --git a/src/python/grpcio_test/grpc_test/beta/__init__.py b/src/python/grpcio_test/grpc_test/beta/__init__.py
new file mode 100644
index 0000000..7086519
--- /dev/null
+++ b/src/python/grpcio_test/grpc_test/beta/__init__.py
@@ -0,0 +1,30 @@
+# Copyright 2015, Google Inc.
+# All rights reserved.
+#
+# Redistribution and use in source and binary forms, with or without
+# modification, are permitted provided that the following conditions are
+# met:
+#
+#     * Redistributions of source code must retain the above copyright
+# notice, this list of conditions and the following disclaimer.
+#     * Redistributions in binary form must reproduce the above
+# copyright notice, this list of conditions and the following disclaimer
+# in the documentation and/or other materials provided with the
+# distribution.
+#     * Neither the name of Google Inc. nor the names of its
+# contributors may be used to endorse or promote products derived from
+# this software without specific prior written permission.
+#
+# THIS SOFTWARE IS PROVIDED BY THE COPYRIGHT HOLDERS AND CONTRIBUTORS
+# "AS IS" AND ANY EXPRESS OR IMPLIED WARRANTIES, INCLUDING, BUT NOT
+# LIMITED TO, THE IMPLIED WARRANTIES OF MERCHANTABILITY AND FITNESS FOR
+# A PARTICULAR PURPOSE ARE DISCLAIMED. IN NO EVENT SHALL THE COPYRIGHT
+# OWNER OR CONTRIBUTORS BE LIABLE FOR ANY DIRECT, INDIRECT, INCIDENTAL,
+# SPECIAL, EXEMPLARY, OR CONSEQUENTIAL DAMAGES (INCLUDING, BUT NOT
+# LIMITED TO, PROCUREMENT OF SUBSTITUTE GOODS OR SERVICES; LOSS OF USE,
+# DATA, OR PROFITS; OR BUSINESS INTERRUPTION) HOWEVER CAUSED AND ON ANY
+# THEORY OF LIABILITY, WHETHER IN CONTRACT, STRICT LIABILITY, OR TORT
+# (INCLUDING NEGLIGENCE OR OTHERWISE) ARISING IN ANY WAY OUT OF THE USE
+# OF THIS SOFTWARE, EVEN IF ADVISED OF THE POSSIBILITY OF SUCH DAMAGE.
+
+
diff --git a/src/python/grpcio_test/grpc_test/beta/_connectivity_channel_test.py b/src/python/grpcio_test/grpc_test/beta/_connectivity_channel_test.py
new file mode 100644
index 0000000..0384648
--- /dev/null
+++ b/src/python/grpcio_test/grpc_test/beta/_connectivity_channel_test.py
@@ -0,0 +1,180 @@
+# Copyright 2015, Google Inc.
+# All rights reserved.
+#
+# Redistribution and use in source and binary forms, with or without
+# modification, are permitted provided that the following conditions are
+# met:
+#
+#     * Redistributions of source code must retain the above copyright
+# notice, this list of conditions and the following disclaimer.
+#     * Redistributions in binary form must reproduce the above
+# copyright notice, this list of conditions and the following disclaimer
+# in the documentation and/or other materials provided with the
+# distribution.
+#     * Neither the name of Google Inc. nor the names of its
+# contributors may be used to endorse or promote products derived from
+# this software without specific prior written permission.
+#
+# THIS SOFTWARE IS PROVIDED BY THE COPYRIGHT HOLDERS AND CONTRIBUTORS
+# "AS IS" AND ANY EXPRESS OR IMPLIED WARRANTIES, INCLUDING, BUT NOT
+# LIMITED TO, THE IMPLIED WARRANTIES OF MERCHANTABILITY AND FITNESS FOR
+# A PARTICULAR PURPOSE ARE DISCLAIMED. IN NO EVENT SHALL THE COPYRIGHT
+# OWNER OR CONTRIBUTORS BE LIABLE FOR ANY DIRECT, INDIRECT, INCIDENTAL,
+# SPECIAL, EXEMPLARY, OR CONSEQUENTIAL DAMAGES (INCLUDING, BUT NOT
+# LIMITED TO, PROCUREMENT OF SUBSTITUTE GOODS OR SERVICES; LOSS OF USE,
+# DATA, OR PROFITS; OR BUSINESS INTERRUPTION) HOWEVER CAUSED AND ON ANY
+# THEORY OF LIABILITY, WHETHER IN CONTRACT, STRICT LIABILITY, OR TORT
+# (INCLUDING NEGLIGENCE OR OTHERWISE) ARISING IN ANY WAY OUT OF THE USE
+# OF THIS SOFTWARE, EVEN IF ADVISED OF THE POSSIBILITY OF SUCH DAMAGE.
+
+"""Tests of grpc.beta._connectivity_channel."""
+
+import threading
+import time
+import unittest
+
+from grpc._adapter import _low
+from grpc._adapter import _types
+from grpc.beta import _connectivity_channel
+from grpc_test.framework.common import test_constants
+
+_MAPPING_FUNCTION = lambda integer: integer * 200 + 17
+_MAPPING = {
+    state: _MAPPING_FUNCTION(state) for state in _types.ConnectivityState}
+_IDLE, _CONNECTING, _READY, _TRANSIENT_FAILURE, _FATAL_FAILURE = map(
+    _MAPPING_FUNCTION, _types.ConnectivityState)
+
+
+def _drive_completion_queue(completion_queue):
+  while True:
+    event = completion_queue.next(time.time() + 24 * 60 * 60)
+    if event.type == _types.EventType.QUEUE_SHUTDOWN:
+      break
+
+
+class _Callback(object):
+
+  def __init__(self):
+    self._condition = threading.Condition()
+    self._connectivities = []
+
+  def update(self, connectivity):
+    with self._condition:
+      self._connectivities.append(connectivity)
+      self._condition.notify()
+
+  def connectivities(self):
+    with self._condition:
+      return tuple(self._connectivities)
+
+  def block_until_connectivities_satisfy(self, predicate):
+    with self._condition:
+      while True:
+        connectivities = tuple(self._connectivities)
+        if predicate(connectivities):
+          return connectivities
+        else:
+          self._condition.wait()
+
+
+class ChannelConnectivityTest(unittest.TestCase):
+
+  def test_lonely_channel_connectivity(self):
+    low_channel = _low.Channel('localhost:12345', ())
+    callback = _Callback()
+
+    connectivity_channel = _connectivity_channel.ConnectivityChannel(
+        low_channel, _MAPPING)
+    connectivity_channel.subscribe(callback.update, try_to_connect=False)
+    first_connectivities = callback.block_until_connectivities_satisfy(bool)
+    connectivity_channel.subscribe(callback.update, try_to_connect=True)
+    second_connectivities = callback.block_until_connectivities_satisfy(
+        lambda connectivities: 2 <= len(connectivities))
+    # Wait for a connection that will never happen.
+    time.sleep(test_constants.SHORT_TIMEOUT)
+    third_connectivities = callback.connectivities()
+    connectivity_channel.unsubscribe(callback.update)
+    fourth_connectivities = callback.connectivities()
+    connectivity_channel.unsubscribe(callback.update)
+    fifth_connectivities = callback.connectivities()
+
+    self.assertSequenceEqual((_IDLE,), first_connectivities)
+    self.assertNotIn(_READY, second_connectivities)
+    self.assertNotIn(_READY, third_connectivities)
+    self.assertNotIn(_READY, fourth_connectivities)
+    self.assertNotIn(_READY, fifth_connectivities)
+
+  def test_immediately_connectable_channel_connectivity(self):
+    server_completion_queue = _low.CompletionQueue()
+    server = _low.Server(server_completion_queue, [])
+    port = server.add_http2_port('[::]:0')
+    server.start()
+    server_completion_queue_thread = threading.Thread(
+        target=_drive_completion_queue, args=(server_completion_queue,))
+    server_completion_queue_thread.start()
+    low_channel = _low.Channel('localhost:%d' % port, ())
+    first_callback = _Callback()
+    second_callback = _Callback()
+
+    connectivity_channel = _connectivity_channel.ConnectivityChannel(
+        low_channel, _MAPPING)
+    connectivity_channel.subscribe(first_callback.update, try_to_connect=False)
+    first_connectivities = first_callback.block_until_connectivities_satisfy(
+        bool)
+    # Wait for a connection that will never happen because try_to_connect=True
+    # has not yet been passed.
+    time.sleep(test_constants.SHORT_TIMEOUT)
+    second_connectivities = first_callback.connectivities()
+    connectivity_channel.subscribe(second_callback.update, try_to_connect=True)
+    third_connectivities = first_callback.block_until_connectivities_satisfy(
+        lambda connectivities: 2 <= len(connectivities))
+    fourth_connectivities = second_callback.block_until_connectivities_satisfy(
+        bool)
+    # Wait for a connection that will happen (or may already have happened).
+    first_callback.block_until_connectivities_satisfy(
+        lambda connectivities: _READY in connectivities)
+    second_callback.block_until_connectivities_satisfy(
+        lambda connectivities: _READY in connectivities)
+    connectivity_channel.unsubscribe(first_callback.update)
+    connectivity_channel.unsubscribe(second_callback.update)
+
+    server.shutdown()
+    server_completion_queue.shutdown()
+    server_completion_queue_thread.join()
+
+    self.assertSequenceEqual((_IDLE,), first_connectivities)
+    self.assertSequenceEqual((_IDLE,), second_connectivities)
+    self.assertNotIn(_TRANSIENT_FAILURE, third_connectivities)
+    self.assertNotIn(_FATAL_FAILURE, third_connectivities)
+    self.assertNotIn(_TRANSIENT_FAILURE, fourth_connectivities)
+    self.assertNotIn(_FATAL_FAILURE, fourth_connectivities)
+
+  def test_reachable_then_unreachable_channel_connectivity(self):
+    server_completion_queue = _low.CompletionQueue()
+    server = _low.Server(server_completion_queue, [])
+    port = server.add_http2_port('[::]:0')
+    server.start()
+    server_completion_queue_thread = threading.Thread(
+        target=_drive_completion_queue, args=(server_completion_queue,))
+    server_completion_queue_thread.start()
+    low_channel = _low.Channel('localhost:%d' % port, ())
+    callback = _Callback()
+
+    connectivity_channel = _connectivity_channel.ConnectivityChannel(
+        low_channel, _MAPPING)
+    connectivity_channel.subscribe(callback.update, try_to_connect=True)
+    callback.block_until_connectivities_satisfy(
+        lambda connectivities: _READY in connectivities)
+    # Now take down the server and confirm that channel readiness is repudiated.
+    server.shutdown()
+    callback.block_until_connectivities_satisfy(
+        lambda connectivities: connectivities[-1] is not _READY)
+    connectivity_channel.unsubscribe(callback.update)
+
+    server.shutdown()
+    server_completion_queue.shutdown()
+    server_completion_queue_thread.join()
+
+
+if __name__ == '__main__':
+  unittest.main(verbosity=2)
diff --git a/src/python/grpcio_test/grpc_test/beta/_utilities_test.py b/src/python/grpcio_test/grpc_test/beta/_utilities_test.py
new file mode 100644
index 0000000..998e74c
--- /dev/null
+++ b/src/python/grpcio_test/grpc_test/beta/_utilities_test.py
@@ -0,0 +1,123 @@
+# Copyright 2015, Google Inc.
+# All rights reserved.
+#
+# Redistribution and use in source and binary forms, with or without
+# modification, are permitted provided that the following conditions are
+# met:
+#
+#     * Redistributions of source code must retain the above copyright
+# notice, this list of conditions and the following disclaimer.
+#     * Redistributions in binary form must reproduce the above
+# copyright notice, this list of conditions and the following disclaimer
+# in the documentation and/or other materials provided with the
+# distribution.
+#     * Neither the name of Google Inc. nor the names of its
+# contributors may be used to endorse or promote products derived from
+# this software without specific prior written permission.
+#
+# THIS SOFTWARE IS PROVIDED BY THE COPYRIGHT HOLDERS AND CONTRIBUTORS
+# "AS IS" AND ANY EXPRESS OR IMPLIED WARRANTIES, INCLUDING, BUT NOT
+# LIMITED TO, THE IMPLIED WARRANTIES OF MERCHANTABILITY AND FITNESS FOR
+# A PARTICULAR PURPOSE ARE DISCLAIMED. IN NO EVENT SHALL THE COPYRIGHT
+# OWNER OR CONTRIBUTORS BE LIABLE FOR ANY DIRECT, INDIRECT, INCIDENTAL,
+# SPECIAL, EXEMPLARY, OR CONSEQUENTIAL DAMAGES (INCLUDING, BUT NOT
+# LIMITED TO, PROCUREMENT OF SUBSTITUTE GOODS OR SERVICES; LOSS OF USE,
+# DATA, OR PROFITS; OR BUSINESS INTERRUPTION) HOWEVER CAUSED AND ON ANY
+# THEORY OF LIABILITY, WHETHER IN CONTRACT, STRICT LIABILITY, OR TORT
+# (INCLUDING NEGLIGENCE OR OTHERWISE) ARISING IN ANY WAY OUT OF THE USE
+# OF THIS SOFTWARE, EVEN IF ADVISED OF THE POSSIBILITY OF SUCH DAMAGE.
+
+"""Tests of grpc.beta.utilities."""
+
+import threading
+import time
+import unittest
+
+from grpc._adapter import _low
+from grpc._adapter import _types
+from grpc.beta import beta
+from grpc.beta import utilities
+from grpc.framework.foundation import future
+from grpc_test.framework.common import test_constants
+
+
+def _drive_completion_queue(completion_queue):
+  while True:
+    event = completion_queue.next(time.time() + 24 * 60 * 60)
+    if event.type == _types.EventType.QUEUE_SHUTDOWN:
+      break
+
+
+class _Callback(object):
+
+  def __init__(self):
+    self._condition = threading.Condition()
+    self._value = None
+
+  def accept_value(self, value):
+    with self._condition:
+      self._value = value
+      self._condition.notify_all()
+
+  def block_until_called(self):
+    with self._condition:
+      while self._value is None:
+        self._condition.wait()
+      return self._value
+
+
+class ChannelConnectivityTest(unittest.TestCase):
+
+  def test_lonely_channel_connectivity(self):
+    channel = beta.create_insecure_channel('localhost', 12345)
+    callback = _Callback()
+
+    ready_future = utilities.channel_ready_future(channel)
+    ready_future.add_done_callback(callback.accept_value)
+    with self.assertRaises(future.TimeoutError):
+      ready_future.result(test_constants.SHORT_TIMEOUT)
+    self.assertFalse(ready_future.cancelled())
+    self.assertFalse(ready_future.done())
+    self.assertTrue(ready_future.running())
+    ready_future.cancel()
+    value_passed_to_callback = callback.block_until_called()
+    self.assertIs(ready_future, value_passed_to_callback)
+    self.assertTrue(ready_future.cancelled())
+    self.assertTrue(ready_future.done())
+    self.assertFalse(ready_future.running())
+
+  def test_immediately_connectable_channel_connectivity(self):
+    server_completion_queue = _low.CompletionQueue()
+    server = _low.Server(server_completion_queue, [])
+    port = server.add_http2_port('[::]:0')
+    server.start()
+    server_completion_queue_thread = threading.Thread(
+        target=_drive_completion_queue, args=(server_completion_queue,))
+    server_completion_queue_thread.start()
+    channel = beta.create_insecure_channel('localhost', port)
+    callback = _Callback()
+
+    try:
+      ready_future = utilities.channel_ready_future(channel)
+      ready_future.add_done_callback(callback.accept_value)
+      self.assertIsNone(
+          ready_future.result(test_constants.SHORT_TIMEOUT))
+      value_passed_to_callback = callback.block_until_called()
+      self.assertIs(ready_future, value_passed_to_callback)
+      self.assertFalse(ready_future.cancelled())
+      self.assertTrue(ready_future.done())
+      self.assertFalse(ready_future.running())
+      # Cancellation after maturity has no effect.
+      ready_future.cancel()
+      self.assertFalse(ready_future.cancelled())
+      self.assertTrue(ready_future.done())
+      self.assertFalse(ready_future.running())
+    finally:
+      ready_future.cancel()
+      server.shutdown()
+      server_completion_queue.shutdown()
+      server_completion_queue_thread.join()
+
+
+if __name__ == '__main__':
+  unittest.main(verbosity=2)