Move distribute_across_machines and gtest to site extension.

(Patch 11: Simplify how inserts gtest_runner
           into the namespace parameter of

(Patch 10: Update gtest_parser to warn and gracefully exit if the test
           log file does not exist.  This happens in a gtest suite
           throws a test exception before completing and copying the
           test log file back to the server.

           Also updated FailureDescription() to not return the name of
           the test to simplify checking for failure lines.  This
           makes "if failures: then use failures" work instead of
           "if len(failures) >1: then use failures[1:]")

(Patch 9: Code review fixes, clearer comments and one line argument

(Patch 8: Fix PyAuto parse failure with no error lines.)

To make keeping in step with upstream Autotest moving the
distribute_across_machines and gtest_runner into site extensions.

Clean up the old include, exclude, action test attributes from the old
(test_name, {args}, [include], [exclude], [action]) to a cleaner form
of (test_name, {args}, {attributes} where the attributes dictionary
is keyed to include, exclude, and attributes for the same behavior as
before in a nicer format.

Updated BVT and Regressions to use the new format.

    Removed unused imports I added for removed functions.
    Move site functions to end of file to enable importing
    Removed distribute_across_machines() and record_skipped_test().
    Removed gtest_runner from the default namespace.

    Added imports and functions removed from server_job.
    Changed distribute_across_machines from using threads that
        launched subprocesses to just using subprocesses.

    Fixed test attributes to use a dictionary instead of 3 lists.
    Enabled running server jobs in addition to client jobs.
    Removed base thread class from machine_worker since the instances
        are run by subcommands now.




TEST=Local Runs.

Change-Id: I118ae8bdc2b49d4190051d59a748ecb01d0da33c
Reviewed-by: Dale Curtis <>
Tested-by: Paul Pendlebury <>
diff --git a/server/ b/server/
new file mode 100644
index 0000000..2cb9c6f
--- /dev/null
+++ b/server/
@@ -0,0 +1,227 @@
+# Copyright (c) 2011 The Chromium OS Authors. All rights reserved.
+# Use of this source code is governed by a BSD-style license that can be
+# found in the LICENSE file.
+"""Utility classes used by site_server_job.distribute_across_machines().
+test_item: extends the basic test tuple to add include/exclude attributes and
+    pre/post actions.
+machine_worker: is a thread that manages running tests on a host.  It
+    verifies test are valid for a host using the test attributes from test_item
+    and the host attributes from host_attributes.
+import logging, os, Queue
+from autotest_lib.client.common_lib import error, utils
+from autotest_lib.server import autotest, hosts, host_attributes
+class test_item(object):
+    """Adds machine verification logic to the basic test tuple.
+    Tests can either be tuples of the existing form ('testName', {args}) or the
+    extended form ('testname', {args}, {'include': [], 'exclude': [],
+    'attributes': []}) where include and exclude are lists of host attribute
+    labels and attributes is a list of strings. A machine must have all the
+    labels in include and must not have any of the labels in exclude to be valid
+    for the test. Attributes strings can include reboot_before, reboot_after,
+    and server_job.
+    """
+    def __init__(self, test_name, test_args, test_attribs={}):
+        """Creates an instance of test_item.
+        Args:
+            test_name: string, name of test to execute.
+            test_args: dictionary, arguments to pass into test.
+            test_attribs: Dictionary of test attributes. Valid keys are:
+              include - labels a machine must have to run a test.
+              exclude - labels preventing a machine from running a test.
+              attributes - reboot before/after test, run test as server job.
+        """
+        self.test_name = test_name
+        self.test_args = test_args
+        self.inc_set = set(test_attribs.get('include', []))
+        self.exc_set = set(test_attribs.get('exclude', []))
+        self.attributes = test_attribs.get('attributes', [])
+    def __str__(self):
+        """Return an info string of this test."""
+        params = ['%s=%s' % (k, v) for k, v in self.test_args.items()]
+        msg = '%s(%s)' % (self.test_name, params)
+        if self.inc_set:
+            msg += ' include=%s' % [s for s in self.inc_set]
+        if self.exc_set:
+            msg += ' exclude=%s' % [s for s in self.exc_set]
+        if self.attributes:
+            msg += ' attributes=%s' % self.attributes
+        return msg
+    def validate(self, machine_attributes):
+        """Check if this test can run on machine with machine_attributes.
+        If the test has include attributes, a candidate machine must have all
+        the attributes to be valid.
+        If the test has exclude attributes, a candidate machine cannot have any
+        of the attributes to be valid.
+        Args:
+            machine_attributes: set, True attributes of candidate machine.
+        Returns:
+            True/False if the machine is valid for this test.
+        """
+        if self.inc_set is not None:
+            if not self.inc_set <= machine_attributes:
+                return False
+        if self.exc_set is not None:
+            if self.exc_set & machine_attributes:
+                return False
+        return True
+    def run_test(self, client_at, work_dir='.', server_job=None):
+        """Runs the test on the client using autotest.
+        Args:
+            client_at: Autotest instance for this host.
+            work_dir: Directory to use for results and log files.
+            server_job: Server_Job instance to use to runs server tests.
+        """
+        if 'reboot_before' in self.attributes:
+        try:
+            if 'server_job' in self.attributes:
+                if 'host' in self.test_args:
+                    self.test_args['host'] =
+                if server_job is not None:
+          'Running Server_Job=%s', self.test_name)
+                    server_job.run_test(self.test_name, **self.test_args)
+                else:
+                    logging.error('No Server_Job instance provided for test '
+                                  '%s.', self.test_name)
+            else:
+                client_at.run_test(self.test_name, results_dir=work_dir,
+                                   **self.test_args)
+        finally:
+            if 'reboot_after' in self.attributes:
+class machine_worker(object):
+    """Worker that runs tests on a remote host machine."""
+    def __init__(self, server_job, machine, work_dir, test_queue, queue_lock,
+                 continuous_parsing=False):
+        """Creates an instance of machine_worker to run tests on a remote host.
+        Retrieves that host attributes for this machine and creates the set of
+        True attributes to validate against test include/exclude attributes.
+        Creates a directory to hold the log files for tests run and writes the
+        hostname and tko parser version into keyvals file.
+        Args:
+            server_job: run tests for this server_job.
+            machine: name of remote host.
+            work_dir: directory server job is using.
+            test_queue: queue of tests.
+            queue_lock: lock protecting test_queue.
+            continuous_parsing: bool, enable continuous parsing.
+        """
+        self._server_job = server_job
+        self._test_queue = test_queue
+        self._test_queue_lock = queue_lock
+        self._continuous_parsing = continuous_parsing
+        self._tests_run = 0
+        self._machine = machine
+        self._host = hosts.create_host(self._machine)
+        self._client_at = autotest.Autotest(self._host)
+        client_attributes = host_attributes.host_attributes(machine)
+        self.attribute_set = set(client_attributes.get_attributes())
+        self._results_dir = work_dir
+        if not os.path.exists(self._results_dir):
+            os.makedirs(self._results_dir)
+        machine_data = {'hostname': self._machine,
+                        'status_version': str(1)}
+        utils.write_keyval(self._results_dir, machine_data)
+    def __str__(self):
+        attributes = [a for a in self.attribute_set]
+        return '%s attributes=%s' % (self._machine, attributes)
+    def get_test(self):
+        """Return a test from the queue to run on this host.
+        The test queue can be non-empty, but still not contain a test that is
+        valid for this machine. This function will take exclusive access to
+        the queue via _test_queue_lock and repeatedly pop tests off the queue
+        until finding a valid test or depleting the queue.  In either case if
+        invalid tests have been popped from the queue, they are pushed back
+        onto the queue before returning.
+        Returns:
+            test_item, or None if no more tests exist for this machine.
+        """
+        good_test = None
+        skipped_tests = []
+        with self._test_queue_lock:
+            while True:
+                try:
+                    canidate_test = self._test_queue.get_nowait()
+                    # Check if test is valid for this machine.
+                    if canidate_test.validate(self.attribute_set):
+                        good_test = canidate_test
+                        break
+                    skipped_tests.append(canidate_test)
+                except Queue.Empty:
+                    break
+            # Return any skipped tests to the queue.
+            for st in skipped_tests:
+                self._test_queue.put(st)
+        return good_test
+    def run(self):
+        """Executes tests on the host machine.
+        If continuous parsing was requested, start the parser before running
+        tests.
+        """
+        if self._continuous_parsing:
+            self._server_job._parse_job += "/" + self._machine
+            self._server_job._using_parser = True
+            self._server_job.machines = [self._machine]
+            self._server_job.push_execution_context(self._machine)
+            self._server_job.init_parser()
+        while True:
+            active_test = self.get_test()
+            if active_test is None:
+                break
+  '%s running %s', self._machine, active_test)
+            try:
+                active_test.run_test(self._client_at, self._results_dir,
+                                     self._server_job)
+            except error.AutoservError:
+                logging.exception('Autoserv error running "%s".', active_test)
+            except error.AutotestError:
+                logging.exception('Autotest error running  "%s".', active_test)
+            except Exception:
+                logging.exception('Exception running test "%s".', active_test)
+                raise
+            finally:
+                self._test_queue.task_done()
+                self._tests_run += 1
+        if self._continuous_parsing:
+            self._server_job.cleanup_parser()
+'%s completed %d tests.', self._machine, self._tests_run)