The RPC Framework core package.

This is the second generation of the old base package (framework.base)
and implements the translation between the new links and base
interfaces.
diff --git a/src/python/grpcio/grpc/framework/core/__init__.py b/src/python/grpcio/grpc/framework/core/__init__.py
new file mode 100644
index 0000000..7086519
--- /dev/null
+++ b/src/python/grpcio/grpc/framework/core/__init__.py
@@ -0,0 +1,30 @@
+# Copyright 2015, Google Inc.
+# All rights reserved.
+#
+# Redistribution and use in source and binary forms, with or without
+# modification, are permitted provided that the following conditions are
+# met:
+#
+#     * Redistributions of source code must retain the above copyright
+# notice, this list of conditions and the following disclaimer.
+#     * Redistributions in binary form must reproduce the above
+# copyright notice, this list of conditions and the following disclaimer
+# in the documentation and/or other materials provided with the
+# distribution.
+#     * Neither the name of Google Inc. nor the names of its
+# contributors may be used to endorse or promote products derived from
+# this software without specific prior written permission.
+#
+# THIS SOFTWARE IS PROVIDED BY THE COPYRIGHT HOLDERS AND CONTRIBUTORS
+# "AS IS" AND ANY EXPRESS OR IMPLIED WARRANTIES, INCLUDING, BUT NOT
+# LIMITED TO, THE IMPLIED WARRANTIES OF MERCHANTABILITY AND FITNESS FOR
+# A PARTICULAR PURPOSE ARE DISCLAIMED. IN NO EVENT SHALL THE COPYRIGHT
+# OWNER OR CONTRIBUTORS BE LIABLE FOR ANY DIRECT, INDIRECT, INCIDENTAL,
+# SPECIAL, EXEMPLARY, OR CONSEQUENTIAL DAMAGES (INCLUDING, BUT NOT
+# LIMITED TO, PROCUREMENT OF SUBSTITUTE GOODS OR SERVICES; LOSS OF USE,
+# DATA, OR PROFITS; OR BUSINESS INTERRUPTION) HOWEVER CAUSED AND ON ANY
+# THEORY OF LIABILITY, WHETHER IN CONTRACT, STRICT LIABILITY, OR TORT
+# (INCLUDING NEGLIGENCE OR OTHERWISE) ARISING IN ANY WAY OUT OF THE USE
+# OF THIS SOFTWARE, EVEN IF ADVISED OF THE POSSIBILITY OF SUCH DAMAGE.
+
+
diff --git a/src/python/grpcio/grpc/framework/core/_constants.py b/src/python/grpcio/grpc/framework/core/_constants.py
new file mode 100644
index 0000000..d3be3a4
--- /dev/null
+++ b/src/python/grpcio/grpc/framework/core/_constants.py
@@ -0,0 +1,59 @@
+# Copyright 2015, Google Inc.
+# All rights reserved.
+#
+# Redistribution and use in source and binary forms, with or without
+# modification, are permitted provided that the following conditions are
+# met:
+#
+#     * Redistributions of source code must retain the above copyright
+# notice, this list of conditions and the following disclaimer.
+#     * Redistributions in binary form must reproduce the above
+# copyright notice, this list of conditions and the following disclaimer
+# in the documentation and/or other materials provided with the
+# distribution.
+#     * Neither the name of Google Inc. nor the names of its
+# contributors may be used to endorse or promote products derived from
+# this software without specific prior written permission.
+#
+# THIS SOFTWARE IS PROVIDED BY THE COPYRIGHT HOLDERS AND CONTRIBUTORS
+# "AS IS" AND ANY EXPRESS OR IMPLIED WARRANTIES, INCLUDING, BUT NOT
+# LIMITED TO, THE IMPLIED WARRANTIES OF MERCHANTABILITY AND FITNESS FOR
+# A PARTICULAR PURPOSE ARE DISCLAIMED. IN NO EVENT SHALL THE COPYRIGHT
+# OWNER OR CONTRIBUTORS BE LIABLE FOR ANY DIRECT, INDIRECT, INCIDENTAL,
+# SPECIAL, EXEMPLARY, OR CONSEQUENTIAL DAMAGES (INCLUDING, BUT NOT
+# LIMITED TO, PROCUREMENT OF SUBSTITUTE GOODS OR SERVICES; LOSS OF USE,
+# DATA, OR PROFITS; OR BUSINESS INTERRUPTION) HOWEVER CAUSED AND ON ANY
+# THEORY OF LIABILITY, WHETHER IN CONTRACT, STRICT LIABILITY, OR TORT
+# (INCLUDING NEGLIGENCE OR OTHERWISE) ARISING IN ANY WAY OUT OF THE USE
+# OF THIS SOFTWARE, EVEN IF ADVISED OF THE POSSIBILITY OF SUCH DAMAGE.
+
+"""Private constants for the package."""
+
+from grpc.framework.interfaces.base import base
+from grpc.framework.interfaces.links import links
+
+TICKET_SUBSCRIPTION_FOR_BASE_SUBSCRIPTION_KIND = {
+    base.Subscription.Kind.NONE: links.Ticket.Subscription.NONE,
+    base.Subscription.Kind.TERMINATION_ONLY:
+        links.Ticket.Subscription.TERMINATION,
+    base.Subscription.Kind.FULL: links.Ticket.Subscription.FULL,
+    }
+
+# Mapping from abortive operation outcome to ticket termination to be
+# sent to the other side of the operation, or None to indicate that no
+# ticket should be sent to the other side in the event of such an
+# outcome.
+ABORTION_OUTCOME_TO_TICKET_TERMINATION = {
+    base.Outcome.CANCELLED: links.Ticket.Termination.CANCELLATION,
+    base.Outcome.EXPIRED: links.Ticket.Termination.EXPIRATION,
+    base.Outcome.LOCAL_SHUTDOWN: links.Ticket.Termination.SHUTDOWN,
+    base.Outcome.REMOTE_SHUTDOWN: None,
+    base.Outcome.RECEPTION_FAILURE: links.Ticket.Termination.RECEPTION_FAILURE,
+    base.Outcome.TRANSMISSION_FAILURE: None,
+    base.Outcome.LOCAL_FAILURE: links.Ticket.Termination.LOCAL_FAILURE,
+    base.Outcome.REMOTE_FAILURE: links.Ticket.Termination.REMOTE_FAILURE,
+}
+
+INTERNAL_ERROR_LOG_MESSAGE = ':-( RPC Framework (Core) internal error! )-:'
+TERMINATION_CALLBACK_EXCEPTION_LOG_MESSAGE = (
+    'Exception calling termination callback!')
diff --git a/src/python/grpcio/grpc/framework/core/_context.py b/src/python/grpcio/grpc/framework/core/_context.py
new file mode 100644
index 0000000..24a12b6
--- /dev/null
+++ b/src/python/grpcio/grpc/framework/core/_context.py
@@ -0,0 +1,92 @@
+# Copyright 2015, Google Inc.
+# All rights reserved.
+#
+# Redistribution and use in source and binary forms, with or without
+# modification, are permitted provided that the following conditions are
+# met:
+#
+#     * Redistributions of source code must retain the above copyright
+# notice, this list of conditions and the following disclaimer.
+#     * Redistributions in binary form must reproduce the above
+# copyright notice, this list of conditions and the following disclaimer
+# in the documentation and/or other materials provided with the
+# distribution.
+#     * Neither the name of Google Inc. nor the names of its
+# contributors may be used to endorse or promote products derived from
+# this software without specific prior written permission.
+#
+# THIS SOFTWARE IS PROVIDED BY THE COPYRIGHT HOLDERS AND CONTRIBUTORS
+# "AS IS" AND ANY EXPRESS OR IMPLIED WARRANTIES, INCLUDING, BUT NOT
+# LIMITED TO, THE IMPLIED WARRANTIES OF MERCHANTABILITY AND FITNESS FOR
+# A PARTICULAR PURPOSE ARE DISCLAIMED. IN NO EVENT SHALL THE COPYRIGHT
+# OWNER OR CONTRIBUTORS BE LIABLE FOR ANY DIRECT, INDIRECT, INCIDENTAL,
+# SPECIAL, EXEMPLARY, OR CONSEQUENTIAL DAMAGES (INCLUDING, BUT NOT
+# LIMITED TO, PROCUREMENT OF SUBSTITUTE GOODS OR SERVICES; LOSS OF USE,
+# DATA, OR PROFITS; OR BUSINESS INTERRUPTION) HOWEVER CAUSED AND ON ANY
+# THEORY OF LIABILITY, WHETHER IN CONTRACT, STRICT LIABILITY, OR TORT
+# (INCLUDING NEGLIGENCE OR OTHERWISE) ARISING IN ANY WAY OUT OF THE USE
+# OF THIS SOFTWARE, EVEN IF ADVISED OF THE POSSIBILITY OF SUCH DAMAGE.
+
+"""State and behavior for operation context."""
+
+import time
+
+# _interfaces is referenced from specification in this module.
+from grpc.framework.core import _interfaces  # pylint: disable=unused-import
+from grpc.framework.interfaces.base import base
+
+
+class OperationContext(base.OperationContext):
+  """An implementation of interfaces.OperationContext."""
+
+  def __init__(
+      self, lock, termination_manager, transmission_manager,
+      expiration_manager):
+    """Constructor.
+
+    Args:
+      lock: The operation-wide lock.
+      termination_manager: The _interfaces.TerminationManager for the operation.
+      transmission_manager: The _interfaces.TransmissionManager for the
+        operation.
+      expiration_manager: The _interfaces.ExpirationManager for the operation.
+    """
+    self._lock = lock
+    self._termination_manager = termination_manager
+    self._transmission_manager = transmission_manager
+    self._expiration_manager = expiration_manager
+
+  def _abort(self, outcome):
+    with self._lock:
+      if self._termination_manager.outcome is None:
+        self._termination_manager.abort(outcome)
+        self._transmission_manager.abort(outcome)
+        self._expiration_manager.terminate()
+
+  def outcome(self):
+    """See base.OperationContext.outcome for specification."""
+    with self._lock:
+      return self._termination_manager.outcome
+
+  def add_termination_callback(self, callback):
+    """See base.OperationContext.add_termination_callback."""
+    with self._lock:
+      if self._termination_manager.outcome is None:
+        self._termination_manager.add_callback(callback)
+        return None
+      else:
+        return self._termination_manager.outcome
+
+  def time_remaining(self):
+    """See base.OperationContext.time_remaining for specification."""
+    with self._lock:
+      deadline = self._expiration_manager.deadline()
+    return max(0.0, deadline - time.time())
+
+  def cancel(self):
+    """See base.OperationContext.cancel for specification."""
+    self._abort(base.Outcome.CANCELLED)
+
+  def fail(self, exception):
+    """See base.OperationContext.fail for specification."""
+    self._abort(base.Outcome.LOCAL_FAILURE)
diff --git a/src/python/grpcio/grpc/framework/core/_emission.py b/src/python/grpcio/grpc/framework/core/_emission.py
new file mode 100644
index 0000000..7c702ab
--- /dev/null
+++ b/src/python/grpcio/grpc/framework/core/_emission.py
@@ -0,0 +1,97 @@
+# Copyright 2015, Google Inc.
+# All rights reserved.
+#
+# Redistribution and use in source and binary forms, with or without
+# modification, are permitted provided that the following conditions are
+# met:
+#
+#     * Redistributions of source code must retain the above copyright
+# notice, this list of conditions and the following disclaimer.
+#     * Redistributions in binary form must reproduce the above
+# copyright notice, this list of conditions and the following disclaimer
+# in the documentation and/or other materials provided with the
+# distribution.
+#     * Neither the name of Google Inc. nor the names of its
+# contributors may be used to endorse or promote products derived from
+# this software without specific prior written permission.
+#
+# THIS SOFTWARE IS PROVIDED BY THE COPYRIGHT HOLDERS AND CONTRIBUTORS
+# "AS IS" AND ANY EXPRESS OR IMPLIED WARRANTIES, INCLUDING, BUT NOT
+# LIMITED TO, THE IMPLIED WARRANTIES OF MERCHANTABILITY AND FITNESS FOR
+# A PARTICULAR PURPOSE ARE DISCLAIMED. IN NO EVENT SHALL THE COPYRIGHT
+# OWNER OR CONTRIBUTORS BE LIABLE FOR ANY DIRECT, INDIRECT, INCIDENTAL,
+# SPECIAL, EXEMPLARY, OR CONSEQUENTIAL DAMAGES (INCLUDING, BUT NOT
+# LIMITED TO, PROCUREMENT OF SUBSTITUTE GOODS OR SERVICES; LOSS OF USE,
+# DATA, OR PROFITS; OR BUSINESS INTERRUPTION) HOWEVER CAUSED AND ON ANY
+# THEORY OF LIABILITY, WHETHER IN CONTRACT, STRICT LIABILITY, OR TORT
+# (INCLUDING NEGLIGENCE OR OTHERWISE) ARISING IN ANY WAY OUT OF THE USE
+# OF THIS SOFTWARE, EVEN IF ADVISED OF THE POSSIBILITY OF SUCH DAMAGE.
+
+"""State and behavior for handling emitted values."""
+
+from grpc.framework.core import _interfaces
+from grpc.framework.interfaces.base import base
+
+
+class EmissionManager(_interfaces.EmissionManager):
+  """An EmissionManager implementation."""
+
+  def __init__(
+      self, lock, termination_manager, transmission_manager,
+      expiration_manager):
+    """Constructor.
+
+    Args:
+      lock: The operation-wide lock.
+      termination_manager: The _interfaces.TerminationManager for the operation.
+      transmission_manager: The _interfaces.TransmissionManager for the
+        operation.
+      expiration_manager: The _interfaces.ExpirationManager for the operation.
+    """
+    self._lock = lock
+    self._termination_manager = termination_manager
+    self._transmission_manager = transmission_manager
+    self._expiration_manager = expiration_manager
+    self._ingestion_manager = None
+
+    self._initial_metadata_seen = False
+    self._payload_seen = False
+    self._completion_seen = False
+
+  def set_ingestion_manager(self, ingestion_manager):
+    """Sets the ingestion manager with which this manager will cooperate.
+
+    Args:
+      ingestion_manager: The _interfaces.IngestionManager for the operation.
+    """
+    self._ingestion_manager = ingestion_manager
+
+  def advance(
+      self, initial_metadata=None, payload=None, completion=None,
+      allowance=None):
+    initial_metadata_present = initial_metadata is not None
+    payload_present = payload is not None
+    completion_present = completion is not None
+    allowance_present = allowance is not None
+    with self._lock:
+      if self._termination_manager.outcome is None:
+        if (initial_metadata_present and (
+                self._initial_metadata_seen or self._payload_seen or
+                self._completion_seen) or
+            payload_present and self._completion_seen or
+            completion_present and self._completion_seen or
+            allowance_present and allowance <= 0):
+          self._termination_manager.abort(base.Outcome.LOCAL_FAILURE)
+          self._transmission_manager.abort(base.Outcome.LOCAL_FAILURE)
+          self._expiration_manager.terminate()
+        else:
+          self._initial_metadata_seen |= initial_metadata_present
+          self._payload_seen |= payload_present
+          self._completion_seen |= completion_present
+          if completion_present:
+            self._termination_manager.emission_complete()
+            self._ingestion_manager.local_emissions_done()
+          self._transmission_manager.advance(
+              initial_metadata, payload, completion, allowance)
+          if allowance_present:
+            self._ingestion_manager.add_local_allowance(allowance)
diff --git a/src/python/grpcio/grpc/framework/core/_end.py b/src/python/grpcio/grpc/framework/core/_end.py
new file mode 100644
index 0000000..fb2c532
--- /dev/null
+++ b/src/python/grpcio/grpc/framework/core/_end.py
@@ -0,0 +1,251 @@
+# Copyright 2015, Google Inc.
+# All rights reserved.
+#
+# Redistribution and use in source and binary forms, with or without
+# modification, are permitted provided that the following conditions are
+# met:
+#
+#     * Redistributions of source code must retain the above copyright
+# notice, this list of conditions and the following disclaimer.
+#     * Redistributions in binary form must reproduce the above
+# copyright notice, this list of conditions and the following disclaimer
+# in the documentation and/or other materials provided with the
+# distribution.
+#     * Neither the name of Google Inc. nor the names of its
+# contributors may be used to endorse or promote products derived from
+# this software without specific prior written permission.
+#
+# THIS SOFTWARE IS PROVIDED BY THE COPYRIGHT HOLDERS AND CONTRIBUTORS
+# "AS IS" AND ANY EXPRESS OR IMPLIED WARRANTIES, INCLUDING, BUT NOT
+# LIMITED TO, THE IMPLIED WARRANTIES OF MERCHANTABILITY AND FITNESS FOR
+# A PARTICULAR PURPOSE ARE DISCLAIMED. IN NO EVENT SHALL THE COPYRIGHT
+# OWNER OR CONTRIBUTORS BE LIABLE FOR ANY DIRECT, INDIRECT, INCIDENTAL,
+# SPECIAL, EXEMPLARY, OR CONSEQUENTIAL DAMAGES (INCLUDING, BUT NOT
+# LIMITED TO, PROCUREMENT OF SUBSTITUTE GOODS OR SERVICES; LOSS OF USE,
+# DATA, OR PROFITS; OR BUSINESS INTERRUPTION) HOWEVER CAUSED AND ON ANY
+# THEORY OF LIABILITY, WHETHER IN CONTRACT, STRICT LIABILITY, OR TORT
+# (INCLUDING NEGLIGENCE OR OTHERWISE) ARISING IN ANY WAY OUT OF THE USE
+# OF THIS SOFTWARE, EVEN IF ADVISED OF THE POSSIBILITY OF SUCH DAMAGE.
+
+"""Implementation of base.End."""
+
+import abc
+import enum
+import threading
+import uuid
+
+from grpc.framework.core import _operation
+from grpc.framework.core import _utilities
+from grpc.framework.foundation import callable_util
+from grpc.framework.foundation import later
+from grpc.framework.foundation import logging_pool
+from grpc.framework.interfaces.base import base
+from grpc.framework.interfaces.links import links
+from grpc.framework.interfaces.links import utilities
+
+_IDLE_ACTION_EXCEPTION_LOG_MESSAGE = 'Exception calling idle action!'
+
+
+class End(base.End, links.Link):
+  """A bridge between base.End and links.Link.
+
+  Implementations of this interface translate arriving tickets into
+  calls on application objects implementing base interfaces and
+  translate calls from application objects implementing base interfaces
+  into tickets sent to a joined link.
+  """
+  __metaclass__ = abc.ABCMeta
+
+
+class _Cycle(object):
+  """State for a single start-stop End lifecycle."""
+
+  def __init__(self, pool):
+    self.pool = pool
+    self.grace = False
+    self.futures = []
+    self.operations = {}
+    self.idle_actions = []
+
+
+def _abort(operations):
+  for operation in operations:
+    operation.abort(base.Outcome.LOCAL_SHUTDOWN)
+
+
+def _cancel_futures(futures):
+  for future in futures:
+    futures.cancel()
+
+
+def _future_shutdown(lock, cycle, event):
+  def in_future():
+    with lock:
+      _abort(cycle.operations.values())
+      _cancel_futures(cycle.futures)
+      pool = cycle.pool
+    cycle.pool.shutdown(wait=True)
+  return in_future
+
+
+def _termination_action(lock, stats, operation_id, cycle):
+  """Constructs the termination action for a single operation.
+
+  Args:
+    lock: A lock to hold during the termination action.
+    states: A mapping from base.Outcome values to integers to increment with
+      the outcome given to the termination action.
+    operation_id: The operation ID for the termination action.
+    cycle: A _Cycle value to be updated during the termination action.
+
+  Returns:
+    A callable that takes an operation outcome as its sole parameter and that
+      should be used as the termination action for the operation associated
+      with the given operation ID.
+  """
+  def termination_action(outcome):
+    with lock:
+      stats[outcome] += 1
+      cycle.operations.pop(operation_id, None)
+      if not cycle.operations:
+        for action in cycle.idle_actions:
+          cycle.pool.submit(action)
+        cycle.idle_actions = []
+        if cycle.grace:
+          _cancel_futures(cycle.futures)
+  return termination_action
+
+
+class _End(End):
+  """An End implementation."""
+
+  def __init__(self, servicer_package):
+    """Constructor.
+
+    Args:
+      servicer_package: A _ServicerPackage for servicing operations or None if
+        this end will not be used to service operations.
+    """
+    self._lock = threading.Condition()
+    self._servicer_package = servicer_package
+
+    self._stats = {outcome: 0 for outcome in base.Outcome}
+
+    self._mate = None
+
+    self._cycle = None
+
+  def start(self):
+    """See base.End.start for specification."""
+    with self._lock:
+      if self._cycle is not None:
+        raise ValueError('Tried to start a not-stopped End!')
+      else:
+        self._cycle = _Cycle(logging_pool.pool(1))
+
+  def stop(self, grace):
+    """See base.End.stop for specification."""
+    with self._lock:
+      if self._cycle is None:
+        event = threading.Event()
+        event.set()
+        return event
+      elif not self._cycle.operations:
+        event = threading.Event()
+        self._cycle.pool.submit(event.set)
+        self._cycle.pool.shutdown(wait=False)
+        self._cycle = None
+        return event
+      else:
+        self._cycle.grace = True
+        event = threading.Event()
+        self._cycle.idle_actions.append(event.set)
+        if 0 < grace:
+          future = later.later(
+              grace, _future_shutdown(self._lock, self._cycle, event))
+          self._cycle.futures.append(future)
+        else:
+          _abort(self._cycle.operations.values())
+        return event
+
+  def operate(
+      self, group, method, subscription, timeout, initial_metadata=None,
+      payload=None, completion=None):
+    """See base.End.operate for specification."""
+    operation_id = uuid.uuid4()
+    with self._lock:
+      if self._cycle is None or self._cycle.grace:
+        raise ValueError('Can\'t operate on stopped or stopping End!')
+      termination_action = _termination_action(
+          self._lock, self._stats, operation_id, self._cycle)
+      operation = _operation.invocation_operate(
+          operation_id, group, method, subscription, timeout, initial_metadata,
+          payload, completion, self._mate.accept_ticket, termination_action,
+          self._cycle.pool)
+      self._cycle.operations[operation_id] = operation
+      return operation.context, operation.operator
+
+  def operation_stats(self):
+    """See base.End.operation_stats for specification."""
+    with self._lock:
+      return dict(self._stats)
+
+  def add_idle_action(self, action):
+    """See base.End.add_idle_action for specification."""
+    with self._lock:
+      if self._cycle is None:
+        raise ValueError('Can\'t add idle action to stopped End!')
+      action_with_exceptions_logged = callable_util.with_exceptions_logged(
+          action, _IDLE_ACTION_EXCEPTION_LOG_MESSAGE)
+      if self._cycle.operations:
+        self._cycle.idle_actions.append(action_with_exceptions_logged)
+      else:
+        self._cycle.pool.submit(action_with_exceptions_logged)
+
+  def accept_ticket(self, ticket):
+    """See links.Link.accept_ticket for specification."""
+    with self._lock:
+      if self._cycle is not None and not self._cycle.grace:
+        operation = self._cycle.operations.get(ticket.operation_id)
+        if operation is not None:
+          operation.handle_ticket(ticket)
+        elif self._servicer_package is not None:
+          termination_action = _termination_action(
+              self._lock, self._stats, ticket.operation_id, self._cycle)
+          operation = _operation.service_operate(
+              self._servicer_package, ticket, self._mate.accept_ticket,
+              termination_action, self._cycle.pool)
+          if operation is not None:
+            self._cycle.operations[ticket.operation_id] = operation
+
+  def join_link(self, link):
+    """See links.Link.join_link for specification."""
+    with self._lock:
+      self._mate = utilities.NULL_LINK if link is None else link
+
+
+def serviceless_end_link():
+  """Constructs an End usable only for invoking operations.
+
+  Returns:
+    An End usable for translating operations into ticket exchange.
+  """
+  return _End(None)
+
+
+def serviceful_end_link(servicer, default_timeout, maximum_timeout):
+  """Constructs an End capable of servicing operations.
+
+  Args:
+    servicer: An interfaces.Servicer for servicing operations.
+    default_timeout: A length of time in seconds to be used as the default
+      time alloted for a single operation.
+    maximum_timeout: A length of time in seconds to be used as the maximum
+      time alloted for a single operation.
+
+  Returns:
+    An End capable of servicing the operations requested of it through ticket
+      exchange.
+  """
+  return _End(
+      _utilities.ServicerPackage(servicer, default_timeout, maximum_timeout))
diff --git a/src/python/grpcio/grpc/framework/core/_expiration.py b/src/python/grpcio/grpc/framework/core/_expiration.py
new file mode 100644
index 0000000..d94bdf2
--- /dev/null
+++ b/src/python/grpcio/grpc/framework/core/_expiration.py
@@ -0,0 +1,152 @@
+# Copyright 2015, Google Inc.
+# All rights reserved.
+#
+# Redistribution and use in source and binary forms, with or without
+# modification, are permitted provided that the following conditions are
+# met:
+#
+#     * Redistributions of source code must retain the above copyright
+# notice, this list of conditions and the following disclaimer.
+#     * Redistributions in binary form must reproduce the above
+# copyright notice, this list of conditions and the following disclaimer
+# in the documentation and/or other materials provided with the
+# distribution.
+#     * Neither the name of Google Inc. nor the names of its
+# contributors may be used to endorse or promote products derived from
+# this software without specific prior written permission.
+#
+# THIS SOFTWARE IS PROVIDED BY THE COPYRIGHT HOLDERS AND CONTRIBUTORS
+# "AS IS" AND ANY EXPRESS OR IMPLIED WARRANTIES, INCLUDING, BUT NOT
+# LIMITED TO, THE IMPLIED WARRANTIES OF MERCHANTABILITY AND FITNESS FOR
+# A PARTICULAR PURPOSE ARE DISCLAIMED. IN NO EVENT SHALL THE COPYRIGHT
+# OWNER OR CONTRIBUTORS BE LIABLE FOR ANY DIRECT, INDIRECT, INCIDENTAL,
+# SPECIAL, EXEMPLARY, OR CONSEQUENTIAL DAMAGES (INCLUDING, BUT NOT
+# LIMITED TO, PROCUREMENT OF SUBSTITUTE GOODS OR SERVICES; LOSS OF USE,
+# DATA, OR PROFITS; OR BUSINESS INTERRUPTION) HOWEVER CAUSED AND ON ANY
+# THEORY OF LIABILITY, WHETHER IN CONTRACT, STRICT LIABILITY, OR TORT
+# (INCLUDING NEGLIGENCE OR OTHERWISE) ARISING IN ANY WAY OUT OF THE USE
+# OF THIS SOFTWARE, EVEN IF ADVISED OF THE POSSIBILITY OF SUCH DAMAGE.
+
+"""State and behavior for operation expiration."""
+
+import time
+
+from grpc.framework.core import _interfaces
+from grpc.framework.foundation import later
+from grpc.framework.interfaces.base import base
+
+
+class _ExpirationManager(_interfaces.ExpirationManager):
+  """An implementation of _interfaces.ExpirationManager."""
+
+  def __init__(
+      self, commencement, timeout, maximum_timeout, lock, termination_manager,
+      transmission_manager):
+    """Constructor.
+
+    Args:
+      commencement: The time in seconds since the epoch at which the operation
+        began.
+      timeout: A length of time in seconds to allow for the operation to run.
+      maximum_timeout: The maximum length of time in seconds to allow for the
+        operation to run despite what is requested via this object's
+        change_timout method.
+      lock: The operation-wide lock.
+      termination_manager: The _interfaces.TerminationManager for the operation.
+      transmission_manager: The _interfaces.TransmissionManager for the
+        operation.
+    """
+    self._lock = lock
+    self._termination_manager = termination_manager
+    self._transmission_manager = transmission_manager
+    self._commencement = commencement
+    self._maximum_timeout = maximum_timeout
+
+    self._timeout = timeout
+    self._deadline = commencement + timeout
+    self._index = None
+    self._future = None
+
+  def _expire(self, index):
+    def expire():
+      with self._lock:
+        if self._future is not None and index == self._index:
+          self._future = None
+          self._termination_manager.expire()
+          self._transmission_manager.abort(base.Outcome.EXPIRED)
+    return expire
+
+  def start(self):
+    self._index = 0
+    self._future = later.later(self._timeout, self._expire(0))
+
+  def change_timeout(self, timeout):
+    if self._future is not None and timeout != self._timeout:
+      self._future.cancel()
+      new_timeout = min(timeout, self._maximum_timeout)
+      new_index = self._index + 1
+      self._timeout = new_timeout
+      self._deadline = self._commencement + new_timeout
+      self._index = new_index
+      delay = self._deadline - time.time()
+      self._future = later.later(delay, self._expire(new_index))
+      if new_timeout != timeout:
+        self._transmission_manager.timeout(new_timeout)
+
+  def deadline(self):
+    return self._deadline
+
+  def terminate(self):
+    if self._future:
+      self._future.cancel()
+      self._future = None
+    self._deadline_index = None
+
+
+def invocation_expiration_manager(
+    timeout, lock, termination_manager, transmission_manager):
+  """Creates an _interfaces.ExpirationManager appropriate for front-side use.
+
+  Args:
+    timeout: A length of time in seconds to allow for the operation to run.
+    lock: The operation-wide lock.
+    termination_manager: The _interfaces.TerminationManager for the operation.
+    transmission_manager: The _interfaces.TransmissionManager for the
+      operation.
+
+  Returns:
+    An _interfaces.ExpirationManager appropriate for invocation-side use.
+  """
+  expiration_manager = _ExpirationManager(
+      time.time(), timeout, timeout, lock, termination_manager,
+      transmission_manager)
+  expiration_manager.start()
+  return expiration_manager
+
+
+def service_expiration_manager(
+    timeout, default_timeout, maximum_timeout, lock, termination_manager,
+    transmission_manager):
+  """Creates an _interfaces.ExpirationManager appropriate for back-side use.
+
+  Args:
+    timeout: A length of time in seconds to allow for the operation to run. May
+      be None in which case default_timeout will be used.
+    default_timeout: The default length of time in seconds to allow for the
+      operation to run if the front-side customer has not specified such a value
+      (or if the value they specified is not yet known).
+    maximum_timeout: The maximum length of time in seconds to allow for the
+      operation to run.
+    lock: The operation-wide lock.
+    termination_manager: The _interfaces.TerminationManager for the operation.
+    transmission_manager: The _interfaces.TransmissionManager for the
+      operation.
+
+  Returns:
+    An _interfaces.ExpirationManager appropriate for service-side use.
+  """
+  expiration_manager = _ExpirationManager(
+      time.time(), default_timeout if timeout is None else timeout,
+      maximum_timeout, lock, termination_manager, transmission_manager)
+  expiration_manager.start()
+  return expiration_manager
diff --git a/src/python/grpcio/grpc/framework/core/_ingestion.py b/src/python/grpcio/grpc/framework/core/_ingestion.py
new file mode 100644
index 0000000..59f7f8a
--- /dev/null
+++ b/src/python/grpcio/grpc/framework/core/_ingestion.py
@@ -0,0 +1,410 @@
+# Copyright 2015, Google Inc.
+# All rights reserved.
+#
+# Redistribution and use in source and binary forms, with or without
+# modification, are permitted provided that the following conditions are
+# met:
+#
+#     * Redistributions of source code must retain the above copyright
+# notice, this list of conditions and the following disclaimer.
+#     * Redistributions in binary form must reproduce the above
+# copyright notice, this list of conditions and the following disclaimer
+# in the documentation and/or other materials provided with the
+# distribution.
+#     * Neither the name of Google Inc. nor the names of its
+# contributors may be used to endorse or promote products derived from
+# this software without specific prior written permission.
+#
+# THIS SOFTWARE IS PROVIDED BY THE COPYRIGHT HOLDERS AND CONTRIBUTORS
+# "AS IS" AND ANY EXPRESS OR IMPLIED WARRANTIES, INCLUDING, BUT NOT
+# LIMITED TO, THE IMPLIED WARRANTIES OF MERCHANTABILITY AND FITNESS FOR
+# A PARTICULAR PURPOSE ARE DISCLAIMED. IN NO EVENT SHALL THE COPYRIGHT
+# OWNER OR CONTRIBUTORS BE LIABLE FOR ANY DIRECT, INDIRECT, INCIDENTAL,
+# SPECIAL, EXEMPLARY, OR CONSEQUENTIAL DAMAGES (INCLUDING, BUT NOT
+# LIMITED TO, PROCUREMENT OF SUBSTITUTE GOODS OR SERVICES; LOSS OF USE,
+# DATA, OR PROFITS; OR BUSINESS INTERRUPTION) HOWEVER CAUSED AND ON ANY
+# THEORY OF LIABILITY, WHETHER IN CONTRACT, STRICT LIABILITY, OR TORT
+# (INCLUDING NEGLIGENCE OR OTHERWISE) ARISING IN ANY WAY OUT OF THE USE
+# OF THIS SOFTWARE, EVEN IF ADVISED OF THE POSSIBILITY OF SUCH DAMAGE.
+
+"""State and behavior for ingestion during an operation."""
+
+import abc
+import collections
+
+from grpc.framework.core import _constants
+from grpc.framework.core import _interfaces
+from grpc.framework.foundation import abandonment
+from grpc.framework.foundation import callable_util
+from grpc.framework.interfaces.base import base
+
+_CREATE_SUBSCRIPTION_EXCEPTION_LOG_MESSAGE = 'Exception initializing ingestion!'
+_INGESTION_EXCEPTION_LOG_MESSAGE = 'Exception during ingestion!'
+
+
+class _SubscriptionCreation(collections.namedtuple(
+    '_SubscriptionCreation', ('subscription', 'remote_error', 'abandoned'))):
+  """A sum type for the outcome of ingestion initialization.
+
+  Either subscription will be non-None, remote_error will be True, or abandoned
+  will be True.
+
+  Attributes:
+    subscription: A base.Subscription describing the customer's interest in
+      operation values from the other side.
+    remote_error: A boolean indicating that the subscription could not be
+      created due to an error on the remote side of the operation.
+    abandoned: A boolean indicating that subscription creation was abandoned.
+  """
+
+
+class _SubscriptionCreator(object):
+  """Common specification of subscription-creating behavior."""
+  __metaclass__ = abc.ABCMeta
+
+  @abc.abstractmethod
+  def create(self, group, method):
+    """Creates the base.Subscription of the local customer.
+
+    Any exceptions raised by this method should be attributed to and treated as
+    defects in the customer code called by this method.
+
+    Args:
+      group: The group identifier of the operation.
+      method: The method identifier of the operation.
+
+    Returns:
+      A _SubscriptionCreation describing the result of subscription creation.
+    """
+    raise NotImplementedError()
+
+
+class _ServiceSubscriptionCreator(_SubscriptionCreator):
+  """A _SubscriptionCreator appropriate for service-side use."""
+
+  def __init__(self, servicer, operation_context, output_operator):
+    """Constructor.
+
+    Args:
+      servicer: The base.Servicer that will service the operation.
+      operation_context: A base.OperationContext for the operation to be passed
+        to the customer.
+      output_operator: A base.Operator for the operation to be passed to the
+        customer and to be called by the customer to accept operation data
+        emitted by the customer.
+    """
+    self._servicer = servicer
+    self._operation_context = operation_context
+    self._output_operator = output_operator
+
+  def create(self, group, method):
+    try:
+      subscription = self._servicer.service(
+          group, method, self._operation_context, self._output_operator)
+    except base.NoSuchMethodError:
+      return _SubscriptionCreation(None, True, False)
+    except abandonment.Abandoned:
+      return _SubscriptionCreation(None, False, True)
+    else:
+      return _SubscriptionCreation(subscription, False, False)
+
+
+def _wrap(behavior):
+  def wrapped(*args, **kwargs):
+    try:
+      behavior(*args, **kwargs)
+    except abandonment.Abandoned:
+      return False
+    else:
+      return True
+  return wrapped
+
+
+class _IngestionManager(_interfaces.IngestionManager):
+  """An implementation of _interfaces.IngestionManager."""
+
+  def __init__(
+      self, lock, pool, subscription, subscription_creator, termination_manager,
+      transmission_manager, expiration_manager):
+    """Constructor.
+
+    Args:
+      lock: The operation-wide lock.
+      pool: A thread pool in which to execute customer code.
+      subscription: A base.Subscription describing the customer's interest in
+        operation values from the other side. May be None if
+        subscription_creator is not None.
+      subscription_creator: A _SubscriptionCreator wrapping the portion of
+        customer code that when called returns the base.Subscription describing
+        the customer's interest in operation values from the other side. May be
+        None if subscription is not None.
+      termination_manager: The _interfaces.TerminationManager for the operation.
+      transmission_manager: The _interfaces.TransmissionManager for the
+        operation.
+      expiration_manager: The _interfaces.ExpirationManager for the operation.
+    """
+    self._lock = lock
+    self._pool = pool
+    self._termination_manager = termination_manager
+    self._transmission_manager = transmission_manager
+    self._expiration_manager = expiration_manager
+
+    if subscription is None:
+      self._subscription_creator = subscription_creator
+      self._wrapped_operator = None
+    elif subscription.kind is base.Subscription.Kind.FULL:
+      self._subscription_creator = None
+      self._wrapped_operator = _wrap(subscription.operator.advance)
+    else:
+      # TODO(nathaniel): Support other subscriptions.
+      raise ValueError('Unsupported subscription "%s"!' % subscription.kind)
+    self._pending_initial_metadata = None
+    self._pending_payloads = []
+    self._pending_completion = None
+    self._local_allowance = 1
+    # A nonnegative integer or None, with None indicating that the local
+    # customer is done emitting anyway so there's no need to bother it by
+    # informing it that the remote customer has granted it further permission to
+    # emit.
+    self._remote_allowance = 0
+    self._processing = False
+
+  def _abort_internal_only(self):
+    self._subscription_creator = None
+    self._wrapped_operator = None
+    self._pending_initial_metadata = None
+    self._pending_payloads = None
+    self._pending_completion = None
+
+  def _abort_and_notify(self, outcome):
+    self._abort_internal_only()
+    self._termination_manager.abort(outcome)
+    self._transmission_manager.abort(outcome)
+    self._expiration_manager.terminate()
+
+  def _operator_next(self):
+    """Computes the next step for full-subscription ingestion.
+
+    Returns:
+      An initial_metadata, payload, completion, allowance, continue quintet
+        indicating what operation values (if any) are available to pass into
+        customer code and whether or not there is anything immediately
+        actionable to call customer code to do.
+    """
+    if self._wrapped_operator is None:
+      return None, None, None, None, False
+    else:
+      initial_metadata, payload, completion, allowance, action = [None] * 5
+      if self._pending_initial_metadata is not None:
+        initial_metadata = self._pending_initial_metadata
+        self._pending_initial_metadata = None
+        action = True
+      if self._pending_payloads and 0 < self._local_allowance:
+        payload = self._pending_payloads.pop(0)
+        self._local_allowance -= 1
+        action = True
+      if not self._pending_payloads and self._pending_completion is not None:
+        completion = self._pending_completion
+        self._pending_completion = None
+        action = True
+      if self._remote_allowance is not None and 0 < self._remote_allowance:
+        allowance = self._remote_allowance
+        self._remote_allowance = 0
+        action = True
+      return initial_metadata, payload, completion, allowance, bool(action)
+
+  def _operator_process(
+      self, wrapped_operator, initial_metadata, payload,
+      completion, allowance):
+    while True:
+      advance_outcome = callable_util.call_logging_exceptions(
+          wrapped_operator, _INGESTION_EXCEPTION_LOG_MESSAGE,
+          initial_metadata=initial_metadata, payload=payload,
+          completion=completion, allowance=allowance)
+      if advance_outcome.exception is None:
+        if advance_outcome.return_value:
+          with self._lock:
+            if self._termination_manager.outcome is not None:
+              return
+            if completion is not None:
+              self._termination_manager.ingestion_complete()
+            initial_metadata, payload, completion, allowance, moar = (
+                self._operator_next())
+            if not moar:
+              self._processing = False
+              return
+        else:
+          with self._lock:
+            if self._termination_manager.outcome is None:
+              self._abort_and_notify(base.Outcome.LOCAL_FAILURE)
+            return
+      else:
+        with self._lock:
+          if self._termination_manager.outcome is None:
+            self._abort_and_notify(base.Outcome.LOCAL_FAILURE)
+          return
+
+  def _operator_post_create(self, subscription):
+    wrapped_operator = _wrap(subscription.operator.advance)
+    with self._lock:
+      if self._termination_manager.outcome is not None:
+        return
+      self._wrapped_operator = wrapped_operator
+      self._subscription_creator = None
+      metadata, payload, completion, allowance, moar = self._operator_next()
+      if not moar:
+        self._processing = False
+        return
+    self._operator_process(
+        wrapped_operator, metadata, payload, completion, allowance)
+
+  def _create(self, subscription_creator, group, name):
+    outcome = callable_util.call_logging_exceptions(
+        subscription_creator.create, _CREATE_SUBSCRIPTION_EXCEPTION_LOG_MESSAGE,
+        group, name)
+    if outcome.return_value is None:
+      with self._lock:
+        if self._termination_manager.outcome is None:
+          self._abort_and_notify(base.Outcome.LOCAL_FAILURE)
+    elif outcome.return_value.abandoned:
+      with self._lock:
+        if self._termination_manager.outcome is None:
+          self._abort_and_notify(base.Outcome.LOCAL_FAILURE)
+    elif outcome.return_value.remote_error:
+      with self._lock:
+        if self._termination_manager.outcome is None:
+          self._abort_and_notify(base.Outcome.REMOTE_FAILURE)
+    elif outcome.return_value.subscription.kind is base.Subscription.Kind.FULL:
+      self._operator_post_create(outcome.return_value.subscription)
+    else:
+      # TODO(nathaniel): Support other subscriptions.
+      raise ValueError(
+          'Unsupported "%s"!' % outcome.return_value.subscription.kind)
+
+  def _store_advance(self, initial_metadata, payload, completion, allowance):
+    if initial_metadata is not None:
+      self._pending_initial_metadata = initial_metadata
+    if payload is not None:
+      self._pending_payloads.append(payload)
+    if completion is not None:
+      self._pending_completion = completion
+    if allowance is not None and self._remote_allowance is not None:
+      self._remote_allowance += allowance
+
+  def _operator_advance(self, initial_metadata, payload, completion, allowance):
+    if self._processing:
+      self._store_advance(initial_metadata, payload, completion, allowance)
+    else:
+      action = False
+      if initial_metadata is not None:
+        action = True
+      if payload is not None:
+        if 0 < self._local_allowance:
+          self._local_allowance -= 1
+          action = True
+        else:
+          self._pending_payloads.append(payload)
+          payload = False
+      if completion is not None:
+        if self._pending_payloads:
+          self._pending_completion = completion
+        else:
+          action = True
+      if allowance is not None and self._remote_allowance is not None:
+        allowance += self._remote_allowance
+        self._remote_allowance = 0
+        action = True
+      if action:
+        self._pool.submit(
+            callable_util.with_exceptions_logged(
+                self._operator_process, _constants.INTERNAL_ERROR_LOG_MESSAGE),
+            self._wrapped_operator, initial_metadata, payload, completion,
+            allowance)
+
+  def set_group_and_method(self, group, method):
+    """See _interfaces.IngestionManager.set_group_and_method for spec."""
+    if self._subscription_creator is not None and not self._processing:
+      self._pool.submit(
+          callable_util.with_exceptions_logged(
+              self._create, _constants.INTERNAL_ERROR_LOG_MESSAGE),
+          self._subscription_creator, group, method)
+      self._processing = True
+
+  def add_local_allowance(self, allowance):
+    """See _interfaces.IngestionManager.add_local_allowance for spec."""
+    if any((self._subscription_creator, self._wrapped_operator,)):
+      self._local_allowance += allowance
+      if not self._processing:
+        initial_metadata, payload, completion, allowance, moar = (
+            self._operator_next())
+        if moar:
+          self._pool.submit(
+              callable_util.with_exceptions_logged(
+                  self._operator_process,
+                  _constants.INTERNAL_ERROR_LOG_MESSAGE),
+              initial_metadata, payload, completion, allowance)
+
+  def local_emissions_done(self):
+    self._remote_allowance = None
+
+  def advance(self, initial_metadata, payload, completion, allowance):
+    """See _interfaces.IngestionManager.advance for specification."""
+    if self._subscription_creator is not None:
+      self._store_advance(initial_metadata, payload, completion, allowance)
+    elif self._wrapped_operator is not None:
+      self._operator_advance(initial_metadata, payload, completion, allowance)
+
+
+def invocation_ingestion_manager(
+    subscription, lock, pool, termination_manager, transmission_manager,
+    expiration_manager):
+  """Creates an IngestionManager appropriate for invocation-side use.
+
+  Args:
+    subscription: A base.Subscription indicating the customer's interest in the
+      data and results from the service-side of the operation.
+    lock: The operation-wide lock.
+    pool: A thread pool in which to execute customer code.
+    termination_manager: The _interfaces.TerminationManager for the operation.
+    transmission_manager: The _interfaces.TransmissionManager for the
+      operation.
+    expiration_manager: The _interfaces.ExpirationManager for the operation.
+
+  Returns:
+    An IngestionManager appropriate for invocation-side use.
+  """
+  return _IngestionManager(
+      lock, pool, subscription, None, termination_manager, transmission_manager,
+      expiration_manager)
+
+
+def service_ingestion_manager(
+    servicer, operation_context, output_operator, lock, pool,
+    termination_manager, transmission_manager, expiration_manager):
+  """Creates an IngestionManager appropriate for service-side use.
+
+  The returned IngestionManager will require its set_group_and_name method to be
+  called before its advance method may be called.
+
+  Args:
+    servicer: A base.Servicer for servicing the operation.
+    operation_context: A base.OperationContext for the operation to be passed to
+      the customer.
+    output_operator: A base.Operator for the operation to be passed to the
+      customer and to be called by the customer to accept operation data output
+      by the customer.
+    lock: The operation-wide lock.
+    pool: A thread pool in which to execute customer code.
+    termination_manager: The _interfaces.TerminationManager for the operation.
+    transmission_manager: The _interfaces.TransmissionManager for the
+      operation.
+    expiration_manager: The _interfaces.ExpirationManager for the operation.
+
+  Returns:
+    An IngestionManager appropriate for service-side use.
+  """
+  subscription_creator = _ServiceSubscriptionCreator(
+      servicer, operation_context, output_operator)
+  return _IngestionManager(
+      lock, pool, None, subscription_creator, termination_manager,
+      transmission_manager, expiration_manager)
diff --git a/src/python/grpcio/grpc/framework/core/_interfaces.py b/src/python/grpcio/grpc/framework/core/_interfaces.py
new file mode 100644
index 0000000..a626b9f
--- /dev/null
+++ b/src/python/grpcio/grpc/framework/core/_interfaces.py
@@ -0,0 +1,308 @@
+# Copyright 2015, Google Inc.
+# All rights reserved.
+#
+# Redistribution and use in source and binary forms, with or without
+# modification, are permitted provided that the following conditions are
+# met:
+#
+#     * Redistributions of source code must retain the above copyright
+# notice, this list of conditions and the following disclaimer.
+#     * Redistributions in binary form must reproduce the above
+# copyright notice, this list of conditions and the following disclaimer
+# in the documentation and/or other materials provided with the
+# distribution.
+#     * Neither the name of Google Inc. nor the names of its
+# contributors may be used to endorse or promote products derived from
+# this software without specific prior written permission.
+#
+# THIS SOFTWARE IS PROVIDED BY THE COPYRIGHT HOLDERS AND CONTRIBUTORS
+# "AS IS" AND ANY EXPRESS OR IMPLIED WARRANTIES, INCLUDING, BUT NOT
+# LIMITED TO, THE IMPLIED WARRANTIES OF MERCHANTABILITY AND FITNESS FOR
+# A PARTICULAR PURPOSE ARE DISCLAIMED. IN NO EVENT SHALL THE COPYRIGHT
+# OWNER OR CONTRIBUTORS BE LIABLE FOR ANY DIRECT, INDIRECT, INCIDENTAL,
+# SPECIAL, EXEMPLARY, OR CONSEQUENTIAL DAMAGES (INCLUDING, BUT NOT
+# LIMITED TO, PROCUREMENT OF SUBSTITUTE GOODS OR SERVICES; LOSS OF USE,
+# DATA, OR PROFITS; OR BUSINESS INTERRUPTION) HOWEVER CAUSED AND ON ANY
+# THEORY OF LIABILITY, WHETHER IN CONTRACT, STRICT LIABILITY, OR TORT
+# (INCLUDING NEGLIGENCE OR OTHERWISE) ARISING IN ANY WAY OUT OF THE USE
+# OF THIS SOFTWARE, EVEN IF ADVISED OF THE POSSIBILITY OF SUCH DAMAGE.
+
+"""Package-internal interfaces."""
+
+import abc
+
+from grpc.framework.interfaces.base import base
+
+
+class TerminationManager(object):
+  """An object responsible for handling the termination of an operation.
+
+  Attributes:
+    outcome: None if the operation is active or a base.Outcome value if it has
+      terminated.
+  """
+  __metaclass__ = abc.ABCMeta
+
+  @abc.abstractmethod
+  def add_callback(self, callback):
+    """Registers a callback to be called on operation termination.
+
+    If the operation has already terminated the callback will not be called.
+
+    Args:
+      callback: A callable that will be passed an interfaces.Outcome value.
+
+    Returns:
+      None if the operation has not yet terminated and the passed callback will
+        be called when it does, or a base.Outcome value describing the operation
+        termination if the operation has terminated and the callback will not be
+        called as a result of this method call.
+    """
+    raise NotImplementedError()
+
+  @abc.abstractmethod
+  def emission_complete(self):
+    """Indicates that emissions from customer code have completed."""
+    raise NotImplementedError()
+
+  @abc.abstractmethod
+  def transmission_complete(self):
+    """Indicates that transmissions to the remote end are complete.
+
+    Returns:
+      True if the operation has terminated or False if the operation remains
+        ongoing.
+    """
+    raise NotImplementedError()
+
+  @abc.abstractmethod
+  def reception_complete(self):
+    """Indicates that reception from the other side is complete."""
+    raise NotImplementedError()
+
+  @abc.abstractmethod
+  def ingestion_complete(self):
+    """Indicates that customer code ingestion of received values is complete."""
+    raise NotImplementedError()
+
+  @abc.abstractmethod
+  def expire(self):
+    """Indicates that the operation must abort because it has taken too long."""
+    raise NotImplementedError()
+
+  @abc.abstractmethod
+  def abort(self, outcome):
+    """Indicates that the operation must abort for the indicated reason.
+
+    Args:
+      outcome: An interfaces.Outcome indicating operation abortion.
+    """
+    raise NotImplementedError()
+
+
+class TransmissionManager(object):
+  """A manager responsible for transmitting to the other end of an operation."""
+  __metaclass__ = abc.ABCMeta
+
+  @abc.abstractmethod
+  def kick_off(
+      self, group, method, timeout, initial_metadata, payload, completion,
+      allowance):
+    """Transmits the values associated with operation invocation."""
+    raise NotImplementedError()
+
+  @abc.abstractmethod
+  def advance(self, initial_metadata, payload, completion, allowance):
+    """Accepts values for transmission to the other end of the operation.
+
+    Args:
+      initial_metadata: An initial metadata value to be transmitted to the other
+        side of the operation. May only ever be non-None once.
+      payload: A payload value.
+      completion: A base.Completion value. May only ever be non-None in the last
+        transmission to be made to the other side.
+      allowance: A positive integer communicating the number of additional
+        payloads allowed to be transmitted from the other side to this side of
+        the operation, or None if no additional allowance is being granted in
+        this call.
+    """
+    raise NotImplementedError()
+
+  @abc.abstractmethod
+  def timeout(self, timeout):
+    """Accepts for transmission to the other side a new timeout value.
+
+    Args:
+      timeout: A positive float used as the new timeout value for the operation
+        to be transmitted to the other side.
+    """
+    raise NotImplementedError()
+
+  @abc.abstractmethod
+  def allowance(self, allowance):
+    """Indicates to this manager that the remote customer is allowing payloads.
+
+    Args:
+      allowance: A positive integer indicating the number of additional payloads
+        the remote customer is allowing to be transmitted from this side of the
+        operation.
+    """
+    raise NotImplementedError()
+
+  @abc.abstractmethod
+  def remote_complete(self):
+    """Indicates to this manager that data from the remote side is complete."""
+    raise NotImplementedError()
+
+  @abc.abstractmethod
+  def abort(self, outcome):
+    """Indicates that the operation has aborted.
+
+    Args:
+      outcome: An interfaces.Outcome for the operation. If None, indicates that
+        the operation abortion should not be communicated to the other side of
+        the operation.
+    """
+    raise NotImplementedError()
+
+
+class ExpirationManager(object):
+  """A manager responsible for aborting the operation if it runs out of time."""
+  __metaclass__ = abc.ABCMeta
+
+  @abc.abstractmethod
+  def change_timeout(self, timeout):
+    """Changes the timeout allotted for the operation.
+
+    Operation duration is always measure from the beginning of the operation;
+    calling this method changes the operation's allotted time to timeout total
+    seconds, not timeout seconds from the time of this method call.
+
+    Args:
+      timeout: A length of time in seconds to allow for the operation.
+    """
+    raise NotImplementedError()
+
+  @abc.abstractmethod
+  def deadline(self):
+    """Returns the time until which the operation is allowed to run.
+
+    Returns:
+      The time (seconds since the epoch) at which the operation will expire.
+    """
+    raise NotImplementedError()
+
+  @abc.abstractmethod
+  def terminate(self):
+    """Indicates to this manager that the operation has terminated."""
+    raise NotImplementedError()
+
+
+class EmissionManager(base.Operator):
+  """A manager of values emitted by customer code."""
+  __metaclass__ = abc.ABCMeta
+
+  @abc.abstractmethod
+  def advance(
+      self, initial_metadata=None, payload=None, completion=None,
+      allowance=None):
+    """Accepts a value emitted by customer code.
+
+    This method should only be called by customer code.
+
+    Args:
+      initial_metadata: An initial metadata value emitted by the local customer
+        to be sent to the other side of the operation.
+      payload: A payload value emitted by the local customer to be sent to the
+        other side of the operation.
+      completion: A Completion value emitted by the local customer to be sent to
+        the other side of the operation.
+      allowance: A positive integer indicating an additional number of payloads
+        that the local customer is willing to accept from the other side of the
+        operation.
+    """
+    raise NotImplementedError()
+
+
+class IngestionManager(object):
+  """A manager responsible for executing customer code.
+
+  This name of this manager comes from its responsibility to pass successive
+  values from the other side of the operation into the code of the local
+  customer.
+  """
+  __metaclass__ = abc.ABCMeta
+
+  @abc.abstractmethod
+  def set_group_and_method(self, group, method):
+    """Communicates to this IngestionManager the operation group and method.
+
+    Args:
+      group: The group identifier of the operation.
+      method: The method identifier of the operation.
+    """
+    raise NotImplementedError()
+
+  @abc.abstractmethod
+  def add_local_allowance(self, allowance):
+    """Communicates to this IngestionManager that more payloads may be ingested.
+
+    Args:
+      allowance: A positive integer indicating an additional number of payloads
+        that the local customer is willing to ingest.
+    """
+    raise NotImplementedError()
+
+  @abc.abstractmethod
+  def local_emissions_done(self):
+    """Indicates to this manager that local emissions are done."""
+    raise NotImplementedError()
+
+  @abc.abstractmethod
+  def advance(self, initial_metadata, payload, completion, allowance):
+    """Advances the operation by passing values to the local customer."""
+    raise NotImplementedError()
+
+
+class ReceptionManager(object):
+  """A manager responsible for receiving tickets from the other end."""
+  __metaclass__ = abc.ABCMeta
+
+  @abc.abstractmethod
+  def receive_ticket(self, ticket):
+    """Handle a ticket from the other side of the operation.
+
+    Args:
+      ticket: An interfaces.BackToFrontTicket or interfaces.FrontToBackTicket
+        appropriate to this end of the operation and this object.
+    """
+    raise NotImplementedError()
+
+
+class Operation(object):
+  """An ongoing operation.
+
+  Attributes:
+    context: A base.OperationContext object for the operation.
+    operator: A base.Operator object for the operation for use by the customer
+      of the operation.
+  """
+  __metaclass__ = abc.ABCMeta
+
+  @abc.abstractmethod
+  def handle_ticket(self, ticket):
+    """Handle a ticket from the other side of the operation.
+
+    Args:
+      ticket: A links.Ticket from the other side of the operation.
+    """
+    raise NotImplementedError()
+
+  @abc.abstractmethod
+  def abort(self, outcome):
+    """Aborts the operation.
+
+    Args:
+      outcome: A base.Outcome value indicating operation abortion.
+    """
+    raise NotImplementedError()
diff --git a/src/python/grpcio/grpc/framework/core/_operation.py b/src/python/grpcio/grpc/framework/core/_operation.py
new file mode 100644
index 0000000..d20e40a
--- /dev/null
+++ b/src/python/grpcio/grpc/framework/core/_operation.py
@@ -0,0 +1,192 @@
+# Copyright 2015, Google Inc.
+# All rights reserved.
+#
+# Redistribution and use in source and binary forms, with or without
+# modification, are permitted provided that the following conditions are
+# met:
+#
+#     * Redistributions of source code must retain the above copyright
+# notice, this list of conditions and the following disclaimer.
+#     * Redistributions in binary form must reproduce the above
+# copyright notice, this list of conditions and the following disclaimer
+# in the documentation and/or other materials provided with the
+# distribution.
+#     * Neither the name of Google Inc. nor the names of its
+# contributors may be used to endorse or promote products derived from
+# this software without specific prior written permission.
+#
+# THIS SOFTWARE IS PROVIDED BY THE COPYRIGHT HOLDERS AND CONTRIBUTORS
+# "AS IS" AND ANY EXPRESS OR IMPLIED WARRANTIES, INCLUDING, BUT NOT
+# LIMITED TO, THE IMPLIED WARRANTIES OF MERCHANTABILITY AND FITNESS FOR
+# A PARTICULAR PURPOSE ARE DISCLAIMED. IN NO EVENT SHALL THE COPYRIGHT
+# OWNER OR CONTRIBUTORS BE LIABLE FOR ANY DIRECT, INDIRECT, INCIDENTAL,
+# SPECIAL, EXEMPLARY, OR CONSEQUENTIAL DAMAGES (INCLUDING, BUT NOT
+# LIMITED TO, PROCUREMENT OF SUBSTITUTE GOODS OR SERVICES; LOSS OF USE,
+# DATA, OR PROFITS; OR BUSINESS INTERRUPTION) HOWEVER CAUSED AND ON ANY
+# THEORY OF LIABILITY, WHETHER IN CONTRACT, STRICT LIABILITY, OR TORT
+# (INCLUDING NEGLIGENCE OR OTHERWISE) ARISING IN ANY WAY OUT OF THE USE
+# OF THIS SOFTWARE, EVEN IF ADVISED OF THE POSSIBILITY OF SUCH DAMAGE.
+
+"""Implementation of operations."""
+
+import threading
+
+# _utilities is referenced from specification in this module.
+from grpc.framework.core import _context
+from grpc.framework.core import _emission
+from grpc.framework.core import _expiration
+from grpc.framework.core import _ingestion
+from grpc.framework.core import _interfaces
+from grpc.framework.core import _reception
+from grpc.framework.core import _termination
+from grpc.framework.core import _transmission
+from grpc.framework.core import _utilities  # pylint: disable=unused-import
+
+
+class _EasyOperation(_interfaces.Operation):
+  """A trivial implementation of interfaces.Operation."""
+
+  def __init__(
+      self, lock, termination_manager, transmission_manager, expiration_manager,
+      context, operator, reception_manager):
+    """Constructor.
+
+    Args:
+      lock: The operation-wide lock.
+      termination_manager: The _interfaces.TerminationManager for the operation.
+      transmission_manager: The _interfaces.TransmissionManager for the
+        operation.
+      expiration_manager: The _interfaces.ExpirationManager for the operation.
+      context: A base.OperationContext for use by the customer during the
+        operation.
+      operator: A base.Operator for use by the customer during the operation.
+      reception_manager: The _interfaces.ReceptionManager for the operation.
+    """
+    self._lock = lock
+    self._termination_manager = termination_manager
+    self._transmission_manager = transmission_manager
+    self._expiration_manager = expiration_manager
+    self._reception_manager = reception_manager
+
+    self.context = context
+    self.operator = operator
+
+  def handle_ticket(self, ticket):
+    with self._lock:
+      self._reception_manager.receive_ticket(ticket)
+
+  def abort(self, outcome):
+    with self._lock:
+      if self._termination_manager.outcome is None:
+        self._termination_manager.abort(outcome)
+        self._transmission_manager.abort(outcome)
+        self._expiration_manager.terminate()
+
+
+def invocation_operate(
+    operation_id, group, method, subscription, timeout, initial_metadata,
+    payload, completion, ticket_sink, termination_action, pool):
+  """Constructs objects necessary for front-side operation management.
+
+  Args:
+    operation_id: An object identifying the operation.
+    group: The group identifier of the operation.
+    method: The method identifier of the operation.
+    subscription: A base.Subscription describing the customer's interest in the
+      results of the operation.
+    timeout: A length of time in seconds to allow for the operation.
+    initial_metadata: An initial metadata value to be sent to the other side of
+      the operation. May be None if the initial metadata will be passed later or
+      if there will be no initial metadata passed at all.
+    payload: The first payload value to be transmitted to the other side. May be
+      None if there is no such value or if the customer chose not to pass it at
+      operation invocation.
+    completion: A base.Completion value indicating the end of values passed to
+      the other side of the operation.
+    ticket_sink: A callable that accepts links.Tickets and delivers them to the
+      other side of the operation.
+    termination_action: A callable that accepts the outcome of the operation as
+      a base.Outcome value to be called on operation completion.
+    pool: A thread pool with which to do the work of the operation.
+
+  Returns:
+    An _interfaces.Operation for the operation.
+  """
+  lock = threading.Lock()
+  with lock:
+    termination_manager = _termination.invocation_termination_manager(
+        termination_action, pool)
+    transmission_manager = _transmission.TransmissionManager(
+        operation_id, ticket_sink, lock, pool, termination_manager)
+    expiration_manager = _expiration.invocation_expiration_manager(
+        timeout, lock, termination_manager, transmission_manager)
+    operation_context = _context.OperationContext(
+        lock, termination_manager, transmission_manager, expiration_manager)
+    emission_manager = _emission.EmissionManager(
+        lock, termination_manager, transmission_manager, expiration_manager)
+    ingestion_manager = _ingestion.invocation_ingestion_manager(
+        subscription, lock, pool, termination_manager, transmission_manager,
+        expiration_manager)
+    reception_manager = _reception.ReceptionManager(
+        termination_manager, transmission_manager, expiration_manager,
+        ingestion_manager)
+
+    termination_manager.set_expiration_manager(expiration_manager)
+    transmission_manager.set_expiration_manager(expiration_manager)
+    emission_manager.set_ingestion_manager(ingestion_manager)
+
+    transmission_manager.kick_off(
+        group, method, timeout, initial_metadata, payload, completion, None)
+
+  return _EasyOperation(
+      lock, termination_manager, transmission_manager, expiration_manager,
+      operation_context, emission_manager, reception_manager)
+
+
+def service_operate(
+    servicer_package, ticket, ticket_sink, termination_action, pool):
+  """Constructs an Operation for service of an operation.
+
+  Args:
+    servicer_package: A _utilities.ServicerPackage to be used servicing the
+      operation.
+    ticket: The first links.Ticket received for the operation.
+    ticket_sink: A callable that accepts links.Tickets and delivers them to the
+      other side of the operation.
+    termination_action: A callable that accepts the outcome of the operation as
+      a base.Outcome value to be called on operation completion.
+    pool: A thread pool with which to do the work of the operation.
+
+  Returns:
+    An _interfaces.Operation for the operation.
+  """
+  lock = threading.Lock()
+  with lock:
+    termination_manager = _termination.service_termination_manager(
+        termination_action, pool)
+    transmission_manager = _transmission.TransmissionManager(
+        ticket.operation_id, ticket_sink, lock, pool, termination_manager)
+    expiration_manager = _expiration.service_expiration_manager(
+        ticket.timeout, servicer_package.default_timeout,
+        servicer_package.maximum_timeout, lock, termination_manager,
+        transmission_manager)
+    operation_context = _context.OperationContext(
+        lock, termination_manager, transmission_manager, expiration_manager)
+    emission_manager = _emission.EmissionManager(
+        lock, termination_manager, transmission_manager, expiration_manager)
+    ingestion_manager = _ingestion.service_ingestion_manager(
+        servicer_package.servicer, operation_context, emission_manager, lock,
+        pool, termination_manager, transmission_manager, expiration_manager)
+    reception_manager = _reception.ReceptionManager(
+        termination_manager, transmission_manager, expiration_manager,
+        ingestion_manager)
+
+    termination_manager.set_expiration_manager(expiration_manager)
+    transmission_manager.set_expiration_manager(expiration_manager)
+    emission_manager.set_ingestion_manager(ingestion_manager)
+
+    reception_manager.receive_ticket(ticket)
+
+  return _EasyOperation(
+      lock, termination_manager, transmission_manager, expiration_manager,
+      operation_context, emission_manager, reception_manager)
diff --git a/src/python/grpcio/grpc/framework/core/_reception.py b/src/python/grpcio/grpc/framework/core/_reception.py
new file mode 100644
index 0000000..b64faf8
--- /dev/null
+++ b/src/python/grpcio/grpc/framework/core/_reception.py
@@ -0,0 +1,137 @@
+# Copyright 2015, Google Inc.
+# All rights reserved.
+#
+# Redistribution and use in source and binary forms, with or without
+# modification, are permitted provided that the following conditions are
+# met:
+#
+#     * Redistributions of source code must retain the above copyright
+# notice, this list of conditions and the following disclaimer.
+#     * Redistributions in binary form must reproduce the above
+# copyright notice, this list of conditions and the following disclaimer
+# in the documentation and/or other materials provided with the
+# distribution.
+#     * Neither the name of Google Inc. nor the names of its
+# contributors may be used to endorse or promote products derived from
+# this software without specific prior written permission.
+#
+# THIS SOFTWARE IS PROVIDED BY THE COPYRIGHT HOLDERS AND CONTRIBUTORS
+# "AS IS" AND ANY EXPRESS OR IMPLIED WARRANTIES, INCLUDING, BUT NOT
+# LIMITED TO, THE IMPLIED WARRANTIES OF MERCHANTABILITY AND FITNESS FOR
+# A PARTICULAR PURPOSE ARE DISCLAIMED. IN NO EVENT SHALL THE COPYRIGHT
+# OWNER OR CONTRIBUTORS BE LIABLE FOR ANY DIRECT, INDIRECT, INCIDENTAL,
+# SPECIAL, EXEMPLARY, OR CONSEQUENTIAL DAMAGES (INCLUDING, BUT NOT
+# LIMITED TO, PROCUREMENT OF SUBSTITUTE GOODS OR SERVICES; LOSS OF USE,
+# DATA, OR PROFITS; OR BUSINESS INTERRUPTION) HOWEVER CAUSED AND ON ANY
+# THEORY OF LIABILITY, WHETHER IN CONTRACT, STRICT LIABILITY, OR TORT
+# (INCLUDING NEGLIGENCE OR OTHERWISE) ARISING IN ANY WAY OUT OF THE USE
+# OF THIS SOFTWARE, EVEN IF ADVISED OF THE POSSIBILITY OF SUCH DAMAGE.
+
+"""State and behavior for ticket reception."""
+
+from grpc.framework.core import _interfaces
+from grpc.framework.interfaces.base import base
+from grpc.framework.interfaces.base import utilities
+from grpc.framework.interfaces.links import links
+
+_REMOTE_TICKET_TERMINATION_TO_LOCAL_OUTCOME = {
+    links.Ticket.Termination.CANCELLATION: base.Outcome.CANCELLED,
+    links.Ticket.Termination.EXPIRATION: base.Outcome.EXPIRED,
+    links.Ticket.Termination.SHUTDOWN: base.Outcome.REMOTE_SHUTDOWN,
+    links.Ticket.Termination.RECEPTION_FAILURE: base.Outcome.RECEPTION_FAILURE,
+    links.Ticket.Termination.TRANSMISSION_FAILURE:
+        base.Outcome.TRANSMISSION_FAILURE,
+    links.Ticket.Termination.LOCAL_FAILURE: base.Outcome.REMOTE_FAILURE,
+}
+
+
+class ReceptionManager(_interfaces.ReceptionManager):
+  """A ReceptionManager based around a _Receiver passed to it."""
+
+  def __init__(
+      self, termination_manager, transmission_manager, expiration_manager,
+      ingestion_manager):
+    """Constructor.
+
+    Args:
+      termination_manager: The operation's _interfaces.TerminationManager.
+      transmission_manager: The operation's _interfaces.TransmissionManager.
+      expiration_manager: The operation's _interfaces.ExpirationManager.
+      ingestion_manager: The operation's _interfaces.IngestionManager.
+    """
+    self._termination_manager = termination_manager
+    self._transmission_manager = transmission_manager
+    self._expiration_manager = expiration_manager
+    self._ingestion_manager = ingestion_manager
+
+    self._lowest_unseen_sequence_number = 0
+    self._out_of_sequence_tickets = {}
+    self._aborted = False
+
+  def _abort(self, outcome):
+    self._aborted = True
+    self._termination_manager.abort(outcome)
+    self._transmission_manager.abort(outcome)
+    self._expiration_manager.terminate()
+
+  def _sequence_failure(self, ticket):
+    """Determines a just-arrived ticket's sequential legitimacy.
+
+    Args:
+      ticket: A just-arrived ticket.
+
+    Returns:
+      True if the ticket is sequentially legitimate; False otherwise.
+    """
+    if ticket.sequence_number < self._lowest_unseen_sequence_number:
+      return True
+    elif ticket.sequence_number in self._out_of_sequence_tickets:
+      return True
+    else:
+      return False
+
+  def _process_one(self, ticket):
+    if ticket.sequence_number == 0:
+      self._ingestion_manager.set_group_and_method(ticket.group, ticket.method)
+    if ticket.timeout is not None:
+      self._expiration_manager.change_timeout(ticket.timeout)
+    if ticket.termination is None:
+      completion = None
+    else:
+      completion = utilities.completion(
+          ticket.terminal_metadata, ticket.code, ticket.message)
+    self._ingestion_manager.advance(
+        ticket.initial_metadata, ticket.payload, completion, ticket.allowance)
+    if ticket.allowance is not None:
+      self._transmission_manager.allowance(ticket.allowance)
+
+  def _process(self, ticket):
+    """Process those tickets ready to be processed.
+
+    Args:
+      ticket: A just-arrived ticket the sequence number of which matches this
+        _ReceptionManager's _lowest_unseen_sequence_number field.
+    """
+    while True:
+      self._process_one(ticket)
+      next_ticket = self._out_of_sequence_tickets.pop(
+          ticket.sequence_number + 1, None)
+      if next_ticket is None:
+        self._lowest_unseen_sequence_number = ticket.sequence_number + 1
+        return
+      else:
+        ticket = next_ticket
+
+  def receive_ticket(self, ticket):
+    """See _interfaces.ReceptionManager.receive_ticket for specification."""
+    if self._aborted:
+      return
+    elif self._sequence_failure(ticket):
+      self._abort(base.Outcome.RECEPTION_FAILURE)
+    elif ticket.termination not in (None, links.Ticket.Termination.COMPLETION):
+      outcome = _REMOTE_TICKET_TERMINATION_TO_LOCAL_OUTCOME[ticket.termination]
+      self._abort(outcome)
+    elif ticket.sequence_number == self._lowest_unseen_sequence_number:
+      self._process(ticket)
+    else:
+      self._out_of_sequence_tickets[ticket.sequence_number] = ticket
diff --git a/src/python/grpcio/grpc/framework/core/_termination.py b/src/python/grpcio/grpc/framework/core/_termination.py
new file mode 100644
index 0000000..ad9f612
--- /dev/null
+++ b/src/python/grpcio/grpc/framework/core/_termination.py
@@ -0,0 +1,212 @@
+# Copyright 2015, Google Inc.
+# All rights reserved.
+#
+# Redistribution and use in source and binary forms, with or without
+# modification, are permitted provided that the following conditions are
+# met:
+#
+#     * Redistributions of source code must retain the above copyright
+# notice, this list of conditions and the following disclaimer.
+#     * Redistributions in binary form must reproduce the above
+# copyright notice, this list of conditions and the following disclaimer
+# in the documentation and/or other materials provided with the
+# distribution.
+#     * Neither the name of Google Inc. nor the names of its
+# contributors may be used to endorse or promote products derived from
+# this software without specific prior written permission.
+#
+# THIS SOFTWARE IS PROVIDED BY THE COPYRIGHT HOLDERS AND CONTRIBUTORS
+# "AS IS" AND ANY EXPRESS OR IMPLIED WARRANTIES, INCLUDING, BUT NOT
+# LIMITED TO, THE IMPLIED WARRANTIES OF MERCHANTABILITY AND FITNESS FOR
+# A PARTICULAR PURPOSE ARE DISCLAIMED. IN NO EVENT SHALL THE COPYRIGHT
+# OWNER OR CONTRIBUTORS BE LIABLE FOR ANY DIRECT, INDIRECT, INCIDENTAL,
+# SPECIAL, EXEMPLARY, OR CONSEQUENTIAL DAMAGES (INCLUDING, BUT NOT
+# LIMITED TO, PROCUREMENT OF SUBSTITUTE GOODS OR SERVICES; LOSS OF USE,
+# DATA, OR PROFITS; OR BUSINESS INTERRUPTION) HOWEVER CAUSED AND ON ANY
+# THEORY OF LIABILITY, WHETHER IN CONTRACT, STRICT LIABILITY, OR TORT
+# (INCLUDING NEGLIGENCE OR OTHERWISE) ARISING IN ANY WAY OUT OF THE USE
+# OF THIS SOFTWARE, EVEN IF ADVISED OF THE POSSIBILITY OF SUCH DAMAGE.
+
+"""State and behavior for operation termination."""
+
+import abc
+
+from grpc.framework.core import _constants
+from grpc.framework.core import _interfaces
+from grpc.framework.foundation import callable_util
+from grpc.framework.interfaces.base import base
+
+
+def _invocation_completion_predicate(
+    unused_emission_complete, unused_transmission_complete,
+    unused_reception_complete, ingestion_complete):
+  return ingestion_complete
+
+
+def _service_completion_predicate(
+    unused_emission_complete, transmission_complete, unused_reception_complete,
+    unused_ingestion_complete):
+  return transmission_complete
+
+
+class TerminationManager(_interfaces.TerminationManager):
+  """A _interfaces.TransmissionManager on which another manager may be set."""
+  __metaclass__ = abc.ABCMeta
+
+  @abc.abstractmethod
+  def set_expiration_manager(self, expiration_manager):
+    """Sets the expiration manager with which this manager will interact.
+
+    Args:
+      expiration_manager: The _interfaces.ExpirationManager associated with the
+        current operation.
+    """
+    raise NotImplementedError()
+
+
+class _TerminationManager(TerminationManager):
+  """An implementation of TerminationManager."""
+
+  def __init__(self, predicate, action, pool):
+    """Constructor.
+
+    Args:
+      predicate: One of _invocation_completion_predicate or
+        _service_completion_predicate to be used to determine when the operation
+        has completed.
+      action: A behavior to pass the operation outcome on operation termination.
+      pool: A thread pool.
+    """
+    self._predicate = predicate
+    self._action = action
+    self._pool = pool
+    self._expiration_manager = None
+
+    self.outcome = None
+    self._callbacks = []
+
+    self._emission_complete = False
+    self._transmission_complete = False
+    self._reception_complete = False
+    self._ingestion_complete = False
+
+  def set_expiration_manager(self, expiration_manager):
+    self._expiration_manager = expiration_manager
+
+  def _terminate_internal_only(self, outcome):
+    """Terminates the operation.
+
+    Args:
+      outcome: A base.Outcome describing the outcome of the operation.
+    """
+    self.outcome = outcome
+    callbacks = list(self._callbacks)
+    self._callbacks = None
+
+    act = callable_util.with_exceptions_logged(
+        self._action, _constants.INTERNAL_ERROR_LOG_MESSAGE)
+
+    if outcome is base.Outcome.LOCAL_FAILURE:
+      self._pool.submit(act, outcome)
+    else:
+      def call_callbacks_and_act(callbacks, outcome):
+        for callback in callbacks:
+          callback_outcome = callable_util.call_logging_exceptions(
+              callback, _constants.TERMINATION_CALLBACK_EXCEPTION_LOG_MESSAGE,
+              outcome)
+          if callback_outcome.exception is not None:
+            outcome = base.Outcome.LOCAL_FAILURE
+            break
+        act(outcome)
+
+      self._pool.submit(
+          callable_util.with_exceptions_logged(
+              call_callbacks_and_act, _constants.INTERNAL_ERROR_LOG_MESSAGE),
+          callbacks, outcome)
+
+  def _terminate_and_notify(self, outcome):
+    self._terminate_internal_only(outcome)
+    self._expiration_manager.terminate()
+
+  def _perhaps_complete(self):
+    if self._predicate(
+        self._emission_complete, self._transmission_complete,
+        self._reception_complete, self._ingestion_complete):
+      self._terminate_and_notify(base.Outcome.COMPLETED)
+      return True
+    else:
+      return False
+
+  def is_active(self):
+    """See _interfaces.TerminationManager.is_active for specification."""
+    return self.outcome is None
+
+  def add_callback(self, callback):
+    """See _interfaces.TerminationManager.add_callback for specification."""
+    if self.outcome is None:
+      self._callbacks.append(callback)
+      return None
+    else:
+      return self.outcome
+
+  def emission_complete(self):
+    """See superclass method for specification."""
+    if self.outcome is None:
+      self._emission_complete = True
+      self._perhaps_complete()
+
+  def transmission_complete(self):
+    """See superclass method for specification."""
+    if self.outcome is None:
+      self._transmission_complete = True
+      return self._perhaps_complete()
+    else:
+      return False
+
+  def reception_complete(self):
+    """See superclass method for specification."""
+    if self.outcome is None:
+      self._reception_complete = True
+      self._perhaps_complete()
+
+  def ingestion_complete(self):
+    """See superclass method for specification."""
+    if self.outcome is None:
+      self._ingestion_complete = True
+      self._perhaps_complete()
+
+  def expire(self):
+    """See _interfaces.TerminationManager.expire for specification."""
+    self._terminate_internal_only(base.Outcome.EXPIRED)
+
+  def abort(self, outcome):
+    """See _interfaces.TerminationManager.abort for specification."""
+    self._terminate_and_notify(outcome)
+
+
+def invocation_termination_manager(action, pool):
+  """Creates a TerminationManager appropriate for invocation-side use.
+
+  Args:
+    action: An action to call on operation termination.
+    pool: A thread pool in which to execute the passed action and any
+      termination callbacks that are registered during the operation.
+
+  Returns:
+    A TerminationManager appropriate for invocation-side use.
+  """
+  return _TerminationManager(_invocation_completion_predicate, action, pool)
+
+
+def service_termination_manager(action, pool):
+  """Creates a TerminationManager appropriate for service-side use.
+
+  Args:
+    action: An action to call on operation termination.
+    pool: A thread pool in which to execute the passed action and any
+      termination callbacks that are registered during the operation.
+
+  Returns:
+    A TerminationManager appropriate for service-side use.
+  """
+  return _TerminationManager(_service_completion_predicate, action, pool)
diff --git a/src/python/grpcio/grpc/framework/core/_transmission.py b/src/python/grpcio/grpc/framework/core/_transmission.py
new file mode 100644
index 0000000..01894d3
--- /dev/null
+++ b/src/python/grpcio/grpc/framework/core/_transmission.py
@@ -0,0 +1,294 @@
+# Copyright 2015, Google Inc.
+# All rights reserved.
+#
+# Redistribution and use in source and binary forms, with or without
+# modification, are permitted provided that the following conditions are
+# met:
+#
+#     * Redistributions of source code must retain the above copyright
+# notice, this list of conditions and the following disclaimer.
+#     * Redistributions in binary form must reproduce the above
+# copyright notice, this list of conditions and the following disclaimer
+# in the documentation and/or other materials provided with the
+# distribution.
+#     * Neither the name of Google Inc. nor the names of its
+# contributors may be used to endorse or promote products derived from
+# this software without specific prior written permission.
+#
+# THIS SOFTWARE IS PROVIDED BY THE COPYRIGHT HOLDERS AND CONTRIBUTORS
+# "AS IS" AND ANY EXPRESS OR IMPLIED WARRANTIES, INCLUDING, BUT NOT
+# LIMITED TO, THE IMPLIED WARRANTIES OF MERCHANTABILITY AND FITNESS FOR
+# A PARTICULAR PURPOSE ARE DISCLAIMED. IN NO EVENT SHALL THE COPYRIGHT
+# OWNER OR CONTRIBUTORS BE LIABLE FOR ANY DIRECT, INDIRECT, INCIDENTAL,
+# SPECIAL, EXEMPLARY, OR CONSEQUENTIAL DAMAGES (INCLUDING, BUT NOT
+# LIMITED TO, PROCUREMENT OF SUBSTITUTE GOODS OR SERVICES; LOSS OF USE,
+# DATA, OR PROFITS; OR BUSINESS INTERRUPTION) HOWEVER CAUSED AND ON ANY
+# THEORY OF LIABILITY, WHETHER IN CONTRACT, STRICT LIABILITY, OR TORT
+# (INCLUDING NEGLIGENCE OR OTHERWISE) ARISING IN ANY WAY OUT OF THE USE
+# OF THIS SOFTWARE, EVEN IF ADVISED OF THE POSSIBILITY OF SUCH DAMAGE.
+
+"""State and behavior for ticket transmission during an operation."""
+
+from grpc.framework.core import _constants
+from grpc.framework.core import _interfaces
+from grpc.framework.foundation import callable_util
+from grpc.framework.interfaces.base import base
+from grpc.framework.interfaces.links import links
+
+_TRANSMISSION_EXCEPTION_LOG_MESSAGE = 'Exception during transmission!'
+
+
+def _explode_completion(completion):
+  if completion is None:
+    return None, None, None, None
+  else:
+    return (
+        completion.terminal_metadata, completion.code, completion.message,
+        links.Ticket.Termination.COMPLETION)
+
+
+class TransmissionManager(_interfaces.TransmissionManager):
+  """An _interfaces.TransmissionManager that sends links.Tickets."""
+
+  def __init__(
+      self, operation_id, ticket_sink, lock, pool, termination_manager):
+    """Constructor.
+
+    Args:
+      operation_id: The operation's ID.
+      ticket_sink: A callable that accepts tickets and sends them to the other
+        side of the operation.
+      lock: The operation-servicing-wide lock object.
+      pool: A thread pool in which the work of transmitting tickets will be
+        performed.
+      termination_manager: The _interfaces.TerminationManager associated with
+        this operation.
+    """
+    self._lock = lock
+    self._pool = pool
+    self._ticket_sink = ticket_sink
+    self._operation_id = operation_id
+    self._termination_manager = termination_manager
+    self._expiration_manager = None
+
+    self._lowest_unused_sequence_number = 0
+    self._remote_allowance = 1
+    self._remote_complete = False
+    self._timeout = None
+    self._local_allowance = 0
+    self._initial_metadata = None
+    self._payloads = []
+    self._completion = None
+    self._aborted = False
+    self._abortion_outcome = None
+    self._transmitting = False
+
+  def set_expiration_manager(self, expiration_manager):
+    """Sets the ExpirationManager with which this manager will cooperate."""
+    self._expiration_manager = expiration_manager
+
+  def _next_ticket(self):
+    """Creates the next ticket to be transmitted.
+
+    Returns:
+      A links.Ticket to be sent to the other side of the operation or None if
+        there is nothing to be sent at this time.
+    """
+    if self._aborted:
+      if self._abortion_outcome is None:
+        return None
+      else:
+        termination = _constants.ABORTION_OUTCOME_TO_TICKET_TERMINATION[
+            self._abortion_outcome]
+        if termination is None:
+          return None
+        else:
+          self._abortion_outcome = None
+          return links.Ticket(
+              self._operation_id, self._lowest_unused_sequence_number, None,
+              None, None, None, None, None, None, None, None, None,
+              termination)
+
+    action = False
+    # TODO(nathaniel): Support other subscriptions.
+    local_subscription = links.Ticket.Subscription.FULL
+    timeout = self._timeout
+    if timeout is not None:
+      self._timeout = None
+      action = True
+    if self._local_allowance <= 0:
+      allowance = None
+    else:
+      allowance = self._local_allowance
+      self._local_allowance = 0
+      action = True
+    initial_metadata = self._initial_metadata
+    if initial_metadata is not None:
+      self._initial_metadata = None
+      action = True
+    if not self._payloads or self._remote_allowance <= 0:
+      payload = None
+    else:
+      payload = self._payloads.pop(0)
+      self._remote_allowance -= 1
+      action = True
+    if self._completion is None or self._payloads:
+      terminal_metadata, code, message, termination = None, None, None, None
+    else:
+      terminal_metadata, code, message, termination = _explode_completion(
+          self._completion)
+      self._completion = None
+      action = True
+
+    if action:
+      ticket = links.Ticket(
+          self._operation_id, self._lowest_unused_sequence_number, None, None,
+          local_subscription, timeout, allowance, initial_metadata, payload,
+          terminal_metadata, code, message, termination)
+      self._lowest_unused_sequence_number += 1
+      return ticket
+    else:
+      return None
+
+  def _transmit(self, ticket):
+    """Commences the transmission loop sending tickets.
+
+    Args:
+      ticket: A links.Ticket to be sent to the other side of the operation.
+    """
+    def transmit(ticket):
+      while True:
+        transmission_outcome = callable_util.call_logging_exceptions(
+            self._ticket_sink, _TRANSMISSION_EXCEPTION_LOG_MESSAGE, ticket)
+        if transmission_outcome.exception is None:
+          with self._lock:
+            if ticket.termination is links.Ticket.Termination.COMPLETION:
+              self._termination_manager.transmission_complete()
+            ticket = self._next_ticket()
+            if ticket is None:
+              self._transmitting = False
+              return
+        else:
+          with self._lock:
+            if self._termination_manager.outcome is None:
+              self._termination_manager.abort(base.Outcome.TRANSMISSION_FAILURE)
+              self._expiration_manager.terminate()
+            return
+
+    self._pool.submit(callable_util.with_exceptions_logged(
+        transmit, _constants.INTERNAL_ERROR_LOG_MESSAGE), ticket)
+    self._transmitting = True
+
+  def kick_off(
+      self, group, method, timeout, initial_metadata, payload, completion,
+      allowance):
+    """See _interfaces.TransmissionManager.kickoff for specification."""
+    # TODO(nathaniel): Support other subscriptions.
+    subscription = links.Ticket.Subscription.FULL
+    terminal_metadata, code, message, termination = _explode_completion(
+        completion)
+    self._remote_allowance = 1 if payload is None else 0
+    ticket = links.Ticket(
+        self._operation_id, 0, group, method, subscription, timeout, allowance,
+        initial_metadata, payload, terminal_metadata, code, message,
+        termination)
+    self._lowest_unused_sequence_number = 1
+    self._transmit(ticket)
+
+  def advance(self, initial_metadata, payload, completion, allowance):
+    """See _interfaces.TransmissionManager.advance for specification."""
+    effective_initial_metadata = initial_metadata
+    effective_payload = payload
+    effective_completion = completion
+    if allowance is not None and not self._remote_complete:
+      effective_allowance = allowance
+    else:
+      effective_allowance = None
+    if self._transmitting:
+      if effective_initial_metadata is not None:
+        self._initial_metadata = effective_initial_metadata
+      if effective_payload is not None:
+        self._payloads.append(effective_payload)
+      if effective_completion is not None:
+        self._completion = effective_completion
+      if effective_allowance is not None:
+        self._local_allowance += effective_allowance
+    else:
+      if effective_payload is not None:
+        if 0 < self._remote_allowance:
+          ticket_payload = effective_payload
+          self._remote_allowance -= 1
+        else:
+          self._payloads.append(effective_payload)
+          ticket_payload = None
+      else:
+        ticket_payload = None
+      if effective_completion is not None and not self._payloads:
+        ticket_completion = effective_completion
+      else:
+        self._completion = effective_completion
+        ticket_completion = None
+      if any(
+          (effective_initial_metadata, ticket_payload, ticket_completion,
+           effective_allowance)):
+        terminal_metadata, code, message, termination = _explode_completion(
+            completion)
+        ticket = links.Ticket(
+            self._operation_id, self._lowest_unused_sequence_number, None, None,
+            None, None, allowance, effective_initial_metadata, ticket_payload,
+            terminal_metadata, code, message, termination)
+        self._lowest_unused_sequence_number += 1
+        self._transmit(ticket)
+
+  def timeout(self, timeout):
+    """See _interfaces.TransmissionManager.timeout for specification."""
+    if self._transmitting:
+      self._timeout = timeout
+    else:
+      ticket = links.Ticket(
+          self._operation_id, self._lowest_unused_sequence_number, None, None,
+          None, timeout, None, None, None, None, None, None, None)
+      self._lowest_unused_sequence_number += 1
+      self._transmit(ticket)
+
+  def allowance(self, allowance):
+    """See _interfaces.TransmissionManager.allowance for specification."""
+    if self._transmitting or not self._payloads:
+      self._remote_allowance += allowance
+    else:
+      self._remote_allowance += allowance - 1
+      payload = self._payloads.pop(0)
+      if self._payloads:
+        completion = None
+      else:
+        completion = self._completion
+        self._completion = None
+      terminal_metadata, code, message, termination = _explode_completion(
+          completion)
+      ticket = links.Ticket(
+          self._operation_id, self._lowest_unused_sequence_number, None, None,
+          None, None, None, None, payload, terminal_metadata, code, message,
+          termination)
+      self._lowest_unused_sequence_number += 1
+      self._transmit(ticket)
+
+  def remote_complete(self):
+    """See _interfaces.TransmissionManager.remote_complete for specification."""
+    self._remote_complete = True
+    self._local_allowance = 0
+
+  def abort(self, outcome):
+    """See _interfaces.TransmissionManager.abort for specification."""
+    if self._transmitting:
+      self._aborted, self._abortion_outcome = True, outcome
+    else:
+      self._aborted = True
+      if outcome is not None:
+        termination = _constants.ABORTION_OUTCOME_TO_TICKET_TERMINATION[
+            outcome]
+        if termination is not None:
+          ticket = links.Ticket(
+              self._operation_id, self._lowest_unused_sequence_number, None,
+              None, None, None, None, None, None, None, None, None,
+              termination)
+          self._transmit(ticket)
diff --git a/src/python/grpcio/grpc/framework/core/_utilities.py b/src/python/grpcio/grpc/framework/core/_utilities.py
new file mode 100644
index 0000000..5b0d798
--- /dev/null
+++ b/src/python/grpcio/grpc/framework/core/_utilities.py
@@ -0,0 +1,46 @@
+# Copyright 2015, Google Inc.
+# All rights reserved.
+#
+# Redistribution and use in source and binary forms, with or without
+# modification, are permitted provided that the following conditions are
+# met:
+#
+#     * Redistributions of source code must retain the above copyright
+# notice, this list of conditions and the following disclaimer.
+#     * Redistributions in binary form must reproduce the above
+# copyright notice, this list of conditions and the following disclaimer
+# in the documentation and/or other materials provided with the
+# distribution.
+#     * Neither the name of Google Inc. nor the names of its
+# contributors may be used to endorse or promote products derived from
+# this software without specific prior written permission.
+#
+# THIS SOFTWARE IS PROVIDED BY THE COPYRIGHT HOLDERS AND CONTRIBUTORS
+# "AS IS" AND ANY EXPRESS OR IMPLIED WARRANTIES, INCLUDING, BUT NOT
+# LIMITED TO, THE IMPLIED WARRANTIES OF MERCHANTABILITY AND FITNESS FOR
+# A PARTICULAR PURPOSE ARE DISCLAIMED. IN NO EVENT SHALL THE COPYRIGHT
+# OWNER OR CONTRIBUTORS BE LIABLE FOR ANY DIRECT, INDIRECT, INCIDENTAL,
+# SPECIAL, EXEMPLARY, OR CONSEQUENTIAL DAMAGES (INCLUDING, BUT NOT
+# LIMITED TO, PROCUREMENT OF SUBSTITUTE GOODS OR SERVICES; LOSS OF USE,
+# DATA, OR PROFITS; OR BUSINESS INTERRUPTION) HOWEVER CAUSED AND ON ANY
+# THEORY OF LIABILITY, WHETHER IN CONTRACT, STRICT LIABILITY, OR TORT
+# (INCLUDING NEGLIGENCE OR OTHERWISE) ARISING IN ANY WAY OUT OF THE USE
+# OF THIS SOFTWARE, EVEN IF ADVISED OF THE POSSIBILITY OF SUCH DAMAGE.
+
+"""Package-internal utilities."""
+
+import collections
+
+
+class ServicerPackage(
+    collections.namedtuple(
+        'ServicerPackage', ('servicer', 'default_timeout', 'maximum_timeout'))):
+  """A trivial bundle class.
+
+  Attributes:
+    servicer: A base.Servicer.
+    default_timeout: A float indicating the length of time in seconds to allow
+      for an operation invoked without a timeout.
+    maximum_timeout: A float indicating the maximum length of time in seconds to
+      allow for an operation.
+  """
diff --git a/src/python/grpcio/grpc/framework/core/implementations.py b/src/python/grpcio/grpc/framework/core/implementations.py
new file mode 100644
index 0000000..364a7fa
--- /dev/null
+++ b/src/python/grpcio/grpc/framework/core/implementations.py
@@ -0,0 +1,62 @@
+# Copyright 2015, Google Inc.
+# All rights reserved.
+#
+# Redistribution and use in source and binary forms, with or without
+# modification, are permitted provided that the following conditions are
+# met:
+#
+#     * Redistributions of source code must retain the above copyright
+# notice, this list of conditions and the following disclaimer.
+#     * Redistributions in binary form must reproduce the above
+# copyright notice, this list of conditions and the following disclaimer
+# in the documentation and/or other materials provided with the
+# distribution.
+#     * Neither the name of Google Inc. nor the names of its
+# contributors may be used to endorse or promote products derived from
+# this software without specific prior written permission.
+#
+# THIS SOFTWARE IS PROVIDED BY THE COPYRIGHT HOLDERS AND CONTRIBUTORS
+# "AS IS" AND ANY EXPRESS OR IMPLIED WARRANTIES, INCLUDING, BUT NOT
+# LIMITED TO, THE IMPLIED WARRANTIES OF MERCHANTABILITY AND FITNESS FOR
+# A PARTICULAR PURPOSE ARE DISCLAIMED. IN NO EVENT SHALL THE COPYRIGHT
+# OWNER OR CONTRIBUTORS BE LIABLE FOR ANY DIRECT, INDIRECT, INCIDENTAL,
+# SPECIAL, EXEMPLARY, OR CONSEQUENTIAL DAMAGES (INCLUDING, BUT NOT
+# LIMITED TO, PROCUREMENT OF SUBSTITUTE GOODS OR SERVICES; LOSS OF USE,
+# DATA, OR PROFITS; OR BUSINESS INTERRUPTION) HOWEVER CAUSED AND ON ANY
+# THEORY OF LIABILITY, WHETHER IN CONTRACT, STRICT LIABILITY, OR TORT
+# (INCLUDING NEGLIGENCE OR OTHERWISE) ARISING IN ANY WAY OUT OF THE USE
+# OF THIS SOFTWARE, EVEN IF ADVISED OF THE POSSIBILITY OF SUCH DAMAGE.
+
+"""Entry points into the ticket-exchange-based base layer implementation."""
+
+# base and links are referenced from specification in this module.
+from grpc.framework.core import _end
+from grpc.framework.interfaces.base import base  # pylint: disable=unused-import
+from grpc.framework.interfaces.links import links  # pylint: disable=unused-import
+
+
+def invocation_end_link():
+  """Creates a base.End-links.Link suitable for operation invocation.
+
+  Returns:
+    An object that is both a base.End and a links.Link, that supports operation
+      invocation, and that translates operation invocation into ticket exchange.
+  """
+  return _end.serviceless_end_link()
+
+
+def service_end_link(servicer, default_timeout, maximum_timeout):
+  """Creates a base.End-links.Link suitable for operation service.
+
+  Args:
+    servicer: A base.Servicer for servicing operations.
+    default_timeout: A length of time in seconds to be used as the default
+      time alloted for a single operation.
+    maximum_timeout: A length of time in seconds to be used as the maximum
+      time alloted for a single operation.
+
+  Returns:
+    An object that is both a base.End and a links.Link and that services
+      operations that arrive at it through ticket exchange.
+  """
+  return _end.serviceful_end_link(servicer, default_timeout, maximum_timeout)
diff --git a/src/python/grpcio/grpc/framework/interfaces/base/base.py b/src/python/grpcio/grpc/framework/interfaces/base/base.py
index 9d1651d..76e0a5b 100644
--- a/src/python/grpcio/grpc/framework/interfaces/base/base.py
+++ b/src/python/grpcio/grpc/framework/interfaces/base/base.py
@@ -27,10 +27,20 @@
 # (INCLUDING NEGLIGENCE OR OTHERWISE) ARISING IN ANY WAY OUT OF THE USE
 # OF THIS SOFTWARE, EVEN IF ADVISED OF THE POSSIBILITY OF SUCH DAMAGE.
 
-"""The base interface of RPC Framework."""
+"""The base interface of RPC Framework.
 
+Implementations of this interface support the conduct of "operations":
+exchanges between two distinct ends of an arbitrary number of data payloads
+and metadata such as a name for the operation, initial and terminal metadata
+in each direction, and flow control. These operations may be used for transfers
+of data, remote procedure calls, status indication, or anything else
+applications choose.
+"""
+
+# threading is referenced from specification in this module.
 import abc
 import enum
+import threading
 
 # abandonment is referenced from specification in this module.
 from grpc.framework.foundation import abandonment  # pylint: disable=unused-import
@@ -208,19 +218,26 @@
     raise NotImplementedError()
 
   @abc.abstractmethod
-  def stop_gracefully(self):
-    """Gracefully stops this object's service of operations.
+  def stop(self, grace):
+    """Stops this object's service of operations.
 
-    Operations in progress will be allowed to complete, and this method blocks
-    until all of them have.
-    """
-    raise NotImplementedError()
+    This object will refuse service of new operations as soon as this method is
+    called but operations under way at the time of the call may be given a
+    grace period during which they are allowed to finish.
 
-  @abc.abstractmethod
-  def stop_immediately(self):
-    """Immediately stops this object's service of operations.
+    Args:
+      grace: A duration of time in seconds to allow ongoing operations to
+        terminate before being forcefully terminated by the stopping of this
+        End. May be zero to terminate all ongoing operations and immediately
+        stop.
 
-    Operations in progress will not be allowed to complete.
+    Returns:
+      A threading.Event that will be set to indicate all operations having
+        terminated and this End having completely stopped. The returned event
+        may not be set until after the full grace period (if some ongoing
+        operation continues for the full length of the period) or it may be set
+        much sooner (if for example this End had no operations in progress at
+        the time its stop method was called).
     """
     raise NotImplementedError()
 
diff --git a/src/python/grpcio/grpc/framework/interfaces/links/links.py b/src/python/grpcio/grpc/framework/interfaces/links/links.py
index 5ebbac8..069ff02 100644
--- a/src/python/grpcio/grpc/framework/interfaces/links/links.py
+++ b/src/python/grpcio/grpc/framework/interfaces/links/links.py
@@ -98,7 +98,7 @@
     COMPLETION = 'completion'
     CANCELLATION = 'cancellation'
     EXPIRATION = 'expiration'
-    LOCAL_SHUTDOWN = 'local shutdown'
+    SHUTDOWN = 'shutdown'
     RECEPTION_FAILURE = 'reception failure'
     TRANSMISSION_FAILURE = 'transmission failure'
     LOCAL_FAILURE = 'local failure'
diff --git a/src/python/grpcio_test/grpc_test/_core_over_links_base_interface_test.py b/src/python/grpcio_test/grpc_test/_core_over_links_base_interface_test.py
new file mode 100644
index 0000000..72b1ae5
--- /dev/null
+++ b/src/python/grpcio_test/grpc_test/_core_over_links_base_interface_test.py
@@ -0,0 +1,165 @@
+# Copyright 2015, Google Inc.
+# All rights reserved.
+#
+# Redistribution and use in source and binary forms, with or without
+# modification, are permitted provided that the following conditions are
+# met:
+#
+#     * Redistributions of source code must retain the above copyright
+# notice, this list of conditions and the following disclaimer.
+#     * Redistributions in binary form must reproduce the above
+# copyright notice, this list of conditions and the following disclaimer
+# in the documentation and/or other materials provided with the
+# distribution.
+#     * Neither the name of Google Inc. nor the names of its
+# contributors may be used to endorse or promote products derived from
+# this software without specific prior written permission.
+#
+# THIS SOFTWARE IS PROVIDED BY THE COPYRIGHT HOLDERS AND CONTRIBUTORS
+# "AS IS" AND ANY EXPRESS OR IMPLIED WARRANTIES, INCLUDING, BUT NOT
+# LIMITED TO, THE IMPLIED WARRANTIES OF MERCHANTABILITY AND FITNESS FOR
+# A PARTICULAR PURPOSE ARE DISCLAIMED. IN NO EVENT SHALL THE COPYRIGHT
+# OWNER OR CONTRIBUTORS BE LIABLE FOR ANY DIRECT, INDIRECT, INCIDENTAL,
+# SPECIAL, EXEMPLARY, OR CONSEQUENTIAL DAMAGES (INCLUDING, BUT NOT
+# LIMITED TO, PROCUREMENT OF SUBSTITUTE GOODS OR SERVICES; LOSS OF USE,
+# DATA, OR PROFITS; OR BUSINESS INTERRUPTION) HOWEVER CAUSED AND ON ANY
+# THEORY OF LIABILITY, WHETHER IN CONTRACT, STRICT LIABILITY, OR TORT
+# (INCLUDING NEGLIGENCE OR OTHERWISE) ARISING IN ANY WAY OUT OF THE USE
+# OF THIS SOFTWARE, EVEN IF ADVISED OF THE POSSIBILITY OF SUCH DAMAGE.
+
+"""Tests the RPC Framework Core's implementation of the Base interface."""
+
+import collections
+import logging
+import random
+import time
+import unittest
+
+from grpc._adapter import _intermediary_low
+from grpc._links import invocation
+from grpc._links import service
+from grpc.framework.core import implementations
+from grpc.framework.interfaces.base import utilities
+from grpc_test import test_common as grpc_test_common
+from grpc_test.framework.common import test_constants
+from grpc_test.framework.interfaces.base import test_cases
+from grpc_test.framework.interfaces.base import test_interfaces
+
+_INVOCATION_INITIAL_METADATA = ((b'0', b'abc'), (b'1', b'def'), (b'2', b'ghi'),)
+_SERVICE_INITIAL_METADATA = ((b'3', b'jkl'), (b'4', b'mno'), (b'5', b'pqr'),)
+_SERVICE_TERMINAL_METADATA = ((b'6', b'stu'), (b'7', b'vwx'), (b'8', b'yza'),)
+_CODE = _intermediary_low.Code.OK
+_MESSAGE = b'test message'
+
+
+class _SerializationBehaviors(
+    collections.namedtuple(
+        '_SerializationBehaviors',
+        ('request_serializers', 'request_deserializers', 'response_serializers',
+         'response_deserializers',))):
+  pass
+
+
+class _Links(
+    collections.namedtuple(
+        '_Links',
+        ('invocation_end_link', 'invocation_grpc_link', 'service_grpc_link',
+         'service_end_link'))):
+  pass
+
+
+def _serialization_behaviors_from_serializations(serializations):
+  request_serializers = {}
+  request_deserializers = {}
+  response_serializers = {}
+  response_deserializers = {}
+  for (group, method), serialization in serializations.iteritems():
+    request_serializers[group, method] = serialization.serialize_request
+    request_deserializers[group, method] = serialization.deserialize_request
+    response_serializers[group, method] = serialization.serialize_response
+    response_deserializers[group, method] = serialization.deserialize_response
+  return _SerializationBehaviors(
+      request_serializers, request_deserializers, response_serializers,
+      response_deserializers)
+
+
+class _Implementation(test_interfaces.Implementation):
+
+  def instantiate(self, serializations, servicer):
+    serialization_behaviors = _serialization_behaviors_from_serializations(
+        serializations)
+    invocation_end_link = implementations.invocation_end_link()
+    service_end_link = implementations.service_end_link(
+        servicer, test_constants.DEFAULT_TIMEOUT,
+        test_constants.MAXIMUM_TIMEOUT)
+    service_grpc_link = service.service_link(
+        serialization_behaviors.request_deserializers,
+        serialization_behaviors.response_serializers)
+    port = service_grpc_link.add_port(0, None)
+    channel = _intermediary_low.Channel('localhost:%d' % port, None)
+    invocation_grpc_link = invocation.invocation_link(
+        channel, b'localhost',
+        serialization_behaviors.request_serializers,
+        serialization_behaviors.response_deserializers)
+
+    invocation_end_link.join_link(invocation_grpc_link)
+    invocation_grpc_link.join_link(invocation_end_link)
+    service_end_link.join_link(service_grpc_link)
+    service_grpc_link.join_link(service_end_link)
+    invocation_grpc_link.start()
+    service_grpc_link.start()
+    return invocation_end_link, service_end_link, (
+        invocation_grpc_link, service_grpc_link)
+
+  def destantiate(self, memo):
+    invocation_grpc_link, service_grpc_link = memo
+    invocation_grpc_link.stop()
+    service_grpc_link.stop_gracefully()
+
+  def invocation_initial_metadata(self):
+    return _INVOCATION_INITIAL_METADATA
+
+  def service_initial_metadata(self):
+    return _SERVICE_INITIAL_METADATA
+
+  def invocation_completion(self):
+    return utilities.completion(None, None, None)
+
+  def service_completion(self):
+    return utilities.completion(_SERVICE_TERMINAL_METADATA, _CODE, _MESSAGE)
+
+  def metadata_transmitted(self, original_metadata, transmitted_metadata):
+    return original_metadata is None or grpc_test_common.metadata_transmitted(
+        original_metadata, transmitted_metadata)
+
+  def completion_transmitted(self, original_completion, transmitted_completion):
+    if (original_completion.terminal_metadata is not None and
+        not grpc_test_common.metadata_transmitted(
+            original_completion.terminal_metadata,
+            transmitted_completion.terminal_metadata)):
+        return False
+    elif original_completion.code is not transmitted_completion.code:
+      return False
+    elif original_completion.message != transmitted_completion.message:
+      return False
+    else:
+      return True
+
+
+def setUpModule():
+  logging.warn('setUpModule!')
+
+
+def tearDownModule():
+  logging.warn('tearDownModule!')
+
+
+def load_tests(loader, tests, pattern):
+  return unittest.TestSuite(
+      tests=tuple(
+          loader.loadTestsFromTestCase(test_case_class)
+          for test_case_class in test_cases.test_cases(_Implementation())))
+
+
+if __name__ == '__main__':
+  unittest.main(verbosity=2)
diff --git a/src/python/grpcio_test/grpc_test/framework/core/__init__.py b/src/python/grpcio_test/grpc_test/framework/core/__init__.py
new file mode 100644
index 0000000..7086519
--- /dev/null
+++ b/src/python/grpcio_test/grpc_test/framework/core/__init__.py
@@ -0,0 +1,30 @@
+# Copyright 2015, Google Inc.
+# All rights reserved.
+#
+# Redistribution and use in source and binary forms, with or without
+# modification, are permitted provided that the following conditions are
+# met:
+#
+#     * Redistributions of source code must retain the above copyright
+# notice, this list of conditions and the following disclaimer.
+#     * Redistributions in binary form must reproduce the above
+# copyright notice, this list of conditions and the following disclaimer
+# in the documentation and/or other materials provided with the
+# distribution.
+#     * Neither the name of Google Inc. nor the names of its
+# contributors may be used to endorse or promote products derived from
+# this software without specific prior written permission.
+#
+# THIS SOFTWARE IS PROVIDED BY THE COPYRIGHT HOLDERS AND CONTRIBUTORS
+# "AS IS" AND ANY EXPRESS OR IMPLIED WARRANTIES, INCLUDING, BUT NOT
+# LIMITED TO, THE IMPLIED WARRANTIES OF MERCHANTABILITY AND FITNESS FOR
+# A PARTICULAR PURPOSE ARE DISCLAIMED. IN NO EVENT SHALL THE COPYRIGHT
+# OWNER OR CONTRIBUTORS BE LIABLE FOR ANY DIRECT, INDIRECT, INCIDENTAL,
+# SPECIAL, EXEMPLARY, OR CONSEQUENTIAL DAMAGES (INCLUDING, BUT NOT
+# LIMITED TO, PROCUREMENT OF SUBSTITUTE GOODS OR SERVICES; LOSS OF USE,
+# DATA, OR PROFITS; OR BUSINESS INTERRUPTION) HOWEVER CAUSED AND ON ANY
+# THEORY OF LIABILITY, WHETHER IN CONTRACT, STRICT LIABILITY, OR TORT
+# (INCLUDING NEGLIGENCE OR OTHERWISE) ARISING IN ANY WAY OUT OF THE USE
+# OF THIS SOFTWARE, EVEN IF ADVISED OF THE POSSIBILITY OF SUCH DAMAGE.
+
+
diff --git a/src/python/grpcio_test/grpc_test/framework/core/_base_interface_test.py b/src/python/grpcio_test/grpc_test/framework/core/_base_interface_test.py
new file mode 100644
index 0000000..8d72f13
--- /dev/null
+++ b/src/python/grpcio_test/grpc_test/framework/core/_base_interface_test.py
@@ -0,0 +1,96 @@
+# Copyright 2015, Google Inc.
+# All rights reserved.
+#
+# Redistribution and use in source and binary forms, with or without
+# modification, are permitted provided that the following conditions are
+# met:
+#
+#     * Redistributions of source code must retain the above copyright
+# notice, this list of conditions and the following disclaimer.
+#     * Redistributions in binary form must reproduce the above
+# copyright notice, this list of conditions and the following disclaimer
+# in the documentation and/or other materials provided with the
+# distribution.
+#     * Neither the name of Google Inc. nor the names of its
+# contributors may be used to endorse or promote products derived from
+# this software without specific prior written permission.
+#
+# THIS SOFTWARE IS PROVIDED BY THE COPYRIGHT HOLDERS AND CONTRIBUTORS
+# "AS IS" AND ANY EXPRESS OR IMPLIED WARRANTIES, INCLUDING, BUT NOT
+# LIMITED TO, THE IMPLIED WARRANTIES OF MERCHANTABILITY AND FITNESS FOR
+# A PARTICULAR PURPOSE ARE DISCLAIMED. IN NO EVENT SHALL THE COPYRIGHT
+# OWNER OR CONTRIBUTORS BE LIABLE FOR ANY DIRECT, INDIRECT, INCIDENTAL,
+# SPECIAL, EXEMPLARY, OR CONSEQUENTIAL DAMAGES (INCLUDING, BUT NOT
+# LIMITED TO, PROCUREMENT OF SUBSTITUTE GOODS OR SERVICES; LOSS OF USE,
+# DATA, OR PROFITS; OR BUSINESS INTERRUPTION) HOWEVER CAUSED AND ON ANY
+# THEORY OF LIABILITY, WHETHER IN CONTRACT, STRICT LIABILITY, OR TORT
+# (INCLUDING NEGLIGENCE OR OTHERWISE) ARISING IN ANY WAY OUT OF THE USE
+# OF THIS SOFTWARE, EVEN IF ADVISED OF THE POSSIBILITY OF SUCH DAMAGE.
+
+"""Tests the RPC Framework Core's implementation of the Base interface."""
+
+import logging
+import random
+import time
+import unittest
+
+from grpc.framework.core import implementations
+from grpc.framework.interfaces.base import utilities
+from grpc_test.framework.common import test_constants
+from grpc_test.framework.interfaces.base import test_cases
+from grpc_test.framework.interfaces.base import test_interfaces
+
+
+class _Implementation(test_interfaces.Implementation):
+
+  def __init__(self):
+    self._invocation_initial_metadata = object()
+    self._service_initial_metadata = object()
+    self._invocation_terminal_metadata = object()
+    self._service_terminal_metadata = object()
+
+  def instantiate(self, serializations, servicer):
+    invocation = implementations.invocation_end_link()
+    service = implementations.service_end_link(
+        servicer, test_constants.DEFAULT_TIMEOUT,
+        test_constants.MAXIMUM_TIMEOUT)
+    invocation.join_link(service)
+    service.join_link(invocation)
+    return invocation, service, None
+
+  def destantiate(self, memo):
+    pass
+
+  def invocation_initial_metadata(self):
+    return self._invocation_initial_metadata
+
+  def service_initial_metadata(self):
+    return self._service_initial_metadata
+
+  def invocation_completion(self):
+    return utilities.completion(self._invocation_terminal_metadata, None, None)
+
+  def service_completion(self):
+    return utilities.completion(self._service_terminal_metadata, None, None)
+
+  def metadata_transmitted(self, original_metadata, transmitted_metadata):
+    return transmitted_metadata is original_metadata
+
+  def completion_transmitted(self, original_completion, transmitted_completion):
+    return (
+        (original_completion.terminal_metadata is
+         transmitted_completion.terminal_metadata) and
+        original_completion.code is transmitted_completion.code and
+        original_completion.message is transmitted_completion.message
+    )
+
+
+def load_tests(loader, tests, pattern):
+  return unittest.TestSuite(
+      tests=tuple(
+          loader.loadTestsFromTestCase(test_case_class)
+          for test_case_class in test_cases.test_cases(_Implementation())))
+
+
+if __name__ == '__main__':
+  unittest.main(verbosity=2)
diff --git a/src/python/grpcio_test/grpc_test/framework/interfaces/base/test_cases.py b/src/python/grpcio_test/grpc_test/framework/interfaces/base/test_cases.py
index dd332fe..5c8b176 100644
--- a/src/python/grpcio_test/grpc_test/framework/interfaces/base/test_cases.py
+++ b/src/python/grpcio_test/grpc_test/framework/interfaces/base/test_cases.py
@@ -211,8 +211,10 @@
       elif instruction.kind is _control.Instruction.Kind.CONCLUDE:
         break
 
-    invocation_end.stop_gracefully()
-    service_end.stop_gracefully()
+    invocation_stop_event = invocation_end.stop(0)
+    service_stop_event = service_end.stop(0)
+    invocation_stop_event.wait()
+    service_stop_event.wait()
     invocation_stats = invocation_end.operation_stats()
     service_stats = service_end.operation_stats()
 
diff --git a/src/python/grpcio_test/grpc_test/framework/interfaces/links/test_utilities.py b/src/python/grpcio_test/grpc_test/framework/interfaces/links/test_utilities.py
index 6c2e334..a2bd710 100644
--- a/src/python/grpcio_test/grpc_test/framework/interfaces/links/test_utilities.py
+++ b/src/python/grpcio_test/grpc_test/framework/interfaces/links/test_utilities.py
@@ -29,9 +29,42 @@
 
 """State and behavior appropriate for use in tests."""
 
+import logging
 import threading
+import time
 
 from grpc.framework.interfaces.links import links
+from grpc.framework.interfaces.links import utilities
+
+# A more-or-less arbitrary limit on the length of raw data values to be logged.
+_UNCOMFORTABLY_LONG = 48
+
+
+def _safe_for_log_ticket(ticket):
+  """Creates a safe-for-printing-to-the-log ticket for a given ticket.
+
+  Args:
+    ticket: Any links.Ticket.
+
+  Returns:
+    A links.Ticket that is as much as can be equal to the given ticket but
+      possibly features values like the string "<payload of length 972321>" in
+      place of the actual values of the given ticket.
+  """
+  if isinstance(ticket.payload, (basestring,)):
+    payload_length = len(ticket.payload)
+  else:
+    payload_length = -1
+  if payload_length < _UNCOMFORTABLY_LONG:
+    return ticket
+  else:
+    return links.Ticket(
+        ticket.operation_id, ticket.sequence_number,
+        ticket.group, ticket.method, ticket.subscription, ticket.timeout,
+        ticket.allowance, ticket.initial_metadata,
+        '<payload of length {}>'.format(payload_length),
+        ticket.terminal_metadata, ticket.code, ticket.message,
+        ticket.termination)
 
 
 class RecordingLink(links.Link):
@@ -64,3 +97,71 @@
     """Returns a copy of the list of all tickets received by this Link."""
     with self._condition:
       return tuple(self._tickets)
+
+
+class _Pipe(object):
+  """A conduit that logs all tickets passed through it."""
+
+  def __init__(self, name):
+    self._lock = threading.Lock()
+    self._name = name
+    self._left_mate = utilities.NULL_LINK
+    self._right_mate = utilities.NULL_LINK
+
+  def accept_left_to_right_ticket(self, ticket):
+    with self._lock:
+      logging.warning(
+          '%s: moving left to right through %s: %s', time.time(), self._name,
+          _safe_for_log_ticket(ticket))
+      try:
+        self._right_mate.accept_ticket(ticket)
+      except Exception as e:  # pylint: disable=broad-except
+        logging.exception(e)
+
+  def accept_right_to_left_ticket(self, ticket):
+    with self._lock:
+      logging.warning(
+          '%s: moving right to left through %s: %s', time.time(), self._name,
+          _safe_for_log_ticket(ticket))
+      try:
+        self._left_mate.accept_ticket(ticket)
+      except Exception as e:  # pylint: disable=broad-except
+        logging.exception(e)
+
+  def join_left_mate(self, left_mate):
+    with self._lock:
+      self._left_mate = utilities.NULL_LINK if left_mate is None else left_mate
+
+  def join_right_mate(self, right_mate):
+    with self._lock:
+      self._right_mate = (
+          utilities.NULL_LINK if right_mate is None else right_mate)
+
+
+class _Facade(links.Link):
+
+  def __init__(self, accept, join):
+    self._accept = accept
+    self._join = join
+
+  def accept_ticket(self, ticket):
+    self._accept(ticket)
+
+  def join_link(self, link):
+    self._join(link)
+
+
+def logging_links(name):
+  """Creates a conduit that logs all tickets passed through it.
+
+  Args:
+    name: A name to use for the conduit to identify itself in logging output.
+
+  Returns:
+    Two links.Links, the first of which is the "left" side of the conduit
+      and the second of which is the "right" side of the conduit.
+  """
+  pipe = _Pipe(name)
+  left_facade = _Facade(pipe.accept_left_to_right_ticket, pipe.join_left_mate)
+  right_facade = _Facade(pipe.accept_right_to_left_ticket, pipe.join_right_mate)
+  return left_facade, right_facade