| import builtins |
| import contextlib |
| import errno |
| import functools |
| import importlib |
| from importlib import machinery, util, invalidate_caches |
| import os |
| import os.path |
| from test import support |
| import unittest |
| import sys |
| import tempfile |
| import types |
| |
| |
# Names used by the built-in importer tests: ``good_name`` is a module listed
# in sys.builtin_module_names, ``bad_name`` is one that is not.  Either stays
# None when the corresponding condition below does not hold.
BUILTINS = types.SimpleNamespace(good_name=None, bad_name=None)
if 'errno' in sys.builtin_module_names:
    BUILTINS.good_name = 'errno'
if 'importlib' not in sys.builtin_module_names:
    BUILTINS.bad_name = 'importlib'
| |
# Details of the extension module used by the extension tests.  Everything
# except ``name`` stays None unless _extension_details() finds a matching file.
EXTENSIONS = types.SimpleNamespace(
    path=None, ext=None, filename=None, file_path=None, name='_testcapi')


def _extension_details():
    """Fill in EXTENSIONS with the first matching extension file on sys.path.

    Every sys.path entry is combined with every suffix from
    machinery.EXTENSION_SUFFIXES; the first candidate that exists on disk
    is recorded in EXTENSIONS and the search stops.  When nothing matches,
    EXTENSIONS is left untouched.
    """
    candidates = ((path, ext)
                  for path in sys.path
                  for ext in machinery.EXTENSION_SUFFIXES)
    for path, ext in candidates:
        filename = EXTENSIONS.name + ext
        file_path = os.path.join(path, filename)
        if not os.path.exists(file_path):
            continue
        EXTENSIONS.path = path
        EXTENSIONS.ext = ext
        EXTENSIONS.filename = filename
        EXTENSIONS.file_path = file_path
        return


_extension_details()
| |
| |
def import_importlib(module_name):
    """Import a module from importlib both w/ and w/o _frozen_importlib.

    Returns a dict with two entries: 'Frozen' is the result of a plain
    support.import_fresh_module() call, 'Source' is the result of the same
    call with the frozen importlib modules blocked.  For dotted module
    names, 'importlib' itself is also passed as a module to import fresh.
    """
    blocked = ('_frozen_importlib', '_frozen_importlib_external')
    if '.' in module_name:
        fresh = ('importlib',)
    else:
        fresh = ()
    frozen = support.import_fresh_module(module_name)
    source = support.import_fresh_module(
        module_name, fresh=fresh, blocked=blocked)
    return {'Frozen': frozen, 'Source': source}
| |
| |
def specialize_class(cls, kind, base=None, **kwargs):
    """Create the *kind*-specific subclass of the test class *cls*.

    *base* is the second base class of the result: unittest.TestCase when
    omitted, the object itself when it is a class, otherwise ``base[kind]``.
    Each keyword argument must be a mapping indexed by kind; the entry for
    *kind* is set as a class attribute of the same name.

    The new class is named '<kind>_<cls.__name__>', reports the module of
    *cls*, and records the original name and the kind in ``_NAME`` and
    ``_KIND``.
    """
    # XXX Support passing in submodule names--load (and cache) them?
    # That would clean up the test modules a bit more.
    if base is None:
        parent = unittest.TestCase
    elif isinstance(base, type):
        parent = base
    else:
        parent = base[kind]
    specialized = types.new_class(
        '{}_{}'.format(kind, cls.__name__), (cls, parent))
    specialized.__module__ = cls.__module__
    specialized._NAME = cls.__name__
    specialized._KIND = kind
    for attr in kwargs:
        setattr(specialized, attr, kwargs[attr][kind])
    return specialized
| |
| |
def split_frozen(cls, base=None, **kwargs):
    """Return the 'Frozen' and 'Source' specializations of *cls*.

    Both classes are built by specialize_class() with the same *base* and
    keyword arguments; the result is the 2-tuple (frozen, source).
    """
    return tuple(specialize_class(cls, kind, base, **kwargs)
                 for kind in ('Frozen', 'Source'))
| |
| |
def test_both(test_class, base=None, **kwargs):
    """Return the (frozen, source) pair of classes derived from *test_class*.

    Thin wrapper that forwards all arguments to split_frozen().
    """
    frozen_and_source = split_frozen(test_class, base, **kwargs)
    return frozen_and_source
| |
| |
# Windows is the only OS that is *always* case-insensitive
# (OS X *can* be case-sensitive).  Everywhere else, probe the file system by
# checking whether this very file can be found under a differently-cased name.
CASE_INSENSITIVE_FS = True
if sys.platform not in ('win32', 'cygwin'):
    changed_name = __file__.upper()
    if changed_name == __file__:
        # The path was already all upper case; try the lower-cased form.
        changed_name = __file__.lower()
    CASE_INSENSITIVE_FS = os.path.exists(changed_name)
| |
# The __import__ implementations under test, keyed by kind.  Each one is
# wrapped in staticmethod().
source_importlib = import_importlib('importlib')['Source']
__import__ = dict(
    Frozen=staticmethod(builtins.__import__),
    Source=staticmethod(source_importlib.__import__),
)
| |
| |
def case_insensitive_tests(test):
    """Class decorator that nullifies tests requiring a case-insensitive
    file system.

    The decorated object is marked as skipped when CASE_INSENSITIVE_FS is
    false.
    """
    skip_decorator = unittest.skipUnless(
        CASE_INSENSITIVE_FS, "requires a case-insensitive filesystem")
    return skip_decorator(test)
| |
| |
def submodule(parent, name, pkg_dir, content=''):
    """Create the source file for a submodule inside *pkg_dir*.

    Writes *content* to '<pkg_dir>/<name>.py' and returns a 2-tuple of the
    dotted module name ('<parent>.<name>') and the path of the new file.
    """
    source_path = os.path.join(pkg_dir, name + '.py')
    with open(source_path, 'w') as source_file:
        source_file.write(content)
    qualified_name = '{}.{}'.format(parent, name)
    return qualified_name, source_path
| |
| |
@contextlib.contextmanager
def uncache(*names):
    """Uncache modules from sys.modules.

    Every name is removed from sys.modules on entry (if present) and again
    on exit, so anything imported under those names inside the block does
    not outlive it.

    As a basic sanity check, ValueError is raised for modules that cannot
    or should not be uncached ('sys', 'marshal' and 'imp').

    """
    protected = ('sys', 'marshal', 'imp')
    for name in names:
        if name in protected:
            raise ValueError(
                "cannot uncache {0}".format(name))
        sys.modules.pop(name, None)
    try:
        yield
    finally:
        for name in names:
            sys.modules.pop(name, None)
| |
| |
@contextlib.contextmanager
def temp_module(name, content='', *, pkg=False):
    """Create a temporary module or package called *name* and yield its location.

    The work happens inside support.temp_cwd(None) with *name* (and every
    already-imported module whose top-level name is *name*) uncached, and
    with the temporary directory passed to support.DirsOnSysPath().

    For a plain module, '<name>.py' is always written; a *content* of None
    is treated as an empty file.  For a package (``pkg=True``), the
    directory is created and '__init__.py' is written unless *content* is
    None, which leaves a namespace package.

    The yielded value is the path of the module without any '.py' suffix
    (for a package, the package directory).
    """
    conflicts = [mod for mod in sys.modules if mod.partition('.')[0] == name]
    with support.temp_cwd(None) as cwd, \
            uncache(name, *conflicts), \
            support.DirsOnSysPath(cwd):
        invalidate_caches()

        location = os.path.join(cwd, name)
        if pkg:
            os.mkdir(name)
            modpath = os.path.join(location, '__init__.py')
        else:
            modpath = location + '.py'
            # Make sure the module file gets created.
            content = '' if content is None else content
        if content is not None:
            # not a namespace package
            with open(modpath, 'w') as modfile:
                modfile.write(content)
        yield location
| |
| |
@contextlib.contextmanager
def import_state(**kwargs):
    """Context manager to manage the various importers and stored state in the
    sys module.

    Supported keyword arguments are 'meta_path', 'path', 'path_hooks' and
    'path_importer_cache'.  Each of those sys attributes is replaced by the
    given value, or by a fresh empty list/dict when not given, and restored
    on exit.  Any other keyword argument raises ValueError (after restoring
    whatever had been replaced).

    The 'modules' attribute is not supported as the interpreter state stores a
    pointer to the dict that the interpreter uses internally;
    reassigning to sys.modules does not have the desired effect.

    """
    defaults = (('meta_path', []), ('path', []),
                ('path_hooks', []),
                ('path_importer_cache', {}))
    originals = {}
    try:
        for attr, default in defaults:
            originals[attr] = getattr(sys, attr)
            setattr(sys, attr, kwargs.pop(attr, default))
        if kwargs:
            raise ValueError(
                'unrecognized arguments: {0}'.format(kwargs.keys()))
        yield
    finally:
        for attr, value in originals.items():
            setattr(sys, attr, value)
| |
| |
| class _ImporterMock: |
| |
| """Base class to help with creating importer mocks.""" |
| |
| def __init__(self, *names, module_code={}): |
| self.modules = {} |
| self.module_code = {} |
| for name in names: |
| if not name.endswith('.__init__'): |
| import_name = name |
| else: |
| import_name = name[:-len('.__init__')] |
| if '.' not in name: |
| package = None |
| elif import_name == name: |
| package = name.rsplit('.', 1)[0] |
| else: |
| package = import_name |
| module = types.ModuleType(import_name) |
| module.__loader__ = self |
| module.__file__ = '<mock __file__>' |
| module.__package__ = package |
| module.attr = name |
| if import_name != name: |
| module.__path__ = ['<mock __path__>'] |
| self.modules[import_name] = module |
| if import_name in module_code: |
| self.module_code[import_name] = module_code[import_name] |
| |
| def __getitem__(self, name): |
| return self.modules[name] |
| |
| def __enter__(self): |
| self._uncache = uncache(*self.modules.keys()) |
| self._uncache.__enter__() |
| return self |
| |
| def __exit__(self, *exc_info): |
| self._uncache.__exit__(None, None, None) |
| |
| |
class mock_modules(_ImporterMock):

    """Importer mock using PEP 302 APIs."""

    def find_module(self, fullname, path=None):
        """Return this mock as the loader for known modules, else None."""
        return self if fullname in self.modules else None

    def load_module(self, fullname):
        """Install the mocked module in sys.modules and return it.

        Raises ImportError for unknown names.  When code is registered for
        the module it is run after the module has been installed; if that
        code raises, the module is removed from sys.modules again and the
        exception propagates.
        """
        if fullname not in self.modules:
            raise ImportError
        sys.modules[fullname] = self.modules[fullname]
        if fullname in self.module_code:
            try:
                self.module_code[fullname]()
            except Exception:
                del sys.modules[fullname]
                raise
        return self.modules[fullname]
| |
| |
class mock_spec(_ImporterMock):

    """Importer mock using PEP 451 APIs."""

    def find_spec(self, fullname, path=None, parent=None):
        """Return a spec for a mocked module, or None for unknown names.

        The spec uses this mock as its loader; modules that have a
        ``__path__`` get it as their submodule search locations.
        """
        try:
            module = self.modules[fullname]
        except KeyError:
            return None
        spec = util.spec_from_file_location(
            fullname, module.__file__, loader=self,
            submodule_search_locations=getattr(module, '__path__', None))
        return spec

    def create_module(self, spec):
        """Return the pre-built mocked module named by *spec*.

        Raises ImportError when the name is not one of the mocked modules.
        """
        if spec.name not in self.modules:
            raise ImportError
        return self.modules[spec.name]

    def exec_module(self, module):
        """Run the code registered for *module*, if any."""
        try:
            code = self.module_code[module.__spec__.name]
        except KeyError:
            # No code registered for this module: nothing to execute.
            return
        # Called outside the try block so that a KeyError raised by the
        # module code itself propagates instead of being silently swallowed.
        code()
| |
| |
def writes_bytecode_files(fxn):
    """Decorator to protect sys.dont_write_bytecode from mutation and to skip
    tests that require it to be set to False.

    If sys.dont_write_bytecode is true at decoration time, the test is
    marked as skipped (rather than replaced by a no-op that would be
    reported as a pass).  Otherwise the returned wrapper forces the flag to
    False for the duration of the call and restores the previous value
    afterwards, even if the test raises.
    """
    if sys.dont_write_bytecode:
        return unittest.skip("relies on writing bytecode")(fxn)

    @functools.wraps(fxn)
    def wrapper(*args, **kwargs):
        original = sys.dont_write_bytecode
        sys.dont_write_bytecode = False
        try:
            return fxn(*args, **kwargs)
        finally:
            sys.dont_write_bytecode = original
    return wrapper
| |
| |
def ensure_bytecode_path(bytecode_path):
    """Ensure that the __pycache__ directory for PEP 3147 pyc file exists.

    Only the immediate parent directory of *bytecode_path* is created; an
    already existing directory is not an error, any other OSError
    propagates.

    :param bytecode_path: File system path to PEP 3147 pyc file.
    """
    cache_dir = os.path.dirname(bytecode_path)
    try:
        os.mkdir(cache_dir)
    except FileExistsError:
        # EEXIST: the directory is already there, which is all we need.
        pass
| |
| |
@contextlib.contextmanager
def create_modules(*names):
    """Temporarily create each named module with an attribute (named 'attr')
    that contains the name passed into the context manager that caused the
    creation of the module.

    All files are created in a temporary directory returned by
    tempfile.mkdtemp().  While the context is active, the import state is
    swapped via import_state() so that sys.path contains only that
    directory, and the created modules are uncached from sys.modules on
    entry and on exit.  When the context manager exits the whole temporary
    directory (source and bytecode) is removed.

    The yielded mapping maps each name to the path of its source file, plus
    the key '.root' for the temporary directory itself.

    No magic is performed when creating packages! This means that if you create
    a module within a package you must also create the package's __init__ as
    well.

    """
    source = 'attr = {0!r}'
    # Created before the try block so the cleanup below never refers to an
    # unbound name when mkdtemp() itself fails.
    temp_dir = tempfile.mkdtemp()
    try:
        mapping = {'.root': temp_dir}
        import_names = set()
        for name in names:
            # Match the full '.__init__' suffix; matching just '__init__'
            # would mis-trim names that merely end in those characters.
            if name.endswith('.__init__'):
                import_name = name[:-len('.__init__')]
            else:
                import_name = name
            import_names.add(import_name)
            name_parts = name.split('.')
            file_path = temp_dir
            for directory in name_parts[:-1]:
                file_path = os.path.join(file_path, directory)
                if not os.path.exists(file_path):
                    os.mkdir(file_path)
            file_path = os.path.join(file_path, name_parts[-1] + '.py')
            with open(file_path, 'w') as file:
                file.write(source.format(name))
            mapping[name] = file_path
        # uncache() drops any stale sys.modules entries on entry and whatever
        # was imported on exit; import_state() is unwound first on the way out.
        with uncache(*import_names), import_state(path=[temp_dir]):
            yield mapping
    finally:
        support.rmtree(temp_dir)
| |
| |
def mock_path_hook(*entries, importer):
    """A mock sys.path_hooks entry.

    Returns a hook callable that hands back *importer* for any path entry
    listed in *entries* and raises ImportError for every other entry.
    """
    def hook(entry):
        if entry in entries:
            return importer
        raise ImportError
    return hook