blob: 7440e9d60c956b93b0e89666c91ef449007a708b [file] [log] [blame]
import faulthandler
import os
import platform
import random
import re
import signal
import sys
import sysconfig
import tempfile
import textwrap
import unittest
from test.libregrtest.runtest import (
findtests, runtest,
STDTESTS, NOTTESTS, PASSED, FAILED, ENV_CHANGED, SKIPPED, RESOURCE_DENIED)
from test.libregrtest.refleak import warm_caches
from test.libregrtest.cmdline import _parse_args
from test import support
try:
import gc
except ImportError:
gc = None
# When tests are run from the Python build directory, it is best practice
# to keep the test files in a subfolder. This eases the cleanup of leftover
# files using the "make distclean" command.
if sysconfig.is_python_build():
TEMPDIR = os.path.join(sysconfig.get_config_var('srcdir'), 'build')
else:
TEMPDIR = tempfile.gettempdir()
TEMPDIR = os.path.abspath(TEMPDIR)
def setup_python():
# Display the Python traceback on fatal errors (e.g. segfault)
faulthandler.enable(all_threads=True)
# Display the Python traceback on SIGALRM or SIGUSR1 signal
signals = []
if hasattr(signal, 'SIGALRM'):
signals.append(signal.SIGALRM)
if hasattr(signal, 'SIGUSR1'):
signals.append(signal.SIGUSR1)
for signum in signals:
faulthandler.register(signum, chain=True)
replace_stdout()
support.record_original_stdout(sys.stdout)
# Some times __path__ and __file__ are not absolute (e.g. while running from
# Lib/) and, if we change the CWD to run the tests in a temporary dir, some
# imports might fail. This affects only the modules imported before os.chdir().
# These modules are searched first in sys.path[0] (so '' -- the CWD) and if
# they are found in the CWD their __file__ and __path__ will be relative (this
# happens before the chdir). All the modules imported after the chdir, are
# not found in the CWD, and since the other paths in sys.path[1:] are absolute
# (site.py absolutize them), the __file__ and __path__ will be absolute too.
# Therefore it is necessary to absolutize manually the __file__ and __path__ of
# the packages to prevent later imports to fail when the CWD is different.
for module in sys.modules.values():
if hasattr(module, '__path__'):
module.__path__ = [os.path.abspath(path)
for path in module.__path__]
if hasattr(module, '__file__'):
module.__file__ = os.path.abspath(module.__file__)
# MacOSX (a.k.a. Darwin) has a default stack size that is too small
# for deeply recursive regular expressions. We see this as crashes in
# the Python test suite when running test_re.py and test_sre.py. The
# fix is to set the stack limit to 2048.
# This approach may also be useful for other Unixy platforms that
# suffer from small default stack limits.
if sys.platform == 'darwin':
try:
import resource
except ImportError:
pass
else:
soft, hard = resource.getrlimit(resource.RLIMIT_STACK)
newsoft = min(hard, max(soft, 1024*2048))
resource.setrlimit(resource.RLIMIT_STACK, (newsoft, hard))
class Regrtest:
"""Execute a test suite.
This also parses command-line options and modifies its behavior
accordingly.
tests -- a list of strings containing test names (optional)
testdir -- the directory in which to look for tests (optional)
Users other than the Python test suite will certainly want to
specify testdir; if it's omitted, the directory containing the
Python test suite is searched for.
If the tests argument is omitted, the tests listed on the
command-line will be used. If that's empty, too, then all *.py
files beginning with test_ will be used.
The other default arguments (verbose, quiet, exclude,
single, randomize, findleaks, use_resources, trace, coverdir,
print_slow, and random_seed) allow programmers calling main()
directly to set the values that would normally be set by flags
on the command line.
"""
def __init__(self):
# Namespace of command line options
self.ns = None
# tests
self.tests = []
self.selected = []
# test results
self.good = []
self.bad = []
self.skipped = []
self.resource_denieds = []
self.environment_changed = []
self.interrupted = False
# used by --slow
self.test_times = []
# used by --coverage, trace.Trace instance
self.tracer = None
# used by --findleaks, store for gc.garbage
self.found_garbage = []
# used to display the progress bar "[ 3/100]"
self.test_count = ''
self.test_count_width = 1
# used by --single
self.next_single_test = None
self.next_single_filename = None
def accumulate_result(self, test, result):
ok, test_time = result
self.test_times.append((test_time, test))
if ok == PASSED:
self.good.append(test)
elif ok == FAILED:
self.bad.append(test)
elif ok == ENV_CHANGED:
self.environment_changed.append(test)
elif ok == SKIPPED:
self.skipped.append(test)
elif ok == RESOURCE_DENIED:
self.skipped.append(test)
self.resource_denieds.append(test)
def display_progress(self, test_index, test):
if self.ns.quiet:
return
fmt = "[{1:{0}}{2}/{3}] {4}" if self.bad else "[{1:{0}}{2}] {4}"
print(fmt.format(self.test_count_width, test_index,
self.test_count, len(self.bad), test),
flush=True)
def setup_regrtest(self):
if self.ns.huntrleaks:
# Avoid false positives due to various caches
# filling slowly with random data:
warm_caches()
if self.ns.memlimit is not None:
support.set_memlimit(self.ns.memlimit)
if self.ns.threshold is not None:
if gc is not None:
gc.set_threshold(self.ns.threshold)
else:
print('No GC available, ignore --threshold.')
if self.ns.nowindows:
import msvcrt
msvcrt.SetErrorMode(msvcrt.SEM_FAILCRITICALERRORS|
msvcrt.SEM_NOALIGNMENTFAULTEXCEPT|
msvcrt.SEM_NOGPFAULTERRORBOX|
msvcrt.SEM_NOOPENFILEERRORBOX)
try:
msvcrt.CrtSetReportMode
except AttributeError:
# release build
pass
else:
for m in [msvcrt.CRT_WARN, msvcrt.CRT_ERROR, msvcrt.CRT_ASSERT]:
msvcrt.CrtSetReportMode(m, msvcrt.CRTDBG_MODE_FILE)
msvcrt.CrtSetReportFile(m, msvcrt.CRTDBG_FILE_STDERR)
if self.ns.findleaks:
if gc is not None:
# Uncomment the line below to report garbage that is not
# freeable by reference counting alone. By default only
# garbage that is not collectable by the GC is reported.
pass
#gc.set_debug(gc.DEBUG_SAVEALL)
else:
print('No GC available, disabling --findleaks')
self.ns.findleaks = False
if self.ns.huntrleaks:
unittest.BaseTestSuite._cleanup = False
# Strip .py extensions.
removepy(self.ns.args)
if self.ns.trace:
import trace
self.tracer = trace.Trace(ignoredirs=[sys.base_prefix,
sys.base_exec_prefix,
tempfile.gettempdir()],
trace=False, count=True)
def find_tests(self, tests):
self.tests = tests
if self.ns.single:
self.next_single_filename = os.path.join(TEMPDIR, 'pynexttest')
try:
with open(self.next_single_filename, 'r') as fp:
next_test = fp.read().strip()
self.tests = [next_test]
except OSError:
pass
if self.ns.fromfile:
self.tests = []
with open(os.path.join(support.SAVEDCWD, self.ns.fromfile)) as fp:
count_pat = re.compile(r'\[\s*\d+/\s*\d+\]')
for line in fp:
line = count_pat.sub('', line)
guts = line.split() # assuming no test has whitespace in its name
if guts and not guts[0].startswith('#'):
self.tests.extend(guts)
removepy(self.tests)
stdtests = STDTESTS[:]
nottests = NOTTESTS.copy()
if self.ns.exclude:
for arg in self.ns.args:
if arg in stdtests:
stdtests.remove(arg)
nottests.add(arg)
self.ns.args = []
# For a partial run, we do not need to clutter the output.
if (self.ns.verbose
or self.ns.header
or not (self.ns.quiet or self.ns.single
or self.tests or self.ns.args)):
# Print basic platform information
print("==", platform.python_implementation(), *sys.version.split())
print("== ", platform.platform(aliased=True),
"%s-endian" % sys.byteorder)
print("== ", "hash algorithm:", sys.hash_info.algorithm,
"64bit" if sys.maxsize > 2**32 else "32bit")
print("== ", os.getcwd())
print("Testing with flags:", sys.flags)
# if testdir is set, then we are not running the python tests suite, so
# don't add default tests to be executed or skipped (pass empty values)
if self.ns.testdir:
alltests = findtests(self.ns.testdir, list(), set())
else:
alltests = findtests(self.ns.testdir, stdtests, nottests)
self.selected = self.tests or self.ns.args or alltests
if self.ns.single:
self.selected = self.selected[:1]
try:
pos = alltests.index(self.selected[0])
self.next_single_test = alltests[pos + 1]
except IndexError:
pass
# Remove all the self.selected tests that precede start if it's set.
if self.ns.start:
try:
del self.selected[:self.selected.index(self.ns.start)]
except ValueError:
print("Couldn't find starting test (%s), using all tests"
% self.ns.start)
if self.ns.randomize:
if self.ns.random_seed is None:
self.ns.random_seed = random.randrange(10000000)
random.seed(self.ns.random_seed)
print("Using random seed", self.ns.random_seed)
random.shuffle(self.selected)
def display_result(self):
if self.interrupted:
# print a newline after ^C
print()
print("Test suite interrupted by signal SIGINT.")
executed = set(self.good) | set(self.bad) | set(self.skipped)
omitted = set(self.selected) - executed
print(count(len(omitted), "test"), "omitted:")
printlist(omitted)
if self.good and not self.ns.quiet:
if (not self.bad
and not self.skipped
and not self.interrupted
and len(self.good) > 1):
print("All", end=' ')
print(count(len(self.good), "test"), "OK.")
if self.ns.print_slow:
self.test_times.sort(reverse=True)
print("10 slowest tests:")
for time, test in self.test_times[:10]:
print("%s: %.1fs" % (test, time))
if self.bad:
print(count(len(self.bad), "test"), "failed:")
printlist(self.bad)
if self.environment_changed:
print("{} altered the execution environment:".format(
count(len(self.environment_changed), "test")))
printlist(self.environment_changed)
if self.skipped and not self.ns.quiet:
print(count(len(self.skipped), "test"), "skipped:")
printlist(self.skipped)
if self.ns.verbose2 and self.bad:
print("Re-running failed tests in verbose mode")
for test in self.bad[:]:
print("Re-running test %r in verbose mode" % test, flush=True)
try:
self.ns.verbose = True
ok = runtest(test, True, self.ns.quiet, self.ns.huntrleaks,
timeout=self.ns.timeout)
except KeyboardInterrupt:
# print a newline separate from the ^C
print()
break
else:
if ok[0] in {PASSED, ENV_CHANGED, SKIPPED, RESOURCE_DENIED}:
self.bad.remove(test)
else:
if self.bad:
print(count(len(self.bad), 'test'), "failed again:")
printlist(self.bad)
def run_test(self, test):
result = runtest(test,
self.ns.verbose,
self.ns.quiet,
self.ns.huntrleaks,
output_on_failure=self.ns.verbose3,
timeout=self.ns.timeout,
failfast=self.ns.failfast,
match_tests=self.ns.match_tests)
self.accumulate_result(test, result)
def run_tests_sequential(self):
save_modules = sys.modules.keys()
for test_index, test in enumerate(self.tests, 1):
self.display_progress(test_index, test)
if self.ns.trace:
# If we're tracing code coverage, then we don't exit with status
# if on a false return value from main.
cmd = 'self.run_test(test)'
self.tracer.runctx(cmd, globals=globals(), locals=vars())
else:
try:
self.run_test(test)
except KeyboardInterrupt:
self.interrupted = True
break
if self.ns.findleaks:
gc.collect()
if gc.garbage:
print("Warning: test created", len(gc.garbage), end=' ')
print("uncollectable object(s).")
# move the uncollectable objects somewhere so we don't see
# them again
self.found_garbage.extend(gc.garbage)
del gc.garbage[:]
# Unload the newly imported modules (best effort finalization)
for module in sys.modules.keys():
if module not in save_modules and module.startswith("test."):
support.unload(module)
def run_tests(self):
support.verbose = self.ns.verbose # Tell tests to be moderately quiet
support.use_resources = self.ns.use_resources
if self.ns.forever:
def test_forever(tests):
while True:
for test in tests:
yield test
if self.bad:
return
self.tests = test_forever(list(self.selected))
self.test_count = ''
self.test_count_width = 3
else:
self.tests = iter(self.selected)
self.test_count = '/{}'.format(len(self.selected))
self.test_count_width = len(self.test_count) - 1
if self.ns.use_mp:
from test.libregrtest.runtest_mp import run_tests_multiprocess
run_tests_multiprocess(self)
else:
self.run_tests_sequential()
def finalize(self):
if self.next_single_filename:
if self.next_single_test:
with open(self.next_single_filename, 'w') as fp:
fp.write(self.next_single_test + '\n')
else:
os.unlink(self.next_single_filename)
if self.ns.trace:
r = self.tracer.results()
r.write_results(show_missing=True, summary=True,
coverdir=self.ns.coverdir)
if self.ns.runleaks:
os.system("leaks %d" % os.getpid())
def main(self, tests=None, **kwargs):
self.ns = _parse_args(sys.argv[1:], **kwargs)
setup_python()
self.setup_regrtest()
if self.ns.wait:
input("Press any key to continue...")
if self.ns.slaveargs is not None:
from test.libregrtest.runtest_mp import run_tests_slave
run_tests_slave(self.ns.slaveargs)
self.find_tests(tests)
self.run_tests()
self.display_result()
self.finalize()
sys.exit(len(self.bad) > 0 or self.interrupted)
def replace_stdout():
"""Set stdout encoder error handler to backslashreplace (as stderr error
handler) to avoid UnicodeEncodeError when printing a traceback"""
import atexit
stdout = sys.stdout
sys.stdout = open(stdout.fileno(), 'w',
encoding=stdout.encoding,
errors="backslashreplace",
closefd=False,
newline='\n')
def restore_stdout():
sys.stdout.close()
sys.stdout = stdout
atexit.register(restore_stdout)
def removepy(names):
if not names:
return
for idx, name in enumerate(names):
basename, ext = os.path.splitext(name)
if ext == '.py':
names[idx] = basename
def count(n, word):
if n == 1:
return "%d %s" % (n, word)
else:
return "%d %ss" % (n, word)
def printlist(x, width=70, indent=4):
"""Print the elements of iterable x to stdout.
Optional arg width (default 70) is the maximum line length.
Optional arg indent (default 4) is the number of blanks with which to
begin each line.
"""
blanks = ' ' * indent
# Print the sorted list: 'x' may be a '--random' list or a set()
print(textwrap.fill(' '.join(str(elt) for elt in sorted(x)), width,
initial_indent=blanks, subsequent_indent=blanks))
def main(tests=None, **kwargs):
Regrtest().main(tests=tests, **kwargs)
def main_in_temp_cwd():
"""Run main() in a temporary working directory."""
if sysconfig.is_python_build():
try:
os.mkdir(TEMPDIR)
except FileExistsError:
pass
# Define a writable temp dir that will be used as cwd while running
# the tests. The name of the dir includes the pid to allow parallel
# testing (see the -j option).
test_cwd = 'test_python_{}'.format(os.getpid())
test_cwd = os.path.join(TEMPDIR, test_cwd)
# Run the tests in a context manager that temporarily changes the CWD to a
# temporary and writable directory. If it's not possible to create or
# change the CWD, the original CWD will be used. The original CWD is
# available from support.SAVEDCWD.
with support.temp_cwd(test_cwd, quiet=True):
main()