blob: a5380e052eb219c7246c5a36ca7ac0e12172b97b [file] [log] [blame]
"""Tools for running presubmit checks in a Git repository.
A presubmit checks are defined as a function or other callable. The function may
take either no arguments or a list of the paths on which to run. Presubmit
checks communicate failure by raising any exception.
For example, either of these functions may be used as presubmit checks:
def file_contains_ni(paths):
for path in paths:
with open(path) as file:
contents = file.read()
if 'ni' not in contents and 'nee' not in contents:
raise PresumitFailure('Files must say "ni"!', path=path)
def run_the_build():
subprocess.run(['make', 'release'], check=True)
Presubmit checks are provided to the parse_args_and_run_presubmit or
run_presubmit function as a list. For example,
PRESUBMIT_CHECKS = [file_contains_ni, run_the_build]
sys.exit(0 if parse_args_and_run_presubmit(PRESUBMIT_CHECKS) else 1)
Presubmit checks that accept a list of paths may use the filter_paths decorator
to automatically filter the paths list for file types they care about. See the
pragma_once function for an example.
"""
import argparse
import collections
import enum
import functools
import re
import os
import shlex
import subprocess
import sys
import time
from typing import Callable, Iterable, Optional, Sequence
from inspect import signature
def plural(items_or_count, singular: str, count_format='') -> str:
"""Returns the singular or plural form of a word based on a count."""
try:
count = len(items_or_count)
except TypeError:
count = items_or_count
num = f'{count:{count_format}}'
if singular.endswith('y'):
return f'{num} {singular[:-1]}{"y" if count == 1 else "ies"}'
if singular.endswith('s'):
return f'{num} {singular}{"" if count == 1 else "es"}'
return f'{num} {singular}{"" if count == 1 else "s"}'
def git_stdout(*args: str) -> str:
return subprocess.run(('git', ) + args, stdout=subprocess.PIPE,
check=True).stdout.decode().strip()
def git_list_files(*args: str) -> Sequence[str]:
return git_stdout('ls-files', *args).split()
def git_diff_names(against: str = 'HEAD',
paths: Sequence[str] = ()) -> Sequence[str]:
return git_stdout('diff', '--name-only', against, '--', *paths).split()
def list_files(commit: Optional[str] = None,
paths: Sequence[str] = (),
exclude: Sequence = ()) -> Sequence[str]:
"""Lists files changed since the specified commit."""
return [
path for path in
(git_diff_names(commit, paths) if commit else git_list_files(*paths))
if not any(pattern.search(path) for pattern in exclude)
]
def git_repo_root(path: str = './') -> str:
return git_stdout('-C', path, 'rev-parse', '--show-toplevel')
def _make_color(*codes: int):
start = ''.join(f'\033[{code}m' for code in codes)
return f'{start}{{}}\033[0m'.format if os.name == 'posix' else str
color_red = _make_color(31, 1)
color_bold_red = _make_color(30, 41)
color_yellow = _make_color(33, 1)
color_green = _make_color(32)
color_bold_green = _make_color(30, 42)
def _make_box(section_alignments: Sequence[str]) -> str:
indices = [i + 1 for i in range(len(section_alignments))]
top_sections = '{2}'.join('{1:{1}^{width%d}}' % i for i in indices)
mid_sections = '{5}'.join('{section%d:%s{width%d}}' %
(i, section_alignments[i - 1], i)
for i in indices)
bot_sections = '{9}'.join('{8:{8}^{width%d}}' % i for i in indices)
# yapf: disable
return ''.join(['{0}', *top_sections, '{3}\n',
'{4}', *mid_sections, '{6}\n',
'{7}', *bot_sections, '{10}'])
# yapf: enable
_DOUBLE = '╔═╦╗║║║╚═╩╝'
_TOP = '┏━┯┓┃│┃┖┄┴┚'
_BOTTOM = '┎┄┬┒┃│┃┗━┷┛'
WIDTH = 80
def _title(msg, style=_DOUBLE):
msg = f' {msg} '.center(WIDTH - 4)
return _make_box('^').format(*style, section1=msg, width1=len(msg))
def _start_box(style, count, title, box=_make_box('><')):
info = f' {title} '.ljust(WIDTH - 13)
return box.format(*style,
section1=count + ' ',
width1=8,
section2=info,
width2=len(info))
def _result_box(style, result, color, title, time_s, box=_make_box('^<^')):
minutes, seconds = divmod(time_s, 60)
time_str = ' ' * 8 if time_s is None else f'{int(minutes)}:{seconds:04.1f}'
info = f' {title} '.ljust(WIDTH - 14 - len(time_str))
return box.format(*style,
section1=f' {color(result.center(6))} ',
width1=8,
section2=info,
width2=len(info),
section3=time_str,
width3=len(time_str) + 2)
class PresubmitFailure(Exception):
"""Optional exception to use for presubmit failures."""
def __init__(self, description: str = '', path: Optional[str] = None):
super().__init__(f'{path}: {description}' if path else description)
class _Result(enum.Enum):
PASS = 'PASSED' # Check completed successfully.
FAIL = 'FAILED' # Check failed.
CANCEL = 'CANCEL' # Check didn't complete.
def color(self) -> Callable:
if self is _Result.PASS:
return color_green
if self is _Result.FAIL:
return color_red
if self is _Result.CANCEL:
return color_yellow
class Presubmit:
"""Runs a series of presubmit checks on a list of files."""
def __init__(self, root: str, paths: Sequence[str]):
self.root = root
self.paths = paths
self.log = print
def run(self, program: Sequence[Callable]) -> bool:
"""Executes a series of presubmit checks on the paths."""
self.log(_title(f'Presubmit checks for {os.path.basename(self.root)}'))
self._log_presubmit_start(program)
start_time = time.time()
for i, check in enumerate(program):
self.log(_start_box(_TOP, f'{i+1}/{len(program)}', check.__name__))
time_s = time.time()
result = self._run_check(check)
time_s = time.time() - time_s
self.log(
_result_box(_BOTTOM, result.value, result.color(),
check.__name__, time_s))
if result is not _Result.PASS:
passed, failed = program[:i], program[i:]
break
else:
passed, failed = program, []
self._log_summary(time.time() - start_time, len(passed), len(failed))
return not failed
def _run_check(self, check: Callable) -> _Result:
try:
check(*([self.paths] if signature(check).parameters else []))
return _Result.PASS
except Exception as failure:
self.log(failure)
return _Result.FAIL
except KeyboardInterrupt:
self.log()
return _Result.CANCEL
def _log_presubmit_start(self, program: Sequence[Callable]) -> None:
self.log(f'Running {plural(program, "check")} on '
f'{plural(self.paths, "file")} in {self.root}\n')
files = collections.Counter(
os.path.splitext(path)[1] or os.path.basename(path)
for path in self.paths)
if files:
width = max(len(f) for f in files) + 2
max_count_width = len(str(max(files.values())))
for ext, count in sorted(files.items()):
self.log(
f'{ext:>{width}}: {plural(count, "file", max_count_width)}'
)
def _log_summary(self, time_s: float, passed: int, failed: int) -> None:
text = 'FAILED' if failed else 'PASSED'
color = color_bold_red if failed else color_bold_green
self.log(
_result_box(_DOUBLE, text, color,
f'Finished {passed} of {passed + failed} checks',
time_s))
def add_parser_arguments(parser) -> None:
"""Adds common presubmit check options to an argument parser."""
parser.add_argument(
'paths',
nargs='*',
help=('Paths to which to restrict the presubmit checks. '
'Used in conjunction with --base, if provided.'))
parser.add_argument(
'-b',
'--base',
metavar='COMMIT',
help=('Git revision against which to diff for changed files. '
'If none is provided, the entire repository is used.'))
parser.add_argument(
'-r',
'--repository',
default='.',
help=('Path to the repository in which to run the checks; '
"defaults to the current directory's Git repo."))
parser.add_argument(
'-e',
'--exclude',
metavar='REGULAR_EXPRESSION',
default=[],
action='append',
type=re.compile,
help='Exclude paths matching any of these regular expressions.')
def run_presubmit(program: Sequence[Callable],
base: Optional[str] = None,
paths: Sequence[str] = (),
exclude: Sequence = (),
repository: str = '.') -> bool:
"""Lists files in the current Git repo and runs a Presubmit with them."""
root = git_repo_root(repository)
os.chdir(root)
return Presubmit(root, list_files(base, paths, exclude)).run(program)
def parse_args_and_run_presubmit(
program: Sequence[Callable],
arg_parser: Optional[argparse.ArgumentParser] = None) -> bool:
"""Parses the command line arguments and calls run_presubmit with them."""
if arg_parser is None:
arg_parser = argparse.ArgumentParser(
description='Runs presubmit checks on a Git repository.',
formatter_class=argparse.RawDescriptionHelpFormatter)
add_parser_arguments(arg_parser)
return run_presubmit(program, **vars(arg_parser.parse_args()))
def filter_paths(endswith: Iterable[str] = (), skip_if_empty: bool = True):
"""Decorator for filtering the files list for a presubmit check function."""
endswith = frozenset([endswith] if isinstance(endswith, str) else endswith)
def filter_paths_for_function(function: Callable):
@functools.wraps(function)
def wrapped_function(paths: Iterable[str]):
paths = [
path for path in paths if any(
path.endswith(end) for end in endswith)
]
if not paths and skip_if_empty:
print('Skipping check: no affected files')
else:
return function(paths)
return wrapped_function
return filter_paths_for_function
def call(*args, **kwargs) -> None:
"""Optional subprocess wrapper with helpful output."""
print('[COMMAND]',
', '.join(f'{k}={v}' for k, v in sorted(kwargs.items())))
print(' '.join(shlex.quote(arg) for arg in args))
process = subprocess.run(args,
stdout=subprocess.PIPE,
stderr=subprocess.STDOUT,
**kwargs)
if process.returncode:
print(f'[RESULT] Failed with return code {process.returncode}.')
print('[OUTPUT]')
print(process.stdout.decode(errors="backslashreplace"))
raise PresubmitFailure
@filter_paths(endswith='.h')
def pragma_once(paths: Iterable[str]) -> None:
"""Checks that all header files contain '#pragma once'."""
for path in paths:
with open(path) as file:
for line in file:
if line.startswith('#pragma once'):
break
else:
raise PresubmitFailure('#pragma once is missing!', path=path)
if __name__ == '__main__':
# As an example, run a presubmit with the pragma_once check.
sys.exit(0 if parse_args_and_run_presubmit([pragma_once]) else 1)