Autotest: Add timeout logic for Try-Jobs.

Changed the reimager/job_status logic to monitor the Try-Job it is
waiting on and, once a certain amount of time has passed, time the
job out and abort it.
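
Conceptually, the waiting loops now also check a deadline on every poll;
a minimal sketch of that check (the helper name here is illustrative; the
real logic lives in job_status._abort_jobs_if_timedout in the diff below,
which additionally aborts the jobs' host queue entries via the AFE):

    import datetime

    def _timed_out(start_time, timeout_mins):
        # True once utcnow() has passed start_time + timeout_mins.
        deadline = start_time + datetime.timedelta(minutes=timeout_mins)
        return datetime.datetime.utcnow() >= deadline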

Also adapted the suite job's automatic abort-on-aborted-Try-Job logic
so that it decides whether or not to continue based on what percentage
of Host Queue Entries (i.e. the devices being imaged) completed
successfully. This is done through a new public method in job_status.
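
The new method is job_status.check_job_abort_status(); it applies a
threshold roughly like the following (simplified sketch with an
illustrative helper name; the actual threshold comes from the new
hqe_maximum_abort_rate_float global config value, 0.5 by default):

    def too_many_hqes_aborted(entries, max_abort_rate=0.5):
        # entries: host queue entry dicts from the AFE, each carrying
        # an 'aborted' field.
        num_aborted = sum(1 for hqe in entries if hqe['aborted'])
        return num_aborted > len(entries) * max_abort_rate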

The timeout (4 hours by default) is controlled by a value in the
global config, but can also be passed in by the suite control file.
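
For example, a suite control file could request a two-hour timeout when
invoking reimage_and_run (illustrative fragment only; the usual suite
arguments are abbreviated here):

    dynamic_suite.reimage_and_run(
        build=build, board=board, name='bvt', job=job, pool=None,
        check_hosts=True, add_experimental=True,
        try_job_timeout_mins=2*60)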

BUG=chromium-os:36175
TEST=Existing unit tests, plus verifying that the suite aborts when not
     enough jobs complete and continues when enough do.

Change-Id: I3ebd1653ba2b196e027e72768b33e02d87ec9673
Reviewed-on: https://gerrit.chromium.org/gerrit/39125
Commit-Ready: Simran Basi <sbasi@chromium.org>
Reviewed-by: Simran Basi <sbasi@chromium.org>
Tested-by: Simran Basi <sbasi@chromium.org>
diff --git a/global_config.ini b/global_config.ini
index bafb403..95bb445 100644
--- a/global_config.ini
+++ b/global_config.ini
@@ -94,6 +94,8 @@
 copy_parse_log_back: False
 tick_debug: True
 extra_debugging: False
+try_job_timeout_mins: 240
+hqe_maximum_abort_rate_float: .5
 
 
 [HOSTS]
diff --git a/server/cros/dynamic_suite/dynamic_suite.py b/server/cros/dynamic_suite/dynamic_suite.py
index 2edc031..4c918cd 100644
--- a/server/cros/dynamic_suite/dynamic_suite.py
+++ b/server/cros/dynamic_suite/dynamic_suite.py
@@ -269,6 +269,9 @@
 """
 
 
+DEFAULT_TRY_JOB_TIMEOUT_MINS = global_config.global_config.get_config_value(
+            'SCHEDULER', 'try_job_timeout_mins', type=int, default=4*60)
+
 # Relevant CrosDynamicSuiteExceptions are defined in client/common_lib/error.py.
 
 class SuiteSpec(object):
@@ -300,7 +303,8 @@
     def __init__(self, build=None, board=None, name=None, job=None,
                  pool=None, num=None, check_hosts=True,
                  skip_reimage=False, add_experimental=True, file_bugs=False,
-                 max_runtime_mins=24*60, **dargs):
+                 max_runtime_mins=24*60,
+                 try_job_timeout_mins=DEFAULT_TRY_JOB_TIMEOUT_MINS, **dargs):
         """
         Vets arguments for reimage_and_run() and populates self with supplied
         values.
@@ -323,6 +327,10 @@
                              Default: False
         @param add_experimental: schedule experimental tests as well, or not.
                                  Default: True
+        @param max_runtime_mins: Max runtime in mins for each of the sub-jobs
+                                 this suite will run.
+        @param try_job_timeout_mins: Max time in mins we allow a try job to run
+                                     before timing out.
         @param **dargs: these arguments will be ignored.  This allows us to
                         deprecate and remove arguments in ToT while not
                         breaking branch builds.
@@ -352,6 +360,7 @@
         self.file_bugs = file_bugs
         self.dependencies = {'': []}
         self.max_runtime_mins = max_runtime_mins
+        self.try_job_timeout_mins = try_job_timeout_mins
 
 
 def skip_reimage(g):
@@ -433,13 +442,9 @@
     with host_lock_manager.HostsLockedBy(manager):
         tests_to_skip = []
         if spec.skip_reimage or reimager.attempt(spec.build, spec.board,
-                                                 spec.pool, spec.devserver,
-                                                 spec.job.record_entry,
-                                                 spec.check_hosts,
-                                                 manager,
-                                                 tests_to_skip,
-                                                 spec.dependencies,
-                                                 num=spec.num):
+                spec.pool, spec.devserver, spec.job.record_entry,
+                spec.check_hosts, manager, tests_to_skip, spec.dependencies,
+                num=spec.num, timeout_mins=spec.try_job_timeout_mins):
             # Ensure that the image's artifacts have completed downloading.
             try:
                 spec.devserver.finish_download(spec.build)
diff --git a/server/cros/dynamic_suite/job_status.py b/server/cros/dynamic_suite/job_status.py
index 828e9e6..43e4130 100644
--- a/server/cros/dynamic_suite/job_status.py
+++ b/server/cros/dynamic_suite/job_status.py
@@ -3,13 +3,19 @@
 # found in the LICENSE file.
 
 import datetime, logging, time
-from autotest_lib.client.common_lib import base_job, log
+
+
+from autotest_lib.client.common_lib import base_job, global_config, log
 from autotest_lib.client.common_lib.host_queue_entry_states \
     import IntStatus as HqeIntStatus
 
 TIME_FMT = '%Y-%m-%d %H:%M:%S'
 DEFAULT_POLL_INTERVAL_SECONDS = 10
 
+HQE_MAXIMUM_ABORT_RATE_FLOAT = global_config.global_config.get_config_value(
+            'SCHEDULER', 'hqe_maximum_abort_rate_float', type=float,
+            default=0.5)
+
 
 def view_is_relevant(view):
     """
@@ -78,15 +84,79 @@
     return hosts
 
 
-def wait_for_jobs_to_start(afe, jobs, interval=DEFAULT_POLL_INTERVAL_SECONDS):
+def check_job_abort_status(afe, jobs):
+    """
+    Checks the abort status of all the jobs in jobs and returns True if any
+    of them has had too many of its HostQueueEntries aborted.
+
+    The fraction of aborted entries that may be tolerated is controlled by
+    the hqe_maximum_abort_rate_float value in the global config.
+
+    @param afe: an instance of AFE as defined in server/frontend.py.
+    @param jobs: an iterable of Running frontend.Jobs
+
+    @returns True if any job in jobs has had too many host queue entries
+             aborted, False otherwise.
+    """
+    for job in jobs:
+        entries = afe.run('get_host_queue_entries', job=job.id)
+        num_aborted = 0
+        for hqe in entries:
+            if hqe['aborted']:
+                num_aborted += 1
+        if num_aborted > len(entries) * HQE_MAXIMUM_ABORT_RATE_FLOAT:
+            # This job was not successful, returning True.
+            logging.error('Too many host queue entries were aborted for job: '
+                          '%s.', job.id)
+            return True
+    return False
+
+
+def _abort_jobs_if_timedout(afe, jobs, start_time, timeout_mins):
+    """
+    Abort all of the jobs in jobs if the running time has passed the timeout.
+
+    @param afe: an instance of AFE as defined in server/frontend.py.
+    @param jobs: an iterable of Running frontend.Jobs
+    @param start_time: Time to compare to the current time to see if a timeout
+                       has occurred.
+    @param timeout_mins: Time in minutes to wait before aborting the jobs we
+                         are waiting on.
+
+    @returns True if there was a timeout, False if not.
+    """
+    if datetime.datetime.utcnow() < (start_time +
+                                     datetime.timedelta(minutes=timeout_mins)):
+        return False
+    for job in jobs:
+        logging.debug('Job: %s has timed out after %s minutes. Aborting job.',
+                      job.id, timeout_mins)
+        afe.run('abort_host_queue_entries', job=job.id)
+    return True
+
+
+def wait_for_jobs_to_start(afe, jobs, interval=DEFAULT_POLL_INTERVAL_SECONDS,
+                           start_time=None, wait_timeout_mins=None):
     """
     Wait for the job specified by |job.id| to start.
 
     @param afe: an instance of AFE as defined in server/frontend.py.
     @param jobs: the jobs to poll on.
+    @param start_time: Time to compare to the current time to see if a timeout
+                       has occurred.
+    @param wait_timeout_mins: Time in minutes to wait before aborting the jobs
+                               we are waiting on.
+
+    @returns True if the jobs started, False if they were aborted on timeout.
     """
+    if not start_time:
+        start_time = datetime.datetime.utcnow()
     job_ids = [j.id for j in jobs]
     while job_ids:
+        if wait_timeout_mins and _abort_jobs_if_timedout(afe, jobs, start_time,
+                                                         wait_timeout_mins):
+            # The timeout parameter is not None and we have indeed timed out.
+            return False
         for job_id in list(job_ids):
             if len(afe.get_jobs(id=job_id, not_yet_run=True)) > 0:
                 continue
@@ -95,6 +165,7 @@
         if job_ids:
             logging.debug('Waiting %ds before polling again.', interval)
             time.sleep(interval)
+    return True
 
 
 def wait_for_jobs_to_finish(afe, jobs, interval=DEFAULT_POLL_INTERVAL_SECONDS):
@@ -115,24 +186,9 @@
             time.sleep(interval)
 
 
-def _check_jobs_aborted(afe, jobs):
-    """
-    Check through the AFE if all of the jobs in jobs have been aborted.
-
-    @param jobs: an iterable of Running frontend.Jobs
-
-    @returns True if all of jobs have been aborted, False if any are running.
-    """
-    for job in jobs:
-        entries = afe.run('get_host_queue_entries', job=job.id)
-        if not reduce(_collate_aborted, entries, False):
-            # One of the jobs we are polling has not aborted.
-            return False
-    return True
-
-
 def wait_for_and_lock_job_hosts(afe, jobs, manager,
-                                interval=DEFAULT_POLL_INTERVAL_SECONDS):
+                                interval=DEFAULT_POLL_INTERVAL_SECONDS,
+                                start_time=None, wait_timeout_mins=None):
     """
     Poll until devices have begun reimaging, locking them as we go.
 
@@ -145,6 +201,11 @@
     @param jobs: an iterable of Running frontend.Jobs
     @param manager: a HostLockManager instance.  Hosts will be added to it
                     as they start Running, and it will be used to lock them.
+    @param start_time: Time to compare to the current time to see if a timeout
+                       has occurred.
+    @param wait_timeout_mins: Time in minutes to wait before aborting the jobs
+                              we are waiting on.
+
-    @return iterable of the hosts that were locked or None if all the jobs in
-            jobs have been aborted.
+    @return iterable of the hosts that were locked; on a timeout this may be
+            only a subset (possibly empty) of the expected hosts.
     """
@@ -154,15 +215,17 @@
             all_hosts.extend(gather_job_hostnames(afe, job))
         return all_hosts
 
+    if not start_time:
+        start_time = datetime.datetime.utcnow()
     locked_hosts = set()
     expected_hosts = set(get_all_hosts(jobs))
     logging.debug('Initial expected hosts: %r', expected_hosts)
 
     while locked_hosts != expected_hosts:
-        if _check_jobs_aborted(afe, jobs):
-            logging.error('All the jobs we are waiting for hosts from have'
-                          ' aborted. Returning None.')
-            return None
+        if wait_timeout_mins and _abort_jobs_if_timedout(afe, jobs, start_time,
+                                                         wait_timeout_mins):
+            # The timeout parameter is not None and we have timed out.
+            return locked_hosts
         hosts_to_check = [e for e in expected_hosts if e]
         if hosts_to_check:
             logging.debug('Checking to see if %r are Running.', hosts_to_check)
diff --git a/server/cros/dynamic_suite/job_status_unittest.py b/server/cros/dynamic_suite/job_status_unittest.py
index c47a75c..f18db19 100644
--- a/server/cros/dynamic_suite/job_status_unittest.py
+++ b/server/cros/dynamic_suite/job_status_unittest.py
@@ -20,6 +20,9 @@
 from autotest_lib.server import frontend
 
 
+DEFAULT_WAITTIMEOUT_MINS = 60 * 4
+
+
 class StatusTest(mox.MoxTestBase):
     """Unit tests for job_status.Status.
     """
@@ -190,10 +193,7 @@
         """Ensure we lock all running hosts as they're discovered."""
         self.mox.StubOutWithMock(time, 'sleep')
         self.mox.StubOutWithMock(job_status, 'gather_job_hostnames')
-        self.mox.StubOutWithMock(job_status, '_check_jobs_aborted')
 
-        job_status._check_jobs_aborted(mox.IgnoreArg(),
-                mox.IgnoreArg()).MultipleTimes().AndReturn(False)
         manager = self.mox.CreateMock(host_lock_manager.HostLockManager)
         expected_hostnames=['host1', 'host0']
         expected_hosts = [FakeHost(h) for h in expected_hostnames]
@@ -228,33 +228,100 @@
                                                           manager)))
 
 
-    def testWaitForAndLockWithAbortedSubJobs(self):
+    def testWaitForAndLockWithTimeOutInStartJobs(self):
+        """If we experience a timeout, no locked hosts are returned"""
         self.mox.StubOutWithMock(job_status, 'gather_job_hostnames')
-        self.mox.StubOutWithMock(job_status, '_check_jobs_aborted')
+        self.mox.StubOutWithMock(job_status, '_abort_jobs_if_timedout')
 
+        job_status._abort_jobs_if_timedout(mox.IgnoreArg(), mox.IgnoreArg(),
+                mox.IgnoreArg(), mox.IgnoreArg()).AndReturn(True)
         manager = self.mox.CreateMock(host_lock_manager.HostLockManager)
         expected_hostnames=['host1', 'host0']
         expected_hosts = [FakeHost(h) for h in expected_hostnames]
         job = FakeJob(7, hostnames=[None, None])
         job_status.gather_job_hostnames(mox.IgnoreArg(),
                                         job).AndReturn(expected_hostnames)
-        job_status._check_jobs_aborted(mox.IgnoreArg(),
-                                       mox.IgnoreArg()).AndReturn(True)
         self.mox.ReplayAll()
-        self.assertEquals(None,
-                          job_status.wait_for_and_lock_job_hosts(self.afe,
-                                                                 [job],
-                                                                 manager))
+        self.assertFalse(job_status.wait_for_and_lock_job_hosts(self.afe,
+                [job], manager, wait_timeout_mins=DEFAULT_WAITTIMEOUT_MINS))
+
+
+    def testWaitForAndLockWithTimedOutSubJobs(self):
+        """If we experience a timeout, no locked hosts are returned"""
+        self.mox.StubOutWithMock(job_status, 'gather_job_hostnames')
+        self.mox.StubOutWithMock(job_status, '_abort_jobs_if_timedout')
+
+        job_status._abort_jobs_if_timedout(mox.IgnoreArg(), mox.IgnoreArg(),
+                mox.IgnoreArg(), mox.IgnoreArg()).AndReturn(True)
+        manager = self.mox.CreateMock(host_lock_manager.HostLockManager)
+        expected_hostnames=['host1', 'host0']
+        expected_hosts = [FakeHost(h) for h in expected_hostnames]
+        job = FakeJob(7, hostnames=[None, None])
+        job_status.gather_job_hostnames(mox.IgnoreArg(),
+                                        job).AndReturn(expected_hostnames)
+        self.mox.ReplayAll()
+        self.assertEquals(set(),
+                job_status.wait_for_and_lock_job_hosts(self.afe, [job],
+                manager, wait_timeout_mins=DEFAULT_WAITTIMEOUT_MINS))
+
+
+    def testWaitForSingleJobHostsWithTimeout(self):
+        """Discover a single host for this job then timeout."""
+        self.mox.StubOutWithMock(time, 'sleep')
+        self.mox.StubOutWithMock(job_status, 'gather_job_hostnames')
+        self.mox.StubOutWithMock(job_status, '_abort_jobs_if_timedout')
+
+        manager = self.mox.CreateMock(host_lock_manager.HostLockManager)
+        expected_hostnames=['host1', 'host0']
+        expected_hosts = [FakeHost(h) for h in expected_hostnames]
+        job = FakeJob(7, hostnames=[None, None])
+
+        time.sleep(mox.IgnoreArg()).MultipleTimes()
+        job_status._abort_jobs_if_timedout(mox.IgnoreArg(), mox.IgnoreArg(),
+                mox.IgnoreArg(), mox.IgnoreArg()).AndReturn(False)
+        self.expect_hosts_query_and_lock([job], manager, [], False)
+
+        # First, only one test in the job has had a host assigned at all.
+        # Since no hosts are running, expect no locking.
+        job_status._abort_jobs_if_timedout(mox.IgnoreArg(), mox.IgnoreArg(),
+                mox.IgnoreArg(), mox.IgnoreArg()).AndReturn(False)
+        job.hostnames = [None] + expected_hostnames[1:]
+        self.expect_hosts_query_and_lock([job], manager, [], False)
+
+        # Then, that host starts running, but no other tests have hosts.
+        job_status._abort_jobs_if_timedout(mox.IgnoreArg(), mox.IgnoreArg(),
+                mox.IgnoreArg(), mox.IgnoreArg()).AndReturn(False)
+        self.expect_hosts_query_and_lock([job], manager, expected_hosts[1:])
+
+        # The second test gets a host assigned, but it's not yet running.
+        # Since no new running hosts are found, no locking should happen.
+        job_status._abort_jobs_if_timedout(mox.IgnoreArg(), mox.IgnoreArg(),
+                mox.IgnoreArg(), mox.IgnoreArg()).AndReturn(False)
+        job.hostnames = expected_hostnames
+        self.expect_hosts_query_and_lock([job], manager, expected_hosts[1:],
+                                         False)
+
+        # A timeout occurs, and only the locked hosts should be returned.
+        job_status._abort_jobs_if_timedout(mox.IgnoreArg(), mox.IgnoreArg(),
+                mox.IgnoreArg(), mox.IgnoreArg()).AndReturn(True)
+
+        # The last loop update; doesn't impact behavior.
+        job_status.gather_job_hostnames(mox.IgnoreArg(),
+                                        job).AndReturn(expected_hostnames)
+        self.mox.ReplayAll()
+
+        # Because of the timeout only one host is returned.
+        expect_timeout_hostnames = ['host0']
+        self.assertEquals(sorted(expect_timeout_hostnames), sorted(
+                job_status.wait_for_and_lock_job_hosts(self.afe,
+                [job], manager, wait_timeout_mins=DEFAULT_WAITTIMEOUT_MINS)))
 
 
     def testWaitForSingleJobHostsToRunAndGetLockedSerially(self):
         """Lock running hosts as discovered, serially."""
         self.mox.StubOutWithMock(time, 'sleep')
         self.mox.StubOutWithMock(job_status, 'gather_job_hostnames')
-        self.mox.StubOutWithMock(job_status, '_check_jobs_aborted')
 
-        job_status._check_jobs_aborted(mox.IgnoreArg(),
-                mox.IgnoreArg()).MultipleTimes().AndReturn(False)
         manager = self.mox.CreateMock(host_lock_manager.HostLockManager)
         expected_hostnames=['host1', 'host0']
         expected_hosts = [FakeHost(h) for h in expected_hostnames]
@@ -293,10 +360,7 @@
         """Ensure we lock all running hosts for all jobs as discovered."""
         self.mox.StubOutWithMock(time, 'sleep')
         self.mox.StubOutWithMock(job_status, 'gather_job_hostnames')
-        self.mox.StubOutWithMock(job_status, '_check_jobs_aborted')
 
-        job_status._check_jobs_aborted(mox.IgnoreArg(),
-                mox.IgnoreArg()).MultipleTimes().AndReturn(False)
         manager = self.mox.CreateMock(host_lock_manager.HostLockManager)
         expected_hostnames = ['host1', 'host0', 'host2']
         expected_hosts = [FakeHost(h) for h in expected_hostnames]
diff --git a/server/cros/dynamic_suite/reimager.py b/server/cros/dynamic_suite/reimager.py
index b4de8d9..3224ff1 100644
--- a/server/cros/dynamic_suite/reimager.py
+++ b/server/cros/dynamic_suite/reimager.py
@@ -23,6 +23,10 @@
 from autotest_lib.frontend.afe.json_rpc import proxy
 
 
+DEFAULT_TRY_JOB_TIMEOUT_MINS = global_config.global_config.get_config_value(
+            'SCHEDULER', 'try_job_timeout_mins', type=int, default=4*60)
+
+
 class Reimager(object):
     """
     A class that can run jobs to reimage devices.
@@ -64,7 +68,8 @@
 
 
     def attempt(self, build, board, pool, devserver, record, check_hosts,
-                manager, tests_to_skip, dependencies={'':[]}, num=None):
+                manager, tests_to_skip, dependencies={'':[]}, num=None,
+                timeout_mins=DEFAULT_TRY_JOB_TIMEOUT_MINS):
         """
         Synchronously attempt to reimage some machines.
 
@@ -112,6 +117,8 @@
                              with builds that have no dependency information.
 
         @param num: the maximum number of devices to reimage.
+        @param timeout_mins: Amount of time in mins to wait before timing out
+                             this reimage attempt.
         @return True if all reimaging jobs succeed, false if they all fail or
                 atleast one is aborted.
         """
@@ -145,12 +152,16 @@
             self._record_job_if_possible(Reimager.JOB_NAME, canary_job)
             logging.info('Created re-imaging job: %d', canary_job.id)
 
-            job_status.wait_for_jobs_to_start(self._afe, [canary_job])
+            start_time = datetime.datetime.utcnow()
+            if not job_status.wait_for_jobs_to_start(self._afe, [canary_job],
+                    start_time=start_time, wait_timeout_mins=timeout_mins):
+                raise error.ReimageAbortedException('Try job was aborted.')
             logging.debug('Re-imaging job running.')
 
             hosts = job_status.wait_for_and_lock_job_hosts(
-                self._afe, [canary_job], manager)
-            if not hosts:
+                    self._afe, [canary_job], manager, start_time=start_time,
+                    wait_timeout_mins=timeout_mins)
+            if job_status.check_job_abort_status(self._afe, [canary_job]):
                 raise error.ReimageAbortedException('Try job was aborted.')
             logging.info('%r locked for reimaging.', hosts)
 
diff --git a/server/cros/dynamic_suite/reimager_unittest.py b/server/cros/dynamic_suite/reimager_unittest.py
index aec1915..895ca40 100644
--- a/server/cros/dynamic_suite/reimager_unittest.py
+++ b/server/cros/dynamic_suite/reimager_unittest.py
@@ -270,7 +270,8 @@
 
 
     def expect_attempt(self, canary_job, statuses, ex=None, check_hosts=True,
-                       unsatisfiable_specs=[], doomed_specs=[]):
+                       unsatisfiable_specs=[], doomed_specs=[],
+                       tryjob_aborted=False):
         """Sets up |self.reimager| to expect an attempt().
 
         The return value of attempt() is dictated by the aggregate of the
@@ -297,7 +298,7 @@
         self.mox.StubOutWithMock(job_status, 'wait_for_jobs_to_finish')
         self.mox.StubOutWithMock(job_status, 'gather_per_host_results')
         self.mox.StubOutWithMock(job_status, 'check_and_record_reimage_results')
-        self.mox.StubOutWithMock(job_status, '_check_jobs_aborted')
+        self.mox.StubOutWithMock(job_status, 'check_job_abort_status')
 
         self.reimager._ensure_version_label(mox.StrContains(self._BUILD))
 
@@ -311,10 +312,18 @@
             host_group,
             self.devserver).AndReturn(canary_job)
 
-        job_status.wait_for_jobs_to_start(self.afe, [canary_job])
-        job_status.wait_for_and_lock_job_hosts(
-            self.afe, [canary_job], self.manager).AndReturn(statuses.keys())
+        job_status.wait_for_jobs_to_start(self.afe, [canary_job],
+            start_time=mox.IgnoreArg(),
+            wait_timeout_mins=mox.IgnoreArg()).AndReturn(True)
 
+        job_status.wait_for_and_lock_job_hosts(
+            self.afe, [canary_job], self.manager, start_time=mox.IgnoreArg(),
+            wait_timeout_mins=mox.IgnoreArg()).AndReturn(statuses.keys())
+
+        job_status.check_job_abort_status(mox.IgnoreArg(), mox.IgnoreArg()
+            ).AndReturn(tryjob_aborted)
+        if tryjob_aborted:
+            return
         if ex:
             job_status.wait_for_jobs_to_finish(self.afe,
                                                [canary_job]).AndRaise(ex)
@@ -350,6 +359,27 @@
         self.reimager.clear_reimaged_host_state(self._BUILD)
 
 
+    def testTryJobAborted(self):
+        """Should attempt a reimage that aborts and record ABORT"""
+        canary = FakeJob()
+        statuses = {canary.hostnames[0]:
+                    job_status.Status('GOOD', canary.hostnames[0])}
+        self.expect_attempt(canary, statuses, tryjob_aborted=True)
+
+        rjob = self.mox.CreateMock(base_job.base_job)
+        rjob.record_entry(StatusContains.CreateFromStrings('START'))
+        rjob.record_entry(StatusContains.CreateFromStrings('ABORT'))
+        rjob.record_entry(StatusContains.CreateFromStrings('END ABORT'))
+
+        self.mox.ReplayAll()
+        self.assertFalse(self.reimager.attempt(self._BUILD, self._BOARD,
+                                               self._POOL, self.devserver,
+                                               rjob.record_entry, True,
+                                               self.manager, [],
+                                               self._DEPENDENCIES))
+        self.reimager.clear_reimaged_host_state(self._BUILD)
+
+
     def testSuccessfulReimageByMetahost(self):
         """Should attempt a reimage by metahost and record success."""
         canary = FakeJob()