| # pylint: disable-msg=C0111 |
| # Copyright 2008 Google Inc. Released under the GPL v2 |
| |
| import warnings |
| with warnings.catch_warnings(): |
| # The 'compiler' module is gone in Python 3.0. Let's not say |
| # so in every log file. |
| warnings.simplefilter("ignore", DeprecationWarning) |
| import compiler |
| import logging, textwrap |
| |
| from autotest_lib.client.common_lib import enum |
| |
| REQUIRED_VARS = set(['author', 'doc', 'name', 'time', 'test_type']) |
| OBSOLETE_VARS = set(['experimental']) |
| |
| CONTROL_TYPE = enum.Enum('Server', 'Client', start_value=1) |
| CONTROL_TYPE_NAMES = enum.Enum(*CONTROL_TYPE.names, string_values=True) |
| |
| class ControlVariableException(Exception): |
| pass |
| |
| |
| class ControlData(object): |
| # Available TIME settings in control file, the list must be in lower case |
| # and in ascending order, test running faster comes first. |
| TEST_TIME_LIST = ['fast', 'short', 'medium', 'long', 'lengthy'] |
| TEST_TIME = enum.Enum(*TEST_TIME_LIST, string_values=False) |
| |
| @staticmethod |
| def get_test_time_index(time): |
| """ |
| Get the order of estimated test time, based on the TIME setting in |
| Control file. Faster test gets a lower index number. |
| """ |
| try: |
| return ControlData.TEST_TIME.get_value(time.lower()) |
| except AttributeError: |
| # Raise exception if time value is not a valid TIME setting. |
| error_msg = '%s is not a valid TIME.' % time |
| logging.error(error_msg) |
| raise ControlVariableException(error_msg) |
| |
| |
| def __init__(self, vars, path, raise_warnings=False): |
| # Defaults |
| self.path = path |
| self.dependencies = set() |
| # TODO(jrbarnette): This should be removed once outside |
| # code that uses can be changed. |
| self.experimental = False |
| self.run_verify = True |
| self.sync_count = 1 |
| self.test_parameters = set() |
| self.test_category = '' |
| self.test_class = '' |
| self.retries = 0 |
| self.job_retries = 0 |
| # Default to require server-side package. Unless require_ssp is |
| # explicitly set to False, server-side package will be used for the |
| # job. This can be overridden by global config |
| # AUTOSERV/enable_ssp_container |
| self.require_ssp = None |
| self.attributes = set() |
| |
| diff = REQUIRED_VARS - set(vars) |
| if diff: |
| warning = ('WARNING: Not all required control ' |
| 'variables were specified in %s. Please define ' |
| '%s.') % (self.path, ', '.join(diff)) |
| if raise_warnings: |
| raise ControlVariableException(warning) |
| print textwrap.wrap(warning, 80) |
| |
| obsolete = OBSOLETE_VARS & set(vars) |
| if obsolete: |
| warning = ('WARNING: Obsolete variables were ' |
| 'specified in %s. Please remove ' |
| '%s.') % (self.path, ', '.join(obsolete)) |
| if raise_warnings: |
| raise ControlVariableException(warning) |
| print textwrap.wrap(warning, 80) |
| |
| for key, val in vars.iteritems(): |
| try: |
| self.set_attr(key, val, raise_warnings) |
| except Exception, e: |
| if raise_warnings: |
| raise |
| print 'WARNING: %s; skipping' % e |
| |
| |
| def set_attr(self, attr, val, raise_warnings=False): |
| attr = attr.lower() |
| try: |
| set_fn = getattr(self, 'set_%s' % attr) |
| set_fn(val) |
| except AttributeError: |
| # This must not be a variable we care about |
| pass |
| |
| |
| def _set_string(self, attr, val): |
| val = str(val) |
| setattr(self, attr, val) |
| |
| |
| def _set_option(self, attr, val, options): |
| val = str(val) |
| if val.lower() not in [x.lower() for x in options]: |
| raise ValueError("%s must be one of the following " |
| "options: %s" % (attr, |
| ', '.join(options))) |
| setattr(self, attr, val) |
| |
| |
| def _set_bool(self, attr, val): |
| val = str(val).lower() |
| if val == "false": |
| val = False |
| elif val == "true": |
| val = True |
| else: |
| msg = "%s must be either true or false" % attr |
| raise ValueError(msg) |
| setattr(self, attr, val) |
| |
| |
| def _set_int(self, attr, val, min=None, max=None): |
| val = int(val) |
| if min is not None and min > val: |
| raise ValueError("%s is %d, which is below the " |
| "minimum of %d" % (attr, val, min)) |
| if max is not None and max < val: |
| raise ValueError("%s is %d, which is above the " |
| "maximum of %d" % (attr, val, max)) |
| setattr(self, attr, val) |
| |
| |
| def _set_set(self, attr, val): |
| val = str(val) |
| items = [x.strip() for x in val.split(',')] |
| setattr(self, attr, set(items)) |
| |
| |
| def set_author(self, val): |
| self._set_string('author', val) |
| |
| |
| def set_dependencies(self, val): |
| self._set_set('dependencies', val) |
| |
| |
| def set_doc(self, val): |
| self._set_string('doc', val) |
| |
| |
| def set_name(self, val): |
| self._set_string('name', val) |
| |
| |
| def set_run_verify(self, val): |
| self._set_bool('run_verify', val) |
| |
| |
| def set_sync_count(self, val): |
| self._set_int('sync_count', val, min=1) |
| |
| |
| def set_suite(self, val): |
| self._set_string('suite', val) |
| |
| |
| def set_time(self, val): |
| self._set_option('time', val, ControlData.TEST_TIME_LIST) |
| |
| |
| def set_test_class(self, val): |
| self._set_string('test_class', val.lower()) |
| |
| |
| def set_test_category(self, val): |
| self._set_string('test_category', val.lower()) |
| |
| |
| def set_test_type(self, val): |
| self._set_option('test_type', val, list(CONTROL_TYPE.names)) |
| |
| |
| def set_test_parameters(self, val): |
| self._set_set('test_parameters', val) |
| |
| |
| def set_retries(self, val): |
| self._set_int('retries', val) |
| |
| |
| def set_job_retries(self, val): |
| self._set_int('job_retries', val) |
| |
| |
| def set_bug_template(self, val): |
| if type(val) == dict: |
| setattr(self, 'bug_template', val) |
| |
| |
| def set_require_ssp(self, val): |
| self._set_bool('require_ssp', val) |
| |
| |
| def set_attributes(self, val): |
| self._set_set('attributes', val) |
| |
| |
| def _extract_const(expr): |
| assert(expr.__class__ == compiler.ast.Const) |
| assert(expr.value.__class__ in (str, int, float, unicode)) |
| return str(expr.value).strip() |
| |
| |
| def _extract_dict(expr): |
| assert(expr.__class__ == compiler.ast.Dict) |
| assert(expr.items.__class__ == list) |
| cf_dict = {} |
| for key, value in expr.items: |
| try: |
| key = _extract_const(key) |
| val = _extract_expression(value) |
| except (AssertionError, ValueError): |
| pass |
| else: |
| cf_dict[key] = val |
| return cf_dict |
| |
| |
| def _extract_list(expr): |
| assert(expr.__class__ == compiler.ast.List) |
| list_values = [] |
| for value in expr.nodes: |
| try: |
| list_values.append(_extract_expression(value)) |
| except (AssertionError, ValueError): |
| pass |
| return list_values |
| |
| |
| def _extract_name(expr): |
| assert(expr.__class__ == compiler.ast.Name) |
| assert(expr.name in ('False', 'True', 'None')) |
| return str(expr.name) |
| |
| |
| def _extract_expression(expr): |
| if expr.__class__ == compiler.ast.Const: |
| return _extract_const(expr) |
| if expr.__class__ == compiler.ast.Name: |
| return _extract_name(expr) |
| if expr.__class__ == compiler.ast.Dict: |
| return _extract_dict(expr) |
| if expr.__class__ == compiler.ast.List: |
| return _extract_list(expr) |
| raise ValueError('Unknown rval %s' % expr) |
| |
| |
| def _extract_assignment(n): |
| assert(n.__class__ == compiler.ast.Assign) |
| assert(n.nodes.__class__ == list) |
| assert(len(n.nodes) == 1) |
| assert(n.nodes[0].__class__ == compiler.ast.AssName) |
| assert(n.nodes[0].flags.__class__ == str) |
| assert(n.nodes[0].name.__class__ == str) |
| |
| val = _extract_expression(n.expr) |
| key = n.nodes[0].name.lower() |
| |
| return (key, val) |
| |
| |
| def parse_control_string(control, raise_warnings=False): |
| try: |
| mod = compiler.parse(control) |
| except SyntaxError, e: |
| raise ControlVariableException("Error parsing data because %s" % e) |
| return finish_parse(mod, '', raise_warnings) |
| |
| |
| def parse_control(path, raise_warnings=False): |
| try: |
| mod = compiler.parseFile(path) |
| except SyntaxError, e: |
| raise ControlVariableException("Error parsing %s because %s" % |
| (path, e)) |
| return finish_parse(mod, path, raise_warnings) |
| |
| |
| def finish_parse(mod, path, raise_warnings): |
| assert(mod.__class__ == compiler.ast.Module) |
| assert(mod.node.__class__ == compiler.ast.Stmt) |
| assert(mod.node.nodes.__class__ == list) |
| |
| vars = {} |
| for n in mod.node.nodes: |
| try: |
| key, val = _extract_assignment(n) |
| vars[key] = val |
| except (AssertionError, ValueError): |
| pass |
| return ControlData(vars, path, raise_warnings) |