| # Copyright (c) 2011 The Chromium OS Authors. All rights reserved. |
| # Use of this source code is governed by a BSD-style license that can be |
| # found in the LICENSE file. |
| |
| """Utility classes used by site_server_job.distribute_across_machines(). |
| |
| test_item: extends the basic test tuple to add include/exclude attributes and |
| pre/post actions. |
| |
| machine_worker: is a thread that manages running tests on a host. It |
| verifies test are valid for a host using the test attributes from test_item |
| and the host attributes from host_attributes. |
| """ |
| |
| |
| import logging, os, Queue |
| from autotest_lib.client.common_lib import error, utils |
| from autotest_lib.server import autotest, hosts, host_attributes |
| |
| |
| class test_item(object): |
| """Adds machine verification logic to the basic test tuple. |
| |
| Tests can either be tuples of the existing form ('testName', {args}) or the |
| extended form ('testname', {args}, {'include': [], 'exclude': [], |
| 'attributes': []}) where include and exclude are lists of host attribute |
| labels and attributes is a list of strings. A machine must have all the |
| labels in include and must not have any of the labels in exclude to be valid |
| for the test. Attributes strings can include reboot_before, reboot_after, |
| and server_job. |
| """ |
| |
| def __init__(self, test_name, test_args, test_attribs=None): |
| """Creates an instance of test_item. |
| |
| Args: |
| test_name: string, name of test to execute. |
| test_args: dictionary, arguments to pass into test. |
| test_attribs: Dictionary of test attributes. Valid keys are: |
| include - labels a machine must have to run a test. |
| exclude - labels preventing a machine from running a test. |
| attributes - reboot before/after test, run test as server job. |
| """ |
| self.test_name = test_name |
| self.test_args = test_args |
| self.tagged_test_name = test_name |
| if test_args.get('tag'): |
| self.tagged_test_name = test_name + '.' + test_args.get('tag') |
| |
| if test_attribs is None: |
| test_attribs = {} |
| self.inc_set = set(test_attribs.get('include', [])) |
| self.exc_set = set(test_attribs.get('exclude', [])) |
| self.attributes = test_attribs.get('attributes', []) |
| |
| def __str__(self): |
| """Return an info string of this test.""" |
| params = ['%s=%s' % (k, v) for k, v in self.test_args.items()] |
| msg = '%s(%s)' % (self.test_name, params) |
| if self.inc_set: |
| msg += ' include=%s' % [s for s in self.inc_set] |
| if self.exc_set: |
| msg += ' exclude=%s' % [s for s in self.exc_set] |
| if self.attributes: |
| msg += ' attributes=%s' % self.attributes |
| return msg |
| |
| def validate(self, machine_attributes): |
| """Check if this test can run on machine with machine_attributes. |
| |
| If the test has include attributes, a candidate machine must have all |
| the attributes to be valid. |
| |
| If the test has exclude attributes, a candidate machine cannot have any |
| of the attributes to be valid. |
| |
| Args: |
| machine_attributes: set, True attributes of candidate machine. |
| |
| Returns: |
| True/False if the machine is valid for this test. |
| """ |
| if self.inc_set is not None: |
| if not self.inc_set <= machine_attributes: |
| return False |
| if self.exc_set is not None: |
| if self.exc_set & machine_attributes: |
| return False |
| return True |
| |
| def run_test(self, client_at, work_dir='.', server_job=None): |
| """Runs the test on the client using autotest. |
| |
| Args: |
| client_at: Autotest instance for this host. |
| work_dir: Directory to use for results and log files. |
| server_job: Server_Job instance to use to runs server tests. |
| """ |
| if 'reboot_before' in self.attributes: |
| client_at.host.reboot() |
| |
| try: |
| if 'server_job' in self.attributes: |
| if 'host' in self.test_args: |
| self.test_args['host'] = client_at.host |
| if server_job is not None: |
| logging.info('Running Server_Job=%s', self.test_name) |
| server_job.run_test(self.test_name, **self.test_args) |
| else: |
| logging.error('No Server_Job instance provided for test ' |
| '%s.', self.test_name) |
| else: |
| client_at.run_test(self.test_name, results_dir=work_dir, |
| **self.test_args) |
| finally: |
| if 'reboot_after' in self.attributes: |
| client_at.host.reboot() |
| |
| |
| class machine_worker(object): |
| """Worker that runs tests on a remote host machine.""" |
| |
| def __init__(self, server_job, machine, work_dir, test_queue, queue_lock, |
| continuous_parsing=False): |
| """Creates an instance of machine_worker to run tests on a remote host. |
| |
| Retrieves that host attributes for this machine and creates the set of |
| True attributes to validate against test include/exclude attributes. |
| |
| Creates a directory to hold the log files for tests run and writes the |
| hostname and tko parser version into keyvals file. |
| |
| Args: |
| server_job: run tests for this server_job. |
| machine: name of remote host. |
| work_dir: directory server job is using. |
| test_queue: queue of tests. |
| queue_lock: lock protecting test_queue. |
| continuous_parsing: bool, enable continuous parsing. |
| """ |
| self._server_job = server_job |
| self._test_queue = test_queue |
| self._test_queue_lock = queue_lock |
| self._continuous_parsing = continuous_parsing |
| self._tests_run = 0 |
| self._machine = machine |
| self._host = hosts.create_host(self._machine) |
| self._client_at = autotest.Autotest(self._host) |
| client_attributes = host_attributes.host_attributes(machine) |
| self.attribute_set = set(client_attributes.get_attributes()) |
| self._results_dir = work_dir |
| if not os.path.exists(self._results_dir): |
| os.makedirs(self._results_dir) |
| machine_data = {'hostname': self._machine, |
| 'status_version': str(1)} |
| utils.write_keyval(self._results_dir, machine_data) |
| |
| def __str__(self): |
| attributes = [a for a in self.attribute_set] |
| return '%s attributes=%s' % (self._machine, attributes) |
| |
| def get_test(self): |
| """Return a test from the queue to run on this host. |
| |
| The test queue can be non-empty, but still not contain a test that is |
| valid for this machine. This function will take exclusive access to |
| the queue via _test_queue_lock and repeatedly pop tests off the queue |
| until finding a valid test or depleting the queue. In either case if |
| invalid tests have been popped from the queue, they are pushed back |
| onto the queue before returning. |
| |
| Returns: |
| test_item, or None if no more tests exist for this machine. |
| """ |
| good_test = None |
| skipped_tests = [] |
| |
| with self._test_queue_lock: |
| while True: |
| try: |
| canidate_test = self._test_queue.get_nowait() |
| # Check if test is valid for this machine. |
| if canidate_test.validate(self.attribute_set): |
| good_test = canidate_test |
| break |
| skipped_tests.append(canidate_test) |
| |
| except Queue.Empty: |
| break |
| |
| # Return any skipped tests to the queue. |
| for st in skipped_tests: |
| self._test_queue.put(st) |
| |
| return good_test |
| |
| def run(self): |
| """Executes tests on the host machine. |
| |
| If continuous parsing was requested, start the parser before running |
| tests. |
| """ |
| # Modify job.resultdir so that it points to the results directory for |
| # the machine we're working on. Required so that server jobs will write |
| # to the proper location. |
| self._server_job.machines = [self._machine] |
| self._server_job.push_execution_context(self._machine) |
| os.chdir(self._server_job.resultdir) |
| if self._continuous_parsing: |
| self._server_job._parse_job += "/" + self._machine |
| self._server_job._using_parser = True |
| self._server_job.init_parser() |
| |
| while True: |
| active_test = self.get_test() |
| if active_test is None: |
| break |
| |
| logging.info('%s running %s', self._machine, active_test) |
| try: |
| active_test.run_test(self._client_at, self._results_dir, |
| self._server_job) |
| except error.AutoservError: |
| logging.exception('Autoserv error running "%s".', active_test) |
| except error.AutotestError: |
| logging.exception('Autotest error running "%s".', active_test) |
| except Exception: |
| logging.exception('Exception running test "%s".', active_test) |
| raise |
| finally: |
| self._test_queue.task_done() |
| self._tests_run += 1 |
| |
| if self._continuous_parsing: |
| self._server_job.cleanup_parser() |
| logging.info('%s completed %d tests.', self._machine, self._tests_run) |