| import collections |
| import faulthandler |
| import functools |
| import gc |
| import importlib |
| import io |
| import os |
| import sys |
| import time |
| import traceback |
| import unittest |
| |
| from test import support |
| from test.libregrtest.refleak import dash_R, clear_caches |
| from test.libregrtest.save_env import saved_test_environment |
| from test.libregrtest.utils import print_warning |
| |
| |
| # Test result constants. |
| PASSED = 1 |
| FAILED = 0 |
| ENV_CHANGED = -1 |
| SKIPPED = -2 |
| RESOURCE_DENIED = -3 |
| INTERRUPTED = -4 |
| CHILD_ERROR = -5 # error in a child process |
| TEST_DID_NOT_RUN = -6 |
| |
| _FORMAT_TEST_RESULT = { |
| PASSED: '%s passed', |
| FAILED: '%s failed', |
| ENV_CHANGED: '%s failed (env changed)', |
| SKIPPED: '%s skipped', |
| RESOURCE_DENIED: '%s skipped (resource denied)', |
| INTERRUPTED: '%s interrupted', |
| CHILD_ERROR: '%s crashed', |
| TEST_DID_NOT_RUN: '%s run no tests', |
| } |
| |
| # Minimum duration of a test to display its duration or to mention that |
| # the test is running in background |
| PROGRESS_MIN_TIME = 30.0 # seconds |
| |
| # small set of tests to determine if we have a basically functioning interpreter |
| # (i.e. if any of these fail, then anything else is likely to follow) |
| STDTESTS = [ |
| 'test_grammar', |
| 'test_opcodes', |
| 'test_dict', |
| 'test_builtin', |
| 'test_exceptions', |
| 'test_types', |
| 'test_unittest', |
| 'test_doctest', |
| 'test_doctest2', |
| 'test_support' |
| ] |
| |
| # set of tests that we don't want to be executed when using regrtest |
| NOTTESTS = set() |
| |
| |
| # used by --findleaks, store for gc.garbage |
| FOUND_GARBAGE = [] |
| |
| |
| def is_failed(result, ns): |
| ok = result.result |
| if ok in (PASSED, RESOURCE_DENIED, SKIPPED, TEST_DID_NOT_RUN): |
| return False |
| if ok == ENV_CHANGED: |
| return ns.fail_env_changed |
| return True |
| |
| |
| def format_test_result(result): |
| fmt = _FORMAT_TEST_RESULT.get(result.result, "%s") |
| return fmt % result.test_name |
| |
| |
| def findtestdir(path=None): |
| return path or os.path.dirname(os.path.dirname(__file__)) or os.curdir |
| |
| |
| def findtests(testdir=None, stdtests=STDTESTS, nottests=NOTTESTS): |
| """Return a list of all applicable test modules.""" |
| testdir = findtestdir(testdir) |
| names = os.listdir(testdir) |
| tests = [] |
| others = set(stdtests) | nottests |
| for name in names: |
| mod, ext = os.path.splitext(name) |
| if mod[:5] == "test_" and ext in (".py", "") and mod not in others: |
| tests.append(mod) |
| return stdtests + sorted(tests) |
| |
| |
| def get_abs_module(ns, test_name): |
| if test_name.startswith('test.') or ns.testdir: |
| return test_name |
| else: |
| # Import it from the test package |
| return 'test.' + test_name |
| |
| |
| TestResult = collections.namedtuple('TestResult', |
| 'test_name result test_time xml_data') |
| |
| def _runtest(ns, test_name): |
| # Handle faulthandler timeout, capture stdout+stderr, XML serialization |
| # and measure time. |
| |
| output_on_failure = ns.verbose3 |
| |
| use_timeout = (ns.timeout is not None) |
| if use_timeout: |
| faulthandler.dump_traceback_later(ns.timeout, exit=True) |
| |
| start_time = time.perf_counter() |
| try: |
| support.set_match_tests(ns.match_tests) |
| support.junit_xml_list = xml_list = [] if ns.xmlpath else None |
| if ns.failfast: |
| support.failfast = True |
| |
| if output_on_failure: |
| support.verbose = True |
| |
| stream = io.StringIO() |
| orig_stdout = sys.stdout |
| orig_stderr = sys.stderr |
| try: |
| sys.stdout = stream |
| sys.stderr = stream |
| result = _runtest_inner(ns, test_name, |
| display_failure=False) |
| if result != PASSED: |
| output = stream.getvalue() |
| orig_stderr.write(output) |
| orig_stderr.flush() |
| finally: |
| sys.stdout = orig_stdout |
| sys.stderr = orig_stderr |
| else: |
| # Tell tests to be moderately quiet |
| support.verbose = ns.verbose |
| |
| result = _runtest_inner(ns, test_name, |
| display_failure=not ns.verbose) |
| |
| if xml_list: |
| import xml.etree.ElementTree as ET |
| xml_data = [ET.tostring(x).decode('us-ascii') for x in xml_list] |
| else: |
| xml_data = None |
| |
| test_time = time.perf_counter() - start_time |
| |
| return TestResult(test_name, result, test_time, xml_data) |
| finally: |
| if use_timeout: |
| faulthandler.cancel_dump_traceback_later() |
| support.junit_xml_list = None |
| |
| |
| def runtest(ns, test_name): |
| """Run a single test. |
| |
| ns -- regrtest namespace of options |
| test_name -- the name of the test |
| |
| Returns the tuple (result, test_time, xml_data), where result is one |
| of the constants: |
| |
| INTERRUPTED KeyboardInterrupt |
| RESOURCE_DENIED test skipped because resource denied |
| SKIPPED test skipped for some other reason |
| ENV_CHANGED test failed because it changed the execution environment |
| FAILED test failed |
| PASSED test passed |
| EMPTY_TEST_SUITE test ran no subtests. |
| |
| If ns.xmlpath is not None, xml_data is a list containing each |
| generated testsuite element. |
| """ |
| try: |
| return _runtest(ns, test_name) |
| except: |
| if not ns.pgo: |
| msg = traceback.format_exc() |
| print(f"test {test_name} crashed -- {msg}", |
| file=sys.stderr, flush=True) |
| return TestResult(test_name, FAILED, 0.0, None) |
| |
| |
| def _test_module(the_module): |
| loader = unittest.TestLoader() |
| tests = loader.loadTestsFromModule(the_module) |
| for error in loader.errors: |
| print(error, file=sys.stderr) |
| if loader.errors: |
| raise Exception("errors while loading tests") |
| support.run_unittest(tests) |
| |
| |
| def _runtest_inner2(ns, test_name): |
| # Load the test function, run the test function, handle huntrleaks |
| # and findleaks to detect leaks |
| |
| abstest = get_abs_module(ns, test_name) |
| |
| # remove the module from sys.module to reload it if it was already imported |
| support.unload(abstest) |
| |
| the_module = importlib.import_module(abstest) |
| |
| # If the test has a test_main, that will run the appropriate |
| # tests. If not, use normal unittest test loading. |
| test_runner = getattr(the_module, "test_main", None) |
| if test_runner is None: |
| test_runner = functools.partial(_test_module, the_module) |
| |
| try: |
| if ns.huntrleaks: |
| # Return True if the test leaked references |
| refleak = dash_R(ns, test_name, test_runner) |
| else: |
| test_runner() |
| refleak = False |
| finally: |
| cleanup_test_droppings(test_name, ns.verbose) |
| |
| support.gc_collect() |
| |
| if gc.garbage: |
| support.environment_altered = True |
| print_warning(f"{test_name} created {len(gc.garbage)} " |
| f"uncollectable object(s).") |
| |
| # move the uncollectable objects somewhere, |
| # so we don't see them again |
| FOUND_GARBAGE.extend(gc.garbage) |
| gc.garbage.clear() |
| |
| support.reap_children() |
| |
| return refleak |
| |
| |
| def _runtest_inner(ns, test_name, display_failure=True): |
| # Detect environment changes, handle exceptions. |
| |
| # Reset the environment_altered flag to detect if a test altered |
| # the environment |
| support.environment_altered = False |
| |
| if ns.pgo: |
| display_failure = False |
| |
| try: |
| clear_caches() |
| |
| with saved_test_environment(test_name, ns.verbose, ns.quiet, pgo=ns.pgo) as environment: |
| refleak = _runtest_inner2(ns, test_name) |
| except support.ResourceDenied as msg: |
| if not ns.quiet and not ns.pgo: |
| print(f"{test_name} skipped -- {msg}", flush=True) |
| return RESOURCE_DENIED |
| except unittest.SkipTest as msg: |
| if not ns.quiet and not ns.pgo: |
| print(f"{test_name} skipped -- {msg}", flush=True) |
| return SKIPPED |
| except support.TestFailed as exc: |
| msg = f"test {test_name} failed" |
| if display_failure: |
| msg = f"{msg} -- {exc}" |
| print(msg, file=sys.stderr, flush=True) |
| return FAILED |
| except support.TestDidNotRun: |
| return TEST_DID_NOT_RUN |
| except KeyboardInterrupt: |
| print() |
| return INTERRUPTED |
| except: |
| if not ns.pgo: |
| msg = traceback.format_exc() |
| print(f"test {test_name} crashed -- {msg}", |
| file=sys.stderr, flush=True) |
| return FAILED |
| |
| if refleak: |
| return FAILED |
| if environment.changed: |
| return ENV_CHANGED |
| return PASSED |
| |
| |
| def cleanup_test_droppings(test_name, verbose): |
| # First kill any dangling references to open files etc. |
| # This can also issue some ResourceWarnings which would otherwise get |
| # triggered during the following test run, and possibly produce failures. |
| support.gc_collect() |
| |
| # Try to clean up junk commonly left behind. While tests shouldn't leave |
| # any files or directories behind, when a test fails that can be tedious |
| # for it to arrange. The consequences can be especially nasty on Windows, |
| # since if a test leaves a file open, it cannot be deleted by name (while |
| # there's nothing we can do about that here either, we can display the |
| # name of the offending test, which is a real help). |
| for name in (support.TESTFN, |
| "db_home", |
| ): |
| if not os.path.exists(name): |
| continue |
| |
| if os.path.isdir(name): |
| import shutil |
| kind, nuker = "directory", shutil.rmtree |
| elif os.path.isfile(name): |
| kind, nuker = "file", os.unlink |
| else: |
| raise RuntimeError(f"os.path says {name!r} exists but is neither " |
| f"directory nor file") |
| |
| if verbose: |
| print_warning("%r left behind %s %r" % (test_name, kind, name)) |
| support.environment_altered = True |
| |
| try: |
| import stat |
| # fix possible permissions problems that might prevent cleanup |
| os.chmod(name, stat.S_IRWXU | stat.S_IRWXG | stat.S_IRWXO) |
| nuker(name) |
| except Exception as exc: |
| print_warning(f"{test_name} left behind {kind} {name!r} " |
| f"and it couldn't be removed: {exc}") |