Merge pull request #3040 from nathanielmanistaatgoogle/face

A test suite for the RPC Framework Face interface
diff --git a/src/python/grpcio_test/grpc_test/framework/interfaces/face/__init__.py b/src/python/grpcio_test/grpc_test/framework/interfaces/face/__init__.py
new file mode 100644
index 0000000..7086519
--- /dev/null
+++ b/src/python/grpcio_test/grpc_test/framework/interfaces/face/__init__.py
@@ -0,0 +1,30 @@
+# Copyright 2015, Google Inc.
+# All rights reserved.
+#
+# Redistribution and use in source and binary forms, with or without
+# modification, are permitted provided that the following conditions are
+# met:
+#
+#     * Redistributions of source code must retain the above copyright
+# notice, this list of conditions and the following disclaimer.
+#     * Redistributions in binary form must reproduce the above
+# copyright notice, this list of conditions and the following disclaimer
+# in the documentation and/or other materials provided with the
+# distribution.
+#     * Neither the name of Google Inc. nor the names of its
+# contributors may be used to endorse or promote products derived from
+# this software without specific prior written permission.
+#
+# THIS SOFTWARE IS PROVIDED BY THE COPYRIGHT HOLDERS AND CONTRIBUTORS
+# "AS IS" AND ANY EXPRESS OR IMPLIED WARRANTIES, INCLUDING, BUT NOT
+# LIMITED TO, THE IMPLIED WARRANTIES OF MERCHANTABILITY AND FITNESS FOR
+# A PARTICULAR PURPOSE ARE DISCLAIMED. IN NO EVENT SHALL THE COPYRIGHT
+# OWNER OR CONTRIBUTORS BE LIABLE FOR ANY DIRECT, INDIRECT, INCIDENTAL,
+# SPECIAL, EXEMPLARY, OR CONSEQUENTIAL DAMAGES (INCLUDING, BUT NOT
+# LIMITED TO, PROCUREMENT OF SUBSTITUTE GOODS OR SERVICES; LOSS OF USE,
+# DATA, OR PROFITS; OR BUSINESS INTERRUPTION) HOWEVER CAUSED AND ON ANY
+# THEORY OF LIABILITY, WHETHER IN CONTRACT, STRICT LIABILITY, OR TORT
+# (INCLUDING NEGLIGENCE OR OTHERWISE) ARISING IN ANY WAY OUT OF THE USE
+# OF THIS SOFTWARE, EVEN IF ADVISED OF THE POSSIBILITY OF SUCH DAMAGE.
+
+
diff --git a/src/python/grpcio_test/grpc_test/framework/interfaces/face/_blocking_invocation_inline_service.py b/src/python/grpcio_test/grpc_test/framework/interfaces/face/_blocking_invocation_inline_service.py
new file mode 100644
index 0000000..857ad5c
--- /dev/null
+++ b/src/python/grpcio_test/grpc_test/framework/interfaces/face/_blocking_invocation_inline_service.py
@@ -0,0 +1,250 @@
+# Copyright 2015, Google Inc.
+# All rights reserved.
+#
+# Redistribution and use in source and binary forms, with or without
+# modification, are permitted provided that the following conditions are
+# met:
+#
+#     * Redistributions of source code must retain the above copyright
+# notice, this list of conditions and the following disclaimer.
+#     * Redistributions in binary form must reproduce the above
+# copyright notice, this list of conditions and the following disclaimer
+# in the documentation and/or other materials provided with the
+# distribution.
+#     * Neither the name of Google Inc. nor the names of its
+# contributors may be used to endorse or promote products derived from
+# this software without specific prior written permission.
+#
+# THIS SOFTWARE IS PROVIDED BY THE COPYRIGHT HOLDERS AND CONTRIBUTORS
+# "AS IS" AND ANY EXPRESS OR IMPLIED WARRANTIES, INCLUDING, BUT NOT
+# LIMITED TO, THE IMPLIED WARRANTIES OF MERCHANTABILITY AND FITNESS FOR
+# A PARTICULAR PURPOSE ARE DISCLAIMED. IN NO EVENT SHALL THE COPYRIGHT
+# OWNER OR CONTRIBUTORS BE LIABLE FOR ANY DIRECT, INDIRECT, INCIDENTAL,
+# SPECIAL, EXEMPLARY, OR CONSEQUENTIAL DAMAGES (INCLUDING, BUT NOT
+# LIMITED TO, PROCUREMENT OF SUBSTITUTE GOODS OR SERVICES; LOSS OF USE,
+# DATA, OR PROFITS; OR BUSINESS INTERRUPTION) HOWEVER CAUSED AND ON ANY
+# THEORY OF LIABILITY, WHETHER IN CONTRACT, STRICT LIABILITY, OR TORT
+# (INCLUDING NEGLIGENCE OR OTHERWISE) ARISING IN ANY WAY OUT OF THE USE
+# OF THIS SOFTWARE, EVEN IF ADVISED OF THE POSSIBILITY OF SUCH DAMAGE.
+
+"""Test code for the Face layer of RPC Framework."""
+
+import abc
+import unittest
+
+# test_interfaces is referenced from specification in this module.
+from grpc.framework.interfaces.face import face
+from grpc_test.framework.common import test_constants
+from grpc_test.framework.common import test_control
+from grpc_test.framework.common import test_coverage
+from grpc_test.framework.interfaces.face import _digest
+from grpc_test.framework.interfaces.face import _stock_service
+from grpc_test.framework.interfaces.face import test_interfaces  # pylint: disable=unused-import
+
+
+class TestCase(test_coverage.Coverage, unittest.TestCase):
+  """A test of the Face layer of RPC Framework.
+
+  Concrete subclasses must have an "implementation" attribute of type
+  test_interfaces.Implementation and an "invoker_constructor" attribute of type
+  _invocation.InvokerConstructor.
+  """
+  __metaclass__ = abc.ABCMeta
+
+  NAME = 'BlockingInvocationInlineServiceTest'
+
+  def setUp(self):
+    """See unittest.TestCase.setUp for full specification.
+
+    Overriding implementations must call this implementation.
+    """
+    self._control = test_control.PauseFailControl()
+    self._digest = _digest.digest(
+        _stock_service.STOCK_TEST_SERVICE, self._control, None)
+
+    generic_stub, dynamic_stubs, self._memo = self.implementation.instantiate(
+        self._digest.methods, self._digest.inline_method_implementations, None)
+    self._invoker = self.invoker_constructor.construct_invoker(
+        generic_stub, dynamic_stubs, self._digest.methods)
+
+  def tearDown(self):
+    """See unittest.TestCase.tearDown for full specification.
+
+    Overriding implementations must call this implementation.
+    """
+    self.implementation.destantiate(self._memo)
+
+  def testSuccessfulUnaryRequestUnaryResponse(self):
+    for (group, method), test_messages_sequence in (
+        self._digest.unary_unary_messages_sequences.iteritems()):
+      for test_messages in test_messages_sequence:
+        request = test_messages.request()
+
+        response = self._invoker.blocking(group, method)(
+            request, test_constants.LONG_TIMEOUT)
+
+        test_messages.verify(request, response, self)
+
+  def testSuccessfulUnaryRequestStreamResponse(self):
+    for (group, method), test_messages_sequence in (
+        self._digest.unary_stream_messages_sequences.iteritems()):
+      for test_messages in test_messages_sequence:
+        request = test_messages.request()
+
+        response_iterator = self._invoker.blocking(group, method)(
+            request, test_constants.LONG_TIMEOUT)
+        responses = list(response_iterator)
+
+        test_messages.verify(request, responses, self)
+
+  def testSuccessfulStreamRequestUnaryResponse(self):
+    for (group, method), test_messages_sequence in (
+        self._digest.stream_unary_messages_sequences.iteritems()):
+      for test_messages in test_messages_sequence:
+        requests = test_messages.requests()
+
+        response = self._invoker.blocking(group, method)(
+            iter(requests), test_constants.LONG_TIMEOUT)
+
+        test_messages.verify(requests, response, self)
+
+  def testSuccessfulStreamRequestStreamResponse(self):
+    for (group, method), test_messages_sequence in (
+        self._digest.stream_stream_messages_sequences.iteritems()):
+      for test_messages in test_messages_sequence:
+        requests = test_messages.requests()
+
+        response_iterator = self._invoker.blocking(group, method)(
+            iter(requests), test_constants.LONG_TIMEOUT)
+        responses = list(response_iterator)
+
+        test_messages.verify(requests, responses, self)
+
+  def testSequentialInvocations(self):
+    for (group, method), test_messages_sequence in (
+        self._digest.unary_unary_messages_sequences.iteritems()):
+      for test_messages in test_messages_sequence:
+        first_request = test_messages.request()
+        second_request = test_messages.request()
+
+        first_response = self._invoker.blocking(group, method)(
+            first_request, test_constants.LONG_TIMEOUT)
+
+        test_messages.verify(first_request, first_response, self)
+
+        second_response = self._invoker.blocking(group, method)(
+            second_request, test_constants.LONG_TIMEOUT)
+
+        test_messages.verify(second_request, second_response, self)
+
+  @unittest.skip('Parallel invocations impossible with blocking control flow!')
+  def testParallelInvocations(self):
+    raise NotImplementedError()
+
+  @unittest.skip('Parallel invocations impossible with blocking control flow!')
+  def testWaitingForSomeButNotAllParallelInvocations(self):
+    raise NotImplementedError()
+
+  @unittest.skip('Cancellation impossible with blocking control flow!')
+  def testCancelledUnaryRequestUnaryResponse(self):
+    raise NotImplementedError()
+
+  @unittest.skip('Cancellation impossible with blocking control flow!')
+  def testCancelledUnaryRequestStreamResponse(self):
+    raise NotImplementedError()
+
+  @unittest.skip('Cancellation impossible with blocking control flow!')
+  def testCancelledStreamRequestUnaryResponse(self):
+    raise NotImplementedError()
+
+  @unittest.skip('Cancellation impossible with blocking control flow!')
+  def testCancelledStreamRequestStreamResponse(self):
+    raise NotImplementedError()
+
+  def testExpiredUnaryRequestUnaryResponse(self):
+    for (group, method), test_messages_sequence in (
+        self._digest.unary_unary_messages_sequences.iteritems()):
+      for test_messages in test_messages_sequence:
+        request = test_messages.request()
+
+        with self._control.pause(), self.assertRaises(
+            face.ExpirationError):
+          self._invoker.blocking(group, method)(
+              request, test_constants.SHORT_TIMEOUT)
+
+  def testExpiredUnaryRequestStreamResponse(self):
+    for (group, method), test_messages_sequence in (
+        self._digest.unary_stream_messages_sequences.iteritems()):
+      for test_messages in test_messages_sequence:
+        request = test_messages.request()
+
+        with self._control.pause(), self.assertRaises(
+            face.ExpirationError):
+          response_iterator = self._invoker.blocking(group, method)(
+              request, test_constants.SHORT_TIMEOUT)
+          list(response_iterator)
+
+  def testExpiredStreamRequestUnaryResponse(self):
+    for (group, method), test_messages_sequence in (
+        self._digest.stream_unary_messages_sequences.iteritems()):
+      for test_messages in test_messages_sequence:
+        requests = test_messages.requests()
+
+        with self._control.pause(), self.assertRaises(
+            face.ExpirationError):
+          self._invoker.blocking(group, method)(
+              iter(requests), test_constants.SHORT_TIMEOUT)
+
+  def testExpiredStreamRequestStreamResponse(self):
+    for (group, method), test_messages_sequence in (
+        self._digest.stream_stream_messages_sequences.iteritems()):
+      for test_messages in test_messages_sequence:
+        requests = test_messages.requests()
+
+        with self._control.pause(), self.assertRaises(
+            face.ExpirationError):
+          response_iterator = self._invoker.blocking(group, method)(
+              iter(requests), test_constants.SHORT_TIMEOUT)
+          list(response_iterator)
+
+  def testFailedUnaryRequestUnaryResponse(self):
+    for (group, method), test_messages_sequence in (
+        self._digest.unary_unary_messages_sequences.iteritems()):
+      for test_messages in test_messages_sequence:
+        request = test_messages.request()
+
+        with self._control.fail(), self.assertRaises(face.RemoteError):
+          self._invoker.blocking(group, method)(
+              request, test_constants.LONG_TIMEOUT)
+
+  def testFailedUnaryRequestStreamResponse(self):
+    for (group, method), test_messages_sequence in (
+        self._digest.unary_stream_messages_sequences.iteritems()):
+      for test_messages in test_messages_sequence:
+        request = test_messages.request()
+
+        with self._control.fail(), self.assertRaises(face.RemoteError):
+          response_iterator = self._invoker.blocking(group, method)(
+              request, test_constants.LONG_TIMEOUT)
+          list(response_iterator)
+
+  def testFailedStreamRequestUnaryResponse(self):
+    for (group, method), test_messages_sequence in (
+        self._digest.stream_unary_messages_sequences.iteritems()):
+      for test_messages in test_messages_sequence:
+        requests = test_messages.requests()
+
+        with self._control.fail(), self.assertRaises(face.RemoteError):
+          self._invoker.blocking(group, method)(
+              iter(requests), test_constants.LONG_TIMEOUT)
+
+  def testFailedStreamRequestStreamResponse(self):
+    for (group, method), test_messages_sequence in (
+        self._digest.stream_stream_messages_sequences.iteritems()):
+      for test_messages in test_messages_sequence:
+        requests = test_messages.requests()
+
+        with self._control.fail(), self.assertRaises(face.RemoteError):
+          response_iterator = self._invoker.blocking(group, method)(
+              iter(requests), test_constants.LONG_TIMEOUT)
+          list(response_iterator)
diff --git a/src/python/grpcio_test/grpc_test/framework/interfaces/face/_digest.py b/src/python/grpcio_test/grpc_test/framework/interfaces/face/_digest.py
new file mode 100644
index 0000000..da56ed7
--- /dev/null
+++ b/src/python/grpcio_test/grpc_test/framework/interfaces/face/_digest.py
@@ -0,0 +1,444 @@
+# Copyright 2015, Google Inc.
+# All rights reserved.
+#
+# Redistribution and use in source and binary forms, with or without
+# modification, are permitted provided that the following conditions are
+# met:
+#
+#     * Redistributions of source code must retain the above copyright
+# notice, this list of conditions and the following disclaimer.
+#     * Redistributions in binary form must reproduce the above
+# copyright notice, this list of conditions and the following disclaimer
+# in the documentation and/or other materials provided with the
+# distribution.
+#     * Neither the name of Google Inc. nor the names of its
+# contributors may be used to endorse or promote products derived from
+# this software without specific prior written permission.
+#
+# THIS SOFTWARE IS PROVIDED BY THE COPYRIGHT HOLDERS AND CONTRIBUTORS
+# "AS IS" AND ANY EXPRESS OR IMPLIED WARRANTIES, INCLUDING, BUT NOT
+# LIMITED TO, THE IMPLIED WARRANTIES OF MERCHANTABILITY AND FITNESS FOR
+# A PARTICULAR PURPOSE ARE DISCLAIMED. IN NO EVENT SHALL THE COPYRIGHT
+# OWNER OR CONTRIBUTORS BE LIABLE FOR ANY DIRECT, INDIRECT, INCIDENTAL,
+# SPECIAL, EXEMPLARY, OR CONSEQUENTIAL DAMAGES (INCLUDING, BUT NOT
+# LIMITED TO, PROCUREMENT OF SUBSTITUTE GOODS OR SERVICES; LOSS OF USE,
+# DATA, OR PROFITS; OR BUSINESS INTERRUPTION) HOWEVER CAUSED AND ON ANY
+# THEORY OF LIABILITY, WHETHER IN CONTRACT, STRICT LIABILITY, OR TORT
+# (INCLUDING NEGLIGENCE OR OTHERWISE) ARISING IN ANY WAY OUT OF THE USE
+# OF THIS SOFTWARE, EVEN IF ADVISED OF THE POSSIBILITY OF SUCH DAMAGE.
+
+"""Code for making a service.TestService more amenable to use in tests."""
+
+import collections
+import threading
+
+# test_control, _service, and test_interfaces are referenced from specification
+# in this module.
+from grpc.framework.common import cardinality
+from grpc.framework.common import style
+from grpc.framework.foundation import stream
+from grpc.framework.foundation import stream_util
+from grpc.framework.interfaces.face import face
+from grpc_test.framework.common import test_control  # pylint: disable=unused-import
+from grpc_test.framework.interfaces.face import _service  # pylint: disable=unused-import
+from grpc_test.framework.interfaces.face import test_interfaces  # pylint: disable=unused-import
+
+_IDENTITY = lambda x: x
+
+
+class TestServiceDigest(
+    collections.namedtuple(
+        'TestServiceDigest',
+        ('methods',
+         'inline_method_implementations',
+         'event_method_implementations',
+         'multi_method_implementation',
+         'unary_unary_messages_sequences',
+         'unary_stream_messages_sequences',
+         'stream_unary_messages_sequences',
+         'stream_stream_messages_sequences',))):
+  """A transformation of a service.TestService.
+
+  Attributes:
+    methods: A dict from method group-name pair to test_interfaces.Method object
+      describing the RPC methods that may be called during the test.
+    inline_method_implementations: A dict from method group-name pair to
+      face.MethodImplementation object to be used in tests of in-line calls to
+      behaviors under test.
+    event_method_implementations: A dict from method group-name pair to
+      face.MethodImplementation object to be used in tests of event-driven calls
+      to behaviors under test.
+    multi_method_implementation: A face.MultiMethodImplementation to be used in
+      tests of generic calls to behaviors under test.
+    unary_unary_messages_sequences: A dict from method group-name pair to
+      sequence of service.UnaryUnaryTestMessages objects to be used to test the
+      identified method.
+    unary_stream_messages_sequences: A dict from method group-name pair to
+      sequence of service.UnaryStreamTestMessages objects to be used to test the
+      identified method.
+    stream_unary_messages_sequences: A dict from method group-name pair to
+      sequence of service.StreamUnaryTestMessages objects to be used to test the
+      identified method.
+    stream_stream_messages_sequences: A dict from method group-name pair to
+      sequence of service.StreamStreamTestMessages objects to be used to test
+      the identified method.
+  """
+
+
+class _BufferingConsumer(stream.Consumer):
+  """A trivial Consumer that dumps what it consumes in a user-mutable buffer."""
+
+  def __init__(self):
+    self.consumed = []
+    self.terminated = False
+
+  def consume(self, value):
+    self.consumed.append(value)
+
+  def terminate(self):
+    self.terminated = True
+
+  def consume_and_terminate(self, value):
+    self.consumed.append(value)
+    self.terminated = True
+
+
+class _InlineUnaryUnaryMethod(face.MethodImplementation):
+
+  def __init__(self, unary_unary_test_method, control):
+    self._test_method = unary_unary_test_method
+    self._control = control
+
+    self.cardinality = cardinality.Cardinality.UNARY_UNARY
+    self.style = style.Service.INLINE
+
+  def unary_unary_inline(self, request, context):
+    response_list = []
+    self._test_method.service(
+        request, response_list.append, context, self._control)
+    return response_list.pop(0)
+
+
+class _EventUnaryUnaryMethod(face.MethodImplementation):
+
+  def __init__(self, unary_unary_test_method, control, pool):
+    self._test_method = unary_unary_test_method
+    self._control = control
+    self._pool = pool
+
+    self.cardinality = cardinality.Cardinality.UNARY_UNARY
+    self.style = style.Service.EVENT
+
+  def unary_unary_event(self, request, response_callback, context):
+    if self._pool is None:
+      self._test_method.service(
+          request, response_callback, context, self._control)
+    else:
+      self._pool.submit(
+          self._test_method.service, request, response_callback, context,
+          self._control)
+
+
+class _InlineUnaryStreamMethod(face.MethodImplementation):
+
+  def __init__(self, unary_stream_test_method, control):
+    self._test_method = unary_stream_test_method
+    self._control = control
+
+    self.cardinality = cardinality.Cardinality.UNARY_STREAM
+    self.style = style.Service.INLINE
+
+  def unary_stream_inline(self, request, context):
+    response_consumer = _BufferingConsumer()
+    self._test_method.service(
+        request, response_consumer, context, self._control)
+    for response in response_consumer.consumed:
+      yield response
+
+
+class _EventUnaryStreamMethod(face.MethodImplementation):
+
+  def __init__(self, unary_stream_test_method, control, pool):
+    self._test_method = unary_stream_test_method
+    self._control = control
+    self._pool = pool
+
+    self.cardinality = cardinality.Cardinality.UNARY_STREAM
+    self.style = style.Service.EVENT
+
+  def unary_stream_event(self, request, response_consumer, context):
+    if self._pool is None:
+      self._test_method.service(
+          request, response_consumer, context, self._control)
+    else:
+      self._pool.submit(
+          self._test_method.service, request, response_consumer, context,
+          self._control)
+
+
+class _InlineStreamUnaryMethod(face.MethodImplementation):
+
+  def __init__(self, stream_unary_test_method, control):
+    self._test_method = stream_unary_test_method
+    self._control = control
+
+    self.cardinality = cardinality.Cardinality.STREAM_UNARY
+    self.style = style.Service.INLINE
+
+  def stream_unary_inline(self, request_iterator, context):
+    response_list = []
+    request_consumer = self._test_method.service(
+        response_list.append, context, self._control)
+    for request in request_iterator:
+      request_consumer.consume(request)
+    request_consumer.terminate()
+    return response_list.pop(0)
+
+
+class _EventStreamUnaryMethod(face.MethodImplementation):
+
+  def __init__(self, stream_unary_test_method, control, pool):
+    self._test_method = stream_unary_test_method
+    self._control = control
+    self._pool = pool
+
+    self.cardinality = cardinality.Cardinality.STREAM_UNARY
+    self.style = style.Service.EVENT
+
+  def stream_unary_event(self, response_callback, context):
+    request_consumer = self._test_method.service(
+        response_callback, context, self._control)
+    if self._pool is None:
+      return request_consumer
+    else:
+      return stream_util.ThreadSwitchingConsumer(request_consumer, self._pool)
+
+
+class _InlineStreamStreamMethod(face.MethodImplementation):
+
+  def __init__(self, stream_stream_test_method, control):
+    self._test_method = stream_stream_test_method
+    self._control = control
+
+    self.cardinality = cardinality.Cardinality.STREAM_STREAM
+    self.style = style.Service.INLINE
+
+  def stream_stream_inline(self, request_iterator, context):
+    response_consumer = _BufferingConsumer()
+    request_consumer = self._test_method.service(
+        response_consumer, context, self._control)
+
+    for request in request_iterator:
+      request_consumer.consume(request)
+      while response_consumer.consumed:
+        yield response_consumer.consumed.pop(0)
+    response_consumer.terminate()
+
+
+class _EventStreamStreamMethod(face.MethodImplementation):
+
+  def __init__(self, stream_stream_test_method, control, pool):
+    self._test_method = stream_stream_test_method
+    self._control = control
+    self._pool = pool
+
+    self.cardinality = cardinality.Cardinality.STREAM_STREAM
+    self.style = style.Service.EVENT
+
+  def stream_stream_event(self, response_consumer, context):
+    request_consumer = self._test_method.service(
+        response_consumer, context, self._control)
+    if self._pool is None:
+      return request_consumer
+    else:
+      return stream_util.ThreadSwitchingConsumer(request_consumer, self._pool)
+
+
+class _UnaryConsumer(stream.Consumer):
+  """A Consumer that only allows consumption of exactly one value."""
+
+  def __init__(self, action):
+    self._lock = threading.Lock()
+    self._action = action
+    self._consumed = False
+    self._terminated = False
+
+  def consume(self, value):
+    with self._lock:
+      if self._consumed:
+        raise ValueError('Unary consumer already consumed!')
+      elif self._terminated:
+        raise ValueError('Unary consumer already terminated!')
+      else:
+        self._consumed = True
+
+    self._action(value)
+
+  def terminate(self):
+    with self._lock:
+      if not self._consumed:
+        raise ValueError('Unary consumer hasn\'t yet consumed!')
+      elif self._terminated:
+        raise ValueError('Unary consumer already terminated!')
+      else:
+        self._terminated = True
+
+  def consume_and_terminate(self, value):
+    with self._lock:
+      if self._consumed:
+        raise ValueError('Unary consumer already consumed!')
+      elif self._terminated:
+        raise ValueError('Unary consumer already terminated!')
+      else:
+        self._consumed = True
+        self._terminated = True
+
+    self._action(value)
+
+
+class _UnaryUnaryAdaptation(object):
+
+  def __init__(self, unary_unary_test_method):
+    self._method = unary_unary_test_method
+
+  def service(self, response_consumer, context, control):
+    def action(request):
+      self._method.service(
+          request, response_consumer.consume_and_terminate, context, control)
+    return _UnaryConsumer(action)
+
+
+class _UnaryStreamAdaptation(object):
+
+  def __init__(self, unary_stream_test_method):
+    self._method = unary_stream_test_method
+
+  def service(self, response_consumer, context, control):
+    def action(request):
+      self._method.service(request, response_consumer, context, control)
+    return _UnaryConsumer(action)
+
+
+class _StreamUnaryAdaptation(object):
+
+  def __init__(self, stream_unary_test_method):
+    self._method = stream_unary_test_method
+
+  def service(self, response_consumer, context, control):
+    return self._method.service(
+        response_consumer.consume_and_terminate, context, control)
+
+
+class _MultiMethodImplementation(face.MultiMethodImplementation):
+
+  def __init__(self, methods, control, pool):
+    self._methods = methods
+    self._control = control
+    self._pool = pool
+
+  def service(self, group, name, response_consumer, context):
+    method = self._methods.get(group, name, None)
+    if method is None:
+      raise face.NoSuchMethodError(group, name)
+    elif self._pool is None:
+      return method(response_consumer, context, self._control)
+    else:
+      request_consumer = method(response_consumer, context, self._control)
+      return stream_util.ThreadSwitchingConsumer(request_consumer, self._pool)
+
+
+class _Assembly(
+    collections.namedtuple(
+        '_Assembly',
+        ['methods', 'inlines', 'events', 'adaptations', 'messages'])):
+  """An intermediate structure created when creating a TestServiceDigest."""
+
+
+def _assemble(
+    scenarios, identifiers, inline_method_constructor, event_method_constructor,
+    adapter, control, pool):
+  """Creates an _Assembly from the given scenarios."""
+  methods = {}
+  inlines = {}
+  events = {}
+  adaptations = {}
+  messages = {}
+  for identifier, scenario in scenarios.iteritems():
+    if identifier in identifiers:
+      raise ValueError('Repeated identifier "(%s, %s)"!' % identifier)
+
+    test_method = scenario[0]
+    inline_method = inline_method_constructor(test_method, control)
+    event_method = event_method_constructor(test_method, control, pool)
+    adaptation = adapter(test_method)
+
+    methods[identifier] = test_method
+    inlines[identifier] = inline_method
+    events[identifier] = event_method
+    adaptations[identifier] = adaptation
+    messages[identifier] = scenario[1]
+
+  return _Assembly(methods, inlines, events, adaptations, messages)
+
+
+def digest(service, control, pool):
+  """Creates a TestServiceDigest from a TestService.
+
+  Args:
+    service: A _service.TestService.
+    control: A test_control.Control.
+    pool: If RPC methods should be serviced in a separate thread, a thread pool.
+      None if RPC methods should be serviced in the thread belonging to the
+      run-time that calls for their service.
+
+  Returns:
+    A TestServiceDigest synthesized from the given service.TestService.
+  """
+  identifiers = set()
+
+  unary_unary = _assemble(
+      service.unary_unary_scenarios(), identifiers, _InlineUnaryUnaryMethod,
+      _EventUnaryUnaryMethod, _UnaryUnaryAdaptation, control, pool)
+  identifiers.update(unary_unary.inlines)
+
+  unary_stream = _assemble(
+      service.unary_stream_scenarios(), identifiers, _InlineUnaryStreamMethod,
+      _EventUnaryStreamMethod, _UnaryStreamAdaptation, control, pool)
+  identifiers.update(unary_stream.inlines)
+
+  stream_unary = _assemble(
+      service.stream_unary_scenarios(), identifiers, _InlineStreamUnaryMethod,
+      _EventStreamUnaryMethod, _StreamUnaryAdaptation, control, pool)
+  identifiers.update(stream_unary.inlines)
+
+  stream_stream = _assemble(
+      service.stream_stream_scenarios(), identifiers, _InlineStreamStreamMethod,
+      _EventStreamStreamMethod, _IDENTITY, control, pool)
+  identifiers.update(stream_stream.inlines)
+
+  methods = dict(unary_unary.methods)
+  methods.update(unary_stream.methods)
+  methods.update(stream_unary.methods)
+  methods.update(stream_stream.methods)
+  adaptations = dict(unary_unary.adaptations)
+  adaptations.update(unary_stream.adaptations)
+  adaptations.update(stream_unary.adaptations)
+  adaptations.update(stream_stream.adaptations)
+  inlines = dict(unary_unary.inlines)
+  inlines.update(unary_stream.inlines)
+  inlines.update(stream_unary.inlines)
+  inlines.update(stream_stream.inlines)
+  events = dict(unary_unary.events)
+  events.update(unary_stream.events)
+  events.update(stream_unary.events)
+  events.update(stream_stream.events)
+
+  return TestServiceDigest(
+      methods,
+      inlines,
+      events,
+      _MultiMethodImplementation(adaptations, control, pool),
+      unary_unary.messages,
+      unary_stream.messages,
+      stream_unary.messages,
+      stream_stream.messages)
diff --git a/src/python/grpcio_test/grpc_test/framework/interfaces/face/_event_invocation_synchronous_event_service.py b/src/python/grpcio_test/grpc_test/framework/interfaces/face/_event_invocation_synchronous_event_service.py
new file mode 100644
index 0000000..ea5cdea
--- /dev/null
+++ b/src/python/grpcio_test/grpc_test/framework/interfaces/face/_event_invocation_synchronous_event_service.py
@@ -0,0 +1,377 @@
+# Copyright 2015, Google Inc.
+# All rights reserved.
+#
+# Redistribution and use in source and binary forms, with or without
+# modification, are permitted provided that the following conditions are
+# met:
+#
+#     * Redistributions of source code must retain the above copyright
+# notice, this list of conditions and the following disclaimer.
+#     * Redistributions in binary form must reproduce the above
+# copyright notice, this list of conditions and the following disclaimer
+# in the documentation and/or other materials provided with the
+# distribution.
+#     * Neither the name of Google Inc. nor the names of its
+# contributors may be used to endorse or promote products derived from
+# this software without specific prior written permission.
+#
+# THIS SOFTWARE IS PROVIDED BY THE COPYRIGHT HOLDERS AND CONTRIBUTORS
+# "AS IS" AND ANY EXPRESS OR IMPLIED WARRANTIES, INCLUDING, BUT NOT
+# LIMITED TO, THE IMPLIED WARRANTIES OF MERCHANTABILITY AND FITNESS FOR
+# A PARTICULAR PURPOSE ARE DISCLAIMED. IN NO EVENT SHALL THE COPYRIGHT
+# OWNER OR CONTRIBUTORS BE LIABLE FOR ANY DIRECT, INDIRECT, INCIDENTAL,
+# SPECIAL, EXEMPLARY, OR CONSEQUENTIAL DAMAGES (INCLUDING, BUT NOT
+# LIMITED TO, PROCUREMENT OF SUBSTITUTE GOODS OR SERVICES; LOSS OF USE,
+# DATA, OR PROFITS; OR BUSINESS INTERRUPTION) HOWEVER CAUSED AND ON ANY
+# THEORY OF LIABILITY, WHETHER IN CONTRACT, STRICT LIABILITY, OR TORT
+# (INCLUDING NEGLIGENCE OR OTHERWISE) ARISING IN ANY WAY OUT OF THE USE
+# OF THIS SOFTWARE, EVEN IF ADVISED OF THE POSSIBILITY OF SUCH DAMAGE.
+
+"""Test code for the Face layer of RPC Framework."""
+
+import abc
+import unittest
+
+# test_interfaces is referenced from specification in this module.
+from grpc.framework.interfaces.face import face
+from grpc_test.framework.common import test_constants
+from grpc_test.framework.common import test_control
+from grpc_test.framework.common import test_coverage
+from grpc_test.framework.interfaces.face import _digest
+from grpc_test.framework.interfaces.face import _receiver
+from grpc_test.framework.interfaces.face import _stock_service
+from grpc_test.framework.interfaces.face import test_interfaces  # pylint: disable=unused-import
+
+
+class TestCase(test_coverage.Coverage, unittest.TestCase):
+  """A test of the Face layer of RPC Framework.
+
+  Concrete subclasses must have an "implementation" attribute of type
+  test_interfaces.Implementation and an "invoker_constructor" attribute of type
+  _invocation.InvokerConstructor.
+  """
+  __metaclass__ = abc.ABCMeta
+
+  NAME = 'EventInvocationSynchronousEventServiceTest'
+
+  def setUp(self):
+    """See unittest.TestCase.setUp for full specification.
+
+    Overriding implementations must call this implementation.
+    """
+    self._control = test_control.PauseFailControl()
+    self._digest = _digest.digest(
+        _stock_service.STOCK_TEST_SERVICE, self._control, None)
+
+    generic_stub, dynamic_stubs, self._memo = self.implementation.instantiate(
+        self._digest.methods, self._digest.event_method_implementations, None)
+    self._invoker = self.invoker_constructor.construct_invoker(
+        generic_stub, dynamic_stubs, self._digest.methods)
+
+  def tearDown(self):
+    """See unittest.TestCase.tearDown for full specification.
+
+    Overriding implementations must call this implementation.
+    """
+    self.implementation.destantiate(self._memo)
+
+  def testSuccessfulUnaryRequestUnaryResponse(self):
+    for (group, method), test_messages_sequence in (
+        self._digest.unary_unary_messages_sequences.iteritems()):
+      for test_messages in test_messages_sequence:
+        request = test_messages.request()
+        receiver = _receiver.Receiver()
+
+        self._invoker.event(group, method)(
+            request, receiver, receiver.abort, test_constants.LONG_TIMEOUT)
+        receiver.block_until_terminated()
+        response = receiver.unary_response()
+
+        test_messages.verify(request, response, self)
+
+  def testSuccessfulUnaryRequestStreamResponse(self):
+    for (group, method), test_messages_sequence in (
+        self._digest.unary_stream_messages_sequences.iteritems()):
+      for test_messages in test_messages_sequence:
+        request = test_messages.request()
+        receiver = _receiver.Receiver()
+
+        self._invoker.event(group, method)(
+            request, receiver, receiver.abort, test_constants.LONG_TIMEOUT)
+        receiver.block_until_terminated()
+        responses = receiver.stream_responses()
+
+        test_messages.verify(request, responses, self)
+
+  def testSuccessfulStreamRequestUnaryResponse(self):
+    for (group, method), test_messages_sequence in (
+        self._digest.stream_unary_messages_sequences.iteritems()):
+      for test_messages in test_messages_sequence:
+        requests = test_messages.requests()
+        receiver = _receiver.Receiver()
+
+        call_consumer = self._invoker.event(group, method)(
+            receiver, receiver.abort, test_constants.LONG_TIMEOUT)
+        for request in requests:
+          call_consumer.consume(request)
+        call_consumer.terminate()
+        receiver.block_until_terminated()
+        response = receiver.unary_response()
+
+        test_messages.verify(requests, response, self)
+
+  def testSuccessfulStreamRequestStreamResponse(self):
+    for (group, method), test_messages_sequence in (
+        self._digest.stream_stream_messages_sequences.iteritems()):
+      for test_messages in test_messages_sequence:
+        requests = test_messages.requests()
+        receiver = _receiver.Receiver()
+
+        call_consumer = self._invoker.event(group, method)(
+            receiver, receiver.abort, test_constants.LONG_TIMEOUT)
+        for request in requests:
+          call_consumer.consume(request)
+        call_consumer.terminate()
+        receiver.block_until_terminated()
+        responses = receiver.stream_responses()
+
+        test_messages.verify(requests, responses, self)
+
+  def testSequentialInvocations(self):
+    # pylint: disable=cell-var-from-loop
+    for (group, method), test_messages_sequence in (
+        self._digest.unary_unary_messages_sequences.iteritems()):
+      for test_messages in test_messages_sequence:
+        first_request = test_messages.request()
+        second_request = test_messages.request()
+        second_receiver = _receiver.Receiver()
+
+        def make_second_invocation():
+          self._invoker.event(group, method)(
+              second_request, second_receiver, second_receiver.abort,
+              test_constants.LONG_TIMEOUT)
+
+        class FirstReceiver(_receiver.Receiver):
+
+          def complete(self, terminal_metadata, code, details):
+            super(FirstReceiver, self).complete(
+                terminal_metadata, code, details)
+            make_second_invocation()
+
+        first_receiver = FirstReceiver()
+
+        self._invoker.event(group, method)(
+            first_request, first_receiver, first_receiver.abort,
+            test_constants.LONG_TIMEOUT)
+        second_receiver.block_until_terminated()
+
+        first_response = first_receiver.unary_response()
+        second_response = second_receiver.unary_response()
+        test_messages.verify(first_request, first_response, self)
+        test_messages.verify(second_request, second_response, self)
+
+  def testParallelInvocations(self):
+    for (group, method), test_messages_sequence in (
+        self._digest.unary_unary_messages_sequences.iteritems()):
+      for test_messages in test_messages_sequence:
+        first_request = test_messages.request()
+        first_receiver = _receiver.Receiver()
+        second_request = test_messages.request()
+        second_receiver = _receiver.Receiver()
+
+        self._invoker.event(group, method)(
+            first_request, first_receiver, first_receiver.abort,
+            test_constants.LONG_TIMEOUT)
+        self._invoker.event(group, method)(
+            second_request, second_receiver, second_receiver.abort,
+            test_constants.LONG_TIMEOUT)
+        first_receiver.block_until_terminated()
+        second_receiver.block_until_terminated()
+
+        first_response = first_receiver.unary_response()
+        second_response = second_receiver.unary_response()
+        test_messages.verify(first_request, first_response, self)
+        test_messages.verify(second_request, second_response, self)
+
+  @unittest.skip('TODO(nathaniel): implement.')
+  def testWaitingForSomeButNotAllParallelInvocations(self):
+    raise NotImplementedError()
+
+  def testCancelledUnaryRequestUnaryResponse(self):
+    for (group, method), test_messages_sequence in (
+        self._digest.unary_unary_messages_sequences.iteritems()):
+      for test_messages in test_messages_sequence:
+        request = test_messages.request()
+        receiver = _receiver.Receiver()
+
+        with self._control.pause():
+          call = self._invoker.event(group, method)(
+              request, receiver, receiver.abort, test_constants.LONG_TIMEOUT)
+          call.cancel()
+          receiver.block_until_terminated()
+
+        self.assertIs(face.Abortion.Kind.CANCELLED, receiver.abortion().kind)
+
+  def testCancelledUnaryRequestStreamResponse(self):
+    for (group, method), test_messages_sequence in (
+        self._digest.unary_stream_messages_sequences.iteritems()):
+      for test_messages in test_messages_sequence:
+        request = test_messages.request()
+        receiver = _receiver.Receiver()
+
+        call = self._invoker.event(group, method)(
+            request, receiver, receiver.abort, test_constants.LONG_TIMEOUT)
+        call.cancel()
+        receiver.block_until_terminated()
+
+        self.assertIs(face.Abortion.Kind.CANCELLED, receiver.abortion().kind)
+
+  def testCancelledStreamRequestUnaryResponse(self):
+    for (group, method), test_messages_sequence in (
+        self._digest.stream_unary_messages_sequences.iteritems()):
+      for test_messages in test_messages_sequence:
+        requests = test_messages.requests()
+        receiver = _receiver.Receiver()
+
+        call_consumer = self._invoker.event(group, method)(
+            receiver, receiver.abort, test_constants.LONG_TIMEOUT)
+        for request in requests:
+          call_consumer.consume(request)
+        call_consumer.cancel()
+        receiver.block_until_terminated()
+
+        self.assertIs(face.Abortion.Kind.CANCELLED, receiver.abortion().kind)
+
+  def testCancelledStreamRequestStreamResponse(self):
+    for (group, method), test_messages_sequence in (
+        self._digest.stream_stream_messages_sequences.iteritems()):
+      for unused_test_messages in test_messages_sequence:
+        receiver = _receiver.Receiver()
+
+        call_consumer = self._invoker.event(group, method)(
+            receiver, receiver.abort, test_constants.LONG_TIMEOUT)
+        call_consumer.cancel()
+        receiver.block_until_terminated()
+
+        self.assertIs(face.Abortion.Kind.CANCELLED, receiver.abortion().kind)
+
+  def testExpiredUnaryRequestUnaryResponse(self):
+    for (group, method), test_messages_sequence in (
+        self._digest.unary_unary_messages_sequences.iteritems()):
+      for test_messages in test_messages_sequence:
+        request = test_messages.request()
+        receiver = _receiver.Receiver()
+
+        with self._control.pause():
+          self._invoker.event(group, method)(
+              request, receiver, receiver.abort, test_constants.SHORT_TIMEOUT)
+          receiver.block_until_terminated()
+
+        self.assertIs(face.Abortion.Kind.EXPIRED, receiver.abortion().kind)
+
+  def testExpiredUnaryRequestStreamResponse(self):
+    for (group, method), test_messages_sequence in (
+        self._digest.unary_stream_messages_sequences.iteritems()):
+      for test_messages in test_messages_sequence:
+        request = test_messages.request()
+        receiver = _receiver.Receiver()
+
+        with self._control.pause():
+          self._invoker.event(group, method)(
+              request, receiver, receiver.abort, test_constants.SHORT_TIMEOUT)
+          receiver.block_until_terminated()
+
+        self.assertIs(face.Abortion.Kind.EXPIRED, receiver.abortion().kind)
+
+  def testExpiredStreamRequestUnaryResponse(self):
+    for (group, method), test_messages_sequence in (
+        self._digest.stream_unary_messages_sequences.iteritems()):
+      for unused_test_messages in test_messages_sequence:
+        receiver = _receiver.Receiver()
+
+        self._invoker.event(group, method)(
+            receiver, receiver.abort, test_constants.SHORT_TIMEOUT)
+        receiver.block_until_terminated()
+
+        self.assertIs(face.Abortion.Kind.EXPIRED, receiver.abortion().kind)
+
+  def testExpiredStreamRequestStreamResponse(self):
+    for (group, method), test_messages_sequence in (
+        self._digest.stream_stream_messages_sequences.iteritems()):
+      for test_messages in test_messages_sequence:
+        requests = test_messages.requests()
+        receiver = _receiver.Receiver()
+
+        call_consumer = self._invoker.event(group, method)(
+            receiver, receiver.abort, test_constants.SHORT_TIMEOUT)
+        for request in requests:
+          call_consumer.consume(request)
+        receiver.block_until_terminated()
+
+        self.assertIs(face.Abortion.Kind.EXPIRED, receiver.abortion().kind)
+
+  def testFailedUnaryRequestUnaryResponse(self):
+    for (group, method), test_messages_sequence in (
+        self._digest.unary_unary_messages_sequences.iteritems()):
+      for test_messages in test_messages_sequence:
+        request = test_messages.request()
+        receiver = _receiver.Receiver()
+
+        with self._control.fail():
+          self._invoker.event(group, method)(
+              request, receiver, receiver.abort, test_constants.LONG_TIMEOUT)
+          receiver.block_until_terminated()
+
+        self.assertIs(
+            face.Abortion.Kind.REMOTE_FAILURE, receiver.abortion().kind)
+
+  def testFailedUnaryRequestStreamResponse(self):
+    for (group, method), test_messages_sequence in (
+        self._digest.unary_stream_messages_sequences.iteritems()):
+      for test_messages in test_messages_sequence:
+        request = test_messages.request()
+        receiver = _receiver.Receiver()
+
+        with self._control.fail():
+          self._invoker.event(group, method)(
+              request, receiver, receiver.abort, test_constants.LONG_TIMEOUT)
+          receiver.block_until_terminated()
+
+        self.assertIs(
+            face.Abortion.Kind.REMOTE_FAILURE, receiver.abortion().kind)
+
+  def testFailedStreamRequestUnaryResponse(self):
+    for (group, method), test_messages_sequence in (
+        self._digest.stream_unary_messages_sequences.iteritems()):
+      for test_messages in test_messages_sequence:
+        requests = test_messages.requests()
+        receiver = _receiver.Receiver()
+
+        with self._control.fail():
+          call_consumer = self._invoker.event(group, method)(
+              receiver, receiver.abort, test_constants.LONG_TIMEOUT)
+          for request in requests:
+            call_consumer.consume(request)
+          call_consumer.terminate()
+          receiver.block_until_terminated()
+
+        self.assertIs(
+            face.Abortion.Kind.REMOTE_FAILURE, receiver.abortion().kind)
+
+  def testFailedStreamRequestStreamResponse(self):
+    for (group, method), test_messages_sequence in (
+        self._digest.stream_stream_messages_sequences.iteritems()):
+      for test_messages in test_messages_sequence:
+        requests = test_messages.requests()
+        receiver = _receiver.Receiver()
+
+        with self._control.fail():
+          call_consumer = self._invoker.event(group, method)(
+              receiver, receiver.abort, test_constants.LONG_TIMEOUT)
+          for request in requests:
+            call_consumer.consume(request)
+          call_consumer.terminate()
+          receiver.block_until_terminated()
+
+        self.assertIs(
+            face.Abortion.Kind.REMOTE_FAILURE, receiver.abortion().kind)
diff --git a/src/python/grpcio_test/grpc_test/framework/interfaces/face/_future_invocation_asynchronous_event_service.py b/src/python/grpcio_test/grpc_test/framework/interfaces/face/_future_invocation_asynchronous_event_service.py
new file mode 100644
index 0000000..a649362
--- /dev/null
+++ b/src/python/grpcio_test/grpc_test/framework/interfaces/face/_future_invocation_asynchronous_event_service.py
@@ -0,0 +1,378 @@
+# Copyright 2015, Google Inc.
+# All rights reserved.
+#
+# Redistribution and use in source and binary forms, with or without
+# modification, are permitted provided that the following conditions are
+# met:
+#
+#     * Redistributions of source code must retain the above copyright
+# notice, this list of conditions and the following disclaimer.
+#     * Redistributions in binary form must reproduce the above
+# copyright notice, this list of conditions and the following disclaimer
+# in the documentation and/or other materials provided with the
+# distribution.
+#     * Neither the name of Google Inc. nor the names of its
+# contributors may be used to endorse or promote products derived from
+# this software without specific prior written permission.
+#
+# THIS SOFTWARE IS PROVIDED BY THE COPYRIGHT HOLDERS AND CONTRIBUTORS
+# "AS IS" AND ANY EXPRESS OR IMPLIED WARRANTIES, INCLUDING, BUT NOT
+# LIMITED TO, THE IMPLIED WARRANTIES OF MERCHANTABILITY AND FITNESS FOR
+# A PARTICULAR PURPOSE ARE DISCLAIMED. IN NO EVENT SHALL THE COPYRIGHT
+# OWNER OR CONTRIBUTORS BE LIABLE FOR ANY DIRECT, INDIRECT, INCIDENTAL,
+# SPECIAL, EXEMPLARY, OR CONSEQUENTIAL DAMAGES (INCLUDING, BUT NOT
+# LIMITED TO, PROCUREMENT OF SUBSTITUTE GOODS OR SERVICES; LOSS OF USE,
+# DATA, OR PROFITS; OR BUSINESS INTERRUPTION) HOWEVER CAUSED AND ON ANY
+# THEORY OF LIABILITY, WHETHER IN CONTRACT, STRICT LIABILITY, OR TORT
+# (INCLUDING NEGLIGENCE OR OTHERWISE) ARISING IN ANY WAY OUT OF THE USE
+# OF THIS SOFTWARE, EVEN IF ADVISED OF THE POSSIBILITY OF SUCH DAMAGE.
+
+"""Test code for the Face layer of RPC Framework."""
+
+import abc
+import contextlib
+import threading
+import unittest
+
+# test_interfaces is referenced from specification in this module.
+from grpc.framework.foundation import logging_pool
+from grpc.framework.interfaces.face import face
+from grpc_test.framework.common import test_constants
+from grpc_test.framework.common import test_control
+from grpc_test.framework.common import test_coverage
+from grpc_test.framework.interfaces.face import _digest
+from grpc_test.framework.interfaces.face import _stock_service
+from grpc_test.framework.interfaces.face import test_interfaces  # pylint: disable=unused-import
+
+
+class _PauseableIterator(object):
+
+  def __init__(self, upstream):
+    self._upstream = upstream
+    self._condition = threading.Condition()
+    self._paused = False
+
+  @contextlib.contextmanager
+  def pause(self):
+    with self._condition:
+      self._paused = True
+    yield
+    with self._condition:
+      self._paused = False
+      self._condition.notify_all()
+
+  def __iter__(self):
+    return self
+
+  def next(self):
+    with self._condition:
+      while self._paused:
+        self._condition.wait()
+    return next(self._upstream)
+
+
+class TestCase(test_coverage.Coverage, unittest.TestCase):
+  """A test of the Face layer of RPC Framework.
+
+  Concrete subclasses must have an "implementation" attribute of type
+  test_interfaces.Implementation and an "invoker_constructor" attribute of type
+  _invocation.InvokerConstructor.
+  """
+  __metaclass__ = abc.ABCMeta
+
+  NAME = 'FutureInvocationAsynchronousEventServiceTest'
+
+  def setUp(self):
+    """See unittest.TestCase.setUp for full specification.
+
+    Overriding implementations must call this implementation.
+    """
+    self._control = test_control.PauseFailControl()
+    self._digest_pool = logging_pool.pool(test_constants.POOL_SIZE)
+    self._digest = _digest.digest(
+        _stock_service.STOCK_TEST_SERVICE, self._control, self._digest_pool)
+
+    generic_stub, dynamic_stubs, self._memo = self.implementation.instantiate(
+        self._digest.methods, self._digest.event_method_implementations, None)
+    self._invoker = self.invoker_constructor.construct_invoker(
+        generic_stub, dynamic_stubs, self._digest.methods)
+
+  def tearDown(self):
+    """See unittest.TestCase.tearDown for full specification.
+
+    Overriding implementations must call this implementation.
+    """
+    self.implementation.destantiate(self._memo)
+    self._digest_pool.shutdown(wait=True)
+
+  def testSuccessfulUnaryRequestUnaryResponse(self):
+    for (group, method), test_messages_sequence in (
+        self._digest.unary_unary_messages_sequences.iteritems()):
+      for test_messages in test_messages_sequence:
+        request = test_messages.request()
+
+        response_future = self._invoker.future(group, method)(
+            request, test_constants.LONG_TIMEOUT)
+        response = response_future.result()
+
+        test_messages.verify(request, response, self)
+
+  def testSuccessfulUnaryRequestStreamResponse(self):
+    for (group, method), test_messages_sequence in (
+        self._digest.unary_stream_messages_sequences.iteritems()):
+      for test_messages in test_messages_sequence:
+        request = test_messages.request()
+
+        response_iterator = self._invoker.future(group, method)(
+            request, test_constants.LONG_TIMEOUT)
+        responses = list(response_iterator)
+
+        test_messages.verify(request, responses, self)
+
+  def testSuccessfulStreamRequestUnaryResponse(self):
+    for (group, method), test_messages_sequence in (
+        self._digest.stream_unary_messages_sequences.iteritems()):
+      for test_messages in test_messages_sequence:
+        requests = test_messages.requests()
+        request_iterator = _PauseableIterator(iter(requests))
+
+        # Use of a paused iterator of requests allows us to test that control is
+        # returned to calling code before the iterator yields any requests.
+        with request_iterator.pause():
+          response_future = self._invoker.future(group, method)(
+              request_iterator, test_constants.LONG_TIMEOUT)
+        response = response_future.result()
+
+        test_messages.verify(requests, response, self)
+
+  def testSuccessfulStreamRequestStreamResponse(self):
+    for (group, method), test_messages_sequence in (
+        self._digest.stream_stream_messages_sequences.iteritems()):
+      for test_messages in test_messages_sequence:
+        requests = test_messages.requests()
+        request_iterator = _PauseableIterator(iter(requests))
+
+        # Use of a paused iterator of requests allows us to test that control is
+        # returned to calling code before the iterator yields any requests.
+        with request_iterator.pause():
+          response_iterator = self._invoker.future(group, method)(
+              request_iterator, test_constants.LONG_TIMEOUT)
+        responses = list(response_iterator)
+
+        test_messages.verify(requests, responses, self)
+
+  def testSequentialInvocations(self):
+    for (group, method), test_messages_sequence in (
+        self._digest.unary_unary_messages_sequences.iteritems()):
+      for test_messages in test_messages_sequence:
+        first_request = test_messages.request()
+        second_request = test_messages.request()
+
+        first_response_future = self._invoker.future(group, method)(
+            first_request, test_constants.LONG_TIMEOUT)
+        first_response = first_response_future.result()
+
+        test_messages.verify(first_request, first_response, self)
+
+        second_response_future = self._invoker.future(group, method)(
+            second_request, test_constants.LONG_TIMEOUT)
+        second_response = second_response_future.result()
+
+        test_messages.verify(second_request, second_response, self)
+
+  def testParallelInvocations(self):
+    for (group, method), test_messages_sequence in (
+        self._digest.unary_unary_messages_sequences.iteritems()):
+      for test_messages in test_messages_sequence:
+        first_request = test_messages.request()
+        second_request = test_messages.request()
+
+        first_response_future = self._invoker.future(group, method)(
+            first_request, test_constants.LONG_TIMEOUT)
+        second_response_future = self._invoker.future(group, method)(
+            second_request, test_constants.LONG_TIMEOUT)
+        first_response = first_response_future.result()
+        second_response = second_response_future.result()
+
+        test_messages.verify(first_request, first_response, self)
+        test_messages.verify(second_request, second_response, self)
+
+  @unittest.skip('TODO(nathaniel): implement.')
+  def testWaitingForSomeButNotAllParallelInvocations(self):
+    raise NotImplementedError()
+
+  def testCancelledUnaryRequestUnaryResponse(self):
+    for (group, method), test_messages_sequence in (
+        self._digest.unary_unary_messages_sequences.iteritems()):
+      for test_messages in test_messages_sequence:
+        request = test_messages.request()
+
+        with self._control.pause():
+          response_future = self._invoker.future(group, method)(
+              request, test_constants.LONG_TIMEOUT)
+          cancel_method_return_value = response_future.cancel()
+
+        self.assertFalse(cancel_method_return_value)
+        self.assertTrue(response_future.cancelled())
+
+  def testCancelledUnaryRequestStreamResponse(self):
+    for (group, method), test_messages_sequence in (
+        self._digest.unary_stream_messages_sequences.iteritems()):
+      for test_messages in test_messages_sequence:
+        request = test_messages.request()
+
+        with self._control.pause():
+          response_iterator = self._invoker.future(group, method)(
+              request, test_constants.LONG_TIMEOUT)
+          response_iterator.cancel()
+
+        with self.assertRaises(face.CancellationError):
+          next(response_iterator)
+
+  def testCancelledStreamRequestUnaryResponse(self):
+    for (group, method), test_messages_sequence in (
+        self._digest.stream_unary_messages_sequences.iteritems()):
+      for test_messages in test_messages_sequence:
+        requests = test_messages.requests()
+
+        with self._control.pause():
+          response_future = self._invoker.future(group, method)(
+              iter(requests), test_constants.LONG_TIMEOUT)
+          cancel_method_return_value = response_future.cancel()
+
+        self.assertFalse(cancel_method_return_value)
+        self.assertTrue(response_future.cancelled())
+
+  def testCancelledStreamRequestStreamResponse(self):
+    for (group, method), test_messages_sequence in (
+        self._digest.stream_stream_messages_sequences.iteritems()):
+      for test_messages in test_messages_sequence:
+        requests = test_messages.requests()
+
+        with self._control.pause():
+          response_iterator = self._invoker.future(group, method)(
+              iter(requests), test_constants.LONG_TIMEOUT)
+          response_iterator.cancel()
+
+        with self.assertRaises(face.CancellationError):
+          next(response_iterator)
+
+  def testExpiredUnaryRequestUnaryResponse(self):
+    for (group, method), test_messages_sequence in (
+        self._digest.unary_unary_messages_sequences.iteritems()):
+      for test_messages in test_messages_sequence:
+        request = test_messages.request()
+
+        with self._control.pause():
+          response_future = self._invoker.future(
+              group, method)(request, test_constants.SHORT_TIMEOUT)
+          self.assertIsInstance(
+              response_future.exception(), face.ExpirationError)
+          with self.assertRaises(face.ExpirationError):
+            response_future.result()
+
+  def testExpiredUnaryRequestStreamResponse(self):
+    for (group, method), test_messages_sequence in (
+        self._digest.unary_stream_messages_sequences.iteritems()):
+      for test_messages in test_messages_sequence:
+        request = test_messages.request()
+
+        with self._control.pause():
+          response_iterator = self._invoker.future(group, method)(
+              request, test_constants.SHORT_TIMEOUT)
+          with self.assertRaises(face.ExpirationError):
+            list(response_iterator)
+
+  def testExpiredStreamRequestUnaryResponse(self):
+    for (group, method), test_messages_sequence in (
+        self._digest.stream_unary_messages_sequences.iteritems()):
+      for test_messages in test_messages_sequence:
+        requests = test_messages.requests()
+
+        with self._control.pause():
+          response_future = self._invoker.future(group, method)(
+              iter(requests), test_constants.SHORT_TIMEOUT)
+          self.assertIsInstance(
+              response_future.exception(), face.ExpirationError)
+          with self.assertRaises(face.ExpirationError):
+            response_future.result()
+
+  def testExpiredStreamRequestStreamResponse(self):
+    for (group, method), test_messages_sequence in (
+        self._digest.stream_stream_messages_sequences.iteritems()):
+      for test_messages in test_messages_sequence:
+        requests = test_messages.requests()
+
+        with self._control.pause():
+          response_iterator = self._invoker.future(group, method)(
+              iter(requests), test_constants.SHORT_TIMEOUT)
+          with self.assertRaises(face.ExpirationError):
+            list(response_iterator)
+
+  def testFailedUnaryRequestUnaryResponse(self):
+    for (group, method), test_messages_sequence in (
+        self._digest.unary_unary_messages_sequences.iteritems()):
+      for test_messages in test_messages_sequence:
+        request = test_messages.request()
+
+        with self._control.fail():
+          response_future = self._invoker.future(group, method)(
+              request, test_constants.SHORT_TIMEOUT)
+
+          # Because the servicer fails outside of the thread from which the
+          # servicer-side runtime called into it its failure is
+          # indistinguishable from simply not having called its
+          # response_callback before the expiration of the RPC.
+          self.assertIsInstance(
+              response_future.exception(), face.ExpirationError)
+          with self.assertRaises(face.ExpirationError):
+            response_future.result()
+
+  def testFailedUnaryRequestStreamResponse(self):
+    for (group, method), test_messages_sequence in (
+        self._digest.unary_stream_messages_sequences.iteritems()):
+      for test_messages in test_messages_sequence:
+        request = test_messages.request()
+
+        # Because the servicer fails outside of the thread from which the
+        # servicer-side runtime called into it its failure is indistinguishable
+        # from simply not having called its response_consumer before the
+        # expiration of the RPC.
+        with self._control.fail(), self.assertRaises(face.ExpirationError):
+          response_iterator = self._invoker.future(group, method)(
+              request, test_constants.SHORT_TIMEOUT)
+          list(response_iterator)
+
+  def testFailedStreamRequestUnaryResponse(self):
+    for (group, method), test_messages_sequence in (
+        self._digest.stream_unary_messages_sequences.iteritems()):
+      for test_messages in test_messages_sequence:
+        requests = test_messages.requests()
+
+        with self._control.fail():
+          response_future = self._invoker.future(group, method)(
+              iter(requests), test_constants.SHORT_TIMEOUT)
+
+          # Because the servicer fails outside of the thread from which the
+          # servicer-side runtime called into it its failure is
+          # indistinguishable from simply not having called its
+          # response_callback before the expiration of the RPC.
+          self.assertIsInstance(
+              response_future.exception(), face.ExpirationError)
+          with self.assertRaises(face.ExpirationError):
+            response_future.result()
+
+  def testFailedStreamRequestStreamResponse(self):
+    for (group, method), test_messages_sequence in (
+        self._digest.stream_stream_messages_sequences.iteritems()):
+      for test_messages in test_messages_sequence:
+        requests = test_messages.requests()
+
+        # Because the servicer fails outside of the thread from which the
+        # servicer-side runtime called into it its failure is indistinguishable
+        # from simply not having called its response_consumer before the
+        # expiration of the RPC.
+        with self._control.fail(), self.assertRaises(face.ExpirationError):
+          response_iterator = self._invoker.future(group, method)(
+              iter(requests), test_constants.SHORT_TIMEOUT)
+          list(response_iterator)
diff --git a/src/python/grpcio_test/grpc_test/framework/interfaces/face/_invocation.py b/src/python/grpcio_test/grpc_test/framework/interfaces/face/_invocation.py
new file mode 100644
index 0000000..448e845
--- /dev/null
+++ b/src/python/grpcio_test/grpc_test/framework/interfaces/face/_invocation.py
@@ -0,0 +1,213 @@
+# Copyright 2015, Google Inc.
+# All rights reserved.
+#
+# Redistribution and use in source and binary forms, with or without
+# modification, are permitted provided that the following conditions are
+# met:
+#
+#     * Redistributions of source code must retain the above copyright
+# notice, this list of conditions and the following disclaimer.
+#     * Redistributions in binary form must reproduce the above
+# copyright notice, this list of conditions and the following disclaimer
+# in the documentation and/or other materials provided with the
+# distribution.
+#     * Neither the name of Google Inc. nor the names of its
+# contributors may be used to endorse or promote products derived from
+# this software without specific prior written permission.
+#
+# THIS SOFTWARE IS PROVIDED BY THE COPYRIGHT HOLDERS AND CONTRIBUTORS
+# "AS IS" AND ANY EXPRESS OR IMPLIED WARRANTIES, INCLUDING, BUT NOT
+# LIMITED TO, THE IMPLIED WARRANTIES OF MERCHANTABILITY AND FITNESS FOR
+# A PARTICULAR PURPOSE ARE DISCLAIMED. IN NO EVENT SHALL THE COPYRIGHT
+# OWNER OR CONTRIBUTORS BE LIABLE FOR ANY DIRECT, INDIRECT, INCIDENTAL,
+# SPECIAL, EXEMPLARY, OR CONSEQUENTIAL DAMAGES (INCLUDING, BUT NOT
+# LIMITED TO, PROCUREMENT OF SUBSTITUTE GOODS OR SERVICES; LOSS OF USE,
+# DATA, OR PROFITS; OR BUSINESS INTERRUPTION) HOWEVER CAUSED AND ON ANY
+# THEORY OF LIABILITY, WHETHER IN CONTRACT, STRICT LIABILITY, OR TORT
+# (INCLUDING NEGLIGENCE OR OTHERWISE) ARISING IN ANY WAY OUT OF THE USE
+# OF THIS SOFTWARE, EVEN IF ADVISED OF THE POSSIBILITY OF SUCH DAMAGE.
+
+"""Coverage across the Face layer's generic-to-dynamic range for invocation."""
+
+import abc
+
+from grpc.framework.common import cardinality
+
+_CARDINALITY_TO_GENERIC_BLOCKING_BEHAVIOR = {
+    cardinality.Cardinality.UNARY_UNARY: 'blocking_unary_unary',
+    cardinality.Cardinality.UNARY_STREAM: 'inline_unary_stream',
+    cardinality.Cardinality.STREAM_UNARY: 'blocking_stream_unary',
+    cardinality.Cardinality.STREAM_STREAM: 'inline_stream_stream',
+}
+
+_CARDINALITY_TO_GENERIC_FUTURE_BEHAVIOR = {
+    cardinality.Cardinality.UNARY_UNARY: 'future_unary_unary',
+    cardinality.Cardinality.UNARY_STREAM: 'inline_unary_stream',
+    cardinality.Cardinality.STREAM_UNARY: 'future_stream_unary',
+    cardinality.Cardinality.STREAM_STREAM: 'inline_stream_stream',
+}
+
+_CARDINALITY_TO_GENERIC_EVENT_BEHAVIOR = {
+    cardinality.Cardinality.UNARY_UNARY: 'event_unary_unary',
+    cardinality.Cardinality.UNARY_STREAM: 'event_unary_stream',
+    cardinality.Cardinality.STREAM_UNARY: 'event_stream_unary',
+    cardinality.Cardinality.STREAM_STREAM: 'event_stream_stream',
+}
+
+_CARDINALITY_TO_MULTI_CALLABLE_ATTRIBUTE = {
+    cardinality.Cardinality.UNARY_UNARY: 'unary_unary',
+    cardinality.Cardinality.UNARY_STREAM: 'unary_stream',
+    cardinality.Cardinality.STREAM_UNARY: 'stream_unary',
+    cardinality.Cardinality.STREAM_STREAM: 'stream_stream',
+}
+
+
+class Invoker(object):
+  """A type used to invoke test RPCs."""
+  __metaclass__ = abc.ABCMeta
+
+  @abc.abstractmethod
+  def blocking(self, group, name):
+    """Invokes an RPC with blocking control flow."""
+    raise NotImplementedError()
+
+  @abc.abstractmethod
+  def future(self, group, name):
+    """Invokes an RPC with future control flow."""
+    raise NotImplementedError()
+
+  @abc.abstractmethod
+  def event(self, group, name):
+    """Invokes an RPC with event control flow."""
+    raise NotImplementedError()
+
+
+class InvokerConstructor(object):
+  """A type used to create Invokers."""
+  __metaclass__ = abc.ABCMeta
+
+  @abc.abstractmethod
+  def name(self):
+    """Specifies the name of the Invoker constructed by this object."""
+    raise NotImplementedError()
+
+  @abc.abstractmethod
+  def construct_invoker(self, generic_stub, dynamic_stubs, methods):
+    """Constructs an Invoker for the given stubs and methods."""
+    raise NotImplementedError()
+
+
+class _GenericInvoker(Invoker):
+
+  def __init__(self, generic_stub, methods):
+    self._stub = generic_stub
+    self._methods = methods
+
+  def _behavior(self, group, name, cardinality_to_generic_method):
+    method_cardinality = self._methods[group, name].cardinality()
+    behavior = getattr(
+        self._stub, cardinality_to_generic_method[method_cardinality])
+    return lambda *args, **kwargs: behavior(group, name, *args, **kwargs)
+
+  def blocking(self, group, name):
+    return self._behavior(
+        group, name, _CARDINALITY_TO_GENERIC_BLOCKING_BEHAVIOR)
+
+  def future(self, group, name):
+    return self._behavior(group, name, _CARDINALITY_TO_GENERIC_FUTURE_BEHAVIOR)
+
+  def event(self, group, name):
+    return self._behavior(group, name, _CARDINALITY_TO_GENERIC_EVENT_BEHAVIOR)
+
+
+class _GenericInvokerConstructor(InvokerConstructor):
+
+  def name(self):
+    return 'GenericInvoker'
+
+  def construct_invoker(self, generic_stub, dynamic_stub, methods):
+    return _GenericInvoker(generic_stub, methods)
+
+
+class _MultiCallableInvoker(Invoker):
+
+  def __init__(self, generic_stub, methods):
+    self._stub = generic_stub
+    self._methods = methods
+
+  def _multi_callable(self, group, name):
+    method_cardinality = self._methods[group, name].cardinality()
+    behavior = getattr(
+        self._stub,
+        _CARDINALITY_TO_MULTI_CALLABLE_ATTRIBUTE[method_cardinality])
+    return behavior(group, name)
+
+  def blocking(self, group, name):
+    return self._multi_callable(group, name)
+
+  def future(self, group, name):
+    method_cardinality = self._methods[group, name].cardinality()
+    behavior = getattr(
+        self._stub,
+        _CARDINALITY_TO_MULTI_CALLABLE_ATTRIBUTE[method_cardinality])
+    if method_cardinality in (
+        cardinality.Cardinality.UNARY_UNARY,
+        cardinality.Cardinality.STREAM_UNARY):
+      return behavior(group, name).future
+    else:
+      return behavior(group, name)
+
+  def event(self, group, name):
+    return self._multi_callable(group, name).event
+
+
+class _MultiCallableInvokerConstructor(InvokerConstructor):
+
+  def name(self):
+    return 'MultiCallableInvoker'
+
+  def construct_invoker(self, generic_stub, dynamic_stub, methods):
+    return _MultiCallableInvoker(generic_stub, methods)
+
+
+class _DynamicInvoker(Invoker):
+
+  def __init__(self, dynamic_stubs, methods):
+    self._stubs = dynamic_stubs
+    self._methods = methods
+
+  def blocking(self, group, name):
+    return getattr(self._stubs[group], name)
+
+  def future(self, group, name):
+    if self._methods[group, name].cardinality() in (
+        cardinality.Cardinality.UNARY_UNARY,
+        cardinality.Cardinality.STREAM_UNARY):
+      return getattr(self._stubs[group], name).future
+    else:
+      return getattr(self._stubs[group], name)
+
+  def event(self, group, name):
+    return getattr(self._stubs[group], name).event
+
+
+class _DynamicInvokerConstructor(InvokerConstructor):
+
+  def name(self):
+    return 'DynamicInvoker'
+
+  def construct_invoker(self, generic_stub, dynamic_stubs, methods):
+    return _DynamicInvoker(dynamic_stubs, methods)
+
+
+def invoker_constructors():
+  """Creates a sequence of InvokerConstructors to use in tests of RPCs.
+
+  Returns:
+    A sequence of InvokerConstructors.
+  """
+  return (
+      _GenericInvokerConstructor(),
+      _MultiCallableInvokerConstructor(),
+      _DynamicInvokerConstructor(),
+  )
diff --git a/src/python/grpcio_test/grpc_test/framework/interfaces/face/_receiver.py b/src/python/grpcio_test/grpc_test/framework/interfaces/face/_receiver.py
new file mode 100644
index 0000000..2e444ff
--- /dev/null
+++ b/src/python/grpcio_test/grpc_test/framework/interfaces/face/_receiver.py
@@ -0,0 +1,95 @@
+# Copyright 2015, Google Inc.
+# All rights reserved.
+#
+# Redistribution and use in source and binary forms, with or without
+# modification, are permitted provided that the following conditions are
+# met:
+#
+#     * Redistributions of source code must retain the above copyright
+# notice, this list of conditions and the following disclaimer.
+#     * Redistributions in binary form must reproduce the above
+# copyright notice, this list of conditions and the following disclaimer
+# in the documentation and/or other materials provided with the
+# distribution.
+#     * Neither the name of Google Inc. nor the names of its
+# contributors may be used to endorse or promote products derived from
+# this software without specific prior written permission.
+#
+# THIS SOFTWARE IS PROVIDED BY THE COPYRIGHT HOLDERS AND CONTRIBUTORS
+# "AS IS" AND ANY EXPRESS OR IMPLIED WARRANTIES, INCLUDING, BUT NOT
+# LIMITED TO, THE IMPLIED WARRANTIES OF MERCHANTABILITY AND FITNESS FOR
+# A PARTICULAR PURPOSE ARE DISCLAIMED. IN NO EVENT SHALL THE COPYRIGHT
+# OWNER OR CONTRIBUTORS BE LIABLE FOR ANY DIRECT, INDIRECT, INCIDENTAL,
+# SPECIAL, EXEMPLARY, OR CONSEQUENTIAL DAMAGES (INCLUDING, BUT NOT
+# LIMITED TO, PROCUREMENT OF SUBSTITUTE GOODS OR SERVICES; LOSS OF USE,
+# DATA, OR PROFITS; OR BUSINESS INTERRUPTION) HOWEVER CAUSED AND ON ANY
+# THEORY OF LIABILITY, WHETHER IN CONTRACT, STRICT LIABILITY, OR TORT
+# (INCLUDING NEGLIGENCE OR OTHERWISE) ARISING IN ANY WAY OUT OF THE USE
+# OF THIS SOFTWARE, EVEN IF ADVISED OF THE POSSIBILITY OF SUCH DAMAGE.
+
+"""A utility useful in tests of asynchronous, event-driven interfaces."""
+
+import threading
+
+from grpc.framework.interfaces.face import face
+
+
+class Receiver(face.ResponseReceiver):
+  """A utility object useful in tests of asynchronous code."""
+
+  def __init__(self):
+    self._condition = threading.Condition()
+    self._initial_metadata = None
+    self._responses = []
+    self._terminal_metadata = None
+    self._code = None
+    self._details = None
+    self._completed = False
+    self._abortion = None
+
+  def abort(self, abortion):
+    with self._condition:
+      self._abortion = abortion
+      self._condition.notify_all()
+
+  def initial_metadata(self, initial_metadata):
+    with self._condition:
+      self._initial_metadata = initial_metadata
+
+  def response(self, response):
+    with self._condition:
+      self._responses.append(response)
+
+  def complete(self, terminal_metadata, code, details):
+    with self._condition:
+      self._terminal_metadata = terminal_metadata
+      self._code = code
+      self._details = details
+      self._completed = True
+      self._condition.notify_all()
+
+  def block_until_terminated(self):
+    with self._condition:
+      while self._abortion is None and not self._completed:
+        self._condition.wait()
+
+  def unary_response(self):
+    with self._condition:
+      if self._abortion is not None:
+        raise AssertionError('Aborted with abortion "%s"!' % self._abortion)
+      elif len(self._responses) != 1:
+        raise AssertionError(
+            '%d responses received, not exactly one!', len(self._responses))
+      else:
+        return self._responses[0]
+
+  def stream_responses(self):
+    with self._condition:
+      if self._abortion is None:
+        return list(self._responses)
+      else:
+        raise AssertionError('Aborted with abortion "%s"!' % self._abortion)
+
+  def abortion(self):
+    with self._condition:
+      return self._abortion
diff --git a/src/python/grpcio_test/grpc_test/framework/interfaces/face/_service.py b/src/python/grpcio_test/grpc_test/framework/interfaces/face/_service.py
new file mode 100644
index 0000000..e25b8a0
--- /dev/null
+++ b/src/python/grpcio_test/grpc_test/framework/interfaces/face/_service.py
@@ -0,0 +1,332 @@
+# Copyright 2015, Google Inc.
+# All rights reserved.
+#
+# Redistribution and use in source and binary forms, with or without
+# modification, are permitted provided that the following conditions are
+# met:
+#
+#     * Redistributions of source code must retain the above copyright
+# notice, this list of conditions and the following disclaimer.
+#     * Redistributions in binary form must reproduce the above
+# copyright notice, this list of conditions and the following disclaimer
+# in the documentation and/or other materials provided with the
+# distribution.
+#     * Neither the name of Google Inc. nor the names of its
+# contributors may be used to endorse or promote products derived from
+# this software without specific prior written permission.
+#
+# THIS SOFTWARE IS PROVIDED BY THE COPYRIGHT HOLDERS AND CONTRIBUTORS
+# "AS IS" AND ANY EXPRESS OR IMPLIED WARRANTIES, INCLUDING, BUT NOT
+# LIMITED TO, THE IMPLIED WARRANTIES OF MERCHANTABILITY AND FITNESS FOR
+# A PARTICULAR PURPOSE ARE DISCLAIMED. IN NO EVENT SHALL THE COPYRIGHT
+# OWNER OR CONTRIBUTORS BE LIABLE FOR ANY DIRECT, INDIRECT, INCIDENTAL,
+# SPECIAL, EXEMPLARY, OR CONSEQUENTIAL DAMAGES (INCLUDING, BUT NOT
+# LIMITED TO, PROCUREMENT OF SUBSTITUTE GOODS OR SERVICES; LOSS OF USE,
+# DATA, OR PROFITS; OR BUSINESS INTERRUPTION) HOWEVER CAUSED AND ON ANY
+# THEORY OF LIABILITY, WHETHER IN CONTRACT, STRICT LIABILITY, OR TORT
+# (INCLUDING NEGLIGENCE OR OTHERWISE) ARISING IN ANY WAY OUT OF THE USE
+# OF THIS SOFTWARE, EVEN IF ADVISED OF THE POSSIBILITY OF SUCH DAMAGE.
+
+"""Private interfaces implemented by data sets used in Face-layer tests."""
+
+import abc
+
+# face is referenced from specification in this module.
+from grpc.framework.interfaces.face import face  # pylint: disable=unused-import
+from grpc_test.framework.interfaces.face import test_interfaces
+
+
+class UnaryUnaryTestMethodImplementation(test_interfaces.Method):
+  """A controllable implementation of a unary-unary method."""
+
+  __metaclass__ = abc.ABCMeta
+
+  @abc.abstractmethod
+  def service(self, request, response_callback, context, control):
+    """Services an RPC that accepts one message and produces one message.
+
+    Args:
+      request: The single request message for the RPC.
+      response_callback: A callback to be called to accept the response message
+        of the RPC.
+      context: An face.ServicerContext object.
+      control: A test_control.Control to control execution of this method.
+
+    Raises:
+      abandonment.Abandoned: May or may not be raised when the RPC has been
+        aborted.
+    """
+    raise NotImplementedError()
+
+
+class UnaryUnaryTestMessages(object):
+  """A type for unary-request-unary-response message pairings."""
+
+  __metaclass__ = abc.ABCMeta
+
+  @abc.abstractmethod
+  def request(self):
+    """Affords a request message.
+
+    Implementations of this method should return a different message with each
+    call so that multiple test executions of the test method may be made with
+    different inputs.
+
+    Returns:
+      A request message.
+    """
+    raise NotImplementedError()
+
+  @abc.abstractmethod
+  def verify(self, request, response, test_case):
+    """Verifies that the computed response matches the given request.
+
+    Args:
+      request: A request message.
+      response: A response message.
+      test_case: A unittest.TestCase object affording useful assertion methods.
+
+    Raises:
+      AssertionError: If the request and response do not match, indicating that
+        there was some problem executing the RPC under test.
+    """
+    raise NotImplementedError()
+
+
+class UnaryStreamTestMethodImplementation(test_interfaces.Method):
+  """A controllable implementation of a unary-stream method."""
+
+  __metaclass__ = abc.ABCMeta
+
+  @abc.abstractmethod
+  def service(self, request, response_consumer, context, control):
+    """Services an RPC that takes one message and produces a stream of messages.
+
+    Args:
+      request: The single request message for the RPC.
+      response_consumer: A stream.Consumer to be called to accept the response
+        messages of the RPC.
+      context: A face.ServicerContext object.
+      control: A test_control.Control to control execution of this method.
+
+    Raises:
+      abandonment.Abandoned: May or may not be raised when the RPC has been
+        aborted.
+    """
+    raise NotImplementedError()
+
+
+class UnaryStreamTestMessages(object):
+  """A type for unary-request-stream-response message pairings."""
+
+  __metaclass__ = abc.ABCMeta
+
+  @abc.abstractmethod
+  def request(self):
+    """Affords a request message.
+
+    Implementations of this method should return a different message with each
+    call so that multiple test executions of the test method may be made with
+    different inputs.
+
+    Returns:
+      A request message.
+    """
+    raise NotImplementedError()
+
+  @abc.abstractmethod
+  def verify(self, request, responses, test_case):
+    """Verifies that the computed responses match the given request.
+
+    Args:
+      request: A request message.
+      responses: A sequence of response messages.
+      test_case: A unittest.TestCase object affording useful assertion methods.
+
+    Raises:
+      AssertionError: If the request and responses do not match, indicating that
+        there was some problem executing the RPC under test.
+    """
+    raise NotImplementedError()
+
+
+class StreamUnaryTestMethodImplementation(test_interfaces.Method):
+  """A controllable implementation of a stream-unary method."""
+
+  __metaclass__ = abc.ABCMeta
+
+  @abc.abstractmethod
+  def service(self, response_callback, context, control):
+    """Services an RPC that takes a stream of messages and produces one message.
+
+    Args:
+      response_callback: A callback to be called to accept the response message
+        of the RPC.
+      context: A face.ServicerContext object.
+      control: A test_control.Control to control execution of this method.
+
+    Returns:
+      A stream.Consumer with which to accept the request messages of the RPC.
+        The consumer returned from this method may or may not be invoked to
+        completion: in the case of RPC abortion, RPC Framework will simply stop
+        passing messages to this object. Implementations must not assume that
+        this object will be called to completion of the request stream or even
+        called at all.
+
+    Raises:
+      abandonment.Abandoned: May or may not be raised when the RPC has been
+        aborted.
+    """
+    raise NotImplementedError()
+
+
+class StreamUnaryTestMessages(object):
+  """A type for stream-request-unary-response message pairings."""
+
+  __metaclass__ = abc.ABCMeta
+
+  @abc.abstractmethod
+  def requests(self):
+    """Affords a sequence of request messages.
+
+    Implementations of this method should return a different sequences with each
+    call so that multiple test executions of the test method may be made with
+    different inputs.
+
+    Returns:
+      A sequence of request messages.
+    """
+    raise NotImplementedError()
+
+  @abc.abstractmethod
+  def verify(self, requests, response, test_case):
+    """Verifies that the computed response matches the given requests.
+
+    Args:
+      requests: A sequence of request messages.
+      response: A response message.
+      test_case: A unittest.TestCase object affording useful assertion methods.
+
+    Raises:
+      AssertionError: If the requests and response do not match, indicating that
+        there was some problem executing the RPC under test.
+    """
+    raise NotImplementedError()
+
+
+class StreamStreamTestMethodImplementation(test_interfaces.Method):
+  """A controllable implementation of a stream-stream method."""
+
+  __metaclass__ = abc.ABCMeta
+
+  @abc.abstractmethod
+  def service(self, response_consumer, context, control):
+    """Services an RPC that accepts and produces streams of messages.
+
+    Args:
+      response_consumer: A stream.Consumer to be called to accept the response
+        messages of the RPC.
+      context: A face.ServicerContext object.
+      control: A test_control.Control to control execution of this method.
+
+    Returns:
+      A stream.Consumer with which to accept the request messages of the RPC.
+        The consumer returned from this method may or may not be invoked to
+        completion: in the case of RPC abortion, RPC Framework will simply stop
+        passing messages to this object. Implementations must not assume that
+        this object will be called to completion of the request stream or even
+        called at all.
+
+    Raises:
+      abandonment.Abandoned: May or may not be raised when the RPC has been
+        aborted.
+    """
+    raise NotImplementedError()
+
+
+class StreamStreamTestMessages(object):
+  """A type for stream-request-stream-response message pairings."""
+
+  __metaclass__ = abc.ABCMeta
+
+  @abc.abstractmethod
+  def requests(self):
+    """Affords a sequence of request messages.
+
+    Implementations of this method should return a different sequences with each
+    call so that multiple test executions of the test method may be made with
+    different inputs.
+
+    Returns:
+      A sequence of request messages.
+    """
+    raise NotImplementedError()
+
+  @abc.abstractmethod
+  def verify(self, requests, responses, test_case):
+    """Verifies that the computed response matches the given requests.
+
+    Args:
+      requests: A sequence of request messages.
+      responses: A sequence of response messages.
+      test_case: A unittest.TestCase object affording useful assertion methods.
+
+    Raises:
+      AssertionError: If the requests and responses do not match, indicating
+        that there was some problem executing the RPC under test.
+    """
+    raise NotImplementedError()
+
+
+class TestService(object):
+  """A specification of implemented methods to use in tests."""
+
+  __metaclass__ = abc.ABCMeta
+
+  @abc.abstractmethod
+  def unary_unary_scenarios(self):
+    """Affords unary-request-unary-response test methods and their messages.
+
+    Returns:
+      A dict from method group-name pair to implementation/messages pair. The
+        first element of the pair is a UnaryUnaryTestMethodImplementation object
+        and the second element is a sequence of UnaryUnaryTestMethodMessages
+        objects.
+    """
+    raise NotImplementedError()
+
+  @abc.abstractmethod
+  def unary_stream_scenarios(self):
+    """Affords unary-request-stream-response test methods and their messages.
+
+    Returns:
+      A dict from method group-name pair to implementation/messages pair. The
+        first element of the pair is a UnaryStreamTestMethodImplementation
+        object and the second element is a sequence of
+        UnaryStreamTestMethodMessages objects.
+    """
+    raise NotImplementedError()
+
+  @abc.abstractmethod
+  def stream_unary_scenarios(self):
+    """Affords stream-request-unary-response test methods and their messages.
+
+    Returns:
+      A dict from method group-name pair to implementation/messages pair. The
+        first element of the pair is a StreamUnaryTestMethodImplementation
+        object and the second element is a sequence of
+        StreamUnaryTestMethodMessages objects.
+    """
+    raise NotImplementedError()
+
+  @abc.abstractmethod
+  def stream_stream_scenarios(self):
+    """Affords stream-request-stream-response test methods and their messages.
+
+    Returns:
+      A dict from method group-name pair to implementation/messages pair. The
+        first element of the pair is a StreamStreamTestMethodImplementation
+        object and the second element is a sequence of
+        StreamStreamTestMethodMessages objects.
+    """
+    raise NotImplementedError()
diff --git a/src/python/grpcio_test/grpc_test/framework/interfaces/face/_stock_service.py b/src/python/grpcio_test/grpc_test/framework/interfaces/face/_stock_service.py
new file mode 100644
index 0000000..1dd2ec3
--- /dev/null
+++ b/src/python/grpcio_test/grpc_test/framework/interfaces/face/_stock_service.py
@@ -0,0 +1,396 @@
+B# Copyright 2015, Google Inc.
+# All rights reserved.
+#
+# Redistribution and use in source and binary forms, with or without
+# modification, are permitted provided that the following conditions are
+# met:
+#
+#     * Redistributions of source code must retain the above copyright
+# notice, this list of conditions and the following disclaimer.
+#     * Redistributions in binary form must reproduce the above
+# copyright notice, this list of conditions and the following disclaimer
+# in the documentation and/or other materials provided with the
+# distribution.
+#     * Neither the name of Google Inc. nor the names of its
+# contributors may be used to endorse or promote products derived from
+# this software without specific prior written permission.
+#
+# THIS SOFTWARE IS PROVIDED BY THE COPYRIGHT HOLDERS AND CONTRIBUTORS
+# "AS IS" AND ANY EXPRESS OR IMPLIED WARRANTIES, INCLUDING, BUT NOT
+# LIMITED TO, THE IMPLIED WARRANTIES OF MERCHANTABILITY AND FITNESS FOR
+# A PARTICULAR PURPOSE ARE DISCLAIMED. IN NO EVENT SHALL THE COPYRIGHT
+# OWNER OR CONTRIBUTORS BE LIABLE FOR ANY DIRECT, INDIRECT, INCIDENTAL,
+# SPECIAL, EXEMPLARY, OR CONSEQUENTIAL DAMAGES (INCLUDING, BUT NOT
+# LIMITED TO, PROCUREMENT OF SUBSTITUTE GOODS OR SERVICES; LOSS OF USE,
+# DATA, OR PROFITS; OR BUSINESS INTERRUPTION) HOWEVER CAUSED AND ON ANY
+# THEORY OF LIABILITY, WHETHER IN CONTRACT, STRICT LIABILITY, OR TORT
+# (INCLUDING NEGLIGENCE OR OTHERWISE) ARISING IN ANY WAY OUT OF THE USE
+# OF THIS SOFTWARE, EVEN IF ADVISED OF THE POSSIBILITY OF SUCH DAMAGE.
+
+"""Examples of Python implementations of the stock.proto Stock service."""
+
+from grpc.framework.common import cardinality
+from grpc.framework.foundation import abandonment
+from grpc.framework.foundation import stream
+from grpc_test.framework.common import test_constants
+from grpc_test.framework.interfaces.face import _service
+from grpc_test._junkdrawer import stock_pb2
+
+_STOCK_GROUP_NAME = 'Stock'
+_SYMBOL_FORMAT = 'test symbol:%03d'
+
+# A test-appropriate security-pricing function. :-P
+_price = lambda symbol_name: float(hash(symbol_name) % 4096)
+
+
+def _get_last_trade_price(stock_request, stock_reply_callback, control, active):
+  """A unary-request, unary-response test method."""
+  control.control()
+  if active():
+    stock_reply_callback(
+        stock_pb2.StockReply(
+            symbol=stock_request.symbol, price=_price(stock_request.symbol)))
+  else:
+    raise abandonment.Abandoned()
+
+
+def _get_last_trade_price_multiple(stock_reply_consumer, control, active):
+  """A stream-request, stream-response test method."""
+  def stock_reply_for_stock_request(stock_request):
+    control.control()
+    if active():
+      return stock_pb2.StockReply(
+          symbol=stock_request.symbol, price=_price(stock_request.symbol))
+    else:
+      raise abandonment.Abandoned()
+
+  class StockRequestConsumer(stream.Consumer):
+
+    def consume(self, stock_request):
+      stock_reply_consumer.consume(stock_reply_for_stock_request(stock_request))
+
+    def terminate(self):
+      control.control()
+      stock_reply_consumer.terminate()
+
+    def consume_and_terminate(self, stock_request):
+      stock_reply_consumer.consume_and_terminate(
+          stock_reply_for_stock_request(stock_request))
+
+  return StockRequestConsumer()
+
+
+def _watch_future_trades(stock_request, stock_reply_consumer, control, active):
+  """A unary-request, stream-response test method."""
+  base_price = _price(stock_request.symbol)
+  for index in range(stock_request.num_trades_to_watch):
+    control.control()
+    if active():
+      stock_reply_consumer.consume(
+          stock_pb2.StockReply(
+              symbol=stock_request.symbol, price=base_price + index))
+    else:
+      raise abandonment.Abandoned()
+  stock_reply_consumer.terminate()
+
+
+def _get_highest_trade_price(stock_reply_callback, control, active):
+  """A stream-request, unary-response test method."""
+
+  class StockRequestConsumer(stream.Consumer):
+    """Keeps an ongoing record of the most valuable symbol yet consumed."""
+
+    def __init__(self):
+      self._symbol = None
+      self._price = None
+
+    def consume(self, stock_request):
+      control.control()
+      if active():
+        if self._price is None:
+          self._symbol = stock_request.symbol
+          self._price = _price(stock_request.symbol)
+        else:
+          candidate_price = _price(stock_request.symbol)
+          if self._price < candidate_price:
+            self._symbol = stock_request.symbol
+            self._price = candidate_price
+
+    def terminate(self):
+      control.control()
+      if active():
+        if self._symbol is None:
+          raise ValueError()
+        else:
+          stock_reply_callback(
+              stock_pb2.StockReply(symbol=self._symbol, price=self._price))
+          self._symbol = None
+          self._price = None
+
+    def consume_and_terminate(self, stock_request):
+      control.control()
+      if active():
+        if self._price is None:
+          stock_reply_callback(
+              stock_pb2.StockReply(
+                  symbol=stock_request.symbol,
+                  price=_price(stock_request.symbol)))
+        else:
+          candidate_price = _price(stock_request.symbol)
+          if self._price < candidate_price:
+            stock_reply_callback(
+                stock_pb2.StockReply(
+                    symbol=stock_request.symbol, price=candidate_price))
+          else:
+            stock_reply_callback(
+                stock_pb2.StockReply(
+                    symbol=self._symbol, price=self._price))
+
+        self._symbol = None
+        self._price = None
+
+  return StockRequestConsumer()
+
+
+class GetLastTradePrice(_service.UnaryUnaryTestMethodImplementation):
+  """GetLastTradePrice for use in tests."""
+
+  def group(self):
+    return _STOCK_GROUP_NAME
+
+  def name(self):
+    return 'GetLastTradePrice'
+
+  def cardinality(self):
+    return cardinality.Cardinality.UNARY_UNARY
+
+  def request_class(self):
+    return stock_pb2.StockRequest
+
+  def response_class(self):
+    return stock_pb2.StockReply
+
+  def serialize_request(self, request):
+    return request.SerializeToString()
+
+  def deserialize_request(self, serialized_request):
+    return stock_pb2.StockRequest.FromString(serialized_request)
+
+  def serialize_response(self, response):
+    return response.SerializeToString()
+
+  def deserialize_response(self, serialized_response):
+    return stock_pb2.StockReply.FromString(serialized_response)
+
+  def service(self, request, response_callback, context, control):
+    _get_last_trade_price(
+        request, response_callback, control, context.is_active)
+
+
+class GetLastTradePriceMessages(_service.UnaryUnaryTestMessages):
+
+  def __init__(self):
+    self._index = 0
+
+  def request(self):
+    symbol = _SYMBOL_FORMAT % self._index
+    self._index += 1
+    return stock_pb2.StockRequest(symbol=symbol)
+
+  def verify(self, request, response, test_case):
+    test_case.assertEqual(request.symbol, response.symbol)
+    test_case.assertEqual(_price(request.symbol), response.price)
+
+
+class GetLastTradePriceMultiple(_service.StreamStreamTestMethodImplementation):
+  """GetLastTradePriceMultiple for use in tests."""
+
+  def group(self):
+    return _STOCK_GROUP_NAME
+
+  def name(self):
+    return 'GetLastTradePriceMultiple'
+
+  def cardinality(self):
+    return cardinality.Cardinality.STREAM_STREAM
+
+  def request_class(self):
+    return stock_pb2.StockRequest
+
+  def response_class(self):
+    return stock_pb2.StockReply
+
+  def serialize_request(self, request):
+    return request.SerializeToString()
+
+  def deserialize_request(self, serialized_request):
+    return stock_pb2.StockRequest.FromString(serialized_request)
+
+  def serialize_response(self, response):
+    return response.SerializeToString()
+
+  def deserialize_response(self, serialized_response):
+    return stock_pb2.StockReply.FromString(serialized_response)
+
+  def service(self, response_consumer, context, control):
+    return _get_last_trade_price_multiple(
+        response_consumer, control, context.is_active)
+
+
+class GetLastTradePriceMultipleMessages(_service.StreamStreamTestMessages):
+  """Pairs of message streams for use with GetLastTradePriceMultiple."""
+
+  def __init__(self):
+    self._index = 0
+
+  def requests(self):
+    base_index = self._index
+    self._index += 1
+    return [
+        stock_pb2.StockRequest(symbol=_SYMBOL_FORMAT % (base_index + index))
+        for index in range(test_constants.STREAM_LENGTH)]
+
+  def verify(self, requests, responses, test_case):
+    test_case.assertEqual(len(requests), len(responses))
+    for stock_request, stock_reply in zip(requests, responses):
+      test_case.assertEqual(stock_request.symbol, stock_reply.symbol)
+      test_case.assertEqual(_price(stock_request.symbol), stock_reply.price)
+
+
+class WatchFutureTrades(_service.UnaryStreamTestMethodImplementation):
+  """WatchFutureTrades for use in tests."""
+
+  def group(self):
+    return _STOCK_GROUP_NAME
+
+  def name(self):
+    return 'WatchFutureTrades'
+
+  def cardinality(self):
+    return cardinality.Cardinality.UNARY_STREAM
+
+  def request_class(self):
+    return stock_pb2.StockRequest
+
+  def response_class(self):
+    return stock_pb2.StockReply
+
+  def serialize_request(self, request):
+    return request.SerializeToString()
+
+  def deserialize_request(self, serialized_request):
+    return stock_pb2.StockRequest.FromString(serialized_request)
+
+  def serialize_response(self, response):
+    return response.SerializeToString()
+
+  def deserialize_response(self, serialized_response):
+    return stock_pb2.StockReply.FromString(serialized_response)
+
+  def service(self, request, response_consumer, context, control):
+    _watch_future_trades(request, response_consumer, control, context.is_active)
+
+
+class WatchFutureTradesMessages(_service.UnaryStreamTestMessages):
+  """Pairs of a single request message and a sequence of response messages."""
+
+  def __init__(self):
+    self._index = 0
+
+  def request(self):
+    symbol = _SYMBOL_FORMAT % self._index
+    self._index += 1
+    return stock_pb2.StockRequest(
+        symbol=symbol, num_trades_to_watch=test_constants.STREAM_LENGTH)
+
+  def verify(self, request, responses, test_case):
+    test_case.assertEqual(test_constants.STREAM_LENGTH, len(responses))
+    base_price = _price(request.symbol)
+    for index, response in enumerate(responses):
+      test_case.assertEqual(base_price + index, response.price)
+
+
+class GetHighestTradePrice(_service.StreamUnaryTestMethodImplementation):
+  """GetHighestTradePrice for use in tests."""
+
+  def group(self):
+    return _STOCK_GROUP_NAME
+
+  def name(self):
+    return 'GetHighestTradePrice'
+
+  def cardinality(self):
+    return cardinality.Cardinality.STREAM_UNARY
+
+  def request_class(self):
+    return stock_pb2.StockRequest
+
+  def response_class(self):
+    return stock_pb2.StockReply
+
+  def serialize_request(self, request):
+    return request.SerializeToString()
+
+  def deserialize_request(self, serialized_request):
+    return stock_pb2.StockRequest.FromString(serialized_request)
+
+  def serialize_response(self, response):
+    return response.SerializeToString()
+
+  def deserialize_response(self, serialized_response):
+    return stock_pb2.StockReply.FromString(serialized_response)
+
+  def service(self, response_callback, context, control):
+    return _get_highest_trade_price(
+        response_callback, control, context.is_active)
+
+
+class GetHighestTradePriceMessages(_service.StreamUnaryTestMessages):
+
+  def requests(self):
+    return [
+        stock_pb2.StockRequest(symbol=_SYMBOL_FORMAT % index)
+        for index in range(test_constants.STREAM_LENGTH)]
+
+  def verify(self, requests, response, test_case):
+    price = None
+    symbol = None
+    for stock_request in requests:
+      current_symbol = stock_request.symbol
+      current_price = _price(current_symbol)
+      if price is None or price < current_price:
+        price = current_price
+        symbol = current_symbol
+    test_case.assertEqual(price, response.price)
+    test_case.assertEqual(symbol, response.symbol)
+
+
+class StockTestService(_service.TestService):
+  """A corpus of test data with one method of each RPC cardinality."""
+
+  def unary_unary_scenarios(self):
+    return {
+        (_STOCK_GROUP_NAME, 'GetLastTradePrice'): (
+            GetLastTradePrice(), [GetLastTradePriceMessages()]),
+    }
+
+  def unary_stream_scenarios(self):
+    return {
+        (_STOCK_GROUP_NAME, 'WatchFutureTrades'): (
+            WatchFutureTrades(), [WatchFutureTradesMessages()]),
+    }
+
+  def stream_unary_scenarios(self):
+    return {
+        (_STOCK_GROUP_NAME, 'GetHighestTradePrice'): (
+            GetHighestTradePrice(), [GetHighestTradePriceMessages()])
+    }
+
+  def stream_stream_scenarios(self):
+    return {
+        (_STOCK_GROUP_NAME, 'GetLastTradePriceMultiple'): (
+            GetLastTradePriceMultiple(), [GetLastTradePriceMultipleMessages()]),
+    }
+
+
+STOCK_TEST_SERVICE = StockTestService()
diff --git a/src/python/grpcio_test/grpc_test/framework/interfaces/face/test_cases.py b/src/python/grpcio_test/grpc_test/framework/interfaces/face/test_cases.py
new file mode 100644
index 0000000..ca62366
--- /dev/null
+++ b/src/python/grpcio_test/grpc_test/framework/interfaces/face/test_cases.py
@@ -0,0 +1,67 @@
+# Copyright 2015, Google Inc.
+# All rights reserved.
+#
+# Redistribution and use in source and binary forms, with or without
+# modification, are permitted provided that the following conditions are
+# met:
+#
+#     * Redistributions of source code must retain the above copyright
+# notice, this list of conditions and the following disclaimer.
+#     * Redistributions in binary form must reproduce the above
+# copyright notice, this list of conditions and the following disclaimer
+# in the documentation and/or other materials provided with the
+# distribution.
+#     * Neither the name of Google Inc. nor the names of its
+# contributors may be used to endorse or promote products derived from
+# this software without specific prior written permission.
+#
+# THIS SOFTWARE IS PROVIDED BY THE COPYRIGHT HOLDERS AND CONTRIBUTORS
+# "AS IS" AND ANY EXPRESS OR IMPLIED WARRANTIES, INCLUDING, BUT NOT
+# LIMITED TO, THE IMPLIED WARRANTIES OF MERCHANTABILITY AND FITNESS FOR
+# A PARTICULAR PURPOSE ARE DISCLAIMED. IN NO EVENT SHALL THE COPYRIGHT
+# OWNER OR CONTRIBUTORS BE LIABLE FOR ANY DIRECT, INDIRECT, INCIDENTAL,
+# SPECIAL, EXEMPLARY, OR CONSEQUENTIAL DAMAGES (INCLUDING, BUT NOT
+# LIMITED TO, PROCUREMENT OF SUBSTITUTE GOODS OR SERVICES; LOSS OF USE,
+# DATA, OR PROFITS; OR BUSINESS INTERRUPTION) HOWEVER CAUSED AND ON ANY
+# THEORY OF LIABILITY, WHETHER IN CONTRACT, STRICT LIABILITY, OR TORT
+# (INCLUDING NEGLIGENCE OR OTHERWISE) ARISING IN ANY WAY OUT OF THE USE
+# OF THIS SOFTWARE, EVEN IF ADVISED OF THE POSSIBILITY OF SUCH DAMAGE.
+
+"""Tools for creating tests of implementations of the Face layer."""
+
+# unittest is referenced from specification in this module.
+import unittest  # pylint: disable=unused-import
+
+# test_interfaces is referenced from specification in this module.
+from grpc_test.framework.interfaces.face import _blocking_invocation_inline_service
+from grpc_test.framework.interfaces.face import _event_invocation_synchronous_event_service
+from grpc_test.framework.interfaces.face import _future_invocation_asynchronous_event_service
+from grpc_test.framework.interfaces.face import _invocation
+from grpc_test.framework.interfaces.face import test_interfaces  # pylint: disable=unused-import
+
+_TEST_CASE_SUPERCLASSES = (
+    _blocking_invocation_inline_service.TestCase,
+    _event_invocation_synchronous_event_service.TestCase,
+    _future_invocation_asynchronous_event_service.TestCase,
+)
+
+
+def test_cases(implementation):
+  """Creates unittest.TestCase classes for a given Face layer implementation.
+
+  Args:
+    implementation: A test_interfaces.Implementation specifying creation and
+      destruction of a given Face layer implementation.
+
+  Returns:
+    A sequence of subclasses of unittest.TestCase defining tests of the
+      specified Face layer implementation.
+  """
+  test_case_classes = []
+  for invoker_constructor in _invocation.invoker_constructors():
+    for super_class in _TEST_CASE_SUPERCLASSES:
+      test_case_classes.append(
+          type(invoker_constructor.name() + super_class.NAME, (super_class,),
+               {'implementation': implementation,
+                'invoker_constructor': invoker_constructor}))
+  return test_case_classes
diff --git a/src/python/grpcio_test/grpc_test/framework/interfaces/face/test_interfaces.py b/src/python/grpcio_test/grpc_test/framework/interfaces/face/test_interfaces.py
new file mode 100644
index 0000000..b2b5c10
--- /dev/null
+++ b/src/python/grpcio_test/grpc_test/framework/interfaces/face/test_interfaces.py
@@ -0,0 +1,229 @@
+# Copyright 2015, Google Inc.
+# All rights reserved.
+#
+# Redistribution and use in source and binary forms, with or without
+# modification, are permitted provided that the following conditions are
+# met:
+#
+#     * Redistributions of source code must retain the above copyright
+# notice, this list of conditions and the following disclaimer.
+#     * Redistributions in binary form must reproduce the above
+# copyright notice, this list of conditions and the following disclaimer
+# in the documentation and/or other materials provided with the
+# distribution.
+#     * Neither the name of Google Inc. nor the names of its
+# contributors may be used to endorse or promote products derived from
+# this software without specific prior written permission.
+#
+# THIS SOFTWARE IS PROVIDED BY THE COPYRIGHT HOLDERS AND CONTRIBUTORS
+# "AS IS" AND ANY EXPRESS OR IMPLIED WARRANTIES, INCLUDING, BUT NOT
+# LIMITED TO, THE IMPLIED WARRANTIES OF MERCHANTABILITY AND FITNESS FOR
+# A PARTICULAR PURPOSE ARE DISCLAIMED. IN NO EVENT SHALL THE COPYRIGHT
+# OWNER OR CONTRIBUTORS BE LIABLE FOR ANY DIRECT, INDIRECT, INCIDENTAL,
+# SPECIAL, EXEMPLARY, OR CONSEQUENTIAL DAMAGES (INCLUDING, BUT NOT
+# LIMITED TO, PROCUREMENT OF SUBSTITUTE GOODS OR SERVICES; LOSS OF USE,
+# DATA, OR PROFITS; OR BUSINESS INTERRUPTION) HOWEVER CAUSED AND ON ANY
+# THEORY OF LIABILITY, WHETHER IN CONTRACT, STRICT LIABILITY, OR TORT
+# (INCLUDING NEGLIGENCE OR OTHERWISE) ARISING IN ANY WAY OUT OF THE USE
+# OF THIS SOFTWARE, EVEN IF ADVISED OF THE POSSIBILITY OF SUCH DAMAGE.
+
+"""Interfaces used in tests of implementations of the Face layer."""
+
+import abc
+
+from grpc.framework.common import cardinality  # pylint: disable=unused-import
+from grpc.framework.interfaces.face import face  # pylint: disable=unused-import
+
+
+class Method(object):
+  """Specifies a method to be used in tests."""
+  __metaclass__ = abc.ABCMeta
+
+  @abc.abstractmethod
+  def group(self):
+    """Identify the group of the method.
+
+    Returns:
+      The group of the method.
+    """
+    raise NotImplementedError()
+
+  @abc.abstractmethod
+  def name(self):
+    """Identify the name of the method.
+
+    Returns:
+      The name of the method.
+    """
+    raise NotImplementedError()
+
+  @abc.abstractmethod
+  def cardinality(self):
+    """Identify the cardinality of the method.
+
+    Returns:
+      A cardinality.Cardinality value describing the streaming semantics of the
+        method.
+    """
+    raise NotImplementedError()
+
+  @abc.abstractmethod
+  def request_class(self):
+    """Identify the class used for the method's request objects.
+
+    Returns:
+      The class object of the class to which the method's request objects
+        belong.
+    """
+    raise NotImplementedError()
+
+  @abc.abstractmethod
+  def response_class(self):
+    """Identify the class used for the method's response objects.
+
+    Returns:
+      The class object of the class to which the method's response objects
+        belong.
+    """
+    raise NotImplementedError()
+
+  @abc.abstractmethod
+  def serialize_request(self, request):
+    """Serialize the given request object.
+
+    Args:
+      request: A request object appropriate for this method.
+    """
+    raise NotImplementedError()
+
+  @abc.abstractmethod
+  def deserialize_request(self, serialized_request):
+    """Synthesize a request object from a given bytestring.
+
+    Args:
+      serialized_request: A bytestring deserializable into a request object
+        appropriate for this method.
+    """
+    raise NotImplementedError()
+
+  @abc.abstractmethod
+  def serialize_response(self, response):
+    """Serialize the given response object.
+
+    Args:
+      response: A response object appropriate for this method.
+    """
+    raise NotImplementedError()
+
+  @abc.abstractmethod
+  def deserialize_response(self, serialized_response):
+    """Synthesize a response object from a given bytestring.
+
+    Args:
+      serialized_response: A bytestring deserializable into a response object
+        appropriate for this method.
+    """
+    raise NotImplementedError()
+
+
+class Implementation(object):
+  """Specifies an implementation of the Face layer."""
+  __metaclass__ = abc.ABCMeta
+
+  @abc.abstractmethod
+  def instantiate(
+      self, methods, method_implementations,
+      multi_method_implementation):
+    """Instantiates the Face layer implementation to be used in a test.
+
+    Args:
+      methods: A sequence of Method objects describing the methods available to
+        be called during the test.
+      method_implementations: A dictionary from group-name pair to
+        face.MethodImplementation object specifying implementation of a method.
+      multi_method_implementation: A face.MultiMethodImplementation or None.
+
+    Returns:
+      A sequence of length three the first element of which is a
+        face.GenericStub, the second element of which is dictionary from groups
+        to face.DynamicStubs affording invocation of the group's methods, and
+        the third element of which is an arbitrary memo object to be kept and
+        passed to destantiate at the conclusion of the test. The returned stubs
+        must be backed by the provided implementations.
+    """
+    raise NotImplementedError()
+
+  @abc.abstractmethod
+  def destantiate(self, memo):
+    """Destroys the Face layer implementation under test.
+
+    Args:
+      memo: The object from the third position of the return value of a call to
+        instantiate.
+    """
+    raise NotImplementedError()
+
+  @abc.abstractmethod
+  def invocation_metadata(self):
+    """Provides the metadata to be used when invoking a test RPC.
+
+    Returns:
+      An object to use as the supplied-at-invocation-time metadata in a test
+        RPC.
+    """
+    raise NotImplementedError()
+
+  @abc.abstractmethod
+  def initial_metadata(self):
+    """Provides the metadata for use as a test RPC's first servicer metadata.
+
+    Returns:
+      An object to use as the from-the-servicer-before-responses metadata in a
+        test RPC.
+    """
+    raise NotImplementedError()
+
+  @abc.abstractmethod
+  def terminal_metadata(self):
+    """Provides the metadata for use as a test RPC's second servicer metadata.
+
+    Returns:
+      An object to use as the from-the-servicer-after-all-responses metadata in
+        a test RPC.
+    """
+    raise NotImplementedError()
+
+  @abc.abstractmethod
+  def code(self):
+    """Provides the value for use as a test RPC's code.
+
+    Returns:
+      An object to use as the from-the-servicer code in a test RPC.
+    """
+    raise NotImplementedError()
+
+  @abc.abstractmethod
+  def details(self):
+    """Provides the value for use as a test RPC's details.
+
+    Returns:
+      An object to use as the from-the-servicer details in a test RPC.
+    """
+    raise NotImplementedError()
+
+  @abc.abstractmethod
+  def metadata_transmitted(self, original_metadata, transmitted_metadata):
+    """Identifies whether or not metadata was properly transmitted.
+
+    Args:
+      original_metadata: A metadata value passed to the Face interface
+        implementation under test.
+      transmitted_metadata: The same metadata value after having been
+        transmitted via an RPC performed by the Face interface implementation
+          under test.
+
+    Returns:
+      Whether or not the metadata was properly transmitted by the Face interface
+        implementation under test.
+    """
+    raise NotImplementedError()