When a delayed call task finishes waiting for extra hosts to enter
Pending state on an atomic group job, re-confirm that the job still has
enough Pending hosts to run.  It could have been Aborted either
manually or due to a timeout meaning it should no longer be run.

Signed-off-by: Gregory Smith <gps@google.com>


git-svn-id: http://test.kernel.org/svn/autotest/trunk@3820 592f7852-d20e-0410-864c-8624ca9c26a4
diff --git a/scheduler/monitor_db.py b/scheduler/monitor_db.py
index 6cd7015..ca71582 100755
--- a/scheduler/monitor_db.py
+++ b/scheduler/monitor_db.py
@@ -1085,7 +1085,8 @@
 
 
     def _schedule_delay_tasks(self):
-        for entry in HostQueueEntry.fetch(where="status = 'Waiting'"):
+        for entry in HostQueueEntry.fetch(where='status = "%s"' %
+                                          models.HostQueueEntry.Status.WAITING):
             task = entry.job.schedule_delayed_callback_task(entry)
             if task:
                 self.add_agent(Agent(task, num_processes=0))
@@ -2905,6 +2906,7 @@
                     host=models.Host(id=self.host.id))
 
         self.set_status(Status.ABORTED)
+        self.job.abort_delay_ready_task()
 
 
     def get_group_name(self):
@@ -3019,6 +3021,17 @@
         return pending_entries.count()
 
 
+    def _pending_threshold(self, atomic_group):
+        """
+        @param atomic_group: The AtomicGroup associated with this job that we
+                are using to bound the threshold.
+        @returns The minimum number of HostQueueEntries assigned a Host before
+                this job can run.
+        """
+        return min(self._hosts_assigned_count(),
+                   atomic_group.max_number_of_machines)
+
+
     def is_ready(self):
         # NOTE: Atomic group jobs stop reporting ready after they have been
         # started to avoid launching multiple copies of one atomic job.
@@ -3027,7 +3040,7 @@
         pending_count = self._pending_count()
         atomic_and_has_started = self._atomic_and_has_started()
         ready = (pending_count >= self.synch_count
-                 and not self._atomic_and_has_started())
+                 and not atomic_and_has_started)
 
         if not ready:
             logging.info(
@@ -3275,9 +3288,8 @@
         assert queue_entry.job_id == self.id
         assert queue_entry.atomic_group
         delay = scheduler_config.config.secs_to_wait_for_atomic_group_hosts
-        pending_threshold = min(self._hosts_assigned_count(),
-                                queue_entry.atomic_group.max_number_of_machines)
-        over_max_threshold = (self._pending_count() >= pending_threshold)
+        over_max_threshold = (self._pending_count() >=
+                              self._pending_threshold(queue_entry.atomic_group))
         delay_expired = (self._delay_ready_task and
                          time.time() >= self._delay_ready_task.end_time)
 
@@ -3297,7 +3309,14 @@
         delay = scheduler_config.config.secs_to_wait_for_atomic_group_hosts
 
         def run_job_after_delay():
-            logging.info('Job %s done waiting for extra hosts.', self.id)
+            logging.info('Job %s done waiting for extra hosts.', self)
+            # Check to see if the job is still relevant.  It could have aborted
+            # while we were waiting or hosts could have disappearred, etc.
+            threshold = self._pending_threshold(queue_entry.atomic_group)
+            if self._pending_count() < threshold:
+                logging.info('Job %s had too few Pending hosts after waiting '
+                             'for extras.  Not running.', self)
+                return
             return self.run(queue_entry)
 
         logging.info('Job %s waiting up to %s seconds for more hosts.',
@@ -3323,11 +3342,17 @@
     def _finish_run(self, queue_entries):
         for queue_entry in queue_entries:
             queue_entry.set_status(models.HostQueueEntry.Status.STARTING)
+        self.abort_delay_ready_task()
+
+
+    def abort_delay_ready_task(self):
+        """Abort the delayed task associated with this job, if any."""
         if self._delay_ready_task:
             # Cancel any pending callback that would try to run again
             # as we are already running.
             self._delay_ready_task.abort()
 
+
     def __str__(self):
         return '%s-%s' % (self.id, self.owner)