Move distribute_across_machines and gtest to site extension.
(Patch 11: Simplify how site_server_job.run() inserts gtest_runner
into the namespace parameter of server_job.run().)
(Patch 10: Update gtest_parser to warn and gracefully exit if the test
log file does not exist. This happens when a gtest suite
throws a test exception before completing and copying the
test log file back to the server.
Also updated FailureDescription() to not return the name of
the test to simplify checking for failure lines. This
makes "if failures: then use failures" work instead of
"if len(failures) >1: then use failures[1:]")
(Patch 9: Code review fixes, clearer comments and one line argument
parsing.)
(Patch 8: Fix PyAuto parse failure with no error lines.)
To keep in step with upstream Autotest, move the
distribute_across_machines and gtest_runner functionality into site extensions.
Clean up the old include, exclude, action test attributes from the old
(test_name, {args}, [include], [exclude], [action]) to a cleaner form
of (test_name, {args}, {attributes}) where the attributes dictionary
is keyed to include, exclude, and attributes for the same behavior as
before in a nicer format.
Updated BVT and Regressions to use the new format.
Server_Job:
Removed unused imports I added for removed functions.
Move site functions to end of file to enable importing
base_server_job.
Removed distribute_across_machines() and record_skipped_test().
Removed gtest_runner from the default namespace.
Site_Server_Job:
Added imports and functions removed from server_job.
Changed distribute_across_machines from using threads that
launched subprocesses to just using subprocesses.
Site_Server_Job_Utils:
Fixed test attributes to use a dictionary instead of 3 lists.
Enabled running server jobs in addition to client jobs.
Removed base thread class from machine_worker since the instances
are run by subcommands now.
logging_KernelCrashServer
http://pauldean.kir/afe/#tab_id=view_job&object_id=327
BVT
http://pauldean.kir/afe/#tab_id=view_job&object_id=328
Regression
http://pauldean.kir/afe/#tab_id=view_job&object_id=330
BUG=None.
TEST=Local Runs.
Change-Id: I118ae8bdc2b49d4190051d59a748ecb01d0da33c
Reviewed-on: http://gerrit.chromium.org/gerrit/2698
Reviewed-by: Dale Curtis <dalecurtis@chromium.org>
Tested-by: Paul Pendlebury <pauldean@chromium.org>
diff --git a/server/server_job.py b/server/server_job.py
index ba5f0a2..7b207b2 100644
--- a/server/server_job.py
+++ b/server/server_job.py
@@ -10,14 +10,12 @@
"""
import getpass, os, sys, re, stat, tempfile, time, select, subprocess, platform
-import multiprocessing
import traceback, shutil, warnings, fcntl, pickle, logging, itertools, errno
from autotest_lib.client.bin import sysinfo
from autotest_lib.client.common_lib import base_job
from autotest_lib.client.common_lib import error, log, utils, packages
from autotest_lib.client.common_lib import logging_manager
-from autotest_lib.server import test, subcommand, profilers, server_job_utils
-from autotest_lib.server import gtest_runner
+from autotest_lib.server import test, subcommand, profilers
from autotest_lib.server.hosts import abstract_ssh
from autotest_lib.tko import db as tko_db, status_lib, utils as tko_utils
@@ -47,12 +45,6 @@
return {}
-# load up site-specific code for generating site-specific job data
-get_site_job_data = utils.import_site_function(__file__,
- "autotest_lib.server.site_server_job", "get_site_job_data",
- _get_site_job_data_dummy)
-
-
class status_indenter(base_job.status_indenter):
"""Provide a simple integer-backed status indenter."""
def __init__(self):
@@ -466,75 +458,6 @@
return success_machines
- def distribute_across_machines(self, tests, machines,
- continuous_parsing=False):
- """Run each test in tests once using machines.
-
- Instead of running each test on each machine like parallel_on_machines,
- run each test once across all machines. Put another way, the total
- number of tests run by parallel_on_machines is len(tests) *
- len(machines). The number of tests run by distribute_across_machines is
- len(tests).
-
- Args:
- tests: List of tests to run.
- machines: List of machines to use.
- continuous_parsing: Bool, if true parse job while running.
- """
- # The Queue is thread safe, but since a machine may have to search
- # through the queue to find a valid test the lock provides exclusive
- # queue access for more than just the get call.
- test_queue = multiprocessing.JoinableQueue()
- test_queue_lock = multiprocessing.Lock()
-
- machine_workers = [server_job_utils.machine_worker(self,
- machine,
- self.resultdir,
- test_queue,
- test_queue_lock,
- continuous_parsing)
- for machine in machines]
-
- # To (potentially) speed up searching for valid tests create a list of
- # unique attribute sets present in the machines for this job. If sets
- # were hashable we could just use a dictionary for fast verification.
- # This at least reduces the search space from the number of machines to
- # the number of unique machines.
- unique_machine_attributes = []
- for mw in machine_workers:
- if not mw.attribute_set in unique_machine_attributes:
- unique_machine_attributes.append(mw.attribute_set)
-
- # Only queue tests which are valid on at least one machine. Record
- # skipped tests in the status.log file using record_skipped_test().
- for test_entry in tests:
- ti = server_job_utils.test_item(*test_entry)
- machine_found = False
- for ma in unique_machine_attributes:
- if ti.validate(ma):
- test_queue.put(ti)
- machine_found = True
- break
- if not machine_found:
- self.record_skipped_test(ti)
-
- # Run valid tests and wait for completion.
- for worker in machine_workers:
- worker.start()
- test_queue.join()
-
-
- def record_skipped_test(self, skipped_test, message=None):
- """Insert a failure record into status.log for this test."""
- msg = message
- if msg is None:
- msg = 'No valid machines found for test %s.' % skipped_test
- logging.info(msg)
- self.record('START', None, skipped_test.test_name)
- self.record('INFO', None, skipped_test.test_name, msg)
- self.record('END TEST_NA', None, skipped_test.test_name, msg)
-
-
_USE_TEMP_DIR = object()
def run(self, cleanup=False, install_before=False, install_after=False,
collect_crashdumps=True, namespace={}, control=None,
@@ -567,7 +490,6 @@
control_file_dir = self.resultdir
self.aborted = False
- namespace['gtest_runner'] = gtest_runner.gtest_runner()
namespace['machines'] = machines
namespace['args'] = self.args
namespace['job'] = self
@@ -1183,14 +1105,6 @@
host.clear_known_hosts()
-site_server_job = utils.import_site_class(
- __file__, "autotest_lib.server.site_server_job", "site_server_job",
- base_server_job)
-
-class server_job(site_server_job):
- pass
-
-
class warning_manager(object):
"""Class for controlling warning logs. Manages the enabling and disabling
of warnings."""
@@ -1222,3 +1136,18 @@
intervals = self.disabled_warnings.get(warning_type, [])
if intervals and intervals[-1][1] is None:
intervals[-1] = (intervals[-1][0], int(current_time_func()))
+
+
+# load up site-specific code for generating site-specific job data
+get_site_job_data = utils.import_site_function(__file__,
+ "autotest_lib.server.site_server_job", "get_site_job_data",
+ _get_site_job_data_dummy)
+
+
+site_server_job = utils.import_site_class(
+ __file__, "autotest_lib.server.site_server_job", "site_server_job",
+ base_server_job)
+
+
+class server_job(site_server_job):
+ pass
\ No newline at end of file
diff --git a/server/gtest_runner.py b/server/site_gtest_runner.py
similarity index 97%
rename from server/gtest_runner.py
rename to server/site_gtest_runner.py
index e702437..b07bfd9 100644
--- a/server/gtest_runner.py
+++ b/server/site_gtest_runner.py
@@ -12,7 +12,7 @@
import logging, os, re
from autotest_lib.server import autotest, hosts, host_attributes
-from autotest_lib.server import server_job_utils
+from autotest_lib.server import site_server_job_utils
class gtest_runner(object):
@@ -42,7 +42,7 @@
work_dir: Local directory to run tests in.
"""
- self._gtest = server_job_utils.test_item(*gtest_entry)
+ self._gtest = site_server_job_utils.test_item(*gtest_entry)
self._host = hosts.create_host(machine)
self._results_dir = work_dir
@@ -67,11 +67,14 @@
Uses gtest_parser to pull the test results out of the gtest log file.
Then creates entries in status.log file for each test.
"""
- parser = gtest_parser()
-
# Find gtest log files from the autotest client run.
log_path = os.path.join(self._results_dir, self._gtest.test_name,
'debug', self._gtest.test_name + '.DEBUG')
+ if not os.path.exists(log_path):
+ logging.error('gtest log file "%s" is missing.', log_path)
+ return
+
+ parser = gtest_parser()
# Read the log file line-by-line, passing each line into the parser.
with open(log_path, 'r') as log_file:
@@ -83,11 +86,11 @@
# Record each failed test.
for failed in parser.FailedTests():
fail_description = parser.FailureDescription(failed)
- if len(fail_description) > 1:
- self.record_failed_test(failed, fail_description[1].strip(),
- ''.join(fail_description[1:]))
+ if fail_description:
+ self.record_failed_test(failed, fail_description[0].strip(),
+ ''.join(fail_description))
else:
- self.record_failed_test(failed, '')
+ self.record_failed_test(failed, 'NO ERROR LINES FOUND.')
# Finally record each successful test.
for passed in parser.PassedTests():
@@ -283,7 +286,7 @@
List of test name, and failure string.
"""
test_status = self._test_status.get(test, ('', []))
- return ["%s: " % test] + test_status[1]
+ return test_status[1]
def SuppressionHashes(self):
"""Returns list of suppression hashes found in the log."""
diff --git a/server/site_server_job.py b/server/site_server_job.py
index d96c288..0695c9c 100644
--- a/server/site_server_job.py
+++ b/server/site_server_job.py
@@ -2,8 +2,12 @@
# Use of this source code is governed by a BSD-style license that can be
# found in the LICENSE file.
-import os
-import common
+"""Site extensions to server_job. Adds distribute_across_machines()."""
+
+import os, logging, multiprocessing
+from autotest_lib.server import site_gtest_runner, site_server_job_utils
+from autotest_lib.server import subcommand
+from autotest_lib.server.server_job import base_server_job
import utils
@@ -37,5 +41,103 @@
return site_job_data
-class site_server_job(object):
- pass
+class site_server_job(base_server_job):
+ """Extend server_job adding distribute_across_machines."""
+
+ def __init__(self, *args, **dargs):
+ super(site_server_job, self).__init__(*args, **dargs)
+
+
+ def run(self, *args, **dargs):
+ """Extend server_job.run adding gtest_runner to the namespace."""
+
+ gtest_run = {'gtest_runner': site_gtest_runner.gtest_runner()}
+
+ # Namespace is the 5th parameter to run(). If args has 5 or more
+ # entries in it then we need to fix-up this namespace entry.
+ if len(args) >= 5:
+ args[4].update(gtest_run)
+ # Else, if present, namespace must be in dargs.
+ else:
+ dargs.setdefault('namespace', gtest_run).update(gtest_run)
+ # Now call the original run() with the modified namespace containing a
+ # gtest_runner
+ super(site_server_job, self).run(*args, **dargs)
+
+
+ def distribute_across_machines(self, tests, machines,
+ continuous_parsing=False):
+ """Run each test in tests once using machines.
+
+ Instead of running each test on each machine like parallel_on_machines,
+ run each test once across all machines. Put another way, the total
+ number of tests run by parallel_on_machines is len(tests) *
+ len(machines). The number of tests run by distribute_across_machines is
+ len(tests).
+
+ Args:
+ tests: List of tests to run.
+ machines: List of machines to use.
+ continuous_parsing: Bool, if true parse job while running.
+ """
+ # The Queue is thread safe, but since a machine may have to search
+ # through the queue to find a valid test the lock provides exclusive
+ # queue access for more than just the get call.
+ test_queue = multiprocessing.JoinableQueue()
+ test_queue_lock = multiprocessing.Lock()
+
+ unique_machine_attributes = []
+ sub_commands = []
+ work_dir = self.resultdir
+
+ for machine in machines:
+ if 'group' in self.resultdir:
+ work_dir = os.path.join(self.resultdir, machine)
+
+ mw = site_server_job_utils.machine_worker(self,
+ machine,
+ work_dir,
+ test_queue,
+ test_queue_lock,
+ continuous_parsing)
+
+ # Create the subcommand instance to run this machine worker.
+ sub_commands.append(subcommand.subcommand(mw.run,
+ [],
+ work_dir))
+
+ # To (potentially) speed up searching for valid tests create a list
+ # of unique attribute sets present in the machines for this job. If
+ # sets were hashable we could just use a dictionary for fast
+ # verification. This at least reduces the search space from the
+ # number of machines to the number of unique machines.
+ if not mw.attribute_set in unique_machine_attributes:
+ unique_machine_attributes.append(mw.attribute_set)
+
+ # Only queue tests which are valid on at least one machine. Record
+ # skipped tests in the status.log file using record_skipped_test().
+ for test_entry in tests:
+ ti = site_server_job_utils.test_item(*test_entry)
+ machine_found = False
+ for ma in unique_machine_attributes:
+ if ti.validate(ma):
+ test_queue.put(ti)
+ machine_found = True
+ break
+ if not machine_found:
+ self.record_skipped_test(ti)
+
+ # Run valid tests and wait for completion.
+ subcommand.parallel(sub_commands)
+
+
+ def record_skipped_test(self, skipped_test, message=None):
+ """Insert a failure record into status.log for this test."""
+ msg = message
+ if msg is None:
+ msg = 'No valid machines found for test %s.' % skipped_test
+ logging.info(msg)
+ self.record('START', None, skipped_test.test_name)
+ self.record('INFO', None, skipped_test.test_name, msg)
+ self.record('END TEST_NA', None, skipped_test.test_name, msg)
+
diff --git a/server/server_job_utils.py b/server/site_server_job_utils.py
similarity index 71%
rename from server/server_job_utils.py
rename to server/site_server_job_utils.py
index 49d06e7..2cb9c6f 100644
--- a/server/server_job_utils.py
+++ b/server/site_server_job_utils.py
@@ -2,7 +2,7 @@
# Use of this source code is governed by a BSD-style license that can be
# found in the LICENSE file.
-"""Utility classes used by server_job.distribute_across_machines().
+"""Utility classes used by site_server_job.distribute_across_machines().
test_item: extends the basic test tuple to add include/exclude attributes and
pre/post actions.
@@ -13,54 +13,40 @@
"""
-import logging, os, Queue, threading
+import logging, os, Queue
from autotest_lib.client.common_lib import error, utils
-from autotest_lib.server import autotest, hosts, host_attributes, subcommand
+from autotest_lib.server import autotest, hosts, host_attributes
class test_item(object):
"""Adds machine verification logic to the basic test tuple.
Tests can either be tuples of the existing form ('testName', {args}) or the
- extended form ('testname', {args}, ['include'], ['exclude'], ['attribs'])
- where include and exclude are lists of attributes and actions is a list of
- strings. A machine must have all the attributes in include and must not
- have any of the attributes in exclude to be valid for the test. Attribs
- strings can include 'reboot_before', 'reboot_after', and 'server_job'
+ extended form ('testname', {args}, {'include': [], 'exclude': [],
+ 'attributes': []}) where include and exclude are lists of host attribute
+ labels and attributes is a list of strings. A machine must have all the
+ labels in include and must not have any of the labels in exclude to be valid
+ for the test. Attributes strings can include reboot_before, reboot_after,
+ and server_job.
"""
- def __init__(self, test_name, test_args, include_attribs=None,
- exclude_attribs=None, test_attribs=None):
+ def __init__(self, test_name, test_args, test_attribs={}):
"""Creates an instance of test_item.
Args:
test_name: string, name of test to execute.
test_args: dictionary, arguments to pass into test.
- include_attribs: attributes a machine must have to run test.
- exclude_attribs: attributes preventing a machine from running test.
- test_attribs: reboot before/after, run as server job.
+ test_attribs: Dictionary of test attributes. Valid keys are:
+ include - labels a machine must have to run a test.
+ exclude - labels preventing a machine from running a test.
+ attributes - reboot before/after test, run test as server job.
"""
self.test_name = test_name
self.test_args = test_args
- # Remove host parameter if present. Autotest defaults to the client
- # host so warn if host set to any other value.
- if 'host' in self.test_args:
- if self.test_args['host'] != 'client':
- logging.error('Unsupported test parameter host=%s.',
- self.test_args['host'])
-
- self.inc_set = None
- if include_attribs is not None:
- self.inc_set = set(include_attribs)
-
- self.exc_set = None
- if exclude_attribs is not None:
- self.exc_set = set(exclude_attribs)
-
- self.test_attribs = []
- if test_attribs is not None:
- self.test_attribs = test_attribs
+ self.inc_set = set(test_attribs.get('include', []))
+ self.exc_set = set(test_attribs.get('exclude', []))
+ self.attributes = test_attribs.get('attributes', [])
def __str__(self):
"""Return an info string of this test."""
@@ -70,8 +56,8 @@
msg += ' include=%s' % [s for s in self.inc_set]
if self.exc_set:
msg += ' exclude=%s' % [s for s in self.exc_set]
- if self.test_attribs:
- msg += ' attributes=%s' % self.test_attribs
+ if self.attributes:
+ msg += ' attributes=%s' % self.attributes
return msg
def validate(self, machine_attributes):
@@ -103,12 +89,13 @@
Args:
client_at: Autotest instance for this host.
work_dir: Directory to use for results and log files.
+ server_job: Server_Job instance to use to runs server tests.
"""
- if 'reboot_before' in self.test_attribs:
+ if 'reboot_before' in self.attributes:
client_at.host.reboot()
try:
- if 'server_job' in self.test_attribs:
+ if 'server_job' in self.attributes:
if 'host' in self.test_args:
self.test_args['host'] = client_at.host
if server_job is not None:
@@ -121,12 +108,12 @@
client_at.run_test(self.test_name, results_dir=work_dir,
**self.test_args)
finally:
- if 'reboot_after' in self.test_attribs:
+ if 'reboot_after' in self.attributes:
client_at.host.reboot()
-class machine_worker(threading.Thread):
- """Thread that runs tests on a remote host machine."""
+class machine_worker(object):
+ """Worker that runs tests on a remote host machine."""
def __init__(self, server_job, machine, work_dir, test_queue, queue_lock,
continuous_parsing=False):
@@ -146,7 +133,6 @@
queue_lock: lock protecting test_queue.
continuous_parsing: bool, enable continuous parsing.
"""
- threading.Thread.__init__(self)
self._server_job = server_job
self._test_queue = test_queue
self._test_queue_lock = queue_lock
@@ -158,14 +144,11 @@
client_attributes = host_attributes.host_attributes(machine)
self.attribute_set = set(client_attributes.get_attributes())
self._results_dir = work_dir
- # Only create machine subdir when running a multi-machine job.
- if not self._machine in work_dir:
- self._results_dir = os.path.join(work_dir, self._machine)
- if not os.path.exists(self._results_dir):
- os.makedirs(self._results_dir)
- machine_data = {'hostname': self._machine,
- 'status_version': str(1)}
- utils.write_keyval(self._results_dir, machine_data)
+ if not os.path.exists(self._results_dir):
+ os.makedirs(self._results_dir)
+ machine_data = {'hostname': self._machine,
+ 'status_version': str(1)}
+ utils.write_keyval(self._results_dir, machine_data)
def __str__(self):
attributes = [a for a in self.attribute_set]
@@ -207,21 +190,6 @@
return good_test
def run(self):
- """Use subcommand to fork process and execute tests.
-
- The forked processes prevents log files from simultaneous tests
- interweaving with each other. Logging doesn't communicate host autotest
- to client autotest, it communicates host module to client autotest. So
- different server side autotest instances share the same module and
- require split processes to have clean logging.
- """
- sub_cmd = subcommand.subcommand(self._run,
- [],
- self._results_dir)
- sub_cmd.fork_start()
- sub_cmd.fork_waitfor()
-
- def _run(self):
"""Executes tests on the host machine.
If continuous parsing was requested, start the parser before running
diff --git a/server/site_tests/suites/control.bvt b/server/site_tests/suites/control.bvt
index 06fc09f..f3bb07e 100644
--- a/server/site_tests/suites/control.bvt
+++ b/server/site_tests/suites/control.bvt
@@ -52,8 +52,8 @@
('network_DisableInterface', {'iface_name': 'wifi_only',
'tag': 'wifi_only'}),
('network_Ping', {}),
- ('power_Resume', {}, [], ['has_resume_bug']),
- ('platform_Shutdown', {}, [], [], ['reboot_before']),
+ ('power_Resume', {}, {'exclude': ['has_resume_bug']}),
+ ('platform_Shutdown', {}, {'attributes': ['reboot_before']}),
]
job.distribute_across_machines(TESTS, machines)
diff --git a/server/site_tests/suites/control.regression b/server/site_tests/suites/control.regression
index 10dfeb9..4af854d 100644
--- a/server/site_tests/suites/control.regression
+++ b/server/site_tests/suites/control.regression
@@ -85,19 +85,21 @@
# ('platform_ToolchainOptions', {}),
('realtimecomm_GTalkAudioBench', {}),
('realtimecomm_GTalkunittest', {}),
- ('hardware_SsdDetection', {}, ['has_ssd']),
+ ('hardware_SsdDetection', {}, {'include': ['has_ssd']}),
# TODO(dalecurtis): Re-enable if we ever have any platforms with Bluetooth.
# ('network_DisableInterface', {'iface_name' : 'hci0',
# 'tag' : 'hci0'}, ['has_bluetooth']),
- ('network_WiFiCaps', {}, ['has_80211n']),
- ('power_Resume', {}, [], ['has_resume_bug']),
+ ('network_WiFiCaps', {}, {'include': ['has_80211n']}),
+ ('power_Resume', {}, {'exclude': ['has_resume_bug']}),
# TODO(dalecurtis): Kernel tests disabled for all platforms until the following
# bugs are fixed: and http://crosbug.com/14497
#
# # Disable Kernel crash tests until http://crosbug.com/14497 is fixed.
-# ('platform_KernelErrorPaths', {'host': 'client'}, [], [], ['server_job']),
- ('logging_KernelCrashServer', {'host': 'client'}, ['has_chromeos_firmware'],
- [], ['server_job']),
+# ('platform_KernelErrorPaths', {'host': 'client'},
+# {'attributes': ['server_job']})),
+ ('logging_KernelCrashServer', {'host': 'client'},
+ {'include': ['has_chromeos_firmware'],
+ 'attributes': ['server_job']}),
# Some of the tests in the main regression suite must be run last to prevent
# cross-test issues. Always add new suites above this line.
#-----------------------------------------------------------------------------
@@ -107,7 +109,7 @@
# This test stops tcsd which is known to cause issues with cryptohome and
# possibly other services. Specifically, cros_ui_test based tests will fail;
# so always run it last. See http://crosbug.com/14265
- ('hardware_TPMCheck', {}, ['has_chromeos_firmware']),
+ ('hardware_TPMCheck', {}, {'include': ['has_chromeos_firmware']}),
]
job.distribute_across_machines(TESTS, machines)