When a delayed call task finishes waiting for extra hosts to enter
Pending state on an atomic group job, re-confirm that the job still has
enough Pending hosts to run. The job could have been Aborted, either
manually or due to a timeout, meaning it should no longer be run.
Signed-off-by: Gregory Smith <gps@google.com>
git-svn-id: http://test.kernel.org/svn/autotest/trunk@3820 592f7852-d20e-0410-864c-8624ca9c26a4
diff --git a/scheduler/monitor_db.py b/scheduler/monitor_db.py
index 6cd7015..ca71582 100755
--- a/scheduler/monitor_db.py
+++ b/scheduler/monitor_db.py
@@ -1085,7 +1085,8 @@
def _schedule_delay_tasks(self):
- for entry in HostQueueEntry.fetch(where="status = 'Waiting'"):
+ for entry in HostQueueEntry.fetch(where='status = "%s"' %
+ models.HostQueueEntry.Status.WAITING):
task = entry.job.schedule_delayed_callback_task(entry)
if task:
self.add_agent(Agent(task, num_processes=0))
@@ -2905,6 +2906,7 @@
host=models.Host(id=self.host.id))
self.set_status(Status.ABORTED)
+ self.job.abort_delay_ready_task()
def get_group_name(self):
@@ -3019,6 +3021,17 @@
return pending_entries.count()
+ def _pending_threshold(self, atomic_group):
+ """
+ @param atomic_group: The AtomicGroup associated with this job that we
+ are using to bound the threshold.
+ @returns The minimum number of HostQueueEntries assigned a Host before
+ this job can run.
+ """
+ return min(self._hosts_assigned_count(),
+ atomic_group.max_number_of_machines)
+
+
def is_ready(self):
# NOTE: Atomic group jobs stop reporting ready after they have been
# started to avoid launching multiple copies of one atomic job.
@@ -3027,7 +3040,7 @@
pending_count = self._pending_count()
atomic_and_has_started = self._atomic_and_has_started()
ready = (pending_count >= self.synch_count
- and not self._atomic_and_has_started())
+ and not atomic_and_has_started)
if not ready:
logging.info(
@@ -3275,9 +3288,8 @@
assert queue_entry.job_id == self.id
assert queue_entry.atomic_group
delay = scheduler_config.config.secs_to_wait_for_atomic_group_hosts
- pending_threshold = min(self._hosts_assigned_count(),
- queue_entry.atomic_group.max_number_of_machines)
- over_max_threshold = (self._pending_count() >= pending_threshold)
+ over_max_threshold = (self._pending_count() >=
+ self._pending_threshold(queue_entry.atomic_group))
delay_expired = (self._delay_ready_task and
time.time() >= self._delay_ready_task.end_time)
@@ -3297,7 +3309,14 @@
delay = scheduler_config.config.secs_to_wait_for_atomic_group_hosts
def run_job_after_delay():
- logging.info('Job %s done waiting for extra hosts.', self.id)
+ logging.info('Job %s done waiting for extra hosts.', self)
+ # Check to see if the job is still relevant. It could have aborted
+ # while we were waiting or hosts could have disappeared, etc.
+ threshold = self._pending_threshold(queue_entry.atomic_group)
+ if self._pending_count() < threshold:
+ logging.info('Job %s had too few Pending hosts after waiting '
+ 'for extras. Not running.', self)
+ return
return self.run(queue_entry)
logging.info('Job %s waiting up to %s seconds for more hosts.',
@@ -3323,11 +3342,17 @@
def _finish_run(self, queue_entries):
for queue_entry in queue_entries:
queue_entry.set_status(models.HostQueueEntry.Status.STARTING)
+ self.abort_delay_ready_task()
+
+
+ def abort_delay_ready_task(self):
+ """Abort the delayed task associated with this job, if any."""
if self._delay_ready_task:
# Cancel any pending callback that would try to run again
# as we are already running.
self._delay_ready_task.abort()
+
def __str__(self):
return '%s-%s' % (self.id, self.owner)