autotest: end-to-end autoupdate test

This implements an autotest test for the Chrome OS autoupdate
functionality.  It receives a single test configuration and performs an
end-to-end test of the update process specified by it, which includes:

* Initializing the DUT to a known start state (source version)

* Spawning a local Omaha/devserver instance.

* Triggering an update process.

* Tracking the progress of the update.

* Rebooting the DUT after the update.

* Ensuring that the update has succeeded (target version running).

BUG=chromium-os:33760,chromium-os:33761,chromium-os:33762
TEST=Running with run_remote_tests.sh works

Change-Id: I215e8214d3239f525f062a27a4d97806451c9736
Reviewed-on: https://gerrit.chromium.org/gerrit/37432
Commit-Ready: Gilad Arnold <garnold@chromium.org>
Reviewed-by: Gilad Arnold <garnold@chromium.org>
Tested-by: Gilad Arnold <garnold@chromium.org>
diff --git a/client/common_lib/cros/autoupdater.py b/client/common_lib/cros/autoupdater.py
index b9d91f2..2020de0 100644
--- a/client/common_lib/cros/autoupdater.py
+++ b/client/common_lib/cros/autoupdater.py
@@ -169,6 +169,23 @@
         return self._run('/postinst %s 2>&1' % part)
 
 
+    def trigger_update(self):
+        """Triggers a background update on a test image.
+
+        @raise RootFSUpdateError if anything went wrong.
+
+        """
+        autoupdate_cmd = '%s --check_for_update --omaha_url=%s' % (
+            UPDATER_BIN, self.update_url)
+        logging.info('triggering update via: %s', autoupdate_cmd)
+        try:
+            # This should return immediately, hence the short timeout.
+            self._run(autoupdate_cmd, timeout=10)
+        except error.AutoservRunError, e:
+            raise RootFSUpdateError('update triggering failed on %s: %s' %
+                                    (self.host.hostname, str(e)))
+
+
     def _update_root(self):
         logging.info('Updating root partition...')
 
diff --git a/client/common_lib/cros/dev_server.py b/client/common_lib/cros/dev_server.py
index 38281f8..b1cea4a 100644
--- a/client/common_lib/cros/dev_server.py
+++ b/client/common_lib/cros/dev_server.py
@@ -177,11 +177,9 @@
         devservers = cls.servers()
         while devservers:
             hash_index = hash(build) % len(devservers)
-            devserver = devservers[hash_index]
+            devserver = devservers.pop(hash_index)
             if cls._devserver_up(devserver):
                 return cls(devserver)
-            else:
-                devservers.pop(hash_index)
         else:
             logging.error('All devservers are currently down!!!')
             raise DevServerException('All devservers are currently down!!!')
@@ -257,6 +255,22 @@
         return cls.servers()[0]
 
 
+    class ArtifactUrls(object):
+        """A container for URLs of staged artifacts.
+
+        Attributes:
+            full_payload: URL for downloading a staged full release update
+            mton_payload: URL for downloading a staged M-to-N release update
+            nton_payload: URL for downloading a staged N-to-N release update
+
+        """
+        def __init__(self, full_payload=None, mton_payload=None,
+                     nton_payload=None):
+            self.full_payload = full_payload
+            self.mton_payload = mton_payload
+            self.nton_payload = nton_payload
+
+
     @remote_devserver_call
     def trigger_download(self, image, synchronous=True):
         """Tell the devserver to download and stage |image|.
@@ -272,8 +286,10 @@
 
         @param image: the image to fetch and stage.
         @param synchronous: if True, waits until all components of the image are
-                staged before returning.
+               staged before returning.
+
         @raise DevServerException upon any return code that's not HTTP OK.
+
         """
         call = self.build_call(
                 'download', archive_url=_get_image_storage_server() + image)
@@ -308,6 +324,87 @@
 
 
     @remote_devserver_call
+    def trigger_test_image_download(self, image_dir):
+        """Tell the devserver to download and stage a Chrome OS test image.
+
+        Tells the devserver to fetch a test image from |image_dir| on the image
+        storage server named by _get_image_storage_server(). The call is
+        synchronous.
+
+        @param image_dir: the directory from which to fetch the image
+
+        @raise DevServerException upon any return code that's not HTTP OK.
+
+        """
+        call = self.build_call(
+                'stage_images',
+                archive_url=_get_image_storage_server() + image_dir,
+                image_types='test')
+        response = urllib2.urlopen(call)
+        was_successful = response.read() == 'Success'
+        if not was_successful:
+            raise DevServerException(
+                "trigger_download of test image from %s failed; "
+                "HTTP OK not accompanied by 'Success'." %
+                image_dir)
+
+
+    def get_delta_payload_url(self, payload_type, board, release, branch):
+        """Returns a URL to a staged delta payload.
+
+        @param payload_type: either 'mton' or 'nton'
+        @param board: the board the payload corresponds to (e.g. 'x86-alex')
+        @param release: the payload target release version (e.g. '2673.0.0')
+        @param branch: the payload target release branch (e.g. 'R22')
+
+        @return A fully qualified URL that can be used for downloading the
+                payload.
+
+        @raise DevServerException if payload type argument is invalid.
+
+        """
+        if payload_type not in ('mton', 'nton'):
+            raise DevServerException('invalid delta payload type: %s' %
+                                     payload_type)
+        url_pattern = CONFIG.get_config_value(
+                'CROS', 'delta_payload_url_pattern', type=str)
+        return url_pattern % (self.url(), board, branch, release, branch,
+                              release, payload_type)
+
+
+    def get_full_payload_url(self, board, release, branch):
+        """Returns a URL to a staged full payload.
+
+        @param board: the board the payload corresponds to (e.g. 'x86-alex')
+        @param release: the payload target release version (e.g. '2673.0.0')
+        @param branch: the payload target release branch (e.g. 'R22')
+
+        @return A fully qualified URL that can be used for downloading the
+                payload.
+
+        """
+        url_pattern = CONFIG.get_config_value(
+                'CROS', 'full_payload_url_pattern', type=str)
+        return url_pattern % (self.url(), board, branch, release)
+
+
+    def get_test_image_url(self, board, release, branch):
+        """Returns a URL to a staged test image.
+
+        @param board: the board to which the image corresponds (e.g. 'x86-alex')
+        @param release: the image release version (e.g. '2673.0.0')
+        @param branch: the image release branch (e.g. 'R22')
+
+        @return A fully qualified URL that can be used for downloading the
+                image.
+
+        """
+        url_pattern = CONFIG.get_config_value(
+                'CROS', 'test_image_url_pattern', type=str)
+        return url_pattern % (self.url(), board, branch, release)
+
+
+    @remote_devserver_call
     def list_control_files(self, build):
         """Ask the devserver to list all control files for |build|.
 
diff --git a/client/common_lib/site_utils.py b/client/common_lib/site_utils.py
index bf77300..cf1e610 100644
--- a/client/common_lib/site_utils.py
+++ b/client/common_lib/site_utils.py
@@ -15,6 +15,10 @@
 CHECK_PID_IS_ALIVE_TIMEOUT = 6
 
 
+
+_LOCAL_HOST_LIST = ['localhost', '127.0.0.1']
+
+
 def ping(host, deadline=None, tries=None, timeout=60):
     """Attempt to ping |host|.
 
@@ -125,6 +129,21 @@
         return True
 
 
+def gs_ls(uri_pattern):
+    """Returns a list of URIs that match a given pattern.
+
+    @param uri_pattern: a GS URI pattern, may contain wildcards
+
+    @return A list of URIs matching the given pattern.
+
+    @raise CmdError: the gsutil command failed.
+
+    """
+    gs_cmd = ' '.join(['gsutil', 'ls', uri_pattern])
+    result = base_utils.system_output(gs_cmd).splitlines()
+    return [path.rstrip() for path in result if path]
+
+
 def nuke_pids(pid_list, signal_queue=[signal.SIGTERM, signal.SIGKILL]):
     """
     Given a list of pid's, kill them via an esclating series of signals.
@@ -153,3 +172,14 @@
     if failed_list:
         raise error.AutoservRunError('Following errors occured: %s' %
                                      failed_list, None)
+
+
+def externalize_host(host):
+    """Returns an externally accessible host name.
+
+    @param host: a host name or address (string)
+
+    @return An externally visible host name or address
+
+    """
+    return socket.gethostname() if host in _LOCAL_HOST_LIST else host
diff --git a/global_config.ini b/global_config.ini
index a3b09c9..b1fd919 100644
--- a/global_config.ini
+++ b/global_config.ini
@@ -146,6 +146,9 @@
 image_url_pattern: %s/update/%s
 package_url_pattern: %s/static/archive/%s/autotest/packages
 log_url_pattern: http://%s/tko/retrieve_logs.cgi?job=/results/%s/
+delta_payload_url_pattern: %s/static/%s-release/%s-%s/au/%s-%s_%s/update.gz
+full_payload_url_pattern: %s/static/%s-release/%s-%s/update.gz
+test_image_url_pattern: %s/static/images/%s-release/%s-%s/chromiumos_test_image.bin
 # Username and password for connecting to remote power control switches of
 # the "Sentry Switched CDU" type
 rpm_sentry_username: fake_user
diff --git a/server/site_tests/autoupdate_EndToEndTest/autoupdate_EndToEndTest.py b/server/site_tests/autoupdate_EndToEndTest/autoupdate_EndToEndTest.py
new file mode 100755
index 0000000..da83c30
--- /dev/null
+++ b/server/site_tests/autoupdate_EndToEndTest/autoupdate_EndToEndTest.py
@@ -0,0 +1,683 @@
+# Copyright (c) 2012 The Chromium OS Authors. All rights reserved.
+# Use of this source code is governed by a BSD-style license that can be
+# found in the LICENSE file.
+
+import json
+import logging
+import socket
+import subprocess
+import time
+import urllib2
+import urlparse
+from autotest_lib.client.common_lib import error, site_utils
+from autotest_lib.client.common_lib.cros import autoupdater, dev_server
+from autotest_lib.server import test
+
+
+def _wait(secs, desc=None):
+    """Emits a log message and sleeps for a given number of seconds."""
+    msg = 'waiting %s seconds' % secs
+    if desc:
+        msg += ' (%s)' % desc
+    logging.info(msg)
+    time.sleep(secs)
+
+
+class ExpectedUpdateEvent(object):
+    """Defines an expected event in a host update process."""
+    def __init__(self, event_type=None, event_result=None, version=None,
+                 previous_version=None):
+        self._expected_attrs = {
+            'event_type': event_type,
+            'event_result': event_result,
+            'version': version,
+            'previous_version': previous_version,
+        }
+
+
+    def __str__(self):
+        return ' '.join(['%s=%s' % (attr_name, attr_val or 'any')
+                         for attr_name, attr_val
+                         in self._expected_attrs.iteritems()])
+
+
+    def verify(self, actual_event):
+        """Verify the attributes of an actual event.
+
+        @params actual_event: a dictionary containing event attributes
+
+        @return True if all attributes as expected, False otherwise.
+
+        """
+        return all([self._verify_attr(attr_name, expected_attr_val,
+                                      actual_event.get(attr_name))
+                    for attr_name, expected_attr_val
+                    in self._expected_attrs.iteritems() if expected_attr_val])
+
+
+    def _verify_attr(self, attr_name, expected_attr_val, actual_attr_val):
+        """Verifies that an actual log event attributes matches expected on.
+
+        @param attr_name: name of the attribute to verify
+        @param expected_attr_val: expected attribute value
+        @param actual_attr_val: actual attribute value
+
+        @return True if actual value is present and matches, False otherwise.
+
+        """
+        if not (actual_attr_val and
+                str(actual_attr_val) == str(expected_attr_val)):
+            logging.error(
+                    'actual %s (%s) not as expected (%s)',
+                    attr_name, actual_attr_val, expected_attr_val)
+            return False
+
+        return True
+
+
+class ExpectedUpdateEventChain(object):
+    """Defines a chain of expected update events."""
+    def __init__(self, *expected_event_chain_args):
+        """Initialize the chain object.
+
+        @param expected_event_chain_args: list of tuples arguments, each
+               containing a timeout (in seconds) and an ExpectedUpdateEvent
+               object.
+
+        """
+        self._expected_event_chain = expected_event_chain_args
+
+
+    def _format_event_with_timeout(self, timeout, expected_event):
+        return ('%s %s' %
+                (expected_event,
+                 ('within %s seconds' % timeout) if timeout
+                 else 'indefinitely'))
+
+
+    def __str__(self):
+        return ('[%s]' %
+                ', '.join(
+                    [self._format_event_with_timeout(timeout, expected_event)
+                     for timeout, expected_event
+                     in self._expected_event_chain]))
+
+
+    def __repr__(self):
+        return str(self._expected_event_chain)
+
+
+    def verify(self, get_next_event):
+        """Verifies that an actual stream of events complies.
+
+        @param get_next_event: a function returning the next event
+
+        @return True if chain was satisfied, False otherwise.
+
+        """
+        for timeout, expected_event in self._expected_event_chain:
+            logging.info(
+                    'expecting %s',
+                    self._format_event_with_timeout(timeout, expected_event))
+            if not self._verify_event_with_timeout(
+                    timeout, expected_event, get_next_event):
+                return False
+        return True
+
+
+    def _verify_event_with_timeout(self, timeout, expected_event,
+                                   get_next_event):
+        """Verify an expected event occurs within a given timeout.
+
+        @param timeout: specified in seconds
+        @param expected_event: an expected event specification
+        @param get_next_event: function returning the next event in a stream
+
+        @return True if event complies, False otherwise.
+
+        """
+        base_timestamp = curr_timestamp = time.time()
+        expired_timestamp = base_timestamp + timeout
+        while curr_timestamp <= expired_timestamp:
+            new_event = get_next_event()
+            if new_event:
+                logging.info('event received after %s seconds',
+                             curr_timestamp - base_timestamp)
+                return expected_event.verify(new_event)
+
+            # No new events, sleep for one second only (so we don't miss
+            # events at the end of the allotted timeout).
+            time.sleep(1)
+            curr_timestamp = time.time()
+
+        logging.error('timeout expired')
+        return False
+
+
+class UpdateEventLogVerifier(object):
+    """Verifies update event chains on a devserver update log."""
+    def __init__(self, event_log_url):
+        self._event_log_url = event_log_url
+        self._event_log = []
+        self._num_consumed_events = 0
+
+
+    def verify_expected_event_chain(self, expected_event_chain):
+        """Verify a given event chain."""
+        return expected_event_chain.verify(self._get_next_log_event)
+
+
+    def _get_next_log_event(self):
+        """Returns the next event in an event log.
+
+        Uses the URL handed to it during initialization to obtain the host log
+        from a devserver. If new events are encountered, the first of them is
+        consumed and returned.
+
+        @return The next new event in the host log, as reported by devserver;
+                None if no such event was found.
+
+        """
+        # (Re)read event log from devserver, if necessary.
+        if len(self._event_log) <= self._num_consumed_events:
+            conn = urllib2.urlopen(self._event_log_url)
+            event_log_resp = conn.read()
+            conn.close()
+            self._event_log = json.loads(event_log_resp)
+
+        # Return next new event, if such is present.
+        if len(self._event_log) > self._num_consumed_events:
+            new_event = self._event_log[self._num_consumed_events]
+            self._num_consumed_events += 1
+            logging.info('consumed new event: %s', new_event)
+            return new_event
+
+
+class OmahaDevserver(object):
+    """Spawns a test-private devserver instance."""
+    _WAIT_FOR_DEVSERVER_STARTED_SECONDS = 15
+
+
+    def __init__(self, omaha_host, dut_ip_addr, update_payload_lorry_url):
+        """Starts a private devserver instance, operating at Omaha capacity.
+
+        @param omaha_host: host address where the devserver is spawned.
+        @param dut_ip_addr: the IP address of the client DUT, used for deriving
+               a unique port number.
+        @param update_payload_lorry_url: URL to provision for update requests.
+
+        @raise error.TestError when things go wrong.
+
+        """
+        # First, obtain the target URL base / image strings.
+        if not update_payload_lorry_url:
+            raise error.TestError('missing update payload url')
+        update_payload_url_base, update_payload_path, _ = self._split_url(
+                update_payload_lorry_url)
+
+        # Second, compute a unique port for the DUT update checks to use, based
+        # on the DUT's IP address.
+        self._omaha_port = self._get_unique_port(dut_ip_addr)
+        logging.debug('dut ip addr: %s => omaha/devserver port: %d',
+                      dut_ip_addr, self._omaha_port)
+
+        # Invoke the Omaha/devserver.
+        cmdlist = [
+                'start_devserver',
+                '--archive_dir=static/',
+                '--payload=%s' % update_payload_path,
+                '--port=%d' % self._omaha_port,
+                '--remote_payload',
+                '--urlbase=%s' % update_payload_url_base,
+                '--max_updates=1',
+                '--host_log',
+        ]
+        logging.info('launching omaha/devserver on %s: %s',
+                     omaha_host, ' '.join(cmdlist))
+        # TODO(garnold) invoke omaha/devserver remotely! The host needs to be
+        # either globally known to all DUTs, or inferrable based on the DUT's
+        # own IP address, or otherwise provisioned to it.
+        is_omaha_devserver_local = (
+                omaha_host in ['localhost', socket.gethostname()])
+        if not is_omaha_devserver_local:
+          raise error.TestError(
+              'remote omaha/devserver invocation unsupported yet')
+        # We are using subprocess directly (as opposed to existing util
+        # wrappers like utils.run() or utils.BgJob) because we need to be able
+        # to terminate the subprocess once the test finishes.
+        self._devserver = subprocess.Popen(cmdlist, 0, None, subprocess.PIPE,
+                                           None, subprocess.PIPE)
+        timeout = self._WAIT_FOR_DEVSERVER_STARTED_SECONDS
+        devserver_stderr_log = []
+        while timeout and self._devserver.returncode is None:
+            time.sleep(1)
+            timeout -= 1
+            devserver_started = False
+            while not devserver_started:
+                line = self._devserver.stderr.readline()
+                if not line:
+                    break
+                log_line = '[devserver]' + line.rstrip('\n')
+                logging.debug(log_line)
+                devserver_stderr_log.append(log_line)
+                devserver_started = 'Bus STARTED' in line
+            else:
+                break
+        else:
+            raise error.TestError(
+                'omaha/devserver not running, error log:\n%s' %
+                '\n'.join(devserver_strerr_log))
+
+        self._omaha_host = site_utils.externalize_host(omaha_host)
+
+
+    @staticmethod
+    def _split_url(url):
+        """Splits a URL into the URL base, path and file name."""
+        split_url = urlparse.urlsplit(url)
+        url_base = urlparse.urlunsplit(
+                [split_url.scheme, split_url.netloc, '', '', ''])
+        url_path = url_file = ''
+        if split_url.path:
+            url_path, url_file = split_url.path.rsplit('/', 1)
+        return url_base, url_path.lstrip('/'), url_file
+
+
+    @staticmethod
+    def _get_unique_port(dut_ip_addr):
+        """Compute a unique IP port based on the DUT's IP address.
+
+        We need a mapping that can be mirrored by a DUT running an official
+        image, based only on the DUT's own state. Here, we simply take the two
+        least significant bytes in the DUT's IPv4 address and bitwise-OR them
+        with 0xc0000, resulting in a 16-bit IP port within the
+        private/unallocated range. Using the least significant bytes of the IP
+        address guarantees (sort of) that we'll have a unique mapping in a
+        small lab setting.
+
+        """
+        ip_addr_bytes = [int(byte_str) for byte_str in dut_ip_addr.split('.')]
+        return (((ip_addr_bytes[2] << 8) | ip_addr_bytes[3] | 0x8000) & ~0x4000)
+
+
+    def get_netloc(self):
+        if not self._devserver:
+            raise error.TestError('no running omaha/devserver')
+        return '%s:%s' % (self._omaha_host, self._omaha_port)
+
+
+    def kill(self):
+        """Kill private devserver, wait for it to die."""
+        if not self._devserver:
+            raise error.TestError('no running omaha/devserver')
+        logging.info('killing omaha/devserver')
+        self._devserver.terminate()
+        self._devserver.communicate()
+        self._devserver = None
+
+
+class autoupdate_EndToEndTest(test.test):
+    """Complete update test between two Chrome OS releases.
+
+    Performs an end-to-end test of updating a ChromeOS device from one version
+    to another. This script requires a running (possibly remote) servod
+    instance connected to an actual servo board, which controls the DUT. It
+    also assumes that a corresponding target (update) image was staged for
+    download on the central Lorry/devserver.
+
+    The test performs the following steps:
+
+      0. Stages the source image and target update payload on the central
+         Lorry/devserver.
+      1. Spawns a private Omaha/devserver instance, configured to return the
+         target (update) image URL in response for an update check.
+      2. Connects to servod.
+         a. Resets the DUT to a known initial state.
+         b. Installs a source image on the DUT via recovery.
+      3. Reboots the DUT with the new image.
+      4. Triggers an update check at the DUT.
+      5. Watches as the DUT obtains an update and applies it.
+      6. Repeats 3-5, ensuring that the next update check shows the new image
+         version.
+
+    """
+    version = 1
+
+    # Timeout periods, given in seconds.
+    _WAIT_AFTER_SHUTDOWN_SECONDS        = 10
+    _WAIT_AFTER_UPDATE_SECONDS          = 20
+    _WAIT_FOR_USB_INSTALL_SECONDS       = 4 * 60
+    _WAIT_FOR_MP_RECOVERY_SECONDS       = 8 * 60
+    _WAIT_FOR_INITIAL_UPDATE_CHECK_SECONDS = 12 * 60
+    _WAIT_FOR_DOWNLOAD_STARTED_SECONDS     = 2 * 60
+    _WAIT_FOR_DOWNLOAD_COMPLETED_SECONDS   = 5 * 60
+    _WAIT_FOR_UPDATE_COMPLETED_SECONDS     = 4 * 60
+    _WAIT_FOR_UPDATE_CHECK_AFTER_REBOOT_SECONDS = 15 * 60
+
+    # Omaha event types/results, from update_engine/omaha_request_action.h
+    EVENT_TYPE_UNKNOWN           = 0
+    EVENT_TYPE_DOWNLOAD_COMPLETE = 1
+    EVENT_TYPE_INSTALL_COMPLETE  = 2
+    EVENT_TYPE_UPDATE_COMPLETE   = 3
+    EVENT_TYPE_DOWNLOAD_STARTED  = 13
+    EVENT_TYPE_DOWNLOAD_FINISHED = 14
+    EVENT_RESULT_ERROR           = 0
+    EVENT_RESULT_SUCCESS         = 1
+    EVENT_RESULT_SUCCESS_REBOOT  = 2
+    EVENT_RESULT_UPDATE_DEFERRED = 9
+
+
+    def _servo_dut_power_up(self, host, is_dev_mode):
+        """Powers up the DUT, optionally simulating a Ctrl-D key press."""
+        host.servo.power_short_press()
+        if is_dev_mode:
+            host.servo.pass_devmode()
+
+
+    def _servo_dut_reboot(self, host, is_dev_mode, is_using_test_images,
+                          is_disable_usb_hub=False):
+        """Reboots a DUT.
+
+        @param host: a host object
+        @param is_dev_mode: whether or not the DUT is in dev mode
+        @param is_using_test_images: whether or not a test image should be
+               assumed
+        @param is_disable_usb_hub: disabled the servo USB hub in between power
+               off/on cycles; this is useful when (for example) a USB booted
+               device need not see the attached USB key after the reboot.
+
+        @raise error.TestFail if DUT fails to reboot.
+
+        """
+        logging.info('rebooting dut')
+        host.servo.power_long_press()
+        _wait(self._WAIT_AFTER_SHUTDOWN_SECONDS, 'after shutdown')
+        if is_disable_usb_hub:
+            host.servo.disable_usb_hub()
+        self._servo_dut_power_up(host, is_dev_mode)
+        if is_using_test_images:
+            if not host.wait_up(timeout=host.BOOT_TIMEOUT):
+                raise error.TestFail(
+                        'dut %s failed to boot after %d secs' %
+                        (host.ip, host.BOOT_TIMEOUT))
+        else:
+          # TODO(garnold) chromium-os:33766: implement waiting for MP-signed
+          # images; ideas include waiting for a ping reply, or using a GPIO
+          # signal.
+          pass
+
+
+    def _install_mp_image(self, host, lorry_image_url, is_dev_mode):
+        """Installs an MP-signed recovery image on a DUT.
+
+        @param host: a host object
+        @param lorry_image_url: URL of the image on a Lorry/devserver
+        @param is_dev_nmode: whether or not the DUT is in dev mode
+
+        """
+        # Flash DUT with source image version, using recovery.
+        logging.info('installing source mp-signed image via recovery: %s',
+                     lorry_image_url)
+        host.servo.install_recovery_image(
+                lorry_image_url,
+                wait_timeout=self._WAIT_FOR_MP_RECOVERY_SECONDS)
+
+        # Reboot the DUT after installation.
+        self._servo_dut_reboot(host, is_dev_mode, False,
+                               is_disable_usb_hub=True)
+
+
+    def _install_test_image(self, host, lorry_image_url, is_dev_mode):
+        """Installs a test image on a DUT, booted via recovery.
+
+        @param host: a host object
+        @param lorry_image_url: URL of the image on a Lorry/devserver
+        @param is_dev_nmode: whether or not the DUT is in dev mode
+
+        @raise error.TestFail if DUT cannot boot the test image from USB;
+               AutotestHostRunError if failed to run the install command on the
+               DUT.
+
+        """
+        logging.info('installing source test image via recovery: %s',
+                     lorry_image_url)
+        host.servo.install_recovery_image(lorry_image_url)
+        logging.info('waiting for image to boot')
+        if not host.wait_up(timeout=host.USB_BOOT_TIMEOUT):
+          raise error.TestFail(
+              'dut %s boot from usb timed out after %d secs' %
+              (host, host.USB_BOOT_TIMEOUT))
+        logging.info('installing new image onto ssd')
+        try:
+            cmd_result = host.run(
+                    'chromeos-install --yes',
+                    timeout=self._WAIT_FOR_USB_INSTALL_SECONDS,
+                    stdout_tee=None, stderr_tee=None)
+        except AutotestHostRunError, e:
+            # Dump stdout (with stderr) to the error log.
+            logging.error('command failed, stderr:\n' + cmd_result.stderr)
+            raise
+
+        # Reboot the DUT after installation.
+        self._servo_dut_reboot(host, is_dev_mode, True,
+                               is_disable_usb_hub=True)
+
+
+    def _trigger_test_update(self, host, omaha_netloc):
+        """Trigger an update check on a test image.
+
+        Uses update_engine_client via SSH. This is an async call, hence a very
+        short timeout.
+
+        @param host: a host object
+        @param omaha_netloc: the network location of the Omaha/devserver
+               (http://host:port)
+
+        @raise RootFSUpdateError if anything went wrong.
+
+        """
+        omaha_update_url = urlparse.urlunsplit(
+                ['http', omaha_netloc, '/update', '', ''])
+        updater = autoupdater.ChromiumOSUpdater(omaha_update_url, host=host)
+        updater.trigger_update()
+
+
+    def stage_image(self, lorry_devserver, image_uri, board, release, branch,
+                    is_using_test_images):
+        """Stage a Chrome OS image on Lorry/devserver.
+
+        @return URL of the staged image on the server.
+
+        @raise error.TestError if there's a problem with staging.
+
+        """
+        staged_url = None
+        if is_using_test_images:
+            # For this call, we just need the URL path up to the image.zip file
+            # (exclusive).
+            image_uri_path = urlparse.urlsplit(image_uri).path.partition(
+                    'image.zip')[0].strip('/')
+            try:
+                lorry_devserver.trigger_test_image_download(image_uri_path)
+                staged_url = lorry_devserver.get_test_image_url(
+                        board, release, branch)
+            except dev_server.DevServerException, e:
+                raise error.TestError(
+                        'failed to stage source test image: %s' % str(e))
+        else:
+            # TODO(garnold) chromium-os:33766: implement staging of MP-signed
+            # images.
+            pass
+
+        if not staged_url:
+            raise error.TestError('staged source test image url missing')
+        return staged_url
+
+
+    def stage_payload(self, lorry_devserver, payload_uri, board, release,
+                      branch, is_using_test_images, is_delta, is_nton):
+        """Stage an update target payload on Lorry/devserver.
+
+        @return URL of the staged payload on the server.
+
+        @raise error.TestError if there's a problem with staging.
+
+        """
+        staged_url = None
+        if is_using_test_images:
+            # For this call, we'll need the URL path without the payload file
+            # name.
+            payload_uri_path = urlparse.urlsplit(payload_uri).path.rsplit(
+                    '/', 1)[0].strip('/')
+            try:
+                lorry_devserver.trigger_download(payload_uri_path)
+                if is_delta:
+                    staged_url = lorry_devserver.get_delta_payload_url(
+                            'nton' if is_nton else 'mton',
+                            board, release, branch)
+                else:
+                    staged_url = lorry_devserver.get_full_payload_url(
+                            board, release, branch)
+            except dev_server.DevServerException, e:
+                raise error.TestError(
+                        'failed to stage target test payload: %s' % str(e))
+        else:
+            # TODO(garnold) chromium-os:33766: implement staging of MP-signed
+            # images.
+            pass
+
+        if not staged_url:
+            raise error.TestError('staged target test payload url missing')
+        return staged_url
+
+
+    def run_once(self, host, test_conf):
+        """Performs a complete auto update test.
+
+        @param host: a host object representing the DUT
+        @param test_conf: a dictionary containing test configuration values
+
+        @raise error.TestError if anything went wrong with setting up the test;
+               error.TestFail if any part of the test has failed.
+
+        """
+        is_using_test_images = test_conf.get('image_type') != 'mp'
+        is_dev_mode = test_conf.get('dev_mode') == 'yes'
+        omaha_host = test_conf.get('omaha_host')
+
+        # Stage source images and update payloads on lorry/devserver. We use
+        # the payload URI as argument for the lab's devserver load-balancing
+        # mechanism.
+        lorry_devserver = dev_server.ImageServer.resolve(
+                test_conf['target_payload_uri'])
+        logging.info('staging image and payload on lorry/devserver (%s)',
+                     lorry_devserver.url())
+        test_conf['source_image_lorry_url'] = self.stage_image(
+                lorry_devserver, test_conf['source_image_uri'],
+                test_conf['board'], test_conf['source_release'],
+                test_conf['source_branch'], is_using_test_images)
+        test_conf['target_payload_lorry_url'] = self.stage_payload(
+                lorry_devserver, test_conf['target_payload_uri'],
+                test_conf['board'], test_conf['target_release'],
+                test_conf['target_branch'], is_using_test_images,
+                test_conf['update_type'] == 'delta',
+                test_conf['target_release'] == test_conf['source_release'])
+
+        # Launch Omaha/devserver.
+        try:
+            self._omaha_devserver = OmahaDevserver(
+                    omaha_host, host.ip,
+                    test_conf.get('target_payload_lorry_url'))
+        except error.TestError, e:
+            logging.error('failed to start omaha/devserver: %s', str(e))
+            raise
+
+        try:
+            # Install source image (test vs MP).
+            if is_using_test_images:
+                self._install_test_image(
+                        host, test_conf['source_image_lorry_url'],
+                        is_dev_mode)
+            else:
+                self._install_mp_image(test_conf['source_image_lorry_url'],
+                                       is_dev_mode)
+
+            omaha_netloc = self._omaha_devserver.get_netloc()
+
+            # Trigger an update (test vs MP).
+            if is_using_test_images:
+                self._trigger_test_update(host, omaha_netloc)
+            else:
+                # TODO(garnold) chromium-os:33766: use GPIOs to trigger an
+                # update.
+                pass
+
+            # Track update progress.
+            omaha_hostlog_url = urlparse.urlunsplit(
+                    ['http', omaha_netloc, '/api/hostlog', 'ip=' + host.ip, ''])
+            logging.info('polling update progress from omaha/devserver: %s',
+                         omaha_hostlog_url)
+            log_verifier = UpdateEventLogVerifier(omaha_hostlog_url)
+
+            # Verify chain of events in a successful update process.
+            chain = ExpectedUpdateEventChain(
+                    (self._WAIT_FOR_INITIAL_UPDATE_CHECK_SECONDS,
+                     ExpectedUpdateEvent(
+                         version=test_conf['source_release'])),
+                    (self._WAIT_FOR_DOWNLOAD_STARTED_SECONDS,
+                     ExpectedUpdateEvent(
+                         event_type=self.EVENT_TYPE_DOWNLOAD_STARTED,
+                         event_result=self.EVENT_RESULT_SUCCESS,
+                         version=test_conf['source_release'])),
+                    (self._WAIT_FOR_DOWNLOAD_COMPLETED_SECONDS,
+                     ExpectedUpdateEvent(
+                         event_type=self.EVENT_TYPE_DOWNLOAD_FINISHED,
+                         event_result=self.EVENT_RESULT_SUCCESS,
+                         version=test_conf['source_release'])),
+                    (self._WAIT_FOR_UPDATE_COMPLETED_SECONDS,
+                     ExpectedUpdateEvent(
+                         event_type=self.EVENT_TYPE_UPDATE_COMPLETE,
+                         event_result=self.EVENT_RESULT_SUCCESS,
+                         version=test_conf['source_release'])))
+            if not log_verifier.verify_expected_event_chain(chain):
+                raise error.TestFail(
+                        'could not verify that update was successful')
+
+            # Wait after an update completion (safety margin).
+            _wait(self._WAIT_AFTER_UPDATE_SECONDS, 'after update completion')
+
+            # Reboot the DUT after the update.
+            self._servo_dut_reboot(host, is_dev_mode, is_using_test_images)
+
+            # Trigger a second update check (again, test vs MP).
+            if is_using_test_images:
+                self._trigger_test_update(host, omaha_netloc)
+            else:
+                # TODO(garnold) chromium-os:33766: use GPIOs to trigger an
+                # update.
+                pass
+
+            # Observe post-reboot update check, which should indicate that the
+            # image version has been updated.  Note that the previous version
+            # is currently not reported by AU, as one may have expected; had it
+            # been reported, we should have included
+            # expect_previous_version=test_conf['source_release'] as well.
+            chain = ExpectedUpdateEventChain(
+                    (self._WAIT_FOR_UPDATE_CHECK_AFTER_REBOOT_SECONDS,
+                     ExpectedUpdateEvent(
+                         event_type=self.EVENT_TYPE_UPDATE_COMPLETE,
+                         event_result=self.EVENT_RESULT_SUCCESS_REBOOT,
+                         version=test_conf['target_release'])))
+            if not log_verifier.verify_expected_event_chain(chain):
+                raise error.TestFail('could not verify that machine rebooted '
+                                     'after update')
+
+        except error.TestFail:
+            raise
+        except Exception, e:
+            # Convert any other exception into a test failure.
+            raise error.TestFail(str(e))
+
+        finally:
+            self._omaha_devserver.kill()
+
diff --git a/server/site_tests/autoupdate_EndToEndTest/control b/server/site_tests/autoupdate_EndToEndTest/control
new file mode 100644
index 0000000..970c0dd
--- /dev/null
+++ b/server/site_tests/autoupdate_EndToEndTest/control
@@ -0,0 +1,86 @@
+# Copyright (c) 2012 The Chromium OS Authors. All rights reserved.
+# Use of this source code is governed by a BSD-style license that can be
+# found in the LICENSE file.
+
+import logging
+from autotest_lib.client.common_lib import error, utils
+from autotest_lib.server import host_attributes
+
+AUTHOR = "Chromium OS"
+NAME = "autoupdate_EndToEndTest"
+TIME = "MEDIUM"
+TEST_CATEGORY = "Functional"
+TEST_CLASS = "platform"
+TEST_TYPE = "server"
+
+DOC = """
+This is an end-to-end update test of Chrome OS releases. Given a test
+configuration, it will perform an end-to-end test of a Chrome OS update
+payload. A test configuration can be given as command-line arguments (see
+below) or instantiated inline as local varibles.
+
+To invoke this test locally with an attached servo board:
+
+  run_remote_tests.sh \
+          --args="<ARGLIST>" \
+          --servo \
+          --remote=<DUT-IPADDR \
+          --ssh_connect_timeout 2 \       # Make the test run faster
+          --ssh_connection_attempts 2 \   # when not using test
+          --allow_offline_remote \        # images.
+          autoupdate_EndToEndTest
+
+where ARGLIST is a whitespace separated list of the following key=value pairs.
+Values pertaining to the test case include:
+
+  board=BOARD        type of board tested (e.g. 'x86-alex')
+  name=TAG           name tag for the test (e.g. 'nmo', 'npo' or 'fsi')
+  image_type=test|mp      type of images used, either 'test' or 'mp'
+  update_type=full|delta  type of update being applied, either 'full' or 'delta'
+  source_release=REL      source image release version (e.g. 2672.0.0)
+  target_release=REL      target image release version (e.g. 2673.0.0)
+  source_branch=BR        source image release branch (e.g. R22)
+  target_branch=BR        target image release branch (e.g. R22)
+  source_image_uri=URI    URI of the source image
+  target_payload_uri=URI  URI of the target payload
+
+Other values pertaining to the test environment include:
+
+  servo_host=HOST         host running servod (default: localhost)
+  servo_port=PORT         servod's IP port (default: servod's default port)
+  omaha_host=HOST    host on which to spawn Omaha/devserver (default: localhost)
+  dev_mode=yes|no    Speed up dev-mode boot by simulating Ctrl-D (default: no)
+
+"""
+
+TEST_CONF_KEYS = (
+    'board', 'name', 'image_type', 'update_type',
+    'source_release', 'target_release', 'source_branch', 'target_branch',
+    'source_image_uri', 'target_payload_uri', 'omaha_host', 'dev_mode')
+
+
+args_dict = utils.args_to_dict(args)
+servo_args = hosts.SiteHost.get_servo_arguments(args_dict)
+
+# Create test configuration based on command-line arguments (higher precedence,
+# for run_remote_tests.sh invocation) and local variables (lower precedence,
+# for Autotest front-end invocation).
+test_conf = {}
+for key in TEST_CONF_KEYS:
+    test_conf[key] = args_dict.get(key) or locals().get(key)
+
+
+def run_test(machine):
+    """Execute a test configuration on a given machine."""
+    host = hosts.create_host(machine, servo_args=servo_args)
+    job.run_test(
+            "autoupdate_EndToEndTest",
+            tag='%s_%s_%s_%s' % (
+                test_conf['board'], test_conf['name'],
+                test_conf['image_type'], test_conf['update_type']),
+            host=host,
+            test_conf=test_conf)
+
+
+# Invoke parallel tests.
+parallel_simple(run_test, machines)