Add code and details to base.Outcome

It may seem weird that code and details would travel along two paths
now instead of one but it makes sense after considering that sometimes
the code and details are application data from the remote application
and sometimes they are transport data from the transport between the
local and remote applications.
diff --git a/src/python/grpcio/grpc/framework/core/_constants.py b/src/python/grpcio/grpc/framework/core/_constants.py
index d3be3a4..0f47cb4 100644
--- a/src/python/grpcio/grpc/framework/core/_constants.py
+++ b/src/python/grpcio/grpc/framework/core/_constants.py
@@ -44,14 +44,15 @@
 # ticket should be sent to the other side in the event of such an
 # outcome.
 ABORTION_OUTCOME_TO_TICKET_TERMINATION = {
-    base.Outcome.CANCELLED: links.Ticket.Termination.CANCELLATION,
-    base.Outcome.EXPIRED: links.Ticket.Termination.EXPIRATION,
-    base.Outcome.LOCAL_SHUTDOWN: links.Ticket.Termination.SHUTDOWN,
-    base.Outcome.REMOTE_SHUTDOWN: None,
-    base.Outcome.RECEPTION_FAILURE: links.Ticket.Termination.RECEPTION_FAILURE,
-    base.Outcome.TRANSMISSION_FAILURE: None,
-    base.Outcome.LOCAL_FAILURE: links.Ticket.Termination.LOCAL_FAILURE,
-    base.Outcome.REMOTE_FAILURE: links.Ticket.Termination.REMOTE_FAILURE,
+    base.Outcome.Kind.CANCELLED: links.Ticket.Termination.CANCELLATION,
+    base.Outcome.Kind.EXPIRED: links.Ticket.Termination.EXPIRATION,
+    base.Outcome.Kind.LOCAL_SHUTDOWN: links.Ticket.Termination.SHUTDOWN,
+    base.Outcome.Kind.REMOTE_SHUTDOWN: None,
+    base.Outcome.Kind.RECEPTION_FAILURE:
+        links.Ticket.Termination.RECEPTION_FAILURE,
+    base.Outcome.Kind.TRANSMISSION_FAILURE: None,
+    base.Outcome.Kind.LOCAL_FAILURE: links.Ticket.Termination.LOCAL_FAILURE,
+    base.Outcome.Kind.REMOTE_FAILURE: links.Ticket.Termination.REMOTE_FAILURE,
 }
 
 INTERNAL_ERROR_LOG_MESSAGE = ':-( RPC Framework (Core) internal error! )-:'
diff --git a/src/python/grpcio/grpc/framework/core/_context.py b/src/python/grpcio/grpc/framework/core/_context.py
index 76b3534..a346e9d 100644
--- a/src/python/grpcio/grpc/framework/core/_context.py
+++ b/src/python/grpcio/grpc/framework/core/_context.py
@@ -33,6 +33,7 @@
 
 # _interfaces is referenced from specification in this module.
 from grpc.framework.core import _interfaces  # pylint: disable=unused-import
+from grpc.framework.core import _utilities
 from grpc.framework.interfaces.base import base
 
 
@@ -56,11 +57,12 @@
     self._transmission_manager = transmission_manager
     self._expiration_manager = expiration_manager
 
-  def _abort(self, outcome):
+  def _abort(self, outcome_kind):
     with self._lock:
       if self._termination_manager.outcome is None:
+        outcome = _utilities.Outcome(outcome_kind, None, None)
         self._termination_manager.abort(outcome)
-        self._transmission_manager.abort(outcome, None, None)
+        self._transmission_manager.abort(outcome)
         self._expiration_manager.terminate()
 
   def outcome(self):
@@ -85,8 +87,8 @@
 
   def cancel(self):
     """See base.OperationContext.cancel for specification."""
-    self._abort(base.Outcome.CANCELLED)
+    self._abort(base.Outcome.Kind.CANCELLED)
 
   def fail(self, exception):
     """See base.OperationContext.fail for specification."""
-    self._abort(base.Outcome.LOCAL_FAILURE)
+    self._abort(base.Outcome.Kind.LOCAL_FAILURE)
diff --git a/src/python/grpcio/grpc/framework/core/_emission.py b/src/python/grpcio/grpc/framework/core/_emission.py
index 2d7b2e2..8ab59dc 100644
--- a/src/python/grpcio/grpc/framework/core/_emission.py
+++ b/src/python/grpcio/grpc/framework/core/_emission.py
@@ -30,6 +30,7 @@
 """State and behavior for handling emitted values."""
 
 from grpc.framework.core import _interfaces
+from grpc.framework.core import _utilities
 from grpc.framework.interfaces.base import base
 
 
@@ -81,9 +82,10 @@
             payload_present and self._completion_seen or
             completion_present and self._completion_seen or
             allowance_present and allowance <= 0):
-          self._termination_manager.abort(base.Outcome.LOCAL_FAILURE)
-          self._transmission_manager.abort(
-              base.Outcome.LOCAL_FAILURE, None, None)
+          outcome = _utilities.Outcome(
+              base.Outcome.Kind.LOCAL_FAILURE, None, None)
+          self._termination_manager.abort(outcome)
+          self._transmission_manager.abort(outcome)
           self._expiration_manager.terminate()
         else:
           self._initial_metadata_seen |= initial_metadata_present
diff --git a/src/python/grpcio/grpc/framework/core/_end.py b/src/python/grpcio/grpc/framework/core/_end.py
index f57cde4..336e9c2 100644
--- a/src/python/grpcio/grpc/framework/core/_end.py
+++ b/src/python/grpcio/grpc/framework/core/_end.py
@@ -69,7 +69,7 @@
 
 def _abort(operations):
   for operation in operations:
-    operation.abort(base.Outcome.LOCAL_SHUTDOWN)
+    operation.abort(base.Outcome.Kind.LOCAL_SHUTDOWN)
 
 
 def _cancel_futures(futures):
@@ -90,19 +90,19 @@
 
   Args:
     lock: A lock to hold during the termination action.
-    states: A mapping from base.Outcome values to integers to increment with
-      the outcome given to the termination action.
+    stats: A mapping from base.Outcome.Kind values to integers to increment
+      with the outcome kind given to the termination action.
     operation_id: The operation ID for the termination action.
     cycle: A _Cycle value to be updated during the termination action.
 
   Returns:
-    A callable that takes an operation outcome as its sole parameter and that
-      should be used as the termination action for the operation associated
-      with the given operation ID.
+    A callable that takes an operation outcome kind as its sole parameter and
+      that should be used as the termination action for the operation
+      associated with the given operation ID.
   """
-  def termination_action(outcome):
+  def termination_action(outcome_kind):
     with lock:
-      stats[outcome] += 1
+      stats[outcome_kind] += 1
       cycle.operations.pop(operation_id, None)
       if not cycle.operations:
         for action in cycle.idle_actions:
@@ -127,7 +127,7 @@
     self._lock = threading.Condition()
     self._servicer_package = servicer_package
 
-    self._stats = {outcome: 0 for outcome in base.Outcome}
+    self._stats = {outcome_kind: 0 for outcome_kind in base.Outcome.Kind}
 
     self._mate = None
 
diff --git a/src/python/grpcio/grpc/framework/core/_expiration.py b/src/python/grpcio/grpc/framework/core/_expiration.py
index d8690b3..ded0ab6 100644
--- a/src/python/grpcio/grpc/framework/core/_expiration.py
+++ b/src/python/grpcio/grpc/framework/core/_expiration.py
@@ -32,6 +32,7 @@
 import time
 
 from grpc.framework.core import _interfaces
+from grpc.framework.core import _utilities
 from grpc.framework.foundation import later
 from grpc.framework.interfaces.base import base
 
@@ -73,7 +74,8 @@
         if self._future is not None and index == self._index:
           self._future = None
           self._termination_manager.expire()
-          self._transmission_manager.abort(base.Outcome.EXPIRED, None, None)
+          self._transmission_manager.abort(
+              _utilities.Outcome(base.Outcome.Kind.EXPIRED, None, None))
     return expire
 
   def start(self):
diff --git a/src/python/grpcio/grpc/framework/core/_ingestion.py b/src/python/grpcio/grpc/framework/core/_ingestion.py
index 766d57f..9a7959a 100644
--- a/src/python/grpcio/grpc/framework/core/_ingestion.py
+++ b/src/python/grpcio/grpc/framework/core/_ingestion.py
@@ -35,6 +35,7 @@
 
 from grpc.framework.core import _constants
 from grpc.framework.core import _interfaces
+from grpc.framework.core import _utilities
 from grpc.framework.foundation import abandonment
 from grpc.framework.foundation import callable_util
 from grpc.framework.interfaces.base import base
@@ -46,7 +47,7 @@
 class _SubscriptionCreation(
     collections.namedtuple(
         '_SubscriptionCreation',
-        ('kind', 'subscription', 'code', 'message',))):
+        ('kind', 'subscription', 'code', 'details',))):
   """A sum type for the outcome of ingestion initialization.
 
   Attributes:
@@ -56,7 +57,7 @@
     code: A code value to be sent to the other side of the operation along with
       an indication that the operation is being aborted due to an error on the
       remote side of the operation. Only present if kind is Kind.REMOTE_ERROR.
-    message: A message value to be sent to the other side of the operation
+    details: A details value to be sent to the other side of the operation
       along with an indication that the operation is being aborted due to an
       error on the remote side of the operation. Only present if kind is
       Kind.REMOTE_ERROR.
@@ -190,11 +191,13 @@
     self._pending_payloads = None
     self._pending_completion = None
 
-  def _abort_and_notify(self, outcome, code, message):
+  def _abort_and_notify(self, outcome_kind, code, details):
     self._abort_internal_only()
-    self._termination_manager.abort(outcome)
-    self._transmission_manager.abort(outcome, code, message)
-    self._expiration_manager.terminate()
+    if self._termination_manager.outcome is None:
+      outcome = _utilities.Outcome(outcome_kind, code, details)
+      self._termination_manager.abort(outcome)
+      self._transmission_manager.abort(outcome)
+      self._expiration_manager.terminate()
 
   def _operator_next(self):
     """Computes the next step for full-subscription ingestion.
@@ -250,12 +253,13 @@
         else:
           with self._lock:
             if self._termination_manager.outcome is None:
-              self._abort_and_notify(base.Outcome.LOCAL_FAILURE, None, None)
+              self._abort_and_notify(
+                  base.Outcome.Kind.LOCAL_FAILURE, None, None)
             return
       else:
         with self._lock:
           if self._termination_manager.outcome is None:
-            self._abort_and_notify(base.Outcome.LOCAL_FAILURE, None, None)
+            self._abort_and_notify(base.Outcome.Kind.LOCAL_FAILURE, None, None)
           return
 
   def _operator_post_create(self, subscription):
@@ -279,17 +283,18 @@
     if outcome.return_value is None:
       with self._lock:
         if self._termination_manager.outcome is None:
-          self._abort_and_notify(base.Outcome.LOCAL_FAILURE, None, None)
+          self._abort_and_notify(base.Outcome.Kind.LOCAL_FAILURE, None, None)
     elif outcome.return_value.kind is _SubscriptionCreation.Kind.ABANDONED:
       with self._lock:
         if self._termination_manager.outcome is None:
-          self._abort_and_notify(base.Outcome.LOCAL_FAILURE, None, None)
+          self._abort_and_notify(base.Outcome.Kind.LOCAL_FAILURE, None, None)
     elif outcome.return_value.kind is _SubscriptionCreation.Kind.REMOTE_ERROR:
       code = outcome.return_value.code
-      message = outcome.return_value.message
+      details = outcome.return_value.details
       with self._lock:
         if self._termination_manager.outcome is None:
-          self._abort_and_notify(base.Outcome.REMOTE_FAILURE, code, message)
+          self._abort_and_notify(
+              base.Outcome.Kind.REMOTE_FAILURE, code, details)
     elif outcome.return_value.subscription.kind is base.Subscription.Kind.FULL:
       self._operator_post_create(outcome.return_value.subscription)
     else:
diff --git a/src/python/grpcio/grpc/framework/core/_interfaces.py b/src/python/grpcio/grpc/framework/core/_interfaces.py
index deb5f34..2a534cb 100644
--- a/src/python/grpcio/grpc/framework/core/_interfaces.py
+++ b/src/python/grpcio/grpc/framework/core/_interfaces.py
@@ -50,13 +50,13 @@
     If the operation has already terminated the callback will not be called.
 
     Args:
-      callback: A callable that will be passed an interfaces.Outcome value.
+      callback: A callable that will be passed a base.Outcome value.
 
     Returns:
       None if the operation has not yet terminated and the passed callback will
-        be called when it does, or a base.Outcome value describing the operation
-        termination if the operation has terminated and the callback will not be
-        called as a result of this method call.
+        be called when it does, or a base.Outcome value describing the
+        operation termination if the operation has terminated and the callback
+        will not be called as a result of this method call.
     """
     raise NotImplementedError()
 
@@ -76,8 +76,13 @@
     raise NotImplementedError()
 
   @abc.abstractmethod
-  def reception_complete(self):
-    """Indicates that reception from the other side is complete."""
+  def reception_complete(self, code, details):
+    """Indicates that reception from the other side is complete.
+
+    Args:
+      code: An application-specific code value.
+      details: An application-specific details value.
+    """
     raise NotImplementedError()
 
   @abc.abstractmethod
@@ -95,7 +100,7 @@
     """Indicates that the operation must abort for the indicated reason.
 
     Args:
-      outcome: An interfaces.Outcome indicating operation abortion.
+      outcome: A base.Outcome indicating operation abortion.
     """
     raise NotImplementedError()
 
@@ -155,19 +160,13 @@
     raise NotImplementedError()
 
   @abc.abstractmethod
-  def abort(self, outcome, code, message):
+  def abort(self, outcome):
     """Indicates that the operation has aborted.
 
     Args:
-      outcome: An interfaces.Outcome for the operation. If None, indicates that
-        the operation abortion should not be communicated to the other side of
-        the operation.
-      code: A code value to communicate to the other side of the operation
-        along with indication of operation abortion. May be None, and has no
-        effect if outcome is None.
-      message: A message value to communicate to the other side of the
-        operation along with indication of operation abortion. May be None, and
-        has no effect if outcome is None.
+      outcome: A base.Outcome for the operation. If None, indicates that the
+        operation abortion should not be communicated to the other side of the
+        operation.
     """
     raise NotImplementedError()
 
@@ -279,8 +278,7 @@
     """Handle a ticket from the other side of the operation.
 
     Args:
-      ticket: An interfaces.BackToFrontTicket or interfaces.FrontToBackTicket
-        appropriate to this end of the operation and this object.
+      ticket: A links.Ticket for the operation.
     """
     raise NotImplementedError()
 
@@ -305,10 +303,10 @@
     raise NotImplementedError()
 
   @abc.abstractmethod
-  def abort(self, outcome):
+  def abort(self, outcome_kind):
     """Aborts the operation.
 
     Args:
-      outcome: A base.Outcome value indicating operation abortion.
+      outcome_kind: A base.Outcome.Kind value indicating operation abortion.
     """
     raise NotImplementedError()
diff --git a/src/python/grpcio/grpc/framework/core/_operation.py b/src/python/grpcio/grpc/framework/core/_operation.py
index cc873c0..f5679d0 100644
--- a/src/python/grpcio/grpc/framework/core/_operation.py
+++ b/src/python/grpcio/grpc/framework/core/_operation.py
@@ -31,7 +31,6 @@
 
 import threading
 
-# _utilities is referenced from specification in this module.
 from grpc.framework.core import _context
 from grpc.framework.core import _emission
 from grpc.framework.core import _expiration
@@ -40,7 +39,7 @@
 from grpc.framework.core import _reception
 from grpc.framework.core import _termination
 from grpc.framework.core import _transmission
-from grpc.framework.core import _utilities  # pylint: disable=unused-import
+from grpc.framework.core import _utilities
 
 
 class _EasyOperation(_interfaces.Operation):
@@ -75,11 +74,12 @@
     with self._lock:
       self._reception_manager.receive_ticket(ticket)
 
-  def abort(self, outcome):
+  def abort(self, outcome_kind):
     with self._lock:
       if self._termination_manager.outcome is None:
+        outcome = _utilities.Outcome(outcome_kind, None, None)
         self._termination_manager.abort(outcome)
-        self._transmission_manager.abort(outcome, None, None)
+        self._transmission_manager.abort(outcome)
         self._expiration_manager.terminate()
 
 
diff --git a/src/python/grpcio/grpc/framework/core/_reception.py b/src/python/grpcio/grpc/framework/core/_reception.py
index 1cebe38..d374cf0 100644
--- a/src/python/grpcio/grpc/framework/core/_reception.py
+++ b/src/python/grpcio/grpc/framework/core/_reception.py
@@ -30,21 +30,26 @@
 """State and behavior for ticket reception."""
 
 from grpc.framework.core import _interfaces
+from grpc.framework.core import _utilities
 from grpc.framework.interfaces.base import base
 from grpc.framework.interfaces.base import utilities
 from grpc.framework.interfaces.links import links
 
-_REMOTE_TICKET_TERMINATION_TO_LOCAL_OUTCOME = {
-    links.Ticket.Termination.CANCELLATION: base.Outcome.CANCELLED,
-    links.Ticket.Termination.EXPIRATION: base.Outcome.EXPIRED,
-    links.Ticket.Termination.SHUTDOWN: base.Outcome.REMOTE_SHUTDOWN,
-    links.Ticket.Termination.RECEPTION_FAILURE: base.Outcome.RECEPTION_FAILURE,
+_REMOTE_TICKET_TERMINATION_TO_LOCAL_OUTCOME_KIND = {
+    links.Ticket.Termination.CANCELLATION: base.Outcome.Kind.CANCELLED,
+    links.Ticket.Termination.EXPIRATION: base.Outcome.Kind.EXPIRED,
+    links.Ticket.Termination.SHUTDOWN: base.Outcome.Kind.REMOTE_SHUTDOWN,
+    links.Ticket.Termination.RECEPTION_FAILURE:
+        base.Outcome.Kind.RECEPTION_FAILURE,
     links.Ticket.Termination.TRANSMISSION_FAILURE:
-        base.Outcome.TRANSMISSION_FAILURE,
-    links.Ticket.Termination.LOCAL_FAILURE: base.Outcome.REMOTE_FAILURE,
-    links.Ticket.Termination.REMOTE_FAILURE: base.Outcome.LOCAL_FAILURE,
+        base.Outcome.Kind.TRANSMISSION_FAILURE,
+    links.Ticket.Termination.LOCAL_FAILURE: base.Outcome.Kind.REMOTE_FAILURE,
+    links.Ticket.Termination.REMOTE_FAILURE: base.Outcome.Kind.LOCAL_FAILURE,
 }
 
+_RECEPTION_FAILURE_OUTCOME = _utilities.Outcome(
+    base.Outcome.Kind.RECEPTION_FAILURE, None, None)
+
 
 class ReceptionManager(_interfaces.ReceptionManager):
   """A ReceptionManager based around a _Receiver passed to it."""
@@ -73,7 +78,7 @@
     self._aborted = True
     if self._termination_manager.outcome is None:
       self._termination_manager.abort(outcome)
-      self._transmission_manager.abort(None, None, None)
+      self._transmission_manager.abort(None)
       self._expiration_manager.terminate()
 
   def _sequence_failure(self, ticket):
@@ -102,6 +107,7 @@
     else:
       completion = utilities.completion(
           ticket.terminal_metadata, ticket.code, ticket.message)
+      self._termination_manager.reception_complete(ticket.code, ticket.message)
     self._ingestion_manager.advance(
         ticket.initial_metadata, ticket.payload, completion, ticket.allowance)
     if ticket.allowance is not None:
@@ -129,10 +135,12 @@
     if self._aborted:
       return
     elif self._sequence_failure(ticket):
-      self._abort(base.Outcome.RECEPTION_FAILURE)
+      self._abort(_RECEPTION_FAILURE_OUTCOME)
     elif ticket.termination not in (None, links.Ticket.Termination.COMPLETION):
-      outcome = _REMOTE_TICKET_TERMINATION_TO_LOCAL_OUTCOME[ticket.termination]
-      self._abort(outcome)
+      outcome_kind = _REMOTE_TICKET_TERMINATION_TO_LOCAL_OUTCOME_KIND[
+          ticket.termination]
+      self._abort(
+          _utilities.Outcome(outcome_kind, ticket.code, ticket.message))
     elif ticket.sequence_number == self._lowest_unseen_sequence_number:
       self._process(ticket)
     else:
diff --git a/src/python/grpcio/grpc/framework/core/_termination.py b/src/python/grpcio/grpc/framework/core/_termination.py
index ad9f612..3bf7ade 100644
--- a/src/python/grpcio/grpc/framework/core/_termination.py
+++ b/src/python/grpcio/grpc/framework/core/_termination.py
@@ -33,6 +33,7 @@
 
 from grpc.framework.core import _constants
 from grpc.framework.core import _interfaces
+from grpc.framework.core import _utilities
 from grpc.framework.foundation import callable_util
 from grpc.framework.interfaces.base import base
 
@@ -74,7 +75,8 @@
       predicate: One of _invocation_completion_predicate or
         _service_completion_predicate to be used to determine when the operation
         has completed.
-      action: A behavior to pass the operation outcome on operation termination.
+      action: A behavior to pass the operation outcome's kind on operation
+        termination.
       pool: A thread pool.
     """
     self._predicate = predicate
@@ -82,14 +84,19 @@
     self._pool = pool
     self._expiration_manager = None
 
-    self.outcome = None
     self._callbacks = []
 
+    self._code = None
+    self._details = None
     self._emission_complete = False
     self._transmission_complete = False
     self._reception_complete = False
     self._ingestion_complete = False
 
+    # The None-ness of outcome is the operation-wide record of whether and how
+    # the operation has terminated.
+    self.outcome = None
+
   def set_expiration_manager(self, expiration_manager):
     self._expiration_manager = expiration_manager
 
@@ -106,8 +113,8 @@
     act = callable_util.with_exceptions_logged(
         self._action, _constants.INTERNAL_ERROR_LOG_MESSAGE)
 
-    if outcome is base.Outcome.LOCAL_FAILURE:
-      self._pool.submit(act, outcome)
+    if outcome.kind is base.Outcome.Kind.LOCAL_FAILURE:
+      self._pool.submit(act, base.Outcome.Kind.LOCAL_FAILURE)
     else:
       def call_callbacks_and_act(callbacks, outcome):
         for callback in callbacks:
@@ -115,9 +122,11 @@
               callback, _constants.TERMINATION_CALLBACK_EXCEPTION_LOG_MESSAGE,
               outcome)
           if callback_outcome.exception is not None:
-            outcome = base.Outcome.LOCAL_FAILURE
+            act_outcome_kind = base.Outcome.Kind.LOCAL_FAILURE
             break
-        act(outcome)
+        else:
+          act_outcome_kind = outcome.kind
+        act(act_outcome_kind)
 
       self._pool.submit(
           callable_util.with_exceptions_logged(
@@ -132,7 +141,9 @@
     if self._predicate(
         self._emission_complete, self._transmission_complete,
         self._reception_complete, self._ingestion_complete):
-      self._terminate_and_notify(base.Outcome.COMPLETED)
+      self._terminate_and_notify(
+          _utilities.Outcome(
+              base.Outcome.Kind.COMPLETED, self._code, self._details))
       return True
     else:
       return False
@@ -163,10 +174,12 @@
     else:
       return False
 
-  def reception_complete(self):
+  def reception_complete(self, code, details):
     """See superclass method for specification."""
     if self.outcome is None:
       self._reception_complete = True
+      self._code = code
+      self._details = details
       self._perhaps_complete()
 
   def ingestion_complete(self):
@@ -177,7 +190,8 @@
 
   def expire(self):
     """See _interfaces.TerminationManager.expire for specification."""
-    self._terminate_internal_only(base.Outcome.EXPIRED)
+    self._terminate_internal_only(
+        _utilities.Outcome(base.Outcome.Kind.EXPIRED, None, None))
 
   def abort(self, outcome):
     """See _interfaces.TerminationManager.abort for specification."""
diff --git a/src/python/grpcio/grpc/framework/core/_transmission.py b/src/python/grpcio/grpc/framework/core/_transmission.py
index 202a71d..8f852cf 100644
--- a/src/python/grpcio/grpc/framework/core/_transmission.py
+++ b/src/python/grpcio/grpc/framework/core/_transmission.py
@@ -34,12 +34,16 @@
 
 from grpc.framework.core import _constants
 from grpc.framework.core import _interfaces
+from grpc.framework.core import _utilities
 from grpc.framework.foundation import callable_util
 from grpc.framework.interfaces.base import base
 from grpc.framework.interfaces.links import links
 
 _TRANSMISSION_EXCEPTION_LOG_MESSAGE = 'Exception during transmission!'
 
+_TRANSMISSION_FAILURE_OUTCOME = _utilities.Outcome(
+    base.Outcome.Kind.TRANSMISSION_FAILURE, None, None)
+
 
 def _explode_completion(completion):
   if completion is None:
@@ -194,7 +198,7 @@
           with self._lock:
             self._abort = _ABORTED_NO_NOTIFY
             if self._termination_manager.outcome is None:
-              self._termination_manager.abort(base.Outcome.TRANSMISSION_FAILURE)
+              self._termination_manager.abort(_TRANSMISSION_FAILURE_OUTCOME)
               self._expiration_manager.terminate()
             return
 
@@ -307,19 +311,24 @@
     self._remote_complete = True
     self._local_allowance = 0
 
-  def abort(self, outcome, code, message):
+  def abort(self, outcome):
     """See _interfaces.TransmissionManager.abort for specification."""
     if self._abort.kind is _Abort.Kind.NOT_ABORTED:
-      termination = _constants.ABORTION_OUTCOME_TO_TICKET_TERMINATION.get(
-          outcome)
-      if termination is None:
+      if outcome is None:
         self._abort = _ABORTED_NO_NOTIFY
-      elif self._transmitting:
-        self._abort = _Abort(
-            _Abort.Kind.ABORTED_NOTIFY_NEEDED, termination, code, message)
       else:
-        ticket = links.Ticket(
-            self._operation_id, self._lowest_unused_sequence_number, None,
-            None, None, None, None, None, None, None, code, message,
-            termination, None)
-        self._transmit(ticket)
+        termination = _constants.ABORTION_OUTCOME_TO_TICKET_TERMINATION.get(
+            outcome.kind)
+        if termination is None:
+          self._abort = _ABORTED_NO_NOTIFY
+        elif self._transmitting:
+          self._abort = _Abort(
+              _Abort.Kind.ABORTED_NOTIFY_NEEDED, termination, outcome.code,
+              outcome.details)
+        else:
+          ticket = links.Ticket(
+              self._operation_id, self._lowest_unused_sequence_number, None,
+              None, None, None, None, None, None, None, outcome.code,
+              outcome.details, termination, None)
+          self._transmit(ticket)
+          self._abort = _ABORTED_NO_NOTIFY
diff --git a/src/python/grpcio/grpc/framework/core/_utilities.py b/src/python/grpcio/grpc/framework/core/_utilities.py
index 5b0d798..abedc72 100644
--- a/src/python/grpcio/grpc/framework/core/_utilities.py
+++ b/src/python/grpcio/grpc/framework/core/_utilities.py
@@ -31,6 +31,8 @@
 
 import collections
 
+from grpc.framework.interfaces.base import base
+
 
 class ServicerPackage(
     collections.namedtuple(
@@ -44,3 +46,9 @@
     maximum_timeout: A float indicating the maximum length of time in seconds to
       allow for an operation.
   """
+
+
+class Outcome(
+    base.Outcome,
+    collections.namedtuple('Outcome', ('kind', 'code', 'details',))):
+  """A trivial implementation of base.Outcome."""
diff --git a/src/python/grpcio/grpc/framework/crust/_control.py b/src/python/grpcio/grpc/framework/crust/_control.py
index 01de3c1..7bddf46 100644
--- a/src/python/grpcio/grpc/framework/crust/_control.py
+++ b/src/python/grpcio/grpc/framework/crust/_control.py
@@ -110,30 +110,31 @@
 
 _NOT_TERMINATED = _Termination(False, None, None)
 
-_OPERATION_OUTCOME_TO_TERMINATION_CONSTRUCTOR = {
-    base.Outcome.COMPLETED: lambda *unused_args: _Termination(True, None, None),
-    base.Outcome.CANCELLED: lambda *args: _Termination(
+_OPERATION_OUTCOME_KIND_TO_TERMINATION_CONSTRUCTOR = {
+    base.Outcome.Kind.COMPLETED: lambda *unused_args: _Termination(
+        True, None, None),
+    base.Outcome.Kind.CANCELLED: lambda *args: _Termination(
         True, face.Abortion(face.Abortion.Kind.CANCELLED, *args),
         face.CancellationError(*args)),
-    base.Outcome.EXPIRED: lambda *args: _Termination(
+    base.Outcome.Kind.EXPIRED: lambda *args: _Termination(
         True, face.Abortion(face.Abortion.Kind.EXPIRED, *args),
         face.ExpirationError(*args)),
-    base.Outcome.LOCAL_SHUTDOWN: lambda *args: _Termination(
+    base.Outcome.Kind.LOCAL_SHUTDOWN: lambda *args: _Termination(
         True, face.Abortion(face.Abortion.Kind.LOCAL_SHUTDOWN, *args),
         face.LocalShutdownError(*args)),
-    base.Outcome.REMOTE_SHUTDOWN: lambda *args: _Termination(
+    base.Outcome.Kind.REMOTE_SHUTDOWN: lambda *args: _Termination(
         True, face.Abortion(face.Abortion.Kind.REMOTE_SHUTDOWN, *args),
         face.RemoteShutdownError(*args)),
-    base.Outcome.RECEPTION_FAILURE: lambda *args: _Termination(
+    base.Outcome.Kind.RECEPTION_FAILURE: lambda *args: _Termination(
         True, face.Abortion(face.Abortion.Kind.NETWORK_FAILURE, *args),
         face.NetworkError(*args)),
-    base.Outcome.TRANSMISSION_FAILURE: lambda *args: _Termination(
+    base.Outcome.Kind.TRANSMISSION_FAILURE: lambda *args: _Termination(
         True, face.Abortion(face.Abortion.Kind.NETWORK_FAILURE, *args),
         face.NetworkError(*args)),
-    base.Outcome.LOCAL_FAILURE: lambda *args: _Termination(
+    base.Outcome.Kind.LOCAL_FAILURE: lambda *args: _Termination(
         True, face.Abortion(face.Abortion.Kind.LOCAL_FAILURE, *args),
         face.LocalError(*args)),
-    base.Outcome.REMOTE_FAILURE: lambda *args: _Termination(
+    base.Outcome.Kind.REMOTE_FAILURE: lambda *args: _Termination(
         True, face.Abortion(face.Abortion.Kind.REMOTE_FAILURE, *args),
         face.RemoteError(*args)),
 }
@@ -247,13 +248,17 @@
       else:
         initial_metadata = self._up_initial_metadata.value
       if self._up_completion.kind is _Awaited.Kind.NOT_YET_ARRIVED:
-        terminal_metadata, code, details = None, None, None
+        terminal_metadata = None
       else:
         terminal_metadata = self._up_completion.value.terminal_metadata
+      if outcome.kind is base.Outcome.Kind.COMPLETED:
         code = self._up_completion.value.code
         details = self._up_completion.value.message
-      self._termination = _OPERATION_OUTCOME_TO_TERMINATION_CONSTRUCTOR[
-          outcome](initial_metadata, terminal_metadata, code, details)
+      else:
+        code = outcome.code
+        details = outcome.details
+      self._termination = _OPERATION_OUTCOME_KIND_TO_TERMINATION_CONSTRUCTOR[
+          outcome.kind](initial_metadata, terminal_metadata, code, details)
 
       self._condition.notify_all()
 
diff --git a/src/python/grpcio/grpc/framework/interfaces/base/base.py b/src/python/grpcio/grpc/framework/interfaces/base/base.py
index bc52efb..0d9d6b4 100644
--- a/src/python/grpcio/grpc/framework/interfaces/base/base.py
+++ b/src/python/grpcio/grpc/framework/interfaces/base/base.py
@@ -40,7 +40,7 @@
 # threading is referenced from specification in this module.
 import abc
 import enum
-import threading
+import threading  # pylint: disable=unused-import
 
 # abandonment is referenced from specification in this module.
 from grpc.framework.foundation import abandonment  # pylint: disable=unused-import
@@ -69,19 +69,30 @@
     self.details = details
 
 
-@enum.unique
-class Outcome(enum.Enum):
-  """Operation outcomes."""
+class Outcome(object):
+  """The outcome of an operation.
 
-  COMPLETED = 'completed'
-  CANCELLED = 'cancelled'
-  EXPIRED = 'expired'
-  LOCAL_SHUTDOWN = 'local shutdown'
-  REMOTE_SHUTDOWN = 'remote shutdown'
-  RECEPTION_FAILURE = 'reception failure'
-  TRANSMISSION_FAILURE = 'transmission failure'
-  LOCAL_FAILURE = 'local failure'
-  REMOTE_FAILURE = 'remote failure'
+  Attributes:
+    kind: A Kind value coarsely identifying how the operation terminated.
+    code: An application-specific code value or None if no such value was
+      provided.
+    details: An application-specific details value or None if no such value was
+      provided.
+  """
+
+  @enum.unique
+  class Kind(enum.Enum):
+    """Ways in which an operation can terminate."""
+
+    COMPLETED = 'completed'
+    CANCELLED = 'cancelled'
+    EXPIRED = 'expired'
+    LOCAL_SHUTDOWN = 'local shutdown'
+    REMOTE_SHUTDOWN = 'remote shutdown'
+    RECEPTION_FAILURE = 'reception failure'
+    TRANSMISSION_FAILURE = 'transmission failure'
+    LOCAL_FAILURE = 'local failure'
+    REMOTE_FAILURE = 'remote failure'
 
 
 class Completion(object):
@@ -294,8 +305,8 @@
     """Reports the number of terminated operations broken down by outcome.
 
     Returns:
-      A dictionary from Outcome value to an integer identifying the number
-        of operations that terminated with that outcome.
+      A dictionary from Outcome.Kind value to an integer identifying the number
+        of operations that terminated with that outcome kind.
     """
     raise NotImplementedError()
 
diff --git a/src/python/grpcio_test/grpc_test/framework/interfaces/base/_control.py b/src/python/grpcio_test/grpc_test/framework/interfaces/base/_control.py
index e4d2a7a..46a0187 100644
--- a/src/python/grpcio_test/grpc_test/framework/interfaces/base/_control.py
+++ b/src/python/grpcio_test/grpc_test/framework/interfaces/base/_control.py
@@ -236,8 +236,8 @@
     collections.namedtuple(
         'Instruction',
         ('kind', 'advance_args', 'advance_kwargs', 'conclude_success',
-         'conclude_message', 'conclude_invocation_outcome',
-         'conclude_service_outcome',))):
+         'conclude_message', 'conclude_invocation_outcome_kind',
+         'conclude_service_outcome_kind',))):
   """"""
 
   @enum.unique
@@ -532,24 +532,24 @@
       self._state.service_side_outcome = outcome
       if self._todo is not None or self._remaining_elements:
         self._failed('Premature service-side outcome %s!' % (outcome,))
-      elif outcome is not self._sequence.outcome.service:
+      elif outcome.kind is not self._sequence.outcome_kinds.service:
         self._failed(
-            'Incorrect service-side outcome: %s should have been %s' % (
-                outcome, self._sequence.outcome.service))
+            'Incorrect service-side outcome kind: %s should have been %s' % (
+                outcome.kind, self._sequence.outcome_kinds.service))
       elif self._state.invocation_side_outcome is not None:
-        self._passed(self._state.invocation_side_outcome, outcome)
+        self._passed(self._state.invocation_side_outcome.kind, outcome.kind)
 
   def invocation_on_termination(self, outcome):
     with self._condition:
       self._state.invocation_side_outcome = outcome
       if self._todo is not None or self._remaining_elements:
         self._failed('Premature invocation-side outcome %s!' % (outcome,))
-      elif outcome is not self._sequence.outcome.invocation:
+      elif outcome.kind is not self._sequence.outcome_kinds.invocation:
         self._failed(
-            'Incorrect invocation-side outcome: %s should have been %s' % (
-                outcome, self._sequence.outcome.invocation))
+            'Incorrect invocation-side outcome kind: %s should have been %s' % (
+                outcome.kind, self._sequence.outcome_kinds.invocation))
       elif self._state.service_side_outcome is not None:
-        self._passed(outcome, self._state.service_side_outcome)
+        self._passed(outcome.kind, self._state.service_side_outcome.kind)
 
 
 class _SequenceControllerCreator(ControllerCreator):
diff --git a/src/python/grpcio_test/grpc_test/framework/interfaces/base/_sequence.py b/src/python/grpcio_test/grpc_test/framework/interfaces/base/_sequence.py
index 1d77aae..f547d91 100644
--- a/src/python/grpcio_test/grpc_test/framework/interfaces/base/_sequence.py
+++ b/src/python/grpcio_test/grpc_test/framework/interfaces/base/_sequence.py
@@ -103,13 +103,14 @@
     SERVICE_FAILURE = 'service failure'
 
 
-class Outcome(collections.namedtuple('Outcome', ('invocation', 'service',))):
+class OutcomeKinds(
+    collections.namedtuple('Outcome', ('invocation', 'service',))):
   """A description of the expected outcome of an operation test.
 
   Attributes:
-    invocation: The base.Outcome value expected on the invocation side of the
-      operation.
-    service: The base.Outcome value expected on the service side of the
+    invocation: The base.Outcome.Kind value expected on the invocation side of
+      the operation.
+    service: The base.Outcome.Kind value expected on the service side of the
       operation.
   """
 
@@ -117,7 +118,8 @@
 class Sequence(
     collections.namedtuple(
         'Sequence',
-        ('name', 'maximum_duration', 'invocation', 'elements', 'outcome',))):
+        ('name', 'maximum_duration', 'invocation', 'elements',
+         'outcome_kinds',))):
   """Describes at a high level steps to perform in a test.
 
   Attributes:
@@ -128,7 +130,8 @@
       under test.
     elements: A sequence of Element values describing at coarse granularity
       actions to take during the operation under test.
-    outcome: An Outcome value describing the expected outcome of the test.
+    outcome_kinds: An OutcomeKinds value describing the expected outcome kinds
+      of the test.
   """
 
 _EASY = Sequence(
@@ -139,7 +142,7 @@
         Element(
             Element.Kind.SERVICE_TRANSMISSION, Transmission(True, True, True)),
     ),
-    Outcome(base.Outcome.COMPLETED, base.Outcome.COMPLETED))
+    OutcomeKinds(base.Outcome.Kind.COMPLETED, base.Outcome.Kind.COMPLETED))
 
 _PEASY = Sequence(
     'Peasy',
@@ -154,7 +157,7 @@
         Element(
             Element.Kind.SERVICE_TRANSMISSION, Transmission(False, True, True)),
     ),
-    Outcome(base.Outcome.COMPLETED, base.Outcome.COMPLETED))
+    OutcomeKinds(base.Outcome.Kind.COMPLETED, base.Outcome.Kind.COMPLETED))
 
 
 # TODO(issue 2959): Finish this test suite. This tuple of sequences should
diff --git a/src/python/grpcio_test/grpc_test/framework/interfaces/base/test_cases.py b/src/python/grpcio_test/grpc_test/framework/interfaces/base/test_cases.py
index 87332cf..5065a3f 100644
--- a/src/python/grpcio_test/grpc_test/framework/interfaces/base/test_cases.py
+++ b/src/python/grpcio_test/grpc_test/framework/interfaces/base/test_cases.py
@@ -44,7 +44,8 @@
 
 _SYNCHRONICITY_VARIATION = (('Sync', False), ('Async', True))
 
-_EMPTY_OUTCOME_DICT = {outcome: 0 for outcome in base.Outcome}
+_EMPTY_OUTCOME_KIND_DICT = {
+    outcome_kind: 0 for outcome_kind in base.Outcome.Kind}
 
 
 class _Serialization(test_interfaces.Serialization):
@@ -119,7 +120,7 @@
 
 
 class _Servicer(base.Servicer):
-  """An base.Servicer with instrumented for testing."""
+  """A base.Servicer with instrumented for testing."""
 
   def __init__(self, group, method, controllers, pool):
     self._condition = threading.Condition()
@@ -223,11 +224,12 @@
     self.assertTrue(
         instruction.conclude_success, msg=instruction.conclude_message)
 
-    expected_invocation_stats = dict(_EMPTY_OUTCOME_DICT)
-    expected_invocation_stats[instruction.conclude_invocation_outcome] += 1
+    expected_invocation_stats = dict(_EMPTY_OUTCOME_KIND_DICT)
+    expected_invocation_stats[
+        instruction.conclude_invocation_outcome_kind] += 1
     self.assertDictEqual(expected_invocation_stats, invocation_stats)
-    expected_service_stats = dict(_EMPTY_OUTCOME_DICT)
-    expected_service_stats[instruction.conclude_service_outcome] += 1
+    expected_service_stats = dict(_EMPTY_OUTCOME_KIND_DICT)
+    expected_service_stats[instruction.conclude_service_outcome_kind] += 1
     self.assertDictEqual(expected_service_stats, service_stats)