Merge pull request #9119 from soltanmm-google/there-is-no-such-thing-as-an-environmentally-responsible-oil-pipeline-in-this-day-and-age

Force use of local distributions in Python-building
diff --git a/src/node/ext/call_credentials.cc b/src/node/ext/call_credentials.cc
index 81fc552..41f6c29 100644
--- a/src/node/ext/call_credentials.cc
+++ b/src/node/ext/call_credentials.cc
@@ -35,7 +35,7 @@
 #include <nan.h>
 #include <uv.h>
 
-#include <list>
+#include <queue>
 
 #include "grpc/grpc.h"
 #include "grpc/grpc_security.h"
@@ -170,7 +170,7 @@
   grpc_metadata_credentials_plugin plugin;
   plugin_state *state = new plugin_state;
   state->callback = new Nan::Callback(info[0].As<Function>());
-  state->pending_callbacks = new std::list<plugin_callback_data*>();
+  state->pending_callbacks = new std::queue<plugin_callback_data*>();
   uv_mutex_init(&state->plugin_mutex);
   uv_async_init(uv_default_loop(),
                 &state->plugin_async,
@@ -232,13 +232,13 @@
 NAUV_WORK_CB(SendPluginCallback) {
   Nan::HandleScope scope;
   plugin_state *state = reinterpret_cast<plugin_state*>(async->data);
-  std::list<plugin_callback_data*> callbacks;
+  std::queue<plugin_callback_data*> callbacks;
   uv_mutex_lock(&state->plugin_mutex);
-  callbacks.splice(callbacks.begin(), *state->pending_callbacks);
+  state->pending_callbacks->swap(callbacks);
   uv_mutex_unlock(&state->plugin_mutex);
   while (!callbacks.empty()) {
     plugin_callback_data *data = callbacks.front();
-    callbacks.pop_front();
+    callbacks.pop();
     Local<Object> callback_data = Nan::New<Object>();
     Nan::Set(callback_data, Nan::New("cb").ToLocalChecked(),
              Nan::New<v8::External>(reinterpret_cast<void*>(data->cb)));
@@ -267,7 +267,7 @@
   data->user_data = user_data;
 
   uv_mutex_lock(&p_state->plugin_mutex);
-  p_state->pending_callbacks->push_back(data);
+  p_state->pending_callbacks->push(data);
   uv_mutex_unlock(&p_state->plugin_mutex);
 
   uv_async_send(&p_state->plugin_async);
diff --git a/src/node/ext/call_credentials.h b/src/node/ext/call_credentials.h
index 04c852b..21a4b89 100644
--- a/src/node/ext/call_credentials.h
+++ b/src/node/ext/call_credentials.h
@@ -34,7 +34,7 @@
 #ifndef GRPC_NODE_CALL_CREDENTIALS_H_
 #define GRPC_NODE_CALL_CREDENTIALS_H_
 
-#include <list>
+#include <queue>
 
 #include <node.h>
 #include <nan.h>
@@ -84,7 +84,7 @@
 
 typedef struct plugin_state {
   Nan::Callback *callback;
-  std::list<plugin_callback_data*> *pending_callbacks;
+  std::queue<plugin_callback_data*> *pending_callbacks;
   uv_mutex_t plugin_mutex;
   // async.data == this
   uv_async_t plugin_async;
diff --git a/src/node/ext/node_grpc.cc b/src/node/ext/node_grpc.cc
index 848c601..620b086 100644
--- a/src/node/ext/node_grpc.cc
+++ b/src/node/ext/node_grpc.cc
@@ -31,7 +31,7 @@
  *
  */
 
-#include <list>
+#include <queue>
 
 #include <node.h>
 #include <nan.h>
@@ -66,7 +66,7 @@
 
 typedef struct logger_state {
   Nan::Callback *callback;
-  std::list<log_args *> *pending_args;
+  std::queue<log_args *> *pending_args;
   uv_mutex_t mutex;
   uv_async_t async;
   // Indicates that a logger has been set
@@ -334,14 +334,14 @@
 
 NAUV_WORK_CB(LogMessagesCallback) {
   Nan::HandleScope scope;
-  std::list<log_args *> args;
+  std::queue<log_args *> args;
   uv_mutex_lock(&grpc_logger_state.mutex);
-  args.splice(args.begin(), *grpc_logger_state.pending_args);
+  grpc_logger_state.pending_args->swap(args);
   uv_mutex_unlock(&grpc_logger_state.mutex);
   /* Call the callback with each log message */
   while (!args.empty()) {
     log_args *arg = args.front();
-    args.pop_front();
+    args.pop();
     Local<Value> file = Nan::New(arg->core_args.file).ToLocalChecked();
     Local<Value> line = Nan::New<Uint32, uint32_t>(arg->core_args.line);
     Local<Value> severity = Nan::New(
@@ -368,7 +368,7 @@
   args_copy->timestamp = gpr_now(GPR_CLOCK_REALTIME);
 
   uv_mutex_lock(&grpc_logger_state.mutex);
-  grpc_logger_state.pending_args->push_back(args_copy);
+  grpc_logger_state.pending_args->push(args_copy);
   uv_mutex_unlock(&grpc_logger_state.mutex);
 
   uv_async_send(&grpc_logger_state.async);
@@ -376,7 +376,7 @@
 
 void init_logger() {
   memset(&grpc_logger_state, 0, sizeof(logger_state));
-  grpc_logger_state.pending_args = new std::list<log_args *>();
+  grpc_logger_state.pending_args = new std::queue<log_args *>();
   uv_mutex_init(&grpc_logger_state.mutex);
   uv_async_init(uv_default_loop(),
                 &grpc_logger_state.async,
diff --git a/src/python/grpcio/grpc/_channel.py b/src/python/grpcio/grpc/_channel.py
index 3ac735a..f07142a 100644
--- a/src/python/grpcio/grpc/_channel.py
+++ b/src/python/grpcio/grpc/_channel.py
@@ -32,6 +32,7 @@
 import sys
 import threading
 import time
+import logging
 
 import grpc
 from grpc import _common
@@ -99,6 +100,22 @@
     else:
       condition.wait(timeout=remaining)
 
+_INTERNAL_CALL_ERROR_MESSAGE_FORMAT = (
+    'Internal gRPC call error %d. ' +
+    'Please report to https://github.com/grpc/grpc/issues')
+
+def _check_call_error(call_error, metadata):
+  if call_error == cygrpc.CallError.invalid_metadata:
+    raise ValueError('metadata was invalid: %s' % metadata)
+  elif call_error != cygrpc.CallError.ok:
+    raise ValueError(_INTERNAL_CALL_ERROR_MESSAGE_FORMAT % call_error)
+
+def _call_error_set_RPCstate(state, call_error, metadata):
+  if call_error == cygrpc.CallError.invalid_metadata:
+    _abort(state, grpc.StatusCode.INTERNAL, 'metadata was invalid: %s' % metadata)
+  else:
+    _abort(state, grpc.StatusCode.INTERNAL, 
+        _INTERNAL_CALL_ERROR_MESSAGE_FORMAT % call_error)
 
 class _RPCState(object):
 
@@ -181,7 +198,16 @@
   event_handler = _event_handler(state, call, None)
 
   def consume_request_iterator():
-    for request in request_iterator:
+    while True:
+      try:
+        request = next(request_iterator)
+      except StopIteration:
+        break
+      except Exception as e:
+        logging.exception("Exception iterating requests!")
+        call.cancel()
+        _abort(state, grpc.StatusCode.UNKNOWN, "Exception iterating requests!")
+        return
       serialized_request = _common.serialize(request, request_serializer)
       with state.condition:
         if state.code is None and not state.cancelled:
@@ -472,7 +498,8 @@
           None, 0, completion_queue, self._method, None, deadline_timespec)
       if credentials is not None:
         call.set_credentials(credentials._credentials)
-      call.start_client_batch(cygrpc.Operations(operations), None)
+      call_error = call.start_client_batch(cygrpc.Operations(operations), None)
+      _check_call_error(call_error, metadata)
       _handle_event(completion_queue.poll(), state, self._response_deserializer)
       return state, deadline
 
@@ -496,7 +523,11 @@
         call.set_credentials(credentials._credentials)
       event_handler = _event_handler(state, call, self._response_deserializer)
       with state.condition:
-        call.start_client_batch(cygrpc.Operations(operations), event_handler)
+        call_error = call.start_client_batch(cygrpc.Operations(operations),
+            event_handler)
+        if call_error != cygrpc.CallError.ok:
+          _call_error_set_RPCstate(state, call_error, metadata)
+          return _Rendezvous(state, None, None, deadline)
         drive_call()
       return _Rendezvous(state, call, self._response_deserializer, deadline)
 
@@ -536,7 +567,11 @@
             cygrpc.operation_send_close_from_client(_EMPTY_FLAGS),
             cygrpc.operation_receive_status_on_client(_EMPTY_FLAGS),
         )
-        call.start_client_batch(cygrpc.Operations(operations), event_handler)
+        call_error = call.start_client_batch(cygrpc.Operations(operations),
+            event_handler)
+        if call_error != cygrpc.CallError.ok:
+          _call_error_set_RPCstate(state, call_error, metadata)
+          return _Rendezvous(state, None, None, deadline)
         drive_call()
       return _Rendezvous(state, call, self._response_deserializer, deadline)
 
@@ -571,7 +606,8 @@
           cygrpc.operation_receive_message(_EMPTY_FLAGS),
           cygrpc.operation_receive_status_on_client(_EMPTY_FLAGS),
       )
-      call.start_client_batch(cygrpc.Operations(operations), None)
+      call_error = call.start_client_batch(cygrpc.Operations(operations), None)
+      _check_call_error(call_error, metadata)
       _consume_request_iterator(
           request_iterator, state, call, self._request_serializer)
     while True:
@@ -615,7 +651,11 @@
           cygrpc.operation_receive_message(_EMPTY_FLAGS),
           cygrpc.operation_receive_status_on_client(_EMPTY_FLAGS),
       )
-      call.start_client_batch(cygrpc.Operations(operations), event_handler)
+      call_error = call.start_client_batch(cygrpc.Operations(operations),
+          event_handler)
+      if call_error != cygrpc.CallError.ok:
+        _call_error_set_RPCstate(state, call_error, metadata)
+        return _Rendezvous(state, None, None, deadline)
       drive_call()
       _consume_request_iterator(
           request_iterator, state, call, self._request_serializer)
@@ -652,7 +692,11 @@
               _common.cygrpc_metadata(metadata), _EMPTY_FLAGS),
           cygrpc.operation_receive_status_on_client(_EMPTY_FLAGS),
       )
-      call.start_client_batch(cygrpc.Operations(operations), event_handler)
+      call_error = call.start_client_batch(cygrpc.Operations(operations),
+          event_handler)
+      if call_error != cygrpc.CallError.ok:
+        _call_error_set_RPCstate(state, call_error, metadata)
+        return _Rendezvous(state, None, None, deadline)
       drive_call()
       _consume_request_iterator(
           request_iterator, state, call, self._request_serializer)
diff --git a/src/python/grpcio_tests/tests/tests.json b/src/python/grpcio_tests/tests/tests.json
index d0cf2b5..d47631c 100644
--- a/src/python/grpcio_tests/tests/tests.json
+++ b/src/python/grpcio_tests/tests/tests.json
@@ -25,6 +25,8 @@
   "_implementations_test.CallCredentialsTest",
   "_implementations_test.ChannelCredentialsTest",
   "_insecure_interop_test.InsecureInteropTest",
+  "_invalid_metadata_test.InvalidMetadataTest",
+  "_invocation_defects_test.InvocationDefectsTest",
   "_logging_pool_test.LoggingPoolTest",
   "_metadata_code_details_test.MetadataCodeDetailsTest",
   "_metadata_test.MetadataTest",
diff --git a/src/python/grpcio_tests/tests/unit/_compression_test.py b/src/python/grpcio_tests/tests/unit/_compression_test.py
index 83b9109..4d3f02e 100644
--- a/src/python/grpcio_tests/tests/unit/_compression_test.py
+++ b/src/python/grpcio_tests/tests/unit/_compression_test.py
@@ -125,7 +125,7 @@
     compressed_channel = grpc.insecure_channel('localhost:%d' % self._port,
         options=[('grpc.default_compression_algorithm', 1)])
     multi_callable = compressed_channel.stream_stream(_STREAM_STREAM)
-    call = multi_callable([request] * test_constants.STREAM_LENGTH)
+    call = multi_callable(iter([request] * test_constants.STREAM_LENGTH))
     for response in call:
       self.assertEqual(request, response)
 
diff --git a/src/python/grpcio_tests/tests/unit/_empty_message_test.py b/src/python/grpcio_tests/tests/unit/_empty_message_test.py
index 131f6e9..69f4689 100644
--- a/src/python/grpcio_tests/tests/unit/_empty_message_test.py
+++ b/src/python/grpcio_tests/tests/unit/_empty_message_test.py
@@ -123,12 +123,12 @@
 
   def testStreamUnary(self):
     response = self._channel.stream_unary(_STREAM_UNARY)(
-        [_REQUEST] * test_constants.STREAM_LENGTH)
+        iter([_REQUEST] * test_constants.STREAM_LENGTH))
     self.assertEqual(_RESPONSE, response)
 
   def testStreamStream(self):
     response_iterator = self._channel.stream_stream(_STREAM_STREAM)(
-        [_REQUEST] * test_constants.STREAM_LENGTH)
+        iter([_REQUEST] * test_constants.STREAM_LENGTH))
     self.assertSequenceEqual(
         [_RESPONSE] * test_constants.STREAM_LENGTH, list(response_iterator))
 
diff --git a/src/python/grpcio_tests/tests/unit/_exit_scenarios.py b/src/python/grpcio_tests/tests/unit/_exit_scenarios.py
index b33802b..7775271 100644
--- a/src/python/grpcio_tests/tests/unit/_exit_scenarios.py
+++ b/src/python/grpcio_tests/tests/unit/_exit_scenarios.py
@@ -240,7 +240,7 @@
       multi_callable = channel.stream_unary(method)
       future = multi_callable.future(infinite_request_iterator())
       result, call = multi_callable.with_call(
-          [REQUEST] * test_constants.STREAM_LENGTH)
+          iter([REQUEST] * test_constants.STREAM_LENGTH))
     elif (args.scenario == IN_FLIGHT_STREAM_STREAM_CALL or
           args.scenario == IN_FLIGHT_PARTIAL_STREAM_STREAM_CALL):
       multi_callable = channel.stream_stream(method)
diff --git a/src/python/grpcio_tests/tests/unit/_invalid_metadata_test.py b/src/python/grpcio_tests/tests/unit/_invalid_metadata_test.py
new file mode 100644
index 0000000..2dc225d
--- /dev/null
+++ b/src/python/grpcio_tests/tests/unit/_invalid_metadata_test.py
@@ -0,0 +1,175 @@
+# Copyright 2016, Google Inc.
+# All rights reserved.
+#
+# Redistribution and use in source and binary forms, with or without
+# modification, are permitted provided that the following conditions are
+# met:
+#
+#     * Redistributions of source code must retain the above copyright
+# notice, this list of conditions and the following disclaimer.
+#     * Redistributions in binary form must reproduce the above
+# copyright notice, this list of conditions and the following disclaimer
+# in the documentation and/or other materials provided with the
+# distribution.
+#     * Neither the name of Google Inc. nor the names of its
+# contributors may be used to endorse or promote products derived from
+# this software without specific prior written permission.
+#
+# THIS SOFTWARE IS PROVIDED BY THE COPYRIGHT HOLDERS AND CONTRIBUTORS
+# "AS IS" AND ANY EXPRESS OR IMPLIED WARRANTIES, INCLUDING, BUT NOT
+# LIMITED TO, THE IMPLIED WARRANTIES OF MERCHANTABILITY AND FITNESS FOR
+# A PARTICULAR PURPOSE ARE DISCLAIMED. IN NO EVENT SHALL THE COPYRIGHT
+# OWNER OR CONTRIBUTORS BE LIABLE FOR ANY DIRECT, INDIRECT, INCIDENTAL,
+# SPECIAL, EXEMPLARY, OR CONSEQUENTIAL DAMAGES (INCLUDING, BUT NOT
+# LIMITED TO, PROCUREMENT OF SUBSTITUTE GOODS OR SERVICES; LOSS OF USE,
+# DATA, OR PROFITS; OR BUSINESS INTERRUPTION) HOWEVER CAUSED AND ON ANY
+# THEORY OF LIABILITY, WHETHER IN CONTRACT, STRICT LIABILITY, OR TORT
+# (INCLUDING NEGLIGENCE OR OTHERWISE) ARISING IN ANY WAY OUT OF THE USE
+# OF THIS SOFTWARE, EVEN IF ADVISED OF THE POSSIBILITY OF SUCH DAMAGE.
+
+"""Test of RPCs made against gRPC Python's application-layer API."""
+
+import unittest
+
+import grpc
+
+from tests.unit.framework.common import test_constants
+
+_SERIALIZE_REQUEST = lambda bytestring: bytestring * 2
+_DESERIALIZE_REQUEST = lambda bytestring: bytestring[len(bytestring) // 2:]
+_SERIALIZE_RESPONSE = lambda bytestring: bytestring * 3
+_DESERIALIZE_RESPONSE = lambda bytestring: bytestring[:len(bytestring) // 3]
+
+_UNARY_UNARY = '/test/UnaryUnary'
+_UNARY_STREAM = '/test/UnaryStream'
+_STREAM_UNARY = '/test/StreamUnary'
+_STREAM_STREAM = '/test/StreamStream'
+
+
+def _unary_unary_multi_callable(channel):
+  return channel.unary_unary(_UNARY_UNARY)
+
+
+def _unary_stream_multi_callable(channel):
+  return channel.unary_stream(
+      _UNARY_STREAM,
+      request_serializer=_SERIALIZE_REQUEST,
+      response_deserializer=_DESERIALIZE_RESPONSE)
+
+
+def _stream_unary_multi_callable(channel):
+  return channel.stream_unary(
+      _STREAM_UNARY,
+      request_serializer=_SERIALIZE_REQUEST,
+      response_deserializer=_DESERIALIZE_RESPONSE)
+
+
+def _stream_stream_multi_callable(channel):
+  return channel.stream_stream(_STREAM_STREAM)
+
+
+class InvalidMetadataTest(unittest.TestCase):
+
+  def setUp(self):
+    self._channel = grpc.insecure_channel('localhost:8080')
+    self._unary_unary = _unary_unary_multi_callable(self._channel)
+    self._unary_stream = _unary_stream_multi_callable(self._channel)
+    self._stream_unary = _stream_unary_multi_callable(self._channel)
+    self._stream_stream = _stream_stream_multi_callable(self._channel)
+
+  def testUnaryRequestBlockingUnaryResponse(self):
+    request = b'\x07\x08'
+    metadata = (('InVaLiD', 'UnaryRequestBlockingUnaryResponse'),)
+    expected_error_details = "metadata was invalid: %s" % metadata
+    with self.assertRaises(ValueError) as exception_context:
+      self._unary_unary(request, metadata=metadata)
+    self.assertIn(expected_error_details, str(exception_context.exception))
+
+  def testUnaryRequestBlockingUnaryResponseWithCall(self):
+    request = b'\x07\x08'
+    metadata = (('InVaLiD', 'UnaryRequestBlockingUnaryResponseWithCall'),)
+    expected_error_details = "metadata was invalid: %s" % metadata
+    with self.assertRaises(ValueError) as exception_context:
+      self._unary_unary.with_call(request, metadata=metadata)
+    self.assertIn(expected_error_details, str(exception_context.exception))
+
+  def testUnaryRequestFutureUnaryResponse(self):
+    request = b'\x07\x08'
+    metadata = (('InVaLiD', 'UnaryRequestFutureUnaryResponse'),)
+    expected_error_details = "metadata was invalid: %s" % metadata
+    response_future = self._unary_unary.future(request, metadata=metadata)
+    with self.assertRaises(grpc.RpcError) as exception_context:
+      response_future.result()
+    self.assertEqual(
+        exception_context.exception.details(), expected_error_details)
+    self.assertEqual(
+        exception_context.exception.code(), grpc.StatusCode.INTERNAL)
+    self.assertEqual(response_future.details(), expected_error_details)
+    self.assertEqual(response_future.code(), grpc.StatusCode.INTERNAL)
+
+  def testUnaryRequestStreamResponse(self):
+    request = b'\x37\x58'
+    metadata = (('InVaLiD', 'UnaryRequestStreamResponse'),)
+    expected_error_details = "metadata was invalid: %s" % metadata
+    response_iterator = self._unary_stream(request, metadata=metadata)
+    with self.assertRaises(grpc.RpcError) as exception_context:
+      next(response_iterator)
+    self.assertEqual(
+        exception_context.exception.details(), expected_error_details)
+    self.assertEqual(
+        exception_context.exception.code(), grpc.StatusCode.INTERNAL)
+    self.assertEqual(response_iterator.details(), expected_error_details)
+    self.assertEqual(response_iterator.code(), grpc.StatusCode.INTERNAL)
+
+  def testStreamRequestBlockingUnaryResponse(self):
+    request_iterator = (b'\x07\x08' for _ in range(test_constants.STREAM_LENGTH))
+    metadata = (('InVaLiD', 'StreamRequestBlockingUnaryResponse'),)
+    expected_error_details = "metadata was invalid: %s" % metadata
+    with self.assertRaises(ValueError) as exception_context:
+      self._stream_unary(request_iterator, metadata=metadata)
+    self.assertIn(expected_error_details, str(exception_context.exception))
+
+  def testStreamRequestBlockingUnaryResponseWithCall(self):
+    request_iterator = (
+        b'\x07\x08' for _ in range(test_constants.STREAM_LENGTH))
+    metadata = (('InVaLiD', 'StreamRequestBlockingUnaryResponseWithCall'),)
+    expected_error_details = "metadata was invalid: %s" % metadata
+    multi_callable = _stream_unary_multi_callable(self._channel)
+    with self.assertRaises(ValueError) as exception_context:
+      multi_callable.with_call(request_iterator, metadata=metadata)
+    self.assertIn(expected_error_details, str(exception_context.exception))
+
+  def testStreamRequestFutureUnaryResponse(self):
+    request_iterator = (
+        b'\x07\x08' for _ in range(test_constants.STREAM_LENGTH))
+    metadata = (('InVaLiD', 'StreamRequestFutureUnaryResponse'),)
+    expected_error_details = "metadata was invalid: %s" % metadata
+    response_future = self._stream_unary.future(
+        request_iterator, metadata=metadata)
+    with self.assertRaises(grpc.RpcError) as exception_context:
+      response_future.result()
+    self.assertEqual(
+        exception_context.exception.details(), expected_error_details)
+    self.assertEqual(
+        exception_context.exception.code(), grpc.StatusCode.INTERNAL)
+    self.assertEqual(response_future.details(), expected_error_details)
+    self.assertEqual(response_future.code(), grpc.StatusCode.INTERNAL)
+
+  def testStreamRequestStreamResponse(self):
+    request_iterator = (
+        b'\x07\x08' for _ in range(test_constants.STREAM_LENGTH))
+    metadata = (('InVaLiD', 'StreamRequestStreamResponse'),)
+    expected_error_details = "metadata was invalid: %s" % metadata
+    response_iterator = self._stream_stream(request_iterator, metadata=metadata)
+    with self.assertRaises(grpc.RpcError) as exception_context:
+      next(response_iterator)
+    self.assertEqual(
+        exception_context.exception.details(), expected_error_details)
+    self.assertEqual(
+        exception_context.exception.code(), grpc.StatusCode.INTERNAL)
+    self.assertEqual(response_iterator.details(), expected_error_details)
+    self.assertEqual(response_iterator.code(), grpc.StatusCode.INTERNAL)
+
+
+if __name__ == '__main__':
+  unittest.main(verbosity=2)
diff --git a/src/python/grpcio_tests/tests/unit/_invocation_defects_test.py b/src/python/grpcio_tests/tests/unit/_invocation_defects_test.py
new file mode 100644
index 0000000..4312679
--- /dev/null
+++ b/src/python/grpcio_tests/tests/unit/_invocation_defects_test.py
@@ -0,0 +1,247 @@
+# Copyright 2016, Google Inc.
+# All rights reserved.
+#
+# Redistribution and use in source and binary forms, with or without
+# modification, are permitted provided that the following conditions are
+# met:
+#
+#     * Redistributions of source code must retain the above copyright
+# notice, this list of conditions and the following disclaimer.
+#     * Redistributions in binary form must reproduce the above
+# copyright notice, this list of conditions and the following disclaimer
+# in the documentation and/or other materials provided with the
+# distribution.
+#     * Neither the name of Google Inc. nor the names of its
+# contributors may be used to endorse or promote products derived from
+# this software without specific prior written permission.
+#
+# THIS SOFTWARE IS PROVIDED BY THE COPYRIGHT HOLDERS AND CONTRIBUTORS
+# "AS IS" AND ANY EXPRESS OR IMPLIED WARRANTIES, INCLUDING, BUT NOT
+# LIMITED TO, THE IMPLIED WARRANTIES OF MERCHANTABILITY AND FITNESS FOR
+# A PARTICULAR PURPOSE ARE DISCLAIMED. IN NO EVENT SHALL THE COPYRIGHT
+# OWNER OR CONTRIBUTORS BE LIABLE FOR ANY DIRECT, INDIRECT, INCIDENTAL,
+# SPECIAL, EXEMPLARY, OR CONSEQUENTIAL DAMAGES (INCLUDING, BUT NOT
+# LIMITED TO, PROCUREMENT OF SUBSTITUTE GOODS OR SERVICES; LOSS OF USE,
+# DATA, OR PROFITS; OR BUSINESS INTERRUPTION) HOWEVER CAUSED AND ON ANY
+# THEORY OF LIABILITY, WHETHER IN CONTRACT, STRICT LIABILITY, OR TORT
+# (INCLUDING NEGLIGENCE OR OTHERWISE) ARISING IN ANY WAY OUT OF THE USE
+# OF THIS SOFTWARE, EVEN IF ADVISED OF THE POSSIBILITY OF SUCH DAMAGE.
+
+import itertools
+import threading
+import unittest
+from concurrent import futures
+
+import grpc
+from grpc.framework.foundation import logging_pool
+
+from tests.unit.framework.common import test_constants
+from tests.unit.framework.common import test_control
+
+_SERIALIZE_REQUEST = lambda bytestring: bytestring * 2
+_DESERIALIZE_REQUEST = lambda bytestring: bytestring[len(bytestring) // 2:]
+_SERIALIZE_RESPONSE = lambda bytestring: bytestring * 3
+_DESERIALIZE_RESPONSE = lambda bytestring: bytestring[:len(bytestring) // 3]
+
+_UNARY_UNARY = '/test/UnaryUnary'
+_UNARY_STREAM = '/test/UnaryStream'
+_STREAM_UNARY = '/test/StreamUnary'
+_STREAM_STREAM = '/test/StreamStream'
+
+
+class _Callback(object):
+  def __init__(self):
+    self._condition = threading.Condition()
+    self._value = None
+    self._called = False
+
+  def __call__(self, value):
+    with self._condition:
+      self._value = value
+      self._called = True
+      self._condition.notify_all()
+
+  def value(self):
+    with self._condition:
+      while not self._called:
+        self._condition.wait()
+      return self._value
+
+
+class _Handler(object):
+  def __init__(self, control):
+    self._control = control
+
+  def handle_unary_unary(self, request, servicer_context):
+    self._control.control()
+    if servicer_context is not None:
+      servicer_context.set_trailing_metadata((('testkey', 'testvalue',),))
+    return request
+
+  def handle_unary_stream(self, request, servicer_context):
+    for _ in range(test_constants.STREAM_LENGTH):
+      self._control.control()
+      yield request
+    self._control.control()
+    if servicer_context is not None:
+      servicer_context.set_trailing_metadata((('testkey', 'testvalue',),))
+
+  def handle_stream_unary(self, request_iterator, servicer_context):
+    if servicer_context is not None:
+      servicer_context.invocation_metadata()
+    self._control.control()
+    response_elements = []
+    for request in request_iterator:
+      self._control.control()
+      response_elements.append(request)
+    self._control.control()
+    if servicer_context is not None:
+      servicer_context.set_trailing_metadata((('testkey', 'testvalue',),))
+    return b''.join(response_elements)
+
+  def handle_stream_stream(self, request_iterator, servicer_context):
+    self._control.control()
+    if servicer_context is not None:
+      servicer_context.set_trailing_metadata((('testkey', 'testvalue',),))
+    for request in request_iterator:
+      self._control.control()
+      yield request
+    self._control.control()
+
+
+class _MethodHandler(grpc.RpcMethodHandler):
+  def __init__(
+    self, request_streaming, response_streaming, request_deserializer,
+    response_serializer, unary_unary, unary_stream, stream_unary,
+    stream_stream):
+    self.request_streaming = request_streaming
+    self.response_streaming = response_streaming
+    self.request_deserializer = request_deserializer
+    self.response_serializer = response_serializer
+    self.unary_unary = unary_unary
+    self.unary_stream = unary_stream
+    self.stream_unary = stream_unary
+    self.stream_stream = stream_stream
+
+
+class _GenericHandler(grpc.GenericRpcHandler):
+  def __init__(self, handler):
+    self._handler = handler
+
+  def service(self, handler_call_details):
+    if handler_call_details.method == _UNARY_UNARY:
+      return _MethodHandler(
+        False, False, None, None, self._handler.handle_unary_unary, None,
+        None, None)
+    elif handler_call_details.method == _UNARY_STREAM:
+      return _MethodHandler(
+        False, True, _DESERIALIZE_REQUEST, _SERIALIZE_RESPONSE, None,
+        self._handler.handle_unary_stream, None, None)
+    elif handler_call_details.method == _STREAM_UNARY:
+      return _MethodHandler(
+        True, False, _DESERIALIZE_REQUEST, _SERIALIZE_RESPONSE, None, None,
+        self._handler.handle_stream_unary, None)
+    elif handler_call_details.method == _STREAM_STREAM:
+      return _MethodHandler(
+        True, True, None, None, None, None, None,
+        self._handler.handle_stream_stream)
+    else:
+      return None
+
+
+class FailAfterFewIterationsCounter(object):
+    def __init__(self, high, bytestring):
+        self._current = 0
+        self._high = high
+        self._bytestring = bytestring
+
+    def __iter__(self):
+        return self
+
+    def __next__(self):
+        if self._current >= self._high:
+            raise Exception("This is a deliberate failure in a unit test.")
+        else:
+            self._current += 1
+            return self._bytestring
+
+
+def _unary_unary_multi_callable(channel):
+  return channel.unary_unary(_UNARY_UNARY)
+
+
+def _unary_stream_multi_callable(channel):
+  return channel.unary_stream(
+    _UNARY_STREAM,
+    request_serializer=_SERIALIZE_REQUEST,
+    response_deserializer=_DESERIALIZE_RESPONSE)
+
+
+def _stream_unary_multi_callable(channel):
+  return channel.stream_unary(
+    _STREAM_UNARY,
+    request_serializer=_SERIALIZE_REQUEST,
+    response_deserializer=_DESERIALIZE_RESPONSE)
+
+
+def _stream_stream_multi_callable(channel):
+  return channel.stream_stream(_STREAM_STREAM)
+
+
+class InvocationDefectsTest(unittest.TestCase):
+  def setUp(self):
+    self._control = test_control.PauseFailControl()
+    self._handler = _Handler(self._control)
+    self._server_pool = logging_pool.pool(test_constants.THREAD_CONCURRENCY)
+
+    self._server = grpc.server(self._server_pool)
+    port = self._server.add_insecure_port('[::]:0')
+    self._server.add_generic_rpc_handlers((_GenericHandler(self._handler),))
+    self._server.start()
+
+    self._channel = grpc.insecure_channel('localhost:%d' % port)
+
+  def tearDown(self):
+    self._server.stop(0)
+
+  def testIterableStreamRequestBlockingUnaryResponse(self):
+    requests = [b'\x07\x08' for _ in range(test_constants.STREAM_LENGTH)]
+    multi_callable = _stream_unary_multi_callable(self._channel)
+
+    with self.assertRaises(grpc.RpcError):
+      response = multi_callable(
+        requests,
+        metadata=(('test', 'IterableStreamRequestBlockingUnaryResponse'),))
+
+  def testIterableStreamRequestFutureUnaryResponse(self):
+    requests = [b'\x07\x08' for _ in range(test_constants.STREAM_LENGTH)]
+    multi_callable = _stream_unary_multi_callable(self._channel)
+    response_future = multi_callable.future(
+      requests,
+      metadata=(
+        ('test', 'IterableStreamRequestFutureUnaryResponse'),))
+
+    with self.assertRaises(grpc.RpcError):
+      response = response_future.result()
+
+  def testIterableStreamRequestStreamResponse(self):
+    requests = [b'\x77\x58' for _ in range(test_constants.STREAM_LENGTH)]
+    multi_callable = _stream_stream_multi_callable(self._channel)
+    response_iterator = multi_callable(
+      requests,
+      metadata=(('test', 'IterableStreamRequestStreamResponse'),))
+
+    with self.assertRaises(grpc.RpcError):
+      next(response_iterator)
+
+  def testIteratorStreamRequestStreamResponse(self):
+    requests_iterator = FailAfterFewIterationsCounter(
+      test_constants.STREAM_LENGTH // 2, b'\x07\x08')
+    multi_callable = _stream_stream_multi_callable(self._channel)
+    response_iterator = multi_callable(
+      requests_iterator,
+      metadata=(('test', 'IteratorStreamRequestStreamResponse'),))
+
+    with self.assertRaises(grpc.RpcError):
+      for _ in range(test_constants.STREAM_LENGTH // 2 + 1):
+        next(response_iterator)
diff --git a/src/python/grpcio_tests/tests/unit/_metadata_test.py b/src/python/grpcio_tests/tests/unit/_metadata_test.py
index da73476..caba53f 100644
--- a/src/python/grpcio_tests/tests/unit/_metadata_test.py
+++ b/src/python/grpcio_tests/tests/unit/_metadata_test.py
@@ -193,7 +193,7 @@
   def testStreamUnary(self):
     multi_callable = self._channel.stream_unary(_STREAM_UNARY)
     unused_response, call = multi_callable.with_call(
-        [_REQUEST] * test_constants.STREAM_LENGTH,
+        iter([_REQUEST] * test_constants.STREAM_LENGTH),
         metadata=_CLIENT_METADATA)
     self.assertTrue(test_common.metadata_transmitted(
         _SERVER_INITIAL_METADATA, call.initial_metadata()))
@@ -202,7 +202,7 @@
 
   def testStreamStream(self):
     multi_callable = self._channel.stream_stream(_STREAM_STREAM)
-    call = multi_callable([_REQUEST] * test_constants.STREAM_LENGTH,
+    call = multi_callable(iter([_REQUEST] * test_constants.STREAM_LENGTH),
                           metadata=_CLIENT_METADATA)
     self.assertTrue(test_common.metadata_transmitted(
         _SERVER_INITIAL_METADATA, call.initial_metadata()))
diff --git a/src/ruby/tools/bin/grpc_tools_ruby_protoc b/src/ruby/tools/bin/grpc_tools_ruby_protoc
index dab06e7..7e619e7 100755
--- a/src/ruby/tools/bin/grpc_tools_ruby_protoc
+++ b/src/ruby/tools/bin/grpc_tools_ruby_protoc
@@ -30,7 +30,7 @@
 
 require 'rbconfig'
 
-require_relative '../os_check'
+require_relative '../platform_check'
 
 ext = RbConfig::CONFIG['EXEEXT']
 
@@ -39,7 +39,7 @@
 plugin_name = 'grpc_ruby_plugin' + ext
 
 protoc_dir = File.join(File.dirname(__FILE__),
-                       RbConfig::CONFIG['host_cpu'] + '-' + OS.os_name)
+                       PLATFORM.architecture + '-' + PLATFORM.os_name)
 
 protoc_path = File.join(protoc_dir, protoc_name)
 
diff --git a/src/ruby/tools/bin/grpc_tools_ruby_protoc_plugin b/src/ruby/tools/bin/grpc_tools_ruby_protoc_plugin
index 4b296de..e6af2fe 100755
--- a/src/ruby/tools/bin/grpc_tools_ruby_protoc_plugin
+++ b/src/ruby/tools/bin/grpc_tools_ruby_protoc_plugin
@@ -30,12 +30,12 @@
 
 require 'rbconfig'
 
-require_relative '../os_check'
+require_relative '../platform_check'
 
 plugin_name = 'grpc_ruby_plugin' + RbConfig::CONFIG['EXEEXT']
 
 plugin_path = File.join(File.dirname(__FILE__),
-                        RbConfig::CONFIG['host_cpu'] + '-' + OS.os_name,
+                        PLATFORM.architecture + '-' + PLATFORM.os_name,
                         plugin_name)
 
 exec([ plugin_path, plugin_path ], *ARGV)
diff --git a/src/ruby/tools/grpc-tools.gemspec b/src/ruby/tools/grpc-tools.gemspec
index 68e2a7a..bc142ae 100644
--- a/src/ruby/tools/grpc-tools.gemspec
+++ b/src/ruby/tools/grpc-tools.gemspec
@@ -11,7 +11,7 @@
   s.description = 'protoc and the Ruby gRPC protoc plugin'
   s.license = 'BSD-3-Clause'
 
-  s.files = %w( version.rb os_check.rb README.md )
+  s.files = %w( version.rb platform_check.rb README.md )
   s.files += Dir.glob('bin/**/*')
 
   s.bindir = 'bin'
diff --git a/src/ruby/tools/os_check.rb b/src/ruby/tools/platform_check.rb
similarity index 84%
rename from src/ruby/tools/os_check.rb
rename to src/ruby/tools/platform_check.rb
index 2677306..1f4d5a6 100644
--- a/src/ruby/tools/os_check.rb
+++ b/src/ruby/tools/platform_check.rb
@@ -27,19 +27,28 @@
 # (INCLUDING NEGLIGENCE OR OTHERWISE) ARISING IN ANY WAY OUT OF THE USE
 # OF THIS SOFTWARE, EVEN IF ADVISED OF THE POSSIBILITY OF SUCH DAMAGE.
 
-# This is based on http://stackoverflow.com/a/171011/159388 by Aaron Hinni
-
 require 'rbconfig'
 
-module OS
-  def OS.os_name
+# This is based on http://stackoverflow.com/a/171011/159388 by Aaron Hinni
+
+module PLATFORM
+  def PLATFORM.os_name
     case RbConfig::CONFIG['host_os']
-    when /cygwin|mswin|mingw|bccwin|wince|emx/
-      'windows'
-    when /darwin/
-      'macos'
-    else
-      'linux'
+      when /cygwin|mswin|mingw|bccwin|wince|emx/
+        'windows'
+      when /darwin/
+        'macos'
+      else
+        'linux'
+    end
+  end
+
+  def PLATFORM.architecture
+    case RbConfig::CONFIG['host_cpu']
+      when /x86_64/
+        'x86_64'
+      else
+        'x86'
     end
   end
 end
diff --git a/tools/run_tests/run_tests.py b/tools/run_tests/run_tests.py
index 0424cf3..2a28a87 100755
--- a/tools/run_tests/run_tests.py
+++ b/tools/run_tests/run_tests.py
@@ -307,10 +307,9 @@
 
   def test_specs(self):
     if self.platform == 'windows':
-      return [self.config.job_spec(['tools\\run_tests\\run_node.bat'], None)]
+      return [self.config.job_spec(['tools\\run_tests\\run_node.bat'])]
     else:
       return [self.config.job_spec(['tools/run_tests/run_node.sh', self.node_version],
-                                   None,
                                    environ=_FORCE_ENVIRON_FOR_WRAPPERS)]
 
   def pre_build_steps(self):
@@ -352,7 +351,7 @@
     _check_compiler(self.args.compiler, ['default'])
 
   def test_specs(self):
-    return [self.config.job_spec(['src/php/bin/run_tests.sh'], None,
+    return [self.config.job_spec(['src/php/bin/run_tests.sh'],
                                   environ=_FORCE_ENVIRON_FOR_WRAPPERS)]
 
   def pre_build_steps(self):
@@ -388,7 +387,7 @@
     _check_compiler(self.args.compiler, ['default'])
 
   def test_specs(self):
-    return [self.config.job_spec(['src/php/bin/run_tests.sh'], None,
+    return [self.config.job_spec(['src/php/bin/run_tests.sh'],
                                   environ=_FORCE_ENVIRON_FOR_WRAPPERS)]
 
   def pre_build_steps(self):
@@ -610,7 +609,6 @@
         for test in tests_by_assembly[assembly]:
           cmdline = runtime_cmd + [assembly_file, '--test=%s' % test] + nunit_args
           specs.append(self.config.job_spec(cmdline,
-                                            None,
                                             shortname='csharp.%s' % test,
                                             environ=_FORCE_ENVIRON_FOR_WRAPPERS))
       else:
@@ -628,7 +626,6 @@
         # to prevent problems with registering the profiler.
         run_exclusive = 1000000
         specs.append(self.config.job_spec(cmdline,
-                                          None,
                                           shortname='csharp.coverage.%s' % assembly,
                                           cpu_cost=run_exclusive,
                                           environ=_FORCE_ENVIRON_FOR_WRAPPERS))
@@ -687,7 +684,7 @@
   def test_specs(self):
     return [
         self.config.job_spec(['src/objective-c/tests/run_tests.sh'],
-                              timeout_seconds=None,
+                              timeout_seconds=60*60,
                               shortname='objc-tests',
                               environ=_FORCE_ENVIRON_FOR_WRAPPERS),
         self.config.job_spec(['src/objective-c/tests/build_example_test.sh'],
@@ -732,7 +729,7 @@
     import yaml
     with open('tools/run_tests/sanity/sanity_tests.yaml', 'r') as f:
       return [self.config.job_spec(cmd['script'].split(),
-                                   timeout_seconds=None, environ={'TEST': 'true'},
+                                   timeout_seconds=30*60, environ={'TEST': 'true'},
                                    cpu_cost=cmd.get('cpu_cost', 1))
               for cmd in yaml.load(f)]