Merge pull request #4906 from jtattermusch/runtest_more_cleanup
More cleanup of run_tests.py
diff --git a/src/core/client_config/lb_policies/pick_first.c b/src/core/client_config/lb_policies/pick_first.c
index e6ddb1a..5b10600 100644
--- a/src/core/client_config/lb_policies/pick_first.c
+++ b/src/core/client_config/lb_policies/pick_first.c
@@ -76,7 +76,7 @@
} pick_first_lb_policy;
#define GET_SELECTED(p) \
- ((grpc_connected_subchannel *)gpr_atm_no_barrier_load(&(p)->selected))
+ ((grpc_connected_subchannel *)gpr_atm_acq_load(&(p)->selected))
void pf_destroy(grpc_exec_ctx *exec_ctx, grpc_lb_policy *pol) {
pick_first_lb_policy *p = (pick_first_lb_policy *)pol;
@@ -268,10 +268,10 @@
selected =
grpc_subchannel_get_connected_subchannel(selected_subchannel);
GPR_ASSERT(selected != NULL);
- gpr_atm_no_barrier_store(&p->selected, (gpr_atm)selected);
GRPC_CONNECTED_SUBCHANNEL_REF(selected, "picked_first");
/* drop the pick list: we are connected now */
GRPC_LB_POLICY_WEAK_REF(&p->base, "destroy_subchannels");
+ gpr_atm_rel_store(&p->selected, (gpr_atm)selected);
grpc_exec_ctx_enqueue(exec_ctx,
grpc_closure_create(destroy_subchannels, p), 1);
/* update any calls that were waiting for a pick */
diff --git a/src/core/client_config/subchannel.c b/src/core/client_config/subchannel.c
index 2992da8..748eef9 100644
--- a/src/core/client_config/subchannel.c
+++ b/src/core/client_config/subchannel.c
@@ -1,6 +1,6 @@
/*
*
- * Copyright 2015, Google Inc.
+ * Copyright 2015-2016, Google Inc.
* All rights reserved.
*
* Redistribution and use in source and binary forms, with or without
@@ -519,7 +519,12 @@
}
/* publish */
- GPR_ASSERT(gpr_atm_no_barrier_cas(&c->connected_subchannel, 0, (gpr_atm)con));
+ /* TODO(ctiller): this full barrier seems to clear up a TSAN failure.
+ I'd have expected the rel_cas below to be enough, but
+ seemingly it's not.
+ Re-evaluate if we really need this. */
+ gpr_atm_full_barrier();
+ GPR_ASSERT(gpr_atm_rel_cas(&c->connected_subchannel, 0, (gpr_atm)con));
c->connecting = 0;
/* setup subchannel watching connected subchannel for changes; subchannel ref
diff --git a/src/cpp/util/byte_buffer.cc b/src/cpp/util/byte_buffer.cc
index 2952f94..3a2318d 100644
--- a/src/cpp/util/byte_buffer.cc
+++ b/src/cpp/util/byte_buffer.cc
@@ -31,8 +31,8 @@
*
*/
-#include <grpc/byte_buffer_reader.h>
#include <grpc++/support/byte_buffer.h>
+#include <grpc/byte_buffer_reader.h>
namespace grpc {
@@ -84,8 +84,10 @@
: buffer_(grpc_byte_buffer_copy(buf.buffer_)) {}
ByteBuffer& ByteBuffer::operator=(const ByteBuffer& buf) {
- Clear(); // first remove existing data
- buffer_ = grpc_byte_buffer_copy(buf.buffer_); // then copy
+ Clear(); // first remove existing data
+ if (buf.buffer_) {
+ buffer_ = grpc_byte_buffer_copy(buf.buffer_); // then copy
+ }
return *this;
}
diff --git a/src/proto/grpc/testing/control.proto b/src/proto/grpc/testing/control.proto
index 0784ebf..7ba6f98 100644
--- a/src/proto/grpc/testing/control.proto
+++ b/src/proto/grpc/testing/control.proto
@@ -1,4 +1,4 @@
-// Copyright 2015, Google Inc.
+// Copyright 2015-2016, Google Inc.
// All rights reserved.
//
// Redistribution and use in source and binary forms, with or without
@@ -42,6 +42,7 @@
enum ServerType {
SYNC_SERVER = 0;
ASYNC_SERVER = 1;
+ ASYNC_GENERIC_SERVER = 2;
}
enum RpcType {
diff --git a/src/proto/grpc/testing/echo_messages.proto b/src/proto/grpc/testing/echo_messages.proto
index f01d645..d05a355 100644
--- a/src/proto/grpc/testing/echo_messages.proto
+++ b/src/proto/grpc/testing/echo_messages.proto
@@ -1,5 +1,5 @@
-// Copyright 2015, Google Inc.
+// Copyright 2015-2016, Google Inc.
// All rights reserved.
//
// Redistribution and use in source and binary forms, with or without
@@ -41,6 +41,7 @@
int32 response_message_length = 6;
bool echo_peer = 7;
string expected_client_identity = 8; // will force check_auth_context.
+ bool skip_cancelled_check = 9;
}
message EchoRequest {
diff --git a/src/python/grpcio/grpc/framework/foundation/logging_pool.py b/src/python/grpcio/grpc/framework/foundation/logging_pool.py
index 7c7a6ee..f82c7f7 100644
--- a/src/python/grpcio/grpc/framework/foundation/logging_pool.py
+++ b/src/python/grpcio/grpc/framework/foundation/logging_pool.py
@@ -1,4 +1,4 @@
-# Copyright 2015, Google Inc.
+# Copyright 2015-2016, Google Inc.
# All rights reserved.
#
# Redistribution and use in source and binary forms, with or without
@@ -29,7 +29,6 @@
"""A thread pool that logs exceptions raised by tasks executed within it."""
-import functools
import logging
from concurrent import futures
@@ -37,12 +36,12 @@
def _wrap(behavior):
"""Wraps an arbitrary callable behavior in exception-logging."""
- @functools.wraps(behavior)
def _wrapping(*args, **kwargs):
try:
return behavior(*args, **kwargs)
except Exception as e:
- logging.exception('Unexpected exception from task run in logging pool!')
+ logging.exception(
+ 'Unexpected exception from %s executed in logging pool!', behavior)
raise
return _wrapping
diff --git a/src/python/grpcio/tests/unit/framework/foundation/_logging_pool_test.py b/src/python/grpcio/tests/unit/framework/foundation/_logging_pool_test.py
index 452802d..0521e1c 100644
--- a/src/python/grpcio/tests/unit/framework/foundation/_logging_pool_test.py
+++ b/src/python/grpcio/tests/unit/framework/foundation/_logging_pool_test.py
@@ -1,4 +1,4 @@
-# Copyright 2015, Google Inc.
+# Copyright 2015-2016, Google Inc.
# All rights reserved.
#
# Redistribution and use in source and binary forms, with or without
@@ -29,6 +29,7 @@
"""Tests for grpc.framework.foundation.logging_pool."""
+import threading
import unittest
from grpc.framework.foundation import logging_pool
@@ -36,6 +37,21 @@
_POOL_SIZE = 16
+class _CallableObject(object):
+
+ def __init__(self):
+ self._lock = threading.Lock()
+ self._passed_values = []
+
+ def __call__(self, value):
+ with self._lock:
+ self._passed_values.append(value)
+
+ def passed_values(self):
+ with self._lock:
+ return tuple(self._passed_values)
+
+
class LoggingPoolTest(unittest.TestCase):
def testUpAndDown(self):
@@ -59,6 +75,14 @@
self.assertIsNotNone(raised_exception)
+ def testCallableObjectExecuted(self):
+ callable_object = _CallableObject()
+ passed_object = object()
+ with logging_pool.pool(_POOL_SIZE) as pool:
+ future = pool.submit(callable_object, passed_object)
+ self.assertIsNone(future.result())
+ self.assertSequenceEqual((passed_object,), callable_object.passed_values())
+
if __name__ == '__main__':
unittest.main(verbosity=2)
diff --git a/src/python/grpcio/tests/unit/framework/interfaces/face/_blocking_invocation_inline_service.py b/src/python/grpcio/tests/unit/framework/interfaces/face/_blocking_invocation_inline_service.py
index 3bcefa6..c8a3a1b 100644
--- a/src/python/grpcio/tests/unit/framework/interfaces/face/_blocking_invocation_inline_service.py
+++ b/src/python/grpcio/tests/unit/framework/interfaces/face/_blocking_invocation_inline_service.py
@@ -1,4 +1,4 @@
-# Copyright 2015, Google Inc.
+# Copyright 2015-2016, Google Inc.
# All rights reserved.
#
# Redistribution and use in source and binary forms, with or without
@@ -30,9 +30,12 @@
"""Test code for the Face layer of RPC Framework."""
import abc
+import itertools
import unittest
+from concurrent import futures
# test_interfaces is referenced from specification in this module.
+from grpc.framework.foundation import logging_pool
from grpc.framework.interfaces.face import face
from tests.unit.framework.common import test_constants
from tests.unit.framework.common import test_control
@@ -139,13 +142,50 @@
test_messages.verify(second_request, second_response, self)
- @unittest.skip('Parallel invocations impossible with blocking control flow!')
def testParallelInvocations(self):
- raise NotImplementedError()
+ pool = logging_pool.pool(test_constants.PARALLELISM)
+ for (group, method), test_messages_sequence in (
+ self._digest.unary_unary_messages_sequences.iteritems()):
+ for test_messages in test_messages_sequence:
+ requests = []
+ response_futures = []
+ for _ in range(test_constants.PARALLELISM):
+ request = test_messages.request()
+ response_future = pool.submit(
+ self._invoker.blocking(group, method), request,
+ test_constants.LONG_TIMEOUT)
+ requests.append(request)
+ response_futures.append(response_future)
- @unittest.skip('Parallel invocations impossible with blocking control flow!')
+ responses = [
+ response_future.result() for response_future in response_futures]
+
+ for request, response in zip(requests, responses):
+ test_messages.verify(request, response, self)
+ pool.shutdown(wait=True)
+
def testWaitingForSomeButNotAllParallelInvocations(self):
- raise NotImplementedError()
+ pool = logging_pool.pool(test_constants.PARALLELISM)
+ for (group, method), test_messages_sequence in (
+ self._digest.unary_unary_messages_sequences.iteritems()):
+ for test_messages in test_messages_sequence:
+ requests = []
+ response_futures_to_indices = {}
+ for index in range(test_constants.PARALLELISM):
+ request = test_messages.request()
+ response_future = pool.submit(
+ self._invoker.blocking(group, method), request,
+ test_constants.LONG_TIMEOUT)
+ requests.append(request)
+ response_futures_to_indices[response_future] = index
+
+ some_completed_response_futures_iterator = itertools.islice(
+ futures.as_completed(response_futures_to_indices),
+ test_constants.PARALLELISM / 2)
+ for response_future in some_completed_response_futures_iterator:
+ index = response_futures_to_indices[response_future]
+ test_messages.verify(requests[index], response_future.result(), self)
+ pool.shutdown(wait=True)
@unittest.skip('Cancellation impossible with blocking control flow!')
def testCancelledUnaryRequestUnaryResponse(self):
diff --git a/src/python/grpcio/tests/unit/framework/interfaces/face/_future_invocation_asynchronous_event_service.py b/src/python/grpcio/tests/unit/framework/interfaces/face/_future_invocation_asynchronous_event_service.py
index fc8daa9..1d36a93 100644
--- a/src/python/grpcio/tests/unit/framework/interfaces/face/_future_invocation_asynchronous_event_service.py
+++ b/src/python/grpcio/tests/unit/framework/interfaces/face/_future_invocation_asynchronous_event_service.py
@@ -31,8 +31,10 @@
import abc
import contextlib
+import itertools
import threading
import unittest
+from concurrent import futures
# test_interfaces is referenced from specification in this module.
from grpc.framework.foundation import logging_pool
@@ -219,6 +221,23 @@
test_messages.verify(second_request, second_response, self)
+ def testParallelInvocations(self):
+ for (group, method), test_messages_sequence in (
+ self._digest.unary_unary_messages_sequences.iteritems()):
+ for test_messages in test_messages_sequence:
+ first_request = test_messages.request()
+ second_request = test_messages.request()
+
+ first_response_future = self._invoker.future(group, method)(
+ first_request, test_constants.LONG_TIMEOUT)
+ second_response_future = self._invoker.future(group, method)(
+ second_request, test_constants.LONG_TIMEOUT)
+ first_response = first_response_future.result()
+ second_response = second_response_future.result()
+
+ test_messages.verify(first_request, first_response, self)
+ test_messages.verify(second_request, second_response, self)
+
for (group, method), test_messages_sequence in (
self._digest.unary_unary_messages_sequences.iteritems()):
for test_messages in test_messages_sequence:
@@ -237,26 +256,28 @@
for request, response in zip(requests, responses):
test_messages.verify(request, response, self)
- def testParallelInvocations(self):
+ def testWaitingForSomeButNotAllParallelInvocations(self):
+ pool = logging_pool.pool(test_constants.PARALLELISM)
for (group, method), test_messages_sequence in (
self._digest.unary_unary_messages_sequences.iteritems()):
for test_messages in test_messages_sequence:
- first_request = test_messages.request()
- second_request = test_messages.request()
+ requests = []
+ response_futures_to_indices = {}
+ for index in range(test_constants.PARALLELISM):
+ request = test_messages.request()
+ inner_response_future = self._invoker.future(group, method)(
+ request, test_constants.LONG_TIMEOUT)
+ outer_response_future = pool.submit(inner_response_future.result)
+ requests.append(request)
+ response_futures_to_indices[outer_response_future] = index
- first_response_future = self._invoker.future(group, method)(
- first_request, test_constants.LONG_TIMEOUT)
- second_response_future = self._invoker.future(group, method)(
- second_request, test_constants.LONG_TIMEOUT)
- first_response = first_response_future.result()
- second_response = second_response_future.result()
-
- test_messages.verify(first_request, first_response, self)
- test_messages.verify(second_request, second_response, self)
-
- @unittest.skip('TODO(nathaniel): implement.')
- def testWaitingForSomeButNotAllParallelInvocations(self):
- raise NotImplementedError()
+ some_completed_response_futures_iterator = itertools.islice(
+ futures.as_completed(response_futures_to_indices),
+ test_constants.PARALLELISM / 2)
+ for response_future in some_completed_response_futures_iterator:
+ index = response_futures_to_indices[response_future]
+ test_messages.verify(requests[index], response_future.result(), self)
+ pool.shutdown(wait=True)
def testCancelledUnaryRequestUnaryResponse(self):
for (group, method), test_messages_sequence in (
diff --git a/src/python/grpcio/tests/unit/framework/interfaces/face/_receiver.py b/src/python/grpcio/tests/unit/framework/interfaces/face/_receiver.py
index 2e444ff..42a7f4e 100644
--- a/src/python/grpcio/tests/unit/framework/interfaces/face/_receiver.py
+++ b/src/python/grpcio/tests/unit/framework/interfaces/face/_receiver.py
@@ -1,4 +1,4 @@
-# Copyright 2015, Google Inc.
+# Copyright 2015-2016, Google Inc.
# All rights reserved.
#
# Redistribution and use in source and binary forms, with or without
@@ -76,7 +76,7 @@
def unary_response(self):
with self._condition:
if self._abortion is not None:
- raise AssertionError('Aborted with abortion "%s"!' % self._abortion)
+ raise AssertionError('Aborted: "{}"!'.format(self._abortion))
elif len(self._responses) != 1:
raise AssertionError(
'%d responses received, not exactly one!', len(self._responses))
@@ -88,7 +88,7 @@
if self._abortion is None:
return list(self._responses)
else:
- raise AssertionError('Aborted with abortion "%s"!' % self._abortion)
+ raise AssertionError('Aborted: "{}"!'.format(self._abortion))
def abortion(self):
with self._condition:
diff --git a/test/core/util/port_posix.c b/test/core/util/port_posix.c
index 732a51c..11cefbf 100644
--- a/test/core/util/port_posix.c
+++ b/test/core/util/port_posix.c
@@ -1,6 +1,6 @@
/*
*
- * Copyright 2015, Google Inc.
+ * Copyright 2015-2016, Google Inc.
* All rights reserved.
*
* Redistribution and use in source and binary forms, with or without
@@ -37,6 +37,7 @@
#include "test/core/util/port.h"
+#include <math.h>
#include <netinet/in.h>
#include <sys/socket.h>
#include <stdio.h>
@@ -229,10 +230,10 @@
grpc_httpcli_request req;
memset(&req, 0, sizeof(req));
GPR_ASSERT(pr->retries < 10);
+ sleep(1 + (unsigned)(pow(1.3, pr->retries) * rand() / RAND_MAX));
pr->retries++;
req.host = pr->server;
req.path = "/get";
- sleep(1);
grpc_httpcli_get(exec_ctx, pr->ctx, &pr->pollset, &req,
GRPC_TIMEOUT_SECONDS_TO_DEADLINE(10), got_port_from_server,
pr);
diff --git a/test/cpp/end2end/end2end_test.cc b/test/cpp/end2end/end2end_test.cc
index f8027bc..5a414eb 100644
--- a/test/cpp/end2end/end2end_test.cc
+++ b/test/cpp/end2end/end2end_test.cc
@@ -244,7 +244,8 @@
gpr_time_from_micros(request->param().server_cancel_after_us(),
GPR_TIMESPAN)));
return Status::CANCELLED;
- } else {
+ } else if (!request->has_param() ||
+ !request->param().skip_cancelled_check()) {
EXPECT_FALSE(context->IsCancelled());
}
@@ -823,6 +824,7 @@
EchoRequest request;
EchoResponse response;
request.set_message("Hello");
+ request.mutable_param()->set_skip_cancelled_check(true);
ClientContext context;
std::chrono::system_clock::time_point deadline =
diff --git a/test/cpp/qps/generic_async_streaming_ping_pong_test.cc b/test/cpp/qps/generic_async_streaming_ping_pong_test.cc
index 2b2e1c8..81c0f24 100644
--- a/test/cpp/qps/generic_async_streaming_ping_pong_test.cc
+++ b/test/cpp/qps/generic_async_streaming_ping_pong_test.cc
@@ -60,7 +60,7 @@
bbuf->set_req_size(0);
ServerConfig server_config;
- server_config.set_server_type(ASYNC_SERVER);
+ server_config.set_server_type(ASYNC_GENERIC_SERVER);
server_config.set_host("localhost");
server_config.set_async_server_threads(1);
diff --git a/test/cpp/qps/qps_driver.cc b/test/cpp/qps/qps_driver.cc
index e3e7cb2..680e4b1 100644
--- a/test/cpp/qps/qps_driver.cc
+++ b/test/cpp/qps/qps_driver.cc
@@ -170,7 +170,7 @@
GPR_ASSERT(!client_config.payload_config().has_bytebuf_params() ||
(client_config.client_type() == ASYNC_CLIENT &&
client_config.rpc_type() == STREAMING &&
- server_config.server_type() == ASYNC_SERVER));
+ server_config.server_type() == ASYNC_GENERIC_SERVER));
const auto result = RunScenario(
client_config, FLAGS_num_clients, server_config, FLAGS_num_servers,
diff --git a/test/cpp/qps/qps_worker.cc b/test/cpp/qps/qps_worker.cc
index e782b2a..6316605 100644
--- a/test/cpp/qps/qps_worker.cc
+++ b/test/cpp/qps/qps_worker.cc
@@ -97,6 +97,8 @@
return CreateSynchronousServer(config);
case ServerType::ASYNC_SERVER:
return CreateAsyncServer(config);
+ case ServerType::ASYNC_GENERIC_SERVER:
+ return CreateAsyncGenericServer(config);
default:
abort();
}
diff --git a/test/cpp/qps/server.h b/test/cpp/qps/server.h
index 32a3e85..196fdac 100644
--- a/test/cpp/qps/server.h
+++ b/test/cpp/qps/server.h
@@ -108,6 +108,7 @@
std::unique_ptr<Server> CreateSynchronousServer(const ServerConfig& config);
std::unique_ptr<Server> CreateAsyncServer(const ServerConfig& config);
+std::unique_ptr<Server> CreateAsyncGenericServer(const ServerConfig& config);
} // namespace testing
} // namespace grpc
diff --git a/test/cpp/qps/server_async.cc b/test/cpp/qps/server_async.cc
index d530dac..cd8d546 100644
--- a/test/cpp/qps/server_async.cc
+++ b/test/cpp/qps/server_async.cc
@@ -373,7 +373,7 @@
const ByteBuffer *request,
ByteBuffer *response) {
int resp_size = payload_config.bytebuf_params().resp_size();
- std::unique_ptr<char> buf(new char[resp_size]);
+ std::unique_ptr<char[]> buf(new char[resp_size]);
gpr_slice s = gpr_slice_from_copied_buffer(buf.get(), resp_size);
Slice slice(s, Slice::STEAL_REF);
*response = ByteBuffer(&slice, 1);
diff --git a/tools/run_tests/run_node.sh b/tools/run_tests/run_node.sh
index fff579f..f93c9c3 100755
--- a/tools/run_tests/run_node.sh
+++ b/tools/run_tests/run_node.sh
@@ -1,5 +1,5 @@
#!/bin/bash
-# Copyright 2015, Google Inc.
+# Copyright 2015-2016, Google Inc.
# All rights reserved.
#
# Redistribution and use in source and binary forms, with or without
@@ -51,5 +51,5 @@
echo '<html><head><meta http-equiv="refresh" content="0;URL=lcov-report/index.html"></head></html>' > \
../reports/node_coverage/index.html
else
- JUNIT_REPORT_PATH=src/node/reports.xml JUNIT_REPORT_STACK=1 ./node_modules/.bin/mocha --reporter mocha-jenkins-reporter src/node/test || true
+ JUNIT_REPORT_PATH=src/node/reports.xml JUNIT_REPORT_STACK=1 ./node_modules/.bin/mocha --reporter mocha-jenkins-reporter src/node/test
fi
diff --git a/tools/run_tests/run_tests.py b/tools/run_tests/run_tests.py
index 885a79c..fa4a37a 100755
--- a/tools/run_tests/run_tests.py
+++ b/tools/run_tests/run_tests.py
@@ -151,9 +151,9 @@
def make_targets(self, test_regex):
if platform_string() != 'windows' and test_regex != '.*':
# use the regex to minimize the number of things to build
- return [target['name']
+ return [os.path.basename(target['name'])
for target in get_c_tests(False, self.test_lang)
- if re.search(test_regex, target['name'])]
+ if re.search(test_regex, '/' + target['name'])]
if platform_string() == 'windows':
# don't build tools on windows just yet
return ['buildtests_%s' % self.make_target]
diff --git a/tools/tsan_suppressions.txt b/tools/tsan_suppressions.txt
index 65e7e2e..09e68cd 100644
--- a/tools/tsan_suppressions.txt
+++ b/tools/tsan_suppressions.txt
@@ -5,4 +5,4 @@
# https://www.mail-archive.com/openssl-dev@openssl.org/msg09019.html
race:ssleay_rand_add
race:ssleay_rand_bytes
-
+race:__sleep_for