Modify dynamic suite pathway to allow some artifacts to be staged in the bgnd.

This change modifies the site_rpc_interface->dynamic_suite.py workflow to allow
autotest to stage larger build components while imaging devices. It uses
the download/wait_for_status interface provided by the devserver to
accomplish it.

BUG=chromium-os:27285
TEST=All effected unittests pass. Staging larger test on local devserver in
parallel.

Change-Id: Iae2a20762c565b8e6d440c9a501c1f99c73129ed
Reviewed-on: https://gerrit.chromium.org/gerrit/19328
Tested-by: Chris Sosa <sosa@chromium.org>
Reviewed-by: Scott Zawalski <scottz@chromium.org>
Tested-by: Scott Zawalski <scottz@chromium.org>
Commit-Ready: Scott Zawalski <scottz@chromium.org>
diff --git a/client/common_lib/cros/dev_server.py b/client/common_lib/cros/dev_server.py
index 5cfe00c..b2bde59 100644
--- a/client/common_lib/cros/dev_server.py
+++ b/client/common_lib/cros/dev_server.py
@@ -19,6 +19,30 @@
     return CONFIG.get_config_value('CROS', 'dev_server', type=str)
 
 
+def remote_devserver_call(internal_error_return_val):
+    """A decorator to use with remote devserver calls.
+
+    This decorater handles httplib.INTERNAL_SERVER_ERROR's cleanly while
+    raising all other exceptions. It requires that you pass in the value you
+    want the method to return when it receives a httplib.INTERNAL_SERVER_ERROR.
+    """
+    def wrapper(method):
+      """Wrapper just wraps the method."""
+      def inner_wrapper(*args, **kwargs):
+        """This wrapper actually catches the httplib.INTERNAL_SERVER_ERROR."""
+        try:
+            return method(*args, **kwargs)
+        except urllib2.HTTPError as e:
+            if e.code == httplib.INTERNAL_SERVER_ERROR:
+                return internal_error_return_val
+            else:
+                raise
+
+      return inner_wrapper
+
+    return wrapper
+
+
 class DevServer(object):
     """Helper class for interacting with the Dev Server via http."""
     def __init__(self, dev_host=None):
@@ -53,30 +77,59 @@
                                                  'args': argstr}
 
 
-    def trigger_download(self, image):
+    @remote_devserver_call(False)
+    def trigger_download(self, image, synchronous=True):
         """Tell the dev server to download and stage |image|.
 
         Tells the dev server at |self._dev_server| to fetch |image|
         from the image storage server named by _get_image_storage_server().
 
+        If |synchronous| is True, waits for the entire download to finish
+        staging before returning. Otherwise only the artifacts necessary
+        to start installing images onto DUT's will be staged before returning.
+        A caller can then call finish_download to guarantee the rest of the
+        artifacts have finished staging.
+
+        @param image: the image to fetch and stage.
+        @param synchronous: if True, waits until all components of the image are
+                staged before returning.
+        @return True if the remote call returns HTTP OK, False if it returns
+                an internal server error.
+        @throws urllib2.HTTPError upon any return code that's not 200 or 500.
+        """
+        call = self._build_call(
+            'download',
+            archive_url=_get_image_storage_server() + image)
+        response = urllib2.urlopen(call)
+        was_successful = response.read() == 'Success'
+        if was_successful and synchronous:
+            return self.finish_download(image)
+        else:
+            return was_successful
+
+
+    @remote_devserver_call(False)
+    def finish_download(self, image):
+        """Tell the dev server to finish staging |image|.
+
+        If trigger_download is called with synchronous=False, it will return
+        before all artifacts have been staged. This method contacts the
+        devserver and blocks until all staging is completed and should be
+        called after a call to trigger_download.
+
         @param image: the image to fetch and stage.
         @return True if the remote call returns HTTP OK, False if it returns
                 an internal server error.
         @throws urllib2.HTTPError upon any return code that's not 200 or 500.
         """
-        try:
-            call = self._build_call(
-                'download',
-                archive_url=_get_image_storage_server() + image)
-            response = urllib2.urlopen(call)
-            return response.read() == 'Success'
-        except urllib2.HTTPError as e:
-            if e.code == httplib.INTERNAL_SERVER_ERROR:
-                return False
-            else:
-                raise
+        call = self._build_call(
+            'wait_for_status',
+            archive_url=_get_image_storage_server() + image)
+        response = urllib2.urlopen(call)
+        return response.read() == 'Success'
 
 
+    @remote_devserver_call(None)
     def list_control_files(self, build):
         """Ask the dev server to list all control files for |build|.
 
@@ -89,17 +142,12 @@
                 (e.g. server/site_tests/autoupdate/control)
         @throws urllib2.HTTPError upon any return code that's not 200 or 500.
         """
-        try:
-            call = self._build_call('controlfiles', build=build)
-            response = urllib2.urlopen(call)
-            return [line.rstrip() for line in response]
-        except urllib2.HTTPError as e:
-            if e.code == httplib.INTERNAL_SERVER_ERROR:
-                return None
-            else:
-                raise
+        call = self._build_call('controlfiles', build=build)
+        response = urllib2.urlopen(call)
+        return [line.rstrip() for line in response]
 
 
+    @remote_devserver_call(None)
     def get_control_file(self, build, control_path):
         """Ask the dev server for the contents of a control file.
 
@@ -113,16 +161,12 @@
         @return The contents of the desired file, or None
         @throws urllib2.HTTPError upon any return code that's not 200 or 500.
         """
-        try:
-            call = self._build_call('controlfiles',
-                                    build=build, control_path=control_path)
-            return urllib2.urlopen(call).read()
-        except urllib2.HTTPError as e:
-            if e.code == httplib.INTERNAL_SERVER_ERROR:
-                return None
-            else:
-                raise
+        call = self._build_call('controlfiles',
+                                build=build, control_path=control_path)
+        return urllib2.urlopen(call).read()
 
+
+    @remote_devserver_call(None)
     def get_latest_build(self, target, milestone=''):
         """Ask the dev server for the latest build for a given target.
 
@@ -141,12 +185,6 @@
                 or None.
         @throws urllib2.HTTPError upon any return code that's not 200 or 500.
         """
-        try:
-            call = self._build_call('latestbuild', target=target,
-                                    milestone=milestone)
-            return urllib2.urlopen(call).read()
-        except urllib2.HTTPError as e:
-            if e.code == httplib.INTERNAL_SERVER_ERROR:
-                return None
-            else:
-                raise
+        call = self._build_call('latestbuild', target=target,
+                                milestone=milestone)
+        return urllib2.urlopen(call).read()
diff --git a/client/common_lib/cros/dev_server_unittest.py b/client/common_lib/cros/dev_server_unittest.py
index 4a58245..d4aee17 100644
--- a/client/common_lib/cros/dev_server_unittest.py
+++ b/client/common_lib/cros/dev_server_unittest.py
@@ -49,15 +49,36 @@
         urllib2.urlopen(mox.IgnoreArg()).AndRaise(e403)
 
 
-    def testSuccessfulTriggerDownload(self):
-        """Should successfully call the dev server's download method."""
+    def testSuccessfulTriggerDownloadSync(self):
+        """Call the dev server's download method with synchronous=True."""
         name = 'fake/image'
         self.mox.StubOutWithMock(urllib2, 'urlopen')
+        self.mox.StubOutWithMock(dev_server.DevServer, 'finish_download')
         to_return = StringIO.StringIO('Success')
         urllib2.urlopen(mox.And(mox.StrContains(self._HOST),
                                 mox.StrContains(name))).AndReturn(to_return)
+        self.dev_server.finish_download(name).AndReturn(True)
+
+        # Synchronous case requires a call to finish download.
         self.mox.ReplayAll()
-        self.assertTrue(self.dev_server.trigger_download(name))
+        self.assertTrue(self.dev_server.trigger_download(name,
+                                                         synchronous=True))
+        self.mox.VerifyAll()
+
+
+    def testSuccessfulTriggerDownloadASync(self):
+        """Call the dev server's download method with synchronous=False."""
+        name = 'fake/image'
+        self.mox.StubOutWithMock(urllib2, 'urlopen')
+        self.mox.StubOutWithMock(dev_server.DevServer, 'finish_download')
+        to_return = StringIO.StringIO('Success')
+        urllib2.urlopen(mox.And(mox.StrContains(self._HOST),
+                                mox.StrContains(name))).AndReturn(to_return)
+
+        self.mox.ReplayAll()
+        self.assertTrue(self.dev_server.trigger_download(name,
+                                                         synchronous=False))
+        self.mox.VerifyAll()
 
 
     def testFailedTriggerDownload(self):
@@ -76,6 +97,28 @@
                           '')
 
 
+    def testSuccessfulFinishDownload(self):
+        """Should successfully call the dev server's finish download method."""
+        name = 'fake/image'
+        self.mox.StubOutWithMock(urllib2, 'urlopen')
+        to_return = StringIO.StringIO('Success')
+        urllib2.urlopen(mox.And(mox.StrContains(self._HOST),
+                                mox.StrContains(name))).AndReturn(to_return)
+
+        # Synchronous case requires a call to finish download.
+        self.mox.ReplayAll()
+        self.assertTrue(self.dev_server.finish_download(name))
+        self.mox.VerifyAll()
+
+
+    def testFailedTriggerDownload(self):
+        """Should call the dev server's finish download method, fail gracefully.
+        """
+        self._returnHttpServerError()
+        self.mox.ReplayAll()
+        self.assertFalse(self.dev_server.finish_download(''))
+
+
     def testListControlFiles(self):
         """Should successfully list control files from the dev server."""
         name = 'fake/build'
diff --git a/frontend/afe/site_rpc_interface.py b/frontend/afe/site_rpc_interface.py
index 3fb8937..c727319 100644
--- a/frontend/afe/site_rpc_interface.py
+++ b/frontend/afe/site_rpc_interface.py
@@ -63,9 +63,11 @@
     """
     # All suite names are assumed under test_suites/control.XX.
     suite_name = canonicalize_suite_name(suite_name)
-    # Ensure |build| is staged is on the dev server.
+    # Ensure components of |build| necessary for installing images are staged
+    # on the dev server. However set synchronous to False to allow other
+    # components to be downloaded in the background.
     ds = dev_server.DevServer.create()
-    if not ds.trigger_download(build):
+    if not ds.trigger_download(build, synchronous=False):
         raise StageBuildFailure("Server error while staging " + build)
 
     getter = control_file_getter.DevServerGetter.create(build, ds)
diff --git a/frontend/afe/site_rpc_interface_unittest.py b/frontend/afe/site_rpc_interface_unittest.py
index 686d636..c691e28 100644
--- a/frontend/afe/site_rpc_interface_unittest.py
+++ b/frontend/afe/site_rpc_interface_unittest.py
@@ -67,7 +67,8 @@
 
     def testStageBuildFail(self):
         """Ensure that a failure to stage the desired build fails the RPC."""
-        self.dev_server.trigger_download(self._BUILD).AndReturn(False)
+        self.dev_server.trigger_download(self._BUILD,
+                                         synchronous=False).AndReturn(False)
         self.mox.ReplayAll()
         self.assertRaises(site_rpc_interface.StageBuildFailure,
                           site_rpc_interface.create_suite_job,
@@ -80,7 +81,8 @@
     def testGetControlFileFail(self):
         """Ensure that a failure to get needed control file fails the RPC."""
         self._mockDevServerGetter()
-        self.dev_server.trigger_download(self._BUILD).AndReturn(True)
+        self.dev_server.trigger_download(self._BUILD,
+                                         synchronous=False).AndReturn(True)
         self.getter.get_control_file_contents_by_name(
             self._SUITE_NAME).AndReturn(None)
         self.mox.ReplayAll()
@@ -95,7 +97,8 @@
     def testGetControlFileListFail(self):
         """Ensure that a failure to get needed control file fails the RPC."""
         self._mockDevServerGetter()
-        self.dev_server.trigger_download(self._BUILD).AndReturn(True)
+        self.dev_server.trigger_download(self._BUILD,
+                                         synchronous=False).AndReturn(True)
         self.getter.get_control_file_contents_by_name(
             self._SUITE_NAME).AndRaise(control_file_getter.NoControlFileList())
         self.mox.ReplayAll()
@@ -110,7 +113,8 @@
     def testCreateSuiteJobFail(self):
         """Ensure that failure to schedule the suite job fails the RPC."""
         self._mockDevServerGetter()
-        self.dev_server.trigger_download(self._BUILD).AndReturn(True)
+        self.dev_server.trigger_download(self._BUILD,
+                                         synchronous=False).AndReturn(True)
         self.getter.get_control_file_contents_by_name(
             self._SUITE_NAME).AndReturn('f')
         self._mockRpcUtils(-1)
@@ -119,13 +123,14 @@
                                                               self._BOARD,
                                                               self._BUILD,
                                                               None),
-                          -1)
+                          - 1)
 
 
     def testCreateSuiteJobSuccess(self):
         """Ensures that success results in a successful RPC."""
         self._mockDevServerGetter()
-        self.dev_server.trigger_download(self._BUILD).AndReturn(True)
+        self.dev_server.trigger_download(self._BUILD,
+                                         synchronous=False).AndReturn(True)
         self.getter.get_control_file_contents_by_name(
             self._SUITE_NAME).AndReturn('f')
         job_id = 5
@@ -141,7 +146,8 @@
     def testCreateSuiteJobNoHostCheckSuccess(self):
         """Ensures that success results in a successful RPC."""
         self._mockDevServerGetter()
-        self.dev_server.trigger_download(self._BUILD).AndReturn(True)
+        self.dev_server.trigger_download(self._BUILD,
+                                         synchronous=False).AndReturn(True)
         self.getter.get_control_file_contents_by_name(
             self._SUITE_NAME).AndReturn('f')
         job_id = 5
diff --git a/server/cros/dynamic_suite.py b/server/cros/dynamic_suite.py
index 5d114b3..a1b01fe 100644
--- a/server/cros/dynamic_suite.py
+++ b/server/cros/dynamic_suite.py
@@ -15,6 +15,12 @@
 CONFIG = global_config.global_config
 
 
+class AsynchronousBuildFailure(Exception):
+    """Raised when the dev server throws 500 while finishing staging of a build.
+    """
+    pass
+
+
 class SuiteArgumentException(Exception):
     """Raised when improper arguments are used to run a suite."""
     pass
@@ -51,6 +57,8 @@
                          Default: False
     @param add_experimental: schedule experimental tests as well, or not.
                              Default: True
+    @raises AsynchronousBuildFailure: if there was an issue finishing staging
+                                      from the devserver.
     """
     (build, board, name, job, pool, num, check_hosts, skip_reimage,
      add_experimental) = _vet_reimage_and_run_args(**dargs)
@@ -61,6 +69,13 @@
 
     if skip_reimage or reimager.attempt(build, board, job.record, check_hosts,
                                         num=num):
+
+        # Ensure that the image's artifacts have completed downloading.
+        ds = dev_server.DevServer.create()
+        if not ds.finish_download(build):
+            raise AsynchronousBuildFailure(
+                "Server error completing staging for " + build)
+
         suite = Suite.create_from_name(name, build, pool=pool,
                                        results_dir=job.resultdir)
         suite.run_and_wait(job.record, add_experimental=add_experimental)