| import contextlib |
| import os |
| import pathlib |
| import shutil |
| import stat |
| import sys |
| import zipfile |
| |
| __all__ = ['ZipAppError', 'create_archive', 'get_interpreter'] |
| |
| |
| # The __main__.py used if the users specifies "-m module:fn". |
| # Note that this will always be written as UTF-8 (module and |
| # function names can be non-ASCII in Python 3). |
| # We add a coding cookie even though UTF-8 is the default in Python 3 |
| # because the resulting archive may be intended to be run under Python 2. |
| MAIN_TEMPLATE = """\ |
| # -*- coding: utf-8 -*- |
| import {module} |
| {module}.{fn}() |
| """ |
| |
| |
| # The Windows launcher defaults to UTF-8 when parsing shebang lines if the |
| # file has no BOM. So use UTF-8 on Windows. |
| # On Unix, use the filesystem encoding. |
| if sys.platform.startswith('win'): |
| shebang_encoding = 'utf-8' |
| else: |
| shebang_encoding = sys.getfilesystemencoding() |
| |
| |
| class ZipAppError(ValueError): |
| pass |
| |
| |
| @contextlib.contextmanager |
| def _maybe_open(archive, mode): |
| if isinstance(archive, (str, os.PathLike)): |
| with open(archive, mode) as f: |
| yield f |
| else: |
| yield archive |
| |
| |
| def _write_file_prefix(f, interpreter): |
| """Write a shebang line.""" |
| if interpreter: |
| shebang = b'#!' + interpreter.encode(shebang_encoding) + b'\n' |
| f.write(shebang) |
| |
| |
| def _copy_archive(archive, new_archive, interpreter=None): |
| """Copy an application archive, modifying the shebang line.""" |
| with _maybe_open(archive, 'rb') as src: |
| # Skip the shebang line from the source. |
| # Read 2 bytes of the source and check if they are #!. |
| first_2 = src.read(2) |
| if first_2 == b'#!': |
| # Discard the initial 2 bytes and the rest of the shebang line. |
| first_2 = b'' |
| src.readline() |
| |
| with _maybe_open(new_archive, 'wb') as dst: |
| _write_file_prefix(dst, interpreter) |
| # If there was no shebang, "first_2" contains the first 2 bytes |
| # of the source file, so write them before copying the rest |
| # of the file. |
| dst.write(first_2) |
| shutil.copyfileobj(src, dst) |
| |
| if interpreter and isinstance(new_archive, str): |
| os.chmod(new_archive, os.stat(new_archive).st_mode | stat.S_IEXEC) |
| |
| |
| def create_archive(source, target=None, interpreter=None, main=None, |
| filter=None, compressed=False): |
| """Create an application archive from SOURCE. |
| |
| The SOURCE can be the name of a directory, or a filename or a file-like |
| object referring to an existing archive. |
| |
| The content of SOURCE is packed into an application archive in TARGET, |
| which can be a filename or a file-like object. If SOURCE is a directory, |
| TARGET can be omitted and will default to the name of SOURCE with .pyz |
| appended. |
| |
| The created application archive will have a shebang line specifying |
| that it should run with INTERPRETER (there will be no shebang line if |
| INTERPRETER is None), and a __main__.py which runs MAIN (if MAIN is |
| not specified, an existing __main__.py will be used). It is an error |
| to specify MAIN for anything other than a directory source with no |
| __main__.py, and it is an error to omit MAIN if the directory has no |
| __main__.py. |
| """ |
| # Are we copying an existing archive? |
| source_is_file = False |
| if hasattr(source, 'read') and hasattr(source, 'readline'): |
| source_is_file = True |
| else: |
| source = pathlib.Path(source) |
| if source.is_file(): |
| source_is_file = True |
| |
| if source_is_file: |
| _copy_archive(source, target, interpreter) |
| return |
| |
| # We are creating a new archive from a directory. |
| if not source.exists(): |
| raise ZipAppError("Source does not exist") |
| has_main = (source / '__main__.py').is_file() |
| if main and has_main: |
| raise ZipAppError( |
| "Cannot specify entry point if the source has __main__.py") |
| if not (main or has_main): |
| raise ZipAppError("Archive has no entry point") |
| |
| main_py = None |
| if main: |
| # Check that main has the right format. |
| mod, sep, fn = main.partition(':') |
| mod_ok = all(part.isidentifier() for part in mod.split('.')) |
| fn_ok = all(part.isidentifier() for part in fn.split('.')) |
| if not (sep == ':' and mod_ok and fn_ok): |
| raise ZipAppError("Invalid entry point: " + main) |
| main_py = MAIN_TEMPLATE.format(module=mod, fn=fn) |
| |
| if target is None: |
| target = source.with_suffix('.pyz') |
| elif not hasattr(target, 'write'): |
| target = pathlib.Path(target) |
| |
| with _maybe_open(target, 'wb') as fd: |
| _write_file_prefix(fd, interpreter) |
| compression = (zipfile.ZIP_DEFLATED if compressed else |
| zipfile.ZIP_STORED) |
| with zipfile.ZipFile(fd, 'w', compression=compression) as z: |
| for child in source.rglob('*'): |
| arcname = child.relative_to(source) |
| if filter is None or filter(arcname): |
| z.write(child, arcname.as_posix()) |
| if main_py: |
| z.writestr('__main__.py', main_py.encode('utf-8')) |
| |
| if interpreter and not hasattr(target, 'write'): |
| target.chmod(target.stat().st_mode | stat.S_IEXEC) |
| |
| |
| def get_interpreter(archive): |
| with _maybe_open(archive, 'rb') as f: |
| if f.read(2) == b'#!': |
| return f.readline().strip().decode(shebang_encoding) |
| |
| |
| def main(args=None): |
| """Run the zipapp command line interface. |
| |
| The ARGS parameter lets you specify the argument list directly. |
| Omitting ARGS (or setting it to None) works as for argparse, using |
| sys.argv[1:] as the argument list. |
| """ |
| import argparse |
| |
| parser = argparse.ArgumentParser() |
| parser.add_argument('--output', '-o', default=None, |
| help="The name of the output archive. " |
| "Required if SOURCE is an archive.") |
| parser.add_argument('--python', '-p', default=None, |
| help="The name of the Python interpreter to use " |
| "(default: no shebang line).") |
| parser.add_argument('--main', '-m', default=None, |
| help="The main function of the application " |
| "(default: use an existing __main__.py).") |
| parser.add_argument('--compress', '-c', action='store_true', |
| help="Compress files with the deflate method. " |
| "Files are stored uncompressed by default.") |
| parser.add_argument('--info', default=False, action='store_true', |
| help="Display the interpreter from the archive.") |
| parser.add_argument('source', |
| help="Source directory (or existing archive).") |
| |
| args = parser.parse_args(args) |
| |
| # Handle `python -m zipapp archive.pyz --info`. |
| if args.info: |
| if not os.path.isfile(args.source): |
| raise SystemExit("Can only get info for an archive file") |
| interpreter = get_interpreter(args.source) |
| print("Interpreter: {}".format(interpreter or "<none>")) |
| sys.exit(0) |
| |
| if os.path.isfile(args.source): |
| if args.output is None or (os.path.exists(args.output) and |
| os.path.samefile(args.source, args.output)): |
| raise SystemExit("In-place editing of archives is not supported") |
| if args.main: |
| raise SystemExit("Cannot change the main function when copying") |
| |
| create_archive(args.source, args.output, |
| interpreter=args.python, main=args.main, |
| compressed=args.compress) |
| |
| |
| if __name__ == '__main__': |
| main() |