Merge pull request #4906 from jtattermusch/runtest_more_cleanup

More cleanup of run_tests.py
diff --git a/src/core/client_config/lb_policies/pick_first.c b/src/core/client_config/lb_policies/pick_first.c
index e6ddb1a..5b10600 100644
--- a/src/core/client_config/lb_policies/pick_first.c
+++ b/src/core/client_config/lb_policies/pick_first.c
@@ -76,7 +76,7 @@
 } pick_first_lb_policy;
 
 #define GET_SELECTED(p) \
-  ((grpc_connected_subchannel *)gpr_atm_no_barrier_load(&(p)->selected))
+  ((grpc_connected_subchannel *)gpr_atm_acq_load(&(p)->selected))
 
 void pf_destroy(grpc_exec_ctx *exec_ctx, grpc_lb_policy *pol) {
   pick_first_lb_policy *p = (pick_first_lb_policy *)pol;
@@ -268,10 +268,10 @@
         selected =
             grpc_subchannel_get_connected_subchannel(selected_subchannel);
         GPR_ASSERT(selected != NULL);
-        gpr_atm_no_barrier_store(&p->selected, (gpr_atm)selected);
         GRPC_CONNECTED_SUBCHANNEL_REF(selected, "picked_first");
         /* drop the pick list: we are connected now */
         GRPC_LB_POLICY_WEAK_REF(&p->base, "destroy_subchannels");
+        gpr_atm_rel_store(&p->selected, (gpr_atm)selected);
         grpc_exec_ctx_enqueue(exec_ctx,
                               grpc_closure_create(destroy_subchannels, p), 1);
         /* update any calls that were waiting for a pick */
diff --git a/src/core/client_config/subchannel.c b/src/core/client_config/subchannel.c
index 2992da8..748eef9 100644
--- a/src/core/client_config/subchannel.c
+++ b/src/core/client_config/subchannel.c
@@ -1,6 +1,6 @@
 /*
  *
- * Copyright 2015, Google Inc.
+ * Copyright 2015-2016, Google Inc.
  * All rights reserved.
  *
  * Redistribution and use in source and binary forms, with or without
@@ -519,7 +519,12 @@
   }
 
   /* publish */
-  GPR_ASSERT(gpr_atm_no_barrier_cas(&c->connected_subchannel, 0, (gpr_atm)con));
+  /* TODO(ctiller): this full barrier seems to clear up a TSAN failure.
+                    I'd have expected the rel_cas below to be enough, but
+                    seemingly it's not.
+                    Re-evaluate if we really need this. */
+  gpr_atm_full_barrier();
+  GPR_ASSERT(gpr_atm_rel_cas(&c->connected_subchannel, 0, (gpr_atm)con));
   c->connecting = 0;
 
   /* setup subchannel watching connected subchannel for changes; subchannel ref
diff --git a/src/cpp/util/byte_buffer.cc b/src/cpp/util/byte_buffer.cc
index 2952f94..3a2318d 100644
--- a/src/cpp/util/byte_buffer.cc
+++ b/src/cpp/util/byte_buffer.cc
@@ -31,8 +31,8 @@
  *
  */
 
-#include <grpc/byte_buffer_reader.h>
 #include <grpc++/support/byte_buffer.h>
+#include <grpc/byte_buffer_reader.h>
 
 namespace grpc {
 
@@ -84,8 +84,10 @@
     : buffer_(grpc_byte_buffer_copy(buf.buffer_)) {}
 
 ByteBuffer& ByteBuffer::operator=(const ByteBuffer& buf) {
-  Clear();                                       // first remove existing data
-  buffer_ = grpc_byte_buffer_copy(buf.buffer_);  // then copy
+  if (this != &buf) {  // self-assignment would wipe the data being copied
+    Clear();           // first remove existing data
+    if (buf.buffer_) buffer_ = grpc_byte_buffer_copy(buf.buffer_);  // copy
+  }
   return *this;
 }
 
diff --git a/src/proto/grpc/testing/control.proto b/src/proto/grpc/testing/control.proto
index 0784ebf..7ba6f98 100644
--- a/src/proto/grpc/testing/control.proto
+++ b/src/proto/grpc/testing/control.proto
@@ -1,4 +1,4 @@
-// Copyright 2015, Google Inc.
+// Copyright 2015-2016, Google Inc.
 // All rights reserved.
 //
 // Redistribution and use in source and binary forms, with or without
@@ -42,6 +42,7 @@
 enum ServerType {
   SYNC_SERVER = 0;
   ASYNC_SERVER = 1;
+  ASYNC_GENERIC_SERVER = 2;
 }
 
 enum RpcType {
diff --git a/src/proto/grpc/testing/echo_messages.proto b/src/proto/grpc/testing/echo_messages.proto
index f01d645..d05a355 100644
--- a/src/proto/grpc/testing/echo_messages.proto
+++ b/src/proto/grpc/testing/echo_messages.proto
@@ -1,5 +1,5 @@
 
-// Copyright 2015, Google Inc.
+// Copyright 2015-2016, Google Inc.
 // All rights reserved.
 //
 // Redistribution and use in source and binary forms, with or without
@@ -41,6 +41,7 @@
   int32 response_message_length = 6;
   bool echo_peer = 7;
   string expected_client_identity = 8; // will force check_auth_context.
+  bool skip_cancelled_check = 9;
 }
 
 message EchoRequest {
diff --git a/src/python/grpcio/grpc/framework/foundation/logging_pool.py b/src/python/grpcio/grpc/framework/foundation/logging_pool.py
index 7c7a6ee..f82c7f7 100644
--- a/src/python/grpcio/grpc/framework/foundation/logging_pool.py
+++ b/src/python/grpcio/grpc/framework/foundation/logging_pool.py
@@ -1,4 +1,4 @@
-# Copyright 2015, Google Inc.
+# Copyright 2015-2016, Google Inc.
 # All rights reserved.
 #
 # Redistribution and use in source and binary forms, with or without
@@ -29,7 +29,6 @@
 
 """A thread pool that logs exceptions raised by tasks executed within it."""
 
-import functools
 import logging
 
 from concurrent import futures
@@ -37,12 +36,12 @@
 
 def _wrap(behavior):
   """Wraps an arbitrary callable behavior in exception-logging."""
-  @functools.wraps(behavior)
   def _wrapping(*args, **kwargs):
     try:
       return behavior(*args, **kwargs)
-    except Exception as e:
-      logging.exception('Unexpected exception from task run in logging pool!')
-      raise
+    except Exception:
+      logging.exception(
+          'Unexpected exception from %s executed in logging pool!', behavior)
+      raise  # re-raise after logging so the caller still sees the failure
   return _wrapping
 
diff --git a/src/python/grpcio/tests/unit/framework/foundation/_logging_pool_test.py b/src/python/grpcio/tests/unit/framework/foundation/_logging_pool_test.py
index 452802d..0521e1c 100644
--- a/src/python/grpcio/tests/unit/framework/foundation/_logging_pool_test.py
+++ b/src/python/grpcio/tests/unit/framework/foundation/_logging_pool_test.py
@@ -1,4 +1,4 @@
-# Copyright 2015, Google Inc.
+# Copyright 2015-2016, Google Inc.
 # All rights reserved.
 #
 # Redistribution and use in source and binary forms, with or without
@@ -29,6 +29,7 @@
 
 """Tests for grpc.framework.foundation.logging_pool."""
 
+import threading
 import unittest
 
 from grpc.framework.foundation import logging_pool
@@ -36,6 +37,21 @@
 _POOL_SIZE = 16
 
 
+class _CallableObject(object):
+  """Callable that records, under a lock, each value it is called with."""
+  def __init__(self):
+    self._lock = threading.Lock()
+    self._passed_values = []
+
+  def __call__(self, value):
+    with self._lock:
+      self._passed_values.append(value)
+
+  def passed_values(self):
+    with self._lock:
+      return tuple(self._passed_values)  # immutable snapshot
+
+
 class LoggingPoolTest(unittest.TestCase):
 
   def testUpAndDown(self):
@@ -59,6 +75,14 @@
 
     self.assertIsNotNone(raised_exception)
 
+  def testCallableObjectExecuted(self):
+    callable_object = _CallableObject()
+    passed_object = object()
+    with logging_pool.pool(_POOL_SIZE) as pool:
+      future = pool.submit(callable_object, passed_object)
+    self.assertIsNone(future.result())
+    self.assertSequenceEqual((passed_object,), callable_object.passed_values())
+
 
 if __name__ == '__main__':
   unittest.main(verbosity=2)
diff --git a/src/python/grpcio/tests/unit/framework/interfaces/face/_blocking_invocation_inline_service.py b/src/python/grpcio/tests/unit/framework/interfaces/face/_blocking_invocation_inline_service.py
index 3bcefa6..c8a3a1b 100644
--- a/src/python/grpcio/tests/unit/framework/interfaces/face/_blocking_invocation_inline_service.py
+++ b/src/python/grpcio/tests/unit/framework/interfaces/face/_blocking_invocation_inline_service.py
@@ -1,4 +1,4 @@
-# Copyright 2015, Google Inc.
+# Copyright 2015-2016, Google Inc.
 # All rights reserved.
 #
 # Redistribution and use in source and binary forms, with or without
@@ -30,9 +30,12 @@
 """Test code for the Face layer of RPC Framework."""
 
 import abc
+import itertools
 import unittest
+from concurrent import futures
 
 # test_interfaces is referenced from specification in this module.
+from grpc.framework.foundation import logging_pool
 from grpc.framework.interfaces.face import face
 from tests.unit.framework.common import test_constants
 from tests.unit.framework.common import test_control
@@ -139,13 +142,50 @@
 
         test_messages.verify(second_request, second_response, self)
 
-  @unittest.skip('Parallel invocations impossible with blocking control flow!')
   def testParallelInvocations(self):
-    raise NotImplementedError()
+    pool = logging_pool.pool(test_constants.PARALLELISM)
+    for (group, method), test_messages_sequence in (
+        self._digest.unary_unary_messages_sequences.iteritems()):
+      for test_messages in test_messages_sequence:
+        requests = []
+        response_futures = []
+        for _ in range(test_constants.PARALLELISM):
+          request = test_messages.request()
+          response_future = pool.submit(
+              self._invoker.blocking(group, method), request,
+              test_constants.LONG_TIMEOUT)
+          requests.append(request)
+          response_futures.append(response_future)
 
-  @unittest.skip('Parallel invocations impossible with blocking control flow!')
+        responses = [
+            response_future.result() for response_future in response_futures]
+        # response_futures is in request order, so zip pairs them correctly.
+        for request, response in zip(requests, responses):
+          test_messages.verify(request, response, self)
+    pool.shutdown(wait=True)
+
   def testWaitingForSomeButNotAllParallelInvocations(self):
-    raise NotImplementedError()
+    pool = logging_pool.pool(test_constants.PARALLELISM)
+    for (group, method), test_messages_sequence in (
+        self._digest.unary_unary_messages_sequences.iteritems()):
+      for test_messages in test_messages_sequence:
+        requests = []
+        response_futures_to_indices = {}
+        for index in range(test_constants.PARALLELISM):
+          request = test_messages.request()
+          response_future = pool.submit(
+              self._invoker.blocking(group, method), request,
+              test_constants.LONG_TIMEOUT)
+          requests.append(request)
+          response_futures_to_indices[response_future] = index
+        # Verify only the first half of the invocations to finish.
+        some_completed_response_futures_iterator = itertools.islice(
+            futures.as_completed(response_futures_to_indices),
+            test_constants.PARALLELISM // 2)  # islice requires an int
+        for response_future in some_completed_response_futures_iterator:
+          index = response_futures_to_indices[response_future]
+          test_messages.verify(requests[index], response_future.result(), self)
+    pool.shutdown(wait=True)
 
   @unittest.skip('Cancellation impossible with blocking control flow!')
   def testCancelledUnaryRequestUnaryResponse(self):
diff --git a/src/python/grpcio/tests/unit/framework/interfaces/face/_future_invocation_asynchronous_event_service.py b/src/python/grpcio/tests/unit/framework/interfaces/face/_future_invocation_asynchronous_event_service.py
index fc8daa9..1d36a93 100644
--- a/src/python/grpcio/tests/unit/framework/interfaces/face/_future_invocation_asynchronous_event_service.py
+++ b/src/python/grpcio/tests/unit/framework/interfaces/face/_future_invocation_asynchronous_event_service.py
@@ -31,8 +31,10 @@
 
 import abc
 import contextlib
+import itertools
 import threading
 import unittest
+from concurrent import futures
 
 # test_interfaces is referenced from specification in this module.
 from grpc.framework.foundation import logging_pool
@@ -219,6 +221,23 @@
 
         test_messages.verify(second_request, second_response, self)
 
+  def testParallelInvocations(self):
+    for (group, method), test_messages_sequence in (
+        self._digest.unary_unary_messages_sequences.iteritems()):
+      for test_messages in test_messages_sequence:
+        first_request = test_messages.request()
+        second_request = test_messages.request()
+
+        first_response_future = self._invoker.future(group, method)(
+            first_request, test_constants.LONG_TIMEOUT)
+        second_response_future = self._invoker.future(group, method)(
+            second_request, test_constants.LONG_TIMEOUT)
+        first_response = first_response_future.result()
+        second_response = second_response_future.result()
+
+        test_messages.verify(first_request, first_response, self)
+        test_messages.verify(second_request, second_response, self)
+
     for (group, method), test_messages_sequence in (
         self._digest.unary_unary_messages_sequences.iteritems()):
       for test_messages in test_messages_sequence:
@@ -237,26 +256,28 @@
         for request, response in zip(requests, responses):
           test_messages.verify(request, response, self)
 
-  def testParallelInvocations(self):
+  def testWaitingForSomeButNotAllParallelInvocations(self):
+    pool = logging_pool.pool(test_constants.PARALLELISM)
     for (group, method), test_messages_sequence in (
         self._digest.unary_unary_messages_sequences.iteritems()):
       for test_messages in test_messages_sequence:
-        first_request = test_messages.request()
-        second_request = test_messages.request()
+        requests = []
+        response_futures_to_indices = {}
+        for index in range(test_constants.PARALLELISM):
+          request = test_messages.request()
+          inner_response_future = self._invoker.future(group, method)(
+              request, test_constants.LONG_TIMEOUT)
+          outer_response_future = pool.submit(inner_response_future.result)
+          requests.append(request)
+          response_futures_to_indices[outer_response_future] = index
 
-        first_response_future = self._invoker.future(group, method)(
-            first_request, test_constants.LONG_TIMEOUT)
-        second_response_future = self._invoker.future(group, method)(
-            second_request, test_constants.LONG_TIMEOUT)
-        first_response = first_response_future.result()
-        second_response = second_response_future.result()
-
-        test_messages.verify(first_request, first_response, self)
-        test_messages.verify(second_request, second_response, self)
-
-  @unittest.skip('TODO(nathaniel): implement.')
-  def testWaitingForSomeButNotAllParallelInvocations(self):
-    raise NotImplementedError()
+        some_completed_response_futures_iterator = itertools.islice(
+            futures.as_completed(response_futures_to_indices),
+            test_constants.PARALLELISM // 2)  # islice requires an int
+        for response_future in some_completed_response_futures_iterator:
+          index = response_futures_to_indices[response_future]
+          test_messages.verify(requests[index], response_future.result(), self)
+    pool.shutdown(wait=True)
 
   def testCancelledUnaryRequestUnaryResponse(self):
     for (group, method), test_messages_sequence in (
diff --git a/src/python/grpcio/tests/unit/framework/interfaces/face/_receiver.py b/src/python/grpcio/tests/unit/framework/interfaces/face/_receiver.py
index 2e444ff..42a7f4e 100644
--- a/src/python/grpcio/tests/unit/framework/interfaces/face/_receiver.py
+++ b/src/python/grpcio/tests/unit/framework/interfaces/face/_receiver.py
@@ -1,4 +1,4 @@
-# Copyright 2015, Google Inc.
+# Copyright 2015-2016, Google Inc.
 # All rights reserved.
 #
 # Redistribution and use in source and binary forms, with or without
@@ -76,7 +76,7 @@
   def unary_response(self):
     with self._condition:
       if self._abortion is not None:
-        raise AssertionError('Aborted with abortion "%s"!' % self._abortion)
+        raise AssertionError('Aborted: "{}"!'.format(self._abortion))
       elif len(self._responses) != 1:
         raise AssertionError(
             '%d responses received, not exactly one!', len(self._responses))
@@ -88,7 +88,7 @@
       if self._abortion is None:
         return list(self._responses)
       else:
-        raise AssertionError('Aborted with abortion "%s"!' % self._abortion)
+        raise AssertionError('Aborted: "{}"!'.format(self._abortion))
 
   def abortion(self):
     with self._condition:
diff --git a/test/core/util/port_posix.c b/test/core/util/port_posix.c
index 732a51c..11cefbf 100644
--- a/test/core/util/port_posix.c
+++ b/test/core/util/port_posix.c
@@ -1,6 +1,6 @@
 /*
  *
- * Copyright 2015, Google Inc.
+ * Copyright 2015-2016, Google Inc.
  * All rights reserved.
  *
  * Redistribution and use in source and binary forms, with or without
@@ -37,6 +37,7 @@
 
 #include "test/core/util/port.h"
 
+#include <math.h>
 #include <netinet/in.h>
 #include <sys/socket.h>
 #include <stdio.h>
@@ -229,10 +230,10 @@
     grpc_httpcli_request req;
     memset(&req, 0, sizeof(req));
     GPR_ASSERT(pr->retries < 10);
+    sleep(1 + (unsigned)(pow(1.3, pr->retries) * rand() / RAND_MAX));
     pr->retries++;
     req.host = pr->server;
     req.path = "/get";
-    sleep(1);
     grpc_httpcli_get(exec_ctx, pr->ctx, &pr->pollset, &req,
                      GRPC_TIMEOUT_SECONDS_TO_DEADLINE(10), got_port_from_server,
                      pr);
diff --git a/test/cpp/end2end/end2end_test.cc b/test/cpp/end2end/end2end_test.cc
index f8027bc..5a414eb 100644
--- a/test/cpp/end2end/end2end_test.cc
+++ b/test/cpp/end2end/end2end_test.cc
@@ -244,7 +244,8 @@
           gpr_time_from_micros(request->param().server_cancel_after_us(),
                                GPR_TIMESPAN)));
       return Status::CANCELLED;
-    } else {
+    } else if (!request->has_param() ||
+               !request->param().skip_cancelled_check()) {
       EXPECT_FALSE(context->IsCancelled());
     }
 
@@ -823,6 +824,7 @@
   EchoRequest request;
   EchoResponse response;
   request.set_message("Hello");
+  request.mutable_param()->set_skip_cancelled_check(true);
 
   ClientContext context;
   std::chrono::system_clock::time_point deadline =
diff --git a/test/cpp/qps/generic_async_streaming_ping_pong_test.cc b/test/cpp/qps/generic_async_streaming_ping_pong_test.cc
index 2b2e1c8..81c0f24 100644
--- a/test/cpp/qps/generic_async_streaming_ping_pong_test.cc
+++ b/test/cpp/qps/generic_async_streaming_ping_pong_test.cc
@@ -60,7 +60,7 @@
   bbuf->set_req_size(0);
 
   ServerConfig server_config;
-  server_config.set_server_type(ASYNC_SERVER);
+  server_config.set_server_type(ASYNC_GENERIC_SERVER);
   server_config.set_host("localhost");
   server_config.set_async_server_threads(1);
 
diff --git a/test/cpp/qps/qps_driver.cc b/test/cpp/qps/qps_driver.cc
index e3e7cb2..680e4b1 100644
--- a/test/cpp/qps/qps_driver.cc
+++ b/test/cpp/qps/qps_driver.cc
@@ -170,7 +170,7 @@
   GPR_ASSERT(!client_config.payload_config().has_bytebuf_params() ||
              (client_config.client_type() == ASYNC_CLIENT &&
               client_config.rpc_type() == STREAMING &&
-              server_config.server_type() == ASYNC_SERVER));
+              server_config.server_type() == ASYNC_GENERIC_SERVER));
 
   const auto result = RunScenario(
       client_config, FLAGS_num_clients, server_config, FLAGS_num_servers,
diff --git a/test/cpp/qps/qps_worker.cc b/test/cpp/qps/qps_worker.cc
index e782b2a..6316605 100644
--- a/test/cpp/qps/qps_worker.cc
+++ b/test/cpp/qps/qps_worker.cc
@@ -97,6 +97,8 @@
       return CreateSynchronousServer(config);
     case ServerType::ASYNC_SERVER:
       return CreateAsyncServer(config);
+    case ServerType::ASYNC_GENERIC_SERVER:
+      return CreateAsyncGenericServer(config);
     default:
       abort();
   }
diff --git a/test/cpp/qps/server.h b/test/cpp/qps/server.h
index 32a3e85..196fdac 100644
--- a/test/cpp/qps/server.h
+++ b/test/cpp/qps/server.h
@@ -108,6 +108,7 @@
 
 std::unique_ptr<Server> CreateSynchronousServer(const ServerConfig& config);
 std::unique_ptr<Server> CreateAsyncServer(const ServerConfig& config);
+std::unique_ptr<Server> CreateAsyncGenericServer(const ServerConfig& config);
 
 }  // namespace testing
 }  // namespace grpc
diff --git a/test/cpp/qps/server_async.cc b/test/cpp/qps/server_async.cc
index d530dac..cd8d546 100644
--- a/test/cpp/qps/server_async.cc
+++ b/test/cpp/qps/server_async.cc
@@ -373,7 +373,7 @@
                                 const ByteBuffer *request,
                                 ByteBuffer *response) {
   int resp_size = payload_config.bytebuf_params().resp_size();
-  std::unique_ptr<char> buf(new char[resp_size]);
+  std::unique_ptr<char[]> buf(new char[resp_size]);
   gpr_slice s = gpr_slice_from_copied_buffer(buf.get(), resp_size);
   Slice slice(s, Slice::STEAL_REF);
   *response = ByteBuffer(&slice, 1);
diff --git a/tools/run_tests/run_node.sh b/tools/run_tests/run_node.sh
index fff579f..f93c9c3 100755
--- a/tools/run_tests/run_node.sh
+++ b/tools/run_tests/run_node.sh
@@ -1,5 +1,5 @@
 #!/bin/bash
-# Copyright 2015, Google Inc.
+# Copyright 2015-2016, Google Inc.
 # All rights reserved.
 #
 # Redistribution and use in source and binary forms, with or without
@@ -51,5 +51,5 @@
   echo '<html><head><meta http-equiv="refresh" content="0;URL=lcov-report/index.html"></head></html>' > \
     ../reports/node_coverage/index.html
 else
-  JUNIT_REPORT_PATH=src/node/reports.xml JUNIT_REPORT_STACK=1 ./node_modules/.bin/mocha --reporter mocha-jenkins-reporter src/node/test || true
+  JUNIT_REPORT_PATH=src/node/reports.xml JUNIT_REPORT_STACK=1 ./node_modules/.bin/mocha --reporter mocha-jenkins-reporter src/node/test
 fi
diff --git a/tools/run_tests/run_tests.py b/tools/run_tests/run_tests.py
index 885a79c..fa4a37a 100755
--- a/tools/run_tests/run_tests.py
+++ b/tools/run_tests/run_tests.py
@@ -151,9 +151,9 @@
   def make_targets(self, test_regex):
     if platform_string() != 'windows' and test_regex != '.*':
       # use the regex to minimize the number of things to build
-      return [target['name']
+      return [os.path.basename(target['name'])
               for target in get_c_tests(False, self.test_lang)
-              if re.search(test_regex, target['name'])]
+              if re.search(test_regex, '/' + target['name'])]
     if platform_string() == 'windows':
       # don't build tools on windows just yet
       return ['buildtests_%s' % self.make_target]
diff --git a/tools/tsan_suppressions.txt b/tools/tsan_suppressions.txt
index 65e7e2e..09e68cd 100644
--- a/tools/tsan_suppressions.txt
+++ b/tools/tsan_suppressions.txt
@@ -5,4 +5,4 @@
 # https://www.mail-archive.com/openssl-dev@openssl.org/msg09019.html
 race:ssleay_rand_add
 race:ssleay_rand_bytes
-
+race:__sleep_for