Fixes for bugs found during manual use

(1) In _ingestion, it's the "details" attribute of a
NoSuchMethodException that we want. The "message" is inherited from the
base Exception class.

(2) In _transmission, use a proper sum type for representing operation
abortion. Trying to overload the existing _completion value for
status-and-details-when-aborting was trying to be too clever.

(3) In _calls... oof. Just look. Oof. Test coverage for this code path
is added.

(4) In _service, the application-provided
face.MultiMethodImplementation isn't directly callable, but rather
exposes a method named "service".

(5) In crust.implementations, the wrapping that we've put around the
application-provided face.MultiMethodImplementation *is* directly
callable, and *does not* expose a method named "service".

(6) Also in crust.implementations, base.NoSuchMethodError's constructor
takes a code value and a details value.

(7) Again in crust.implementations, the application-provided
face.MultiMethodImplementation may be None, and if it is None, we
shouldn't wrap it with an adaptation function that would only raise a
TypeError at a later time.
diff --git a/src/python/grpcio/grpc/framework/core/_ingestion.py b/src/python/grpcio/grpc/framework/core/_ingestion.py
index 7b8127f..766d57f 100644
--- a/src/python/grpcio/grpc/framework/core/_ingestion.py
+++ b/src/python/grpcio/grpc/framework/core/_ingestion.py
@@ -114,7 +114,7 @@
           group, method, self._operation_context, self._output_operator)
     except base.NoSuchMethodError as e:
       return _SubscriptionCreation(
-          _SubscriptionCreation.Kind.REMOTE_ERROR, None, e.code, e.message)
+          _SubscriptionCreation.Kind.REMOTE_ERROR, None, e.code, e.details)
     except abandonment.Abandoned:
       return _SubscriptionCreation(
           _SubscriptionCreation.Kind.ABANDONED, None, None, None)
diff --git a/src/python/grpcio/grpc/framework/core/_transmission.py b/src/python/grpcio/grpc/framework/core/_transmission.py
index efef87d..202a71d 100644
--- a/src/python/grpcio/grpc/framework/core/_transmission.py
+++ b/src/python/grpcio/grpc/framework/core/_transmission.py
@@ -29,6 +29,9 @@
 
 """State and behavior for ticket transmission during an operation."""
 
+import collections
+import enum
+
 from grpc.framework.core import _constants
 from grpc.framework.core import _interfaces
 from grpc.framework.foundation import callable_util
@@ -47,6 +50,31 @@
         links.Ticket.Termination.COMPLETION)
 
 
+class _Abort(
+    collections.namedtuple(
+        '_Abort', ('kind', 'termination', 'code', 'details',))):
+  """Tracks whether the operation aborted and what is to be done about it.
+
+  Attributes:
+    kind: A Kind value describing the overall kind of the _Abort.
+    termination: A links.Ticket.Termination value to be sent to the other side
+      of the operation. Only valid if kind is Kind.ABORTED_NOTIFY_NEEDED.
+    code: A code value to be sent to the other side of the operation. Only
+      valid if kind is Kind.ABORTED_NOTIFY_NEEDED.
+    details: A details value to be sent to the other side of the operation.
+      Only valid if kind is Kind.ABORTED_NOTIFY_NEEDED.
+  """
+
+  @enum.unique
+  class Kind(enum.Enum):
+    NOT_ABORTED = 'not aborted'
+    ABORTED_NOTIFY_NEEDED = 'aborted notify needed'
+    ABORTED_NO_NOTIFY = 'aborted no notify'
+
+_NOT_ABORTED = _Abort(_Abort.Kind.NOT_ABORTED, None, None, None)
+_ABORTED_NO_NOTIFY = _Abort(_Abort.Kind.ABORTED_NO_NOTIFY, None, None, None)
+
+
 class TransmissionManager(_interfaces.TransmissionManager):
   """An _interfaces.TransmissionManager that sends links.Tickets."""
 
@@ -79,8 +107,7 @@
     self._initial_metadata = None
     self._payloads = []
     self._completion = None
-    self._aborted = False
-    self._abortion_outcome = None
+    self._abort = _NOT_ABORTED
     self._transmitting = False
 
   def set_expiration_manager(self, expiration_manager):
@@ -94,24 +121,15 @@
       A links.Ticket to be sent to the other side of the operation or None if
         there is nothing to be sent at this time.
     """
-    if self._aborted:
-      if self._abortion_outcome is None:
-        return None
-      else:
-        termination = _constants.ABORTION_OUTCOME_TO_TICKET_TERMINATION[
-            self._abortion_outcome]
-        if termination is None:
-          return None
-        else:
-          self._abortion_outcome = None
-          if self._completion is None:
-            code, message = None, None
-          else:
-            code, message = self._completion.code, self._completion.message
-          return links.Ticket(
-              self._operation_id, self._lowest_unused_sequence_number, None,
-              None, None, None, None, None, None, None, code, message,
-              termination, None)
+    if self._abort.kind is _Abort.Kind.ABORTED_NO_NOTIFY:
+      return None
+    elif self._abort.kind is _Abort.Kind.ABORTED_NOTIFY_NEEDED:
+      termination = self._abort.termination
+      code, details = self._abort.code, self._abort.details
+      self._abort = _ABORTED_NO_NOTIFY
+      return links.Ticket(
+          self._operation_id, self._lowest_unused_sequence_number, None, None,
+          None, None, None, None, None, None, code, details, termination, None)
 
     action = False
     # TODO(nathaniel): Support other subscriptions.
@@ -174,6 +192,7 @@
               return
         else:
           with self._lock:
+            self._abort = _ABORTED_NO_NOTIFY
             if self._termination_manager.outcome is None:
               self._termination_manager.abort(base.Outcome.TRANSMISSION_FAILURE)
               self._expiration_manager.terminate()
@@ -201,6 +220,9 @@
 
   def advance(self, initial_metadata, payload, completion, allowance):
     """See _interfaces.TransmissionManager.advance for specification."""
+    if self._abort.kind is not _Abort.Kind.NOT_ABORTED:
+      return
+
     effective_initial_metadata = initial_metadata
     effective_payload = payload
     effective_completion = completion
@@ -246,7 +268,9 @@
 
   def timeout(self, timeout):
     """See _interfaces.TransmissionManager.timeout for specification."""
-    if self._transmitting:
+    if self._abort.kind is not _Abort.Kind.NOT_ABORTED:
+      return
+    elif self._transmitting:
       self._timeout = timeout
     else:
       ticket = links.Ticket(
@@ -257,7 +281,9 @@
 
   def allowance(self, allowance):
     """See _interfaces.TransmissionManager.allowance for specification."""
-    if self._transmitting or not self._payloads:
+    if self._abort.kind is not _Abort.Kind.NOT_ABORTED:
+      return
+    elif self._transmitting or not self._payloads:
       self._remote_allowance += allowance
     else:
       self._remote_allowance += allowance - 1
@@ -283,20 +309,17 @@
 
   def abort(self, outcome, code, message):
     """See _interfaces.TransmissionManager.abort for specification."""
-    if self._transmitting:
-      self._aborted, self._abortion_outcome = True, outcome
-    else:
-      self._aborted = True
-      if outcome is not None:
-        termination = _constants.ABORTION_OUTCOME_TO_TICKET_TERMINATION[
-            outcome]
-        if termination is not None:
-          if self._completion is None:
-            code, message = None, None
-          else:
-            code, message = self._completion.code, self._completion.message
-          ticket = links.Ticket(
-              self._operation_id, self._lowest_unused_sequence_number, None,
-              None, None, None, None, None, None, None, code, message,
-              termination, None)
-          self._transmit(ticket)
+    if self._abort.kind is _Abort.Kind.NOT_ABORTED:
+      termination = _constants.ABORTION_OUTCOME_TO_TICKET_TERMINATION.get(
+          outcome)
+      if termination is None:
+        self._abort = _ABORTED_NO_NOTIFY
+      elif self._transmitting:
+        self._abort = _Abort(
+            _Abort.Kind.ABORTED_NOTIFY_NEEDED, termination, code, message)
+      else:
+        ticket = links.Ticket(
+            self._operation_id, self._lowest_unused_sequence_number, None,
+            None, None, None, None, None, None, None, code, message,
+            termination, None)
+        self._transmit(ticket)
diff --git a/src/python/grpcio/grpc/framework/crust/_calls.py b/src/python/grpcio/grpc/framework/crust/_calls.py
index f9077be..4c6bf16 100644
--- a/src/python/grpcio/grpc/framework/crust/_calls.py
+++ b/src/python/grpcio/grpc/framework/crust/_calls.py
@@ -98,7 +98,7 @@
   rendezvous, unused_operation_context, unused_outcome = _invoke(
       end, group, method, timeout, initial_metadata, payload, True)
   if with_call:
-    return next(rendezvous, rendezvous)
+    return next(rendezvous), rendezvous
   else:
     return next(rendezvous)
 
diff --git a/src/python/grpcio/grpc/framework/crust/_service.py b/src/python/grpcio/grpc/framework/crust/_service.py
index 2455a58..6ff7249 100644
--- a/src/python/grpcio/grpc/framework/crust/_service.py
+++ b/src/python/grpcio/grpc/framework/crust/_service.py
@@ -154,7 +154,7 @@
     outcome = operation_context.add_termination_callback(rendezvous.set_outcome)
     if outcome is None:
       def in_pool():
-        request_consumer = multi_method(
+        request_consumer = multi_method.service(
             group, method, rendezvous, _ServicerContext(rendezvous))
         for request in rendezvous:
           request_consumer.consume(request)
diff --git a/src/python/grpcio/grpc/framework/crust/implementations.py b/src/python/grpcio/grpc/framework/crust/implementations.py
index 12f7e79..d38fab8 100644
--- a/src/python/grpcio/grpc/framework/crust/implementations.py
+++ b/src/python/grpcio/grpc/framework/crust/implementations.py
@@ -49,12 +49,12 @@
       return adapted_method(output_operator, context)
     elif self._adapted_multi_method is not None:
       try:
-        return self._adapted_multi_method.service(
+        return self._adapted_multi_method(
             group, method, output_operator, context)
       except face.NoSuchMethodError:
-        raise base.NoSuchMethodError()
+        raise base.NoSuchMethodError(None, None)
     else:
-      raise base.NoSuchMethodError()
+      raise base.NoSuchMethodError(None, None)
 
 
 class _UnaryUnaryMultiCallable(face.UnaryUnaryMultiCallable):
@@ -315,8 +315,11 @@
   """
   adapted_implementations = _adapt_method_implementations(
       method_implementations, pool)
-  adapted_multi_method_implementation = _service.adapt_multi_method(
-      multi_method_implementation, pool)
+  if multi_method_implementation is None:
+    adapted_multi_method_implementation = None
+  else:
+    adapted_multi_method_implementation = _service.adapt_multi_method(
+        multi_method_implementation, pool)
   return _BaseServicer(
       adapted_implementations, adapted_multi_method_implementation)
 
diff --git a/src/python/grpcio_test/grpc_test/framework/interfaces/face/_blocking_invocation_inline_service.py b/src/python/grpcio_test/grpc_test/framework/interfaces/face/_blocking_invocation_inline_service.py
index b7dd5d4..2d2a081 100644
--- a/src/python/grpcio_test/grpc_test/framework/interfaces/face/_blocking_invocation_inline_service.py
+++ b/src/python/grpcio_test/grpc_test/framework/interfaces/face/_blocking_invocation_inline_service.py
@@ -82,8 +82,8 @@
       for test_messages in test_messages_sequence:
         request = test_messages.request()
 
-        response = self._invoker.blocking(group, method)(
-            request, test_constants.LONG_TIMEOUT)
+        response, call = self._invoker.blocking(group, method)(
+            request, test_constants.LONG_TIMEOUT, with_call=True)
 
         test_messages.verify(request, response, self)
 
@@ -105,8 +105,8 @@
       for test_messages in test_messages_sequence:
         requests = test_messages.requests()
 
-        response = self._invoker.blocking(group, method)(
-            iter(requests), test_constants.LONG_TIMEOUT)
+        response, call = self._invoker.blocking(group, method)(
+            iter(requests), test_constants.LONG_TIMEOUT, with_call=True)
 
         test_messages.verify(requests, response, self)