blob: fab956e2fc29fae5c64305d5039f54677aa11e9b [file] [log] [blame]
Paul Pendleburyf807c182011-04-05 11:24:34 -07001# Copyright (c) 2011 The Chromium OS Authors. All rights reserved.
2# Use of this source code is governed by a BSD-style license that can be
3# found in the LICENSE file.
4
5"""
6Utility classes used by server_job.distribute_across_machines().
7
8test_item: extends the basic test tuple to add include/exclude attributes.
9
10machine_worker: is a thread that manages running tests on a host. It
11 verifies test are valid for a host using the test attributes from test_item
12 and the host attributes from host_attributes.
13"""
14
15
16import logging, os, Queue, threading
17from autotest_lib.client.common_lib import error, utils
18from autotest_lib.server import autotest, hosts, host_attributes, subcommand
19
20
21class test_item(object):
22 """Adds machine verification logic to the basic test tuple.
23
24 Tests can either be tuples of the existing form ('testName', {args}) or the
25 extended for of ('testname', {args}, ['include'], ['exclude']) where include
26 and exclude are lists of attributes. A machine must have all the attributes
27 in include and must not have any of the attributes in exclude to be valid
28 for the test.
29 """
30
31 def __init__(self, test_name, test_args, include_attribs=None,
32 exclude_attribs=None):
33 """Creates an instance of test_item.
34
35 Args:
36 test_name: string, name of test to execute.
37 test_args: dictionary, arguments to pass into test.
38 include_attribs: attributes a machine must have to run test.
39 exclude_attribs: attributes preventing a machine from running test.
40 """
41 self.test_name = test_name
42 self.test_args = test_args
43 self.inc_set = None
44 if include_attribs is not None:
45 self.inc_set = set(include_attribs)
46 self.exc_set = None
47 if exclude_attribs is not None:
48 self.exc_set = set(exclude_attribs)
49
50 def __str__(self):
51 """Return an info string of this test."""
52 params = ['%s=%s' % (k, v) for k, v in self.test_args.items()]
53 msg = '%s(%s)' % (self.test_name, params)
54 if self.inc_set: msg += ' include=%s' % [s for s in self.inc_set]
55 if self.exc_set: msg += ' exclude=%s' % [s for s in self.exc_set]
56 return msg
57
58 def validate(self, machine_attributes):
59 """Check if this test can run on machine with machine_attributes.
60
61 If the test has include attributes, a candidate machine must have all
62 the attributes to be valid.
63
64 If the test has exclude attributes, a candidate machine cannot have any
65 of the attributes to be valid.
66
67 Args:
68 machine_attributes: set, True attributes of candidate machine.
69
70 Returns:
71 True/False if the machine is valid for this test.
72 """
73 if self.inc_set is not None:
74 if not self.inc_set <= machine_attributes: return False
75 if self.exc_set is not None:
76 if self.exc_set & machine_attributes: return False
77 return True
78
79
80class machine_worker(threading.Thread):
81 """Thread that runs tests on a remote host machine."""
82
83 def __init__(self, machine, work_dir, test_queue, queue_lock):
84 """Creates an instance of machine_worker to run tests on a remote host.
85
86 Retrieves that host attributes for this machine and creates the set of
87 True attributes to validate against test include/exclude attributes.
88
89 Creates a directory to hold the log files for tests run and writes the
90 hostname and tko parser version into keyvals file.
91
92 Args:
93 machine: name of remote host.
94 work_dir: directory server job is using.
95 test_queue: queue of tests.
96 queue_lock: lock protecting test_queue.
97 """
98 threading.Thread.__init__(self)
99 self._test_queue = test_queue
100 self._test_queue_lock = queue_lock
101 self._tests_run = 0
102 self._machine = machine
103 self._host = hosts.create_host(self._machine)
104 self._client_at = autotest.Autotest(self._host)
105 client_attributes = host_attributes.host_attributes(machine)
106 self.attribute_set = set([key for key, value in
107 client_attributes.__dict__.items() if value])
108 self._results_dir = os.path.join(work_dir, self._machine)
109 if not os.path.exists(self._results_dir):
110 os.makedirs(self._results_dir)
111 machine_data = {'hostname': self._machine,
112 'status_version': str(1)}
113 utils.write_keyval(self._results_dir, machine_data)
114
115 def __str__(self):
116 attributes = [a for a in self.attribute_set]
117 return '%s attributes=%s' % (self._machine, attributes)
118
119 def get_test(self):
120 """Return a test from the queue to run on this host.
121
122 The test queue can be non-empty, but still not contain a test that is
123 valid for this machine. This function will take exclusive access to
124 the queue via _test_queue_lock and repeatedly pop tests off the queue
125 until finding a valid test or depleting the queue. In either case if
126 invalid tests have been popped from the queue, they are pushed back
127 onto the queue before returning.
128
129 Returns:
130 test_item, or None if no more tests exist for this machine.
131 """
132 good_test = None
133 skipped_tests = []
134
135 with self._test_queue_lock:
136 while True:
137 try:
138 canidate_test = self._test_queue.get_nowait()
139 # Check if test is valid for this machine.
140 if canidate_test.validate(self.attribute_set):
141 good_test = canidate_test
142 break
143 skipped_tests.append(canidate_test)
144
145 except Queue.Empty:
146 break
147
148 # Return any skipped tests to the queue.
149 for st in skipped_tests:
150 self._test_queue.put(st)
151
152 return good_test
153
154 def run_subcommand(self, active_test):
155 """Use subcommand to fork process and execute test."""
156 sub_cmd = subcommand.subcommand(self.subcommand_wrapper, [active_test])
157 sub_cmd.fork_start()
158 sub_cmd.fork_waitfor()
159
160 def subcommand_wrapper(self, active_test):
161 """Callback for subcommand to call into with the test parameter."""
162 self._client_at.run_test(active_test.test_name,
163 results_dir=self._results_dir,
164 **active_test.test_args)
165
166 def run(self):
167 """Executes tests on host machine.
168
169 Uses subprocess to fork the process when running tests so unique client
170 jobs talk to unique server jobs which prevents log files from
171 simultaneous tests interweaving with each other.
172 """
173 while True:
174 active_test = self.get_test()
175 if active_test is None:
176 break
177
178 logging.info('%s running %s', self._machine, active_test)
179 try:
180 self.run_subcommand(active_test)
181 except (error.AutoservError, error.AutotestError):
182 logging.exception('Error running test "%s".', active_test)
183 except Exception:
184 logging.exception('Exception running test "%s".', active_test)
185 raise
186 finally:
187 self._test_queue.task_done()
188 self._tests_run += 1
189
190 logging.info('%s completed %d tests.', self._machine, self._tests_run)