| """Supporting definitions for the Python regression tests.""" | 
 |  | 
 | if __name__ != 'test.support': | 
 |     raise ImportError('support must be imported from the test package') | 
 |  | 
 | import contextlib | 
 | import errno | 
 | import socket | 
 | import sys | 
 | import os | 
 | import os.path | 
 | import shutil | 
 | import warnings | 
 | import unittest | 
 |  | 
 | __all__ = ["Error", "TestFailed", "TestSkipped", "ResourceDenied", "import_module", | 
 |            "verbose", "use_resources", "max_memuse", "record_original_stdout", | 
 |            "get_original_stdout", "unload", "unlink", "rmtree", "forget", | 
 |            "is_resource_enabled", "requires", "find_unused_port", "bind_port", | 
 |            "fcmp", "is_jython", "TESTFN", "HOST", "FUZZ", "findfile", "verify", | 
 |            "vereq", "sortdict", "check_syntax_error", "open_urlresource", | 
 |            "catch_warning", "CleanImport", "EnvironmentVarGuard", | 
 |            "TransientResource", "captured_output", "captured_stdout", | 
 |            "TransientResource", "transient_internet", "run_with_locale", | 
 |            "set_memlimit", "bigmemtest", "bigaddrspacetest", "BasicTestRunner", | 
 |            "run_unittest", "run_doctest", "threading_setup", "threading_cleanup", | 
 |            "reap_children"] | 
 |  | 
 | class Error(Exception): | 
 |     """Base class for regression test exceptions.""" | 
 |  | 
 | class TestFailed(Error): | 
 |     """Test failed.""" | 
 |  | 
 | class TestSkipped(Error): | 
 |     """Test skipped. | 
 |  | 
 |     This can be raised to indicate that a test was deliberatly | 
 |     skipped, but not because a feature wasn't available.  For | 
 |     example, if some resource can't be used, such as the network | 
 |     appears to be unavailable, this should be raised instead of | 
 |     TestFailed. | 
 |     """ | 
 |  | 
 | class ResourceDenied(TestSkipped): | 
 |     """Test skipped because it requested a disallowed resource. | 
 |  | 
 |     This is raised when a test calls requires() for a resource that | 
 |     has not be enabled.  It is used to distinguish between expected | 
 |     and unexpected skips. | 
 |     """ | 
 |  | 
 | def import_module(name, deprecated=False): | 
 |     """Import the module to be tested, raising TestSkipped if it is not | 
 |     available.""" | 
 |     with catch_warning(record=False): | 
 |         if deprecated: | 
 |             warnings.filterwarnings("ignore", ".+ (module|package)", | 
 |                                     DeprecationWarning) | 
 |         try: | 
 |             module = __import__(name, level=0) | 
 |         except ImportError: | 
 |             raise TestSkipped("No module named " + name) | 
 |         else: | 
 |             return module | 
 |  | 
 | verbose = 1              # Flag set to 0 by regrtest.py | 
 | use_resources = None     # Flag set to [] by regrtest.py | 
 | max_memuse = 0           # Disable bigmem tests (they will still be run with | 
 |                          # small sizes, to make sure they work.) | 
 | real_max_memuse = 0 | 
 |  | 
 | # _original_stdout is meant to hold stdout at the time regrtest began. | 
 | # This may be "the real" stdout, or IDLE's emulation of stdout, or whatever. | 
 | # The point is to have some flavor of stdout the user can actually see. | 
 | _original_stdout = None | 
 | def record_original_stdout(stdout): | 
 |     global _original_stdout | 
 |     _original_stdout = stdout | 
 |  | 
 | def get_original_stdout(): | 
 |     return _original_stdout or sys.stdout | 
 |  | 
 | def unload(name): | 
 |     try: | 
 |         del sys.modules[name] | 
 |     except KeyError: | 
 |         pass | 
 |  | 
 | def unlink(filename): | 
 |     try: | 
 |         os.unlink(filename) | 
 |     except OSError: | 
 |         pass | 
 |  | 
 | def rmtree(path): | 
 |     try: | 
 |         shutil.rmtree(path) | 
 |     except OSError as e: | 
 |         # Unix returns ENOENT, Windows returns ESRCH. | 
 |         if e.errno not in (errno.ENOENT, errno.ESRCH): | 
 |             raise | 
 |  | 
 | def forget(modname): | 
 |     '''"Forget" a module was ever imported by removing it from sys.modules and | 
 |     deleting any .pyc and .pyo files.''' | 
 |     unload(modname) | 
 |     for dirname in sys.path: | 
 |         unlink(os.path.join(dirname, modname + '.pyc')) | 
 |         # Deleting the .pyo file cannot be within the 'try' for the .pyc since | 
 |         # the chance exists that there is no .pyc (and thus the 'try' statement | 
 |         # is exited) but there is a .pyo file. | 
 |         unlink(os.path.join(dirname, modname + '.pyo')) | 
 |  | 
 | def is_resource_enabled(resource): | 
 |     """Test whether a resource is enabled.  Known resources are set by | 
 |     regrtest.py.""" | 
 |     return use_resources is not None and resource in use_resources | 
 |  | 
 | def requires(resource, msg=None): | 
 |     """Raise ResourceDenied if the specified resource is not available. | 
 |  | 
 |     If the caller's module is __main__ then automatically return True.  The | 
 |     possibility of False being returned occurs when regrtest.py is executing.""" | 
 |     # see if the caller's module is __main__ - if so, treat as if | 
 |     # the resource was set | 
 |     if sys._getframe().f_back.f_globals.get("__name__") == "__main__": | 
 |         return | 
 |     if not is_resource_enabled(resource): | 
 |         if msg is None: | 
 |             msg = "Use of the `%s' resource not enabled" % resource | 
 |         raise ResourceDenied(msg) | 
 |  | 
 | HOST = 'localhost' | 
 |  | 
 | def find_unused_port(family=socket.AF_INET, socktype=socket.SOCK_STREAM): | 
 |     """Returns an unused port that should be suitable for binding.  This is | 
 |     achieved by creating a temporary socket with the same family and type as | 
 |     the 'sock' parameter (default is AF_INET, SOCK_STREAM), and binding it to | 
 |     the specified host address (defaults to 0.0.0.0) with the port set to 0, | 
 |     eliciting an unused ephemeral port from the OS.  The temporary socket is | 
 |     then closed and deleted, and the ephemeral port is returned. | 
 |  | 
 |     Either this method or bind_port() should be used for any tests where a | 
 |     server socket needs to be bound to a particular port for the duration of | 
 |     the test.  Which one to use depends on whether the calling code is creating | 
 |     a python socket, or if an unused port needs to be provided in a constructor | 
 |     or passed to an external program (i.e. the -accept argument to openssl's | 
 |     s_server mode).  Always prefer bind_port() over find_unused_port() where | 
 |     possible.  Hard coded ports should *NEVER* be used.  As soon as a server | 
 |     socket is bound to a hard coded port, the ability to run multiple instances | 
 |     of the test simultaneously on the same host is compromised, which makes the | 
 |     test a ticking time bomb in a buildbot environment. On Unix buildbots, this | 
 |     may simply manifest as a failed test, which can be recovered from without | 
 |     intervention in most cases, but on Windows, the entire python process can | 
 |     completely and utterly wedge, requiring someone to log in to the buildbot | 
 |     and manually kill the affected process. | 
 |  | 
 |     (This is easy to reproduce on Windows, unfortunately, and can be traced to | 
 |     the SO_REUSEADDR socket option having different semantics on Windows versus | 
 |     Unix/Linux.  On Unix, you can't have two AF_INET SOCK_STREAM sockets bind, | 
 |     listen and then accept connections on identical host/ports.  An EADDRINUSE | 
 |     socket.error will be raised at some point (depending on the platform and | 
 |     the order bind and listen were called on each socket). | 
 |  | 
 |     However, on Windows, if SO_REUSEADDR is set on the sockets, no EADDRINUSE | 
 |     will ever be raised when attempting to bind two identical host/ports. When | 
 |     accept() is called on each socket, the second caller's process will steal | 
 |     the port from the first caller, leaving them both in an awkwardly wedged | 
 |     state where they'll no longer respond to any signals or graceful kills, and | 
 |     must be forcibly killed via OpenProcess()/TerminateProcess(). | 
 |  | 
 |     The solution on Windows is to use the SO_EXCLUSIVEADDRUSE socket option | 
 |     instead of SO_REUSEADDR, which effectively affords the same semantics as | 
 |     SO_REUSEADDR on Unix.  Given the propensity of Unix developers in the Open | 
 |     Source world compared to Windows ones, this is a common mistake.  A quick | 
 |     look over OpenSSL's 0.9.8g source shows that they use SO_REUSEADDR when | 
 |     openssl.exe is called with the 's_server' option, for example. See | 
 |     http://bugs.python.org/issue2550 for more info.  The following site also | 
 |     has a very thorough description about the implications of both REUSEADDR | 
 |     and EXCLUSIVEADDRUSE on Windows: | 
 |     http://msdn2.microsoft.com/en-us/library/ms740621(VS.85).aspx) | 
 |  | 
 |     XXX: although this approach is a vast improvement on previous attempts to | 
 |     elicit unused ports, it rests heavily on the assumption that the ephemeral | 
 |     port returned to us by the OS won't immediately be dished back out to some | 
 |     other process when we close and delete our temporary socket but before our | 
 |     calling code has a chance to bind the returned port.  We can deal with this | 
 |     issue if/when we come across it. | 
 |     """ | 
 |  | 
 |     tempsock = socket.socket(family, socktype) | 
 |     port = bind_port(tempsock) | 
 |     tempsock.close() | 
 |     del tempsock | 
 |     return port | 
 |  | 
 | def bind_port(sock, host=HOST): | 
 |     """Bind the socket to a free port and return the port number.  Relies on | 
 |     ephemeral ports in order to ensure we are using an unbound port.  This is | 
 |     important as many tests may be running simultaneously, especially in a | 
 |     buildbot environment.  This method raises an exception if the sock.family | 
 |     is AF_INET and sock.type is SOCK_STREAM, *and* the socket has SO_REUSEADDR | 
 |     or SO_REUSEPORT set on it.  Tests should *never* set these socket options | 
 |     for TCP/IP sockets.  The only case for setting these options is testing | 
 |     multicasting via multiple UDP sockets. | 
 |  | 
 |     Additionally, if the SO_EXCLUSIVEADDRUSE socket option is available (i.e. | 
 |     on Windows), it will be set on the socket.  This will prevent anyone else | 
 |     from bind()'ing to our host/port for the duration of the test. | 
 |     """ | 
 |  | 
 |     if sock.family == socket.AF_INET and sock.type == socket.SOCK_STREAM: | 
 |         if hasattr(socket, 'SO_REUSEADDR'): | 
 |             if sock.getsockopt(socket.SOL_SOCKET, socket.SO_REUSEADDR) == 1: | 
 |                 raise TestFailed("tests should never set the SO_REUSEADDR "   \ | 
 |                                  "socket option on TCP/IP sockets!") | 
 |         if hasattr(socket, 'SO_REUSEPORT'): | 
 |             if sock.getsockopt(socket.SOL_SOCKET, socket.SO_REUSEPORT) == 1: | 
 |                 raise TestFailed("tests should never set the SO_REUSEPORT "   \ | 
 |                                  "socket option on TCP/IP sockets!") | 
 |         if hasattr(socket, 'SO_EXCLUSIVEADDRUSE'): | 
 |             sock.setsockopt(socket.SOL_SOCKET, socket.SO_EXCLUSIVEADDRUSE, 1) | 
 |  | 
 |     sock.bind((host, 0)) | 
 |     port = sock.getsockname()[1] | 
 |     return port | 
 |  | 
 | FUZZ = 1e-6 | 
 |  | 
 | def fcmp(x, y): # fuzzy comparison function | 
 |     if isinstance(x, float) or isinstance(y, float): | 
 |         try: | 
 |             fuzz = (abs(x) + abs(y)) * FUZZ | 
 |             if abs(x-y) <= fuzz: | 
 |                 return 0 | 
 |         except: | 
 |             pass | 
 |     elif type(x) == type(y) and isinstance(x, (tuple, list)): | 
 |         for i in range(min(len(x), len(y))): | 
 |             outcome = fcmp(x[i], y[i]) | 
 |             if outcome != 0: | 
 |                 return outcome | 
 |         return (len(x) > len(y)) - (len(x) < len(y)) | 
 |     return (x > y) - (x < y) | 
 |  | 
 | is_jython = sys.platform.startswith('java') | 
 |  | 
 | # Filename used for testing | 
 | if os.name == 'java': | 
 |     # Jython disallows @ in module names | 
 |     TESTFN = '$test' | 
 | else: | 
 |     TESTFN = '@test' | 
 |  | 
 |     # Assuming sys.getfilesystemencoding()!=sys.getdefaultencoding() | 
 |     # TESTFN_UNICODE is a filename that can be encoded using the | 
 |     # file system encoding, but *not* with the default (ascii) encoding | 
 |     TESTFN_UNICODE = "@test-\xe0\xf2" | 
 |     TESTFN_ENCODING = sys.getfilesystemencoding() | 
 |     # TESTFN_UNICODE_UNENCODEABLE is a filename that should *not* be | 
 |     # able to be encoded by *either* the default or filesystem encoding. | 
 |     # This test really only makes sense on Windows NT platforms | 
 |     # which have special Unicode support in posixmodule. | 
 |     if (not hasattr(sys, "getwindowsversion") or | 
 |             sys.getwindowsversion()[3] < 2): #  0=win32s or 1=9x/ME | 
 |         TESTFN_UNICODE_UNENCODEABLE = None | 
 |     else: | 
 |         # Japanese characters (I think - from bug 846133) | 
 |         TESTFN_UNICODE_UNENCODEABLE = "@test-\u5171\u6709\u3055\u308c\u308b" | 
 |         try: | 
 |             # XXX - Note - should be using TESTFN_ENCODING here - but for | 
 |             # Windows, "mbcs" currently always operates as if in | 
 |             # errors=ignore' mode - hence we get '?' characters rather than | 
 |             # the exception.  'Latin1' operates as we expect - ie, fails. | 
 |             # See [ 850997 ] mbcs encoding ignores errors | 
 |             TESTFN_UNICODE_UNENCODEABLE.encode("Latin1") | 
 |         except UnicodeEncodeError: | 
 |             pass | 
 |         else: | 
 |             print('WARNING: The filename %r CAN be encoded by the filesystem.  ' | 
 |                   'Unicode filename tests may not be effective' | 
 |                   % TESTFN_UNICODE_UNENCODEABLE) | 
 |  | 
 | # Make sure we can write to TESTFN, try in /tmp if we can't | 
 | fp = None | 
 | try: | 
 |     fp = open(TESTFN, 'w+') | 
 | except IOError: | 
 |     TMP_TESTFN = os.path.join('/tmp', TESTFN) | 
 |     try: | 
 |         fp = open(TMP_TESTFN, 'w+') | 
 |         TESTFN = TMP_TESTFN | 
 |         del TMP_TESTFN | 
 |     except IOError: | 
 |         print(('WARNING: tests will fail, unable to write to: %s or %s' % | 
 |                 (TESTFN, TMP_TESTFN))) | 
 | if fp is not None: | 
 |     fp.close() | 
 |     unlink(TESTFN) | 
 | del fp | 
 |  | 
 | def findfile(file, here=__file__): | 
 |     """Try to find a file on sys.path and the working directory.  If it is not | 
 |     found the argument passed to the function is returned (this does not | 
 |     necessarily signal failure; could still be the legitimate path).""" | 
 |     if os.path.isabs(file): | 
 |         return file | 
 |     path = sys.path | 
 |     path = [os.path.dirname(here)] + path | 
 |     for dn in path: | 
 |         fn = os.path.join(dn, file) | 
 |         if os.path.exists(fn): return fn | 
 |     return file | 
 |  | 
 | def verify(condition, reason='test failed'): | 
 |     """Verify that condition is true. If not, raise TestFailed. | 
 |  | 
 |        The optional argument reason can be given to provide | 
 |        a better error text. | 
 |     """ | 
 |  | 
 |     if not condition: | 
 |         raise TestFailed(reason) | 
 |  | 
 | def vereq(a, b): | 
 |     """Raise TestFailed if a == b is false. | 
 |  | 
 |     This is better than verify(a == b) because, in case of failure, the | 
 |     error message incorporates repr(a) and repr(b) so you can see the | 
 |     inputs. | 
 |  | 
 |     Note that "not (a == b)" isn't necessarily the same as "a != b"; the | 
 |     former is tested. | 
 |     """ | 
 |  | 
 |     if not (a == b): | 
 |         raise TestFailed("%r == %r" % (a, b)) | 
 |  | 
 | def sortdict(dict): | 
 |     "Like repr(dict), but in sorted order." | 
 |     items = sorted(dict.items()) | 
 |     reprpairs = ["%r: %r" % pair for pair in items] | 
 |     withcommas = ", ".join(reprpairs) | 
 |     return "{%s}" % withcommas | 
 |  | 
 | def check_syntax_error(testcase, statement): | 
 |     try: | 
 |         compile(statement, '<test string>', 'exec') | 
 |     except SyntaxError: | 
 |         pass | 
 |     else: | 
 |         testcase.fail('Missing SyntaxError: "%s"' % statement) | 
 |  | 
 | def open_urlresource(url, *args, **kw): | 
 |     import urllib.request, urllib.parse | 
 |  | 
 |     requires('urlfetch') | 
 |     filename = urllib.parse.urlparse(url)[2].split('/')[-1] # '/': it's URL! | 
 |  | 
 |     for path in [os.path.curdir, os.path.pardir]: | 
 |         fn = os.path.join(path, filename) | 
 |         if os.path.exists(fn): | 
 |             return open(fn, *args, **kw) | 
 |  | 
 |     print('\tfetching %s ...' % url, file=get_original_stdout()) | 
 |     fn, _ = urllib.request.urlretrieve(url, filename) | 
 |     return open(fn, *args, **kw) | 
 |  | 
 |  | 
 | def catch_warning(module=warnings, record=True): | 
 |     """Guard the warnings filter from being permanently changed and | 
 |     optionally record the details of any warnings that are issued. | 
 |  | 
 |     Use like this: | 
 |  | 
 |         with catch_warning() as w: | 
 |             warnings.warn("foo") | 
 |             assert str(w.message) == "foo" | 
 |     """ | 
 |     return warnings.catch_warnings(record=record, module=module) | 
 |  | 
 |  | 
 | class CleanImport(object): | 
 |     """Context manager to force import to return a new module reference. | 
 |  | 
 |     This is useful for testing module-level behaviours, such as | 
 |     the emission of a DeprecationWarning on import. | 
 |  | 
 |     Use like this: | 
 |  | 
 |         with CleanImport("foo"): | 
 |             __import__("foo") # new reference | 
 |     """ | 
 |  | 
 |     def __init__(self, *module_names): | 
 |         self.original_modules = sys.modules.copy() | 
 |         for module_name in module_names: | 
 |             if module_name in sys.modules: | 
 |                 module = sys.modules[module_name] | 
 |                 # It is possible that module_name is just an alias for | 
 |                 # another module (e.g. stub for modules renamed in 3.x). | 
 |                 # In that case, we also need delete the real module to clear | 
 |                 # the import cache. | 
 |                 if module.__name__ != module_name: | 
 |                     del sys.modules[module.__name__] | 
 |                 del sys.modules[module_name] | 
 |  | 
 |     def __enter__(self): | 
 |         return self | 
 |  | 
 |     def __exit__(self, *ignore_exc): | 
 |         sys.modules.update(self.original_modules) | 
 |  | 
 |  | 
 | class EnvironmentVarGuard(object): | 
 |  | 
 |     """Class to help protect the environment variable properly.  Can be used as | 
 |     a context manager.""" | 
 |  | 
 |     def __init__(self): | 
 |         self._environ = os.environ | 
 |         self._unset = set() | 
 |         self._reset = dict() | 
 |  | 
 |     def set(self, envvar, value): | 
 |         if envvar not in self._environ: | 
 |             self._unset.add(envvar) | 
 |         else: | 
 |             self._reset[envvar] = self._environ[envvar] | 
 |         self._environ[envvar] = value | 
 |  | 
 |     def unset(self, envvar): | 
 |         if envvar in self._environ: | 
 |             self._reset[envvar] = self._environ[envvar] | 
 |             del self._environ[envvar] | 
 |  | 
 |     def __enter__(self): | 
 |         return self | 
 |  | 
 |     def __exit__(self, *ignore_exc): | 
 |         for envvar, value in self._reset.items(): | 
 |             self._environ[envvar] = value | 
 |         for unset in self._unset: | 
 |             del self._environ[unset] | 
 |  | 
 | class TransientResource(object): | 
 |  | 
 |     """Raise ResourceDenied if an exception is raised while the context manager | 
 |     is in effect that matches the specified exception and attributes.""" | 
 |  | 
 |     def __init__(self, exc, **kwargs): | 
 |         self.exc = exc | 
 |         self.attrs = kwargs | 
 |  | 
 |     def __enter__(self): | 
 |         return self | 
 |  | 
 |     def __exit__(self, type_=None, value=None, traceback=None): | 
 |         """If type_ is a subclass of self.exc and value has attributes matching | 
 |         self.attrs, raise ResourceDenied.  Otherwise let the exception | 
 |         propagate (if any).""" | 
 |         if type_ is not None and issubclass(self.exc, type_): | 
 |             for attr, attr_value in self.attrs.items(): | 
 |                 if not hasattr(value, attr): | 
 |                     break | 
 |                 if getattr(value, attr) != attr_value: | 
 |                     break | 
 |             else: | 
 |                 raise ResourceDenied("an optional resource is not available") | 
 |  | 
 |  | 
 | def transient_internet(): | 
 |     """Return a context manager that raises ResourceDenied when various issues | 
 |     with the Internet connection manifest themselves as exceptions.""" | 
 |     time_out = TransientResource(IOError, errno=errno.ETIMEDOUT) | 
 |     socket_peer_reset = TransientResource(socket.error, errno=errno.ECONNRESET) | 
 |     ioerror_peer_reset = TransientResource(IOError, errno=errno.ECONNRESET) | 
 |     return contextlib.nested(time_out, socket_peer_reset, ioerror_peer_reset) | 
 |  | 
 |  | 
 | @contextlib.contextmanager | 
 | def captured_output(stream_name): | 
 |     """Run the 'with' statement body using a StringIO object in place of a | 
 |     specific attribute on the sys module. | 
 |     Example use (with 'stream_name=stdout'):: | 
 |  | 
 |        with captured_stdout() as s: | 
 |            print("hello") | 
 |        assert s.getvalue() == "hello" | 
 |     """ | 
 |     import io | 
 |     orig_stdout = getattr(sys, stream_name) | 
 |     setattr(sys, stream_name, io.StringIO()) | 
 |     try: | 
 |         yield getattr(sys, stream_name) | 
 |     finally: | 
 |         setattr(sys, stream_name, orig_stdout) | 
 |  | 
 | def captured_stdout(): | 
 |     return captured_output("stdout") | 
 |  | 
 |  | 
 | #======================================================================= | 
 | # Decorator for running a function in a different locale, correctly resetting | 
 | # it afterwards. | 
 |  | 
 | def run_with_locale(catstr, *locales): | 
 |     def decorator(func): | 
 |         def inner(*args, **kwds): | 
 |             try: | 
 |                 import locale | 
 |                 category = getattr(locale, catstr) | 
 |                 orig_locale = locale.setlocale(category) | 
 |             except AttributeError: | 
 |                 # if the test author gives us an invalid category string | 
 |                 raise | 
 |             except: | 
 |                 # cannot retrieve original locale, so do nothing | 
 |                 locale = orig_locale = None | 
 |             else: | 
 |                 for loc in locales: | 
 |                     try: | 
 |                         locale.setlocale(category, loc) | 
 |                         break | 
 |                     except: | 
 |                         pass | 
 |  | 
 |             # now run the function, resetting the locale on exceptions | 
 |             try: | 
 |                 return func(*args, **kwds) | 
 |             finally: | 
 |                 if locale and orig_locale: | 
 |                     locale.setlocale(category, orig_locale) | 
 |         inner.__name__ = func.__name__ | 
 |         inner.__doc__ = func.__doc__ | 
 |         return inner | 
 |     return decorator | 
 |  | 
 | #======================================================================= | 
 | # Big-memory-test support. Separate from 'resources' because memory use | 
 | # should be configurable. | 
 |  | 
 | # Some handy shorthands. Note that these are used for byte-limits as well | 
 | # as size-limits, in the various bigmem tests | 
 | _1M = 1024*1024 | 
 | _1G = 1024 * _1M | 
 | _2G = 2 * _1G | 
 | _4G = 4 * _1G | 
 |  | 
 | MAX_Py_ssize_t = sys.maxsize | 
 |  | 
 | def set_memlimit(limit): | 
 |     import re | 
 |     global max_memuse | 
 |     global real_max_memuse | 
 |     sizes = { | 
 |         'k': 1024, | 
 |         'm': _1M, | 
 |         'g': _1G, | 
 |         't': 1024*_1G, | 
 |     } | 
 |     m = re.match(r'(\d+(\.\d+)?) (K|M|G|T)b?$', limit, | 
 |                  re.IGNORECASE | re.VERBOSE) | 
 |     if m is None: | 
 |         raise ValueError('Invalid memory limit %r' % (limit,)) | 
 |     memlimit = int(float(m.group(1)) * sizes[m.group(3).lower()]) | 
 |     real_max_memuse = memlimit | 
 |     if memlimit > MAX_Py_ssize_t: | 
 |         memlimit = MAX_Py_ssize_t | 
 |     if memlimit < _2G - 1: | 
 |         raise ValueError('Memory limit %r too low to be useful' % (limit,)) | 
 |     max_memuse = memlimit | 
 |  | 
 | def bigmemtest(minsize, memuse, overhead=5*_1M): | 
 |     """Decorator for bigmem tests. | 
 |  | 
 |     'minsize' is the minimum useful size for the test (in arbitrary, | 
 |     test-interpreted units.) 'memuse' is the number of 'bytes per size' for | 
 |     the test, or a good estimate of it. 'overhead' specifies fixed overhead, | 
 |     independent of the testsize, and defaults to 5Mb. | 
 |  | 
 |     The decorator tries to guess a good value for 'size' and passes it to | 
 |     the decorated test function. If minsize * memuse is more than the | 
 |     allowed memory use (as defined by max_memuse), the test is skipped. | 
 |     Otherwise, minsize is adjusted upward to use up to max_memuse. | 
 |     """ | 
 |     def decorator(f): | 
 |         def wrapper(self): | 
 |             if not max_memuse: | 
 |                 # If max_memuse is 0 (the default), | 
 |                 # we still want to run the tests with size set to a few kb, | 
 |                 # to make sure they work. We still want to avoid using | 
 |                 # too much memory, though, but we do that noisily. | 
 |                 maxsize = 5147 | 
 |                 self.failIf(maxsize * memuse + overhead > 20 * _1M) | 
 |             else: | 
 |                 maxsize = int((max_memuse - overhead) / memuse) | 
 |                 if maxsize < minsize: | 
 |                     # Really ought to print 'test skipped' or something | 
 |                     if verbose: | 
 |                         sys.stderr.write("Skipping %s because of memory " | 
 |                                          "constraint\n" % (f.__name__,)) | 
 |                     return | 
 |                 # Try to keep some breathing room in memory use | 
 |                 maxsize = max(maxsize - 50 * _1M, minsize) | 
 |             return f(self, maxsize) | 
 |         wrapper.minsize = minsize | 
 |         wrapper.memuse = memuse | 
 |         wrapper.overhead = overhead | 
 |         return wrapper | 
 |     return decorator | 
 |  | 
 | def precisionbigmemtest(size, memuse, overhead=5*_1M): | 
 |     def decorator(f): | 
 |         def wrapper(self): | 
 |             if not real_max_memuse: | 
 |                 maxsize = 5147 | 
 |             else: | 
 |                 maxsize = size | 
 |  | 
 |                 if real_max_memuse and real_max_memuse < maxsize * memuse: | 
 |                     if verbose: | 
 |                         sys.stderr.write("Skipping %s because of memory " | 
 |                                          "constraint\n" % (f.__name__,)) | 
 |                     return | 
 |  | 
 |             return f(self, maxsize) | 
 |         wrapper.size = size | 
 |         wrapper.memuse = memuse | 
 |         wrapper.overhead = overhead | 
 |         return wrapper | 
 |     return decorator | 
 |  | 
 | def bigaddrspacetest(f): | 
 |     """Decorator for tests that fill the address space.""" | 
 |     def wrapper(self): | 
 |         if max_memuse < MAX_Py_ssize_t: | 
 |             if verbose: | 
 |                 sys.stderr.write("Skipping %s because of memory " | 
 |                                  "constraint\n" % (f.__name__,)) | 
 |         else: | 
 |             return f(self) | 
 |     return wrapper | 
 |  | 
 | #======================================================================= | 
 | # unittest integration. | 
 |  | 
 | class BasicTestRunner: | 
 |     def run(self, test): | 
 |         result = unittest.TestResult() | 
 |         test(result) | 
 |         return result | 
 |  | 
 |  | 
 | def _run_suite(suite): | 
 |     """Run tests from a unittest.TestSuite-derived class.""" | 
 |     if verbose: | 
 |         runner = unittest.TextTestRunner(sys.stdout, verbosity=2) | 
 |     else: | 
 |         runner = BasicTestRunner() | 
 |  | 
 |     result = runner.run(suite) | 
 |     if not result.wasSuccessful(): | 
 |         if len(result.errors) == 1 and not result.failures: | 
 |             err = result.errors[0][1] | 
 |         elif len(result.failures) == 1 and not result.errors: | 
 |             err = result.failures[0][1] | 
 |         else: | 
 |             err = "errors occurred; run in verbose mode for details" | 
 |         raise TestFailed(err) | 
 |  | 
 |  | 
 | def run_unittest(*classes): | 
 |     """Run tests from unittest.TestCase-derived classes.""" | 
 |     valid_types = (unittest.TestSuite, unittest.TestCase) | 
 |     suite = unittest.TestSuite() | 
 |     for cls in classes: | 
 |         if isinstance(cls, str): | 
 |             if cls in sys.modules: | 
 |                 suite.addTest(unittest.findTestCases(sys.modules[cls])) | 
 |             else: | 
 |                 raise ValueError("str arguments must be keys in sys.modules") | 
 |         elif isinstance(cls, valid_types): | 
 |             suite.addTest(cls) | 
 |         else: | 
 |             suite.addTest(unittest.makeSuite(cls)) | 
 |     _run_suite(suite) | 
 |  | 
 |  | 
 | #======================================================================= | 
 | # doctest driver. | 
 |  | 
 | def run_doctest(module, verbosity=None): | 
 |     """Run doctest on the given module.  Return (#failures, #tests). | 
 |  | 
 |     If optional argument verbosity is not specified (or is None), pass | 
 |     support's belief about verbosity on to doctest.  Else doctest's | 
 |     usual behavior is used (it searches sys.argv for -v). | 
 |     """ | 
 |  | 
 |     import doctest | 
 |  | 
 |     if verbosity is None: | 
 |         verbosity = verbose | 
 |     else: | 
 |         verbosity = None | 
 |  | 
 |     # Direct doctest output (normally just errors) to real stdout; doctest | 
 |     # output shouldn't be compared by regrtest. | 
 |     save_stdout = sys.stdout | 
 |     sys.stdout = get_original_stdout() | 
 |     try: | 
 |         f, t = doctest.testmod(module, verbose=verbosity) | 
 |         if f: | 
 |             raise TestFailed("%d of %d doctests failed" % (f, t)) | 
 |     finally: | 
 |         sys.stdout = save_stdout | 
 |     if verbose: | 
 |         print('doctest (%s) ... %d tests with zero failures' % | 
 |               (module.__name__, t)) | 
 |     return f, t | 
 |  | 
 | #======================================================================= | 
 | # Threading support to prevent reporting refleaks when running regrtest.py -R | 
 |  | 
 | def threading_setup(): | 
 |     import threading | 
 |     return len(threading._active), len(threading._limbo) | 
 |  | 
 | def threading_cleanup(num_active, num_limbo): | 
 |     import threading | 
 |     import time | 
 |  | 
 |     _MAX_COUNT = 10 | 
 |     count = 0 | 
 |     while len(threading._active) != num_active and count < _MAX_COUNT: | 
 |         count += 1 | 
 |         time.sleep(0.1) | 
 |  | 
 |     count = 0 | 
 |     while len(threading._limbo) != num_limbo and count < _MAX_COUNT: | 
 |         count += 1 | 
 |         time.sleep(0.1) | 
 |  | 
 | def reap_children(): | 
 |     """Use this function at the end of test_main() whenever sub-processes | 
 |     are started.  This will help ensure that no extra children (zombies) | 
 |     stick around to hog resources and create problems when looking | 
 |     for refleaks. | 
 |     """ | 
 |  | 
 |     # Reap all our dead child processes so we don't leave zombies around. | 
 |     # These hog resources and might be causing some of the buildbots to die. | 
 |     if hasattr(os, 'waitpid'): | 
 |         any_process = -1 | 
 |         while True: | 
 |             try: | 
 |                 # This will raise an exception on Windows.  That's ok. | 
 |                 pid, status = os.waitpid(any_process, os.WNOHANG) | 
 |                 if pid == 0: | 
 |                     break | 
 |             except: | 
 |                 break |