[autotest] Create a dynamic bvt suite control file
Adds the dynamic_suite module, uses it to create a dynamic bvt suite.
BUG=chromium-os:21953
TEST=./server/autoserv test_suites/control.dummy
TEST=./server/autoserv test_suites/control.bvt
Change-Id: Ib6713a5f382665c3e51e54a8c290cb1d1669e258
Reviewed-on: https://gerrit.chromium.org/gerrit/10454
Tested-by: Chris Masone <cmasone@chromium.org>
Reviewed-by: Scott Zawalski <scottz@chromium.org>
Commit-Ready: Chris Masone <cmasone@chromium.org>
diff --git a/server/cros/dynamic_suite.py b/server/cros/dynamic_suite.py
new file mode 100644
index 0000000..066661d
--- /dev/null
+++ b/server/cros/dynamic_suite.py
@@ -0,0 +1,412 @@
+# Copyright (c) 2011 The Chromium OS Authors. All rights reserved.
+# Use of this source code is governed by a BSD-style license that can be
+# found in the LICENSE file.
+
+import common
+import compiler, logging, os, random, re, time
+from autotest_lib.client.common_lib import control_data, error, utils
+from autotest_lib.server.cros import control_file_getter
+from autotest_lib.server import frontend
+
+
+VERSION_PREFIX = 'cros-version-'  # AFE label = this prefix + image name.
+
+class Reimager(object):
+    """
+    A class that can run jobs to reimage devices.
+
+    @var _afe: a frontend.AFE instance used to talk to autotest.
+    @var _tko: a frontend.TKO instance used to query the autotest results db.
+    @var _cf_getter: a ControlFileGetter used to get the AU control file.
+    """
+
+
+    def __init__(self, autotest_dir, afe=None, tko=None):
+        """
+        Constructor
+
+        @param autotest_dir: the place to find autotests.
+        @param afe: an instance of AFE as defined in server/frontend.py.
+        @param tko: an instance of TKO as defined in server/frontend.py.
+        """
+        self._afe = afe or frontend.AFE(debug=False)
+        self._tko = tko or frontend.TKO(debug=False)
+        self._cf_getter = control_file_getter.FileSystemGetter(
+            [os.path.join(autotest_dir, 'server/site_tests')])
+
+
+    def attempt(self, url, name, num, board, record):
+        """
+        Synchronously attempt to reimage some machines.
+
+        Fire off attempts to reimage |num| machines of type |board|, using an
+        image at |url| called |name|.  Wait for completion, polling every
+        10s, and log results with |record| upon completion.
+
+        @param url: the URL of the image to install.
+        @param name: the name of the image to install (must be unique).
+        @param num: how many devices to reimage.
+        @param board: which kind of devices to reimage.
+        @param record: callable that records job status.
+                 prototype:
+                   record(status, subdir, name, reason)
+        @return True if all reimaging jobs succeed, False otherwise.
+        """
+        record('START', None, 'try new image')
+        self._ensure_version_label(VERSION_PREFIX + name)
+        canary = self._schedule_reimage_job(url, name, num, board)
+        logging.debug('Created re-imaging job: %d', canary.id)
+        while self._afe.get_jobs(id=canary.id, not_yet_run=True):
+            time.sleep(10)
+        logging.debug('Re-imaging job running.')
+        while not self._afe.get_jobs(id=canary.id, finished=True):
+            time.sleep(10)
+        logging.debug('Re-imaging job finished.')
+        canary.result = self._afe.poll_job_results(self._tko, canary, 0)
+
+        if canary.result is True:
+            self._report_results(canary, record)
+            record('END GOOD', None, None)
+            return True
+
+        if canary.result is None:
+            record('FAIL', None, canary.name, 're-imaging tasks did not run')
+        else:  # canary.result is False
+            self._report_results(canary, record)
+
+        record('END FAIL', None, None)
+        return False
+
+
+    def _ensure_version_label(self, name):
+        """
+        Ensure that a label called |name| exists in the autotest DB.
+
+        @param name: the label to check for/create.
+        """
+        # Only create the label when the AFE does not already list it.
+        if not self._afe.get_labels(name=name):
+            self._afe.create_label(name=name)
+
+
+    def _inject_vars(self, vars, control_file_in):
+        """
+        Prepend a 'key=value' line per |vars| entry to |control_file_in|.
+
+        @param vars: a dict of names to values; values are written with repr().
+        @param control_file_in: the contents of a control file to munge.
+        @return the modified control file string.
+        """
+        # repr() keeps the generated assignment valid when a value has quotes.
+        assignments = ['%s=%r\n' % (key, value)
+                       for key, value in vars.iteritems()]
+        return ''.join(assignments) + control_file_in
+
+
+    def _schedule_reimage_job(self, url, name, num_machines, board):
+        """
+        Schedules the reimaging of |num_machines| |board| devices with |name|.
+
+        Sends an RPC to the autotest frontend to enqueue reimaging jobs on
+        |num_machines| devices of type |board|.
+
+        @param url: the URL of the image to install.
+        @param name: the name of the image to install (must be unique).
+        @param num_machines: how many devices to reimage.
+        @param board: which kind of devices to reimage.
+        @return a frontend.Job object for the reimaging job we scheduled.
+        """
+        control_file = self._inject_vars(
+            {'image_url': url,
+             'image_name': name},
+            self._cf_getter.get_control_file_contents_by_name('autoupdate'))
+
+        dargs = {'control_file': control_file,
+                 'name': name + '-try',
+                 'control_type': 'Server',
+                 'meta_hosts': [board] * num_machines}
+
+        return self._afe.create_job(**dargs)
+
+
+    def _report_results(self, job, record):
+        """
+        Record results from a completed frontend.Job object.
+
+        @param job: a completed frontend.Job object populated by
+               frontend.AFE.poll_job_results.
+        @param record: callable that records job status.
+               prototype:
+                 record(status, subdir, name, reason)
+        """
+        if job.result is True:
+            record('GOOD', None, job.name)
+            return
+
+        for platform in job.results_platform_map:
+            for status in job.results_platform_map[platform]:
+                if status == 'Total':
+                    continue
+                for host in job.results_platform_map[platform][status]:
+                    if host not in job.test_status:
+                        record('ERROR', None, host, 'Job failed to run.')
+                    elif status == 'Failed':
+                        for test_status in job.test_status[host].fail:
+                            record('FAIL', None, host, test_status.reason)
+                    elif status == 'Aborted':
+                        for test_status in job.test_status[host].fail:
+                            record('ABORT', None, host, test_status.reason)
+                    elif status == 'Completed':
+                        record('GOOD', None, host)
+
+
+class Suite(object):
+    """
+    A suite of tests, defined by some predicate over control file variables.
+
+    Given a place to search for control files and a predicate to match the
+    desired tests, can gather tests and fire off jobs to run them, and then
+    wait for results.
+
+    @var _predicate: a function that should return True when run over a
+         ControlData representation of a control file that should be in
+         this Suite.
+    @var _tag: a string with which to tag jobs run in this suite.
+    @var _afe: an instance of AFE as defined in server/frontend.py.
+    @var _tko: an instance of TKO as defined in server/frontend.py.
+    @var _jobs: currently scheduled jobs, if any.
+    @var _cf_getter: a control_file_getter.ControlFileGetter
+    """
+
+
+    @classmethod
+    def create_from_name(cls, name, autotest_dir, afe=None, tko=None):
+        """
+        Create a Suite using a predicate based on the SUITE control file var.
+
+        Makes a predicate based on |name| and uses it to instantiate a Suite
+        that looks for tests in |autotest_dir| and will schedule them using
+        |afe|.  Results will be pulled from |tko| upon completion.
+
+        @param name: a value of the SUITE control file variable to search for.
+        @param autotest_dir: the place to find autotests.
+        @param afe: an instance of AFE as defined in server/frontend.py.
+        @param tko: an instance of TKO as defined in server/frontend.py.
+        @return a Suite instance (an instance of |cls| when subclassed).
+        """
+        return cls(lambda t: hasattr(t, 'suite') and t.suite == name,
+                   name, autotest_dir, afe, tko)
+
+
+    def __init__(self, predicate, tag, autotest_dir, afe=None, tko=None):
+        """
+        Constructor
+
+        @param predicate: a function that should return True when run over a
+               ControlData representation of a control file that should be in
+               this Suite.
+        @param tag: a string with which to tag jobs run in this suite.
+        @param autotest_dir: the place to find autotests.
+        @param afe: an instance of AFE as defined in server/frontend.py.
+        @param tko: an instance of TKO as defined in server/frontend.py.
+        """
+        self._predicate = predicate
+        self._tag = tag
+        self._afe = afe or frontend.AFE(debug=False)
+        self._tko = tko or frontend.TKO(debug=False)
+        self._jobs = []
+
+        # currently hard-coded places to look for tests.
+        subpaths = ['server/site_tests', 'client/site_tests']
+        directories = [os.path.join(autotest_dir, p) for p in subpaths]
+        self._cf_getter = control_file_getter.FileSystemGetter(directories)
+
+        self._tests = Suite.find_and_parse_tests(self._cf_getter,
+                                                 self._predicate,
+                                                 add_experimental=True)
+
+
+    @property
+    def tests(self):
+        """
+        A list of ControlData objects in the suite, with added |text| attr.
+        """
+        return self._tests
+
+
+    def stable_tests(self):
+        """
+        |self.tests|, filtered for non-experimental tests.
+        """
+        return [t for t in self.tests if not t.experimental]
+
+
+    def unstable_tests(self):
+        """
+        |self.tests|, filtered for experimental tests.
+        """
+        return [t for t in self.tests if t.experimental]
+
+
+    def _create_job(self, test, image_name):
+        """
+        Thin wrapper around frontend.AFE.create_job().
+
+        @param test: ControlData object for a test to run.
+        @param image_name: the name of an image against which to test.
+        @return frontend.Job object for the job just scheduled.
+        """
+        return self._afe.create_job(
+            control_file=test.text,
+            name='/'.join([image_name, self._tag, test.name]),
+            control_type=test.test_type.capitalize(),
+            meta_hosts=[VERSION_PREFIX + image_name])
+
+
+    def run_and_wait(self, image_name, record, add_experimental=True):
+        """
+        Synchronously run tests in |self.tests|.
+
+        Schedules tests against a device running image |image_name|, and
+        then polls until every job has finished, using |record| to report
+        the status of each test.
+
+        Tests returned by self.stable_tests() will always be run, while tests
+        in self.unstable_tests() will only be run if |add_experimental| is true.
+
+        @param image_name: the name of an image against which to test.
+        @param record: callable that records job status.
+                 prototype:
+                   record(status, subdir, name, reason)
+        @param add_experimental: schedule experimental tests as well, or not.
+        """
+        try:
+            record('START', None, self._tag)
+            self.schedule(image_name, add_experimental)
+            try:
+                for result in self.wait_for_results():
+                    record(*result)
+                record('END GOOD', None, None)
+            except Exception as e:
+                logging.error(e)
+                record('END ERROR', None, None, 'Exception waiting for results')
+        except Exception as e:
+            logging.error(e)
+            record('END ERROR', None, None, 'Exception while scheduling suite')
+
+
+    def schedule(self, image_name, add_experimental=True):
+        """
+        Schedule jobs using |self._afe|.
+
+        frontend.Job objects representing each scheduled job will be put in
+        |self._jobs|.
+
+        @param image_name: the name of an image against which to test.
+        @param add_experimental: schedule experimental tests as well, or not.
+        """
+        tests = self.stable_tests()
+        if add_experimental:
+            # TODO(cmasone): ensure I can log results from these differently.
+            tests = tests + self.unstable_tests()
+
+        for test in tests:
+            logging.debug('Scheduling %s', test.name)
+            self._jobs.append(self._create_job(test, image_name))
+
+
+    def _status_is_relevant(self, status):
+        """
+        Indicates whether the status of a given test is meaningful or not.
+
+        @param status: frontend.TestStatus object to look at.
+        @return True if this is a test result worth looking at further.
+        """
+        # startswith() accepts a tuple of prefixes and matches any of them.
+        return not status.test_name.startswith(('SERVER_JOB', 'CLIENT_JOB'))
+
+
+    def _collate_aborted(self, current_value, entry):
+        """
+        reduce() over a list of HostQueueEntries for a job; True if any aborted.
+
+        Functor that can be reduce()d over a list of
+        HostQueueEntries for a job.  If any were aborted
+        (|entry['aborted']| exists and is True), then the reduce() will
+        return True.
+
+        Ex:
+            entries = self._afe.run('get_host_queue_entries', job=job.id)
+            reduce(self._collate_aborted, entries, False)
+
+        @param current_value: the current accumulator (a boolean).
+        @param entry: the current entry under consideration.
+        @return |current_value| if it is true; otherwise the value of
+                |entry['aborted']| if it exists, False if not.
+        """
+        return current_value or ('aborted' in entry and entry['aborted'])
+
+
+    def wait_for_results(self):
+        """
+        Wait for results of all tests in all jobs in |self._jobs|.
+
+        Polls for results every 5s until every job in |self._jobs| is done.
+        @return a list of tuples, one per test: (status, subdir, name, reason)
+        """
+        results = []
+        while self._jobs:
+            # Iterate over a copy so that finished jobs can be removed.
+            for job in list(self._jobs):
+                if not self._afe.get_jobs(id=job.id, finished=True):
+                    continue
+                self._jobs.remove(job)
+
+                entries = self._afe.run('get_host_queue_entries', job=job.id)
+                if reduce(self._collate_aborted, entries, False):
+                    results.append(('ABORT', None, job.name))
+                else:
+                    statuses = self._tko.get_status_counts(job=job.id)
+                    for s in filter(self._status_is_relevant, statuses):
+                        results.append((s.status, None, s.test_name, s.reason))
+            if self._jobs:  # skip the pause once every job has finished
+                time.sleep(5)
+
+        return results
+
+
+    @classmethod
+    def find_and_parse_tests(cls, cf_getter, predicate, add_experimental=False):
+        """
+        Function to scan through all tests and find eligible tests.
+
+        Looks at control files returned by cf_getter.get_control_file_list()
+        for tests that pass |predicate|.
+
+        @param cf_getter: a control_file_getter.ControlFileGetter used to list
+               and fetch the content of control files
+        @param predicate: a function that should return True when run over a
+               ControlData representation of a control file that should be in
+               this Suite.
+        @param add_experimental: add tests with experimental attribute set.
+
+        @return list of ControlData objects that should be run, with control
+                file text added in |text| attribute.
+        """
+        tests = {}
+        control_files = cf_getter.get_control_file_list()
+        for path in control_files:
+            text = cf_getter.get_control_file_contents(path)
+            try:
+                found_test = control_data.parse_control_string(
+                    text, raise_warnings=True)
+                if not add_experimental and found_test.experimental:
+                    continue
+
+                found_test.text = text
+                tests[path] = found_test
+            except control_data.ControlVariableException as e:
+                logging.warn("Skipping %s\n%s", path, e)
+            except Exception as e:
+                logging.error("Bad %s\n%s", path, e)
+
+        return [test for test in tests.itervalues() if predicate(test)]