| """Supporting definitions for the Python regression tests.""" | 
 |  | 
# Refuse to load under any module name other than 'test.support' (this also
# covers running the file directly, where __name__ is '__main__').
if __name__ != 'test.support':
    raise ImportError('support must be imported from the test package')
 |  | 
 | import contextlib | 
 | import errno | 
 | import functools | 
 | import gc | 
 | import socket | 
 | import sys | 
 | import os | 
 | import platform | 
 | import shutil | 
 | import warnings | 
 | import unittest | 
 | import importlib | 
 | import collections.abc | 
 | import re | 
 | import subprocess | 
 | import imp | 
 | import time | 
 | import sysconfig | 
 | import fnmatch | 
 | import logging.handlers | 
 | import struct | 
 | import tempfile | 
 |  | 
 | try: | 
 |     import _thread, threading | 
 | except ImportError: | 
 |     _thread = None | 
 |     threading = None | 
 | try: | 
 |     import multiprocessing.process | 
 | except ImportError: | 
 |     multiprocessing = None | 
 |  | 
 | try: | 
 |     import zlib | 
 | except ImportError: | 
 |     zlib = None | 
 |  | 
 | try: | 
 |     import bz2 | 
 | except ImportError: | 
 |     bz2 = None | 
 |  | 
 | try: | 
 |     import lzma | 
 | except ImportError: | 
 |     lzma = None | 
 |  | 
# Names exported by ``from test.support import *``.
__all__ = [
    "Error", "TestFailed", "ResourceDenied", "import_module", "verbose",
    "use_resources", "max_memuse", "record_original_stdout",
    "get_original_stdout", "unload", "unlink", "rmtree", "forget",
    "is_resource_enabled", "requires", "requires_freebsd_version",
    "requires_linux_version", "requires_mac_ver", "find_unused_port",
    "bind_port", "IPV6_ENABLED", "is_jython", "TESTFN", "HOST", "SAVEDCWD",
    "temp_cwd", "findfile", "create_empty_file", "sortdict",
    "check_syntax_error", "open_urlresource", "check_warnings", "CleanImport",
    "EnvironmentVarGuard", "TransientResource", "captured_stdout",
    "captured_stdin", "captured_stderr", "time_out", "socket_peer_reset",
    "ioerror_peer_reset", "run_with_locale", 'temp_umask',
    "transient_internet", "set_memlimit", "bigmemtest", "bigaddrspacetest",
    "BasicTestRunner", "run_unittest", "run_doctest", "threading_setup",
    "threading_cleanup", "reap_children", "cpython_only", "check_impl_detail",
    "get_attribute", "swap_item", "swap_attr", "requires_IEEE_754",
    "TestHandler", "Matcher", "can_symlink", "skip_unless_symlink",
    "skip_unless_xattr", "import_fresh_module", "requires_zlib",
    "PIPE_MAX_SIZE", "failfast", "anticipate_failure", "run_with_tz",
    "requires_bz2", "requires_lzma"
    ]
 |  | 
 | class Error(Exception): | 
 |     """Base class for regression test exceptions.""" | 
 |  | 
 | class TestFailed(Error): | 
 |     """Test failed.""" | 
 |  | 
 | class ResourceDenied(unittest.SkipTest): | 
 |     """Test skipped because it requested a disallowed resource. | 
 |  | 
 |     This is raised when a test calls requires() for a resource that | 
 |     has not be enabled.  It is used to distinguish between expected | 
 |     and unexpected skips. | 
 |     """ | 
 |  | 
 | @contextlib.contextmanager | 
 | def _ignore_deprecated_imports(ignore=True): | 
 |     """Context manager to suppress package and module deprecation | 
 |     warnings when importing them. | 
 |  | 
 |     If ignore is False, this context manager has no effect.""" | 
 |     if ignore: | 
 |         with warnings.catch_warnings(): | 
 |             warnings.filterwarnings("ignore", ".+ (module|package)", | 
 |                                     DeprecationWarning) | 
 |             yield | 
 |     else: | 
 |         yield | 
 |  | 
 |  | 
 | def import_module(name, deprecated=False): | 
 |     """Import and return the module to be tested, raising SkipTest if | 
 |     it is not available. | 
 |  | 
 |     If deprecated is True, any module or package deprecation messages | 
 |     will be suppressed.""" | 
 |     with _ignore_deprecated_imports(deprecated): | 
 |         try: | 
 |             return importlib.import_module(name) | 
 |         except ImportError as msg: | 
 |             raise unittest.SkipTest(str(msg)) | 
 |  | 
 |  | 
 | def _save_and_remove_module(name, orig_modules): | 
 |     """Helper function to save and remove a module from sys.modules | 
 |  | 
 |        Raise ImportError if the module can't be imported.""" | 
 |     # try to import the module and raise an error if it can't be imported | 
 |     if name not in sys.modules: | 
 |         __import__(name) | 
 |         del sys.modules[name] | 
 |     for modname in list(sys.modules): | 
 |         if modname == name or modname.startswith(name + '.'): | 
 |             orig_modules[modname] = sys.modules[modname] | 
 |             del sys.modules[modname] | 
 |  | 
 | def _save_and_block_module(name, orig_modules): | 
 |     """Helper function to save and block a module in sys.modules | 
 |  | 
 |        Return True if the module was in sys.modules, False otherwise.""" | 
 |     saved = True | 
 |     try: | 
 |         orig_modules[name] = sys.modules[name] | 
 |     except KeyError: | 
 |         saved = False | 
 |     sys.modules[name] = None | 
 |     return saved | 
 |  | 
 |  | 
 | def anticipate_failure(condition): | 
 |     """Decorator to mark a test that is known to be broken in some cases | 
 |  | 
 |        Any use of this decorator should have a comment identifying the | 
 |        associated tracker issue. | 
 |     """ | 
 |     if condition: | 
 |         return unittest.expectedFailure | 
 |     return lambda f: f | 
 |  | 
 |  | 
 | def import_fresh_module(name, fresh=(), blocked=(), deprecated=False): | 
 |     """Imports and returns a module, deliberately bypassing the sys.modules cache | 
 |     and importing a fresh copy of the module. Once the import is complete, | 
 |     the sys.modules cache is restored to its original state. | 
 |  | 
 |     Modules named in fresh are also imported anew if needed by the import. | 
 |     If one of these modules can't be imported, None is returned. | 
 |  | 
 |     Importing of modules named in blocked is prevented while the fresh import | 
 |     takes place. | 
 |  | 
 |     If deprecated is True, any module or package deprecation messages | 
 |     will be suppressed.""" | 
 |     # NOTE: test_heapq, test_json and test_warnings include extra sanity checks | 
 |     # to make sure that this utility function is working as expected | 
 |     with _ignore_deprecated_imports(deprecated): | 
 |         # Keep track of modules saved for later restoration as well | 
 |         # as those which just need a blocking entry removed | 
 |         orig_modules = {} | 
 |         names_to_remove = [] | 
 |         _save_and_remove_module(name, orig_modules) | 
 |         try: | 
 |             for fresh_name in fresh: | 
 |                 _save_and_remove_module(fresh_name, orig_modules) | 
 |             for blocked_name in blocked: | 
 |                 if not _save_and_block_module(blocked_name, orig_modules): | 
 |                     names_to_remove.append(blocked_name) | 
 |             fresh_module = importlib.import_module(name) | 
 |         except ImportError: | 
 |             fresh_module = None | 
 |         finally: | 
 |             for orig_name, module in orig_modules.items(): | 
 |                 sys.modules[orig_name] = module | 
 |             for name_to_remove in names_to_remove: | 
 |                 del sys.modules[name_to_remove] | 
 |         return fresh_module | 
 |  | 
 |  | 
 | def get_attribute(obj, name): | 
 |     """Get an attribute, raising SkipTest if AttributeError is raised.""" | 
 |     try: | 
 |         attribute = getattr(obj, name) | 
 |     except AttributeError: | 
 |         raise unittest.SkipTest("object %r has no attribute %r" % (obj, name)) | 
 |     else: | 
 |         return attribute | 
 |  | 
# Module-level settings.  The inline notes say which ones regrtest.py
# overrides.  While use_resources is None, is_resource_enabled() reports
# every resource as disabled.
verbose = 1              # Flag set to 0 by regrtest.py
use_resources = None     # Flag set to [] by regrtest.py
max_memuse = 0           # Disable bigmem tests (they will still be run with
                         # small sizes, to make sure they work.)
real_max_memuse = 0
failfast = False
match_tests = None
 |  | 
 | # _original_stdout is meant to hold stdout at the time regrtest began. | 
 | # This may be "the real" stdout, or IDLE's emulation of stdout, or whatever. | 
 | # The point is to have some flavor of stdout the user can actually see. | 
 | _original_stdout = None | 
 | def record_original_stdout(stdout): | 
 |     global _original_stdout | 
 |     _original_stdout = stdout | 
 |  | 
 | def get_original_stdout(): | 
 |     return _original_stdout or sys.stdout | 
 |  | 
 | def unload(name): | 
 |     try: | 
 |         del sys.modules[name] | 
 |     except KeyError: | 
 |         pass | 
 |  | 
 | def unlink(filename): | 
 |     try: | 
 |         os.unlink(filename) | 
 |     except OSError as error: | 
 |         # The filename need not exist. | 
 |         if error.errno not in (errno.ENOENT, errno.ENOTDIR): | 
 |             raise | 
 |  | 
 | def rmtree(path): | 
 |     try: | 
 |         shutil.rmtree(path) | 
 |     except OSError as error: | 
 |         if error.errno != errno.ENOENT: | 
 |             raise | 
 |  | 
 | def make_legacy_pyc(source): | 
 |     """Move a PEP 3147 pyc/pyo file to its legacy pyc/pyo location. | 
 |  | 
 |     The choice of .pyc or .pyo extension is done based on the __debug__ flag | 
 |     value. | 
 |  | 
 |     :param source: The file system path to the source file.  The source file | 
 |         does not need to exist, however the PEP 3147 pyc file must exist. | 
 |     :return: The file system path to the legacy pyc file. | 
 |     """ | 
 |     pyc_file = imp.cache_from_source(source) | 
 |     up_one = os.path.dirname(os.path.abspath(source)) | 
 |     legacy_pyc = os.path.join(up_one, source + ('c' if __debug__ else 'o')) | 
 |     os.rename(pyc_file, legacy_pyc) | 
 |     return legacy_pyc | 
 |  | 
 | def forget(modname): | 
 |     """'Forget' a module was ever imported. | 
 |  | 
 |     This removes the module from sys.modules and deletes any PEP 3147 or | 
 |     legacy .pyc and .pyo files. | 
 |     """ | 
 |     unload(modname) | 
 |     for dirname in sys.path: | 
 |         source = os.path.join(dirname, modname + '.py') | 
 |         # It doesn't matter if they exist or not, unlink all possible | 
 |         # combinations of PEP 3147 and legacy pyc and pyo files. | 
 |         unlink(source + 'c') | 
 |         unlink(source + 'o') | 
 |         unlink(imp.cache_from_source(source, debug_override=True)) | 
 |         unlink(imp.cache_from_source(source, debug_override=False)) | 
 |  | 
# On some platforms, GUI tests should not be run even if 'gui' is allowed
# in `use_resources'.
if sys.platform.startswith('win'):
    import ctypes
    import ctypes.wintypes
    def _is_gui_available():
        """Return True if the process window station has WSF_VISIBLE set.

        Raises the error built by ctypes.WinError() if either user32 call
        reports failure.
        """
        # Values handed to / tested against GetUserObjectInformationW;
        # presumably the Win32 SDK constants of the same names -- verify.
        UOI_FLAGS = 1
        WSF_VISIBLE = 0x0001
        class USEROBJECTFLAGS(ctypes.Structure):
            _fields_ = [("fInherit", ctypes.wintypes.BOOL),
                        ("fReserved", ctypes.wintypes.BOOL),
                        ("dwFlags", ctypes.wintypes.DWORD)]
        dll = ctypes.windll.user32
        h = dll.GetProcessWindowStation()
        if not h:
            raise ctypes.WinError()
        uof = USEROBJECTFLAGS()
        needed = ctypes.wintypes.DWORD()
        # Fill `uof` with the flags of the window station handle `h`.
        res = dll.GetUserObjectInformationW(h,
            UOI_FLAGS,
            ctypes.byref(uof),
            ctypes.sizeof(uof),
            ctypes.byref(needed))
        if not res:
            raise ctypes.WinError()
        return bool(uof.dwFlags & WSF_VISIBLE)
else:
    def _is_gui_available():
        """Always report the GUI as available on non-Windows platforms."""
        return True
 |  | 
 | def is_resource_enabled(resource): | 
 |     """Test whether a resource is enabled.  Known resources are set by | 
 |     regrtest.py.""" | 
 |     return use_resources is not None and resource in use_resources | 
 |  | 
 | def requires(resource, msg=None): | 
 |     """Raise ResourceDenied if the specified resource is not available. | 
 |  | 
 |     If the caller's module is __main__ then automatically return True.  The | 
 |     possibility of False being returned occurs when regrtest.py is | 
 |     executing. | 
 |     """ | 
 |     if resource == 'gui' and not _is_gui_available(): | 
 |         raise unittest.SkipTest("Cannot use the 'gui' resource") | 
 |     # see if the caller's module is __main__ - if so, treat as if | 
 |     # the resource was set | 
 |     if sys._getframe(1).f_globals.get("__name__") == "__main__": | 
 |         return | 
 |     if not is_resource_enabled(resource): | 
 |         if msg is None: | 
 |             msg = "Use of the %r resource not enabled" % resource | 
 |         raise ResourceDenied(msg) | 
 |  | 
 | def _requires_unix_version(sysname, min_version): | 
 |     """Decorator raising SkipTest if the OS is `sysname` and the version is less | 
 |     than `min_version`. | 
 |  | 
 |     For example, @_requires_unix_version('FreeBSD', (7, 2)) raises SkipTest if | 
 |     the FreeBSD version is less than 7.2. | 
 |     """ | 
 |     def decorator(func): | 
 |         @functools.wraps(func) | 
 |         def wrapper(*args, **kw): | 
 |             if platform.system() == sysname: | 
 |                 version_txt = platform.release().split('-', 1)[0] | 
 |                 try: | 
 |                     version = tuple(map(int, version_txt.split('.'))) | 
 |                 except ValueError: | 
 |                     pass | 
 |                 else: | 
 |                     if version < min_version: | 
 |                         min_version_txt = '.'.join(map(str, min_version)) | 
 |                         raise unittest.SkipTest( | 
 |                             "%s version %s or higher required, not %s" | 
 |                             % (sysname, min_version_txt, version_txt)) | 
 |         return wrapper | 
 |     return decorator | 
 |  | 
 | def requires_freebsd_version(*min_version): | 
 |     """Decorator raising SkipTest if the OS is FreeBSD and the FreeBSD version is | 
 |     less than `min_version`. | 
 |  | 
 |     For example, @requires_freebsd_version(7, 2) raises SkipTest if the FreeBSD | 
 |     version is less than 7.2. | 
 |     """ | 
 |     return _requires_unix_version('FreeBSD', min_version) | 
 |  | 
 | def requires_linux_version(*min_version): | 
 |     """Decorator raising SkipTest if the OS is Linux and the Linux version is | 
 |     less than `min_version`. | 
 |  | 
 |     For example, @requires_linux_version(2, 6, 32) raises SkipTest if the Linux | 
 |     version is less than 2.6.32. | 
 |     """ | 
 |     return _requires_unix_version('Linux', min_version) | 
 |  | 
 | def requires_mac_ver(*min_version): | 
 |     """Decorator raising SkipTest if the OS is Mac OS X and the OS X | 
 |     version if less than min_version. | 
 |  | 
 |     For example, @requires_mac_ver(10, 5) raises SkipTest if the OS X version | 
 |     is lesser than 10.5. | 
 |     """ | 
 |     def decorator(func): | 
 |         @functools.wraps(func) | 
 |         def wrapper(*args, **kw): | 
 |             if sys.platform == 'darwin': | 
 |                 version_txt = platform.mac_ver()[0] | 
 |                 try: | 
 |                     version = tuple(map(int, version_txt.split('.'))) | 
 |                 except ValueError: | 
 |                     pass | 
 |                 else: | 
 |                     if version < min_version: | 
 |                         min_version_txt = '.'.join(map(str, min_version)) | 
 |                         raise unittest.SkipTest( | 
 |                             "Mac OS X %s or higher required, not %s" | 
 |                             % (min_version_txt, version_txt)) | 
 |             return func(*args, **kw) | 
 |         wrapper.min_version = min_version | 
 |         return wrapper | 
 |     return decorator | 
 |  | 
 |  | 
# Default host that bind_port() binds test sockets to.
HOST = 'localhost'
 |  | 
 | def find_unused_port(family=socket.AF_INET, socktype=socket.SOCK_STREAM): | 
 |     """Returns an unused port that should be suitable for binding.  This is | 
 |     achieved by creating a temporary socket with the same family and type as | 
 |     the 'sock' parameter (default is AF_INET, SOCK_STREAM), and binding it to | 
 |     the specified host address (defaults to 0.0.0.0) with the port set to 0, | 
 |     eliciting an unused ephemeral port from the OS.  The temporary socket is | 
 |     then closed and deleted, and the ephemeral port is returned. | 
 |  | 
 |     Either this method or bind_port() should be used for any tests where a | 
 |     server socket needs to be bound to a particular port for the duration of | 
 |     the test.  Which one to use depends on whether the calling code is creating | 
 |     a python socket, or if an unused port needs to be provided in a constructor | 
 |     or passed to an external program (i.e. the -accept argument to openssl's | 
 |     s_server mode).  Always prefer bind_port() over find_unused_port() where | 
 |     possible.  Hard coded ports should *NEVER* be used.  As soon as a server | 
 |     socket is bound to a hard coded port, the ability to run multiple instances | 
 |     of the test simultaneously on the same host is compromised, which makes the | 
 |     test a ticking time bomb in a buildbot environment. On Unix buildbots, this | 
 |     may simply manifest as a failed test, which can be recovered from without | 
 |     intervention in most cases, but on Windows, the entire python process can | 
 |     completely and utterly wedge, requiring someone to log in to the buildbot | 
 |     and manually kill the affected process. | 
 |  | 
 |     (This is easy to reproduce on Windows, unfortunately, and can be traced to | 
 |     the SO_REUSEADDR socket option having different semantics on Windows versus | 
 |     Unix/Linux.  On Unix, you can't have two AF_INET SOCK_STREAM sockets bind, | 
 |     listen and then accept connections on identical host/ports.  An EADDRINUSE | 
 |     socket.error will be raised at some point (depending on the platform and | 
 |     the order bind and listen were called on each socket). | 
 |  | 
 |     However, on Windows, if SO_REUSEADDR is set on the sockets, no EADDRINUSE | 
 |     will ever be raised when attempting to bind two identical host/ports. When | 
 |     accept() is called on each socket, the second caller's process will steal | 
 |     the port from the first caller, leaving them both in an awkwardly wedged | 
 |     state where they'll no longer respond to any signals or graceful kills, and | 
 |     must be forcibly killed via OpenProcess()/TerminateProcess(). | 
 |  | 
 |     The solution on Windows is to use the SO_EXCLUSIVEADDRUSE socket option | 
 |     instead of SO_REUSEADDR, which effectively affords the same semantics as | 
 |     SO_REUSEADDR on Unix.  Given the propensity of Unix developers in the Open | 
 |     Source world compared to Windows ones, this is a common mistake.  A quick | 
 |     look over OpenSSL's 0.9.8g source shows that they use SO_REUSEADDR when | 
 |     openssl.exe is called with the 's_server' option, for example. See | 
 |     http://bugs.python.org/issue2550 for more info.  The following site also | 
 |     has a very thorough description about the implications of both REUSEADDR | 
 |     and EXCLUSIVEADDRUSE on Windows: | 
 |     http://msdn2.microsoft.com/en-us/library/ms740621(VS.85).aspx) | 
 |  | 
 |     XXX: although this approach is a vast improvement on previous attempts to | 
 |     elicit unused ports, it rests heavily on the assumption that the ephemeral | 
 |     port returned to us by the OS won't immediately be dished back out to some | 
 |     other process when we close and delete our temporary socket but before our | 
 |     calling code has a chance to bind the returned port.  We can deal with this | 
 |     issue if/when we come across it. | 
 |     """ | 
 |  | 
 |     tempsock = socket.socket(family, socktype) | 
 |     port = bind_port(tempsock) | 
 |     tempsock.close() | 
 |     del tempsock | 
 |     return port | 
 |  | 
 | def bind_port(sock, host=HOST): | 
 |     """Bind the socket to a free port and return the port number.  Relies on | 
 |     ephemeral ports in order to ensure we are using an unbound port.  This is | 
 |     important as many tests may be running simultaneously, especially in a | 
 |     buildbot environment.  This method raises an exception if the sock.family | 
 |     is AF_INET and sock.type is SOCK_STREAM, *and* the socket has SO_REUSEADDR | 
 |     or SO_REUSEPORT set on it.  Tests should *never* set these socket options | 
 |     for TCP/IP sockets.  The only case for setting these options is testing | 
 |     multicasting via multiple UDP sockets. | 
 |  | 
 |     Additionally, if the SO_EXCLUSIVEADDRUSE socket option is available (i.e. | 
 |     on Windows), it will be set on the socket.  This will prevent anyone else | 
 |     from bind()'ing to our host/port for the duration of the test. | 
 |     """ | 
 |  | 
 |     if sock.family == socket.AF_INET and sock.type == socket.SOCK_STREAM: | 
 |         if hasattr(socket, 'SO_REUSEADDR'): | 
 |             if sock.getsockopt(socket.SOL_SOCKET, socket.SO_REUSEADDR) == 1: | 
 |                 raise TestFailed("tests should never set the SO_REUSEADDR "   \ | 
 |                                  "socket option on TCP/IP sockets!") | 
 |         if hasattr(socket, 'SO_REUSEPORT'): | 
 |             if sock.getsockopt(socket.SOL_SOCKET, socket.SO_REUSEPORT) == 1: | 
 |                 raise TestFailed("tests should never set the SO_REUSEPORT "   \ | 
 |                                  "socket option on TCP/IP sockets!") | 
 |         if hasattr(socket, 'SO_EXCLUSIVEADDRUSE'): | 
 |             sock.setsockopt(socket.SOL_SOCKET, socket.SO_EXCLUSIVEADDRUSE, 1) | 
 |  | 
 |     sock.bind((host, 0)) | 
 |     port = sock.getsockname()[1] | 
 |     return port | 
 |  | 
 | def _is_ipv6_enabled(): | 
 |     """Check whether IPv6 is enabled on this host.""" | 
 |     if socket.has_ipv6: | 
 |         try: | 
 |             sock = socket.socket(socket.AF_INET6, socket.SOCK_STREAM) | 
 |             sock.bind(('::1', 0)) | 
 |         except (socket.error, socket.gaierror): | 
 |             pass | 
 |         else: | 
 |             sock.close() | 
 |             return True | 
 |     return False | 
 |  | 
# Probed once, when this module is imported.
IPV6_ENABLED = _is_ipv6_enabled()
 |  | 
 |  | 
# A constant likely larger than the underlying OS pipe buffer size.
# The Windows limit seems to be around 512 bytes, and most Unix kernels have
# a 64 KiB pipe buffer: take 1 MiB to be sure.
PIPE_MAX_SIZE = 1024 * 1024
 |  | 
 |  | 
# Decorator for skipping tests on non-IEEE 754 platforms.
requires_IEEE_754 = unittest.skipUnless(
    float.__getformat__("double").startswith("IEEE"),
    "test requires IEEE 754 doubles")

# Decorators for tests that need an optional compression module.  Each of
# these names is None when the corresponding import at the top of this
# module failed, which makes the decorator skip the test.
requires_zlib = unittest.skipUnless(zlib, 'requires zlib')

requires_bz2 = unittest.skipUnless(bz2, 'requires bz2')

requires_lzma = unittest.skipUnless(lzma, 'requires lzma')

# True when sys.platform starts with 'java'.
is_jython = sys.platform.startswith('java')
 |  | 
 | # Filename used for testing | 
 | if os.name == 'java': | 
 |     # Jython disallows @ in module names | 
 |     TESTFN = '$test' | 
 | else: | 
 |     TESTFN = '@test' | 
 |  | 
 | # Disambiguate TESTFN for parallel testing, while letting it remain a valid | 
 | # module name. | 
 | TESTFN = "{}_{}_tmp".format(TESTFN, os.getpid()) | 
 |  | 
 |  | 
# TESTFN_UNICODE is TESTFN with a non-ASCII suffix appended.
TESTFN_UNICODE = TESTFN + "-\xe0\xf2\u0258\u0141\u011f"
if sys.platform == 'darwin':
    # In Mac OS X's VFS API file names are, by definition, canonically
    # decomposed Unicode, encoded using UTF-8. See QA1173:
    # http://developer.apple.com/mac/library/qa/qa2001/qa1173.html
    import unicodedata
    TESTFN_UNICODE = unicodedata.normalize('NFD', TESTFN_UNICODE)
# The file system encoding reported by the interpreter.
TESTFN_ENCODING = sys.getfilesystemencoding()
 |  | 
# TESTFN_UNENCODABLE is a filename (str type) that should *not* be able to be
# encoded by the filesystem encoding (in strict mode). It can be None if we
# cannot generate such a filename.
TESTFN_UNENCODABLE = None
if os.name in ('nt', 'ce'):
    # skip win32s (0) or Windows 9x/ME (1)
    if sys.getwindowsversion().platform >= 2:
        # Different kinds of characters from various languages to minimize the
        # probability that the whole name is encodable to MBCS (issue #9819)
        TESTFN_UNENCODABLE = TESTFN + "-\u5171\u0141\u2661\u0363\uDC80"
        try:
            TESTFN_UNENCODABLE.encode(TESTFN_ENCODING)
        except UnicodeEncodeError:
            # Expected: the name really is unencodable, so keep it.
            pass
        else:
            print('WARNING: The filename %r CAN be encoded by the filesystem encoding (%s). '
                  'Unicode filename tests may not be effective'
                  % (TESTFN_UNENCODABLE, TESTFN_ENCODING))
            TESTFN_UNENCODABLE = None
# Mac OS X denies unencodable filenames (invalid utf-8)
elif sys.platform != 'darwin':
    try:
        # ascii and utf-8 cannot decode the byte 0xff
        b'\xff'.decode(TESTFN_ENCODING)
    except UnicodeDecodeError:
        # 0xff will be encoded using the surrogate character u+DCFF
        TESTFN_UNENCODABLE = TESTFN \
            + b'-\xff'.decode(TESTFN_ENCODING, 'surrogateescape')
    else:
        # File system encoding (eg. ISO-8859-* encodings) can decode
        # the byte 0xff. Skip some unicode filename tests.
        pass
 |  | 
# Working directory at the time this module was imported.
SAVEDCWD = os.getcwd()
 |  | 
 | @contextlib.contextmanager | 
 | def temp_cwd(name='tempcwd', quiet=False, path=None): | 
 |     """ | 
 |     Context manager that temporarily changes the CWD. | 
 |  | 
 |     An existing path may be provided as *path*, in which case this | 
 |     function makes no changes to the file system. | 
 |  | 
 |     Otherwise, the new CWD is created in the current directory and it's | 
 |     named *name*. If *quiet* is False (default) and it's not possible to | 
 |     create or change the CWD, an error is raised.  If it's True, only a | 
 |     warning is raised and the original CWD is used. | 
 |     """ | 
 |     saved_dir = os.getcwd() | 
 |     is_temporary = False | 
 |     if path is None: | 
 |         path = name | 
 |         try: | 
 |             os.mkdir(name) | 
 |             is_temporary = True | 
 |         except OSError: | 
 |             if not quiet: | 
 |                 raise | 
 |             warnings.warn('tests may fail, unable to create temp CWD ' + name, | 
 |                           RuntimeWarning, stacklevel=3) | 
 |     try: | 
 |         os.chdir(path) | 
 |     except OSError: | 
 |         if not quiet: | 
 |             raise | 
 |         warnings.warn('tests may fail, unable to change the CWD to ' + name, | 
 |                       RuntimeWarning, stacklevel=3) | 
 |     try: | 
 |         yield os.getcwd() | 
 |     finally: | 
 |         os.chdir(saved_dir) | 
 |         if is_temporary: | 
 |             rmtree(name) | 
 |  | 
 |  | 
 | if hasattr(os, "umask"): | 
 |     @contextlib.contextmanager | 
 |     def temp_umask(umask): | 
 |         """Context manager that temporarily sets the process umask.""" | 
 |         oldmask = os.umask(umask) | 
 |         try: | 
 |             yield | 
 |         finally: | 
 |             os.umask(oldmask) | 
 |  | 
 |  | 
 | def findfile(file, here=__file__, subdir=None): | 
 |     """Try to find a file on sys.path and the working directory.  If it is not | 
 |     found the argument passed to the function is returned (this does not | 
 |     necessarily signal failure; could still be the legitimate path).""" | 
 |     if os.path.isabs(file): | 
 |         return file | 
 |     if subdir is not None: | 
 |         file = os.path.join(subdir, file) | 
 |     path = sys.path | 
 |     path = [os.path.dirname(here)] + path | 
 |     for dn in path: | 
 |         fn = os.path.join(dn, file) | 
 |         if os.path.exists(fn): return fn | 
 |     return file | 
 |  | 
 | def create_empty_file(filename): | 
 |     """Create an empty file. If the file already exists, truncate it.""" | 
 |     fd = os.open(filename, os.O_WRONLY | os.O_CREAT | os.O_TRUNC) | 
 |     os.close(fd) | 
 |  | 
 | def sortdict(dict): | 
 |     "Like repr(dict), but in sorted order." | 
 |     items = sorted(dict.items()) | 
 |     reprpairs = ["%r: %r" % pair for pair in items] | 
 |     withcommas = ", ".join(reprpairs) | 
 |     return "{%s}" % withcommas | 
 |  | 
 | def make_bad_fd(): | 
 |     """ | 
 |     Create an invalid file descriptor by opening and closing a file and return | 
 |     its fd. | 
 |     """ | 
 |     file = open(TESTFN, "wb") | 
 |     try: | 
 |         return file.fileno() | 
 |     finally: | 
 |         file.close() | 
 |         unlink(TESTFN) | 
 |  | 
 | def check_syntax_error(testcase, statement): | 
 |     testcase.assertRaises(SyntaxError, compile, statement, | 
 |                           '<test string>', 'exec') | 
 |  | 
 | def open_urlresource(url, *args, **kw): | 
 |     import urllib.request, urllib.parse | 
 |  | 
 |     check = kw.pop('check', None) | 
 |  | 
 |     filename = urllib.parse.urlparse(url)[2].split('/')[-1] # '/': it's URL! | 
 |  | 
 |     fn = os.path.join(os.path.dirname(__file__), "data", filename) | 
 |  | 
 |     def check_valid_file(fn): | 
 |         f = open(fn, *args, **kw) | 
 |         if check is None: | 
 |             return f | 
 |         elif check(f): | 
 |             f.seek(0) | 
 |             return f | 
 |         f.close() | 
 |  | 
 |     if os.path.exists(fn): | 
 |         f = check_valid_file(fn) | 
 |         if f is not None: | 
 |             return f | 
 |         unlink(fn) | 
 |  | 
 |     # Verify the requirement before downloading the file | 
 |     requires('urlfetch') | 
 |  | 
 |     print('\tfetching %s ...' % url, file=get_original_stdout()) | 
 |     f = urllib.request.urlopen(url, timeout=15) | 
 |     try: | 
 |         with open(fn, "wb") as out: | 
 |             s = f.read() | 
 |             while s: | 
 |                 out.write(s) | 
 |                 s = f.read() | 
 |     finally: | 
 |         f.close() | 
 |  | 
 |     f = check_valid_file(fn) | 
 |     if f is not None: | 
 |         return f | 
 |     raise TestFailed('invalid resource %r' % fn) | 
 |  | 
 |  | 
 | class WarningsRecorder(object): | 
 |     """Convenience wrapper for the warnings list returned on | 
 |        entry to the warnings.catch_warnings() context manager. | 
 |     """ | 
 |     def __init__(self, warnings_list): | 
 |         self._warnings = warnings_list | 
 |         self._last = 0 | 
 |  | 
 |     def __getattr__(self, attr): | 
 |         if len(self._warnings) > self._last: | 
 |             return getattr(self._warnings[-1], attr) | 
 |         elif attr in warnings.WarningMessage._WARNING_DETAILS: | 
 |             return None | 
 |         raise AttributeError("%r has no attribute %r" % (self, attr)) | 
 |  | 
 |     @property | 
 |     def warnings(self): | 
 |         return self._warnings[self._last:] | 
 |  | 
 |     def reset(self): | 
 |         self._last = len(self._warnings) | 
 |  | 
 |  | 
def _filterwarnings(filters, quiet=False):
    """Catch the warnings, then check if all the expected
    warnings have been raised and re-raise unexpected warnings.
    If 'quiet' is True, only re-raise the unexpected warnings.

    This is a generator: it yields a single WarningsRecorder while
    warnings are being recorded and runs the checks once it is resumed.

    'filters' is an iterable of (message regexp, warning category) pairs.
    Raises AssertionError for the first recorded warning that no filter
    matched, or (unless 'quiet') for the first filter that matched nothing.
    """
    # Clear the warning registry of the calling module
    # in order to re-raise the warnings.
    # NOTE(review): frame 2 is presumably the code using check_warnings()
    # (generator body -> contextlib __enter__ -> caller); the depth is
    # hard-coded, so confirm before calling this from anywhere else.
    frame = sys._getframe(2)
    registry = frame.f_globals.get('__warningregistry__')
    if registry:
        registry.clear()
    with warnings.catch_warnings(record=True) as w:
        # Set filter "always" to record all warnings.  Because
        # test_warnings swap the module, we need to look up in
        # the sys.modules dictionary.
        sys.modules['warnings'].simplefilter("always")
        yield WarningsRecorder(w)
    # Filter the recorded warnings
    reraise = list(w)
    missing = []
    for msg, cat in filters:
        seen = False
        # Iterate over a copy because matching entries are removed from
        # 'reraise' inside the loop.  (The loop variable rebinds 'w'.)
        for w in reraise[:]:
            warning = w.message
            # Filter out the matching messages
            # re.match anchors at the start of the text; re.I makes the
            # comparison case-insensitive.
            if (re.match(msg, str(warning), re.I) and
                issubclass(warning.__class__, cat)):
                seen = True
                reraise.remove(w)
        if not seen and not quiet:
            # This filter caught nothing
            missing.append((msg, cat.__name__))
    # Unmatched warnings are reported before unmatched filters; only the
    # first offender is mentioned in either case.
    if reraise:
        raise AssertionError("unhandled warning %s" % reraise[0])
    if missing:
        raise AssertionError("filter (%r, %s) did not catch any warning" %
                             missing[0])
 |  | 
 |  | 
 | @contextlib.contextmanager | 
 | def check_warnings(*filters, **kwargs): | 
 |     """Context manager to silence warnings. | 
 |  | 
 |     Accept 2-tuples as positional arguments: | 
 |         ("message regexp", WarningCategory) | 
 |  | 
 |     Optional argument: | 
 |      - if 'quiet' is True, it does not fail if a filter catches nothing | 
 |         (default True without argument, | 
 |          default False if some filters are defined) | 
 |  | 
 |     Without argument, it defaults to: | 
 |         check_warnings(("", Warning), quiet=True) | 
 |     """ | 
 |     quiet = kwargs.get('quiet') | 
 |     if not filters: | 
 |         filters = (("", Warning),) | 
 |         # Preserve backward compatibility | 
 |         if quiet is None: | 
 |             quiet = True | 
 |     return _filterwarnings(filters, quiet) | 
 |  | 
 |  | 
 | class CleanImport(object): | 
 |     """Context manager to force import to return a new module reference. | 
 |  | 
 |     This is useful for testing module-level behaviours, such as | 
 |     the emission of a DeprecationWarning on import. | 
 |  | 
 |     Use like this: | 
 |  | 
 |         with CleanImport("foo"): | 
 |             importlib.import_module("foo") # new reference | 
 |     """ | 
 |  | 
 |     def __init__(self, *module_names): | 
 |         self.original_modules = sys.modules.copy() | 
 |         for module_name in module_names: | 
 |             if module_name in sys.modules: | 
 |                 module = sys.modules[module_name] | 
 |                 # It is possible that module_name is just an alias for | 
 |                 # another module (e.g. stub for modules renamed in 3.x). | 
 |                 # In that case, we also need delete the real module to clear | 
 |                 # the import cache. | 
 |                 if module.__name__ != module_name: | 
 |                     del sys.modules[module.__name__] | 
 |                 del sys.modules[module_name] | 
 |  | 
 |     def __enter__(self): | 
 |         return self | 
 |  | 
 |     def __exit__(self, *ignore_exc): | 
 |         sys.modules.update(self.original_modules) | 
 |  | 
 |  | 
 | class EnvironmentVarGuard(collections.abc.MutableMapping): | 
 |  | 
 |     """Class to help protect the environment variable properly.  Can be used as | 
 |     a context manager.""" | 
 |  | 
 |     def __init__(self): | 
 |         self._environ = os.environ | 
 |         self._changed = {} | 
 |  | 
 |     def __getitem__(self, envvar): | 
 |         return self._environ[envvar] | 
 |  | 
 |     def __setitem__(self, envvar, value): | 
 |         # Remember the initial value on the first access | 
 |         if envvar not in self._changed: | 
 |             self._changed[envvar] = self._environ.get(envvar) | 
 |         self._environ[envvar] = value | 
 |  | 
 |     def __delitem__(self, envvar): | 
 |         # Remember the initial value on the first access | 
 |         if envvar not in self._changed: | 
 |             self._changed[envvar] = self._environ.get(envvar) | 
 |         if envvar in self._environ: | 
 |             del self._environ[envvar] | 
 |  | 
 |     def keys(self): | 
 |         return self._environ.keys() | 
 |  | 
 |     def __iter__(self): | 
 |         return iter(self._environ) | 
 |  | 
 |     def __len__(self): | 
 |         return len(self._environ) | 
 |  | 
 |     def set(self, envvar, value): | 
 |         self[envvar] = value | 
 |  | 
 |     def unset(self, envvar): | 
 |         del self[envvar] | 
 |  | 
 |     def __enter__(self): | 
 |         return self | 
 |  | 
 |     def __exit__(self, *ignore_exc): | 
 |         for (k, v) in self._changed.items(): | 
 |             if v is None: | 
 |                 if k in self._environ: | 
 |                     del self._environ[k] | 
 |             else: | 
 |                 self._environ[k] = v | 
 |         os.environ = self._environ | 
 |  | 
 |  | 
 | class DirsOnSysPath(object): | 
 |     """Context manager to temporarily add directories to sys.path. | 
 |  | 
 |     This makes a copy of sys.path, appends any directories given | 
 |     as positional arguments, then reverts sys.path to the copied | 
 |     settings when the context ends. | 
 |  | 
 |     Note that *all* sys.path modifications in the body of the | 
 |     context manager, including replacement of the object, | 
 |     will be reverted at the end of the block. | 
 |     """ | 
 |  | 
 |     def __init__(self, *paths): | 
 |         self.original_value = sys.path[:] | 
 |         self.original_object = sys.path | 
 |         sys.path.extend(paths) | 
 |  | 
 |     def __enter__(self): | 
 |         return self | 
 |  | 
 |     def __exit__(self, *ignore_exc): | 
 |         sys.path = self.original_object | 
 |         sys.path[:] = self.original_value | 
 |  | 
 |  | 
 | class TransientResource(object): | 
 |  | 
 |     """Raise ResourceDenied if an exception is raised while the context manager | 
 |     is in effect that matches the specified exception and attributes.""" | 
 |  | 
 |     def __init__(self, exc, **kwargs): | 
 |         self.exc = exc | 
 |         self.attrs = kwargs | 
 |  | 
 |     def __enter__(self): | 
 |         return self | 
 |  | 
 |     def __exit__(self, type_=None, value=None, traceback=None): | 
 |         """If type_ is a subclass of self.exc and value has attributes matching | 
 |         self.attrs, raise ResourceDenied.  Otherwise let the exception | 
 |         propagate (if any).""" | 
 |         if type_ is not None and issubclass(self.exc, type_): | 
 |             for attr, attr_value in self.attrs.items(): | 
 |                 if not hasattr(value, attr): | 
 |                     break | 
 |                 if getattr(value, attr) != attr_value: | 
 |                     break | 
 |             else: | 
 |                 raise ResourceDenied("an optional resource is not available") | 
 |  | 
# Context managers that raise ResourceDenied when various issues
# with the Internet connection manifest themselves as exceptions.
# Each one is a TransientResource keyed on an exception class plus the
# 'errno' attribute value the exception must carry.
# XXX deprecate these and use transient_internet() instead
time_out = TransientResource(IOError, errno=errno.ETIMEDOUT)
socket_peer_reset = TransientResource(socket.error, errno=errno.ECONNRESET)
ioerror_peer_reset = TransientResource(IOError, errno=errno.ECONNRESET)
 |  | 
 |  | 
@contextlib.contextmanager
def transient_internet(resource_name, *, timeout=30.0, errnos=()):
    """Return a context manager that raises ResourceDenied when various issues
    with the Internet connection manifest themselves as exceptions.

    'resource_name' is only used in the ResourceDenied message.
    'timeout', unless None, is installed as the default socket timeout for
    the duration of the block; the previous default is restored on exit.
    'errnos', when non-empty, replaces the built-in list of errno values
    that are treated as transient; in that case no getaddrinfo error code
    is treated as transient.
    """
    # (name, fallback number) pairs: the number is used only when the
    # errno module does not define the name on this platform.
    default_errnos = [
        ('ECONNREFUSED', 111),
        ('ECONNRESET', 104),
        ('EHOSTUNREACH', 113),
        ('ENETUNREACH', 101),
        ('ETIMEDOUT', 110),
    ]
    # Same scheme, looked up on the socket module, for socket.gaierror.
    default_gai_errnos = [
        ('EAI_AGAIN', -3),
        ('EAI_FAIL', -4),
        ('EAI_NONAME', -2),
        ('EAI_NODATA', -5),
        # Encountered when trying to resolve IPv6-only hostnames
        ('WSANO_DATA', 11004),
    ]

    denied = ResourceDenied("Resource %r is not available" % resource_name)
    captured_errnos = errnos
    gai_errnos = []
    if not captured_errnos:
        captured_errnos = [getattr(errno, name, num)
                           for (name, num) in default_errnos]
        gai_errnos = [getattr(socket, name, num)
                      for (name, num) in default_gai_errnos]

    def filter_error(err):
        # Raise 'denied' (chained to err) for socket timeouts, for gaierrors
        # with a listed code and for any error whose errno is listed;
        # otherwise return so that the caller re-raises the original error.
        n = getattr(err, 'errno', None)
        if (isinstance(err, socket.timeout) or
            (isinstance(err, socket.gaierror) and n in gai_errnos) or
            n in captured_errnos):
            # NOTE(review): the message is written to stderr only when
            # 'verbose' is false -- confirm this is the intended polarity.
            if not verbose:
                sys.stderr.write(denied.args[0] + "\n")
            raise denied from err

    old_timeout = socket.getdefaulttimeout()
    try:
        if timeout is not None:
            socket.setdefaulttimeout(timeout)
        yield
    except IOError as err:
        # urllib can wrap original socket errors multiple times (!), we must
        # unwrap to get at the original error.
        while True:
            a = err.args
            if len(a) >= 1 and isinstance(a[0], IOError):
                err = a[0]
            # The error can also be wrapped as args[1]:
            #    except socket.error as msg:
            #        raise IOError('socket error', msg).with_traceback(sys.exc_info()[2])
            elif len(a) >= 2 and isinstance(a[1], IOError):
                err = a[1]
            else:
                break
        filter_error(err)
        # Not a transient network problem: propagate the original exception.
        raise
    # XXX should we catch generic exceptions and look for their
    # __cause__ or __context__?
    finally:
        socket.setdefaulttimeout(old_timeout)
 |  | 
 |  | 
 | @contextlib.contextmanager | 
 | def captured_output(stream_name): | 
 |     """Return a context manager used by captured_stdout/stdin/stderr | 
 |     that temporarily replaces the sys stream *stream_name* with a StringIO.""" | 
 |     import io | 
 |     orig_stdout = getattr(sys, stream_name) | 
 |     setattr(sys, stream_name, io.StringIO()) | 
 |     try: | 
 |         yield getattr(sys, stream_name) | 
 |     finally: | 
 |         setattr(sys, stream_name, orig_stdout) | 
 |  | 
def captured_stdout():
    """Capture the output of sys.stdout:

       with captured_stdout() as s:
           print("hello")
       self.assertEqual(s.getvalue(), "hello\\n")
    """
    return captured_output("stdout")
 |  | 
def captured_stderr():
    """Capture the output of sys.stderr; see captured_output()."""
    return captured_output("stderr")
 |  | 
def captured_stdin():
    """Temporarily replace sys.stdin with a StringIO; see captured_output()."""
    return captured_output("stdin")
 |  | 
 |  | 
 | def gc_collect(): | 
 |     """Force as many objects as possible to be collected. | 
 |  | 
 |     In non-CPython implementations of Python, this is needed because timely | 
 |     deallocation is not guaranteed by the garbage collector.  (Even in CPython | 
 |     this can be the case in case of reference cycles.)  This means that __del__ | 
 |     methods may be called later than expected and weakrefs may remain alive for | 
 |     longer than expected.  This function tries its best to force all garbage | 
 |     objects to disappear. | 
 |     """ | 
 |     gc.collect() | 
 |     if is_jython: | 
 |         time.sleep(0.1) | 
 |     gc.collect() | 
 |     gc.collect() | 
 |  | 
 | @contextlib.contextmanager | 
 | def disable_gc(): | 
 |     have_gc = gc.isenabled() | 
 |     gc.disable() | 
 |     try: | 
 |         yield | 
 |     finally: | 
 |         if have_gc: | 
 |             gc.enable() | 
 |  | 
 |  | 
 | def python_is_optimized(): | 
 |     """Find if Python was built with optimizations.""" | 
 |     cflags = sysconfig.get_config_var('PY_CFLAGS') or '' | 
 |     final_opt = "" | 
 |     for opt in cflags.split(): | 
 |         if opt.startswith('-O'): | 
 |             final_opt = opt | 
 |     return final_opt != '' and final_opt != '-O0' | 
 |  | 
 |  | 
 | #======================================================================= | 
 | # Decorator for running a function in a different locale, correctly resetting | 
 | # it afterwards. | 
 |  | 
 | def run_with_locale(catstr, *locales): | 
 |     def decorator(func): | 
 |         def inner(*args, **kwds): | 
 |             try: | 
 |                 import locale | 
 |                 category = getattr(locale, catstr) | 
 |                 orig_locale = locale.setlocale(category) | 
 |             except AttributeError: | 
 |                 # if the test author gives us an invalid category string | 
 |                 raise | 
 |             except: | 
 |                 # cannot retrieve original locale, so do nothing | 
 |                 locale = orig_locale = None | 
 |             else: | 
 |                 for loc in locales: | 
 |                     try: | 
 |                         locale.setlocale(category, loc) | 
 |                         break | 
 |                     except: | 
 |                         pass | 
 |  | 
 |             # now run the function, resetting the locale on exceptions | 
 |             try: | 
 |                 return func(*args, **kwds) | 
 |             finally: | 
 |                 if locale and orig_locale: | 
 |                     locale.setlocale(category, orig_locale) | 
 |         inner.__name__ = func.__name__ | 
 |         inner.__doc__ = func.__doc__ | 
 |         return inner | 
 |     return decorator | 
 |  | 
 | #======================================================================= | 
 | # Decorator for running a function in a specific timezone, correctly | 
 | # resetting it afterwards. | 
 |  | 
 | def run_with_tz(tz): | 
 |     def decorator(func): | 
 |         def inner(*args, **kwds): | 
 |             try: | 
 |                 tzset = time.tzset | 
 |             except AttributeError: | 
 |                 raise unittest.SkipTest("tzset required") | 
 |             if 'TZ' in os.environ: | 
 |                 orig_tz = os.environ['TZ'] | 
 |             else: | 
 |                 orig_tz = None | 
 |             os.environ['TZ'] = tz | 
 |             tzset() | 
 |  | 
 |             # now run the function, resetting the tz on exceptions | 
 |             try: | 
 |                 return func(*args, **kwds) | 
 |             finally: | 
 |                 if orig_tz == None: | 
 |                     del os.environ['TZ'] | 
 |                 else: | 
 |                     os.environ['TZ'] = orig_tz | 
 |                 time.tzset() | 
 |  | 
 |         inner.__name__ = func.__name__ | 
 |         inner.__doc__ = func.__doc__ | 
 |         return inner | 
 |     return decorator | 
 |  | 
 | #======================================================================= | 
 | # Big-memory-test support. Separate from 'resources' because memory use | 
 | # should be configurable. | 
 |  | 
# Some handy shorthands. Note that these are used for byte-limits as well
# as size-limits, in the various bigmem tests
_1M = 1024*1024  # 2**20
_1G = 1024 * _1M  # 2**30
_2G = 2 * _1G
_4G = 4 * _1G

# Upper bound applied to the memory limit in set_memlimit().
MAX_Py_ssize_t = sys.maxsize
 |  | 
 | def set_memlimit(limit): | 
 |     global max_memuse | 
 |     global real_max_memuse | 
 |     sizes = { | 
 |         'k': 1024, | 
 |         'm': _1M, | 
 |         'g': _1G, | 
 |         't': 1024*_1G, | 
 |     } | 
 |     m = re.match(r'(\d+(\.\d+)?) (K|M|G|T)b?$', limit, | 
 |                  re.IGNORECASE | re.VERBOSE) | 
 |     if m is None: | 
 |         raise ValueError('Invalid memory limit %r' % (limit,)) | 
 |     memlimit = int(float(m.group(1)) * sizes[m.group(3).lower()]) | 
 |     real_max_memuse = memlimit | 
 |     if memlimit > MAX_Py_ssize_t: | 
 |         memlimit = MAX_Py_ssize_t | 
 |     if memlimit < _2G - 1: | 
 |         raise ValueError('Memory limit %r too low to be useful' % (limit,)) | 
 |     max_memuse = memlimit | 
 |  | 
 | class _MemoryWatchdog: | 
 |     """An object which periodically watches the process' memory consumption | 
 |     and prints it out. | 
 |     """ | 
 |  | 
 |     def __init__(self): | 
 |         self.procfile = '/proc/{pid}/statm'.format(pid=os.getpid()) | 
 |         self.started = False | 
 |  | 
 |     def start(self): | 
 |         try: | 
 |             f = open(self.procfile, 'r') | 
 |         except OSError as e: | 
 |             warnings.warn('/proc not available for stats: {}'.format(e), | 
 |                           RuntimeWarning) | 
 |             sys.stderr.flush() | 
 |             return | 
 |  | 
 |         watchdog_script = findfile("memory_watchdog.py") | 
 |         self.mem_watchdog = subprocess.Popen([sys.executable, watchdog_script], | 
 |                                              stdin=f, stderr=subprocess.DEVNULL) | 
 |         f.close() | 
 |         self.started = True | 
 |  | 
 |     def stop(self): | 
 |         if self.started: | 
 |             self.mem_watchdog.terminate() | 
 |             self.mem_watchdog.wait() | 
 |  | 
 |  | 
def bigmemtest(size, memuse, dry_run=True):
    """Decorator for bigmem tests.

    'size' is the minimum useful size for the test (in arbitrary,
    test-interpreted units.) 'memuse' is the number of 'bytes per size' for
    the test, or a good estimate of it.

    if 'dry_run' is False, it means the test doesn't support dummy runs
    when -M is not specified.

    The decorated test is called as f(self, maxsize).  'size' and 'memuse'
    are stored as attributes of the wrapper and read back on every call.
    Raises unittest.SkipTest when the configured memory limit is lower
    than maxsize * memuse.
    """
    def decorator(f):
        def wrapper(self):
            size = wrapper.size
            memuse = wrapper.memuse
            if not real_max_memuse:
                # No memory limit configured: use a small dummy size.
                maxsize = 5147
            else:
                maxsize = size

            # With a limit configured, skip if it is too low; without one,
            # skip only the tests that do not support dry runs.
            if ((real_max_memuse or not dry_run)
                and real_max_memuse < maxsize * memuse):
                raise unittest.SkipTest(
                    "not enough memory: %.1fG minimum needed"
                    % (size * memuse / (1024 ** 3)))

            # The memory watchdog is only used for real (non-dummy) runs
            # in verbose mode.
            if real_max_memuse and verbose:
                print()
                print(" ... expected peak memory use: {peak:.1f}G"
                      .format(peak=size * memuse / (1024 ** 3)))
                watchdog = _MemoryWatchdog()
                watchdog.start()
            else:
                watchdog = None

            try:
                return f(self, maxsize)
            finally:
                if watchdog:
                    watchdog.stop()

        wrapper.size = size
        wrapper.memuse = memuse
        return wrapper
    return decorator
 |  | 
 | def bigaddrspacetest(f): | 
 |     """Decorator for tests that fill the address space.""" | 
 |     def wrapper(self): | 
 |         if max_memuse < MAX_Py_ssize_t: | 
 |             if MAX_Py_ssize_t >= 2**63 - 1 and max_memuse >= 2**31: | 
 |                 raise unittest.SkipTest( | 
 |                     "not enough memory: try a 32-bit build instead") | 
 |             else: | 
 |                 raise unittest.SkipTest( | 
 |                     "not enough memory: %.1fG minimum needed" | 
 |                     % (MAX_Py_ssize_t / (1024 ** 3))) | 
 |         else: | 
 |             return f(self) | 
 |     return wrapper | 
 |  | 
 | #======================================================================= | 
 | # unittest integration. | 
 |  | 
 | class BasicTestRunner: | 
 |     def run(self, test): | 
 |         result = unittest.TestResult() | 
 |         test(result) | 
 |         return result | 
 |  | 
def _id(obj):
    """Identity function, used as the "do not skip" decorator: return
    *obj* unchanged."""
    return obj
 |  | 
 | def requires_resource(resource): | 
 |     if resource == 'gui' and not _is_gui_available(): | 
 |         return unittest.skip("resource 'gui' is not available") | 
 |     if is_resource_enabled(resource): | 
 |         return _id | 
 |     else: | 
 |         return unittest.skip("resource {0!r} is not enabled".format(resource)) | 
 |  | 
def cpython_only(test):
    """
    Decorator for tests only applicable on CPython.

    Equivalent to decorating *test* with impl_detail(cpython=True).
    """
    return impl_detail(cpython=True)(test)
 |  | 
 | def impl_detail(msg=None, **guards): | 
 |     if check_impl_detail(**guards): | 
 |         return _id | 
 |     if msg is None: | 
 |         guardnames, default = _parse_guards(guards) | 
 |         if default: | 
 |             msg = "implementation detail not available on {0}" | 
 |         else: | 
 |             msg = "implementation detail specific to {0}" | 
 |         guardnames = sorted(guardnames.keys()) | 
 |         msg = msg.format(' or '.join(guardnames)) | 
 |     return unittest.skip(msg) | 
 |  | 
 | def _parse_guards(guards): | 
 |     # Returns a tuple ({platform_name: run_me}, default_value) | 
 |     if not guards: | 
 |         return ({'cpython': True}, False) | 
 |     is_true = list(guards.values())[0] | 
 |     assert list(guards.values()) == [is_true] * len(guards)   # all True or all False | 
 |     return (guards, not is_true) | 
 |  | 
 | # Use the following check to guard CPython's implementation-specific tests -- | 
 | # or to run them only on the implementation(s) guarded by the arguments. | 
 | def check_impl_detail(**guards): | 
 |     """This function returns True or False depending on the host platform. | 
 |        Examples: | 
 |           if check_impl_detail():               # only on CPython (default) | 
 |           if check_impl_detail(jython=True):    # only on Jython | 
 |           if check_impl_detail(cpython=False):  # everywhere except on CPython | 
 |     """ | 
 |     guards, default = _parse_guards(guards) | 
 |     return guards.get(platform.python_implementation().lower(), default) | 
 |  | 
 |  | 
 | def no_tracing(func): | 
 |     """Decorator to temporarily turn off tracing for the duration of a test.""" | 
 |     if not hasattr(sys, 'gettrace'): | 
 |         return func | 
 |     else: | 
 |         @functools.wraps(func) | 
 |         def wrapper(*args, **kwargs): | 
 |             original_trace = sys.gettrace() | 
 |             try: | 
 |                 sys.settrace(None) | 
 |                 return func(*args, **kwargs) | 
 |             finally: | 
 |                 sys.settrace(original_trace) | 
 |         return wrapper | 
 |  | 
 |  | 
def refcount_test(test):
    """Decorator for tests which involve reference counting.

    To start, the decorator does not run the test if it is not run by
    CPython.  After that, any trace function is unset during the test to
    prevent unexpected refcounts caused by the trace function.

    """
    return no_tracing(cpython_only(test))
 |  | 
 |  | 
 | def _filter_suite(suite, pred): | 
 |     """Recursively filter test cases in a suite based on a predicate.""" | 
 |     newtests = [] | 
 |     for test in suite._tests: | 
 |         if isinstance(test, unittest.TestSuite): | 
 |             _filter_suite(test, pred) | 
 |             newtests.append(test) | 
 |         else: | 
 |             if pred(test): | 
 |                 newtests.append(test) | 
 |     suite._tests = newtests | 
 |  | 
 | def _run_suite(suite): | 
 |     """Run tests from a unittest.TestSuite-derived class.""" | 
 |     if verbose: | 
 |         runner = unittest.TextTestRunner(sys.stdout, verbosity=2, | 
 |                                          failfast=failfast) | 
 |     else: | 
 |         runner = BasicTestRunner() | 
 |  | 
 |     result = runner.run(suite) | 
 |     if not result.wasSuccessful(): | 
 |         if len(result.errors) == 1 and not result.failures: | 
 |             err = result.errors[0][1] | 
 |         elif len(result.failures) == 1 and not result.errors: | 
 |             err = result.failures[0][1] | 
 |         else: | 
 |             err = "multiple errors occurred" | 
 |             if not verbose: err += "; run in verbose mode for details" | 
 |         raise TestFailed(err) | 
 |  | 
 |  | 
 | def run_unittest(*classes): | 
 |     """Run tests from unittest.TestCase-derived classes.""" | 
 |     valid_types = (unittest.TestSuite, unittest.TestCase) | 
 |     suite = unittest.TestSuite() | 
 |     for cls in classes: | 
 |         if isinstance(cls, str): | 
 |             if cls in sys.modules: | 
 |                 suite.addTest(unittest.findTestCases(sys.modules[cls])) | 
 |             else: | 
 |                 raise ValueError("str arguments must be keys in sys.modules") | 
 |         elif isinstance(cls, valid_types): | 
 |             suite.addTest(cls) | 
 |         else: | 
 |             suite.addTest(unittest.makeSuite(cls)) | 
 |     def case_pred(test): | 
 |         if match_tests is None: | 
 |             return True | 
 |         for name in test.id().split("."): | 
 |             if fnmatch.fnmatchcase(name, match_tests): | 
 |                 return True | 
 |         return False | 
 |     _filter_suite(suite, case_pred) | 
 |     _run_suite(suite) | 
 |  | 
 |  | 
 | #======================================================================= | 
 | # doctest driver. | 
 |  | 
 | def run_doctest(module, verbosity=None, optionflags=0): | 
 |     """Run doctest on the given module.  Return (#failures, #tests). | 
 |  | 
 |     If optional argument verbosity is not specified (or is None), pass | 
 |     support's belief about verbosity on to doctest.  Else doctest's | 
 |     usual behavior is used (it searches sys.argv for -v). | 
 |     """ | 
 |  | 
 |     import doctest | 
 |  | 
 |     if verbosity is None: | 
 |         verbosity = verbose | 
 |     else: | 
 |         verbosity = None | 
 |  | 
 |     f, t = doctest.testmod(module, verbose=verbosity, optionflags=optionflags) | 
 |     if f: | 
 |         raise TestFailed("%d of %d doctests failed" % (f, t)) | 
 |     if verbose: | 
 |         print('doctest (%s) ... %d tests with zero failures' % | 
 |               (module.__name__, t)) | 
 |     return f, t | 
 |  | 
 |  | 
 | #======================================================================= | 
 | # Support for saving and restoring the imported modules. | 
 |  | 
 | def modules_setup(): | 
 |     return sys.modules.copy(), | 
 |  | 
 | def modules_cleanup(oldmodules): | 
 |     # Encoders/decoders are registered permanently within the internal | 
 |     # codec cache. If we destroy the corresponding modules their | 
 |     # globals will be set to None which will trip up the cached functions. | 
 |     encodings = [(k, v) for k, v in sys.modules.items() | 
 |                  if k.startswith('encodings.')] | 
 |     sys.modules.clear() | 
 |     sys.modules.update(encodings) | 
 |     # XXX: This kind of problem can affect more than just encodings. In particular | 
 |     # extension modules (such as _ssl) don't cope with reloading properly. | 
 |     # Really, test modules should be cleaning out the test specific modules they | 
 |     # know they added (ala test_runpy) rather than relying on this function (as | 
 |     # test_importhooks and test_pkg do currently). | 
 |     # Implicitly imported *real* modules should be left alone (see issue 10556). | 
 |     sys.modules.update(oldmodules) | 
 |  | 
 | #======================================================================= | 
 | # Threading support to prevent reporting refleaks when running regrtest.py -R | 
 |  | 
 | # NOTE: we use thread._count() rather than threading.enumerate() (or the | 
 | # moral equivalent thereof) because a threading.Thread object is still alive | 
 | # until its __bootstrap() method has returned, even after it has been | 
 | # unregistered from the threading module. | 
 | # thread._count(), on the other hand, only gets decremented *after* the | 
 | # __bootstrap() method has returned, which gives us reliable reference counts | 
 | # at the end of a test run. | 
 |  | 
 | def threading_setup(): | 
 |     if _thread: | 
 |         return _thread._count(), threading._dangling.copy() | 
 |     else: | 
 |         return 1, () | 
 |  | 
 | def threading_cleanup(*original_values): | 
 |     if not _thread: | 
 |         return | 
 |     _MAX_COUNT = 10 | 
 |     for count in range(_MAX_COUNT): | 
 |         values = _thread._count(), threading._dangling | 
 |         if values == original_values: | 
 |             break | 
 |         time.sleep(0.1) | 
 |         gc_collect() | 
 |     # XXX print a warning in case of failure? | 
 |  | 
 | def reap_threads(func): | 
 |     """Use this function when threads are being used.  This will | 
 |     ensure that the threads are cleaned up even when the test fails. | 
 |     If threading is unavailable this function does nothing. | 
 |     """ | 
 |     if not _thread: | 
 |         return func | 
 |  | 
 |     @functools.wraps(func) | 
 |     def decorator(*args): | 
 |         key = threading_setup() | 
 |         try: | 
 |             return func(*args) | 
 |         finally: | 
 |             threading_cleanup(*key) | 
 |     return decorator | 
 |  | 
 | def reap_children(): | 
 |     """Use this function at the end of test_main() whenever sub-processes | 
 |     are started.  This will help ensure that no extra children (zombies) | 
 |     stick around to hog resources and create problems when looking | 
 |     for refleaks. | 
 |     """ | 
 |  | 
 |     # Reap all our dead child processes so we don't leave zombies around. | 
 |     # These hog resources and might be causing some of the buildbots to die. | 
 |     if hasattr(os, 'waitpid'): | 
 |         any_process = -1 | 
 |         while True: | 
 |             try: | 
 |                 # This will raise an exception on Windows.  That's ok. | 
 |                 pid, status = os.waitpid(any_process, os.WNOHANG) | 
 |                 if pid == 0: | 
 |                     break | 
 |             except: | 
 |                 break | 
 |  | 
 | @contextlib.contextmanager | 
 | def swap_attr(obj, attr, new_val): | 
 |     """Temporary swap out an attribute with a new object. | 
 |  | 
 |     Usage: | 
 |         with swap_attr(obj, "attr", 5): | 
 |             ... | 
 |  | 
 |         This will set obj.attr to 5 for the duration of the with: block, | 
 |         restoring the old value at the end of the block. If `attr` doesn't | 
 |         exist on `obj`, it will be created and then deleted at the end of the | 
 |         block. | 
 |     """ | 
 |     if hasattr(obj, attr): | 
 |         real_val = getattr(obj, attr) | 
 |         setattr(obj, attr, new_val) | 
 |         try: | 
 |             yield | 
 |         finally: | 
 |             setattr(obj, attr, real_val) | 
 |     else: | 
 |         setattr(obj, attr, new_val) | 
 |         try: | 
 |             yield | 
 |         finally: | 
 |             delattr(obj, attr) | 
 |  | 
 | @contextlib.contextmanager | 
 | def swap_item(obj, item, new_val): | 
 |     """Temporary swap out an item with a new object. | 
 |  | 
 |     Usage: | 
 |         with swap_item(obj, "item", 5): | 
 |             ... | 
 |  | 
 |         This will set obj["item"] to 5 for the duration of the with: block, | 
 |         restoring the old value at the end of the block. If `item` doesn't | 
 |         exist on `obj`, it will be created and then deleted at the end of the | 
 |         block. | 
 |     """ | 
 |     if item in obj: | 
 |         real_val = obj[item] | 
 |         obj[item] = new_val | 
 |         try: | 
 |             yield | 
 |         finally: | 
 |             obj[item] = real_val | 
 |     else: | 
 |         obj[item] = new_val | 
 |         try: | 
 |             yield | 
 |         finally: | 
 |             del obj[item] | 
 |  | 
 | def strip_python_stderr(stderr): | 
 |     """Strip the stderr of a Python process from potential debug output | 
 |     emitted by the interpreter. | 
 |  | 
 |     This will typically be run on the result of the communicate() method | 
 |     of a subprocess.Popen object. | 
 |     """ | 
 |     stderr = re.sub(br"\[\d+ refs\]\r?\n?", b"", stderr).strip() | 
 |     return stderr | 
 |  | 
 | def args_from_interpreter_flags(): | 
 |     """Return a list of command-line arguments reproducing the current | 
 |     settings in sys.flags and sys.warnoptions.""" | 
 |     return subprocess._args_from_interpreter_flags() | 
 |  | 
 | #============================================================ | 
 | # Support for assertions about logging. | 
 | #============================================================ | 
 |  | 
 | class TestHandler(logging.handlers.BufferingHandler): | 
 |     def __init__(self, matcher): | 
 |         # BufferingHandler takes a "capacity" argument | 
 |         # so as to know when to flush. As we're overriding | 
 |         # shouldFlush anyway, we can set a capacity of zero. | 
 |         # You can call flush() manually to clear out the | 
 |         # buffer. | 
 |         logging.handlers.BufferingHandler.__init__(self, 0) | 
 |         self.matcher = matcher | 
 |  | 
 |     def shouldFlush(self): | 
 |         return False | 
 |  | 
 |     def emit(self, record): | 
 |         self.format(record) | 
 |         self.buffer.append(record.__dict__) | 
 |  | 
 |     def matches(self, **kwargs): | 
 |         """ | 
 |         Look for a saved dict whose keys/values match the supplied arguments. | 
 |         """ | 
 |         result = False | 
 |         for d in self.buffer: | 
 |             if self.matcher.matches(d, **kwargs): | 
 |                 result = True | 
 |                 break | 
 |         return result | 
 |  | 
 | class Matcher(object): | 
 |  | 
 |     _partial_matches = ('msg', 'message') | 
 |  | 
 |     def matches(self, d, **kwargs): | 
 |         """ | 
 |         Try to match a single dict with the supplied arguments. | 
 |  | 
 |         Keys whose values are strings and which are in self._partial_matches | 
 |         will be checked for partial (i.e. substring) matches. You can extend | 
 |         this scheme to (for example) do regular expression matching, etc. | 
 |         """ | 
 |         result = True | 
 |         for k in kwargs: | 
 |             v = kwargs[k] | 
 |             dv = d.get(k) | 
 |             if not self.match_value(k, dv, v): | 
 |                 result = False | 
 |                 break | 
 |         return result | 
 |  | 
 |     def match_value(self, k, dv, v): | 
 |         """ | 
 |         Try to match a single stored value (dv) with a supplied value (v). | 
 |         """ | 
 |         if type(v) != type(dv): | 
 |             result = False | 
 |         elif type(dv) is not str or k not in self._partial_matches: | 
 |             result = (v == dv) | 
 |         else: | 
 |             result = dv.find(v) >= 0 | 
 |         return result | 
 |  | 
 |  | 
 | _can_symlink = None | 
 | def can_symlink(): | 
 |     global _can_symlink | 
 |     if _can_symlink is not None: | 
 |         return _can_symlink | 
 |     symlink_path = TESTFN + "can_symlink" | 
 |     try: | 
 |         os.symlink(TESTFN, symlink_path) | 
 |         can = True | 
 |     except (OSError, NotImplementedError, AttributeError): | 
 |         can = False | 
 |     else: | 
 |         os.remove(symlink_path) | 
 |     _can_symlink = can | 
 |     return can | 
 |  | 
 | def skip_unless_symlink(test): | 
 |     """Skip decorator for tests that require functional symlink""" | 
 |     ok = can_symlink() | 
 |     msg = "Requires functional symlink implementation" | 
 |     return test if ok else unittest.skip(msg)(test) | 
 |  | 
 | _can_xattr = None | 
 | def can_xattr(): | 
 |     global _can_xattr | 
 |     if _can_xattr is not None: | 
 |         return _can_xattr | 
 |     if not hasattr(os, "setxattr"): | 
 |         can = False | 
 |     else: | 
 |         tmp_fp, tmp_name = tempfile.mkstemp() | 
 |         try: | 
 |             with open(TESTFN, "wb") as fp: | 
 |                 try: | 
 |                     # TESTFN & tempfile may use different file systems with | 
 |                     # different capabilities | 
 |                     os.setxattr(tmp_fp, b"user.test", b"") | 
 |                     os.setxattr(fp.fileno(), b"user.test", b"") | 
 |                     # Kernels < 2.6.39 don't respect setxattr flags. | 
 |                     kernel_version = platform.release() | 
 |                     m = re.match("2.6.(\d{1,2})", kernel_version) | 
 |                     can = m is None or int(m.group(1)) >= 39 | 
 |                 except OSError: | 
 |                     can = False | 
 |         finally: | 
 |             unlink(TESTFN) | 
 |             unlink(tmp_name) | 
 |     _can_xattr = can | 
 |     return can | 
 |  | 
 | def skip_unless_xattr(test): | 
 |     """Skip decorator for tests that require functional extended attributes""" | 
 |     ok = can_xattr() | 
 |     msg = "no non-broken extended attribute support" | 
 |     return test if ok else unittest.skip(msg)(test) | 
 |  | 
 | def patch(test_instance, object_to_patch, attr_name, new_value): | 
 |     """Override 'object_to_patch'.'attr_name' with 'new_value'. | 
 |  | 
 |     Also, add a cleanup procedure to 'test_instance' to restore | 
 |     'object_to_patch' value for 'attr_name'. | 
 |     The 'attr_name' should be a valid attribute for 'object_to_patch'. | 
 |  | 
 |     """ | 
 |     # check that 'attr_name' is a real attribute for 'object_to_patch' | 
 |     # will raise AttributeError if it does not exist | 
 |     getattr(object_to_patch, attr_name) | 
 |  | 
 |     # keep a copy of the old value | 
 |     attr_is_local = False | 
 |     try: | 
 |         old_value = object_to_patch.__dict__[attr_name] | 
 |     except (AttributeError, KeyError): | 
 |         old_value = getattr(object_to_patch, attr_name, None) | 
 |     else: | 
 |         attr_is_local = True | 
 |  | 
 |     # restore the value when the test is done | 
 |     def cleanup(): | 
 |         if attr_is_local: | 
 |             setattr(object_to_patch, attr_name, old_value) | 
 |         else: | 
 |             delattr(object_to_patch, attr_name) | 
 |  | 
 |     test_instance.addCleanup(cleanup) | 
 |  | 
 |     # actually override the attribute | 
 |     setattr(object_to_patch, attr_name, new_value) |