# Copyright (c) 2011 The Chromium OS Authors. All rights reserved.
# Use of this source code is governed by a BSD-style license that can be
# found in the LICENSE file.

"""Site extensions to server_job. Adds distribute_across_machines()."""

import os, logging, multiprocessing
from autotest_lib.server import site_gtest_runner, site_server_job_utils
from autotest_lib.server import subcommand
from autotest_lib.server.server_job import base_server_job
import utils


def get_site_job_data(job):
    """Add custom data to the job keyval info.

    When multiple machines are used in a job, change the hostname keyval to
    the platform of the first machine that reports one, instead of the
    comma-joined machine1,machine2,... list. This makes job reports easier to
    read and keeps the tko_machines table from growing too large.

    Args:
        job: instance of server_job.

    Returns:
        keyval dictionary with the new hostname value, or an empty dictionary.
    """
    site_job_data = {}
    # Only modify hostname on multi-machine jobs. Assume all hosts have the
    # same platform.
    if len(job.machines) > 1:
        # Search through machines for the first machine with a platform.
        for host in job.machines:
            keyval_path = os.path.join(job.resultdir, 'host_keyvals', host)
            keyvals = utils.read_keyval(keyval_path)
            host_plat = keyvals.get('platform', None)
            if not host_plat:
                continue
            site_job_data['hostname'] = host_plat
            break
    return site_job_data
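
# A minimal sketch of the intended behavior (host and platform names are
# hypothetical; assumes per-host keyvals were already written under
# resultdir/host_keyvals/):
#
#   job.machines = ['host1', 'host2']   # both with platform 'netbook_x86'
#   get_site_job_data(job)              # -> {'hostname': 'netbook_x86'}
#
#   job.machines = ['host1']            # single-machine job
#   get_site_job_data(job)              # -> {}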


class site_server_job(base_server_job):
    """Extend server_job, adding distribute_across_machines()."""

    def __init__(self, *args, **dargs):
        super(site_server_job, self).__init__(*args, **dargs)


    def run(self, *args, **dargs):
        """Extend server_job.run, adding gtest_runner to the namespace."""

        gtest_run = {'gtest_runner': site_gtest_runner.gtest_runner()}

        # Namespace is the 5th positional parameter to run(). If args has 5
        # or more entries, update that namespace dict in place.
        if len(args) >= 5:
            args[4].update(gtest_run)
        # Otherwise the namespace, if present at all, must be in dargs.
        # setdefault() returns the existing dict (or installs gtest_run as
        # the default); updating the result covers both cases.
        else:
            dargs.setdefault('namespace', gtest_run).update(gtest_run)
        # Now call the original run() with the modified namespace containing
        # a gtest_runner.
        super(site_server_job, self).run(*args, **dargs)
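
    # A minimal sketch of the two call paths handled above (the leading
    # values merely stand in for run()'s first four positional parameters,
    # whatever their names; only the fifth slot, namespace, matters here):
    #
    #   job.run(False, False, False, True, {'my_var': 1})  # positional
    #   job.run(namespace={'my_var': 1})                   # keyword
    #   job.run()                                          # none given;
    #                                                      # one is created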


    def distribute_across_machines(self, tests, machines,
                                   continuous_parsing=False):
        """Run each test in tests once, using machines.

        Instead of running every test on every machine like
        parallel_on_machines(), run each test only once, across all machines.
        Put another way, the total number of tests run by
        parallel_on_machines() is len(tests) * len(machines), while
        distribute_across_machines() runs only len(tests).

        Args:
            tests: List of tests to run.
            machines: List of machines to use.
            continuous_parsing: Bool; if True, parse the job results while
                running.
        """
        # The queue is process safe, but a worker may have to search through
        # it to find a valid test, so the lock provides exclusive queue
        # access for more than just the get() call.
        test_queue = multiprocessing.JoinableQueue()
        test_queue_lock = multiprocessing.Lock()

        unique_machine_attributes = []
        sub_commands = []
        work_dir = self.resultdir

        for machine in machines:
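            # Give each machine its own work directory when this looks like
            # a group job (the resultdir path contains 'group').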
            if 'group' in self.resultdir:
                work_dir = os.path.join(self.resultdir, machine)

            mw = site_server_job_utils.machine_worker(self,
                                                      machine,
                                                      work_dir,
                                                      test_queue,
                                                      test_queue_lock,
                                                      continuous_parsing)

            # Create the subcommand instance to run this machine worker.
            sub_commands.append(subcommand.subcommand(mw.run,
                                                      [],
                                                      work_dir))

            # To (potentially) speed up searching for valid tests, create a
            # list of the unique attribute sets present in the machines for
            # this job. If sets were hashable we could just use a dictionary
            # for fast verification. This at least reduces the search space
            # from the number of machines to the number of unique machines.
            if mw.attribute_set not in unique_machine_attributes:
                unique_machine_attributes.append(mw.attribute_set)

        # Only queue tests which are valid on at least one machine. Record
        # skipped tests in the status.log file using record_skipped_test().
        for test_entry in tests:
            # Check if it's an old-style test entry.
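            # For illustration (hypothetical test name and attribute values),
            # an old-style entry such as
            #     ('dummy_Pass', 'tag', ['arm'], ['x86'], {'graphics': '1'})
            # is rewritten below into the dictionary form
            #     ('dummy_Pass', 'tag', {'include': ['arm'],
            #                            'exclude': ['x86'],
            #                            'attributes': {'graphics': '1'}})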
            if len(test_entry) > 2 and not isinstance(test_entry[2], dict):
                test_attribs = {'include': test_entry[2]}
                if len(test_entry) > 3:
                    test_attribs['exclude'] = test_entry[3]
                if len(test_entry) > 4:
                    test_attribs['attributes'] = test_entry[4]

                test_entry = list(test_entry[:2])
                test_entry.append(test_attribs)

            ti = site_server_job_utils.test_item(*test_entry)
            machine_found = False
            for ma in unique_machine_attributes:
                if ti.validate(ma):
                    test_queue.put(ti)
                    machine_found = True
                    break
            if not machine_found:
                self.record_skipped_test(ti)

        # Run the valid tests and wait for completion.
        subcommand.parallel(sub_commands)
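
    # A usage sketch for distribute_across_machines() (the test names,
    # attribute requirements, and host names below are hypothetical):
    #
    #   job.distribute_across_machines(
    #       [('dummy_Pass', 'dummy_Pass'),
    #        ('graphics_GLBench', 'glbench', {'include': ['graphics']})],
    #       ['host1', 'host2', 'host3'])
    #
    # Each test runs exactly once, on the first free machine whose attributes
    # satisfy it; tests no machine can satisfy are recorded as TEST_NA via
    # record_skipped_test().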


    def record_skipped_test(self, skipped_test, message=None):
        """Insert a TEST_NA record into status.log for this skipped test."""
        msg = message
        if msg is None:
            msg = 'No valid machines found for test %s.' % skipped_test
        logging.info(msg)
        self.record('START', None, skipped_test.test_name)
        self.record('INFO', None, skipped_test.test_name, msg)
        self.record('END TEST_NA', None, skipped_test.test_name, msg)
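
    # A minimal usage sketch (the test_item arguments follow the test_entry
    # tuples handled in distribute_across_machines(); values here are
    # hypothetical):
    #
    #   ti = site_server_job_utils.test_item('dummy_Pass', 'dummy_Pass',
    #                                        {'include': ['some_attribute']})
    #   job.record_skipped_test(ti)   # writes START / INFO / END TEST_NA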