| Paul Pendlebury | f807c18 | 2011-04-05 11:24:34 -0700 | [diff] [blame^] | 1 | # Copyright (c) 2011 The Chromium OS Authors. All rights reserved. |
| 2 | # Use of this source code is governed by a BSD-style license that can be |
| 3 | # found in the LICENSE file. |
| 4 | |
| 5 | """ |
| 6 | Utility classes used by server_job.distribute_across_machines(). |
| 7 | |
| 8 | test_item: extends the basic test tuple to add include/exclude attributes. |
| 9 | |
| 10 | machine_worker: is a thread that manages running tests on a host. It |
| 11 | verifies tests are valid for a host using the test attributes from test_item |
| 12 | and the host attributes from host_attributes. |
| 13 | """ |
| 14 | |
| 15 | |
| 16 | import logging, os, Queue, threading |
| 17 | from autotest_lib.client.common_lib import error, utils |
| 18 | from autotest_lib.server import autotest, hosts, host_attributes, subcommand |
| 19 | |
| 20 | |
class test_item(object):
    """Adds machine verification logic to the basic test tuple.

    Tests can either be tuples of the existing form ('testName', {args}) or the
    extended form ('testName', {args}, ['include'], ['exclude']) where include
    and exclude are lists of attributes. A machine must have all the attributes
    in include and must not have any of the attributes in exclude to be valid
    for the test.
    """

    def __init__(self, test_name, test_args, include_attribs=None,
                 exclude_attribs=None):
        """Creates an instance of test_item.

        Args:
            test_name: string, name of test to execute.
            test_args: dictionary, arguments to pass into test.
            include_attribs: iterable of attributes a machine must have to run
                the test, or None for no include constraint.
            exclude_attribs: iterable of attributes preventing a machine from
                running the test, or None for no exclude constraint.
        """
        self.test_name = test_name
        self.test_args = test_args
        self.inc_set = (set(include_attribs)
                        if include_attribs is not None else None)
        self.exc_set = (set(exclude_attribs)
                        if exclude_attribs is not None else None)

    def __str__(self):
        """Return an info string of this test.

        Format: "name(k1=v1, k2=v2) include=['a', 'b'] exclude=['c']". The
        include/exclude parts are omitted when empty. Argument names and
        attributes are sorted so the string is stable from run to run.
        """
        params = ', '.join('%s=%s' % (key, value)
                           for key, value in sorted(self.test_args.items()))
        msg = '%s(%s)' % (self.test_name, params)
        # A list on the right of % is a single value (only tuples unpack).
        if self.inc_set:
            msg += ' include=%s' % sorted(self.inc_set)
        if self.exc_set:
            msg += ' exclude=%s' % sorted(self.exc_set)
        return msg

    def validate(self, machine_attributes):
        """Check if this test can run on machine with machine_attributes.

        If the test has include attributes, a candidate machine must have all
        the attributes to be valid.

        If the test has exclude attributes, a candidate machine cannot have any
        of the attributes to be valid.

        Args:
            machine_attributes: set of the True attributes of the candidate
                machine. Any re-iterable collection (e.g. a list) also works.

        Returns:
            True/False if the machine is valid for this test.
        """
        # issubset/isdisjoint match the <= and & operators for sets, but also
        # accept non-set collections.
        if (self.inc_set is not None and
                not self.inc_set.issubset(machine_attributes)):
            return False
        if (self.exc_set is not None and
                not self.exc_set.isdisjoint(machine_attributes)):
            return False
        return True
| 78 | |
| 79 | |
class machine_worker(threading.Thread):
    """Thread that runs tests on a remote host machine."""

    def __init__(self, machine, work_dir, test_queue, queue_lock):
        """Creates an instance of machine_worker to run tests on a remote host.

        Retrieves the host attributes for this machine and creates the set of
        True attributes to validate against test include/exclude attributes.

        Creates a directory to hold the log files for tests run and writes the
        hostname and tko parser version into the keyvals file.

        Args:
            machine: name of remote host.
            work_dir: directory server job is using.
            test_queue: queue of tests.
            queue_lock: lock protecting test_queue.
        """
        threading.Thread.__init__(self)
        self._test_queue = test_queue
        self._test_queue_lock = queue_lock
        self._tests_run = 0
        self._machine = machine
        self._host = hosts.create_host(self._machine)
        self._client_at = autotest.Autotest(self._host)
        client_attributes = host_attributes.host_attributes(machine)
        # Only attributes with a truthy value count as present on this host.
        self.attribute_set = set(key for key, value in
                                 client_attributes.__dict__.items() if value)
        self._results_dir = os.path.join(work_dir, self._machine)
        if not os.path.exists(self._results_dir):
            os.makedirs(self._results_dir)
        machine_data = {'hostname': self._machine,
                        'status_version': str(1)}
        utils.write_keyval(self._results_dir, machine_data)

    def __str__(self):
        """Return the machine name and its (sorted) True attributes."""
        return '%s attributes=%s' % (self._machine, sorted(self.attribute_set))

    def get_test(self):
        """Return a test from the queue to run on this host.

        The test queue can be non-empty, but still not contain a test that is
        valid for this machine. This function takes exclusive access to the
        queue via _test_queue_lock and repeatedly pops tests off the queue
        until finding a valid test or depleting the queue. Any invalid tests
        that were popped are pushed back onto the queue before the lock is
        released. Otherwise another worker could briefly see an empty queue
        and stop even though tests valid for it remain.

        Each re-queued test is also marked task_done(). Without that, the
        get_nowait()/put() round trip would leave one extra unfinished task
        per skip, and Queue.join() on the shared queue could never return.

        Returns:
            test_item, or None if no more tests exist for this machine.
        """
        good_test = None
        skipped_tests = []

        with self._test_queue_lock:
            while True:
                try:
                    candidate = self._test_queue.get_nowait()
                except Queue.Empty:
                    break
                # Check if test is valid for this machine.
                if candidate.validate(self.attribute_set):
                    good_test = candidate
                    break
                skipped_tests.append(candidate)

            # Return any skipped tests to the queue while still holding the lock.
            for skipped in skipped_tests:
                self._test_queue.put(skipped)
                # Balance the get_nowait() above. Call put() first so the
                # unfinished-task count never drops to zero and wakes a join()
                # prematurely.
                self._test_queue.task_done()

        return good_test

    def run_subcommand(self, active_test):
        """Use subcommand to fork process and execute test."""
        sub_cmd = subcommand.subcommand(self.subcommand_wrapper, [active_test])
        sub_cmd.fork_start()
        sub_cmd.fork_waitfor()

    def subcommand_wrapper(self, active_test):
        """Callback for subcommand to call into with the test parameter."""
        self._client_at.run_test(active_test.test_name,
                                 results_dir=self._results_dir,
                                 **active_test.test_args)

    def run(self):
        """Executes tests on host machine.

        Uses subcommand to fork the process when running tests so unique client
        jobs talk to unique server jobs, which prevents log files from
        simultaneous tests interweaving with each other.

        Autoserv/autotest errors are logged and the worker moves on to the next
        test. Any other exception is logged and re-raised, ending this thread.
        """
        while True:
            active_test = self.get_test()
            if active_test is None:
                break

            logging.info('%s running %s', self._machine, active_test)
            try:
                self.run_subcommand(active_test)
            except (error.AutoservError, error.AutotestError):
                logging.exception('Error running test "%s".', active_test)
            except Exception:
                logging.exception('Exception running test "%s".', active_test)
                raise
            finally:
                # Pairs with the get_nowait() in get_test() that handed out
                # active_test.
                self._test_queue.task_done()
                self._tests_run += 1

        logging.info('%s completed %d tests.', self._machine, self._tests_run)