Move distribute_across_machines and gtest_runner to site extensions.

(Patch 11: Simplify how site_server_job.run() inserts gtest_runner
           into the namespace parameter of server_job.run().)

(Patch 10: Update gtest_parser to warn and gracefully exit if the test
           log file does not exist.  This happens when a gtest suite
           throws a test exception before completing and copying the
           test log file back to the server.

           Also updated FailureDescription() to not return the name of
           the test, which simplifies checking for failure lines:
           "if failures: then use failures" now works instead of
           "if len(failures) > 1: then use failures[1:]")

(Patch 9: Code review fixes, clearer comments, and one-line argument
           parsing.)

(Patch 8: Fix PyAuto parse failure with no error lines.)

To keep in step with upstream Autotest, move distribute_across_machines
and gtest_runner into site extensions.

Clean up the old include, exclude, and action test attributes, changing
the old form (test_name, {args}, [include], [exclude], [action]) to the
cleaner form (test_name, {args}, {attributes}), where the attributes
dictionary is keyed by include, exclude, and attributes, giving the
same behavior as before in a nicer format.
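
For example, from the updated control files in this change:

    Old: ('power_Resume', {}, [], ['has_resume_bug'])
    New: ('power_Resume', {}, {'exclude': ['has_resume_bug']})

    Old: ('platform_Shutdown', {}, [], [], ['reboot_before'])
    New: ('platform_Shutdown', {}, {'attributes': ['reboot_before']})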

Updated the BVT and Regression suites to use the new format.

Server_Job:
    Removed unused imports added for the now-removed functions.
    Moved site functions to the end of the file to enable importing
        base_server_job.
    Removed distribute_across_machines() and record_skipped_test().
    Removed gtest_runner from the default namespace.

Site_Server_Job:
    Added imports and functions removed from server_job.
    Changed distribute_across_machines from using threads that
        launched subprocesses to just using subprocesses.

Site_Server_Job_Utils:
    Fixed test attributes to use a dictionary instead of three lists.
    Enabled running server jobs in addition to client jobs (see the
        example below).
    Removed the base thread class from machine_worker since the
        instances are now run by subcommands.
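
For example, a server-side test entry in the new format (from
control.regression in this change):

    ('logging_KernelCrashServer', {'host': 'client'},
                                  {'include': ['has_chromeos_firmware'],
                                   'attributes': ['server_job']})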

logging_KernelCrashServer
http://pauldean.kir/afe/#tab_id=view_job&object_id=327

BVT
http://pauldean.kir/afe/#tab_id=view_job&object_id=328

Regression
http://pauldean.kir/afe/#tab_id=view_job&object_id=330

BUG=None.
TEST=Local Runs.

Change-Id: I118ae8bdc2b49d4190051d59a748ecb01d0da33c
Reviewed-on: http://gerrit.chromium.org/gerrit/2698
Reviewed-by: Dale Curtis <dalecurtis@chromium.org>
Tested-by: Paul Pendlebury <pauldean@chromium.org>
diff --git a/server/server_job.py b/server/server_job.py
index ba5f0a2..7b207b2 100644
--- a/server/server_job.py
+++ b/server/server_job.py
@@ -10,14 +10,12 @@
 """
 
 import getpass, os, sys, re, stat, tempfile, time, select, subprocess, platform
-import multiprocessing
 import traceback, shutil, warnings, fcntl, pickle, logging, itertools, errno
 from autotest_lib.client.bin import sysinfo
 from autotest_lib.client.common_lib import base_job
 from autotest_lib.client.common_lib import error, log, utils, packages
 from autotest_lib.client.common_lib import logging_manager
-from autotest_lib.server import test, subcommand, profilers, server_job_utils
-from autotest_lib.server import gtest_runner
+from autotest_lib.server import test, subcommand, profilers
 from autotest_lib.server.hosts import abstract_ssh
 from autotest_lib.tko import db as tko_db, status_lib, utils as tko_utils
 
@@ -47,12 +45,6 @@
     return {}
 
 
-# load up site-specific code for generating site-specific job data
-get_site_job_data = utils.import_site_function(__file__,
-    "autotest_lib.server.site_server_job", "get_site_job_data",
-    _get_site_job_data_dummy)
-
-
 class status_indenter(base_job.status_indenter):
     """Provide a simple integer-backed status indenter."""
     def __init__(self):
@@ -466,75 +458,6 @@
         return success_machines
 
 
-    def distribute_across_machines(self, tests, machines,
-                                   continuous_parsing=False):
-        """Run each test in tests once using machines.
-
-        Instead of running each test on each machine like parallel_on_machines,
-        run each test once across all machines. Put another way, the total
-        number of tests run by parallel_on_machines is len(tests) *
-        len(machines). The number of tests run by distribute_across_machines is
-        len(tests).
-
-        Args:
-            tests: List of tests to run.
-            machines: List of machines to use.
-            continuous_parsing: Bool, if true parse job while running.
-        """
-        # The Queue is thread safe, but since a machine may have to search
-        # through the queue to find a valid test the lock provides exclusive
-        # queue access for more than just the get call.
-        test_queue = multiprocessing.JoinableQueue()
-        test_queue_lock = multiprocessing.Lock()
-
-        machine_workers = [server_job_utils.machine_worker(self,
-                                                           machine,
-                                                           self.resultdir,
-                                                           test_queue,
-                                                           test_queue_lock,
-                                                           continuous_parsing)
-                           for machine in machines]
-
-        # To (potentially) speed up searching for valid tests create a list of
-        # unique attribute sets present in the machines for this job. If sets
-        # were hashable we could just use a dictionary for fast verification.
-        # This at least reduces the search space from the number of machines to
-        # the number of unique machines.
-        unique_machine_attributes = []
-        for mw in machine_workers:
-            if not mw.attribute_set in unique_machine_attributes:
-                unique_machine_attributes.append(mw.attribute_set)
-
-        # Only queue tests which are valid on at least one machine.  Record
-        # skipped tests in the status.log file using record_skipped_test().
-        for test_entry in tests:
-            ti = server_job_utils.test_item(*test_entry)
-            machine_found = False
-            for ma in unique_machine_attributes:
-                if ti.validate(ma):
-                    test_queue.put(ti)
-                    machine_found = True
-                    break
-            if not machine_found:
-                self.record_skipped_test(ti)
-
-        # Run valid tests and wait for completion.
-        for worker in machine_workers:
-            worker.start()
-        test_queue.join()
-
-
-    def record_skipped_test(self, skipped_test, message=None):
-        """Insert a failure record into status.log for this test."""
-        msg = message
-        if msg is None:
-            msg = 'No valid machines found for test %s.' % skipped_test
-        logging.info(msg)
-        self.record('START', None, skipped_test.test_name)
-        self.record('INFO', None, skipped_test.test_name, msg)
-        self.record('END TEST_NA', None, skipped_test.test_name, msg)
-
-
     _USE_TEMP_DIR = object()
     def run(self, cleanup=False, install_before=False, install_after=False,
             collect_crashdumps=True, namespace={}, control=None,
@@ -567,7 +490,6 @@
             control_file_dir = self.resultdir
 
         self.aborted = False
-        namespace['gtest_runner'] = gtest_runner.gtest_runner()
         namespace['machines'] = machines
         namespace['args'] = self.args
         namespace['job'] = self
@@ -1183,14 +1105,6 @@
                 host.clear_known_hosts()
 
 
-site_server_job = utils.import_site_class(
-    __file__, "autotest_lib.server.site_server_job", "site_server_job",
-    base_server_job)
-
-class server_job(site_server_job):
-    pass
-
-
 class warning_manager(object):
     """Class for controlling warning logs. Manages the enabling and disabling
     of warnings."""
@@ -1222,3 +1136,18 @@
         intervals = self.disabled_warnings.get(warning_type, [])
         if intervals and intervals[-1][1] is None:
             intervals[-1] = (intervals[-1][0], int(current_time_func()))
+
+
+# load up site-specific code for generating site-specific job data
+get_site_job_data = utils.import_site_function(__file__,
+    "autotest_lib.server.site_server_job", "get_site_job_data",
+    _get_site_job_data_dummy)
+
+
+site_server_job = utils.import_site_class(
+    __file__, "autotest_lib.server.site_server_job", "site_server_job",
+    base_server_job)
+
+
+class server_job(site_server_job):
+    pass
\ No newline at end of file
diff --git a/server/gtest_runner.py b/server/site_gtest_runner.py
similarity index 97%
rename from server/gtest_runner.py
rename to server/site_gtest_runner.py
index e702437..b07bfd9 100644
--- a/server/gtest_runner.py
+++ b/server/site_gtest_runner.py
@@ -12,7 +12,7 @@
 
 import logging, os, re
 from autotest_lib.server import autotest, hosts, host_attributes
-from autotest_lib.server import server_job_utils
+from autotest_lib.server import site_server_job_utils
 
 
 class gtest_runner(object):
@@ -42,7 +42,7 @@
             work_dir: Local directory to run tests in.
 
         """
-        self._gtest = server_job_utils.test_item(*gtest_entry)
+        self._gtest = site_server_job_utils.test_item(*gtest_entry)
         self._host = hosts.create_host(machine)
         self._results_dir = work_dir
 
@@ -67,11 +67,14 @@
         Uses gtest_parser to pull the test results out of the gtest log file.
         Then creates entries in the status.log file for each test.
         """
-        parser = gtest_parser()
-
         # Find gtest log files from the autotest client run.
         log_path = os.path.join(self._results_dir, self._gtest.test_name,
                                 'debug', self._gtest.test_name + '.DEBUG')
+        if not os.path.exists(log_path):
+            logging.error('gtest log file "%s" is missing.', log_path)
+            return
+
+        parser = gtest_parser()
 
         # Read the log file line-by-line, passing each line into the parser.
         with open(log_path, 'r') as log_file:
@@ -83,11 +86,11 @@
         # Record each failed test.
         for failed in parser.FailedTests():
             fail_description = parser.FailureDescription(failed)
-            if len(fail_description) > 1:
-                self.record_failed_test(failed, fail_description[1].strip(),
-                                        ''.join(fail_description[1:]))
+            if fail_description:
+                self.record_failed_test(failed, fail_description[0].strip(),
+                                        ''.join(fail_description))
             else:
-                self.record_failed_test(failed, '')
+                self.record_failed_test(failed, 'NO ERROR LINES FOUND.')
 
         # Finally record each successful test.
         for passed in parser.PassedTests():
@@ -283,7 +286,7 @@
-            List of test name, and failure string.
+            List of failure lines for the given test.
         """
         test_status = self._test_status.get(test, ('', []))
-        return ["%s: " % test] + test_status[1]
+        return test_status[1]
 
     def SuppressionHashes(self):
         """Returns list of suppression hashes found in the log."""
diff --git a/server/site_server_job.py b/server/site_server_job.py
index d96c288..0695c9c 100644
--- a/server/site_server_job.py
+++ b/server/site_server_job.py
@@ -2,8 +2,12 @@
 # Use of this source code is governed by a BSD-style license that can be
 # found in the LICENSE file.
 
-import os
-import common
+"""Site extensions to server_job.  Adds distribute_across_machines()."""
+
+import os, logging, multiprocessing
+from autotest_lib.server import site_gtest_runner, site_server_job_utils
+from autotest_lib.server import subcommand
+from autotest_lib.server.server_job import base_server_job
 import utils
 
 
@@ -37,5 +41,103 @@
     return site_job_data
 
 
-class site_server_job(object):
-    pass
+class site_server_job(base_server_job):
+    """Extend server_job adding distribute_across_machines."""
+
+    def __init__(self, *args, **dargs):
+        super(site_server_job, self).__init__(*args, **dargs)
+
+
+    def run(self, *args, **dargs):
+        """Extend server_job.run adding gtest_runner to the namespace."""
+
+        gtest_run = {'gtest_runner': site_gtest_runner.gtest_runner()}
+
+        # Namespace is the 5th parameter to run().  If args has 5 or more
+        # entries in it then we need to fix-up this namespace entry.
+        if len(args) >= 5:
+            args[4].update(gtest_run)
+        # Else, if present, namespace must be in dargs.
+        else:
+            dargs.setdefault('namespace', gtest_run).update(gtest_run)
+        # Now call the original run() with the modified namespace containing a
+        # gtest_runner
+        super(site_server_job, self).run(*args, **dargs)
+
+
+    def distribute_across_machines(self, tests, machines,
+                                   continuous_parsing=False):
+        """Run each test in tests once using machines.
+
+        Instead of running each test on each machine like parallel_on_machines,
+        run each test once across all machines. Put another way, the total
+        number of tests run by parallel_on_machines is len(tests) *
+        len(machines). The number of tests run by distribute_across_machines is
+        len(tests).
+
+        Args:
+            tests: List of tests to run.
+            machines: List of machines to use.
+            continuous_parsing: Bool, if true parse job while running.
+        """
+        # The Queue is thread safe, but since a machine may have to search
+        # through the queue to find a valid test the lock provides exclusive
+        # queue access for more than just the get call.
+        test_queue = multiprocessing.JoinableQueue()
+        test_queue_lock = multiprocessing.Lock()
+
+        unique_machine_attributes = []
+        sub_commands = []
+        work_dir = self.resultdir
+
+        for machine in machines:
+            if 'group' in self.resultdir:
+                work_dir = os.path.join(self.resultdir, machine)
+
+            mw = site_server_job_utils.machine_worker(self,
+                                                      machine,
+                                                      work_dir,
+                                                      test_queue,
+                                                      test_queue_lock,
+                                                      continuous_parsing)
+
+            # Create the subcommand instance to run this machine worker.
+            sub_commands.append(subcommand.subcommand(mw.run,
+                                                      [],
+                                                      work_dir))
+
+            # To (potentially) speed up searching for valid tests create a list
+            # of unique attribute sets present in the machines for this job. If
+            # sets were hashable we could just use a dictionary for fast
+            # verification. This at least reduces the search space from the
+            # number of machines to the number of unique machines.
+            if not mw.attribute_set in unique_machine_attributes:
+                unique_machine_attributes.append(mw.attribute_set)
+
+        # Only queue tests which are valid on at least one machine.  Record
+        # skipped tests in the status.log file using record_skipped_test().
+        for test_entry in tests:
+            ti = site_server_job_utils.test_item(*test_entry)
+            machine_found = False
+            for ma in unique_machine_attributes:
+                if ti.validate(ma):
+                    test_queue.put(ti)
+                    machine_found = True
+                    break
+            if not machine_found:
+                self.record_skipped_test(ti)
+
+        # Run valid tests and wait for completion.
+        subcommand.parallel(sub_commands)
+
+
+    def record_skipped_test(self, skipped_test, message=None):
+        """Insert a failure record into status.log for this test."""
+        msg = message
+        if msg is None:
+            msg = 'No valid machines found for test %s.' % skipped_test
+        logging.info(msg)
+        self.record('START', None, skipped_test.test_name)
+        self.record('INFO', None, skipped_test.test_name, msg)
+        self.record('END TEST_NA', None, skipped_test.test_name, msg)
+
diff --git a/server/server_job_utils.py b/server/site_server_job_utils.py
similarity index 71%
rename from server/server_job_utils.py
rename to server/site_server_job_utils.py
index 49d06e7..2cb9c6f 100644
--- a/server/server_job_utils.py
+++ b/server/site_server_job_utils.py
@@ -2,7 +2,7 @@
 # Use of this source code is governed by a BSD-style license that can be
 # found in the LICENSE file.
 
-"""Utility classes used by server_job.distribute_across_machines().
+"""Utility classes used by site_server_job.distribute_across_machines().
 
 test_item: extends the basic test tuple to add include/exclude attributes and
     pre/post actions.
@@ -13,54 +13,40 @@
 """
 
 
-import logging, os, Queue, threading
+import logging, os, Queue
 from autotest_lib.client.common_lib import error, utils
-from autotest_lib.server import autotest, hosts, host_attributes, subcommand
+from autotest_lib.server import autotest, hosts, host_attributes
 
 
 class test_item(object):
     """Adds machine verification logic to the basic test tuple.
 
     Tests can either be tuples of the existing form ('testName', {args}) or the
-    extended form ('testname', {args}, ['include'], ['exclude'], ['attribs'])
-    where include and exclude are lists of attributes and actions is a list of
-    strings. A machine must have all the attributes in include and must not
-    have any of the attributes in exclude to be valid for the test. Attribs
-    strings can include 'reboot_before', 'reboot_after', and 'server_job'
+    extended form ('testname', {args}, {'include': [], 'exclude': [],
+    'attributes': []}) where include and exclude are lists of host attribute
+    labels and attributes is a list of strings. A machine must have all the
+    labels in include and must not have any of the labels in exclude to be valid
+    for the test. Attributes strings can include reboot_before, reboot_after,
+    and server_job.
     """
 
-    def __init__(self, test_name, test_args, include_attribs=None,
-                 exclude_attribs=None, test_attribs=None):
+    def __init__(self, test_name, test_args, test_attribs={}):
         """Creates an instance of test_item.
 
         Args:
             test_name: string, name of test to execute.
             test_args: dictionary, arguments to pass into test.
-            include_attribs: attributes a machine must have to run test.
-            exclude_attribs: attributes preventing a machine from running test.
-            test_attribs: reboot before/after, run as server job.
+            test_attribs: Dictionary of test attributes. Valid keys are:
+              include - labels a machine must have to run a test.
+              exclude - labels preventing a machine from running a test.
+              attributes - reboot before/after test, run test as server job.
         """
         self.test_name = test_name
         self.test_args = test_args
 
-        # Remove host parameter if present. Autotest defaults to the client
-        # host so warn if host set to any other value.
-        if 'host' in self.test_args:
-            if self.test_args['host'] != 'client':
-                logging.error('Unsupported test parameter host=%s.',
-                              self.test_args['host'])
-
-        self.inc_set = None
-        if include_attribs is not None:
-            self.inc_set = set(include_attribs)
-
-        self.exc_set = None
-        if exclude_attribs is not None:
-            self.exc_set = set(exclude_attribs)
-
-        self.test_attribs = []
-        if test_attribs is not None:
-            self.test_attribs = test_attribs
+        self.inc_set = set(test_attribs.get('include', []))
+        self.exc_set = set(test_attribs.get('exclude', []))
+        self.attributes = test_attribs.get('attributes', [])
 
     def __str__(self):
         """Return an info string of this test."""
@@ -70,8 +56,8 @@
             msg += ' include=%s' % [s for s in self.inc_set]
         if self.exc_set:
             msg += ' exclude=%s' % [s for s in self.exc_set]
-        if self.test_attribs:
-            msg += ' attributes=%s' % self.test_attribs
+        if self.attributes:
+            msg += ' attributes=%s' % self.attributes
         return msg
 
     def validate(self, machine_attributes):
@@ -103,12 +89,13 @@
         Args:
             client_at: Autotest instance for this host.
             work_dir: Directory to use for results and log files.
+            server_job: Server_Job instance used to run server tests.
         """
-        if 'reboot_before' in self.test_attribs:
+        if 'reboot_before' in self.attributes:
             client_at.host.reboot()
 
         try:
-            if 'server_job' in self.test_attribs:
+            if 'server_job' in self.attributes:
                 if 'host' in self.test_args:
                     self.test_args['host'] = client_at.host
                 if server_job is not None:
@@ -121,12 +108,12 @@
                 client_at.run_test(self.test_name, results_dir=work_dir,
                                    **self.test_args)
         finally:
-            if 'reboot_after' in self.test_attribs:
+            if 'reboot_after' in self.attributes:
                 client_at.host.reboot()
 
 
-class machine_worker(threading.Thread):
-    """Thread that runs tests on a remote host machine."""
+class machine_worker(object):
+    """Worker that runs tests on a remote host machine."""
 
     def __init__(self, server_job, machine, work_dir, test_queue, queue_lock,
                  continuous_parsing=False):
@@ -146,7 +133,6 @@
             queue_lock: lock protecting test_queue.
             continuous_parsing: bool, enable continuous parsing.
         """
-        threading.Thread.__init__(self)
         self._server_job = server_job
         self._test_queue = test_queue
         self._test_queue_lock = queue_lock
@@ -158,14 +144,11 @@
         client_attributes = host_attributes.host_attributes(machine)
         self.attribute_set = set(client_attributes.get_attributes())
         self._results_dir = work_dir
-        # Only create machine subdir when running a multi-machine job.
-        if not self._machine in work_dir:
-            self._results_dir = os.path.join(work_dir, self._machine)
-            if not os.path.exists(self._results_dir):
-                os.makedirs(self._results_dir)
-            machine_data = {'hostname': self._machine,
-                            'status_version': str(1)}
-            utils.write_keyval(self._results_dir, machine_data)
+        if not os.path.exists(self._results_dir):
+            os.makedirs(self._results_dir)
+        machine_data = {'hostname': self._machine,
+                        'status_version': str(1)}
+        utils.write_keyval(self._results_dir, machine_data)
 
     def __str__(self):
         attributes = [a for a in self.attribute_set]
@@ -207,21 +190,6 @@
         return good_test
 
     def run(self):
-        """Use subcommand to fork process and execute tests.
-
-        The forked processes prevents log files from simultaneous tests
-        interweaving with each other. Logging doesn't communicate host autotest
-        to client autotest, it communicates host module to client autotest.  So
-        different server side autotest instances share the same module and
-        require split processes to have clean logging.
-        """
-        sub_cmd = subcommand.subcommand(self._run,
-                                        [],
-                                        self._results_dir)
-        sub_cmd.fork_start()
-        sub_cmd.fork_waitfor()
-
-    def _run(self):
         """Executes tests on the host machine.
 
         If continuous parsing was requested, start the parser before running
diff --git a/server/site_tests/suites/control.bvt b/server/site_tests/suites/control.bvt
index 06fc09f..f3bb07e 100644
--- a/server/site_tests/suites/control.bvt
+++ b/server/site_tests/suites/control.bvt
@@ -52,8 +52,8 @@
   ('network_DisableInterface', {'iface_name': 'wifi_only',
                                 'tag': 'wifi_only'}),
   ('network_Ping', {}),
-  ('power_Resume', {}, [], ['has_resume_bug']),
-  ('platform_Shutdown', {}, [], [], ['reboot_before']),
+  ('power_Resume', {}, {'exclude': ['has_resume_bug']}),
+  ('platform_Shutdown', {}, {'attributes': ['reboot_before']}),
 ]
 
 job.distribute_across_machines(TESTS, machines)
diff --git a/server/site_tests/suites/control.regression b/server/site_tests/suites/control.regression
index 10dfeb9..4af854d 100644
--- a/server/site_tests/suites/control.regression
+++ b/server/site_tests/suites/control.regression
@@ -85,19 +85,21 @@
 # ('platform_ToolchainOptions', {}),
   ('realtimecomm_GTalkAudioBench', {}),
   ('realtimecomm_GTalkunittest', {}),
-  ('hardware_SsdDetection', {}, ['has_ssd']),
+  ('hardware_SsdDetection', {}, {'include': ['has_ssd']}),
 # TODO(dalecurtis): Re-enable if we ever have any platforms with Bluetooth.
 #  ('network_DisableInterface', {'iface_name' : 'hci0',
 #                                'tag' : 'hci0'}, ['has_bluetooth']),
-  ('network_WiFiCaps', {}, ['has_80211n']),
-  ('power_Resume', {}, [], ['has_resume_bug']),
+  ('network_WiFiCaps', {}, {'include': ['has_80211n']}),
+  ('power_Resume', {}, {'exclude': ['has_resume_bug']}),
 # TODO(dalecurtis): Kernel tests disabled for all platforms until the following
 # bugs are fixed:  and http://crosbug.com/14497
 #
 #  # Disable Kernel crash tests until http://crosbug.com/14497 is fixed.
-#  ('platform_KernelErrorPaths',  {'host': 'client'}, [], [], ['server_job']),
-  ('logging_KernelCrashServer',  {'host': 'client'}, ['has_chromeos_firmware'],
-                                 [], ['server_job']),
+#  ('platform_KernelErrorPaths', {'host': 'client'},
+#                                {'attributes': ['server_job']}),
+  ('logging_KernelCrashServer', {'host': 'client'},
+                                {'include': ['has_chromeos_firmware'],
+                                 'attributes': ['server_job']}),
   # Some of the tests in the main regression suite must be run last to prevent
   # cross-test issues. Always add new suites above this line.
   #-----------------------------------------------------------------------------
@@ -107,7 +109,7 @@
   # This test stops tcsd which is known to cause issues with cryptohome and
   # possibly other services. Specifically, cros_ui_test based tests will fail;
   # so always run it last. See http://crosbug.com/14265
-  ('hardware_TPMCheck', {}, ['has_chromeos_firmware']),
+  ('hardware_TPMCheck', {}, {'include': ['has_chromeos_firmware']}),
 ]
 
 job.distribute_across_machines(TESTS, machines)