| """Module/script to byte-compile all .py files to .pyc files. |
| |
| When called as a script with arguments, this compiles the directories |
| given as arguments recursively; the -l option prevents it from |
| recursing into directories. |
| |
| Without arguments, if compiles all modules on sys.path, without |
| recursing into subdirectories. (Even though it should do so for |
| packages -- for now, you'll have to deal with packages separately.) |
| |
| See module py_compile for details of the actual byte-compilation. |
| """ |
| import os |
| import sys |
| import importlib.util |
| import py_compile |
| import struct |
| import filecmp |
| |
| from functools import partial |
| from pathlib import Path |
| |
| __all__ = ["compile_dir","compile_file","compile_path"] |
| |
| def _walk_dir(dir, maxlevels, quiet=0): |
| if quiet < 2 and isinstance(dir, os.PathLike): |
| dir = os.fspath(dir) |
| if not quiet: |
| print('Listing {!r}...'.format(dir)) |
| try: |
| names = os.listdir(dir) |
| except OSError: |
| if quiet < 2: |
| print("Can't list {!r}".format(dir)) |
| names = [] |
| names.sort() |
| for name in names: |
| if name == '__pycache__': |
| continue |
| fullname = os.path.join(dir, name) |
| if not os.path.isdir(fullname): |
| yield fullname |
| elif (maxlevels > 0 and name != os.curdir and name != os.pardir and |
| os.path.isdir(fullname) and not os.path.islink(fullname)): |
| yield from _walk_dir(fullname, maxlevels=maxlevels - 1, |
| quiet=quiet) |
| |
| def compile_dir(dir, maxlevels=None, ddir=None, force=False, |
| rx=None, quiet=0, legacy=False, optimize=-1, workers=1, |
| invalidation_mode=None, *, stripdir=None, |
| prependdir=None, limit_sl_dest=None, hardlink_dupes=False): |
| """Byte-compile all modules in the given directory tree. |
| |
| Arguments (only dir is required): |
| |
| dir: the directory to byte-compile |
| maxlevels: maximum recursion level (default `sys.getrecursionlimit()`) |
| ddir: the directory that will be prepended to the path to the |
| file as it is compiled into each byte-code file. |
| force: if True, force compilation, even if timestamps are up-to-date |
| quiet: full output with False or 0, errors only with 1, |
| no output with 2 |
| legacy: if True, produce legacy pyc paths instead of PEP 3147 paths |
| optimize: int or list of optimization levels or -1 for level of |
| the interpreter. Multiple levels leads to multiple compiled |
| files each with one optimization level. |
| workers: maximum number of parallel workers |
| invalidation_mode: how the up-to-dateness of the pyc will be checked |
| stripdir: part of path to left-strip from source file path |
| prependdir: path to prepend to beginning of original file path, applied |
| after stripdir |
| limit_sl_dest: ignore symlinks if they are pointing outside of |
| the defined path |
| hardlink_dupes: hardlink duplicated pyc files |
| """ |
| ProcessPoolExecutor = None |
| if ddir is not None and (stripdir is not None or prependdir is not None): |
| raise ValueError(("Destination dir (ddir) cannot be used " |
| "in combination with stripdir or prependdir")) |
| if ddir is not None: |
| stripdir = dir |
| prependdir = ddir |
| ddir = None |
| if workers < 0: |
| raise ValueError('workers must be greater or equal to 0') |
| if workers != 1: |
| # Check if this is a system where ProcessPoolExecutor can function. |
| from concurrent.futures.process import _check_system_limits |
| try: |
| _check_system_limits() |
| except NotImplementedError: |
| workers = 1 |
| else: |
| from concurrent.futures import ProcessPoolExecutor |
| if maxlevels is None: |
| maxlevels = sys.getrecursionlimit() |
| files = _walk_dir(dir, quiet=quiet, maxlevels=maxlevels) |
| success = True |
| if workers != 1 and ProcessPoolExecutor is not None: |
| # If workers == 0, let ProcessPoolExecutor choose |
| workers = workers or None |
| with ProcessPoolExecutor(max_workers=workers) as executor: |
| results = executor.map(partial(compile_file, |
| ddir=ddir, force=force, |
| rx=rx, quiet=quiet, |
| legacy=legacy, |
| optimize=optimize, |
| invalidation_mode=invalidation_mode, |
| stripdir=stripdir, |
| prependdir=prependdir, |
| limit_sl_dest=limit_sl_dest, |
| hardlink_dupes=hardlink_dupes), |
| files) |
| success = min(results, default=True) |
| else: |
| for file in files: |
| if not compile_file(file, ddir, force, rx, quiet, |
| legacy, optimize, invalidation_mode, |
| stripdir=stripdir, prependdir=prependdir, |
| limit_sl_dest=limit_sl_dest, |
| hardlink_dupes=hardlink_dupes): |
| success = False |
| return success |
| |
| def compile_file(fullname, ddir=None, force=False, rx=None, quiet=0, |
| legacy=False, optimize=-1, |
| invalidation_mode=None, *, stripdir=None, prependdir=None, |
| limit_sl_dest=None, hardlink_dupes=False): |
| """Byte-compile one file. |
| |
| Arguments (only fullname is required): |
| |
| fullname: the file to byte-compile |
| ddir: if given, the directory name compiled in to the |
| byte-code file. |
| force: if True, force compilation, even if timestamps are up-to-date |
| quiet: full output with False or 0, errors only with 1, |
| no output with 2 |
| legacy: if True, produce legacy pyc paths instead of PEP 3147 paths |
| optimize: int or list of optimization levels or -1 for level of |
| the interpreter. Multiple levels leads to multiple compiled |
| files each with one optimization level. |
| invalidation_mode: how the up-to-dateness of the pyc will be checked |
| stripdir: part of path to left-strip from source file path |
| prependdir: path to prepend to beginning of original file path, applied |
| after stripdir |
| limit_sl_dest: ignore symlinks if they are pointing outside of |
| the defined path. |
| hardlink_dupes: hardlink duplicated pyc files |
| """ |
| |
| if ddir is not None and (stripdir is not None or prependdir is not None): |
| raise ValueError(("Destination dir (ddir) cannot be used " |
| "in combination with stripdir or prependdir")) |
| |
| success = True |
| if quiet < 2 and isinstance(fullname, os.PathLike): |
| fullname = os.fspath(fullname) |
| name = os.path.basename(fullname) |
| |
| dfile = None |
| |
| if ddir is not None: |
| dfile = os.path.join(ddir, name) |
| |
| if stripdir is not None: |
| fullname_parts = fullname.split(os.path.sep) |
| stripdir_parts = stripdir.split(os.path.sep) |
| ddir_parts = list(fullname_parts) |
| |
| for spart, opart in zip(stripdir_parts, fullname_parts): |
| if spart == opart: |
| ddir_parts.remove(spart) |
| |
| dfile = os.path.join(*ddir_parts) |
| |
| if prependdir is not None: |
| if dfile is None: |
| dfile = os.path.join(prependdir, fullname) |
| else: |
| dfile = os.path.join(prependdir, dfile) |
| |
| if isinstance(optimize, int): |
| optimize = [optimize] |
| |
| # Use set() to remove duplicates. |
| # Use sorted() to create pyc files in a deterministic order. |
| optimize = sorted(set(optimize)) |
| |
| if hardlink_dupes and len(optimize) < 2: |
| raise ValueError("Hardlinking of duplicated bytecode makes sense " |
| "only for more than one optimization level") |
| |
| if rx is not None: |
| mo = rx.search(fullname) |
| if mo: |
| return success |
| |
| if limit_sl_dest is not None and os.path.islink(fullname): |
| if Path(limit_sl_dest).resolve() not in Path(fullname).resolve().parents: |
| return success |
| |
| opt_cfiles = {} |
| |
| if os.path.isfile(fullname): |
| for opt_level in optimize: |
| if legacy: |
| opt_cfiles[opt_level] = fullname + 'c' |
| else: |
| if opt_level >= 0: |
| opt = opt_level if opt_level >= 1 else '' |
| cfile = (importlib.util.cache_from_source( |
| fullname, optimization=opt)) |
| opt_cfiles[opt_level] = cfile |
| else: |
| cfile = importlib.util.cache_from_source(fullname) |
| opt_cfiles[opt_level] = cfile |
| |
| head, tail = name[:-3], name[-3:] |
| if tail == '.py': |
| if not force: |
| try: |
| mtime = int(os.stat(fullname).st_mtime) |
| expect = struct.pack('<4sLL', importlib.util.MAGIC_NUMBER, |
| 0, mtime & 0xFFFF_FFFF) |
| for cfile in opt_cfiles.values(): |
| with open(cfile, 'rb') as chandle: |
| actual = chandle.read(12) |
| if expect != actual: |
| break |
| else: |
| return success |
| except OSError: |
| pass |
| if not quiet: |
| print('Compiling {!r}...'.format(fullname)) |
| try: |
| for index, opt_level in enumerate(optimize): |
| cfile = opt_cfiles[opt_level] |
| ok = py_compile.compile(fullname, cfile, dfile, True, |
| optimize=opt_level, |
| invalidation_mode=invalidation_mode) |
| if index > 0 and hardlink_dupes: |
| previous_cfile = opt_cfiles[optimize[index - 1]] |
| if filecmp.cmp(cfile, previous_cfile, shallow=False): |
| os.unlink(cfile) |
| os.link(previous_cfile, cfile) |
| except py_compile.PyCompileError as err: |
| success = False |
| if quiet >= 2: |
| return success |
| elif quiet: |
| print('*** Error compiling {!r}...'.format(fullname)) |
| else: |
| print('*** ', end='') |
| # escape non-printable characters in msg |
| encoding = sys.stdout.encoding or sys.getdefaultencoding() |
| msg = err.msg.encode(encoding, errors='backslashreplace').decode(encoding) |
| print(msg) |
| except (SyntaxError, UnicodeError, OSError) as e: |
| success = False |
| if quiet >= 2: |
| return success |
| elif quiet: |
| print('*** Error compiling {!r}...'.format(fullname)) |
| else: |
| print('*** ', end='') |
| print(e.__class__.__name__ + ':', e) |
| else: |
| if ok == 0: |
| success = False |
| return success |
| |
| def compile_path(skip_curdir=1, maxlevels=0, force=False, quiet=0, |
| legacy=False, optimize=-1, |
| invalidation_mode=None): |
| """Byte-compile all module on sys.path. |
| |
| Arguments (all optional): |
| |
| skip_curdir: if true, skip current directory (default True) |
| maxlevels: max recursion level (default 0) |
| force: as for compile_dir() (default False) |
| quiet: as for compile_dir() (default 0) |
| legacy: as for compile_dir() (default False) |
| optimize: as for compile_dir() (default -1) |
| invalidation_mode: as for compiler_dir() |
| """ |
| success = True |
| for dir in sys.path: |
| if (not dir or dir == os.curdir) and skip_curdir: |
| if quiet < 2: |
| print('Skipping current directory') |
| else: |
| success = success and compile_dir( |
| dir, |
| maxlevels, |
| None, |
| force, |
| quiet=quiet, |
| legacy=legacy, |
| optimize=optimize, |
| invalidation_mode=invalidation_mode, |
| ) |
| return success |
| |
| |
| def main(): |
| """Script main program.""" |
| import argparse |
| |
| parser = argparse.ArgumentParser( |
| description='Utilities to support installing Python libraries.') |
| parser.add_argument('-l', action='store_const', const=0, |
| default=None, dest='maxlevels', |
| help="don't recurse into subdirectories") |
| parser.add_argument('-r', type=int, dest='recursion', |
| help=('control the maximum recursion level. ' |
| 'if `-l` and `-r` options are specified, ' |
| 'then `-r` takes precedence.')) |
| parser.add_argument('-f', action='store_true', dest='force', |
| help='force rebuild even if timestamps are up to date') |
| parser.add_argument('-q', action='count', dest='quiet', default=0, |
| help='output only error messages; -qq will suppress ' |
| 'the error messages as well.') |
| parser.add_argument('-b', action='store_true', dest='legacy', |
| help='use legacy (pre-PEP3147) compiled file locations') |
| parser.add_argument('-d', metavar='DESTDIR', dest='ddir', default=None, |
| help=('directory to prepend to file paths for use in ' |
| 'compile-time tracebacks and in runtime ' |
| 'tracebacks in cases where the source file is ' |
| 'unavailable')) |
| parser.add_argument('-s', metavar='STRIPDIR', dest='stripdir', |
| default=None, |
| help=('part of path to left-strip from path ' |
| 'to source file - for example buildroot. ' |
| '`-d` and `-s` options cannot be ' |
| 'specified together.')) |
| parser.add_argument('-p', metavar='PREPENDDIR', dest='prependdir', |
| default=None, |
| help=('path to add as prefix to path ' |
| 'to source file - for example / to make ' |
| 'it absolute when some part is removed ' |
| 'by `-s` option. ' |
| '`-d` and `-p` options cannot be ' |
| 'specified together.')) |
| parser.add_argument('-x', metavar='REGEXP', dest='rx', default=None, |
| help=('skip files matching the regular expression; ' |
| 'the regexp is searched for in the full path ' |
| 'of each file considered for compilation')) |
| parser.add_argument('-i', metavar='FILE', dest='flist', |
| help=('add all the files and directories listed in ' |
| 'FILE to the list considered for compilation; ' |
| 'if "-", names are read from stdin')) |
| parser.add_argument('compile_dest', metavar='FILE|DIR', nargs='*', |
| help=('zero or more file and directory names ' |
| 'to compile; if no arguments given, defaults ' |
| 'to the equivalent of -l sys.path')) |
| parser.add_argument('-j', '--workers', default=1, |
| type=int, help='Run compileall concurrently') |
| invalidation_modes = [mode.name.lower().replace('_', '-') |
| for mode in py_compile.PycInvalidationMode] |
| parser.add_argument('--invalidation-mode', |
| choices=sorted(invalidation_modes), |
| help=('set .pyc invalidation mode; defaults to ' |
| '"checked-hash" if the SOURCE_DATE_EPOCH ' |
| 'environment variable is set, and ' |
| '"timestamp" otherwise.')) |
| parser.add_argument('-o', action='append', type=int, dest='opt_levels', |
| help=('Optimization levels to run compilation with. ' |
| 'Default is -1 which uses the optimization level ' |
| 'of the Python interpreter itself (see -O).')) |
| parser.add_argument('-e', metavar='DIR', dest='limit_sl_dest', |
| help='Ignore symlinks pointing outsite of the DIR') |
| parser.add_argument('--hardlink-dupes', action='store_true', |
| dest='hardlink_dupes', |
| help='Hardlink duplicated pyc files') |
| |
| args = parser.parse_args() |
| compile_dests = args.compile_dest |
| |
| if args.rx: |
| import re |
| args.rx = re.compile(args.rx) |
| |
| if args.limit_sl_dest == "": |
| args.limit_sl_dest = None |
| |
| if args.recursion is not None: |
| maxlevels = args.recursion |
| else: |
| maxlevels = args.maxlevels |
| |
| if args.opt_levels is None: |
| args.opt_levels = [-1] |
| |
| if len(args.opt_levels) == 1 and args.hardlink_dupes: |
| parser.error(("Hardlinking of duplicated bytecode makes sense " |
| "only for more than one optimization level.")) |
| |
| if args.ddir is not None and ( |
| args.stripdir is not None or args.prependdir is not None |
| ): |
| parser.error("-d cannot be used in combination with -s or -p") |
| |
| # if flist is provided then load it |
| if args.flist: |
| try: |
| with (sys.stdin if args.flist=='-' else |
| open(args.flist, encoding="utf-8")) as f: |
| for line in f: |
| compile_dests.append(line.strip()) |
| except OSError: |
| if args.quiet < 2: |
| print("Error reading file list {}".format(args.flist)) |
| return False |
| |
| if args.invalidation_mode: |
| ivl_mode = args.invalidation_mode.replace('-', '_').upper() |
| invalidation_mode = py_compile.PycInvalidationMode[ivl_mode] |
| else: |
| invalidation_mode = None |
| |
| success = True |
| try: |
| if compile_dests: |
| for dest in compile_dests: |
| if os.path.isfile(dest): |
| if not compile_file(dest, args.ddir, args.force, args.rx, |
| args.quiet, args.legacy, |
| invalidation_mode=invalidation_mode, |
| stripdir=args.stripdir, |
| prependdir=args.prependdir, |
| optimize=args.opt_levels, |
| limit_sl_dest=args.limit_sl_dest, |
| hardlink_dupes=args.hardlink_dupes): |
| success = False |
| else: |
| if not compile_dir(dest, maxlevels, args.ddir, |
| args.force, args.rx, args.quiet, |
| args.legacy, workers=args.workers, |
| invalidation_mode=invalidation_mode, |
| stripdir=args.stripdir, |
| prependdir=args.prependdir, |
| optimize=args.opt_levels, |
| limit_sl_dest=args.limit_sl_dest, |
| hardlink_dupes=args.hardlink_dupes): |
| success = False |
| return success |
| else: |
| return compile_path(legacy=args.legacy, force=args.force, |
| quiet=args.quiet, |
| invalidation_mode=invalidation_mode) |
| except KeyboardInterrupt: |
| if args.quiet < 2: |
| print("\n[interrupted]") |
| return False |
| return True |
| |
| |
| if __name__ == '__main__': |
| exit_status = int(not main()) |
| sys.exit(exit_status) |