[autotest] Create a dynamic bvt suite control file

Adds the dynamic_suite module, uses it to create a dynamic bvt suite.

BUG=chromium-os:21953
TEST=./server/autoserv test_suites/control.dummy
TEST=./server/autoserv test_suites/control.bvt

Change-Id: Ib6713a5f382665c3e51e54a8c290cb1d1669e258
Reviewed-on: https://gerrit.chromium.org/gerrit/10454
Tested-by: Chris Masone <cmasone@chromium.org>
Reviewed-by: Scott Zawalski <scottz@chromium.org>
Commit-Ready: Chris Masone <cmasone@chromium.org>
diff --git a/client/common_lib/control_data.py b/client/common_lib/control_data.py
index dcc49cd..8f6e348 100644
--- a/client/common_lib/control_data.py
+++ b/client/common_lib/control_data.py
@@ -120,6 +120,10 @@
         self._set_int('sync_count', val, min=1)
 
 
+    def set_suite(self, val):
+        self._set_string('suite', val)
+
+
     def set_time(self, val):
         self._set_option('time', val, ['short', 'medium', 'long'])
 
@@ -172,13 +176,24 @@
     return (key, val)
 
 
+def parse_control_string(control, raise_warnings=False):
+    try:
+        mod = compiler.parse(control)
+    except SyntaxError, e:
+        raise ControlVariableException("Error parsing data because %s" % e)
+    return finish_parse(mod, '', raise_warnings)
+
+
 def parse_control(path, raise_warnings=False):
     try:
         mod = compiler.parseFile(path)
     except SyntaxError, e:
         raise ControlVariableException("Error parsing %s because %s" %
                                        (path, e))
+    return finish_parse(mod, path, raise_warnings)
 
+
+def finish_parse(mod, path, raise_warnings):
     assert(mod.__class__ == compiler.ast.Module)
     assert(mod.node.__class__ == compiler.ast.Stmt)
     assert(mod.node.nodes.__class__ == list)
diff --git a/client/site_tests/dummy_Fail/control b/client/site_tests/dummy_Fail/control
index 8adc7e4..725a7e6 100644
--- a/client/site_tests/dummy_Fail/control
+++ b/client/site_tests/dummy_Fail/control
@@ -6,6 +6,7 @@
 NAME = "dummy_Fail"
 PURPOSE = "Demonstrate failure methods of autotests."
 CRITERIA = "This test will never succeed."
+SUITE = "dummy"
 TIME = "SHORT"
 TEST_CATEGORY = "General"
 TEST_CLASS = "dummy"
diff --git a/client/site_tests/dummy_Pass/control b/client/site_tests/dummy_Pass/control
new file mode 100644
index 0000000..43ae8d3
--- /dev/null
+++ b/client/site_tests/dummy_Pass/control
@@ -0,0 +1,21 @@
+# Copyright (c) 2011 The Chromium OS Authors. All rights reserved.
+# Use of this source code is governed by a BSD-style license that can be
+# found in the LICENSE file.
+
+AUTHOR = "Chrome OS Team"
+NAME = "dummy_Pass"
+PURPOSE = "Demonstrate success methods of autotests."
+CRITERIA = "This test will always succeed."
+SUITE = "dummy"
+TIME = "SHORT"
+TEST_CATEGORY = "General"
+TEST_CLASS = "dummy"
+TEST_TYPE = "client"
+
+DOC = """
+This is a helper test that will succeed.
+"""
+
+job.run_test('dummy_Pass')
+
+
diff --git a/server/cros/control_file_getter.py b/server/cros/control_file_getter.py
new file mode 100644
index 0000000..fc5ea43
--- /dev/null
+++ b/server/cros/control_file_getter.py
@@ -0,0 +1,99 @@
+# Copyright (c) 2011 The Chromium OS Authors. All rights reserved.
+# Use of this source code is governed by a BSD-style license that can be
+# found in the LICENSE file.
+
+import common
+import compiler, logging, os, random, re, time
+from autotest_lib.client.common_lib import control_data, error, utils
+
+
+class ControlFileGetter(object):
+    """
+    Interface for classes that can list and fetch known control files.
+
+    @var _CONTROL_PATTERN: control file name format to match.
+    """
+
+    _CONTROL_PATTERN = '^control(?:\..+)?$'
+
+    def __init__(self):
+        pass
+
+
+    def get_control_file_list(self):
+        """
+        Gather a list of paths to control files matching |_CONTROL_PATTERN|.
+
+        @return A list of files that match regexp
+        """
+        pass
+
+
+    def get_control_file_contents(self, test_path):
+        """
+        Given a path to a control file, return its contents.
+
+        @param test_path: the path to the control file
+        @return the contents of the control file specified by the path.
+        """
+        pass
+
+
+    def get_control_file_contents_by_name(self, test_name):
+        """
+        Given the name of a control file, return its contents.
+
+        @param test_name: the path to the control file.
+        @return the contents of the control file specified by the path.
+        """
+        pass
+
+
+    def _is_useful_file(self, name):
+        return '__init__.py' not in name and '.svn' not in name
+
+
+class FileSystemGetter(ControlFileGetter):
+    def __init__(self, paths):
+        """
+        @param paths: base directories to start search.
+        """
+        self._paths = paths
+        self._files = []
+
+
+    def get_control_file_list(self):
+        """
+        Gather a list of paths to control files under |_paths|.
+
+        @return A list of files that match |_CONTROL_PATTERN|.
+        """
+        regexp = re.compile(self._CONTROL_PATTERN)
+        directories = self._paths
+        while len(directories) > 0:
+            directory = directories.pop()
+            if not os.path.exists(directory):
+                continue
+            for name in os.listdir(directory):
+                fullpath = os.path.join(directory, name)
+                if os.path.isfile(fullpath):
+                    if regexp.search(name):
+                        # if we are a control file
+                        self._files.append(fullpath)
+                elif os.path.isdir(fullpath):
+                    directories.append(fullpath)
+        return [f for f in self._files if self._is_useful_file(f)]
+
+
+    def get_control_file_contents(self, test_path):
+        return utils.read_file(test_path)
+
+
+    def get_control_file_contents_by_name(self, test_name):
+        if not self._files:
+            self.get_control_file_list()
+        regexp = re.compile(os.path.join(test_name, 'control'))
+        candidates = filter(regexp.search, self._files)
+        if not candidates or len(candidates) > 1:
+            raise error.TestError(test_name + ' is not unique.')
+        return self.get_control_file_contents(candidates[0])
diff --git a/server/cros/dynamic_suite.py b/server/cros/dynamic_suite.py
new file mode 100644
index 0000000..066661d
--- /dev/null
+++ b/server/cros/dynamic_suite.py
@@ -0,0 +1,412 @@
+# Copyright (c) 2011 The Chromium OS Authors. All rights reserved.
+# Use of this source code is governed by a BSD-style license that can be
+# found in the LICENSE file.
+
+import common
+import compiler, logging, os, random, re, time
+from autotest_lib.client.common_lib import control_data, error, utils
+from autotest_lib.server.cros import control_file_getter
+from autotest_lib.server import frontend
+
+
+VERSION_PREFIX = 'cros-version-'
+
+class Reimager(object):
+    """
+    A class that can run jobs to reimage devices.
+
+    @var _afe: a frontend.AFE instance used to talk to autotest.
+    @var _tko: a frontend.TKO instance used to query the autotest results db.
+    @var _cf_getter: a ControlFileGetter used to get the AU control file.
+    """
+
+
+    def __init__(self, autotest_dir, afe=None, tko=None):
+        """
+        Constructor
+
+        @param autotest_dir: the place to find autotests.
+        @param afe: an instance of AFE as defined in server/frontend.py.
+        @param tko: an instance of TKO as defined in server/frontend.py.
+        """
+        self._afe = afe or frontend.AFE(debug=False)
+        self._tko = tko or frontend.TKO(debug=False)
+        self._cf_getter = control_file_getter.FileSystemGetter(
+            [os.path.join(autotest_dir, 'server/site_tests')])
+
+
+    def attempt(self, url, name, num, board, record):
+        """
+        Synchronously attempt to reimage some machines.
+
+        Fire off attempts to reimage |num| machines of type |board|, using an
+        image at |url| called |name|.  Wait for completion, polling every
+        10s, and log results with |record| upon completion.
+
+        @param url: the URL of the image to install.
+        @param name: the name of the image to install (must be unique).
+        @param num: how many devices to reimage.
+        @param board: which kind of devices to reimage.
+        @param record: callable that records job status.
+                 prototype:
+                   record(status, subdir, name, reason)
+        @return True if all reimaging jobs succeed, false otherwise.
+        """
+        record('START', None, 'try new image')
+        self._ensure_version_label(VERSION_PREFIX+name)
+        canary = self._schedule_reimage_job(url, name, num, board)
+        logging.debug('Created re-imaging job: %d', canary.id)
+        while len(self._afe.get_jobs(id=canary.id, not_yet_run=True)) > 0:
+            time.sleep(10)
+        logging.debug('Re-imaging job running.')
+        while len(self._afe.get_jobs(id=canary.id, finished=True)) == 0:
+            time.sleep(10)
+        logging.debug('Re-imaging job finished.')
+        canary.result = self._afe.poll_job_results(self._tko, canary, 0)
+
+        if canary.result is True:
+            self._report_results(canary, record)
+            record('END GOOD', None, None)
+            return True
+
+        if canary.result is None:
+            record('FAIL', None, canary.name, 're-imaging tasks did not run')
+        else:  # canary.result is False
+            self._report_results(canary, record)
+
+        record('END FAIL', None, None)
+        return False
+
+
+    def _ensure_version_label(self, name):
+        """
+        Ensure that a label called |name| exists in the autotest DB.
+
+        @param name: the label to check for/create.
+        """
+        labels = self._afe.get_labels(name=name)
+        if len(labels) == 0:
+            self._afe.create_label(name=name)
+
+
+    def _inject_vars(self, vars, control_file_in):
+        """
+        Inject the contents of |vars| into |control_file_in|
+
+        @param vars: a dict to shoehorn into the provided control file string.
+        @param control_file_in: the contents of a control file to munge.
+        @return the modified control file string.
+        """
+        control_file = ''
+        for key, value in vars.iteritems():
+            control_file += "%s='%s'\n" % (key, value)
+        return control_file + control_file_in
+
+
+    def _schedule_reimage_job(self, url, name, num_machines, board):
+        """
+        Schedules the reimaging of |num_machines| |board| devices with |image|.
+
+        Sends an RPC to the autotest frontend to enqueue reimaging jobs on
+        |num_machines| devices of type |board|
+
+        @param url: the URL of the image to install.
+        @param name: the name of the image to install (must be unique).
+        @param num: how many devices to reimage.
+        @param board: which kind of devices to reimage.
+        @return a frontend.Job object for the reimaging job we scheduled.
+        """
+        control_file = self._inject_vars(
+            { 'image_url': url,
+              'image_name': name },
+            self._cf_getter.get_control_file_contents_by_name('autoupdate'))
+
+        dargs = { 'control_file': control_file,
+                  'name': name + '-try',
+                  'control_type': 'Server',
+                  'meta_hosts': [board] * num_machines }
+
+        return self._afe.create_job(**dargs)
+
+
+    def _report_results(self, job, record):
+        """
+        Record results from a completed frontend.Job object.
+
+        @param job: a completed frontend.Job object populated by
+               frontend.AFE.poll_job_results.
+        @param record: callable that records job status.
+               prototype:
+                 record(status, subdir, name, reason)
+        """
+        if job.result == True:
+            record('GOOD', None, job.name)
+            return
+
+        for platform in job.results_platform_map:
+            for status in job.results_platform_map[platform]:
+                if status == 'Total':
+                    continue
+                for host in job.results_platform_map[platform][status]:
+                    if host not in job.test_status:
+                        record('ERROR', None, host, 'Job failed to run.')
+                    elif status == 'Failed':
+                        for test_status in job.test_status[host].fail:
+                            record('FAIL', None, host, test_status.reason)
+                    elif status == 'Aborted':
+                        for test_status in job.test_status[host].fail:
+                            record('ABORT', None, host, test_status.reason)
+                    elif status == 'Completed':
+                        record('GOOD', None, host)
+
+
+class Suite(object):
+    """
+    A suite of tests, defined by some predicate over control file variables.
+
+    Given a place to search for control files a predicate to match the desired
+    tests, can gather tests and fire off jobs to run them, and then wait for
+    results.
+
+    @var _predicate: a function that should return True when run over a
+         ControlData representation of a control file that should be in
+         this Suite.
+    @var _tag: a string with which to tag jobs run in this suite.
+    @var _afe: an instance of AFE as defined in server/frontend.py.
+    @var _tko: an instance of TKO as defined in server/frontend.py.
+    @var _jobs: currently scheduled jobs, if any.
+    @var _cf_getter: a control_file_getter.ControlFileGetter
+    """
+
+
+    @classmethod
+    def create_from_name(cls, name, autotest_dir, afe=None, tko=None):
+        """
+        Create a Suite using a predicate based on the SUITE control file var.
+
+        Makes a predicate based on |name| and uses it to instantiate a Suite
+        that looks for tests in |autotest_dir| and will schedule them using
+        |afe|.  Results will be pulled from |tko| upon completion
+
+        @param name: a value of the SUITE control file variable to search for.
+        @param autotest_dir: the place to find autotests.
+        @param afe: an instance of AFE as defined in server/frontend.py.
+        @param tko: an instance of TKO as defined in server/frontend.py.
+        @return a Suite instance.
+        """
+        return Suite(lambda t: hasattr(t, 'suite') and t.suite == name,
+                     name, autotest_dir, afe, tko)
+
+
+    def __init__(self, predicate, tag, autotest_dir, afe=None, tko=None):
+        """
+        Constructor
+
+        @param predicate: a function that should return True when run over a
+               ControlData representation of a control file that should be in
+               this Suite.
+        @param tag: a string with which to tag jobs run in this suite.
+        @param autotest_dir: the place to find autotests.
+        @param afe: an instance of AFE as defined in server/frontend.py.
+        @param tko: an instance of TKO as defined in server/frontend.py.
+        """
+        self._predicate = predicate
+        self._tag = tag
+        self._afe = afe or frontend.AFE(debug=False)
+        self._tko = tko or frontend.TKO(debug=False)
+        self._jobs = []
+
+        # currently hard-coded places to look for tests.
+        subpaths = [ 'server/site_tests', 'client/site_tests']
+        directories = [ os.path.join(autotest_dir, p) for p in subpaths ]
+        self._cf_getter = control_file_getter.FileSystemGetter(directories)
+
+        self._tests = Suite.find_and_parse_tests(self._cf_getter,
+                                                 self._predicate,
+                                                 add_experimental=True)
+
+
+    @property
+    def tests(self):
+        """
+        A list of ControlData objects in the suite, with added |text| attr.
+        """
+        return self._tests
+
+
+    def stable_tests(self):
+        """
+        |self.tests|, filtered for non-experimental tests.
+        """
+        return filter(lambda t: not t.experimental, self.tests)
+
+
+    def unstable_tests(self):
+        """
+        |self.tests|, filtered for experimental tests.
+        """
+        return filter(lambda t: t.experimental, self.tests)
+
+
+    def _create_job(self, test, image_name):
+        """
+        Thin wrapper around frontend.AFE.create_job().
+
+        @param test: ControlData object for a test to run.
+        @param image_name: the name of an image against which to test.
+        @return frontend.Job object for the job just scheduled.
+        """
+        return self._afe.create_job(
+            control_file=test.text,
+            name='/'.join([image_name, self._tag, test.name]),
+            control_type=test.test_type.capitalize(),
+            meta_hosts=[VERSION_PREFIX+image_name])
+
+
+    def run_and_wait(self, image_name, record, add_experimental=True):
+        """
+        Synchronously run tests in |self.tests|.
+
+        Schedules tests against a device running image |image_name|, and
+        then polls for status, using |record| to print status when each
+        completes.
+
+        Tests returned by self.stable_tests() will always be run, while tests
+        in self.unstable_tests() will only be run if |add_experimental| is true.
+
+        @param image_name: the name of an image against which to test.
+        @param record: callable that records job status.
+                 prototype:
+                   record(status, subdir, name, reason)
+        @param add_experimental: schedule experimental tests as well, or not.
+        """
+        try:
+            record('START', None, self._tag)
+            self.schedule(image_name, add_experimental)
+            try:
+                for result in self.wait_for_results():
+                    record(*result)
+                record('END GOOD', None, None)
+            except Exception as e:
+                logging.error(e)
+                record('END ERROR', None, None, 'Exception waiting for results')
+        except Exception as e:
+            logging.error(e)
+            record('END ERROR', None, None, 'Exception while scheduling suite')
+
+
+    def schedule(self, image_name, add_experimental=True):
+        """
+        Schedule jobs using |self._afe|.
+
+        frontend.Job objects representing each scheduled job will be put in
+        |self._jobs|.
+
+        @param image_name: the name of an image against which to test.
+        @param add_experimental: schedule experimental tests as well, or not.
+        """
+        for test in self.stable_tests():
+            logging.debug('Scheduling %s', test.name)
+            self._jobs.append(self._create_job(test, image_name))
+
+        if add_experimental:
+            # TODO(cmasone): ensure I can log results from these differently.
+            for test in self.unstable_tests():
+                logging.debug('Scheduling %s', test.name)
+                self._jobs.append(self._create_job(test, image_name))
+
+
+    def _status_is_relevant(self, status):
+        """
+        Indicates whether the status of a given test is meaningful or not.
+
+        @param status: frontend.TestStatus object to look at.
+        @return True if this is a test result worth looking at further.
+        """
+        return not (status.test_name.startswith('SERVER_JOB') or
+                    status.test_name.startswith('CLIENT_JOB'))
+
+
+    def _collate_aborted(self, current_value, entry):
+        """
+        reduce() over a list of HostQueueEntries for a job; True if any aborted.
+
+        Functor that can be reduced()ed over a list of
+        HostQueueEntries for a job.  If any were aborted
+        (|entry.aborted| exists and is True), then the reduce() will
+        return True.
+
+        Ex:
+            entries = self._afe.run('get_host_queue_entries', job=job.id)
+            reduce(self._collate_aborted, entries, False)
+
+        @param current_value: the current accumulator (a boolean).
+        @param entry: the current entry under consideration.
+        @return the value of |entry.aborted| if it exists, False if not.
+        """
+        return current_value or ('aborted' in entry and entry['aborted'])
+
+
+    def wait_for_results(self):
+        """
+        Wait for results of all tests in all jobs in |self._jobs|.
+
+        Currently polls for results every 5s.  When all results are available,
+        @return a list of tuples, one per test: (status, subdir, name, reason)
+        """
+        results = []
+        while self._jobs:
+            for job in list(self._jobs):
+                if not self._afe.get_jobs(id=job.id, finished=True):
+                    continue
+
+                self._jobs.remove(job)
+
+                entries = self._afe.run('get_host_queue_entries', job=job.id)
+                if reduce(self._collate_aborted, entries, False):
+                    results.append(('ABORT', None, job.name))
+                else:
+                    statuses = self._tko.get_status_counts(job=job.id)
+                    for s in filter(self._status_is_relevant, statuses):
+                        results.append((s.status, None, s.test_name, s.reason))
+            time.sleep(5)
+
+        return results
+
+
+    @classmethod
+    def find_and_parse_tests(cls, cf_getter, predicate, add_experimental=False):
+        """
+        Function to scan through all tests and find eligible tests.
+
+        Looks at control files returned by _cf_getter.get_control_file_list()
+        for tests that pass self._predicate().
+
+        @param cf_getter: a control_file_getter.ControlFileGetter used to list
+               and fetch the content of control files
+        @param predicate: a function that should return True when run over a
+               ControlData representation of a control file that should be in
+               this Suite.
+        @param add_experimental: add tests with experimental attribute set.
+
+        @return list of ControlData objects that should be run, with control
+                file text added in |text| attribute.
+        """
+        tests = {}
+        files = cf_getter.get_control_file_list()
+        for file in files:
+            text = cf_getter.get_control_file_contents(file)
+            try:
+                found_test = control_data.parse_control_string(text,
+                                                            raise_warnings=True)
+                if not add_experimental and found_test.experimental:
+                    continue
+
+                found_test.text = text
+                tests[file] = found_test
+            except control_data.ControlVariableException, e:
+                logging.warn("Skipping %s\n%s", file, e)
+            except Exception, e:
+                logging.error("Bad %s\n%s", file, e)
+
+        return [test for test in tests.itervalues() if predicate(test)]
diff --git a/server/cros/dynamic_suite_unittest.py b/server/cros/dynamic_suite_unittest.py
new file mode 100755
index 0000000..226657e
--- /dev/null
+++ b/server/cros/dynamic_suite_unittest.py
@@ -0,0 +1,489 @@
+#!/usr/bin/python
+#
+# Copyright (c) 2011 The Chromium OS Authors. All rights reserved.
+# Use of this source code is governed by a BSD-style license that can be
+# found in the LICENSE file.
+
+"""Unit tests for server/cros/dynamic_suite.py."""
+
+import logging
+import mox
+import shutil
+import tempfile
+import time
+import unittest
+
+from autotest_lib.client.common_lib import base_job, control_data
+from autotest_lib.server.cros import control_file_getter, dynamic_suite
+from autotest_lib.server import frontend
+
+class FakeJob(object):
+    """Faked out RPC-client-side Job object."""
+    def __init__(self, id=0, statuses=[]):
+        self.id = id
+        self.name = 'Fake Job %d' % self.id
+        self.statuses = statuses
+
+
+class ReimagerTest(mox.MoxTestBase):
+    """Unit tests for dynamic_suite.Reimager.
+
+    @var _URL: fake image url
+    @var _NAME: fake image name
+    @var _NUM: fake number of machines to run on
+    @var _BOARD: fake board to reimage
+    """
+
+    _URL = 'http://nothing'
+    _NAME = 'name'
+    _NUM = 4
+    _BOARD = 'board'
+
+
+    def setUp(self):
+        super(ReimagerTest, self).setUp()
+        self.afe = self.mox.CreateMock(frontend.AFE)
+        self.tko = self.mox.CreateMock(frontend.TKO)
+        self.reimager = dynamic_suite.Reimager('', afe=self.afe, tko=self.tko)
+
+
+    def testEnsureVersionLabelAlreadyExists(self):
+        """Should not create a label if it already exists."""
+        name = 'label'
+        self.afe.get_labels(name=name).AndReturn([name])
+        self.mox.ReplayAll()
+        self.reimager._ensure_version_label(name)
+
+
+    def testEnsureVersionLabel(self):
+        """Should create a label if it doesn't already exist."""
+        name = 'label'
+        self.afe.get_labels(name=name).AndReturn([])
+        self.afe.create_label(name=name)
+        self.mox.ReplayAll()
+        self.reimager._ensure_version_label(name)
+
+
+    def testInjectVars(self):
+        """Should inject dict of varibles into provided strings."""
+        def find_all_in(d, s):
+            """Returns true if all key-value pairs in |d| are printed in |s|."""
+            return reduce(lambda b,i: "%s='%s'\n" % i in s, d.iteritems(), True)
+
+        v = {'v1': 'one', 'v2': 'two'}
+        self.assertTrue(find_all_in(v, self.reimager._inject_vars(v, '')))
+        self.assertTrue(find_all_in(v, self.reimager._inject_vars(v, 'ctrl')))
+
+
+    def testReportResultsGood(self):
+        """Should report results in the case where all jobs passed."""
+        job = self.mox.CreateMock(frontend.Job)
+        job.name = 'RPC Client job'
+        job.result = True
+        recorder = self.mox.CreateMock(base_job.base_job)
+        recorder.record('GOOD', mox.IgnoreArg(), job.name)
+        self.mox.ReplayAll()
+        self.reimager._report_results(job, recorder.record)
+
+
+    def testReportResultsBad(self):
+        """Should report results in various job failure cases.
+
+        In this test scenario, there are five hosts, all the 'netbook' platform.
+
+        h1: Did not run
+        h2: Two failed tests
+        h3: Two aborted tests
+        h4: completed, GOOD
+        h5: completed, GOOD
+        """
+        H1 = 'host1'
+        H2 = 'host2'
+        H3 = 'host3'
+        H4 = 'host4'
+        H5 = 'host5'
+
+        class FakeResult(object):
+            def __init__(self, reason):
+                self.reason = reason
+
+
+        # The RPC-client-side Job object that is annotated with results.
+        job = FakeJob()
+        job.result = None  # job failed, there are results to report.
+
+        # The semantics of |results_platform_map| and |test_results| are
+        # drawn from frontend.AFE.poll_all_jobs()
+        job.results_platform_map = {'netbook': {'Aborted' : [H3],
+                                                  'Completed' : [H1, H4, H5],
+                                                  'Failed':     [H2]
+                                                }
+                                    }
+        # Gin up fake results for H2 and H3 failure cases.
+        h2 = frontend.TestResults()
+        h2.fail = [FakeResult('a'), FakeResult('b')]
+        h3 = frontend.TestResults()
+        h3.fail = [FakeResult('a'), FakeResult('b')]
+        # Skipping H1 in |test_status| dict means that it did not get run.
+        job.test_status = {H2: h2, H3: h3, H4: {}, H5: {}}
+
+        # Set up recording expectations.
+        rjob = self.mox.CreateMock(base_job.base_job)
+        for res in h2.fail:
+            rjob.record('FAIL', mox.IgnoreArg(), H2, res.reason).InAnyOrder()
+        for res in h3.fail:
+            rjob.record('ABORT', mox.IgnoreArg(), H3, res.reason).InAnyOrder()
+        rjob.record('GOOD', mox.IgnoreArg(), H4).InAnyOrder()
+        rjob.record('GOOD', mox.IgnoreArg(), H5).InAnyOrder()
+        rjob.record(
+            'ERROR', mox.IgnoreArg(), H1, mox.IgnoreArg()).InAnyOrder()
+
+        self.mox.ReplayAll()
+        self.reimager._report_results(job, rjob.record)
+
+
+    def testScheduleJob(self):
+        """Should be able to create a job with the AFE."""
+        # Fake out getting the autoupdate control file contents.
+        cf_getter = self.mox.CreateMock(control_file_getter.ControlFileGetter)
+        cf_getter.get_control_file_contents_by_name('autoupdate').AndReturn('')
+        self.reimager._cf_getter = cf_getter
+
+        self.afe.create_job(
+            control_file=mox.And(mox.StrContains(self._NAME),
+                                 mox.StrContains(self._URL)),
+            name=mox.StrContains(self._NAME),
+            control_type='Server',
+            meta_hosts=[self._BOARD] * self._NUM)
+        self.mox.ReplayAll()
+        self.reimager._schedule_reimage_job(self._URL, self._NAME,
+                                            self._NUM, self._BOARD)
+
+
+    def expect_attempt(self, success):
+        """Sets up |self.reimager| to expect an attempt() that returns |success|
+
+        @param success the value returned by poll_job_results()
+        @return a FakeJob configured with appropriate expectations
+        """
+        canary = FakeJob()
+        self.mox.StubOutWithMock(self.reimager, '_ensure_version_label')
+        self.reimager._ensure_version_label(mox.StrContains(self._NAME))
+
+        self.mox.StubOutWithMock(self.reimager, '_schedule_reimage_job')
+        self.reimager._schedule_reimage_job(self._URL,
+                                            self._NAME,
+                                            self._NUM,
+                                            self._BOARD).AndReturn(canary)
+        if success is not None:
+            self.mox.StubOutWithMock(self.reimager, '_report_results')
+            self.reimager._report_results(canary, mox.IgnoreArg())
+
+        self.afe.get_jobs(id=canary.id, not_yet_run=True).AndReturn([])
+        self.afe.get_jobs(id=canary.id, finished=True).AndReturn([canary])
+        self.afe.poll_job_results(mox.IgnoreArg(), canary, 0).AndReturn(success)
+
+        return canary
+
+
+    def testSuccessfulReimage(self):
+        """Should attempt a reimage and record success."""
+        canary = self.expect_attempt(True)
+
+        rjob = self.mox.CreateMock(base_job.base_job)
+        rjob.record('START', mox.IgnoreArg(), mox.IgnoreArg())
+        rjob.record('END GOOD', mox.IgnoreArg(), mox.IgnoreArg())
+        self.mox.ReplayAll()
+        self.reimager.attempt(self._URL, self._NAME,
+                              self._NUM, self._BOARD, rjob.record)
+
+
+    def testFailedReimage(self):
+        """Should attempt a reimage and record failure."""
+        canary = self.expect_attempt(False)
+
+        rjob = self.mox.CreateMock(base_job.base_job)
+        rjob.record('START', mox.IgnoreArg(), mox.IgnoreArg())
+        rjob.record('END FAIL', mox.IgnoreArg(), mox.IgnoreArg())
+        self.mox.ReplayAll()
+        self.reimager.attempt(self._URL, self._NAME,
+                              self._NUM, self._BOARD, rjob.record)
+
+
+    def testReimageThatNeverHappened(self):
+        """Should attempt a reimage and record that it didn't run."""
+        canary = self.expect_attempt(None)
+
+        rjob = self.mox.CreateMock(base_job.base_job)
+        rjob.record('START', mox.IgnoreArg(), mox.IgnoreArg())
+        rjob.record('FAIL', mox.IgnoreArg(), canary.name, mox.IgnoreArg())
+        rjob.record('END FAIL', mox.IgnoreArg(), mox.IgnoreArg())
+        self.mox.ReplayAll()
+        self.reimager.attempt(self._URL, self._NAME,
+                              self._NUM, self._BOARD, rjob.record)
+
+
+class SuiteTest(mox.MoxTestBase):
+    """Unit tests for dynamic_suite.Suite.
+
+    @var _NAME: fake image name
+    @var _TAG: fake suite tag
+    """
+
+    _NAME = 'name'
+    _TAG = 'suite tag'
+
+
+    def setUp(self):
+        class FakeControlData(object):
+            """A fake parsed control file data structure."""
+            def __init__(self, data, expr=False):
+                self.string = 'text-' + data
+                self.name = 'name-' + data
+                self.data = data
+                self.test_type = 'Client'
+                self.experimental = expr
+
+
+        super(SuiteTest, self).setUp()
+        self.afe = self.mox.CreateMock(frontend.AFE)
+        self.tko = self.mox.CreateMock(frontend.TKO)
+
+        self.tmpdir = tempfile.mkdtemp(suffix=type(self).__name__)
+
+        self.getter = self.mox.CreateMock(control_file_getter.ControlFileGetter)
+
+        self.files = {'one': FakeControlData('data_one', expr=True),
+                      'two': FakeControlData('data_two'),
+                      'three': FakeControlData('data_three')}
+
+
+    def tearDown(self):
+        super(SuiteTest, self).tearDown()
+        shutil.rmtree(self.tmpdir, ignore_errors=True)
+
+
+    def expect_control_file_parsing(self):
+        """Expect an attempt to parse the 'control files' in |self.files|."""
+        self.getter.get_control_file_list().AndReturn(self.files.keys())
+        self.mox.StubOutWithMock(control_data, 'parse_control_string')
+        for file, data in self.files.iteritems():
+            self.getter.get_control_file_contents(file).AndReturn(data.string)
+            control_data.parse_control_string(
+                data.string, raise_warnings=True).AndReturn(data)
+
+
+    def testFindAndParseStableTests(self):
+        """Should find only non-experimental tests that match a predicate."""
+        self.expect_control_file_parsing()
+        self.mox.ReplayAll()
+
+        predicate = lambda d: d.text == self.files['two'].string
+        tests = dynamic_suite.Suite.find_and_parse_tests(self.getter, predicate)
+        self.assertEquals(len(tests), 1)
+        self.assertEquals(tests[0], self.files['two'])
+
+
+    def testFindAndParseTests(self):
+        """Should find all tests that match a predicate."""
+        self.expect_control_file_parsing()
+        self.mox.ReplayAll()
+
+        predicate = lambda d: d.text != self.files['two'].string
+        tests = dynamic_suite.Suite.find_and_parse_tests(self.getter,
+                                                         predicate,
+                                                         add_experimental=True)
+        self.assertEquals(len(tests), 2)
+        self.assertTrue(self.files['one'] in tests)
+        self.assertTrue(self.files['three'] in tests)
+
+
+    def mock_control_file_parsing(self):
+        """Fake out find_and_parse_tests(), returning content from |self.files|.
+        """
+        for test in self.files.values():
+            test.text = test.string  # mimic parsing.
+        self.mox.StubOutWithMock(dynamic_suite.Suite, 'find_and_parse_tests')
+        dynamic_suite.Suite.find_and_parse_tests(
+            mox.IgnoreArg(),
+            mox.IgnoreArg(),
+            add_experimental=True).AndReturn(self.files.values())
+
+
+    def testStableUnstableFilter(self):
+        """Should distinguish between experimental and stable tests."""
+        self.mock_control_file_parsing()
+        self.mox.ReplayAll()
+        suite = dynamic_suite.Suite.create_from_name(self._TAG, self.tmpdir,
+                                                     self.afe, self.tko)
+
+        self.assertTrue(self.files['one'] in suite.tests)
+        self.assertTrue(self.files['two'] in suite.tests)
+        self.assertTrue(self.files['one'] in suite.unstable_tests())
+        self.assertTrue(self.files['two'] in suite.stable_tests())
+        self.assertFalse(self.files['one'] in suite.stable_tests())
+        self.assertFalse(self.files['two'] in suite.unstable_tests())
+
+
+    def expect_job_scheduling(self, add_experimental):
+        """Expect jobs to be scheduled for 'tests' in |self.files|.
+
+        @param add_experimental: expect jobs for experimental tests as well.
+        """
+        for test in self.files.values():
+            if not add_experimental and test.experimental:
+                continue
+            self.afe.create_job(
+                control_file=test.text,
+                name=mox.And(mox.StrContains(self._NAME),
+                             mox.StrContains(test.name)),
+                control_type=mox.IgnoreArg(),
+                meta_hosts=[dynamic_suite.VERSION_PREFIX+self._NAME])
+
+
+    def testScheduleTests(self):
+        """Should schedule stable and experimental tests with the AFE."""
+        self.mock_control_file_parsing()
+        self.expect_job_scheduling(add_experimental=True)
+
+        self.mox.ReplayAll()
+        suite = dynamic_suite.Suite.create_from_name(self._TAG, self.tmpdir,
+                                                     self.afe, self.tko)
+        suite.schedule(self._NAME)
+
+
+    def testScheduleStableTests(self):
+        """Should schedule only stable tests with the AFE."""
+        self.mock_control_file_parsing()
+        self.expect_job_scheduling(add_experimental=False)
+
+        self.mox.ReplayAll()
+        suite = dynamic_suite.Suite.create_from_name(self._TAG, self.tmpdir,
+                                                     self.afe, self.tko)
+        suite.schedule(self._NAME, add_experimental=False)
+
+
+    def expect_result_gathering(self, job):
+        self.afe.get_jobs(id=job.id, finished=True).AndReturn(job)
+        entries = map(lambda s: s.entry, job.statuses)
+        self.afe.run('get_host_queue_entries',
+                     job=job.id).AndReturn(entries)
+        if True not in map(lambda e: 'aborted' in e and e['aborted'], entries):
+            self.tko.get_status_counts(job=job.id).AndReturn(job.statuses)
+
+
+    def testWaitForResults(self):
+        """Should gather status and return records for job summaries."""
+        class FakeStatus(object):
+            """Fake replacement for server-side job status objects.
+
+            @var status: 'GOOD', 'FAIL', 'ERROR', etc.
+            @var test_name: name of the test this is status for
+            @var reason: reason for failure, if any
+            @var aborted: present and True if the job was aborted.  Optional.
+            """
+            def __init__(self, code, name, reason, aborted=None):
+                self.status = code
+                self.test_name = name
+                self.reason = reason
+                self.entry = {}
+                if aborted:
+                    self.entry['aborted'] = True
+
+            def equals_record(self, args):
+                """Compares this object to a recorded status."""
+                return self._equals_record(*args)
+
+            def _equals_record(self, status, subdir, name, reason):
+                """Compares this object and fields of recorded status."""
+                if 'aborted' in self.entry and self.entry['aborted']:
+                    return status == 'ABORT'
+                return (self.status == status and
+                        self.test_name == name and
+                        self.reason == reason)
+
+
+        jobs = [FakeJob(0, [FakeStatus('GOOD', 'T0', ''),
+                            FakeStatus('GOOD', 'T1', '')]),
+                FakeJob(1, [FakeStatus('ERROR', 'T0', 'err', False),
+                            FakeStatus('GOOD', 'T1', '')]),
+                FakeJob(2, [FakeStatus('TEST_NA', 'T0', 'no')]),
+                FakeJob(2, [FakeStatus('FAIL', 'T0', 'broken')]),
+                FakeJob(3, [FakeStatus('ERROR', 'T0', 'gah', True)])]
+        # To simulate a job that isn't ready the first time we check.
+        self.afe.get_jobs(id=jobs[0].id, finished=True).AndReturn([])
+        # Expect all the rest of the jobs to be good to go the first time.
+        for job in jobs[1:]:
+            self.expect_result_gathering(job)
+        # Then, expect job[0] to be ready.
+        self.expect_result_gathering(jobs[0])
+        # Expect us to poll twice.
+        self.mox.StubOutWithMock(time, 'sleep')
+        time.sleep(5)
+        time.sleep(5)
+        self.mox.ReplayAll()
+
+        suite = dynamic_suite.Suite.create_from_name(self._TAG, self.tmpdir,
+                                                     self.afe, self.tko)
+        suite._jobs = list(jobs)
+        results = suite.wait_for_results()
+        for job in jobs:
+            for status in job.statuses:
+                self.assertTrue(True in map(status.equals_record, results))
+
+
+    def testRunAndWaitSuccess(self):
+        """Should record successful results."""
+        results = [('GOOD', None, 'good'), ('FAIL', None, 'bad', 'reason')]
+
+        recorder = self.mox.CreateMock(base_job.base_job)
+        recorder.record('START', mox.IgnoreArg(), self._TAG)
+        for result in results:
+            recorder.record(*result).InAnyOrder('results')
+        recorder.record('END GOOD', mox.IgnoreArg(), mox.IgnoreArg())
+
+        suite = dynamic_suite.Suite.create_from_name(self._TAG, self.tmpdir,
+                                                     self.afe, self.tko)
+        self.mox.StubOutWithMock(suite, 'schedule')
+        suite.schedule(self._NAME, True)
+        self.mox.StubOutWithMock(suite, 'wait_for_results')
+        suite.wait_for_results().AndReturn(results)
+        self.mox.ReplayAll()
+
+        suite.run_and_wait(self._NAME, recorder.record, True)
+
+
+    def testRunAndWaitFailure(self):
+        """Should record failure to gather results."""
+        recorder = self.mox.CreateMock(base_job.base_job)
+        recorder.record('START', mox.IgnoreArg(), self._TAG)
+        recorder.record('END ERROR', None, None, mox.StrContains('waiting'))
+
+        suite = dynamic_suite.Suite.create_from_name(self._TAG, self.tmpdir,
+                                                     self.afe, self.tko)
+        self.mox.StubOutWithMock(suite, 'schedule')
+        suite.schedule(self._NAME, True)
+        self.mox.StubOutWithMock(suite, 'wait_for_results')
+        suite.wait_for_results().AndRaise(Exception())
+        self.mox.ReplayAll()
+
+        suite.run_and_wait(self._NAME, recorder.record, True)
+
+
+    def testRunAndWaitScheduleFailure(self):
+        """Should record failure to gather results."""
+        recorder = self.mox.CreateMock(base_job.base_job)
+        recorder.record('START', mox.IgnoreArg(), self._TAG)
+        recorder.record('END ERROR', None, None, mox.StrContains('scheduling'))
+
+        suite = dynamic_suite.Suite.create_from_name(self._TAG, self.tmpdir,
+                                                     self.afe, self.tko)
+        self.mox.StubOutWithMock(suite, 'schedule')
+        suite.schedule(self._NAME, True).AndRaise(Exception())
+        self.mox.ReplayAll()
+
+        suite.run_and_wait(self._NAME, recorder.record, True)
+
+
+if __name__ == '__main__':
+  unittest.main()
diff --git a/server/frontend.py b/server/frontend.py
index ca22d0d..beb00a5 100644
--- a/server/frontend.py
+++ b/server/frontend.py
@@ -435,8 +435,7 @@
         results = []
         for job in jobs:
             if getattr(job, 'result', None) is None:
-                enough = lambda x, y: x + 1 >= y
-                job.result = self.poll_job_results(tko, job, enough)
+                job.result = self.poll_job_results(tko, job)
                 if job.result is not None:
                     self.result_notify(job, email_from, email_to)
 
@@ -606,24 +605,22 @@
         if self.job:
             self.job.record(result, None, testname, status='')
 
-
-    def poll_job_results(self, tko, job, enough_completed, debug=False):
+    def poll_job_results(self, tko, job, enough=1, debug=False):
         """
         Analyse all job results by platform
 
           params:
             tko: a TKO object representing the results DB.
             job: the job to be examined.
-            enough_completed: a predicate that takes the number of completed
-                              tests and the total number of tests and returns
-                              True if enough have completed, False if not.
+            enough: the acceptable delta between the number of completed
+                    tests and the total number of tests.
             debug: enable debugging output.
 
           returns:
-            False: if any platform has more than |enough_completed| failures
-            None:  if any platform has less than |enough_completed| machines
+            False: if any platform has more than |enough| failures
+            None:  if any platform has less than |enough| machines
                    not yet Good.
-            True:  if all platforms have at least |enough_completed| machines
+            True:  if all platforms have at least |enough| machines
                    Good.
         """
         self._job_test_results(tko, job, debug)
@@ -654,14 +651,13 @@
                 else:
                     platform_test_result = 'WARN'
 
-            if aborted > 1:
+            if aborted > enough:
                 aborted_platforms.append(platform)
                 self.set_platform_results(job, platform, platform_test_result)
-            elif (failed * 2 >= total) or (failed > 1):
+            elif (failed * 2 >= total) or (failed > enough):
                 failed_platforms.append(platform)
                 self.set_platform_results(job, platform, platform_test_result)
-            elif (completed >= 1) and enough_completed(completed, total):
-                # if all or all but one are good, call the job good.
+            elif (completed >= enough) and (completed + enough >= total):
                 good_platforms.append(platform)
                 self.set_platform_results(job, platform, 'GOOD')
             else:
diff --git a/server/site_tests/autoupdate/control b/server/site_tests/autoupdate/control
new file mode 100644
index 0000000..7c4c347
--- /dev/null
+++ b/server/site_tests/autoupdate/control
@@ -0,0 +1,40 @@
+# Copyright (c) 2011 The Chromium OS Authors. All rights reserved.
+# Use of this source code is governed by a BSD-style license that can be
+# found in the LICENSE file.
+
+AUTHOR = "Chromium OS"
+NAME = "Auto Update Host"
+TIME = "MEDIUM"
+TEST_CATEGORY = "Functional"
+TEST_CLASS = "platform"
+TEST_TYPE = "server"
+
+DOC = """
+
+"""
+
+from autotest_lib.server import frontend
+
+vers = 'cros-version-'
+try:
+    from autotest_lib.server.cros import dynamic_suite
+    vers = dynamic_suite.VERSION_PREFIX
+except:
+    pass
+
+AFE = frontend.AFE(debug=False)
+
+def clear_version_labels(machine):
+    labels = AFE.get_labels(name__startswith=vers)
+    for label in labels: label.remove_hosts(hosts=[machine])
+
+
+def run(machine):
+    clear_version_labels(machine)
+    host = hosts.create_host(machine, initialize=False)
+    if job.run_test('autoupdate', host=host, update_url=image_url):
+        label = AFE.get_labels(name=vers+image_name)[0]
+        label.add_hosts([machine])
+
+
+job.parallel_simple(run, machines)
diff --git a/test_suites/control.bvt b/test_suites/control.bvt
new file mode 100755
index 0000000..385b470
--- /dev/null
+++ b/test_suites/control.bvt
@@ -0,0 +1,38 @@
+# Copyright (c) 2011 The Chromium OS Authors. All rights reserved.
+# Use of this source code is governed by a BSD-style license that can be
+# found in the LICENSE file.
+
+AUTHOR = "Chrome OS Team"
+NAME = "bvt"
+PURPOSE = "Test basic, required functionality."
+CRITERIA = "All tests with SUITE=bvt must pass."
+
+TIME = "SHORT"
+TEST_CATEGORY = "General"
+TEST_CLASS = "suite"
+TEST_TYPE = "Server"
+
+DOC = """
+This is the Build Verification Test suite.  It should consist of SHORT tests
+that validate critical functionality -- ability to acquire connectivity, perform
+crash reporting, get updates, and allow a user to log in, among other things..
+"""
+
+import common
+from autotest_lib.server.cros import dynamic_suite
+
+
+# These params should be injected by the thing scheduling the job
+image_url = 'http://172.22.50.205:8080/update/x86-mario-r17/R17-1388.0.0-a1-b1323'
+image_name ='x86-mario-r17/R17-1388.0.0-a1-b1323'
+board = 'netbook_MARIO_MP'
+
+# This is pretty much just here for testing.
+SKIP_IMAGE = True
+
+suite_tag = 'bvt'
+reimager = dynamic_suite.Reimager(job.autodir)
+
+if SKIP_IMAGE or reimager.attempt(image_url, image_name, 4, board, job.record):
+    bvt = dynamic_suite.Suite.create_from_name(suite_tag, job.autodir)
+    bvt.run_and_wait(image_name, job.record, add_experimental=True)
diff --git a/test_suites/control.dummy b/test_suites/control.dummy
new file mode 100644
index 0000000..e301614
--- /dev/null
+++ b/test_suites/control.dummy
@@ -0,0 +1,37 @@
+# Copyright (c) 2011 The Chromium OS Authors. All rights reserved.
+# Use of this source code is governed by a BSD-style license that can be
+# found in the LICENSE file.
+
+AUTHOR = "Chrome OS Team"
+NAME = "dummy"
+PURPOSE = "To be run while testing the infrastructure and test harness."
+CRITERIA = "None."
+
+TIME = "SHORT"
+TEST_CATEGORY = "Dummy"
+TEST_CLASS = "suite"
+TEST_TYPE = "Server"
+
+DOC = """
+This is a dummy test suite.  It runs dummy tests that always pass or always fail
+so that we can test result gathering and reporting mechanisms.
+"""
+
+import common
+from autotest_lib.server.cros import dynamic_suite
+
+
+# These params should be injected by the thing scheduling the job
+image_url = 'http://172.22.50.205:8080/update/x86-mario-r17/R17-1388.0.0-a1-b1323'
+image_name ='x86-mario-r17/R17-1388.0.0-a1-b1323'
+board = 'netbook_MARIO_MP'
+
+# This is pretty much just here for testing.
+SKIP_IMAGE = False
+
+suite_tag = 'dummy'
+reimager = dynamic_suite.Reimager(job.autodir)
+
+if SKIP_IMAGE or reimager.attempt(image_url, image_name, 4, board, job.record):
+    bvt = dynamic_suite.Suite.create_from_name(suite_tag, job.autodir)
+    bvt.run_and_wait(image_name, job.record, add_experimental=True)