import argparse
import contextlib
import fnmatch
import logging
import os
import os.path
import shutil
import sys

from . import fsutil, strutil, iterutil, logging as loggingutil


_NOT_SET = object()


def get_prog(spec=None, *, absolute=False, allowsuffix=True):
    if spec is None:
        _, spec = _find_script()
        # This is more natural for prog than __file__ would be.
        filename = sys.argv[0]
    elif isinstance(spec, str):
        filename = os.path.normpath(spec)
        spec = None
    else:
        filename = spec.origin
    if _is_standalone(filename):
        # Check if "installed".
        if allowsuffix or not filename.endswith('.py'):
            basename = os.path.basename(filename)
            found = shutil.which(basename)
            if found:
                script = os.path.abspath(filename)
                found = os.path.abspath(found)
                if os.path.normcase(script) == os.path.normcase(found):
                    return basename
        # It is only "standalone".
        if absolute:
            filename = os.path.abspath(filename)
        return filename
    elif spec is not None:
        module = spec.name
        if module.endswith('.__main__'):
            module = module[:-9]
        return f'{sys.executable} -m {module}'
    else:
        if absolute:
            filename = os.path.abspath(filename)
        return f'{sys.executable} {filename}'
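

# Illustrative sketch of typical results (not part of the original module;
# the actual values depend on how the script was invoked and what is on PATH):
#
#   >>> get_prog('spam.py')                # plain (non-executable) script
#   '/usr/bin/python3 spam.py'             # interpreter path will vary
#   >>> get_prog(mod.__spec__)             # module that was run via -m
#   '/usr/bin/python3 -m mod'
#   >>> get_prog('/usr/local/bin/spam')    # executable that is also on PATH
#   'spam'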


def _find_script():
    frame = sys._getframe(2)
    while frame.f_globals['__name__'] != '__main__':
        frame = frame.f_back

    # This should match sys.argv[0].
    filename = frame.f_globals['__file__']
    # This will be None if -m wasn't used.
    spec = frame.f_globals['__spec__']
    return filename, spec


def is_installed(filename, *, allowsuffix=True):
    if not allowsuffix and filename.endswith('.py'):
        return False
    filename = os.path.abspath(os.path.normpath(filename))
    found = shutil.which(os.path.basename(filename))
    if not found:
        return False
    if found != filename:
        return False
    return _is_standalone(filename)


def is_standalone(filename):
    filename = os.path.abspath(os.path.normpath(filename))
    return _is_standalone(filename)


def _is_standalone(filename):
    return fsutil.is_executable(filename)


##################################
# logging

VERBOSITY = 3

TRACEBACK = os.environ.get('SHOW_TRACEBACK', '').strip()
TRACEBACK = bool(TRACEBACK and TRACEBACK.upper() not in ('0', 'FALSE', 'NO'))

logger = logging.getLogger(__name__)


def configure_logger(verbosity, logger=None, **kwargs):
    if logger is None:
        # Configure the root logger.
        logger = logging.getLogger()
    loggingutil.configure_logger(logger, verbosity, **kwargs)


##################################
# selections


class UnsupportedSelectionError(Exception):

    def __init__(self, values, possible):
        self.values = tuple(values)
        self.possible = tuple(possible)
        super().__init__(f'unsupported selections {self.unique}')

    @property
    def unique(self):
        return tuple(sorted(set(self.values)))


def normalize_selection(selected: str, *, possible=None):
    if selected in (None, True, False):
        return selected
    elif isinstance(selected, str):
        selected = [selected]
    elif not selected:
        return ()

    unsupported = []
    _selected = set()
    for item in selected:
        if not item:
            continue
        for value in item.strip().replace(',', ' ').split():
            if not value:
                continue
            # XXX Handle subtraction (leading "-").
            if possible and value not in possible and value != 'all':
                unsupported.append(value)
            _selected.add(value)
    if unsupported:
        raise UnsupportedSelectionError(unsupported, tuple(possible))
    if 'all' in _selected:
        return True
    # Return the individual (split) values, not the raw input items.
    return frozenset(_selected)
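

# Illustrative examples (not part of the original module) of how
# normalize_selection() flattens comma/space-separated values:
#
#   >>> normalize_selection('spam,eggs ham', possible=('spam', 'eggs', 'ham'))
#   frozenset({'eggs', 'ham', 'spam'})     # order is arbitrary
#   >>> normalize_selection(['all'], possible=('spam', 'eggs'))
#   True
#
# None/True/False pass through unchanged, and unsupported values raise
# UnsupportedSelectionError when "possible" is given.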


##################################
# CLI parsing helpers


class CLIArgSpec(tuple):

    def __new__(cls, *args, **kwargs):
        return super().__new__(cls, (args, kwargs))

    def __repr__(self):
        args, kwargs = self
        args = [repr(arg) for arg in args]
        for name, value in kwargs.items():
            args.append(f'{name}={value!r}')
        return f'{type(self).__name__}({", ".join(args)})'

    def __call__(self, parser, *, _noop=(lambda a: None)):
        self.apply(parser)
        return _noop

    def apply(self, parser):
        args, kwargs = self
        parser.add_argument(*args, **kwargs)
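

# Illustrative sketch (not from the original module): a CLIArgSpec just bundles
# add_argument() arguments so they can be applied to a parser later, e.g. from
# a command table:
#
#   brief_arg = CLIArgSpec('--brief', action='store_true')
#   parser = argparse.ArgumentParser()
#   brief_arg(parser)    # equivalent to parser.add_argument('--brief', action='store_true')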


def apply_cli_argspecs(parser, specs):
    processors = []
    for spec in specs:
        if callable(spec):
            procs = spec(parser)
            _add_procs(processors, procs)
        else:
            args, kwargs = spec
            parser.add_argument(*args, **kwargs)
    return processors


def _add_procs(flattened, procs):
    # XXX Fail on non-empty, non-callable procs?
    if not procs:
        return
    if callable(procs):
        flattened.append(procs)
    else:
        #processors.extend(p for p in procs if callable(p))
        for proc in procs:
            _add_procs(flattened, proc)


def add_verbosity_cli(parser):
    parser.add_argument('-q', '--quiet', action='count', default=0)
    parser.add_argument('-v', '--verbose', action='count', default=0)

    def process_args(args):
        ns = vars(args)
        key = 'verbosity'
        if key in ns:
            parser.error(f'duplicate arg {key!r}')
        ns[key] = max(0, VERBOSITY + ns.pop('verbose') - ns.pop('quiet'))
        return key
    return process_args
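

# Illustrative usage (a sketch, not part of the original module): each
# add_*_cli() helper registers arguments and returns a "process_args" callback
# that folds the raw argparse values into a derived value after parsing.
#
#   parser = argparse.ArgumentParser()
#   process_verbosity = add_verbosity_cli(parser)
#   args = parser.parse_args(['-vv'])
#   key = process_verbosity(args)          # key == 'verbosity'
#   # args.verbosity == VERBOSITY + 2; the "quiet"/"verbose" attrs are removed.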


def add_traceback_cli(parser):
    parser.add_argument('--traceback', '--tb', action='store_true',
                        default=TRACEBACK)
    parser.add_argument('--no-traceback', '--no-tb', dest='traceback',
                        action='store_const', const=False)

    def process_args(args):
        ns = vars(args)
        key = 'traceback_cm'
        if key in ns:
            parser.error(f'duplicate arg {key!r}')
        showtb = ns.pop('traceback')

        @contextlib.contextmanager
        def traceback_cm():
            restore = loggingutil.hide_emit_errors()
            try:
                yield
            except BrokenPipeError:
                # It was piped to "head" or something similar.
                pass
            except NotImplementedError:
                raise  # re-raise
            except Exception as exc:
                if not showtb:
                    sys.exit(f'ERROR: {exc}')
                raise  # re-raise
            except KeyboardInterrupt:
                if not showtb:
                    sys.exit('\nINTERRUPTED')
                raise  # re-raise
            except BaseException as exc:
                if not showtb:
                    sys.exit(f'{type(exc).__name__}: {exc}')
                raise  # re-raise
            finally:
                restore()
        ns[key] = traceback_cm()
        return key
    return process_args
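

# Illustrative usage (assumed, not from the original module): the callback
# stores a context manager under args.traceback_cm; a main() can wrap its work
# in it so unhandled exceptions become short error messages unless --traceback
# was requested.
#
#   parser = argparse.ArgumentParser()
#   process_tb = add_traceback_cli(parser)
#   args = parser.parse_args([])
#   process_tb(args)
#   with args.traceback_cm:
#       run_command(args)                  # "run_command" is hypothetical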


def add_sepval_cli(parser, opt, dest, choices, *, sep=',', **kwargs):
    # if opt is True:
    #     parser.add_argument(f'--{dest}', action='append', **kwargs)
    # elif isinstance(opt, str) and opt.startswith('-'):
    #     parser.add_argument(opt, dest=dest, action='append', **kwargs)
    # else:
    #     arg = dest if not opt else opt
    #     kwargs.setdefault('nargs', '+')
    #     parser.add_argument(arg, dest=dest, action='append', **kwargs)
    if not isinstance(opt, str):
        parser.error(f'opt must be a string, got {opt!r}')
    elif opt.startswith('-'):
        parser.add_argument(opt, dest=dest, action='append', **kwargs)
    else:
        kwargs.setdefault('nargs', '+')
        #kwargs.setdefault('metavar', opt.upper())
        parser.add_argument(opt, dest=dest, action='append', **kwargs)

    def process_args(args):
        ns = vars(args)
        # XXX Use normalize_selection()?
        if isinstance(ns[dest], str):
            ns[dest] = [ns[dest]]
        selections = []
        for many in ns[dest] or ():
            for value in many.split(sep):
                if value not in choices:
                    parser.error(f'unknown {dest} {value!r}')
                selections.append(value)
        ns[dest] = selections
    return process_args


def add_files_cli(parser, *, excluded=None, nargs=None):
    process_files = add_file_filtering_cli(parser, excluded=excluded)
    parser.add_argument('filenames', nargs=nargs or '+', metavar='FILENAME')
    return [
        process_files,
    ]


def add_file_filtering_cli(parser, *, excluded=None):
    parser.add_argument('--start')
    parser.add_argument('--include', action='append')
    parser.add_argument('--exclude', action='append')

    excluded = tuple(excluded or ())

    def process_args(args):
        ns = vars(args)
        key = 'iter_filenames'
        if key in ns:
            parser.error(f'duplicate arg {key!r}')
        _include = tuple(ns.pop('include') or ())
        _exclude = excluded + tuple(ns.pop('exclude') or ())
        kwargs = dict(
            start=ns.pop('start'),
            include=tuple(_parse_files(_include)),
            exclude=tuple(_parse_files(_exclude)),
            # We use the default for "show_header"
        )
        def process_filenames(filenames, relroot=None):
            return fsutil.process_filenames(filenames, relroot=relroot, **kwargs)
        ns[key] = process_filenames
    return process_args


def _parse_files(filenames):
    for filename, _ in strutil.parse_entries(filenames):
        yield filename.strip()


def add_progress_cli(parser, *, threshold=VERBOSITY, **kwargs):
    parser.add_argument('--progress', dest='track_progress', action='store_const', const=True)
    parser.add_argument('--no-progress', dest='track_progress', action='store_false')
    parser.set_defaults(track_progress=True)

    def process_args(args):
        if args.track_progress:
            ns = vars(args)
            verbosity = ns.get('verbosity', VERBOSITY)
            if verbosity <= threshold:
                args.track_progress = track_progress_compact
            else:
                args.track_progress = track_progress_flat
    return process_args


def add_failure_filtering_cli(parser, pool, *, default=False):
    parser.add_argument('--fail', action='append',
                        metavar=f'"{{all|{"|".join(sorted(pool))}}},..."')
    parser.add_argument('--no-fail', dest='fail', action='store_const', const=())

    def process_args(args):
        ns = vars(args)

        fail = ns.pop('fail')
        try:
            fail = normalize_selection(fail, possible=pool)
        except UnsupportedSelectionError as exc:
            parser.error(f'invalid --fail values: {", ".join(exc.unique)}')
        else:
            if fail is None:
                fail = default

            if fail is True:
                def ignore_exc(_exc):
                    return False
            elif fail is False:
                def ignore_exc(_exc):
                    return True
            else:
                def ignore_exc(exc):
                    for err in fail:
                        if type(exc) == pool[err]:
                            return False
                    else:
                        return True
            args.ignore_exc = ignore_exc
    return process_args
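

# Illustrative usage (a sketch; the "pool" contents here are hypothetical):
# --fail selects which exception kinds count as real failures, and the
# resulting args.ignore_exc predicate says whether a raised exception may be
# ignored.
#
#   pool = {'parse': SyntaxError, 'io': OSError}
#   parser = argparse.ArgumentParser()
#   process_fail = add_failure_filtering_cli(parser, pool)
#   args = parser.parse_args(['--fail', 'parse'])
#   process_fail(args)
#   args.ignore_exc(SyntaxError())         # False -> treat as a failure
#   args.ignore_exc(OSError())             # True  -> ignore it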


def add_kind_filtering_cli(parser, *, default=None):
    parser.add_argument('--kinds', action='append')

    def process_args(args):
        ns = vars(args)

        kinds = []
        for kind in ns.pop('kinds') or default or ():
            kinds.extend(kind.strip().replace(',', ' ').split())

        if not kinds:
            match_kind = (lambda k: True)
        else:
            included = set()
            excluded = set()
            for kind in kinds:
                if kind.startswith('-'):
                    kind = kind[1:]
                    excluded.add(kind)
                    if kind in included:
                        included.remove(kind)
                else:
                    included.add(kind)
                    if kind in excluded:
                        excluded.remove(kind)
            if excluded:
                if included:
                    ...  # XXX fail?
                def match_kind(kind, *, _excluded=excluded):
                    return kind not in _excluded
            else:
                def match_kind(kind, *, _included=included):
                    return kind in _included
        args.match_kind = match_kind
    return process_args
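

# Illustrative usage (a sketch, not part of the original module): --kinds takes
# comma/space-separated names and a leading "-" excludes a kind; the callback
# leaves a predicate on args.match_kind.
#
#   parser = argparse.ArgumentParser()
#   process_kinds = add_kind_filtering_cli(parser)
#   args = parser.parse_args(['--kinds', 'typedef,struct'])
#   process_kinds(args)
#   args.match_kind('typedef')             # True
#   args.match_kind('macro')               # False
#
# With only exclusions (e.g. --kinds -macro) everything else matches.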


COMMON_CLI = [
    add_verbosity_cli,
    add_traceback_cli,
    #add_dryrun_cli,
]


def add_commands_cli(parser, commands, *, commonspecs=COMMON_CLI, subset=None):
    arg_processors = {}
    if isinstance(subset, str):
        cmdname = subset
        try:
            _, argspecs, _ = commands[cmdname]
        except KeyError:
            raise ValueError(f'unsupported subset {subset!r}')
        parser.set_defaults(cmd=cmdname)
        arg_processors[cmdname] = _add_cmd_cli(parser, commonspecs, argspecs)
    else:
        if subset is None:
            cmdnames = subset = list(commands)
        elif not subset:
            raise NotImplementedError
        elif isinstance(subset, set):
            cmdnames = [k for k in commands if k in subset]
            subset = sorted(subset)
        else:
            cmdnames = [n for n in subset if n in commands]
        if len(cmdnames) < len(subset):
            bad = tuple(n for n in subset if n not in commands)
            raise ValueError(f'unsupported subset {bad}')

        common = argparse.ArgumentParser(add_help=False)
        common_processors = apply_cli_argspecs(common, commonspecs)
        subs = parser.add_subparsers(dest='cmd')
        for cmdname in cmdnames:
            description, argspecs, _ = commands[cmdname]
            sub = subs.add_parser(
                cmdname,
                description=description,
                parents=[common],
            )
            cmd_processors = _add_cmd_cli(sub, (), argspecs)
            arg_processors[cmdname] = common_processors + cmd_processors
    return arg_processors
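

# Illustrative shape of the "commands" table (inferred from how it is unpacked
# above; the names here are hypothetical): each entry maps a command name to a
# 3-tuple whose first two items are the description and the argspecs
# (CLIArgSpec instances and/or add_*_cli callables).
#
#   COMMANDS = {
#       'check': ('check the given files', [add_files_cli], None),
#       'show': ('show the results', [CLIArgSpec('--brief', action='store_true')], None),
#   }
#   parser = argparse.ArgumentParser()
#   processors = add_commands_cli(parser, COMMANDS)
#   # processors maps each command name to its list of process_args callbacks.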


def _add_cmd_cli(parser, commonspecs, argspecs):
    processors = []
    argspecs = list(commonspecs or ()) + list(argspecs or ())
    for argspec in argspecs:
        if callable(argspec):
            procs = argspec(parser)
            _add_procs(processors, procs)
        else:
            if not argspec:
                raise NotImplementedError
            args = list(argspec)
            if not isinstance(args[-1], str):
                kwargs = args.pop()
                if not isinstance(args[0], str):
                    try:
                        args, = args
                    except (TypeError, ValueError):
                        parser.error(f'invalid cmd args {argspec!r}')
            else:
                kwargs = {}
            parser.add_argument(*args, **kwargs)
    # There will be nothing to process.
    return processors


def _flatten_processors(processors):
    for proc in processors:
        if proc is None:
            continue
        if callable(proc):
            yield proc
        else:
            yield from _flatten_processors(proc)


def process_args(args, processors, *, keys=None):
    processors = _flatten_processors(processors)
    ns = vars(args)
    extracted = {}
    if keys is None:
        for process_args in processors:
            # A processor may return a single key, several keys, or None.
            hanging = process_args(args)
            if isinstance(hanging, str):
                hanging = [hanging]
            for key in hanging or ():
                extracted[key] = ns.pop(key)
    else:
        remainder = set(keys)
        for process_args in processors:
            hanging = process_args(args)
            if isinstance(hanging, str):
                hanging = [hanging]
            for key in hanging or ():
                if key not in remainder:
                    raise NotImplementedError(key)
                extracted[key] = ns.pop(key)
                remainder.remove(key)
        if remainder:
            raise NotImplementedError(sorted(remainder))
    return extracted


def process_args_by_key(args, processors, keys):
    extracted = process_args(args, processors, keys=keys)
    return [extracted[key] for key in keys]
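

# Illustrative end-to-end flow (a sketch under the assumptions above): the
# processors returned by add_commands_cli() are applied after parse_args(), and
# process_args_by_key() pulls the derived values out of the namespace in a
# fixed order.
#
#   args = parser.parse_args(argv)
#   verbosity, traceback_cm = process_args_by_key(
#       args,
#       processors[args.cmd],
#       ['verbosity', 'traceback_cm'],
#   )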


##################################
# commands


def set_command(name, add_cli):
    """A decorator factory to set CLI info."""
    def decorator(func):
        if hasattr(func, '__cli__'):
            raise Exception('already set')
        func.__cli__ = (name, add_cli)
        return func
    return decorator
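

# Illustrative usage of the decorator factory (the command here is
# hypothetical, not from the original module):
#
#   @set_command('check', add_files_cli)
#   def cmd_check(filenames, **kwargs):
#       ...
#
#   # cmd_check.__cli__ == ('check', add_files_cli)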


##################################
# main() helpers


def filter_filenames(filenames, process_filenames=None, relroot=fsutil.USE_CWD):
    # We expect each filename to be a normalized, absolute path.
    for filename, _, check, _ in _iter_filenames(filenames, process_filenames, relroot):
        if (reason := check()):
            logger.debug(f'{filename}: {reason}')
            continue
        yield filename


def main_for_filenames(filenames, process_filenames=None, relroot=fsutil.USE_CWD):
    filenames, relroot = fsutil.fix_filenames(filenames, relroot=relroot)
    for filename, relfile, check, show in _iter_filenames(filenames, process_filenames, relroot):
        if show:
            print()
            print(relfile)
            print('-------------------------------------------')
        if (reason := check()):
            print(reason)
            continue
        yield filename, relfile


def _iter_filenames(filenames, process, relroot):
    if process is None:
        yield from fsutil.process_filenames(filenames, relroot=relroot)
        return

    onempty = Exception('no filenames provided')
    items = process(filenames, relroot=relroot)
    items, peeked = iterutil.peek_and_iter(items)
    if not items:
        raise onempty
    if isinstance(peeked, str):
        if relroot and relroot is not fsutil.USE_CWD:
            relroot = os.path.abspath(relroot)
        # A falsy check() result means "do not skip this filename".
        check = (lambda: None)
        for filename, ismany in iterutil.iter_many(items, onempty):
            relfile = fsutil.format_filename(filename, relroot, fixroot=False)
            yield filename, relfile, check, ismany
    elif len(peeked) == 4:
        yield from items
    else:
        raise NotImplementedError


def track_progress_compact(items, *, groups=5, **mark_kwargs):
    last = os.linesep
    marks = iter_marks(groups=groups, **mark_kwargs)
    for item in items:
        last = next(marks)
        print(last, end='', flush=True)
        yield item
    if not last.endswith(os.linesep):
        print()


def track_progress_flat(items, fmt='<{}>'):
    for item in items:
        print(fmt.format(item), flush=True)
        yield item


def iter_marks(mark='.', *, group=5, groups=2, lines=_NOT_SET, sep=' '):
    mark = mark or ''
    group = group if group and group > 1 else 1
    groups = groups if groups and groups > 1 else 1
    sep = f'{mark}{sep}' if sep else mark
    end = f'{mark}{os.linesep}'
    div = os.linesep
    perline = group * groups
    if lines is _NOT_SET:
        # By default we try to put about 100 in each line group.
        perlines = 100 // perline * perline
    elif not lines or lines < 0:
        perlines = None
    else:
        perlines = perline * lines

    if perline == 1:
        yield end
    elif group == 1:
        yield sep

    count = 1
    while True:
        if count % perline == 0:
            yield end
            if perlines and count % perlines == 0:
                yield div
        elif count % group == 0:
            yield sep
        else:
            yield mark
        count += 1
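

# Illustrative behavior (derived from the defaults above, not a guarantee):
# with mark='.', group=5, groups=2 the generator yields one mark per item,
# rendering lines of ten marks as two groups of five ("..... .....") and a
# blank line after roughly every 100 marks.
#
#   marks = iter_marks()
#   print(''.join(next(marks) for _ in range(20)), end='')
#   # ..... .....
#   # ..... .....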