Added compiler plugin test for Python.
diff --git a/test/compiler/python_plugin_test.py b/test/compiler/python_plugin_test.py
new file mode 100644
index 0000000..b0c9ec6
--- /dev/null
+++ b/test/compiler/python_plugin_test.py
@@ -0,0 +1,480 @@
+# Copyright 2015, Google Inc.
+# All rights reserved.
+#
+# Redistribution and use in source and binary forms, with or without
+# modification, are permitted provided that the following conditions are
+# met:
+#
+#     * Redistributions of source code must retain the above copyright
+# notice, this list of conditions and the following disclaimer.
+#     * Redistributions in binary form must reproduce the above
+# copyright notice, this list of conditions and the following disclaimer
+# in the documentation and/or other materials provided with the
+# distribution.
+#     * Neither the name of Google Inc. nor the names of its
+# contributors may be used to endorse or promote products derived from
+# this software without specific prior written permission.
+#
+# THIS SOFTWARE IS PROVIDED BY THE COPYRIGHT HOLDERS AND CONTRIBUTORS
+# "AS IS" AND ANY EXPRESS OR IMPLIED WARRANTIES, INCLUDING, BUT NOT
+# LIMITED TO, THE IMPLIED WARRANTIES OF MERCHANTABILITY AND FITNESS FOR
+# A PARTICULAR PURPOSE ARE DISCLAIMED. IN NO EVENT SHALL THE COPYRIGHT
+# OWNER OR CONTRIBUTORS BE LIABLE FOR ANY DIRECT, INDIRECT, INCIDENTAL,
+# SPECIAL, EXEMPLARY, OR CONSEQUENTIAL DAMAGES (INCLUDING, BUT NOT
+# LIMITED TO, PROCUREMENT OF SUBSTITUTE GOODS OR SERVICES; LOSS OF USE,
+# DATA, OR PROFITS; OR BUSINESS INTERRUPTION) HOWEVER CAUSED AND ON ANY
+# THEORY OF LIABILITY, WHETHER IN CONTRACT, STRICT LIABILITY, OR TORT
+# (INCLUDING NEGLIGENCE OR OTHERWISE) ARISING IN ANY WAY OUT OF THE USE
+# OF THIS SOFTWARE, EVEN IF ADVISED OF THE POSSIBILITY OF SUCH DAMAGE.
+
+import argparse
+import contextlib
+import errno
+import itertools
+import os
+import subprocess
+import sys
+import time
+import unittest
+
+from grpc.framework.face import exceptions
+from grpc.framework.foundation import future
+
+# Assigned in __main__.
+_build_mode = None
+
+
+class _ServicerMethods(object):
+
+  def __init__(self, test_pb2, delay):
+    self._paused = False
+    self._failed = False
+    self.test_pb2 = test_pb2
+    self.delay = delay
+
+  @contextlib.contextmanager
+  def pause(self):  # pylint: disable=invalid-name
+    self._paused = True
+    yield
+    self._paused = False
+
+  @contextlib.contextmanager
+  def fail(self):  # pylint: disable=invalid-name
+    self._failed = True
+    yield
+    self._failed = False
+
+  def _control(self):  # pylint: disable=invalid-name
+    if self._failed:
+      raise ValueError()
+    time.sleep(self.delay)
+    while self._paused:
+      time.sleep(0)
+
+  def UnaryCall(self, request):
+    response = self.test_pb2.SimpleResponse()
+    response.payload.payload_type = self.test_pb2.COMPRESSABLE
+    response.payload.payload_compressable = 'a' * request.response_size
+    self._control()
+    return response
+
+  def StreamingOutputCall(self, request):
+    for parameter in request.response_parameters:
+      response = self.test_pb2.StreamingOutputCallResponse()
+      response.payload.payload_type = self.test_pb2.COMPRESSABLE
+      response.payload.payload_compressable = 'a' * parameter.size
+      self._control()
+      yield response
+
+  def StreamingInputCall(self, request_iter):
+    response = self.test_pb2.StreamingInputCallResponse()
+    aggregated_payload_size = 0
+    for request in request_iter:
+      aggregated_payload_size += len(request.payload.payload_compressable)
+    response.aggregated_payload_size = aggregated_payload_size
+    self._control()
+    return response
+
+  def FullDuplexCall(self, request_iter):
+    for request in request_iter:
+      for parameter in request.response_parameters:
+        response = self.test_pb2.StreamingOutputCallResponse()
+        response.payload.payload_type = self.test_pb2.COMPRESSABLE
+        response.payload.payload_compressable = 'a' * parameter.size
+        self._control()
+        yield response
+
+  def HalfDuplexCall(self, request_iter):
+    responses = []
+    for request in request_iter:
+      for parameter in request.response_parameters:
+        response = self.test_pb2.StreamingOutputCallResponse()
+        response.payload.payload_type = self.test_pb2.COMPRESSABLE
+        response.payload.payload_compressable = 'a' * parameter.size
+        self._control()
+        responses.append(response)
+    for response in responses:
+      yield response
+
+
+def CreateService(test_pb2, delay=0, timeout=1):
+  """Provides a servicer backend and a stub.
+
+  The servicer is just the implementation
+  of the actual servicer passed to the face player of the python RPC
+  implementation; the two are detached.
+
+  Non-zero delay puts a delay on each call to the servicer, representative of
+  communication latency. Timeout is the default timeout for the stub while
+  waiting for the service.
+
+  Args:
+    test_pb2: the test_pb2 module generated by this test
+    delay: delay in seconds per response from the servicer
+    timeout: how long the stub will wait for the servicer by default.
+  Returns:
+    A two-tuple (servicer, stub), where the servicer is the back-end of the
+      service bound to the stub.
+  """
+  class Servicer(test_pb2.TestServiceServicer):
+
+    def UnaryCall(self, request):
+      return servicer_methods.UnaryCall(request)
+
+    def StreamingOutputCall(self, request):
+      return servicer_methods.StreamingOutputCall(request)
+
+    def StreamingInputCall(self, request_iter):
+      return servicer_methods.StreamingInputCall(request_iter)
+
+    def FullDuplexCall(self, request_iter):
+      return servicer_methods.FullDuplexCall(request_iter)
+
+    def HalfDuplexCall(self, request_iter):
+      return servicer_methods.HalfDuplexCall(request_iter)
+
+  servicer_methods = _ServicerMethods(test_pb2, delay)
+  servicer = Servicer()
+  linked_pair = test_pb2.mock_TestService(servicer, timeout)
+  stub = linked_pair.stub
+  return servicer_methods, stub
+
+
+def StreamingInputRequest(test_pb2):
+  for _ in range(3):
+    request = test_pb2.StreamingInputCallRequest()
+    request.payload.payload_type = test_pb2.COMPRESSABLE
+    request.payload.payload_compressable = 'a'
+    yield request
+
+
+def StreamingOutputRequest(test_pb2):
+  request = test_pb2.StreamingOutputCallRequest()
+  sizes = [1, 2, 3]
+  request.response_parameters.add(size=sizes[0], interval_us=0)
+  request.response_parameters.add(size=sizes[1], interval_us=0)
+  request.response_parameters.add(size=sizes[2], interval_us=0)
+  return request
+
+
+def FullDuplexRequest(test_pb2):
+  request = test_pb2.StreamingOutputCallRequest()
+  request.response_parameters.add(size=1, interval_us=0)
+  yield request
+  request = test_pb2.StreamingOutputCallRequest()
+  request.response_parameters.add(size=2, interval_us=0)
+  request.response_parameters.add(size=3, interval_us=0)
+  yield request
+
+
+class PythonPluginTest(unittest.TestCase):
+  """Test case for the gRPC Python protoc-plugin.
+
+  While reading these tests, remember that the futures API
+  (`stub.method.async()`) only gives futures for the *non-streaming* responses,
+  else it behaves like its blocking cousin.
+  """
+
+  def setUp(self):
+    protoc_command = '../../bins/%s/protobuf/protoc' % _build_mode
+    protoc_plugin_filename = '../../bins/%s/grpc_python_plugin' % _build_mode
+    test_proto_filename = '../cpp/interop/test.proto'
+    if not os.path.isfile(protoc_command):
+      # Assume that if we haven't built protoc that it's on the system.
+      protoc_command = 'protoc'
+
+    # ensure that the output directory exists
+    outdir = '../../gens/test/compiler/python/'
+    try:
+      os.makedirs(outdir)
+    except OSError as exception:
+      if exception.errno != errno.EEXIST:
+        raise
+
+    cmd = [
+        protoc_command,
+        '--plugin=protoc-gen-python-grpc=%s' % protoc_plugin_filename,
+        '-I %s' % os.path.dirname(test_proto_filename),
+        '--python_out=%s' % outdir,
+        '--python-grpc_out=%s' % outdir,
+        os.path.basename(test_proto_filename),
+    ]
+    subprocess.call(' '.join(cmd), shell=True)
+    sys.path.append(outdir)
+
+    self.delay = 1  # seconds
+    self.timeout = 2  # seconds
+
+  def testImportAttributes(self):
+    # check that we can access the members
+    import test_pb2  # pylint: disable=g-import-not-at-top
+    self.assertIsNotNone(getattr(test_pb2, 'TestServiceServicer', None))
+    self.assertIsNotNone(getattr(test_pb2, 'TestServiceService', None))
+    self.assertIsNotNone(getattr(test_pb2, 'TestServiceStub', None))
+
+  def testUnaryCall(self):
+    import test_pb2  # pylint: disable=g-import-not-at-top
+    servicer, stub = CreateService(test_pb2)
+    request = test_pb2.SimpleRequest(response_size=13)
+    response = stub.UnaryCall(request)
+    expected_response = servicer.UnaryCall(request)
+    self.assertEqual(expected_response, response)
+
+  def testUnaryCallAsync(self):
+    import test_pb2  # pylint: disable=g-import-not-at-top
+    servicer, stub = CreateService(
+        test_pb2, delay=self.delay, timeout=self.timeout)
+    request = test_pb2.SimpleRequest(response_size=13)
+    # TODO(atash): consider using the 'profile' module? Does it even work here?
+    start_time = time.clock()
+    response_future = stub.UnaryCall.async(request)
+    self.assertGreater(self.delay, time.clock() - start_time)
+    response = response_future.result()
+    expected_response = servicer.UnaryCall(request)
+    self.assertEqual(expected_response, response)
+
+  def testUnaryCallAsyncExpired(self):
+    import test_pb2  # pylint: disable=g-import-not-at-top
+    # set the timeout super low...
+    servicer, stub = CreateService(test_pb2, delay=1, timeout=0.1)
+    request = test_pb2.SimpleRequest(response_size=13)
+    with servicer.pause():
+      response_future = stub.UnaryCall.async(request)
+      with self.assertRaises(exceptions.ExpirationError):
+        response_future.result()
+
+  def testUnaryCallAsyncCancelled(self):
+    import test_pb2  # pylint: disable=g-import-not-at-top
+    servicer, stub = CreateService(test_pb2)
+    request = test_pb2.SimpleRequest(response_size=13)
+    with servicer.pause():
+      response_future = stub.UnaryCall.async(request)
+      response_future.cancel()
+      self.assertTrue(response_future.cancelled())
+
+  def testUnaryCallAsyncFailed(self):
+    import test_pb2  # pylint: disable=g-import-not-at-top
+    servicer, stub = CreateService(test_pb2)
+    request = test_pb2.SimpleRequest(response_size=13)
+    with servicer.fail():
+      response_future = stub.UnaryCall.async(request)
+      self.assertIsNotNone(response_future.exception())
+
+  def testStreamingOutputCall(self):
+    import test_pb2  # pylint: disable=g-import-not-at-top
+    servicer, stub = CreateService(test_pb2)
+    request = StreamingOutputRequest(test_pb2)
+    responses = stub.StreamingOutputCall(request)
+    expected_responses = servicer.StreamingOutputCall(request)
+    for check in itertools.izip_longest(expected_responses, responses):
+      expected_response, response = check
+      self.assertEqual(expected_response, response)
+
+  def testStreamingOutputCallAsync(self):
+    import test_pb2  # pylint: disable=g-import-not-at-top
+    servicer, stub = CreateService(test_pb2, timeout=self.timeout)
+    request = StreamingOutputRequest(test_pb2)
+    responses = stub.StreamingOutputCall.async(request)
+    expected_responses = servicer.StreamingOutputCall(request)
+    for check in itertools.izip_longest(expected_responses, responses):
+      expected_response, response = check
+      self.assertEqual(expected_response, response)
+
+  def testStreamingOutputCallAsyncExpired(self):
+    import test_pb2  # pylint: disable=g-import-not-at-top
+    servicer, stub = CreateService(test_pb2, timeout=0.1)
+    request = StreamingOutputRequest(test_pb2)
+    with servicer.pause():
+      responses = stub.StreamingOutputCall.async(request)
+      with self.assertRaises(exceptions.ExpirationError):
+        list(responses)
+
+  def testStreamingOutputCallAsyncCancelled(self):
+    import test_pb2  # pylint: disable=g-import-not-at-top
+    _, stub = CreateService(test_pb2, timeout=0.1)
+    request = StreamingOutputRequest(test_pb2)
+    responses = stub.StreamingOutputCall.async(request)
+    next(responses)
+    responses.cancel()
+    with self.assertRaises(future.CancelledError):
+      next(responses)
+
+  def testStreamingOutputCallAsyncFailed(self):
+    import test_pb2  # pylint: disable=g-import-not-at-top
+    servicer, stub = CreateService(test_pb2, timeout=0.1)
+    request = StreamingOutputRequest(test_pb2)
+    with servicer.fail():
+      responses = stub.StreamingOutputCall.async(request)
+      self.assertIsNotNone(responses)
+      with self.assertRaises(exceptions.ServicerError):
+        next(responses)
+
+  def testStreamingInputCall(self):
+    import test_pb2  # pylint: disable=g-import-not-at-top
+    servicer, stub = CreateService(test_pb2)
+    response = stub.StreamingInputCall(StreamingInputRequest(test_pb2))
+    expected_response = servicer.StreamingInputCall(
+        StreamingInputRequest(test_pb2))
+    self.assertEqual(expected_response, response)
+
+  def testStreamingInputCallAsync(self):
+    import test_pb2  # pylint: disable=g-import-not-at-top
+    servicer, stub = CreateService(
+        test_pb2, delay=self.delay, timeout=self.timeout)
+    start_time = time.clock()
+    response_future = stub.StreamingInputCall.async(
+        StreamingInputRequest(test_pb2))
+    self.assertGreater(self.delay, time.clock() - start_time)
+    response = response_future.result()
+    expected_response = servicer.StreamingInputCall(
+        StreamingInputRequest(test_pb2))
+    self.assertEqual(expected_response, response)
+
+  def testStreamingInputCallAsyncExpired(self):
+    import test_pb2  # pylint: disable=g-import-not-at-top
+    # set the timeout super low...
+    servicer, stub = CreateService(test_pb2, delay=1, timeout=0.1)
+    with servicer.pause():
+      response_future = stub.StreamingInputCall.async(
+          StreamingInputRequest(test_pb2))
+      with self.assertRaises(exceptions.ExpirationError):
+        response_future.result()
+      self.assertIsInstance(
+          response_future.exception(), exceptions.ExpirationError)
+
+  def testStreamingInputCallAsyncCancelled(self):
+    import test_pb2  # pylint: disable=g-import-not-at-top
+    servicer, stub = CreateService(test_pb2)
+    with servicer.pause():
+      response_future = stub.StreamingInputCall.async(
+          StreamingInputRequest(test_pb2))
+      response_future.cancel()
+      self.assertTrue(response_future.cancelled())
+    with self.assertRaises(future.CancelledError):
+      response_future.result()
+
+  def testStreamingInputCallAsyncFailed(self):
+    import test_pb2  # pylint: disable=g-import-not-at-top
+    servicer, stub = CreateService(test_pb2)
+    with servicer.fail():
+      response_future = stub.StreamingInputCall.async(
+          StreamingInputRequest(test_pb2))
+      self.assertIsNotNone(response_future.exception())
+
+  def testFullDuplexCall(self):
+    import test_pb2  # pylint: disable=g-import-not-at-top
+    servicer, stub = CreateService(test_pb2)
+    responses = stub.FullDuplexCall(FullDuplexRequest(test_pb2))
+    expected_responses = servicer.FullDuplexCall(FullDuplexRequest(test_pb2))
+    for check in itertools.izip_longest(expected_responses, responses):
+      expected_response, response = check
+      self.assertEqual(expected_response, response)
+
+  def testFullDuplexCallAsync(self):
+    import test_pb2  # pylint: disable=g-import-not-at-top
+    servicer, stub = CreateService(test_pb2, timeout=self.timeout)
+    responses = stub.FullDuplexCall.async(FullDuplexRequest(test_pb2))
+    expected_responses = servicer.FullDuplexCall(FullDuplexRequest(test_pb2))
+    for check in itertools.izip_longest(expected_responses, responses):
+      expected_response, response = check
+      self.assertEqual(expected_response, response)
+
+  def testFullDuplexCallAsyncExpired(self):
+    import test_pb2  # pylint: disable=g-import-not-at-top
+    servicer, stub = CreateService(test_pb2, timeout=0.1)
+    request = FullDuplexRequest(test_pb2)
+    with servicer.pause():
+      responses = stub.FullDuplexCall.async(request)
+      with self.assertRaises(exceptions.ExpirationError):
+        list(responses)
+
+  def testFullDuplexCallAsyncCancelled(self):
+    import test_pb2  # pylint: disable=g-import-not-at-top
+    _, stub = CreateService(test_pb2, timeout=0.1)
+    request = FullDuplexRequest(test_pb2)
+    responses = stub.FullDuplexCall.async(request)
+    next(responses)
+    responses.cancel()
+    with self.assertRaises(future.CancelledError):
+      next(responses)
+
+  def testFullDuplexCallAsyncFailed(self):
+    import test_pb2  # pylint: disable=g-import-not-at-top
+    servicer, stub = CreateService(test_pb2, timeout=0.1)
+    request = FullDuplexRequest(test_pb2)
+    with servicer.fail():
+      responses = stub.FullDuplexCall.async(request)
+      self.assertIsNotNone(responses)
+      with self.assertRaises(exceptions.ServicerError):
+        next(responses)
+
+  def testHalfDuplexCall(self):
+    import test_pb2  # pylint: disable=g-import-not-at-top
+    servicer, stub = CreateService(test_pb2)
+    def HalfDuplexRequest():
+      request = test_pb2.StreamingOutputCallRequest()
+      request.response_parameters.add(size=1, interval_us=0)
+      yield request
+      request = test_pb2.StreamingOutputCallRequest()
+      request.response_parameters.add(size=2, interval_us=0)
+      request.response_parameters.add(size=3, interval_us=0)
+      yield request
+    responses = stub.HalfDuplexCall(HalfDuplexRequest())
+    expected_responses = servicer.HalfDuplexCall(HalfDuplexRequest())
+    for check in itertools.izip_longest(expected_responses, responses):
+      expected_response, response = check
+      self.assertEqual(expected_response, response)
+
+  def testHalfDuplexCallAsyncWedged(self):
+    import test_pb2  # pylint: disable=g-import-not-at-top
+    _, stub = CreateService(test_pb2, timeout=1)
+    wait_flag = [False]
+    @contextlib.contextmanager
+    def wait():  # pylint: disable=invalid-name
+      # Where's Python 3's 'nonlocal' statement when you need it?
+      wait_flag[0] = True
+      yield
+      wait_flag[0] = False
+    def HalfDuplexRequest():
+      request = test_pb2.StreamingOutputCallRequest()
+      request.response_parameters.add(size=1, interval_us=0)
+      yield request
+      while wait_flag[0]:
+        time.sleep(0.1)
+    with wait():
+      responses = stub.HalfDuplexCall.async(HalfDuplexRequest())
+      # half-duplex waits for the client to send all info
+      with self.assertRaises(exceptions.ExpirationError):
+        next(responses)
+
+
+if __name__ == '__main__':
+  os.chdir(os.path.dirname(sys.argv[0]))
+  parser = argparse.ArgumentParser(description='Run Python compiler plugin test.')
+  parser.add_argument('--build_mode', dest='build_mode', type=str, default='dbg',
+                      help='The build mode of the targets to test, e.g. '
+                      '"dbg", "opt", "asan", etc.')
+  args, remainder = parser.parse_known_args()
+  _build_mode = args.build_mode
+  sys.argv[1:] = remainder
+  unittest.main()
diff --git a/test/compiler/test.proto b/test/compiler/test.proto
new file mode 100644
index 0000000..ed7c6a7
--- /dev/null
+++ b/test/compiler/test.proto
@@ -0,0 +1,139 @@
+// Copyright 2015, Google Inc.
+// All rights reserved.
+//
+// Redistribution and use in source and binary forms, with or without
+// modification, are permitted provided that the following conditions are
+// met:
+//
+//     * Redistributions of source code must retain the above copyright
+// notice, this list of conditions and the following disclaimer.
+//     * Redistributions in binary form must reproduce the above
+// copyright notice, this list of conditions and the following disclaimer
+// in the documentation and/or other materials provided with the
+// distribution.
+//     * Neither the name of Google Inc. nor the names of its
+// contributors may be used to endorse or promote products derived from
+// this software without specific prior written permission.
+//
+// THIS SOFTWARE IS PROVIDED BY THE COPYRIGHT HOLDERS AND CONTRIBUTORS
+// "AS IS" AND ANY EXPRESS OR IMPLIED WARRANTIES, INCLUDING, BUT NOT
+// LIMITED TO, THE IMPLIED WARRANTIES OF MERCHANTABILITY AND FITNESS FOR
+// A PARTICULAR PURPOSE ARE DISCLAIMED. IN NO EVENT SHALL THE COPYRIGHT
+// OWNER OR CONTRIBUTORS BE LIABLE FOR ANY DIRECT, INDIRECT, INCIDENTAL,
+// SPECIAL, EXEMPLARY, OR CONSEQUENTIAL DAMAGES (INCLUDING, BUT NOT
+// LIMITED TO, PROCUREMENT OF SUBSTITUTE GOODS OR SERVICES; LOSS OF USE,
+// DATA, OR PROFITS; OR BUSINESS INTERRUPTION) HOWEVER CAUSED AND ON ANY
+// THEORY OF LIABILITY, WHETHER IN CONTRACT, STRICT LIABILITY, OR TORT
+// (INCLUDING NEGLIGENCE OR OTHERWISE) ARISING IN ANY WAY OUT OF THE USE
+// OF THIS SOFTWARE, EVEN IF ADVISED OF THE POSSIBILITY OF SUCH DAMAGE.
+
+// An integration test service that covers all the method signature permutations
+// of unary/streaming requests/responses.
+// This file is duplicated around the code base. See GitHub issue #526.
+syntax = "proto2";
+
+package grpc.testing;
+
+enum PayloadType {
+  // Compressable text format.
+  COMPRESSABLE= 1;
+
+  // Uncompressable binary format.
+  UNCOMPRESSABLE = 2;
+
+  // Randomly chosen from all other formats defined in this enum.
+  RANDOM = 3;
+}
+
+message Payload {
+  required PayloadType payload_type = 1;
+  oneof payload_body {
+    string payload_compressable = 2;
+    bytes payload_uncompressable = 3;
+  }
+}
+
+message SimpleRequest {
+  // Desired payload type in the response from the server.
+  // If response_type is RANDOM, server randomly chooses one from other formats.
+  optional PayloadType response_type = 1 [default=COMPRESSABLE];
+
+  // Desired payload size in the response from the server.
+  // If response_type is COMPRESSABLE, this denotes the size before compression.
+  optional int32 response_size = 2;
+
+  // Optional input payload sent along with the request.
+  optional Payload payload = 3;
+}
+
+message SimpleResponse {
+  optional Payload payload = 1;
+}
+
+message StreamingInputCallRequest {
+  // Optional input payload sent along with the request.
+  optional Payload payload = 1;
+
+  // Not expecting any payload from the response.
+}
+
+message StreamingInputCallResponse {
+  // Aggregated size of payloads received from the client.
+  optional int32 aggregated_payload_size = 1;
+}
+
+message ResponseParameters {
+  // Desired payload sizes in responses from the server.
+  // If response_type is COMPRESSABLE, this denotes the size before compression.
+  required int32 size = 1;
+
+  // Desired interval between consecutive responses in the response stream in
+  // microseconds.
+  required int32 interval_us = 2;
+}
+
+message StreamingOutputCallRequest {
+  // Desired payload type in the response from the server.
+  // If response_type is RANDOM, the payload from each response in the stream
+  // might be of different types. This is to simulate a mixed type of payload
+  // stream.
+  optional PayloadType response_type = 1 [default=COMPRESSABLE];
+
+  repeated ResponseParameters response_parameters = 2;
+
+  // Optional input payload sent along with the request.
+  optional Payload payload = 3;
+}
+
+message StreamingOutputCallResponse {
+  optional Payload payload = 1;
+}
+
+service TestService {
+  // One request followed by one response.
+  // The server returns the client payload as-is.
+  rpc UnaryCall(SimpleRequest) returns (SimpleResponse);
+
+  // One request followed by a sequence of responses (streamed download).
+  // The server returns the payload with client desired type and sizes.
+  rpc StreamingOutputCall(StreamingOutputCallRequest)
+      returns (stream StreamingOutputCallResponse);
+
+  // A sequence of requests followed by one response (streamed upload).
+  // The server returns the aggregated size of client payload as the result.
+  rpc StreamingInputCall(stream StreamingInputCallRequest)
+      returns (StreamingInputCallResponse);
+
+  // A sequence of requests with each request served by the server immediately.
+  // As one request could lead to multiple responses, this interface
+  // demonstrates the idea of full duplexing.
+  rpc FullDuplexCall(stream StreamingOutputCallRequest)
+      returns (stream StreamingOutputCallResponse);
+
+  // A sequence of requests followed by a sequence of responses.
+  // The server buffers all the client requests and then serves them in order. A
+  // stream of responses are returned to the client when the server starts with
+  // first request.
+  rpc HalfDuplexCall(stream StreamingOutputCallRequest)
+      returns (stream StreamingOutputCallResponse);
+}
diff --git a/tools/run_tests/run_python.sh b/tools/run_tests/run_python.sh
index 1b8fe19..fe40b51 100755
--- a/tools/run_tests/run_python.sh
+++ b/tools/run_tests/run_python.sh
@@ -37,6 +37,7 @@
 export LD_LIBRARY_PATH=$root/libs/opt
 source python2.7_virtual_environment/bin/activate
 # TODO(issue 215): Properly itemize these in run_tests.py so that they can be parallelized.
+python2.7 -B test/compiler/python_plugin_test.py
 python2.7 -B -m grpc._adapter._blocking_invocation_inline_service_test
 python2.7 -B -m grpc._adapter._c_test
 python2.7 -B -m grpc._adapter._event_invocation_synchronous_event_service_test