Have the scheduler wait a configurable amount of time before starting
atomic group jobs once the minimum synch_count of hosts is available in
the Pending state, giving up to AtomicGroup.max_number_of_machines hosts
a chance to become available.

Adds a DelayedCallTask class to monitor_db, along with logic in the Job
class that uses it to delay the job becoming ready to run for a short
while and to make sure the job is run at the end of the delay without
needing to wait for another host to change state.  The delay comes from
the secs_to_wait_for_atomic_group_hosts scheduler config value.
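
For illustration, a minimal standalone sketch of the one-shot behavior the
new DelayedCallTask implements (a simplified restatement with a fake clock,
not the scheduler code itself; the real class also carries the bookkeeping
attributes the Agent interface requires):

    import time

    class DelayedCall(object):
        """Calls `callback` exactly once after `delay_seconds` elapse."""
        def __init__(self, delay_seconds, callback, now_func=time.time):
            self._now_func = now_func
            self._callback = callback
            self.end_time = now_func() + delay_seconds
            self.success = False

        def poll(self):
            # Fire the callback once the deadline has passed, then disarm.
            if self._callback and self._now_func() >= self.end_time:
                self._callback()
                self._callback = None
                self.success = True

        def is_done(self):
            return not self._callback

    # Driving it with a fake clock shows the one-shot semantics:
    fake_now = [100.0]
    task = DelayedCall(30, callback=lambda: None,
                       now_func=lambda: fake_now[0])
    task.poll()
    assert not task.is_done()      # Too early; still waiting.
    fake_now[0] += 31              # Advance past end_time.
    task.poll()
    assert task.is_done() and task.success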

Signed-off-by: Gregory Smith <gps@google.com>


git-svn-id: http://test.kernel.org/svn/autotest/trunk@3236 592f7852-d20e-0410-864c-8624ca9c26a4
diff --git a/scheduler/monitor_db.py b/scheduler/monitor_db.py
index bee86e2..e7c6d1f 100755
--- a/scheduler/monitor_db.py
+++ b/scheduler/monitor_db.py
@@ -553,7 +553,7 @@
         assert queue_entry.atomic_group_id is not None
         job = queue_entry.job
         assert job.synch_count and job.synch_count > 0
-        atomic_group = AtomicGroup(id=queue_entry.atomic_group_id)
+        atomic_group = queue_entry.atomic_group
         if job.synch_count > atomic_group.max_number_of_machines:
             # Such a Job and HostQueueEntry should never be possible to
             # create using the frontend.  Regardless, we can't process it.
@@ -929,11 +929,8 @@
 
 
     def _run_queue_entry(self, queue_entry, host):
-        agent = queue_entry.run(assigned_host=host)
-        # in some cases (synchronous jobs with run_verify=False), agent may be
-        # None
-        if agent:
-            self.add_agent(agent)
+        agent = queue_entry.run_pre_job_tasks(assigned_host=host)
+        self.add_agent(agent)
 
 
     def _find_aborting(self):
@@ -1187,9 +1184,40 @@
 
 
 class Agent(object):
+    """
+    An agent for use by the Dispatcher class to perform a sequence of tasks.
+
+    The following methods are required on all task objects:
+        poll() - Called periodically to let the task check its status and
+                update its internal state, setting success to True if the
+                task has succeeded.
+        is_done() - Returns True if the task is finished.
+        abort() - Called when an abort has been requested.  The task must
+                set its aborted attribute to True if it actually aborted.
+
+    The following attributes are required on all task objects:
+        aborted - bool, True if this task was aborted.
+        failure_tasks - A sequence of tasks to be run using a new Agent
+                by the dispatcher should this task fail.
+        success - bool, True if this task succeeded.
+        queue_entry_ids - A sequence of HostQueueEntry ids this task handles.
+        host_ids - A sequence of Host ids this task represents.
+
+    The following attribute is written to all task objects:
+        agent - A reference to the Agent instance that the task has been
+                added to.
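+
+    A hypothetical no-op task satisfying this interface, shown only to
+    illustrate the contract (real tasks are AgentTask subclasses or a
+    DelayedCallTask):
+
+        class NoopTask(object):
+            def __init__(self):
+                self.success = False
+                self.aborted = False
+                self.failure_tasks = ()
+                self.host_ids = ()
+                self.queue_entry_ids = ()
+                self.agent = None  # Filled in by Agent.add_task().
+            def poll(self):
+                self.success = True
+            def is_done(self):
+                return self.success or self.aborted
+            def abort(self):
+                self.aborted = True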
+    """
+
+
     def __init__(self, tasks, num_processes=1):
+        """
+        @param tasks: A list of tasks as described in the class docstring.
+        @param num_processes: The number of subprocesses the Agent represents.
+                This is used by the Dispatcher for managing the load on the
+                system.  Defaults to 1.
+        """
         self.active_task = None
         self.queue = None
+        # This is filled in by Dispatcher.add_agent()
         self.dispatcher = None
         self.num_processes = num_processes
 
@@ -1264,6 +1292,65 @@
             self.active_task = None
 
 
+class DelayedCallTask(object):
+    """
+    A task object, similar to AgentTask, for an Agent to run.  It waits
+    until the specified amount of time has elapsed, then calls the supplied
+    callback once and finishes.  If the callback returns anything, it is
+    assumed to be a new Agent instance and will be added to the dispatcher.
+
+    @attribute end_time: The absolute posix time after which this task will,
+            when polled, call its callback and finish.
+
+    Also has all attributes required by the Agent class.
+    """
+    def __init__(self, delay_seconds, callback, now_func=None):
+        """
+        @param delay_seconds: The delay in seconds from now after which this
+                task will call the supplied callback and be done.
+        @param callback: A callable to be called by this task once after at
+                least delay_seconds time has elapsed.  It must return None
+                or a new Agent instance.
+        @param now_func: A time.time like function.  Default: time.time.
+                Used for testing.
+        """
+        assert delay_seconds > 0
+        assert callable(callback)
+        if not now_func:
+            now_func = time.time
+        self._now_func = now_func
+        self._callback = callback
+
+        self.end_time = self._now_func() + delay_seconds
+
+        # These attributes are required by Agent.
+        self.aborted = False
+        self.failure_tasks = ()
+        self.host_ids = ()
+        self.success = False
+        self.queue_entry_ids = ()
+        # This is filled in by Agent.add_task().
+        self.agent = None
+
+
+    def poll(self):
+        if self._callback and self._now_func() >= self.end_time:
+            new_agent = self._callback()
+            if new_agent:
+                self.agent.dispatcher.add_agent(new_agent)
+            self._callback = None
+            self.success = True
+
+
+    def is_done(self):
+        return not self._callback
+
+
+    def abort(self):
+        self.aborted = True
+        self._callback = None
+
+
 class AgentTask(object):
     def __init__(self, cmd, working_directory=None, failure_tasks=[],
                  pidfile_name=None, paired_with_pidfile=None):
@@ -2315,6 +2402,12 @@
         else:
             self.host = None
 
+        if self.atomic_group_id:
+            self.atomic_group = AtomicGroup(self.atomic_group_id,
+                                            always_query=False)
+        else:
+            self.atomic_group = None
+
         self.queue_log_path = os.path.join(self.job.tag(),
                                            'queue.log.' + str(self.id))
 
@@ -2474,8 +2567,8 @@
         email_manager.manager.send_email(self.job.email_list, subject, body)
 
 
-    def run(self, assigned_host=None):
-        if self.meta_host is not None or self.atomic_group_id is not None:
+    def run_pre_job_tasks(self, assigned_host=None):
+        if self.meta_host is not None or self.atomic_group:
             assert assigned_host
             # ensure results dir exists for the queue log
             self.set_host(assigned_host)
@@ -2484,7 +2577,17 @@
                      self.job.name, self.meta_host, self.atomic_group_id,
                      self.host.hostname, self.status)
 
-        return self.job.run(queue_entry=self)
+        return self._do_run_pre_job_tasks()
+
+
+    def _do_run_pre_job_tasks(self):
+        # Every host goes thru the Verifying stage (which may or may not
+        # actually do anything as determined by get_pre_job_tasks).
+        self.set_status(models.HostQueueEntry.Status.VERIFYING)
+
+        # The pre job tasks always end with a SetEntryPendingTask which
+        # will continue as appropriate through queue_entry.on_pending().
+        return Agent(self.job.get_pre_job_tasks(queue_entry=self))
 
 
     def requeue(self):
@@ -2543,10 +2646,7 @@
         """
         self.set_status('Pending')
         self.get_host().set_status('Pending')
-        if self.job.is_ready():
-            return self.job.run(self)
-        self.job.stop_if_necessary()
-        return None
+        return self.job.run_if_ready(queue_entry=self)
 
 
     def abort(self, dispatcher):
@@ -2580,6 +2680,20 @@
                'run_verify', 'email_list', 'reboot_before', 'reboot_after',
                'parse_failed_repair', 'max_runtime_hrs')
 
+    # This does not need to be a column in the DB.  The delays are likely to
+    # be configured short.  If the scheduler is stopped and restarted in
+    # the middle of a job's delay cycle, the delay cycle will either be
+    # repeated or skipped depending on the number of Pending machines found
+    # when the restarted scheduler recovers this job.  Not a problem.
+    #
+    # A reference to the DelayedCallTask that will wake up the job should
+    # no other HQEs change state in time.  Its end_time attribute is used
+    # by our run_with_ready_delay() method to determine if the wait is over.
+    _delay_ready_task = None
+
+    # TODO(gps): On scheduler start/recovery we need to call HQE.on_pending() on
+    # all status='Pending' atomic group HQEs in case a delay was running when
+    # the scheduler was restarted and no more hosts ever successfully exit
+    # Verify.
 
     def __init__(self, id=None, row=None, **kwargs):
         assert id or row
@@ -2614,10 +2728,38 @@
                 queue_entry.set_status(status)
 
 
+    def _atomic_and_has_started(self):
+        """
+        @returns True if any of the HostQueueEntries associated with this job
+        have entered the Status.STARTING state or beyond.
+        """
+        atomic_entries = models.HostQueueEntry.objects.filter(
+                job=self.id, atomic_group__isnull=False)
+        if atomic_entries.count() <= 0:
+            return False
+
+        non_started_statuses = (models.HostQueueEntry.Status.QUEUED,
+                                models.HostQueueEntry.Status.VERIFYING,
+                                models.HostQueueEntry.Status.PENDING)
+        started_entries = atomic_entries.exclude(
+                status__in=non_started_statuses)
+        return started_entries.count() > 0
+
+
+    def _pending_count(self):
+        """The number of HostQueueEntries for this job in the Pending state."""
+        pending_entries = models.HostQueueEntry.objects.filter(
+                job=self.id, status=models.HostQueueEntry.Status.PENDING)
+        return pending_entries.count()
+
+
     def is_ready(self):
-        pending_entries = models.HostQueueEntry.objects.filter(job=self.id,
-                                                               status='Pending')
-        return (pending_entries.count() >= self.synch_count)
+        # NOTE: Atomic group jobs stop reporting ready after they have been
+        # started to avoid launching multiple copies of one atomic job.
+        # This is only possible if synch_count is less than half the number
+        # of machines in the atomic group.
+        return (self._pending_count() >= self.synch_count
+                and not self._atomic_and_has_started())
 
 
     def num_machines(self, clause = None):
@@ -2748,7 +2890,16 @@
         return self.run_verify
 
 
-    def _get_pre_job_tasks(self, queue_entry):
+    def get_pre_job_tasks(self, queue_entry):
+        """
+        Get a list of tasks to perform before the host_queue_entry
+        may be used to run this Job (such as Cleanup & Verify).
+
+        @returns A list of tasks to be done to the given queue_entry before
+                it should be considered ready to run this job.  The last
+                task in the list calls HostQueueEntry.on_pending(), which
+                continues the flow of the job.
+        """
         tasks = []
         if self._should_run_cleanup(queue_entry):
             tasks.append(CleanupTask(queue_entry=queue_entry))
@@ -2777,12 +2928,7 @@
                 used to run this Job, a string group name to suggest giving
                 to this job in the results database.
         """
-        if include_queue_entry.atomic_group_id:
-            atomic_group = AtomicGroup(include_queue_entry.atomic_group_id,
-                                       always_query=False)
-        else:
-            atomic_group = None
-
+        atomic_group = include_queue_entry.atomic_group
         chosen_entries = [include_queue_entry]
         if atomic_group:
             num_entries_wanted = atomic_group.max_number_of_machines
@@ -2822,11 +2968,70 @@
         return chosen_entries, group_name
 
 
-    def run(self, queue_entry):
+    def run_if_ready(self, queue_entry):
+        """
+        @returns An Agent instance to ultimately run this job if enough hosts
+                are ready for it to run.
+        @returns None, after potentially cleaning up excess hosts, if this
+                Job is not ready to run.
+        """
         if not self.is_ready():
-            queue_entry.set_status(models.HostQueueEntry.Status.VERIFYING)
-            return Agent(self._get_pre_job_tasks(queue_entry))
+            self.stop_if_necessary()
+            return None
 
+        if queue_entry.atomic_group:
+            return self.run_with_ready_delay(queue_entry)
+
+        return self.run(queue_entry)
+
+
+    def run_with_ready_delay(self, queue_entry):
+        """
+        Start a delay to wait for more hosts to enter Pending state before
+        launching an atomic group job.  Once set, the delay cannot be reset.
+
+        @param queue_entry: The HostQueueEntry object to get atomic group
+                info from and pass to run_if_ready when the delay is up.
+
+        @returns An Agent to run the job as appropriate or None if a delay
+                has already been set.
+        """
+        assert queue_entry.job_id == self.id
+        assert queue_entry.atomic_group
+        delay = scheduler_config.config.secs_to_wait_for_atomic_group_hosts
+        pending_threshold = queue_entry.atomic_group.max_number_of_machines
+        over_max_threshold = (self._pending_count() >= pending_threshold)
+        delay_expired = (self._delay_ready_task and
+                         time.time() >= self._delay_ready_task.end_time)
+
+        # Delay is disabled or we already have enough?  Do not wait to run.
+        if not delay or over_max_threshold or delay_expired:
+            return self.run(queue_entry)
+
+        # A delay was previously scheduled.
+        if self._delay_ready_task:
+            return None
+
+        def run_job_after_delay():
+            logging.info('Job %s done waiting for extra hosts.', self.id)
+            return self.run(queue_entry)
+
+        self._delay_ready_task = DelayedCallTask(delay_seconds=delay,
+                                                 callback=run_job_after_delay)
+
+        return Agent([self._delay_ready_task], num_processes=0)
+
+
+    def run(self, queue_entry):
+        """
+        @param queue_entry: The HostQueueEntry instance calling this method.
+        @returns An Agent instance to run this job or None if we've already
+                been run.
+        """
+        if queue_entry.atomic_group and self._atomic_and_has_started():
+            logging.error('Job.run() called on running atomic Job %d '
+                          'with HQE %s.', self.id, queue_entry)
+            return None
         queue_entries, group_name = self._choose_group_to_run(queue_entry)
         return self._finish_run(queue_entries, group_name)
 
@@ -2838,7 +3043,10 @@
         queue_task = QueueTask(job=self, queue_entries=queue_entries,
                                cmd=params, group_name=group_name)
         tasks = [queue_task]
-        entry_ids = [entry.id for entry in queue_entries]
+        if self._delay_ready_task:
+            # Cancel any pending callback that would try to run again
+            # as we are already running.
+            self._delay_ready_task.abort()
 
         return Agent(tasks, num_processes=len(queue_entries))