| # Copyright (c) 2011 The Chromium OS Authors. All rights reserved. |
| # Use of this source code is governed by a BSD-style license that can be |
| # found in the LICENSE file. |
| |
| """Site extensions to server_job. Adds distribute_across_machines().""" |
| |
| import os, logging, multiprocessing |
| from autotest_lib.server import site_gtest_runner, site_server_job_utils |
| from autotest_lib.server import subcommand |
| from autotest_lib.server.server_job import base_server_job |
| import utils |
| |
| |
| def get_site_job_data(job): |
| """Add custom data to the job keyval info. |
| |
| When multiple machines are used in a job, change the hostname to |
| the platform of the first machine instead of machine1,machine2,... This |
| makes the job reports easier to read and keeps the tko_machines table from |
| growing too large. |
| |
| Args: |
| job: instance of server_job. |
| |
| Returns: |
| keyval dictionary with new hostname value, or empty dictionary. |
| """ |
| site_job_data = {} |
| # Only modify hostname on multimachine jobs. Assume all host have the same |
| # platform. |
| if len(job.machines) > 1: |
| # Search through machines for first machine with a platform. |
| for host in job.machines: |
| keyval_path = os.path.join(job.resultdir, 'host_keyvals', host) |
| keyvals = utils.read_keyval(keyval_path) |
| host_plat = keyvals.get('platform', None) |
| if not host_plat: |
| continue |
| site_job_data['hostname'] = host_plat |
| break |
| return site_job_data |
| |
| |
| class site_server_job(base_server_job): |
| """Extend server_job adding distribute_across_machines.""" |
| |
| def __init__(self, *args, **dargs): |
| super(site_server_job, self).__init__(*args, **dargs) |
| |
| |
| def run(self, *args, **dargs): |
| """Extend server_job.run adding gtest_runner to the namespace.""" |
| |
| gtest_run = {'gtest_runner': site_gtest_runner.gtest_runner()} |
| |
| # Namespace is the 5th parameter to run(). If args has 5 or more |
| # entries in it then we need to fix-up this namespace entry. |
| if len(args) >= 5: |
| args[4].update(gtest_run) |
| # Else, if present, namespace must be in dargs. |
| else: |
| dargs.setdefault('namespace', gtest_run).update(gtest_run) |
| # Now call the original run() with the modified namespace containing a |
| # gtest_runner |
| super(site_server_job, self).run(*args, **dargs) |
| |
| |
| def distribute_across_machines(self, tests, machines, |
| continuous_parsing=False): |
| """Run each test in tests once using machines. |
| |
| Instead of running each test on each machine like parallel_on_machines, |
| run each test once across all machines. Put another way, the total |
| number of tests run by parallel_on_machines is len(tests) * |
| len(machines). The number of tests run by distribute_across_machines is |
| len(tests). |
| |
| Args: |
| tests: List of tests to run. |
| machines: List of machines to use. |
| continuous_parsing: Bool, if true parse job while running. |
| """ |
| # The Queue is thread safe, but since a machine may have to search |
| # through the queue to find a valid test the lock provides exclusive |
| # queue access for more than just the get call. |
| test_queue = multiprocessing.JoinableQueue() |
| test_queue_lock = multiprocessing.Lock() |
| |
| unique_machine_attributes = [] |
| sub_commands = [] |
| work_dir = self.resultdir |
| |
| for machine in machines: |
| if 'group' in self.resultdir: |
| work_dir = os.path.join(self.resultdir, machine) |
| |
| mw = site_server_job_utils.machine_worker(self, |
| machine, |
| work_dir, |
| test_queue, |
| test_queue_lock, |
| continuous_parsing) |
| |
| # Create the subcommand instance to run this machine worker. |
| sub_commands.append(subcommand.subcommand(mw.run, |
| [], |
| work_dir)) |
| |
| # To (potentially) speed up searching for valid tests create a list |
| # of unique attribute sets present in the machines for this job. If |
| # sets were hashable we could just use a dictionary for fast |
| # verification. This at least reduces the search space from the |
| # number of machines to the number of unique machines. |
| if not mw.attribute_set in unique_machine_attributes: |
| unique_machine_attributes.append(mw.attribute_set) |
| |
| # Only queue tests which are valid on at least one machine. Record |
| # skipped tests in the status.log file using record_skipped_test(). |
| for test_entry in tests: |
| # Check if it's an old style test entry. |
| if len(test_entry) > 2 and not isinstance(test_entry[2], dict): |
| test_attribs = {'include': test_entry[2]} |
| if len(test_entry) > 3: |
| test_attribs['exclude'] = test_entry[3] |
| if len(test_entry) > 4: |
| test_attribs['attributes'] = test_entry[4] |
| |
| test_entry = list(test_entry[:2]) |
| test_entry.append(test_attribs) |
| |
| ti = site_server_job_utils.test_item(*test_entry) |
| machine_found = False |
| for ma in unique_machine_attributes: |
| if ti.validate(ma): |
| test_queue.put(ti) |
| machine_found = True |
| break |
| if not machine_found: |
| self.record_skipped_test(ti) |
| |
| # Run valid tests and wait for completion. |
| subcommand.parallel(sub_commands) |
| |
| |
| def record_skipped_test(self, skipped_test, message=None): |
| """Insert a failure record into status.log for this test.""" |
| msg = message |
| if msg is None: |
| msg = 'No valid machines found for test %s.' % skipped_test |
| logging.info(msg) |
| self.record('START', None, skipped_test.test_name) |
| self.record('INFO', None, skipped_test.test_name, msg) |
| self.record('END TEST_NA', None, skipped_test.test_name, msg) |
| |