Have the scheduler wait a configurable amount of time before starting
atomic group jobs: once the minimum synch_count of hosts are available
in Pending state, keep waiting (up to the configured delay) for more
hosts, up to AtomicGroup.max_number_of_machines, to become available.

Adds a DelayedCallTask class to monitor_db, along with logic in the Job
class that uses it to delay the job becoming ready to run for a short
while and to ensure the job is run at the end of the delay without
needing to wait for another host to change state.
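
For reference, a minimal self-contained sketch of the intended behaviour,
using a simplified stand-in for the new class (the real DelayedCallTask
below is polled by the Dispatcher via its Agent rather than by a loop like
this, and also carries the extra attributes the Agent interface requires):

    import time

    class DelayedCallTask(object):
        """Calls `callback` once, the first time poll() runs after the delay."""
        def __init__(self, delay_seconds, callback, now_func=time.time):
            assert delay_seconds > 0 and callable(callback)
            self._now_func = now_func
            self._callback = callback
            self.end_time = now_func() + delay_seconds
            self.success = False
            self.aborted = False

        def poll(self):
            # Fire the callback exactly once after end_time has passed.
            if self._callback and self._now_func() >= self.end_time:
                self._callback()
                self._callback = None
                self.success = True

        def is_done(self):
            return not self._callback

        def abort(self):
            self.aborted = True
            self._callback = None

    def release_job():
        # Stands in for the run_job_after_delay() closure, which calls
        # Job.run(queue_entry) in the real scheduler.
        print('job done waiting for extra hosts')

    task = DelayedCallTask(delay_seconds=0.1, callback=release_job)
    while not task.is_done():
        task.poll()
        time.sleep(0.01)
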
Signed-off-by: Gregory Smith <gps@google.com>
git-svn-id: http://test.kernel.org/svn/autotest/trunk@3236 592f7852-d20e-0410-864c-8624ca9c26a4
diff --git a/scheduler/monitor_db.py b/scheduler/monitor_db.py
index bee86e2..e7c6d1f 100755
--- a/scheduler/monitor_db.py
+++ b/scheduler/monitor_db.py
@@ -553,7 +553,7 @@
assert queue_entry.atomic_group_id is not None
job = queue_entry.job
assert job.synch_count and job.synch_count > 0
- atomic_group = AtomicGroup(id=queue_entry.atomic_group_id)
+ atomic_group = queue_entry.atomic_group
if job.synch_count > atomic_group.max_number_of_machines:
# Such a Job and HostQueueEntry should never be possible to
# create using the frontend. Regardless, we can't process it.
@@ -929,11 +929,8 @@
def _run_queue_entry(self, queue_entry, host):
- agent = queue_entry.run(assigned_host=host)
- # in some cases (synchronous jobs with run_verify=False), agent may be
- # None
- if agent:
- self.add_agent(agent)
+ agent = queue_entry.run_pre_job_tasks(assigned_host=host)
+ self.add_agent(agent)
def _find_aborting(self):
@@ -1187,9 +1184,40 @@
class Agent(object):
+ """
+ An agent for use by the Dispatcher class to perform a sequence of tasks.
+
+ The following methods are required on all task objects:
+        poll() - Called periodically to let the task check its status and
+                update its internal state.  If the task succeeded, it should
+                set its success attribute to True.
+ is_done() - Returns True if the task is finished.
+ abort() - Called when an abort has been requested. The task must
+ set its aborted attribute to True if it actually aborted.
+
+ The following attributes are required on all task objects:
+ aborted - bool, True if this task was aborted.
+ failure_tasks - A sequence of tasks to be run using a new Agent
+ by the dispatcher should this task fail.
+ success - bool, True if this task succeeded.
+ queue_entry_ids - A sequence of HostQueueEntry ids this task handles.
+ host_ids - A sequence of Host ids this task represents.
+
+ The following attribute is written to all task objects:
+ agent - A reference to the Agent instance that the task has been
+ added to.
+ """
+
+
def __init__(self, tasks, num_processes=1):
+ """
+ @param tasks: A list of tasks as described in the class docstring.
+ @param num_processes: The number of subprocesses the Agent represents.
+ This is used by the Dispatcher for managing the load on the
+ system. Defaults to 1.
+ """
self.active_task = None
self.queue = None
+ # This is filled in by Dispatcher.add_agent()
self.dispatcher = None
self.num_processes = num_processes
@@ -1264,6 +1292,65 @@
self.active_task = None
+class DelayedCallTask(object):
+ """
+    A task object, like AgentTask, for an Agent to run.  It waits until the
+    specified amount of time has elapsed, then calls the supplied callback
+    once and finishes.  If the callback returns anything, it is assumed to
+    be a new Agent instance and will be added to the dispatcher.
+
+    @attribute end_time: The absolute POSIX time after which this task will
+            call its callback when polled, and then be finished.
+
+ Also has all attributes required by the Agent class.
+ """
+ def __init__(self, delay_seconds, callback, now_func=None):
+ """
+        @param delay_seconds: The delay in seconds from now after which this
+            task will call the supplied callback and be done.
+ @param callback: A callable to be called by this task once after at
+ least delay_seconds time has elapsed. It must return None
+ or a new Agent instance.
+ @param now_func: A time.time like function. Default: time.time.
+ Used for testing.
+ """
+ assert delay_seconds > 0
+ assert callable(callback)
+ if not now_func:
+ now_func = time.time
+ self._now_func = now_func
+ self._callback = callback
+
+ self.end_time = self._now_func() + delay_seconds
+
+ # These attributes are required by Agent.
+ self.aborted = False
+ self.failure_tasks = ()
+ self.host_ids = ()
+ self.success = False
+ self.queue_entry_ids = ()
+ # This is filled in by Agent.add_task().
+ self.agent = None
+
+
+ def poll(self):
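+        # The callback reference doubles as the "not done yet" flag: it is
+        # cleared after firing so the callback runs at most once and
+        # is_done() then returns True.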
+ if self._callback and self._now_func() >= self.end_time:
+ new_agent = self._callback()
+ if new_agent:
+ self.agent.dispatcher.add_agent(new_agent)
+ self._callback = None
+ self.success = True
+
+
+ def is_done(self):
+ return not self._callback
+
+
+ def abort(self):
+ self.aborted = True
+ self._callback = None
+
+
class AgentTask(object):
def __init__(self, cmd, working_directory=None, failure_tasks=[],
pidfile_name=None, paired_with_pidfile=None):
@@ -2315,6 +2402,12 @@
else:
self.host = None
+ if self.atomic_group_id:
+ self.atomic_group = AtomicGroup(self.atomic_group_id,
+ always_query=False)
+ else:
+ self.atomic_group = None
+
self.queue_log_path = os.path.join(self.job.tag(),
'queue.log.' + str(self.id))
@@ -2474,8 +2567,8 @@
email_manager.manager.send_email(self.job.email_list, subject, body)
- def run(self, assigned_host=None):
- if self.meta_host is not None or self.atomic_group_id is not None:
+ def run_pre_job_tasks(self, assigned_host=None):
+ if self.meta_host is not None or self.atomic_group:
assert assigned_host
# ensure results dir exists for the queue log
self.set_host(assigned_host)
@@ -2484,7 +2577,17 @@
self.job.name, self.meta_host, self.atomic_group_id,
self.host.hostname, self.status)
- return self.job.run(queue_entry=self)
+ return self._do_run_pre_job_tasks()
+
+
+ def _do_run_pre_job_tasks(self):
+        # Every host goes through the Verifying stage (which may or may not
+        # actually do anything, as determined by get_pre_job_tasks).
+ self.set_status(models.HostQueueEntry.Status.VERIFYING)
+
+ # The pre job tasks always end with a SetEntryPendingTask which
+ # will continue as appropriate through queue_entry.on_pending().
+ return Agent(self.job.get_pre_job_tasks(queue_entry=self))
def requeue(self):
@@ -2543,10 +2646,7 @@
"""
self.set_status('Pending')
self.get_host().set_status('Pending')
- if self.job.is_ready():
- return self.job.run(self)
- self.job.stop_if_necessary()
- return None
+ return self.job.run_if_ready(queue_entry=self)
def abort(self, dispatcher):
@@ -2580,6 +2680,20 @@
'run_verify', 'email_list', 'reboot_before', 'reboot_after',
'parse_failed_repair', 'max_runtime_hrs')
+    # This does not need to be a column in the DB.  The delays are likely to
+    # be configured short.  If the scheduler is stopped and restarted in
+    # the middle of a job's delay cycle, the delay cycle will either be
+    # repeated or skipped, depending on how many Pending machines the
+    # restarted scheduler finds when it recovers this job.  Not a problem.
+ #
+ # A reference to the DelayedCallTask that will wake up the job should
+ # no other HQEs change state in time. Its end_time attribute is used
+ # by our run_with_ready_delay() method to determine if the wait is over.
+ _delay_ready_task = None
+
+ # TODO(gps): On scheduler start/recovery we need to call HQE.on_pending() on
+    # all status='Pending' atomic group HQEs in case a delay was running when
+    # the scheduler was restarted and no more hosts ever successfully exit
+    # Verify.
def __init__(self, id=None, row=None, **kwargs):
assert id or row
@@ -2614,10 +2728,38 @@
queue_entry.set_status(status)
+ def _atomic_and_has_started(self):
+ """
+ @returns True if any of the HostQueueEntries associated with this job
+ have entered the Status.STARTING state or beyond.
+ """
+ atomic_entries = models.HostQueueEntry.objects.filter(
+ job=self.id, atomic_group__isnull=False)
+ if atomic_entries.count() <= 0:
+ return False
+
+ non_started_statuses = (models.HostQueueEntry.Status.QUEUED,
+ models.HostQueueEntry.Status.VERIFYING,
+ models.HostQueueEntry.Status.PENDING)
+ started_entries = atomic_entries.exclude(
+ status__in=non_started_statuses)
+ return started_entries.count() > 0
+
+
+ def _pending_count(self):
+ """The number of HostQueueEntries for this job in the Pending state."""
+ pending_entries = models.HostQueueEntry.objects.filter(
+ job=self.id, status=models.HostQueueEntry.Status.PENDING)
+ return pending_entries.count()
+
+
def is_ready(self):
- pending_entries = models.HostQueueEntry.objects.filter(job=self.id,
- status='Pending')
- return (pending_entries.count() >= self.synch_count)
+ # NOTE: Atomic group jobs stop reporting ready after they have been
+ # started to avoid launching multiple copies of one atomic job.
+        # Only possible if synch_count is less than half the number of
+ # machines in the atomic group.
+ return (self._pending_count() >= self.synch_count
+ and not self._atomic_and_has_started())
def num_machines(self, clause = None):
@@ -2748,7 +2890,16 @@
return self.run_verify
- def _get_pre_job_tasks(self, queue_entry):
+ def get_pre_job_tasks(self, queue_entry):
+ """
+ Get a list of tasks to perform before the host_queue_entry
+ may be used to run this Job (such as Cleanup & Verify).
+
+ @returns A list of tasks to be done to the given queue_entry before
+            it should be considered ready to run this job.  The last
+ task in the list calls HostQueueEntry.on_pending(), which
+ continues the flow of the job.
+ """
tasks = []
if self._should_run_cleanup(queue_entry):
tasks.append(CleanupTask(queue_entry=queue_entry))
@@ -2777,12 +2928,7 @@
used to run this Job, a string group name to suggest giving
to this job in the results database.
"""
- if include_queue_entry.atomic_group_id:
- atomic_group = AtomicGroup(include_queue_entry.atomic_group_id,
- always_query=False)
- else:
- atomic_group = None
-
+ atomic_group = include_queue_entry.atomic_group
chosen_entries = [include_queue_entry]
if atomic_group:
num_entries_wanted = atomic_group.max_number_of_machines
@@ -2822,11 +2968,70 @@
return chosen_entries, group_name
- def run(self, queue_entry):
+ def run_if_ready(self, queue_entry):
+ """
+ @returns An Agent instance to ultimately run this job if enough hosts
+ are ready for it to run.
+ @returns None and potentially cleans up excess hosts if this Job
+ is not ready to run.
+ """
if not self.is_ready():
- queue_entry.set_status(models.HostQueueEntry.Status.VERIFYING)
- return Agent(self._get_pre_job_tasks(queue_entry))
+ self.stop_if_necessary()
+ return None
+ if queue_entry.atomic_group:
+ return self.run_with_ready_delay(queue_entry)
+
+ return self.run(queue_entry)
+
+
+ def run_with_ready_delay(self, queue_entry):
+ """
+ Start a delay to wait for more hosts to enter Pending state before
+        launching an atomic group job.  Once set, the delay cannot be reset.
+
+ @param queue_entry: The HostQueueEntry object to get atomic group
+ info from and pass to run_if_ready when the delay is up.
+
+ @returns An Agent to run the job as appropriate or None if a delay
+ has already been set.
+ """
+ assert queue_entry.job_id == self.id
+ assert queue_entry.atomic_group
+ delay = scheduler_config.config.secs_to_wait_for_atomic_group_hosts
+ pending_threshold = queue_entry.atomic_group.max_number_of_machines
+ over_max_threshold = (self._pending_count() >= pending_threshold)
+ delay_expired = (self._delay_ready_task and
+ time.time() >= self._delay_ready_task.end_time)
+
+ # Delay is disabled or we already have enough? Do not wait to run.
+ if not delay or over_max_threshold or delay_expired:
+ return self.run(queue_entry)
+
+ # A delay was previously scheduled.
+ if self._delay_ready_task:
+ return None
+
+ def run_job_after_delay():
+ logging.info('Job %s done waiting for extra hosts.', self.id)
+ return self.run(queue_entry)
+
+ self._delay_ready_task = DelayedCallTask(delay_seconds=delay,
+ callback=run_job_after_delay)
+
+ return Agent([self._delay_ready_task], num_processes=0)
+
+
+ def run(self, queue_entry):
+ """
+ @param queue_entry: The HostQueueEntry instance calling this method.
+ @returns An Agent instance to run this job or None if we've already
+ been run.
+ """
+ if queue_entry.atomic_group and self._atomic_and_has_started():
+ logging.error('Job.run() called on running atomic Job %d '
+ 'with HQE %s.', self.id, queue_entry)
+ return None
queue_entries, group_name = self._choose_group_to_run(queue_entry)
return self._finish_run(queue_entries, group_name)
@@ -2838,7 +3043,10 @@
queue_task = QueueTask(job=self, queue_entries=queue_entries,
cmd=params, group_name=group_name)
tasks = [queue_task]
- entry_ids = [entry.id for entry in queue_entries]
+ if self._delay_ready_task:
+ # Cancel any pending callback that would try to run again
+ # as we are already running.
+ self._delay_ready_task.abort()
return Agent(tasks, num_processes=len(queue_entries))