# Copyright (c) 2011 The Chromium OS Authors. All rights reserved.
# Use of this source code is governed by a BSD-style license that can be
# found in the LICENSE file.
import common
import logging, os, time
from autotest_lib.client.common_lib import control_data, global_config, error
from autotest_lib.client.common_lib import utils
from autotest_lib.server.cros import control_file_getter
from autotest_lib.server import frontend
VERSION_PREFIX = 'cros-version-'
CONFIG = global_config.global_config
def inject_vars(vars, control_file_in):
"""
    Inject the contents of |vars| into |control_file_in|.
@param vars: a dict to shoehorn into the provided control file string.
@param control_file_in: the contents of a control file to munge.
@return the modified control file string.
"""
control_file = ''
for key, value in vars.iteritems():
control_file += "%s='%s'\n" % (key, value)
return control_file + control_file_in
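# Illustrative sketch (not part of the original API surface): inject_vars()
# just prepends simple key='value' assignments to the control file text, so
#
#   inject_vars({'image_name': 'some-build'}, "job.run_test('autoupdate')")
#
# produces a control file string like:
#
#   image_name='some-build'
#   job.run_test('autoupdate')
#
# Values are interpolated into single quotes verbatim, so they are assumed
# not to contain quotes themselves.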
def _image_url_pattern():
return CONFIG.get_config_value('CROS', 'image_url_pattern', type=str)
def _package_url_pattern():
return CONFIG.get_config_value('CROS', 'package_url_pattern', type=str)
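# For illustration only: these patterns live in the global config and are
# assumed to contain a single %s placeholder for the build name. With a
# hypothetical value like 'http://devserver:8080/update/%s',
#
#   _image_url_pattern() % 'x86-alex-release/R18-1655.0.0-a1-b1584'
#   -> 'http://devserver:8080/update/x86-alex-release/R18-1655.0.0-a1-b1584'
#
# The actual values are deployment-specific.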
class Reimager(object):
"""
A class that can run jobs to reimage devices.
@var _afe: a frontend.AFE instance used to talk to autotest.
@var _tko: a frontend.TKO instance used to query the autotest results db.
@var _cf_getter: a ControlFileGetter used to get the AU control file.
"""
def __init__(self, autotest_dir, afe=None, tko=None):
"""
Constructor
@param autotest_dir: the place to find autotests.
@param afe: an instance of AFE as defined in server/frontend.py.
@param tko: an instance of TKO as defined in server/frontend.py.
"""
self._afe = afe or frontend.AFE(debug=False)
self._tko = tko or frontend.TKO(debug=False)
self._cf_getter = control_file_getter.FileSystemGetter(
[os.path.join(autotest_dir, 'server/site_tests')])
    def skip(self, g):
        """
        Indicate whether the SKIP_IMAGE flag is set in |g|.
        @param g: a dict of control file variables to inspect.
        @return True if |g| maps SKIP_IMAGE to a true value.
        """
        return 'SKIP_IMAGE' in g and g['SKIP_IMAGE']
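    # Hypothetical usage sketch: a caller can consult skip() with the globals
    # of a suite control file before paying for reimaging, e.g.
    #
    #   reimager = Reimager('/usr/local/autotest')
    #   if not reimager.skip({'SKIP_IMAGE': True}):
    #       reimager.attempt(build, board, record)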
def attempt(self, build, board, record, num=None):
"""
Synchronously attempt to reimage some machines.
        Fire off attempts to reimage |num| machines of type |board| with the
        image specified by |build|. Wait for completion, polling every
        10s, and log results with |record| upon completion.
@param build: the build to install e.g.
x86-alex-release/R18-1655.0.0-a1-b1584.
@param board: which kind of devices to reimage.
@param record: callable that records job status.
prototype:
record(status, subdir, name, reason)
@param num: how many devices to reimage.
        @return True if all reimaging jobs succeed, False otherwise.
"""
if not num:
num = CONFIG.get_config_value('CROS', 'sharding_factor', type=int)
logging.debug("scheduling reiamging across %d machines", num)
wrapper_job_name = 'try new image'
record('START', None, wrapper_job_name)
self._ensure_version_label(VERSION_PREFIX + build)
canary = self._schedule_reimage_job(build, num, board)
logging.debug('Created re-imaging job: %d', canary.id)
while len(self._afe.get_jobs(id=canary.id, not_yet_run=True)) > 0:
time.sleep(10)
logging.debug('Re-imaging job running.')
while len(self._afe.get_jobs(id=canary.id, finished=True)) == 0:
time.sleep(10)
logging.debug('Re-imaging job finished.')
canary.result = self._afe.poll_job_results(self._tko, canary, 0)
if canary.result is True:
self._report_results(canary, record)
record('END GOOD', None, wrapper_job_name)
return True
if canary.result is None:
record('FAIL', None, canary.name, 're-imaging tasks did not run')
else: # canary.result is False
self._report_results(canary, record)
record('END FAIL', None, wrapper_job_name)
return False
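    # Sketch of a typical call (build and board label are examples; the
    # record() callable must match the prototype documented above):
    #
    #   r = Reimager('/usr/local/autotest')
    #   ok = r.attempt('x86-alex-release/R18-1655.0.0-a1-b1584', 'x86-alex',
    #                  my_record_fn)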
def _ensure_version_label(self, name):
"""
Ensure that a label called |name| exists in the autotest DB.
@param name: the label to check for/create.
"""
labels = self._afe.get_labels(name=name)
if len(labels) == 0:
self._afe.create_label(name=name)
def _schedule_reimage_job(self, build, num_machines, board):
"""
        Schedules the reimaging of |num_machines| |board| devices with |build|.
        Sends an RPC to the autotest frontend to enqueue reimaging jobs on
        |num_machines| devices of type |board|.
@param build: the build to install (must be unique).
@param num_machines: how many devices to reimage.
@param board: which kind of devices to reimage.
@return a frontend.Job object for the reimaging job we scheduled.
"""
control_file = inject_vars(
{'image_url': _image_url_pattern() % build, 'image_name': build},
self._cf_getter.get_control_file_contents_by_name('autoupdate'))
return self._afe.create_job(control_file=control_file,
name=build + '-try',
control_type='Server',
meta_hosts=[board] * num_machines)
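    # For illustration: given build 'x86-alex-release/R18-1655.0.0-a1-b1584',
    # the job above is named 'x86-alex-release/R18-1655.0.0-a1-b1584-try' and
    # its control file begins with lines such as (URL is hypothetical):
    #
    #   image_url='http://devserver:8080/update/x86-alex-release/R18-1655.0.0-a1-b1584'
    #   image_name='x86-alex-release/R18-1655.0.0-a1-b1584'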
def _report_results(self, job, record):
"""
Record results from a completed frontend.Job object.
@param job: a completed frontend.Job object populated by
frontend.AFE.poll_job_results.
@param record: callable that records job status.
prototype:
record(status, subdir, name, reason)
"""
        if job.result is True:
record('GOOD', None, job.name)
return
for platform in job.results_platform_map:
for status in job.results_platform_map[platform]:
if status == 'Total':
continue
for host in job.results_platform_map[platform][status]:
if host not in job.test_status:
record('ERROR', None, host, 'Job failed to run.')
elif status == 'Failed':
for test_status in job.test_status[host].fail:
record('FAIL', None, host, test_status.reason)
elif status == 'Aborted':
for test_status in job.test_status[host].fail:
record('ABORT', None, host, test_status.reason)
elif status == 'Completed':
record('GOOD', None, host)
class Suite(object):
"""
A suite of tests, defined by some predicate over control file variables.
    Given a place to search for control files and a predicate to match the
    desired tests, this class can gather tests, fire off jobs to run them,
    and then wait for results.
@var _predicate: a function that should return True when run over a
ControlData representation of a control file that should be in
this Suite.
@var _tag: a string with which to tag jobs run in this suite.
@var _afe: an instance of AFE as defined in server/frontend.py.
@var _tko: an instance of TKO as defined in server/frontend.py.
@var _jobs: currently scheduled jobs, if any.
@var _cf_getter: a control_file_getter.ControlFileGetter
"""
@staticmethod
def create_fs_getter(autotest_dir):
"""
@param autotest_dir: the place to find autotests.
@return a FileSystemGetter instance that looks under |autotest_dir|.
"""
# currently hard-coded places to look for tests.
subpaths = ['server/site_tests', 'client/site_tests']
directories = [os.path.join(autotest_dir, p) for p in subpaths]
return control_file_getter.FileSystemGetter(directories)
@staticmethod
def create_from_name(name, autotest_dir, afe=None, tko=None):
"""
Create a Suite using a predicate based on the SUITE control file var.
Makes a predicate based on |name| and uses it to instantiate a Suite
that looks for tests in |autotest_dir| and will schedule them using
    |afe|. Results will be pulled from |tko| upon completion.
@param name: a value of the SUITE control file variable to search for.
@param autotest_dir: the place to find autotests.
@param afe: an instance of AFE as defined in server/frontend.py.
@param tko: an instance of TKO as defined in server/frontend.py.
@return a Suite instance.
"""
return Suite(lambda t: hasattr(t, 'suite') and t.suite == name,
name, autotest_dir, afe, tko)
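    # Minimal end-to-end sketch (suite name, path, and image are examples);
    # assumes devices already carry the matching cros-version- label, e.g.
    # via Reimager.attempt() above:
    #
    #   suite = Suite.create_from_name('bvt', '/usr/local/autotest')
    #   suite.run_and_wait('x86-alex-release/R18-1655.0.0-a1-b1584', record)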
def __init__(self, predicate, tag, autotest_dir, afe=None, tko=None):
"""
Constructor
@param predicate: a function that should return True when run over a
ControlData representation of a control file that should be in
this Suite.
@param tag: a string with which to tag jobs run in this suite.
@param autotest_dir: the place to find autotests.
@param afe: an instance of AFE as defined in server/frontend.py.
@param tko: an instance of TKO as defined in server/frontend.py.
"""
self._predicate = predicate
self._tag = tag
self._afe = afe or frontend.AFE(debug=False)
self._tko = tko or frontend.TKO(debug=False)
self._jobs = []
self._cf_getter = Suite.create_fs_getter(autotest_dir)
self._tests = Suite.find_and_parse_tests(self._cf_getter,
self._predicate,
add_experimental=True)
@property
def tests(self):
"""
A list of ControlData objects in the suite, with added |text| attr.
"""
return self._tests
def stable_tests(self):
"""
|self.tests|, filtered for non-experimental tests.
"""
return filter(lambda t: not t.experimental, self.tests)
def unstable_tests(self):
"""
|self.tests|, filtered for experimental tests.
"""
return filter(lambda t: t.experimental, self.tests)
def _create_job(self, test, image_name):
"""
Thin wrapper around frontend.AFE.create_job().
@param test: ControlData object for a test to run.
@param image_name: the name of an image against which to test.
@return frontend.Job object for the job just scheduled.
"""
return self._afe.create_job(
control_file=test.text,
name='/'.join([image_name, self._tag, test.name]),
control_type=test.test_type.capitalize(),
meta_hosts=[VERSION_PREFIX+image_name])
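    # For illustration: a test named 'platform_BootPerf' run under tag 'bvt'
    # against image 'x86-alex-release/R18-1655.0.0-a1-b1584' yields a job
    # named 'x86-alex-release/R18-1655.0.0-a1-b1584/bvt/platform_BootPerf',
    # scheduled against hosts bearing the
    # 'cros-version-x86-alex-release/R18-1655.0.0-a1-b1584' label.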
def run_and_wait(self, image_name, record, add_experimental=True):
"""
Synchronously run tests in |self.tests|.
Schedules tests against a device running image |image_name|, and
then polls for status, using |record| to print status when each
completes.
Tests returned by self.stable_tests() will always be run, while tests
in self.unstable_tests() will only be run if |add_experimental| is true.
@param image_name: the name of an image against which to test.
@param record: callable that records job status.
prototype:
record(status, subdir, name, reason)
@param add_experimental: schedule experimental tests as well, or not.
"""
try:
record('INFO', None, 'Start %s' % self._tag)
self.schedule(image_name, add_experimental)
try:
for result in self.wait_for_results():
                    # |result| is a tuple of at least 3 and at most 4
                    # entries. We pull out the first and third entries
                    # (status and test name) purely to give the START and
                    # END records legible variable names.
status = result[0]
test_name = result[2]
record('START', None, test_name)
record(*result)
record('END %s' % status, None, test_name)
except Exception as e:
logging.error(e)
record('FAIL', None, self._tag,
'Exception waiting for results')
except Exception as e:
logging.error(e)
record('FAIL', None, self._tag,
'Exception while scheduling suite')
def schedule(self, image_name, add_experimental=True):
"""
Schedule jobs using |self._afe|.
frontend.Job objects representing each scheduled job will be put in
|self._jobs|.
@param image_name: the name of an image against which to test.
@param add_experimental: schedule experimental tests as well, or not.
"""
for test in self.stable_tests():
logging.debug('Scheduling %s', test.name)
self._jobs.append(self._create_job(test, image_name))
if add_experimental:
# TODO(cmasone): ensure I can log results from these differently.
for test in self.unstable_tests():
logging.debug('Scheduling %s', test.name)
self._jobs.append(self._create_job(test, image_name))
def _status_is_relevant(self, status):
"""
Indicates whether the status of a given test is meaningful or not.
@param status: frontend.TestStatus object to look at.
@return True if this is a test result worth looking at further.
"""
return not (status.test_name.startswith('SERVER_JOB') or
status.test_name.startswith('CLIENT_JOB'))
def _collate_aborted(self, current_value, entry):
"""
        Accumulator function for reduce()ing over a list of HostQueueEntries
        for a job; the reduce() returns True if any entry was aborted
        (i.e. |entry.aborted| exists and is True).
Ex:
entries = self._afe.run('get_host_queue_entries', job=job.id)
reduce(self._collate_aborted, entries, False)
@param current_value: the current accumulator (a boolean).
@param entry: the current entry under consideration.
@return the value of |entry.aborted| if it exists, False if not.
"""
return current_value or ('aborted' in entry and entry['aborted'])
def wait_for_results(self):
"""
Wait for results of all tests in all jobs in |self._jobs|.
        Currently polls for results every 5s, and yields each result as soon
        as the job that produced it completes.
        @return a generator that yields tuples, one per test:
                (status, subdir, name, reason)
"""
while self._jobs:
for job in list(self._jobs):
if not self._afe.get_jobs(id=job.id, finished=True):
continue
self._jobs.remove(job)
entries = self._afe.run('get_host_queue_entries', job=job.id)
if reduce(self._collate_aborted, entries, False):
yield('ABORT', None, job.name)
else:
statuses = self._tko.get_status_counts(job=job.id)
for s in filter(self._status_is_relevant, statuses):
yield(s.status, None, s.test_name, s.reason)
time.sleep(5)
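    # Consumption sketch (test name and reason are invented): each yielded
    # tuple can be splatted straight into a record()-style callable:
    #
    #   for result in suite.wait_for_results():
    #       record(*result)  # e.g. ('FAIL', None, 'login_Foo', 'reason text')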
@staticmethod
def find_and_parse_tests(cf_getter, predicate, add_experimental=False):
"""
        Scan through all control files and find eligible tests.
        Looks at control files returned by cf_getter.get_control_file_list()
        for tests that pass |predicate|.
@param cf_getter: a control_file_getter.ControlFileGetter used to list
and fetch the content of control files
@param predicate: a function that should return True when run over a
ControlData representation of a control file that should be in
this Suite.
@param add_experimental: add tests with experimental attribute set.
@return list of ControlData objects that should be run, with control
file text added in |text| attribute.
"""
tests = {}
files = cf_getter.get_control_file_list()
for file in files:
text = cf_getter.get_control_file_contents(file)
try:
found_test = control_data.parse_control_string(text,
raise_warnings=True)
if not add_experimental and found_test.experimental:
continue
found_test.text = text
tests[file] = found_test
            except control_data.ControlVariableException as e:
                logging.warn("Skipping %s\n%s", file, e)
            except Exception as e:
                logging.error("Bad %s\n%s", file, e)
return [test for test in tests.itervalues() if predicate(test)]
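# Illustrative use of find_and_parse_tests(), mirroring what
# Suite.create_from_name() sets up internally (path and suite name are
# examples):
#
#   getter = Suite.create_fs_getter('/usr/local/autotest')
#   bvt_tests = Suite.find_and_parse_tests(
#       getter,
#       lambda t: hasattr(t, 'suite') and t.suite == 'bvt',
#       add_experimental=True)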