Some improvements to process tracking in the scheduler.
* have all AgentTasks declare how many processes they'll create (as an instance attribute).  This is really where the information belongs.
* have Agent read its num_processes from its AgentTask, rather than requiring clients to pass it into the constructor.
* have AgentTasks pass this num_processes value into the DroneManager when executing commands, and have the DroneManager use this value rather than the hack of parsing it out of the command line.  This required various changes to the DroneManager code, which actually fix some small bugs and make the code cleaner in my opinion.

Signed-off-by: Steve Howard <>

git-svn-id: 592f7852-d20e-0410-864c-8624ca9c26a4
diff --git a/scheduler/ b/scheduler/
index f4cdcbb..858bb52 100644
--- a/scheduler/
+++ b/scheduler/
@@ -67,17 +67,22 @@
         Object to represent pidfile IDs that is opaque to the scheduler code but
         still debugging-friendly for us.
-        def __init__(self, debug_string):
+        def __init__(self, debug_string, num_processes):
             self._debug_string = debug_string
+            self._num_processes = num_processes
         def __str__(self):
             return self._debug_string
+        def __repr__(self):
+            return '<_DummyPidfileId: %s>' % str(self)
     def __init__(self):
         super(MockDroneManager, self).__init__()
-        self.max_runnable_processes_value = 100
+        self.process_capacity = 100
         # maps result_dir to set of tuples (file_path, file_contents)
         self._attached_files = {}
@@ -122,11 +127,15 @@
         return pidfile_id in self._killed_pidfiles
-    def running_pidfile_ids(self):
-        return [str(pidfile_id) for pidfile_id, pidfile_contents
+    def nonfinished_pidfile_ids(self):
+        return [pidfile_id for pidfile_id, pidfile_contents
                 in self._pidfiles.iteritems()
-                if pidfile_contents.process is not None
-                and pidfile_contents.exit_status is None]
+                if pidfile_contents.exit_status is None]
+    def running_pidfile_ids(self):
+        return [pidfile_id for pidfile_id in self.nonfinished_pidfile_ids()
+                if self._pidfiles[pidfile_id].process is not None]
     # DroneManager emulation APIs for use by monitor_db
@@ -136,11 +145,12 @@
     def total_running_processes(self):
-        return 0
+        return sum(pidfile_id._num_processes
+                   for pidfile_id in self.nonfinished_pidfile_ids())
     def max_runnable_processes(self, username):
-        return self.max_runnable_processes_value
+        return self.process_capacity - self.total_running_processes()
     def refresh(self):
@@ -190,10 +200,11 @@
     def execute_command(self, command, working_directory, pidfile_name,
-                        log_file=None, paired_with_pidfile=None,
+                        num_processes, log_file=None, paired_with_pidfile=None,
         pidfile_id = self._DummyPidfileId(
-                self._get_pidfile_debug_string(working_directory, pidfile_name))
+                self._get_pidfile_debug_string(working_directory, pidfile_name),
+                num_processes)
         self._pidfile_index[(working_directory, pidfile_name)] = pidfile_id
@@ -237,7 +248,8 @@
                         + self._get_pidfile_debug_string(execution_tag,
         return self._pidfile_index.get((execution_tag, pidfile_name),
-                                       self._DummyPidfileId(debug_string))
+                                       self._DummyPidfileId(debug_string,
+                                                            num_processes=0))
     def kill_process(self, process):
@@ -318,11 +330,13 @@
                      '%s/%s not executed' % (working_directory, pidfile_name))
-    def _check_statuses(self, queue_entry, queue_entry_status, host_status):
+    def _check_statuses(self, queue_entry, queue_entry_status,
+                        host_status=None):
         # update from DB
         queue_entry = models.HostQueueEntry.objects.get(
         self.assertEquals(queue_entry.status, queue_entry_status)
-        self.assertEquals(, host_status)
+        if host_status:
+            self.assertEquals(, host_status)
     def _check_host_status(self, host, status):
@@ -686,10 +700,10 @@
         job1, queue_entry1 = self._make_job_and_queue_entry()
         job2, queue_entry2 = self._make_job_and_queue_entry()
-        self.mock_drone_manager.max_runnable_processes_value = 0
+        self.mock_drone_manager.process_capacity = 0
         self._run_dispatcher() # schedule job1, but won't start verify
-        self.mock_drone_manager.max_runnable_processes_value = 100
+        self.mock_drone_manager.process_capacity = 100
         self._run_dispatcher() # cleanup must run here, not verify for job2
         self._check_statuses(queue_entry1, HqeStatus.ABORTED,
@@ -699,29 +713,6 @@
-    def _test_job_scheduled_just_after_abort_2(self):
-        # test a pretty obscure corner case where a job is aborted while queued,
-        # another job is ready to run, and throttling is active. the post-abort
-        # cleanup must not be pre-empted by the second job.
-        job1, _ = self._make_job_and_queue_entry()
-        job2 = self._create_job(hosts=[1,2])
-        job2.synch_count = 2
-        self.mock_drone_manager.max_runnable_processes_value = 0
-        self._run_dispatcher() # schedule job1, but won't start verify
-        job1.hostqueueentry_set.update(aborted=True)
-        self.mock_drone_manager.max_runnable_processes_value = 100
-        self._run_dispatcher()
-        self.mock_drone_manager.finish_process(_PidfileType.VERIFY)
-        self._run_dispatcher()
-        self.mock_drone_manager.finish_process(_PidfileType.CLEANUP)
-        self._run_dispatcher()
-        self.mock_drone_manager.finish_specific_process(
-                'hosts/host2/2-verify', monitor_db._AUTOSERV_PID_FILE)
-        self._run_dispatcher()
     def test_reverify_interrupting_pre_job(self):
         # ensure things behave sanely if a reverify is scheduled in the middle
         # of pre-job actions
@@ -806,5 +797,57 @@
         self._check_host_status(, HostStatus.READY)
+    def test_throttling(self):
+        job = self._create_job(hosts=[1,2,3])
+        job.synch_count = 3
+        queue_entries = list(job.hostqueueentry_set.all())
+        def _check_hqe_statuses(*statuses):
+            for queue_entry, status in zip(queue_entries, statuses):
+                self._check_statuses(queue_entry, status)
+        self.mock_drone_manager.process_capacity = 2
+        self._run_dispatcher() # verify runs on 1 and 2
+        _check_hqe_statuses(HqeStatus.VERIFYING, HqeStatus.VERIFYING,
+                            HqeStatus.VERIFYING)
+        self.assertEquals(len(self.mock_drone_manager.running_pidfile_ids()), 2)
+        self.mock_drone_manager.finish_specific_process(
+                'hosts/host1/1-verify', monitor_db._AUTOSERV_PID_FILE)
+        self.mock_drone_manager.finish_process(_PidfileType.VERIFY)
+        self._run_dispatcher() # verify runs on 3
+        _check_hqe_statuses(HqeStatus.PENDING, HqeStatus.PENDING,
+                            HqeStatus.VERIFYING)
+        self.mock_drone_manager.finish_process(_PidfileType.VERIFY)
+        self._run_dispatcher() # job won't run due to throttling
+        _check_hqe_statuses(HqeStatus.STARTING, HqeStatus.STARTING,
+                            HqeStatus.STARTING)
+        self._assert_nothing_is_running()
+        self.mock_drone_manager.process_capacity = 3
+        self._run_dispatcher() # now job runs
+        _check_hqe_statuses(HqeStatus.RUNNING, HqeStatus.RUNNING,
+                            HqeStatus.RUNNING)
+        self.mock_drone_manager.process_capacity = 2
+        self.mock_drone_manager.finish_process(_PidfileType.JOB,
+                                               exit_status=271)
+        self._run_dispatcher() # gathering won't run due to throttling
+        _check_hqe_statuses(HqeStatus.GATHERING, HqeStatus.GATHERING,
+                            HqeStatus.GATHERING)
+        self._assert_nothing_is_running()
+        self.mock_drone_manager.process_capacity = 3
+        self._run_dispatcher() # now gathering runs
+        self.mock_drone_manager.process_capacity = 0
+        self.mock_drone_manager.finish_process(_PidfileType.GATHER)
+        self._run_dispatcher() # parsing runs despite throttling
+        _check_hqe_statuses(HqeStatus.PARSING, HqeStatus.PARSING,
+                            HqeStatus.PARSING)
 if __name__ == '__main__':