| # pylint: disable-msg=C0111 |
| # Copyright 2008 Google Inc. Released under the GPL v2 |
| |
| import warnings |
| with warnings.catch_warnings(): |
| # The 'compiler' module is gone in Python 3.0. Let's not say |
| # so in every log file. |
| warnings.simplefilter("ignore", DeprecationWarning) |
| import compiler |
| import logging |
| import textwrap |
| import re |
| |
| from autotest_lib.client.common_lib import enum |
| from autotest_lib.client.common_lib import global_config |
| from autotest_lib.client.common_lib import priorities |
| |
| REQUIRED_VARS = set(['author', 'doc', 'name', 'time', 'test_type']) |
| OBSOLETE_VARS = set(['experimental']) |
| |
| CONTROL_TYPE = enum.Enum('Server', 'Client', start_value=1) |
| CONTROL_TYPE_NAMES = enum.Enum(*CONTROL_TYPE.names, string_values=True) |
| |
| _SUITE_ATTRIBUTE_PREFIX = 'suite:' |
| |
| CONFIG = global_config.global_config |
| |
| # Default maximum test result size in kB. |
| DEFAULT_MAX_RESULT_SIZE_KB = CONFIG.get_config_value( |
| 'AUTOSERV', 'default_max_result_size_KB', type=int, default=20000) |
| |
| |
| class ControlVariableException(Exception): |
| pass |
| |
| def _validate_control_file_fields(control_file_path, control_file_vars, |
| raise_warnings): |
| """Validate the given set of variables from a control file. |
| |
| @param control_file_path: string path of the control file these were |
| loaded from. |
| @param control_file_vars: dict of variables set in a control file. |
| @param raise_warnings: True iff we should raise on invalid variables. |
| |
| """ |
| diff = REQUIRED_VARS - set(control_file_vars) |
| if diff: |
| warning = ('WARNING: Not all required control ' |
| 'variables were specified in %s. Please define ' |
| '%s.') % (control_file_path, ', '.join(diff)) |
| if raise_warnings: |
| raise ControlVariableException(warning) |
| print textwrap.wrap(warning, 80) |
| |
| obsolete = OBSOLETE_VARS & set(control_file_vars) |
| if obsolete: |
| warning = ('WARNING: Obsolete variables were ' |
| 'specified in %s. Please remove ' |
| '%s.') % (control_file_path, ', '.join(obsolete)) |
| if raise_warnings: |
| raise ControlVariableException(warning) |
| print textwrap.wrap(warning, 80) |
| |
| |
| class ControlData(object): |
| # Available TIME settings in control file, the list must be in lower case |
| # and in ascending order, test running faster comes first. |
| TEST_TIME_LIST = ['fast', 'short', 'medium', 'long', 'lengthy'] |
| TEST_TIME = enum.Enum(*TEST_TIME_LIST, string_values=False) |
| |
| @staticmethod |
| def get_test_time_index(time): |
| """ |
| Get the order of estimated test time, based on the TIME setting in |
| Control file. Faster test gets a lower index number. |
| """ |
| try: |
| return ControlData.TEST_TIME.get_value(time.lower()) |
| except AttributeError: |
| # Raise exception if time value is not a valid TIME setting. |
| error_msg = '%s is not a valid TIME.' % time |
| logging.error(error_msg) |
| raise ControlVariableException(error_msg) |
| |
| |
| def __init__(self, vars, path, raise_warnings=False): |
| # Defaults |
| self.path = path |
| self.dependencies = set() |
| # TODO(jrbarnette): This should be removed once outside |
| # code that uses can be changed. |
| self.experimental = False |
| self.run_verify = True |
| self.sync_count = 1 |
| self.test_parameters = set() |
| self.test_category = '' |
| self.test_class = '' |
| self.job_retries = 0 |
| # Default to require server-side package. Unless require_ssp is |
| # explicitly set to False, server-side package will be used for the |
| # job. This can be overridden by global config |
| # AUTOSERV/enable_ssp_container |
| self.require_ssp = None |
| self.attributes = set() |
| self.max_result_size_KB = DEFAULT_MAX_RESULT_SIZE_KB |
| self.priority = priorities.Priority.DEFAULT |
| self.fast = False |
| |
| _validate_control_file_fields(self.path, vars, raise_warnings) |
| |
| for key, val in vars.iteritems(): |
| try: |
| self.set_attr(key, val, raise_warnings) |
| except Exception, e: |
| if raise_warnings: |
| raise |
| print 'WARNING: %s; skipping' % e |
| |
| self._patch_up_suites_from_attributes() |
| |
| |
| @property |
| def suite_tag_parts(self): |
| """Return the part strings of the test's suite tag.""" |
| if hasattr(self, 'suite'): |
| return [part.strip() for part in self.suite.split(',')] |
| else: |
| return [] |
| |
| |
| def set_attr(self, attr, val, raise_warnings=False): |
| attr = attr.lower() |
| try: |
| set_fn = getattr(self, 'set_%s' % attr) |
| set_fn(val) |
| except AttributeError: |
| # This must not be a variable we care about |
| pass |
| |
| |
| def _patch_up_suites_from_attributes(self): |
| """Patch up the set of suites this test is part of. |
| |
| Legacy builds will not have an appropriate ATTRIBUTES field set. |
| Take the union of suites specified via ATTRIBUTES and suites specified |
| via SUITE. |
| |
| SUITE used to be its own variable, but now suites are taken only from |
| the attributes. |
| |
| """ |
| |
| suite_names = set() |
| # Extract any suites we know ourselves to be in based on the SUITE |
| # line. This line is deprecated, but control files in old builds will |
| # still have it. |
| if hasattr(self, 'suite'): |
| existing_suites = self.suite.split(',') |
| existing_suites = [name.strip() for name in existing_suites] |
| existing_suites = [name for name in existing_suites if name] |
| suite_names.update(existing_suites) |
| |
| # Figure out if our attributes mention any suites. |
| for attribute in self.attributes: |
| if not attribute.startswith(_SUITE_ATTRIBUTE_PREFIX): |
| continue |
| suite_name = attribute[len(_SUITE_ATTRIBUTE_PREFIX):] |
| suite_names.add(suite_name) |
| |
| # Rebuild the suite field if necessary. |
| if suite_names: |
| self.set_suite(','.join(sorted(list(suite_names)))) |
| |
| |
| def _set_string(self, attr, val): |
| val = str(val) |
| setattr(self, attr, val) |
| |
| |
| def _set_option(self, attr, val, options): |
| val = str(val) |
| if val.lower() not in [x.lower() for x in options]: |
| raise ValueError("%s must be one of the following " |
| "options: %s" % (attr, |
| ', '.join(options))) |
| setattr(self, attr, val) |
| |
| |
| def _set_bool(self, attr, val): |
| val = str(val).lower() |
| if val == "false": |
| val = False |
| elif val == "true": |
| val = True |
| else: |
| msg = "%s must be either true or false" % attr |
| raise ValueError(msg) |
| setattr(self, attr, val) |
| |
| |
| def _set_int(self, attr, val, min=None, max=None): |
| val = int(val) |
| if min is not None and min > val: |
| raise ValueError("%s is %d, which is below the " |
| "minimum of %d" % (attr, val, min)) |
| if max is not None and max < val: |
| raise ValueError("%s is %d, which is above the " |
| "maximum of %d" % (attr, val, max)) |
| setattr(self, attr, val) |
| |
| |
| def _set_set(self, attr, val): |
| val = str(val) |
| items = [x.strip() for x in val.split(',') if x.strip()] |
| setattr(self, attr, set(items)) |
| |
| |
| def set_author(self, val): |
| self._set_string('author', val) |
| |
| |
| def set_dependencies(self, val): |
| self._set_set('dependencies', val) |
| |
| |
| def set_doc(self, val): |
| self._set_string('doc', val) |
| |
| |
| def set_name(self, val): |
| self._set_string('name', val) |
| |
| |
| def set_run_verify(self, val): |
| self._set_bool('run_verify', val) |
| |
| |
| def set_sync_count(self, val): |
| self._set_int('sync_count', val, min=1) |
| |
| |
| def set_suite(self, val): |
| self._set_string('suite', val) |
| |
| |
| def set_time(self, val): |
| self._set_option('time', val, ControlData.TEST_TIME_LIST) |
| |
| |
| def set_test_class(self, val): |
| self._set_string('test_class', val.lower()) |
| |
| |
| def set_test_category(self, val): |
| self._set_string('test_category', val.lower()) |
| |
| |
| def set_test_type(self, val): |
| self._set_option('test_type', val, list(CONTROL_TYPE.names)) |
| |
| |
| def set_test_parameters(self, val): |
| self._set_set('test_parameters', val) |
| |
| |
| def set_job_retries(self, val): |
| self._set_int('job_retries', val) |
| |
| |
| def set_bug_template(self, val): |
| if type(val) == dict: |
| setattr(self, 'bug_template', val) |
| |
| |
| def set_require_ssp(self, val): |
| self._set_bool('require_ssp', val) |
| |
| |
| def set_build(self, val): |
| self._set_string('build', val) |
| |
| |
| def set_builds(self, val): |
| if type(val) == dict: |
| setattr(self, 'builds', val) |
| |
| def set_max_result_size_kb(self, val): |
| self._set_int('max_result_size_KB', val) |
| |
| def set_priority(self, val): |
| self._set_int('priority', val) |
| |
| def set_fast(self, val): |
| self._set_bool('fast', val) |
| |
| def set_update_type(self, val): |
| self._set_string('update_type', val) |
| |
| def set_source_release(self, val): |
| self._set_string('source_release', val) |
| |
| def set_target_release(self, val): |
| self._set_string('target_release', val) |
| |
| def set_target_payload_uri(self, val): |
| self._set_string('target_payload_uri', val) |
| |
| def set_source_payload_uri(self, val): |
| self._set_string('source_payload_uri', val) |
| |
| def set_source_archive_uri(self, val): |
| self._set_string('source_archive_uri', val) |
| |
| def set_attributes(self, val): |
| # Add subsystem:default if subsystem is not specified. |
| self._set_set('attributes', val) |
| if not any(a.startswith('subsystem') for a in self.attributes): |
| self.attributes.add('subsystem:default') |
| |
| |
| def _extract_const(expr): |
| assert(expr.__class__ == compiler.ast.Const) |
| assert(expr.value.__class__ in (str, int, float, unicode)) |
| return str(expr.value).strip() |
| |
| |
| def _extract_dict(expr): |
| assert(expr.__class__ == compiler.ast.Dict) |
| assert(expr.items.__class__ == list) |
| cf_dict = {} |
| for key, value in expr.items: |
| try: |
| key = _extract_const(key) |
| val = _extract_expression(value) |
| except (AssertionError, ValueError): |
| pass |
| else: |
| cf_dict[key] = val |
| return cf_dict |
| |
| |
| def _extract_list(expr): |
| assert(expr.__class__ == compiler.ast.List) |
| list_values = [] |
| for value in expr.nodes: |
| try: |
| list_values.append(_extract_expression(value)) |
| except (AssertionError, ValueError): |
| pass |
| return list_values |
| |
| |
| def _extract_name(expr): |
| assert(expr.__class__ == compiler.ast.Name) |
| assert(expr.name in ('False', 'True', 'None')) |
| return str(expr.name) |
| |
| |
| def _extract_expression(expr): |
| if expr.__class__ == compiler.ast.Const: |
| return _extract_const(expr) |
| if expr.__class__ == compiler.ast.Name: |
| return _extract_name(expr) |
| if expr.__class__ == compiler.ast.Dict: |
| return _extract_dict(expr) |
| if expr.__class__ == compiler.ast.List: |
| return _extract_list(expr) |
| raise ValueError('Unknown rval %s' % expr) |
| |
| |
| def _extract_assignment(n): |
| assert(n.__class__ == compiler.ast.Assign) |
| assert(n.nodes.__class__ == list) |
| assert(len(n.nodes) == 1) |
| assert(n.nodes[0].__class__ == compiler.ast.AssName) |
| assert(n.nodes[0].flags.__class__ == str) |
| assert(n.nodes[0].name.__class__ == str) |
| |
| val = _extract_expression(n.expr) |
| key = n.nodes[0].name.lower() |
| |
| return (key, val) |
| |
| |
| def parse_control_string(control, raise_warnings=False, path=''): |
| """Parse a control file from a string. |
| |
| @param control: string containing the text of a control file. |
| @param raise_warnings: True iff ControlData should raise an error on |
| warnings about control file contents. |
| @param path: string path to the control file. |
| |
| """ |
| try: |
| mod = compiler.parse(control) |
| except SyntaxError as e: |
| logging.error('Syntax error (%s) while parsing control string:', e) |
| lines = control.split('\n') |
| for n, l in enumerate(lines): |
| logging.error('Line %d: %s', n + 1, l) |
| raise ControlVariableException("Error parsing data because %s" % e) |
| return finish_parse(mod, path, raise_warnings) |
| |
| |
| def parse_control(path, raise_warnings=False): |
| try: |
| mod = compiler.parseFile(path) |
| except SyntaxError, e: |
| raise ControlVariableException("Error parsing %s because %s" % |
| (path, e)) |
| return finish_parse(mod, path, raise_warnings) |
| |
| |
| def _try_extract_assignment(node, variables): |
| """Try to extract assignment from the given node. |
| |
| @param node: An Assign object. |
| @param variables: Dictionary to store the parsed assignments. |
| """ |
| try: |
| key, val = _extract_assignment(node) |
| variables[key] = val |
| except (AssertionError, ValueError): |
| pass |
| |
| |
| def finish_parse(mod, path, raise_warnings): |
| assert(mod.__class__ == compiler.ast.Module) |
| assert(mod.node.__class__ == compiler.ast.Stmt) |
| assert(mod.node.nodes.__class__ == list) |
| |
| variables = {} |
| injection_variables = {} |
| for n in mod.node.nodes: |
| if (n.__class__ == compiler.ast.Function and |
| re.match('step\d+', n.name)): |
| vars_in_step = {} |
| for sub_node in n.code.nodes: |
| _try_extract_assignment(sub_node, vars_in_step) |
| if vars_in_step: |
| # Empty the vars collection so assignments from multiple steps |
| # won't be mixed. |
| variables.clear() |
| variables.update(vars_in_step) |
| else: |
| _try_extract_assignment(n, injection_variables) |
| |
| variables.update(injection_variables) |
| return ControlData(variables, path, raise_warnings) |