Merge pull request #3047 from nathanielmanistaatgoogle/status

Status code conformance in grpc._links
diff --git a/src/python/grpcio/grpc/_links/invocation.py b/src/python/grpcio/grpc/_links/invocation.py
index a74c77e..ee3d72f 100644
--- a/src/python/grpcio/grpc/_links/invocation.py
+++ b/src/python/grpcio/grpc/_links/invocation.py
@@ -141,6 +141,8 @@
       termination = links.Ticket.Termination.CANCELLATION
     elif event.status.code is _intermediary_low.Code.DEADLINE_EXCEEDED:
       termination = links.Ticket.Termination.EXPIRATION
+    elif event.status.code is _intermediary_low.Code.UNKNOWN:
+      termination = links.Ticket.Termination.LOCAL_FAILURE
     else:
       termination = links.Ticket.Termination.TRANSMISSION_FAILURE
     ticket = links.Ticket(
@@ -349,7 +351,7 @@
   """Creates an InvocationLink.
 
   Args:
-    channel: A channel for use by the link.
+    channel: An _intermediary_low.Channel for use by the link.
     host: The host to specify when invoking RPCs.
     request_serializers: A dict from group-method pair to request object
       serialization behavior.
diff --git a/src/python/grpcio/grpc/_links/service.py b/src/python/grpcio/grpc/_links/service.py
index 1436176..43c4c0e 100644
--- a/src/python/grpcio/grpc/_links/service.py
+++ b/src/python/grpcio/grpc/_links/service.py
@@ -40,6 +40,19 @@
 from grpc.framework.foundation import relay
 from grpc.framework.interfaces.links import links
 
+_TERMINATION_KIND_TO_CODE = {  # Default status code per termination kind.
+    links.Ticket.Termination.COMPLETION: _intermediary_low.Code.OK,
+    links.Ticket.Termination.CANCELLATION: _intermediary_low.Code.CANCELLED,
+    links.Ticket.Termination.EXPIRATION:
+        _intermediary_low.Code.DEADLINE_EXCEEDED,
+    links.Ticket.Termination.SHUTDOWN: _intermediary_low.Code.UNAVAILABLE,
+    links.Ticket.Termination.RECEPTION_FAILURE: _intermediary_low.Code.INTERNAL,
+    links.Ticket.Termination.TRANSMISSION_FAILURE:
+        _intermediary_low.Code.INTERNAL,
+    links.Ticket.Termination.LOCAL_FAILURE: _intermediary_low.Code.UNKNOWN,
+    links.Ticket.Termination.REMOTE_FAILURE: _intermediary_low.Code.UNKNOWN,
+}
+
 
 @enum.unique
 class _Read(enum.Enum):
@@ -93,6 +106,15 @@
     call.add_metadata(metadata_key, metadata_value)
 
 
+def _status(termination_kind, code, details):
+  """Returns the low-level Status for an RPC ending with termination_kind."""
+  # An explicit code wins; otherwise use the termination kind's default code.
+  if code is None:
+    code = _TERMINATION_KIND_TO_CODE[termination_kind]
+  # Missing details become the empty byte string.
+  return _intermediary_low.Status(code, b'' if details is None else details)
+
+
 class _Kernel(object):
 
   def __init__(self, request_deserializers, response_serializers, ticket_relay):
@@ -170,8 +192,10 @@
     if rpc_state.high_write is _HighWrite.CLOSED:
       if rpc_state.terminal_metadata is not None:
         _metadatafy(call, rpc_state.terminal_metadata)
-      call.status(
-          _intermediary_low.Status(rpc_state.code, rpc_state.message), call)
+      status = _status(
+          links.Ticket.Termination.COMPLETION, rpc_state.code,
+          rpc_state.message)
+      call.status(status, call)
       rpc_state.low_write = _LowWrite.CLOSED
     else:
       ticket = links.Ticket(
@@ -279,14 +303,17 @@
         if rpc_state.low_write is _LowWrite.OPEN:
           if rpc_state.terminal_metadata is not None:
             _metadatafy(call, rpc_state.terminal_metadata)
-          status = _intermediary_low.Status(
-              _intermediary_low.Code.OK
-              if rpc_state.code is None else rpc_state.code,
-              '' if rpc_state.message is None else rpc_state.message)
+          status = _status(
+              links.Ticket.Termination.COMPLETION, rpc_state.code,
+              rpc_state.message)
           call.status(status, call)
           rpc_state.low_write = _LowWrite.CLOSED
       elif ticket.termination is not None:
-        call.cancel()
+        if rpc_state.terminal_metadata is not None:
+          _metadatafy(call, rpc_state.terminal_metadata)
+        status = _status(
+            ticket.termination, rpc_state.code, rpc_state.message)
+        call.status(status, call)
         self._rpc_states.pop(call, None)
 
   def add_port(self, port, server_credentials):