| #!/usr/bin/python |
| """ |
| Script to verify errors on autotest code contributions (patches). |
| The workflow is as follows: |
| |
| * Patch will be applied and any problems found will be reported. |
| * If there are new files created, remind the user to add them to VCS. |
| * If any added file looks like an executable file, remind the user to make it |
| executable. |
| * If any of the files added or modified introduces trailing whitespaces, tabs |
| or incorrect indentation, report problems. |
| * If any of the files have problems during pylint validation, report failures. |
| * If any of the files changed have a unittest suite, run the unittest suite |
| and report any failures. |
| |
| Usage: check_patch.py -p [/path/to/patch] |
| check_patch.py -i [patchwork id] |
| |
| @copyright: Red Hat Inc, 2009. |
| @author: Lucas Meneghel Rodrigues <lmr@redhat.com> |
| """ |
| |
| import os, stat, logging, sys, optparse, time |
| import common |
| from autotest_lib.client.common_lib import utils, error, logging_config |
| from autotest_lib.client.common_lib import logging_manager |
| |
| |
class CheckPatchLoggingConfig(logging_config.LoggingConfig):
    """
    Logging configuration used by this script: everything goes to the
    console.
    """
    def configure_logging(self, results_dir=None, verbose=False):
        """
        Set up console logging through the parent class.

        @param results_dir: Accepted for interface compatibility; it is not
                forwarded to the parent implementation.
        @param verbose: Forwarded unchanged to the parent implementation.
        """
        parent = super(CheckPatchLoggingConfig, self)
        parent.configure_logging(use_console=True, verbose=verbose)
| |
| |
class VCS(object):
    """
    Thin facade over the version control system in use. Every operation is
    forwarded to a backend object selected when the instance is created.
    """
    def __init__(self):
        """
        Detect which version control system manages the current directory and
        instantiate the matching backend.
        """
        known_backends = {"SVN": SubVersionBackend}
        backend_class = known_backends.get(self.guess_vcs_name())
        if backend_class is not None:
            self.backend = backend_class()


    def guess_vcs_name(self):
        """
        Identify the version control system from the current directory.

        Logs an error and exits the program with status 1 when there is no
        '.svn' directory in the current working directory.

        @return: The string "SVN".
        """
        if not os.path.isdir(".svn"):
            logging.error("Could not figure version control system. Are you "
                          "on a working directory? Aborting.")
            sys.exit(1)
        return "SVN"


    def get_unknown_files(self):
        """
        Ask the backend for the files it does not track.
        """
        backend = self.backend
        return backend.get_unknown_files()


    def get_modified_files(self):
        """
        Ask the backend for the files it reports as modified.
        """
        backend = self.backend
        return backend.get_modified_files()


    def add_untracked_file(self, file):
        """
        Put an untracked file under version control.

        @param file: Path to the file, handed to the backend unchanged.
        """
        backend = self.backend
        return backend.add_untracked_file(file)


    def revert_file(self, file):
        """
        Bring a file back to the state recorded in the reference repo.

        @param file: Path to the file, handed to the backend unchanged.
        """
        backend = self.backend
        return backend.revert_file(file)


    def apply_patch(self, patch):
        """
        Apply a patch with whatever method the backend implements.

        @param patch: Path to the patch file, handed to the backend unchanged.
        """
        backend = self.backend
        return backend.apply_patch(patch)


    def update(self):
        """
        Bring the tree up to date with the public tree, through the backend.
        """
        backend = self.backend
        return backend.update()
| |
| |
class SubVersionBackend(object):
    """
    Implementation of a subversion backend for use with the VCS abstraction
    layer.

    Every operation shells out to the 'svn' (or 'patch') command line tool
    through the autotest utils helpers.
    """
    def __init__(self):
        logging.debug("Subversion VCS backend initialized.")
        # Files with these extensions are never reported as unknown files.
        self.ignored_extension_list = ['.orig', '.bak']


    @staticmethod
    def _paths_with_status(status, flags, ignored_extensions=()):
        """
        Extract from 'svn status' output the paths carrying given flags.

        @param status: Raw text output of 'svn status'.
        @param flags: Sequence of one-character status flags to select,
                matched against the first column of each line.
        @param ignored_extensions: Sequence of suffixes; paths ending with
                any of them are left out of the result.
        @return: List of paths, each listed once, in output order.
        """
        ignored = tuple(ignored_extensions)
        paths = []
        for line in status.split("\n"):
            # Empty lines (e.g. the output for a clean tree) have no flag
            # column, so they must be skipped before indexing the line.
            if not line or line[0] not in flags:
                continue
            path = line[1:].strip()
            if ignored and path.endswith(ignored):
                continue
            paths.append(path)
        return paths


    def get_unknown_files(self):
        """
        Return the files flagged '?' by 'svn status', leaving out the ones
        whose name ends with an extension from ignored_extension_list.

        @return: List of paths unknown to subversion.
        """
        status = utils.system_output("svn status --ignore-externals")
        return self._paths_with_status(status, ("?",),
                                       self.ignored_extension_list)


    def get_modified_files(self):
        """
        Return the files flagged 'M' (modified) or 'A' (added) by
        'svn status'.

        @return: List of modified or added paths.
        """
        status = utils.system_output("svn status --ignore-externals")
        return self._paths_with_status(status, ("M", "A"))


    def add_untracked_file(self, file):
        """
        Add an untracked file under revision control. Exits the program with
        status 1 if the svn command fails.

        @param file: Path to untracked file.
        """
        try:
            utils.run('svn add %s' % file)
        except error.CmdError as e:
            logging.error("Problem adding file %s to svn: %s", file, e)
            sys.exit(1)


    def revert_file(self, file):
        """
        Revert file against last revision. Exits the program with status 1 if
        the svn command fails.

        @param file: Path to file to be reverted.
        """
        try:
            utils.run('svn revert %s' % file)
        except error.CmdError as e:
            logging.error("Problem reverting file %s: %s", file, e)
            sys.exit(1)


    def apply_patch(self, patch):
        """
        Apply a patch to the code base. Patches are expected to be made using
        level -p1, and taken according to the code base top level. Exits the
        program with status 1 if the patch can not be applied.

        @param patch: Path to the patch file.
        """
        try:
            utils.system_output("patch -p1 < %s" % patch)
        # Exception rather than a bare except, so that Ctrl-C and explicit
        # exits are not mistaken for a patch failure.
        except Exception:
            logging.error("Patch applied incorrectly. Possible causes: ")
            logging.error("1 - Patch might not be -p1")
            logging.error("2 - You are not at the top of the autotest tree")
            logging.error("3 - Patch was made using an older tree")
            logging.error("4 - Mailer might have messed the patch")
            sys.exit(1)


    def update(self):
        """
        Run 'svn update' on the working copy. A failure is logged and does
        not stop the program.
        """
        try:
            # ignore_status is left at its default so that a failing command
            # raises CmdError and the failure actually gets logged.
            # NOTE(review): assumes utils.system raises CmdError on a
            # non-zero exit status by default - confirm against utils.
            utils.system("svn update")
        except error.CmdError as e:
            logging.error("SVN tree update failed: %s", e)
| |
| |
class FileChecker(object):
    """
    Picks up a given file and performs various checks, looking after problems
    and eventually suggesting solutions.
    """
    def __init__(self, path, confirm=False):
        """
        Class constructor, sets the path attribute and collects the basic
        facts about the file (python or not, executable or not, first line).

        @param path: Path to the file that will be checked.
        @param confirm: Whether to answer yes to all questions asked without
                prompting the user.
        """
        self.path = path
        self.confirm = confirm
        self.basename = os.path.basename(self.path)
        self.is_python = self.basename.endswith('.py')

        mode = os.stat(self.path)[stat.ST_MODE]
        # Only the owner execution bit is considered.
        self.is_executable = bool(mode & stat.S_IXUSR)

        checked_file = open(self.path, "r")
        try:
            self.first_line = checked_file.readline()
        finally:
            checked_file.close()
        # Shell commands that would fix the problems found by the checks.
        self.corrective_actions = []
        # Base names for which no reindent action is ever suggested.
        self.indentation_exceptions = ['job_unittest.py']


    def _check_indent(self):
        """
        Verifies the file with reindent.py. This tool performs the following
        checks on python files:

          * Trailing whitespaces
          * Tabs
          * End of line
          * Incorrect indentation

        For the purposes of checking, the dry run mode is used and no changes
        are made. It is up to the user to decide if he wants to run reindent
        to correct the issues.
        """
        reindent_raw = utils.system_output('reindent.py -v -d %s | head -1' %
                                           self.path)
        # The verdict is the last word of the first output line, minus the
        # trailing period.
        reindent_results = reindent_raw.split(" ")[-1].strip(".")
        if reindent_results == "changed":
            if self.basename not in self.indentation_exceptions:
                self.corrective_actions.append("reindent.py -v %s" % self.path)


    def _check_code(self):
        """
        Verifies the file with run_pylint.py. This tool will call the static
        code checker pylint using the special autotest conventions and warn
        only on problems. If problems are found, a report will be generated.
        Some of the problems reported might be bogus, but it's always good
        to look at them.
        """
        c_cmd = 'run_pylint.py %s' % self.path
        rc = utils.system(c_cmd, ignore_status=True)
        if rc != 0:
            logging.error("Syntax issues found during '%s'", c_cmd)


    def _unittest_path(self):
        """
        Compute where the unittest suite for the checked file would live.

        @return: Path of '<module>_unittest.py' in the same directory as the
                checked file.
        """
        # splitext removes only the extension; str.strip(".py") would also
        # eat leading and trailing 'p', 'y' and '.' characters of the name.
        module_name = os.path.splitext(self.basename)[0]
        return os.path.join(os.path.dirname(self.path),
                            module_name + "_unittest.py")


    def _check_unittest(self):
        """
        Verifies if the file in question has a unittest suite, if so, run the
        unittest and report on any failures. This is important to keep our
        unit tests up to date.
        """
        if "unittest" in self.basename:
            return
        unittest_path = self._unittest_path()
        if os.path.isfile(unittest_path):
            unittest_cmd = 'python %s' % unittest_path
            rc = utils.system(unittest_cmd, ignore_status=True)
            if rc != 0:
                logging.error("Unittest issues found during '%s'",
                              unittest_cmd)


    def _check_permissions(self):
        """
        Verifies the execution permissions, specifically:
          * Files with no shebang and execution permissions are reported.
          * Files with shebang and no execution permissions are reported.

        The matching svn property command is queued as a corrective action.
        """
        if self.first_line.startswith("#!"):
            if not self.is_executable:
                self.corrective_actions.append(
                        "svn propset svn:executable ON %s" % self.path)
        else:
            if self.is_executable:
                self.corrective_actions.append(
                        "svn propdel svn:executable %s" % self.path)


    def report(self):
        """
        Executes all required checks, if problems are found, the possible
        corrective actions are listed and, upon confirmation, executed.
        """
        self._check_permissions()
        if self.is_python:
            self._check_indent()
            self._check_code()
            self._check_unittest()
        for action in self.corrective_actions:
            answer = utils.ask("Would you like to execute %s?" % action,
                               auto=self.confirm)
            if answer == "y":
                rc = utils.system(action, ignore_status=True)
                if rc != 0:
                    logging.error("Error executing %s", action)
| |
| |
class PatchChecker(object):
    """
    Applies a patch to the working copy and runs the file checks on every
    file the patch added or modified.
    """
    def __init__(self, patch=None, patchwork_id=None, confirm=False):
        """
        Locate the patch and get the working copy ready to receive it.

        Exits the program with status 1 when no valid patch file is available
        or when the user refuses to revert pre-existing local changes.

        @param patch: Path to a local patch file.
        @param patchwork_id: Patchwork id of a patch to download. When both
                patch and patchwork_id are given, the downloaded patch wins.
        @param confirm: Whether to answer yes to all questions asked without
                prompting the user.
        """
        self.confirm = confirm
        self.base_dir = os.getcwd()
        # Defined up front so the validity check below also covers the case
        # where neither a patch nor a patchwork id was provided.
        self.patch = None
        if patch:
            self.patch = os.path.abspath(patch)
        if patchwork_id:
            self.patch = self._fetch_from_patchwork(patchwork_id)

        if not self.patch or not os.path.isfile(self.patch):
            logging.error("Invalid patch file %s provided. Aborting.",
                          self.patch)
            sys.exit(1)

        self.vcs = VCS()
        changed_files_before = self.vcs.get_modified_files()
        if changed_files_before:
            logging.error("Repository has changed files prior to patch "
                          "application. ")
            answer = utils.ask("Would you like to revert them?",
                               auto=self.confirm)
            if answer == "n":
                logging.error("Not safe to proceed without reverting files.")
                sys.exit(1)
            else:
                for changed_file in changed_files_before:
                    self.vcs.revert_file(changed_file)

        # Snapshot taken before the patch is applied, so that files created
        # by the patch can be told apart afterwards.
        self.untracked_files_before = self.vcs.get_unknown_files()
        self.vcs.update()


    @staticmethod
    def _strip_separator_lines(patch):
        """
        Rewrite a patch file in place, dropping every line that starts with
        three underscores.

        @param patch: Path to the patch file.
        """
        patch_ro = open(patch, 'r')
        try:
            patch_contents = patch_ro.readlines()
        finally:
            patch_ro.close()
        patch_rw = open(patch, 'w')
        try:
            for line in patch_contents:
                if not line.startswith("___"):
                    patch_rw.write(line)
        finally:
            patch_rw.close()


    def _fetch_from_patchwork(self, id):
        """
        Gets a patch file from patchwork and puts it under the cwd so it can
        be applied.

        @param id: Patchwork patch id.
        @return: Path to the downloaded patch, as returned by utils.get_file.
        """
        patch_url = "http://patchwork.test.kernel.org/patch/%s/mbox/" % id
        patch_dest = os.path.join(self.base_dir, 'patchwork-%s.patch' % id)
        patch = utils.get_file(patch_url, patch_dest)
        # Patchwork sometimes puts garbage in the patch, such as long
        # sequences of underscores (_______). Get rid of those.
        self._strip_separator_lines(patch)
        return patch


    def _check_files_modified_patch(self):
        """
        Run the file checks on everything the applied patch touched.

        Files that became unknown to the VCS after the patch are offered for
        addition; if added, they are checked along with the modified files.
        """
        untracked_files_after = self.vcs.get_unknown_files()
        modified_files_after = self.vcs.get_modified_files()
        add_to_vcs = [untracked_file
                      for untracked_file in untracked_files_after
                      if untracked_file not in self.untracked_files_before]

        if add_to_vcs:
            logging.info("The files: ")
            for untracked_file in add_to_vcs:
                logging.info(untracked_file)
            logging.info("Might need to be added to VCS")
            # Honor the 'answer yes to everything' setting here as well.
            answer = utils.ask("Would you like to add them to VCS ?",
                               auto=self.confirm)
            if answer == "y":
                for untracked_file in add_to_vcs:
                    self.vcs.add_untracked_file(untracked_file)
                    modified_files_after.append(untracked_file)

        for modified_file in modified_files_after:
            # Additional safety check, new commits might introduce
            # new directories
            if os.path.isfile(modified_file):
                file_checker = FileChecker(modified_file,
                                           confirm=self.confirm)
                file_checker.report()


    def check(self):
        """
        Apply the patch and check the files it added or modified.
        """
        self.vcs.apply_patch(self.patch)
        self._check_files_modified_patch()
| |
| |
if __name__ == "__main__":
    # Command line definition.
    parser = optparse.OptionParser()
    parser.add_option('-p', '--patch', action='store', dest="local_patch",
                      help='path to a patch file that will be checked')
    parser.add_option('-i', '--patchwork-id', action='store', dest="id",
                      help='id of a given patchwork patch')
    parser.add_option('--verbose', action='store_true', dest="debug",
                      help='include debug messages in console output')
    parser.add_option('-f', '--full-check', action='store_true',
                      dest="full_check",
                      help='check the full tree for corrective actions')
    parser.add_option('-y', '--yes', action='store_true', dest="confirm",
                      help='Answer yes to all questions')
    options = parser.parse_args()[0]

    logging_manager.configure_logging(CheckPatchLoggingConfig(),
                                      verbose=options.debug)

    if options.full_check:
        # Full tree mode: check every file below the current directory,
        # except anything under a path containing '.svn' and the files
        # named below.
        skipped_names = ['common.py']
        for root, dirs, files in os.walk('.'):
            if '.svn' in root:
                continue
            for name in files:
                if name in skipped_names:
                    continue
                checker = FileChecker(os.path.join(root, name),
                                      confirm=options.confirm)
                checker.report()
    else:
        # Patch mode: a local patch file takes precedence over a patchwork
        # id when both are given.
        if options.local_patch:
            patch_checker = PatchChecker(patch=options.local_patch,
                                         confirm=options.confirm)
        elif options.id:
            patch_checker = PatchChecker(patchwork_id=options.id,
                                         confirm=options.confirm)
        else:
            logging.error('No patch or patchwork id specified. Aborting.')
            sys.exit(1)
        patch_checker.check()