Route Python Beta API through Python GA API
diff --git a/src/python/grpcio/grpc/__init__.py b/src/python/grpcio/grpc/__init__.py
index bbf04ad..5ba5a4e 100644
--- a/src/python/grpcio/grpc/__init__.py
+++ b/src/python/grpcio/grpc/__init__.py
@@ -947,6 +947,21 @@
           metadata_plugin, effective_name))
 
 
+def access_token_call_credentials(access_token):
+  """Construct CallCredentials from an access token.
+
+  Args:
+    access_token: A string to place directly in the http request
+      authorization header, ie "Authorization: Bearer <access_token>".
+
+  Returns:
+    A CallCredentials.
+  """
+  from grpc import _auth
+  return metadata_call_credentials(
+      _auth.AccessTokenCallCredentials(access_token))
+
+
 def composite_call_credentials(call_credentials, additional_call_credentials):
   """Compose two CallCredentials to make a new one.
 
diff --git a/src/python/grpcio/grpc/_adapter/_implementations.py b/src/python/grpcio/grpc/_adapter/_implementations.py
deleted file mode 100644
index b85f228..0000000
--- a/src/python/grpcio/grpc/_adapter/_implementations.py
+++ /dev/null
@@ -1,48 +0,0 @@
-# Copyright 2015, Google Inc.
-# All rights reserved.
-#
-# Redistribution and use in source and binary forms, with or without
-# modification, are permitted provided that the following conditions are
-# met:
-#
-#     * Redistributions of source code must retain the above copyright
-# notice, this list of conditions and the following disclaimer.
-#     * Redistributions in binary form must reproduce the above
-# copyright notice, this list of conditions and the following disclaimer
-# in the documentation and/or other materials provided with the
-# distribution.
-#     * Neither the name of Google Inc. nor the names of its
-# contributors may be used to endorse or promote products derived from
-# this software without specific prior written permission.
-#
-# THIS SOFTWARE IS PROVIDED BY THE COPYRIGHT HOLDERS AND CONTRIBUTORS
-# "AS IS" AND ANY EXPRESS OR IMPLIED WARRANTIES, INCLUDING, BUT NOT
-# LIMITED TO, THE IMPLIED WARRANTIES OF MERCHANTABILITY AND FITNESS FOR
-# A PARTICULAR PURPOSE ARE DISCLAIMED. IN NO EVENT SHALL THE COPYRIGHT
-# OWNER OR CONTRIBUTORS BE LIABLE FOR ANY DIRECT, INDIRECT, INCIDENTAL,
-# SPECIAL, EXEMPLARY, OR CONSEQUENTIAL DAMAGES (INCLUDING, BUT NOT
-# LIMITED TO, PROCUREMENT OF SUBSTITUTE GOODS OR SERVICES; LOSS OF USE,
-# DATA, OR PROFITS; OR BUSINESS INTERRUPTION) HOWEVER CAUSED AND ON ANY
-# THEORY OF LIABILITY, WHETHER IN CONTRACT, STRICT LIABILITY, OR TORT
-# (INCLUDING NEGLIGENCE OR OTHERWISE) ARISING IN ANY WAY OUT OF THE USE
-# OF THIS SOFTWARE, EVEN IF ADVISED OF THE POSSIBILITY OF SUCH DAMAGE.
-
-import collections
-
-from grpc.beta import interfaces
-
-class AuthMetadataContext(collections.namedtuple(
-    'AuthMetadataContext', [
-        'service_url',
-        'method_name'
-    ]), interfaces.GRPCAuthMetadataContext):
-  pass
-
-
-class AuthMetadataPluginCallback(interfaces.GRPCAuthMetadataContext):
-
-  def __init__(self, callback):
-    self._callback = callback
-
-  def __call__(self, metadata, error):
-    self._callback(metadata, error)
diff --git a/src/python/grpcio/grpc/_adapter/_low.py b/src/python/grpcio/grpc/_adapter/_low.py
index 00788bd..4841016 100644
--- a/src/python/grpcio/grpc/_adapter/_low.py
+++ b/src/python/grpcio/grpc/_adapter/_low.py
@@ -30,8 +30,8 @@
 import threading
 
 from grpc import _grpcio_metadata
+from grpc import _plugin_wrapping
 from grpc._cython import cygrpc
-from grpc._adapter import _implementations
 from grpc._adapter import _types
 
 _USER_AGENT = 'Python-gRPC-{}'.format(_grpcio_metadata.__version__)
@@ -57,78 +57,8 @@
   return cygrpc.channel_credentials_ssl(root_certificates, pair)
 
 
-class _WrappedCygrpcCallback(object):
-
-  def __init__(self, cygrpc_callback):
-    self.is_called = False
-    self.error = None
-    self.is_called_lock = threading.Lock()
-    self.cygrpc_callback = cygrpc_callback
-
-  def _invoke_failure(self, error):
-    # TODO(atash) translate different Exception superclasses into different
-    # status codes.
-    self.cygrpc_callback(
-        cygrpc.Metadata([]), cygrpc.StatusCode.internal, error.message)
-
-  def _invoke_success(self, metadata):
-    try:
-      cygrpc_metadata = cygrpc.Metadata(
-          cygrpc.Metadatum(key, value)
-          for key, value in metadata)
-    except Exception as error:
-      self._invoke_failure(error)
-      return
-    self.cygrpc_callback(cygrpc_metadata, cygrpc.StatusCode.ok, '')
-
-  def __call__(self, metadata, error):
-    with self.is_called_lock:
-      if self.is_called:
-        raise RuntimeError('callback should only ever be invoked once')
-      if self.error:
-        self._invoke_failure(self.error)
-        return
-      self.is_called = True
-    if error is None:
-      self._invoke_success(metadata)
-    else:
-      self._invoke_failure(error)
-
-  def notify_failure(self, error):
-    with self.is_called_lock:
-      if not self.is_called:
-        self.error = error
-
-
-class _WrappedPlugin(object):
-
-  def __init__(self, plugin):
-    self.plugin = plugin
-
-  def __call__(self, context, cygrpc_callback):
-    wrapped_cygrpc_callback = _WrappedCygrpcCallback(cygrpc_callback)
-    wrapped_context = _implementations.AuthMetadataContext(context.service_url,
-                                                           context.method_name)
-    try:
-      self.plugin(
-          wrapped_context,
-          _implementations.AuthMetadataPluginCallback(wrapped_cygrpc_callback))
-    except Exception as error:
-      wrapped_cygrpc_callback.notify_failure(error)
-      raise
-
-
-def call_credentials_metadata_plugin(plugin, name):
-  """
-  Args:
-    plugin: A callable accepting a _types.AuthMetadataContext
-      object and a callback (itself accepting a list of metadata key/value
-      2-tuples and a None-able exception value). The callback must be eventually
-      called, but need not be called in plugin's invocation.
-      plugin's invocation must be non-blocking.
-  """
-  return cygrpc.call_credentials_metadata_plugin(
-      cygrpc.CredentialsMetadataPlugin(_WrappedPlugin(plugin), name))
+call_credentials_metadata_plugin = (
+    _plugin_wrapping.call_credentials_metadata_plugin)
 
 
 class CompletionQueue(_types.CompletionQueue):
diff --git a/src/python/grpcio/grpc/beta/_auth.py b/src/python/grpcio/grpc/_auth.py
similarity index 94%
rename from src/python/grpcio/grpc/beta/_auth.py
rename to src/python/grpcio/grpc/_auth.py
index 553d4b9..3ae00ca 100644
--- a/src/python/grpcio/grpc/beta/_auth.py
+++ b/src/python/grpcio/grpc/_auth.py
@@ -31,7 +31,7 @@
 
 from concurrent import futures
 
-from grpc.beta import interfaces
+import grpc
 
 
 def _sign_request(callback, token, error):
@@ -39,7 +39,7 @@
   callback(metadata, error)
 
 
-class GoogleCallCredentials(interfaces.GRPCAuthMetadataPlugin):
+class GoogleCallCredentials(grpc.AuthMetadataPlugin):
   """Metadata wrapper for GoogleCredentials from the oauth2client library."""
 
   def __init__(self, credentials):
@@ -63,7 +63,7 @@
     self._pool.shutdown(wait=False)
 
 
-class AccessTokenCallCredentials(interfaces.GRPCAuthMetadataPlugin):
+class AccessTokenCallCredentials(grpc.AuthMetadataPlugin):
   """Metadata wrapper for raw access token credentials."""
 
   def __init__(self, access_token):
diff --git a/src/python/grpcio/grpc/_server.py b/src/python/grpcio/grpc/_server.py
index c65070f..aae9f48 100644
--- a/src/python/grpcio/grpc/_server.py
+++ b/src/python/grpcio/grpc/_server.py
@@ -65,12 +65,23 @@
   return request_event.batch_operations[0].received_message.bytes()
 
 
-def _code(state):
+def _application_code(code):
+  cygrpc_code = _common.STATUS_CODE_TO_CYGRPC_STATUS_CODE.get(code)
+  return cygrpc.StatusCode.unknown if cygrpc_code is None else cygrpc_code
+
+
+def _completion_code(state):
   if state.code is None:
     return cygrpc.StatusCode.ok
   else:
-    code = _common.STATUS_CODE_TO_CYGRPC_STATUS_CODE.get(state.code)
-    return cygrpc.StatusCode.unknown if code is None else code
+    return _application_code(state.code)
+
+
+def _abortion_code(state, code):
+  if state.code is None:
+    return code
+  else:
+    return _application_code(state.code)
 
 
 def _details(state):
@@ -126,20 +137,22 @@
 
 def _abort(state, call, code, details):
   if state.client is not _CANCELLED:
+    effective_code = _abortion_code(state, code)
+    effective_details = details if state.details is None else state.details
     if state.initial_metadata_allowed:
       operations = (
           cygrpc.operation_send_initial_metadata(
               _EMPTY_METADATA, _EMPTY_FLAGS),
           cygrpc.operation_send_status_from_server(
-              _common.metadata(state.trailing_metadata), code, details,
-              _EMPTY_FLAGS),
+              _common.metadata(state.trailing_metadata), effective_code,
+              effective_details, _EMPTY_FLAGS),
       )
       token = _SEND_INITIAL_METADATA_AND_SEND_STATUS_FROM_SERVER_TOKEN
     else:
       operations = (
           cygrpc.operation_send_status_from_server(
-              _common.metadata(state.trailing_metadata), code, details,
-              _EMPTY_FLAGS),
+              _common.metadata(state.trailing_metadata), effective_code,
+              effective_details, _EMPTY_FLAGS),
       )
       token = _SEND_STATUS_FROM_SERVER_TOKEN
     call.start_batch(
@@ -346,7 +359,7 @@
 def _call_behavior(rpc_event, state, behavior, argument, request_deserializer):
   context = _Context(rpc_event, state, request_deserializer)
   try:
-    return behavior(argument, context)
+    return behavior(argument, context), True
   except Exception as e:  # pylint: disable=broad-except
     with state.condition:
       if e not in state.rpc_errors:
@@ -354,7 +367,7 @@
         logging.exception(details)
         _abort(
             state, rpc_event.operation_call, cygrpc.StatusCode.unknown, details)
-    return None
+    return None, False
 
 
 def _take_response_from_response_iterator(rpc_event, state, response_iterator):
@@ -415,7 +428,7 @@
   with state.condition:
     if state.client is not _CANCELLED:
       trailing_metadata = _common.metadata(state.trailing_metadata)
-      code = _code(state)
+      code = _completion_code(state)
       details = _details(state)
       operations = [
           cygrpc.operation_send_status_from_server(
@@ -440,9 +453,9 @@
     response_serializer):
   argument = argument_thunk()
   if argument is not None:
-    response = _call_behavior(
+    response, proceed = _call_behavior(
         rpc_event, state, behavior, argument, request_deserializer)
-    if response is not None:
+    if proceed:
       serialized_response = _serialize_response(
           rpc_event, state, response, response_serializer)
       if serialized_response is not None:
@@ -455,9 +468,9 @@
     response_serializer):
   argument = argument_thunk()
   if argument is not None:
-    response_iterator = _call_behavior(
+    response_iterator, proceed = _call_behavior(
         rpc_event, state, behavior, argument, request_deserializer)
-    if response_iterator is not None:
+    if proceed:
       while True:
         response, proceed = _take_response_from_response_iterator(
             rpc_event, state, response_iterator)
diff --git a/src/python/grpcio/grpc/beta/_client_adaptations.py b/src/python/grpcio/grpc/beta/_client_adaptations.py
new file mode 100644
index 0000000..621fcf2
--- /dev/null
+++ b/src/python/grpcio/grpc/beta/_client_adaptations.py
@@ -0,0 +1,566 @@
+# Copyright 2016, Google Inc.
+# All rights reserved.
+#
+# Redistribution and use in source and binary forms, with or without
+# modification, are permitted provided that the following conditions are
+# met:
+#
+#     * Redistributions of source code must retain the above copyright
+# notice, this list of conditions and the following disclaimer.
+#     * Redistributions in binary form must reproduce the above
+# copyright notice, this list of conditions and the following disclaimer
+# in the documentation and/or other materials provided with the
+# distribution.
+#     * Neither the name of Google Inc. nor the names of its
+# contributors may be used to endorse or promote products derived from
+# this software without specific prior written permission.
+#
+# THIS SOFTWARE IS PROVIDED BY THE COPYRIGHT HOLDERS AND CONTRIBUTORS
+# "AS IS" AND ANY EXPRESS OR IMPLIED WARRANTIES, INCLUDING, BUT NOT
+# LIMITED TO, THE IMPLIED WARRANTIES OF MERCHANTABILITY AND FITNESS FOR
+# A PARTICULAR PURPOSE ARE DISCLAIMED. IN NO EVENT SHALL THE COPYRIGHT
+# OWNER OR CONTRIBUTORS BE LIABLE FOR ANY DIRECT, INDIRECT, INCIDENTAL,
+# SPECIAL, EXEMPLARY, OR CONSEQUENTIAL DAMAGES (INCLUDING, BUT NOT
+# LIMITED TO, PROCUREMENT OF SUBSTITUTE GOODS OR SERVICES; LOSS OF USE,
+# DATA, OR PROFITS; OR BUSINESS INTERRUPTION) HOWEVER CAUSED AND ON ANY
+# THEORY OF LIABILITY, WHETHER IN CONTRACT, STRICT LIABILITY, OR TORT
+# (INCLUDING NEGLIGENCE OR OTHERWISE) ARISING IN ANY WAY OUT OF THE USE
+# OF THIS SOFTWARE, EVEN IF ADVISED OF THE POSSIBILITY OF SUCH DAMAGE.
+
+"""Translates gRPC's client-side API into gRPC's client-side Beta API."""
+
+import grpc
+from grpc._cython import cygrpc
+from grpc.beta import interfaces
+from grpc.framework.common import cardinality
+from grpc.framework.foundation import future
+from grpc.framework.interfaces.face import face
+
+_STATUS_CODE_TO_ABORTION_KIND_AND_ABORTION_ERROR_CLASS = {
+    grpc.StatusCode.CANCELLED: (
+        face.Abortion.Kind.CANCELLED, face.CancellationError),
+    grpc.StatusCode.UNKNOWN: (
+        face.Abortion.Kind.REMOTE_FAILURE, face.RemoteError),
+    grpc.StatusCode.DEADLINE_EXCEEDED: (
+        face.Abortion.Kind.EXPIRED, face.ExpirationError),
+    grpc.StatusCode.UNIMPLEMENTED: (
+        face.Abortion.Kind.LOCAL_FAILURE, face.LocalError),
+}
+
+
+def _fully_qualified_method(group, method):
+  return b'/{}/{}'.format(group, method)
+
+
+def _effective_metadata(metadata, metadata_transformer):
+  non_none_metadata = () if metadata is None else metadata
+  if metadata_transformer is None:
+    return non_none_metadata
+  else:
+    return metadata_transformer(non_none_metadata)
+
+
+def _credentials(grpc_call_options):
+  return None if grpc_call_options is None else grpc_call_options.credentials
+
+
+def _abortion(rpc_error_call):
+  code = rpc_error_call.code()
+  pair = _STATUS_CODE_TO_ABORTION_KIND_AND_ABORTION_ERROR_CLASS.get(code)
+  error_kind = face.Abortion.Kind.LOCAL_FAILURE if pair is None else pair[0]
+  return face.Abortion(
+      error_kind, rpc_error_call.initial_metadata(),
+      rpc_error_call.trailing_metadata(), code, rpc_error_code.details())
+
+
+def _abortion_error(rpc_error_call):
+  code = rpc_error_call.code()
+  pair = _STATUS_CODE_TO_ABORTION_KIND_AND_ABORTION_ERROR_CLASS.get(code)
+  exception_class = face.AbortionError if pair is None else pair[1]
+  return exception_class(
+      rpc_error_call.initial_metadata(), rpc_error_call.trailing_metadata(),
+      code, rpc_error_call.details())
+
+
+class _InvocationProtocolContext(interfaces.GRPCInvocationContext):
+
+  def disable_next_request_compression(self):
+    pass  # TODO(https://github.com/grpc/grpc/issues/4078): design, implement.
+
+
+class _Rendezvous(future.Future, face.Call):
+
+  def __init__(self, response_future, response_iterator, call):
+    self._future = response_future
+    self._iterator = response_iterator
+    self._call = call
+
+  def cancel(self):
+    return self._call.cancel()
+
+  def cancelled(self):
+    return self._future.cancelled()
+
+  def running(self):
+    return self._future.running()
+
+  def done(self):
+    return self._future.done()
+
+  def result(self, timeout=None):
+    try:
+      return self._future.result(timeout=timeout)
+    except grpc.RpcError as rpc_error_call:
+      raise _abortion_error(rpc_error_call)
+    except grpc.FutureTimeoutError:
+      raise future.TimeoutError()
+    except grpc.FutureCancelledError:
+      raise future.CancelledError()
+
+  def exception(self, timeout=None):
+    try:
+      rpc_error_call = self._future.exception(timeout=timeout)
+      return _abortion_error(rpc_error_call)
+    except grpc.FutureTimeoutError:
+      raise future.TimeoutError()
+    except grpc.FutureCancelledError:
+      raise future.CancelledError()
+
+  def traceback(self, timeout=None):
+    try:
+      return self._future.traceback(timeout=timeout)
+    except grpc.FutureTimeoutError:
+      raise future.TimeoutError()
+    except grpc.FutureCancelledError:
+      raise future.CancelledError()
+
+  def add_done_callback(self, fn):
+    self._future.add_done_callback(lambda ignored_callback: fn(self))
+
+  def __iter__(self):
+    return self
+
+  def _next(self):
+    try:
+      return next(self._iterator)
+    except grpc.RpcError as rpc_error_call:
+      raise _abortion_error(rpc_error_call)
+
+  def __next__(self):
+    return self._next()
+
+  def next(self):
+    return self._next()
+
+  def is_active(self):
+    return self._call.is_active()
+
+  def time_remaining(self):
+    return self._call.time_remaining()
+
+  def add_abortion_callback(self, abortion_callback):
+    registered = self._call.add_callback(
+        lambda: abortion_callback(_abortion(self._call)))
+    return None if registered else _abortion(self._call)
+
+  def protocol_context(self):
+    return _InvocationProtocolContext()
+
+  def initial_metadata(self):
+    return self._call.initial_metadata()
+
+  def terminal_metadata(self):
+    return self._call.terminal_metadata()
+
+  def code(self):
+    return self._call.code()
+
+  def details(self):
+    return self._call.details()
+
+
+def _blocking_unary_unary(
+    channel, group, method, timeout, with_call, protocol_options, metadata,
+    metadata_transformer, request, request_serializer, response_deserializer):
+  try:
+    multi_callable = channel.unary_unary(
+        _fully_qualified_method(group, method),
+        request_serializer=request_serializer,
+        response_deserializer=response_deserializer)
+    effective_metadata = _effective_metadata(metadata, metadata_transformer)
+    if with_call:
+      response, call = multi_callable(
+          request, timeout=timeout, metadata=effective_metadata,
+          credentials=_credentials(protocol_options), with_call=True)
+      return response, _Rendezvous(None, None, call)
+    else:
+      return multi_callable(
+          request, timeout=timeout, metadata=effective_metadata,
+          credentials=_credentials(protocol_options))
+  except grpc.RpcError as rpc_error_call:
+    raise _abortion_error(rpc_error_call)
+
+
+def _future_unary_unary(
+    channel, group, method, timeout, protocol_options, metadata,
+    metadata_transformer, request, request_serializer, response_deserializer):
+  multi_callable = channel.unary_unary(
+      _fully_qualified_method(group, method),
+      request_serializer=request_serializer,
+      response_deserializer=response_deserializer)
+  effective_metadata = _effective_metadata(metadata, metadata_transformer)
+  response_future = multi_callable.future(
+      request, timeout=timeout, metadata=effective_metadata,
+      credentials=_credentials(protocol_options))
+  return _Rendezvous(response_future, None, response_future)
+
+
+def _unary_stream(
+    channel, group, method, timeout, protocol_options, metadata,
+    metadata_transformer, request, request_serializer, response_deserializer):
+  multi_callable = channel.unary_stream(
+      _fully_qualified_method(group, method),
+      request_serializer=request_serializer,
+      response_deserializer=response_deserializer)
+  effective_metadata = _effective_metadata(metadata, metadata_transformer)
+  response_iterator = multi_callable(
+      request, timeout=timeout, metadata=effective_metadata,
+      credentials=_credentials(protocol_options))
+  return _Rendezvous(None, response_iterator, response_iterator)
+
+
+def _blocking_stream_unary(
+    channel, group, method, timeout, with_call, protocol_options, metadata,
+    metadata_transformer, request_iterator, request_serializer,
+    response_deserializer):
+  try:
+    multi_callable = channel.stream_unary(
+        _fully_qualified_method(group, method),
+        request_serializer=request_serializer,
+        response_deserializer=response_deserializer)
+    effective_metadata = _effective_metadata(metadata, metadata_transformer)
+    if with_call:
+      response, call = multi_callable(
+          request_iterator, timeout=timeout, metadata=effective_metadata,
+          credentials=_credentials(protocol_options), with_call=True)
+      return response, _Rendezvous(None, None, call)
+    else:
+      return multi_callable(
+          request_iterator, timeout=timeout, metadata=effective_metadata,
+          credentials=_credentials(protocol_options))
+  except grpc.RpcError as rpc_error_call:
+    raise _abortion_error(rpc_error_call)
+
+
+def _future_stream_unary(
+    channel, group, method, timeout, protocol_options, metadata,
+    metadata_transformer, request_iterator, request_serializer,
+    response_deserializer):
+  multi_callable = channel.stream_unary(
+      _fully_qualified_method(group, method),
+      request_serializer=request_serializer,
+      response_deserializer=response_deserializer)
+  effective_metadata = _effective_metadata(metadata, metadata_transformer)
+  response_future = multi_callable.future(
+      request_iterator, timeout=timeout, metadata=effective_metadata,
+      credentials=_credentials(protocol_options))
+  return _Rendezvous(response_future, None, response_future)
+
+
+def _stream_stream(
+    channel, group, method, timeout, protocol_options, metadata,
+    metadata_transformer, request_iterator, request_serializer,
+    response_deserializer):
+  multi_callable = channel.stream_stream(
+      _fully_qualified_method(group, method),
+      request_serializer=request_serializer,
+      response_deserializer=response_deserializer)
+  effective_metadata = _effective_metadata(metadata, metadata_transformer)
+  response_iterator = multi_callable(
+      request_iterator, timeout=timeout, metadata=effective_metadata,
+      credentials=_credentials(protocol_options))
+  return _Rendezvous(None, response_iterator, response_iterator)
+
+
+class _UnaryUnaryMultiCallable(face.UnaryUnaryMultiCallable):
+
+  def __init__(
+      self, channel, group, method, metadata_transformer, request_serializer,
+      response_deserializer):
+    self._channel = channel
+    self._group = group
+    self._method = method
+    self._metadata_transformer = metadata_transformer
+    self._request_serializer = request_serializer
+    self._response_deserializer = response_deserializer
+
+  def __call__(
+      self, request, timeout, metadata=None, with_call=False,
+      protocol_options=None):
+    return _blocking_unary_unary(
+        self._channel, self._group, self._method, timeout, with_call,
+        protocol_options, metadata, self._metadata_transformer, request,
+        self._request_serializer, self._response_deserializer)
+
+  def future(self, request, timeout, metadata=None, protocol_options=None):
+    return _future_unary_unary(
+        self._channel, self._group, self._method, timeout, protocol_options,
+        metadata, self._metadata_transformer, request, self._request_serializer,
+        self._response_deserializer)
+
+  def event(
+      self, request, receiver, abortion_callback, timeout,
+      metadata=None, protocol_options=None):
+    raise NotImplementedError()
+
+
+class _UnaryStreamMultiCallable(face.UnaryStreamMultiCallable):
+
+  def __init__(
+      self, channel, group, method, metadata_transformer, request_serializer,
+      response_deserializer):
+    self._channel = channel
+    self._group = group
+    self._method = method
+    self._metadata_transformer = metadata_transformer
+    self._request_serializer = request_serializer
+    self._response_deserializer = response_deserializer
+
+  def __call__(self, request, timeout, metadata=None, protocol_options=None):
+    return _unary_stream(
+        self._channel, self._group, self._method, timeout, protocol_options,
+        metadata, self._metadata_transformer, request, self._request_serializer,
+        self._response_deserializer)
+
+  def event(
+      self, request, receiver, abortion_callback, timeout,
+      metadata=None, protocol_options=None):
+    raise NotImplementedError()
+
+
+class _StreamUnaryMultiCallable(face.StreamUnaryMultiCallable):
+
+  def __init__(
+      self, channel, group, method, metadata_transformer, request_serializer,
+      response_deserializer):
+    self._channel = channel
+    self._group = group
+    self._method = method
+    self._metadata_transformer = metadata_transformer
+    self._request_serializer = request_serializer
+    self._response_deserializer = response_deserializer
+
+  def __call__(
+      self, request_iterator, timeout, metadata=None, with_call=False,
+      protocol_options=None):
+    return _blocking_stream_unary(
+        self._channel, self._group, self._method, timeout, with_call,
+        protocol_options, metadata, self._metadata_transformer,
+        request_iterator, self._request_serializer, self._response_deserializer)
+
+  def future(
+      self, request_iterator, timeout, metadata=None, protocol_options=None):
+    return _future_stream_unary(
+        self._channel, self._group, self._method, timeout, protocol_options,
+        metadata, self._metadata_transformer, request_iterator,
+        self._request_serializer, self._response_deserializer)
+
+  def event(
+      self, receiver, abortion_callback, timeout, metadata=None,
+      protocol_options=None):
+    raise NotImplementedError()
+
+
+class _StreamStreamMultiCallable(face.StreamStreamMultiCallable):
+
+  def __init__(
+      self, channel, group, method, metadata_transformer, request_serializer,
+      response_deserializer):
+    self._channel = channel
+    self._group = group
+    self._method = method
+    self._metadata_transformer = metadata_transformer
+    self._request_serializer = request_serializer
+    self._response_deserializer = response_deserializer
+
+  def __call__(
+      self, request_iterator, timeout, metadata=None, protocol_options=None):
+    return _stream_stream(
+        self._channel, self._group, self._method, timeout, protocol_options,
+        metadata, self._metadata_transformer, request_iterator,
+        self._request_serializer, self._response_deserializer)
+
+  def event(
+      self, receiver, abortion_callback, timeout, metadata=None,
+      protocol_options=None):
+    raise NotImplementedError()
+
+
+class _GenericStub(face.GenericStub):
+
+  def __init__(
+      self, channel, metadata_transformer, request_serializers,
+      response_deserializers):
+    self._channel = channel
+    self._metadata_transformer = metadata_transformer
+    self._request_serializers = request_serializers or {}
+    self._response_deserializers = response_deserializers or {}
+
+  def blocking_unary_unary(
+      self, group, method, request, timeout, metadata=None,
+      with_call=None, protocol_options=None):
+    request_serializer = self._request_serializers.get((group, method,))
+    response_deserializer = self._response_deserializers.get((group, method,))
+    return _blocking_unary_unary(
+        self._channel, group, method, timeout, with_call, protocol_options,
+        metadata, self._metadata_transformer, request, request_serializer,
+        response_deserializer)
+
+  def future_unary_unary(
+      self, group, method, request, timeout, metadata=None,
+      protocol_options=None):
+    request_serializer = self._request_serializers.get((group, method,))
+    response_deserializer = self._response_deserializers.get((group, method,))
+    return _future_unary_unary(
+        self._channel, group, method, timeout, protocol_options, metadata,
+        self._metadata_transformer, request, request_serializer,
+        response_deserializer)
+
+  def inline_unary_stream(
+      self, group, method, request, timeout, metadata=None,
+      protocol_options=None):
+    request_serializer = self._request_serializers.get((group, method,))
+    response_deserializer = self._response_deserializers.get((group, method,))
+    return _unary_stream(
+        self._channel, group, method, timeout, protocol_options, metadata,
+        self._metadata_transformer, request, request_serializer,
+        response_deserializer)
+
+  def blocking_stream_unary(
+      self, group, method, request_iterator, timeout, metadata=None,
+      with_call=None, protocol_options=None):
+    request_serializer = self._request_serializers.get((group, method,))
+    response_deserializer = self._response_deserializers.get((group, method,))
+    return _blocking_stream_unary(
+        self._channel, group, method, timeout, with_call, protocol_options,
+        metadata, self._metadata_transformer, request_iterator,
+        request_serializer, response_deserializer)
+
+  def future_stream_unary(
+      self, group, method, request_iterator, timeout, metadata=None,
+      protocol_options=None):
+    request_serializer = self._request_serializers.get((group, method,))
+    response_deserializer = self._response_deserializers.get((group, method,))
+    return _future_stream_unary(
+        self._channel, group, method, timeout, protocol_options, metadata,
+        self._metadata_transformer, request_iterator, request_serializer,
+        response_deserializer)
+
+  def inline_stream_stream(
+      self, group, method, request_iterator, timeout, metadata=None,
+      protocol_options=None):
+    request_serializer = self._request_serializers.get((group, method,))
+    response_deserializer = self._response_deserializers.get((group, method,))
+    return _stream_stream(
+        self._channel, group, method, timeout, protocol_options, metadata,
+        self._metadata_transformer, request_iterator, request_serializer,
+        response_deserializer)
+
+  def event_unary_unary(
+      self, group, method, request, receiver, abortion_callback, timeout,
+      metadata=None, protocol_options=None):
+    raise NotImplementedError()
+
+  def event_unary_stream(
+      self, group, method, request, receiver, abortion_callback, timeout,
+      metadata=None, protocol_options=None):
+    raise NotImplementedError()
+
+  def event_stream_unary(
+      self, group, method, receiver, abortion_callback, timeout,
+      metadata=None, protocol_options=None):
+    raise NotImplementedError()
+
+  def event_stream_stream(
+      self, group, method, receiver, abortion_callback, timeout,
+      metadata=None, protocol_options=None):
+    raise NotImplementedError()
+
+  def unary_unary(self, group, method):
+    request_serializer = self._request_serializers.get((group, method,))
+    response_deserializer = self._response_deserializers.get((group, method,))
+    return _UnaryUnaryMultiCallable(
+        self._channel, group, method, self._metadata_transformer,
+        request_serializer, response_deserializer)
+
+  def unary_stream(self, group, method):
+    request_serializer = self._request_serializers.get((group, method,))
+    response_deserializer = self._response_deserializers.get((group, method,))
+    return _UnaryStreamMultiCallable(
+        self._channel, group, method, self._metadata_transformer,
+        request_serializer, response_deserializer)
+
+  def stream_unary(self, group, method):
+    request_serializer = self._request_serializers.get((group, method,))
+    response_deserializer = self._response_deserializers.get((group, method,))
+    return _StreamUnaryMultiCallable(
+        self._channel, group, method, self._metadata_transformer,
+        request_serializer, response_deserializer)
+
+  def stream_stream(self, group, method):
+    request_serializer = self._request_serializers.get((group, method,))
+    response_deserializer = self._response_deserializers.get((group, method,))
+    return _StreamStreamMultiCallable(
+        self._channel, group, method, self._metadata_transformer,
+        request_serializer, response_deserializer)
+
+  def __enter__(self):
+    return self
+
+  def __exit__(self, exc_type, exc_val, exc_tb):
+    return False
+
+
+class _DynamicStub(face.DynamicStub):
+
+  def __init__(self, generic_stub, group, cardinalities):
+    self._generic_stub = generic_stub
+    self._group = group
+    self._cardinalities = cardinalities
+
+  def __getattr__(self, attr):
+    method_cardinality = self._cardinalities.get(attr)
+    if method_cardinality is cardinality.Cardinality.UNARY_UNARY:
+      return self._generic_stub.unary_unary(self._group, attr)
+    elif method_cardinality is cardinality.Cardinality.UNARY_STREAM:
+      return self._generic_stub.unary_stream(self._group, attr)
+    elif method_cardinality is cardinality.Cardinality.STREAM_UNARY:
+      return self._generic_stub.stream_unary(self._group, attr)
+    elif method_cardinality is cardinality.Cardinality.STREAM_STREAM:
+      return self._generic_stub.stream_stream(self._group, attr)
+    else:
+      raise AttributeError('_DynamicStub object has no attribute "%s"!' % attr)
+
+  def __enter__(self):
+    return self
+
+  def __exit__(self, exc_type, exc_val, exc_tb):
+    return False
+
+
+def generic_stub(
+    channel, host, metadata_transformer, request_serializers,
+    response_deserializers):
+  return _GenericStub(
+      channel, metadata_transformer, request_serializers,
+      response_deserializers)
+
+
+def dynamic_stub(
+    channel, service, cardinalities, host, metadata_transformer,
+    request_serializers, response_deserializers):
+  return _DynamicStub(
+      _GenericStub(
+          channel, metadata_transformer, request_serializers,
+          response_deserializers),
+      service, cardinalities)
diff --git a/src/python/grpcio/grpc/beta/_server_adaptations.py b/src/python/grpcio/grpc/beta/_server_adaptations.py
new file mode 100644
index 0000000..52eadf2
--- /dev/null
+++ b/src/python/grpcio/grpc/beta/_server_adaptations.py
@@ -0,0 +1,359 @@
+# Copyright 2016, Google Inc.
+# All rights reserved.
+#
+# Redistribution and use in source and binary forms, with or without
+# modification, are permitted provided that the following conditions are
+# met:
+#
+#     * Redistributions of source code must retain the above copyright
+# notice, this list of conditions and the following disclaimer.
+#     * Redistributions in binary form must reproduce the above
+# copyright notice, this list of conditions and the following disclaimer
+# in the documentation and/or other materials provided with the
+# distribution.
+#     * Neither the name of Google Inc. nor the names of its
+# contributors may be used to endorse or promote products derived from
+# this software without specific prior written permission.
+#
+# THIS SOFTWARE IS PROVIDED BY THE COPYRIGHT HOLDERS AND CONTRIBUTORS
+# "AS IS" AND ANY EXPRESS OR IMPLIED WARRANTIES, INCLUDING, BUT NOT
+# LIMITED TO, THE IMPLIED WARRANTIES OF MERCHANTABILITY AND FITNESS FOR
+# A PARTICULAR PURPOSE ARE DISCLAIMED. IN NO EVENT SHALL THE COPYRIGHT
+# OWNER OR CONTRIBUTORS BE LIABLE FOR ANY DIRECT, INDIRECT, INCIDENTAL,
+# SPECIAL, EXEMPLARY, OR CONSEQUENTIAL DAMAGES (INCLUDING, BUT NOT
+# LIMITED TO, PROCUREMENT OF SUBSTITUTE GOODS OR SERVICES; LOSS OF USE,
+# DATA, OR PROFITS; OR BUSINESS INTERRUPTION) HOWEVER CAUSED AND ON ANY
+# THEORY OF LIABILITY, WHETHER IN CONTRACT, STRICT LIABILITY, OR TORT
+# (INCLUDING NEGLIGENCE OR OTHERWISE) ARISING IN ANY WAY OUT OF THE USE
+# OF THIS SOFTWARE, EVEN IF ADVISED OF THE POSSIBILITY OF SUCH DAMAGE.
+
+"""Translates gRPC's server-side API into gRPC's server-side Beta API."""
+
+import collections
+import threading
+
+import grpc
+from grpc.beta import interfaces
+from grpc.framework.common import cardinality
+from grpc.framework.common import style
+from grpc.framework.foundation import abandonment
+from grpc.framework.foundation import logging_pool
+from grpc.framework.foundation import stream
+from grpc.framework.interfaces.face import face
+
+_DEFAULT_POOL_SIZE = 8
+
+
+class _ServerProtocolContext(interfaces.GRPCServicerContext):
+
+  def __init__(self, servicer_context):
+    self._servicer_context = servicer_context
+
+  def peer(self):
+    return self._servicer_context.peer()
+
+  def disable_next_response_compression(self):
+    pass  # TODO(https://github.com/grpc/grpc/issues/4078): design, implement.
+
+
+class _FaceServicerContext(face.ServicerContext):
+
+  def __init__(self, servicer_context):
+    self._servicer_context = servicer_context
+
+  def is_active(self):
+    return self._servicer_context.is_active()
+
+  def time_remaining(self):
+    return self._servicer_context.time_remaining()
+
+  def add_abortion_callback(self, abortion_callback):
+    raise NotImplementedError(
+        'add_abortion_callback no longer supported server-side!')
+
+  def cancel(self):
+    self._servicer_context.cancel()
+
+  def protocol_context(self):
+    return _ServerProtocolContext(self._servicer_context)
+
+  def invocation_metadata(self):
+    return self._servicer_context.invocation_metadata()
+
+  def initial_metadata(self, initial_metadata):
+    self._servicer_context.send_initial_metadata(initial_metadata)
+
+  def terminal_metadata(self, terminal_metadata):
+    self._servicer_context.set_terminal_metadata(terminal_metadata)
+
+  def code(self, code):
+    self._servicer_context.set_code(code)
+
+  def details(self, details):
+    self._servicer_context.set_details(details)
+
+
+def _adapt_unary_request_inline(unary_request_inline):
+  def adaptation(request, servicer_context):
+    return unary_request_inline(request, _FaceServicerContext(servicer_context))
+  return adaptation
+
+
+def _adapt_stream_request_inline(stream_request_inline):
+  def adaptation(request_iterator, servicer_context):
+    return stream_request_inline(
+        request_iterator, _FaceServicerContext(servicer_context))
+  return adaptation
+
+
+class _Callback(stream.Consumer):
+
+  def __init__(self):
+    self._condition = threading.Condition()
+    self._values = []
+    self._terminated = False
+    self._cancelled = False
+
+  def consume(self, value):
+    with self._condition:
+      self._values.append(value)
+      self._condition.notify_all()
+
+  def terminate(self):
+    with self._condition:
+      self._terminated = True
+      self._condition.notify_all()
+
+  def consume_and_terminate(self, value):
+    with self._condition:
+      self._values.append(value)
+      self._terminated = True
+      self._condition.notify_all()
+
+  def cancel(self):
+    with self._condition:
+      self._cancelled = True
+      self._condition.notify_all()
+
+  def draw_one_value(self):
+    with self._condition:
+      while True:
+        if self._cancelled:
+          raise abandonment.Abandoned()
+        elif self._values:
+          return self._values.pop(0)
+        elif self._terminated:
+          return None
+        else:
+          self._condition.wait()
+
+  def draw_all_values(self):
+    with self._condition:
+      while True:
+        if self._cancelled:
+          raise abandonment.Abandoned()
+        elif self._terminated:
+          all_values = tuple(self._values)
+          self._values = None
+          return all_values
+        else:
+          self._condition.wait()
+
+
+def _pipe_requests(request_iterator, request_consumer, servicer_context):
+  for request in request_iterator:
+    if not servicer_context.is_active():
+      return
+    request_consumer.consume(request)
+    if not servicer_context.is_active():
+      return
+  request_consumer.terminate()
+
+
+def _adapt_unary_unary_event(unary_unary_event):
+  def adaptation(request, servicer_context):
+    callback = _Callback()
+    if not servicer_context.add_callback(callback.cancel):
+      raise abandonment.Abandoned()
+    unary_unary_event(
+        request, callback.consume_and_terminate,
+        _FaceServicerContext(servicer_context))
+    return callback.draw_all_values()[0]
+  return adaptation
+
+
+def _adapt_unary_stream_event(unary_stream_event):
+  def adaptation(request, servicer_context):
+    callback = _Callback()
+    if not servicer_context.add_callback(callback.cancel):
+      raise abandonment.Abandoned()
+    unary_stream_event(
+        request, callback, _FaceServicerContext(servicer_context))
+    while True:
+      response = callback.draw_one_value()
+      if response is None:
+        return
+      else:
+        yield response
+  return adaptation
+
+
+def _adapt_stream_unary_event(stream_unary_event):
+  def adaptation(request_iterator, servicer_context):
+    callback = _Callback()
+    if not servicer_context.add_callback(callback.cancel):
+      raise abandonment.Abandoned()
+    request_consumer = stream_unary_event(
+        callback.consume_and_terminate, _FaceServicerContext(servicer_context))
+    request_pipe_thread = threading.Thread(
+        target=_pipe_requests,
+        args=(request_iterator, request_consumer, servicer_context,))
+    request_pipe_thread.start()
+    return callback.draw_all_values()[0]
+  return adaptation
+
+
+def _adapt_stream_stream_event(stream_stream_event):
+  def adaptation(request_iterator, servicer_context):
+    callback = _Callback()
+    if not servicer_context.add_callback(callback.cancel):
+      raise abandonment.Abandoned()
+    request_consumer = stream_stream_event(
+        callback, _FaceServicerContext(servicer_context))
+    request_pipe_thread = threading.Thread(
+        target=_pipe_requests,
+        args=(request_iterator, request_consumer, servicer_context,))
+    request_pipe_thread.start()
+    while True:
+      response = callback.draw_one_value()
+      if response is None:
+        return
+      else:
+        yield response
+  return adaptation
+
+
+class _SimpleMethodHandler(
+    collections.namedtuple(
+        '_MethodHandler',
+        ('request_streaming', 'response_streaming', 'request_deserializer',
+         'response_serializer', 'unary_unary', 'unary_stream', 'stream_unary',
+         'stream_stream',)),
+    grpc.RpcMethodHandler):
+  pass
+
+
+def _simple_method_handler(
+    implementation, request_deserializer, response_serializer):
+  if implementation.style is style.Service.INLINE:
+    if implementation.cardinality is cardinality.Cardinality.UNARY_UNARY:
+      return _SimpleMethodHandler(
+          False, False, request_deserializer, response_serializer,
+          _adapt_unary_request_inline(implementation.unary_unary_inline), None,
+          None, None)
+    elif implementation.cardinality is cardinality.Cardinality.UNARY_STREAM:
+      return _SimpleMethodHandler(
+          False, True, request_deserializer, response_serializer, None,
+          _adapt_unary_request_inline(implementation.unary_stream_inline), None,
+          None)
+    elif implementation.cardinality is cardinality.Cardinality.STREAM_UNARY:
+      return _SimpleMethodHandler(
+          True, False, request_deserializer, response_serializer, None, None,
+          _adapt_stream_request_inline(implementation.stream_unary_inline),
+          None)
+    elif implementation.cardinality is cardinality.Cardinality.STREAM_STREAM:
+      return _SimpleMethodHandler(
+          True, True, request_deserializer, response_serializer, None, None,
+          None,
+          _adapt_stream_request_inline(implementation.stream_stream_inline))
+  elif implementation.style is style.Service.EVENT:
+    if implementation.cardinality is cardinality.Cardinality.UNARY_UNARY:
+      return _SimpleMethodHandler(
+          False, False, request_deserializer, response_serializer,
+          _adapt_unary_unary_event(implementation.unary_unary_event), None,
+          None, None)
+    elif implementation.cardinality is cardinality.Cardinality.UNARY_STREAM:
+      return _SimpleMethodHandler(
+          False, True, request_deserializer, response_serializer, None,
+          _adapt_unary_stream_event(implementation.unary_stream_event), None,
+          None)
+    elif implementation.cardinality is cardinality.Cardinality.STREAM_UNARY:
+      return _SimpleMethodHandler(
+          True, False, request_deserializer, response_serializer, None, None,
+          _adapt_stream_unary_event(implementation.stream_unary_event), None)
+    elif implementation.cardinality is cardinality.Cardinality.STREAM_STREAM:
+      return _SimpleMethodHandler(
+          True, True, request_deserializer, response_serializer, None, None,
+          None, _adapt_stream_stream_event(implementation.stream_stream_event))
+
+
+class _GenericRpcHandler(grpc.GenericRpcHandler):
+
+  def __init__(
+      self, method_implementations, multi_method_implementation,
+      request_deserializers, response_serializers):
+    self._method_implementations = method_implementations
+    self._multi_method_implementation = multi_method_implementation
+    self._request_deserializers = request_deserializers or {}
+    self._response_serializers = response_serializers or {}
+
+  def service(self, handler_call_details):
+    try:
+      group_name, method_name = handler_call_details.method.split(b'/')[1:3]
+    except ValueError:
+      return None
+    else:
+      method_implementation = self._method_implementations.get(
+          (group_name, method_name,))
+      if method_implementation is not None:
+        return _simple_method_handler(
+            method_implementation,
+            self._request_deserializers.get((group_name, method_name,)),
+            self._response_serializers.get((group_name, method_name,)))
+      elif self._multi_method_implementation is None:
+        return None
+      else:
+        try:
+          return None  #TODO(nathaniel): call the multimethod.
+        except face.NoSuchMethodError:
+          return None
+
+
+class _Server(interfaces.Server):
+
+  def __init__(self, server):
+    self._server = server
+
+  def add_insecure_port(self, address):
+    return self._server.add_insecure_port(address)
+
+  def add_secure_port(self, address, server_credentials):
+    return self._server.add_secure_port(address, server_credentials)
+
+  def start(self):
+    self._server.start()
+
+  def stop(self, grace):
+    return self._server.stop(grace)
+
+  def __enter__(self):
+    self._server.start()
+    return self
+
+  def __exit__(self, exc_type, exc_val, exc_tb):
+    self._server.stop(None)
+    return False
+
+
+def server(
+    service_implementations, multi_method_implementation, request_deserializers,
+    response_serializers, thread_pool, thread_pool_size):
+  generic_rpc_handler = _GenericRpcHandler(
+      service_implementations, multi_method_implementation,
+      request_deserializers, response_serializers)
+  if thread_pool is None:
+    effective_thread_pool = logging_pool.pool(
+        _DEFAULT_POOL_SIZE if thread_pool_size is None else thread_pool_size)
+  else:
+    effective_thread_pool = thread_pool
+  return _Server(grpc.server((generic_rpc_handler,), effective_thread_pool))
diff --git a/src/python/grpcio/grpc/beta/implementations.py b/src/python/grpcio/grpc/beta/implementations.py
index d8c32dd..4ae6e7d 100644
--- a/src/python/grpcio/grpc/beta/implementations.py
+++ b/src/python/grpcio/grpc/beta/implementations.py
@@ -35,83 +35,20 @@
 import threading  # pylint: disable=unused-import
 
 # cardinality and face are referenced from specification in this module.
-from grpc._adapter import _intermediary_low
-from grpc._adapter import _low
+import grpc
+from grpc import _auth
 from grpc._adapter import _types
-from grpc.beta import _auth
-from grpc.beta import _connectivity_channel
-from grpc.beta import _server
-from grpc.beta import _stub
+from grpc.beta import _client_adaptations
+from grpc.beta import _server_adaptations
 from grpc.beta import interfaces
 from grpc.framework.common import cardinality  # pylint: disable=unused-import
 from grpc.framework.interfaces.face import face  # pylint: disable=unused-import
 
-_CHANNEL_SUBSCRIPTION_CALLBACK_ERROR_LOG_MESSAGE = (
-    'Exception calling channel subscription callback!')
 
-
-class ChannelCredentials(object):
-  """A value encapsulating the data required to create a secure Channel.
-
-  This class and its instances have no supported interface - it exists to define
-  the type of its instances and its instances exist to be passed to other
-  functions.
-  """
-
-  def __init__(self, low_credentials):
-    self._low_credentials = low_credentials
-
-
-def ssl_channel_credentials(root_certificates=None, private_key=None,
-                            certificate_chain=None):
-  """Creates a ChannelCredentials for use with an SSL-enabled Channel.
-
-  Args:
-    root_certificates: The PEM-encoded root certificates or unset to ask for
-      them to be retrieved from a default location.
-    private_key: The PEM-encoded private key to use or unset if no private key
-      should be used.
-    certificate_chain: The PEM-encoded certificate chain to use or unset if no
-      certificate chain should be used.
-
-  Returns:
-    A ChannelCredentials for use with an SSL-enabled Channel.
-  """
-  return ChannelCredentials(_low.channel_credentials_ssl(
-      root_certificates, private_key, certificate_chain))
-
-
-class CallCredentials(object):
-  """A value encapsulating data asserting an identity over an *established*
-  channel. May be composed with ChannelCredentials to always assert identity for
-  every call over that channel.
-
-  This class and its instances have no supported interface - it exists to define
-  the type of its instances and its instances exist to be passed to other
-  functions.
-  """
-
-  def __init__(self, low_credentials):
-    self._low_credentials = low_credentials
-
-
-def metadata_call_credentials(metadata_plugin, name=None):
-  """Construct CallCredentials from an interfaces.GRPCAuthMetadataPlugin.
-
-  Args:
-    metadata_plugin: An interfaces.GRPCAuthMetadataPlugin to use in constructing
-      the CallCredentials object.
-
-  Returns:
-    A CallCredentials object for use in a GRPCCallOptions object.
-  """
-  if name is None:
-    try:
-      name = metadata_plugin.__name__
-    except AttributeError:
-      name = metadata_plugin.__class__.__name__
-  return CallCredentials(
-      _low.call_credentials_metadata_plugin(metadata_plugin, name))
+ChannelCredentials = grpc.ChannelCredentials
+ssl_channel_credentials = grpc.ssl_channel_credentials
+CallCredentials = grpc.CallCredentials
+metadata_call_credentials = grpc.metadata_call_credentials
 
 
 def google_call_credentials(credentials):
@@ -125,53 +62,9 @@
   """
   return metadata_call_credentials(_auth.GoogleCallCredentials(credentials))
 
-
-def access_token_call_credentials(access_token):
-  """Construct CallCredentials from an access token.
-
-  Args:
-    access_token: A string to place directly in the http request
-      authorization header, ie "Authorization: Bearer <access_token>".
-
-  Returns:
-    A CallCredentials object for use in a GRPCCallOptions object.
-  """
-  return metadata_call_credentials(
-      _auth.AccessTokenCallCredentials(access_token))
-
-
-def composite_call_credentials(call_credentials, additional_call_credentials):
-  """Compose two CallCredentials to make a new one.
-
-  Args:
-    call_credentials: A CallCredentials object.
-    additional_call_credentials: Another CallCredentials object to compose on
-      top of call_credentials.
-
-  Returns:
-    A CallCredentials object for use in a GRPCCallOptions object.
-  """
-  return CallCredentials(
-      _low.call_credentials_composite(
-          call_credentials._low_credentials,
-          additional_call_credentials._low_credentials))
-
-def composite_channel_credentials(channel_credentials,
-                                 additional_call_credentials):
-  """Compose ChannelCredentials on top of client credentials to make a new one.
-
-  Args:
-    channel_credentials: A ChannelCredentials object.
-    additional_call_credentials: A CallCredentials object to compose on
-      top of channel_credentials.
-
-  Returns:
-    A ChannelCredentials object for use in a GRPCCallOptions object.
-  """
-  return ChannelCredentials(
-      _low.channel_credentials_composite(
-          channel_credentials._low_credentials,
-          additional_call_credentials._low_credentials))
+access_token_call_credentials = grpc.access_token_call_credentials
+composite_call_credentials = grpc.composite_call_credentials
+composite_channel_credentials = grpc.composite_channel_credentials
 
 
 class Channel(object):
@@ -182,11 +75,8 @@
   unsupported.
   """
 
-  def __init__(self, low_channel, intermediary_low_channel):
-    self._low_channel = low_channel
-    self._intermediary_low_channel = intermediary_low_channel
-    self._connectivity_channel = _connectivity_channel.ConnectivityChannel(
-        low_channel)
+  def __init__(self, channel):
+    self._channel = channel
 
   def subscribe(self, callback, try_to_connect=None):
     """Subscribes to this Channel's connectivity.
@@ -201,7 +91,7 @@
         attempt to connect if it is not already connected and ready to conduct
         RPCs.
     """
-    self._connectivity_channel.subscribe(callback, try_to_connect)
+    self._channel.subscribe(callback, try_to_connect=try_to_connect)
 
   def unsubscribe(self, callback):
     """Unsubscribes a callback from this Channel's connectivity.
@@ -210,7 +100,7 @@
       callback: A callable previously registered with this Channel from having
         been passed to its "subscribe" method.
     """
-    self._connectivity_channel.unsubscribe(callback)
+    self._channel.unsubscribe(callback)
 
 
 def insecure_channel(host, port):
@@ -224,9 +114,9 @@
   Returns:
     A Channel to the remote host through which RPCs may be conducted.
   """
-  intermediary_low_channel = _intermediary_low.Channel(
-      '%s:%d' % (host, port) if port else host, None)
-  return Channel(intermediary_low_channel._internal, intermediary_low_channel)  # pylint: disable=protected-access
+  channel = grpc.insecure_channel(
+      host if port is None else '%s:%d' % (host, port))
+  return Channel(channel)
 
 
 def secure_channel(host, port, channel_credentials):
@@ -241,10 +131,9 @@
   Returns:
     A secure Channel to the remote host through which RPCs may be conducted.
   """
-  intermediary_low_channel = _intermediary_low.Channel(
-      '%s:%d' % (host, port) if port else host,
-      channel_credentials._low_credentials)
-  return Channel(intermediary_low_channel._internal, intermediary_low_channel)  # pylint: disable=protected-access
+  channel = grpc.secure_channel(
+      host if port is None else '%s:%d' % (host, port), channel_credentials)
+  return Channel(channel)
 
 
 class StubOptions(object):
@@ -308,12 +197,11 @@
     A face.GenericStub on which RPCs can be made.
   """
   effective_options = _EMPTY_STUB_OPTIONS if options is None else options
-  return _stub.generic_stub(
-      channel._intermediary_low_channel, effective_options.host,  # pylint: disable=protected-access
-      effective_options.metadata_transformer,
+  return _client_adaptations.generic_stub(
+      channel._channel,  # pylint: disable=protected-access
+      effective_options.host, effective_options.metadata_transformer,
       effective_options.request_serializers,
-      effective_options.response_deserializers, effective_options.thread_pool,
-      effective_options.thread_pool_size)
+      effective_options.response_deserializers)
 
 
 def dynamic_stub(channel, service, cardinalities, options=None):
@@ -331,55 +219,16 @@
     A face.DynamicStub with which RPCs can be invoked.
   """
   effective_options = StubOptions() if options is None else options
-  return _stub.dynamic_stub(
-      channel._intermediary_low_channel, effective_options.host, service,  # pylint: disable=protected-access
-      cardinalities, effective_options.metadata_transformer,
+  return _client_adaptations.dynamic_stub(
+      channel._channel,  # pylint: disable=protected-access
+      service, cardinalities, effective_options.host,
+      effective_options.metadata_transformer,
       effective_options.request_serializers,
-      effective_options.response_deserializers, effective_options.thread_pool,
-      effective_options.thread_pool_size)
+      effective_options.response_deserializers)
 
 
-class ServerCredentials(object):
-  """A value encapsulating the data required to open a secure port on a Server.
-
-  This class and its instances have no supported interface - it exists to define
-  the type of its instances and its instances exist to be passed to other
-  functions.
-  """
-
-  def __init__(self, low_credentials):
-    self._low_credentials = low_credentials
-
-
-def ssl_server_credentials(
-    private_key_certificate_chain_pairs, root_certificates=None,
-    require_client_auth=False):
-  """Creates a ServerCredentials for use with an SSL-enabled Server.
-
-  Args:
-    private_key_certificate_chain_pairs: A nonempty sequence each element of
-      which is a pair the first element of which is a PEM-encoded private key
-      and the second element of which is the corresponding PEM-encoded
-      certificate chain.
-    root_certificates: PEM-encoded client root certificates to be used for
-      verifying authenticated clients. If omitted, require_client_auth must also
-      be omitted or be False.
-    require_client_auth: A boolean indicating whether or not to require clients
-      to be authenticated. May only be True if root_certificates is not None.
-
-  Returns:
-    A ServerCredentials for use with an SSL-enabled Server.
-  """
-  if len(private_key_certificate_chain_pairs) == 0:
-    raise ValueError(
-        'At least one private key-certificate chain pairis required!')
-  elif require_client_auth and root_certificates is None:
-    raise ValueError(
-        'Illegal to require client auth without providing root certificates!')
-  else:
-    return ServerCredentials(_low.server_credentials_ssl(
-        root_certificates, private_key_certificate_chain_pairs,
-        require_client_auth))
+ServerCredentials = grpc.ServerCredentials
+ssl_server_credentials = grpc.ssl_server_credentials
 
 
 class ServerOptions(object):
@@ -452,9 +301,8 @@
     An interfaces.Server with which RPCs can be serviced.
   """
   effective_options = _EMPTY_SERVER_OPTIONS if options is None else options
-  return _server.server(
+  return _server_adaptations.server(
       service_implementations, effective_options.multi_method_implementation,
       effective_options.request_deserializers,
       effective_options.response_serializers, effective_options.thread_pool,
-      effective_options.thread_pool_size, effective_options.default_timeout,
-      effective_options.maximum_timeout)
+      effective_options.thread_pool_size)
diff --git a/src/python/grpcio/grpc/beta/interfaces.py b/src/python/grpcio/grpc/beta/interfaces.py
index 24de9ad..4343b6c 100644
--- a/src/python/grpcio/grpc/beta/interfaces.py
+++ b/src/python/grpcio/grpc/beta/interfaces.py
@@ -30,53 +30,13 @@
 """Constants and interfaces of the Beta API of gRPC Python."""
 
 import abc
-import enum
 
 import six
 
-from grpc._adapter import _types
+import grpc
 
-
-@enum.unique
-class ChannelConnectivity(enum.Enum):
-  """Mirrors grpc_connectivity_state in the gRPC Core.
-
-  Attributes:
-    IDLE: The channel is idle.
-    CONNECTING: The channel is connecting.
-    READY: The channel is ready to conduct RPCs.
-    TRANSIENT_FAILURE: The channel has seen a failure from which it expects to
-      recover.
-    FATAL_FAILURE: The channel has seen a failure from which it cannot recover.
-  """
-  IDLE = (_types.ConnectivityState.IDLE, 'idle',)
-  CONNECTING = (_types.ConnectivityState.CONNECTING, 'connecting',)
-  READY = (_types.ConnectivityState.READY, 'ready',)
-  TRANSIENT_FAILURE = (
-      _types.ConnectivityState.TRANSIENT_FAILURE, 'transient failure',)
-  FATAL_FAILURE = (_types.ConnectivityState.FATAL_FAILURE, 'fatal failure',)
-
-
-@enum.unique
-class StatusCode(enum.Enum):
-  """Mirrors grpc_status_code in the C core."""
-  OK                  = 0
-  CANCELLED           = 1
-  UNKNOWN             = 2
-  INVALID_ARGUMENT    = 3
-  DEADLINE_EXCEEDED   = 4
-  NOT_FOUND           = 5
-  ALREADY_EXISTS      = 6
-  PERMISSION_DENIED   = 7
-  RESOURCE_EXHAUSTED  = 8
-  FAILED_PRECONDITION = 9
-  ABORTED             = 10
-  OUT_OF_RANGE        = 11
-  UNIMPLEMENTED       = 12
-  INTERNAL            = 13
-  UNAVAILABLE         = 14
-  DATA_LOSS           = 15
-  UNAUTHENTICATED     = 16
+ChannelConnectivity = grpc.ChannelConnectivity
+StatusCode = grpc.StatusCode
 
 
 class GRPCCallOptions(object):
@@ -106,46 +66,9 @@
   """
   return GRPCCallOptions(disable_compression, None, credentials)
 
-
-class GRPCAuthMetadataContext(six.with_metaclass(abc.ABCMeta)):
-  """Provides information to call credentials metadata plugins.
-
-  Attributes:
-    service_url: A string URL of the service being called into.
-    method_name: A string of the fully qualified method name being called.
-  """
-
-
-class GRPCAuthMetadataPluginCallback(six.with_metaclass(abc.ABCMeta)):
-  """Callback object received by a metadata plugin."""
-
-  def __call__(self, metadata, error):
-    """Inform the gRPC runtime of the metadata to construct a CallCredentials.
-
-    Args:
-      metadata: An iterable of 2-sequences (e.g. tuples) of metadata key/value
-        pairs.
-      error: An Exception to indicate error or None to indicate success.
-    """
-    raise NotImplementedError()
-
-
-class GRPCAuthMetadataPlugin(six.with_metaclass(abc.ABCMeta)):
-  """
-  """
-
-  def __call__(self, context, callback):
-    """Invoke the plugin.
-
-    Must not block. Need only be called by the gRPC runtime.
-
-    Args:
-      context: A GRPCAuthMetadataContext providing information on what the
-        plugin is being used for.
-      callback: A GRPCAuthMetadataPluginCallback to be invoked either
-        synchronously or asynchronously.
-    """
-    raise NotImplementedError()
+GRPCAuthMetadataContext = grpc.AuthMetadataContext
+GRPCAuthMetadataPluginCallback = grpc.AuthMetadataPluginCallback
+GRPCAuthMetadataPlugin = grpc.AuthMetadataPlugin
 
 
 class GRPCServicerContext(six.with_metaclass(abc.ABCMeta)):
diff --git a/src/python/grpcio/tests/unit/beta/_auth_test.py b/src/python/grpcio/tests/unit/_auth_test.py
similarity index 98%
rename from src/python/grpcio/tests/unit/beta/_auth_test.py
rename to src/python/grpcio/tests/unit/_auth_test.py
index 694928a..c31f7b0 100644
--- a/src/python/grpcio/tests/unit/beta/_auth_test.py
+++ b/src/python/grpcio/tests/unit/_auth_test.py
@@ -33,7 +33,7 @@
 import threading
 import unittest
 
-from grpc.beta import _auth
+from grpc import _auth
 
 
 class MockGoogleCreds(object):
diff --git a/src/python/grpcio/tests/unit/beta/test_utilities.py b/src/python/grpcio/tests/unit/beta/test_utilities.py
index 0313e06..66b5f72 100644
--- a/src/python/grpcio/tests/unit/beta/test_utilities.py
+++ b/src/python/grpcio/tests/unit/beta/test_utilities.py
@@ -29,7 +29,7 @@
 
 """Test-appropriate entry points into the gRPC Python Beta API."""
 
-from grpc._adapter import _intermediary_low
+import grpc
 from grpc.beta import implementations
 
 
@@ -48,9 +48,8 @@
     An implementations.Channel to the remote host through which RPCs may be
       conducted.
   """
-  hostport = '%s:%d' % (host, port)
-  intermediary_low_channel = _intermediary_low.Channel(
-      hostport, channel_credentials._low_credentials,
-      server_host_override=server_host_override)
-  return implementations.Channel(
-      intermediary_low_channel._internal, intermediary_low_channel)
+  target = '%s:%d' % (host, port)
+  channel = grpc.secure_channel(
+      target, ((b'grpc.ssl_target_name_override', server_host_override,),),
+      channel_credentials._credentials)
+  return implementations.Channel(channel)